1# Copyright 2014 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import collections
6import datetime
7import logging
8import multiprocessing
9import os
10import posixpath
11import Queue
12import re
13import subprocess
14import sys
15import threading
16import time
17
18
19# addr2line builds a possibly infinite memory cache that can exhaust
20# the computer's memory if allowed to grow for too long. This constant
21# controls how many lookups we do before restarting the process. 4000
22# gives near peak performance without extreme memory usage.
23ADDR2LINE_RECYCLE_LIMIT = 4000
24
25
26class ELFSymbolizer(object):
27  """An uber-fast (multiprocessing, pipelined and asynchronous) ELF symbolizer.
28
29  This class is a frontend for addr2line (part of GNU binutils), designed to
30  symbolize batches of large numbers of symbols for a given ELF file. It
31  supports sharding symbolization against many addr2line instances and
32  pipelining of multiple requests per each instance (in order to hide addr2line
33  internals and OS pipe latencies).
34
35  The interface exhibited by this class is a very simple asynchronous interface,
36  which is based on the following three methods:
37  - SymbolizeAsync(): used to request (enqueue) resolution of a given address.
38  - The |callback| method: used to communicated back the symbol information.
39  - Join(): called to conclude the batch to gather the last outstanding results.
40  In essence, before the Join method returns, this class will have issued as
41  many callbacks as the number of SymbolizeAsync() calls. In this regard, note
42  that due to multiprocess sharding, callbacks can be delivered out of order.
43
44  Some background about addr2line:
45  - it is invoked passing the elf path in the cmdline, piping the addresses in
46    its stdin and getting results on its stdout.
47  - it has pretty large response times for the first requests, but it
48    works very well in streaming mode once it has been warmed up.
49  - it doesn't scale by itself (on more cores). However, spawning multiple
50    instances at the same time on the same file is pretty efficient as they
51    keep hitting the pagecache and become mostly CPU bound.
52  - it might hang or crash, mostly for OOM. This class deals with both of these
53    problems.
54
55  Despite the "scary" imports and the multi* words above, (almost) no multi-
56  threading/processing is involved from the python viewpoint. Concurrency
57  here is achieved by spawning several addr2line subprocesses and handling their
58  output pipes asynchronously. Therefore, all the code here (with the exception
59  of the Queue instance in Addr2Line) should be free from mind-blowing
60  thread-safety concerns.
61
62  The multiprocess sharding works as follows:
63  The symbolizer tries to use the lowest number of addr2line instances as
64  possible (with respect of |max_concurrent_jobs|) and enqueue all the requests
65  in a single addr2line instance. For few symbols (i.e. dozens) sharding isn't
66  worth the startup cost.
67  The multiprocess logic kicks in as soon as the queues for the existing
68  instances grow. Specifically, once all the existing instances reach the
69  |max_queue_size| bound, a new addr2line instance is kicked in.
70  In the case of a very eager producer (i.e. all |max_concurrent_jobs| instances
71  have a backlog of |max_queue_size|), back-pressure is applied on the caller by
72  blocking the SymbolizeAsync method.
73
74  This module has been deliberately designed to be dependency free (w.r.t. of
75  other modules in this project), to allow easy reuse in external projects.
76  """
77
78  def __init__(self, elf_file_path, addr2line_path, callback, inlines=False,
79      max_concurrent_jobs=None, addr2line_timeout=30, max_queue_size=50,
80      source_root_path=None, strip_base_path=None):
81    """Args:
82      elf_file_path: path of the elf file to be symbolized.
83      addr2line_path: path of the toolchain's addr2line binary.
84      callback: a callback which will be invoked for each resolved symbol with
85          the two args (sym_info, callback_arg). The former is an instance of
86          |ELFSymbolInfo| and contains the symbol information. The latter is an
87          embedder-provided argument which is passed to SymbolizeAsync().
88      inlines: when True, the ELFSymbolInfo will contain also the details about
89          the outer inlining functions. When False, only the innermost function
90          will be provided.
91      max_concurrent_jobs: Max number of addr2line instances spawned.
92          Parallelize responsibly, addr2line is a memory and I/O monster.
93      max_queue_size: Max number of outstanding requests per addr2line instance.
94      addr2line_timeout: Max time (in seconds) to wait for a addr2line response.
95          After the timeout, the instance will be considered hung and respawned.
96      source_root_path: In some toolchains only the name of the source file is
97          is output, without any path information; disambiguation searches
98          through the source directory specified by |source_root_path| argument
99          for files whose name matches, adding the full path information to the
100          output. For example, if the toolchain outputs "unicode.cc" and there
101          is a file called "unicode.cc" located under |source_root_path|/foo,
102          the tool will replace "unicode.cc" with
103          "|source_root_path|/foo/unicode.cc". If there are multiple files with
104          the same name, disambiguation will fail because the tool cannot
105          determine which of the files was the source of the symbol.
106      strip_base_path: Rebases the symbols source paths onto |source_root_path|
107          (i.e replace |strip_base_path| with |source_root_path).
108    """
109    assert(os.path.isfile(addr2line_path)), 'Cannot find ' + addr2line_path
110    self.elf_file_path = elf_file_path
111    self.addr2line_path = addr2line_path
112    self.callback = callback
113    self.inlines = inlines
114    self.max_concurrent_jobs = (max_concurrent_jobs or
115                                min(multiprocessing.cpu_count(), 4))
116    self.max_queue_size = max_queue_size
117    self.addr2line_timeout = addr2line_timeout
118    self.requests_counter = 0  # For generating monotonic request IDs.
119    self._a2l_instances = []  # Up to |max_concurrent_jobs| _Addr2Line inst.
120
121    # If necessary, create disambiguation lookup table
122    self.disambiguate = source_root_path is not None
123    self.disambiguation_table = {}
124    self.strip_base_path = strip_base_path
125    if(self.disambiguate):
126      self.source_root_path = os.path.abspath(source_root_path)
127      self._CreateDisambiguationTable()
128
129    # Create one addr2line instance. More instances will be created on demand
130    # (up to |max_concurrent_jobs|) depending on the rate of the requests.
131    self._CreateNewA2LInstance()
132
133  def SymbolizeAsync(self, addr, callback_arg=None):
134    """Requests symbolization of a given address.
135
136    This method is not guaranteed to return immediately. It generally does, but
137    in some scenarios (e.g. all addr2line instances have full queues) it can
138    block to create back-pressure.
139
140    Args:
141      addr: address to symbolize.
142      callback_arg: optional argument which will be passed to the |callback|."""
143    assert(isinstance(addr, int))
144
145    # Process all the symbols that have been resolved in the meanwhile.
146    # Essentially, this drains all the addr2line(s) out queues.
147    for a2l_to_purge in self._a2l_instances:
148      a2l_to_purge.ProcessAllResolvedSymbolsInQueue()
149      a2l_to_purge.RecycleIfNecessary()
150
151    # Find the best instance according to this logic:
152    # 1. Find an existing instance with the shortest queue.
153    # 2. If all of instances' queues are full, but there is room in the pool,
154    #    (i.e. < |max_concurrent_jobs|) create a new instance.
155    # 3. If there were already |max_concurrent_jobs| instances and all of them
156    #    had full queues, make back-pressure.
157
158    # 1.
159    def _SortByQueueSizeAndReqID(a2l):
160      return (a2l.queue_size, a2l.first_request_id)
161    a2l = min(self._a2l_instances, key=_SortByQueueSizeAndReqID)
162
163    # 2.
164    if (a2l.queue_size >= self.max_queue_size and
165        len(self._a2l_instances) < self.max_concurrent_jobs):
166      a2l = self._CreateNewA2LInstance()
167
168    # 3.
169    if a2l.queue_size >= self.max_queue_size:
170      a2l.WaitForNextSymbolInQueue()
171
172    a2l.EnqueueRequest(addr, callback_arg)
173
174  def Join(self):
175    """Waits for all the outstanding requests to complete and terminates."""
176    for a2l in self._a2l_instances:
177      a2l.WaitForIdle()
178      a2l.Terminate()
179
180  def _CreateNewA2LInstance(self):
181    assert(len(self._a2l_instances) < self.max_concurrent_jobs)
182    a2l = ELFSymbolizer.Addr2Line(self)
183    self._a2l_instances.append(a2l)
184    return a2l
185
186  def _CreateDisambiguationTable(self):
187    """ Non-unique file names will result in None entries"""
188    start_time = time.time()
189    logging.info('Collecting information about available source files...')
190    self.disambiguation_table = {}
191
192    for root, _, filenames in os.walk(self.source_root_path):
193      for f in filenames:
194        self.disambiguation_table[f] = os.path.join(root, f) if (f not in
195                                       self.disambiguation_table) else None
196    logging.info('Finished collecting information about '
197                 'possible files (took %.1f s).',
198                 (time.time() - start_time))
199
200
201  class Addr2Line(object):
202    """A python wrapper around an addr2line instance.
203
204    The communication with the addr2line process looks as follows:
205      [STDIN]         [STDOUT]  (from addr2line's viewpoint)
206    > f001111
207    > f002222
208                    < Symbol::Name(foo, bar) for f001111
209                    < /path/to/source/file.c:line_number
210    > f003333
211                    < Symbol::Name2() for f002222
212                    < /path/to/source/file.c:line_number
213                    < Symbol::Name3() for f003333
214                    < /path/to/source/file.c:line_number
215    """
216
217    SYM_ADDR_RE = re.compile(r'([^:]+):(\?|\d+).*')
218
219    def __init__(self, symbolizer):
220      self._symbolizer = symbolizer
221      self._lib_file_name = posixpath.basename(symbolizer.elf_file_path)
222
223      # The request queue (i.e. addresses pushed to addr2line's stdin and not
224      # yet retrieved on stdout)
225      self._request_queue = collections.deque()
226
227      # This is essentially len(self._request_queue). It has been optimized to a
228      # separate field because turned out to be a perf hot-spot.
229      self.queue_size = 0
230
231      # Keep track of the number of symbols a process has processed to
232      # avoid a single process growing too big and using all the memory.
233      self._processed_symbols_count = 0
234
235      # Objects required to handle the addr2line subprocess.
236      self._proc = None  # Subprocess.Popen(...) instance.
237      self._thread = None  # Threading.thread instance.
238      self._out_queue = None  # Queue.Queue instance (for buffering a2l stdout).
239      self._RestartAddr2LineProcess()
240
241    def EnqueueRequest(self, addr, callback_arg):
242      """Pushes an address to addr2line's stdin (and keeps track of it)."""
243      self._symbolizer.requests_counter += 1  # For global "age" of requests.
244      req_idx = self._symbolizer.requests_counter
245      self._request_queue.append((addr, callback_arg, req_idx))
246      self.queue_size += 1
247      self._WriteToA2lStdin(addr)
248
249    def WaitForIdle(self):
250      """Waits until all the pending requests have been symbolized."""
251      while self.queue_size > 0:
252        self.WaitForNextSymbolInQueue()
253
254    def WaitForNextSymbolInQueue(self):
255      """Waits for the next pending request to be symbolized."""
256      if not self.queue_size:
257        return
258
259      # This outer loop guards against a2l hanging (detecting stdout timeout).
260      while True:
261        start_time = datetime.datetime.now()
262        timeout = datetime.timedelta(seconds=self._symbolizer.addr2line_timeout)
263
264        # The inner loop guards against a2l crashing (checking if it exited).
265        while (datetime.datetime.now() - start_time < timeout):
266          # poll() returns !None if the process exited. a2l should never exit.
267          if self._proc.poll():
268            logging.warning('addr2line crashed, respawning (lib: %s).' %
269                            self._lib_file_name)
270            self._RestartAddr2LineProcess()
271            # TODO(primiano): the best thing to do in this case would be
272            # shrinking the pool size as, very likely, addr2line is crashed
273            # due to low memory (and the respawned one will die again soon).
274
275          try:
276            lines = self._out_queue.get(block=True, timeout=0.25)
277          except Queue.Empty:
278            # On timeout (1/4 s.) repeat the inner loop and check if either the
279            # addr2line process did crash or we waited its output for too long.
280            continue
281
282          # In nominal conditions, we get straight to this point.
283          self._ProcessSymbolOutput(lines)
284          return
285
286        # If this point is reached, we waited more than |addr2line_timeout|.
287        logging.warning('Hung addr2line process, respawning (lib: %s).' %
288                        self._lib_file_name)
289        self._RestartAddr2LineProcess()
290
291    def ProcessAllResolvedSymbolsInQueue(self):
292      """Consumes all the addr2line output lines produced (without blocking)."""
293      if not self.queue_size:
294        return
295      while True:
296        try:
297          lines = self._out_queue.get_nowait()
298        except Queue.Empty:
299          break
300        self._ProcessSymbolOutput(lines)
301
302    def RecycleIfNecessary(self):
303      """Restarts the process if it has been used for too long.
304
305      A long running addr2line process will consume excessive amounts
306      of memory without any gain in performance."""
307      if self._processed_symbols_count >= ADDR2LINE_RECYCLE_LIMIT:
308        self._RestartAddr2LineProcess()
309
310
311    def Terminate(self):
312      """Kills the underlying addr2line process.
313
314      The poller |_thread| will terminate as well due to the broken pipe."""
315      try:
316        self._proc.kill()
317        self._proc.communicate()  # Essentially wait() without risking deadlock.
318      except Exception:  # An exception while terminating? How interesting.
319        pass
320      self._proc = None
321
322    def _WriteToA2lStdin(self, addr):
323      self._proc.stdin.write('%s\n' % hex(addr))
324      if self._symbolizer.inlines:
325        # In the case of inlines we output an extra blank line, which causes
326        # addr2line to emit a (??,??:0) tuple that we use as a boundary marker.
327        self._proc.stdin.write('\n')
328      self._proc.stdin.flush()
329
330    def _ProcessSymbolOutput(self, lines):
331      """Parses an addr2line symbol output and triggers the client callback."""
332      (_, callback_arg, _) = self._request_queue.popleft()
333      self.queue_size -= 1
334
335      innermost_sym_info = None
336      sym_info = None
337      for (line1, line2) in lines:
338        prev_sym_info = sym_info
339        name = line1 if not line1.startswith('?') else None
340        source_path = None
341        source_line = None
342        m = ELFSymbolizer.Addr2Line.SYM_ADDR_RE.match(line2)
343        if m:
344          if not m.group(1).startswith('?'):
345            source_path = m.group(1)
346            if not m.group(2).startswith('?'):
347              source_line = int(m.group(2))
348        else:
349          logging.warning('Got invalid symbol path from addr2line: %s' % line2)
350
351        # In case disambiguation is on, and needed
352        was_ambiguous = False
353        disambiguated = False
354        if self._symbolizer.disambiguate:
355          if source_path and not posixpath.isabs(source_path):
356            path = self._symbolizer.disambiguation_table.get(source_path)
357            was_ambiguous = True
358            disambiguated = path is not None
359            source_path = path if disambiguated else source_path
360
361          # Use absolute paths (so that paths are consistent, as disambiguation
362          # uses absolute paths)
363          if source_path and not was_ambiguous:
364            source_path = os.path.abspath(source_path)
365
366        if source_path and self._symbolizer.strip_base_path:
367          # Strip the base path
368          source_path = re.sub('^' + self._symbolizer.strip_base_path,
369              self._symbolizer.source_root_path or '', source_path)
370
371        sym_info = ELFSymbolInfo(name, source_path, source_line, was_ambiguous,
372                                 disambiguated)
373        if prev_sym_info:
374          prev_sym_info.inlined_by = sym_info
375        if not innermost_sym_info:
376          innermost_sym_info = sym_info
377
378      self._processed_symbols_count += 1
379      self._symbolizer.callback(innermost_sym_info, callback_arg)
380
381    def _RestartAddr2LineProcess(self):
382      if self._proc:
383        self.Terminate()
384
385      # The only reason of existence of this Queue (and the corresponding
386      # Thread below) is the lack of a subprocess.stdout.poll_avail_lines().
387      # Essentially this is a pipe able to extract a couple of lines atomically.
388      self._out_queue = Queue.Queue()
389
390      # Start the underlying addr2line process in line buffered mode.
391
392      cmd = [self._symbolizer.addr2line_path, '--functions', '--demangle',
393          '--exe=' + self._symbolizer.elf_file_path]
394      if self._symbolizer.inlines:
395        cmd += ['--inlines']
396      self._proc = subprocess.Popen(cmd, bufsize=1, stdout=subprocess.PIPE,
397          stdin=subprocess.PIPE, stderr=sys.stderr, close_fds=True)
398
399      # Start the poller thread, which simply moves atomically the lines read
400      # from the addr2line's stdout to the |_out_queue|.
401      self._thread = threading.Thread(
402          target=ELFSymbolizer.Addr2Line.StdoutReaderThread,
403          args=(self._proc.stdout, self._out_queue, self._symbolizer.inlines))
404      self._thread.daemon = True  # Don't prevent early process exit.
405      self._thread.start()
406
407      self._processed_symbols_count = 0
408
409      # Replay the pending requests on the new process (only for the case
410      # of a hung addr2line timing out during the game).
411      for (addr, _, _) in self._request_queue:
412        self._WriteToA2lStdin(addr)
413
414    @staticmethod
415    def StdoutReaderThread(process_pipe, queue, inlines):
416      """The poller thread fn, which moves the addr2line stdout to the |queue|.
417
418      This is the only piece of code not running on the main thread. It merely
419      writes to a Queue, which is thread-safe. In the case of inlines, it
420      detects the ??,??:0 marker and sends the lines atomically, such that the
421      main thread always receives all the lines corresponding to one symbol in
422      one shot."""
423      try:
424        lines_for_one_symbol = []
425        while True:
426          line1 = process_pipe.readline().rstrip('\r\n')
427          line2 = process_pipe.readline().rstrip('\r\n')
428          if not line1 or not line2:
429            break
430          inline_has_more_lines = inlines and (len(lines_for_one_symbol) == 0 or
431                                  (line1 != '??' and line2 != '??:0'))
432          if not inlines or inline_has_more_lines:
433            lines_for_one_symbol += [(line1, line2)]
434          if inline_has_more_lines:
435            continue
436          queue.put(lines_for_one_symbol)
437          lines_for_one_symbol = []
438        process_pipe.close()
439
440      # Every addr2line processes will die at some point, please die silently.
441      except (IOError, OSError):
442        pass
443
444    @property
445    def first_request_id(self):
446      """Returns the request_id of the oldest pending request in the queue."""
447      return self._request_queue[0][2] if self._request_queue else 0
448
449
450class ELFSymbolInfo(object):
451  """The result of the symbolization passed as first arg. of each callback."""
452
453  def __init__(self, name, source_path, source_line, was_ambiguous=False,
454               disambiguated=False):
455    """All the fields here can be None (if addr2line replies with '??')."""
456    self.name = name
457    self.source_path = source_path
458    self.source_line = source_line
459    # In the case of |inlines|=True, the |inlined_by| points to the outer
460    # function inlining the current one (and so on, to form a chain).
461    self.inlined_by = None
462    self.disambiguated = disambiguated
463    self.was_ambiguous = was_ambiguous
464
465  def __str__(self):
466    return '%s [%s:%d]' % (
467        self.name or '??', self.source_path or '??', self.source_line or 0)
468