1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4#
5# Expects to be run in an environment with sudo and no interactive password
6# prompt, such as within the Chromium OS development chroot.
7
8import os
9
10import logging, re, time, xmlrpclib
11
12from autotest_lib.client.common_lib import error
13from autotest_lib.server.cros.servo import firmware_programmer
14
15
class _PowerStateController(object):

    """Board-independent front end for DUT power sequencing.

    Offers "power on", "power off" and reset operations that can be
    used without knowing the board state in advance.  Every operation
    is forwarded to the servo object handed to the constructor, so
    callers do not need to know the sequence a given board requires.

    """

    # Values accepted for the `rec_mode` parameter of power_on().
    #
    # REC_ON:  Boot the DUT in recovery mode, i.e. boot from USB or
    #   SD card.
    # REC_OFF:  Boot in normal mode, i.e. boot from internal storage.

    REC_ON = 'rec'
    REC_OFF = 'on'

    # Seconds to keep warm reset asserted before de-asserting it.
    _RESET_HOLD_TIME = 0.5

    def __init__(self, servo):
        """Remember the servo used to drive the power controls.

        @param servo Servo object providing the `set_nocheck` and
                     `set_get_all` methods used by this class.

        """
        self._servo = servo

    def _request_power_state(self, state):
        """Write `state` to the servo 'power_state' control, unchecked.

        @param state Value to write to the 'power_state' control.

        """
        self._servo.set_nocheck('power_state', state)

    def reset(self):
        """Force the DUT to reset.

        Writes 'reset' to the 'power_state' control.  The DUT is
        expected to be on when this returns, whatever its previous
        state, provided it has working OS software; the EC is
        expected to have been restarted as well.

        """
        self._request_power_state('reset')

    def warm_reset(self):
        """Pulse the 'warm_reset' signal on the DUT.

        The signal is asserted, held for `_RESET_HOLD_TIME` seconds
        and then de-asserted, all in a single set_get_all() request.
        Generally, this causes the board to restart.

        """
        hold = 'sleep:%.4f' % self._RESET_HOLD_TIME
        pulse = ['warm_reset:on', hold, 'warm_reset:off']
        self._servo.set_get_all(pulse)

    def power_off(self):
        """Force the DUT to power off.

        Writes 'off' to the 'power_state' control.  The DUT is
        expected to be off when this returns, whatever its previous
        state, provided the EC and boot firmware work.  Working OS
        software is not required.

        """
        self._request_power_state('off')

    def power_on(self, rec_mode=REC_OFF):
        """Force the DUT to power on.

        The DUT must already be powered off, e.g. by a call to
        `power_off()`.

        Recovery mode at power on follows `rec_mode`.  When booting
        in recovery mode the caller is responsible for plugging or
        unplugging a bootable external storage device.

        Any delay the DUT needs between powering on and accepting
        inputs such as USB stick insertion is expected to be covered
        by the underlying 'power_state' control, not by the caller.

        @param rec_mode Recovery mode setting applied at power on;
                        written as-is to the 'power_state' control.
                        default: REC_OFF, i.e. the value 'on'.

        """
        self._request_power_state(rec_mode)
106
107class Servo(object):
108
109    """Manages control of a Servo board.
110
    Servo is a board developed by the hardware group to aid in the debug and
    control of various partner devices. Servo's features include the simulation
    of pressing the power button, closing the lid, and pressing Ctrl-d. This
    class manages setting up and communicating with a servo daemon (servod)
    process. It provides both high-level functions for common servo tasks and
    low-level functions for directly setting and reading gpios.
117
118    """
119
    # Power button press delays in seconds.
    #
    # The EC specification says that 8.0 seconds should be enough
    # for the long power press.  However, some platforms need a bit
    # more time.  Empirical testing has found these requirements:
    #   Alex: 8.2 seconds
    #   ZGB:  8.5 seconds
    # The actual value is set to the largest known necessary value.
    # (Note: no long-press duration constant is defined in this class;
    # power_long_press() delegates the press to servod.)
    #
    # TODO(jrbarnette) Being generous is the right thing to do for
    # existing platforms, but if this code is to be used for
    # qualification of new hardware, we should be less generous.
    #
    # SHORT_DELAY is the polling interval, in seconds, used when a
    # control is re-read while waiting for it to reach a value.
