1# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging 6import os 7import tarfile 8import zipfile 9 10from autotest_lib.client.bin import test 11from autotest_lib.client.common_lib import autotemp, error 12from autotest_lib.client.cros.cros_disks import CrosDisksTester 13from autotest_lib.client.cros.cros_disks import VirtualFilesystemImage 14from autotest_lib.client.cros.cros_disks import DefaultFilesystemTestContent 15from collections import deque 16 17 18class CrosDisksArchiveTester(CrosDisksTester): 19 """A tester to verify archive support in CrosDisks. 20 """ 21 def __init__(self, test, archive_types): 22 super(CrosDisksArchiveTester, self).__init__(test) 23 self._archive_types = archive_types 24 25 def _find_all_files(self, root_dir): 26 """Returns all files under a directory and its sub-directories. 27 28 This is a generator that performs a breadth-first-search of 29 all files under a specified directory and its sub-directories. 30 31 Args: 32 root_dir: The root directory where the search starts from. 33 Yields: 34 Path of any found file relative to the root directory. 35 """ 36 dirs_to_explore = deque(['']) 37 while len(dirs_to_explore) > 0: 38 current_dir = dirs_to_explore.popleft() 39 for path in os.listdir(os.path.join(root_dir, current_dir)): 40 expanded_path = os.path.join(root_dir, current_dir, path) 41 relative_path = os.path.join(current_dir, path) 42 if os.path.isdir(expanded_path): 43 dirs_to_explore.append(relative_path) 44 else: 45 yield relative_path 46 47 def _make_tar_archive(self, archive_path, root_dir, compression=None): 48 """Archives a specified directory into a tar file. 49 50 The created tar file contains all files and sub-directories 51 under the specified root directory, but not the root directory 52 itself. 53 54 Args: 55 archive_path: Path of the output archive. 56 root_dir: The root directory to archive. 57 compression: The compression method: None, 'gz', 'bz2' 58 """ 59 mode = 'w:' + compression if compression else 'w' 60 # TarFile in Python 2.6 does not work with the 'with' statement. 61 archive = tarfile.open(archive_path, mode) 62 for path in self._find_all_files(root_dir): 63 archive.add(os.path.join(root_dir, path), path) 64 archive.close() 65 66 def _make_zip_archive(self, archive_path, root_dir, 67 compression=zipfile.ZIP_DEFLATED): 68 """Archives a specified directory into a ZIP file. 69 70 The created ZIP file contains all files and sub-directories 71 under the specified root directory, but not the root directory 72 itself. 73 74 Args: 75 archive_path: Path of the output archive. 76 root_dir: The root directory to archive. 77 compression: The ZIP compression method. 78 """ 79 # ZipFile in Python 2.6 does not work with the 'with' statement. 80 archive = zipfile.ZipFile(archive_path, 'w', compression) 81 for path in self._find_all_files(root_dir): 82 archive.write(os.path.join(root_dir, path), path) 83 archive.close() 84 85 def _make_archive(self, archive_type, archive_path, root_dir): 86 """Archives a specified directory into an archive of specified type. 87 88 The created archive file contains all files and sub-directories 89 under the specified root directory, but not the root directory 90 itself. 91 92 Args: 93 archive_type: Type of the output archive. 94 archive_path: Path of the output archive. 95 root_dir: The root directory to archive. 96 """ 97 if archive_type in ['zip']: 98 self._make_zip_archive(archive_path, root_dir) 99 elif archive_type in ['tar']: 100 self._make_tar_archive(archive_path, root_dir) 101 elif archive_type in ['tar.gz', 'tgz']: 102 self._make_tar_archive(archive_path, root_dir, 'gz') 103 elif archive_type in ['tar.bz2', 'tbz', 'tbz2']: 104 self._make_tar_archive(archive_path, root_dir, 'bz2') 105 else: 106 raise error.TestFail("Unsupported archive type " + archive_type) 107 108 def _test_archive(self, archive_type): 109 # Create the archive file content in a temporary directory. 110 archive_dir = autotemp.tempdir(unique_id='CrosDisks') 111 test_content = DefaultFilesystemTestContent() 112 if not test_content.create(archive_dir.name): 113 raise error.TestFail("Failed to create archive test content") 114 115 # Create a FAT-formatted virtual filesystem image containing an 116 # archive file to help stimulate mounting an archive file on a 117 # removable drive. 118 with VirtualFilesystemImage( 119 block_size=1024, 120 block_count=65536, 121 filesystem_type='vfat', 122 mkfs_options=[ '-F', '32', '-n', 'ARCHIVE' ]) as image: 123 image.format() 124 image.mount(options=['sync']) 125 # Create the archive file on the virtual filesystem image. 126 archive_name = 'test.' + archive_type 127 archive_path = os.path.join(image.mount_dir, archive_name) 128 self._make_archive(archive_type, archive_path, archive_dir.name) 129 image.unmount() 130 131 # Mount the virtual filesystem image via CrosDisks. 132 device_file = image.loop_device 133 self.cros_disks.mount(device_file, '', 134 [ "ro", "nodev", "noexec", "nosuid" ]) 135 result = self.cros_disks.expect_mount_completion({ 136 'status': 0, 137 'source_path': device_file 138 }) 139 140 # Mount the archive file on the mounted filesystem via CrosDisks. 141 archive_path = os.path.join(result['mount_path'], archive_name) 142 expected_mount_path = os.path.join('/media/archive', archive_name) 143 self.cros_disks.mount(archive_path) 144 result = self.cros_disks.expect_mount_completion({ 145 'status': 0, 146 'source_path': archive_path, 147 'mount_path': expected_mount_path 148 }) 149 150 # Verify the content of the mounted archive file. 151 if not test_content.verify(expected_mount_path): 152 raise error.TestFail("Failed to verify filesystem test content") 153 154 self.cros_disks.unmount(expected_mount_path, ['lazy']) 155 self.cros_disks.unmount(device_file, ['lazy']) 156 157 def test_archives(self): 158 for archive_type in self._archive_types: 159 self._test_archive(archive_type) 160 161 def get_tests(self): 162 return [self.test_archives] 163 164 165class platform_CrosDisksArchive(test.test): 166 version = 1 167 168 def run_once(self, *args, **kwargs): 169 tester = CrosDisksArchiveTester(self, kwargs['archive_types']) 170 tester.run(*args, **kwargs) 171