1#!/usr/bin/env python
2#
3# Copyright 2010 Google Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Datastore models used by the Google App Engine Pipeline API."""
18
19from google.appengine.ext import db
20from google.appengine.ext import blobstore
21
22try:
23  import json
24except ImportError:
25  import simplejson as json
26
27# Relative imports
28import util
29
30
class _PipelineRecord(db.Model):
  """Represents a Pipeline.

  Key name is a randomly assigned UUID. No parent entity.

  Properties:
    class_path: Path of the Python class to use for this pipeline.
    root_pipeline: The root of the whole workflow; set to itself if this
      pipeline is its own root.
    fanned_out: Keys of the child _PipelineRecords that were started when
      this generator pipeline moved from WAITING to RUN.
    start_time: For pipelines with no start _BarrierRecord, when this pipeline
      was enqueued to run immediately.
    finalized_time: When this pipeline moved from WAITING or RUN to DONE.
    params_text: Serialized (JSON) parameter dictionary, stored under the
      datastore property name 'params'.
    params_blob: Blobstore reference to the serialized parameter dictionary.
      Exactly one of params_text / params_blob is set, depending on the size
      of the serialized parameters; use the `params` property to read them.
    status: The current status of the pipeline; one of WAITING, RUN, DONE,
      or ABORTED (defaults to WAITING).
    current_attempt: The current attempt (starting at 0) to run.
    max_attempts: Maximum number of attempts to run (defaults to 1).
    next_retry_time: ETA of the next retry attempt.
    retry_message: Why the last attempt failed; None or empty if no message.

  Root pipeline properties:
    is_root_pipeline: This is a root pipeline.
    abort_message: Why the whole pipeline was aborted; only saved on
      root pipelines.
    abort_requested: If an abort signal has been requested for this root
      pipeline; only saved on root pipelines.
  """

  WAITING = 'waiting'
  RUN = 'run'
  DONE = 'done'
  ABORTED = 'aborted'

  class_path = db.StringProperty()
  root_pipeline = db.SelfReferenceProperty(
                      collection_name='child_pipelines_set')
  fanned_out = db.ListProperty(db.Key, indexed=False)
  start_time = db.DateTimeProperty(indexed=True)
  finalized_time = db.DateTimeProperty(indexed=False)

  # One of these two will be set, depending on the size of the params.
  params_text = db.TextProperty(name='params')
  params_blob = blobstore.BlobReferenceProperty(
      name='params_blob', indexed=False)

  status = db.StringProperty(choices=(WAITING, RUN, DONE, ABORTED),
                             default=WAITING)

  # Retry behavior
  current_attempt = db.IntegerProperty(default=0, indexed=False)
  max_attempts = db.IntegerProperty(default=1, indexed=False)
  next_retry_time = db.DateTimeProperty(indexed=False)
  retry_message = db.TextProperty()

  # Root pipeline properties
  is_root_pipeline = db.BooleanProperty()
  abort_message = db.TextProperty()
  abort_requested = db.BooleanProperty(indexed=False)

  @classmethod
  def kind(cls):
    return '_AE_Pipeline_Record'

  @property
  def params(self):
    """Returns the decoded parameters for this Pipeline.

    The serialized parameters are read from params_blob when it is set and
    from params_text otherwise, then decoded with util.JsonDecoder. The result
    is cached on the instance, so later changes to params_text/params_blob are
    not reflected by subsequent calls.

    Returns:
      The decoded parameter value. When it is a dict with a non-empty 'kwargs'
      entry, the keys of that entry are converted to `str`.
    """
    if hasattr(self, '_params_decoded'):
      return self._params_decoded

    if self.params_blob is not None:
      reader = self.params_blob.open()
      try:
        value_encoded = reader.read()
      finally:
        # Release the reader (and its read buffer) even if read() fails.
        reader.close()
    else:
      value_encoded = self.params_text

    value = json.loads(value_encoded, cls=util.JsonDecoder)
    if isinstance(value, dict):
      kwargs = value.get('kwargs')
      if kwargs:
        # Python only allows non-unicode strings as keyword arguments.
        value['kwargs'] = dict(
            (str(arg_key), arg_value)
            for arg_key, arg_value in kwargs.iteritems())

    self._params_decoded = value
    return self._params_decoded
