1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4import logging
5import re
6import time
7
8from autotest_lib.client.bin import utils
9from autotest_lib.client.common_lib import error
10
11# en-US key matrix (from "kb membrane pin matrix.pdf")
12KEYMATRIX = {'`': (3, 1), '1': (6, 1), '2': (6, 4), '3': (6, 2), '4': (6, 3),
13             '5': (3, 3), '6': (3, 6), '7': (6, 6), '8': (6, 5), '9': (6, 9),
14             '0': (6, 8), '-': (3, 8), '=': (0, 8), 'q': (7, 1), 'w': (7, 4),
15             'e': (7, 2), 'r': (7, 3), 't': (2, 3), 'y': (2, 6), 'u': (7, 6),
16             'i': (7, 5), 'o': (7, 9), 'p': (7, 8), '[': (2, 8), ']': (2, 5),
17             '\\': (3, 11), 'a': (4, 1), 's': (4, 4), 'd': (4, 2), 'f': (4, 3),
18             'g': (1, 3), 'h': (1, 6), 'j': (4, 6), 'k': (4, 5), 'l': (4, 9),
19             ';': (4, 8), '\'': (1, 8), 'z': (5, 1), 'x': (5, 4), 'c': (5, 2),
20             'v': (5, 3), 'b': (0, 3), 'n': (0, 6), 'm': (5, 6), ',': (5, 5),
21             '.': (5, 9), '/': (5, 8), ' ': (5, 11), '<right>': (6, 12),
22             '<alt_r>': (0, 10), '<down>': (6, 11), '<tab>': (2, 1),
23             '<f10>': (0, 4), '<shift_r>': (7, 7), '<ctrl_r>': (4, 0),
24             '<esc>': (1, 1), '<backspace>': (1, 11), '<f2>': (3, 2),
25             '<alt_l>': (6, 10), '<ctrl_l>': (2, 0), '<f1>': (0, 2),
26             '<search>': (0, 1), '<f3>': (2, 2), '<f4>': (1, 2), '<f5>': (3, 4),
27             '<f6>': (2, 4), '<f7>': (1, 4), '<f8>': (2, 9), '<f9>': (1, 9),
28             '<up>': (7, 11), '<shift_l>': (5, 7), '<enter>': (4, 11),
29             '<left>': (7, 12)}
30
31
32def has_ectool():
33    """Determine if ectool shell command is present.
34
35    Returns:
36        boolean true if avail, false otherwise.
37    """
38    cmd = 'which ectool'
39    return (utils.system(cmd, ignore_status=True) == 0)
40
41
42class EC_Common(object):
43    """Class for EC common.
44
45    This incredibly brief base class is intended to encapsulate common elements
46    across various CrOS MCUs (ec proper, USB-PD, Sensor Hub).  At the moment
47    that includes only the use of ectool.
48    """
49
50    def __init__(self, target='cros_ec'):
51        """Constructor.
52
53        @param target: target name of ec to communicate with.
54        """
55        if not has_ectool():
56            ec_info = utils.system_output("mosys ec info",
57                                          ignore_status=True)
58            logging.warning("Ectool absent on this platform ( %s )",
59                         ec_info)
60            raise error.TestNAError("Platform doesn't support ectool")
61        self._target = target
62
63    def ec_command(self, cmd, **kwargs):
64        """Executes ec command and returns results.
65
66        @param cmd: string of command to execute.
67        @param kwargs: optional params passed to utils.system_output
68
69        @returns: string of results from ec command.
70        """
71        full_cmd = 'ectool --name=%s %s' % (self._target, cmd)
72        result = utils.system_output(full_cmd, **kwargs)
73        logging.debug('Command: %s', full_cmd)
74        logging.debug('Result: %s', result)
75        return result
76
77
78class EC(EC_Common):
79    """Class for CrOS embedded controller (EC)."""
80    HELLO_RE = "EC says hello"
81    GET_FANSPEED_RE = "Current fan RPM: ([0-9]*)"
82    SET_FANSPEED_RE = "Fan target RPM set."
83    TEMP_SENSOR_RE = "Reading temperature...([0-9]*)"
84    TOGGLE_AUTO_FAN_RE = "Automatic fan control is now on"
85    # For battery, check we can see a non-zero capacity value.
86    BATTERY_RE = "Design capacity:\s+[1-9]\d*\s+mAh"
87    LIGHTBAR_RE = "^ 05\s+3f\s+3f$"
88
89
90    def hello(self):
91        """Test EC hello command.
92
93        @returns True if success False otherwise.
94        """
95        response = self.ec_command('hello')
96        return (re.search(self.HELLO_RE, response) is not None)
97
98    def auto_fan_ctrl(self):
99        """Turns auto fan ctrl on.
100
101        @returns True if success False otherwise.
102        """
103        response = self.ec_command('autofanctrl')
104        logging.info('Turned on auto fan control.')
105        return (re.search(self.TOGGLE_AUTO_FAN_RE, response) is not None)
106
107    def get_fanspeed(self):
108        """Gets fanspeed.
109
110        @raises error.TestError if regexp fails to match.
111
112        @returns integer of fan speed RPM.
113        """
114        response = self.ec_command('pwmgetfanrpm')
115        match = re.search(self.GET_FANSPEED_RE, response)
116        if not match:
117            raise error.TestError('Unable to read fan speed')
118
119        rpm = int(match.group(1))
120        logging.info('Fan speed: %d', rpm)
121        return rpm
122
123    def set_fanspeed(self, rpm):
124        """Sets fan speed.
125
126        @param rpm: integer of fan speed RPM to set
127
128        @returns True if success False otherwise.
129        """
130        response = self.ec_command('pwmsetfanrpm %d' % rpm)
131        logging.info('Set fan speed: %d', rpm)
132        return (re.search(self.SET_FANSPEED_RE, response) is not None)
133
134    def get_temperature(self, idx):
135        """Gets temperature from idx sensor.
136
137        @param idx: integer of temp sensor to read.
138
139        @raises error.TestError if fails to read sensor.
140
141        @returns integer of temperature reading in degrees Kelvin.
142        """
143        response = self.ec_command('temps %d' % idx)
144        match = re.search(self.TEMP_SENSOR_RE, response)
145        if not match:
146            raise error.TestError('Unable to read temperature sensor %d' % idx)
147
148        return int(match.group(1))
149
150    def get_battery(self):
151        """Get battery presence (design capacity found).
152
153        @returns True if success False otherwise.
154        """
155        response = self.ec_command('battery')
156        return (re.search(self.BATTERY_RE, response) is not None)
157
158    def get_lightbar(self):
159        """Test lightbar.
160
161        @returns True if success False otherwise.
162        """
163        self.ec_command('lightbar on')
164        self.ec_command('lightbar init')
165        self.ec_command('lightbar 4 255 255 255')
166        response = self.ec_command('lightbar')
167        self.ec_command('lightbar off')
168        return (re.search(self.LIGHTBAR_RE, response, re.MULTILINE) is not None)
169
170    def key_press(self, key):
171        """Emit key down and up signal of the keyboard.
172
173        @param key: name of a key defined in KEYMATRIX.
174        """
175        self.key_down(key)
176        self.key_up(key)
177
178    def _key_action(self, key, action_type):
179        if not key in KEYMATRIX:
180            raise error.TestError('Unknown key: ' + key)
181        row, col = KEYMATRIX[key]
182        self.ec_command('kbpress %d %d %d' % (row, col, action_type))
183
184    def key_down(self, key):
185        """Emit key down signal of the keyboard.
186
187        @param key: name of a key defined in KEYMATRIX.
188        """
189        self._key_action(key, 1)
190
191    def key_up(self, key):
192        """Emit key up signal of the keyboard.
193
194        @param key: name of a key defined in KEYMATRIX.
195        """
196        self._key_action(key, 0)
197
198
199class EC_USBPD_Port(EC_Common):
200    """Class for CrOS embedded controller for USB-PD Port.
201
202    Public attributes:
203        index: integer of USB type-C port index.
204
205    Public Methods:
206        is_dfp: Determine if data role is Downstream Facing Port (DFP).
207        is_amode_supported: Check if alternate mode is supported by port.
208        is_amode_entered: Check if alternate mode is entered.
209        set_amode: Set an alternate mode.
210
211    Private attributes:
212        _port: integer of USB type-C port id.
213        _port_info: holds usbpd protocol info.
214        _amodes: holds alternate mode info.
215
216    Private methods:
217        _invalidate_port_data: Remove port data to force re-eval.
218        _get_port_info: Get USB-PD port info.
219        _get_amodes: parse and return port's svid info.
220    """
221    def __init__(self, index):
222        """Constructor.
223
224        @param index: integer of USB type-C port index.
225        """
226        self.index = index
227        # TODO(crosbug.com/p/38133) target= only works for samus
228        super(EC_USBPD_Port, self).__init__(target='cros_pd')
229
230        # Interrogate port at instantiation.  Use invalidate to force re-eval.
231        self._port_info = self._get_port_info()
232        self._amodes = self._get_amodes()
233
234    def _invalidate_port_data(self):
235        """Remove port data to force re-eval."""
236        self._port_info = None
237        self._amodes = None
238
239    def _get_port_info(self):
240        """Get USB-PD port info.
241
242        ectool command usbpd provides the following information about the port:
243          - Enabled/Disabled
244          - Power & Data Role
245          - Polarity
246          - Protocol State
247
248        At time of authoring it looks like:
249          Port C0 is enabled, Role:SNK UFP Polarity:CC2 State:SNK_READY
250
251        @raises error.TestError if ...
252          port info not parseable.
253
254        @returns dictionary for <port> with keyval pairs:
255          enabled: True | False | None
256          power_role: sink | source | None
257          data_role: UFP | DFP | None
258          is_reversed: True | False | None
259          state: various strings | None
260        """
261        PORT_INFO_RE = 'Port\s+C(\d+)\s+is\s+(\w+),\s+Role:(\w+)\s+(\w+)\s+' + \
262                       'Polarity:CC(\d+)\s+State:(\w+)'
263
264        match = re.search(PORT_INFO_RE,
265                          self.ec_command("usbpd %s" % (self.index)))
266        if not match or int(match.group(1)) != self.index:
267            error.TestError('Unable to determine port %d info' % self.index)
268
269        pinfo = dict(enabled=None, power_role=None, data_role=None,
270                    is_reversed=None, state=None)
271        pinfo['enabled'] = match.group(2) == 'enabled'
272        pinfo['power_role'] = 'sink' if match.group(3) == 'SNK' else 'source'
273        pinfo['data_role'] = match.group(4)
274        pinfo['is_reversed'] = True if match.group(5) == '2' else False
275        pinfo['state'] = match.group(6)
276        logging.debug('port_info = %s', pinfo)
277        return pinfo
278
279    def _get_amodes(self):
280        """Parse alternate modes from pdgetmode.
281
282        Looks like ...
283          *SVID:0xff01 *0x00000485  0x00000000 ...
284          SVID:0x18d1   0x00000001  0x00000000 ...
285
286        @returns dictionary of format:
287          <svid>: {active: True|False, configs: <config_list>, opos:<opos>}
288            where:
289              <svid>        : USB-IF Standard or vendor id as
290                              hex string (i.e. 0xff01)
291              <config_list> : list of uint32_t configs
292              <opos>        : integer of active object position.
293                              Note, this is the config list index + 1
294        """
295        SVID_RE = r'(\*?)SVID:(\S+)\s+(.*)'
296        svids = dict()
297        cmd = 'pdgetmode %d' % self.index
298        for line in self.ec_command(cmd, ignore_status=True).split('\n'):
299            if line.strip() == '':
300                continue
301            logging.debug('pdgetmode line: %s', line)
302            match = re.search(SVID_RE, line)
303            if not match:
304                logging.warning("Unable to parse SVID line %s", line)
305                continue
306            active = match.group(1) == '*'
307            svid = match.group(2)
308            configs_str = match.group(3)
309            configs = list()
310            opos = None
311            for i,config in enumerate(configs_str.split(), 1):
312                if config.startswith('*'):
313                    opos = i
314                    config = config[1:]
315                config = int(config, 16)
316                # ignore unpopulated configs
317                if config == 0:
318                    continue
319                configs.append(config)
320            svids[svid] = dict(active=active, configs=configs, opos=opos)
321
322        logging.debug("Port %d svids = %s", self.index, svids)
323        return svids
324
325    def is_dfp(self):
326        """Determine if data role is Downstream Facing Port (DFP).
327
328        @returns True if DFP False otherwise.
329        """
330        if self._port_info is None:
331            self._port_info = self._get_port_info()
332
333        return self._port_info['data_role'] == 'DFP'
334
335    def is_amode_supported(self, svid):
336        """Check if alternate mode is supported by port partner.
337
338        @param svid: alternate mode SVID hexstring (i.e. 0xff01)
339        """
340        if self._amodes is None:
341            self._amodes = self._get_amodes()
342
343        if svid in self._amodes.keys():
344            return True
345        return False
346
347    def is_amode_entered(self, svid, opos):
348        """Check if alternate mode is entered.
349
350        @param svid: alternate mode SVID hexstring (i.e. 0xff01).
351        @param opos: object position of config to act on.
352
353        @returns True if entered False otherwise
354        """
355        if self._amodes is None:
356            self._amodes = self._get_amodes()
357
358        if not self.is_amode_supported(svid):
359            return False
360
361        if self._amodes[svid]['active'] and self._amodes[svid]['opos'] == opos:
362            return True
363
364        return False
365
366    def set_amode(self, svid, opos, enter, delay_secs=2):
367        """Set alternate mode.
368
369        @param svid: alternate mode SVID hexstring (i.e. 0xff01).
370        @param opos: object position of config to act on.
371        @param enter: Boolean of whether to enter mode.
372
373        @raises error.TestError if ...
374           mode not supported.
375           opos is > number of configs.
376
377        @returns True if successful False otherwise
378        """
379        if self._amodes is None:
380            self._amodes = self._get_amodes()
381
382        if svid not in self._amodes.keys():
383            raise error.TestError("SVID %s not supported", svid)
384
385        if opos > len(self._amodes[svid]['configs']):
386            raise error.TestError("opos > available configs")
387
388        cmd = "pdsetmode %d %s %d %d" % (self.index, svid, opos,
389                                         1 if enter else 0)
390        self.ec_command(cmd, ignore_status=True)
391        self._invalidate_port_data()
392
393        # allow some time for mode entry/exit
394        time.sleep(delay_secs)
395        return self.is_amode_entered(svid, opos) == enter
396
397    def get_flash_info(self):
398        mat0_re = r'has no discovered device'
399        mat1_re = r'.*ptype:(\d+)\s+vid:(\w+)\s+pid:(\w+).*'
400        mat2_re = r'.*DevId:(\d+)\.(\d+)\s+Hash:\s*(\w+.*)\s*CurImg:(\w+).*'
401        flash_dict = dict.fromkeys(['ptype', 'vid', 'pid', 'dev_major',
402                                    'dev_minor', 'rw_hash', 'image_status'])
403
404        cmd = 'infopddev %d' % self.index
405
406        tries = 3
407        while (tries):
408            res = self.ec_command(cmd, ignore_status=True)
409            if not 'has no discovered device' in res:
410                break
411
412            tries -= 1
413            time.sleep(1)
414
415        for ln in res.split('\n'):
416            mat1 = re.match(mat1_re, ln)
417            if mat1:
418                flash_dict['ptype'] = int(mat1.group(1))
419                flash_dict['vid'] = mat1.group(2)
420                flash_dict['pid'] = mat1.group(3)
421                continue
422
423            mat2 = re.match(mat2_re, ln)
424            if mat2:
425                flash_dict['dev_major'] = int(mat2.group(1))
426                flash_dict['dev_minor'] = int(mat2.group(2))
427                flash_dict['rw_hash'] = mat2.group(3)
428                flash_dict['image_status'] = mat2.group(4)
429                break
430
431        return flash_dict
432
433
434class EC_USBPD(EC_Common):
435    """Class for CrOS embedded controller for USB-PD.
436
437    Public attributes:
438        ports: list EC_USBPD_Port instances
439
440    Public Methods:
441        get_num_ports: get number of USB-PD ports device has.
442
443    Private attributes:
444        _num_ports: integer number of USB-PD ports device has.
445    """
446    def __init__(self, num_ports=None):
447        """Constructor.
448
449        @param num_ports: total number of USB-PD ports on device.  This is an
450          override.  If left 'None' will try to determine.
451        """
452        self._num_ports = num_ports
453        self.ports = list()
454
455        # TODO(crosbug.com/p/38133) target= only works for samus
456        super(EC_USBPD, self).__init__(target='cros_pd')
457
458        if (self.get_num_ports() == 0):
459            raise error.TestNAError("Device has no USB-PD ports")
460
461        for i in xrange(self._num_ports):
462            self.ports.append(EC_USBPD_Port(i))
463
464    def get_num_ports(self):
465        """Determine the number of ports for device.
466
467        Uses ectool's usbpdpower command which in turn makes host command call
468        to EC_CMD_USB_PD_PORTS to determine the number of ports.
469
470        TODO(tbroch) May want to consider adding separate ectool command to
471        surface the number of ports directly instead of via usbpdpower
472
473        @returns number of ports.
474        """
475        if (self._num_ports is not None):
476            return self._num_ports
477
478        self._num_ports = len(self.ec_command("usbpdpower").split(b'\n'))
479        return self._num_ports
480