# chameleon.py revision 72e354c97eb4890f4ce770ea27caa62cf620b4c1
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import httplib
import logging
import socket
import time
import xmlrpclib

from PIL import Image

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.chameleon import edid


# Default TCP port used when no explicit chameleond port is given.
CHAMELEON_PORT = 9992


class ChameleonConnectionError(error.TestError):
    """Indicates that connecting to Chameleon failed.

    It is fatal to the test unless caught.
    """
    pass


class ChameleonConnection(object):
    """ChameleonConnection abstracts the network connection to the board.

    ChameleonBoard and ChameleonPort use it for accessing Chameleon RPC.

    """

    def __init__(self, hostname, port=CHAMELEON_PORT):
        """Constructs a ChameleonConnection.

        @param hostname: Hostname the chameleond process is running.
        @param port: Port number the chameleond process is listening on.

        @raise ChameleonConnectionError if connection failed.
        """
        self.chameleond_proxy = ChameleonConnection._create_server_proxy(
                hostname, port)


    @staticmethod
    def _create_server_proxy(hostname, port):
        """Creates the chameleond server proxy.

        @param hostname: Hostname the chameleond process is running.
        @param port: Port number the chameleond process is listening on.

        @return ServerProxy object to chameleond.

        @raise ChameleonConnectionError if connection failed.
        """
        remote = 'http://%s:%s' % (hostname, port)
        # allow_none lets None arguments (e.g. an unset timeout) be
        # marshalled over XML-RPC.
        chameleond_proxy = xmlrpclib.ServerProxy(remote, allow_none=True)
        # Call a RPC to test; constructing the proxy alone does not open
        # a connection.
        try:
            chameleond_proxy.ProbeInputs()
        except (socket.error,
                xmlrpclib.ProtocolError,
                httplib.BadStatusLine) as e:
            raise ChameleonConnectionError(e)
        return chameleond_proxy


class ChameleonBoard(object):
    """ChameleonBoard is an abstraction of a Chameleon board.

    A Chameleond RPC proxy is passed to the construction such that it can
    use this proxy to control the Chameleon board.

    """

    def __init__(self, chameleon_connection):
        """Construct a ChameleonBoard.

        @param chameleon_connection: ChameleonConnection object.
        """
        self._chameleond_proxy = chameleon_connection.chameleond_proxy


    def reset(self):
        """Resets Chameleon board."""
        self._chameleond_proxy.Reset()


    def is_healthy(self):
        """Returns if the Chameleon is healthy or any repair is needed.

        @return: True if the Chameleon is healthy;
                 otherwise, False, need to repair.
        """
        return self._chameleond_proxy.IsHealthy()


    def repair(self):
        """Repairs the Chameleon.

        It is a synchronous call. It returns after repairs.
        """
        # The Repair RPC returns a duration; sleep for that long so that
        # the call only returns once the repair period has elapsed.
        repair_time = self._chameleond_proxy.Repair()
        time.sleep(repair_time)


    def get_all_ports(self):
        """Gets all the ports on Chameleon board which are connected.

        @return: A list of ChameleonPort objects.
        """
        ports = self._chameleond_proxy.ProbePorts()
        return [ChameleonPort(self._chameleond_proxy, port) for port in ports]


    def get_all_inputs(self):
        """Gets all the input ports on Chameleon board which are connected.

        @return: A list of ChameleonPort objects.
        """
        ports = self._chameleond_proxy.ProbeInputs()
        return [ChameleonPort(self._chameleond_proxy, port) for port in ports]


    def get_all_outputs(self):
        """Gets all the output ports on Chameleon board which are connected.

        @return: A list of ChameleonPort objects.
        """
        ports = self._chameleond_proxy.ProbeOutputs()
        return [ChameleonPort(self._chameleond_proxy, port) for port in ports]


    def get_label(self):
        """Gets the label which indicates the display connection.

        Only input ports that report video support contribute to the label.

        @return: A string of the label, like 'hdmi', 'dp_hdmi', etc.
        """
        connectors = []
        for port in self._chameleond_proxy.ProbeInputs():
            if self._chameleond_proxy.HasVideoSupport(port):
                connector = self._chameleond_proxy.GetConnectorType(
                        port).lower()
                connectors.append(connector)
        # Eliminate duplicated ports. It simplifies the labels of dual-port
        # devices, i.e. dp_dp categorized into dp.
        return '_'.join(sorted(set(connectors)))