118
119
class _SlotRecord(db.Model):
  """Represents an output slot.

  Key name is a randomly assigned UUID. No parent for slots of child pipelines.
  For the outputs of root pipelines, the parent entity is the root
  _PipelineRecord (see Pipeline.start()).

  Properties:
    root_pipeline: The root of the workflow.
    filler: The pipeline that filled this slot.
    value_text: Serialized (JSON) value for this slot, stored under the
      datastore property name 'value'.
    value_blob: Blobstore reference to the serialized value. Exactly one of
      value_text / value_blob is set, depending on the size of the serialized
      value; use the `value` property to read it.
    status: The current status of the slot; FILLED or WAITING (defaults to
      WAITING).
    fill_time: When the slot was filled by the filler.
  """

  FILLED = 'filled'
  WAITING = 'waiting'

  root_pipeline = db.ReferenceProperty(_PipelineRecord)
  filler = db.ReferenceProperty(_PipelineRecord,
                                collection_name='filled_slots_set')

  # One of these two will be set, depending on the size of the value.
  value_text = db.TextProperty(name='value')
  value_blob = blobstore.BlobReferenceProperty(
      name='value_blob', indexed=False)

  status = db.StringProperty(choices=(FILLED, WAITING), default=WAITING,
                             indexed=False)
  fill_time = db.DateTimeProperty(indexed=False)

  @classmethod
  def kind(cls):
    return '_AE_Pipeline_Slot'

  @property
  def value(self):
    """Returns the decoded value of this Slot.

    The serialized value is read from value_blob when it is set and from
    value_text otherwise, then decoded with util.JsonDecoder. The result is
    cached on the instance, so later changes to value_text/value_blob are not
    reflected by subsequent calls.
    """
    if hasattr(self, '_value_decoded'):
      return self._value_decoded

    if self.value_blob is not None:
      reader = self.value_blob.open()
      try:
        encoded_value = reader.read()
      finally:
        # Release the reader (and its read buffer) even if read() fails.
        reader.close()
    else:
      encoded_value = self.value_text

    self._value_decoded = json.loads(encoded_value, cls=util.JsonDecoder)
    return self._value_decoded
168
169
class _BarrierRecord(db.Model):
  """Represents a barrier.

  Key name is the purpose of the barrier (START, FINALIZE, or ABORT). Parent
  entity is the _PipelineRecord the barrier should trigger when all of its
  blocking_slots are filled.

  Properties:
    root_pipeline: The root of the workflow.
    target: The pipeline to run when the barrier fires.
    blocking_slots: Keys of the slots that must be filled before this barrier
      fires.
    trigger_time: When this barrier fired.
    status: The current status of the barrier; FIRED or WAITING (defaults to
      WAITING).
  """

  # Barrier statuses
  FIRED = 'fired'
  WAITING = 'waiting'

  # Barrier trigger reasons (used as key names)
  START = 'start'
  FINALIZE = 'finalize'
  ABORT = 'abort'

  root_pipeline = db.ReferenceProperty(_PipelineRecord)
  target = db.ReferenceProperty(_PipelineRecord,
                                collection_name='called_barrier_set')
  blocking_slots = db.ListProperty(db.Key)
  trigger_time = db.DateTimeProperty(indexed=False)
  status = db.StringProperty(choices=(FIRED, WAITING), default=WAITING,
                             indexed=False)

  @classmethod
  def kind(cls):
    return '_AE_Pipeline_Barrier'
