chameleon.py revision b4fd848987c6ac62a9e65ff4bb91ef281ab6894e
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import httplib
import logging
import socket
import time
import xmlrpclib
from contextlib import contextmanager

from PIL import Image

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.chameleon import edid


CHAMELEON_PORT = 9992


class ChameleonConnectionError(error.TestError):
    """Indicates that connecting to Chameleon failed.

    It is fatal to the test unless caught.
    """
    pass


class ChameleonConnection(object):
    """ChameleonConnection abstracts the network connection to the board.

    ChameleonBoard and ChameleonPort use it for accessing Chameleon RPC.

    """

    def __init__(self, hostname, port=CHAMELEON_PORT):
        """Constructs a ChameleonConnection.

        @param hostname: Hostname the chameleond process is running.
        @param port: Port number the chameleond process is listening on.

        @raise ChameleonConnectionError if connection failed.
        """
        self.chameleond_proxy = ChameleonConnection._create_server_proxy(
                hostname, port)


    @staticmethod
    def _create_server_proxy(hostname, port):
        """Creates the chameleond server proxy.

        @param hostname: Hostname the chameleond process is running.
        @param port: Port number the chameleond process is listening on.

        @return ServerProxy object to chameleond.

        @raise ChameleonConnectionError if connection failed.
        """
        remote = 'http://%s:%s' % (hostname, port)
        chameleond_proxy = xmlrpclib.ServerProxy(remote, allow_none=True)
        # Creating the proxy does not touch the network; issue one RPC so
        # that an unreachable board is reported here rather than later.
        try:
            chameleond_proxy.GetSupportedPorts()
        except (socket.error,
                xmlrpclib.ProtocolError,
                httplib.BadStatusLine) as e:
            raise ChameleonConnectionError(e)
        return chameleond_proxy


class ChameleonBoard(object):
    """ChameleonBoard is an abstraction of a Chameleon board.

    A Chameleond RPC proxy is passed to the construction such that it can
    use this proxy to control the Chameleon board.

    """

    def __init__(self, chameleon_connection):
        """Construct a ChameleonBoard.

        @param chameleon_connection: ChameleonConnection object.
        """
        self._chameleond_proxy = chameleon_connection.chameleond_proxy


    def reset(self):
        """Resets Chameleon board."""
        self._chameleond_proxy.Reset()


    def get_all_ports(self):
        """Gets all the ports on Chameleon board which are connected.

        @return: A list of ChameleonPort objects.
        """
        ports = self._chameleond_proxy.ProbePorts()
        return [ChameleonPort(self._chameleond_proxy, port) for port in ports]


    def get_all_inputs(self):
        """Gets all the input ports on Chameleon board which are connected.

        @return: A list of ChameleonPort objects.
        """
        ports = self._chameleond_proxy.ProbeInputs()
        return [ChameleonPort(self._chameleond_proxy, port) for port in ports]


    def get_all_outputs(self):
        """Gets all the output ports on Chameleon board which are connected.

        @return: A list of ChameleonPort objects.
        """
        ports = self._chameleond_proxy.ProbeOutputs()
        return [ChameleonPort(self._chameleond_proxy, port) for port in ports]


    def get_label(self):
        """Gets the label which indicates the display connection.

        @return: A string of the label, like 'hdmi', 'dp_hdmi', etc.
        """
        connectors = []
        for port in self._chameleond_proxy.ProbeInputs():
            if self._chameleond_proxy.HasVideoSupport(port):
                connector = self._chameleond_proxy.GetConnectorType(
                        port).lower()
                connectors.append(connector)
        # Eliminate duplicated ports. It simplifies the labels of dual-port
        # devices, i.e. dp_dp categorized into dp.
        return '_'.join(sorted(set(connectors)))


class ChameleonPort(object):
    """ChameleonPort is an abstraction of a general port of a Chameleon board.

    It only contains some common methods shared with audio and video ports.

    A Chameleond RPC proxy and a port_id are passed to the construction.
    The port_id is the unique identity to the port.
    """

    def __init__(self, chameleond_proxy, port_id):
        """Construct a ChameleonPort.

        @param chameleond_proxy: Chameleond RPC proxy object.
        @param port_id: The ID of the input port.
        """
        self.chameleond_proxy = chameleond_proxy
        self.port_id = port_id


    def get_connector_id(self):
        """Returns the connector ID.

        @return: A number of connector ID.
        """
        return self.port_id


    def get_connector_type(self):
        """Returns the human readable string for the connector type.

        @return: A string, like "VGA", "DVI", "HDMI", or "DP".
        """
        return self.chameleond_proxy.GetConnectorType(self.port_id)


    def has_audio_support(self):
        """Returns if the input has audio support.

        @return: True if the input has audio support; otherwise, False.
        """
        return self.chameleond_proxy.HasAudioSupport(self.port_id)


    def has_video_support(self):
        """Returns if the input has video support.

        @return: True if the input has video support; otherwise, False.
        """
        return self.chameleond_proxy.HasVideoSupport(self.port_id)


    def plug(self):
        """Asserts HPD line to high, emulating plug."""
        logging.info('Plug Chameleon port %d', self.port_id)
        self.chameleond_proxy.Plug(self.port_id)


    def unplug(self):
        """Deasserts HPD line to low, emulating unplug."""
        logging.info('Unplug Chameleon port %d', self.port_id)
        self.chameleond_proxy.Unplug(self.port_id)


    def set_plug(self, plug_status):
        """Sets plug/unplug by plug_status.

        @param plug_status: True to plug; False to unplug.
        """
        if plug_status:
            self.plug()
        else:
            self.unplug()


    @property
    def plugged(self):
        """
        @returns True if this port is plugged to Chameleon, False otherwise.

        """
        return self.chameleond_proxy.IsPlugged(self.port_id)