class ChameleonPort(object):
    """ChameleonPort is an abstraction of a general port of a Chameleon board.

    It only contains some common methods shared with audio and video ports.

    A Chameleond RPC proxy and a port_id are passed to the construction.
    The port_id is the unique identity to the port.
    """

    def __init__(self, chameleond_proxy, port_id):
        """Construct a ChameleonPort.

        @param chameleond_proxy: Chameleond RPC proxy object.
        @param port_id: The ID of the input port.
        """
        self.chameleond_proxy = chameleond_proxy
        self.port_id = port_id


    def get_connector_id(self):
        """Returns the connector ID.

        @return: A number of connector ID.
        """
        return self.port_id


    def get_connector_type(self):
        """Returns the human readable string for the connector type.

        @return: A string, like "VGA", "DVI", "HDMI", or "DP".
        """
        return self.chameleond_proxy.GetConnectorType(self.port_id)


    def has_audio_support(self):
        """Returns if the input has audio support.

        @return: True if the input has audio support; otherwise, False.
        """
        return self.chameleond_proxy.HasAudioSupport(self.port_id)


    def has_video_support(self):
        """Returns if the input has video support.

        @return: True if the input has video support; otherwise, False.
        """
        return self.chameleond_proxy.HasVideoSupport(self.port_id)


    def plug(self):
        """Asserts HPD line to high, emulating plug."""
        logging.info('Plug Chameleon port %d', self.port_id)
        self.chameleond_proxy.Plug(self.port_id)


    def unplug(self):
        """Deasserts HPD line to low, emulating unplug."""
        logging.info('Unplug Chameleon port %d', self.port_id)
        self.chameleond_proxy.Unplug(self.port_id)


    def set_plug(self, plug_status):
        """Sets plug/unplug by plug_status.

        @param plug_status: True to plug; False to unplug.
        """
        if plug_status:
            self.plug()
        else:
            self.unplug()


    @property
    def plugged(self):
        """
        @returns True if this port is plugged to Chameleon, False otherwise.

        """
        return self.chameleond_proxy.IsPlugged(self.port_id)


class ChameleonVideoInput(ChameleonPort):
    """ChameleonVideoInput is an abstraction of a video input port.

    It contains some special methods to control a video input.
    """

    def __init__(self, chameleon_port):
        """Construct a ChameleonVideoInput.

        @param chameleon_port: A general ChameleonPort object.
        """
        # Delegate to the base class so that any state it sets up is shared
        # instead of being copied attribute by attribute here.
        super(ChameleonVideoInput, self).__init__(
                chameleon_port.chameleond_proxy, chameleon_port.port_id)


    def wait_video_input_stable(self, timeout=None):
        """Waits the video input stable or timeout.

        @param timeout: The time period to wait for. It is forwarded to the
                        RPC unchanged, including None.

        @return: True if the video input becomes stable within the timeout
                 period; otherwise, False.
        """
        return self.chameleond_proxy.WaitVideoInputStable(self.port_id,
                                                          timeout)


    def read_edid(self):
        """Reads the EDID.

        @return: An Edid object.
        """
        # Read EDID without verify. It may be made corrupted as intended
        # for the test purpose.
        return edid.Edid(self.chameleond_proxy.ReadEdid(self.port_id).data,
                         skip_verify=True)


    def apply_edid(self, edid):
        """Applies the given EDID.

        The EDID is created on the board, applied to this port and then
        destroyed again, even when applying it fails.

        @param edid: An Edid object.
        """
        edid_id = self.chameleond_proxy.CreateEdid(xmlrpclib.Binary(edid.data))
        try:
            self.chameleond_proxy.ApplyEdid(self.port_id, edid_id)
        finally:
            # Always release the EDID slot; otherwise a failing ApplyEdid
            # would leak the EDID created above.
            self.chameleond_proxy.DestroyEdid(edid_id)


    def fire_hpd_pulse(self, deassert_interval_usec, assert_interval_usec=None,
                       repeat_count=1, end_level=1):
        """Fires one or more HPD pulse (low -> high -> low -> ...).

        @param deassert_interval_usec: The time in microsecond of the
                deassert pulse.
        @param assert_interval_usec: The time in microsecond of the
                assert pulse. None is forwarded to the RPC as-is; chameleond
                is expected to then use the same value as
                deassert_interval_usec.
        @param repeat_count: The count of HPD pulses to fire.
        @param end_level: HPD ends with 0 for LOW (unplugged) or 1 for
                HIGH (plugged).
        """
        # Normalize any truthy/falsy end_level to exactly 0 or 1.
        self.chameleond_proxy.FireHpdPulse(
                self.port_id, deassert_interval_usec,
                assert_interval_usec, repeat_count, int(bool(end_level)))


    def fire_mixed_hpd_pulses(self, widths):
        """Fires one or more HPD pulses, starting at low, of mixed widths.

        One must specify a list of segment widths in the widths argument where
        widths[0] is the width of the first low segment, widths[1] is that of
        the first high segment, widths[2] is that of the second low segment...
        etc. The HPD line stops at low if even number of segment widths are
        specified; otherwise, it stops at high.

        @param widths: list of pulse segment widths in usec.
        """
        self.chameleond_proxy.FireMixedHpdPulses(self.port_id, widths)


    def capture_screen(self):
        """Captures Chameleon framebuffer.

        @return An Image object.
        """
        # Image.fromstring() was superseded by Image.frombytes() and later
        # removed from Pillow; prefer frombytes() and only fall back to
        # fromstring() on legacy PIL that lacks it.
        decode = getattr(Image, 'frombytes', None)
        if decode is None:
            decode = Image.fromstring
        resolution = self.get_resolution()
        pixels = self.chameleond_proxy.DumpPixels(self.port_id).data
        return decode('RGB', resolution, pixels)


    def get_resolution(self):
        """Gets the source resolution.

        @return: A (width, height) tuple.
        """
        # The return value of RPC is converted to a list. Convert it back to
        # a tuple.
        return tuple(self.chameleond_proxy.DetectResolution(self.port_id))


