#!/usr/bin/env python
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Google App Engine Pipeline API for complex, asynchronous workflows."""

__all__ = [
    # Public API.
    'Error', 'PipelineSetupError', 'PipelineExistsError',
    'PipelineRuntimeError', 'SlotNotFilledError', 'SlotNotDeclaredError',
    'UnexpectedPipelineError', 'PipelineStatusError', 'Slot', 'Pipeline',
    'PipelineFuture', 'After', 'InOrder', 'Retry', 'Abort', 'get_status_tree',
    'get_pipeline_names', 'get_root_list', 'create_handlers_map',
    'set_enforce_auth',
]

import datetime
import hashlib
import itertools
import logging
import os
import posixpath
import pprint
import re
import sys
import threading
import time
import urllib
import uuid

from google.appengine.api import mail
from google.appengine.api import app_identity
from google.appengine.api import users
from google.appengine.api import taskqueue
from google.appengine.ext import blobstore
from google.appengine.ext import db
from google.appengine.ext import webapp

# pylint: disable=g-import-not-at-top
# TODO(user): Cleanup imports if/when cloudstorage becomes part of runtime.
try:
  # Check if the full cloudstorage package exists. The stub part is in runtime.
  import cloudstorage
  if hasattr(cloudstorage, "_STUB"):
    cloudstorage = None
except ImportError:
  pass  # CloudStorage library not available

try:
  import json
except ImportError:
  import simplejson as json

# Relative imports
import models
import status_ui
import util as mr_util

# pylint: disable=g-bad-name
# pylint: disable=protected-access

# For convenience
_BarrierIndex = models._BarrierIndex
_BarrierRecord = models._BarrierRecord
_PipelineRecord = models._PipelineRecord
_SlotRecord = models._SlotRecord
_StatusRecord = models._StatusRecord


# Overall TODOs:
# - Add a human readable name for start()

# Potential TODOs:
# - Add support for ANY N barriers.
# - Allow Pipelines to declare they are "short" and optimize the evaluate()
#   function to run as many of them in quick succession.
# - Add support in all Pipelines for hold/release where up-stream
#   barriers will fire but do nothing because the Pipeline is not ready.
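
# Illustrative usage sketch (not part of this module's API; the class and
# argument names below are hypothetical). A developer subclasses Pipeline,
# implements run(), and starts a root instance; the return value of run()
# fills the 'default' output slot:
#
#   class SquarePipeline(Pipeline):
#     def run(self, number):
#       return number * number
#
#   stage = SquarePipeline(7)
#   stage.start(queue_name='default')
#   # Later, e.g. in a status handler:
#   #   Pipeline.from_id(stage.pipeline_id).outputs.default.value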
91 92################################################################################ 93 94 95class Error(Exception): 96 """Base class for exceptions in this module.""" 97 98 99class PipelineSetupError(Error): 100 """Base class for exceptions that happen before Pipeline execution.""" 101 102 103class PipelineExistsError(PipelineSetupError): 104 """A new Pipeline with an assigned idempotence_key cannot be overwritten.""" 105 106 107class PipelineRuntimeError(Error): 108 """Base class for exceptions that happen during Pipeline execution.""" 109 110 111class SlotNotFilledError(PipelineRuntimeError): 112 """A slot that should have been filled already was not yet filled.""" 113 114 115class SlotNotDeclaredError(PipelineRuntimeError): 116 """A slot that was filled or passed along was not previously declared.""" 117 118 119class UnexpectedPipelineError(PipelineRuntimeError): 120 """An assertion failed, potentially leaving the pipeline unable to proceed.""" 121 122 123class PipelineUserError(Error): 124 """Exceptions raised indirectly by developers to cause certain behaviors.""" 125 126 127class Retry(PipelineUserError): 128 """The currently running pipeline should be retried at a later time.""" 129 130 131class Abort(PipelineUserError): 132 """The currently running pipeline should be aborted up to the root.""" 133 134 135class PipelineStatusError(Error): 136 """Exceptions raised when trying to collect pipeline status.""" 137 138 139class _CallbackTaskError(Error): 140 """A callback task was unable to execute properly for some reason.""" 141 142 143################################################################################ 144 145_MAX_BARRIERS_TO_NOTIFY = 10 146 147_MAX_ABORTS_TO_BEGIN = 10 148 149_TEST_MODE = False 150 151_TEST_ROOT_PIPELINE_KEY = None 152 153_DEFAULT_BACKOFF_SECONDS = 15 154 155_DEFAULT_BACKOFF_FACTOR = 2 156 157_DEFAULT_MAX_ATTEMPTS = 3 158 159_RETRY_WIGGLE_TIMEDELTA = datetime.timedelta(seconds=20) 160 161_DEBUG = False 162 163_MAX_JSON_SIZE = 900000 164 165_ENFORCE_AUTH = True 166 167_MAX_CALLBACK_TASK_RETRIES = 5 168 169################################################################################ 170 171 172class Slot(object): 173 """An output that is filled by a Pipeline as it executes.""" 174 175 def __init__(self, name=None, slot_key=None, strict=False): 176 """Initializer. 177 178 Args: 179 name: The name of this slot. 180 slot_key: The db.Key for this slot's _SlotRecord if it's already been 181 allocated by an up-stream pipeline. 182 strict: If this Slot was created as an output of a strictly defined 183 pipeline. 184 """ 185 if name is None: 186 raise UnexpectedPipelineError('Slot with key "%s" missing a name.' % 187 slot_key) 188 if slot_key is None: 189 slot_key = db.Key.from_path(_SlotRecord.kind(), uuid.uuid4().hex) 190 self._exists = _TEST_MODE 191 else: 192 self._exists = True 193 self._touched = False 194 self._strict = strict 195 self.name = name 196 self.key = slot_key 197 self.filled = False 198 self._filler_pipeline_key = None 199 self._fill_datetime = None 200 self._value = None 201 202 @property 203 def value(self): 204 """Returns the current value of this slot. 205 206 Returns: 207 The value of the slot (a serializable Python type). 208 209 Raises: 210 SlotNotFilledError if the value hasn't been filled yet. 211 """ 212 if not self.filled: 213 raise SlotNotFilledError('Slot with name "%s", key "%s" not yet filled.' 
                               % (self.name, self.key))
    return self._value

  @property
  def filler(self):
    """Returns the pipeline ID that filled this slot's value.

    Returns:
      A string that is the pipeline ID.

    Raises:
      SlotNotFilledError if the value hasn't been filled yet.
    """
    if not self.filled:
      raise SlotNotFilledError('Slot with name "%s", key "%s" not yet filled.'
                               % (self.name, self.key))
    return self._filler_pipeline_key.name()

  @property
  def fill_datetime(self):
    """Returns when the slot was filled.

    Returns:
      A datetime.datetime.

    Raises:
      SlotNotFilledError if the value hasn't been filled yet.
    """
    if not self.filled:
      raise SlotNotFilledError('Slot with name "%s", key "%s" not yet filled.'
                               % (self.name, self.key))
    return self._fill_datetime

  def _set_value(self, slot_record):
    """Sets the value of this slot based on its corresponding _SlotRecord.

    Does nothing if the slot has not yet been filled.

    Args:
      slot_record: The _SlotRecord containing this Slot's value.
    """
    if slot_record.status == _SlotRecord.FILLED:
      self.filled = True
      self._filler_pipeline_key = _SlotRecord.filler.get_value_for_datastore(
          slot_record)
      self._fill_datetime = slot_record.fill_time
      self._value = slot_record.value

  def _set_value_test(self, filler_pipeline_key, value):
    """Sets the value of this slot for use in testing.

    Args:
      filler_pipeline_key: The db.Key of the _PipelineRecord that filled
        this slot.
      value: The serializable value set for this slot.
    """
    self.filled = True
    self._filler_pipeline_key = filler_pipeline_key
    self._fill_datetime = datetime.datetime.utcnow()
    # Convert to JSON and back again, to simulate the behavior of production.
    self._value = json.loads(json.dumps(
        value, cls=mr_util.JsonEncoder), cls=mr_util.JsonDecoder)

  def __repr__(self):
    """Returns a string representation of this slot."""
    if self.filled:
      return repr(self._value)
    else:
      return 'Slot(name="%s", slot_key="%s")' % (self.name, self.key)


class PipelineFuture(object):
  """A future for accessing the outputs of a Pipeline."""

  # NOTE: Do not, ever, add a names() method to this class. Callers cannot do
  # introspection on their context of being called. Even though the runtime
  # environment of the Pipeline can allow for that to happen, such behavior
  # would prevent synchronous simulation and verification, which is an
  # unacceptable tradeoff.

  def __init__(self, output_names, force_strict=False):
    """Initializer.

    Args:
      output_names: The list of required output names that will be strictly
        enforced by this class.
      force_strict: If True, force this future to be in strict mode.
    """
    self._after_all_pipelines = set()
    self._output_dict = {
        'default': Slot(name='default'),
    }

    self._strict = len(output_names) > 0 or force_strict
    if self._strict:
      for name in output_names:
        if name in self._output_dict:
          raise UnexpectedPipelineError('Output name reserved: "%s"' % name)
        self._output_dict[name] = Slot(name=name, strict=True)

  def _inherit_outputs(self,
                       pipeline_name,
                       already_defined,
                       resolve_outputs=False):
    """Inherits outputs from a calling Pipeline.

    Args:
      pipeline_name: The Pipeline class name (used for debugging).
      already_defined: Maps output name to stringified db.Key (of _SlotRecords)
        of any existing output slots to be inherited by this future.
      resolve_outputs: When True, this method will dereference all output slots
        before returning back to the caller, making those output slots' values
        available.

    Raises:
      UnexpectedPipelineError when resolve_outputs is True and any of the output
      slots could not be retrieved from the Datastore.
    """
    for name, slot_key in already_defined.iteritems():
      if not isinstance(slot_key, db.Key):
        slot_key = db.Key(slot_key)

      slot = self._output_dict.get(name)
      if slot is None:
        if self._strict:
          raise UnexpectedPipelineError(
              'Inherited output named "%s" must be filled but '
              'not declared for pipeline class "%s"' % (name, pipeline_name))
        else:
          self._output_dict[name] = Slot(name=name, slot_key=slot_key)
      else:
        slot.key = slot_key
        slot._exists = True

    if resolve_outputs:
      slot_key_dict = dict((s.key, s) for s in self._output_dict.itervalues())
      all_slots = db.get(slot_key_dict.keys())
      for slot, slot_record in zip(slot_key_dict.itervalues(), all_slots):
        if slot_record is None:
          raise UnexpectedPipelineError(
              'Inherited output named "%s" for pipeline class "%s" is '
              'missing its Slot in the datastore: "%s"' %
              (slot.name, pipeline_name, slot.key))
        slot = slot_key_dict[slot_record.key()]
        slot._set_value(slot_record)

  def __getattr__(self, name):
    """Provides an output Slot instance with the given name if allowed."""
    if name not in self._output_dict:
      if self._strict:
        raise SlotNotDeclaredError('Undeclared output with name "%s"' % name)
      self._output_dict[name] = Slot(name=name)
    slot = self._output_dict[name]
    return slot


class _PipelineMeta(type):
  """Meta-class for recording all Pipelines that have been defined."""

  # List of all Pipeline classes that have been seen.
  _all_classes = []

  def __new__(meta, name, bases, cls_dict):
    """Initializes the class path of a Pipeline and saves it."""
    cls = type.__new__(meta, name, bases, cls_dict)
    meta._all_classes.append(cls)
    return cls


class ClassProperty(object):
  """Descriptor that lets us have read-only class properties."""

  def __init__(self, method):
    self.method = method

  def __get__(self, cls, obj):
    return self.method(obj)


class Pipeline(object):
  """A Pipeline function-object that performs operations and has a life cycle.

  Class properties (to be overridden by sub-classes):
    async: When True, this Pipeline will execute asynchronously and fill the
      default output slot itself using the complete() method.
    output_names: List of named outputs (in addition to the default slot) that
      this Pipeline must output to (no more, no less).
    public_callbacks: If the callback URLs generated for this class should be
      accessible by all external requests regardless of login or task queue.
    admin_callbacks: If the callback URLs generated for this class should be
      accessible by the task queue and externally by users logged in as admins.
    class_path: String identifier for this Pipeline, which is derived from
      its path in the global system modules dictionary.

  Modifiable instance properties:
    backoff_seconds: How many seconds to use as the constant factor in
      exponential backoff; may be changed by the user.
    backoff_factor: Base factor to use for exponential backoff. The formula
      followed is (backoff_seconds * backoff_factor^current_attempt).
    max_attempts: Maximum number of retry attempts to make before failing
      completely and aborting the entire pipeline up to the root.
    target: The application version to use for processing this Pipeline. This
      can be set to the name of a backend to direct Pipelines to run there.

  Instance properties:
    pipeline_id: The ID of this pipeline.
    root_pipeline_id: The ID of the root of this pipeline.
    queue_name: The queue this pipeline runs on or None if unknown.
    current_attempt: The current attempt being tried for this pipeline.
  """

  __metaclass__ = _PipelineMeta

  # To be set by sub-classes
  async = False
  output_names = []
  public_callbacks = False
  admin_callbacks = False

  # Internal only.
  _class_path = None  # Set for each class
  _send_mail = mail.send_mail_to_admins  # For testing

  # callback_xg_transaction: Determines whether callbacks are processed within
  # a single entity-group transaction (False), a cross-entity-group
  # transaction (True), or no transaction (None, default). It is generally
  # unsafe for a callback to modify pipeline state outside of a transaction, in
  # particular any pre-initialized state from the pipeline record, such as the
  # outputs. If a transaction is used, the callback method must operate within
  # the datastore's transaction time limits.
  # TODO(user): Make non-internal once other API calls are considered for
  # transaction support.
  _callback_xg_transaction = None

  def __init__(self, *args, **kwargs):
    """Initializer.

    Args:
      *args: The positional arguments for this function-object.
      **kwargs: The keyword arguments for this function-object.
    """
    self.args = args
    self.kwargs = kwargs
    self.outputs = None
    self.backoff_seconds = _DEFAULT_BACKOFF_SECONDS
    self.backoff_factor = _DEFAULT_BACKOFF_FACTOR
    self.max_attempts = _DEFAULT_MAX_ATTEMPTS
    self.target = None
    self.task_retry = False
    self._current_attempt = 0
    self._root_pipeline_key = None
    self._pipeline_key = None
    self._context = None
    self._result_status = None
    self._set_class_path()
    # Introspectively set the target so pipelines stick to the version they
    # were started on.
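    # (Presumably this pins the workflow's tasks to the currently serving
    # version, so deploying a new default version does not move in-flight
    # work; this is an assumption about what _get_task_target() returns.)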
    self.target = mr_util._get_task_target()

    if _TEST_MODE:
      self._context = _PipelineContext('', 'default', '')
      self._root_pipeline_key = _TEST_ROOT_PIPELINE_KEY
      self._pipeline_key = db.Key.from_path(
          _PipelineRecord.kind(), uuid.uuid4().hex)
      self.outputs = PipelineFuture(self.output_names)
      self._context.evaluate_test(self)

  @property
  def pipeline_id(self):
    """Returns the ID of this Pipeline as a string or None if unknown."""
    if self._pipeline_key is None:
      return None
    return self._pipeline_key.name()

  @property
  def root_pipeline_id(self):
    """Returns root pipeline ID as a websafe string or None if unknown."""
    if self._root_pipeline_key is None:
      return None
    return self._root_pipeline_key.name()

  @property
  def is_root(self):
    """Returns True if this pipeline is a root pipeline, False otherwise."""
    return self._root_pipeline_key == self._pipeline_key

  @property
  def queue_name(self):
    """Returns the queue name this Pipeline runs on or None if unknown."""
    if self._context:
      return self._context.queue_name
    return None

  @property
  def base_path(self):
    """Returns the base path for Pipeline URL handlers or None if unknown."""
    if self._context:
      return self._context.base_path
    return None

  @property
  def has_finalized(self):
    """Returns True if this pipeline has completed and finalized."""
    return self._result_status == _PipelineRecord.DONE

  @property
  def was_aborted(self):
    """Returns True if this pipeline was aborted."""
    return self._result_status == _PipelineRecord.ABORTED

  @property
  def current_attempt(self):
    """Returns the current attempt at running this pipeline, starting at 1."""
    return self._current_attempt + 1

  @property
  def test_mode(self):
    """Returns True if the pipeline is running in test mode."""
    return _TEST_MODE

  @ClassProperty
  def class_path(cls):
    """Returns the unique string identifier for this Pipeline class.

    Refers to how to find the Pipeline in the global modules dictionary.
    """
    cls._set_class_path()
    return cls._class_path

  @classmethod
  def from_id(cls, pipeline_id, resolve_outputs=True, _pipeline_record=None):
    """Returns an instance corresponding to an existing Pipeline.

    The returned object will have the same properties a Pipeline does while
    it's running synchronously (e.g., like when it's first allocated), allowing
    callers to inspect caller arguments, outputs, fill slots, complete the
    pipeline, abort, retry, etc.

    Args:
      pipeline_id: The ID of this pipeline (a string).
      resolve_outputs: When True, dereference the outputs of this Pipeline
        so their values can be accessed by the caller.
      _pipeline_record: Internal-only. The _PipelineRecord instance to use
        to instantiate this instance instead of fetching it from
        the datastore.

    Returns:
      A Pipeline sub-class instance or None if it could not be found.
    """
    pipeline_record = _pipeline_record

    # Support pipeline IDs and idempotence_keys that are not unicode.
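    # (Note: the sha1 fallback below is a stable mapping, so repeated lookups
    # with the same non-UTF-8 byte string resolve to the same record.)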
    if not isinstance(pipeline_id, unicode):
      try:
        pipeline_id = pipeline_id.encode('utf-8')
      except UnicodeDecodeError:
        pipeline_id = hashlib.sha1(pipeline_id).hexdigest()

    pipeline_key = db.Key.from_path(_PipelineRecord.kind(), pipeline_id)

    if pipeline_record is None:
      pipeline_record = db.get(pipeline_key)
    if pipeline_record is None:
      return None

    try:
      pipeline_func_class = mr_util.for_name(pipeline_record.class_path)
    except ImportError, e:
      logging.warning('Tried to find Pipeline %s#%s, but class could '
                      'not be found. Using default Pipeline class instead.',
                      pipeline_record.class_path, pipeline_id)
      pipeline_func_class = cls

    params = pipeline_record.params
    arg_list, kwarg_dict = _dereference_args(
        pipeline_record.class_path, params['args'], params['kwargs'])
    outputs = PipelineFuture(pipeline_func_class.output_names)
    outputs._inherit_outputs(
        pipeline_record.class_path,
        params['output_slots'],
        resolve_outputs=resolve_outputs)

    stage = pipeline_func_class(*arg_list, **kwarg_dict)
    stage.backoff_seconds = params['backoff_seconds']
    stage.backoff_factor = params['backoff_factor']
    stage.max_attempts = params['max_attempts']
    stage.task_retry = params['task_retry']
    stage.target = params.get('target')  # May not be defined for old Pipelines
    stage._current_attempt = pipeline_record.current_attempt
    stage._set_values_internal(
        _PipelineContext('', params['queue_name'], params['base_path']),
        pipeline_key,
        _PipelineRecord.root_pipeline.get_value_for_datastore(pipeline_record),
        outputs,
        pipeline_record.status)
    return stage

  # Methods that can be invoked on a Pipeline instance by anyone with a
  # valid object (e.g., directly instantiated, retrieved via from_id).
  def start(self,
            idempotence_key='',
            queue_name='default',
            base_path='/_ah/pipeline',
            return_task=False,
            countdown=None,
            eta=None):
    """Starts a new instance of this pipeline.

    Args:
      idempotence_key: The ID to use for this Pipeline and throughout its
        asynchronous workflow to ensure the operations are idempotent. If
        empty, a starting key will be automatically assigned.
      queue_name: What queue this Pipeline's workflow should execute on.
      base_path: The relative URL path to where the Pipeline API is
        mounted for access by the taskqueue API or external requests.
      return_task: When True, a task to start this pipeline will be returned
        instead of submitted, allowing the caller to start off this pipeline
        as part of a separate transaction (potentially leaving this newly
        allocated pipeline's datastore entities in place if that separate
        transaction fails for any reason).
      countdown: Time in seconds into the future that this Task should execute.
        Defaults to zero.
      eta: A datetime.datetime specifying the absolute time at which the task
        should be executed. Must not be specified if 'countdown' is specified.
        This may be timezone-aware or timezone-naive. If None, defaults to now.
        For pull tasks, no worker will be able to lease this task before the
        time indicated by eta.

    Returns:
      A taskqueue.Task instance if return_task was True. This task will *not*
      have a name, thus to ensure reliable execution of your pipeline you
      should add() this task as part of a separate Datastore transaction.

    Raises:
      PipelineExistsError if the pipeline with the given idempotence key exists.
      PipelineSetupError if the pipeline could not start for any other reason.
    """
    if not idempotence_key:
      idempotence_key = uuid.uuid4().hex
    elif not isinstance(idempotence_key, unicode):
      try:
        idempotence_key.encode('utf-8')
      except UnicodeDecodeError:
        idempotence_key = hashlib.sha1(idempotence_key).hexdigest()

    pipeline_key = db.Key.from_path(_PipelineRecord.kind(), idempotence_key)
    context = _PipelineContext('', queue_name, base_path)
    future = PipelineFuture(self.output_names, force_strict=True)
    try:
      self._set_values_internal(
          context, pipeline_key, pipeline_key, future, _PipelineRecord.WAITING)
      return context.start(
          self, return_task=return_task, countdown=countdown, eta=eta)
    except Error:
      # Pass through exceptions that originate in this module.
      raise
    except Exception, e:
      # Re-type any exceptions that were raised in dependent methods.
      raise PipelineSetupError('Error starting %s#%s: %s' % (
          self, idempotence_key, str(e)))

  def start_test(self, idempotence_key=None, base_path='', **kwargs):
    """Starts this pipeline in test fashion.

    Args:
      idempotence_key: Dummy idempotence_key to use for this root pipeline.
      base_path: Dummy base URL path to use for this root pipeline.
      kwargs: Ignored keyword arguments usually passed to start().
    """
    if not idempotence_key:
      idempotence_key = uuid.uuid4().hex
    pipeline_key = db.Key.from_path(_PipelineRecord.kind(), idempotence_key)
    context = _PipelineContext('', 'default', base_path)
    future = PipelineFuture(self.output_names, force_strict=True)
    self._set_values_internal(
        context, pipeline_key, pipeline_key, future, _PipelineRecord.WAITING)
    context.start_test(self)

  # Pipeline control methods.
  def retry(self, retry_message=''):
    """Forces a currently running asynchronous pipeline to retry.

    Note this may not be called by synchronous or generator pipelines. Those
    must instead raise the 'Retry' exception during execution.

    Args:
      retry_message: Optional message explaining why the retry happened.

    Returns:
      True if the Pipeline should be retried, False if it cannot be cancelled
      mid-flight for some reason.
    """
    if not self.async:
      raise UnexpectedPipelineError(
          'May only call retry() method for asynchronous pipelines.')
    if self.try_cancel():
      self._context.transition_retry(self._pipeline_key, retry_message)
      return True
    else:
      return False

  def abort(self, abort_message=''):
    """Marks the entire pipeline up to the root as aborted.

    Note this should only be called from *outside* the context of a running
    pipeline. Synchronous and generator pipelines should raise the 'Abort'
    exception to cause this behavior during execution.

    Args:
      abort_message: Optional message explaining why the abort happened.

    Returns:
      True if the abort signal was sent successfully; False if the pipeline
      could not be aborted for any reason.
    """
    # TODO: Use thread-local variable to enforce that this is not called
    # while a pipeline is executing in the current thread.
    if (self.async and self._root_pipeline_key == self._pipeline_key and
        not self.try_cancel()):
      # Handle the special case where the root pipeline is async and thus
      # cannot be aborted outright.
      return False
    else:
      return self._context.begin_abort(
          self._root_pipeline_key, abort_message=abort_message)

  # Methods used by the Pipeline as it runs.
  def fill(self, name_or_slot, value):
    """Fills an output slot required by this Pipeline.

    Args:
      name_or_slot: The name of the slot (a string) or Slot record to fill.
      value: The serializable value to assign to this slot.

    Raises:
      UnexpectedPipelineError if the Slot no longer exists. SlotNotDeclaredError
      if trying to output to a slot that was not declared ahead of time.
    """
    if isinstance(name_or_slot, basestring):
      slot = getattr(self.outputs, name_or_slot)
    elif isinstance(name_or_slot, Slot):
      slot = name_or_slot
    else:
      raise UnexpectedPipelineError(
          'Could not fill invalid output name: %r' % name_or_slot)

    if not slot._exists:
      raise SlotNotDeclaredError(
          'Cannot fill output with name "%s" that was just '
          'declared within the Pipeline context.' % slot.name)

    self._context.fill_slot(self._pipeline_key, slot, value)

  def set_status(self, message=None, console_url=None, status_links=None):
    """Sets the current status of this pipeline.

    This method is purposefully non-transactional. Updates are written to the
    datastore immediately and overwrite all existing statuses.

    Args:
      message: (optional) Overall status message.
      console_url: (optional) Relative URL to use for the "console" of this
        pipeline that displays current progress. When None, no console will
        be displayed.
      status_links: (optional) Dictionary of readable link names to relative
        URLs that should be associated with this pipeline as it runs. These
        links provide convenient access to other dashboards, consoles, etc
        associated with the pipeline.

    Raises:
      PipelineRuntimeError if the status could not be set for any reason.
    """
    if _TEST_MODE:
      logging.info(
          'New status for %s#%s: message=%r, console_url=%r, status_links=%r',
          self, self.pipeline_id, message, console_url, status_links)
      return

    status_key = db.Key.from_path(_StatusRecord.kind(), self.pipeline_id)
    root_pipeline_key = db.Key.from_path(
        _PipelineRecord.kind(), self.root_pipeline_id)
    status_record = _StatusRecord(
        key=status_key, root_pipeline=root_pipeline_key)

    try:
      if message:
        status_record.message = message
      if console_url:
        status_record.console_url = console_url
      if status_links:
        # Alphabetize the list.
        status_record.link_names = sorted(
            db.Text(s) for s in status_links.iterkeys())
        status_record.link_urls = [
            db.Text(status_links[name]) for name in status_record.link_names]

      status_record.status_time = datetime.datetime.utcnow()

      status_record.put()
    except Exception, e:
      raise PipelineRuntimeError('Could not set status for %s#%s: %s' %
                                 (self, self.pipeline_id, str(e)))

  def complete(self, default_output=None):
    """Marks this asynchronous Pipeline as complete.

    Args:
      default_output: What value the 'default' output slot should be assigned.

    Raises:
      UnexpectedPipelineError if the slot no longer exists or this method was
      called for a pipeline that is not async.
    """
    # TODO: Enforce that all outputs expected by this async pipeline were
    # filled before this complete() function was called. May require all
    # async functions to declare their outputs upfront.
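    # Typical async flow (illustrative only): run() kicks off external work
    # and returns without filling outputs; a later callback() invocation
    # calls self.complete(result) to fill the 'default' slot and unblock any
    # down-stream pipelines waiting on it.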
    if not self.async:
      raise UnexpectedPipelineError(
          'May only call complete() method for asynchronous pipelines.')
    self._context.fill_slot(
        self._pipeline_key, self.outputs.default, default_output)

  def get_callback_url(self, **kwargs):
    """Returns a relative URL for invoking this Pipeline's callback method.

    Args:
      kwargs: Dictionary mapping keyword argument names to single values that
        should be passed to the callback when it is invoked.

    Raises:
      UnexpectedPipelineError if this is invoked on a pipeline that is
      not async.
    """
    # TODO: Support positional parameters.
    if not self.async:
      raise UnexpectedPipelineError(
          'May only call get_callback_url() method for asynchronous pipelines.')
    kwargs['pipeline_id'] = self._pipeline_key.name()
    params = urllib.urlencode(sorted(kwargs.items()))
    return '%s/callback?%s' % (self.base_path, params)

  def get_callback_task(self, *args, **kwargs):
    """Returns a task for calling back this Pipeline.

    Args:
      params: Keyword argument containing a dictionary of key/value pairs
        that will be passed to the callback when it is executed.
      args, kwargs: Passed to the taskqueue.Task constructor. Use these
        arguments to set the task name (for idempotence), etc.

    Returns:
      A taskqueue.Task instance that must be enqueued by the caller.
    """
    if not self.async:
      raise UnexpectedPipelineError(
          'May only call get_callback_task() method for asynchronous '
          'pipelines.')

    params = kwargs.get('params', {})
    kwargs['params'] = params
    params['pipeline_id'] = self._pipeline_key.name()
    kwargs['url'] = self.base_path + '/callback'
    kwargs['method'] = 'POST'
    return taskqueue.Task(*args, **kwargs)

  def send_result_email(self, sender=None):
    """Sends an email to admins indicating this Pipeline has completed.

    For developer convenience. Automatically called from finalized for root
    Pipelines that do not override the default action.

    Args:
      sender: (optional) Override the sender's email address.
885 """ 886 status = 'successful' 887 if self.was_aborted: 888 status = 'aborted' 889 890 app_id = os.environ['APPLICATION_ID'] 891 shard_index = app_id.find('~') 892 if shard_index != -1: 893 app_id = app_id[shard_index+1:] 894 895 param_dict = { 896 'status': status, 897 'app_id': app_id, 898 'class_path': self._class_path, 899 'pipeline_id': self.root_pipeline_id, 900 'base_path': '%s.appspot.com%s' % (app_id, self.base_path), 901 } 902 subject = ( 903 'Pipeline %(status)s: App "%(app_id)s", %(class_path)s' 904 '#%(pipeline_id)s' % param_dict) 905 body = """View the pipeline results here: 906 907http://%(base_path)s/status?root=%(pipeline_id)s 908 909Thanks, 910 911The Pipeline API 912""" % param_dict 913 914 html = """<html><body> 915<p>View the pipeline results here:</p> 916 917<p><a href="http://%(base_path)s/status?root=%(pipeline_id)s" 918>http://%(base_path)s/status?root=%(pipeline_id)s</a></p> 919 920<p> 921Thanks, 922<br> 923The Pipeline API 924</p> 925</body></html> 926""" % param_dict 927 928 if sender is None: 929 sender = '%s@%s.appspotmail.com' % (app_id, app_id) 930 try: 931 self._send_mail(sender, subject, body, html=html) 932 except (mail.InvalidSenderError, mail.InvalidEmailError): 933 logging.warning('Could not send result email for ' 934 'root pipeline ID "%s" from sender "%s"', 935 self.root_pipeline_id, sender) 936 937 def cleanup(self): 938 """Clean up this Pipeline and all Datastore records used for coordination. 939 940 Only works when called on a root pipeline. Child pipelines will ignore 941 calls to this method. 942 943 After this method is called, Pipeline.from_id() and related status 944 methods will return inconsistent or missing results. This method is 945 fire-and-forget and asynchronous. 946 """ 947 if self._root_pipeline_key is None: 948 raise UnexpectedPipelineError( 949 'Could not cleanup Pipeline with unknown root pipeline ID.') 950 if not self.is_root: 951 return 952 task = taskqueue.Task( 953 params=dict(root_pipeline_key=self._root_pipeline_key), 954 url=self.base_path + '/cleanup', 955 headers={'X-Ae-Pipeline-Key': self._root_pipeline_key}) 956 taskqueue.Queue(self.queue_name).add(task) 957 958 def with_params(self, **kwargs): 959 """Modify various execution parameters of a Pipeline before it runs. 960 961 This method has no effect in test mode. 962 963 Args: 964 kwargs: Attributes to modify on this Pipeline instance before it has 965 been executed. 966 967 Returns: 968 This Pipeline instance, for easy chaining. 969 """ 970 if _TEST_MODE: 971 logging.info( 972 'Setting runtime parameters for %s#%s: %r', 973 self, self.pipeline_id, kwargs) 974 return self 975 976 if self.pipeline_id is not None: 977 raise UnexpectedPipelineError( 978 'May only call with_params() on a Pipeline that has not yet ' 979 'been scheduled for execution.') 980 981 ALLOWED = ('backoff_seconds', 'backoff_factor', 'max_attempts', 'target') 982 for name, value in kwargs.iteritems(): 983 if name not in ALLOWED: 984 raise TypeError('Unexpected keyword: %s=%r' % (name, value)) 985 setattr(self, name, value) 986 return self 987 988 # Methods implemented by developers for lifecycle management. These 989 # must be idempotent under all circumstances. 
  def run(self, *args, **kwargs):
    """Runs this Pipeline."""
    raise NotImplementedError('Must implement "run" in Pipeline sub-class.')

  def run_test(self, *args, **kwargs):
    """Runs this Pipeline in test mode."""
    raise NotImplementedError(
        'Must implement "run_test" in Pipeline sub-class.')

  def finalized(self):
    """Finalizes this Pipeline after execution if it's a generator.

    The default action for the root pipeline is to email the admins with the
    status. Implementors should be sure to call 'was_aborted' to find out if
    the finalization they're handling is for a success or an error case.
    """
    if self.pipeline_id == self.root_pipeline_id:
      self.send_result_email()

  def finalized_test(self, *args, **kwargs):
    """Finalizes this Pipeline in test mode."""
    raise NotImplementedError(
        'Must implement "finalized_test" in Pipeline sub-class.')

  def callback(self, **kwargs):
    """This Pipeline received an asynchronous callback request."""
    raise NotImplementedError(
        'Must implement "callback" in Pipeline sub-class.')

  def try_cancel(self):
    """This pipeline has been cancelled.

    Called when a pipeline is interrupted part-way through due to some kind
    of failure (an abort of the whole pipeline to the root or a forced retry on
    this child pipeline).

    Returns:
      True to indicate that cancellation was successful and this pipeline may
      go into the retry or aborted state; False to indicate that this pipeline
      cannot be canceled right now and must remain as-is.
    """
    return False

  # Internal methods.
  @classmethod
  def _set_class_path(cls, module_dict=sys.modules):
    """Sets the absolute path to this class as a string.

    Used by the Pipeline API to reconstruct the Pipeline sub-class object
    at execution time instead of passing around a serialized function.

    Args:
      module_dict: Used for testing.
    """
    # Do not traverse the class hierarchy fetching the class path attribute.
    found = cls.__dict__.get('_class_path')
    if found is not None:
      return

    # Do not set the _class_path for the base-class, otherwise all children's
    # lookups for _class_path will fall through and return 'Pipeline' above.
    # This situation can happen if users call the generic Pipeline.from_id
    # to get the result of a Pipeline without knowing its specific class.
    if cls is Pipeline:
      return

    class_path = '%s.%s' % (cls.__module__, cls.__name__)
    # When a WSGI handler is invoked as an entry point, any Pipeline class
    # defined in the same file as the handler will get __module__ set to
    # __main__. Thus we need to find out its real fully qualified path.
    if cls.__module__ == '__main__':
      for name, module in module_dict.items():
        if name == '__main__':
          continue
        found = getattr(module, cls.__name__, None)
        if found is cls:
          class_path = '%s.%s' % (name, cls.__name__)
          break
    cls._class_path = class_path

  def _set_values_internal(self,
                           context,
                           pipeline_key,
                           root_pipeline_key,
                           outputs,
                           result_status):
    """Sets the user-visible values provided as an API by this class.

    Args:
      context: The _PipelineContext used for this Pipeline.
      pipeline_key: The db.Key of this pipeline.
      root_pipeline_key: The db.Key of the root pipeline.
      outputs: The PipelineFuture for this pipeline.
      result_status: The result status of this pipeline.
    """
    self._context = context
    self._pipeline_key = pipeline_key
    self._root_pipeline_key = root_pipeline_key
    self._result_status = result_status
    self.outputs = outputs

  def _callback_internal(self, kwargs):
    """Used to execute callbacks on asynchronous pipelines."""
    logging.debug('Callback %s(*%s, **%s)#%s with params: %r',
                  self._class_path, _short_repr(self.args),
                  _short_repr(self.kwargs), self._pipeline_key.name(), kwargs)
    return self.callback(**kwargs)

  def _run_internal(self,
                    context,
                    pipeline_key,
                    root_pipeline_key,
                    caller_output):
    """Used by the Pipeline evaluator to execute this Pipeline."""
    self._set_values_internal(
        context, pipeline_key, root_pipeline_key, caller_output,
        _PipelineRecord.RUN)
    logging.debug('Running %s(*%s, **%s)#%s',
                  self._class_path, _short_repr(self.args),
                  _short_repr(self.kwargs), self._pipeline_key.name())
    return self.run(*self.args, **self.kwargs)

  def _finalized_internal(self,
                          context,
                          pipeline_key,
                          root_pipeline_key,
                          caller_output,
                          aborted):
    """Used by the Pipeline evaluator to finalize this Pipeline."""
    result_status = _PipelineRecord.RUN
    if aborted:
      result_status = _PipelineRecord.ABORTED

    self._set_values_internal(
        context, pipeline_key, root_pipeline_key, caller_output, result_status)
    logging.debug('Finalizing %s(*%r, **%r)#%s',
                  self._class_path, _short_repr(self.args),
                  _short_repr(self.kwargs), self._pipeline_key.name())
    try:
      self.finalized()
    except NotImplementedError:
      pass

  def __repr__(self):
    """Returns a string representation of this Pipeline."""
    return '%s(*%s, **%s)' % (
        self._class_path, _short_repr(self.args), _short_repr(self.kwargs))


# TODO: Change InOrder and After to use a common thread-local list of
# execution modifications to apply to the current evaluating pipeline.

class After(object):
  """Causes all contained Pipelines to run after the given ones complete.

  Must be used in a 'with' block.
  """

  _local = threading.local()

  def __init__(self, *futures):
    """Initializer.

    Args:
      *futures: PipelineFutures that all subsequent pipelines should follow.
        May be empty, in which case this statement does nothing.
    """
    for f in futures:
      if not isinstance(f, PipelineFuture):
        raise TypeError('May only pass PipelineFuture instances to After(): %r'
                        % type(f))
    self._futures = set(futures)

  def __enter__(self):
    """When entering a 'with' block."""
    After._thread_init()
    After._local._after_all_futures.extend(self._futures)

  def __exit__(self, type, value, trace):
    """When exiting a 'with' block."""
    for future in self._futures:
      After._local._after_all_futures.remove(future)
    return False

  @classmethod
  def _thread_init(cls):
    """Ensures the thread-local state is initialized."""
    if not hasattr(cls._local, '_after_all_futures'):
      cls._local._after_all_futures = []


class InOrder(object):
  """Causes all contained Pipelines to run in order.

  Must be used in a 'with' block.
  """

  _local = threading.local()

  @classmethod
  def _add_future(cls, future):
    """Adds a future to the list of in-order futures thus far.

    Args:
      future: The future to add to the list.
    """
    if cls._local._activated:
      cls._local._in_order_futures.add(future)

  def __init__(self):
    """Initializer."""

  def __enter__(self):
    """When entering a 'with' block."""
    InOrder._thread_init()
    if InOrder._local._activated:
      raise UnexpectedPipelineError('Already in an InOrder "with" block.')
    InOrder._local._activated = True
    InOrder._local._in_order_futures.clear()

  def __exit__(self, type, value, trace):
    """When exiting a 'with' block."""
    InOrder._local._activated = False
    InOrder._local._in_order_futures.clear()
    return False

  @classmethod
  def _thread_init(cls):
    """Ensures the thread-local state is initialized."""
    if not hasattr(cls._local, '_in_order_futures'):
      cls._local._in_order_futures = set()
      cls._local._activated = False


################################################################################

def _short_repr(obj):
  """Helper function that returns a truncated repr() of an object."""
  stringified = pprint.saferepr(obj)
  if len(stringified) > 200:
    return '%s... (%d bytes)' % (stringified[:200], len(stringified))
  return stringified


def _write_json_blob(encoded_value, pipeline_id=None):
  """Writes a JSON encoded value to a Cloud Storage file.

  This function stores the blob in a GCS file in the default bucket under
  the appengine_pipeline directory, optionally within another directory
  level named by pipeline_id.

  Args:
    encoded_value: The encoded JSON string.
    pipeline_id: A pipeline ID to segment files in Cloud Storage; if None,
      the file will be created directly under appengine_pipeline.

  Returns:
    The blobstore.BlobKey for the file that was created.
  """
  default_bucket = app_identity.get_default_gcs_bucket_name()
  path_components = ['/', default_bucket, "appengine_pipeline"]
  if pipeline_id:
    path_components.append(pipeline_id)
  path_components.append(uuid.uuid4().hex)
  # Use posixpath to get a / even if we're running on windows somehow
  file_name = posixpath.join(*path_components)
  with cloudstorage.open(file_name, 'w', content_type='application/json') as f:
    for start_index in xrange(0, len(encoded_value), _MAX_JSON_SIZE):
      end_index = start_index + _MAX_JSON_SIZE
      f.write(encoded_value[start_index:end_index])

  key_str = blobstore.create_gs_key("/gs" + file_name)
  logging.debug("Created blob for filename = %s gs_key = %s",
                file_name, key_str)
  return blobstore.BlobKey(key_str)


def _dereference_args(pipeline_name, args, kwargs):
  """Dereferences a Pipeline's arguments that are slots, validating them.

  Each argument value passed in is assumed to be a dictionary with the format:
    {'type': 'value', 'value': 'serializable'}  # A resolved value.
    {'type': 'slot', 'slot_key': 'str() on a db.Key'}  # A pending Slot.

  Args:
    pipeline_name: The name of the pipeline class; used for debugging.
    args: Iterable of positional arguments.
    kwargs: Dictionary of keyword arguments.

  Returns:
    Tuple (args, kwargs) where:
      Args: A list of positional argument values that are all dereferenced.
      Kwargs: A dictionary of keyword argument values that are all
        dereferenced.

  Raises:
    SlotNotFilledError if any of the supplied 'slot_key' records are not
      present in the Datastore or have not yet been filled.
    UnexpectedPipelineError if an unknown parameter type was passed.
  """
  lookup_slots = set()
  for arg in itertools.chain(args, kwargs.itervalues()):
    if arg['type'] == 'slot':
      lookup_slots.add(db.Key(arg['slot_key']))

  slot_dict = {}
  for key, slot_record in zip(lookup_slots, db.get(lookup_slots)):
    if slot_record is None or slot_record.status != _SlotRecord.FILLED:
      raise SlotNotFilledError(
          'Slot "%s" missing its value. From %s(*args=%s, **kwargs=%s)' %
          (key, pipeline_name, _short_repr(args), _short_repr(kwargs)))
    slot_dict[key] = slot_record.value

  arg_list = []
  for current_arg in args:
    if current_arg['type'] == 'slot':
      arg_list.append(slot_dict[db.Key(current_arg['slot_key'])])
    elif current_arg['type'] == 'value':
      arg_list.append(current_arg['value'])
    else:
      raise UnexpectedPipelineError('Unknown parameter type: %r' % current_arg)

  kwarg_dict = {}
  for key, current_arg in kwargs.iteritems():
    if current_arg['type'] == 'slot':
      kwarg_dict[key] = slot_dict[db.Key(current_arg['slot_key'])]
    elif current_arg['type'] == 'value':
      kwarg_dict[key] = current_arg['value']
    else:
      raise UnexpectedPipelineError('Unknown parameter type: %r' % current_arg)

  return (arg_list, kwarg_dict)


def _generate_args(pipeline, future, queue_name, base_path):
  """Generates the params used to describe a Pipeline's dependencies.

  The arguments passed to this method may be normal values, Slot instances
  (for named outputs), or PipelineFuture instances (for referring to the
  default output slot).

  Args:
    pipeline: The Pipeline instance to generate args for.
    future: The PipelineFuture for the Pipeline these arguments correspond to.
    queue_name: The queue to run the pipeline on.
    base_path: Relative URL for pipeline URL handlers.

  Returns:
    Tuple (dependent_slots, output_slot_keys, params_text, params_blob) where:
      dependent_slots: List of db.Key instances of _SlotRecords on which
        this pipeline will need to block before execution (passed to
        create a _BarrierRecord for running the pipeline).
      output_slot_keys: List of db.Key instances of _SlotRecords that will
        be filled by this pipeline during its execution (passed to create
        a _BarrierRecord for finalizing the pipeline).
      params_text: JSON dictionary of pipeline parameters to be serialized and
        saved in a corresponding _PipelineRecord. Will be None if the params
        are too big and must be saved in a blob instead.
      params_blob: JSON dictionary of pipeline parameters to be serialized and
        saved in a Blob file, and then attached to a _PipelineRecord. Will be
        None if the params data size was small enough to fit in the entity.
1349 """ 1350 params = { 1351 'args': [], 1352 'kwargs': {}, 1353 'after_all': [], 1354 'output_slots': {}, 1355 'class_path': pipeline._class_path, 1356 'queue_name': queue_name, 1357 'base_path': base_path, 1358 'backoff_seconds': pipeline.backoff_seconds, 1359 'backoff_factor': pipeline.backoff_factor, 1360 'max_attempts': pipeline.max_attempts, 1361 'task_retry': pipeline.task_retry, 1362 'target': pipeline.target, 1363 } 1364 dependent_slots = set() 1365 1366 arg_list = params['args'] 1367 for current_arg in pipeline.args: 1368 if isinstance(current_arg, PipelineFuture): 1369 current_arg = current_arg.default 1370 if isinstance(current_arg, Slot): 1371 arg_list.append({'type': 'slot', 'slot_key': str(current_arg.key)}) 1372 dependent_slots.add(current_arg.key) 1373 else: 1374 arg_list.append({'type': 'value', 'value': current_arg}) 1375 1376 kwarg_dict = params['kwargs'] 1377 for name, current_arg in pipeline.kwargs.iteritems(): 1378 if isinstance(current_arg, PipelineFuture): 1379 current_arg = current_arg.default 1380 if isinstance(current_arg, Slot): 1381 kwarg_dict[name] = {'type': 'slot', 'slot_key': str(current_arg.key)} 1382 dependent_slots.add(current_arg.key) 1383 else: 1384 kwarg_dict[name] = {'type': 'value', 'value': current_arg} 1385 1386 after_all = params['after_all'] 1387 for other_future in future._after_all_pipelines: 1388 slot_key = other_future._output_dict['default'].key 1389 after_all.append(str(slot_key)) 1390 dependent_slots.add(slot_key) 1391 1392 output_slots = params['output_slots'] 1393 output_slot_keys = set() 1394 for name, slot in future._output_dict.iteritems(): 1395 output_slot_keys.add(slot.key) 1396 output_slots[name] = str(slot.key) 1397 1398 params_encoded = json.dumps(params, cls=mr_util.JsonEncoder) 1399 params_text = None 1400 params_blob = None 1401 if len(params_encoded) > _MAX_JSON_SIZE: 1402 params_blob = _write_json_blob(params_encoded, pipeline.pipeline_id) 1403 else: 1404 params_text = params_encoded 1405 1406 return dependent_slots, output_slot_keys, params_text, params_blob 1407 1408 1409class _PipelineContext(object): 1410 """Internal API for interacting with Pipeline state.""" 1411 1412 _gettime = datetime.datetime.utcnow 1413 1414 def __init__(self, 1415 task_name, 1416 queue_name, 1417 base_path): 1418 """Initializer. 1419 1420 Args: 1421 task_name: The name of the currently running task or empty if there 1422 is no task running. 1423 queue_name: The queue this pipeline should run on (may not be the 1424 current queue this request is on). 1425 base_path: Relative URL for the pipeline's handlers. 
1426 """ 1427 self.task_name = task_name 1428 self.queue_name = queue_name 1429 self.base_path = base_path 1430 self.barrier_handler_path = '%s/output' % base_path 1431 self.pipeline_handler_path = '%s/run' % base_path 1432 self.finalized_handler_path = '%s/finalized' % base_path 1433 self.fanout_handler_path = '%s/fanout' % base_path 1434 self.abort_handler_path = '%s/abort' % base_path 1435 self.fanout_abort_handler_path = '%s/fanout_abort' % base_path 1436 self.session_filled_output_names = set() 1437 1438 @classmethod 1439 def from_environ(cls, environ=os.environ): 1440 """Constructs a _PipelineContext from the task queue environment.""" 1441 base_path, unused = (environ['PATH_INFO'].rsplit('/', 1) + [''])[:2] 1442 return cls( 1443 environ['HTTP_X_APPENGINE_TASKNAME'], 1444 environ['HTTP_X_APPENGINE_QUEUENAME'], 1445 base_path) 1446 1447 def fill_slot(self, filler_pipeline_key, slot, value): 1448 """Fills a slot, enqueueing a task to trigger pending barriers. 1449 1450 Args: 1451 filler_pipeline_key: db.Key or stringified key of the _PipelineRecord 1452 that filled this slot. 1453 slot: The Slot instance to fill. 1454 value: The serializable value to assign. 1455 1456 Raises: 1457 UnexpectedPipelineError if the _SlotRecord for the 'slot' could not 1458 be found in the Datastore. 1459 """ 1460 if not isinstance(filler_pipeline_key, db.Key): 1461 filler_pipeline_key = db.Key(filler_pipeline_key) 1462 1463 if _TEST_MODE: 1464 slot._set_value_test(filler_pipeline_key, value) 1465 else: 1466 encoded_value = json.dumps(value, 1467 sort_keys=True, 1468 cls=mr_util.JsonEncoder) 1469 value_text = None 1470 value_blob = None 1471 if len(encoded_value) <= _MAX_JSON_SIZE: 1472 value_text = db.Text(encoded_value) 1473 else: 1474 # The encoded value is too big. Save it as a blob. 1475 value_blob = _write_json_blob(encoded_value, filler_pipeline_key.name()) 1476 1477 def txn(): 1478 slot_record = db.get(slot.key) 1479 if slot_record is None: 1480 raise UnexpectedPipelineError( 1481 'Tried to fill missing slot "%s" ' 1482 'by pipeline ID "%s" with value: %r' 1483 % (slot.key, filler_pipeline_key.name(), value)) 1484 # NOTE: Always take the override value here. If down-stream pipelines 1485 # need a consitent view of all up-stream outputs (meaning, all of the 1486 # outputs came from the same retry attempt of the upstream pipeline), 1487 # the down-stream pipeline must also wait for the 'default' output 1488 # of these up-stream pipelines. 1489 slot_record.filler = filler_pipeline_key 1490 slot_record.value_text = value_text 1491 slot_record.value_blob = value_blob 1492 slot_record.status = _SlotRecord.FILLED 1493 slot_record.fill_time = self._gettime() 1494 slot_record.put() 1495 task = taskqueue.Task( 1496 url=self.barrier_handler_path, 1497 params=dict( 1498 slot_key=slot.key, 1499 use_barrier_indexes=True), 1500 headers={'X-Ae-Slot-Key': slot.key, 1501 'X-Ae-Filler-Pipeline-Key': filler_pipeline_key}) 1502 task.add(queue_name=self.queue_name, transactional=True) 1503 db.run_in_transaction_options( 1504 db.create_transaction_options(propagation=db.ALLOWED), txn) 1505 1506 self.session_filled_output_names.add(slot.name) 1507 1508 def notify_barriers(self, 1509 slot_key, 1510 cursor, 1511 use_barrier_indexes, 1512 max_to_notify=_MAX_BARRIERS_TO_NOTIFY): 1513 """Searches for barriers affected by a slot and triggers completed ones. 1514 1515 Args: 1516 slot_key: db.Key or stringified key of the _SlotRecord that was filled. 
      cursor: Stringified Datastore cursor where the notification query
        should pick up.
      use_barrier_indexes: When True, use _BarrierIndex records to determine
        which _Barriers to trigger by having this _SlotRecord filled. When
        False, use the old method that queries for _BarrierRecords by
        the blocking_slots parameter.
      max_to_notify: Used for testing.

    Raises:
      PipelineStatusError: If any of the barriers are in a bad state.
    """
    if not isinstance(slot_key, db.Key):
      slot_key = db.Key(slot_key)
    logging.debug('Notifying slot %r', slot_key)

    if use_barrier_indexes:
      # Please see models.py:_BarrierIndex to understand how _BarrierIndex
      # entities relate to _BarrierRecord entities.
      query = (
          _BarrierIndex.all(cursor=cursor, keys_only=True)
          .ancestor(slot_key))
      barrier_index_list = query.fetch(max_to_notify)
      barrier_key_list = [
          _BarrierIndex.to_barrier_key(key) for key in barrier_index_list]

      # If there are task and pipeline kickoff retries it's possible for a
      # _BarrierIndex to exist for a _BarrierRecord that was not successfully
      # written. It's safe to ignore this because the original task that wrote
      # the _BarrierIndex and _BarrierRecord would not have made progress to
      # kick off a real pipeline or child pipeline unless all of the writes
      # for these dependent entities went through. We assume that the
      # instigator retried from scratch and somewhere there exists a good
      # _BarrierIndex and corresponding _BarrierRecord that tries to
      # accomplish the same thing.
      barriers = db.get(barrier_key_list)
      results = []
      for barrier_key, barrier in zip(barrier_key_list, barriers):
        if barrier is None:
          logging.debug('Ignoring that Barrier "%r" is missing, '
                        'relies on Slot "%r"', barrier_key, slot_key)
        else:
          results.append(barrier)
    else:
      # TODO(user): Delete this backwards compatible codepath and
      # make use_barrier_indexes the assumed default in all cases.
      query = (
          _BarrierRecord.all(cursor=cursor)
          .filter('blocking_slots =', slot_key))
      results = query.fetch(max_to_notify)

    # Fetch all blocking _SlotRecords for any potentially triggered barriers.
    blocking_slot_keys = []
    for barrier in results:
      blocking_slot_keys.extend(barrier.blocking_slots)

    blocking_slot_dict = {}
    for slot_record in db.get(blocking_slot_keys):
      if slot_record is None:
        continue
      blocking_slot_dict[slot_record.key()] = slot_record

    task_list = []
    updated_barriers = []
    for barrier in results:
      ready_slots = []
      for blocking_slot_key in barrier.blocking_slots:
        slot_record = blocking_slot_dict.get(blocking_slot_key)
        if slot_record is None:
          raise UnexpectedPipelineError(
              'Barrier "%r" relies on Slot "%r" which is missing.' %
              (barrier.key(), blocking_slot_key))
        if slot_record.status == _SlotRecord.FILLED:
          ready_slots.append(blocking_slot_key)

      # When all of the blocking_slots have been filled, consider the barrier
      # ready to trigger. We'll trigger it regardless of the current
      # _BarrierRecord status, since there could be task queue failures at any
      # point in this flow; this rolls forward the state and de-dupes using
      # the task name tombstones.
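      # (For example, blocking_slots = {a, b, c} with ready_slots = [a, c]
      # leaves pending_slots = {b}, so that barrier is skipped below.)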
1595       pending_slots = set(barrier.blocking_slots) - set(ready_slots)
1596       if not pending_slots:
1597         if barrier.status != _BarrierRecord.FIRED:
1598           barrier.status = _BarrierRecord.FIRED
1599           barrier.trigger_time = self._gettime()
1600           updated_barriers.append(barrier)
1601
1602         purpose = barrier.key().name()
1603         if purpose == _BarrierRecord.START:
1604           path = self.pipeline_handler_path
1605           countdown = None
1606         else:
1607           path = self.finalized_handler_path
1608           # NOTE: Wait one second before finalization to prevent
1609           # contention on the _PipelineRecord entity.
1610           countdown = 1
1611         pipeline_key = _BarrierRecord.target.get_value_for_datastore(barrier)
1612         logging.debug('Firing barrier %r', barrier.key())
1613         task_list.append(taskqueue.Task(
1614             url=path,
1615             countdown=countdown,
1616             name='ae-barrier-fire-%s-%s' % (pipeline_key.name(), purpose),
1617             params=dict(pipeline_key=pipeline_key, purpose=purpose),
1618             headers={'X-Ae-Pipeline-Key': pipeline_key}))
1619       else:
1620         logging.debug('Not firing barrier %r; waiting for slots: %r',
1621                       barrier.key(), pending_slots)
1622
1623     # Blindly overwrite _BarrierRecords that have an updated status. This is
1624     # acceptable because by this point all finalization barriers for
1625     # generator children should have already had their final outputs assigned.
1626     if updated_barriers:
1627       db.put(updated_barriers)
1628
1629     # Task continuation with sequence number to prevent fork-bombs.
1630     if len(results) == max_to_notify:
1631       the_match = re.match('(.*)-ae-barrier-notify-([0-9]+)', self.task_name)
1632       if the_match:
1633         prefix = the_match.group(1)
1634         end = int(the_match.group(2)) + 1
1635       else:
1636         prefix = self.task_name
1637         end = 0
1638       task_list.append(taskqueue.Task(
1639           name='%s-ae-barrier-notify-%d' % (prefix, end),
1640           url=self.barrier_handler_path,
1641           params=dict(
1642               slot_key=slot_key,
1643               cursor=query.cursor(),
1644               use_barrier_indexes=use_barrier_indexes)))
1645
1646     if task_list:
1647       try:
1648         taskqueue.Queue(self.queue_name).add(task_list)
1649       except (taskqueue.TombstonedTaskError, taskqueue.TaskAlreadyExistsError):
1650         pass
1651
1652   def begin_abort(self, root_pipeline_key, abort_message):
1653     """Kicks off the abort process for a root pipeline and all its children.
1654
1655     Args:
1656       root_pipeline_key: db.Key of the root pipeline to abort.
1657       abort_message: Message explaining why the abort happened, only saved
1658         into the root pipeline.
1659
1660     Returns:
1661       True if the abort signal was sent successfully; False otherwise.
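
    Example (a minimal sketch; assumes this runs where from_environ() is
    valid, and that 'root_key' is the db.Key of an existing root
    _PipelineRecord):

      context = _PipelineContext.from_environ()
      if not context.begin_abort(root_key, 'operator requested shutdown'):
        logging.info('Abort signal not sent; see warnings above.')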
1662 """ 1663 def txn(): 1664 pipeline_record = db.get(root_pipeline_key) 1665 if pipeline_record is None: 1666 logging.warning( 1667 'Tried to abort root pipeline ID "%s" but it does not exist.', 1668 root_pipeline_key.name()) 1669 raise db.Rollback() 1670 if pipeline_record.status == _PipelineRecord.ABORTED: 1671 logging.warning( 1672 'Tried to abort root pipeline ID "%s"; already in state: %s', 1673 root_pipeline_key.name(), pipeline_record.status) 1674 raise db.Rollback() 1675 if pipeline_record.abort_requested: 1676 logging.warning( 1677 'Tried to abort root pipeline ID "%s"; abort signal already sent.', 1678 root_pipeline_key.name()) 1679 raise db.Rollback() 1680 1681 pipeline_record.abort_requested = True 1682 pipeline_record.abort_message = abort_message 1683 pipeline_record.put() 1684 1685 task = taskqueue.Task( 1686 url=self.fanout_abort_handler_path, 1687 params=dict(root_pipeline_key=root_pipeline_key)) 1688 task.add(queue_name=self.queue_name, transactional=True) 1689 return True 1690 1691 return db.run_in_transaction(txn) 1692 1693 def continue_abort(self, 1694 root_pipeline_key, 1695 cursor=None, 1696 max_to_notify=_MAX_ABORTS_TO_BEGIN): 1697 """Sends the abort signal to all children for a root pipeline. 1698 1699 Args: 1700 root_pipeline_key: db.Key of the root pipeline to abort. 1701 cursor: The query cursor for enumerating _PipelineRecords when inserting 1702 tasks to cause child pipelines to terminate. 1703 max_to_notify: Used for testing. 1704 """ 1705 if not isinstance(root_pipeline_key, db.Key): 1706 root_pipeline_key = db.Key(root_pipeline_key) 1707 # NOTE: The results of this query may include _PipelineRecord instances 1708 # that are not actually "reachable", meaning you cannot get to them by 1709 # starting at the root pipeline and following "fanned_out" onward. This 1710 # is acceptable because even these defunct _PipelineRecords will properly 1711 # set their status to ABORTED when the signal comes, regardless of any 1712 # other status they may have had. 1713 # 1714 # The only gotcha here is if a Pipeline's finalize method somehow modifies 1715 # its inputs (like deleting an input file). In the case there are 1716 # unreachable child pipelines, it will appear as if two finalize methods 1717 # have been called instead of just one. The saving grace here is that 1718 # finalize must be idempotent, so this *should* be harmless. 1719 query = ( 1720 _PipelineRecord.all(cursor=cursor) 1721 .filter('root_pipeline =', root_pipeline_key)) 1722 results = query.fetch(max_to_notify) 1723 1724 task_list = [] 1725 for pipeline_record in results: 1726 if pipeline_record.status not in ( 1727 _PipelineRecord.RUN, _PipelineRecord.WAITING): 1728 continue 1729 1730 pipeline_key = pipeline_record.key() 1731 task_list.append(taskqueue.Task( 1732 name='%s-%s-abort' % (self.task_name, pipeline_key.name()), 1733 url=self.abort_handler_path, 1734 params=dict(pipeline_key=pipeline_key, purpose=_BarrierRecord.ABORT), 1735 headers={'X-Ae-Pipeline-Key': pipeline_key})) 1736 1737 # Task continuation with sequence number to prevent fork-bombs. 
1738 if len(results) == max_to_notify: 1739 the_match = re.match('(.*)-([0-9]+)', self.task_name) 1740 if the_match: 1741 prefix = the_match.group(1) 1742 end = int(the_match.group(2)) + 1 1743 else: 1744 prefix = self.task_name 1745 end = 0 1746 task_list.append(taskqueue.Task( 1747 name='%s-%d' % (prefix, end), 1748 url=self.fanout_abort_handler_path, 1749 params=dict(root_pipeline_key=root_pipeline_key, 1750 cursor=query.cursor()))) 1751 1752 if task_list: 1753 try: 1754 taskqueue.Queue(self.queue_name).add(task_list) 1755 except (taskqueue.TombstonedTaskError, taskqueue.TaskAlreadyExistsError): 1756 pass 1757 1758 def start(self, pipeline, return_task=True, countdown=None, eta=None): 1759 """Starts a pipeline. 1760 1761 Args: 1762 pipeline: Pipeline instance to run. 1763 return_task: When True, do not submit the task to start the pipeline 1764 but instead return it for someone else to enqueue. 1765 countdown: Time in seconds into the future that this Task should execute. 1766 Defaults to zero. 1767 eta: A datetime.datetime specifying the absolute time at which the task 1768 should be executed. Must not be specified if 'countdown' is specified. 1769 This may be timezone-aware or timezone-naive. If None, defaults to now. 1770 For pull tasks, no worker will be able to lease this task before the 1771 time indicated by eta. 1772 1773 Returns: 1774 The task to start this pipeline if return_task was True. 1775 1776 Raises: 1777 PipelineExistsError if the pipeline with the given ID already exists. 1778 """ 1779 # Adjust all pipeline output keys for this Pipeline to be children of 1780 # the _PipelineRecord, that way we can write them all and submit in a 1781 # single transaction. 1782 for name, slot in pipeline.outputs._output_dict.iteritems(): 1783 slot.key = db.Key.from_path( 1784 *slot.key.to_path(), **dict(parent=pipeline._pipeline_key)) 1785 1786 _, output_slots, params_text, params_blob = _generate_args( 1787 pipeline, pipeline.outputs, self.queue_name, self.base_path) 1788 1789 @db.transactional(propagation=db.INDEPENDENT) 1790 def txn(): 1791 pipeline_record = db.get(pipeline._pipeline_key) 1792 if pipeline_record is not None: 1793 raise PipelineExistsError( 1794 'Pipeline with idempotence key "%s" already exists; params=%s' % 1795 (pipeline._pipeline_key.name(), 1796 _short_repr(pipeline_record.params))) 1797 1798 entities_to_put = [] 1799 for name, slot in pipeline.outputs._output_dict.iteritems(): 1800 entities_to_put.append(_SlotRecord( 1801 key=slot.key, 1802 root_pipeline=pipeline._pipeline_key)) 1803 1804 entities_to_put.append(_PipelineRecord( 1805 key=pipeline._pipeline_key, 1806 root_pipeline=pipeline._pipeline_key, 1807 is_root_pipeline=True, 1808 # Bug in DB means we need to use the storage name here, 1809 # not the local property name. 
1810 params=params_text, 1811 params_blob=params_blob, 1812 start_time=self._gettime(), 1813 class_path=pipeline._class_path, 1814 max_attempts=pipeline.max_attempts)) 1815 1816 entities_to_put.extend(_PipelineContext._create_barrier_entities( 1817 pipeline._pipeline_key, 1818 pipeline._pipeline_key, 1819 _BarrierRecord.FINALIZE, 1820 output_slots)) 1821 1822 db.put(entities_to_put) 1823 1824 task = taskqueue.Task( 1825 url=self.pipeline_handler_path, 1826 params=dict(pipeline_key=pipeline._pipeline_key), 1827 headers={'X-Ae-Pipeline-Key': pipeline._pipeline_key}, 1828 target=pipeline.target, 1829 countdown=countdown, 1830 eta=eta) 1831 if return_task: 1832 return task 1833 task.add(queue_name=self.queue_name, transactional=True) 1834 1835 task = txn() 1836 # Immediately mark the output slots as existing so they can be filled 1837 # by asynchronous pipelines or used in test mode. 1838 for output_slot in pipeline.outputs._output_dict.itervalues(): 1839 output_slot._exists = True 1840 return task 1841 1842 def start_test(self, pipeline): 1843 """Starts a pipeline in the test mode. 1844 1845 Args: 1846 pipeline: The Pipeline instance to test. 1847 """ 1848 global _TEST_MODE, _TEST_ROOT_PIPELINE_KEY 1849 self.start(pipeline, return_task=True) 1850 _TEST_MODE = True 1851 _TEST_ROOT_PIPELINE_KEY = pipeline._pipeline_key 1852 try: 1853 self.evaluate_test(pipeline, root=True) 1854 finally: 1855 _TEST_MODE = False 1856 1857 def evaluate_test(self, stage, root=False): 1858 """Recursively evaluates the given pipeline in test mode. 1859 1860 Args: 1861 stage: The Pipeline instance to run at this stage in the flow. 1862 root: True if the supplied stage is the root of the pipeline. 1863 """ 1864 args_adjusted = [] 1865 for arg in stage.args: 1866 if isinstance(arg, PipelineFuture): 1867 arg = arg.default 1868 if isinstance(arg, Slot): 1869 value = arg.value 1870 arg._touched = True 1871 else: 1872 value = arg 1873 args_adjusted.append(value) 1874 1875 kwargs_adjusted = {} 1876 for name, arg in stage.kwargs.iteritems(): 1877 if isinstance(arg, PipelineFuture): 1878 arg = arg.default 1879 if isinstance(arg, Slot): 1880 value = arg.value 1881 arg._touched = True 1882 else: 1883 value = arg 1884 kwargs_adjusted[name] = value 1885 1886 stage.args, stage.kwargs = args_adjusted, kwargs_adjusted 1887 pipeline_generator = mr_util.is_generator_function(stage.run) 1888 logging.debug('Running %s(*%s, **%s)', stage._class_path, 1889 _short_repr(stage.args), _short_repr(stage.kwargs)) 1890 1891 if stage.async: 1892 stage.run_test(*stage.args, **stage.kwargs) 1893 elif pipeline_generator: 1894 all_output_slots = set() 1895 try: 1896 pipeline_iter = stage.run_test(*stage.args, **stage.kwargs) 1897 except NotImplementedError: 1898 pipeline_iter = stage.run(*stage.args, **stage.kwargs) 1899 1900 all_substages = set() 1901 next_value = None 1902 last_sub_stage = None 1903 while True: 1904 try: 1905 yielded = pipeline_iter.send(next_value) 1906 except StopIteration: 1907 break 1908 1909 if isinstance(yielded, Pipeline): 1910 if yielded in all_substages: 1911 raise UnexpectedPipelineError( 1912 'Already yielded pipeline object %r' % yielded) 1913 else: 1914 all_substages.add(yielded) 1915 1916 last_sub_stage = yielded 1917 next_value = yielded.outputs 1918 all_output_slots.update(next_value._output_dict.itervalues()) 1919 else: 1920 raise UnexpectedPipelineError( 1921 'Yielded a disallowed value: %r' % yielded) 1922 1923 if last_sub_stage: 1924 # Generator's outputs inherited from last running sub-stage. 
1925 # If the generator changes its mind and doesn't yield anything, this 1926 # may not happen at all. Missing outputs will be caught when they 1927 # are passed to the stage as inputs, or verified from the outside by 1928 # the test runner. 1929 for slot_name, slot in last_sub_stage.outputs._output_dict.iteritems(): 1930 stage.outputs._output_dict[slot_name] = slot 1931 # Any inherited slots won't be checked for declaration. 1932 all_output_slots.remove(slot) 1933 else: 1934 # Generator yielded no children, so treat it as a sync function. 1935 stage.outputs.default._set_value_test(stage._pipeline_key, None) 1936 1937 # Enforce the policy of requiring all undeclared output slots from 1938 # child pipelines to be consumed by their parent generator. 1939 for slot in all_output_slots: 1940 if slot.name == 'default': 1941 continue 1942 if slot.filled and not slot._strict and not slot._touched: 1943 raise SlotNotDeclaredError( 1944 'Undeclared output "%s"; all dynamic outputs from child ' 1945 'pipelines must be consumed.' % slot.name) 1946 else: 1947 try: 1948 result = stage.run_test(*stage.args, **stage.kwargs) 1949 except NotImplementedError: 1950 result = stage.run(*stage.args, **stage.kwargs) 1951 stage.outputs.default._set_value_test(stage._pipeline_key, result) 1952 1953 # Enforce strict output usage at the top level. 1954 if root: 1955 found_outputs = set() 1956 for slot in stage.outputs._output_dict.itervalues(): 1957 if slot.filled: 1958 found_outputs.add(slot.name) 1959 if slot.name == 'default': 1960 continue 1961 if slot.name not in stage.output_names: 1962 raise SlotNotDeclaredError( 1963 'Undeclared output from root pipeline "%s"' % slot.name) 1964 1965 missing_outputs = set(stage.output_names) - found_outputs 1966 if missing_outputs: 1967 raise SlotNotFilledError( 1968 'Outputs %r were never filled.' % missing_outputs) 1969 1970 logging.debug('Finalizing %s(*%s, **%s)', stage._class_path, 1971 _short_repr(stage.args), _short_repr(stage.kwargs)) 1972 ran = False 1973 try: 1974 stage.finalized_test() 1975 ran = True 1976 except NotImplementedError: 1977 pass 1978 if not ran: 1979 try: 1980 stage.finalized() 1981 except NotImplementedError: 1982 pass 1983 1984 def evaluate(self, pipeline_key, purpose=None, attempt=0): 1985 """Evaluates the given Pipeline and enqueues sub-stages for execution. 1986 1987 Args: 1988 pipeline_key: The db.Key or stringified key of the _PipelineRecord to run. 1989 purpose: Why evaluate was called ('start', 'finalize', or 'abort'). 1990 attempt: The attempt number that should be tried. 
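
    The 'purpose' values correspond to _BarrierRecord.START,
    _BarrierRecord.FINALIZE, and _BarrierRecord.ABORT; when no purpose is
    supplied, START is assumed.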
1991 """ 1992 After._thread_init() 1993 InOrder._thread_init() 1994 InOrder._local._activated = False 1995 1996 if not isinstance(pipeline_key, db.Key): 1997 pipeline_key = db.Key(pipeline_key) 1998 pipeline_record = db.get(pipeline_key) 1999 if pipeline_record is None: 2000 logging.error('Pipeline ID "%s" does not exist.', pipeline_key.name()) 2001 return 2002 if pipeline_record.status not in ( 2003 _PipelineRecord.WAITING, _PipelineRecord.RUN): 2004 logging.error('Pipeline ID "%s" in bad state for purpose "%s": "%s"', 2005 pipeline_key.name(), purpose or _BarrierRecord.START, 2006 pipeline_record.status) 2007 return 2008 2009 params = pipeline_record.params 2010 root_pipeline_key = \ 2011 _PipelineRecord.root_pipeline.get_value_for_datastore(pipeline_record) 2012 default_slot_key = db.Key(params['output_slots']['default']) 2013 2014 default_slot_record, root_pipeline_record = db.get([ 2015 default_slot_key, root_pipeline_key]) 2016 if default_slot_record is None: 2017 logging.error('Pipeline ID "%s" default slot "%s" does not exist.', 2018 pipeline_key.name(), default_slot_key) 2019 return 2020 if root_pipeline_record is None: 2021 logging.error('Pipeline ID "%s" root pipeline ID "%s" is missing.', 2022 pipeline_key.name(), root_pipeline_key.name()) 2023 return 2024 2025 # Always finalize if we're aborting so pipelines have a chance to cleanup 2026 # before they terminate. Pipelines must access 'was_aborted' to find 2027 # out how their finalization should work. 2028 abort_signal = ( 2029 purpose == _BarrierRecord.ABORT or 2030 root_pipeline_record.abort_requested == True) 2031 finalize_signal = ( 2032 (default_slot_record.status == _SlotRecord.FILLED and 2033 purpose == _BarrierRecord.FINALIZE) or abort_signal) 2034 2035 try: 2036 pipeline_func_class = mr_util.for_name(pipeline_record.class_path) 2037 except ImportError, e: 2038 # This means something is wrong with the deployed code. Rely on the 2039 # taskqueue system to do retries. 2040 retry_message = '%s: %s' % (e.__class__.__name__, str(e)) 2041 logging.exception( 2042 'Could not locate %s#%s. %s', 2043 pipeline_record.class_path, pipeline_key.name(), retry_message) 2044 raise 2045 2046 try: 2047 pipeline_func = pipeline_func_class.from_id( 2048 pipeline_key.name(), 2049 resolve_outputs=finalize_signal, 2050 _pipeline_record=pipeline_record) 2051 except SlotNotFilledError, e: 2052 logging.exception( 2053 'Could not resolve arguments for %s#%s. Most likely this means there ' 2054 'is a bug in the Pipeline runtime or some intermediate data has been ' 2055 'deleted from the Datastore. Giving up.', 2056 pipeline_record.class_path, pipeline_key.name()) 2057 self.transition_aborted(pipeline_key) 2058 return 2059 except Exception, e: 2060 retry_message = '%s: %s' % (e.__class__.__name__, str(e)) 2061 logging.exception( 2062 'Instantiating %s#%s raised exception. 
%s', 2063 pipeline_record.class_path, pipeline_key.name(), retry_message) 2064 self.transition_retry(pipeline_key, retry_message) 2065 if pipeline_record.params['task_retry']: 2066 raise 2067 else: 2068 return 2069 else: 2070 pipeline_generator = mr_util.is_generator_function( 2071 pipeline_func_class.run) 2072 caller_output = pipeline_func.outputs 2073 2074 if (abort_signal and pipeline_func.async and 2075 pipeline_record.status == _PipelineRecord.RUN 2076 and not pipeline_func.try_cancel()): 2077 logging.warning( 2078 'Could not cancel and abort mid-flight async pipeline: %r#%s', 2079 pipeline_func, pipeline_key.name()) 2080 return 2081 2082 if finalize_signal: 2083 try: 2084 pipeline_func._finalized_internal( 2085 self, pipeline_key, root_pipeline_key, 2086 caller_output, abort_signal) 2087 except Exception, e: 2088 # This means something is wrong with the deployed finalization code. 2089 # Rely on the taskqueue system to do retries. 2090 retry_message = '%s: %s' % (e.__class__.__name__, str(e)) 2091 logging.exception('Finalizing %r#%s raised exception. %s', 2092 pipeline_func, pipeline_key.name(), retry_message) 2093 raise 2094 else: 2095 if not abort_signal: 2096 self.transition_complete(pipeline_key) 2097 return 2098 2099 if abort_signal: 2100 logging.debug('Marking as aborted %s#%s', pipeline_func, 2101 pipeline_key.name()) 2102 self.transition_aborted(pipeline_key) 2103 return 2104 2105 if pipeline_record.current_attempt != attempt: 2106 logging.error( 2107 'Received evaluation task for pipeline ID "%s" attempt %d but ' 2108 'current pending attempt is %d', pipeline_key.name(), attempt, 2109 pipeline_record.current_attempt) 2110 return 2111 2112 if pipeline_record.current_attempt >= pipeline_record.max_attempts: 2113 logging.error( 2114 'Received evaluation task for pipeline ID "%s" on attempt %d ' 2115 'but that exceeds max attempts %d', pipeline_key.name(), attempt, 2116 pipeline_record.max_attempts) 2117 return 2118 2119 if pipeline_record.next_retry_time is not None: 2120 retry_time = pipeline_record.next_retry_time - _RETRY_WIGGLE_TIMEDELTA 2121 if self._gettime() <= retry_time: 2122 detail_message = ( 2123 'Received evaluation task for pipeline ID "%s" on attempt %d, ' 2124 'which will not be ready until: %s' % (pipeline_key.name(), 2125 pipeline_record.current_attempt, pipeline_record.next_retry_time)) 2126 logging.warning(detail_message) 2127 raise UnexpectedPipelineError(detail_message) 2128 2129 if pipeline_record.status == _PipelineRecord.RUN and pipeline_generator: 2130 if (default_slot_record.status == _SlotRecord.WAITING and 2131 not pipeline_record.fanned_out): 2132 # This properly handles the yield-less generator case when the 2133 # RUN state transition worked properly but outputting to the default 2134 # slot failed. 2135 self.fill_slot(pipeline_key, caller_output.default, None) 2136 return 2137 2138 if (pipeline_record.status == _PipelineRecord.WAITING and 2139 pipeline_func.async): 2140 self.transition_run(pipeline_key) 2141 2142 try: 2143 result = pipeline_func._run_internal( 2144 self, pipeline_key, root_pipeline_key, caller_output) 2145 except Exception, e: 2146 if self.handle_run_exception(pipeline_key, pipeline_func, e): 2147 raise 2148 else: 2149 return 2150 2151 if pipeline_func.async: 2152 return 2153 2154 if not pipeline_generator: 2155 # Catch any exceptions that are thrown when the pipeline's return 2156 # value is being serialized. This ensures that serialization errors 2157 # will cause normal abort/retry behavior. 
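      # Illustrative failure mode: a run() method returning a value that
      # json.dumps() cannot encode even with mr_util.JsonEncoder (an open
      # file object, for example) lands in the except clause below and
      # follows the normal retry path.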
2158 try: 2159 self.fill_slot(pipeline_key, caller_output.default, result) 2160 except Exception, e: 2161 retry_message = 'Bad return value. %s: %s' % ( 2162 e.__class__.__name__, str(e)) 2163 logging.exception( 2164 'Generator %r#%s caused exception while serializing return ' 2165 'value %r. %s', pipeline_func, pipeline_key.name(), result, 2166 retry_message) 2167 self.transition_retry(pipeline_key, retry_message) 2168 if pipeline_func.task_retry: 2169 raise 2170 else: 2171 return 2172 2173 expected_outputs = set(caller_output._output_dict.iterkeys()) 2174 found_outputs = self.session_filled_output_names 2175 if expected_outputs != found_outputs: 2176 exception = SlotNotFilledError( 2177 'Outputs %r for pipeline ID "%s" were never filled by "%s".' % ( 2178 expected_outputs - found_outputs, 2179 pipeline_key.name(), pipeline_func._class_path)) 2180 if self.handle_run_exception(pipeline_key, pipeline_func, exception): 2181 raise exception 2182 return 2183 2184 pipeline_iter = result 2185 next_value = None 2186 last_sub_stage = None 2187 sub_stage = None 2188 sub_stage_dict = {} 2189 sub_stage_ordering = [] 2190 2191 while True: 2192 try: 2193 yielded = pipeline_iter.send(next_value) 2194 except StopIteration: 2195 break 2196 except Exception, e: 2197 if self.handle_run_exception(pipeline_key, pipeline_func, e): 2198 raise 2199 else: 2200 return 2201 2202 if isinstance(yielded, Pipeline): 2203 if yielded in sub_stage_dict: 2204 raise UnexpectedPipelineError( 2205 'Already yielded pipeline object %r with pipeline ID %s' % 2206 (yielded, yielded.pipeline_id)) 2207 2208 last_sub_stage = yielded 2209 next_value = PipelineFuture(yielded.output_names) 2210 next_value._after_all_pipelines.update(After._local._after_all_futures) 2211 next_value._after_all_pipelines.update(InOrder._local._in_order_futures) 2212 sub_stage_dict[yielded] = next_value 2213 sub_stage_ordering.append(yielded) 2214 InOrder._add_future(next_value) 2215 2216 # To aid local testing, the task_retry flag (which instructs the 2217 # evaluator to raise all exceptions back up to the task queue) is 2218 # inherited by all children from the root down. 2219 yielded.task_retry = pipeline_func.task_retry 2220 else: 2221 raise UnexpectedPipelineError( 2222 'Yielded a disallowed value: %r' % yielded) 2223 2224 if last_sub_stage: 2225 # Final yielded stage inherits outputs from calling pipeline that were not 2226 # already filled during the generator's execution. 2227 inherited_outputs = params['output_slots'] 2228 for slot_name in self.session_filled_output_names: 2229 del inherited_outputs[slot_name] 2230 sub_stage_dict[last_sub_stage]._inherit_outputs( 2231 pipeline_record.class_path, inherited_outputs) 2232 else: 2233 # Here the generator has yielded nothing, and thus acts as a synchronous 2234 # function. We can skip the rest of the generator steps completely and 2235 # fill the default output slot to cause finalizing. 2236 expected_outputs = set(caller_output._output_dict.iterkeys()) 2237 expected_outputs.remove('default') 2238 found_outputs = self.session_filled_output_names 2239 if expected_outputs != found_outputs: 2240 exception = SlotNotFilledError( 2241 'Outputs %r for pipeline ID "%s" were never filled by "%s".' 
% ( 2242 expected_outputs - found_outputs, 2243 pipeline_key.name(), pipeline_func._class_path)) 2244 if self.handle_run_exception(pipeline_key, pipeline_func, exception): 2245 raise exception 2246 else: 2247 self.fill_slot(pipeline_key, caller_output.default, None) 2248 self.transition_run(pipeline_key) 2249 return 2250 2251 # Allocate any SlotRecords that do not yet exist. 2252 entities_to_put = [] 2253 for future in sub_stage_dict.itervalues(): 2254 for slot in future._output_dict.itervalues(): 2255 if not slot._exists: 2256 entities_to_put.append(_SlotRecord( 2257 key=slot.key, root_pipeline=root_pipeline_key)) 2258 2259 # Allocate PipelineRecords and BarrierRecords for generator-run Pipelines. 2260 pipelines_to_run = set() 2261 all_children_keys = [] 2262 all_output_slots = set() 2263 for sub_stage in sub_stage_ordering: 2264 future = sub_stage_dict[sub_stage] 2265 2266 # Catch any exceptions that are thrown when the pipeline's parameters 2267 # are being serialized. This ensures that serialization errors will 2268 # cause normal retry/abort behavior. 2269 try: 2270 dependent_slots, output_slots, params_text, params_blob = \ 2271 _generate_args(sub_stage, future, self.queue_name, self.base_path) 2272 except Exception, e: 2273 retry_message = 'Bad child arguments. %s: %s' % ( 2274 e.__class__.__name__, str(e)) 2275 logging.exception( 2276 'Generator %r#%s caused exception while serializing args for ' 2277 'child pipeline %r. %s', pipeline_func, pipeline_key.name(), 2278 sub_stage, retry_message) 2279 self.transition_retry(pipeline_key, retry_message) 2280 if pipeline_func.task_retry: 2281 raise 2282 else: 2283 return 2284 2285 child_pipeline_key = db.Key.from_path( 2286 _PipelineRecord.kind(), uuid.uuid4().hex) 2287 all_output_slots.update(output_slots) 2288 all_children_keys.append(child_pipeline_key) 2289 2290 child_pipeline = _PipelineRecord( 2291 key=child_pipeline_key, 2292 root_pipeline=root_pipeline_key, 2293 # Bug in DB means we need to use the storage name here, 2294 # not the local property name. 2295 params=params_text, 2296 params_blob=params_blob, 2297 class_path=sub_stage._class_path, 2298 max_attempts=sub_stage.max_attempts) 2299 entities_to_put.append(child_pipeline) 2300 2301 if not dependent_slots: 2302 # This child pipeline will run immediately. 2303 pipelines_to_run.add(child_pipeline_key) 2304 child_pipeline.start_time = self._gettime() 2305 else: 2306 entities_to_put.extend(_PipelineContext._create_barrier_entities( 2307 root_pipeline_key, 2308 child_pipeline_key, 2309 _BarrierRecord.START, 2310 dependent_slots)) 2311 2312 entities_to_put.extend(_PipelineContext._create_barrier_entities( 2313 root_pipeline_key, 2314 child_pipeline_key, 2315 _BarrierRecord.FINALIZE, 2316 output_slots)) 2317 2318 # This generator pipeline's finalization barrier must include all of the 2319 # outputs of any child pipelines that it runs. This ensures the finalized 2320 # calls will not happen until all child pipelines have completed. 2321 # 2322 # The transition_run() call below will update the FINALIZE _BarrierRecord 2323 # for this generator pipeline to include all of these child outputs in 2324 # its list of blocking_slots. That update is done transactionally to 2325 # make sure the _BarrierRecord only lists the slots that matter. 2326 # 2327 # However, the notify_barriers() method doesn't find _BarrierRecords 2328 # through the blocking_slots field. It finds them through _BarrierIndexes 2329 # entities. 
Thus, before we update the FINALIZE _BarrierRecord in
2330     # transition_run(), we need to write _BarrierIndexes for all child outputs.
2331     barrier_entities = _PipelineContext._create_barrier_entities(
2332         root_pipeline_key,
2333         pipeline_key,
2334         _BarrierRecord.FINALIZE,
2335         all_output_slots)
2336     # Ignore the first element which is the _BarrierRecord. That entity must
2337     # have already been created and put in the datastore for the parent
2338     # pipeline before this code generated child pipelines.
2339     barrier_indexes = barrier_entities[1:]
2340     entities_to_put.extend(barrier_indexes)
2341
2342     db.put(entities_to_put)
2343
2344     self.transition_run(pipeline_key,
2345                         blocking_slot_keys=all_output_slots,
2346                         fanned_out_pipelines=all_children_keys,
2347                         pipelines_to_run=pipelines_to_run)
2348
2349   @staticmethod
2350   def _create_barrier_entities(root_pipeline_key,
2351                                child_pipeline_key,
2352                                purpose,
2353                                blocking_slot_keys):
2354     """Creates all of the entities required for a _BarrierRecord.
2355
2356     Args:
2357       root_pipeline_key: The root pipeline this is part of.
2358       child_pipeline_key: The pipeline this barrier is for.
2359       purpose: _BarrierRecord.START or _BarrierRecord.FINALIZE.
2360       blocking_slot_keys: Set of db.Keys corresponding to _SlotRecords that
2361         this barrier should wait on before firing.
2362
2363     Returns:
2364       List of entities, starting with the _BarrierRecord entity, followed by
2365       _BarrierIndexes used for firing when _SlotRecords are filled in the same
2366       order as the blocking_slot_keys list provided. All of these entities
2367       should be put in the Datastore to ensure the barrier fires properly.
2368     """
2369     result = []
2370
2371     blocking_slot_keys = list(blocking_slot_keys)
2372
2373     barrier = _BarrierRecord(
2374         parent=child_pipeline_key,
2375         key_name=purpose,
2376         target=child_pipeline_key,
2377         root_pipeline=root_pipeline_key,
2378         blocking_slots=blocking_slot_keys)
2379
2380     result.append(barrier)
2381
2382     for slot_key in blocking_slot_keys:
2383       barrier_index_path = []
2384       barrier_index_path.extend(slot_key.to_path())
2385       barrier_index_path.extend(child_pipeline_key.to_path())
2386       barrier_index_path.extend([_BarrierIndex.kind(), purpose])
2387       barrier_index_key = db.Key.from_path(*barrier_index_path)
2388       barrier_index = _BarrierIndex(
2389           key=barrier_index_key,
2390           root_pipeline=root_pipeline_key)
2391       result.append(barrier_index)
2392
2393     return result
2394
2395   def handle_run_exception(self, pipeline_key, pipeline_func, e):
2396     """Handles an exception raised by a Pipeline's user code.
2397
2398     Args:
2399       pipeline_key: The pipeline that raised the error.
2400       pipeline_func: The Pipeline instance that was running.
2401       e: The exception that was raised.
2402
2403     Returns:
2404       True if the exception should be re-raised up through the calling stack
2405       by the caller of this method.
2406     """
2407     if isinstance(e, Retry):
2408       retry_message = str(e)
2409       logging.warning('User forced retry for pipeline ID "%s" of %r: %s',
2410                       pipeline_key.name(), pipeline_func, retry_message)
2411       self.transition_retry(pipeline_key, retry_message)
2412     elif isinstance(e, Abort):
2413       abort_message = str(e)
2414       logging.warning('User forced abort for pipeline ID "%s" of %r: %s',
2415                       pipeline_key.name(), pipeline_func, abort_message)
2416       pipeline_func.abort(abort_message)
2417     else:
2418       retry_message = '%s: %s' % (e.__class__.__name__, str(e))
2419       logging.exception('Generator %r#%s raised exception. 
%s', 2420 pipeline_func, pipeline_key.name(), retry_message) 2421 self.transition_retry(pipeline_key, retry_message) 2422 2423 return pipeline_func.task_retry 2424 2425 def transition_run(self, 2426 pipeline_key, 2427 blocking_slot_keys=None, 2428 fanned_out_pipelines=None, 2429 pipelines_to_run=None): 2430 """Marks an asynchronous or generator pipeline as running. 2431 2432 Does nothing if the pipeline is no longer in a runnable state. 2433 2434 Args: 2435 pipeline_key: The db.Key of the _PipelineRecord to update. 2436 blocking_slot_keys: List of db.Key instances that this pipeline's 2437 finalization barrier should wait on in addition to the existing one. 2438 This is used to update the barrier to include all child outputs. When 2439 None, the barrier will not be updated. 2440 fanned_out_pipelines: List of db.Key instances of _PipelineRecords that 2441 were fanned out by this generator pipeline. This is distinct from the 2442 'pipelines_to_run' list because not all of the pipelines listed here 2443 will be immediately ready to execute. When None, then this generator 2444 yielded no children. 2445 pipelines_to_run: List of db.Key instances of _PipelineRecords that should 2446 be kicked off (fan-out) transactionally as part of this transition. 2447 When None, no child pipelines will run. All db.Keys in this list must 2448 also be present in the fanned_out_pipelines list. 2449 2450 Raises: 2451 UnexpectedPipelineError if blocking_slot_keys was not empty and the 2452 _BarrierRecord has gone missing. 2453 """ 2454 def txn(): 2455 pipeline_record = db.get(pipeline_key) 2456 if pipeline_record is None: 2457 logging.warning('Pipeline ID "%s" cannot be marked as run. ' 2458 'Does not exist.', pipeline_key.name()) 2459 raise db.Rollback() 2460 if pipeline_record.status != _PipelineRecord.WAITING: 2461 logging.warning('Pipeline ID "%s" in bad state to be marked as run: %s', 2462 pipeline_key.name(), pipeline_record.status) 2463 raise db.Rollback() 2464 2465 pipeline_record.status = _PipelineRecord.RUN 2466 2467 if fanned_out_pipelines: 2468 # NOTE: We must model the pipeline relationship in a top-down manner, 2469 # meaning each pipeline must point forward to the pipelines that it 2470 # fanned out to. The reason is race conditions. If evaluate() 2471 # dies early, it may create many unused _PipelineRecord and _SlotRecord 2472 # instances that never progress. The only way we know which of these 2473 # are valid is by traversing the graph from the root, where the 2474 # fanned_out property refers to those pipelines that were run using a 2475 # transactional task. 2476 child_pipeline_list = list(fanned_out_pipelines) 2477 pipeline_record.fanned_out = child_pipeline_list 2478 2479 if pipelines_to_run: 2480 child_indexes = [ 2481 child_pipeline_list.index(p) for p in pipelines_to_run] 2482 child_indexes.sort() 2483 task = taskqueue.Task( 2484 url=self.fanout_handler_path, 2485 params=dict(parent_key=str(pipeline_key), 2486 child_indexes=child_indexes)) 2487 task.add(queue_name=self.queue_name, transactional=True) 2488 2489 pipeline_record.put() 2490 2491 if blocking_slot_keys: 2492 # NOTE: Always update a generator pipeline's finalization barrier to 2493 # include all of the outputs of any pipelines that it runs, to ensure 2494 # that finalized calls will not happen until all child pipelines have 2495 # completed. This must happen transactionally with the enqueue of 2496 # the fan-out kickoff task above to ensure the child output slots and 2497 # the barrier blocking slots are the same. 
2498 barrier_key = db.Key.from_path( 2499 _BarrierRecord.kind(), _BarrierRecord.FINALIZE, 2500 parent=pipeline_key) 2501 finalize_barrier = db.get(barrier_key) 2502 if finalize_barrier is None: 2503 raise UnexpectedPipelineError( 2504 'Pipeline ID "%s" cannot update finalize barrier. ' 2505 'Does not exist.' % pipeline_key.name()) 2506 else: 2507 finalize_barrier.blocking_slots = list( 2508 blocking_slot_keys.union(set(finalize_barrier.blocking_slots))) 2509 finalize_barrier.put() 2510 2511 db.run_in_transaction(txn) 2512 2513 def transition_complete(self, pipeline_key): 2514 """Marks the given pipeline as complete. 2515 2516 Does nothing if the pipeline is no longer in a state that can be completed. 2517 2518 Args: 2519 pipeline_key: db.Key of the _PipelineRecord that has completed. 2520 """ 2521 def txn(): 2522 pipeline_record = db.get(pipeline_key) 2523 if pipeline_record is None: 2524 logging.warning( 2525 'Tried to mark pipeline ID "%s" as complete but it does not exist.', 2526 pipeline_key.name()) 2527 raise db.Rollback() 2528 if pipeline_record.status not in ( 2529 _PipelineRecord.WAITING, _PipelineRecord.RUN): 2530 logging.warning( 2531 'Tried to mark pipeline ID "%s" as complete, found bad state: %s', 2532 pipeline_key.name(), pipeline_record.status) 2533 raise db.Rollback() 2534 2535 pipeline_record.status = _PipelineRecord.DONE 2536 pipeline_record.finalized_time = self._gettime() 2537 pipeline_record.put() 2538 2539 db.run_in_transaction(txn) 2540 2541 def transition_retry(self, pipeline_key, retry_message): 2542 """Marks the given pipeline as requiring another retry. 2543 2544 Does nothing if all attempts have been exceeded. 2545 2546 Args: 2547 pipeline_key: db.Key of the _PipelineRecord that needs to be retried. 2548 retry_message: User-supplied message indicating the reason for the retry. 2549 """ 2550 def txn(): 2551 pipeline_record = db.get(pipeline_key) 2552 if pipeline_record is None: 2553 logging.warning( 2554 'Tried to retry pipeline ID "%s" but it does not exist.', 2555 pipeline_key.name()) 2556 raise db.Rollback() 2557 if pipeline_record.status not in ( 2558 _PipelineRecord.WAITING, _PipelineRecord.RUN): 2559 logging.warning( 2560 'Tried to retry pipeline ID "%s", found bad state: %s', 2561 pipeline_key.name(), pipeline_record.status) 2562 raise db.Rollback() 2563 2564 params = pipeline_record.params 2565 offset_seconds = ( 2566 params['backoff_seconds'] * 2567 (params['backoff_factor'] ** pipeline_record.current_attempt)) 2568 pipeline_record.next_retry_time = ( 2569 self._gettime() + datetime.timedelta(seconds=offset_seconds)) 2570 pipeline_record.current_attempt += 1 2571 pipeline_record.retry_message = retry_message 2572 pipeline_record.status = _PipelineRecord.WAITING 2573 2574 if pipeline_record.current_attempt >= pipeline_record.max_attempts: 2575 root_pipeline_key = ( 2576 _PipelineRecord.root_pipeline.get_value_for_datastore( 2577 pipeline_record)) 2578 logging.warning( 2579 'Giving up on pipeline ID "%s" after %d attempt(s); causing abort ' 2580 'all the way to the root pipeline ID "%s"', pipeline_key.name(), 2581 pipeline_record.current_attempt, root_pipeline_key.name()) 2582 # NOTE: We do *not* set the status to aborted here to ensure that 2583 # this pipeline will be finalized before it has been marked as aborted. 
2584         pipeline_record.abort_message = (
2585             'Aborting after %d attempts' % pipeline_record.current_attempt)
2586         task = taskqueue.Task(
2587             url=self.fanout_abort_handler_path,
2588             params=dict(root_pipeline_key=root_pipeline_key))
2589         task.add(queue_name=self.queue_name, transactional=True)
2590       else:
2591         task = taskqueue.Task(
2592             url=self.pipeline_handler_path,
2593             eta=pipeline_record.next_retry_time,
2594             params=dict(pipeline_key=pipeline_key,
2595                         purpose=_BarrierRecord.START,
2596                         attempt=pipeline_record.current_attempt),
2597             headers={'X-Ae-Pipeline-Key': pipeline_key})
2598         task.add(queue_name=self.queue_name, transactional=True)
2599
2600       pipeline_record.put()
2601
2602     db.run_in_transaction(txn)
2603
2604   def transition_aborted(self, pipeline_key):
2605     """Marks the given pipeline as having aborted.
2606
2607     Does nothing if the pipeline is in a bad state.
2608
2609     Args:
2610       pipeline_key: db.Key of the _PipelineRecord that needs to be aborted.
2611     """
2612     def txn():
2613       pipeline_record = db.get(pipeline_key)
2614       if pipeline_record is None:
2615         logging.warning(
2616             'Tried to abort pipeline ID "%s" but it does not exist.',
2617             pipeline_key.name())
2618         raise db.Rollback()
2619       if pipeline_record.status not in (
2620           _PipelineRecord.WAITING, _PipelineRecord.RUN):
2621         logging.warning(
2622             'Tried to abort pipeline ID "%s", found bad state: %s',
2623             pipeline_key.name(), pipeline_record.status)
2624         raise db.Rollback()
2625
2626       pipeline_record.status = _PipelineRecord.ABORTED
2627       pipeline_record.finalized_time = self._gettime()
2628       pipeline_record.put()
2629
2630     db.run_in_transaction(txn)
2631
2632 ################################################################################
2633
2634
2635 class _BarrierHandler(webapp.RequestHandler):
2636   """Request handler for triggering barriers."""
2637
2638   def post(self):
2639     if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ:
2640       self.response.set_status(403)
2641       return
2642
2643     context = _PipelineContext.from_environ(self.request.environ)
2644     context.notify_barriers(
2645         self.request.get('slot_key'),
2646         self.request.get('cursor'),
2647         use_barrier_indexes=self.request.get('use_barrier_indexes') == 'True')
2648
2649
2650 class _PipelineHandler(webapp.RequestHandler):
2651   """Request handler for running pipelines."""
2652
2653   def post(self):
2654     if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ:
2655       self.response.set_status(403)
2656       return
2657
2658     context = _PipelineContext.from_environ(self.request.environ)
2659     context.evaluate(self.request.get('pipeline_key'),
2660                      purpose=self.request.get('purpose'),
2661                      attempt=int(self.request.get('attempt', '0')))
2662
2663
2664 class _FanoutAbortHandler(webapp.RequestHandler):
2665   """Request handler for fanning out abort notifications."""
2666
2667   def post(self):
2668     if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ:
2669       self.response.set_status(403)
2670       return
2671
2672     context = _PipelineContext.from_environ(self.request.environ)
2673     context.continue_abort(
2674         self.request.get('root_pipeline_key'),
2675         self.request.get('cursor'))
2676
2677
2678 class _FanoutHandler(webapp.RequestHandler):
2679   """Request handler for fanning out pipeline children."""
2680
2681   def post(self):
2682     if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ:
2683       self.response.set_status(403)
2684       return
2685
2686     context = _PipelineContext.from_environ(self.request.environ)
2687
2688     # Set of stringified db.Keys of children to run.
2689 all_pipeline_keys = set() 2690 2691 # For backwards compatibility with the old style of fan-out requests. 2692 all_pipeline_keys.update(self.request.get_all('pipeline_key')) 2693 2694 # Fetch the child pipelines from the parent. This works around the 10KB 2695 # task payload limit. This get() is consistent-on-read and the fan-out 2696 # task is enqueued in the transaction that updates the parent, so the 2697 # fanned_out property is consistent here. 2698 parent_key = self.request.get('parent_key') 2699 child_indexes = [int(x) for x in self.request.get_all('child_indexes')] 2700 if parent_key: 2701 parent_key = db.Key(parent_key) 2702 parent = db.get(parent_key) 2703 for index in child_indexes: 2704 all_pipeline_keys.add(str(parent.fanned_out[index])) 2705 2706 all_tasks = [] 2707 all_pipelines = db.get([db.Key(pipeline_key) for pipeline_key in all_pipeline_keys]) 2708 for child_pipeline in all_pipelines: 2709 if child_pipeline is None: 2710 continue 2711 pipeline_key = str(child_pipeline.key()) 2712 all_tasks.append(taskqueue.Task( 2713 url=context.pipeline_handler_path, 2714 params=dict(pipeline_key=pipeline_key), 2715 target=child_pipeline.params['target'], 2716 headers={'X-Ae-Pipeline-Key': pipeline_key}, 2717 name='ae-pipeline-fan-out-' + child_pipeline.key().name())) 2718 2719 batch_size = 100 # Limit of taskqueue API bulk add. 2720 for i in xrange(0, len(all_tasks), batch_size): 2721 batch = all_tasks[i:i+batch_size] 2722 try: 2723 taskqueue.Queue(context.queue_name).add(batch) 2724 except (taskqueue.TombstonedTaskError, taskqueue.TaskAlreadyExistsError): 2725 pass 2726 2727 2728class _CleanupHandler(webapp.RequestHandler): 2729 """Request handler for cleaning up a Pipeline.""" 2730 2731 def post(self): 2732 if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ: 2733 self.response.set_status(403) 2734 return 2735 2736 root_pipeline_key = db.Key(self.request.get('root_pipeline_key')) 2737 logging.debug('Cleaning up root_pipeline_key=%r', root_pipeline_key) 2738 2739 # TODO(user): Accumulate all BlobKeys from _PipelineRecord and 2740 # _SlotRecord entities and delete them. 2741 pipeline_keys = ( 2742 _PipelineRecord.all(keys_only=True) 2743 .filter('root_pipeline =', root_pipeline_key)) 2744 db.delete(pipeline_keys) 2745 slot_keys = ( 2746 _SlotRecord.all(keys_only=True) 2747 .filter('root_pipeline =', root_pipeline_key)) 2748 db.delete(slot_keys) 2749 barrier_keys = ( 2750 _BarrierRecord.all(keys_only=True) 2751 .filter('root_pipeline =', root_pipeline_key)) 2752 db.delete(barrier_keys) 2753 status_keys = ( 2754 _StatusRecord.all(keys_only=True) 2755 .filter('root_pipeline =', root_pipeline_key)) 2756 db.delete(status_keys) 2757 barrier_index_keys = ( 2758 _BarrierIndex.all(keys_only=True) 2759 .filter('root_pipeline =', root_pipeline_key)) 2760 db.delete(barrier_index_keys) 2761 2762 2763class _CallbackHandler(webapp.RequestHandler): 2764 """Receives asynchronous callback requests from humans or tasks.""" 2765 2766 def post(self): 2767 self.get() 2768 2769 def get(self): 2770 try: 2771 self.run_callback() 2772 except _CallbackTaskError, e: 2773 logging.error(str(e)) 2774 if 'HTTP_X_APPENGINE_TASKRETRYCOUNT' in self.request.environ: 2775 # Silently give up on tasks that have retried many times. This 2776 # probably means that the target pipeline has been deleted, so there's 2777 # no reason to keep trying this task forever. 
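        # Illustrative: a request arriving with X-AppEngine-TaskRetryCount: 6
        # exceeds _MAX_CALLBACK_TASK_RETRIES (5) and is dropped below rather
        # than retrying forever.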
2778         retry_count = int(
2779             self.request.environ.get('HTTP_X_APPENGINE_TASKRETRYCOUNT'))
2780         if retry_count > _MAX_CALLBACK_TASK_RETRIES:
2781           logging.error('Giving up on task after %d retries',
2782                         _MAX_CALLBACK_TASK_RETRIES)
2783           return
2784
2785     # NOTE: The undescriptive error code 400 is present to address security
2786     # risks of giving external users access to cause PipelineRecord lookups
2787     # and execution.
2788     self.response.set_status(400)
2789
2790   def run_callback(self):
2791     """Runs the callback for the pipeline specified in the request.
2792
2793     Raises:
2794       _CallbackTaskError if something was wrong with the request parameters.
2795     """
2796     pipeline_id = self.request.get('pipeline_id')
2797     if not pipeline_id:
2798       raise _CallbackTaskError('"pipeline_id" parameter missing.')
2799
2800     pipeline_key = db.Key.from_path(_PipelineRecord.kind(), pipeline_id)
2801     pipeline_record = db.get(pipeline_key)
2802     if pipeline_record is None:
2803       raise _CallbackTaskError(
2804           'Pipeline ID "%s" for callback does not exist.' % pipeline_id)
2805
2806     params = pipeline_record.params
2807     real_class_path = params['class_path']
2808     try:
2809       pipeline_func_class = mr_util.for_name(real_class_path)
2810     except ImportError, e:
2811       raise _CallbackTaskError(
2812           'Cannot load class named "%s" for pipeline ID "%s".'
2813           % (real_class_path, pipeline_id))
2814
2815     if 'HTTP_X_APPENGINE_TASKNAME' not in self.request.environ:
2816       if pipeline_func_class.public_callbacks:
2817         pass
2818       elif pipeline_func_class.admin_callbacks:
2819         if not users.is_current_user_admin():
2820           raise _CallbackTaskError(
2821               'Unauthorized callback for admin-only pipeline ID "%s"'
2822               % pipeline_id)
2823       else:
2824         raise _CallbackTaskError(
2825             'External callback for internal-only pipeline ID "%s"'
2826             % pipeline_id)
2827
2828     kwargs = {}
2829     for key in self.request.arguments():
2830       if key != 'pipeline_id':
2831         kwargs[str(key)] = self.request.get(key)
2832
2833     def perform_callback():
2834       stage = pipeline_func_class.from_id(pipeline_id)
2835       if stage is None:
2836         raise _CallbackTaskError(
2837             'Pipeline ID "%s" deleted during callback' % pipeline_id)
2838       return stage._callback_internal(kwargs)
2839
2840     # _callback_xg_transaction is a 3-valued setting: None=no transaction,
2841     # False=single-entity-group transaction, True=cross-group transaction.
2842     if pipeline_func_class._callback_xg_transaction is not None:
2843       transaction_options = db.create_transaction_options(
2844           xg=pipeline_func_class._callback_xg_transaction)
2845       callback_result = db.run_in_transaction_options(transaction_options,
2846                                                       perform_callback)
2847     else:
2848       callback_result = perform_callback()
2849
2850     if callback_result is not None:
2851       status_code, content_type, content = callback_result
2852       self.response.set_status(status_code)
2853       self.response.headers['Content-Type'] = content_type
2854       self.response.out.write(content)
2855
2856
2857 ################################################################################
2858
2859 def _get_timestamp_ms(when):
2860   """Converts a datetime.datetime to integer milliseconds since the epoch.
2861
2862   Requires special handling to preserve microseconds.
2863
2864   Args:
2865     when: A datetime.datetime instance.
2866
2867   Returns:
2868     Integer time since the epoch in milliseconds. If the supplied 'when' is
2869     None, the return value will be None.
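
  Worked example (illustrative): for a 'when' whose whole-second epoch value
  is 1234567890 and whose microsecond field is 250000, the result is
  1234567890 * 1000 + 250 = 1234567890250.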
2870 """ 2871 if when is None: 2872 return None 2873 ms_since_epoch = float(time.mktime(when.utctimetuple()) * 1000.0) 2874 ms_since_epoch += when.microsecond / 1000.0 2875 return int(ms_since_epoch) 2876 2877 2878def _get_internal_status(pipeline_key=None, 2879 pipeline_dict=None, 2880 slot_dict=None, 2881 barrier_dict=None, 2882 status_dict=None): 2883 """Gets the UI dictionary of a pipeline from a set of status dictionaries. 2884 2885 Args: 2886 pipeline_key: The key of the pipeline to lookup. 2887 pipeline_dict: Dictionary mapping pipeline db.Key to _PipelineRecord. 2888 Default is an empty dictionary. 2889 slot_dict: Dictionary mapping slot db.Key to _SlotRecord. 2890 Default is an empty dictionary. 2891 barrier_dict: Dictionary mapping barrier db.Key to _BarrierRecord. 2892 Default is an empty dictionary. 2893 status_dict: Dictionary mapping status record db.Key to _StatusRecord. 2894 Default is an empty dictionary. 2895 2896 Returns: 2897 Dictionary with the keys: 2898 classPath: The pipeline function being run. 2899 args: List of positional argument slot dictionaries. 2900 kwargs: Dictionary of keyword argument slot dictionaries. 2901 outputs: Dictionary of output slot dictionaries. 2902 children: List of child pipeline IDs. 2903 queueName: Queue on which this pipeline is running. 2904 afterSlotKeys: List of Slot Ids after which this pipeline runs. 2905 currentAttempt: Number of the current attempt, starting at 1. 2906 maxAttempts: Maximum number of attempts before aborting. 2907 backoffSeconds: Constant factor for backoff before retrying. 2908 backoffFactor: Exponential factor for backoff before retrying. 2909 status: Current status of the pipeline. 2910 startTimeMs: When this pipeline ran or will run due to retries, if present. 2911 endTimeMs: When this pipeline finalized, if present. 2912 lastRetryMessage: Why the pipeline failed during the last retry, if there 2913 was a failure; may be empty. 2914 abortMessage: For root pipelines, why the pipeline was aborted if it was 2915 aborted; may be empty. 2916 2917 Dictionary will contain these keys if explicit status is set: 2918 statusTimeMs: When the status was set as milliseconds since the epoch. 2919 statusMessage: Status message, if present. 2920 statusConsoleUrl: The relative URL for the console of this pipeline. 2921 statusLinks: Dictionary mapping human-readable names to relative URLs 2922 for related URLs to this pipeline. 2923 2924 Raises: 2925 PipelineStatusError if any input is bad. 
2926 """ 2927 if pipeline_dict is None: 2928 pipeline_dict = {} 2929 if slot_dict is None: 2930 slot_dict = {} 2931 if barrier_dict is None: 2932 barrier_dict = {} 2933 if status_dict is None: 2934 status_dict = {} 2935 2936 pipeline_record = pipeline_dict.get(pipeline_key) 2937 if pipeline_record is None: 2938 raise PipelineStatusError( 2939 'Could not find pipeline ID "%s"' % pipeline_key.name()) 2940 2941 params = pipeline_record.params 2942 root_pipeline_key = \ 2943 _PipelineRecord.root_pipeline.get_value_for_datastore(pipeline_record) 2944 default_slot_key = db.Key(params['output_slots']['default']) 2945 start_barrier_key = db.Key.from_path( 2946 _BarrierRecord.kind(), _BarrierRecord.START, parent=pipeline_key) 2947 finalize_barrier_key = db.Key.from_path( 2948 _BarrierRecord.kind(), _BarrierRecord.FINALIZE, parent=pipeline_key) 2949 status_record_key = db.Key.from_path( 2950 _StatusRecord.kind(), pipeline_key.name()) 2951 2952 start_barrier = barrier_dict.get(start_barrier_key) 2953 finalize_barrier = barrier_dict.get(finalize_barrier_key) 2954 default_slot = slot_dict.get(default_slot_key) 2955 status_record = status_dict.get(status_record_key) 2956 if finalize_barrier is None: 2957 raise PipelineStatusError( 2958 'Finalization barrier missing for pipeline ID "%s"' % 2959 pipeline_key.name()) 2960 if default_slot is None: 2961 raise PipelineStatusError( 2962 'Default output slot with key=%s missing for pipeline ID "%s"' % ( 2963 default_slot_key, pipeline_key.name())) 2964 2965 output = { 2966 'classPath': pipeline_record.class_path, 2967 'args': list(params['args']), 2968 'kwargs': params['kwargs'].copy(), 2969 'outputs': params['output_slots'].copy(), 2970 'children': [key.name() for key in pipeline_record.fanned_out], 2971 'queueName': params['queue_name'], 2972 'afterSlotKeys': [str(key) for key in params['after_all']], 2973 'currentAttempt': pipeline_record.current_attempt + 1, 2974 'maxAttempts': pipeline_record.max_attempts, 2975 'backoffSeconds': pipeline_record.params['backoff_seconds'], 2976 'backoffFactor': pipeline_record.params['backoff_factor'], 2977 } 2978 2979 # TODO(user): Truncate args, kwargs, and outputs to < 1MB each so we 2980 # can reasonably return the whole tree of pipelines and their outputs. 2981 # Coerce each value to a string to truncate if necessary. For now if the 2982 # params are too big it will just cause the whole status page to break. 2983 2984 # Fix the key names in parameters to match JavaScript style. 2985 for value_dict in itertools.chain( 2986 output['args'], output['kwargs'].itervalues()): 2987 if 'slot_key' in value_dict: 2988 value_dict['slotKey'] = value_dict.pop('slot_key') 2989 2990 # Figure out the pipeline's status. 
2991   if pipeline_record.status in (_PipelineRecord.WAITING, _PipelineRecord.RUN):
2992     if default_slot.status == _SlotRecord.FILLED:
2993       status = 'finalizing'
2994     elif (pipeline_record.status == _PipelineRecord.WAITING and
2995           pipeline_record.next_retry_time is not None):
2996       status = 'retry'
2997     elif start_barrier and start_barrier.status == _BarrierRecord.WAITING:
2998       # start_barrier will be missing for root pipelines
2999       status = 'waiting'
3000     else:
3001       status = 'run'
3002   elif pipeline_record.status == _PipelineRecord.DONE:
3003     status = 'done'
3004   elif pipeline_record.status == _PipelineRecord.ABORTED:
3005     status = 'aborted'
3006
3007   output['status'] = status
3008
3009   if status_record:
3010     output['statusTimeMs'] = _get_timestamp_ms(status_record.status_time)
3011     if status_record.message:
3012       output['statusMessage'] = status_record.message
3013     if status_record.console_url:
3014       output['statusConsoleUrl'] = status_record.console_url
3015     if status_record.link_names:
3016       output['statusLinks'] = dict(
3017           zip(status_record.link_names, status_record.link_urls))
3018
3019   # Populate status-dependent fields.
3020   if status in ('run', 'finalizing', 'done', 'retry'):
3021     if pipeline_record.next_retry_time is not None:
3022       output['startTimeMs'] = _get_timestamp_ms(pipeline_record.next_retry_time)
3023     elif start_barrier:
3024       # start_barrier will be missing for root pipelines
3025       output['startTimeMs'] = _get_timestamp_ms(start_barrier.trigger_time)
3026     elif pipeline_record.start_time:
3027       # Assume this pipeline ran immediately upon spawning with no
3028       # start barrier or it's the root pipeline.
3029       output['startTimeMs'] = _get_timestamp_ms(pipeline_record.start_time)
3030
3031   if status in ('finalizing',):
3032     output['endTimeMs'] = _get_timestamp_ms(default_slot.fill_time)
3033
3034   if status in ('done',):
3035     output['endTimeMs'] = _get_timestamp_ms(pipeline_record.finalized_time)
3036
3037   if pipeline_record.next_retry_time is not None:
3038     output['lastRetryMessage'] = pipeline_record.retry_message
3039
3040   if pipeline_record.abort_message:
3041     output['abortMessage'] = pipeline_record.abort_message
3042
3043   return output
3044
3045
3046 def _get_internal_slot(slot_key=None,
3047                        filler_pipeline_key=None,
3048                        slot_dict=None):
3049   """Gets information about a _SlotRecord for display in UI.
3050
3051   Args:
3052     slot_key: The db.Key of the slot to fetch.
3053     filler_pipeline_key: If the slot has not yet been filled, assume that
3054       the given db.Key (for a _PipelineRecord) will be the filler of
3055       the slot in the future.
3056     slot_dict: The slot JSON dictionary.
3057
3058   Returns:
3059     Dictionary with the keys:
3060       status: Slot status: 'filled' or 'waiting'.
3061       fillTimeMs: Time in milliseconds since the epoch of when it was filled.
3062       value: The current value of the slot, which is a slot's JSON dictionary.
3063       fillerPipelineId: The pipeline ID of what stage has or should fill
3064         this slot.
3065
3066   Raises:
3067     PipelineStatusError if any input is bad.
3068   """
3069   if slot_dict is None:
3070     slot_dict = {}
3071
3072   slot_record = slot_dict.get(slot_key)
3073   if slot_record is None:
3074     raise PipelineStatusError(
3075         'Could not find data for output slot key "%s".'
% slot_key) 3076 3077 output = {} 3078 if slot_record.status == _SlotRecord.FILLED: 3079 output['status'] = 'filled' 3080 output['fillTimeMs'] = _get_timestamp_ms(slot_record.fill_time) 3081 output['value'] = slot_record.value 3082 filler_pipeline_key = ( 3083 _SlotRecord.filler.get_value_for_datastore(slot_record)) 3084 else: 3085 output['status'] = 'waiting' 3086 3087 if filler_pipeline_key: 3088 output['fillerPipelineId'] = filler_pipeline_key.name() 3089 3090 return output 3091 3092 3093def get_status_tree(root_pipeline_id): 3094 """Gets the full status tree of a pipeline. 3095 3096 Args: 3097 root_pipeline_id: The pipeline ID to get status for. 3098 3099 Returns: 3100 Dictionary with the keys: 3101 rootPipelineId: The ID of the root pipeline. 3102 slots: Mapping of slot IDs to result of from _get_internal_slot. 3103 pipelines: Mapping of pipeline IDs to result of _get_internal_status. 3104 3105 Raises: 3106 PipelineStatusError if any input is bad. 3107 """ 3108 root_pipeline_key = db.Key.from_path(_PipelineRecord.kind(), root_pipeline_id) 3109 root_pipeline_record = db.get(root_pipeline_key) 3110 if root_pipeline_record is None: 3111 raise PipelineStatusError( 3112 'Could not find pipeline ID "%s"' % root_pipeline_id) 3113 3114 # If the supplied root_pipeline_id is not actually the root pipeline that's 3115 # okay. We'll find the real root and override the value they passed in. 3116 actual_root_key = _PipelineRecord.root_pipeline.get_value_for_datastore( 3117 root_pipeline_record) 3118 if actual_root_key != root_pipeline_key: 3119 root_pipeline_key = actual_root_key 3120 root_pipeline_id = root_pipeline_key.id_or_name() 3121 root_pipeline_record = db.get(root_pipeline_key) 3122 if not root_pipeline_record: 3123 raise PipelineStatusError( 3124 'Could not find pipeline ID "%s"' % root_pipeline_id) 3125 3126 # Run all queries asynchronously. 3127 queries = {} 3128 for model in (_PipelineRecord, _SlotRecord, _BarrierRecord, _StatusRecord): 3129 queries[model] = model.all().filter( 3130 'root_pipeline =', root_pipeline_key).run(batch_size=1000) 3131 3132 found_pipeline_dict = dict( 3133 (stage.key(), stage) for stage in queries[_PipelineRecord]) 3134 found_slot_dict = dict( 3135 (slot.key(), slot) for slot in queries[_SlotRecord]) 3136 found_barrier_dict = dict( 3137 (barrier.key(), barrier) for barrier in queries[_BarrierRecord]) 3138 found_status_dict = dict( 3139 (status.key(), status) for status in queries[_StatusRecord]) 3140 3141 # Breadth-first traversal of _PipelineRecord instances by following 3142 # _PipelineRecord.fanned_out property values. 3143 valid_pipeline_keys = set([root_pipeline_key]) 3144 slot_filler_dict = {} # slot_key to pipeline_key 3145 expand_stack = [root_pipeline_record] 3146 while expand_stack: 3147 old_stack = expand_stack 3148 expand_stack = [] 3149 for pipeline_record in old_stack: 3150 for child_pipeline_key in pipeline_record.fanned_out: 3151 # This will let us prune off those pipelines which were allocated in 3152 # the Datastore but were never run due to mid-flight task failures. 3153 child_pipeline_record = found_pipeline_dict.get(child_pipeline_key) 3154 if child_pipeline_record is None: 3155 raise PipelineStatusError( 3156 'Pipeline ID "%s" points to child ID "%s" which does not exist.' 


def get_status_tree(root_pipeline_id):
  """Gets the full status tree of a pipeline.

  Args:
    root_pipeline_id: The pipeline ID to get status for.

  Returns:
    Dictionary with the keys:
      rootPipelineId: The ID of the root pipeline.
      slots: Mapping of slot IDs to the result of _get_internal_slot.
      pipelines: Mapping of pipeline IDs to the result of _get_internal_status.

  Raises:
    PipelineStatusError if any input is bad.
  """
  root_pipeline_key = db.Key.from_path(_PipelineRecord.kind(), root_pipeline_id)
  root_pipeline_record = db.get(root_pipeline_key)
  if root_pipeline_record is None:
    raise PipelineStatusError(
        'Could not find pipeline ID "%s"' % root_pipeline_id)

  # If the supplied root_pipeline_id is not actually the root pipeline, that's
  # okay. We'll find the real root and override the value they passed in.
  actual_root_key = _PipelineRecord.root_pipeline.get_value_for_datastore(
      root_pipeline_record)
  if actual_root_key != root_pipeline_key:
    root_pipeline_key = actual_root_key
    root_pipeline_id = root_pipeline_key.id_or_name()
    root_pipeline_record = db.get(root_pipeline_key)
    if not root_pipeline_record:
      raise PipelineStatusError(
          'Could not find pipeline ID "%s"' % root_pipeline_id)

  # Run all queries asynchronously.
  queries = {}
  for model in (_PipelineRecord, _SlotRecord, _BarrierRecord, _StatusRecord):
    queries[model] = model.all().filter(
        'root_pipeline =', root_pipeline_key).run(batch_size=1000)

  found_pipeline_dict = dict(
      (stage.key(), stage) for stage in queries[_PipelineRecord])
  found_slot_dict = dict(
      (slot.key(), slot) for slot in queries[_SlotRecord])
  found_barrier_dict = dict(
      (barrier.key(), barrier) for barrier in queries[_BarrierRecord])
  found_status_dict = dict(
      (status.key(), status) for status in queries[_StatusRecord])

  # Breadth-first traversal of _PipelineRecord instances by following
  # _PipelineRecord.fanned_out property values.
  valid_pipeline_keys = set([root_pipeline_key])
  slot_filler_dict = {}  # slot_key to pipeline_key
  expand_stack = [root_pipeline_record]
  while expand_stack:
    old_stack = expand_stack
    expand_stack = []
    for pipeline_record in old_stack:
      for child_pipeline_key in pipeline_record.fanned_out:
        # This will let us prune off those pipelines which were allocated in
        # the Datastore but were never run due to mid-flight task failures.
        child_pipeline_record = found_pipeline_dict.get(child_pipeline_key)
        if child_pipeline_record is None:
          raise PipelineStatusError(
              'Pipeline ID "%s" points to child ID "%s" which does not exist.'
              % (pipeline_record.key().name(), child_pipeline_key.name()))
        expand_stack.append(child_pipeline_record)
        valid_pipeline_keys.add(child_pipeline_key)

        # Figure out the deepest pipeline that's responsible for outputting to
        # a particular _SlotRecord, so we can report which pipeline *should*
        # be the filler.
        child_outputs = child_pipeline_record.params['output_slots']
        for output_slot_key in child_outputs.itervalues():
          slot_filler_dict[db.Key(output_slot_key)] = child_pipeline_key

  output = {
      'rootPipelineId': root_pipeline_id,
      'slots': {},
      'pipelines': {},
  }

  for pipeline_key in found_pipeline_dict.keys():
    if pipeline_key not in valid_pipeline_keys:
      continue
    output['pipelines'][pipeline_key.name()] = _get_internal_status(
        pipeline_key=pipeline_key,
        pipeline_dict=found_pipeline_dict,
        slot_dict=found_slot_dict,
        barrier_dict=found_barrier_dict,
        status_dict=found_status_dict)

  for slot_key, filler_pipeline_key in slot_filler_dict.iteritems():
    output['slots'][str(slot_key)] = _get_internal_slot(
        slot_key=slot_key,
        filler_pipeline_key=filler_pipeline_key,
        slot_dict=found_slot_dict)

  return output


def get_pipeline_names():
  """Returns the class paths of all defined Pipelines, in alphabetical order."""
  class_path_set = set()
  for cls in _PipelineMeta._all_classes:
    if cls.class_path is not None:
      class_path_set.add(cls.class_path)
  return sorted(class_path_set)
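

# A minimal usage sketch (hypothetical helper, not part of the original API):
# render the status tree for one root pipeline, after logging every Pipeline
# class deployed in this application. The helper name and the JSON response
# format are assumptions.
def _example_dump_tree(root_pipeline_id):
  """Returns the full status tree of root_pipeline_id as a JSON string."""
  logging.info('Known pipeline classes: %s', ', '.join(get_pipeline_names()))
  tree = get_status_tree(root_pipeline_id)
  root_status = tree['pipelines'][tree['rootPipelineId']]
  logging.info('Root pipeline is in state %r', root_status['status'])
  return json.dumps(tree)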
3221 """ 3222 query = _PipelineRecord.all(cursor=cursor) 3223 if class_path: 3224 query.filter('class_path =', class_path) 3225 query.filter('is_root_pipeline =', True) 3226 query.order('-start_time') 3227 3228 root_list = query.fetch(count) 3229 3230 fetch_list = [] 3231 for pipeline_record in root_list: 3232 fetch_list.append(db.Key(pipeline_record.params['output_slots']['default'])) 3233 fetch_list.append(db.Key.from_path( 3234 _BarrierRecord.kind(), _BarrierRecord.FINALIZE, 3235 parent=pipeline_record.key())) 3236 fetch_list.append(db.Key.from_path( 3237 _StatusRecord.kind(), pipeline_record.key().name())) 3238 3239 pipeline_dict = dict((stage.key(), stage) for stage in root_list) 3240 slot_dict = {} 3241 barrier_dict = {} 3242 status_dict = {} 3243 for entity in db.get(fetch_list): 3244 if isinstance(entity, _BarrierRecord): 3245 barrier_dict[entity.key()] = entity 3246 elif isinstance(entity, _SlotRecord): 3247 slot_dict[entity.key()] = entity 3248 elif isinstance(entity, _StatusRecord): 3249 status_dict[entity.key()] = entity 3250 3251 results = [] 3252 for pipeline_record in root_list: 3253 try: 3254 output = _get_internal_status( 3255 pipeline_record.key(), 3256 pipeline_dict=pipeline_dict, 3257 slot_dict=slot_dict, 3258 barrier_dict=barrier_dict, 3259 status_dict=status_dict) 3260 output['pipelineId'] = pipeline_record.key().name() 3261 results.append(output) 3262 except PipelineStatusError, e: 3263 output = {'status': e.message} 3264 output['classPath'] = '' 3265 output['pipelineId'] = pipeline_record.key().name() 3266 results.append(output) 3267 3268 result_dict = {} 3269 cursor = query.cursor() 3270 query.with_cursor(cursor) 3271 if query.get(keys_only=True): 3272 result_dict.update(cursor=cursor) 3273 result_dict.update(pipelines=results) 3274 return result_dict 3275 3276################################################################################ 3277 3278def set_enforce_auth(new_status): 3279 """Sets whether Pipeline API handlers rely on app.yaml for access control. 3280 3281 Args: 3282 new_status: If True, then the Pipeline API will enforce its own 3283 access control on status and static file handlers. If False, then 3284 it will assume app.yaml is doing the enforcement. 3285 """ 3286 global _ENFORCE_AUTH 3287 _ENFORCE_AUTH = new_status 3288 3289 3290def create_handlers_map(prefix='.*'): 3291 """Create new handlers map. 3292 3293 Args: 3294 prefix: url prefix to use. 3295 3296 Returns: 3297 list of (regexp, handler) pairs for WSGIApplication constructor. 3298 """ 3299 return [ 3300 (prefix + '/output', _BarrierHandler), 3301 (prefix + '/run', _PipelineHandler), 3302 (prefix + '/finalized', _PipelineHandler), 3303 (prefix + '/cleanup', _CleanupHandler), 3304 (prefix + '/abort', _PipelineHandler), 3305 (prefix + '/fanout', _FanoutHandler), 3306 (prefix + '/fanout_abort', _FanoutAbortHandler), 3307 (prefix + '/callback', _CallbackHandler), 3308 (prefix + '/rpc/tree', status_ui._TreeStatusHandler), 3309 (prefix + '/rpc/class_paths', status_ui._ClassPathListHandler), 3310 (prefix + '/rpc/list', status_ui._RootListHandler), 3311 (prefix + '(/.+)', status_ui._StatusUiHandler), 3312 ] 3313