1# Copyright (c) 2012 Amazon.com, Inc. or its affiliates.  All Rights Reserved
2#
3# Permission is hereby granted, free of charge, to any person obtaining a
4# copy of this software and associated documentation files (the
5# "Software"), to deal in the Software without restriction, including
6# without limitation the rights to use, copy, modify, merge, publish, dis-
7# tribute, sublicense, and/or sell copies of the Software, and to permit
8# persons to whom the Software is furnished to do so, subject to the fol-
9# lowing conditions:
10#
11# The above copyright notice and this permission notice shall be included
12# in all copies or substantial portions of the Software.
13#
14# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20# IN THE SOFTWARE.
21#
22import logging
23import os
24import tempfile
25import time
26from hashlib import sha256
27from tests.unit import unittest
28
29from boto.compat import BytesIO, six, StringIO
30from boto.glacier.utils import minimum_part_size, chunk_hashes, tree_hash, \
31        bytes_to_hex, compute_hashes_from_fileobj
32
33
class TestPartSizeCalculations(unittest.TestCase):
    """Checks the part size chosen by ``minimum_part_size`` for a range
    of archive sizes."""

    def test_small_values_still_use_default_part_size(self):
        # A one byte archive is expected to get a 4 MB part size.
        megabyte = 1024 * 1024
        self.assertEqual(minimum_part_size(1), 4 * megabyte)

    def test_under_the_maximum_value(self):
        # If we're under the maximum, we can use 4MB part sizes.
        megabyte = 1024 * 1024
        archive_size = 8 * megabyte
        self.assertEqual(minimum_part_size(archive_size), 4 * megabyte)

    def test_gigabyte_size(self):
        # If we're over the maximum default part size, we go up to the next
        # power of two until we find a part size that keeps us under 10,000
        # parts.
        megabyte = 1024 * 1024
        archive_size = 8 * megabyte * 10000
        self.assertEqual(minimum_part_size(archive_size), 8 * megabyte)

    def test_terabyte_size(self):
        # For a 4 TB file we need at least a 512 MB part size.
        megabyte = 1024 * 1024
        four_terabytes = 4 * megabyte * megabyte
        self.assertEqual(minimum_part_size(four_terabytes), 512 * megabyte)

    def test_file_size_too_large(self):
        # One byte more than 40000 GB is expected to be rejected.
        gigabyte = 1024 * 1024 * 1024
        too_large = (40000 * gigabyte) + 1
        self.assertRaises(ValueError, minimum_part_size, too_large)

    def test_default_part_size_can_be_specified(self):
        # A caller supplied default (2 MB here) is returned when it is
        # passed as the second argument for an 8 MB archive.
        megabyte = 1024 * 1024
        default_part_size = 2 * megabyte
        chosen = minimum_part_size(8 * megabyte, default_part_size)
        self.assertEqual(chosen, default_part_size)
63
64
class TestChunking(unittest.TestCase):
    """Checks the per-chunk SHA-256 digests produced by ``chunk_hashes``."""

    def test_chunk_hashes_exact(self):
        # Two megabytes of input should give exactly two digests, the
        # first being the digest of one megabyte of the same byte.
        megabyte = 1024 * 1024
        digests = chunk_hashes(b'a' * (2 * megabyte))
        self.assertEqual(len(digests), 2)
        self.assertEqual(digests[0], sha256(b'a' * megabyte).digest())

    def test_chunks_with_leftovers(self):
        # Twenty bytes past two megabytes should give a third, short
        # chunk that is hashed on its own.
        megabyte = 1024 * 1024
        leftover = 20
        payload = b'a' * (2 * megabyte + leftover)
        full_chunk_digest = sha256(b'a' * megabyte).digest()
        leftover_digest = sha256(b'a' * leftover).digest()

        digests = chunk_hashes(payload)

        self.assertEqual(len(digests), 3)
        self.assertEqual(digests[0], full_chunk_digest)
        self.assertEqual(digests[1], full_chunk_digest)
        self.assertEqual(digests[2], leftover_digest)

    def test_less_than_one_chunk(self):
        # Input far smaller than a megabyte should give a single digest
        # covering the whole input.
        payload = b'aaaa'
        digests = chunk_hashes(payload)
        self.assertEqual(len(digests), 1)
        self.assertEqual(digests[0], sha256(payload).digest())
