chameleon.py revision b4fd848987c6ac62a9e65ff4bb91ef281ab6894e
1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import httplib
6import logging
7import socket
8import time
9import xmlrpclib
10from contextlib import contextmanager
11
12from PIL import Image
13
14from autotest_lib.client.bin import utils
15from autotest_lib.client.common_lib import error
16from autotest_lib.client.cros.chameleon import edid
17
18
# Default port number the chameleond process is listening on; used when a
# caller does not pass an explicit port.
CHAMELEON_PORT = 9992
20
21
class ChameleonConnectionError(error.TestError):
    """Raised when a connection to Chameleon cannot be established.

    Unless a caller catches it, this error is fatal to the test.
    """
28
29
class ChameleonConnection(object):
    """Wraps the network connection to a Chameleon board.

    ChameleonBoard and ChameleonPort reach the Chameleon RPC through the
    chameleond_proxy attribute of this object.

    """

    def __init__(self, hostname, port=CHAMELEON_PORT):
        """Connects to chameleond and keeps the resulting proxy.

        @param hostname: Hostname the chameleond process is running.
        @param port: Port number the chameleond process is listening on.

        @raise ChameleonConnectionError if connection failed.
        """
        proxy = ChameleonConnection._create_server_proxy(hostname, port)
        self.chameleond_proxy = proxy


    @staticmethod
    def _create_server_proxy(hostname, port):
        """Builds an XML-RPC proxy to chameleond and checks it responds.

        @param hostname: Hostname the chameleond process is running.
        @param port: Port number the chameleond process is listening on.

        @return ServerProxy object to chameleond.

        @raise ChameleonConnectionError if connection failed.
        """
        # Failures that are reported as a connection error.
        connection_failures = (socket.error,
                               xmlrpclib.ProtocolError,
                               httplib.BadStatusLine)
        url = 'http://%s:%s' % (hostname, port)
        proxy = xmlrpclib.ServerProxy(url, allow_none=True)
        try:
            # Creating the proxy does not contact the server, so issue one
            # RPC to find out whether the connection actually works.
            proxy.GetSupportedPorts()
        except connection_failures as failure:
            raise ChameleonConnectionError(failure)
        return proxy
70
71
class ChameleonBoard(object):
    """Represents one Chameleon board.

    The board is controlled through the Chameleond RPC proxy taken from the
    ChameleonConnection handed to the constructor.

    """

    def __init__(self, chameleon_connection):
        """Construct a ChameleonBoard.

        @param chameleon_connection: ChameleonConnection object.
        """
        self._chameleond_proxy = chameleon_connection.chameleond_proxy


    def _wrap_ports(self, port_ids):
        """Wraps each port ID in a ChameleonPort bound to this board's proxy.

        @param port_ids: An iterable of port IDs returned by a probe RPC.

        @return: A list of ChameleonPort objects.
        """
        proxy = self._chameleond_proxy
        return [ChameleonPort(proxy, port_id) for port_id in port_ids]


    def reset(self):
        """Issues the Reset RPC to the Chameleon board."""
        self._chameleond_proxy.Reset()


    def get_all_ports(self):
        """Gets all the ports on Chameleon board which are connected.

        @return: A list of ChameleonPort objects.
        """
        return self._wrap_ports(self._chameleond_proxy.ProbePorts())


    def get_all_inputs(self):
        """Gets all the input ports on Chameleon board which are connected.

        @return: A list of ChameleonPort objects.
        """
        return self._wrap_ports(self._chameleond_proxy.ProbeInputs())


    def get_all_outputs(self):
        """Gets all the output ports on Chameleon board which are connected.

        @return: A list of ChameleonPort objects.
        """
        return self._wrap_ports(self._chameleond_proxy.ProbeOutputs())


    def get_label(self):
        """Gets the label which indicates the display connection.

        The label is built from the lower-cased connector types of all
        probed inputs that have video support.

        @return: A string of the label, like 'hdmi', 'dp_hdmi', etc.
        """
        proxy = self._chameleond_proxy
        # A set drops duplicated connector types, which simplifies the
        # labels of dual-port devices, i.e. dp_dp is categorized into dp.
        connector_types = set(
                proxy.GetConnectorType(port_id).lower()
                for port_id in proxy.ProbeInputs()
                if proxy.HasVideoSupport(port_id))
        return '_'.join(sorted(connector_types))
