1# Copyright (c) 2012 Mitch Garnaat http://garnaat.org/
2# Copyright (c) 2012 Amazon.com, Inc. or its affiliates.  All Rights Reserved
3#
4# Permission is hereby granted, free of charge, to any person obtaining a
5# copy of this software and associated documentation files (the
6# "Software"), to deal in the Software without restriction, including
7# without limitation the rights to use, copy, modify, merge, publish, dis-
8# tribute, sublicense, and/or sell copies of the Software, and to permit
9# persons to whom the Software is furnished to do so, subject to the fol-
10# lowing conditions:
11#
12# The above copyright notice and this permission notice shall be included
13# in all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21# IN THE SOFTWARE.
22#
23import time
24from binascii import crc32
25
26import boto
27from boto.connection import AWSAuthConnection
28from boto.exception import DynamoDBResponseError
29from boto.provider import Provider
30from boto.dynamodb import exceptions as dynamodb_exceptions
31from boto.compat import json
32
33
34class Layer1(AWSAuthConnection):
35    """
36    This is the lowest-level interface to DynamoDB.  Methods at this
37    layer map directly to API requests and parameters to the methods
38    are either simple, scalar values or they are the Python equivalent
39    of the JSON input as defined in the DynamoDB Developer's Guide.
40    All responses are direct decoding of the JSON response bodies to
41    Python data structures via the json or simplejson modules.
42
43    :ivar throughput_exceeded_events: An integer variable that
44        keeps a running total of the number of ThroughputExceeded
45        responses this connection has received from Amazon DynamoDB.
46    """
47
48    DefaultRegionName = 'us-east-1'
49    """The default region name for DynamoDB API."""
50
51    ServiceName = 'DynamoDB'
52    """The name of the Service"""
53
54    Version = '20111205'
55    """DynamoDB API version."""
56
57    ThruputError = "ProvisionedThroughputExceededException"
58    """The error response returned when provisioned throughput is exceeded"""
59
60    SessionExpiredError = 'com.amazon.coral.service#ExpiredTokenException'
61    """The error response returned when session token has expired"""
62
63    ConditionalCheckFailedError = 'ConditionalCheckFailedException'
64    """The error response returned when a conditional check fails"""
65
66    ValidationError = 'ValidationException'
67    """The error response returned when an item is invalid in some way"""
68
69    ResponseError = DynamoDBResponseError
70
71    NumberRetries = 10
72    """The number of times an error is retried."""
73
74    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
75                 is_secure=True, port=None, proxy=None, proxy_port=None,
76                 debug=0, security_token=None, region=None,
77                 validate_certs=True, validate_checksums=True, profile_name=None):
78        if not region:
79            region_name = boto.config.get('DynamoDB', 'region',
80                                          self.DefaultRegionName)
81            for reg in boto.dynamodb.regions():
82                if reg.name == region_name:
83                    region = reg
84                    break
85
86        self.region = region
87        super(Layer1, self).__init__(self.region.endpoint,
88                                   aws_access_key_id,
89                                   aws_secret_access_key,
90                                   is_secure, port, proxy, proxy_port,
91                                   debug=debug, security_token=security_token,
92                                   validate_certs=validate_certs,
93                                   profile_name=profile_name)
94        self.throughput_exceeded_events = 0
95        self._validate_checksums = boto.config.getbool(
96            'DynamoDB', 'validate_checksums', validate_checksums)
97
98    def _get_session_token(self):
99        self.provider = Provider(self._provider_type)
100        self._auth_handler.update_provider(self.provider)
101
102    def _required_auth_capability(self):
103        return ['hmac-v4']
104
105    def make_request(self, action, body='', object_hook=None):
106        """
107        :raises: ``DynamoDBExpiredTokenError`` if the security token expires.
108        """
109        headers = {'X-Amz-Target': '%s_%s.%s' % (self.ServiceName,
110                                                 self.Version, action),
111                   'Host': self.region.endpoint,
112                   'Content-Type': 'application/x-amz-json-1.0',
113                   'Content-Length': str(len(body))}
114        http_request = self.build_base_http_request('POST', '/', '/',
115                                                    {}, headers, body, None)
116        start = time.time()
117        response = self._mexe(http_request, sender=None,
118                              override_num_retries=self.NumberRetries,
119                              retry_handler=self._retry_handler)
120        elapsed = (time.time() - start) * 1000
121        request_id = response.getheader('x-amzn-RequestId')
122        boto.log.debug('RequestId: %s' % request_id)
123        boto.perflog.debug('%s: id=%s time=%sms',
124                           headers['X-Amz-Target'], request_id, int(elapsed))
125        response_body = response.read().decode('utf-8')
126        boto.log.debug(response_body)
127        return json.loads(response_body, object_hook=object_hook)
128
129    def _retry_handler(self, response, i, next_sleep):
130        status = None
131        if response.status == 400:
132            response_body = response.read().decode('utf-8')
133            boto.log.debug(response_body)
134            data = json.loads(response_body)
135            if self.ThruputError in data.get('__type'):
136                self.throughput_exceeded_events += 1
137                msg = "%s, retry attempt %s" % (self.ThruputError, i)
138                next_sleep = self._exponential_time(i)
139                i += 1
140                status = (msg, i, next_sleep)
141                if i == self.NumberRetries:
142                    # If this was our last retry attempt, raise
143                    # a specific error saying that the throughput
144                    # was exceeded.
145                    raise dynamodb_exceptions.DynamoDBThroughputExceededError(
146                        response.status, response.reason, data)
147            elif self.SessionExpiredError in data.get('__type'):
148                msg = 'Renewing Session Token'
149                self._get_session_token()
150                status = (msg, i + self.num_retries - 1, 0)
151            elif self.ConditionalCheckFailedError in data.get('__type'):
152                raise dynamodb_exceptions.DynamoDBConditionalCheckFailedError(
153                    response.status, response.reason, data)
154            elif self.ValidationError in data.get('__type'):
155                raise dynamodb_exceptions.DynamoDBValidationError(
156                    response.status, response.reason, data)
157            else:
158                raise self.ResponseError(response.status, response.reason,
159                                         data)
160        expected_crc32 = response.getheader('x-amz-crc32')
161        if self._validate_checksums and expected_crc32 is not None:
162            boto.log.debug('Validating crc32 checksum for body: %s',
163                           response.read().decode('utf-8'))
164            actual_crc32 = crc32(response.read()) & 0xffffffff
165            expected_crc32 = int(expected_crc32)
166            if actual_crc32 != expected_crc32:
167                msg = ("The calculated checksum %s did not match the expected "
168                       "checksum %s" % (actual_crc32, expected_crc32))
169                status = (msg, i + 1, self._exponential_time(i))
170        return status
171
172    def _exponential_time(self, i):
173        if i == 0:
174            next_sleep = 0
175        else:
176            next_sleep = min(0.05 * (2 ** i),
177                             boto.config.get('Boto', 'max_retry_delay', 60))
178        return next_sleep
179
180    def list_tables(self, limit=None, start_table=None):
181        """
182        Returns a dictionary of results.  The dictionary contains
183        a **TableNames** key whose value is a list of the table names.
184        The dictionary could also contain a **LastEvaluatedTableName**
185        key whose value would be the last table name returned if
186        the complete list of table names was not returned.  This
187        value would then be passed as the ``start_table`` parameter on
188        a subsequent call to this method.
189
190        :type limit: int
191        :param limit: The maximum number of tables to return.
192
193        :type start_table: str
194        :param start_table: The name of the table that starts the
195            list.  If you ran a previous list_tables and not
196            all results were returned, the response dict would
197            include a LastEvaluatedTableName attribute.  Use
198            that value here to continue the listing.
199        """
200        data = {}
201        if limit:
202            data['Limit'] = limit
203        if start_table:
204            data['ExclusiveStartTableName'] = start_table
205        json_input = json.dumps(data)
206        return self.make_request('ListTables', json_input)
207
208    def describe_table(self, table_name):
209        """
210        Returns information about the table including current
211        state of the table, primary key schema and when the
212        table was created.
213
214        :type table_name: str
215        :param table_name: The name of the table to describe.
216        """
217        data = {'TableName': table_name}
218        json_input = json.dumps(data)
219        return self.make_request('DescribeTable', json_input)
220
221    def create_table(self, table_name, schema, provisioned_throughput):
222        """
223        Add a new table to your account.  The table name must be unique
224        among those associated with the account issuing the request.
225        This request triggers an asynchronous workflow to begin creating
226        the table.  When the workflow is complete, the state of the
227        table will be ACTIVE.
228
229        :type table_name: str
230        :param table_name: The name of the table to create.
231
232        :type schema: dict
233        :param schema: A Python version of the KeySchema data structure
234            as defined by DynamoDB
235
236        :type provisioned_throughput: dict
237        :param provisioned_throughput: A Python version of the
238            ProvisionedThroughput data structure defined by
239            DynamoDB.
240        """
241        data = {'TableName': table_name,
242                'KeySchema': schema,
243                'ProvisionedThroughput': provisioned_throughput}
244        json_input = json.dumps(data)
245        response_dict = self.make_request('CreateTable', json_input)
246        return response_dict
247
248    def update_table(self, table_name, provisioned_throughput):
249        """
250        Updates the provisioned throughput for a given table.
251
252        :type table_name: str
253        :param table_name: The name of the table to update.
254
255        :type provisioned_throughput: dict
256        :param provisioned_throughput: A Python version of the
257            ProvisionedThroughput data structure defined by
258            DynamoDB.
259        """
260        data = {'TableName': table_name,
261                'ProvisionedThroughput': provisioned_throughput}
262        json_input = json.dumps(data)
263        return self.make_request('UpdateTable', json_input)
264
265    def delete_table(self, table_name):
266        """
267        Deletes the table and all of it's data.  After this request
268        the table will be in the DELETING state until DynamoDB
269        completes the delete operation.
270
271        :type table_name: str
272        :param table_name: The name of the table to delete.
273        """
274        data = {'TableName': table_name}
275        json_input = json.dumps(data)
276        return self.make_request('DeleteTable', json_input)
277
278    def get_item(self, table_name, key, attributes_to_get=None,
279                 consistent_read=False, object_hook=None):
280        """
281        Return a set of attributes for an item that matches
282        the supplied key.
283
284        :type table_name: str
285        :param table_name: The name of the table containing the item.
286
287        :type key: dict
288        :param key: A Python version of the Key data structure
289            defined by DynamoDB.
290
291        :type attributes_to_get: list
292        :param attributes_to_get: A list of attribute names.
293            If supplied, only the specified attribute names will
294            be returned.  Otherwise, all attributes will be returned.
295
296        :type consistent_read: bool
297        :param consistent_read: If True, a consistent read
298            request is issued.  Otherwise, an eventually consistent
299            request is issued.
300        """
301        data = {'TableName': table_name,
302                'Key': key}
303        if attributes_to_get:
304            data['AttributesToGet'] = attributes_to_get
305        if consistent_read:
306            data['ConsistentRead'] = True
307        json_input = json.dumps(data)
308        response = self.make_request('GetItem', json_input,
309                                     object_hook=object_hook)
310        if 'Item' not in response:
311            raise dynamodb_exceptions.DynamoDBKeyNotFoundError(
312                "Key does not exist."
313            )
314        return response
315
316    def batch_get_item(self, request_items, object_hook=None):
317        """
318        Return a set of attributes for a multiple items in
319        multiple tables using their primary keys.
320
321        :type request_items: dict
322        :param request_items: A Python version of the RequestItems
323            data structure defined by DynamoDB.
324        """
325        # If the list is empty, return empty response
326        if not request_items:
327            return {}
328        data = {'RequestItems': request_items}
329        json_input = json.dumps(data)
330        return self.make_request('BatchGetItem', json_input,
331                                 object_hook=object_hook)
332
333    def batch_write_item(self, request_items, object_hook=None):
334        """
335        This operation enables you to put or delete several items
336        across multiple tables in a single API call.
337
338        :type request_items: dict
339        :param request_items: A Python version of the RequestItems
340            data structure defined by DynamoDB.
341        """
342        data = {'RequestItems': request_items}
343        json_input = json.dumps(data)
344        return self.make_request('BatchWriteItem', json_input,
345                                 object_hook=object_hook)
346
347    def put_item(self, table_name, item,
348                 expected=None, return_values=None,
349                 object_hook=None):
350        """
351        Create a new item or replace an old item with a new
352        item (including all attributes).  If an item already
353        exists in the specified table with the same primary
354        key, the new item will completely replace the old item.
355        You can perform a conditional put by specifying an
356        expected rule.
357
358        :type table_name: str
359        :param table_name: The name of the table in which to put the item.
360
361        :type item: dict
362        :param item: A Python version of the Item data structure
363            defined by DynamoDB.
364
365        :type expected: dict
366        :param expected: A Python version of the Expected
367            data structure defined by DynamoDB.
368
369        :type return_values: str
370        :param return_values: Controls the return of attribute
371            name-value pairs before then were changed.  Possible
372            values are: None or 'ALL_OLD'. If 'ALL_OLD' is
373            specified and the item is overwritten, the content
374            of the old item is returned.
375        """
376        data = {'TableName': table_name,
377                'Item': item}
378        if expected:
379            data['Expected'] = expected
380        if return_values:
381            data['ReturnValues'] = return_values
382        json_input = json.dumps(data)
383        return self.make_request('PutItem', json_input,
384                                 object_hook=object_hook)
385
386    def update_item(self, table_name, key, attribute_updates,
387                    expected=None, return_values=None,
388                    object_hook=None):
389        """
390        Edits an existing item's attributes. You can perform a conditional
391        update (insert a new attribute name-value pair if it doesn't exist,
392        or replace an existing name-value pair if it has certain expected
393        attribute values).
394
395        :type table_name: str
396        :param table_name: The name of the table.
397
398        :type key: dict
399        :param key: A Python version of the Key data structure
400            defined by DynamoDB which identifies the item to be updated.
401
402        :type attribute_updates: dict
403        :param attribute_updates: A Python version of the AttributeUpdates
404            data structure defined by DynamoDB.
405
406        :type expected: dict
407        :param expected: A Python version of the Expected
408            data structure defined by DynamoDB.
409
410        :type return_values: str
411        :param return_values: Controls the return of attribute
412            name-value pairs before then were changed.  Possible
413            values are: None or 'ALL_OLD'. If 'ALL_OLD' is
414            specified and the item is overwritten, the content
415            of the old item is returned.
416        """
417        data = {'TableName': table_name,
418                'Key': key,
419                'AttributeUpdates': attribute_updates}
420        if expected:
421            data['Expected'] = expected
422        if return_values:
423            data['ReturnValues'] = return_values
424        json_input = json.dumps(data)
425        return self.make_request('UpdateItem', json_input,
426                                 object_hook=object_hook)
427
428    def delete_item(self, table_name, key,
429                    expected=None, return_values=None,
430                    object_hook=None):
431        """
432        Delete an item and all of it's attributes by primary key.
433        You can perform a conditional delete by specifying an
434        expected rule.
435
436        :type table_name: str
437        :param table_name: The name of the table containing the item.
438
439        :type key: dict
440        :param key: A Python version of the Key data structure
441            defined by DynamoDB.
442
443        :type expected: dict
444        :param expected: A Python version of the Expected
445            data structure defined by DynamoDB.
446
447        :type return_values: str
448        :param return_values: Controls the return of attribute
449            name-value pairs before then were changed.  Possible
450            values are: None or 'ALL_OLD'. If 'ALL_OLD' is
451            specified and the item is overwritten, the content
452            of the old item is returned.
453        """
454        data = {'TableName': table_name,
455                'Key': key}
456        if expected:
457            data['Expected'] = expected
458        if return_values:
459            data['ReturnValues'] = return_values
460        json_input = json.dumps(data)
461        return self.make_request('DeleteItem', json_input,
462                                 object_hook=object_hook)
463
464    def query(self, table_name, hash_key_value, range_key_conditions=None,
465              attributes_to_get=None, limit=None, consistent_read=False,
466              scan_index_forward=True, exclusive_start_key=None,
467              object_hook=None, count=False):
468        """
469        Perform a query of DynamoDB.  This version is currently punting
470        and expecting you to provide a full and correct JSON body
471        which is passed as is to DynamoDB.
472
473        :type table_name: str
474        :param table_name: The name of the table to query.
475
476        :type hash_key_value: dict
477        :param key: A DynamoDB-style HashKeyValue.
478
479        :type range_key_conditions: dict
480        :param range_key_conditions: A Python version of the
481            RangeKeyConditions data structure.
482
483        :type attributes_to_get: list
484        :param attributes_to_get: A list of attribute names.
485            If supplied, only the specified attribute names will
486            be returned.  Otherwise, all attributes will be returned.
487
488        :type limit: int
489        :param limit: The maximum number of items to return.
490
491        :type count: bool
492        :param count: If True, Amazon DynamoDB returns a total
493            number of items for the Query operation, even if the
494            operation has no matching items for the assigned filter.
495
496        :type consistent_read: bool
497        :param consistent_read: If True, a consistent read
498            request is issued.  Otherwise, an eventually consistent
499            request is issued.
500
501        :type scan_index_forward: bool
502        :param scan_index_forward: Specified forward or backward
503            traversal of the index.  Default is forward (True).
504
505        :type exclusive_start_key: list or tuple
506        :param exclusive_start_key: Primary key of the item from
507            which to continue an earlier query.  This would be
508            provided as the LastEvaluatedKey in that query.
509        """
510        data = {'TableName': table_name,
511                'HashKeyValue': hash_key_value}
512        if range_key_conditions:
513            data['RangeKeyCondition'] = range_key_conditions
514        if attributes_to_get:
515            data['AttributesToGet'] = attributes_to_get
516        if limit:
517            data['Limit'] = limit
518        if count:
519            data['Count'] = True
520        if consistent_read:
521            data['ConsistentRead'] = True
522        if scan_index_forward:
523            data['ScanIndexForward'] = True
524        else:
525            data['ScanIndexForward'] = False
526        if exclusive_start_key:
527            data['ExclusiveStartKey'] = exclusive_start_key
528        json_input = json.dumps(data)
529        return self.make_request('Query', json_input,
530                                 object_hook=object_hook)
531
532    def scan(self, table_name, scan_filter=None,
533             attributes_to_get=None, limit=None,
534             exclusive_start_key=None, object_hook=None, count=False):
535        """
536        Perform a scan of DynamoDB.  This version is currently punting
537        and expecting you to provide a full and correct JSON body
538        which is passed as is to DynamoDB.
539
540        :type table_name: str
541        :param table_name: The name of the table to scan.
542
543        :type scan_filter: dict
544        :param scan_filter: A Python version of the
545            ScanFilter data structure.
546
547        :type attributes_to_get: list
548        :param attributes_to_get: A list of attribute names.
549            If supplied, only the specified attribute names will
550            be returned.  Otherwise, all attributes will be returned.
551
552        :type limit: int
553        :param limit: The maximum number of items to evaluate.
554
555        :type count: bool
556        :param count: If True, Amazon DynamoDB returns a total
557            number of items for the Scan operation, even if the
558            operation has no matching items for the assigned filter.
559
560        :type exclusive_start_key: list or tuple
561        :param exclusive_start_key: Primary key of the item from
562            which to continue an earlier query.  This would be
563            provided as the LastEvaluatedKey in that query.
564        """
565        data = {'TableName': table_name}
566        if scan_filter:
567            data['ScanFilter'] = scan_filter
568        if attributes_to_get:
569            data['AttributesToGet'] = attributes_to_get
570        if limit:
571            data['Limit'] = limit
572        if count:
573            data['Count'] = True
574        if exclusive_start_key:
575            data['ExclusiveStartKey'] = exclusive_start_key
576        json_input = json.dumps(data)
577        return self.make_request('Scan', json_input, object_hook=object_hook)
578