83
84
class TestTreeHash(unittest.TestCase):
    """Regression tests comparing tree hashes against reference values."""

    # For these tests, a set of reference tree hashes were computed.
    # This will at least catch any regressions to the tree hash
    # calculations.
    def calculate_tree_hash(self, bytestring):
        """Return the tree hash of ``bytestring`` as produced by
        ``bytes_to_hex(tree_hash(chunk_hashes(bytestring)))``.

        The wall-clock time spent on the calculation is logged at debug
        level together with the input length.
        """
        start = time.time()
        calculated = bytes_to_hex(tree_hash(chunk_hashes(bytestring)))
        end = time.time()
        logging.debug("Tree hash calc time for length %s: %s",
                      len(bytestring), end - start)
        return calculated

    def test_tree_hash_calculations(self):
        """Inputs of 1 MB, 2 MB, 4 MB and 4 MB + 20 bytes must match the
        recorded reference hashes."""
        one_meg_bytestring = b'a' * (1 * 1024 * 1024)
        two_meg_bytestring = b'a' * (2 * 1024 * 1024)
        four_meg_bytestring = b'a' * (4 * 1024 * 1024)
        bigger_bytestring = four_meg_bytestring + b'a' * 20

        self.assertEqual(
            self.calculate_tree_hash(one_meg_bytestring),
            b'9bc1b2a288b26af7257a36277ae3816a7d4f16e89c1e7e77d0a5c48bad62b360')
        self.assertEqual(
            self.calculate_tree_hash(two_meg_bytestring),
            b'560c2c9333c719cb00cfdffee3ba293db17f58743cdd1f7e4055373ae6300afa')
        self.assertEqual(
            self.calculate_tree_hash(four_meg_bytestring),
            b'9491cb2ed1d4e7cd53215f4017c23ec4ad21d7050a1e6bb636c4f67e8cddb844')
        self.assertEqual(
            self.calculate_tree_hash(bigger_bytestring),
            b'12f3cbd6101b981cde074039f6f728071da8879d6f632de8afc7cdf00661b08f')

    def test_empty_tree_hash(self):
        """Empty input must match the recorded reference hash."""
        # Pass bytes, like every other test here. A text '' only works on
        # Python 3 if the helper never feeds the empty input to sha256,
        # which rejects str objects.
        self.assertEqual(
            self.calculate_tree_hash(b''),
            b'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855')
120
121
class TestFileHash(unittest.TestCase):
    """Feeds several kinds of file-like object to
    ``compute_hashes_from_fileobj`` with a 512 byte chunk size."""

    def _gen_data(self):
        # Generate some pseudo-random bytes of data. The fixed two byte
        # tail is included as an example that fails to decode via UTF-8.
        undecodable_tail = b'\xc2\x00'
        return os.urandom(5000) + undecodable_tail

    def test_compute_hash_tempfile(self):
        # Compute a hash from a file object. On Python 2 this uses a non-
        # binary mode. On Python 3, however, binary mode is required for
        # binary files. If not used, you will get UTF-8 code errors.
        mode = "w+" if six.PY2 else "wb+"

        with tempfile.TemporaryFile(mode=mode) as handle:
            handle.write(self._gen_data())
            handle.seek(0)

            compute_hashes_from_fileobj(handle, chunk_size=512)

    @unittest.skipUnless(six.PY3, 'Python 3 requires reading binary!')
    def test_compute_hash_tempfile_py3(self):
        # Note the missing 'b' in the mode! A ValueError is expected.
        with tempfile.TemporaryFile(mode='w+') as text_file:
            self.assertRaises(ValueError, compute_hashes_from_fileobj,
                              text_file, chunk_size=512)

        # What about file-like objects without a mode? If it has an
        # encoding we use it, otherwise attempt UTF-8 encoding to
        # bytes for hashing.
        text_stream = StringIO('test data' * 500)
        compute_hashes_from_fileobj(text_stream, chunk_size=512)

    @unittest.skipUnless(six.PY2, 'Python 3 requires reading binary!')
    def test_compute_hash_stringio(self):
        # Python 2 binary data in StringIO example
        binary_stream = StringIO(self._gen_data())
        compute_hashes_from_fileobj(binary_stream, chunk_size=512)

    def test_compute_hash_bytesio(self):
        # Compute a hash from a file-like BytesIO object.
        byte_stream = BytesIO(self._gen_data())
        compute_hashes_from_fileobj(byte_stream, chunk_size=512)
166