1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging, os, tempfile, shutil, stat, time, posix
6from autotest_lib.client.bin import test, utils
7from autotest_lib.client.common_lib import error
8
9# TODO:
10#  - mock out TPM and check all error conditions
11#  - test failure when things aren't mounted correctly
12
13class test_checker(object):
14    def __init__(self):
15        logging.info("test_checker.__init__")
16        # Empty failure list means test passes.
17        self._failures = []
18
19    def _passed(self, msg):
20        logging.info('ok: %s' % (msg))
21
22    def _failed(self, msg):
23        logging.error('FAIL: %s' % (msg))
24        self._failures.append(msg)
25
26    def _fatal(self, msg):
27        logging.error('FATAL: %s' % (msg))
28        raise error.TestError(msg)
29
30    def check(self, boolean, msg, fatal=False):
31        if boolean == True:
32            self._passed(msg)
33        else:
34            msg = "could not satisfy '%s'" % (msg)
35            if fatal:
36                self._fatal(msg)
37            else:
38                self._failed(msg)
39
40    def test_raise(self):
41        # Raise a failure if anything unexpected was seen.
42        if len(self._failures):
43            raise error.TestFail((", ".join(self._failures)))
44
45chk = test_checker()
46
47
48class EncryptedStateful(object):
49    def _prepare_simulated_root(self):
50        os.makedirs(self.var)
51        os.makedirs(self.chronos)
52        os.makedirs(self.stateful)
53
54        # Build fake stateful block device (emulate 10G sda1).
55        self.stateful_block = os.path.join(self.root, 'stateful.block')
56        utils.system("truncate -s 10G %s" % (self.stateful_block))
57        utils.system("mkfs.ext4 -F %s" % (self.stateful_block))
58        utils.system("mount -n -t ext4 -o loop,noatime,commit=600 %s %s" %
59                     (self.stateful_block, self.stateful))
60
61    def __init__(self, root=None):
62        if root == None:
63            self.root = tempfile.mkdtemp(dir='/mnt/stateful_partition',
64                                         prefix='.test-enc-stateful-')
65            self.simulated = True
66        else:
67            self.root = root
68            self.simulated = False
69
70        self.var = os.path.join(self.root, 'var')
71        self.chronos = os.path.join(self.root, 'home', 'chronos')
72        self.stateful = os.path.join(self.root, 'mnt', 'stateful_partition')
73        self.mount_log = os.path.join(self.stateful, 'mount.log')
74        self.key = os.path.join(self.stateful, 'encrypted.key')
75        self.needs_finalization = os.path.join(self.stateful,
76                                               'encrypted.needs-finalization')
77        self.block = os.path.join(self.stateful, 'encrypted.block')
78        self.encrypted = os.path.join(self.stateful, 'encrypted')
79
80        if self.simulated:
81            try:
82                self._prepare_simulated_root()
83            except:
84                shutil.rmtree(self.root)
85                raise
86
87        self.mounted = not self.simulated
88
89    def mount(self, args=""):
90        if self.mounted or not self.simulated:
91            return
92        # TODO(keescook): figure out what is killing the resizer and
93        # remove the explicit use of "tee" here.
94        # Without the pipe to "tee", mount-encrypted's forked resizing
95        # process gets killed, even though it is using daemon(). (Is
96        # autotest doing something odd here?) This leaves the filesystem
97        # unresized. It would be better to have the resizer running in
98        # the background, as it is designed, so we can examine its behavior
99        # during testing (e.g. "does the filesystem actually grow?").
100        utils.system("MOUNT_ENCRYPTED_ROOT=%s mount-encrypted %s 2>&1 "
101                     "| tee %s" % (self.root, args, self.mount_log))
102        self.mounted = True
103
104    def umount(self):
105        if not self.mounted or not self.simulated:
106            return
107        utils.system("MOUNT_ENCRYPTED_ROOT=%s mount-encrypted umount" %
108                         (self.root))
109        self.mounted = False
110
111    # Clean up when destroyed.
112    def __del__(self):
113        if self.simulated:
114            self.umount()
115            utils.system("umount -n %s" % (self.stateful))
116            shutil.rmtree(self.root)
117
118    # Perform common post-mount size/owner checks on the filesystem and
119    # backing files.
120    def check_sizes(self, finalized=True):
121        # Do we have the expected backing files?
122        chk.check(os.path.exists(self.block), "%s exists" % (self.block))
123        if finalized:
124            keyfile = self.key
125            other = self.needs_finalization
126        else:
127            keyfile = self.needs_finalization
128            other = self.key
129        chk.check(os.path.exists(keyfile), "%s exists" % (keyfile))
130        chk.check(not os.path.exists(other), "%s does not exist" % (other))
131
132        # Sanity check the key file stat.
133        info = os.stat(keyfile)
134        chk.check(stat.S_ISREG(info.st_mode),
135                  "%s is regular file" % (keyfile))
136        chk.check(info.st_uid == 0, "%s is owned by root" % (keyfile))
137        chk.check(info.st_gid == 0, "%s has group root" % (keyfile))
138        chk.check(stat.S_IMODE(info.st_mode) == (stat.S_IRUSR | stat.S_IWUSR),
139                  "%s is S_IRUSR | S_IWUSR" % (keyfile))
140        chk.check(info.st_size == 48, "%s is 48 bytes" % (keyfile))
141
142        # Sanity check the block file stat.
143        info = os.stat(self.block)
144        chk.check(stat.S_ISREG(info.st_mode),
145                  "%s is regular file" % (self.block))
146        chk.check(info.st_uid == 0, "%s is owned by root" % (self.block))
147        chk.check(info.st_gid == 0, "%s has group root" % (self.block))
148        chk.check(stat.S_IMODE(info.st_mode) == (stat.S_IRUSR | stat.S_IWUSR),
149                  "%s is S_IRUSR | S_IWUSR" % (self.block))
150        # Make sure block file is roughly a third of the size of the root
151        # filesystem (within 5%).
152        top = os.statvfs(self.stateful)
153        backing_size = float(info.st_size)
154        third = top.f_blocks * top.f_frsize * .3
155        chk.check(backing_size > (third * .95)
156                  and backing_size < (third * 1.05),
157                  "%s is near %d bytes (was %d)" % (self.block, third,
158                                                    info.st_size))
159
160        # Wait for resize manager task to finish.
161        utils.poll_for_condition(lambda: utils.system("pgrep mount-encrypted",
162                                                      ignore_status=True) != 0,
163                                 error.TestError('resizer still running'))
164
165        # Verify filesystem is within 5% of the block file size.
166        info = os.statvfs(self.encrypted)
167        encrypted_size = float(info.f_frsize) * float(info.f_blocks)
168        chk.check(encrypted_size / backing_size > 0.95,
169                  "%s fs (%d) is nearly the backing device size (%d)" %
170                  (self.encrypted, encrypted_size, backing_size))
171        # Verify there is a reasonable number of inodes in the encrypted
172        # filesystem (near 25% inodes-to-blocks ratio).
173        inode_ratio = float(info.f_files) / float(info.f_blocks)
174        chk.check(inode_ratio > 0.20 and inode_ratio < 0.30,
175                  "%s has close to 25%% ratio of inodes-to-blocks (%.2f%%)" %
176                  (self.encrypted, inode_ratio*100))
177
178        # Raise non-fatal failures now, if they were encountered.
179        chk.test_raise()
180
181    # Wait for kernel background writing to finish.
182    def _backing_stabilize(self):
183        start = None
184        size = 0
185        while True:
186            k = long(utils.system_output("du -sk %s" % (self.block),
187                                         retain_output = True).split()[0])
188            if start == None:
189                start = k
190            if k == size:
191                # Backing file has remained the same size for 10 seconds.
192                # Assume the kernel is done with background initialization.
193                break
194            time.sleep(10)
195            utils.system("sync")
196            size = k
197        logging.info("%s stabilized at %dK (was %dK)" %
198                     (self.block, size, start))
199
200    # Check that the backing file reclaims space when filesystem contents
201    # are deleted.
202    def check_reclamation(self):
203        # This test is sensitive to other things happening on the filesystem,
204        # so we must wait for background initialization to finish first.
205        self._backing_stabilize()
206
207        megs = 200
208        data = os.path.join(self.var, "check_reclamation")
209        orig = os.statvfs(self.stateful)
210
211        # 200M file added to encrypted filesystem.
212        utils.system("dd if=/dev/zero of=%s bs=1M count=%s; sync" % (data,
213                                                                     megs))
214        # Wait for background allocations to finish.
215        self._backing_stabilize()
216        filled = os.statvfs(self.stateful)
217
218        # 200M file removed from encrypted filesystem.
219        utils.system("rm %s; sync" % (data))
220        # Wait for background hole-punching to finish.
221        self._backing_stabilize()
222        done = os.statvfs(self.stateful)
223
224        # Did the underlying filesystem grow by the size of the test file?
225        file_blocks_used = float((megs * 1024 * 1024) / orig.f_frsize)
226        fs_blocks_used = float(orig.f_bfree - filled.f_bfree)
227        chk.check(file_blocks_used / fs_blocks_used > 0.95,
228                  "%d file blocks account for most of %d fs blocks" %
229                  (file_blocks_used, fs_blocks_used))
230
231        # Did the underlying filesystem shrink on removal?
232        fs_blocks_done = float(orig.f_bfree - done.f_bfree)
233        chk.check(fs_blocks_done / file_blocks_used < 0.05,
234                  "most of %d fs blocks reclaimed (%d fs blocks left over, "
235                  "free: %d -> %d -> %d)" %
236                  (fs_blocks_used, fs_blocks_done,
237                   orig.f_bfree, filled.f_bfree, done.f_bfree))
238
239        # Raise non-fatal failures now, if they were encountered.
240        chk.test_raise()
241
242
243class platform_EncryptedStateful(test.test):
244    version = 1
245
246    def existing_partition(self):
247        # Examine the existing encrypted partition.
248        encstate = EncryptedStateful("/")
249
250        # Perform post-mount sanity checks (and handle unfinalized devices).
251        encstate.check_sizes(finalized=os.path.exists(encstate.key))
252
253    def factory_key(self):
254        # Create test root directory.
255        encstate = EncryptedStateful()
256
257        # Make sure we haven't run here before.
258        chk.check(not os.path.exists(encstate.key),
259                  "%s does not exist" % (encstate.key))
260        chk.check(not os.path.exists(encstate.block),
261                  "%s does not exist" % (encstate.block))
262
263        # Mount a fresh encrypted stateful, with factory static key.
264        encstate.mount("factory")
265
266        # Perform post-mount sanity checks.
267        encstate.check_sizes()
268
269        # Check disk reclamation.
270        encstate.check_reclamation()
271
272        # Check explicit umount.
273        encstate.umount()
274
275    def no_tpm(self):
276        encstate = EncryptedStateful()
277
278        # Relocate the TPM device during mount.
279        tpm = "/dev/tpm0"
280        off = "%s.off" % (tpm)
281        try:
282            if os.path.exists(tpm):
283                utils.system("mv %s %s" % (tpm, off))
284            # Mount without a TPM.
285            encstate.mount()
286        finally:
287            if os.path.exists(off):
288                utils.system("mv %s %s" % (off, tpm))
289
290        # Perform post-mount sanity checks.
291        encstate.check_sizes(finalized=False)
292
293    def run_once(self):
294        # Do a no-write test of system's existing encrypted partition.
295        self.existing_partition()
296
297        # Do a no-write, no-TPM test with sanity checks.
298        self.no_tpm()
299
300        # There is no interactively controllable TPM mock yet for
301        # mount-encrypted, so we can only test the static key currently.
302        self.factory_key()
303