background_tasks.py revision 1320f92c476a1ad9d19dba2a48c72b75566198e9
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Background tasks for the www_server module.

This module has the logic for handling background tasks for the www frontend.
Long term actions (like periodic tracing), cannot be served synchronously in the
context of a /ajax/endpoint request (would timeout HTTP). Instead, for such long
operations, an instance of |BackgroundTask| is created here and the server
returns just its id. The client can later poll the status of the asynchronous
task to check for its progress.

From a technical viewpoint, each background task is just a python subprocess
which communicates its progress updates through a Queue. The messages enqueued
are tuples with the following format: (completion_ratio%, 'message string').
"""

import datetime
import multiprocessing
import Queue
import time

from memory_inspector.core import backends
from memory_inspector.data import file_storage


_tasks = {}  # id (int) -> |BackgroundTask| instance.


def StartTracer(process, storage_path, interval, count, trace_native_heap):
  """Starts a periodic tracing task in a background subprocess.

  Args:
    process: the |backends.Process| instance to be traced.
    storage_path: file system path for the |file_storage.Storage| archives.
    interval: seconds to sleep between two consecutive dumps.
    count: total number of dumps to take.
    trace_native_heap: if True, also dump native heap allocations.

  Returns:
    The task id (int), which can be later passed to Get() to poll progress.
  """
  # NOTE: assert is a statement, not a function (no parentheses).
  assert isinstance(process, backends.Process)
  task = BackgroundTask(
      TracerMain_,
      storage_path=storage_path,
      backend_name=process.device.backend.name,
      device_id=process.device.id,
      pid=process.pid,
      interval=interval,
      count=count,
      trace_native_heap=trace_native_heap)
  task.start()
  # The subprocess pid doubles as the task id (valid only after start()).
  _tasks[task.pid] = task
  return task.pid


def Get(task_id):
  """Returns the |BackgroundTask| for |task_id|, or None if unknown."""
  return _tasks.get(task_id)


def TerminateAll():
  """Terminates all the still-running background tasks and forgets them."""
  for proc in _tasks.itervalues():
    if proc.is_alive():
      proc.terminate()
  _tasks.clear()


def TracerMain_(log, storage_path, backend_name, device_id, pid, interval,
                count, trace_native_heap):
  """Entry point for the background periodic tracer task.

  Runs in the child process. Takes |count| snapshots of process |pid| (one
  every |interval| seconds), stores them into a new archive and finally
  symbolizes the native heaps (if any). Progress is reported by putting
  (completion_%, message) tuples into |log|.

  Returns:
    0 on success, 1 if the traced process died prematurely.
  """
  # Initialize storage.
  storage = file_storage.Storage(storage_path)

  # Initialize the backend, replaying its persisted settings.
  backend = backends.GetBackend(backend_name)
  for k, v in storage.LoadSettings(backend_name).iteritems():
    backend.settings[k] = v

  # Initialize the device, replaying its persisted settings.
  device = backends.GetDevice(backend_name, device_id)
  for k, v in storage.LoadSettings(device_id).iteritems():
    device.settings[k] = v

  # Start periodic tracing.
  process = device.GetProcess(pid)
  log.put((1, 'Starting trace (%d dumps x %s s.). Device: %s, process: %s' % (
      count, interval, device.name, process.name)))
  datetime_str = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M')
  archive_name = '%s - %s - %s' % (datetime_str, device.name, process.name)
  archive = storage.OpenArchive(archive_name, create=True)
  heaps_to_symbolize = []

  for i in xrange(1, count + 1):  # [1, count] range is easier to handle.
    # Re-lookup the process on every iteration: it may have died meanwhile.
    process = device.GetProcess(pid)
    if not process:
      log.put((100, 'Process %d died.' % pid))
      return 1
    # Calculate the completion rate proportionally to 80%. We keep the remaining
    # 20% for the final symbolization step (just an approximate estimation).
    completion = 80 * i / count
    log.put((completion, 'Dumping trace %d of %d' % (i, count)))
    archive.StartNewSnapshot()
    # Freeze the process, so that the mmaps and the heap dump are consistent.
    process.Freeze()
    try:
      if trace_native_heap:
        nheap = process.DumpNativeHeap()
        log.put((completion,
                 'Dumped %d native allocations' % len(nheap.allocations)))

      # TODO(primiano): memdump has the bad habit of sending SIGCONT to the
      # process. Fix that, so we are the only one in charge of controlling it.
      mmaps = process.DumpMemoryMaps()
      log.put((completion, 'Dumped %d memory maps' % len(mmaps)))
      archive.StoreMemMaps(mmaps)

      if trace_native_heap:
        # Stack frames can be resolved only relative to the mmaps just taken.
        nheap.RelativizeStackFrames(mmaps)
        nheap.CalculateResidentSize(mmaps)
        archive.StoreNativeHeap(nheap)
        heaps_to_symbolize += [nheap]
    finally:
      # Always unfreeze, even if a dump above raised.
      process.Unfreeze()

    if i < count:
      time.sleep(interval)

  if heaps_to_symbolize:
    log.put((90, 'Symbolizing'))
    symbols = backend.ExtractSymbols(
        heaps_to_symbolize, device.settings['native_symbol_paths'] or '')
    expected_symbols_count = len(set.union(
        *[set(x.stack_frames.iterkeys()) for x in heaps_to_symbolize]))
    # max(..., 1) guards the log-message percentage against a division by zero
    # in the degenerate case where the dumped heaps contained no stack frames.
    log.put((99, 'Symbolization complete. Got %d symbols (%.1f%%).' % (
        len(symbols), 100.0 * len(symbols) / max(expected_symbols_count, 1))))
    archive.StoreSymbols(symbols)

  log.put((100, 'Trace complete.'))
  return 0


class BackgroundTask(multiprocessing.Process):
  """A subprocess that reports its progress through a multiprocessing.Queue.

  The |entry_point| target receives the log queue as its first positional
  argument and is expected to enqueue (completion_%, message) tuples on it.
  """

  def __init__(self, entry_point, *args, **kwargs):
    self._log_queue = multiprocessing.Queue()
    self._progress_log = []  # A list of tuples [(50%, 'msg1'), (100%, 'msg2')].
    super(BackgroundTask, self).__init__(
        target=entry_point,
        args=((self._log_queue,) + args),  # Just propagate all args.
        kwargs=kwargs)

  def GetProgress(self):
    """Returns the accumulated progress as a list of (completion_%, msg)."""
    # Drain all messages the subprocess has enqueued since the last call.
    while True:
      try:
        self._progress_log += [self._log_queue.get(block=False)]
      except Queue.Empty:
        break
    # Surface an abnormal exit as a synthetic final log entry (on a copy, so
    # the stored log itself is not polluted).
    if not self.is_alive() and self.exitcode != 0:
      return self._progress_log + [(100, 'Failed with code %d' % self.exitcode)]
    return self._progress_log