1# Copyright 2015 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""A tool to run long running tasks.
6
7This allows a task to run in Task Queue which gives about 10 minutes execution
8time.
9
10Usage:
11
12In https://chromeperf.appspot.com/_ah/stats/shell, pass a function to
13task_runner.Run.  Task function should be picklable and must include any
14required imports within the function's body.
15
16Example:
17
18  from dashboard import task_runner
19
20  def unique_test_suite_names():
21    from dashboard.models import graph_data
22    query = graph_data.Test.query(graph_data.Test.parent_test == None)
23    test_keys = query.fetch(limit=50000, keys_only=True)
24    return sorted(set(k.string_id() for k in test_keys))
25
26  task_runner.Run(unique_test_suite_names)
27
28The task function return value and stdouts will be displayed at:
29    https://chromeperf.appspot.com/get_logs?namespace=task_runner&name=report
30
31WARNING:
32Running code in Appstats does affect live dashboard.  So watchout for any
33datastore writes that may corrupt or unintentionally delete data.
34"""
35
36import datetime
37import marshal
38import cStringIO
39import sys
40import time
41import types
42
43from google.appengine.ext import deferred
44
45from dashboard import quick_logger
46
47_TASK_QUEUE_NAME = 'task-runner-queue'
48
49_REPORT_TEMPLATE = """%(function_name)s: %(start_time)s
50 Stdout:
51 %(stdout)s
52
53 Elapsed: %(elapsed_time)f seconds.
54 Returned results:
55 %(returned_results)s
56"""
57
58
59def Run(task_function):
60  """Runs task in task queue."""
61  # Since defer uses pickle and pickle can't serialize non-global function,
62  # we'll use marshal to serialize and deserialize the function code object
63  # before and after defer.
64  code_string = marshal.dumps(task_function.func_code)
65  deferred.defer(_TaskWrapper, code_string, task_function.__name__,
66                 _queue=_TASK_QUEUE_NAME)
67
68
69def _TaskWrapper(code_string, function_name):
70  """Runs the task and captures the stdout and the returned results."""
71  formatted_start_time = datetime.datetime.now().strftime(
72      '%Y-%m-%d %H:%M:%S %Z')
73  _AddReportToLog('Starting task "%s" at %s.' %
74                  (function_name, formatted_start_time))
75
76  code = marshal.loads(code_string)
77  task_function = types.FunctionType(code, globals(), 'TaskFunction')
78
79  stdout_original = sys.stdout
80  sys.stdout = stream = cStringIO.StringIO()
81  start_time = time.time()
82  try:
83    returned_results = task_function()
84  except Exception as e:  # Intentionally broad -- pylint: disable=broad-except
85    print str(e)
86    returned_results = ''
87  elapsed_time = time.time() - start_time
88  stdout = stream.getvalue()
89  sys.stdout = stdout_original
90
91  results = {
92      'function_name': function_name,
93      'start_time': formatted_start_time,
94      'stdout': stdout,
95      'returned_results': returned_results,
96      'elapsed_time': elapsed_time
97  }
98  _AddReportToLog(_REPORT_TEMPLATE % results)
99
100
101def _AddReportToLog(report):
102  """Adds a log for bench results."""
103  formatter = quick_logger.Formatter()
104  logger = quick_logger.QuickLogger('task_runner', 'report', formatter)
105  logger.Log(report)
106  logger.Save()
107