133
134
class ChameleonPort(object):
    """Represents a general port of a Chameleon board.

    Only the methods common to audio and video ports live here.

    It is constructed from a Chameleond RPC proxy and a port_id, where the
    port_id is the unique identity of the port.
    """

    def __init__(self, chameleond_proxy, port_id):
        """Construct a ChameleonPort.

        @param chameleond_proxy: Chameleond RPC proxy object.
        @param port_id: The ID of the input port.
        """
        self.port_id = port_id
        self.chameleond_proxy = chameleond_proxy


    def get_connector_id(self):
        """Returns the connector ID, which is the port ID itself.

        @return: A number of connector ID.
        """
        return self.port_id


    def get_connector_type(self):
        """Asks Chameleon for the human readable connector type.

        @return: A string, like "VGA", "DVI", "HDMI", or "DP".
        """
        connector_type = self.chameleond_proxy.GetConnectorType(self.port_id)
        return connector_type


    def has_audio_support(self):
        """Asks Chameleon whether this port has audio support.

        @return: True if the input has audio support; otherwise, False.
        """
        supported = self.chameleond_proxy.HasAudioSupport(self.port_id)
        return supported


    def has_video_support(self):
        """Asks Chameleon whether this port has video support.

        @return: True if the input has video support; otherwise, False.
        """
        supported = self.chameleond_proxy.HasVideoSupport(self.port_id)
        return supported


    def plug(self):
        """Emulates plug by asserting the HPD line to high."""
        logging.info('Plug Chameleon port %d', self.port_id)
        self.chameleond_proxy.Plug(self.port_id)


    def unplug(self):
        """Emulates unplug by deasserting the HPD line to low."""
        logging.info('Unplug Chameleon port %d', self.port_id)
        self.chameleond_proxy.Unplug(self.port_id)


    def set_plug(self, plug_status):
        """Plugs or unplugs the port according to plug_status.

        @param plug_status: True to plug; False to unplug.
        """
        action = self.plug if plug_status else self.unplug
        action()


    @property
    def plugged(self):
        """
        @returns True if this port is plugged to Chameleon, False otherwise.

        """
        is_plugged = self.chameleond_proxy.IsPlugged(self.port_id)
        return is_plugged