class ChameleonAudioInput(ChameleonPort):
    """ChameleonAudioInput is an abstraction of an audio input port.

    It contains some special methods to control an audio input.
    """

    def __init__(self, chameleon_port):
        """Construct a ChameleonAudioInput.

        @param chameleon_port: A general ChameleonPort object.
        """
        # Delegate to the base class so that any state it sets up is shared
        # instead of being copied attribute by attribute here.
        super(ChameleonAudioInput, self).__init__(
                chameleon_port.chameleond_proxy, chameleon_port.port_id)


    def start_capturing_audio(self):
        """Starts capturing audio.

        @return: Whatever the StartCapturingAudio RPC returns.
        """
        return self.chameleond_proxy.StartCapturingAudio(self.port_id)


    def stop_capturing_audio(self):
        """Stops capturing audio.

        @return: A tuple (data, format).
                 data: The captured binary data.
                 format: A dict containing:
                     file_type: 'raw' or 'wav'.
                     sample_format: 'S32_LE' for 32-bit signed integer in
                                    little-endian. Refer to aplay manpage for
                                    other formats.
                     channel: channel number.
                     rate: sampling rate.
        """
        rpc_data, data_format = self.chameleond_proxy.StopCapturingAudio(
                self.port_id)
        # Unwrap the XML-RPC binary wrapper so callers get the raw bytes.
        return rpc_data.data, data_format


def make_chameleon_hostname(dut_hostname):
    """Given a DUT's hostname, returns the hostname of its Chameleon.

    Only the first dot-separated label gets the '-chameleon' suffix; the
    remaining domain labels are kept unchanged.

    @param dut_hostname: Hostname of a DUT.

    @return Hostname of the DUT's Chameleon.
    """
    host_parts = dut_hostname.split('.')
    host_parts[0] = host_parts[0] + '-chameleon'
    return '.'.join(host_parts)


def create_chameleon_board(dut_hostname, args):
    """Given either DUT's hostname or arguments, creates a ChameleonBoard.

    If the DUT's hostname is in the lab zone, it connects to the Chameleon by
    append the hostname with '-chameleon' suffix. If not, checks if the args
    contains the key-value pair 'chameleon_host=IP'. An optional
    'chameleon_port=PORT' pair overrides the default port in that case.

    @param dut_hostname: Hostname of a DUT.
    @param args: A string of arguments passed from the command line.

    @return A ChameleonBoard object.

    @raise ChameleonConnectionError if unknown hostname.
    """
    hostname = make_chameleon_hostname(dut_hostname)
    if utils.host_is_in_lab_zone(hostname):
        return ChameleonBoard(ChameleonConnection(hostname))

    # Outside the lab zone the Chameleon must be given explicitly in args.
    args_dict = utils.args_to_dict(args)
    hostname = args_dict.get('chameleon_host', None)
    port = args_dict.get('chameleon_port', CHAMELEON_PORT)
    if not hostname:
        raise ChameleonConnectionError('No chameleon_host is given in args')
    return ChameleonBoard(ChameleonConnection(hostname, port))