background_tasks.py revision effb81e5f8246d0db0270817048dc992db66e9fb
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Background tasks for the www_server module.

This module has the logic for handling background tasks for the www frontend.
Long-running actions (like periodic tracing) cannot be served synchronously in
the context of a /ajax/endpoint request (the HTTP request would time out).
Instead, for such long operations, an instance of |BackgroundTask| is created
here and the server returns just its id. The client can later poll the status
of the asynchronous task to check for its progress.

From a technical viewpoint, each background task is just a python subprocess
which communicates its progress updates through a Queue. The messages enqueued
are tuples with the following format: (completion_ratio%, 'message string').
"""

import datetime
import multiprocessing
import Queue
import time

from memory_inspector.core import backends
from memory_inspector.data import file_storage


_tasks = {}  # id (int) -> |BackgroundTask| instance.


def StartTracer(process, storage_path, interval, count, trace_native_heap):
  """Starts a background task which periodically traces |process|.

  Args:
    process: the |backends.Process| instance to trace.
    storage_path: path passed to |file_storage.Storage| by the subprocess.
    interval: time (in seconds) to sleep between two consecutive dumps.
    count: number of dumps to take.
    trace_native_heap: if true, the native heap is dumped at each iteration,
        in addition to the memory maps.

  Returns:
    The id of the new task (which is the pid of its subprocess). It can be
    passed to |Get| to retrieve the |BackgroundTask| instance.
  """
  assert isinstance(process, backends.Process)
  task = BackgroundTask(
      TracerMain_,
      storage_path=storage_path,
      backend_name=process.device.backend.name,
      device_id=process.device.id,
      pid=process.pid,
      interval=interval,
      count=count,
      trace_native_heap=trace_native_heap)
  task.start()
  _tasks[task.pid] = task
  return task.pid


def Get(task_id):
  """Returns the |BackgroundTask| registered as |task_id|, or None."""
  return _tasks.get(task_id)


def TerminateAll():
  """Terminates all the tasks which are still alive and forgets all of them."""
  for proc in _tasks.itervalues():
    if proc.is_alive():
      proc.terminate()
  _tasks.clear()


def TracerMain_(log, storage_path, backend_name, device_id, pid, interval,
                count, trace_native_heap):
  """Entry point for the background periodic tracer task.

  Runs in the subprocess. Progress updates are pushed into |log| as
  (completion_ratio%, 'message string') tuples.

  Args:
    log: the queue on which progress tuples are enqueued.
    storage_path: path passed to |file_storage.Storage|.
    backend_name: name of the backend, as accepted by |backends.GetBackend|.
    device_id: id of the device, as accepted by |backends.GetDevice|.
    pid: pid of the process to trace on the device.
    interval: time (in seconds) to sleep between two consecutive dumps.
    count: number of dumps to take.
    trace_native_heap: if true, the native heap is dumped as well.

  Returns:
    0 if all the dumps were taken, 1 if the traced process went away.
    NOTE: multiprocessing.Process discards the return value of its target, so
    this value is not reflected in the |exitcode| of the |BackgroundTask|.
  """
  # Initialize storage.
  storage = file_storage.Storage(storage_path)

  # Initialize the backend.
  backend = backends.GetBackend(backend_name)
  for k, v in storage.LoadSettings(backend_name).iteritems():
    backend.settings[k] = v

  # Initialize the device.
  device = backends.GetDevice(backend_name, device_id)
  for k, v in storage.LoadSettings(device_id).iteritems():
    device.settings[k] = v

  # Start periodic tracing.
  process = device.GetProcess(pid)
  log.put((1, 'Starting trace (%d dumps x %s s.). Device: %s, process: %s' % (
      count, interval, device.name, process.name)))
  datetime_str = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M')
  archive_name = '%s - %s - %s' % (datetime_str, device.name, process.name)
  archive = storage.OpenArchive(archive_name, create=True)

  for i in xrange(1, count + 1):  # [1, count] range is easier to handle.
    # Look up the process again at each iteration, to detect if it went away.
    process = device.GetProcess(pid)
    if not process:
      log.put((100, 'Process %d died.' % pid))
      return 1
    # Calculate the completion rate proportionally to 80%. We keep the remaining
    # 20% for the final symbolization step (just an approximate estimation).
    completion = 80 * i / count
    log.put((completion, 'Dumping trace %d of %d' % (i, count)))
    archive.StartNewSnapshot()
    mmaps = process.DumpMemoryMaps()
    log.put((completion, 'Dumped %d memory maps' % len(mmaps)))
    archive.StoreMemMaps(mmaps)
    if trace_native_heap:
      nheap = process.DumpNativeHeap()
      log.put((completion, 'Dumped %d native allocs' % len(nheap.allocations)))
      archive.StoreNativeHeap(nheap)

    # No need to wait after the last dump.
    if i < count:
      time.sleep(interval)

  log.put((100, 'Trace complete.'))
  return 0


class BackgroundTask(multiprocessing.Process):
  """A subprocess which reports its progress through a queue.

  The entry point is invoked in the subprocess with the progress queue as its
  first positional argument, followed by the |args| and |kwargs| given to the
  constructor.
  """

  def __init__(self, entry_point, *args, **kwargs):
    self._log_queue = multiprocessing.Queue()
    # A list of tuples [(50%, 'msg1'), (100%, 'msg2')].
    self._progress_log = []
    super(BackgroundTask, self).__init__(
        target=entry_point,
        args=((self._log_queue,) + args),  # Just propagate all args.
        kwargs=kwargs)

  def GetProgress(self):
    """Returns the progress log as a list of (completion_rate, message) tuples.

    All the messages currently available in the queue are moved into the
    progress log first. If the subprocess has terminated with a non-zero exit
    code, a final (100, 'Failed with code N') entry is appended to the returned
    list (but not to the stored log).
    """
    while True:
      try:
        self._progress_log.append(self._log_queue.get(block=False))
      except Queue.Empty:
        break
    if not self.is_alive():
      # |exitcode| is None when the task has not terminated (e.g. it was never
      # started). That is not a failure and cannot be formatted with %d.
      exitcode = self.exitcode
      if exitcode is not None and exitcode != 0:
        return self._progress_log + [(100, 'Failed with code %d' % exitcode)]
    return self._progress_log