1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5'use strict';
6
/**
 * @fileoverview Implements a WebSocket client that receives a stream of
 * slices and counter samples from a server.
 */
12
13base.require('model');
14base.require('model.slice');
15
16base.exportTo('tracing.importer', function() {
17
18  var STATE_PAUSED = 0x1;
19  var STATE_CAPTURING = 0x2;
20
21  /**
22   * Converts a stream of trace data from a websocket into a model.
23   *
24   * Events consumed by this importer have the following JSON structure:
25   *
26   * {
27   *   'cmd': 'commandName',
28   *   ... command specific data
29   * }
30   *
31   * The importer understands 2 commands:
32   *      'ptd' (Process Thread Data)
33   *      'pcd' (Process Counter Data)
34   *
35   * The command specific data is as follows:
36   *
37   * {
38   *   'pid': 'Remote Process Id',
39   *   'td': {
40   *                  'n': 'Thread Name Here',
41   *                  's: [ {
42   *                              'l': 'Slice Label',
43   *                              's': startTime,
44   *                              'e': endTime
45   *                              }, ... ]
46   *         }
47   * }
48   *
49   * {
50   *  'pid' 'Remote Process Id',
51   *  'cd': {
52   *      'n': 'Counter Name',
53   *      'sn': ['Series Name',...]
54   *      'sc': [seriesColor, ...]
55   *      'c': [
56   *            {
57   *              't': timestamp,
58   *              'v': [value0, value1, ...]
59   *            },
60   *            ....
61   *           ]
62   *       }
63   * }
64   * @param {Model} model that will be updated
65   * when events are received.
66   * @constructor
67   */
68  function TimelineStreamImporter(model) {
69    var self = this;
70    this.model_ = model;
71    this.connection_ = undefined;
72    this.state_ = STATE_CAPTURING;
73    this.connectionOpenHandler_ =
74      this.connectionOpenHandler_.bind(this);
75    this.connectionCloseHandler_ =
76      this.connectionCloseHandler_.bind(this);
77    this.connectionErrorHandler_ =
78      this.connectionErrorHandler_.bind(this);
79    this.connectionMessageHandler_ =
80      this.connectionMessageHandler_.bind(this);
81  }
82
83  TimelineStreamImporter.prototype = {
84    __proto__: base.EventTarget.prototype,
85
86    cleanup_: function() {
87      if (!this.connection_)
88        return;
89      this.connection_.removeEventListener('open',
90        this.connectionOpenHandler_);
91      this.connection_.removeEventListener('close',
92        this.connectionCloseHandler_);
93      this.connection_.removeEventListener('error',
94        this.connectionErrorHandler_);
95      this.connection_.removeEventListener('message',
96        this.connectionMessageHandler_);
97    },
98
99    connectionOpenHandler_: function() {
100      this.dispatchEvent({'type': 'connect'});
101    },
102
103    connectionCloseHandler_: function() {
104      this.dispatchEvent({'type': 'disconnect'});
105      this.cleanup_();
106    },
107
108    connectionErrorHandler_: function() {
109      this.dispatchEvent({'type': 'connectionerror'});
110      this.cleanup_();
111    },
112
113    connectionMessageHandler_: function(event) {
114      var packet = JSON.parse(event.data);
115      var command = packet['cmd'];
116      var pid = packet['pid'];
117      var modelDirty = false;
118      if (command == 'ptd') {
119        var process = this.model_.getOrCreateProcess(pid);
120        var threadData = packet['td'];
121        var threadName = threadData['n'];
122        var threadSlices = threadData['s'];
123        var thread = process.getOrCreateThread(threadName);
124        for (var s = 0; s < threadSlices.length; s++) {
125          var slice = threadSlices[s];
126          thread.slices.push(new tracing.model.Slice('streamed',
127            slice['l'],
128            0,
129            slice['s'],
130            {},
131            slice['e'] - slice['s']));
132        }
133        modelDirty = true;
134      } else if (command == 'pcd') {
135        var process = this.model_.getOrCreateProcess(pid);
136        var counterData = packet['cd'];
137        var counterName = counterData['n'];
138        var counterSeriesNames = counterData['sn'];
139        var counterSeriesColors = counterData['sc'];
140        var counterValues = counterData['c'];
141        var counter = process.getOrCreateCounter('streamed', counterName);
142        if (counterSeriesNames.length != counterSeriesColors.length) {
143          var importError = 'Streamed counter name length does not match' +
144                            'counter color length' + counterSeriesNames.length +
145                            ' vs ' + counterSeriesColors.length;
146          this.model_.importErrors.push(importError);
147          return;
148        }
149        if (counter.seriesNames.length == 0) {
150          // First time
151          counter.seriesNames = counterSeriesNames;
152          counter.seriesColors = counterSeriesColors;
153        } else {
154          if (counter.seriesNames.length != counterSeriesNames.length) {
155            var importError = 'Streamed counter ' + counterName +
156              'changed number of seriesNames';
157            this.model_.importErrors.push(importError);
158            return;
159          } else {
160            for (var i = 0; i < counter.seriesNames.length; i++) {
161              var oldSeriesName = counter.seriesNames[i];
162              var newSeriesName = counterSeriesNames[i];
163              if (oldSeriesName != newSeriesName) {
164                var importError = 'Streamed counter ' + counterName +
165                  'series name changed from ' +
166                  oldSeriesName + ' to ' +
167                  newSeriesName;
168                this.model_.importErrors.push(importError);
169                return;
170              }
171            }
172          }
173        }
174        for (var c = 0; c < counterValues.length; c++) {
175          var count = counterValues[c];
176          var x = count['t'];
177          var y = count['v'];
178          counter.timestamps.push(x);
179          counter.samples = counter.samples.concat(y);
180        }
181        modelDirty = true;
182      }
183      if (modelDirty == true) {
184        this.model_.updateBounds();
185        this.dispatchEvent({'type': 'modelchange',
186          'model': this.model_});
187      }
188    },
189
190    get connected() {
191      if (this.connection_ !== undefined &&
192        this.connection_.readyState == WebSocket.OPEN) {
193        return true;
194      }
195      return false;
196    },
197
198    get paused() {
199      return this.state_ == STATE_PAUSED;
200    },
201
202    /**
203     * Connects the stream to a websocket.
204     * @param {WebSocket} wsConnection The websocket to use for the stream
205     */
206    connect: function(wsConnection) {
207      this.connection_ = wsConnection;
208      this.connection_.addEventListener('open',
209        this.connectionOpenHandler_);
210      this.connection_.addEventListener('close',
211        this.connectionCloseHandler_);
212      this.connection_.addEventListener('error',
213        this.connectionErrorHandler_);
214      this.connection_.addEventListener('message',
215        this.connectionMessageHandler_);
216    },
217
218    pause: function() {
219      if (this.state_ == STATE_PAUSED)
220        throw new Error('Already paused.');
221      if (!this.connection_)
222        throw new Error('Not connected.');
223      this.connection_.send(JSON.stringify({'cmd': 'pause'}));
224      this.state_ = STATE_PAUSED;
225    },
226
227    resume: function() {
228      if (this.state_ == STATE_CAPTURING)
229        throw new Error('Already capturing.');
230      if (!this.connection_)
231        throw new Error('Not connected.');
232      this.connection_.send(JSON.stringify({'cmd': 'resume'}));
233      this.state_ = STATE_CAPTURING;
234    }
235  };
236
237  return {
238    TimelineStreamImporter: TimelineStreamImporter
239  };
240});
241