class ChameleonVideoInput(ChameleonPort):
    """ChameleonVideoInput is an abstraction of a video input port.

    It contains some special methods to control a video input.
    """

    # Seconds to keep the port unplugged after applying a new EDID.
    _DURATION_UNPLUG_FOR_EDID = 5
    # Timeout passed to wait_video_input_stable() after re-plugging.
    _TIMEOUT_VIDEO_STABLE_PROBE = 10

    def __init__(self, chameleon_port):
        """Construct a ChameleonVideoInput.

        @param chameleon_port: A general ChameleonPort object.
        """
        # Delegate to the base class so both classes initialize the same
        # attributes in one place.
        super(ChameleonVideoInput, self).__init__(
                chameleon_port.chameleond_proxy, chameleon_port.port_id)


    def wait_video_input_stable(self, timeout=None):
        """Waits the video input stable or timeout.

        @param timeout: The time period to wait for.

        @return: True if the video input becomes stable within the timeout
                 period; otherwise, False.
        """
        return self.chameleond_proxy.WaitVideoInputStable(self.port_id,
                                                          timeout)


    def read_edid(self):
        """Reads the EDID.

        @return: An Edid object.
        """
        # Read EDID without verify. It may be made corrupted as intended
        # for the test purpose.
        return edid.Edid(self.chameleond_proxy.ReadEdid(self.port_id).data,
                         skip_verify=True)


    def apply_edid(self, edid):
        """Applies the given EDID.

        The EDID created on the board for this call is destroyed again
        before returning, even if applying it fails.

        @param edid: An Edid object.
        """
        edid_id = self.chameleond_proxy.CreateEdid(xmlrpclib.Binary(edid.data))
        try:
            self.chameleond_proxy.ApplyEdid(self.port_id, edid_id)
        finally:
            # Always release the created EDID so that a failing ApplyEdid
            # does not leave it allocated on the board.
            self.chameleond_proxy.DestroyEdid(edid_id)


    @contextmanager
    def use_edid(self, edid):
        """Uses the given EDID in a with statement.

        It sets the EDID up in the beginning and restores to the original
        EDID in the end, also when the body of the with statement raises.
        This function is expected to be used in a with statement, like the
        following:

            with chameleon_port.use_edid(edid):
                do_some_test_on(chameleon_port)

        @param edid: An EDID object.
        """
        # Set the EDID up in the beginning.
        plugged = self.plugged
        if plugged:
            self.unplug()

        original_edid = self.read_edid()
        logging.info('Apply EDID on port %d', self.port_id)
        self.apply_edid(edid)

        if plugged:
            time.sleep(self._DURATION_UNPLUG_FOR_EDID)
            self.plug()
            self.wait_video_input_stable(self._TIMEOUT_VIDEO_STABLE_PROBE)

        try:
            # Yield to execute the with statement.
            yield
        finally:
            # Restore the original EDID in the end. Doing it in a finally
            # clause keeps a failing test from leaking its EDID into the
            # tests that run afterwards.
            current_edid = self.read_edid()
            if original_edid.data != current_edid.data:
                logging.info('Restore the original EDID.')
                self.apply_edid(original_edid)


    def use_edid_file(self, filename):
        """Uses the given EDID file in a with statement.

        It sets the EDID up in the beginning and restores to the original
        EDID in the end. This function is expected to be used in a with
        statement, like the following:

            with chameleon_port.use_edid_file(filename):
                do_some_test_on(chameleon_port)

        @param filename: A path to the EDID file.
        """
        return self.use_edid(edid.Edid.from_file(filename))


    def fire_hpd_pulse(self, deassert_interval_usec, assert_interval_usec=None,
                       repeat_count=1, end_level=1):
        """Fires one or more HPD pulse (low -> high -> low -> ...).

        @param deassert_interval_usec: The time in microsecond of the
                deassert pulse.
        @param assert_interval_usec: The time in microsecond of the
                assert pulse. If None, then use the same value as
                deassert_interval_usec.
        @param repeat_count: The count of HPD pulses to fire.
        @param end_level: HPD ends with 0 for LOW (unplugged) or 1 for
                HIGH (plugged).
        """
        # Normalize any truthy/falsy end_level to exactly 1 or 0 for the RPC.
        self.chameleond_proxy.FireHpdPulse(
                self.port_id, deassert_interval_usec,
                assert_interval_usec, repeat_count, int(bool(end_level)))


    def fire_mixed_hpd_pulses(self, widths):
        """Fires one or more HPD pulses, starting at low, of mixed widths.

        One must specify a list of segment widths in the widths argument where
        widths[0] is the width of the first low segment, widths[1] is that of
        the first high segment, widths[2] is that of the second low segment...
        etc. The HPD line stops at low if even number of segment widths are
        specified; otherwise, it stops at high.

        @param widths: list of pulse segment widths in usec.
        """
        self.chameleond_proxy.FireMixedHpdPulses(self.port_id, widths)


    def capture_screen(self):
        """Captures Chameleon framebuffer.

        @return An Image object.
        """
        # Image.fromstring is the legacy PIL name; Pillow provides
        # Image.frombytes as its replacement. Prefer the newer name when it
        # exists and fall back to the old one for old PIL installations.
        from_bytes = getattr(Image, 'frombytes', None) or Image.fromstring
        return from_bytes(
                'RGB',
                self.get_resolution(),
                self.chameleond_proxy.DumpPixels(self.port_id).data)


    def get_resolution(self):
        """Gets the source resolution.

        @return: A (width, height) tuple.
        """
        # The return value of RPC is converted to a list. Convert it back to
        # a tuple.
        return tuple(self.chameleond_proxy.DetectResolution(self.port_id))