132    SHORT_DELAY = 0.1
133
    # Maximum number of times to re-read a control (e.g. the power
    # button on release) after setting it, before giving up.
135    GET_RETRY_MAX = 10
136
137    # Delays to deal with DUT state transitions.
138    SLEEP_DELAY = 6
139    BOOT_DELAY = 10
140
141    # Default minimum time interval between 'press' and 'release'
142    # keyboard events.
143    SERVO_KEY_PRESS_DELAY = 0.1
144
145    # Time to toggle recovery switch on and off.
146    REC_TOGGLE_DELAY = 0.1
147
148    # Time to toggle development switch on and off.
149    DEV_TOGGLE_DELAY = 0.1
150
151    # Time between an usb disk plugged-in and detected in the system.
152    USB_DETECTION_DELAY = 10
153    # Time to keep USB power off before and after USB mux direction is changed
154    USB_POWEROFF_DELAY = 2
155
156    # Time to wait before timing out on servo initialization.
157    INIT_TIMEOUT_SECS = 10
158
159
160    def __init__(self, servo_host, servo_serial=None):
161        """Sets up the servo communication infrastructure.
162
163        @param servo_host: A ServoHost object representing
164                           the host running servod.
165        @param servo_serial: Serial number of the servo board.
166        """
167        # TODO(fdeng): crbug.com/298379
168        # We should move servo_host object out of servo object
169        # to minimize the dependencies on the rest of Autotest.
170        self._servo_host = servo_host
171        self._servo_serial = servo_serial
172        self._server = servo_host.get_servod_server_proxy()
173        self._power_state = _PowerStateController(self)
174        self._usb_state = None
175        self._programmer = None
176
177
178    @property
179    def servo_serial(self):
180        """Returns the serial number of the servo board."""
181        return self._servo_serial
182
183
184    def get_power_state_controller(self):
185        """Return the power state controller for this Servo.
186
187        The power state controller provides board-independent
188        interfaces for reset, power-on, power-off operations.
189
190        """
191        return self._power_state
192
193
194    def initialize_dut(self, cold_reset=False):
195        """Initializes a dut for testing purposes.
196
197        This sets various servo signals back to default values
198        appropriate for the target board.  By default, if the DUT
199        is already on, it stays on.  If the DUT is powered off
200        before initialization, its state afterward is unspecified.
201
202        Rationale:  Basic initialization of servo sets the lid open,
203        when there is a lid.  This operation won't affect powered on
204        units; however, setting the lid open may power on a unit
205        that's off, depending on the board type and previous state
206        of the device.
207
208        If `cold_reset` is a true value, the DUT and its EC will be
209        reset, and the DUT rebooted in normal mode.
210
211        @param cold_reset If True, cold reset the device after
212                          initialization.
213        """
214        self._server.hwinit()
215        # Workaround for bug chrome-os-partner:42349. Without this check, the
216        # gpio will briefly pulse low if we set it from high to high.
217        if self.get('dut_hub_pwren') != 'on':
218            self.set('dut_hub_pwren', 'on')
219        self.set('usb_mux_oe1', 'on')
220        self._usb_state = None
221        self.switch_usbkey('off')
222        if cold_reset:
223            self._power_state.reset()
224        logging.debug('Servo initialized, version is %s',
225                      self._server.get_version())
226
227
228    def is_localhost(self):
229        """Is the servod hosted locally?
230
231        Returns:
232          True if local hosted; otherwise, False.
233        """
234        return self._servo_host.is_localhost()
235
236
237    def power_long_press(self):
238        """Simulate a long power button press."""
239        # After a long power press, the EC may ignore the next power
240        # button press (at least on Alex).  To guarantee that this
241        # won't happen, we need to allow the EC one second to
242        # collect itself.
243        self._server.power_long_press()
244
245
246    def power_normal_press(self):
247        """Simulate a normal power button press."""
248        self._server.power_normal_press()
249
250
251    def power_short_press(self):
252        """Simulate a short power button press."""
253        self._server.power_short_press()
254
255
256    def power_key(self, press_secs=''):
257        """Simulate a power button press.
258
259        @param press_secs : Str. Time to press key.
260        """
261        self._server.power_key(press_secs)
262
263
264    def lid_open(self):
265        """Simulate opening the lid and raise exception if all attempts fail"""
266        self.set('lid_open', 'yes')
267
268
269    def lid_close(self):
270        """Simulate closing the lid and raise exception if all attempts fail
271
272        Waits 6 seconds to ensure the device is fully asleep before returning.
273        """
274        self.set('lid_open', 'no')
275        time.sleep(Servo.SLEEP_DELAY)
276
277    def volume_up(self, timeout=300):
278        """Simulate pushing the volume down button"""
279        self.set_get_all(['volume_up:yes',
280                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
281                          'volume_up:no'])
282        # we need to wait for commands to take effect before moving on
283        time_left = float(timeout)
284        while time_left > 0.0:
285            value = self.get('volume_up')
286            if value == 'no':
287                return
288            time.sleep(self.SHORT_DELAY)
289            time_left = time_left - self.SHORT_DELAY
290        raise error.TestFail("Failed setting volume_up to no")
291
292    def volume_down(self, timeout=300):
293        """Simulate pushing the volume down button"""
294        self.set_get_all(['volume_down:yes',
295                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
296                          'volume_down:no'])
297        # we need to wait for commands to take effect before moving on
298        time_left = float(timeout)
299        while time_left > 0.0:
300            value = self.get('volume_down')
301            if value == 'no':
302                return
303            time.sleep(self.SHORT_DELAY)
304            time_left = time_left - self.SHORT_DELAY
305        raise error.TestFail("Failed setting volume_down to no")
306
307    def ctrl_d(self, press_secs=''):
308        """Simulate Ctrl-d simultaneous button presses.
309
310        @param press_secs : Str. Time to press key.
311        """
312        self._server.ctrl_d(press_secs)
313
314
315    def ctrl_u(self):
316        """Simulate Ctrl-u simultaneous button presses.
317
318        @param press_secs : Str. Time to press key.
319        """
320        self._server.ctrl_u()
321
322
323    def ctrl_enter(self, press_secs=''):
324        """Simulate Ctrl-enter simultaneous button presses.
325
326        @param press_secs : Str. Time to press key.
327        """
328        self._server.ctrl_enter(press_secs)
329
330
331    def d_key(self, press_secs=''):
332        """Simulate Enter key button press.
333
334        @param press_secs : Str. Time to press key.
335        """
336        self._server.d_key(press_secs)
337
338
339    def ctrl_key(self, press_secs=''):
340        """Simulate Enter key button press.
341
342        @param press_secs : Str. Time to press key.
343        """
344        self._server.ctrl_key(press_secs)
345
346
347    def enter_key(self, press_secs=''):
348        """Simulate Enter key button press.
349
350        @param press_secs : Str. Time to press key.
351        """
352        self._server.enter_key(press_secs)
353
354
355    def refresh_key(self, press_secs=''):
356        """Simulate Refresh key (F3) button press.
357
358        @param press_secs : Str. Time to press key.
359        """
360        self._server.refresh_key(press_secs)
361
362
363    def ctrl_refresh_key(self, press_secs=''):
364        """Simulate Ctrl and Refresh (F3) simultaneous press.
365
366        This key combination is an alternative of Space key.
367
368        @param press_secs : Str. Time to press key.
369        """
370        self._server.ctrl_refresh_key(press_secs)
371
372
373    def imaginary_key(self, press_secs=''):
374        """Simulate imaginary key button press.
375
376        Maps to a key that doesn't physically exist.
377
378        @param press_secs : Str. Time to press key.
379        """
380        self._server.imaginary_key(press_secs)
381
382
383    def toggle_recovery_switch(self):
384        """Toggle recovery switch on and off."""
385        self.enable_recovery_mode()
386        time.sleep(self.REC_TOGGLE_DELAY)
387        self.disable_recovery_mode()
388
389
390    def enable_recovery_mode(self):
391        """Enable recovery mode on device."""
392        self.set('rec_mode', 'on')
393
394
395    def disable_recovery_mode(self):
396        """Disable recovery mode on device."""
397        self.set('rec_mode', 'off')
398
399
400    def toggle_development_switch(self):
401        """Toggle development switch on and off."""
402        self.enable_development_mode()
403        time.sleep(self.DEV_TOGGLE_DELAY)
404        self.disable_development_mode()
405
406
407    def enable_development_mode(self):
408        """Enable development mode on device."""
409        self.set('dev_mode', 'on')
410
411
412    def disable_development_mode(self):
413        """Disable development mode on device."""
414        self.set('dev_mode', 'off')
415
416    def boot_devmode(self):
417        """Boot a dev-mode device that is powered off."""
418        self.power_short_press()
419        self.pass_devmode()
420
421
422    def pass_devmode(self):
423        """Pass through boot screens in dev-mode."""
424        time.sleep(Servo.BOOT_DELAY)
425        self.ctrl_d()
426        time.sleep(Servo.BOOT_DELAY)
427
428
429    def get_board(self):
430        """Get the board connected to servod.
431
432        """
433        return self._server.get_board()
434
435
436    def _get_xmlrpclib_exception(self, xmlexc):
437        """Get meaningful exception string from xmlrpc.
438
439        Args:
440            xmlexc: xmlrpclib.Fault object
441
442        xmlrpclib.Fault.faultString has the following format:
443
444        <type 'exception type'>:'actual error message'
445
446        Parse and return the real exception from the servod side instead of the
447        less meaningful one like,
448           <Fault 1: "<type 'exceptions.AttributeError'>:'tca6416' object has no
449           attribute 'hw_driver'">
450
451        Returns:
452            string of underlying exception raised in servod.
453        """
454        return re.sub('^.*>:', '', xmlexc.faultString)
455
456
457    def get(self, gpio_name):
458        """Get the value of a gpio from Servod.
459
460        @param gpio_name Name of the gpio.
461        """
462        assert gpio_name
463        try:
464            return self._server.get(gpio_name)
465        except  xmlrpclib.Fault as e:
466            err_msg = "Getting '%s' :: %s" % \
467                (gpio_name, self._get_xmlrpclib_exception(e))
468            raise error.TestFail(err_msg)
469
470
471    def set(self, gpio_name, gpio_value):
472        """Set and check the value of a gpio using Servod.
473
474        @param gpio_name Name of the gpio.
475        @param gpio_value New setting for the gpio.
476        """
477        self.set_nocheck(gpio_name, gpio_value)
478        retry_count = Servo.GET_RETRY_MAX
479        while gpio_value != self.get(gpio_name) and retry_count:
480            logging.warning("%s != %s, retry %d", gpio_name, gpio_value,
481                         retry_count)
482            retry_count -= 1
483            time.sleep(Servo.SHORT_DELAY)
484        if not retry_count:
485            assert gpio_value == self.get(gpio_name), \
486                'Servo failed to set %s to %s' % (gpio_name, gpio_value)
487
488
489    def set_nocheck(self, gpio_name, gpio_value):
490        """Set the value of a gpio using Servod.
491
492        @param gpio_name Name of the gpio.
493        @param gpio_value New setting for the gpio.
494        """
495        assert gpio_name and gpio_value
496        logging.info('Setting %s to %s', gpio_name, gpio_value)
497        try:
498            self._server.set(gpio_name, gpio_value)
499        except  xmlrpclib.Fault as e:
500            err_msg = "Setting '%s' to '%s' :: %s" % \
501                (gpio_name, gpio_value, self._get_xmlrpclib_exception(e))
502            raise error.TestFail(err_msg)
503
504
505    def set_get_all(self, controls):
506        """Set &| get one or more control values.
507
508        @param controls: list of strings, controls to set &| get.
509
510        @raise: error.TestError in case error occurs setting/getting values.
511        """
512        rv = []
513        try:
514            logging.info('Set/get all: %s', str(controls))
515            rv = self._server.set_get_all(controls)
516        except xmlrpclib.Fault as e:
517            # TODO(waihong): Remove the following backward compatibility when
518            # the new versions of hdctools are deployed.
519            if 'not supported' in str(e):
520                logging.warning('The servod is too old that set_get_all '
521                        'not supported. Use set and get instead.')
522                for control in controls:
523                    if ':' in control:
524                        (name, value) = control.split(':')
525                        if name == 'sleep':
526                            time.sleep(float(value))
527                        else:
528                            self.set_nocheck(name, value)
529                        rv.append(True)
530                    else:
531                        rv.append(self.get(name))
532            else:
533                err_msg = "Problem with '%s' :: %s" % \
534                    (controls, self._get_xmlrpclib_exception(e))
535                raise error.TestFail(err_msg)
536        return rv
537
538
539    # TODO(waihong) It may fail if multiple servo's are connected to the same
540    # host. Should look for a better way, like the USB serial name, to identify
541    # the USB device.
542    # TODO(sbasi) Remove this code from autoserv once firmware tests have been
543    # updated.
544    def probe_host_usb_dev(self):
545        """Probe the USB disk device plugged-in the servo from the host side.
546
547        It tries to switch the USB mux to make the host unable to see the
548        USB disk and compares the result difference.
549
550        Returns:
551          A string of USB disk path, like '/dev/sdb', or None if not existed.
552        """
553        cmd = 'ls /dev/sd[a-z]'
554        original_value = self.get_usbkey_direction()
555
556        # Make the host unable to see the USB disk.
557        self.switch_usbkey('off')
558        no_usb_set = set(self.system_output(cmd, ignore_status=True).split())
559
560        # Make the host able to see the USB disk.
561        self.switch_usbkey('host')
562        has_usb_set = set(self.system_output(cmd, ignore_status=True).split())
563
564        # Back to its original value.
565        if original_value != self.get_usbkey_direction():
566            self.switch_usbkey(original_value)
567
568        diff_set = has_usb_set - no_usb_set
569        if len(diff_set) == 1:
570            return diff_set.pop()
571        else:
572            return None
573
574
575    def image_to_servo_usb(self, image_path=None,
576                           make_image_noninteractive=False):
577        """Install an image to the USB key plugged into the servo.
578
579        This method may copy any image to the servo USB key including a
580        recovery image or a test image.  These images are frequently used
581        for test purposes such as restoring a corrupted image or conducting
582        an upgrade of ec/fw/kernel as part of a test of a specific image part.
583
584        @param image_path Path on the host to the recovery image.
585        @param make_image_noninteractive Make the recovery image
586                                   noninteractive, therefore the DUT
587                                   will reboot automatically after
588                                   installation.
589        """
590        # We're about to start plugging/unplugging the USB key.  We
591        # don't know the state of the DUT, or what it might choose
592        # to do to the device after hotplug.  To avoid surprises,
593        # force the DUT to be off.
594        self._server.hwinit()
595        self._power_state.power_off()
596
597        # Set up Servo's usb mux.
598        self.switch_usbkey('host')
599        if image_path:
600            logging.info('Searching for usb device and copying image to it. '
601                         'Please wait a few minutes...')
602            if not self._server.download_image_to_usb(image_path):
603                logging.error('Failed to transfer requested image to USB. '
604                              'Please take a look at Servo Logs.')
605                raise error.AutotestError('Download image to usb failed.')
606            if make_image_noninteractive:
607                logging.info('Making image noninteractive')
608                if not self._server.make_image_noninteractive():
609                    logging.error('Failed to make image noninteractive. '
610                                  'Please take a look at Servo Logs.')
611
612
613    def install_recovery_image(self, image_path=None,
614                               make_image_noninteractive=False):
615        """Install the recovery image specied by the path onto the DUT.
616
617        This method uses google recovery mode to install a recovery image
618        onto a DUT through the use of a USB stick that is mounted on a servo
619        board specified by the usb_dev.  If no image path is specified
620        we use the recovery image already on the usb image.
621
622        @param image_path Path on the host to the recovery image.
623        @param make_image_noninteractive Make the recovery image
624                                         noninteractive, therefore
625                                         the DUT will reboot
626                                         automatically after
627                                         installation.
628        """
629        self.image_to_servo_usb(image_path, make_image_noninteractive)
630        self._power_state.power_on(rec_mode=self._power_state.REC_ON)
631        self.switch_usbkey('dut')
632
633
634    def _scp_image(self, image_path):
635        """Copy image to the servo host.
636
637        When programming a firmware image on the DUT, the image must be
638        located on the host to which the servo device is connected. Sometimes
639        servo is controlled by a remote host, in this case the image needs to
640        be transferred to the remote host.
641
642        @param image_path: a string, name of the firmware image file to be
643               transferred.
644        @return: a string, full path name of the copied file on the remote.
645        """
646
647        dest_path = os.path.join('/tmp', os.path.basename(image_path))
648        self._servo_host.send_file(image_path, dest_path)
649        return dest_path
650
651
652    def system(self, command, timeout=3600):
653        """Execute the passed in command on the servod host.
654
655        @param command Command to be executed.
656        @param timeout Maximum number of seconds of runtime allowed. Default to
657                       1 hour.
658        """
659        logging.info('Will execute on servo host: %s', command)
660        self._servo_host.run(command, timeout=timeout)
661
662
663    def system_output(self, command, timeout=3600,
664                      ignore_status=False, args=()):
665        """Execute the passed in command on the servod host, return stdout.
666
667        @param command a string, the command to execute
668        @param timeout an int, max number of seconds to wait til command
669               execution completes. Default to 1 hour.
670        @param ignore_status a Boolean, if true - ignore command's nonzero exit
671               status, otherwise an exception will be thrown
672        @param args a tuple of strings, each becoming a separate command line
673               parameter for the command
674        @return command's stdout as a string.
675        """
676        return self._servo_host.run(command, timeout=timeout,
677                                    ignore_status=ignore_status,
678                                    args=args).stdout.strip()
679
680
681    def get_servo_version(self):
682        """Get the version of the servo, e.g., servo_v2 or servo_v3.
683
684        @return: The version of the servo.
685
686        """
687        return self._server.get_version()
688
689
690    def _initialize_programmer(self, rw_only=False):
691        """Initialize the firmware programmer.
692
693        @param rw_only: True to initialize a programmer which only
694                        programs the RW portions.
695        """
696        if self._programmer:
697            return
698        # Initialize firmware programmer
699        servo_version = self.get_servo_version()
700        if servo_version.startswith('servo_v2'):
701            self._programmer = firmware_programmer.ProgrammerV2(self)
702        elif servo_version.startswith('servo_v3'):
703            self._programmer = firmware_programmer.ProgrammerV3(self)
704            self._programmer_rw = firmware_programmer.ProgrammerV3RwOnly(self)
705        else:
706            raise error.TestError(
707                    'No firmware programmer for servo version: %s' %
708                         servo_version)
709
710
711    def program_bios(self, image, rw_only=False):
712        """Program bios on DUT with given image.
713
714        @param image: a string, file name of the BIOS image to program
715                      on the DUT.
716        @param rw_only: True to only program the RW portion of BIOS.
717
718        """
719        self._initialize_programmer()
720        if not self.is_localhost():
721            image = self._scp_image(image)
722        if rw_only:
723            self._programmer_rw.program_bios(image)
724        else:
725            self._programmer.program_bios(image)
726
727
728    def program_ec(self, image, rw_only=False):
729        """Program ec on DUT with given image.
730
731        @param image: a string, file name of the EC image to program
732                      on the DUT.
733        @param rw_only: True to only program the RW portion of EC.
734
735        """
736        self._initialize_programmer()
737        if not self.is_localhost():
738            image = self._scp_image(image)
739        if rw_only:
740           self._programmer_rw.program_ec(image)
741        else:
742           self._programmer.program_ec(image)
743
744
745    def _switch_usbkey_power(self, power_state, detection_delay=False):
746        """Switch usbkey power.
747
748        This function switches usbkey power by setting the value of
749        'prtctl4_pwren'. If the power is already in the
750        requested state, this function simply returns.
751
752        @param power_state: A string, 'on' or 'off'.
753        @param detection_delay: A boolean value, if True, sleep
754                                for |USB_DETECTION_DELAY| after switching
755                                the power on.
756        """
757        self.set('prtctl4_pwren', power_state)
758        if power_state == 'off':
759            time.sleep(self.USB_POWEROFF_DELAY)
760        elif detection_delay:
761            time.sleep(self.USB_DETECTION_DELAY)
762
763
764    def switch_usbkey(self, usb_state):
765        """Connect USB flash stick to either host or DUT, or turn USB port off.
766
767        This function switches the servo multiplexer to provide electrical
768        connection between the USB port J3 and either host or DUT side. It
769        can also be used to turn the USB port off.
770
771        Switching to 'dut' or 'host' is accompanied by powercycling
772        of the USB stick, because it sometimes gets wedged if the mux
773        is switched while the stick power is on.
774
775        @param usb_state: A string, one of 'dut', 'host', or 'off'.
776                          'dut' and 'host' indicate which side the
777                          USB flash device is required to be connected to.
778                          'off' indicates turning the USB port off.
779
780        @raise: error.TestError in case the parameter is not 'dut'
781                'host', or 'off'.
782        """
783        if self.get_usbkey_direction() == usb_state:
784            return
785
786        if usb_state == 'off':
787            self._switch_usbkey_power('off')
788            self._usb_state = usb_state
789            return
790        elif usb_state == 'host':
791            mux_direction = 'servo_sees_usbkey'
792        elif usb_state == 'dut':
793            mux_direction = 'dut_sees_usbkey'
794        else:
795            raise error.TestError('Unknown USB state request: %s' % usb_state)
796
797        self._switch_usbkey_power('off')
798        self.set('usb_mux_sel1', mux_direction)
799        time.sleep(self.USB_POWEROFF_DELAY)
800        self._switch_usbkey_power('on', usb_state == 'host')
801        self._usb_state = usb_state
802
803
804    def get_usbkey_direction(self):
805        """Get which side USB is connected to or 'off' if usb power is off.
806
807        @return: A string, one of 'dut', 'host', or 'off'.
808        """
809        if not self._usb_state:
810            if self.get('prtctl4_pwren') == 'off':
811                self._usb_state = 'off'
812            elif self.get('usb_mux_sel1').startswith('dut'):
813                self._usb_state = 'dut'
814            else:
815                self._usb_state = 'host'
816        return self._usb_state
817