1# Copyright 2013 Google Inc.
2# Copyright 2011, Nexenta Systems Inc.
3#
4# Permission is hereby granted, free of charge, to any person obtaining a
5# copy of this software and associated documentation files (the
6# "Software"), to deal in the Software without restriction, including
7# without limitation the rights to use, copy, modify, merge, publish, dis-
8# tribute, sublicense, and/or sell copies of the Software, and to permit
9# persons to whom the Software is furnished to do so, subject to the fol-
10# lowing conditions:
11#
12# The above copyright notice and this permission notice shall be included
13# in all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21# IN THE SOFTWARE.
22
23import os
24import unittest
25from boto.s3.keyfile import KeyFile
26from tests.integration.s3.mock_storage_service import MockConnection
27from tests.integration.s3.mock_storage_service import MockBucket
28
29
class KeyfileTest(unittest.TestCase):
    """Tests for the file-like ``KeyFile`` wrapper around a storage key.

    Each test runs against a fresh ``KeyFile`` built on a mock key whose
    contents are the ten-character string ``'0123456789'``.
    """

    def setUp(self):
        """Create a mock bucket/key holding ``self.contents`` and wrap it."""
        service_connection = MockConnection()
        self.contents = '0123456789'
        bucket = MockBucket(service_connection, 'mybucket')
        key = bucket.new_key('mykey')
        key.set_contents_from_string(self.contents)
        self.keyfile = KeyFile(key)

    def tearDown(self):
        """Close the ``KeyFile`` created in ``setUp``."""
        self.keyfile.close()

    def _assertRaisesWithMessage(self, exc_type, message, func, *args):
        """Assert that ``func(*args)`` raises ``exc_type`` with ``message``.

        A bare ``try``/``except`` around the call would let the test pass
        silently when no exception is raised at all; this helper fails the
        test explicitly in that case.

        Args:
            exc_type: Exception class the call is expected to raise.
            message: Expected value of ``str()`` of the raised exception.
            func: Callable to invoke.
            *args: Positional arguments passed to ``func``.
        """
        try:
            func(*args)
        except exc_type as e:
            self.assertEqual(str(e), message)
        else:
            self.fail('%s was not raised' % exc_type.__name__)

    def testReadFull(self):
        """Reading ``len(contents)`` bytes returns the whole contents."""
        self.assertEqual(self.keyfile.read(len(self.contents)), self.contents)

    def testReadPartial(self):
        """Two consecutive partial reads return consecutive halves."""
        self.assertEqual(self.keyfile.read(5), self.contents[:5])
        self.assertEqual(self.keyfile.read(5), self.contents[5:])

    def testTell(self):
        """``tell`` tracks bytes read and raises once the file is closed."""
        self.assertEqual(self.keyfile.tell(), 0)
        self.keyfile.read(4)
        self.assertEqual(self.keyfile.tell(), 4)
        self.keyfile.read(6)
        self.assertEqual(self.keyfile.tell(), 10)
        self.keyfile.close()
        # tell() on a closed file must raise, not merely "may" raise.
        self._assertRaisesWithMessage(
            ValueError, 'I/O operation on closed file', self.keyfile.tell)

    def testSeek(self):
        """Absolute seeks reposition reads; negative offsets are rejected."""
        self.assertEqual(self.keyfile.read(4), self.contents[:4])
        self.keyfile.seek(0)
        self.assertEqual(self.keyfile.read(4), self.contents[:4])
        self.keyfile.seek(5)
        self.assertEqual(self.keyfile.read(5), self.contents[5:])

        # Seeking negative should raise.
        self._assertRaisesWithMessage(
            IOError, 'Invalid argument', self.keyfile.seek, -5)

        # Reading past end of file is supposed to return empty string.
        self.keyfile.read(10)
        self.assertEqual(self.keyfile.read(20), '')

        # Seeking past end of file is supposed to silently work.
        self.keyfile.seek(50)
        self.assertEqual(self.keyfile.tell(), 50)
        self.assertEqual(self.keyfile.read(1), '')

    def testSeekEnd(self):
        """Seeks relative to the end of the file (``os.SEEK_END``)."""
        self.assertEqual(self.keyfile.read(4), self.contents[:4])
        self.keyfile.seek(0, os.SEEK_END)
        self.assertEqual(self.keyfile.read(1), '')
        self.keyfile.seek(-1, os.SEEK_END)
        self.assertEqual(self.keyfile.tell(), 9)
        self.assertEqual(self.keyfile.read(1), '9')
        # Test attempt to seek backwards past the start from the end.
        self._assertRaisesWithMessage(
            IOError, 'Invalid argument', self.keyfile.seek, -100, os.SEEK_END)

    def testSeekCur(self):
        """Seeks relative to the current position (``os.SEEK_CUR``)."""
        self.assertEqual(self.keyfile.read(1), self.contents[0])
        self.keyfile.seek(1, os.SEEK_CUR)
        self.assertEqual(self.keyfile.tell(), 2)
        self.assertEqual(self.keyfile.read(4), self.contents[2:6])

    def testSetEtag(self):
        """``set_etag`` yields the same etag for bytes and str contents."""
        # Make sure both bytes and strings work as contents. This is one of the
        # very few places Boto uses the mock key object.
        # https://github.com/GoogleCloudPlatform/gsutil/issues/214#issuecomment-49906044
        self.keyfile.key.data = b'test'
        self.keyfile.key.set_etag()
        self.assertEqual(self.keyfile.key.etag, '098f6bcd4621d373cade4e832627b4f6')

        # Reset so the second assertion cannot pass on the stale value.
        self.keyfile.key.etag = None
        self.keyfile.key.data = 'test'
        self.keyfile.key.set_etag()
        self.assertEqual(self.keyfile.key.etag, '098f6bcd4621d373cade4e832627b4f6')
115