# Copyright (C) 2010 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import math
import logging

from google.appengine.ext import blobstore
from google.appengine.ext import db

MAX_DATA_ENTRY_PER_FILE = 30
MAX_ENTRY_LEN = 1000 * 1000
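# Together these cap a single DataStoreFile at
# MAX_DATA_ENTRY_PER_FILE * MAX_ENTRY_LEN = 30,000,000 bytes (about 28.6 MiB);
# save_data() rejects anything larger.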


class ChunkData(object):
    def __init__(self):
        self.reused_key = None
        self.data_entry = None
        self.entry_future = None
        self.index = None


class DataEntry(db.Model):
    """Datastore entry that stores one segment of file data
       (<1000*1000 bytes).
    """

    data = db.BlobProperty()

    @classmethod
    def get(cls, key):
        return db.get(key)

    @classmethod
    def get_async(cls, key):
        return db.get_async(key)

    @classmethod
    def delete_async(cls, key):
        return db.delete_async(key)


class DataStoreFile(db.Model):
    """This class stores a file in the datastore.
       If a file is oversize (>1000*1000 bytes), the file is split into
       multiple segments and stored in multiple datastore entries.
    """

    name = db.StringProperty()
    data_keys = db.ListProperty(db.Key)
    # Keys to the datastore entries that can be reused for new data.
    # If it is empty, create new DataEntry entities.
    new_data_keys = db.ListProperty(db.Key)
    date = db.DateTimeProperty(auto_now_add=True)

    data = None

    def _get_chunk_indices(self, data_length):
        nchunks = math.ceil(float(data_length) / MAX_ENTRY_LEN)
        return xrange(0, int(nchunks) * MAX_ENTRY_LEN, MAX_ENTRY_LEN)
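    # Worked example (illustrative values): data_length = 2500000 gives
    # nchunks = 3, so _get_chunk_indices() returns xrange(0, 3000000, 1000000),
    # i.e. chunk start offsets 0, 1000000 and 2000000, one per
    # MAX_ENTRY_LEN-sized slice of the data.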

    def _convert_blob_keys(self, keys):
        converted_keys = []
        for key in keys:
            new_key = blobstore.BlobMigrationRecord.get_new_blob_key(key)
            if new_key:
                converted_keys.append(new_key)
            else:
                converted_keys.append(key)
        return converted_keys

    def delete_data(self, keys=None):
        if not keys:
            keys = self._convert_blob_keys(self.data_keys)
        logging.info('Doing async delete of keys: %s', keys)

        get_futures = [DataEntry.get_async(k) for k in keys]
        delete_futures = []
        for get_future in get_futures:
            result = get_future.get_result()
            if result:
                delete_futures.append(DataEntry.delete_async(result.key()))

        for delete_future in delete_futures:
            delete_future.get_result()

    def save_data(self, data):
        if not data:
            logging.warning("No data to save.")
            return False

        if len(data) > (MAX_DATA_ENTRY_PER_FILE * MAX_ENTRY_LEN):
            logging.error("File too big, can't save to datastore: %dK",
                len(data) / 1024)
            return False

        # Use new_data_keys to store the new data. If all new data is saved
        # successfully, swap new_data_keys and data_keys so we can reuse the
        # data_keys entries in the next run. If we are unable to save the new
        # data for any reason, only the data pointed to by new_data_keys may be
        # corrupted; the existing data_keys data remains untouched. The
        # corrupted data in new_data_keys will be overwritten in the next
        # update.
        keys = self._convert_blob_keys(self.new_data_keys)
        self.new_data_keys = []

        chunk_indices = self._get_chunk_indices(len(data))
        logging.info('Saving file in %s chunks', len(chunk_indices))

        chunk_data = []
        for chunk_index in chunk_indices:
            chunk = ChunkData()
            chunk.index = chunk_index
            if keys:
                chunk.reused_key = keys.pop()
                chunk.entry_future = DataEntry.get_async(chunk.reused_key)
            else:
                chunk.data_entry = DataEntry()
            chunk_data.append(chunk)

        put_futures = []
        for chunk in chunk_data:
            if chunk.entry_future:
                data_entry = chunk.entry_future.get_result()
                if not data_entry:
                    logging.warning("Found key, but no data entry: %s",
                                    chunk.reused_key)
                    data_entry = DataEntry()
                chunk.data_entry = data_entry

            chunk.data_entry.data = db.Blob(
                data[chunk.index: chunk.index + MAX_ENTRY_LEN])
            put_futures.append(db.put_async(chunk.data_entry))

        for future in put_futures:
            try:
                key = future.get_result()
                self.new_data_keys.append(key)
            except Exception as err:
                logging.error("Failed to save data store entry: %s", err)
                self.delete_data(keys)
                return False

        if keys:
            self.delete_data(keys)

        temp_keys = self._convert_blob_keys(self.data_keys)
        self.data_keys = self.new_data_keys
        self.new_data_keys = temp_keys
        self.data = data

        return True
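
    # Note: save_data() rewrites data_keys/new_data_keys on this entity but
    # does not call self.put(); presumably the caller is expected to put() the
    # DataStoreFile afterwards so that the swapped key lists are persisted.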

    def load_data(self):
        if not self.data_keys:
            logging.warning("No data to load.")
            return None

        data_futures = [(k, DataEntry.get_async(k))
                        for k in self._convert_blob_keys(self.data_keys)]

        data = []
        for key, future in data_futures:
            result = future.get_result()
            if not result:
                logging.error("No data found for key: %s.", key)
                return None
            data.append(result)

        self.data = "".join([d.data for d in data])

        return self.data
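
# A minimal usage sketch (assuming this runs inside an App Engine handler with
# datastore access; the file name and payload below are hypothetical):
#
#     datastore_file = DataStoreFile(name='results.json')
#     if datastore_file.save_data(serialized_payload):
#         datastore_file.put()  # persist the updated key lists
#
#     stored = DataStoreFile.all().filter('name =', 'results.json').get()
#     payload = stored.load_data()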