1# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7import tarfile
8import zipfile
9
10from autotest_lib.client.bin import test
11from autotest_lib.client.common_lib import autotemp, error
12from autotest_lib.client.cros.cros_disks import CrosDisksTester
13from autotest_lib.client.cros.cros_disks import VirtualFilesystemImage
14from autotest_lib.client.cros.cros_disks import DefaultFilesystemTestContent
15from collections import deque
16
17
class CrosDisksArchiveTester(CrosDisksTester):
    """A tester to verify archive support in CrosDisks.

    For every configured archive type, an archive of default test content
    is written onto a FAT-formatted virtual filesystem image, the image and
    then the archive are mounted through CrosDisks, and the mounted archive
    content is verified.
    """
    def __init__(self, test, archive_types):
        """Initializes the tester.

        Args:
            test: The test object passed on to the CrosDisksTester
                  constructor.
            archive_types: An iterable of archive type strings to test.
                           See _make_archive for the recognized values.
        """
        super(CrosDisksArchiveTester, self).__init__(test)
        self._archive_types = archive_types

    def _find_all_files(self, root_dir):
        """Returns all files under a directory and its sub-directories.

           This is a generator that performs a breadth-first-search of
           all files under a specified directory and its sub-directories.
           Only non-directory entries are yielded, so a directory without
           any file in it does not show up in the result.

        Args:
            root_dir: The root directory where the search starts from.
        Yields:
            Path of any found file relative to the root directory.
        """
        # The empty string stands for root_dir itself, which keeps every
        # yielded path relative to root_dir.
        dirs_to_explore = deque([''])
        while dirs_to_explore:
            current_dir = dirs_to_explore.popleft()
            for path in os.listdir(os.path.join(root_dir, current_dir)):
                expanded_path = os.path.join(root_dir, current_dir, path)
                relative_path = os.path.join(current_dir, path)
                if os.path.isdir(expanded_path):
                    dirs_to_explore.append(relative_path)
                else:
                    yield relative_path

    def _make_tar_archive(self, archive_path, root_dir, compression=None):
        """Archives a specified directory into a tar file.

           The created tar file contains all files found by
           _find_all_files under the specified root directory, stored
           under their paths relative to the root directory, but not the
           root directory itself.

        Args:
            archive_path: Path of the output archive.
            root_dir: The root directory to archive.
            compression: The compression method: None, 'gz', 'bz2'
        """
        mode = 'w:' + compression if compression else 'w'
        # TarFile in Python 2.6 does not work with the 'with' statement,
        # so close it explicitly, even when adding a file fails.
        archive = tarfile.open(archive_path, mode)
        try:
            for path in self._find_all_files(root_dir):
                archive.add(os.path.join(root_dir, path), path)
        finally:
            archive.close()

    def _make_zip_archive(self, archive_path, root_dir,
                         compression=zipfile.ZIP_DEFLATED):
        """Archives a specified directory into a ZIP file.

           The created ZIP file contains all files found by
           _find_all_files under the specified root directory, stored
           under their paths relative to the root directory, but not the
           root directory itself.

        Args:
            archive_path: Path of the output archive.
            root_dir: The root directory to archive.
            compression: The ZIP compression method.
        """
        # ZipFile in Python 2.6 does not work with the 'with' statement,
        # so close it explicitly, even when writing a file fails.
        archive = zipfile.ZipFile(archive_path, 'w', compression)
        try:
            for path in self._find_all_files(root_dir):
                archive.write(os.path.join(root_dir, path), path)
        finally:
            archive.close()

    def _make_archive(self, archive_type, archive_path, root_dir):
        """Archives a specified directory into an archive of specified type.

           The created archive file contains all files found under the
           specified root directory, but not the root directory itself.

        Args:
            archive_type: Type of the output archive. One of 'zip', 'tar',
                          'tar.gz', 'tgz', 'tar.bz2', 'tbz' or 'tbz2'.
            archive_path: Path of the output archive.
            root_dir: The root directory to archive.
        Raises:
            error.TestFail: If the archive type is not one of the above.
        """
        if archive_type in ['zip']:
            self._make_zip_archive(archive_path, root_dir)
        elif archive_type in ['tar']:
            self._make_tar_archive(archive_path, root_dir)
        elif archive_type in ['tar.gz', 'tgz']:
            self._make_tar_archive(archive_path, root_dir, 'gz')
        elif archive_type in ['tar.bz2', 'tbz', 'tbz2']:
            self._make_tar_archive(archive_path, root_dir, 'bz2')
        else:
            raise error.TestFail("Unsupported archive type " + archive_type)

    def _test_archive(self, archive_type):
        """Tests mounting an archive of the specified type via CrosDisks.

           Once CrosDisks has confirmed a mount, the corresponding unmount
           is always requested, even if a later step fails, so that a
           failing test does not leave the archive or the image mounted.

        Args:
            archive_type: Type of the archive to test. It is also used as
                          the file extension of the archive file.
        Raises:
            error.TestFail: If the test content cannot be created or the
                            content of the mounted archive fails
                            verification.
        """
        # Create the archive file content in a temporary directory.
        archive_dir = autotemp.tempdir(unique_id='CrosDisks')
        test_content = DefaultFilesystemTestContent()
        if not test_content.create(archive_dir.name):
            raise error.TestFail("Failed to create archive test content")

        # Create a FAT-formatted virtual filesystem image containing an
        # archive file to help simulate mounting an archive file on a
        # removable drive.
        with VirtualFilesystemImage(
                block_size=1024,
                block_count=65536,
                filesystem_type='vfat',
                mkfs_options=['-F', '32', '-n', 'ARCHIVE']) as image:
            image.format()
            image.mount(options=['sync'])
            # Create the archive file on the virtual filesystem image.
            archive_name = 'test.' + archive_type
            archive_path = os.path.join(image.mount_dir, archive_name)
            self._make_archive(archive_type, archive_path, archive_dir.name)
            image.unmount()

            # Mount the virtual filesystem image via CrosDisks.
            device_file = image.loop_device
            self.cros_disks.mount(device_file, '',
                                  ["ro", "nodev", "noexec", "nosuid"])
            # NOTE(review): status 0 presumably means the mount succeeded;
            # confirm against the CrosDisks mount status codes.
            result = self.cros_disks.expect_mount_completion({
                'status': 0,
                'source_path': device_file
            })

            try:
                # Mount the archive file on the mounted filesystem via
                # CrosDisks.
                archive_path = os.path.join(result['mount_path'],
                                            archive_name)
                expected_mount_path = os.path.join('/media/archive',
                                                   archive_name)
                self.cros_disks.mount(archive_path)
                result = self.cros_disks.expect_mount_completion({
                    'status': 0,
                    'source_path': archive_path,
                    'mount_path': expected_mount_path
                })

                try:
                    # Verify the content of the mounted archive file.
                    if not test_content.verify(expected_mount_path):
                        raise error.TestFail(
                                "Failed to verify filesystem test content")
                finally:
                    self.cros_disks.unmount(expected_mount_path, ['lazy'])
            finally:
                self.cros_disks.unmount(device_file, ['lazy'])

    def test_archives(self):
        """Runs the archive test once for each configured archive type."""
        for archive_type in self._archive_types:
            self._test_archive(archive_type)

    def get_tests(self):
        """Returns the list of test methods provided by this tester."""
        return [self.test_archives]
163
164
class platform_CrosDisksArchive(test.test):
    """Test that runs CrosDisksArchiveTester for the given archive types."""
    version = 1

    def run_once(self, *args, **kwargs):
        """Builds a CrosDisksArchiveTester and runs it.

        Args:
            *args: Positional arguments forwarded unchanged to the
                   tester's run().
            **kwargs: Keyword arguments forwarded unchanged to the
                      tester's run(). Must contain 'archive_types', which
                      is also handed to the CrosDisksArchiveTester
                      constructor.
        """
        archive_types = kwargs['archive_types']
        archive_tester = CrosDisksArchiveTester(self, archive_types)
        archive_tester.run(*args, **kwargs)
171