1# Copyright 2015 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""A tool to run long running tasks. 6 7This allows a task to run in Task Queue which gives about 10 minutes execution 8time. 9 10Usage: 11 12In https://chromeperf.appspot.com/_ah/stats/shell, pass a function to 13task_runner.Run. Task function should be picklable and must include any 14required imports within the function's body. 15 16Example: 17 18 from dashboard import task_runner 19 20 def unique_test_suite_names(): 21 from dashboard.models import graph_data 22 query = graph_data.Test.query(graph_data.Test.parent_test == None) 23 test_keys = query.fetch(limit=50000, keys_only=True) 24 return sorted(set(k.string_id() for k in test_keys)) 25 26 task_runner.Run(unique_test_suite_names) 27 28The task function return value and stdouts will be displayed at: 29 https://chromeperf.appspot.com/get_logs?namespace=task_runner&name=report 30 31WARNING: 32Running code in Appstats does affect live dashboard. So watchout for any 33datastore writes that may corrupt or unintentionally delete data. 34""" 35 36import datetime 37import marshal 38import cStringIO 39import sys 40import time 41import types 42 43from google.appengine.ext import deferred 44 45from dashboard import quick_logger 46 47_TASK_QUEUE_NAME = 'task-runner-queue' 48 49_REPORT_TEMPLATE = """%(function_name)s: %(start_time)s 50 Stdout: 51 %(stdout)s 52 53 Elapsed: %(elapsed_time)f seconds. 54 Returned results: 55 %(returned_results)s 56""" 57 58 59def Run(task_function): 60 """Runs task in task queue.""" 61 # Since defer uses pickle and pickle can't serialize non-global function, 62 # we'll use marshal to serialize and deserialize the function code object 63 # before and after defer. 64 code_string = marshal.dumps(task_function.func_code) 65 deferred.defer(_TaskWrapper, code_string, task_function.__name__, 66 _queue=_TASK_QUEUE_NAME) 67 68 69def _TaskWrapper(code_string, function_name): 70 """Runs the task and captures the stdout and the returned results.""" 71 formatted_start_time = datetime.datetime.now().strftime( 72 '%Y-%m-%d %H:%M:%S %Z') 73 _AddReportToLog('Starting task "%s" at %s.' % 74 (function_name, formatted_start_time)) 75 76 code = marshal.loads(code_string) 77 task_function = types.FunctionType(code, globals(), 'TaskFunction') 78 79 stdout_original = sys.stdout 80 sys.stdout = stream = cStringIO.StringIO() 81 start_time = time.time() 82 try: 83 returned_results = task_function() 84 except Exception as e: # Intentionally broad -- pylint: disable=broad-except 85 print str(e) 86 returned_results = '' 87 elapsed_time = time.time() - start_time 88 stdout = stream.getvalue() 89 sys.stdout = stdout_original 90 91 results = { 92 'function_name': function_name, 93 'start_time': formatted_start_time, 94 'stdout': stdout, 95 'returned_results': returned_results, 96 'elapsed_time': elapsed_time 97 } 98 _AddReportToLog(_REPORT_TEMPLATE % results) 99 100 101def _AddReportToLog(report): 102 """Adds a log for bench results.""" 103 formatter = quick_logger.Formatter() 104 logger = quick_logger.QuickLogger('task_runner', 'report', formatter) 105 logger.Log(report) 106 logger.Save() 107