1#!/usr/bin/env python
2#
3# Copyright 2010 Google Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Google App Engine Pipeline API for complex, asynchronous workflows."""
18
19__all__ = [
20    # Public API.
21    'Error', 'PipelineSetupError', 'PipelineExistsError',
22    'PipelineRuntimeError', 'SlotNotFilledError', 'SlotNotDeclaredError',
23    'UnexpectedPipelineError', 'PipelineStatusError', 'Slot', 'Pipeline',
24    'PipelineFuture', 'After', 'InOrder', 'Retry', 'Abort', 'get_status_tree',
25    'get_pipeline_names', 'get_root_list', 'create_handlers_map',
26    'set_enforce_auth',
27]
28
29import datetime
30import hashlib
31import itertools
32import logging
33import os
34import posixpath
35import pprint
36import re
37import sys
38import threading
39import time
40import urllib
41import uuid
42
43from google.appengine.api import mail
44from google.appengine.api import app_identity
45from google.appengine.api import users
46from google.appengine.api import taskqueue
47from google.appengine.ext import blobstore
48from google.appengine.ext import db
49from google.appengine.ext import webapp
50
51# pylint: disable=g-import-not-at-top
52# TODO(user): Cleanup imports if/when cloudstorage becomes part of runtime.
53try:
54  # Check if the full cloudstorage package exists. The stub part is in runtime.
55  import cloudstorage
56  if hasattr(cloudstorage, "_STUB"):
57    cloudstorage = None
58except ImportError:
59  pass  # CloudStorage library not available
60
61try:
62  import json
63except ImportError:
64  import simplejson as json
65
66# Relative imports
67import models
68import status_ui
69import util as mr_util
70
71# pylint: disable=g-bad-name
72# pylint: disable=protected-access
73
74# For convenience
75_BarrierIndex = models._BarrierIndex
76_BarrierRecord = models._BarrierRecord
77_PipelineRecord = models._PipelineRecord
78_SlotRecord = models._SlotRecord
79_StatusRecord = models._StatusRecord
80
81
82# Overall TODOs:
83# - Add a human readable name for start()
84
85# Potential TODOs:
86# - Add support for ANY N barriers.
87# - Allow Pipelines to declare they are "short" and optimize the evaluate()
88#   function to run as many of them in quick succession.
89# - Add support in all Pipelines for hold/release where up-stream
90#   barriers will fire but do nothing because the Pipeline is not ready.
91
92################################################################################
93
94
95class Error(Exception):
96  """Base class for exceptions in this module."""
97
98
99class PipelineSetupError(Error):
100  """Base class for exceptions that happen before Pipeline execution."""
101
102
103class PipelineExistsError(PipelineSetupError):
104  """A new Pipeline with an assigned idempotence_key cannot be overwritten."""
105
106
107class PipelineRuntimeError(Error):
108  """Base class for exceptions that happen during Pipeline execution."""
109
110
111class SlotNotFilledError(PipelineRuntimeError):
112  """A slot that should have been filled already was not yet filled."""
113
114
115class SlotNotDeclaredError(PipelineRuntimeError):
116  """A slot that was filled or passed along was not previously declared."""
117
118
119class UnexpectedPipelineError(PipelineRuntimeError):
120  """An assertion failed, potentially leaving the pipeline unable to proceed."""
121
122
123class PipelineUserError(Error):
124  """Exceptions raised indirectly by developers to cause certain behaviors."""
125
126
127class Retry(PipelineUserError):
128  """The currently running pipeline should be retried at a later time."""
129
130
131class Abort(PipelineUserError):
132  """The currently running pipeline should be aborted up to the root."""
133
134
135class PipelineStatusError(Error):
136  """Exceptions raised when trying to collect pipeline status."""
137
138
139class _CallbackTaskError(Error):
140  """A callback task was unable to execute properly for some reason."""
141
142
143################################################################################
144
145_MAX_BARRIERS_TO_NOTIFY = 10
146
147_MAX_ABORTS_TO_BEGIN = 10
148
149_TEST_MODE = False
150
151_TEST_ROOT_PIPELINE_KEY = None
152
153_DEFAULT_BACKOFF_SECONDS = 15
154
155_DEFAULT_BACKOFF_FACTOR = 2
156
157_DEFAULT_MAX_ATTEMPTS = 3
158
159_RETRY_WIGGLE_TIMEDELTA = datetime.timedelta(seconds=20)
160
161_DEBUG = False
162
163_MAX_JSON_SIZE = 900000
164
165_ENFORCE_AUTH = True
166
167_MAX_CALLBACK_TASK_RETRIES = 5
168
169################################################################################
170
171
172class Slot(object):
173  """An output that is filled by a Pipeline as it executes."""
174
175  def __init__(self, name=None, slot_key=None, strict=False):
176    """Initializer.
177
178    Args:
179      name: The name of this slot.
180      slot_key: The db.Key for this slot's _SlotRecord if it's already been
181        allocated by an up-stream pipeline.
182      strict: If this Slot was created as an output of a strictly defined
183        pipeline.
184    """
185    if name is None:
186      raise UnexpectedPipelineError('Slot with key "%s" missing a name.' %
187                                    slot_key)
188    if slot_key is None:
189      slot_key = db.Key.from_path(_SlotRecord.kind(), uuid.uuid4().hex)
190      self._exists = _TEST_MODE
191    else:
192      self._exists = True
193    self._touched = False
194    self._strict = strict
195    self.name = name
196    self.key = slot_key
197    self.filled = False
198    self._filler_pipeline_key = None
199    self._fill_datetime = None
200    self._value = None
201
202  @property
203  def value(self):
204    """Returns the current value of this slot.
205
206    Returns:
207      The value of the slot (a serializable Python type).
208
209    Raises:
210      SlotNotFilledError if the value hasn't been filled yet.
211    """
212    if not self.filled:
213      raise SlotNotFilledError('Slot with name "%s", key "%s" not yet filled.'
214                               % (self.name, self.key))
215    return self._value
216
217  @property
218  def filler(self):
219    """Returns the pipeline ID that filled this slot's value.
220
221    Returns:
222      A string that is the pipeline ID.
223
224    Raises:
225      SlotNotFilledError if the value hasn't been filled yet.
226    """
227    if not self.filled:
228      raise SlotNotFilledError('Slot with name "%s", key "%s" not yet filled.'
229                               % (self.name, self.key))
230    return self._filler_pipeline_key.name()
231
232  @property
233  def fill_datetime(self):
234    """Returns when the slot was filled.
235
236    Returns:
237      A datetime.datetime.
238
239    Raises:
240      SlotNotFilledError if the value hasn't been filled yet.
241    """
242    if not self.filled:
243      raise SlotNotFilledError('Slot with name "%s", key "%s" not yet filled.'
244                               % (self.name, self.key))
245    return self._fill_datetime
246
247  def _set_value(self, slot_record):
248    """Sets the value of this slot based on its corresponding _SlotRecord.
249
250    Does nothing if the slot has not yet been filled.
251
252    Args:
253      slot_record: The _SlotRecord containing this Slot's value.
254    """
255    if slot_record.status == _SlotRecord.FILLED:
256      self.filled = True
257      self._filler_pipeline_key = _SlotRecord.filler.get_value_for_datastore(
258          slot_record)
259      self._fill_datetime = slot_record.fill_time
260      self._value = slot_record.value
261
262  def _set_value_test(self, filler_pipeline_key, value):
263    """Sets the value of this slot for use in testing.
264
265    Args:
266      filler_pipeline_key: The db.Key of the _PipelineRecord that filled
267        this slot.
268      value: The serializable value set for this slot.
269    """
270    self.filled = True
271    self._filler_pipeline_key = filler_pipeline_key
272    self._fill_datetime = datetime.datetime.utcnow()
273    # Convert to JSON and back again, to simulate the behavior of production.
274    self._value = json.loads(json.dumps(
275        value, cls=mr_util.JsonEncoder), cls=mr_util.JsonDecoder)
276
277  def __repr__(self):
278    """Returns a string representation of this slot."""
279    if self.filled:
280      return repr(self._value)
281    else:
282      return 'Slot(name="%s", slot_key="%s")' % (self.name, self.key)
283
284
285class PipelineFuture(object):
286  """A future for accessing the outputs of a Pipeline."""
287
288  # NOTE: Do not, ever, add a names() method to this class. Callers cannot do
289  # introspection on their context of being called. Even though the runtime
290  # environment of the Pipeline can allow for that to happen, such behavior
291  # would prevent synchronous simulation and verification, whic is an
292  # unacceptable tradeoff.
293
294  def __init__(self, output_names, force_strict=False):
295    """Initializer.
296
297    Args:
298      output_names: The list of require output names that will be strictly
299        enforced by this class.
300      force_strict: If True, force this future to be in strict mode.
301    """
302    self._after_all_pipelines = set()
303    self._output_dict = {
304      'default': Slot(name='default'),
305    }
306
307    self._strict = len(output_names) > 0 or force_strict
308    if self._strict:
309      for name in output_names:
310        if name in self._output_dict:
311          raise UnexpectedPipelineError('Output name reserved: "%s"' % name)
312        self._output_dict[name] = Slot(name=name, strict=True)
313
314  def _inherit_outputs(self,
315                       pipeline_name,
316                       already_defined,
317                       resolve_outputs=False):
318    """Inherits outputs from a calling Pipeline.
319
320    Args:
321      pipeline_name: The Pipeline class name (used for debugging).
322      already_defined: Maps output name to stringified db.Key (of _SlotRecords)
323        of any exiting output slots to be inherited by this future.
324      resolve_outputs: When True, this method will dereference all output slots
325        before returning back to the caller, making those output slots' values
326        available.
327
328    Raises:
329      UnexpectedPipelineError when resolve_outputs is True and any of the output
330      slots could not be retrived from the Datastore.
331    """
332    for name, slot_key in already_defined.iteritems():
333      if not isinstance(slot_key, db.Key):
334        slot_key = db.Key(slot_key)
335
336      slot = self._output_dict.get(name)
337      if slot is None:
338        if self._strict:
339          raise UnexpectedPipelineError(
340              'Inherited output named "%s" must be filled but '
341              'not declared for pipeline class "%s"' % (name, pipeline_name))
342        else:
343          self._output_dict[name] = Slot(name=name, slot_key=slot_key)
344      else:
345        slot.key = slot_key
346        slot._exists = True
347
348    if resolve_outputs:
349      slot_key_dict = dict((s.key, s) for s in self._output_dict.itervalues())
350      all_slots = db.get(slot_key_dict.keys())
351      for slot, slot_record in zip(slot_key_dict.itervalues(), all_slots):
352        if slot_record is None:
353          raise UnexpectedPipelineError(
354              'Inherited output named "%s" for pipeline class "%s" is '
355              'missing its Slot in the datastore: "%s"' %
356              (slot.name, pipeline_name, slot.key))
357        slot = slot_key_dict[slot_record.key()]
358        slot._set_value(slot_record)
359
360  def __getattr__(self, name):
361    """Provides an output Slot instance with the given name if allowed."""
362    if name not in self._output_dict:
363      if self._strict:
364        raise SlotNotDeclaredError('Undeclared output with name "%s"' % name)
365      self._output_dict[name] = Slot(name=name)
366    slot = self._output_dict[name]
367    return slot
368
369
370class _PipelineMeta(type):
371  """Meta-class for recording all Pipelines that have been defined."""
372
373  # List of all Pipeline classes that have been seen.
374  _all_classes = []
375
376  def __new__(meta, name, bases, cls_dict):
377    """Initializes the class path of a Pipeline and saves it."""
378    cls = type.__new__(meta, name, bases, cls_dict)
379    meta._all_classes.append(cls)
380    return cls
381
382
383class ClassProperty(object):
384  """Descriptor that lets us have read-only class properties."""
385
386  def __init__(self, method):
387    self.method = method
388
389  def __get__(self, cls, obj):
390    return self.method(obj)
391
392
393class Pipeline(object):
394  """A Pipeline function-object that performs operations and has a life cycle.
395
396  Class properties (to be overridden by sub-classes):
397    async: When True, this Pipeline will execute asynchronously and fill the
398      default output slot itself using the complete() method.
399    output_names: List of named outputs (in addition to the default slot) that
400      this Pipeline must output to (no more, no less).
401    public_callbacks: If the callback URLs generated for this class should be
402      accessible by all external requests regardless of login or task queue.
403    admin_callbacks: If the callback URLs generated for this class should be
404      accessible by the task queue ane externally by users logged in as admins.
405    class_path: String identifier for this Pipeline, which is derived from
406      its path in the global system modules dictionary.
407
408  Modifiable instance properties:
409    backoff_seconds: How many seconds to use as the constant factor in
410      exponential backoff; may be changed by the user.
411    backoff_factor: Base factor to use for exponential backoff. The formula
412      followed is (backoff_seconds * backoff_factor^current_attempt).
413    max_attempts: Maximum number of retry attempts to make before failing
414      completely and aborting the entire pipeline up to the root.
415    target: The application version to use for processing this Pipeline. This
416      can be set to the name of a backend to direct Pipelines to run there.
417
418  Instance properties:
419    pipeline_id: The ID of this pipeline.
420    root_pipeline_id: The ID of the root of this pipeline.
421    queue_name: The queue this pipeline runs on or None if unknown.
422    current_attempt: The current attempt being tried for this pipeline.
423  """
424
425  __metaclass__ = _PipelineMeta
426
427  # To be set by sub-classes
428  async = False
429  output_names = []
430  public_callbacks = False
431  admin_callbacks = False
432
433  # Internal only.
434  _class_path = None  # Set for each class
435  _send_mail = mail.send_mail_to_admins  # For testing
436
437  # callback_xg_transaction: Determines whether callbacks are processed within
438  # a single entity-group transaction (False), a cross-entity-group
439  # transaction (True), or no transaction (None, default). It is generally
440  # unsafe for a callback to modify pipeline state outside of a transaction, in
441  # particular any pre-initialized state from the pipeline record, such as the
442  # outputs. If a transaction is used, the callback method must operate within
443  # the datastore's transaction time limits.
444  # TODO(user): Make non-internal once other API calls are considered for
445  # transaction support.
446  _callback_xg_transaction = None
447
448  def __init__(self, *args, **kwargs):
449    """Initializer.
450
451    Args:
452      *args: The positional arguments for this function-object.
453      **kwargs: The keyword arguments for this function-object.
454    """
455    self.args = args
456    self.kwargs = kwargs
457    self.outputs = None
458    self.backoff_seconds = _DEFAULT_BACKOFF_SECONDS
459    self.backoff_factor = _DEFAULT_BACKOFF_FACTOR
460    self.max_attempts = _DEFAULT_MAX_ATTEMPTS
461    self.target = None
462    self.task_retry = False
463    self._current_attempt = 0
464    self._root_pipeline_key = None
465    self._pipeline_key = None
466    self._context = None
467    self._result_status = None
468    self._set_class_path()
469    # Introspectively set the target so pipelines stick to the version it
470    # started.
471    self.target = mr_util._get_task_target()
472
473    if _TEST_MODE:
474      self._context = _PipelineContext('', 'default', '')
475      self._root_pipeline_key = _TEST_ROOT_PIPELINE_KEY
476      self._pipeline_key = db.Key.from_path(
477          _PipelineRecord.kind(), uuid.uuid4().hex)
478      self.outputs = PipelineFuture(self.output_names)
479      self._context.evaluate_test(self)
480
481  @property
482  def pipeline_id(self):
483    """Returns the ID of this Pipeline as a string or None if unknown."""
484    if self._pipeline_key is None:
485      return None
486    return self._pipeline_key.name()
487
488  @property
489  def root_pipeline_id(self):
490    """Returns root pipeline ID as a websafe string or None if unknown."""
491    if self._root_pipeline_key is None:
492      return None
493    return self._root_pipeline_key.name()
494
495  @property
496  def is_root(self):
497    """Returns True if this pipeline is a root pipeline, False otherwise."""
498    return self._root_pipeline_key == self._pipeline_key
499
500  @property
501  def queue_name(self):
502    """Returns the queue name this Pipeline runs on or None if unknown."""
503    if self._context:
504      return self._context.queue_name
505    return None
506
507  @property
508  def base_path(self):
509    """Returns the base path for Pipeline URL handlers or None if unknown."""
510    if self._context:
511      return self._context.base_path
512    return None
513
514  @property
515  def has_finalized(self):
516    """Returns True if this pipeline has completed and finalized."""
517    return self._result_status == _PipelineRecord.DONE
518
519  @property
520  def was_aborted(self):
521    """Returns True if this pipeline was aborted."""
522    return self._result_status == _PipelineRecord.ABORTED
523
524  @property
525  def current_attempt(self):
526    """Returns the current attempt at running this pipeline, starting at 1."""
527    return self._current_attempt + 1
528
529  @property
530  def test_mode(self):
531    """Returns True if the pipeline is running in test mode."""
532    return _TEST_MODE
533
534  @ClassProperty
535  def class_path(cls):
536    """Returns the unique string identifier for this Pipeline class.
537
538    Refers to how to find the Pipeline in the global modules dictionary.
539    """
540    cls._set_class_path()
541    return cls._class_path
542
543  @classmethod
544  def from_id(cls, pipeline_id, resolve_outputs=True, _pipeline_record=None):
545    """Returns an instance corresponding to an existing Pipeline.
546
547    The returned object will have the same properties a Pipeline does while
548    it's running synchronously (e.g., like what it's first allocated), allowing
549    callers to inspect caller arguments, outputs, fill slots, complete the
550    pipeline, abort, retry, etc.
551
552    Args:
553      pipeline_id: The ID of this pipeline (a string).
554      resolve_outputs: When True, dereference the outputs of this Pipeline
555        so their values can be accessed by the caller.
556      _pipeline_record: Internal-only. The _PipelineRecord instance to use
557        to instantiate this instance instead of fetching it from
558        the datastore.
559
560    Returns:
561      Pipeline sub-class instances or None if it could not be found.
562    """
563    pipeline_record = _pipeline_record
564
565    # Support pipeline IDs and idempotence_keys that are not unicode.
566    if not isinstance(pipeline_id, unicode):
567      try:
568        pipeline_id = pipeline_id.encode('utf-8')
569      except UnicodeDecodeError:
570        pipeline_id = hashlib.sha1(pipeline_id).hexdigest()
571
572    pipeline_key = db.Key.from_path(_PipelineRecord.kind(), pipeline_id)
573
574    if pipeline_record is None:
575      pipeline_record = db.get(pipeline_key)
576    if pipeline_record is None:
577      return None
578
579    try:
580      pipeline_func_class = mr_util.for_name(pipeline_record.class_path)
581    except ImportError, e:
582      logging.warning('Tried to find Pipeline %s#%s, but class could '
583                      'not be found. Using default Pipeline class instead.',
584                      pipeline_record.class_path, pipeline_id)
585      pipeline_func_class = cls
586
587    params = pipeline_record.params
588    arg_list, kwarg_dict = _dereference_args(
589        pipeline_record.class_path, params['args'], params['kwargs'])
590    outputs = PipelineFuture(pipeline_func_class.output_names)
591    outputs._inherit_outputs(
592        pipeline_record.class_path,
593        params['output_slots'],
594        resolve_outputs=resolve_outputs)
595
596    stage = pipeline_func_class(*arg_list, **kwarg_dict)
597    stage.backoff_seconds = params['backoff_seconds']
598    stage.backoff_factor = params['backoff_factor']
599    stage.max_attempts = params['max_attempts']
600    stage.task_retry = params['task_retry']
601    stage.target = params.get('target')  # May not be defined for old Pipelines
602    stage._current_attempt = pipeline_record.current_attempt
603    stage._set_values_internal(
604        _PipelineContext('', params['queue_name'], params['base_path']),
605        pipeline_key,
606        _PipelineRecord.root_pipeline.get_value_for_datastore(pipeline_record),
607        outputs,
608        pipeline_record.status)
609    return stage
610
611  # Methods that can be invoked on a Pipeline instance by anyone with a
612  # valid object (e.g., directly instantiated, retrieve via from_id).
613  def start(self,
614            idempotence_key='',
615            queue_name='default',
616            base_path='/_ah/pipeline',
617            return_task=False,
618            countdown=None,
619            eta=None):
620    """Starts a new instance of this pipeline.
621
622    Args:
623      idempotence_key: The ID to use for this Pipeline and throughout its
624        asynchronous workflow to ensure the operations are idempotent. If
625        empty a starting key will be automatically assigned.
626      queue_name: What queue this Pipeline's workflow should execute on.
627      base_path: The relative URL path to where the Pipeline API is
628        mounted for access by the taskqueue API or external requests.
629      return_task: When True, a task to start this pipeline will be returned
630        instead of submitted, allowing the caller to start off this pipeline
631        as part of a separate transaction (potentially leaving this newly
632        allocated pipeline's datastore entities in place if that separate
633        transaction fails for any reason).
634      countdown: Time in seconds into the future that this Task should execute.
635        Defaults to zero.
636      eta: A datetime.datetime specifying the absolute time at which the task
637        should be executed. Must not be specified if 'countdown' is specified.
638        This may be timezone-aware or timezone-naive. If None, defaults to now.
639        For pull tasks, no worker will be able to lease this task before the
640        time indicated by eta.
641
642    Returns:
643      A taskqueue.Task instance if return_task was True. This task will *not*
644      have a name, thus to ensure reliable execution of your pipeline you
645      should add() this task as part of a separate Datastore transaction.
646
647    Raises:
648      PipelineExistsError if the pipeline with the given idempotence key exists.
649      PipelineSetupError if the pipeline could not start for any other reason.
650    """
651    if not idempotence_key:
652      idempotence_key = uuid.uuid4().hex
653    elif not isinstance(idempotence_key, unicode):
654      try:
655        idempotence_key.encode('utf-8')
656      except UnicodeDecodeError:
657        idempotence_key = hashlib.sha1(idempotence_key).hexdigest()
658
659    pipeline_key = db.Key.from_path(_PipelineRecord.kind(), idempotence_key)
660    context = _PipelineContext('', queue_name, base_path)
661    future = PipelineFuture(self.output_names, force_strict=True)
662    try:
663      self._set_values_internal(
664          context, pipeline_key, pipeline_key, future, _PipelineRecord.WAITING)
665      return context.start(
666          self, return_task=return_task, countdown=countdown, eta=eta)
667    except Error:
668      # Pass through exceptions that originate in this module.
669      raise
670    except Exception, e:
671      # Re-type any exceptions that were raised in dependent methods.
672      raise PipelineSetupError('Error starting %s#%s: %s' % (
673          self, idempotence_key, str(e)))
674
675  def start_test(self, idempotence_key=None, base_path='', **kwargs):
676    """Starts this pipeline in test fashion.
677
678    Args:
679      idempotence_key: Dummy idempotence_key to use for this root pipeline.
680      base_path: Dummy base URL path to use for this root pipeline.
681      kwargs: Ignored keyword arguments usually passed to start().
682    """
683    if not idempotence_key:
684      idempotence_key = uuid.uuid4().hex
685    pipeline_key = db.Key.from_path(_PipelineRecord.kind(), idempotence_key)
686    context = _PipelineContext('', 'default', base_path)
687    future = PipelineFuture(self.output_names, force_strict=True)
688    self._set_values_internal(
689        context, pipeline_key, pipeline_key, future, _PipelineRecord.WAITING)
690    context.start_test(self)
691
692  # Pipeline control methods.
693  def retry(self, retry_message=''):
694    """Forces a currently running asynchronous pipeline to retry.
695
696    Note this may not be called by synchronous or generator pipelines. Those
697    must instead raise the 'Retry' exception during execution.
698
699    Args:
700      retry_message: Optional message explaining why the retry happened.
701
702    Returns:
703      True if the Pipeline should be retried, False if it cannot be cancelled
704      mid-flight for some reason.
705    """
706    if not self.async:
707      raise UnexpectedPipelineError(
708          'May only call retry() method for asynchronous pipelines.')
709    if self.try_cancel():
710      self._context.transition_retry(self._pipeline_key, retry_message)
711      return True
712    else:
713      return False
714
715  def abort(self, abort_message=''):
716    """Mark the entire pipeline up to the root as aborted.
717
718    Note this should only be called from *outside* the context of a running
719    pipeline. Synchronous and generator pipelines should raise the 'Abort'
720    exception to cause this behavior during execution.
721
722    Args:
723      abort_message: Optional message explaining why the abort happened.
724
725    Returns:
726      True if the abort signal was sent successfully; False if the pipeline
727      could not be aborted for any reason.
728    """
729    # TODO: Use thread-local variable to enforce that this is not called
730    # while a pipeline is executing in the current thread.
731    if (self.async and self._root_pipeline_key == self._pipeline_key and
732        not self.try_cancel()):
733      # Handle the special case where the root pipeline is async and thus
734      # cannot be aborted outright.
735      return False
736    else:
737      return self._context.begin_abort(
738          self._root_pipeline_key, abort_message=abort_message)
739
740  # Methods used by the Pipeline as it runs.
741  def fill(self, name_or_slot, value):
742    """Fills an output slot required by this Pipeline.
743
744    Args:
745      name_or_slot: The name of the slot (a string) or Slot record to fill.
746      value: The serializable value to assign to this slot.
747
748    Raises:
749      UnexpectedPipelineError if the Slot no longer exists. SlotNotDeclaredError
750      if trying to output to a slot that was not declared ahead of time.
751    """
752    if isinstance(name_or_slot, basestring):
753      slot = getattr(self.outputs, name_or_slot)
754    elif isinstance(name_or_slot, Slot):
755      slot = name_or_slot
756    else:
757      raise UnexpectedPipelineError(
758          'Could not fill invalid output name: %r' % name_or_slot)
759
760    if not slot._exists:
761      raise SlotNotDeclaredError(
762          'Cannot fill output with name "%s" that was just '
763          'declared within the Pipeline context.' % slot.name)
764
765    self._context.fill_slot(self._pipeline_key, slot, value)
766
767  def set_status(self, message=None, console_url=None, status_links=None):
768    """Sets the current status of this pipeline.
769
770    This method is purposefully non-transactional. Updates are written to the
771    datastore immediately and overwrite all existing statuses.
772
773    Args:
774      message: (optional) Overall status message.
775      console_url: (optional) Relative URL to use for the "console" of this
776        pipeline that displays current progress. When None, no console will
777        be displayed.
778      status_links: (optional) Dictionary of readable link names to relative
779        URLs that should be associated with this pipeline as it runs. These links
780        provide convenient access to other dashboards, consoles, etc associated
781        with the pipeline.
782
783    Raises:
784      PipelineRuntimeError if the status could not be set for any reason.
785    """
786    if _TEST_MODE:
787      logging.info(
788          'New status for %s#%s: message=%r, console_url=%r, status_links=%r',
789          self, self.pipeline_id, message, console_url, status_links)
790      return
791
792    status_key = db.Key.from_path(_StatusRecord.kind(), self.pipeline_id)
793    root_pipeline_key = db.Key.from_path(
794        _PipelineRecord.kind(), self.root_pipeline_id)
795    status_record = _StatusRecord(
796        key=status_key, root_pipeline=root_pipeline_key)
797
798    try:
799      if message:
800        status_record.message = message
801      if console_url:
802        status_record.console_url = console_url
803      if status_links:
804        # Alphabeticalize the list.
805        status_record.link_names = sorted(
806            db.Text(s) for s in status_links.iterkeys())
807        status_record.link_urls = [
808            db.Text(status_links[name]) for name in status_record.link_names]
809
810      status_record.status_time = datetime.datetime.utcnow()
811
812      status_record.put()
813    except Exception, e:
814      raise PipelineRuntimeError('Could not set status for %s#%s: %s' %
815          (self, self.pipeline_id, str(e)))
816
817  def complete(self, default_output=None):
818    """Marks this asynchronous Pipeline as complete.
819
820    Args:
821      default_output: What value the 'default' output slot should be assigned.
822
823    Raises:
824      UnexpectedPipelineError if the slot no longer exists or this method was
825      called for a pipeline that is not async.
826    """
827    # TODO: Enforce that all outputs expected by this async pipeline were
828    # filled before this complete() function was called. May required all
829    # async functions to declare their outputs upfront.
830    if not self.async:
831      raise UnexpectedPipelineError(
832          'May only call complete() method for asynchronous pipelines.')
833    self._context.fill_slot(
834        self._pipeline_key, self.outputs.default, default_output)
835
836  def get_callback_url(self, **kwargs):
837    """Returns a relative URL for invoking this Pipeline's callback method.
838
839    Args:
840      kwargs: Dictionary mapping keyword argument names to single values that
841        should be passed to the callback when it is invoked.
842
843    Raises:
844      UnexpectedPipelineError if this is invoked on pipeline that is not async.
845    """
846    # TODO: Support positional parameters.
847    if not self.async:
848      raise UnexpectedPipelineError(
849          'May only call get_callback_url() method for asynchronous pipelines.')
850    kwargs['pipeline_id'] = self._pipeline_key.name()
851    params = urllib.urlencode(sorted(kwargs.items()))
852    return '%s/callback?%s' % (self.base_path, params)
853
854  def get_callback_task(self, *args, **kwargs):
855    """Returns a task for calling back this Pipeline.
856
857    Args:
858      params: Keyword argument containing a dictionary of key/value pairs
859        that will be passed to the callback when it is executed.
860      args, kwargs: Passed to the taskqueue.Task constructor. Use these
861        arguments to set the task name (for idempotence), etc.
862
863    Returns:
864      A taskqueue.Task instance that must be enqueued by the caller.
865    """
866    if not self.async:
867      raise UnexpectedPipelineError(
868          'May only call get_callback_task() method for asynchronous pipelines.')
869
870    params = kwargs.get('params', {})
871    kwargs['params'] = params
872    params['pipeline_id'] = self._pipeline_key.name()
873    kwargs['url'] = self.base_path + '/callback'
874    kwargs['method'] = 'POST'
875    return taskqueue.Task(*args, **kwargs)
876
877  def send_result_email(self, sender=None):
878    """Sends an email to admins indicating this Pipeline has completed.
879
880    For developer convenience. Automatically called from finalized for root
881    Pipelines that do not override the default action.
882
883    Args:
884      sender: (optional) Override the sender's email address.
885    """
886    status = 'successful'
887    if self.was_aborted:
888      status = 'aborted'
889
890    app_id = os.environ['APPLICATION_ID']
891    shard_index = app_id.find('~')
892    if shard_index != -1:
893      app_id = app_id[shard_index+1:]
894
895    param_dict = {
896        'status': status,
897        'app_id': app_id,
898        'class_path': self._class_path,
899        'pipeline_id': self.root_pipeline_id,
900        'base_path': '%s.appspot.com%s' % (app_id, self.base_path),
901    }
902    subject = (
903        'Pipeline %(status)s: App "%(app_id)s", %(class_path)s'
904        '#%(pipeline_id)s' % param_dict)
905    body = """View the pipeline results here:
906
907http://%(base_path)s/status?root=%(pipeline_id)s
908
909Thanks,
910
911The Pipeline API
912""" % param_dict
913
914    html = """<html><body>
915<p>View the pipeline results here:</p>
916
917<p><a href="http://%(base_path)s/status?root=%(pipeline_id)s"
918>http://%(base_path)s/status?root=%(pipeline_id)s</a></p>
919
920<p>
921Thanks,
922<br>
923The Pipeline API
924</p>
925</body></html>
926""" % param_dict
927
928    if sender is None:
929      sender = '%s@%s.appspotmail.com' % (app_id, app_id)
930    try:
931      self._send_mail(sender, subject, body, html=html)
932    except (mail.InvalidSenderError, mail.InvalidEmailError):
933      logging.warning('Could not send result email for '
934                      'root pipeline ID "%s" from sender "%s"',
935                      self.root_pipeline_id, sender)
936
937  def cleanup(self):
938    """Clean up this Pipeline and all Datastore records used for coordination.
939
940    Only works when called on a root pipeline. Child pipelines will ignore
941    calls to this method.
942
943    After this method is called, Pipeline.from_id() and related status
944    methods will return inconsistent or missing results. This method is
945    fire-and-forget and asynchronous.
946    """
947    if self._root_pipeline_key is None:
948      raise UnexpectedPipelineError(
949          'Could not cleanup Pipeline with unknown root pipeline ID.')
950    if not self.is_root:
951      return
952    task = taskqueue.Task(
953        params=dict(root_pipeline_key=self._root_pipeline_key),
954        url=self.base_path + '/cleanup',
955        headers={'X-Ae-Pipeline-Key': self._root_pipeline_key})
956    taskqueue.Queue(self.queue_name).add(task)
957
958  def with_params(self, **kwargs):
959    """Modify various execution parameters of a Pipeline before it runs.
960
961    This method has no effect in test mode.
962
963    Args:
964      kwargs: Attributes to modify on this Pipeline instance before it has
965        been executed.
966
967    Returns:
968      This Pipeline instance, for easy chaining.
969    """
970    if _TEST_MODE:
971      logging.info(
972          'Setting runtime parameters for %s#%s: %r',
973          self, self.pipeline_id, kwargs)
974      return self
975
976    if self.pipeline_id is not None:
977      raise UnexpectedPipelineError(
978          'May only call with_params() on a Pipeline that has not yet '
979          'been scheduled for execution.')
980
981    ALLOWED = ('backoff_seconds', 'backoff_factor', 'max_attempts', 'target')
982    for name, value in kwargs.iteritems():
983      if name not in ALLOWED:
984        raise TypeError('Unexpected keyword: %s=%r' % (name, value))
985      setattr(self, name, value)
986    return self
987
988  # Methods implemented by developers for lifecycle management. These
989  # must be idempotent under all circumstances.
990  def run(self, *args, **kwargs):
991    """Runs this Pipeline."""
992    raise NotImplementedError('Must implement "run" in Pipeline sub-class.')
993
994  def run_test(self, *args, **kwargs):
995    """Runs this Pipeline in test mode."""
996    raise NotImplementedError(
997        'Must implement "run_test" in Pipeline sub-class.')
998
999  def finalized(self):
1000    """Finalizes this Pipeline after execution if it's a generator.
1001
1002    Default action as the root pipeline is to email the admins with the status.
1003    Implementors be sure to call 'was_aborted' to find out if the finalization
1004    that you're handling is for a success or error case.
1005    """
1006    if self.pipeline_id == self.root_pipeline_id:
1007      self.send_result_email()
1008
1009  def finalized_test(self, *args, **kwargs):
1010    """Finalized this Pipeline in test mode."""
1011    raise NotImplementedError(
1012        'Must implement "finalized_test" in Pipeline sub-class.')
1013
1014  def callback(self, **kwargs):
1015    """This Pipeline received an asynchronous callback request."""
1016    raise NotImplementedError(
1017        'Must implement "callback" in Pipeline sub-class.')
1018
1019  def try_cancel(self):
1020    """This pipeline has been cancelled.
1021
1022    Called when a pipeline is interrupted part-way through due to some kind
1023    of failure (an abort of the whole pipeline to the root or a forced retry on
1024    this child pipeline).
1025
1026    Returns:
1027      True to indicate that cancellation was successful and this pipeline may
1028      go in the retry or aborted state; False to indicate that this pipeline
1029      cannot be canceled right now and must remain as-is.
1030    """
1031    return False
1032
1033  # Internal methods.
1034  @classmethod
1035  def _set_class_path(cls, module_dict=sys.modules):
1036    """Sets the absolute path to this class as a string.
1037
1038    Used by the Pipeline API to reconstruct the Pipeline sub-class object
1039    at execution time instead of passing around a serialized function.
1040
1041    Args:
1042      module_dict: Used for testing.
1043    """
1044    # Do not traverse the class hierarchy fetching the class path attribute.
1045    found = cls.__dict__.get('_class_path')
1046    if found is not None:
1047      return
1048
1049    # Do not set the _class_path for the base-class, otherwise all children's
1050    # lookups for _class_path will fall through and return 'Pipeline' above.
1051    # This situation can happen if users call the generic Pipeline.from_id
1052    # to get the result of a Pipeline without knowing its specific class.
1053    if cls is Pipeline:
1054      return
1055
1056    class_path = '%s.%s' % (cls.__module__, cls.__name__)
1057    # When a WSGI handler is invoked as an entry point, any Pipeline class
1058    # defined in the same file as the handler will get __module__ set to
1059    # __main__. Thus we need to find out its real fully qualified path.
1060    if cls.__module__ == '__main__':
1061      for name, module in module_dict.items():
1062        if name == '__main__':
1063          continue
1064        found = getattr(module, cls.__name__, None)
1065        if found is cls:
1066          class_path = '%s.%s' % (name, cls.__name__)
1067          break
1068    cls._class_path = class_path
1069
1070  def _set_values_internal(self,
1071                           context,
1072                           pipeline_key,
1073                           root_pipeline_key,
1074                           outputs,
1075                           result_status):
1076    """Sets the user-visible values provided as an API by this class.
1077
1078    Args:
1079      context: The _PipelineContext used for this Pipeline.
1080      pipeline_key: The db.Key of this pipeline.
1081      root_pipeline_key: The db.Key of the root pipeline.
1082      outputs: The PipelineFuture for this pipeline.
1083      result_status: The result status of this pipeline.
1084    """
1085    self._context = context
1086    self._pipeline_key = pipeline_key
1087    self._root_pipeline_key = root_pipeline_key
1088    self._result_status = result_status
1089    self.outputs = outputs
1090
1091  def _callback_internal(self, kwargs):
1092    """Used to execute callbacks on asynchronous pipelines."""
1093    logging.debug('Callback %s(*%s, **%s)#%s with params: %r',
1094                  self._class_path, _short_repr(self.args),
1095                  _short_repr(self.kwargs), self._pipeline_key.name(), kwargs)
1096    return self.callback(**kwargs)
1097
1098  def _run_internal(self,
1099                    context,
1100                    pipeline_key,
1101                    root_pipeline_key,
1102                    caller_output):
1103    """Used by the Pipeline evaluator to execute this Pipeline."""
1104    self._set_values_internal(
1105        context, pipeline_key, root_pipeline_key, caller_output,
1106        _PipelineRecord.RUN)
1107    logging.debug('Running %s(*%s, **%s)#%s',
1108                  self._class_path, _short_repr(self.args),
1109                  _short_repr(self.kwargs), self._pipeline_key.name())
1110    return self.run(*self.args, **self.kwargs)
1111
1112  def _finalized_internal(self,
1113                          context,
1114                          pipeline_key,
1115                          root_pipeline_key,
1116                          caller_output,
1117                          aborted):
1118    """Used by the Pipeline evaluator to finalize this Pipeline."""
1119    result_status = _PipelineRecord.RUN
1120    if aborted:
1121      result_status = _PipelineRecord.ABORTED
1122
1123    self._set_values_internal(
1124        context, pipeline_key, root_pipeline_key, caller_output, result_status)
1125    logging.debug('Finalizing %s(*%r, **%r)#%s',
1126                  self._class_path, _short_repr(self.args),
1127                  _short_repr(self.kwargs), self._pipeline_key.name())
1128    try:
1129      self.finalized()
1130    except NotImplementedError:
1131      pass
1132
1133  def __repr__(self):
1134    """Returns a string representation of this Pipeline."""
1135    return '%s(*%s, **%s)' % (
1136        self._class_path, _short_repr(self.args), _short_repr(self.kwargs))
1137
1138
1139# TODO: Change InOrder and After to use a common thread-local list of
1140# execution modifications to apply to the current evaluating pipeline.
1141
1142class After(object):
1143  """Causes all contained Pipelines to run after the given ones complete.
1144
1145  Must be used in a 'with' block.
1146  """
1147
1148  _local = threading.local()
1149
1150  def __init__(self, *futures):
1151    """Initializer.
1152
1153    Args:
1154      *futures: PipelineFutures that all subsequent pipelines should follow.
1155        May be empty, in which case this statement does nothing.
1156    """
1157    for f in futures:
1158      if not isinstance(f, PipelineFuture):
1159        raise TypeError('May only pass PipelineFuture instances to After(). %r',
1160                        type(f))
1161    self._futures = set(futures)
1162
1163  def __enter__(self):
1164    """When entering a 'with' block."""
1165    After._thread_init()
1166    After._local._after_all_futures.extend(self._futures)
1167
1168  def __exit__(self, type, value, trace):
1169    """When exiting a 'with' block."""
1170    for future in self._futures:
1171      After._local._after_all_futures.remove(future)
1172    return False
1173
1174  @classmethod
1175  def _thread_init(cls):
1176    """Ensure thread local is initialized."""
1177    if not hasattr(cls._local, '_after_all_futures'):
1178      cls._local._after_all_futures = []
1179
1180
1181class InOrder(object):
1182  """Causes all contained Pipelines to run in order.
1183
1184  Must be used in a 'with' block.
1185  """
1186
1187  _local = threading.local()
1188
1189  @classmethod
1190  def _add_future(cls, future):
1191    """Adds a future to the list of in-order futures thus far.
1192
1193    Args:
1194      future: The future to add to the list.
1195    """
1196    if cls._local._activated:
1197      cls._local._in_order_futures.add(future)
1198
1199  def __init__(self):
1200    """Initializer."""
1201
1202  def __enter__(self):
1203    """When entering a 'with' block."""
1204    InOrder._thread_init()
1205    if InOrder._local._activated:
1206      raise UnexpectedPipelineError('Already in an InOrder "with" block.')
1207    InOrder._local._activated = True
1208    InOrder._local._in_order_futures.clear()
1209
1210  def __exit__(self, type, value, trace):
1211    """When exiting a 'with' block."""
1212    InOrder._local._activated = False
1213    InOrder._local._in_order_futures.clear()
1214    return False
1215
1216  @classmethod
1217  def _thread_init(cls):
1218    """Ensure thread local is initialized."""
1219    if not hasattr(cls._local, '_in_order_futures'):
1220      cls._local._in_order_futures = set()
1221      cls._local._activated = False
1222
1223
1224################################################################################
1225
1226def _short_repr(obj):
1227  """Helper function returns a truncated repr() of an object."""
1228  stringified = pprint.saferepr(obj)
1229  if len(stringified) > 200:
1230    return '%s... (%d bytes)' % (stringified[:200], len(stringified))
1231  return stringified
1232
1233
1234def _write_json_blob(encoded_value, pipeline_id=None):
1235  """Writes a JSON encoded value to a Cloud Storage File.
1236
1237  This function will store the blob in a GCS file in the default bucket under
1238  the appengine_pipeline directory. Optionally using another directory level
1239  specified by pipeline_id
1240  Args:
1241    encoded_value: The encoded JSON string.
1242    pipeline_id: A pipeline id to segment files in Cloud Storage, if none,
1243      the file will be created under appengine_pipeline
1244
1245  Returns:
1246    The blobstore.BlobKey for the file that was created.
1247  """
1248
1249  default_bucket = app_identity.get_default_gcs_bucket_name()
1250  path_components = ['/', default_bucket, "appengine_pipeline"]
1251  if pipeline_id:
1252    path_components.append(pipeline_id)
1253  path_components.append(uuid.uuid4().hex)
1254  # Use posixpath to get a / even if we're running on windows somehow
1255  file_name = posixpath.join(*path_components)
1256  with cloudstorage.open(file_name, 'w', content_type='application/json') as f:
1257    for start_index in xrange(0, len(encoded_value), _MAX_JSON_SIZE):
1258      end_index = start_index + _MAX_JSON_SIZE
1259      f.write(encoded_value[start_index:end_index])
1260
1261  key_str = blobstore.create_gs_key("/gs" + file_name)
1262  logging.debug("Created blob for filename = %s gs_key = %s", file_name, key_str)
1263  return blobstore.BlobKey(key_str)
1264
1265
1266def _dereference_args(pipeline_name, args, kwargs):
1267  """Dereference a Pipeline's arguments that are slots, validating them.
1268
1269  Each argument value passed in is assumed to be a dictionary with the format:
1270    {'type': 'value', 'value': 'serializable'}  # A resolved value.
1271    {'type': 'slot', 'slot_key': 'str() on a db.Key'}  # A pending Slot.
1272
1273  Args:
1274    pipeline_name: The name of the pipeline class; used for debugging.
1275    args: Iterable of positional arguments.
1276    kwargs: Dictionary of keyword arguments.
1277
1278  Returns:
1279    Tuple (args, kwargs) where:
1280      Args: A list of positional arguments values that are all dereferenced.
1281      Kwargs: A list of keyword arguments values that are all dereferenced.
1282
1283  Raises:
1284    SlotNotFilledError if any of the supplied 'slot_key' records are not
1285    present in the Datastore or have not yet been filled.
1286    UnexpectedPipelineError if an unknown parameter type was passed.
1287  """
1288  lookup_slots = set()
1289  for arg in itertools.chain(args, kwargs.itervalues()):
1290    if arg['type'] == 'slot':
1291      lookup_slots.add(db.Key(arg['slot_key']))
1292
1293  slot_dict = {}
1294  for key, slot_record in zip(lookup_slots, db.get(lookup_slots)):
1295    if slot_record is None or slot_record.status != _SlotRecord.FILLED:
1296      raise SlotNotFilledError(
1297          'Slot "%s" missing its value. From %s(*args=%s, **kwargs=%s)' %
1298          (key, pipeline_name, _short_repr(args), _short_repr(kwargs)))
1299    slot_dict[key] = slot_record.value
1300
1301  arg_list = []
1302  for current_arg in args:
1303    if current_arg['type'] == 'slot':
1304      arg_list.append(slot_dict[db.Key(current_arg['slot_key'])])
1305    elif current_arg['type'] == 'value':
1306      arg_list.append(current_arg['value'])
1307    else:
1308      raise UnexpectedPipelineError('Unknown parameter type: %r' % current_arg)
1309
1310  kwarg_dict = {}
1311  for key, current_arg in kwargs.iteritems():
1312    if current_arg['type'] == 'slot':
1313      kwarg_dict[key] = slot_dict[db.Key(current_arg['slot_key'])]
1314    elif current_arg['type'] == 'value':
1315      kwarg_dict[key] = current_arg['value']
1316    else:
1317      raise UnexpectedPipelineError('Unknown parameter type: %r' % current_arg)
1318
1319  return (arg_list, kwarg_dict)
1320
1321
1322def _generate_args(pipeline, future, queue_name, base_path):
1323  """Generate the params used to describe a Pipeline's depedencies.
1324
1325  The arguments passed to this method may be normal values, Slot instances
1326  (for named outputs), or PipelineFuture instances (for referring to the
1327  default output slot).
1328
1329  Args:
1330    pipeline: The Pipeline instance to generate args for.
1331    future: The PipelineFuture for the Pipeline these arguments correspond to.
1332    queue_name: The queue to run the pipeline on.
1333    base_path: Relative URL for pipeline URL handlers.
1334
1335  Returns:
1336    Tuple (dependent_slots, output_slot_keys, params_text, params_blob) where:
1337      dependent_slots: List of db.Key instances of _SlotRecords on which
1338        this pipeline will need to block before execution (passed to
1339        create a _BarrierRecord for running the pipeline).
1340      output_slot_keys: List of db.Key instances of _SlotRecords that will
1341        be filled by this pipeline during its execution (passed to create
1342        a _BarrierRecord for finalizing the pipeline).
1343      params_text: JSON dictionary of pipeline parameters to be serialized and
1344        saved in a corresponding _PipelineRecord. Will be None if the params are
1345        too big and must be saved in a blob instead.
1346      params_blob: JSON dictionary of pipeline parameters to be serialized and
1347        saved in a Blob file, and then attached to a _PipelineRecord. Will be
1348        None if the params data size was small enough to fit in the entity.
1349  """
1350  params = {
1351      'args': [],
1352      'kwargs': {},
1353      'after_all': [],
1354      'output_slots': {},
1355      'class_path': pipeline._class_path,
1356      'queue_name': queue_name,
1357      'base_path': base_path,
1358      'backoff_seconds': pipeline.backoff_seconds,
1359      'backoff_factor': pipeline.backoff_factor,
1360      'max_attempts': pipeline.max_attempts,
1361      'task_retry': pipeline.task_retry,
1362      'target': pipeline.target,
1363  }
1364  dependent_slots = set()
1365
1366  arg_list = params['args']
1367  for current_arg in pipeline.args:
1368    if isinstance(current_arg, PipelineFuture):
1369      current_arg = current_arg.default
1370    if isinstance(current_arg, Slot):
1371      arg_list.append({'type': 'slot', 'slot_key': str(current_arg.key)})
1372      dependent_slots.add(current_arg.key)
1373    else:
1374      arg_list.append({'type': 'value', 'value': current_arg})
1375
1376  kwarg_dict = params['kwargs']
1377  for name, current_arg in pipeline.kwargs.iteritems():
1378    if isinstance(current_arg, PipelineFuture):
1379      current_arg = current_arg.default
1380    if isinstance(current_arg, Slot):
1381      kwarg_dict[name] = {'type': 'slot', 'slot_key': str(current_arg.key)}
1382      dependent_slots.add(current_arg.key)
1383    else:
1384      kwarg_dict[name] = {'type': 'value', 'value': current_arg}
1385
1386  after_all = params['after_all']
1387  for other_future in future._after_all_pipelines:
1388    slot_key = other_future._output_dict['default'].key
1389    after_all.append(str(slot_key))
1390    dependent_slots.add(slot_key)
1391
1392  output_slots = params['output_slots']
1393  output_slot_keys = set()
1394  for name, slot in future._output_dict.iteritems():
1395    output_slot_keys.add(slot.key)
1396    output_slots[name] = str(slot.key)
1397
1398  params_encoded = json.dumps(params, cls=mr_util.JsonEncoder)
1399  params_text = None
1400  params_blob = None
1401  if len(params_encoded) > _MAX_JSON_SIZE:
1402    params_blob = _write_json_blob(params_encoded, pipeline.pipeline_id)
1403  else:
1404    params_text = params_encoded
1405
1406  return dependent_slots, output_slot_keys, params_text, params_blob
1407
1408
1409class _PipelineContext(object):
1410  """Internal API for interacting with Pipeline state."""
1411
1412  _gettime = datetime.datetime.utcnow
1413
1414  def __init__(self,
1415               task_name,
1416               queue_name,
1417               base_path):
1418    """Initializer.
1419
1420    Args:
1421      task_name: The name of the currently running task or empty if there
1422        is no task running.
1423      queue_name: The queue this pipeline should run on (may not be the
1424        current queue this request is on).
1425      base_path: Relative URL for the pipeline's handlers.
1426    """
1427    self.task_name = task_name
1428    self.queue_name = queue_name
1429    self.base_path = base_path
1430    self.barrier_handler_path = '%s/output' % base_path
1431    self.pipeline_handler_path = '%s/run' % base_path
1432    self.finalized_handler_path = '%s/finalized' % base_path
1433    self.fanout_handler_path = '%s/fanout' % base_path
1434    self.abort_handler_path = '%s/abort' % base_path
1435    self.fanout_abort_handler_path = '%s/fanout_abort' % base_path
1436    self.session_filled_output_names = set()
1437
1438  @classmethod
1439  def from_environ(cls, environ=os.environ):
1440    """Constructs a _PipelineContext from the task queue environment."""
1441    base_path, unused = (environ['PATH_INFO'].rsplit('/', 1) + [''])[:2]
1442    return cls(
1443        environ['HTTP_X_APPENGINE_TASKNAME'],
1444        environ['HTTP_X_APPENGINE_QUEUENAME'],
1445        base_path)
1446
1447  def fill_slot(self, filler_pipeline_key, slot, value):
1448    """Fills a slot, enqueueing a task to trigger pending barriers.
1449
1450    Args:
1451      filler_pipeline_key: db.Key or stringified key of the _PipelineRecord
1452        that filled this slot.
1453      slot: The Slot instance to fill.
1454      value: The serializable value to assign.
1455
1456    Raises:
1457      UnexpectedPipelineError if the _SlotRecord for the 'slot' could not
1458      be found in the Datastore.
1459    """
1460    if not isinstance(filler_pipeline_key, db.Key):
1461      filler_pipeline_key = db.Key(filler_pipeline_key)
1462
1463    if _TEST_MODE:
1464      slot._set_value_test(filler_pipeline_key, value)
1465    else:
1466      encoded_value = json.dumps(value,
1467                                       sort_keys=True,
1468                                       cls=mr_util.JsonEncoder)
1469      value_text = None
1470      value_blob = None
1471      if len(encoded_value) <= _MAX_JSON_SIZE:
1472        value_text = db.Text(encoded_value)
1473      else:
1474        # The encoded value is too big. Save it as a blob.
1475        value_blob = _write_json_blob(encoded_value, filler_pipeline_key.name())
1476
1477      def txn():
1478        slot_record = db.get(slot.key)
1479        if slot_record is None:
1480          raise UnexpectedPipelineError(
1481              'Tried to fill missing slot "%s" '
1482              'by pipeline ID "%s" with value: %r'
1483              % (slot.key, filler_pipeline_key.name(), value))
1484        # NOTE: Always take the override value here. If down-stream pipelines
1485        # need a consitent view of all up-stream outputs (meaning, all of the
1486        # outputs came from the same retry attempt of the upstream pipeline),
1487        # the down-stream pipeline must also wait for the 'default' output
1488        # of these up-stream pipelines.
1489        slot_record.filler = filler_pipeline_key
1490        slot_record.value_text = value_text
1491        slot_record.value_blob = value_blob
1492        slot_record.status = _SlotRecord.FILLED
1493        slot_record.fill_time = self._gettime()
1494        slot_record.put()
1495        task = taskqueue.Task(
1496            url=self.barrier_handler_path,
1497            params=dict(
1498                slot_key=slot.key,
1499                use_barrier_indexes=True),
1500            headers={'X-Ae-Slot-Key': slot.key,
1501                     'X-Ae-Filler-Pipeline-Key': filler_pipeline_key})
1502        task.add(queue_name=self.queue_name, transactional=True)
1503      db.run_in_transaction_options(
1504          db.create_transaction_options(propagation=db.ALLOWED), txn)
1505
1506    self.session_filled_output_names.add(slot.name)
1507
1508  def notify_barriers(self,
1509                      slot_key,
1510                      cursor,
1511                      use_barrier_indexes,
1512                      max_to_notify=_MAX_BARRIERS_TO_NOTIFY):
1513    """Searches for barriers affected by a slot and triggers completed ones.
1514
1515    Args:
1516      slot_key: db.Key or stringified key of the _SlotRecord that was filled.
1517      cursor: Stringified Datastore cursor where the notification query
1518        should pick up.
1519      use_barrier_indexes: When True, use _BarrierIndex records to determine
1520        which _Barriers to trigger by having this _SlotRecord filled. When
1521        False, use the old method that queries for _BarrierRecords by
1522        the blocking_slots parameter.
1523      max_to_notify: Used for testing.
1524
1525    Raises:
1526      PipelineStatusError: If any of the barriers are in a bad state.
1527    """
1528    if not isinstance(slot_key, db.Key):
1529      slot_key = db.Key(slot_key)
1530    logging.debug('Notifying slot %r', slot_key)
1531
1532    if use_barrier_indexes:
1533      # Please see models.py:_BarrierIndex to understand how _BarrierIndex
1534      # entities relate to _BarrierRecord entities.
1535      query = (
1536          _BarrierIndex.all(cursor=cursor, keys_only=True)
1537          .ancestor(slot_key))
1538      barrier_index_list = query.fetch(max_to_notify)
1539      barrier_key_list = [
1540          _BarrierIndex.to_barrier_key(key) for key in barrier_index_list]
1541
1542      # If there are task and pipeline kickoff retries it's possible for a
1543      # _BarrierIndex to exist for a _BarrierRecord that was not successfully
1544      # written. It's safe to ignore this because the original task that wrote
1545      # the _BarrierIndex and _BarrierRecord would not have made progress to
1546      # kick off a real pipeline or child pipeline unless all of the writes for
1547      # these dependent entities went through. We assume that the instigator
1548      # retried from scratch and somehwere there exists a good _BarrierIndex and
1549      # corresponding _BarrierRecord that tries to accomplish the same thing.
1550      barriers = db.get(barrier_key_list)
1551      results = []
1552      for barrier_key, barrier in zip(barrier_key_list, barriers):
1553        if barrier is None:
1554          logging.debug('Ignoring that Barrier "%r" is missing, '
1555                        'relies on Slot "%r"', barrier_key, slot_key)
1556        else:
1557          results.append(barrier)
1558    else:
1559      # TODO(user): Delete this backwards compatible codepath and
1560      # make use_barrier_indexes the assumed default in all cases.
1561      query = (
1562          _BarrierRecord.all(cursor=cursor)
1563          .filter('blocking_slots =', slot_key))
1564      results = query.fetch(max_to_notify)
1565
1566    # Fetch all blocking _SlotRecords for any potentially triggered barriers.
1567    blocking_slot_keys = []
1568    for barrier in results:
1569      blocking_slot_keys.extend(barrier.blocking_slots)
1570
1571    blocking_slot_dict = {}
1572    for slot_record in db.get(blocking_slot_keys):
1573      if slot_record is None:
1574        continue
1575      blocking_slot_dict[slot_record.key()] = slot_record
1576
1577    task_list = []
1578    updated_barriers = []
1579    for barrier in results:
1580      ready_slots = []
1581      for blocking_slot_key in barrier.blocking_slots:
1582        slot_record = blocking_slot_dict.get(blocking_slot_key)
1583        if slot_record is None:
1584          raise UnexpectedPipelineError(
1585              'Barrier "%r" relies on Slot "%r" which is missing.' %
1586              (barrier.key(), blocking_slot_key))
1587        if slot_record.status == _SlotRecord.FILLED:
1588          ready_slots.append(blocking_slot_key)
1589
1590      # When all of the blocking_slots have been filled, consider the barrier
1591      # ready to trigger. We'll trigger it regardless of the current
1592      # _BarrierRecord status, since there could be task queue failures at any
1593      # point in this flow; this rolls forward the state and de-dupes using
1594      # the task name tombstones.
1595      pending_slots = set(barrier.blocking_slots) - set(ready_slots)
1596      if not pending_slots:
1597        if barrier.status != _BarrierRecord.FIRED:
1598          barrier.status = _BarrierRecord.FIRED
1599          barrier.trigger_time = self._gettime()
1600          updated_barriers.append(barrier)
1601
1602        purpose = barrier.key().name()
1603        if purpose == _BarrierRecord.START:
1604          path = self.pipeline_handler_path
1605          countdown = None
1606        else:
1607          path = self.finalized_handler_path
1608          # NOTE: Wait one second before finalization to prevent
1609          # contention on the _PipelineRecord entity.
1610          countdown = 1
1611        pipeline_key = _BarrierRecord.target.get_value_for_datastore(barrier)
1612        logging.debug('Firing barrier %r', barrier.key())
1613        task_list.append(taskqueue.Task(
1614            url=path,
1615            countdown=countdown,
1616            name='ae-barrier-fire-%s-%s' % (pipeline_key.name(), purpose),
1617            params=dict(pipeline_key=pipeline_key, purpose=purpose),
1618            headers={'X-Ae-Pipeline-Key': pipeline_key}))
1619      else:
1620        logging.debug('Not firing barrier %r, Waiting for slots: %r',
1621                      barrier.key(), pending_slots)
1622
1623    # Blindly overwrite _BarrierRecords that have an updated status. This is
1624    # acceptable because by this point all finalization barriers for
1625    # generator children should have already had their final outputs assigned.
1626    if updated_barriers:
1627      db.put(updated_barriers)
1628
1629    # Task continuation with sequence number to prevent fork-bombs.
1630    if len(results) == max_to_notify:
1631      the_match = re.match('(.*)-ae-barrier-notify-([0-9]+)', self.task_name)
1632      if the_match:
1633        prefix = the_match.group(1)
1634        end = int(the_match.group(2)) + 1
1635      else:
1636        prefix = self.task_name
1637        end = 0
1638      task_list.append(taskqueue.Task(
1639          name='%s-ae-barrier-notify-%d' % (prefix, end),
1640          url=self.barrier_handler_path,
1641          params=dict(
1642              slot_key=slot_key,
1643              cursor=query.cursor(),
1644              use_barrier_indexes=use_barrier_indexes)))
1645
1646    if task_list:
1647      try:
1648        taskqueue.Queue(self.queue_name).add(task_list)
1649      except (taskqueue.TombstonedTaskError, taskqueue.TaskAlreadyExistsError):
1650        pass
1651
1652  def begin_abort(self, root_pipeline_key, abort_message):
1653    """Kicks off the abort process for a root pipeline and all its children.
1654
1655    Args:
1656      root_pipeline_key: db.Key of the root pipeline to abort.
1657      abort_message: Message explaining why the abort happened, only saved
1658          into the root pipeline.
1659
1660    Returns:
1661      True if the abort signal was sent successfully; False otherwise.
1662    """
1663    def txn():
1664      pipeline_record = db.get(root_pipeline_key)
1665      if pipeline_record is None:
1666        logging.warning(
1667            'Tried to abort root pipeline ID "%s" but it does not exist.',
1668            root_pipeline_key.name())
1669        raise db.Rollback()
1670      if pipeline_record.status == _PipelineRecord.ABORTED:
1671        logging.warning(
1672            'Tried to abort root pipeline ID "%s"; already in state: %s',
1673            root_pipeline_key.name(), pipeline_record.status)
1674        raise db.Rollback()
1675      if pipeline_record.abort_requested:
1676        logging.warning(
1677            'Tried to abort root pipeline ID "%s"; abort signal already sent.',
1678            root_pipeline_key.name())
1679        raise db.Rollback()
1680
1681      pipeline_record.abort_requested = True
1682      pipeline_record.abort_message = abort_message
1683      pipeline_record.put()
1684
1685      task = taskqueue.Task(
1686          url=self.fanout_abort_handler_path,
1687          params=dict(root_pipeline_key=root_pipeline_key))
1688      task.add(queue_name=self.queue_name, transactional=True)
1689      return True
1690
1691    return db.run_in_transaction(txn)
1692
1693  def continue_abort(self,
1694                     root_pipeline_key,
1695                     cursor=None,
1696                     max_to_notify=_MAX_ABORTS_TO_BEGIN):
1697    """Sends the abort signal to all children for a root pipeline.
1698
1699    Args:
1700      root_pipeline_key: db.Key of the root pipeline to abort.
1701      cursor: The query cursor for enumerating _PipelineRecords when inserting
1702        tasks to cause child pipelines to terminate.
1703      max_to_notify: Used for testing.
1704    """
1705    if not isinstance(root_pipeline_key, db.Key):
1706      root_pipeline_key = db.Key(root_pipeline_key)
1707    # NOTE: The results of this query may include _PipelineRecord instances
1708    # that are not actually "reachable", meaning you cannot get to them by
1709    # starting at the root pipeline and following "fanned_out" onward. This
1710    # is acceptable because even these defunct _PipelineRecords will properly
1711    # set their status to ABORTED when the signal comes, regardless of any
1712    # other status they may have had.
1713    #
1714    # The only gotcha here is if a Pipeline's finalize method somehow modifies
1715    # its inputs (like deleting an input file). In the case there are
1716    # unreachable child pipelines, it will appear as if two finalize methods
1717    # have been called instead of just one. The saving grace here is that
1718    # finalize must be idempotent, so this *should* be harmless.
1719    query = (
1720        _PipelineRecord.all(cursor=cursor)
1721        .filter('root_pipeline =', root_pipeline_key))
1722    results = query.fetch(max_to_notify)
1723
1724    task_list = []
1725    for pipeline_record in results:
1726      if pipeline_record.status not in (
1727          _PipelineRecord.RUN, _PipelineRecord.WAITING):
1728        continue
1729
1730      pipeline_key = pipeline_record.key()
1731      task_list.append(taskqueue.Task(
1732          name='%s-%s-abort' % (self.task_name, pipeline_key.name()),
1733          url=self.abort_handler_path,
1734          params=dict(pipeline_key=pipeline_key, purpose=_BarrierRecord.ABORT),
1735          headers={'X-Ae-Pipeline-Key': pipeline_key}))
1736
1737    # Task continuation with sequence number to prevent fork-bombs.
1738    if len(results) == max_to_notify:
1739      the_match = re.match('(.*)-([0-9]+)', self.task_name)
1740      if the_match:
1741        prefix = the_match.group(1)
1742        end = int(the_match.group(2)) + 1
1743      else:
1744        prefix = self.task_name
1745        end = 0
1746      task_list.append(taskqueue.Task(
1747          name='%s-%d' % (prefix, end),
1748          url=self.fanout_abort_handler_path,
1749          params=dict(root_pipeline_key=root_pipeline_key,
1750                      cursor=query.cursor())))
1751
1752    if task_list:
1753      try:
1754        taskqueue.Queue(self.queue_name).add(task_list)
1755      except (taskqueue.TombstonedTaskError, taskqueue.TaskAlreadyExistsError):
1756        pass
1757
1758  def start(self, pipeline, return_task=True, countdown=None, eta=None):
1759    """Starts a pipeline.
1760
1761    Args:
1762      pipeline: Pipeline instance to run.
1763      return_task: When True, do not submit the task to start the pipeline
1764        but instead return it for someone else to enqueue.
1765      countdown: Time in seconds into the future that this Task should execute.
1766        Defaults to zero.
1767      eta: A datetime.datetime specifying the absolute time at which the task
1768        should be executed. Must not be specified if 'countdown' is specified.
1769        This may be timezone-aware or timezone-naive. If None, defaults to now.
1770        For pull tasks, no worker will be able to lease this task before the
1771        time indicated by eta.
1772
1773    Returns:
1774      The task to start this pipeline if return_task was True.
1775
1776    Raises:
1777      PipelineExistsError if the pipeline with the given ID already exists.
1778    """
1779    # Adjust all pipeline output keys for this Pipeline to be children of
1780    # the _PipelineRecord, that way we can write them all and submit in a
1781    # single transaction.
1782    for name, slot in pipeline.outputs._output_dict.iteritems():
1783      slot.key = db.Key.from_path(
1784          *slot.key.to_path(), **dict(parent=pipeline._pipeline_key))
1785
1786    _, output_slots, params_text, params_blob = _generate_args(
1787        pipeline, pipeline.outputs, self.queue_name, self.base_path)
1788
1789    @db.transactional(propagation=db.INDEPENDENT)
1790    def txn():
1791      pipeline_record = db.get(pipeline._pipeline_key)
1792      if pipeline_record is not None:
1793        raise PipelineExistsError(
1794            'Pipeline with idempotence key "%s" already exists; params=%s' %
1795            (pipeline._pipeline_key.name(),
1796             _short_repr(pipeline_record.params)))
1797
1798      entities_to_put = []
1799      for name, slot in pipeline.outputs._output_dict.iteritems():
1800        entities_to_put.append(_SlotRecord(
1801            key=slot.key,
1802            root_pipeline=pipeline._pipeline_key))
1803
1804      entities_to_put.append(_PipelineRecord(
1805          key=pipeline._pipeline_key,
1806          root_pipeline=pipeline._pipeline_key,
1807          is_root_pipeline=True,
1808          # Bug in DB means we need to use the storage name here,
1809          # not the local property name.
1810          params=params_text,
1811          params_blob=params_blob,
1812          start_time=self._gettime(),
1813          class_path=pipeline._class_path,
1814          max_attempts=pipeline.max_attempts))
1815
1816      entities_to_put.extend(_PipelineContext._create_barrier_entities(
1817          pipeline._pipeline_key,
1818          pipeline._pipeline_key,
1819          _BarrierRecord.FINALIZE,
1820          output_slots))
1821
1822      db.put(entities_to_put)
1823
1824      task = taskqueue.Task(
1825          url=self.pipeline_handler_path,
1826          params=dict(pipeline_key=pipeline._pipeline_key),
1827          headers={'X-Ae-Pipeline-Key': pipeline._pipeline_key},
1828          target=pipeline.target,
1829          countdown=countdown,
1830          eta=eta)
1831      if return_task:
1832        return task
1833      task.add(queue_name=self.queue_name, transactional=True)
1834
1835    task = txn()
1836    # Immediately mark the output slots as existing so they can be filled
1837    # by asynchronous pipelines or used in test mode.
1838    for output_slot in pipeline.outputs._output_dict.itervalues():
1839      output_slot._exists = True
1840    return task
1841
1842  def start_test(self, pipeline):
1843    """Starts a pipeline in the test mode.
1844
1845    Args:
1846      pipeline: The Pipeline instance to test.
1847    """
1848    global _TEST_MODE, _TEST_ROOT_PIPELINE_KEY
1849    self.start(pipeline, return_task=True)
1850    _TEST_MODE = True
1851    _TEST_ROOT_PIPELINE_KEY = pipeline._pipeline_key
1852    try:
1853      self.evaluate_test(pipeline, root=True)
1854    finally:
1855      _TEST_MODE = False
1856
1857  def evaluate_test(self, stage, root=False):
1858    """Recursively evaluates the given pipeline in test mode.
1859
1860    Args:
1861      stage: The Pipeline instance to run at this stage in the flow.
1862      root: True if the supplied stage is the root of the pipeline.
1863    """
1864    args_adjusted = []
1865    for arg in stage.args:
1866      if isinstance(arg, PipelineFuture):
1867        arg = arg.default
1868      if isinstance(arg, Slot):
1869        value = arg.value
1870        arg._touched = True
1871      else:
1872        value = arg
1873      args_adjusted.append(value)
1874
1875    kwargs_adjusted = {}
1876    for name, arg in stage.kwargs.iteritems():
1877      if isinstance(arg, PipelineFuture):
1878        arg = arg.default
1879      if isinstance(arg, Slot):
1880        value = arg.value
1881        arg._touched = True
1882      else:
1883        value = arg
1884      kwargs_adjusted[name] = value
1885
1886    stage.args, stage.kwargs = args_adjusted, kwargs_adjusted
1887    pipeline_generator = mr_util.is_generator_function(stage.run)
1888    logging.debug('Running %s(*%s, **%s)', stage._class_path,
1889                  _short_repr(stage.args), _short_repr(stage.kwargs))
1890
1891    if stage.async:
1892      stage.run_test(*stage.args, **stage.kwargs)
1893    elif pipeline_generator:
1894      all_output_slots = set()
1895      try:
1896        pipeline_iter = stage.run_test(*stage.args, **stage.kwargs)
1897      except NotImplementedError:
1898        pipeline_iter = stage.run(*stage.args, **stage.kwargs)
1899
1900      all_substages = set()
1901      next_value = None
1902      last_sub_stage = None
1903      while True:
1904        try:
1905          yielded = pipeline_iter.send(next_value)
1906        except StopIteration:
1907          break
1908
1909        if isinstance(yielded, Pipeline):
1910          if yielded in all_substages:
1911            raise UnexpectedPipelineError(
1912                'Already yielded pipeline object %r' % yielded)
1913          else:
1914            all_substages.add(yielded)
1915
1916          last_sub_stage = yielded
1917          next_value = yielded.outputs
1918          all_output_slots.update(next_value._output_dict.itervalues())
1919        else:
1920          raise UnexpectedPipelineError(
1921              'Yielded a disallowed value: %r' % yielded)
1922
1923      if last_sub_stage:
1924        # Generator's outputs inherited from last running sub-stage.
1925        # If the generator changes its mind and doesn't yield anything, this
1926        # may not happen at all. Missing outputs will be caught when they
1927        # are passed to the stage as inputs, or verified from the outside by
1928        # the test runner.
1929        for slot_name, slot in last_sub_stage.outputs._output_dict.iteritems():
1930          stage.outputs._output_dict[slot_name] = slot
1931          # Any inherited slots won't be checked for declaration.
1932          all_output_slots.remove(slot)
1933      else:
1934        # Generator yielded no children, so treat it as a sync function.
1935        stage.outputs.default._set_value_test(stage._pipeline_key, None)
1936
1937      # Enforce the policy of requiring all undeclared output slots from
1938      # child pipelines to be consumed by their parent generator.
1939      for slot in all_output_slots:
1940        if slot.name == 'default':
1941          continue
1942        if slot.filled and not slot._strict and not slot._touched:
1943          raise SlotNotDeclaredError(
1944              'Undeclared output "%s"; all dynamic outputs from child '
1945              'pipelines must be consumed.' % slot.name)
1946    else:
1947      try:
1948        result = stage.run_test(*stage.args, **stage.kwargs)
1949      except NotImplementedError:
1950        result = stage.run(*stage.args, **stage.kwargs)
1951      stage.outputs.default._set_value_test(stage._pipeline_key, result)
1952
1953    # Enforce strict output usage at the top level.
1954    if root:
1955      found_outputs = set()
1956      for slot in stage.outputs._output_dict.itervalues():
1957        if slot.filled:
1958          found_outputs.add(slot.name)
1959        if slot.name == 'default':
1960          continue
1961        if slot.name not in stage.output_names:
1962          raise SlotNotDeclaredError(
1963              'Undeclared output from root pipeline "%s"' % slot.name)
1964
1965      missing_outputs = set(stage.output_names) - found_outputs
1966      if missing_outputs:
1967        raise SlotNotFilledError(
1968            'Outputs %r were never filled.' % missing_outputs)
1969
1970    logging.debug('Finalizing %s(*%s, **%s)', stage._class_path,
1971                  _short_repr(stage.args), _short_repr(stage.kwargs))
1972    ran = False
1973    try:
1974      stage.finalized_test()
1975      ran = True
1976    except NotImplementedError:
1977      pass
1978    if not ran:
1979      try:
1980        stage.finalized()
1981      except NotImplementedError:
1982        pass
1983
1984  def evaluate(self, pipeline_key, purpose=None, attempt=0):
1985    """Evaluates the given Pipeline and enqueues sub-stages for execution.
1986
1987    Args:
1988      pipeline_key: The db.Key or stringified key of the _PipelineRecord to run.
1989      purpose: Why evaluate was called ('start', 'finalize', or 'abort').
1990      attempt: The attempt number that should be tried.
1991    """
1992    After._thread_init()
1993    InOrder._thread_init()
1994    InOrder._local._activated = False
1995
1996    if not isinstance(pipeline_key, db.Key):
1997      pipeline_key = db.Key(pipeline_key)
1998    pipeline_record = db.get(pipeline_key)
1999    if pipeline_record is None:
2000      logging.error('Pipeline ID "%s" does not exist.', pipeline_key.name())
2001      return
2002    if pipeline_record.status not in (
2003        _PipelineRecord.WAITING, _PipelineRecord.RUN):
2004      logging.error('Pipeline ID "%s" in bad state for purpose "%s": "%s"',
2005                    pipeline_key.name(), purpose or _BarrierRecord.START,
2006                    pipeline_record.status)
2007      return
2008
2009    params = pipeline_record.params
2010    root_pipeline_key = \
2011        _PipelineRecord.root_pipeline.get_value_for_datastore(pipeline_record)
2012    default_slot_key = db.Key(params['output_slots']['default'])
2013
2014    default_slot_record, root_pipeline_record = db.get([
2015        default_slot_key, root_pipeline_key])
2016    if default_slot_record is None:
2017      logging.error('Pipeline ID "%s" default slot "%s" does not exist.',
2018                    pipeline_key.name(), default_slot_key)
2019      return
2020    if root_pipeline_record is None:
2021      logging.error('Pipeline ID "%s" root pipeline ID "%s" is missing.',
2022                    pipeline_key.name(), root_pipeline_key.name())
2023      return
2024
2025    # Always finalize if we're aborting so pipelines have a chance to cleanup
2026    # before they terminate. Pipelines must access 'was_aborted' to find
2027    # out how their finalization should work.
2028    abort_signal = (
2029        purpose == _BarrierRecord.ABORT or
2030        root_pipeline_record.abort_requested == True)
2031    finalize_signal = (
2032        (default_slot_record.status == _SlotRecord.FILLED and
2033         purpose == _BarrierRecord.FINALIZE) or abort_signal)
2034
2035    try:
2036      pipeline_func_class = mr_util.for_name(pipeline_record.class_path)
2037    except ImportError, e:
2038      # This means something is wrong with the deployed code. Rely on the
2039      # taskqueue system to do retries.
2040      retry_message = '%s: %s' % (e.__class__.__name__, str(e))
2041      logging.exception(
2042          'Could not locate %s#%s. %s',
2043          pipeline_record.class_path, pipeline_key.name(), retry_message)
2044      raise
2045
2046    try:
2047      pipeline_func = pipeline_func_class.from_id(
2048          pipeline_key.name(),
2049          resolve_outputs=finalize_signal,
2050          _pipeline_record=pipeline_record)
2051    except SlotNotFilledError, e:
2052      logging.exception(
2053          'Could not resolve arguments for %s#%s. Most likely this means there '
2054          'is a bug in the Pipeline runtime or some intermediate data has been '
2055          'deleted from the Datastore. Giving up.',
2056          pipeline_record.class_path, pipeline_key.name())
2057      self.transition_aborted(pipeline_key)
2058      return
2059    except Exception, e:
2060      retry_message = '%s: %s' % (e.__class__.__name__, str(e))
2061      logging.exception(
2062          'Instantiating %s#%s raised exception. %s',
2063          pipeline_record.class_path, pipeline_key.name(), retry_message)
2064      self.transition_retry(pipeline_key, retry_message)
2065      if pipeline_record.params['task_retry']:
2066        raise
2067      else:
2068        return
2069    else:
2070      pipeline_generator = mr_util.is_generator_function(
2071          pipeline_func_class.run)
2072      caller_output = pipeline_func.outputs
2073
2074    if (abort_signal and pipeline_func.async and
2075        pipeline_record.status == _PipelineRecord.RUN
2076        and not pipeline_func.try_cancel()):
2077      logging.warning(
2078          'Could not cancel and abort mid-flight async pipeline: %r#%s',
2079          pipeline_func, pipeline_key.name())
2080      return
2081
2082    if finalize_signal:
2083      try:
2084        pipeline_func._finalized_internal(
2085              self, pipeline_key, root_pipeline_key,
2086              caller_output, abort_signal)
2087      except Exception, e:
2088        # This means something is wrong with the deployed finalization code.
2089        # Rely on the taskqueue system to do retries.
2090        retry_message = '%s: %s' % (e.__class__.__name__, str(e))
2091        logging.exception('Finalizing %r#%s raised exception. %s',
2092                          pipeline_func, pipeline_key.name(), retry_message)
2093        raise
2094      else:
2095        if not abort_signal:
2096          self.transition_complete(pipeline_key)
2097          return
2098
2099    if abort_signal:
2100      logging.debug('Marking as aborted %s#%s', pipeline_func,
2101                    pipeline_key.name())
2102      self.transition_aborted(pipeline_key)
2103      return
2104
2105    if pipeline_record.current_attempt != attempt:
2106      logging.error(
2107          'Received evaluation task for pipeline ID "%s" attempt %d but '
2108          'current pending attempt is %d', pipeline_key.name(), attempt,
2109          pipeline_record.current_attempt)
2110      return
2111
2112    if pipeline_record.current_attempt >= pipeline_record.max_attempts:
2113      logging.error(
2114          'Received evaluation task for pipeline ID "%s" on attempt %d '
2115          'but that exceeds max attempts %d', pipeline_key.name(), attempt,
2116          pipeline_record.max_attempts)
2117      return
2118
2119    if pipeline_record.next_retry_time is not None:
2120      retry_time = pipeline_record.next_retry_time - _RETRY_WIGGLE_TIMEDELTA
2121      if self._gettime() <= retry_time:
2122        detail_message = (
2123            'Received evaluation task for pipeline ID "%s" on attempt %d, '
2124            'which will not be ready until: %s' % (pipeline_key.name(),
2125            pipeline_record.current_attempt, pipeline_record.next_retry_time))
2126        logging.warning(detail_message)
2127        raise UnexpectedPipelineError(detail_message)
2128
2129    if pipeline_record.status == _PipelineRecord.RUN and pipeline_generator:
2130      if (default_slot_record.status == _SlotRecord.WAITING and
2131          not pipeline_record.fanned_out):
2132        # This properly handles the yield-less generator case when the
2133        # RUN state transition worked properly but outputting to the default
2134        # slot failed.
2135        self.fill_slot(pipeline_key, caller_output.default, None)
2136      return
2137
2138    if (pipeline_record.status == _PipelineRecord.WAITING and
2139        pipeline_func.async):
2140      self.transition_run(pipeline_key)
2141
2142    try:
2143      result = pipeline_func._run_internal(
2144          self, pipeline_key, root_pipeline_key, caller_output)
2145    except Exception, e:
2146      if self.handle_run_exception(pipeline_key, pipeline_func, e):
2147        raise
2148      else:
2149        return
2150
2151    if pipeline_func.async:
2152      return
2153
2154    if not pipeline_generator:
2155      # Catch any exceptions that are thrown when the pipeline's return
2156      # value is being serialized. This ensures that serialization errors
2157      # will cause normal abort/retry behavior.
2158      try:
2159        self.fill_slot(pipeline_key, caller_output.default, result)
2160      except Exception, e:
2161        retry_message = 'Bad return value. %s: %s' % (
2162            e.__class__.__name__, str(e))
2163        logging.exception(
2164            'Generator %r#%s caused exception while serializing return '
2165            'value %r. %s', pipeline_func, pipeline_key.name(), result,
2166            retry_message)
2167        self.transition_retry(pipeline_key, retry_message)
2168        if pipeline_func.task_retry:
2169          raise
2170        else:
2171          return
2172
2173      expected_outputs = set(caller_output._output_dict.iterkeys())
2174      found_outputs = self.session_filled_output_names
2175      if expected_outputs != found_outputs:
2176        exception = SlotNotFilledError(
2177            'Outputs %r for pipeline ID "%s" were never filled by "%s".' % (
2178            expected_outputs - found_outputs,
2179            pipeline_key.name(), pipeline_func._class_path))
2180        if self.handle_run_exception(pipeline_key, pipeline_func, exception):
2181          raise exception
2182      return
2183
2184    pipeline_iter = result
2185    next_value = None
2186    last_sub_stage = None
2187    sub_stage = None
2188    sub_stage_dict = {}
2189    sub_stage_ordering = []
2190
2191    while True:
2192      try:
2193        yielded = pipeline_iter.send(next_value)
2194      except StopIteration:
2195        break
2196      except Exception, e:
2197        if self.handle_run_exception(pipeline_key, pipeline_func, e):
2198          raise
2199        else:
2200          return
2201
2202      if isinstance(yielded, Pipeline):
2203        if yielded in sub_stage_dict:
2204          raise UnexpectedPipelineError(
2205              'Already yielded pipeline object %r with pipeline ID %s' %
2206              (yielded, yielded.pipeline_id))
2207
2208        last_sub_stage = yielded
2209        next_value = PipelineFuture(yielded.output_names)
2210        next_value._after_all_pipelines.update(After._local._after_all_futures)
2211        next_value._after_all_pipelines.update(InOrder._local._in_order_futures)
2212        sub_stage_dict[yielded] = next_value
2213        sub_stage_ordering.append(yielded)
2214        InOrder._add_future(next_value)
2215
2216        # To aid local testing, the task_retry flag (which instructs the
2217        # evaluator to raise all exceptions back up to the task queue) is
2218        # inherited by all children from the root down.
2219        yielded.task_retry = pipeline_func.task_retry
2220      else:
2221        raise UnexpectedPipelineError(
2222            'Yielded a disallowed value: %r' % yielded)
2223
2224    if last_sub_stage:
2225      # Final yielded stage inherits outputs from calling pipeline that were not
2226      # already filled during the generator's execution.
2227      inherited_outputs = params['output_slots']
2228      for slot_name in self.session_filled_output_names:
2229        del inherited_outputs[slot_name]
2230      sub_stage_dict[last_sub_stage]._inherit_outputs(
2231          pipeline_record.class_path, inherited_outputs)
2232    else:
2233      # Here the generator has yielded nothing, and thus acts as a synchronous
2234      # function. We can skip the rest of the generator steps completely and
2235      # fill the default output slot to cause finalizing.
2236      expected_outputs = set(caller_output._output_dict.iterkeys())
2237      expected_outputs.remove('default')
2238      found_outputs = self.session_filled_output_names
2239      if expected_outputs != found_outputs:
2240        exception = SlotNotFilledError(
2241            'Outputs %r for pipeline ID "%s" were never filled by "%s".' % (
2242            expected_outputs - found_outputs,
2243            pipeline_key.name(), pipeline_func._class_path))
2244        if self.handle_run_exception(pipeline_key, pipeline_func, exception):
2245          raise exception
2246      else:
2247        self.fill_slot(pipeline_key, caller_output.default, None)
2248        self.transition_run(pipeline_key)
2249      return
2250
2251    # Allocate any SlotRecords that do not yet exist.
2252    entities_to_put = []
2253    for future in sub_stage_dict.itervalues():
2254      for slot in future._output_dict.itervalues():
2255        if not slot._exists:
2256          entities_to_put.append(_SlotRecord(
2257              key=slot.key, root_pipeline=root_pipeline_key))
2258
2259    # Allocate PipelineRecords and BarrierRecords for generator-run Pipelines.
2260    pipelines_to_run = set()
2261    all_children_keys = []
2262    all_output_slots = set()
2263    for sub_stage in sub_stage_ordering:
2264      future = sub_stage_dict[sub_stage]
2265
2266      # Catch any exceptions that are thrown when the pipeline's parameters
2267      # are being serialized. This ensures that serialization errors will
2268      # cause normal retry/abort behavior.
2269      try:
2270        dependent_slots, output_slots, params_text, params_blob = \
2271            _generate_args(sub_stage, future, self.queue_name, self.base_path)
2272      except Exception, e:
2273        retry_message = 'Bad child arguments. %s: %s' % (
2274            e.__class__.__name__, str(e))
2275        logging.exception(
2276            'Generator %r#%s caused exception while serializing args for '
2277            'child pipeline %r. %s', pipeline_func, pipeline_key.name(),
2278            sub_stage, retry_message)
2279        self.transition_retry(pipeline_key, retry_message)
2280        if pipeline_func.task_retry:
2281          raise
2282        else:
2283          return
2284
2285      child_pipeline_key = db.Key.from_path(
2286          _PipelineRecord.kind(), uuid.uuid4().hex)
2287      all_output_slots.update(output_slots)
2288      all_children_keys.append(child_pipeline_key)
2289
2290      child_pipeline = _PipelineRecord(
2291          key=child_pipeline_key,
2292          root_pipeline=root_pipeline_key,
2293          # Bug in DB means we need to use the storage name here,
2294          # not the local property name.
2295          params=params_text,
2296          params_blob=params_blob,
2297          class_path=sub_stage._class_path,
2298          max_attempts=sub_stage.max_attempts)
2299      entities_to_put.append(child_pipeline)
2300
2301      if not dependent_slots:
2302        # This child pipeline will run immediately.
2303        pipelines_to_run.add(child_pipeline_key)
2304        child_pipeline.start_time = self._gettime()
2305      else:
2306        entities_to_put.extend(_PipelineContext._create_barrier_entities(
2307            root_pipeline_key,
2308            child_pipeline_key,
2309            _BarrierRecord.START,
2310            dependent_slots))
2311
2312      entities_to_put.extend(_PipelineContext._create_barrier_entities(
2313          root_pipeline_key,
2314          child_pipeline_key,
2315          _BarrierRecord.FINALIZE,
2316          output_slots))
2317
2318    # This generator pipeline's finalization barrier must include all of the
2319    # outputs of any child pipelines that it runs. This ensures the finalized
2320    # calls will not happen until all child pipelines have completed.
2321    #
2322    # The transition_run() call below will update the FINALIZE _BarrierRecord
2323    # for this generator pipeline to include all of these child outputs in
2324    # its list of blocking_slots. That update is done transactionally to
2325    # make sure the _BarrierRecord only lists the slots that matter.
2326    #
2327    # However, the notify_barriers() method doesn't find _BarrierRecords
2328    # through the blocking_slots field. It finds them through _BarrierIndexes
2329    # entities. Thus, before we update the FINALIZE _BarrierRecord in
2330    # transition_run(), we need to write _BarrierIndexes for all child outputs.
2331    barrier_entities = _PipelineContext._create_barrier_entities(
2332        root_pipeline_key,
2333        pipeline_key,
2334        _BarrierRecord.FINALIZE,
2335        all_output_slots)
2336    # Ignore the first element which is the _BarrierRecord. That entity must
2337    # have already been created and put in the datastore for the parent
2338    # pipeline before this code generated child pipelines.
2339    barrier_indexes = barrier_entities[1:]
2340    entities_to_put.extend(barrier_indexes)
2341
2342    db.put(entities_to_put)
2343
2344    self.transition_run(pipeline_key,
2345                        blocking_slot_keys=all_output_slots,
2346                        fanned_out_pipelines=all_children_keys,
2347                        pipelines_to_run=pipelines_to_run)
2348
2349  @staticmethod
2350  def _create_barrier_entities(root_pipeline_key,
2351                               child_pipeline_key,
2352                               purpose,
2353                               blocking_slot_keys):
2354    """Creates all of the entities required for a _BarrierRecord.
2355
2356    Args:
2357      root_pipeline_key: The root pipeline this is part of.
2358      child_pipeline_key: The pipeline this barrier is for.
2359      purpose: _BarrierRecord.START or _BarrierRecord.FINALIZE.
2360      blocking_slot_keys: Set of db.Keys corresponding to _SlotRecords that
2361        this barrier should wait on before firing.
2362
2363    Returns:
2364      List of entities, starting with the _BarrierRecord entity, followed by
2365      _BarrierIndexes used for firing when _SlotRecords are filled in the same
2366      order as the blocking_slot_keys list provided. All of these entities
2367      should be put in the Datastore to ensure the barrier fires properly.
2368    """
2369    result = []
2370
2371    blocking_slot_keys = list(blocking_slot_keys)
2372
2373    barrier = _BarrierRecord(
2374        parent=child_pipeline_key,
2375        key_name=purpose,
2376        target=child_pipeline_key,
2377        root_pipeline=root_pipeline_key,
2378        blocking_slots=blocking_slot_keys)
2379
2380    result.append(barrier)
2381
2382    for slot_key in blocking_slot_keys:
2383      barrier_index_path = []
2384      barrier_index_path.extend(slot_key.to_path())
2385      barrier_index_path.extend(child_pipeline_key.to_path())
2386      barrier_index_path.extend([_BarrierIndex.kind(), purpose])
2387      barrier_index_key = db.Key.from_path(*barrier_index_path)
2388      barrier_index = _BarrierIndex(
2389          key=barrier_index_key,
2390          root_pipeline=root_pipeline_key)
2391      result.append(barrier_index)
2392
2393    return result
2394
2395  def handle_run_exception(self, pipeline_key, pipeline_func, e):
2396    """Handles an exception raised by a Pipeline's user code.
2397
2398    Args:
2399      pipeline_key: The pipeline that raised the error.
2400      pipeline_func: The class path name of the Pipeline that was running.
2401      e: The exception that was raised.
2402
2403    Returns:
2404      True if the exception should be re-raised up through the calling stack
2405      by the caller of this method.
2406    """
2407    if isinstance(e, Retry):
2408      retry_message = str(e)
2409      logging.warning('User forced retry for pipeline ID "%s" of %r: %s',
2410                      pipeline_key.name(), pipeline_func, retry_message)
2411      self.transition_retry(pipeline_key, retry_message)
2412    elif isinstance(e, Abort):
2413      abort_message = str(e)
2414      logging.warning('User forced abort for pipeline ID "%s" of %r: %s',
2415                      pipeline_key.name(), pipeline_func, abort_message)
2416      pipeline_func.abort(abort_message)
2417    else:
2418      retry_message = '%s: %s' % (e.__class__.__name__, str(e))
2419      logging.exception('Generator %r#%s raised exception. %s',
2420                        pipeline_func, pipeline_key.name(), retry_message)
2421      self.transition_retry(pipeline_key, retry_message)
2422
2423    return pipeline_func.task_retry
2424
2425  def transition_run(self,
2426                     pipeline_key,
2427                     blocking_slot_keys=None,
2428                     fanned_out_pipelines=None,
2429                     pipelines_to_run=None):
2430    """Marks an asynchronous or generator pipeline as running.
2431
2432    Does nothing if the pipeline is no longer in a runnable state.
2433
2434    Args:
2435      pipeline_key: The db.Key of the _PipelineRecord to update.
2436      blocking_slot_keys: List of db.Key instances that this pipeline's
2437        finalization barrier should wait on in addition to the existing one.
2438        This is used to update the barrier to include all child outputs. When
2439        None, the barrier will not be updated.
2440      fanned_out_pipelines: List of db.Key instances of _PipelineRecords that
2441        were fanned out by this generator pipeline. This is distinct from the
2442        'pipelines_to_run' list because not all of the pipelines listed here
2443        will be immediately ready to execute. When None, then this generator
2444        yielded no children.
2445      pipelines_to_run: List of db.Key instances of _PipelineRecords that should
2446        be kicked off (fan-out) transactionally as part of this transition.
2447        When None, no child pipelines will run. All db.Keys in this list must
2448        also be present in the fanned_out_pipelines list.
2449
2450    Raises:
2451      UnexpectedPipelineError if blocking_slot_keys was not empty and the
2452      _BarrierRecord has gone missing.
2453    """
2454    def txn():
2455      pipeline_record = db.get(pipeline_key)
2456      if pipeline_record is None:
2457        logging.warning('Pipeline ID "%s" cannot be marked as run. '
2458                        'Does not exist.', pipeline_key.name())
2459        raise db.Rollback()
2460      if pipeline_record.status != _PipelineRecord.WAITING:
2461        logging.warning('Pipeline ID "%s" in bad state to be marked as run: %s',
2462                        pipeline_key.name(), pipeline_record.status)
2463        raise db.Rollback()
2464
2465      pipeline_record.status = _PipelineRecord.RUN
2466
2467      if fanned_out_pipelines:
2468        # NOTE: We must model the pipeline relationship in a top-down manner,
2469        # meaning each pipeline must point forward to the pipelines that it
2470        # fanned out to. The reason is race conditions. If evaluate()
2471        # dies early, it may create many unused _PipelineRecord and _SlotRecord
2472        # instances that never progress. The only way we know which of these
2473        # are valid is by traversing the graph from the root, where the
2474        # fanned_out property refers to those pipelines that were run using a
2475        # transactional task.
2476        child_pipeline_list = list(fanned_out_pipelines)
2477        pipeline_record.fanned_out = child_pipeline_list
2478
2479        if pipelines_to_run:
2480          child_indexes = [
2481              child_pipeline_list.index(p) for p in pipelines_to_run]
2482          child_indexes.sort()
2483          task = taskqueue.Task(
2484              url=self.fanout_handler_path,
2485              params=dict(parent_key=str(pipeline_key),
2486                          child_indexes=child_indexes))
2487          task.add(queue_name=self.queue_name, transactional=True)
2488
2489      pipeline_record.put()
2490
2491      if blocking_slot_keys:
2492        # NOTE: Always update a generator pipeline's finalization barrier to
2493        # include all of the outputs of any pipelines that it runs, to ensure
2494        # that finalized calls will not happen until all child pipelines have
2495        # completed. This must happen transactionally with the enqueue of
2496        # the fan-out kickoff task above to ensure the child output slots and
2497        # the barrier blocking slots are the same.
2498        barrier_key = db.Key.from_path(
2499            _BarrierRecord.kind(), _BarrierRecord.FINALIZE,
2500            parent=pipeline_key)
2501        finalize_barrier = db.get(barrier_key)
2502        if finalize_barrier is None:
2503          raise UnexpectedPipelineError(
2504              'Pipeline ID "%s" cannot update finalize barrier. '
2505              'Does not exist.' % pipeline_key.name())
2506        else:
2507          finalize_barrier.blocking_slots = list(
2508              blocking_slot_keys.union(set(finalize_barrier.blocking_slots)))
2509          finalize_barrier.put()
2510
2511    db.run_in_transaction(txn)
2512
2513  def transition_complete(self, pipeline_key):
2514    """Marks the given pipeline as complete.
2515
2516    Does nothing if the pipeline is no longer in a state that can be completed.
2517
2518    Args:
2519      pipeline_key: db.Key of the _PipelineRecord that has completed.
2520    """
2521    def txn():
2522      pipeline_record = db.get(pipeline_key)
2523      if pipeline_record is None:
2524        logging.warning(
2525            'Tried to mark pipeline ID "%s" as complete but it does not exist.',
2526            pipeline_key.name())
2527        raise db.Rollback()
2528      if pipeline_record.status not in (
2529          _PipelineRecord.WAITING, _PipelineRecord.RUN):
2530        logging.warning(
2531            'Tried to mark pipeline ID "%s" as complete, found bad state: %s',
2532            pipeline_key.name(), pipeline_record.status)
2533        raise db.Rollback()
2534
2535      pipeline_record.status = _PipelineRecord.DONE
2536      pipeline_record.finalized_time = self._gettime()
2537      pipeline_record.put()
2538
2539    db.run_in_transaction(txn)
2540
2541  def transition_retry(self, pipeline_key, retry_message):
2542    """Marks the given pipeline as requiring another retry.
2543
2544    Does nothing if all attempts have been exceeded.
2545
2546    Args:
2547      pipeline_key: db.Key of the _PipelineRecord that needs to be retried.
2548      retry_message: User-supplied message indicating the reason for the retry.
2549    """
2550    def txn():
2551      pipeline_record = db.get(pipeline_key)
2552      if pipeline_record is None:
2553        logging.warning(
2554            'Tried to retry pipeline ID "%s" but it does not exist.',
2555            pipeline_key.name())
2556        raise db.Rollback()
2557      if pipeline_record.status not in (
2558          _PipelineRecord.WAITING, _PipelineRecord.RUN):
2559        logging.warning(
2560            'Tried to retry pipeline ID "%s", found bad state: %s',
2561            pipeline_key.name(), pipeline_record.status)
2562        raise db.Rollback()
2563
2564      params = pipeline_record.params
2565      offset_seconds = (
2566          params['backoff_seconds'] *
2567          (params['backoff_factor'] ** pipeline_record.current_attempt))
2568      pipeline_record.next_retry_time = (
2569          self._gettime() + datetime.timedelta(seconds=offset_seconds))
2570      pipeline_record.current_attempt += 1
2571      pipeline_record.retry_message = retry_message
2572      pipeline_record.status = _PipelineRecord.WAITING
2573
2574      if pipeline_record.current_attempt >= pipeline_record.max_attempts:
2575        root_pipeline_key = (
2576            _PipelineRecord.root_pipeline.get_value_for_datastore(
2577                pipeline_record))
2578        logging.warning(
2579            'Giving up on pipeline ID "%s" after %d attempt(s); causing abort '
2580            'all the way to the root pipeline ID "%s"', pipeline_key.name(),
2581            pipeline_record.current_attempt, root_pipeline_key.name())
2582        # NOTE: We do *not* set the status to aborted here to ensure that
2583        # this pipeline will be finalized before it has been marked as aborted.
2584        pipeline_record.abort_message = (
2585            'Aborting after %d attempts' % pipeline_record.current_attempt)
2586        task = taskqueue.Task(
2587            url=self.fanout_abort_handler_path,
2588            params=dict(root_pipeline_key=root_pipeline_key))
2589        task.add(queue_name=self.queue_name, transactional=True)
2590      else:
2591        task = taskqueue.Task(
2592            url=self.pipeline_handler_path,
2593            eta=pipeline_record.next_retry_time,
2594            params=dict(pipeline_key=pipeline_key,
2595                        purpose=_BarrierRecord.START,
2596                        attempt=pipeline_record.current_attempt),
2597            headers={'X-Ae-Pipeline-Key': pipeline_key})
2598        task.add(queue_name=self.queue_name, transactional=True)
2599
2600      pipeline_record.put()
2601
2602    db.run_in_transaction(txn)
2603
2604  def transition_aborted(self, pipeline_key):
2605    """Makes the given pipeline as having aborted.
2606
2607    Does nothing if the pipeline is in a bad state.
2608
2609    Args:
2610      pipeline_key: db.Key of the _PipelineRecord that needs to be retried.
2611    """
2612    def txn():
2613      pipeline_record = db.get(pipeline_key)
2614      if pipeline_record is None:
2615        logging.warning(
2616            'Tried to abort pipeline ID "%s" but it does not exist.',
2617            pipeline_key.name())
2618        raise db.Rollback()
2619      if pipeline_record.status not in (
2620          _PipelineRecord.WAITING, _PipelineRecord.RUN):
2621        logging.warning(
2622            'Tried to abort pipeline ID "%s", found bad state: %s',
2623            pipeline_key.name(), pipeline_record.status)
2624        raise db.Rollback()
2625
2626      pipeline_record.status = _PipelineRecord.ABORTED
2627      pipeline_record.finalized_time = self._gettime()
2628      pipeline_record.put()
2629
2630    db.run_in_transaction(txn)
2631
2632################################################################################
2633
2634
2635class _BarrierHandler(webapp.RequestHandler):
2636  """Request handler for triggering barriers."""
2637
2638  def post(self):
2639    if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ:
2640      self.response.set_status(403)
2641      return
2642
2643    context = _PipelineContext.from_environ(self.request.environ)
2644    context.notify_barriers(
2645        self.request.get('slot_key'),
2646        self.request.get('cursor'),
2647        use_barrier_indexes=self.request.get('use_barrier_indexes') == 'True')
2648
2649
2650class _PipelineHandler(webapp.RequestHandler):
2651  """Request handler for running pipelines."""
2652
2653  def post(self):
2654    if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ:
2655      self.response.set_status(403)
2656      return
2657
2658    context = _PipelineContext.from_environ(self.request.environ)
2659    context.evaluate(self.request.get('pipeline_key'),
2660                     purpose=self.request.get('purpose'),
2661                     attempt=int(self.request.get('attempt', '0')))
2662
2663
2664class _FanoutAbortHandler(webapp.RequestHandler):
2665  """Request handler for fanning out abort notifications."""
2666
2667  def post(self):
2668    if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ:
2669      self.response.set_status(403)
2670      return
2671
2672    context = _PipelineContext.from_environ(self.request.environ)
2673    context.continue_abort(
2674        self.request.get('root_pipeline_key'),
2675        self.request.get('cursor'))
2676
2677
2678class _FanoutHandler(webapp.RequestHandler):
2679  """Request handler for fanning out pipeline children."""
2680
2681  def post(self):
2682    if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ:
2683      self.response.set_status(403)
2684      return
2685
2686    context = _PipelineContext.from_environ(self.request.environ)
2687
2688    # Set of stringified db.Keys of children to run.
2689    all_pipeline_keys = set()
2690
2691    # For backwards compatibility with the old style of fan-out requests.
2692    all_pipeline_keys.update(self.request.get_all('pipeline_key'))
2693
2694    # Fetch the child pipelines from the parent. This works around the 10KB
2695    # task payload limit. This get() is consistent-on-read and the fan-out
2696    # task is enqueued in the transaction that updates the parent, so the
2697    # fanned_out property is consistent here.
2698    parent_key = self.request.get('parent_key')
2699    child_indexes = [int(x) for x in self.request.get_all('child_indexes')]
2700    if parent_key:
2701      parent_key = db.Key(parent_key)
2702      parent = db.get(parent_key)
2703      for index in child_indexes:
2704        all_pipeline_keys.add(str(parent.fanned_out[index]))
2705
2706    all_tasks = []
2707    all_pipelines = db.get([db.Key(pipeline_key) for pipeline_key in all_pipeline_keys])
2708    for child_pipeline in all_pipelines:
2709      if child_pipeline is None:
2710        continue
2711      pipeline_key = str(child_pipeline.key())
2712      all_tasks.append(taskqueue.Task(
2713          url=context.pipeline_handler_path,
2714          params=dict(pipeline_key=pipeline_key),
2715          target=child_pipeline.params['target'],
2716          headers={'X-Ae-Pipeline-Key': pipeline_key},
2717          name='ae-pipeline-fan-out-' + child_pipeline.key().name()))
2718
2719    batch_size = 100  # Limit of taskqueue API bulk add.
2720    for i in xrange(0, len(all_tasks), batch_size):
2721      batch = all_tasks[i:i+batch_size]
2722      try:
2723        taskqueue.Queue(context.queue_name).add(batch)
2724      except (taskqueue.TombstonedTaskError, taskqueue.TaskAlreadyExistsError):
2725        pass
2726
2727
2728class _CleanupHandler(webapp.RequestHandler):
2729  """Request handler for cleaning up a Pipeline."""
2730
2731  def post(self):
2732    if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ:
2733      self.response.set_status(403)
2734      return
2735
2736    root_pipeline_key = db.Key(self.request.get('root_pipeline_key'))
2737    logging.debug('Cleaning up root_pipeline_key=%r', root_pipeline_key)
2738
2739    # TODO(user): Accumulate all BlobKeys from _PipelineRecord and
2740    # _SlotRecord entities and delete them.
2741    pipeline_keys = (
2742        _PipelineRecord.all(keys_only=True)
2743        .filter('root_pipeline =', root_pipeline_key))
2744    db.delete(pipeline_keys)
2745    slot_keys = (
2746        _SlotRecord.all(keys_only=True)
2747        .filter('root_pipeline =', root_pipeline_key))
2748    db.delete(slot_keys)
2749    barrier_keys = (
2750        _BarrierRecord.all(keys_only=True)
2751        .filter('root_pipeline =', root_pipeline_key))
2752    db.delete(barrier_keys)
2753    status_keys = (
2754        _StatusRecord.all(keys_only=True)
2755        .filter('root_pipeline =', root_pipeline_key))
2756    db.delete(status_keys)
2757    barrier_index_keys = (
2758        _BarrierIndex.all(keys_only=True)
2759        .filter('root_pipeline =', root_pipeline_key))
2760    db.delete(barrier_index_keys)
2761
2762
2763class _CallbackHandler(webapp.RequestHandler):
2764  """Receives asynchronous callback requests from humans or tasks."""
2765
2766  def post(self):
2767    self.get()
2768
2769  def get(self):
2770    try:
2771      self.run_callback()
2772    except _CallbackTaskError, e:
2773      logging.error(str(e))
2774      if 'HTTP_X_APPENGINE_TASKRETRYCOUNT' in self.request.environ:
2775        # Silently give up on tasks that have retried many times. This
2776        # probably means that the target pipeline has been deleted, so there's
2777        # no reason to keep trying this task forever.
2778        retry_count = int(
2779            self.request.environ.get('HTTP_X_APPENGINE_TASKRETRYCOUNT'))
2780        if retry_count > _MAX_CALLBACK_TASK_RETRIES:
2781          logging.error('Giving up on task after %d retries',
2782                        _MAX_CALLBACK_TASK_RETRIES)
2783          return
2784
2785      # NOTE: The undescriptive error code 400 are present to address security
2786      # risks of giving external users access to cause PipelineRecord lookups
2787      # and execution.
2788      self.response.set_status(400)
2789
2790  def run_callback(self):
2791    """Runs the callback for the pipeline specified in the request.
2792
2793    Raises:
2794      _CallbackTaskError if something was wrong with the request parameters.
2795    """
2796    pipeline_id = self.request.get('pipeline_id')
2797    if not pipeline_id:
2798      raise _CallbackTaskError('"pipeline_id" parameter missing.')
2799
2800    pipeline_key = db.Key.from_path(_PipelineRecord.kind(), pipeline_id)
2801    pipeline_record = db.get(pipeline_key)
2802    if pipeline_record is None:
2803      raise _CallbackTaskError(
2804          'Pipeline ID "%s" for callback does not exist.' % pipeline_id)
2805
2806    params = pipeline_record.params
2807    real_class_path = params['class_path']
2808    try:
2809      pipeline_func_class = mr_util.for_name(real_class_path)
2810    except ImportError, e:
2811      raise _CallbackTaskError(
2812          'Cannot load class named "%s" for pipeline ID "%s".'
2813          % (real_class_path, pipeline_id))
2814
2815    if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ:
2816      if pipeline_func_class.public_callbacks:
2817        pass
2818      elif pipeline_func_class.admin_callbacks:
2819        if not users.is_current_user_admin():
2820          raise _CallbackTaskError(
2821              'Unauthorized callback for admin-only pipeline ID "%s"'
2822              % pipeline_id)
2823      else:
2824        raise _CallbackTaskError(
2825            'External callback for internal-only pipeline ID "%s"'
2826            % pipeline_id)
2827
2828    kwargs = {}
2829    for key in self.request.arguments():
2830      if key != 'pipeline_id':
2831        kwargs[str(key)] = self.request.get(key)
2832
2833    def perform_callback():
2834      stage = pipeline_func_class.from_id(pipeline_id)
2835      if stage is None:
2836        raise _CallbackTaskError(
2837            'Pipeline ID "%s" deleted during callback' % pipeline_id)
2838      return stage._callback_internal(kwargs)
2839
2840    # callback_xg_transaction is a 3-valued setting (None=no trans,
2841    # False=1-eg-trans, True=xg-trans)
2842    if pipeline_func_class._callback_xg_transaction is not None:
2843      transaction_options = db.create_transaction_options(
2844          xg=pipeline_func_class._callback_xg_transaction)
2845      callback_result = db.run_in_transaction_options(transaction_options,
2846                                                      perform_callback)
2847    else:
2848      callback_result = perform_callback()
2849
2850    if callback_result is not None:
2851      status_code, content_type, content = callback_result
2852      self.response.set_status(status_code)
2853      self.response.headers['Content-Type'] = content_type
2854      self.response.out.write(content)
2855
2856
2857################################################################################
2858
2859def _get_timestamp_ms(when):
2860  """Converts a datetime.datetime to integer milliseconds since the epoch.
2861
2862  Requires special handling to preserve microseconds.
2863
2864  Args:
2865    when: A datetime.datetime instance.
2866
2867  Returns:
2868    Integer time since the epoch in milliseconds. If the supplied 'when' is
2869    None, the return value will be None.
2870  """
2871  if when is None:
2872    return None
2873  ms_since_epoch = float(time.mktime(when.utctimetuple()) * 1000.0)
2874  ms_since_epoch += when.microsecond / 1000.0
2875  return int(ms_since_epoch)
2876
2877
2878def _get_internal_status(pipeline_key=None,
2879                         pipeline_dict=None,
2880                         slot_dict=None,
2881                         barrier_dict=None,
2882                         status_dict=None):
2883  """Gets the UI dictionary of a pipeline from a set of status dictionaries.
2884
2885  Args:
2886    pipeline_key: The key of the pipeline to lookup.
2887    pipeline_dict: Dictionary mapping pipeline db.Key to _PipelineRecord.
2888      Default is an empty dictionary.
2889    slot_dict: Dictionary mapping slot db.Key to _SlotRecord.
2890      Default is an empty dictionary.
2891    barrier_dict: Dictionary mapping barrier db.Key to _BarrierRecord.
2892      Default is an empty dictionary.
2893    status_dict: Dictionary mapping status record db.Key to _StatusRecord.
2894      Default is an empty dictionary.
2895
2896  Returns:
2897    Dictionary with the keys:
2898      classPath: The pipeline function being run.
2899      args: List of positional argument slot dictionaries.
2900      kwargs: Dictionary of keyword argument slot dictionaries.
2901      outputs: Dictionary of output slot dictionaries.
2902      children: List of child pipeline IDs.
2903      queueName: Queue on which this pipeline is running.
2904      afterSlotKeys: List of Slot Ids after which this pipeline runs.
2905      currentAttempt: Number of the current attempt, starting at 1.
2906      maxAttempts: Maximum number of attempts before aborting.
2907      backoffSeconds: Constant factor for backoff before retrying.
2908      backoffFactor: Exponential factor for backoff before retrying.
2909      status: Current status of the pipeline.
2910      startTimeMs: When this pipeline ran or will run due to retries, if present.
2911      endTimeMs: When this pipeline finalized, if present.
2912      lastRetryMessage: Why the pipeline failed during the last retry, if there
2913        was a failure; may be empty.
2914      abortMessage: For root pipelines, why the pipeline was aborted if it was
2915        aborted; may be empty.
2916
2917    Dictionary will contain these keys if explicit status is set:
2918      statusTimeMs: When the status was set as milliseconds since the epoch.
2919      statusMessage: Status message, if present.
2920      statusConsoleUrl: The relative URL for the console of this pipeline.
2921      statusLinks: Dictionary mapping human-readable names to relative URLs
2922        for related URLs to this pipeline.
2923
2924  Raises:
2925    PipelineStatusError if any input is bad.
2926  """
2927  if pipeline_dict is None:
2928    pipeline_dict = {}
2929  if slot_dict is None:
2930    slot_dict = {}
2931  if barrier_dict is None:
2932    barrier_dict = {}
2933  if status_dict is None:
2934    status_dict = {}
2935
2936  pipeline_record = pipeline_dict.get(pipeline_key)
2937  if pipeline_record is None:
2938    raise PipelineStatusError(
2939        'Could not find pipeline ID "%s"' % pipeline_key.name())
2940
2941  params = pipeline_record.params
2942  root_pipeline_key = \
2943      _PipelineRecord.root_pipeline.get_value_for_datastore(pipeline_record)
2944  default_slot_key = db.Key(params['output_slots']['default'])
2945  start_barrier_key = db.Key.from_path(
2946      _BarrierRecord.kind(), _BarrierRecord.START, parent=pipeline_key)
2947  finalize_barrier_key = db.Key.from_path(
2948      _BarrierRecord.kind(), _BarrierRecord.FINALIZE, parent=pipeline_key)
2949  status_record_key = db.Key.from_path(
2950      _StatusRecord.kind(), pipeline_key.name())
2951
2952  start_barrier = barrier_dict.get(start_barrier_key)
2953  finalize_barrier = barrier_dict.get(finalize_barrier_key)
2954  default_slot = slot_dict.get(default_slot_key)
2955  status_record = status_dict.get(status_record_key)
2956  if finalize_barrier is None:
2957    raise PipelineStatusError(
2958        'Finalization barrier missing for pipeline ID "%s"' %
2959        pipeline_key.name())
2960  if default_slot is None:
2961    raise PipelineStatusError(
2962        'Default output slot with key=%s missing for pipeline ID "%s"' % (
2963        default_slot_key, pipeline_key.name()))
2964
2965  output = {
2966    'classPath': pipeline_record.class_path,
2967    'args': list(params['args']),
2968    'kwargs': params['kwargs'].copy(),
2969    'outputs': params['output_slots'].copy(),
2970    'children': [key.name() for key in pipeline_record.fanned_out],
2971    'queueName': params['queue_name'],
2972    'afterSlotKeys': [str(key) for key in params['after_all']],
2973    'currentAttempt': pipeline_record.current_attempt + 1,
2974    'maxAttempts': pipeline_record.max_attempts,
2975    'backoffSeconds': pipeline_record.params['backoff_seconds'],
2976    'backoffFactor': pipeline_record.params['backoff_factor'],
2977  }
2978
2979  # TODO(user): Truncate args, kwargs, and outputs to < 1MB each so we
2980  # can reasonably return the whole tree of pipelines and their outputs.
2981  # Coerce each value to a string to truncate if necessary. For now if the
2982  # params are too big it will just cause the whole status page to break.
2983
2984  # Fix the key names in parameters to match JavaScript style.
2985  for value_dict in itertools.chain(
2986      output['args'], output['kwargs'].itervalues()):
2987    if 'slot_key' in value_dict:
2988      value_dict['slotKey'] = value_dict.pop('slot_key')
2989
2990  # Figure out the pipeline's status.
2991  if pipeline_record.status in (_PipelineRecord.WAITING, _PipelineRecord.RUN):
2992    if default_slot.status == _SlotRecord.FILLED:
2993      status = 'finalizing'
2994    elif (pipeline_record.status == _PipelineRecord.WAITING and
2995          pipeline_record.next_retry_time is not None):
2996      status = 'retry'
2997    elif start_barrier and start_barrier.status == _BarrierRecord.WAITING:
2998      # start_barrier will be missing for root pipelines
2999      status = 'waiting'
3000    else:
3001      status = 'run'
3002  elif pipeline_record.status == _PipelineRecord.DONE:
3003    status = 'done'
3004  elif pipeline_record.status == _PipelineRecord.ABORTED:
3005    status = 'aborted'
3006
3007  output['status'] = status
3008
3009  if status_record:
3010    output['statusTimeMs'] = _get_timestamp_ms(status_record.status_time)
3011    if status_record.message:
3012      output['statusMessage'] = status_record.message
3013    if status_record.console_url:
3014      output['statusConsoleUrl'] = status_record.console_url
3015    if status_record.link_names:
3016      output['statusLinks'] = dict(
3017          zip(status_record.link_names, status_record.link_urls))
3018
3019  # Populate status-depenedent fields.
3020  if status in ('run', 'finalizing', 'done', 'retry'):
3021    if pipeline_record.next_retry_time is not None:
3022      output['startTimeMs'] = _get_timestamp_ms(pipeline_record.next_retry_time)
3023    elif start_barrier:
3024      # start_barrier will be missing for root pipelines
3025      output['startTimeMs'] = _get_timestamp_ms(start_barrier.trigger_time)
3026    elif pipeline_record.start_time:
3027      # Assume this pipeline ran immediately upon spawning with no
3028      # start barrier or it's the root pipeline.
3029      output['startTimeMs'] = _get_timestamp_ms(pipeline_record.start_time)
3030
3031  if status in ('finalizing',):
3032    output['endTimeMs'] = _get_timestamp_ms(default_slot.fill_time)
3033
3034  if status in ('done',):
3035    output['endTimeMs'] = _get_timestamp_ms(pipeline_record.finalized_time)
3036
3037  if pipeline_record.next_retry_time is not None:
3038    output['lastRetryMessage'] = pipeline_record.retry_message
3039
3040  if pipeline_record.abort_message:
3041    output['abortMessage'] = pipeline_record.abort_message
3042
3043  return output
3044
3045
3046def _get_internal_slot(slot_key=None,
3047                       filler_pipeline_key=None,
3048                       slot_dict=None):
3049  """Gets information about a _SlotRecord for display in UI.
3050
3051  Args:
3052    slot_key: The db.Key of the slot to fetch.
3053    filler_pipeline_key: In the case the slot has not yet been filled, assume
3054      that the given db.Key (for a _PipelineRecord) will be the filler of
3055      the slot in the future.
3056    slot_dict: The slot JSON dictionary.
3057
3058  Returns:
3059    Dictionary with the keys:
3060      status: Slot status: 'filled' or 'waiting'
3061      fillTimeMs: Time in milliseconds since the epoch of when it was filled.
3062      value: The current value of the slot, which is a slot's JSON dictionary.
3063      fillerPipelineId: The pipeline ID of what stage has or should fill
3064        this slot.
3065
3066  Raises:
3067    PipelineStatusError if any input is bad.
3068  """
3069  if slot_dict is None:
3070    slot_dict = {}
3071
3072  slot_record = slot_dict.get(slot_key)
3073  if slot_record is None:
3074    raise PipelineStatusError(
3075        'Could not find data for output slot key "%s".' % slot_key)
3076
3077  output = {}
3078  if slot_record.status == _SlotRecord.FILLED:
3079    output['status'] = 'filled'
3080    output['fillTimeMs'] = _get_timestamp_ms(slot_record.fill_time)
3081    output['value'] = slot_record.value
3082    filler_pipeline_key = (
3083        _SlotRecord.filler.get_value_for_datastore(slot_record))
3084  else:
3085    output['status'] = 'waiting'
3086
3087  if filler_pipeline_key:
3088    output['fillerPipelineId'] = filler_pipeline_key.name()
3089
3090  return output
3091
3092
3093def get_status_tree(root_pipeline_id):
3094  """Gets the full status tree of a pipeline.
3095
3096  Args:
3097    root_pipeline_id: The pipeline ID to get status for.
3098
3099  Returns:
3100    Dictionary with the keys:
3101      rootPipelineId: The ID of the root pipeline.
3102      slots: Mapping of slot IDs to result of from _get_internal_slot.
3103      pipelines: Mapping of pipeline IDs to result of _get_internal_status.
3104
3105  Raises:
3106    PipelineStatusError if any input is bad.
3107  """
3108  root_pipeline_key = db.Key.from_path(_PipelineRecord.kind(), root_pipeline_id)
3109  root_pipeline_record = db.get(root_pipeline_key)
3110  if root_pipeline_record is None:
3111    raise PipelineStatusError(
3112        'Could not find pipeline ID "%s"' % root_pipeline_id)
3113
3114  # If the supplied root_pipeline_id is not actually the root pipeline that's
3115  # okay. We'll find the real root and override the value they passed in.
3116  actual_root_key = _PipelineRecord.root_pipeline.get_value_for_datastore(
3117      root_pipeline_record)
3118  if actual_root_key != root_pipeline_key:
3119    root_pipeline_key = actual_root_key
3120    root_pipeline_id = root_pipeline_key.id_or_name()
3121    root_pipeline_record = db.get(root_pipeline_key)
3122    if not root_pipeline_record:
3123      raise PipelineStatusError(
3124          'Could not find pipeline ID "%s"' % root_pipeline_id)
3125
3126  # Run all queries asynchronously.
3127  queries = {}
3128  for model in (_PipelineRecord, _SlotRecord, _BarrierRecord, _StatusRecord):
3129    queries[model] = model.all().filter(
3130        'root_pipeline =', root_pipeline_key).run(batch_size=1000)
3131
3132  found_pipeline_dict = dict(
3133      (stage.key(), stage) for stage in queries[_PipelineRecord])
3134  found_slot_dict = dict(
3135      (slot.key(), slot) for slot in queries[_SlotRecord])
3136  found_barrier_dict = dict(
3137      (barrier.key(), barrier) for barrier in queries[_BarrierRecord])
3138  found_status_dict = dict(
3139      (status.key(), status) for status in queries[_StatusRecord])
3140
3141  # Breadth-first traversal of _PipelineRecord instances by following
3142  # _PipelineRecord.fanned_out property values.
3143  valid_pipeline_keys = set([root_pipeline_key])
3144  slot_filler_dict = {}  # slot_key to pipeline_key
3145  expand_stack = [root_pipeline_record]
3146  while expand_stack:
3147    old_stack = expand_stack
3148    expand_stack = []
3149    for pipeline_record in old_stack:
3150      for child_pipeline_key in pipeline_record.fanned_out:
3151        # This will let us prune off those pipelines which were allocated in
3152        # the Datastore but were never run due to mid-flight task failures.
3153        child_pipeline_record = found_pipeline_dict.get(child_pipeline_key)
3154        if child_pipeline_record is None:
3155          raise PipelineStatusError(
3156              'Pipeline ID "%s" points to child ID "%s" which does not exist.'
3157              % (pipeline_record.key().name(), child_pipeline_key.name()))
3158        expand_stack.append(child_pipeline_record)
3159        valid_pipeline_keys.add(child_pipeline_key)
3160
3161        # Figure out the deepest pipeline that's responsible for outputting to
3162        # a particular _SlotRecord, so we can report which pipeline *should*
3163        # be the filler.
3164        child_outputs = child_pipeline_record.params['output_slots']
3165        for output_slot_key in child_outputs.itervalues():
3166          slot_filler_dict[db.Key(output_slot_key)] = child_pipeline_key
3167
3168  output = {
3169    'rootPipelineId': root_pipeline_id,
3170    'slots': {},
3171    'pipelines': {},
3172  }
3173
3174  for pipeline_key in found_pipeline_dict.keys():
3175    if pipeline_key not in valid_pipeline_keys:
3176      continue
3177    output['pipelines'][pipeline_key.name()] = _get_internal_status(
3178        pipeline_key=pipeline_key,
3179        pipeline_dict=found_pipeline_dict,
3180        slot_dict=found_slot_dict,
3181        barrier_dict=found_barrier_dict,
3182        status_dict=found_status_dict)
3183
3184  for slot_key, filler_pipeline_key in slot_filler_dict.iteritems():
3185    output['slots'][str(slot_key)] = _get_internal_slot(
3186        slot_key=slot_key,
3187        filler_pipeline_key=filler_pipeline_key,
3188        slot_dict=found_slot_dict)
3189
3190  return output
3191
3192
3193def get_pipeline_names():
3194  """Returns the class paths of all Pipelines defined in alphabetical order."""
3195  class_path_set = set()
3196  for cls in _PipelineMeta._all_classes:
3197      if cls.class_path is not None:
3198        class_path_set.add(cls.class_path)
3199  return sorted(class_path_set)
3200
3201
3202def get_root_list(class_path=None, cursor=None, count=50):
3203  """Gets a list root Pipelines.
3204
3205  Args:
3206    class_path: Optional. If supplied, only return root Pipelines with the
3207      given class_path. By default all root pipelines are returned.
3208    cursor: Optional. When supplied, the cursor returned from the last call to
3209      get_root_list which indicates where to pick up.
3210    count: How many pipeline returns to return.
3211
3212  Returns:
3213    Dictionary with the keys:
3214      pipelines: The list of Pipeline records in the same format as
3215        returned by get_status_tree, but with only the roots listed.
3216      cursor: Cursor to pass back to this function to resume the query. Will
3217        only be present if there is another page of results.
3218
3219  Raises:
3220    PipelineStatusError if any input is bad.
3221  """
3222  query = _PipelineRecord.all(cursor=cursor)
3223  if class_path:
3224    query.filter('class_path =', class_path)
3225  query.filter('is_root_pipeline =', True)
3226  query.order('-start_time')
3227
3228  root_list = query.fetch(count)
3229
3230  fetch_list = []
3231  for pipeline_record in root_list:
3232    fetch_list.append(db.Key(pipeline_record.params['output_slots']['default']))
3233    fetch_list.append(db.Key.from_path(
3234        _BarrierRecord.kind(), _BarrierRecord.FINALIZE,
3235        parent=pipeline_record.key()))
3236    fetch_list.append(db.Key.from_path(
3237        _StatusRecord.kind(), pipeline_record.key().name()))
3238
3239  pipeline_dict = dict((stage.key(), stage) for stage in root_list)
3240  slot_dict = {}
3241  barrier_dict = {}
3242  status_dict = {}
3243  for entity in db.get(fetch_list):
3244    if isinstance(entity, _BarrierRecord):
3245      barrier_dict[entity.key()] = entity
3246    elif isinstance(entity, _SlotRecord):
3247      slot_dict[entity.key()] = entity
3248    elif isinstance(entity, _StatusRecord):
3249      status_dict[entity.key()] = entity
3250
3251  results = []
3252  for pipeline_record in root_list:
3253    try:
3254      output = _get_internal_status(
3255          pipeline_record.key(),
3256          pipeline_dict=pipeline_dict,
3257          slot_dict=slot_dict,
3258          barrier_dict=barrier_dict,
3259          status_dict=status_dict)
3260      output['pipelineId'] = pipeline_record.key().name()
3261      results.append(output)
3262    except PipelineStatusError, e:
3263      output = {'status': e.message}
3264      output['classPath'] = ''
3265      output['pipelineId'] = pipeline_record.key().name()
3266      results.append(output)
3267
3268  result_dict = {}
3269  cursor = query.cursor()
3270  query.with_cursor(cursor)
3271  if query.get(keys_only=True):
3272    result_dict.update(cursor=cursor)
3273  result_dict.update(pipelines=results)
3274  return result_dict
3275
3276################################################################################
3277
3278def set_enforce_auth(new_status):
3279  """Sets whether Pipeline API handlers rely on app.yaml for access control.
3280
3281  Args:
3282    new_status: If True, then the Pipeline API will enforce its own
3283      access control on status and static file handlers. If False, then
3284      it will assume app.yaml is doing the enforcement.
3285  """
3286  global _ENFORCE_AUTH
3287  _ENFORCE_AUTH = new_status
3288
3289
3290def create_handlers_map(prefix='.*'):
3291  """Create new handlers map.
3292
3293  Args:
3294    prefix: url prefix to use.
3295
3296  Returns:
3297    list of (regexp, handler) pairs for WSGIApplication constructor.
3298  """
3299  return [
3300      (prefix + '/output', _BarrierHandler),
3301      (prefix + '/run', _PipelineHandler),
3302      (prefix + '/finalized', _PipelineHandler),
3303      (prefix + '/cleanup', _CleanupHandler),
3304      (prefix + '/abort', _PipelineHandler),
3305      (prefix + '/fanout', _FanoutHandler),
3306      (prefix + '/fanout_abort', _FanoutAbortHandler),
3307      (prefix + '/callback', _CallbackHandler),
3308      (prefix + '/rpc/tree', status_ui._TreeStatusHandler),
3309      (prefix + '/rpc/class_paths', status_ui._ClassPathListHandler),
3310      (prefix + '/rpc/list', status_ui._RootListHandler),
3311      (prefix + '(/.+)', status_ui._StatusUiHandler),
3312      ]
3313