1# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""A Python library to interact with TPM module for testing.
6
7Background
8 - TPM stands for Trusted Platform Module, a piece of security device
9 - TPM specification is the work of Trusted Computing Group
10 - As of September 2011, the current TPM specification is version 1.2
11
12Dependency
13 - This library depends on a C shared library called "libtspi.so", which
14   contains a set of APIs for interacting with TPM module
15
16Notes:
17 - An exception is raised if it doesn't make logical sense to continue program
18   flow (e.g. I/O error prevents test case from executing)
19 - An exception is caught and then converted to an error code if the caller
20   expects to check for error code per API definition
21"""
22
23import datetime, logging
24
25from autotest_lib.client.common_lib import smogcheck_ttci
26# Use explicit import to make code more readable
27from ctypes import c_uint, c_uint32, cdll, c_bool, Structure, POINTER, \
28    c_ubyte, c_byte, byref, c_uint16, cast, create_string_buffer, c_uint64, \
29    c_char_p, addressof, c_char, pointer
30
31# TPM flags
32# TODO(tgao): possible to import from trousers/src/include/tss/tss_defines.h?
33TSS_KEY_AUTHORIZATION = c_uint32(0x00000001)
34TSS_KEY_TSP_SRK = c_uint32(0x04000000)
35TSS_POLICY_USAGE = c_uint32(0x00000001)
36TSS_OBJECT_TYPE_RSAKEY = c_uint(0x02)
37TSS_SECRET_MODE_SHA1 = c_uint32(0x00001000)
38TSS_SECRET_MODE_PLAIN = c_uint32(0x00001800)
39TSS_TPMCAP_PROP_MANUFACTURER = c_uint(0x12)
40TSS_TPMCAP_PROPERTY = c_uint(0x13)
41TSS_TPMCAP_VERSION = c_uint(0x14)
42TSS_TPMCAP_VERSION_VAL = c_uint32(0x15)
43TSS_TPMSTATUS_DISABLEOWNERCLEAR = c_uint32(0x00000001)
44TSS_TPMSTATUS_DISABLEFORCECLEAR = c_uint32(0x00000002)
45TSS_TPMSTATUS_PHYSICALSETDEACTIVATED = c_uint32(0x00000010)
46TSS_TPMSTATUS_SETTEMPDEACTIVATED = c_uint32(0x00000011)
47
48# TODO(tgao): possible to import from trousers/src/include/tss/tpm.h?
49TPM_SHA1_160_HASH_LEN = c_uint(0x14)
50
51# Path to TSPI shared library.
52TSPI_C_LIB = "/usr/lib/libtspi.so.1"
53
54# Valid operation of tpmSetActive(). Equivalent CLI commands:
55# 'status' = tpm_setactive --well-known --status
56# 'activate' = tpm_setactive --well-known --active
57# 'deactivate' = tpm_setactive --well-known --inactive
58# 'temp' = tpm_setactive --well-known --temp
59TPM_SETACTIVE_OP = ['status', 'activate', 'deactivate', 'temp']
60
61# Valid operation of tpmSetClearable(). Equivalent CLI commands:
62# 'status' = tpm_setclearable --well-known --status
63# 'owner' = tpm_setclearable --well-known --owner
64# 'force' = tpm_setclearable --well-known --force
65TPM_SETCLEARABLE_OP = ['status', 'owner', 'force']
66
67# Secret mode for setPolicySecret()
68TSS_SECRET_MODE = dict(sha1=TSS_SECRET_MODE_SHA1,
69                       plain=TSS_SECRET_MODE_PLAIN)
70
71
72class SmogcheckError(Exception):
73    """Base class for all smogcheck API errors."""
74
75
76class TpmVersion(Structure):
77    """Defines TPM version string struct.
78
79    Declared in tss/tpm.h and named TPM_VERSION.
80    """
81    _fields_ = [('major', c_ubyte),
82                ('minor', c_ubyte),
83                ('revMajor', c_ubyte),
84                ('revMinor', c_ubyte)]
85
86
87class TpmCapVersionInfo(Structure):
88    """Defines TPM version info struct.
89
90    Declared in tss/tpm.h and named TPM_CAP_VERSION_INFO.
91    """
92    _fields_ = [('tag', c_uint16),
93                ('version', TpmVersion),
94                ('specLevel', c_uint16),
95                ('errataRev', c_ubyte),
96                ('tpmVendorID', c_char*4),
97                ('vendorSpecific', POINTER(c_ubyte))]
98
99
100def InitVersionInfo(vi):
101    """Utility method to allocate memory for TPM version info.
102
103    Args:
104      vi: a TpmCapVerisonInfo object, just created.
105    """
106    vi.tpmVendorId = create_string_buffer(4)  # Allocate 4 bytes
107    vendorDetail = create_string_buffer(64)   # Allocate 64 bytes
108    vi.vendorSpecific = cast(pointer(vendorDetail), POINTER(c_ubyte))
109
110
111def PrintVersionInfo(vi):
112    """Utility method to print TPM version info.
113
114    Args:
115      vi: a TpmCapVerisonInfo object.
116    """
117    logging.info('  TPM 1.2 Version Info:\n')
118    logging.info('  Chip Version:  %d.%d.%d.%d.', vi.version.major,
119                 vi.version.minor, vi.version.revMajor, vi.version.revMinor)
120    logging.info('  Spec Level:  %d', vi.specLevel)
121    logging.info('  Errata Revision:  %d', vi.errataRev)
122    vendorId = [i for i in vi.tpmVendorID if i]
123    logging.info('  TPM Vendor ID:  %s', ''.join(vendorId))
124    # TODO(tgao): handle the case when there's no vendor specific data.
125    logging.info('  Vendor Specific data (first 4 bytes in Hex):  '
126                 '%.2x %.2x %.2x %.2x', vi.vendorSpecific[0],
127                 vi.vendorSpecific[1], vi.vendorSpecific[2],
128                 vi.vendorSpecific[3])
129
130
131def PrintSelfTestResult(str_len, pResult):
132    """Utility method to print TPM self test result.
133
134    Args:
135      str_len: an integer, length of string pointed to by pResult.
136      pResult: a c_char_p, pointer to result.
137    """
138    out = []
139    for i in range(str_len):
140        if i and not i % 32:
141            out.append('\t')
142        if not i % 4:
143            out.append(' ')
144        b = pResult.value[i]
145        out.append('%02x' % ord(b))
146    logging.info('  TPM Test Results: %s', ''.join(out))
147
148
149class TpmController(object):
150    """Object to interact with TPM module for testing."""
151
152    def __init__(self):
153        """Constructor.
154
155        Mandatory params:
156          hContext: a c_uint32, context object handle.
157          _contextSet: a boolean, True if TPM context is set.
158          hTpm: a c_uint32, TPM object handle.
159          hTpmPolicy: a c_uint32, TPM policy object handle.
160          tspi_lib: a shared library object (libtspi.so).
161
162        Raises:
163          SmogcheckError: if error initializing TpmController.
164        """
165        self.hContext = c_uint32(0)
166        self._contextSet = False
167        self.hTpm = c_uint32(0)
168        self.hTpmPolicy = c_uint32(0)
169
170        logging.info('Attempt to load shared library %s', TSPI_C_LIB)
171        try:
172            self.tspi_lib = cdll.LoadLibrary(TSPI_C_LIB)
173        except OSError, e:
174            raise SmogcheckError('Error loading C library %s: %r' %
175                                 (TSPI_C_LIB, e))
176        logging.info('Successfully loaded shared library %s', TSPI_C_LIB)
177
178    def closeContext(self):
179        """Closes TPM context and cleans up.
180
181        Returns:
182          an integer, 0 for success and -1 for error.
183        """
184        if not self._contextSet:
185            logging.debug('TPM context NOT set.')
186            return 0
187
188        ret = -1
189        # Calling the pointer type without an argument creates a NULL pointer
190        if self.tspi_lib.Tspi_Context_FreeMemory(self.hContext,
191                                                 POINTER(c_byte)()) != 0:
192            logging.error('Error freeing memory when closing TPM context')
193        else:
194            logging.debug('Tspi_Context_FreeMemory() success')
195
196        if self.tspi_lib.Tspi_Context_Close(self.hContext) != 0:
197            logging.error('Error closing TPM context')
198        else:
199            logging.debug('Tspi_Context_Close() success')
200            ret = 0
201            self._contextSet = False
202
203        return ret
204
205    def _closeContextObject(self, hObject):
206        """Closes TPM context object.
207
208        Args:
209          hObject: an integer, basic object handle.
210
211        Raises:
212          SmogcheckError: if an error is encountered.
213        """
214        if self.tspi_lib.Tspi_Context_CloseObject(self.hContext, hObject) != 0:
215            raise SmogcheckError('Error closing TPM context object')
216
217        logging.debug('Tspi_Context_CloseObject() success')
218
219    def setupContext(self):
220        """Sets up tspi context for TPM access.
221
222        TPM context cannot be reused. Therefore, each new Tspi_* command would
223        require a new context to be set up before execution and closing that
224        context after execution (or error).
225
226        Raises:
227          SmogcheckError: if an error is encountered.
228        """
229        if self._contextSet:
230            logging.debug('TPM context already set.')
231            return
232
233        if self.tspi_lib.Tspi_Context_Create(byref(self.hContext)) != 0:
234            raise SmogcheckError('Error creating tspi context')
235
236        logging.info('Created tspi context = 0x%x', self.hContext.value)
237
238        if self.tspi_lib.Tspi_Context_Connect(self.hContext,
239                                              POINTER(c_uint16)()) != 0:
240            raise SmogcheckError('Error connecting to tspi context')
241
242        logging.info('Connected to tspi context')
243
244        if self.tspi_lib.Tspi_Context_GetTpmObject(self.hContext,
245                                                   byref(self.hTpm)) != 0:
246            raise SmogcheckError('Error getting TPM object from tspi context')
247
248        logging.info('Got tpm object from tspi context = 0x%x', self.hTpm.value)
249        self._contextSet = True
250
251    def _getTpmStatus(self, flag, bValue):
252        """Wrapper function to call Tspi_TPM_GetStatus().
253
254        Args:
255          flag: a c_uint, TPM status info flag, values defined in C header file
256                "tss/tss_defines.h".
257          bValue: a c_bool, place holder for specific TPM flag bit value (0/1).
258
259        Raises:
260          SmogcheckError: if an error is encountered.
261        """
262        result = self.tspi_lib.Tspi_TPM_GetStatus(self.hTpm, flag,
263                                                  byref(bValue))
264        if result != 0:
265            msg = ('Error (0x%x) getting status for flag 0x%x' %
266                   (result, flag.value))
267            raise SmogcheckError(msg)
268
269        logging.info('Tspi_TPM_GetStatus(): success for flag 0x%x',
270                     flag.value)
271
272    def _setTpmStatus(self, flag, bValue):
273        """Wrapper function to call Tspi_TPM_GetStatus().
274
275        Args:
276          flag: a c_uint, TPM status info flag.
277          bValue: a c_bool, place holder for specific TPM flag bit value (0/1).
278
279        Raises:
280          SmogcheckError: if an error is encountered.
281        """
282        result = self.tspi_lib.Tspi_TPM_SetStatus(self.hTpm, flag, bValue)
283        if result != 0:
284            msg = ('Error (0x%x) setting status for flag 0x%x' %
285                   (result, flag.value))
286            raise SmogcheckError(msg)
287
288        logging.info('Tspi_TPM_SetStatus(): success for flag 0x%x',
289                     flag.value)
290
291    def getPolicyObject(self, hTpm=None, hPolicy=None):
292        """Get TPM policy object.
293
294        Args:
295          hTpm: a c_uint, TPM object handle.
296          hPolicy: a c_uint, TPM policy object handle.
297
298        Raises:
299          SmogcheckError: if an error is encountered.
300        """
301        if hTpm is None:
302            hTpm = self.hTpm
303
304        if hPolicy is None:
305            hPolicy = self.hTpmPolicy
306
307        logging.debug('Tspi_GetPolicyObject(): hTpm = 0x%x, hPolicy = 0x%x',
308                      hTpm.value, hPolicy.value)
309        result = self.tspi_lib.Tspi_GetPolicyObject(hTpm, TSS_POLICY_USAGE,
310                                                    byref(hPolicy))
311        if result != 0:
312            msg = 'Error (0x%x) getting TPM policy object' % result
313            raise SmogcheckError(msg)
314
315        logging.debug('Tspi_GetPolicyObject() success hTpm = 0x%x, '
316                      'hPolicy = 0x%x', hTpm.value, hPolicy.value)
317
318    def setPolicySecret(self, hPolicy=None, pSecret=None, secret_mode=None):
319        """Sets TPM policy secret.
320
321        Args:
322          hPolicy: a c_uint, TPM policy object handle.
323          pSecret: a pointer to a byte array, which holds the TSS secret.
324          secret_mode: a string, valid values are keys of TSS_SECRET_MODE.
325
326        Raises:
327          SmogcheckError: if an error is encountered.
328        """
329        if hPolicy is None:
330            hPolicy = self.hTpmPolicy
331
332        if pSecret is None:
333            raise SmogcheckError('setPolicySecret(): pSecret cannot be None')
334
335        if secret_mode is None or secret_mode not in TSS_SECRET_MODE:
336            raise SmogcheckError('setPolicySecret(): invalid secret_mode')
337
338        logging.debug('Tspi_Policy_SetSecret(): hPolicy = 0x%x, secret_mode '
339                      '(%r) = %r', hPolicy.value, secret_mode,
340                      TSS_SECRET_MODE[secret_mode])
341
342        result = self.tspi_lib.Tspi_Policy_SetSecret(
343            hPolicy, TSS_SECRET_MODE[secret_mode], TPM_SHA1_160_HASH_LEN,
344            pSecret)
345        if result != 0:
346            msg = 'Error (0x%x) setting TPM policy secret' % result
347            raise SmogcheckError(msg)
348
349        logging.debug('Tspi_Policy_SetSecret() success, hPolicy = 0x%x',
350                      hPolicy.value)
351
352    def getTpmVersion(self):
353        """Gets TPM version info.
354
355        Implementation based on tpm-tools-1.3.4/src/tpm_mgmt/tpm_version.c
356        Downloaded from:
357          http://sourceforge.net/projects/trousers/files/tpm-tools/1.3.4/\
358          tpm-tools-1.3.4.tar.gz
359
360        Raises:
361          SmogcheckError: if an error is encountered.
362        """
363        uiResultLen = c_uint32(0)
364        pResult = c_char_p()
365        offset = c_uint64(0)
366        versionInfo = TpmCapVersionInfo()
367        InitVersionInfo(versionInfo)
368
369        logging.debug('Successfully set up tspi context: hTpm = %r', self.hTpm)
370
371        result = self.tspi_lib.Tspi_TPM_GetCapability(
372            self.hTpm, TSS_TPMCAP_VERSION_VAL, 0, POINTER(c_byte)(),
373            byref(uiResultLen), byref(pResult))
374        if result != 0:
375            msg = 'Error (0x%x) getting TPM capability, pResult = %r' % (
376                result, pResult.value)
377            raise SmogcheckError(msg)
378
379        logging.info('Successfully received TPM capability: '
380                     'uiResultLen = %d, pResult=%r', uiResultLen.value,
381                     pResult.value)
382        result = self.tspi_lib.Trspi_UnloadBlob_CAP_VERSION_INFO(
383            byref(offset), pResult, cast(byref(versionInfo),
384                                         POINTER(c_byte)))
385        if result != 0:
386            msg = 'Error (0x%x) unloading TPM CAP version info' % result
387            raise SmogcheckError(msg)
388
389        PrintVersionInfo(versionInfo)
390
391    def runTpmSelfTest(self):
392        """Executes TPM self test.
393
394        Implementation based on tpm-tools-1.3.4/src/tpm_mgmt/tpm_selftest.c
395
396        Raises:
397          SmogcheckError: if an error is encountered.
398        """
399        uiResultLen = c_uint32(0)
400        pResult = c_char_p()
401        self.setupContext()
402
403        logging.debug('Successfully set up tspi context: hTpm = 0x%x',
404                      self.hTpm.value)
405
406        result = self.tspi_lib.Tspi_TPM_SelfTestFull(self.hTpm)
407        if result != 0:
408            self.closeContext()
409            raise SmogcheckError('Error (0x%x) with TPM self test' % result)
410
411        logging.info('Successfully executed TPM self test: hTpm = 0x%x',
412                     self.hTpm.value)
413        result = self.tspi_lib.Tspi_TPM_GetTestResult(
414            self.hTpm, byref(uiResultLen), byref(pResult))
415        if result != 0:
416            self.closeContext()
417            raise SmogcheckError('Error (0x%x) getting test results' % result)
418
419        logging.info('TPM self test results: uiResultLen = %d, pResult=%r',
420                     uiResultLen.value, pResult.value)
421        PrintSelfTestResult(uiResultLen.value, pResult)
422        self.closeContext()
423
424    def takeTpmOwnership(self):
425        """Take TPM ownership.
426
427        Implementation based on tpm-tools-1.3.4/src/tpm_mgmt/tpm_takeownership.c
428
429        Raises:
430          SmogcheckError: if an error is encountered.
431        """
432        hSrk = c_uint32(0)  # TPM Storage Root Key
433        hSrkPolicy = c_uint32(0)
434        # Defaults each byte value to 0x00
435        well_known_secret = create_string_buffer(20)
436        pSecret = c_char_p(addressof(well_known_secret))
437
438        self.setupContext()
439        logging.debug('Successfully set up tspi context: hTpm = 0x%x',
440                      self.hTpm.value)
441
442        try:
443            self.getPolicyObject()
444            self.setPolicySecret(pSecret=pSecret, secret_mode='sha1')
445        except SmogcheckError:
446            if hSrk != 0:
447                self._closeContextObject(hSrk)
448            self.closeContext()
449            raise  # re-raise
450
451        flag = TSS_KEY_TSP_SRK.value | TSS_KEY_AUTHORIZATION.value
452        result = self.tspi_lib.Tspi_Context_CreateObject(
453            self.hContext, TSS_OBJECT_TYPE_RSAKEY, flag, byref(hSrk))
454        if result != 0:
455            raise SmogcheckError('Error (0x%x) creating context object' %
456                                 result)
457        logging.debug('hTpm = 0x%x, flag = 0x%x, hSrk = 0x%x',
458                      self.hTpm.value, flag, hSrk.value)  # DEBUG
459
460        try:
461            self.getPolicyObject(hTpm=hSrk, hPolicy=hSrkPolicy)
462            self.setPolicySecret(hPolicy=hSrkPolicy, pSecret=pSecret,
463                                 secret_mode='sha1')
464        except SmogcheckError:
465            if hSrk != 0:
466                self._closeContextObject(hSrk)
467            self.closeContext()
468            raise  # re-raise
469
470        logging.debug('Successfully set up SRK policy: secret = %r, '
471                      'hSrk = 0x%x, hSrkPolicy = 0x%x',
472                      well_known_secret.value, hSrk.value, hSrkPolicy.value)
473
474        start_time = datetime.datetime.now()
475        result = self.tspi_lib.Tspi_TPM_TakeOwnership(self.hTpm, hSrk,
476                                                      c_uint(0))
477        end_time = datetime.datetime.now()
478        if result != 0:
479            logging.info('Tspi_TPM_TakeOwnership error')
480            self._closeContextObject(hSrk)
481            self.closeContext()
482            raise SmogcheckError('Error (0x%x) taking TPM ownership' % result)
483
484        logging.info('Successfully took TPM ownership')
485        self._closeContextObject(hSrk)
486        self.closeContext()
487        return smogcheck_ttci.computeTimeElapsed(end_time, start_time)
488
489    def clearTpm(self):
490        """Return TPM to default state.
491
492        Implementation based on tpm-tools-1.3.4/src/tpm_mgmt/tpm_clear.c
493
494        Raises:
495          SmogcheckError: if an error is encountered.
496        """
497        logging.debug('Successfully set up tspi context: hTpm = %r', self.hTpm)
498
499        result = self.tspi_lib.Tspi_TPM_ClearOwner(self.hTpm, True)
500        if result != 0:
501            raise SmogcheckError('Error (0x%x) clearing TPM' % result)
502
503        logging.info('Successfully cleared TPM')
504
505    def setTpmActive(self, op):
506        """Change TPM active state.
507
508        Implementation based on tpm-tools-1.3.4/src/tpm_mgmt/tpm_activate.c
509
510        Args:
511          op: a string, desired operation. Valid values are defined in
512              TPM_SETACTIVE_OP.
513
514        Raises:
515          SmogcheckError: if an error is encountered.
516        """
517        bValue = c_bool()
518        # Defaults each byte value to 0x00
519        well_known_secret = create_string_buffer(20)
520        pSecret = c_char_p(addressof(well_known_secret))
521
522        if op not in TPM_SETACTIVE_OP:
523            msg = ('Invalid op (%s) for tpmSetActive(). Valid values are %r' %
524                   (op, TPM_SETACTIVE_OP))
525            raise SmogcheckError(msg)
526
527        logging.debug('Successfully set up tspi context: hTpm = %r', self.hTpm)
528
529        if op == 'status':
530            self.getPolicyObject()
531            self.setPolicySecret(pSecret=pSecret, secret_mode='sha1')
532
533            self._getTpmStatus(
534                TSS_TPMSTATUS_PHYSICALSETDEACTIVATED, bValue)
535            logging.info('Persistent Deactivated Status: %s', bValue.value)
536
537            self._getTpmStatus(
538                TSS_TPMSTATUS_SETTEMPDEACTIVATED, bValue)
539            logging.info('Volatile Deactivated Status: %s', bValue.value)
540        elif op == 'activate':
541            self._setTpmStatus(
542                TSS_TPMSTATUS_PHYSICALSETDEACTIVATED, False)
543            logging.info('Successfully activated TPM')
544        elif op == 'deactivate':
545            self._setTpmStatus(
546                TSS_TPMSTATUS_PHYSICALSETDEACTIVATED, True)
547            logging.info('Successfully deactivated TPM')
548        elif op == 'temp':
549            self._setTpmStatus(
550                TSS_TPMSTATUS_SETTEMPDEACTIVATED, True)
551            logging.info('Successfully deactivated TPM for current boot')
552
553    def setTpmClearable(self, op):
554        """Disable TPM clear operations.
555
556        Implementation based on tpm-tools-1.3.4/src/tpm_mgmt/tpm_clearable.c
557
558        Args:
559          op: a string, desired operation. Valid values are defined in
560              TPM_SETCLEARABLE_OP.
561
562        Raises:
563          SmogcheckError: if an error is encountered.
564        """
565        bValue = c_bool()
566        # Defaults each byte value to 0x00
567        well_known_secret = create_string_buffer(20)
568        pSecret = c_char_p(addressof(well_known_secret))
569
570        if op not in TPM_SETCLEARABLE_OP:
571            msg = ('Invalid op (%s) for tpmSetClearable(). Valid values are %r'
572                   % (op, TPM_SETCLEARABLE_OP))
573            raise SmogcheckError(msg)
574
575        logging.debug('Successfully set up tspi context: hTpm = %r', self.hTpm)
576
577        if op == 'status':
578            self.getPolicyObject()
579            self.setPolicySecret(pSecret=pSecret, secret_mode='sha1')
580
581            self._getTpmStatus(
582                TSS_TPMSTATUS_DISABLEOWNERCLEAR, bValue)
583            logging.info('Owner Clear Disabled: %s', bValue.value)
584
585            self._getTpmStatus(
586                TSS_TPMSTATUS_DISABLEFORCECLEAR, bValue)
587            logging.info('Force Clear Disabled: %s', bValue.value)
588        elif op == 'owner':
589            self.getPolicyObject()
590            self.setPolicySecret(pSecret=pSecret, secret_mode='sha1')
591
592            self._setTpmStatus(
593                TSS_TPMSTATUS_DISABLEOWNERCLEAR, False)
594            logging.info('Successfully disabled Owner Clear')
595        elif op == 'force':
596            self._setTpmStatus(
597                TSS_TPMSTATUS_DISABLEFORCECLEAR, True)
598            logging.info('Successfully disabled Force Clear')
599