1#!/usr/bin/python
2
3"""Tests for site_sysinfo."""
4
5__author__ = 'dshi@google.com (Dan Shi)'
6
7import cPickle as pickle
8import os
9import random
10import shutil
11import tempfile
12import unittest
13
14import common
15from autotest_lib.client.bin import site_sysinfo
16from autotest_lib.client.common_lib import autotemp
17
18
class diffable_logdir_test(unittest.TestCase):
    """Tests for methods in class diffable_logdir."""


    def setUp(self):
        """Initialize a temp directory with test files.

        Three "existing" files holding random data are created under src_dir,
        one per entry of existing_files_folder. The paths of two "new" files
        are computed here but the files themselves are only created later by
        the test.
        """
        self.tempdir = autotemp.tempdir(unique_id='diffable_logdir')
        self.src_dir = os.path.join(self.tempdir.name, 'src')
        self.dest_dir = os.path.join(self.tempdir.name, 'dest')

        self.existing_files = ['existing_file_' + str(i) for i in range(3)]
        self.existing_files_folder = ['', 'sub', 'sub/sub2']
        self.existing_files_path = [
                os.path.join(self.src_dir, folder, f)
                for f, folder in zip(self.existing_files,
                                     self.existing_files_folder)]
        self.new_files = ['new_file_' + str(i) for i in range(2)]
        self.new_files_folder = ['sub', 'sub/sub3']
        self.new_files_path = [
                os.path.join(self.src_dir, folder, f)
                for f, folder in zip(self.new_files, self.new_files_folder)]

        # Create some files with random data in source directory.
        for p in self.existing_files_path:
            self.append_text_to_file(str(random.random()), p)


    def tearDown(self):
        """Clean up."""
        self.tempdir.clean()


    def append_text_to_file(self, text, file_path):
        """Append text to the end of a file, creating the file if needed.

        Missing parent directories of file_path are created as well.

        @param text: text to be appended to a file.
        @param file_path: path to the file.

        """
        dir_name = os.path.dirname(file_path)
        if not os.path.exists(dir_name):
            os.makedirs(dir_name)
        with open(file_path, 'a') as f:
            f.write(text)


    def _dest_path(self, src_path):
        """Map a path under src_dir to the matching path under dest_dir.

        The mapping is done on the path relative to src_dir rather than by
        substring replacement, so a 'src' substring elsewhere in the absolute
        path (e.g. in the randomly named temp directory) cannot corrupt the
        result.

        @param src_path: path to a file located under self.src_dir.

        @return: path with the same relative location under self.dest_dir.

        """
        return os.path.join(self.dest_dir,
                            os.path.relpath(src_path, self.src_dir))


    def test_diffable_logdir_success(self):
        """Test the diff function to save new data from a directory."""
        info = site_sysinfo.diffable_logdir(self.src_dir,
                                            keep_file_hierarchy=False,
                                            append_diff_in_name=False)
        # Run the first time to collect file status.
        info.run(log_dir=None, collect_init_status=True)

        # Add new files to the test directory.
        for file_name, file_path in zip(self.new_files,
                                        self.new_files_path):
            self.append_text_to_file(file_name, file_path)

        # Move existing_file_2 aside instead of deleting it right away. The
        # renamed file holds on to the inode, so the replacement created by
        # the append below cannot be handed the same inode again (which might
        # happen if the file were deleted and recreated).
        existing_file_2 = self.existing_files_path[2]
        existing_file_2_tmp = existing_file_2 + '_tmp'
        os.rename(existing_file_2, existing_file_2_tmp)

        # Append data to existing files; this recreates existing_file_2.
        for file_name, file_path in zip(self.existing_files,
                                        self.existing_files_path):
            self.append_text_to_file(file_name, file_path)

        # Remove the tmp file.
        os.remove(existing_file_2_tmp)

        # Run the second time to do diff.
        info.run(self.dest_dir, collect_init_status=False)

        # Validate files in dest_dir: each one must hold exactly the text
        # written after the initial status was collected.
        all_names = self.existing_files + self.new_files
        all_paths = self.existing_files_path + self.new_files_path
        for file_name, file_path in zip(all_names, all_paths):
            with open(self._dest_path(file_path), 'r') as f:
                self.assertEqual(file_name, f.read())
