chameleon.py revision 72e354c97eb4890f4ce770ea27caa62cf620b4c1
1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import httplib
6import logging
7import socket
8import time
9import xmlrpclib
10
11from PIL import Image
12
13from autotest_lib.client.bin import utils
14from autotest_lib.client.common_lib import error
15from autotest_lib.client.cros.chameleon import edid
16
17
18CHAMELEON_PORT = 9992
19
20
21class ChameleonConnectionError(error.TestError):
22    """Indicates that connecting to Chameleon failed.
23
24    It is fatal to the test unless caught.
25    """
26    pass
27
28
29class ChameleonConnection(object):
30    """ChameleonConnection abstracts the network connection to the board.
31
32    ChameleonBoard and ChameleonPort use it for accessing Chameleon RPC.
33
34    """
35
36    def __init__(self, hostname, port=CHAMELEON_PORT):
37        """Constructs a ChameleonConnection.
38
39        @param hostname: Hostname the chameleond process is running.
40        @param port: Port number the chameleond process is listening on.
41
42        @raise ChameleonConnectionError if connection failed.
43        """
44        self.chameleond_proxy = ChameleonConnection._create_server_proxy(
45                hostname, port)
46
47
48    @staticmethod
49    def _create_server_proxy(hostname, port):
50        """Creates the chameleond server proxy.
51
52        @param hostname: Hostname the chameleond process is running.
53        @param port: Port number the chameleond process is listening on.
54
55        @return ServerProxy object to chameleond.
56
57        @raise ChameleonConnectionError if connection failed.
58        """
59        remote = 'http://%s:%s' % (hostname, port)
60        chameleond_proxy = xmlrpclib.ServerProxy(remote, allow_none=True)
61        # Call a RPC to test.
62        try:
63            chameleond_proxy.ProbeInputs()
64        except (socket.error,
65                xmlrpclib.ProtocolError,
66                httplib.BadStatusLine) as e:
67            raise ChameleonConnectionError(e)
68        return chameleond_proxy
69
70
71class ChameleonBoard(object):
72    """ChameleonBoard is an abstraction of a Chameleon board.
73
74    A Chameleond RPC proxy is passed to the construction such that it can
75    use this proxy to control the Chameleon board.
76
77    """
78
79    def __init__(self, chameleon_connection):
80        """Construct a ChameleonBoard.
81
82        @param chameleon_connection: ChameleonConnection object.
83        """
84        self._chameleond_proxy = chameleon_connection.chameleond_proxy
85
86
87    def reset(self):
88        """Resets Chameleon board."""
89        self._chameleond_proxy.Reset()
90
91
92    def is_healthy(self):
93        """Returns if the Chameleon is healthy or any repair is needed.
94
95        @return: True if the Chameleon is healthy;
96                 otherwise, False, need to repair.
97        """
98        return self._chameleond_proxy.IsHealthy()
99
100
101    def repair(self):
102        """Repairs the Chameleon.
103
104        It is a synchronous call. It returns after repairs.
105        """
106        repair_time = self._chameleond_proxy.Repair()
107        time.sleep(repair_time)
108
109
110    def get_all_ports(self):
111        """Gets all the ports on Chameleon board which are connected.
112
113        @return: A list of ChameleonPort objects.
114        """
115        ports = self._chameleond_proxy.ProbePorts()
116        return [ChameleonPort(self._chameleond_proxy, port) for port in ports]
117
118
119    def get_all_inputs(self):
120        """Gets all the input ports on Chameleon board which are connected.
121
122        @return: A list of ChameleonPort objects.
123        """
124        ports = self._chameleond_proxy.ProbeInputs()
125        return [ChameleonPort(self._chameleond_proxy, port) for port in ports]
126
127
128    def get_all_outputs(self):
129        """Gets all the output ports on Chameleon board which are connected.
130
131        @return: A list of ChameleonPort objects.
132        """
133        ports = self._chameleond_proxy.ProbeOutputs()
134        return [ChameleonPort(self._chameleond_proxy, port) for port in ports]
135
136
137    def get_label(self):
138        """Gets the label which indicates the display connection.
139
140        @return: A string of the label, like 'hdmi', 'dp_hdmi', etc.
141        """
142        connectors = []
143        for port in self._chameleond_proxy.ProbeInputs():
144            if self._chameleond_proxy.HasVideoSupport(port):
145                connector = self._chameleond_proxy.GetConnectorType(port).lower()
146                connectors.append(connector)
147        # Eliminate duplicated ports. It simplifies the labels of dual-port
148        # devices, i.e. dp_dp categorized into dp.
149        return '_'.join(sorted(set(connectors)))
150
151
152class ChameleonPort(object):
153    """ChameleonPort is an abstraction of a general port of a Chameleon board.
154
155    It only contains some common methods shared with audio and video ports.
156
157    A Chameleond RPC proxy and an port_id are passed to the construction.
158    The port_id is the unique identity to the port.
159    """
160
161    def __init__(self, chameleond_proxy, port_id):
162        """Construct a ChameleonPort.
163
164        @param chameleond_proxy: Chameleond RPC proxy object.
165        @param port_id: The ID of the input port.
166        """
167        self.chameleond_proxy = chameleond_proxy
168        self.port_id = port_id
169
170
171    def get_connector_id(self):
172        """Returns the connector ID.
173
174        @return: A number of connector ID.
175        """
176        return self.port_id
177
178
179    def get_connector_type(self):
180        """Returns the human readable string for the connector type.
181
182        @return: A string, like "VGA", "DVI", "HDMI", or "DP".
183        """
184        return self.chameleond_proxy.GetConnectorType(self.port_id)
185
186
187    def has_audio_support(self):
188        """Returns if the input has audio support.
189
190        @return: True if the input has audio support; otherwise, False.
191        """
192        return self.chameleond_proxy.HasAudioSupport(self.port_id)
193
194
195    def has_video_support(self):
196        """Returns if the input has video support.
197
198        @return: True if the input has video support; otherwise, False.
199        """
200        return self.chameleond_proxy.HasVideoSupport(self.port_id)
201
202
203    def plug(self):
204        """Asserts HPD line to high, emulating plug."""
205        logging.info('Plug Chameleon port %d', self.port_id)
206        self.chameleond_proxy.Plug(self.port_id)
207
208
209    def unplug(self):
210        """Deasserts HPD line to low, emulating unplug."""
211        logging.info('Unplug Chameleon port %d', self.port_id)
212        self.chameleond_proxy.Unplug(self.port_id)
213
214
215    def set_plug(self, plug_status):
216        """Sets plug/unplug by plug_status.
217
218        @param plug_status: True to plug; False to unplug.
219        """
220        if plug_status:
221            self.plug()
222        else:
223            self.unplug()
224
225
226    @property
227    def plugged(self):
228        """
229        @returns True if this port is plugged to Chameleon, False otherwise.
230
231        """
232        return self.chameleond_proxy.IsPlugged(self.port_id)
233
234
235class ChameleonVideoInput(ChameleonPort):
236    """ChameleonVideoInput is an abstraction of a video input port.
237
238    It contains some special methods to control a video input.
239    """
240
241    def __init__(self, chameleon_port):
242        """Construct a ChameleonVideoInput.
243
244        @param chameleon_port: A general ChameleonPort object.
245        """
246        self.chameleond_proxy = chameleon_port.chameleond_proxy
247        self.port_id = chameleon_port.port_id
248
249
250    def wait_video_input_stable(self, timeout=None):
251        """Waits the video input stable or timeout.
252
253        @param timeout: The time period to wait for.
254
255        @return: True if the video input becomes stable within the timeout
256                 period; otherwise, False.
257        """
258        return self.chameleond_proxy.WaitVideoInputStable(self.port_id,
259                                                          timeout)
260
261
262    def read_edid(self):
263        """Reads the EDID.
264
265        @return: An Edid object.
266        """
267        # Read EDID without verify. It may be made corrupted as intended
268        # for the test purpose.
269        return edid.Edid(self.chameleond_proxy.ReadEdid(self.port_id).data,
270                         skip_verify=True)
271
272
273    def apply_edid(self, edid):
274        """Applies the given EDID.
275
276        @param edid: An Edid object.
277        """
278        edid_id = self.chameleond_proxy.CreateEdid(xmlrpclib.Binary(edid.data))
279        self.chameleond_proxy.ApplyEdid(self.port_id, edid_id)
280        self.chameleond_proxy.DestroyEdid(edid_id)
281
282
283    def fire_hpd_pulse(self, deassert_interval_usec, assert_interval_usec=None,
284                       repeat_count=1, end_level=1):
285
286        """Fires one or more HPD pulse (low -> high -> low -> ...).
287
288        @param deassert_interval_usec: The time in microsecond of the
289                deassert pulse.
290        @param assert_interval_usec: The time in microsecond of the
291                assert pulse. If None, then use the same value as
292                deassert_interval_usec.
293        @param repeat_count: The count of HPD pulses to fire.
294        @param end_level: HPD ends with 0 for LOW (unplugged) or 1 for
295                HIGH (plugged).
296        """
297        self.chameleond_proxy.FireHpdPulse(
298                self.port_id, deassert_interval_usec,
299                assert_interval_usec, repeat_count, int(bool(end_level)))
300
301
302    def fire_mixed_hpd_pulses(self, widths):
303        """Fires one or more HPD pulses, starting at low, of mixed widths.
304
305        One must specify a list of segment widths in the widths argument where
306        widths[0] is the width of the first low segment, widths[1] is that of
307        the first high segment, widths[2] is that of the second low segment...
308        etc. The HPD line stops at low if even number of segment widths are
309        specified; otherwise, it stops at high.
310
311        @param widths: list of pulse segment widths in usec.
312        """
313        self.chameleond_proxy.FireMixedHpdPulses(self.port_id, widths)
314
315
316    def capture_screen(self):
317        """Captures Chameleon framebuffer.
318
319        @return An Image object.
320        """
321        return Image.fromstring(
322                'RGB',
323                self.get_resolution(),
324                self.chameleond_proxy.DumpPixels(self.port_id).data)
325
326
327    def get_resolution(self):
328        """Gets the source resolution.
329
330        @return: A (width, height) tuple.
331        """
332        # The return value of RPC is converted to a list. Convert it back to
333        # a tuple.
334        return tuple(self.chameleond_proxy.DetectResolution(self.port_id))
335
336
337class ChameleonAudioInput(ChameleonPort):
338    """ChameleonAudioInput is an abstraction of an audio input port.
339
340    It contains some special methods to control an audio input.
341    """
342
343    def __init__(self, chameleon_port):
344        """Construct a ChameleonAudioInput.
345
346        @param chameleon_port: A general ChameleonPort object.
347        """
348        self.chameleond_proxy = chameleon_port.chameleond_proxy
349        self.port_id = chameleon_port.port_id
350
351
352    def start_capturing_audio(self):
353        """Starts capturing audio."""
354        return self.chameleond_proxy.StartCapturingAudio(self.port_id)
355
356
357    def stop_capturing_audio(self):
358        """Stops capturing audio.
359
360        Returns:
361          A tuple (data, format).
362          data: The captured binary data.
363          format: A dict containing:
364            file_type: 'raw' or 'wav'.
365            sample_format: 'S32_LE' for 32-bit signed integer in little-endian.
366              Refer to aplay manpage for other formats.
367            channel: channel number.
368            rate: sampling rate.
369        """
370        rpc_data, data_format = self.chameleond_proxy.StopCapturingAudio(
371                self.port_id)
372        return rpc_data.data, data_format
373
374
375def make_chameleon_hostname(dut_hostname):
376    """Given a DUT's hostname, returns the hostname of its Chameleon.
377
378    @param dut_hostname: Hostname of a DUT.
379
380    @return Hostname of the DUT's Chameleon.
381    """
382    host_parts = dut_hostname.split('.')
383    host_parts[0] = host_parts[0] + '-chameleon'
384    return '.'.join(host_parts)
385
386
387def create_chameleon_board(dut_hostname, args):
388    """Given either DUT's hostname or argments, creates a ChameleonBoard object.
389
390    If the DUT's hostname is in the lab zone, it connects to the Chameleon by
391    append the hostname with '-chameleon' suffix. If not, checks if the args
392    contains the key-value pair 'chameleon_host=IP'.
393
394    @param dut_hostname: Hostname of a DUT.
395    @param args: A string of arguments passed from the command line.
396
397    @return A ChameleonBoard object.
398
399    @raise ChameleonConnectionError if unknown hostname.
400    """
401    connection = None
402    hostname = make_chameleon_hostname(dut_hostname)
403    if utils.host_is_in_lab_zone(hostname):
404        connection = ChameleonConnection(hostname)
405    else:
406        args_dict = utils.args_to_dict(args)
407        hostname = args_dict.get('chameleon_host', None)
408        port = args_dict.get('chameleon_port', CHAMELEON_PORT)
409        if hostname:
410            connection = ChameleonConnection(hostname, port)
411        else:
412            raise ChameleonConnectionError('No chameleon_host is given in args')
413
414    return ChameleonBoard(connection)
415