# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
'''TraceEventImporter imports TraceEvent-formatted data
into the provided model.

This is a port of the trace event importer from
https://code.google.com/p/trace-viewer/
'''

import copy
import json
import re

import telemetry.timeline.async_slice as tracing_async_slice
import telemetry.timeline.flow_event as tracing_flow_event
from telemetry.timeline import importer
from telemetry.timeline import tracing_timeline_data


class TraceBufferOverflowException(Exception):
  '''Raised when a process reports that its trace buffer overflowed.

  An overflow means events may have been dropped, so any analysis that
  depends on seeing every event (e.g. timeline markers) is unsafe.
  '''
  pass


class TraceEventTimelineImporter(importer.TimelineImporter):
  '''Imports a JSON (or already-decoded) trace_event stream into a model.

  Handles both the bare-array form `[ {...}, ... ]` and the container form
  `{ "traceEvents": [...], ...metadata... }`.
  '''

  def __init__(self, model, timeline_data):
    super(TraceEventTimelineImporter, self).__init__(
        model, timeline_data, import_priority=1)

    event_data = timeline_data.EventData()

    self._events_were_from_string = False
    self._all_async_events = []
    self._all_object_events = []
    self._all_flow_events = []

    if isinstance(event_data, str):
      # If the event data begins with a [, then we know it should end with a ].
      # The reason we check for this is because some tracing implementations
      # cannot guarantee that a ']' gets written to the trace file. So, we are
      # forgiving and if this is obviously the case, we fix it up before
      # throwing the string at JSON.parse.
      if event_data[0] == '[':
        # Strip trailing newlines and a dangling trailing comma, then close
        # the JSON array if the writer never got the chance to.
        # NOTE: the character class is [\r\n]; the previous [\r|\n] also
        # stripped literal '|' characters by mistake.
        event_data = re.sub(r'[\r\n]*$', '', event_data)
        event_data = re.sub(r'\s*,\s*$', '', event_data)
        if event_data[-1] != ']':
          event_data = event_data + ']'

      self._events = json.loads(event_data)
      self._events_were_from_string = True
    else:
      self._events = event_data

    # Some trace_event implementations put the actual trace events
    # inside a container. E.g { ... , traceEvents: [ ] }
    # If we see that, just pull out the trace events.
    if 'traceEvents' in self._events:
      container = self._events
      self._events = self._events['traceEvents']
      for field_name in container:
        if field_name == 'traceEvents':
          continue

        # Any other fields in the container should be treated as metadata.
        self._model.metadata.append({
            'name': field_name,
            'value': container[field_name]})

  @staticmethod
  def CanImport(timeline_data):
    '''Returns whether timeline_data looks like a TraceEvent stream.

    Uses cheap heuristics only; it never fully parses JSON strings.
    '''
    if not isinstance(timeline_data,
                      tracing_timeline_data.TracingTimelineData):
      return False

    event_data = timeline_data.EventData()

    # May be encoded JSON. But we dont want to parse it fully yet.
    # Use a simple heuristic:
    #   - event_data that starts with [ is probably a trace_event array
    #   - event_data that starts with { is probably a trace_event container
    if isinstance(event_data, str):
      return len(event_data) > 0 and (event_data[0] == '{'
          or event_data[0] == '[')

    # Might just be an array of events
    if (isinstance(event_data, list) and len(event_data)
        and 'ph' in event_data[0]):
      return True

    # Might be an object with a traceEvents field in it.
    if 'traceEvents' in event_data:
      trace_events = event_data.get('traceEvents', None)
      return (type(trace_events) is list and
          len(trace_events) > 0 and 'ph' in trace_events[0])

    return False

  def _GetOrCreateProcess(self, pid):
    return self._model.GetOrCreateProcess(pid)

  def _DeepCopyIfNeeded(self, obj):
    # Events parsed from a string are private to this importer, so they can
    # be handed out without copying; caller-supplied objects must be copied.
    if self._events_were_from_string:
      return obj
    return copy.deepcopy(obj)

  def _ProcessAsyncEvent(self, event):
    '''Helper to process an 'async finish' event, which will close an
    open slice.
    '''
    thread = (self._GetOrCreateProcess(event['pid'])
        .GetOrCreateThread(event['tid']))
    self._all_async_events.append({
        'event': event,
        'thread': thread})

  def _ProcessCounterEvent(self, event):
    '''Helper that creates and adds samples to a Counter object based on
    'C' phase events.
    '''
    if 'id' in event:
      ctr_name = event['name'] + '[' + str(event['id']) + ']'
    else:
      ctr_name = event['name']

    ctr = (self._GetOrCreateProcess(event['pid'])
        .GetOrCreateCounter(event['cat'], ctr_name))
    # Initialize the counter's series fields if needed.
    if len(ctr.series_names) == 0:
      #TODO: implement counter object
      for series_name in event['args']:
        ctr.series_names.append(series_name)
      if len(ctr.series_names) == 0:
        self._model.import_errors.append('Expected counter ' + event['name'] +
            ' to have at least one argument to use as a value.')
        # Drop the counter.
        del ctr.parent.counters[ctr.full_name]
        return

    # Add the sample values. Missing series default to 0 so every series
    # stays aligned with the timestamps list.
    ctr.timestamps.append(event['ts'] / 1000.0)
    for series_name in ctr.series_names:
      if series_name not in event['args']:
        ctr.samples.append(0)
        continue
      ctr.samples.append(event['args'][series_name])

  def _ProcessObjectEvent(self, event):
    # Object events (N/D/O) are batched; actual parsing is TODO (see
    # _CreateExplicitObjects/_CreateImplicitObjects).
    thread = (self._GetOrCreateProcess(event['pid'])
        .GetOrCreateThread(event['tid']))
    self._all_object_events.append({
        'event': event,
        'thread': thread})

  def _ProcessDurationEvent(self, event):
    '''Processes a B (begin) or E (end) duration event on its thread.'''
    thread = (self._GetOrCreateProcess(event['pid'])
        .GetOrCreateThread(event['tid']))
    if not thread.IsTimestampValidForBeginOrEnd(event['ts'] / 1000.0):
      self._model.import_errors.append(
          'Timestamps are moving backward.')
      return

    if event['ph'] == 'B':
      thread.BeginSlice(event['cat'],
                        event['name'],
                        event['ts'] / 1000.0,
                        event['tts'] / 1000.0 if 'tts' in event else None,
                        event['args'])
    elif event['ph'] == 'E':
      if not thread.open_slice_count:
        self._model.import_errors.append(
            'E phase event without a matching B phase event.')
        return

      new_slice = thread.EndSlice(
          event['ts'] / 1000.0,
          event['tts'] / 1000.0 if 'tts' in event else None)
      # Merge E-phase args into the slice; on conflict the E value wins.
      for arg_name, arg_value in event.get('args', {}).items():
        if arg_name in new_slice.args:
          self._model.import_errors.append(
              'Both the B and E phases of ' + new_slice.name +
              ' provided values for argument ' + arg_name + '. ' +
              'The value of the E phase event will be used.')
        new_slice.args[arg_name] = arg_value

  def _ProcessCompleteEvent(self, event):
    '''Processes an X (complete) event: a slice with an explicit duration.'''
    thread = (self._GetOrCreateProcess(event['pid'])
        .GetOrCreateThread(event['tid']))
    thread.PushCompleteSlice(
        event['cat'],
        event['name'],
        event['ts'] / 1000.0,
        event['dur'] / 1000.0 if 'dur' in event else None,
        event['tts'] / 1000.0 if 'tts' in event else None,
        event['tdur'] / 1000.0 if 'tdur' in event else None,
        event['args'])

  def _ProcessMetadataEvent(self, event):
    '''Processes an M (metadata) event: thread/process names and overflow.'''
    if event['name'] == 'thread_name':
      thread = (self._GetOrCreateProcess(event['pid'])
          .GetOrCreateThread(event['tid']))
      thread.name = event['args']['name']
    elif event['name'] == 'process_name':
      process = self._GetOrCreateProcess(event['pid'])
      process.name = event['args']['name']
    elif event['name'] == 'trace_buffer_overflowed':
      process = self._GetOrCreateProcess(event['pid'])
      process.SetTraceBufferOverflowTimestamp(event['args']['overflowed_at_ts'])
    else:
      self._model.import_errors.append(
          'Unrecognized metadata name: ' + event['name'])

  def _ProcessInstantEvent(self, event):
    # Treat an Instant event as a duration 0 slice.
    # SliceTrack's redraw() knows how to handle this.
    thread = (self._GetOrCreateProcess(event['pid'])
        .GetOrCreateThread(event['tid']))
    thread.BeginSlice(event['cat'],
                      event['name'],
                      event['ts'] / 1000.0,
                      args=event.get('args'))
    thread.EndSlice(event['ts'] / 1000.0)

  def _ProcessSampleEvent(self, event):
    thread = (self._GetOrCreateProcess(event['pid'])
        .GetOrCreateThread(event['tid']))
    thread.AddSample(event['cat'],
                     event['name'],
                     event['ts'] / 1000.0,
                     event.get('args'))

  def _ProcessFlowEvent(self, event):
    # Flow events (s/t/f) are batched and linked up in _CreateFlowSlices.
    thread = (self._GetOrCreateProcess(event['pid'])
        .GetOrCreateThread(event['tid']))
    self._all_flow_events.append({
        'event': event,
        'thread': thread})

  def ImportEvents(self):
    '''Walks through the events_ list and outputs the structures discovered to
    model_.
    '''
    for event in self._events:
      phase = event.get('ph', None)
      if phase == 'B' or phase == 'E':
        self._ProcessDurationEvent(event)
      elif phase == 'X':
        self._ProcessCompleteEvent(event)
      elif phase == 'S' or phase == 'F' or phase == 'T':
        self._ProcessAsyncEvent(event)
      # Note, I is historic. The instant event marker got changed, but we
      # want to support loading old trace files so we have both I and i.
      elif phase == 'I' or phase == 'i':
        self._ProcessInstantEvent(event)
      elif phase == 'P':
        self._ProcessSampleEvent(event)
      elif phase == 'C':
        self._ProcessCounterEvent(event)
      elif phase == 'M':
        self._ProcessMetadataEvent(event)
      elif phase == 'N' or phase == 'D' or phase == 'O':
        self._ProcessObjectEvent(event)
      elif phase == 's' or phase == 't' or phase == 'f':
        self._ProcessFlowEvent(event)
      else:
        self._model.import_errors.append('Unrecognized event phase: ' +
            phase + '(' + event['name'] + ')')

    return self._model

  def FinalizeImport(self):
    '''Called by the Model after all other importers have imported their
    events.'''
    self._model.UpdateBounds()

    # We need to reupdate the bounds in case the minimum start time changes
    self._model.UpdateBounds()
    self._CreateAsyncSlices()
    self._CreateFlowSlices()
    self._SetBrowserProcess()
    self._CreateExplicitObjects()
    self._CreateImplicitObjects()
    self._CreateTabIdsToThreadsMap()

  def _CreateAsyncSlices(self):
    '''Pairs up batched S/T/F events (by name then id) into AsyncSlices.'''
    if len(self._all_async_events) == 0:
      return

    # Sort by raw timestamp. NOTE: a cmp function of the form
    # int(x_ts - y_ts) would truncate sub-unit differences to 0 and treat
    # distinct timestamps as equal; a key-based sort has no such problem.
    self._all_async_events.sort(key=lambda x: x['event']['ts'])

    async_event_states_by_name_then_id = {}

    all_async_events = self._all_async_events
    for async_event_state in all_async_events:
      event = async_event_state['event']
      name = event.get('name', None)
      if name is None:
        self._model.import_errors.append(
            'Async events (ph: S, T or F) require a name parameter.')
        continue

      event_id = event.get('id')
      if event_id is None:
        self._model.import_errors.append(
            'Async events (ph: S, T or F) require an id parameter.')
        continue

      # TODO(simonjam): Add a synchronous tick on the appropriate thread.

      if event['ph'] == 'S':
        if not name in async_event_states_by_name_then_id:
          async_event_states_by_name_then_id[name] = {}
        if event_id in async_event_states_by_name_then_id[name]:
          self._model.import_errors.append(
              'At %d, a slice of the same id %s was already open.' % (
                  event['ts'], event_id))
          continue

        async_event_states_by_name_then_id[name][event_id] = []
        async_event_states_by_name_then_id[name][event_id].append(
            async_event_state)
      else:
        if name not in async_event_states_by_name_then_id:
          self._model.import_errors.append(
              'At %d, no slice named %s was open.' % (event['ts'], name,))
          continue
        if event_id not in async_event_states_by_name_then_id[name]:
          self._model.import_errors.append(
              'At %d, no slice named %s with id=%s was open.' % (
                  event['ts'], name, event_id))
          continue
        events = async_event_states_by_name_then_id[name][event_id]
        events.append(async_event_state)

        if event['ph'] == 'F':
          # Create a slice from start to end.
          async_slice = tracing_async_slice.AsyncSlice(
              events[0]['event']['cat'],
              name,
              events[0]['event']['ts'] / 1000.0)

          async_slice.duration = ((event['ts'] / 1000.0)
              - (events[0]['event']['ts'] / 1000.0))

          async_slice.start_thread = events[0]['thread']
          async_slice.end_thread = async_event_state['thread']
          # Thread timestamps only make sense if start and end happened on
          # the same thread.
          if async_slice.start_thread == async_slice.end_thread:
            if 'tts' in event and 'tts' in events[0]['event']:
              async_slice.thread_start = events[0]['event']['tts'] / 1000.0
              async_slice.thread_duration = ((event['tts'] / 1000.0)
                  - (events[0]['event']['tts'] / 1000.0))
          async_slice.id = event_id
          async_slice.args = events[0]['event']['args']

          # Create sub_slices for each step, one per gap between
          # consecutive events in the S/T.../F chain.
          for j in range(1, len(events)):
            sub_name = name
            if events[j - 1]['event']['ph'] == 'T':
              sub_name = name + ':' + events[j - 1]['event']['args']['step']
            sub_slice = tracing_async_slice.AsyncSlice(
                events[0]['event']['cat'],
                sub_name,
                events[j - 1]['event']['ts'] / 1000.0)
            sub_slice.parent_slice = async_slice

            sub_slice.duration = ((events[j]['event']['ts'] / 1000.0)
                - (events[j - 1]['event']['ts'] / 1000.0))

            sub_slice.start_thread = events[j - 1]['thread']
            sub_slice.end_thread = events[j]['thread']
            if sub_slice.start_thread == sub_slice.end_thread:
              if 'tts' in events[j]['event'] and \
                  'tts' in events[j - 1]['event']:
                sub_slice.thread_duration = \
                    ((events[j]['event']['tts'] / 1000.0)
                        - (events[j - 1]['event']['tts'] / 1000.0))

            sub_slice.id = event_id
            sub_slice.args = events[j - 1]['event']['args']

            async_slice.AddSubSlice(sub_slice)

          # The args for the finish event go in the last sub_slice.
          last_slice = async_slice.sub_slices[-1]
          for arg_name, arg_value in event['args'].items():
            last_slice.args[arg_name] = arg_value

          # Add |async_slice| to the start-thread's async_slices.
          async_slice.start_thread.AddAsyncSlice(async_slice)
          del async_event_states_by_name_then_id[name][event_id]

  def _CreateExplicitObjects(self):
    # TODO(tengs): Implement object instance parsing
    pass

  def _CreateImplicitObjects(self):
    # TODO(tengs): Implement object instance parsing
    pass

  def _CreateFlowSlices(self):
    '''Links batched s/t/f events (by id) into start/step/finish flow pairs.'''
    if len(self._all_flow_events) == 0:
      return

    # Key-based sort; see the note in _CreateAsyncSlices about why a
    # truncating cmp function is wrong here.
    self._all_flow_events.sort(key=lambda x: x['event']['ts'])

    flow_id_to_event = {}
    for data in self._all_flow_events:
      event = data['event']
      thread = data['thread']
      if 'name' not in event:
        self._model.import_errors.append(
            'Flow events (ph: s, t or f) require a name parameter.')
        continue
      if 'id' not in event:
        self._model.import_errors.append(
            'Flow events (ph: s, t or f) require an id parameter.')
        continue

      flow_event = tracing_flow_event.FlowEvent(
          event['cat'],
          event['id'],
          event['name'],
          event['ts'] / 1000.0,
          event['args'])
      thread.AddFlowEvent(flow_event)

      if event['ph'] == 's':
        if event['id'] in flow_id_to_event:
          self._model.import_errors.append(
              'event id %s already seen when encountering start of '
              'flow event.' % event['id'])
          continue
        flow_id_to_event[event['id']] = flow_event
      elif event['ph'] == 't' or event['ph'] == 'f':
        if not event['id'] in flow_id_to_event:
          self._model.import_errors.append(
              'Found flow phase %s for id: %s but no flow start found.' % (
                  event['ph'], event['id']))
          continue
        flow_position = flow_id_to_event[event['id']]
        self._model.flow_events.append([flow_position, flow_event])

        if event['ph'] == 'f':
          del flow_id_to_event[event['id']]
        else:
          # Make this event the next start event in this flow.
          flow_id_to_event[event['id']] = flow_event

  def _SetBrowserProcess(self):
    # The browser process is identified by its main thread's name.
    for thread in self._model.GetAllThreads():
      if thread.name == 'CrBrowserMain':
        self._model.browser_process = thread.parent

  def _CheckTraceBufferOverflow(self):
    '''Raises TraceBufferOverflowException if any process dropped events.'''
    for process in self._model.GetAllProcesses():
      if process.trace_buffer_did_overflow:
        raise TraceBufferOverflowException(
            'Trace buffer of process with pid=%d overflowed at timestamp %d. '
            'Raw trace data:\n%s' %
            (process.pid, process.trace_buffer_overflow_event.start,
             repr(self._events)))

  def _CreateTabIdsToThreadsMap(self):
    # Since _CreateTabIdsToThreadsMap() relies on markers output on timeline
    # tracing data, it may not work in case we have trace events dropped due
    # to trace buffer overflow.
    self._CheckTraceBufferOverflow()

    tab_ids_list = []
    for metadata in self._model.metadata:
      if metadata['name'] == 'tabIds':
        tab_ids_list = metadata['value']
        break
    for tab_id in tab_ids_list:
      timeline_markers = self._model.FindTimelineMarkers(tab_id)
      # Each tab id must map to exactly one marker, whose start and end
      # happened on the same (renderer main) thread.
      assert(len(timeline_markers) == 1)
      assert(timeline_markers[0].start_thread ==
             timeline_markers[0].end_thread)
      self._model.AddMappingFromTabIdToRendererThread(
          tab_id, timeline_markers[0].start_thread)