class ChameleonAudioInput(ChameleonPort):
    """ChameleonAudioInput is an abstraction of an audio input port.

    It contains some special methods to control an audio input.
    """

    def __init__(self, chameleon_port):
        """Construct a ChameleonAudioInput.

        @param chameleon_port: A general ChameleonPort object.
        """
        # Delegate to the base class so both classes initialize the same
        # attributes in one place.
        super(ChameleonAudioInput, self).__init__(
                chameleon_port.chameleond_proxy, chameleon_port.port_id)


    def start_capturing_audio(self):
        """Starts capturing audio."""
        return self.chameleond_proxy.StartCapturingAudio(self.port_id)


    def stop_capturing_audio(self):
        """Stops capturing audio.

        @return: A tuple (data, format).
                 data: The captured binary data.
                 format: A dict containing:
                     file_type: 'raw' or 'wav'.
                     sample_format: 'S32_LE' for 32-bit signed integer in
                                    little-endian. Refer to aplay manpage for
                                    other formats.
                     channel: channel number.
                     rate: sampling rate.
        """
        rpc_data, data_format = self.chameleond_proxy.StopCapturingAudio(
                self.port_id)
        # Unwrap the RPC binary wrapper so callers get the raw data.
        return rpc_data.data, data_format


def make_chameleon_hostname(dut_hostname):
    """Given a DUT's hostname, returns the hostname of its Chameleon.

    @param dut_hostname: Hostname of a DUT.

    @return Hostname of the DUT's Chameleon.
    """
    # Only the first label of the hostname gets the suffix; any domain
    # part after the first dot is kept as is.
    host_parts = dut_hostname.split('.')
    host_parts[0] = host_parts[0] + '-chameleon'
    return '.'.join(host_parts)


def create_chameleon_board(dut_hostname, args):
    """Given either DUT's hostname or arguments, creates a ChameleonBoard.

    If the DUT's hostname is in the lab zone, it connects to the Chameleon by
    append the hostname with '-chameleon' suffix. If not, checks if the args
    contains the key-value pair 'chameleon_host=IP'.

    @param dut_hostname: Hostname of a DUT.
    @param args: A string of arguments passed from the command line.

    @return A ChameleonBoard object.

    @raise ChameleonConnectionError if unknown hostname.
    """
    hostname = make_chameleon_hostname(dut_hostname)
    if utils.host_is_in_lab_zone(hostname):
        return ChameleonBoard(ChameleonConnection(hostname))

    # Outside the lab zone the Chameleon location must come from the args.
    args_dict = utils.args_to_dict(args)
    hostname = args_dict.get('chameleon_host', None)
    port = args_dict.get('chameleon_port', CHAMELEON_PORT)
    if not hostname:
        raise ChameleonConnectionError('No chameleon_host is given in args')
    return ChameleonBoard(ChameleonConnection(hostname, port))