1# Copyright 2014 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import collections
6import datetime
7import logging
8import multiprocessing
9import os
10import posixpath
11import Queue
12import re
13import subprocess
14import sys
15import threading
16
17
18# addr2line builds a possibly infinite memory cache that can exhaust
19# the computer's memory if allowed to grow for too long. This constant
20# controls how many lookups we do before restarting the process. 4000
21# gives near peak performance without extreme memory usage.
22ADDR2LINE_RECYCLE_LIMIT = 4000
23
24
25class ELFSymbolizer(object):
26  """An uber-fast (multiprocessing, pipelined and asynchronous) ELF symbolizer.
27
28  This class is a frontend for addr2line (part of GNU binutils), designed to
29  symbolize batches of large numbers of symbols for a given ELF file. It
30  supports sharding symbolization against many addr2line instances and
31  pipelining of multiple requests per each instance (in order to hide addr2line
32  internals and OS pipe latencies).
33
34  The interface exhibited by this class is a very simple asynchronous interface,
35  which is based on the following three methods:
36  - SymbolizeAsync(): used to request (enqueue) resolution of a given address.
37  - The |callback| method: used to communicated back the symbol information.
38  - Join(): called to conclude the batch to gather the last outstanding results.
39  In essence, before the Join method returns, this class will have issued as
40  many callbacks as the number of SymbolizeAsync() calls. In this regard, note
41  that due to multiprocess sharding, callbacks can be delivered out of order.
42
43  Some background about addr2line:
44  - it is invoked passing the elf path in the cmdline, piping the addresses in
45    its stdin and getting results on its stdout.
46  - it has pretty large response times for the first requests, but it
47    works very well in streaming mode once it has been warmed up.
48  - it doesn't scale by itself (on more cores). However, spawning multiple
49    instances at the same time on the same file is pretty efficient as they
50    keep hitting the pagecache and become mostly CPU bound.
51  - it might hang or crash, mostly for OOM. This class deals with both of these
52    problems.
53
54  Despite the "scary" imports and the multi* words above, (almost) no multi-
55  threading/processing is involved from the python viewpoint. Concurrency
56  here is achieved by spawning several addr2line subprocesses and handling their
57  output pipes asynchronously. Therefore, all the code here (with the exception
58  of the Queue instance in Addr2Line) should be free from mind-blowing
59  thread-safety concerns.
60
61  The multiprocess sharding works as follows:
62  The symbolizer tries to use the lowest number of addr2line instances as
63  possible (with respect of |max_concurrent_jobs|) and enqueue all the requests
64  in a single addr2line instance. For few symbols (i.e. dozens) sharding isn't
65  worth the startup cost.
66  The multiprocess logic kicks in as soon as the queues for the existing
67  instances grow. Specifically, once all the existing instances reach the
68  |max_queue_size| bound, a new addr2line instance is kicked in.
69  In the case of a very eager producer (i.e. all |max_concurrent_jobs| instances
70  have a backlog of |max_queue_size|), back-pressure is applied on the caller by
71  blocking the SymbolizeAsync method.
72
73  This module has been deliberately designed to be dependency free (w.r.t. of
74  other modules in this project), to allow easy reuse in external projects.
75  """
76
77  def __init__(self, elf_file_path, addr2line_path, callback, inlines=False,
78      max_concurrent_jobs=None, addr2line_timeout=30, max_queue_size=50):
79    """Args:
80      elf_file_path: path of the elf file to be symbolized.
81      addr2line_path: path of the toolchain's addr2line binary.
82      callback: a callback which will be invoked for each resolved symbol with
83          the two args (sym_info, callback_arg). The former is an instance of
84          |ELFSymbolInfo| and contains the symbol information. The latter is an
85          embedder-provided argument which is passed to SymbolizeAsync().
86      inlines: when True, the ELFSymbolInfo will contain also the details about
87          the outer inlining functions. When False, only the innermost function
88          will be provided.
89      max_concurrent_jobs: Max number of addr2line instances spawned.
90          Parallelize responsibly, addr2line is a memory and I/O monster.
91      max_queue_size: Max number of outstanding requests per addr2line instance.
92      addr2line_timeout: Max time (in seconds) to wait for a addr2line response.
93          After the timeout, the instance will be considered hung and respawned.
94    """
95    assert(os.path.isfile(addr2line_path)), 'Cannot find ' + addr2line_path
96    self.elf_file_path = elf_file_path
97    self.addr2line_path = addr2line_path
98    self.callback = callback
99    self.inlines = inlines
100    self.max_concurrent_jobs = (max_concurrent_jobs or
101                                min(multiprocessing.cpu_count(), 4))
102    self.max_queue_size = max_queue_size
103    self.addr2line_timeout = addr2line_timeout
104    self.requests_counter = 0  # For generating monotonic request IDs.
105    self._a2l_instances = []  # Up to |max_concurrent_jobs| _Addr2Line inst.
106
107    # Create one addr2line instance. More instances will be created on demand
108    # (up to |max_concurrent_jobs|) depending on the rate of the requests.
109    self._CreateNewA2LInstance()
110
111  def SymbolizeAsync(self, addr, callback_arg=None):
112    """Requests symbolization of a given address.
113
114    This method is not guaranteed to return immediately. It generally does, but
115    in some scenarios (e.g. all addr2line instances have full queues) it can
116    block to create back-pressure.
117
118    Args:
119      addr: address to symbolize.
120      callback_arg: optional argument which will be passed to the |callback|."""
121    assert(isinstance(addr, int))
122
123    # Process all the symbols that have been resolved in the meanwhile.
124    # Essentially, this drains all the addr2line(s) out queues.
125    for a2l_to_purge in self._a2l_instances:
126      a2l_to_purge.ProcessAllResolvedSymbolsInQueue()
127      a2l_to_purge.RecycleIfNecessary()
128
129    # Find the best instance according to this logic:
130    # 1. Find an existing instance with the shortest queue.
131    # 2. If all of instances' queues are full, but there is room in the pool,
132    #    (i.e. < |max_concurrent_jobs|) create a new instance.
133    # 3. If there were already |max_concurrent_jobs| instances and all of them
134    #    had full queues, make back-pressure.
135
136    # 1.
137    def _SortByQueueSizeAndReqID(a2l):
138      return (a2l.queue_size, a2l.first_request_id)
139    a2l = min(self._a2l_instances, key=_SortByQueueSizeAndReqID)
140
141    # 2.
142    if (a2l.queue_size >= self.max_queue_size and
143        len(self._a2l_instances) < self.max_concurrent_jobs):
144      a2l = self._CreateNewA2LInstance()
145
146    # 3.
147    if a2l.queue_size >= self.max_queue_size:
148      a2l.WaitForNextSymbolInQueue()
149
150    a2l.EnqueueRequest(addr, callback_arg)
151
152  def Join(self):
153    """Waits for all the outstanding requests to complete and terminates."""
154    for a2l in self._a2l_instances:
155      a2l.WaitForIdle()
156      a2l.Terminate()
157
158  def _CreateNewA2LInstance(self):
159    assert(len(self._a2l_instances) < self.max_concurrent_jobs)
160    a2l = ELFSymbolizer.Addr2Line(self)
161    self._a2l_instances.append(a2l)
162    return a2l
163
164
165  class Addr2Line(object):
166    """A python wrapper around an addr2line instance.
167
168    The communication with the addr2line process looks as follows:
169      [STDIN]         [STDOUT]  (from addr2line's viewpoint)
170    > f001111
171    > f002222
172                    < Symbol::Name(foo, bar) for f001111
173                    < /path/to/source/file.c:line_number
174    > f003333
175                    < Symbol::Name2() for f002222
176                    < /path/to/source/file.c:line_number
177                    < Symbol::Name3() for f003333
178                    < /path/to/source/file.c:line_number
179    """
180
181    SYM_ADDR_RE = re.compile(r'([^:]+):(\?|\d+).*')
182
183    def __init__(self, symbolizer):
184      self._symbolizer = symbolizer
185      self._lib_file_name = posixpath.basename(symbolizer.elf_file_path)
186
187      # The request queue (i.e. addresses pushed to addr2line's stdin and not
188      # yet retrieved on stdout)
189      self._request_queue = collections.deque()
190
191      # This is essentially len(self._request_queue). It has been optimized to a
192      # separate field because turned out to be a perf hot-spot.
193      self.queue_size = 0
194
195      # Keep track of the number of symbols a process has processed to
196      # avoid a single process growing too big and using all the memory.
197      self._processed_symbols_count = 0
198
199      # Objects required to handle the addr2line subprocess.
200      self._proc = None  # Subprocess.Popen(...) instance.
201      self._thread = None  # Threading.thread instance.
202      self._out_queue = None  # Queue.Queue instance (for buffering a2l stdout).
203      self._RestartAddr2LineProcess()
204
205    def EnqueueRequest(self, addr, callback_arg):
206      """Pushes an address to addr2line's stdin (and keeps track of it)."""
207      self._symbolizer.requests_counter += 1  # For global "age" of requests.
208      req_idx = self._symbolizer.requests_counter
209      self._request_queue.append((addr, callback_arg, req_idx))
210      self.queue_size += 1
211      self._WriteToA2lStdin(addr)
212
213    def WaitForIdle(self):
214      """Waits until all the pending requests have been symbolized."""
215      while self.queue_size > 0:
216        self.WaitForNextSymbolInQueue()
217
218    def WaitForNextSymbolInQueue(self):
219      """Waits for the next pending request to be symbolized."""
220      if not self.queue_size:
221        return
222
223      # This outer loop guards against a2l hanging (detecting stdout timeout).
224      while True:
225        start_time = datetime.datetime.now()
226        timeout = datetime.timedelta(seconds=self._symbolizer.addr2line_timeout)
227
228        # The inner loop guards against a2l crashing (checking if it exited).
229        while (datetime.datetime.now() - start_time < timeout):
230          # poll() returns !None if the process exited. a2l should never exit.
231          if self._proc.poll():
232            logging.warning('addr2line crashed, respawning (lib: %s).' %
233                            self._lib_file_name)
234            self._RestartAddr2LineProcess()
235            # TODO(primiano): the best thing to do in this case would be
236            # shrinking the pool size as, very likely, addr2line is crashed
237            # due to low memory (and the respawned one will die again soon).
238
239          try:
240            lines = self._out_queue.get(block=True, timeout=0.25)
241          except Queue.Empty:
242            # On timeout (1/4 s.) repeat the inner loop and check if either the
243            # addr2line process did crash or we waited its output for too long.
244            continue
245
246          # In nominal conditions, we get straight to this point.
247          self._ProcessSymbolOutput(lines)
248          return
249
250        # If this point is reached, we waited more than |addr2line_timeout|.
251        logging.warning('Hung addr2line process, respawning (lib: %s).' %
252                        self._lib_file_name)
253        self._RestartAddr2LineProcess()
254
255    def ProcessAllResolvedSymbolsInQueue(self):
256      """Consumes all the addr2line output lines produced (without blocking)."""
257      if not self.queue_size:
258        return
259      while True:
260        try:
261          lines = self._out_queue.get_nowait()
262        except Queue.Empty:
263          break
264        self._ProcessSymbolOutput(lines)
265
266    def RecycleIfNecessary(self):
267      """Restarts the process if it has been used for too long.
268
269      A long running addr2line process will consume excessive amounts
270      of memory without any gain in performance."""
271      if self._processed_symbols_count >= ADDR2LINE_RECYCLE_LIMIT:
272        self._RestartAddr2LineProcess()
273
274
275    def Terminate(self):
276      """Kills the underlying addr2line process.
277
278      The poller |_thread| will terminate as well due to the broken pipe."""
279      try:
280        self._proc.kill()
281        self._proc.communicate()  # Essentially wait() without risking deadlock.
282      except Exception:  # An exception while terminating? How interesting.
283        pass
284      self._proc = None
285
286    def _WriteToA2lStdin(self, addr):
287      self._proc.stdin.write('%s\n' % hex(addr))
288      if self._symbolizer.inlines:
289        # In the case of inlines we output an extra blank line, which causes
290        # addr2line to emit a (??,??:0) tuple that we use as a boundary marker.
291        self._proc.stdin.write('\n')
292      self._proc.stdin.flush()
293
294    def _ProcessSymbolOutput(self, lines):
295      """Parses an addr2line symbol output and triggers the client callback."""
296      (_, callback_arg, _) = self._request_queue.popleft()
297      self.queue_size -= 1
298
299      innermost_sym_info = None
300      sym_info = None
301      for (line1, line2) in lines:
302        prev_sym_info = sym_info
303        name = line1 if not line1.startswith('?') else None
304        source_path = None
305        source_line = None
306        m = ELFSymbolizer.Addr2Line.SYM_ADDR_RE.match(line2)
307        if m:
308          if not m.group(1).startswith('?'):
309            source_path = m.group(1)
310            if not m.group(2).startswith('?'):
311              source_line = int(m.group(2))
312        else:
313          logging.warning('Got invalid symbol path from addr2line: %s' % line2)
314
315        sym_info = ELFSymbolInfo(name, source_path, source_line)
316        if prev_sym_info:
317          prev_sym_info.inlined_by = sym_info
318        if not innermost_sym_info:
319          innermost_sym_info = sym_info
320
321      self._processed_symbols_count += 1
322      self._symbolizer.callback(innermost_sym_info, callback_arg)
323
324    def _RestartAddr2LineProcess(self):
325      if self._proc:
326        self.Terminate()
327
328      # The only reason of existence of this Queue (and the corresponding
329      # Thread below) is the lack of a subprocess.stdout.poll_avail_lines().
330      # Essentially this is a pipe able to extract a couple of lines atomically.
331      self._out_queue = Queue.Queue()
332
333      # Start the underlying addr2line process in line buffered mode.
334
335      cmd = [self._symbolizer.addr2line_path, '--functions', '--demangle',
336          '--exe=' + self._symbolizer.elf_file_path]
337      if self._symbolizer.inlines:
338        cmd += ['--inlines']
339      self._proc = subprocess.Popen(cmd, bufsize=1, stdout=subprocess.PIPE,
340          stdin=subprocess.PIPE, stderr=sys.stderr, close_fds=True)
341
342      # Start the poller thread, which simply moves atomically the lines read
343      # from the addr2line's stdout to the |_out_queue|.
344      self._thread = threading.Thread(
345          target=ELFSymbolizer.Addr2Line.StdoutReaderThread,
346          args=(self._proc.stdout, self._out_queue, self._symbolizer.inlines))
347      self._thread.daemon = True  # Don't prevent early process exit.
348      self._thread.start()
349
350      self._processed_symbols_count = 0
351
352      # Replay the pending requests on the new process (only for the case
353      # of a hung addr2line timing out during the game).
354      for (addr, _, _) in self._request_queue:
355        self._WriteToA2lStdin(addr)
356
357    @staticmethod
358    def StdoutReaderThread(process_pipe, queue, inlines):
359      """The poller thread fn, which moves the addr2line stdout to the |queue|.
360
361      This is the only piece of code not running on the main thread. It merely
362      writes to a Queue, which is thread-safe. In the case of inlines, it
363      detects the ??,??:0 marker and sends the lines atomically, such that the
364      main thread always receives all the lines corresponding to one symbol in
365      one shot."""
366      try:
367        lines_for_one_symbol = []
368        while True:
369          line1 = process_pipe.readline().rstrip('\r\n')
370          line2 = process_pipe.readline().rstrip('\r\n')
371          if not line1 or not line2:
372            break
373          inline_has_more_lines = inlines and (len(lines_for_one_symbol) == 0 or
374                                  (line1 != '??' and line2 != '??:0'))
375          if not inlines or inline_has_more_lines:
376            lines_for_one_symbol += [(line1, line2)]
377          if inline_has_more_lines:
378            continue
379          queue.put(lines_for_one_symbol)
380          lines_for_one_symbol = []
381        process_pipe.close()
382
383      # Every addr2line processes will die at some point, please die silently.
384      except (IOError, OSError):
385        pass
386
387    @property
388    def first_request_id(self):
389      """Returns the request_id of the oldest pending request in the queue."""
390      return self._request_queue[0][2] if self._request_queue else 0
391
392
393class ELFSymbolInfo(object):
394  """The result of the symbolization passed as first arg. of each callback."""
395
396  def __init__(self, name, source_path, source_line):
397    """All the fields here can be None (if addr2line replies with '??')."""
398    self.name = name
399    self.source_path = source_path
400    self.source_line = source_line
401    # In the case of |inlines|=True, the |inlined_by| points to the outer
402    # function inlining the current one (and so on, to form a chain).
403    self.inlined_by = None
404
405  def __str__(self):
406    return '%s [%s:%d]' % (
407        self.name or '??', self.source_path or '??', self.source_line or 0)
408