1# Copyright 2014 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4''' TraceEventImporter imports TraceEvent-formatted data
5into the provided model.
6This is a port of the trace event importer from
7https://code.google.com/p/trace-viewer/
8'''
9
10import copy
11import json
12import re
13
14import telemetry.timeline.async_slice as tracing_async_slice
15import telemetry.timeline.flow_event as tracing_flow_event
16from telemetry.timeline import importer
17from telemetry.timeline import tracing_timeline_data
18
19
class TraceBufferOverflowException(Exception):
  '''Raised when a process reports that its trace buffer overflowed.'''
22
23
24class TraceEventTimelineImporter(importer.TimelineImporter):
25  def __init__(self, model, timeline_data):
26    super(TraceEventTimelineImporter, self).__init__(
27        model, timeline_data, import_priority=1)
28
29    event_data = timeline_data.EventData()
30
31    self._events_were_from_string = False
32    self._all_async_events = []
33    self._all_object_events = []
34    self._all_flow_events = []
35
36    if type(event_data) is str:
37      # If the event data begins with a [, then we know it should end with a ].
38      # The reason we check for this is because some tracing implementations
39      # cannot guarantee that a ']' gets written to the trace file. So, we are
40      # forgiving and if this is obviously the case, we fix it up before
41      # throwing the string at JSON.parse.
42      if event_data[0] == '[':
43        event_data = re.sub(r'[\r|\n]*$', '', event_data)
44        event_data = re.sub(r'\s*,\s*$', '', event_data)
45        if event_data[-1] != ']':
46          event_data = event_data + ']'
47
48      self._events = json.loads(event_data)
49      self._events_were_from_string = True
50    else:
51      self._events = event_data
52
53    # Some trace_event implementations put the actual trace events
54    # inside a container. E.g { ... , traceEvents: [ ] }
55    # If we see that, just pull out the trace events.
56    if 'traceEvents' in self._events:
57      container = self._events
58      self._events = self._events['traceEvents']
59      for field_name in container:
60        if field_name == 'traceEvents':
61          continue
62
63        # Any other fields in the container should be treated as metadata.
64        self._model.metadata.append({
65            'name' : field_name,
66            'value' : container[field_name]})
67
68  @staticmethod
69  def CanImport(timeline_data):
70    ''' Returns whether obj is a TraceEvent array. '''
71    if not isinstance(timeline_data,
72                      tracing_timeline_data.TracingTimelineData):
73      return False
74
75    event_data = timeline_data.EventData()
76
77    # May be encoded JSON. But we dont want to parse it fully yet.
78    # Use a simple heuristic:
79    #   - event_data that starts with [ are probably trace_event
80    #   - event_data that starts with { are probably trace_event
81    # May be encoded JSON. Treat files that start with { as importable by us.
82    if isinstance(event_data, str):
83      return len(event_data) > 0 and (event_data[0] == '{'
84          or event_data[0] == '[')
85
86    # Might just be an array of events
87    if (isinstance(event_data, list) and len(event_data)
88        and 'ph' in event_data[0]):
89      return True
90
91    # Might be an object with a traceEvents field in it.
92    if 'traceEvents' in event_data:
93      trace_events = event_data.get('traceEvents', None)
94      return (type(trace_events) is list and
95          len(trace_events) > 0 and 'ph' in trace_events[0])
96
97    return False
98
99  def _GetOrCreateProcess(self, pid):
100    return self._model.GetOrCreateProcess(pid)
101
102  def _DeepCopyIfNeeded(self, obj):
103    if self._events_were_from_string:
104      return obj
105    return copy.deepcopy(obj)
106
107  def _ProcessAsyncEvent(self, event):
108    '''Helper to process an 'async finish' event, which will close an
109    open slice.
110    '''
111    thread = (self._GetOrCreateProcess(event['pid'])
112        .GetOrCreateThread(event['tid']))
113    self._all_async_events.append({
114        'event': event,
115        'thread': thread})
116
117  def _ProcessCounterEvent(self, event):
118    '''Helper that creates and adds samples to a Counter object based on
119    'C' phase events.
120    '''
121    if 'id' in event:
122      ctr_name = event['name'] + '[' + str(event['id']) + ']'
123    else:
124      ctr_name = event['name']
125
126    ctr = (self._GetOrCreateProcess(event['pid'])
127        .GetOrCreateCounter(event['cat'], ctr_name))
128    # Initialize the counter's series fields if needed.
129    if len(ctr.series_names) == 0:
130      #TODO: implement counter object
131      for series_name in event['args']:
132        ctr.series_names.append(series_name)
133      if len(ctr.series_names) == 0:
134        self._model.import_errors.append('Expected counter ' + event['name'] +
135            ' to have at least one argument to use as a value.')
136        # Drop the counter.
137        del ctr.parent.counters[ctr.full_name]
138        return
139
140    # Add the sample values.
141    ctr.timestamps.append(event['ts'] / 1000.0)
142    for series_name in ctr.series_names:
143      if series_name not in event['args']:
144        ctr.samples.append(0)
145        continue
146      ctr.samples.append(event['args'][series_name])
147
148  def _ProcessObjectEvent(self, event):
149    thread = (self._GetOrCreateProcess(event['pid'])
150      .GetOrCreateThread(event['tid']))
151    self._all_object_events.append({
152        'event': event,
153        'thread': thread})
154
155  def _ProcessDurationEvent(self, event):
156    thread = (self._GetOrCreateProcess(event['pid'])
157      .GetOrCreateThread(event['tid']))
158    if not thread.IsTimestampValidForBeginOrEnd(event['ts'] / 1000.0):
159      self._model.import_errors.append(
160          'Timestamps are moving backward.')
161      return
162
163    if event['ph'] == 'B':
164      thread.BeginSlice(event['cat'],
165                        event['name'],
166                        event['ts'] / 1000.0,
167                        event['tts'] / 1000.0 if 'tts' in event else None,
168                        event['args'])
169    elif event['ph'] == 'E':
170      thread = (self._GetOrCreateProcess(event['pid'])
171        .GetOrCreateThread(event['tid']))
172      if not thread.IsTimestampValidForBeginOrEnd(event['ts'] / 1000.0):
173        self._model.import_errors.append(
174            'Timestamps are moving backward.')
175        return
176      if not thread.open_slice_count:
177        self._model.import_errors.append(
178            'E phase event without a matching B phase event.')
179        return
180
181      new_slice = thread.EndSlice(
182          event['ts'] / 1000.0,
183          event['tts'] / 1000.0 if 'tts' in event else None)
184      for arg_name, arg_value in event.get('args', {}).iteritems():
185        if arg_name in new_slice.args:
186          self._model.import_errors.append(
187              'Both the B and E phases of ' + new_slice.name +
188              ' provided values for argument ' + arg_name + '. ' +
189              'The value of the E phase event will be used.')
190        new_slice.args[arg_name] = arg_value
191
192  def _ProcessCompleteEvent(self, event):
193    thread = (self._GetOrCreateProcess(event['pid'])
194        .GetOrCreateThread(event['tid']))
195    thread.PushCompleteSlice(
196        event['cat'],
197        event['name'],
198        event['ts'] / 1000.0,
199        event['dur'] / 1000.0 if 'dur' in event else None,
200        event['tts'] / 1000.0 if 'tts' in event else None,
201        event['tdur'] / 1000.0 if 'tdur' in event else None,
202        event['args'])
203
204  def _ProcessMetadataEvent(self, event):
205    if event['name'] == 'thread_name':
206      thread = (self._GetOrCreateProcess(event['pid'])
207          .GetOrCreateThread(event['tid']))
208      thread.name = event['args']['name']
209    elif event['name'] == 'process_name':
210      process = self._GetOrCreateProcess(event['pid'])
211      process.name = event['args']['name']
212    elif event['name'] == 'trace_buffer_overflowed':
213      process = self._GetOrCreateProcess(event['pid'])
214      process.SetTraceBufferOverflowTimestamp(event['args']['overflowed_at_ts'])
215    else:
216      self._model.import_errors.append(
217          'Unrecognized metadata name: ' + event['name'])
218
219  def _ProcessInstantEvent(self, event):
220    # Treat an Instant event as a duration 0 slice.
221    # SliceTrack's redraw() knows how to handle this.
222    thread = (self._GetOrCreateProcess(event['pid'])
223      .GetOrCreateThread(event['tid']))
224    thread.BeginSlice(event['cat'],
225                      event['name'],
226                      event['ts'] / 1000.0,
227                      args=event.get('args'))
228    thread.EndSlice(event['ts'] / 1000.0)
229
230  def _ProcessSampleEvent(self, event):
231    thread = (self._GetOrCreateProcess(event['pid'])
232        .GetOrCreateThread(event['tid']))
233    thread.AddSample(event['cat'],
234                     event['name'],
235                     event['ts'] / 1000.0,
236                     event.get('args'))
237
238  def _ProcessFlowEvent(self, event):
239    thread = (self._GetOrCreateProcess(event['pid'])
240        .GetOrCreateThread(event['tid']))
241    self._all_flow_events.append({
242        'event': event,
243        'thread': thread})
244
245  def ImportEvents(self):
246    ''' Walks through the events_ list and outputs the structures discovered to
247    model_.
248    '''
249    for event in self._events:
250      phase = event.get('ph', None)
251      if phase == 'B' or phase == 'E':
252        self._ProcessDurationEvent(event)
253      elif phase == 'X':
254        self._ProcessCompleteEvent(event)
255      elif phase == 'S' or phase == 'F' or phase == 'T':
256        self._ProcessAsyncEvent(event)
257      # Note, I is historic. The instant event marker got changed, but we
258      # want to support loading old trace files so we have both I and i.
259      elif phase == 'I' or phase == 'i':
260        self._ProcessInstantEvent(event)
261      elif phase == 'P':
262        self._ProcessSampleEvent(event)
263      elif phase == 'C':
264        self._ProcessCounterEvent(event)
265      elif phase == 'M':
266        self._ProcessMetadataEvent(event)
267      elif phase == 'N' or phase == 'D' or phase == 'O':
268        self._ProcessObjectEvent(event)
269      elif phase == 's' or phase == 't' or phase == 'f':
270        self._ProcessFlowEvent(event)
271      else:
272        self._model.import_errors.append('Unrecognized event phase: ' +
273            phase + '(' + event['name'] + ')')
274
275    return self._model
276
277  def FinalizeImport(self):
278    '''Called by the Model after all other importers have imported their
279    events.'''
280    self._model.UpdateBounds()
281
282    # We need to reupdate the bounds in case the minimum start time changes
283    self._model.UpdateBounds()
284    self._CreateAsyncSlices()
285    self._CreateFlowSlices()
286    self._SetBrowserProcess()
287    self._CreateExplicitObjects()
288    self._CreateImplicitObjects()
289    self._CreateTabIdsToThreadsMap()
290
291  def _CreateAsyncSlices(self):
292    if len(self._all_async_events) == 0:
293      return
294
295    self._all_async_events.sort(
296        cmp=lambda x, y: int(x['event']['ts'] - y['event']['ts']))
297
298    async_event_states_by_name_then_id = {}
299
300    all_async_events = self._all_async_events
301    for async_event_state in all_async_events:
302      event = async_event_state['event']
303      name = event.get('name', None)
304      if name is None:
305        self._model.import_errors.append(
306            'Async events (ph: S, T or F) require an name parameter.')
307        continue
308
309      event_id = event.get('id')
310      if event_id is None:
311        self._model.import_errors.append(
312            'Async events (ph: S, T or F) require an id parameter.')
313        continue
314
315      # TODO(simonjam): Add a synchronous tick on the appropriate thread.
316
317      if event['ph'] == 'S':
318        if not name in async_event_states_by_name_then_id:
319          async_event_states_by_name_then_id[name] = {}
320        if event_id in async_event_states_by_name_then_id[name]:
321          self._model.import_errors.append(
322              'At %d, a slice of the same id %s was already open.' % (
323                  event['ts'], event_id))
324          continue
325
326        async_event_states_by_name_then_id[name][event_id] = []
327        async_event_states_by_name_then_id[name][event_id].append(
328            async_event_state)
329      else:
330        if name not in async_event_states_by_name_then_id:
331          self._model.import_errors.append(
332              'At %d, no slice named %s was open.' % (event['ts'], name,))
333          continue
334        if event_id not in async_event_states_by_name_then_id[name]:
335          self._model.import_errors.append(
336              'At %d, no slice named %s with id=%s was open.' % (
337                  event['ts'], name, event_id))
338          continue
339        events = async_event_states_by_name_then_id[name][event_id]
340        events.append(async_event_state)
341
342        if event['ph'] == 'F':
343          # Create a slice from start to end.
344          async_slice = tracing_async_slice.AsyncSlice(
345              events[0]['event']['cat'],
346              name,
347              events[0]['event']['ts'] / 1000.0)
348
349          async_slice.duration = ((event['ts'] / 1000.0)
350              - (events[0]['event']['ts'] / 1000.0))
351
352          async_slice.start_thread = events[0]['thread']
353          async_slice.end_thread = async_event_state['thread']
354          if async_slice.start_thread == async_slice.end_thread:
355            if 'tts' in event and 'tts' in events[0]['event']:
356              async_slice.thread_start = events[0]['event']['tts'] / 1000.0
357              async_slice.thread_duration = ((event['tts'] / 1000.0)
358                  - (events[0]['event']['tts'] / 1000.0))
359          async_slice.id = event_id
360          async_slice.args = events[0]['event']['args']
361
362          # Create sub_slices for each step.
363          for j in xrange(1, len(events)):
364            sub_name = name
365            if events[j - 1]['event']['ph'] == 'T':
366              sub_name = name + ':' + events[j - 1]['event']['args']['step']
367            sub_slice = tracing_async_slice.AsyncSlice(
368                events[0]['event']['cat'],
369                sub_name,
370                events[j - 1]['event']['ts'] / 1000.0)
371            sub_slice.parent_slice = async_slice
372
373            sub_slice.duration = ((events[j]['event']['ts'] / 1000.0)
374                - (events[j - 1]['event']['ts'] / 1000.0))
375
376            sub_slice.start_thread = events[j - 1]['thread']
377            sub_slice.end_thread = events[j]['thread']
378            if sub_slice.start_thread == sub_slice.end_thread:
379              if 'tts' in events[j]['event'] and \
380                  'tts' in events[j - 1]['event']:
381                sub_slice.thread_duration = \
382                    ((events[j]['event']['tts'] / 1000.0)
383                        - (events[j - 1]['event']['tts'] / 1000.0))
384
385            sub_slice.id = event_id
386            sub_slice.args = events[j - 1]['event']['args']
387
388            async_slice.AddSubSlice(sub_slice)
389
390          # The args for the finish event go in the last sub_slice.
391          last_slice = async_slice.sub_slices[-1]
392          for arg_name, arg_value in event['args'].iteritems():
393            last_slice.args[arg_name] = arg_value
394
395          # Add |async_slice| to the start-thread's async_slices.
396          async_slice.start_thread.AddAsyncSlice(async_slice)
397          del async_event_states_by_name_then_id[name][event_id]
398
399  def _CreateExplicitObjects(self):
400    # TODO(tengs): Implement object instance parsing
401    pass
402
403  def _CreateImplicitObjects(self):
404    # TODO(tengs): Implement object instance parsing
405    pass
406
407  def _CreateFlowSlices(self):
408    if len(self._all_flow_events) == 0:
409      return
410
411    self._all_flow_events.sort(
412        cmp=lambda x, y: int(x['event']['ts'] - y['event']['ts']))
413
414    flow_id_to_event = {}
415    for data in self._all_flow_events:
416      event = data['event']
417      thread = data['thread']
418      if 'name' not in event:
419        self._model.import_errors.append(
420          'Flow events (ph: s, t or f) require a name parameter.')
421        continue
422      if 'id' not in event:
423        self._model.import_errors.append(
424          'Flow events (ph: s, t or f) require an id parameter.')
425        continue
426
427      flow_event = tracing_flow_event.FlowEvent(
428          event['cat'],
429          event['id'],
430          event['name'],
431          event['ts'] / 1000.0,
432          event['args'])
433      thread.AddFlowEvent(flow_event)
434
435      if event['ph'] == 's':
436        if event['id'] in flow_id_to_event:
437          self._model.import_errors.append(
438              'event id %s already seen when encountering start of'
439              'flow event.' % event['id'])
440          continue
441        flow_id_to_event[event['id']] = flow_event
442      elif event['ph'] == 't' or event['ph'] == 'f':
443        if not event['id'] in flow_id_to_event:
444          self._model.import_errors.append(
445            'Found flow phase %s for id: %s but no flow start found.' % (
446                event['ph'], event['id']))
447          continue
448        flow_position = flow_id_to_event[event['id']]
449        self._model.flow_events.append([flow_position, flow_event])
450
451        if event['ph'] == 'f':
452          del flow_id_to_event[event['id']]
453        else:
454          # Make this event the next start event in this flow.
455          flow_id_to_event[event['id']] = flow_event
456
457  def _SetBrowserProcess(self):
458    for thread in self._model.GetAllThreads():
459      if thread.name == 'CrBrowserMain':
460        self._model.browser_process = thread.parent
461
462  def _CheckTraceBufferOverflow(self):
463    for process in self._model.GetAllProcesses():
464      if process.trace_buffer_did_overflow:
465        raise TraceBufferOverflowException(
466            'Trace buffer of process with pid=%d overflowed at timestamp %d. '
467            'Raw trace data:\n%s' %
468            (process.pid, process.trace_buffer_overflow_event.start,
469             repr(self._events)))
470
471  def _CreateTabIdsToThreadsMap(self):
472    # Since _CreateTabIdsToThreadsMap() relies on markers output on timeline
473    # tracing data, it maynot work in case we have trace events dropped due to
474    # trace buffer overflow.
475    self._CheckTraceBufferOverflow()
476
477    tab_ids_list = []
478    for metadata in self._model.metadata:
479      if metadata['name'] == 'tabIds':
480        tab_ids_list = metadata['value']
481        break
482    for tab_id in tab_ids_list:
483      timeline_markers = self._model.FindTimelineMarkers(tab_id)
484      assert(len(timeline_markers) == 1)
485      assert(timeline_markers[0].start_thread ==
486             timeline_markers[0].end_thread)
487      self._model.AddMappingFromTabIdToRendererThread(
488          tab_id, timeline_markers[0].start_thread)
489