1# -*- coding: utf-8 -*-
2# Copyright 2014 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Unit tests for resumable streaming upload functions and classes."""
16
17from __future__ import absolute_import
18
19from hashlib import md5
20import os
21import pkgutil
22
23from gslib.exception import CommandException
24from gslib.hashing_helper import CalculateHashesFromContents
25from gslib.hashing_helper import CalculateMd5FromContents
26from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper
27import gslib.tests.testcase as testcase
28from gslib.util import GetJsonResumableChunkSize
29from gslib.util import TRANSFER_BUFFER_SIZE
30
31
# Name of the test data file. It is loaded from 'tests/test_data/' inside the
# gslib package and also used as the file name of the temporary copy that the
# tests read from.
_TEST_FILE = 'test.txt'
33
34
class TestResumableStreamingJsonUploadWrapper(testcase.GsUtilUnitTestCase):
  """Unit tests for the ResumableStreamingJsonUploadWrapper class."""

  # Lazily populated by _GetTestFile. That method assigns through self, so the
  # cached values live on the test instance rather than on the class.
  _temp_test_file = None  # Temporary copy of the test data file.
  _temp_test_file_contents = None  # Contents of the test data file.
  _temp_test_file_len = None  # len() of the contents above.

  def _GetTestFile(self):
    """Creates the temporary test file on first use and returns it.

    As a side effect, the first call populates _temp_test_file_contents and
    _temp_test_file_len, so tests must call this before reading either.

    Returns:
      The value returned by CreateTempFile for the test data (the tests pass
      it to open()).
    """
    if not self._temp_test_file:
      self._temp_test_file_contents = pkgutil.get_data(
          'gslib', 'tests/test_data/%s' % _TEST_FILE)
      self._temp_test_file = self.CreateTempFile(
          file_name=_TEST_FILE, contents=self._temp_test_file_contents)
      self._temp_test_file_len = len(self._temp_test_file_contents)
    return self._temp_test_file

  def testReadInChunks(self):
    """Hashes the file through the wrapper and compares with a direct hash."""
    tmp_file = self._GetTestFile()
    with open(tmp_file, 'rb') as stream:
      wrapper = ResumableStreamingJsonUploadWrapper(
          stream, TRANSFER_BUFFER_SIZE, test_small_buffer=True)
      hash_dict = {'md5': md5()}
      # CalculateHashesFromContents reads in chunks, but does not seek.
      CalculateHashesFromContents(wrapper, hash_dict)
    with open(tmp_file, 'rb') as stream:
      actual = CalculateMd5FromContents(stream)
    self.assertEqual(actual, hash_dict['md5'].hexdigest())

  def testReadInChunksWithSeekToBeginning(self):
    """Reads one buffer, then seeks to 0 and reads chunks until the end."""
    # Must run before the loops below: it populates _temp_test_file_len.
    tmp_file = self._GetTestFile()
    for initial_read in (TRANSFER_BUFFER_SIZE - 1,
                         TRANSFER_BUFFER_SIZE,
                         TRANSFER_BUFFER_SIZE + 1,
                         TRANSFER_BUFFER_SIZE * 2 - 1,
                         TRANSFER_BUFFER_SIZE * 2,
                         TRANSFER_BUFFER_SIZE * 2 + 1,
                         TRANSFER_BUFFER_SIZE * 3 - 1,
                         TRANSFER_BUFFER_SIZE * 3,
                         TRANSFER_BUFFER_SIZE * 3 + 1):
      for buffer_size in (TRANSFER_BUFFER_SIZE - 1,
                          TRANSFER_BUFFER_SIZE,
                          TRANSFER_BUFFER_SIZE + 1,
                          self._temp_test_file_len - 1,
                          self._temp_test_file_len,
                          self._temp_test_file_len + 1):
        # Can't seek to 0 if the buffer is too small, so we expect an
        # exception.
        expect_exception = buffer_size < self._temp_test_file_len
        with open(tmp_file, 'rb') as stream:
          wrapper = ResumableStreamingJsonUploadWrapper(
              stream, buffer_size, test_small_buffer=True)
          wrapper.read(initial_read)
          # CalculateMd5FromContents seeks to 0, reads in chunks, then seeks
          # to 0 again.
          try:
            hex_digest = CalculateMd5FromContents(wrapper)
          except CommandException as e:
            if not expect_exception:
              self.fail('Got unexpected CommandException "%s" for '
                        'initial read size %s, buffer size %s' %
                        (str(e), initial_read, buffer_size))
          else:
            if expect_exception:
              self.fail('Did not get expected CommandException for '
                        'initial read size %s, buffer size %s' %
                        (initial_read, buffer_size))
        # hex_digest is only bound when no exception was raised, which is
        # exactly the not-expect_exception case once the checks above pass.
        if not expect_exception:
          with open(tmp_file, 'rb') as stream:
            actual = CalculateMd5FromContents(stream)
          self.assertEqual(
              actual, hex_digest,
              'Digests not equal for initial read size %s, buffer size %s' %
              (initial_read, buffer_size))

  def _testSeekBack(self, initial_reads, buffer_size, seek_back_amount):
    """Tests reading then seeking backwards.

    This function simulates an upload that is resumed after a connection break.
    It performs the reads listed in initial_reads (checking each against the
    file contents), then seeks backwards (as if the server did not receive
    some of the bytes) and reads to the end of the file, ensuring the data
    read after the seek matches the original file.

    Args:
      initial_reads: List of integers containing read sizes to perform
          before seek. Their sum must be less than the test file size.
      buffer_size: Maximum buffer size for the wrapper. Must be at least
          seek_back_amount.
      seek_back_amount: Number of bytes to seek backward. Callers are expected
          to keep this at or below the sum of initial_reads.

    Raises:
      AssertionError on wrong data returned by the wrapper.
    """
    tmp_file = self._GetTestFile()
    initial_position = sum(initial_reads)
    seek_position = initial_position - seek_back_amount
    self.assertGreaterEqual(
        buffer_size, seek_back_amount,
        'seek_back_amount must be less than or equal to buffer size %s '
        '(but was actually: %s)' % (buffer_size, seek_back_amount))
    self.assertLess(
        initial_position, self._temp_test_file_len,
        'initial_position must be less than test file size %s '
        '(but was actually: %s)' % (self._temp_test_file_len, initial_position))

    with open(tmp_file, 'rb') as stream:
      wrapper = ResumableStreamingJsonUploadWrapper(
          stream, buffer_size, test_small_buffer=True)
      position = 0
      for read_size in initial_reads:
        data = wrapper.read(read_size)
        self.assertEqual(
            self._temp_test_file_contents[position:position + read_size],
            data, 'Data from position %s to %s did not match file contents.' %
            (position, position + read_size))
        position += len(data)
      wrapper.seek(seek_position)
      self.assertEqual(wrapper.tell(), seek_position)
      data = wrapper.read()
      expected_len = self._temp_test_file_len - seek_position
      self.assertEqual(
          expected_len, len(data),
          'Unexpected data length with initial pos %s seek_back_amount %s. '
          'Expected: %s, actual: %s.' %
          (initial_position, seek_back_amount, expected_len, len(data)))
      # Compare against the exact slice from the seek position rather than a
      # suffix of len(data) bytes, and report the position actually read from.
      self.assertEqual(
          self._temp_test_file_contents[seek_position:], data,
          'Data from position %s to EOF did not match file contents.' %
          seek_position)

  def testReadSeekAndReadToEOF(self):
    """Tests performing reads on the wrapper, seeking, then reading to EOF."""
    for initial_reads in ([1],
                          [TRANSFER_BUFFER_SIZE - 1],
                          [TRANSFER_BUFFER_SIZE],
                          [TRANSFER_BUFFER_SIZE + 1],
                          [1, TRANSFER_BUFFER_SIZE - 1],
                          [1, TRANSFER_BUFFER_SIZE],
                          [1, TRANSFER_BUFFER_SIZE + 1],
                          [TRANSFER_BUFFER_SIZE - 1, 1],
                          [TRANSFER_BUFFER_SIZE, 1],
                          [TRANSFER_BUFFER_SIZE + 1, 1],
                          [TRANSFER_BUFFER_SIZE - 1, TRANSFER_BUFFER_SIZE - 1],
                          [TRANSFER_BUFFER_SIZE - 1, TRANSFER_BUFFER_SIZE],
                          [TRANSFER_BUFFER_SIZE - 1, TRANSFER_BUFFER_SIZE + 1],
                          [TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE - 1],
                          [TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE],
                          [TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE + 1],
                          [TRANSFER_BUFFER_SIZE + 1, TRANSFER_BUFFER_SIZE - 1],
                          [TRANSFER_BUFFER_SIZE + 1, TRANSFER_BUFFER_SIZE],
                          [TRANSFER_BUFFER_SIZE + 1, TRANSFER_BUFFER_SIZE + 1],
                          [TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE,
                           TRANSFER_BUFFER_SIZE]):
      initial_position = sum(initial_reads)
      for buffer_size in (initial_position,
                          initial_position + 1,
                          initial_position * 2 - 1,
                          initial_position * 2):
        # Capping at initial_position keeps the seek target non-negative and
        # the seek-back amount within every buffer size tried above.
        for seek_back_amount in (
            min(TRANSFER_BUFFER_SIZE - 1, initial_position),
            min(TRANSFER_BUFFER_SIZE, initial_position),
            min(TRANSFER_BUFFER_SIZE + 1, initial_position),
            min(TRANSFER_BUFFER_SIZE * 2 - 1, initial_position),
            min(TRANSFER_BUFFER_SIZE * 2, initial_position),
            min(TRANSFER_BUFFER_SIZE * 2 + 1, initial_position)):
          self._testSeekBack(initial_reads, buffer_size, seek_back_amount)

  def testBufferSizeLessThanChunkSize(self):
    """Tests that a buffer below the JSON resumable chunk size is rejected."""
    # A buffer of exactly the chunk size must be accepted.
    ResumableStreamingJsonUploadWrapper(None, GetJsonResumableChunkSize())
    try:
      ResumableStreamingJsonUploadWrapper(None, GetJsonResumableChunkSize() - 1)
    except CommandException as e:
      self.assertIn('Buffer size must be >= JSON resumable upload', str(e))
    else:
      self.fail('Did not get expected CommandException')

  def testSeekPartialBuffer(self):
    """Tests seeking back partially within the buffer."""
    tmp_file = self._GetTestFile()
    read_size = TRANSFER_BUFFER_SIZE
    with open(tmp_file, 'rb') as stream:
      wrapper = ResumableStreamingJsonUploadWrapper(
          stream, TRANSFER_BUFFER_SIZE * 3, test_small_buffer=True)
      position = 0
      for _ in range(3):
        data = wrapper.read(read_size)
        self.assertEqual(
            self._temp_test_file_contents[position:position + read_size],
            data, 'Data from position %s to %s did not match file contents.' %
            (position, position + read_size))
        position += len(data)

      # Floor division keeps the read size and seek offset integral on both
      # Python 2 and Python 3.
      wrapper.read(read_size // 2)
      # The buffer is now expected to hold the contents from:
      # read_size/2 through 7*read_size/2.
      position = read_size // 2
      wrapper.seek(position)
      data = wrapper.read()
      self.assertEqual(
          self._temp_test_file_contents[position:], data,
          'Data from position %s to EOF did not match file contents.' %
          position)

  def testSeekEnd(self):
    """Tests seeking relative to EOF after reading the entire stream.

    A CommandException is expected exactly when the seek amount exceeds the
    wrapper's buffer size.
    """
    tmp_file = self._GetTestFile()
    for buffer_size in (TRANSFER_BUFFER_SIZE - 1,
                        TRANSFER_BUFFER_SIZE,
                        TRANSFER_BUFFER_SIZE + 1):
      for seek_back in (TRANSFER_BUFFER_SIZE - 1,
                        TRANSFER_BUFFER_SIZE,
                        TRANSFER_BUFFER_SIZE + 1):
        expect_exception = seek_back > buffer_size
        with open(tmp_file, 'rb') as stream:
          wrapper = ResumableStreamingJsonUploadWrapper(
              stream, buffer_size, test_small_buffer=True)
          # Read to the end.
          while wrapper.read(TRANSFER_BUFFER_SIZE):
            pass
          try:
            wrapper.seek(seek_back, whence=os.SEEK_END)
          except CommandException as e:
            if not expect_exception:
              self.fail('Got unexpected CommandException "%s" for '
                        'seek_back size %s, buffer size %s' %
                        (str(e), seek_back, buffer_size))
          else:
            if expect_exception:
              self.fail('Did not get expected CommandException for '
                        'seek_back size %s, buffer size %s' %
                        (seek_back, buffer_size))
268