100
101
class LogdirTestCase(unittest.TestCase):
    """Tests logdir.run"""

    def setUp(self):
        """Create a scratch directory holding empty 'from' and 'to' dirs."""
        # Resolve symlinks in the temp path up front. Expected destination
        # paths are derived from self.from_dir, and run() is documented below
        # to resolve symlinks to the source root, so an unresolved temp path
        # (e.g. a symlinked TMPDIR) would make the expectations point at the
        # wrong location.
        self.tempdir = os.path.realpath(tempfile.mkdtemp())
        self.addCleanup(shutil.rmtree, self.tempdir)

        self.from_dir = os.path.join(self.tempdir, 'from')
        self.to_dir = os.path.join(self.tempdir, 'to')
        os.mkdir(self.from_dir)
        os.mkdir(self.to_dir)

    def _destination_path(self, relative_path, from_dir=None):
        """The expected destination path for a subdir of the source directory.

        The result is self.to_dir immediately followed by the full path of
        relative_path inside from_dir.

        @param relative_path: path relative to from_dir.
        @param from_dir: source root to use; defaults to self.from_dir.

        @return: expected path of the copy below self.to_dir.
        """
        if from_dir is None:
            from_dir = self.from_dir
        return '%s%s' % (self.to_dir, os.path.join(from_dir, relative_path))

    def test_run_recreates_absolute_source_path(self):
        """When copying files, the absolute path from the source is recreated
        in the destination folder.
        """
        os.mkdir(os.path.join(self.from_dir, 'fubar'))
        logdir = site_sysinfo.logdir(self.from_dir)
        logdir.run(self.to_dir)
        destination_path = self._destination_path('fubar')
        self.assertTrue(os.path.exists(destination_path),
                        msg='Failed to copy to %s' % destination_path)

    def test_run_skips_symlinks(self):
        """A symlink inside the source directory is not copied, while the
        real directory it points to is.
        """
        os.mkdir(os.path.join(self.from_dir, 'real'))
        os.symlink(os.path.join(self.from_dir, 'real'),
                   os.path.join(self.from_dir, 'symlink'))

        logdir = site_sysinfo.logdir(self.from_dir)
        logdir.run(self.to_dir)

        destination_path_real = self._destination_path('real')
        self.assertTrue(os.path.exists(destination_path_real),
                        msg='real directory was not copied to %s' %
                        destination_path_real)
        destination_path_link = self._destination_path('symlink')
        self.assertFalse(
                os.path.exists(destination_path_link),
                msg='symlink was copied to %s' % destination_path_link)

    def test_run_resolves_symlinks_to_source_root(self):
        """run tries hard to get to the source directory before copying.

        Within the source folder, we skip any symlinks, but we first try to
        resolve symlinks to the source root itself.
        """
        os.mkdir(os.path.join(self.from_dir, 'fubar'))
        from_symlink = os.path.join(self.tempdir, 'from_symlink')
        os.symlink(self.from_dir, from_symlink)

        logdir = site_sysinfo.logdir(from_symlink)
        logdir.run(self.to_dir)

        # The expectation is built from the real from_dir, not the symlink.
        destination_path = self._destination_path('fubar')
        self.assertTrue(os.path.exists(destination_path),
                        msg='Failed to copy to %s' % destination_path)

    def test_run_excludes_common_patterns(self):
        """Neither a top-level 'autoserv2344' directory nor a nested
        'prefix/autoserv/suffix' path shows up in the destination.
        """
        os.mkdir(os.path.join(self.from_dir, 'autoserv2344'))
        deeper_subdir = os.path.join('prefix', 'autoserv', 'suffix')
        os.makedirs(os.path.join(self.from_dir, deeper_subdir))

        logdir = site_sysinfo.logdir(self.from_dir)
        logdir.run(self.to_dir)

        destination_path = self._destination_path('autoserv2344')
        self.assertFalse(os.path.exists(destination_path),
                         msg='Copied banned file to %s' % destination_path)
        destination_path = self._destination_path(deeper_subdir)
        self.assertFalse(os.path.exists(destination_path),
                         msg='Copied banned file to %s' % destination_path)

    def test_run_ignores_exclude_patterns_in_leading_dirs(self):
        """Exclude patterns should only be applied to path suffixes within
        from_dir, not to the root directory itself.
        """
        exclude_pattern_dir = os.path.join(self.from_dir, 'autoserv2344')
        os.makedirs(os.path.join(exclude_pattern_dir, 'fubar'))
        logdir = site_sysinfo.logdir(exclude_pattern_dir)
        logdir.run(self.to_dir)
        destination_path = self._destination_path('fubar',
                                                  from_dir=exclude_pattern_dir)
        self.assertTrue(os.path.exists(destination_path),
                        msg='Failed to copy to %s' % destination_path)

    def test_pickle_unpickle_equal(self):
        """Sanity check pickle-unpickle round-trip."""
        logdir = site_sysinfo.logdir(
                self.from_dir,
                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        unpickled_logdir = pickle.loads(logdir_pickle)

        self.assertEqual(unpickled_logdir, logdir)

    def test_pickle_enforce_required_attributes(self):
        """Some attributes from this object should never be dropped.

        When running a client test against an older build, we pickle the objects
        of this class from newer version of the class and unpickle to older
        versions of the class. The older versions require some attributes to be
        present.

        Here one extra exclude ('a') is given on top of the defaults, and
        additional_exclude must hold it after the round-trip.
        """
        logdir = site_sysinfo.logdir(
                self.from_dir,
                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        logdir = pickle.loads(logdir_pickle)

        self.assertEqual(logdir.additional_exclude, 'a')

    def test_pickle_enforce_required_attributes_default(self):
        """Some attributes from this object should never be dropped.

        When running a client test against an older build, we pickle the objects
        of this class from newer version of the class and unpickle to older
        versions of the class. The older versions require some attributes to be
        present.

        Here no excludes are given, and additional_exclude must be present
        and None after the round-trip.
        """
        logdir = site_sysinfo.logdir(self.from_dir)
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        logdir = pickle.loads(logdir_pickle)

        self.assertIsNone(logdir.additional_exclude)

    def test_unpickle_handle_missing__excludes(self):
        """Sanely handle missing _excludes attribute from pickles

        This can happen when running brand new version of this class that
        introduced this attribute from older server side code in prod.

        The object is built with the default excludes; after dropping
        _excludes and round-tripping, _excludes must equal DEFAULT_EXCLUDES.
        """
        logdir = site_sysinfo.logdir(self.from_dir)
        # Simulate a pickle produced by code that predates _excludes.
        delattr(logdir, '_excludes')
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        logdir = pickle.loads(logdir_pickle)

        self.assertEqual(logdir._excludes,
                         site_sysinfo.logdir.DEFAULT_EXCLUDES)

    def test_unpickle_handle_missing__excludes_default(self):
        """Sanely handle missing _excludes attribute from pickles

        This can happen when running brand new version of this class that
        introduced this attribute from older server side code in prod.

        The object is built with one extra exclude ('a'); after dropping
        _excludes and round-tripping, _excludes must equal the defaults plus
        that extra entry.
        """
        logdir = site_sysinfo.logdir(
                self.from_dir,
                excludes=(site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
        # Simulate a pickle produced by code that predates _excludes.
        delattr(logdir, '_excludes')
        # base_job uses protocol 2 to pickle. We follow suit.
        logdir_pickle = pickle.dumps(logdir, protocol=2)
        logdir = pickle.loads(logdir_pickle)

        self.assertEqual(
                logdir._excludes,
                (site_sysinfo.logdir.DEFAULT_EXCLUDES + ('a',)))
268
269
# Run every test case defined in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()
272