background_tasks.py revision 0529e5d033099cbfc42635f6f6183833b09dff6e
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Background tasks for the www_server module.

This module has the logic for handling background tasks for the www frontend.
Long-term actions (like periodic tracing) cannot be served synchronously in the
context of an /ajax/endpoint request (the HTTP request would time out).
Instead, for such long operations, an instance of |BackgroundTask| is created
here and the server returns just its id. The client can later poll that id to
check the progress of the asynchronous task.

From a technical viewpoint, each background task is just a Python subprocess
which communicates its progress updates through a Queue. The messages enqueued
are tuples with the following format: (completion_ratio%, 'message string').
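
A typical round trip looks like the following sketch (illustrative only: the
storage path is a placeholder and |process| is a backends.Process instance):

  task_id = background_tasks.StartTracer(process,
                                         storage_path='/tmp/traces',
                                         interval=5,  # Seconds between dumps.
                                         count=10,
                                         trace_native_heap=False)
  ...
  task = background_tasks.Get(task_id)
  for completion, message in task.GetProgress():
    print completion, message  # E.g.: 8 Dumping trace 1 of 10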
17"""

import datetime
import multiprocessing
import Queue
import time

from memory_inspector.core import backends
from memory_inspector.data import file_storage


_tasks = {}  # id (int) -> |BackgroundTask| instance.


def StartTracer(process, storage_path, interval, count, trace_native_heap):
  """Creates and starts a periodic tracer task, returning its id."""
  assert(isinstance(process, backends.Process))
  task = BackgroundTask(
      TracerMain_,
      storage_path=storage_path,
      backend_name=process.device.backend.name,
      device_id=process.device.id,
      pid=process.pid,
      interval=interval,
      count=count,
      trace_native_heap=trace_native_heap)
  task.start()
  _tasks[task.pid] = task
  return task.pid


def Get(task_id):
  """Returns the |BackgroundTask| with the given |task_id|, or None."""
  return _tasks.get(task_id)


def TerminateAll():
  """Terminates all the running background tasks and clears the registry."""
  for proc in _tasks.itervalues():
    if proc.is_alive():
      proc.terminate()
  _tasks.clear()


def TracerMain_(log, storage_path, backend_name, device_id, pid, interval,
    count, trace_native_heap):
  """Entry point for the background periodic tracer task.

  |log| is the multiprocessing.Queue used to report progress back to the
  parent |BackgroundTask| as (completion_ratio, message) tuples.
  """
  # Initialize storage.
  storage = file_storage.Storage(storage_path)

  # Initialize the backend.
  backend = backends.GetBackend(backend_name)
  for k, v in storage.LoadSettings(backend_name).iteritems():
    backend.settings[k] = v

  # Initialize the device.
  device = backends.GetDevice(backend_name, device_id)
  for k, v in storage.LoadSettings(device_id).iteritems():
    device.settings[k] = v

  # Start periodic tracing.
  process = device.GetProcess(pid)
  log.put((1, 'Starting trace (%d dumps x %s s.). Device: %s, process: %s' % (
      count, interval, device.name, process.name)))
  datetime_str = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M')
  archive_name = '%s - %s - %s' % (datetime_str, device.name, process.name)
  archive = storage.OpenArchive(archive_name, create=True)
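  # Native heap dumps are collected here and symbolized after the loop.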
  heaps_to_symbolize = []

  for i in xrange(1, count + 1):  # [1, count] range is easier to handle.
    process = device.GetProcess(pid)
    if not process:
      log.put((100, 'Process %d died.' % pid))
      return 1
    # Calculate the completion rate proportionally to 80%. We keep the remaining
    # 20% for the final symbolization step (just an approximate estimation).
    completion = 80 * i / count
    log.put((completion, 'Dumping trace %d of %d' % (i, count)))
    archive.StartNewSnapshot()
    mmaps = process.DumpMemoryMaps()
    log.put((completion, 'Dumped %d memory maps' % len(mmaps)))
    archive.StoreMemMaps(mmaps)
    if trace_native_heap:
      nheap = process.DumpNativeHeap()
      log.put((completion, 'Dumped %d native allocs' % len(nheap.allocations)))
      archive.StoreNativeHeap(nheap)
      heaps_to_symbolize += [nheap]

    if i < count:
      time.sleep(interval)

  # Symbolization applies only if some native heaps were dumped above.
  if heaps_to_symbolize:
    log.put((90, 'Symbolizing'))
    symbols = backend.ExtractSymbols(
        heaps_to_symbolize, device.settings['native_symbol_paths'] or '')

    expected_symbols_count = len(set.union(
        *[set(x.stack_frames.iterkeys()) for x in heaps_to_symbolize]))
    log.put((99, 'Symbolization complete. Got %d symbols (%.1f%%).' % (
        len(symbols), 100.0 * len(symbols) / expected_symbols_count)))

    archive.StoreSymbols(symbols)

  log.put((100, 'Trace complete.'))
  return 0


class BackgroundTask(multiprocessing.Process):
  def __init__(self, entry_point, *args, **kwargs):
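    """Wraps |entry_point| into a subprocess.

    The first argument passed to |entry_point| is the multiprocessing.Queue
    that the task uses to report (completion_ratio, message) progress tuples.
    """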
    self._log_queue = multiprocessing.Queue()
    self._progress_log = []  # A list of tuples [(50%, 'msg1'), (100%, 'msg2')].
    super(BackgroundTask, self).__init__(
        target=entry_point,
        args=((self._log_queue,) + args),  # Prepend the log queue to the args.
        kwargs=kwargs)

  def GetProgress(self):
    """Returns the progress log: a list of (completion_rate, message) tuples."""
    # Drain all the updates enqueued by the subprocess since the last call.
    while True:
      try:
        self._progress_log += [self._log_queue.get(block=False)]
      except Queue.Empty:
        break
    if not self.is_alive() and self.exitcode != 0:
      return self._progress_log + [(100, 'Failed with code %d' % self.exitcode)]
    return self._progress_log