216
217
class ChameleonVideoInput(ChameleonPort):
    """ChameleonVideoInput is an abstraction of a video input port.

    It contains some special methods to control a video input: EDID
    handling, HPD pulses and framebuffer capture.
    """

    # Seconds to keep the port unplugged after a new EDID is applied, before
    # it is plugged back.
    _DURATION_UNPLUG_FOR_EDID = 5
    # Timeout passed to wait_video_input_stable() after plugging back.
    _TIMEOUT_VIDEO_STABLE_PROBE = 10

    def __init__(self, chameleon_port):
        """Construct a ChameleonVideoInput.

        @param chameleon_port: A general ChameleonPort object.
        """
        super(ChameleonVideoInput, self).__init__(
                chameleon_port.chameleond_proxy, chameleon_port.port_id)


    def wait_video_input_stable(self, timeout=None):
        """Waits the video input stable or timeout.

        @param timeout: The time period to wait for. It is forwarded to the
                        RPC unchanged, including None.

        @return: True if the video input becomes stable within the timeout
                 period; otherwise, False.
        """
        return self.chameleond_proxy.WaitVideoInputStable(self.port_id,
                                                          timeout)


    def read_edid(self):
        """Reads the EDID.

        @return: An Edid object.
        """
        # Read EDID without verify. It may be made corrupted as intended
        # for the test purpose.
        return edid.Edid(self.chameleond_proxy.ReadEdid(self.port_id).data,
                         skip_verify=True)


    def apply_edid(self, edid):
        """Applies the given EDID.

        The EDID is created on Chameleon, applied to this port and then
        destroyed again, even if applying it fails.

        @param edid: An Edid object.
        """
        edid_id = self.chameleond_proxy.CreateEdid(xmlrpclib.Binary(edid.data))
        try:
            self.chameleond_proxy.ApplyEdid(self.port_id, edid_id)
        finally:
            # Always release the created EDID so that a failing ApplyEdid
            # does not leave it behind on the board.
            self.chameleond_proxy.DestroyEdid(edid_id)


    @contextmanager
    def use_edid(self, edid):
        """Uses the given EDID in a with statement.

        It sets the EDID up in the beginning and restores to the original
        EDID in the end, also when the body of the with statement raises.
        This function is expected to be used in a with statement, like the
        following:

            with chameleon_port.use_edid(edid):
                do_some_test_on(chameleon_port)

        @param edid: An EDID object.
        """
        # Set the EDID up in the beginning.
        plugged = self.plugged
        if plugged:
            self.unplug()

        original_edid = self.read_edid()
        logging.info('Apply EDID on port %d', self.port_id)
        self.apply_edid(edid)

        try:
            if plugged:
                time.sleep(self._DURATION_UNPLUG_FOR_EDID)
                self.plug()
                self.wait_video_input_stable(self._TIMEOUT_VIDEO_STABLE_PROBE)

            # Yield to execute the with statement.
            yield
        finally:
            # Restore the original EDID in the end. Doing it in a finally
            # clause keeps a failing test from leaking its EDID into the
            # tests that run afterwards.
            current_edid = self.read_edid()
            if original_edid.data != current_edid.data:
                logging.info('Restore the original EDID.')
                self.apply_edid(original_edid)


    def use_edid_file(self, filename):
        """Uses the given EDID file in a with statement.

        It sets the EDID up in the beginning and restores to the original
        EDID in the end. This function is expected to be used in a with
        statement, like the following:

            with chameleon_port.use_edid_file(filename):
                do_some_test_on(chameleon_port)

        @param filename: A path to the EDID file.
        """
        return self.use_edid(edid.Edid.from_file(filename))


    def fire_hpd_pulse(self, deassert_interval_usec, assert_interval_usec=None,
                       repeat_count=1, end_level=1):
        """Fires one or more HPD pulse (low -> high -> low -> ...).

        @param deassert_interval_usec: The time in microsecond of the
                deassert pulse.
        @param assert_interval_usec: The time in microsecond of the
                assert pulse. If None, then use the same value as
                deassert_interval_usec.
        @param repeat_count: The count of HPD pulses to fire.
        @param end_level: HPD ends with 0 for LOW (unplugged) or 1 for
                HIGH (plugged).
        """
        # Any truthy end_level is normalized to 1 and any falsy one to 0.
        self.chameleond_proxy.FireHpdPulse(
                self.port_id, deassert_interval_usec,
                assert_interval_usec, repeat_count, int(bool(end_level)))


    def fire_mixed_hpd_pulses(self, widths):
        """Fires one or more HPD pulses, starting at low, of mixed widths.

        One must specify a list of segment widths in the widths argument where
        widths[0] is the width of the first low segment, widths[1] is that of
        the first high segment, widths[2] is that of the second low segment...
        etc. The HPD line stops at low if even number of segment widths are
        specified; otherwise, it stops at high.

        @param widths: list of pulse segment widths in usec.
        """
        self.chameleond_proxy.FireMixedHpdPulses(self.port_id, widths)


    def capture_screen(self):
        """Captures Chameleon framebuffer.

        @return An Image object.
        """
        # Image.fromstring was renamed to Image.frombytes in newer Pillow
        # releases and later removed; prefer the new name when it exists and
        # fall back to the old one on legacy PIL.
        image_from_bytes = (getattr(Image, 'frombytes', None) or
                            Image.fromstring)
        return image_from_bytes(
                'RGB',
                self.get_resolution(),
                self.chameleond_proxy.DumpPixels(self.port_id).data)


    def get_resolution(self):
        """Gets the source resolution.

        @return: A (width, height) tuple.
        """
        # The return value of RPC is converted to a list. Convert it back to
        # a tuple.
        return tuple(self.chameleond_proxy.DetectResolution(self.port_id))