205
206
class _BarrierIndex(db.Model):
  """Indicates a _BarrierRecord that is dependent on a slot.

  Previously, when a _SlotRecord was filled, notify_barriers() would query for
  all _BarrierRecords where the 'blocking_slots' property equals the
  _SlotRecord's key. The problem with that approach is the 'blocking_slots'
  index is eventually consistent, meaning _BarrierRecords that were just written
  will not match the query. When pipelines are created and barriers are notified
  in rapid succession, the inconsistent queries can cause certain barriers never
  to fire. The outcome is a pipeline is WAITING and never RUN, even though all
  of its dependent slots have been filled.

  This entity is used to make it so barrier fan-out is fully consistent
  with the High Replication Datastore. It's used by notify_barriers() to
  do fully consistent ancestor queries every time a slot is filled. This
  ensures that all _BarrierRecords dependent on a _SlotRecord will be found
  regardless of eventual consistency.

  The key path for _BarrierIndexes is this for slots whose parent is a root
  _PipelineRecord (the outputs of root pipelines):

    _PipelineRecord<owns_slot_id>/_SlotRecord<slot_id>/
        _PipelineRecord<dependent_pipeline_id>/_BarrierIndex<purpose>

  And this for slots of child pipelines (which have no parent):

    _SlotRecord<slot_id>/_PipelineRecord<dependent_pipeline_id>/
        _BarrierIndex<purpose>

  That path is translated (see to_barrier_key()) to the _BarrierRecord it
  should fire:

    _PipelineRecord<dependent_pipeline_id>/_BarrierRecord<purpose>

  All queries for _BarrierIndexes are key-only. The only stored property,
  root_pipeline, exists so the entity can be cleaned up with its workflow.
  """

  # Enable this entity to be cleaned up.
  root_pipeline = db.ReferenceProperty(_PipelineRecord)

  @classmethod
  def kind(cls):
    return '_AE_Barrier_Index'

  @classmethod
  def to_barrier_key(cls, barrier_index_key):
    """Converts a _BarrierIndex key to a _BarrierRecord key.

    The last two (kind, id) pairs of the index key's path name the dependent
    _PipelineRecord and the barrier purpose; those are re-assembled into the
    path of the _BarrierRecord child of that pipeline. The resulting key keeps
    the namespace of `barrier_index_key`, so the conversion does not depend on
    the namespace active when it is called.

    Args:
      barrier_index_key: db.Key for a _BarrierIndex entity.

    Returns:
      db.Key for the corresponding _BarrierRecord entity.

    Raises:
      ValueError: if the key's path is too short to be a _BarrierIndex key.
    """
    barrier_index_path = barrier_index_key.to_path()
    if len(barrier_index_path) < 4:
      raise ValueError(
          'Key path %r is too short to be a _BarrierIndex key' %
          (barrier_index_path,))

    # Pick out the items from the _BarrierIndex key path that we need to
    # construct the _BarrierRecord key path.
    (pipeline_kind, dependent_pipeline_id,
     unused_kind, purpose) = barrier_index_path[-4:]

    # The _BarrierRecord shares the dependent _PipelineRecord ancestor with
    # the index entity, so it must live in the same namespace.
    return db.Key.from_path(
        pipeline_kind, dependent_pipeline_id,
        _BarrierRecord.kind(), purpose,
        namespace=barrier_index_key.namespace())
272
273
class _StatusRecord(db.Model):
  """Represents the current status of a pipeline.

  Properties:
    root_pipeline: The root of the workflow this status belongs to.
    message: The textual message to show.
    console_url: URL to iframe as the primary console for this pipeline.
    link_names: Human display names for status links.
    link_urls: URLs corresponding to human names for status links (parallel
      to link_names).
    status_time: When the status was written.
  """

  root_pipeline = db.ReferenceProperty(_PipelineRecord)
  message = db.TextProperty()
  console_url = db.TextProperty()
  link_names = db.ListProperty(db.Text, indexed=False)
  link_urls = db.ListProperty(db.Text, indexed=False)
  status_time = db.DateTimeProperty(indexed=False)

  @classmethod
  def kind(cls):
    return '_AE_Pipeline_Status'
295