# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""A module for storing and getting objects from datastore.

This module provides Get, Set and Delete functions for storing pickleable
objects in datastore, with support for large objects greater than 1 MB.

Although this module contains ndb.Model classes, these are not intended
to be used directly by other modules.

App Engine datastore limits entity size to less than 1 MB; this module
supports storing larger objects by splitting the data and using multiple
datastore entities and multiple memcache keys. Using ndb.get and pickle, a
complex data structure can be retrieved more quickly than datastore fetch.

Keys must be strings: they are concatenated into memcache key names and used
as datastore string ids.

Example:
  john = Account()
  john.username = 'John'
  john.userid = 123
  stored_object.Set(john.username, john)
"""

import cPickle as pickle
import logging

from google.appengine.api import memcache
from google.appengine.ext import ndb

_MULTIPART_ENTITY_MEMCACHE_KEY = 'multipart_entity_'

# Maximum number of datastore entities and memcache keys used to save a value.
# The limit for data stored in one datastore entity is 1 MB,
# and the limit for memcache batch operations is 32 MB. See:
# https://cloud.google.com/appengine/docs/python/memcache/#Python_Limits
_MAX_NUM_PARTS = 16

# Max bytes per entity or value cached with memcache.
_CHUNK_SIZE = 1000 * 1000


def Get(key):
  """Gets the value.

  Memcache is consulted first. On a miss the value is read from datastore
  and, if one exists, written back to memcache.

  Args:
    key: String key value.

  Returns:
    A value for key, or None if nothing is stored under key.
  """
  results = MultipartCache.Get(key)
  # Test against None explicitly: falsy values such as 0, '' or [] are valid
  # cache hits and must not trigger a datastore read on every call.
  if results is None:
    results = _GetValueFromDatastore(key)
    # A cached None is indistinguishable from a miss, so don't bother
    # writing it.
    if results is not None:
      MultipartCache.Set(key, results)
  return results


def Set(key, value):
  """Sets the value in datastore and memcache with limit of '_MAX_NUM_PARTS' MB.

  Args:
    key: String key value.
    value: A pickleable value to be stored limited at '_MAX_NUM_PARTS' MB.
  """
  entity = ndb.Key(MultipartEntity, key).get()
  if not entity:
    entity = MultipartEntity(id=key)
  entity.SetData(value)
  entity.Save()
  MultipartCache.Set(key, value)


def Delete(key):
  """Deletes the value in datastore and memcache.

  Args:
    key: String key value.
  """
  ndb.Key(MultipartEntity, key).delete()
  MultipartCache.Delete(key)


class MultipartEntity(ndb.Model):
  """Container for PartEntity."""

  # Number of PartEntity entities used to store the serialized data.
  size = ndb.IntegerProperty(default=0, indexed=False)

  @classmethod
  def _post_get_hook(cls, key, future):  # pylint: disable=unused-argument
    """Deserializes data from multiple PartEntity.

    Fetches the child PartEntity entities numbered 1..size, joins their
    values in order and unpickles the result into the entity's data.
    """
    entity = future.get_result()
    if entity is None or not entity.size:
      return

    string_id = entity.key.string_id()
    part_keys = [ndb.Key(MultipartEntity, string_id, PartEntity, i + 1)
                 for i in xrange(entity.size)]
    part_entities = ndb.get_multi(part_keys)
    # get_multi yields None for keys with no entity; skip those.
    serialized = ''.join(p.value for p in part_entities if p is not None)
    entity.SetData(pickle.loads(serialized))

  @classmethod
  def _pre_delete_hook(cls, key):
    """Deletes PartEntity entities."""
    part_keys = PartEntity.query(ancestor=key).fetch(keys_only=True)
    ndb.delete_multi(part_keys)

  def Save(self):
    """Stores serialized data over multiple PartEntity.

    Logs an error and stores nothing if the serialized data needs more than
    _MAX_NUM_PARTS parts. Otherwise writes one PartEntity per chunk, records
    the number of chunks in `size`, and removes parts left over from a
    previous, larger value.
    """
    serialized_parts = _Serialize(self.GetData())
    if len(serialized_parts) > _MAX_NUM_PARTS:
      logging.error('Max number of parts reached.')
      return

    # _Serialize pads its result with None; only real chunks become entities.
    part_list = [
        PartEntity(id=index + 1, parent=self.key, value=chunk)
        for index, chunk in enumerate(serialized_parts)
        if chunk is not None]

    previous_size = self.size or 0
    self.size = len(part_list)
    ndb.put_multi(part_list + [self])

    # If the value shrank, parts beyond the new size would otherwise linger
    # in datastore until the whole entity is deleted.
    stale_keys = [ndb.Key(PartEntity, index + 1, parent=self.key)
                  for index in xrange(self.size, previous_size)]
    if stale_keys:
      ndb.delete_multi(stale_keys)

  def GetData(self):
    """Returns the deserialized value, or None if none has been set."""
    return getattr(self, '_data', None)

  def SetData(self, data):
    """Sets the value that Save will serialize."""
    setattr(self, '_data', data)


class PartEntity(ndb.Model):
  """Holds a part of serialized data for MultipartEntity.

  This entity key has the form:
    ndb.Key('MultipartEntity', multipart_entity_id, 'PartEntity', part_index)
  """
  value = ndb.BlobProperty()


class MultipartCache(object):
  """Contains operations for storing values over multiple memcache keys.

  Values are serialized, split, and stored over multiple memcache keys. The
  head cache stores the expected size.
  """

  @classmethod
  def Get(cls, key):
    """Gets value in memcache.

    Args:
      key: String key value.

    Returns:
      The unpickled value, or None if the head key or any part key is missing
      from memcache.
    """
    keys = cls._GetCacheKeyList(key)
    head_key = cls._GetCacheKey(key)
    cache_values = memcache.get_multi(keys)
    # Whether we have all the memcache values.
    if len(keys) != len(cache_values) or head_key not in cache_values:
      return None

    cache_size = cache_values[head_key]
    part_keys = [k for k in keys if k != head_key]
    chunks = []
    for part_key in part_keys[:cache_size]:
      if part_key not in cache_values:
        return None
      # Padding slots are cached as None and carry no data.
      if cache_values[part_key] is not None:
        chunks.append(cache_values[part_key])
    return pickle.loads(''.join(chunks))

  @classmethod
  def Set(cls, key, value):
    """Sets a value in memcache.

    Logs an error and caches nothing if the serialized value needs more than
    _MAX_NUM_PARTS parts.

    Args:
      key: String key value.
      value: A pickleable value.
    """
    serialized_parts = _Serialize(value)
    if len(serialized_parts) > _MAX_NUM_PARTS:
      logging.error('Max number of parts reached.')
      return

    cached_values = {cls._GetCacheKey(key): len(serialized_parts)}
    for index, part in enumerate(serialized_parts):
      cached_values[cls._GetCacheKey(key, index)] = part
    memcache.set_multi(cached_values)

  @classmethod
  def Delete(cls, key):
    """Deletes all cached values for key."""
    memcache.delete_multi(cls._GetCacheKeyList(key))

  @classmethod
  def _GetCacheKeyList(cls, key):
    """Gets a list of head cache key and cache key parts.

    The _MAX_NUM_PARTS part keys come first, followed by the head key.
    """
    keys = [cls._GetCacheKey(key, i) for i in xrange(_MAX_NUM_PARTS)]
    keys.append(cls._GetCacheKey(key))
    return keys

  @classmethod
  def _GetCacheKey(cls, key, index=None):
    """Returns either head cache key or cache key part.

    Args:
      key: String key value.
      index: Part index, or None for the head key.
    """
    if index is not None:
      return _MULTIPART_ENTITY_MEMCACHE_KEY + '%s.%s' % (key, index)
    return _MULTIPART_ENTITY_MEMCACHE_KEY + key


def _GetValueFromDatastore(key):
  """Returns the value stored in datastore for key, or None if absent."""
  entity = ndb.Key(MultipartEntity, key).get()
  if not entity:
    return None
  return entity.GetData()


def _Serialize(value):
  """Serializes value and returns a list of its parts.

  Args:
    value: A pickleable value.

  Returns:
    A list of string representation of the value that has been pickled and split
    into _CHUNK_SIZE. The list is padded with None up to _MAX_NUM_PARTS
    entries; it is longer than _MAX_NUM_PARTS only when the pickled value
    does not fit in that many chunks.
  """
  serialized = pickle.dumps(value, 2)
  values = [serialized[start:start + _CHUNK_SIZE]
            for start in xrange(0, len(serialized), _CHUNK_SIZE)]
  # MultipartCache.Get expects every part key to be present, so unused slots
  # are padded with None. A negative multiplier yields an empty list.
  values.extend([None] * (_MAX_NUM_PARTS - len(values)))
  return values