# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""A module for storing and getting objects from datastore.

This module provides Get, Set and Delete functions for storing pickleable
objects in datastore, with support for large objects greater than 1 MB.

Although this module contains ndb.Model classes, these are not intended
to be used directly by other modules.

App Engine datastore limits entity size to less than 1 MB; this module
supports storing larger objects by splitting the data and using multiple
datastore entities and multiple memcache keys. Using ndb.get and pickle, a
complex data structure can be retrieved more quickly than it could be
rebuilt from many individual datastore fetches.

Example:
  john = Account()
  john.username = 'John'
  john.userid = 123
  stored_object.Set(john.userid, john)
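
  # The same key later retrieves or removes the value:
  john = stored_object.Get(john.userid)
  stored_object.Delete(john.userid)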
23"""

import cPickle as pickle
import logging

from google.appengine.api import memcache
from google.appengine.ext import ndb

_MULTIPART_ENTITY_MEMCACHE_KEY = 'multipart_entity_'

# Maximum number of datastore entities and memcache keys used to save a value.
# The limit for data stored in one datastore entity is 1 MB,
# and the limit for memcache batch operations is 32 MB. See:
# https://cloud.google.com/appengine/docs/python/memcache/#Python_Limits
_MAX_NUM_PARTS = 16

# Max bytes per entity or value cached with memcache.
_CHUNK_SIZE = 1000 * 1000
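
# With these defaults, a stored value is capped at roughly
# _MAX_NUM_PARTS * _CHUNK_SIZE = 16 MB of pickled data, which also keeps a
# full memcache batch under the 32 MB limit referenced above.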


def Get(key):
  """Gets the value.

  Args:
    key: String key value.

  Returns:
    The value stored for the given key, or None if it doesn't exist.
  """
  results = MultipartCache.Get(key)
  if not results:
    results = _GetValueFromDatastore(key)
    MultipartCache.Set(key, results)
  return results


def Set(key, value):
  """Sets the value in datastore and memcache.

  The total size of the pickled value is limited to
  _MAX_NUM_PARTS * _CHUNK_SIZE bytes.

  Args:
    key: String key value.
    value: A pickleable value to be stored.
  """
  entity = ndb.Key(MultipartEntity, key).get()
  if not entity:
    entity = MultipartEntity(id=key)
  entity.SetData(value)
  entity.Save()
  MultipartCache.Set(key, value)


def Delete(key):
  """Deletes the value from datastore and memcache."""
  ndb.Key(MultipartEntity, key).delete()
  MultipartCache.Delete(key)


class MultipartEntity(ndb.Model):
  """Container for PartEntity."""

  # Number of entities used to store the serialized data.
  size = ndb.IntegerProperty(default=0, indexed=False)

  @classmethod
  def _post_get_hook(cls, key, future):  # pylint: disable=unused-argument
    """Deserializes data from multiple PartEntity entities."""
    entity = future.get_result()
    if entity is None or not entity.size:
      return

    string_id = entity.key.string_id()
    part_keys = [ndb.Key(MultipartEntity, string_id, PartEntity, i + 1)
                 for i in xrange(entity.size)]
    part_entities = ndb.get_multi(part_keys)
    serialized = ''.join(p.value for p in part_entities if p is not None)
    entity.SetData(pickle.loads(serialized))

  @classmethod
  def _pre_delete_hook(cls, key):
    """Deletes all PartEntity entities belonging to this entity."""
    part_keys = PartEntity.query(ancestor=key).fetch(keys_only=True)
    ndb.delete_multi(part_keys)

  def Save(self):
    """Stores serialized data over multiple PartEntity entities."""
    serialized_parts = _Serialize(self.GetData())
    if len(serialized_parts) > _MAX_NUM_PARTS:
      logging.error('Max number of parts reached.')
      return
    part_list = []
    num_parts = len(serialized_parts)
    for i in xrange(num_parts):
      if serialized_parts[i] is not None:
        part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i])
        part_list.append(part)
    self.size = num_parts
    ndb.put_multi(part_list + [self])
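
  # Note: _Serialize pads its output to _MAX_NUM_PARTS entries, so on a
  # successful Save the recorded size is always _MAX_NUM_PARTS; only the
  # non-None chunks are written as PartEntity children.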

  def GetData(self):
    return getattr(self, '_data', None)

  def SetData(self, data):
    setattr(self, '_data', data)


class PartEntity(ndb.Model):
  """Holds a part of serialized data for MultipartEntity.

  This entity key has the form:
    ndb.Key('MultipartEntity', multipart_entity_id, 'PartEntity', part_index)
  """
  value = ndb.BlobProperty()


class MultipartCache(object):
  """Contains operations for storing values over multiple memcache keys.

  Values are serialized, split, and stored over multiple memcache keys.  The
  head cache key stores the expected number of parts.
  """

  @classmethod
  def Get(cls, key):
    """Gets the value from memcache."""
    keys = cls._GetCacheKeyList(key)
    head_key = cls._GetCacheKey(key)
    cache_values = memcache.get_multi(keys)
    # Check that all the expected memcache values are present.
    if len(keys) != len(cache_values) or head_key not in cache_values:
      return None

    serialized = ''
    cache_size = cache_values[head_key]
    keys.remove(head_key)
    for part_key in keys[:cache_size]:
      if part_key not in cache_values:
        return None
      if cache_values[part_key] is not None:
        serialized += cache_values[part_key]
    return pickle.loads(serialized)

  @classmethod
  def Set(cls, key, value):
    """Sets the value in memcache."""
    serialized_parts = _Serialize(value)
    if len(serialized_parts) > _MAX_NUM_PARTS:
      logging.error('Max number of parts reached.')
      return

    cached_values = {cls._GetCacheKey(key): len(serialized_parts)}
    for i in xrange(len(serialized_parts)):
      cached_values[cls._GetCacheKey(key, i)] = serialized_parts[i]
    memcache.set_multi(cached_values)

  @classmethod
  def Delete(cls, key):
    """Deletes all cached values for the given key."""
    memcache.delete_multi(cls._GetCacheKeyList(key))

  @classmethod
  def _GetCacheKeyList(cls, key):
    """Gets a list of the part cache keys plus the head cache key."""
    keys = [cls._GetCacheKey(key, i) for i in xrange(_MAX_NUM_PARTS)]
    keys.append(cls._GetCacheKey(key))
    return keys

  @classmethod
  def _GetCacheKey(cls, key, index=None):
    """Returns either the head cache key or a cache key part."""
    if index is not None:
      return _MULTIPART_ENTITY_MEMCACHE_KEY + '%s.%s' % (key, index)
    return _MULTIPART_ENTITY_MEMCACHE_KEY + key
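
  # For a hypothetical key 'foo', _GetCacheKey produces:
  #   'multipart_entity_foo'     (head key, holds the part count)
  #   'multipart_entity_foo.0', 'multipart_entity_foo.1', ...  (value parts)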


def _GetValueFromDatastore(key):
  entity = ndb.Key(MultipartEntity, key).get()
  if not entity:
    return None
  return entity.GetData()


def _Serialize(value):
  """Serializes the value and returns a list of its parts.

  Args:
    value: A pickleable value.

  Returns:
    A list of strings: the pickled value split into chunks of at most
    _CHUNK_SIZE bytes, padded with None entries up to _MAX_NUM_PARTS.
  """
  serialized = pickle.dumps(value, 2)
  length = len(serialized)
  values = []
  for i in xrange(0, length, _CHUNK_SIZE):
    values.append(serialized[i:i + _CHUNK_SIZE])
  for i in xrange(len(values), _MAX_NUM_PARTS):
    values.append(None)
  return values