1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import math
6import os
7import shutil
8import tempfile
9import unittest
10
11import common
12from autotest_lib.client.bin.result_tools import result_info
13from autotest_lib.client.bin.result_tools import unittest_lib
14from autotest_lib.client.bin.result_tools import utils_lib
15from autotest_lib.client.bin.result_tools import zip_file_throttler
16
17
# Size, in bytes, of every file created for the test.
ORIGINAL_SIZE_BYTE = 1000
# A maximum result size of 0 forces every file to be compressed if possible.
MAX_RESULT_SIZE_KB = 0
# Passed to the throttler as file_size_threshold_byte; the test also uses it
# as the upper bound for the size of each generated .tgz file.
FILE_SIZE_THRESHOLD_BYTE = 512

# Expected directory summary once the throttler has run: compressed entries
# appear under their '.tgz' name and carry a trimmed size, while the entries
# left alone only record their original size.
# NOTE(review): the archive sizes listed below add up to 436 while the root
# entry uses 435; the comparison in the test tolerates differences below 100
# bytes, so the mismatch is not observable.
SUMMARY_AFTER_TRIMMING = {
    '': {
        utils_lib.DIRS: [
            {
                'BUILD_INFO-x': {
                    utils_lib.ORIGINAL_SIZE_BYTES: ORIGINAL_SIZE_BYTE,
                },
            },
            {
                'file1.xml.tgz': {
                    utils_lib.ORIGINAL_SIZE_BYTES: ORIGINAL_SIZE_BYTE,
                    utils_lib.TRIMMED_SIZE_BYTES: 148,
                },
            },
            {
                'file2.jpg': {
                    utils_lib.ORIGINAL_SIZE_BYTES: ORIGINAL_SIZE_BYTE,
                },
            },
            {
                'file3.log.tgz': {
                    utils_lib.ORIGINAL_SIZE_BYTES: ORIGINAL_SIZE_BYTE,
                    utils_lib.TRIMMED_SIZE_BYTES: 148,
                },
            },
            {
                'folder1': {
                    utils_lib.DIRS: [
                        {
                            'file4.tgz': {
                                utils_lib.ORIGINAL_SIZE_BYTES:
                                        ORIGINAL_SIZE_BYTE,
                                utils_lib.TRIMMED_SIZE_BYTES: 140,
                            },
                        },
                    ],
                    utils_lib.ORIGINAL_SIZE_BYTES: ORIGINAL_SIZE_BYTE,
                    utils_lib.TRIMMED_SIZE_BYTES: 140,
                },
            },
        ],
        utils_lib.ORIGINAL_SIZE_BYTES: 5 * ORIGINAL_SIZE_BYTE,
        utils_lib.TRIMMED_SIZE_BYTES: 2 * ORIGINAL_SIZE_BYTE + 435,
    },
}
44
class ZipFileThrottleTest(unittest.TestCase):
    """Test class for zip_file_throttler.throttle method.

    Every test runs against a fresh temporary directory that holds a small
    tree of files, each ORIGINAL_SIZE_BYTE bytes long.
    """

    def setUp(self):
        """Create the temporary result directory and the files under test.

        Removal of the directory is registered with addCleanup right after it
        is created, so it is deleted even if a later step of setUp raises (in
        which case unittest does not call tearDown).
        """
        self.test_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.test_dir, ignore_errors=True)

        # Paths the test expects to be left untouched by the throttler.
        self.files_not_zip = []
        # Paths the test expects to be replaced by a '<path>.tgz' archive.
        self.files_to_zip = []

        self._create_test_file(
                os.path.join(self.test_dir, 'BUILD_INFO-x'), self.files_not_zip)
        self._create_test_file(
                os.path.join(self.test_dir, 'file1.xml'), self.files_to_zip)
        self._create_test_file(
                os.path.join(self.test_dir, 'file2.jpg'), self.files_not_zip)
        self._create_test_file(
                os.path.join(self.test_dir, 'file3.log'), self.files_to_zip)

        folder1 = os.path.join(self.test_dir, 'folder1')
        os.mkdir(folder1)
        self._create_test_file(
                os.path.join(folder1, 'file4'), self.files_to_zip)

    def _create_test_file(self, path, bucket):
        """Create a file of ORIGINAL_SIZE_BYTE bytes and record its path.

        @param path: Full path of the file to create.
        @param bucket: List (files_not_zip or files_to_zip) the path is
                appended to.
        """
        unittest_lib.create_file(path, ORIGINAL_SIZE_BYTE)
        bucket.append(path)

    def compareSummary(self, expected, actual):
        """Compare two summaries with tolerance on trimmed sizes.

        Original sizes must match exactly; trimmed sizes may differ by less
        than 100 bytes. Directories are compared recursively, entry by entry.

        @param expected: The expected directory summary.
        @param actual: The actual directory summary after trimming.
        """
        self.assertEqual(expected.original_size, actual.original_size)
        diff = math.fabs(expected.trimmed_size - actual.trimmed_size)
        # Compression may generate different sizes of tgz file, but the
        # difference shouldn't be more than 100 bytes.
        self.assertLess(
                diff, 100,
                'Compression failed to be verified. Expected size: %d, actual '
                'size: %d' % (expected.trimmed_size, actual.trimmed_size))

        # Match files inside the directories.
        if expected.is_dir:
            expected_info_map = {info.name: info for info in expected.files}
            actual_info_map = {info.name: info for info in actual.files}
            self.assertEqual(set(expected_info_map), set(actual_info_map))
            for name in expected_info_map:
                self.compareSummary(
                        expected_info_map[name], actual_info_map[name])

    def testTrim(self):
        """Test throttle method."""
        summary = result_info.ResultInfo.build_from_path(self.test_dir)
        zip_file_throttler.throttle(
                summary,
                max_result_size_KB=MAX_RESULT_SIZE_KB,
                file_size_threshold_byte=FILE_SIZE_THRESHOLD_BYTE)

        expected_summary = result_info.ResultInfo(
                parent_dir=self.test_dir, original_info=SUMMARY_AFTER_TRIMMING)
        self.compareSummary(expected_summary, summary)

        # Verify files that should not be compressed are not changed.
        for f in self.files_not_zip:
            self.assertEqual(ORIGINAL_SIZE_BYTE, os.stat(f).st_size,
                             'File %s should not be compressed!' % f)

        # Verify files that should be zipped are updated: the archive must
        # exist and must not exceed the size threshold.
        for f in self.files_to_zip:
            tgz_size = os.stat(f + '.tgz').st_size
            self.assertLessEqual(tgz_size, FILE_SIZE_THRESHOLD_BYTE,
                                 'File %s is not compressed!' % f)
127
128
# Allow this test module to be run directly in standalone mode.
if __name__ == '__main__':
    unittest.main()
133