373
374
class ChameleonAudioInput(ChameleonPort):
    """Represents an audio input port of a Chameleon board.

    It adds the methods that are specific to controlling an audio input.
    """

    def __init__(self, chameleon_port):
        """Construct a ChameleonAudioInput.

        The proxy and the port ID are taken over from the given port.

        @param chameleon_port: A general ChameleonPort object.
        """
        self.port_id = chameleon_port.port_id
        self.chameleond_proxy = chameleon_port.chameleond_proxy


    def start_capturing_audio(self):
        """Starts capturing audio.

        @return: Whatever the StartCapturingAudio RPC returns.
        """
        started = self.chameleond_proxy.StartCapturingAudio(self.port_id)
        return started


    def stop_capturing_audio(self):
        """Stops capturing audio.

        @return: A tuple (data, format).
                 data: The captured binary data.
                 format: A dict containing:
                   file_type: 'raw' or 'wav'.
                   sample_format: 'S32_LE' for 32-bit signed integer in
                     little-endian. Refer to aplay manpage for other formats.
                   channel: channel number.
                   rate: sampling rate.
        """
        captured = self.chameleond_proxy.StopCapturingAudio(self.port_id)
        # The RPC hands back the binary payload in a wrapper; callers get
        # the raw bytes from its data attribute.
        wrapped_data, data_format = captured
        return wrapped_data.data, data_format
411
412
def make_chameleon_hostname(dut_hostname):
    """Derives the hostname of a DUT's Chameleon from the DUT's hostname.

    The suffix '-chameleon' is appended to the first dot-separated component
    of the hostname; the rest is kept as it is.

    @param dut_hostname: Hostname of a DUT.

    @return Hostname of the DUT's Chameleon.
    """
    short_name, dot, domain = dut_hostname.partition('.')
    return short_name + '-chameleon' + dot + domain
423
424
def create_chameleon_board(dut_hostname, args):
    """Creates a ChameleonBoard from the DUT's hostname or from arguments.

    If the hostname derived from the DUT's hostname (with the '-chameleon'
    suffix) is in the lab zone, it connects to that Chameleon. If not, it
    looks in args for the key-value pair 'chameleon_host=IP' and, optionally,
    'chameleon_port=PORT'.

    @param dut_hostname: Hostname of a DUT.
    @param args: Arguments passed from the command line, in the form that
                 utils.args_to_dict accepts.

    @return A ChameleonBoard object.

    @raise ChameleonConnectionError if unknown hostname.
    """
    lab_hostname = make_chameleon_hostname(dut_hostname)
    if utils.host_is_in_lab_zone(lab_hostname):
        return ChameleonBoard(ChameleonConnection(lab_hostname))

    # Outside the lab zone the Chameleon has to be named explicitly.
    options = utils.args_to_dict(args)
    given_host = options.get('chameleon_host', None)
    given_port = options.get('chameleon_port', CHAMELEON_PORT)
    if not given_host:
        raise ChameleonConnectionError('No chameleon_host is given in args')
    return ChameleonBoard(ChameleonConnection(given_host, given_port))
452    return ChameleonBoard(connection)
453