1# -*- coding: utf-8 -*- 2# Copyright 2014 Google Inc. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Unit tests for resumable streaming upload functions and classes.""" 16 17from __future__ import absolute_import 18 19from hashlib import md5 20import os 21import pkgutil 22 23from gslib.exception import CommandException 24from gslib.hashing_helper import CalculateHashesFromContents 25from gslib.hashing_helper import CalculateMd5FromContents 26from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper 27import gslib.tests.testcase as testcase 28from gslib.util import GetJsonResumableChunkSize 29from gslib.util import TRANSFER_BUFFER_SIZE 30 31 32_TEST_FILE = 'test.txt' 33 34 35class TestResumableStreamingJsonUploadWrapper(testcase.GsUtilUnitTestCase): 36 """Unit tests for the TestResumableStreamingJsonUploadWrapper class.""" 37 38 _temp_test_file = None 39 _temp_test_file_contents = None 40 _temp_test_file_len = None 41 42 def _GetTestFile(self): 43 if not self._temp_test_file: 44 self._temp_test_file_contents = pkgutil.get_data( 45 'gslib', 'tests/test_data/%s' % _TEST_FILE) 46 self._temp_test_file = self.CreateTempFile( 47 file_name=_TEST_FILE, contents=self._temp_test_file_contents) 48 self._temp_test_file_len = len(self._temp_test_file_contents) 49 return self._temp_test_file 50 51 def testReadInChunks(self): 52 tmp_file = self._GetTestFile() 53 with open(tmp_file, 'rb') as stream: 54 wrapper = ResumableStreamingJsonUploadWrapper( 55 stream, TRANSFER_BUFFER_SIZE, test_small_buffer=True) 56 hash_dict = {'md5': md5()} 57 # CalculateHashesFromContents reads in chunks, but does not seek. 58 CalculateHashesFromContents(wrapper, hash_dict) 59 with open(tmp_file, 'rb') as stream: 60 actual = CalculateMd5FromContents(stream) 61 self.assertEqual(actual, hash_dict['md5'].hexdigest()) 62 63 def testReadInChunksWithSeekToBeginning(self): 64 """Reads one buffer, then seeks to 0 and reads chunks until the end.""" 65 tmp_file = self._GetTestFile() 66 for initial_read in (TRANSFER_BUFFER_SIZE - 1, 67 TRANSFER_BUFFER_SIZE, 68 TRANSFER_BUFFER_SIZE + 1, 69 TRANSFER_BUFFER_SIZE * 2 - 1, 70 TRANSFER_BUFFER_SIZE * 2, 71 TRANSFER_BUFFER_SIZE * 2 + 1, 72 TRANSFER_BUFFER_SIZE * 3 - 1, 73 TRANSFER_BUFFER_SIZE * 3, 74 TRANSFER_BUFFER_SIZE * 3 + 1): 75 for buffer_size in (TRANSFER_BUFFER_SIZE - 1, 76 TRANSFER_BUFFER_SIZE, 77 TRANSFER_BUFFER_SIZE + 1, 78 self._temp_test_file_len - 1, 79 self._temp_test_file_len, 80 self._temp_test_file_len + 1): 81 # Can't seek to 0 if the buffer is too small, so we expect an 82 # exception. 83 expect_exception = buffer_size < self._temp_test_file_len 84 with open(tmp_file, 'rb') as stream: 85 wrapper = ResumableStreamingJsonUploadWrapper( 86 stream, buffer_size, test_small_buffer=True) 87 wrapper.read(initial_read) 88 # CalculateMd5FromContents seeks to 0, reads in chunks, then seeks 89 # to 0 again. 90 try: 91 hex_digest = CalculateMd5FromContents(wrapper) 92 if expect_exception: 93 self.fail('Did not get expected CommandException for ' 94 'initial read size %s, buffer size %s' % 95 (initial_read, buffer_size)) 96 except CommandException, e: 97 if not expect_exception: 98 self.fail('Got unexpected CommandException "%s" for ' 99 'initial read size %s, buffer size %s' % 100 (str(e), initial_read, buffer_size)) 101 if not expect_exception: 102 with open(tmp_file, 'rb') as stream: 103 actual = CalculateMd5FromContents(stream) 104 self.assertEqual( 105 actual, hex_digest, 106 'Digests not equal for initial read size %s, buffer size %s' % 107 (initial_read, buffer_size)) 108 109 def _testSeekBack(self, initial_reads, buffer_size, seek_back_amount): 110 """Tests reading then seeking backwards. 111 112 This function simulates an upload that is resumed after a connection break. 113 It reads one transfer buffer at a time until it reaches initial_position, 114 then seeks backwards (as if the server did not receive some of the bytes) 115 and reads to the end of the file, ensuring the data read after the seek 116 matches the original file. 117 118 Args: 119 initial_reads: List of integers containing read sizes to perform 120 before seek. 121 buffer_size: Maximum buffer size for the wrapper. 122 seek_back_amount: Number of bytes to seek backward. 123 124 Raises: 125 AssertionError on wrong data returned by the wrapper. 126 """ 127 tmp_file = self._GetTestFile() 128 initial_position = 0 129 for read_size in initial_reads: 130 initial_position += read_size 131 self.assertGreaterEqual( 132 buffer_size, seek_back_amount, 133 'seek_back_amount must be less than initial position %s ' 134 '(but was actually: %s)' % (buffer_size, seek_back_amount)) 135 self.assertLess( 136 initial_position, self._temp_test_file_len, 137 'initial_position must be less than test file size %s ' 138 '(but was actually: %s)' % (self._temp_test_file_len, initial_position)) 139 140 with open(tmp_file, 'rb') as stream: 141 wrapper = ResumableStreamingJsonUploadWrapper( 142 stream, buffer_size, test_small_buffer=True) 143 position = 0 144 for read_size in initial_reads: 145 data = wrapper.read(read_size) 146 self.assertEqual( 147 self._temp_test_file_contents[position:position + read_size], 148 data, 'Data from position %s to %s did not match file contents.' % 149 (position, position + read_size)) 150 position += len(data) 151 wrapper.seek(initial_position - seek_back_amount) 152 self.assertEqual(wrapper.tell(), 153 initial_position - seek_back_amount) 154 data = wrapper.read() 155 self.assertEqual( 156 self._temp_test_file_len - (initial_position - seek_back_amount), 157 len(data), 158 'Unexpected data length with initial pos %s seek_back_amount %s. ' 159 'Expected: %s, actual: %s.' % 160 (initial_position, seek_back_amount, 161 self._temp_test_file_len - (initial_position - seek_back_amount), 162 len(data))) 163 self.assertEqual( 164 self._temp_test_file_contents[-len(data):], data, 165 'Data from position %s to EOF did not match file contents.' % 166 position) 167 168 def testReadSeekAndReadToEOF(self): 169 """Tests performing reads on the wrapper, seeking, then reading to EOF.""" 170 for initial_reads in ([1], 171 [TRANSFER_BUFFER_SIZE - 1], 172 [TRANSFER_BUFFER_SIZE], 173 [TRANSFER_BUFFER_SIZE + 1], 174 [1, TRANSFER_BUFFER_SIZE - 1], 175 [1, TRANSFER_BUFFER_SIZE], 176 [1, TRANSFER_BUFFER_SIZE + 1], 177 [TRANSFER_BUFFER_SIZE - 1, 1], 178 [TRANSFER_BUFFER_SIZE, 1], 179 [TRANSFER_BUFFER_SIZE + 1, 1], 180 [TRANSFER_BUFFER_SIZE - 1, TRANSFER_BUFFER_SIZE - 1], 181 [TRANSFER_BUFFER_SIZE - 1, TRANSFER_BUFFER_SIZE], 182 [TRANSFER_BUFFER_SIZE - 1, TRANSFER_BUFFER_SIZE + 1], 183 [TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE - 1], 184 [TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE], 185 [TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE + 1], 186 [TRANSFER_BUFFER_SIZE + 1, TRANSFER_BUFFER_SIZE - 1], 187 [TRANSFER_BUFFER_SIZE + 1, TRANSFER_BUFFER_SIZE], 188 [TRANSFER_BUFFER_SIZE + 1, TRANSFER_BUFFER_SIZE + 1], 189 [TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE, 190 TRANSFER_BUFFER_SIZE]): 191 initial_position = 0 192 for read_size in initial_reads: 193 initial_position += read_size 194 for buffer_size in (initial_position, 195 initial_position + 1, 196 initial_position * 2 - 1, 197 initial_position * 2): 198 for seek_back_amount in ( 199 min(TRANSFER_BUFFER_SIZE - 1, initial_position), 200 min(TRANSFER_BUFFER_SIZE, initial_position), 201 min(TRANSFER_BUFFER_SIZE + 1, initial_position), 202 min(TRANSFER_BUFFER_SIZE * 2 - 1, initial_position), 203 min(TRANSFER_BUFFER_SIZE * 2, initial_position), 204 min(TRANSFER_BUFFER_SIZE * 2 + 1, initial_position)): 205 self._testSeekBack(initial_reads, buffer_size, seek_back_amount) 206 207 def testBufferSizeLessThanChunkSize(self): 208 ResumableStreamingJsonUploadWrapper(None, GetJsonResumableChunkSize()) 209 try: 210 ResumableStreamingJsonUploadWrapper(None, GetJsonResumableChunkSize() - 1) 211 self.fail('Did not get expected CommandException') 212 except CommandException, e: 213 self.assertIn('Buffer size must be >= JSON resumable upload', str(e)) 214 215 def testSeekPartialBuffer(self): 216 """Tests seeking back partially within the buffer.""" 217 tmp_file = self._GetTestFile() 218 read_size = TRANSFER_BUFFER_SIZE 219 with open(tmp_file, 'rb') as stream: 220 wrapper = ResumableStreamingJsonUploadWrapper( 221 stream, TRANSFER_BUFFER_SIZE * 3, test_small_buffer=True) 222 position = 0 223 for _ in xrange(3): 224 data = wrapper.read(read_size) 225 self.assertEqual( 226 self._temp_test_file_contents[position:position + read_size], 227 data, 'Data from position %s to %s did not match file contents.' % 228 (position, position + read_size)) 229 position += len(data) 230 231 data = wrapper.read(read_size / 2) 232 # Buffer contents should now be have contents from: 233 # read_size/2 through 7*read_size/2. 234 position = read_size / 2 235 wrapper.seek(position) 236 data = wrapper.read() 237 self.assertEqual( 238 self._temp_test_file_contents[-len(data):], data, 239 'Data from position %s to EOF did not match file contents.' % 240 position) 241 242 def testSeekEnd(self): 243 tmp_file = self._GetTestFile() 244 for buffer_size in (TRANSFER_BUFFER_SIZE - 1, 245 TRANSFER_BUFFER_SIZE, 246 TRANSFER_BUFFER_SIZE + 1): 247 for seek_back in (TRANSFER_BUFFER_SIZE - 1, 248 TRANSFER_BUFFER_SIZE, 249 TRANSFER_BUFFER_SIZE + 1): 250 expect_exception = seek_back > buffer_size 251 with open(tmp_file, 'rb') as stream: 252 wrapper = ResumableStreamingJsonUploadWrapper( 253 stream, buffer_size, test_small_buffer=True) 254 # Read to the end. 255 while wrapper.read(TRANSFER_BUFFER_SIZE): 256 pass 257 try: 258 wrapper.seek(seek_back, whence=os.SEEK_END) 259 if expect_exception: 260 self.fail('Did not get expected CommandException for ' 261 'seek_back size %s, buffer size %s' % 262 (seek_back, buffer_size)) 263 except CommandException, e: 264 if not expect_exception: 265 self.fail('Got unexpected CommandException "%s" for ' 266 'seek_back size %s, buffer size %s' % 267 (str(e), seek_back, buffer_size)) 268