// data_receiver.js revision 03b57e008b61dfcb1fbad3aea950ae0e001748b0
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

define('data_receiver', [
    'async_waiter',
    'device/serial/data_stream.mojom',
    'mojo/public/js/bindings/core',
    'mojo/public/js/bindings/router',
], function(asyncWaiter, dataStream, core, router) {
  /**
   * @module data_receiver
   */

  /**
   * An error reported by the DataSource together with the byte offset at
   * which it applies.
   * @typedef module:data_receiver~PendingError
   * @type {Object}
   * @property {number} error - the error
   * @property {number} offset - the location of the error
   * @private
   */

  /**
   * A pending receive operation. Wraps a promise and keeps its resolve and
   * reject functions so that the operation can be settled later.
   * @constructor
   * @alias module:data_receiver~PendingReceive
   * @private
   */
  function PendingReceive() {
    /**
     * The promise that will be resolved or rejected when this receive completes
     * or fails, respectively.
     * @type {Promise.<ArrayBuffer>}
     * @private
     */
    this.promise_ = new Promise(function(resolve, reject) {
      /**
       * The callback to call with the data received on success.
       * @type {Function}
       * @private
       */
      this.dataCallback_ = resolve;
      /**
       * The callback to call with the error on failure.
       * @type {Function}
       * @private
       */
      this.errorCallback_ = reject;
    }.bind(this));
  }

  /**
   * Returns the promise that will be resolved when this operation completes or
   * rejected if an error occurs.
   * @return {Promise.<ArrayBuffer>} A promise to the data received.
   */
  PendingReceive.prototype.getPromise = function() {
    return this.promise_;
  };

  /**
   * Dispatches received data to the promise returned by
   * [getPromise]{@link module:data_receiver~PendingReceive#getPromise}.
   * @param {ArrayBuffer} data The data to dispatch.
   */
  PendingReceive.prototype.dispatchData = function(data) {
    this.dataCallback_(data);
  };

  /**
   * Dispatches an error if the offset of the error has been reached. The
   * promise is rejected with an Error object whose |error| property holds the
   * error code.
   * @param {module:data_receiver~PendingError} error The error to dispatch.
   * @param {number} bytesReceived The number of bytes that have been received.
   * @return {boolean} True if the error was dispatched; false if
   *     |bytesReceived| does not match the error's offset, in which case
   *     nothing is dispatched.
   */
  PendingReceive.prototype.dispatchError = function(error, bytesReceived) {
    if (bytesReceived !== error.offset)
      return false;

    var e = new Error();
    e.error = error.error;
    this.errorCallback_(e);
    return true;
  };

  /**
   * Unconditionally dispatches an error. The promise is rejected with an Error
   * object whose |error| property holds the error code.
   * @param {number} error The error to dispatch.
   */
  PendingReceive.prototype.dispatchFatalError = function(error) {
    var e = new Error();
    e.error = error;
    this.errorCallback_(e);
  };

  /**
   * A DataReceiver that receives data from a DataSource. Construction creates
   * a data pipe with a capacity of |bufferSize| bytes, hands the producer end
   * to the DataSource via init() and keeps the consumer end for reading.
   * @param {MojoHandle} handle The handle to the DataSource.
   * @param {number} bufferSize How large a buffer the data pipe should use.
   * @param {number} fatalErrorValue The receive error value to report in the
   *     event of a fatal error.
   * @constructor
   * @alias module:data_receiver.DataReceiver
   */
  function DataReceiver(handle, bufferSize, fatalErrorValue) {
    /**
     * The [Router]{@link module:mojo/public/js/bindings/router.Router} for the
     * connection to the DataSource.
     * @private
     */
    this.router_ = new router.Router(handle);
    /**
     * The connection to the DataSource.
     * @private
     */
    this.source_ = new dataStream.DataSourceProxy(this.router_);
    // Incoming messages on the router are delivered to this object.
    this.router_.setIncomingReceiver(this);
    var dataPipeOptions = {
      flags: core.CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,
      elementNumBytes: 1,
      capacityNumBytes: bufferSize,
    };
    var receivePipe = core.createDataPipe(dataPipeOptions);
    this.source_.init(receivePipe.producerHandle);
    /**
     * The handle to the data pipe to use for receiving data.
     * @private
     */
    this.receivePipe_ = receivePipe.consumerHandle;
    /**
     * The current receive operation.
     * @type {module:data_receiver~PendingReceive}
     * @private
     */
    this.receive_ = null;
    /**
     * The error to be dispatched in the event of a fatal error.
     * @type {number}
     * @private
     */
    this.fatalErrorValue_ = fatalErrorValue;
    /**
     * The async waiter used to wait for
     * {@link module:data_receiver.DataReceiver#receivePipe_} to be readable.
     * @type module:async_waiter.AsyncWaiter
     * @private
     */
    this.waiter_ = new asyncWaiter.AsyncWaiter(this.receivePipe_,
                                               core.HANDLE_SIGNAL_READABLE,
                                               this.onHandleReady_.bind(this));
    /**
     * The number of bytes received from the DataSource.
     * @type {number}
     * @private
     */
    this.bytesReceived_ = 0;
    /**
     * The pending error if there is one.
     * @type module:data_receiver~PendingError
     * @private
     */
    this.pendingError_ = null;
    /**
     * Whether the DataSource is paused.
     * @type {boolean}
     * @private
     */
    this.paused_ = false;
    /**
     * Whether this DataReceiver has shut down.
     * @type {boolean}
     * @private
     */
    this.shutDown_ = false;
  }

  DataReceiver.prototype =
      $Object.create(dataStream.DataSourceClientStub.prototype);

  /**
   * Closes this DataReceiver. Closes the router and the receive pipe, stops
   * the waiter and, if a receive is in progress, rejects it with the fatal
   * error value. Calling close() again after shutdown is a no-op.
   */
  DataReceiver.prototype.close = function() {
    if (this.shutDown_)
      return;
    this.shutDown_ = true;
    this.router_.close();
    this.waiter_.stop();
    core.close(this.receivePipe_);
    if (this.receive_) {
      this.receive_.dispatchFatalError(this.fatalErrorValue_);
      this.receive_ = null;
    }
  };

  /**
   * Receive data from the DataSource.
   * @return {Promise.<ArrayBuffer>} A promise to the received data. If an error
   *     occurs, the promise will reject with an Error object with a property
   *     error containing the error code.
   * @throws Will throw if this has encountered a fatal error or another receive
   *     is in progress.
   */
  DataReceiver.prototype.receive = function() {
    if (this.shutDown_)
      throw new Error('System error');
    if (this.receive_)
      throw new Error('Receive already in progress.');
    var receive = new PendingReceive();
    var promise = receive.getPromise();
    // A stored error whose offset has been reached is reported immediately,
    // without starting a wait; the source is then marked paused so that the
    // following receive() resumes it.
    if (this.pendingError_ &&
        receive.dispatchError(this.pendingError_, this.bytesReceived_)) {
      this.pendingError_ = null;
      this.paused_ = true;
      return promise;
    }
    if (this.paused_) {
      this.source_.resume();
      this.paused_ = false;
    }
    this.receive_ = receive;
    this.waiter_.start();
    return promise;
  };

  /**
   * Invoked when |receivePipe_| is ready to read. Reads from the data pipe if
   * the wait is successful. A failed wait, a callback with no receive in
   * progress, or a read result other than OK or SHOULD_WAIT closes this
   * DataReceiver.
   * @param {number} waitResult The result of the asynchronous wait.
   * @private
   */
  DataReceiver.prototype.onHandleReady_ = function(waitResult) {
    if (waitResult !== core.RESULT_OK || !this.receive_) {
      this.close();
      return;
    }
    var result = core.readData(this.receivePipe_, core.READ_DATA_FLAG_NONE);
    if (result.result === core.RESULT_OK) {
      // TODO(sammc): Handle overflow in the same fashion as the C++ receiver.
      this.bytesReceived_ += result.buffer.byteLength;
      this.receive_.dispatchData(result.buffer);
      this.receive_ = null;
    } else if (result.result === core.RESULT_SHOULD_WAIT) {
      // Nothing to read yet; wait for the pipe to become readable again.
      this.waiter_.start();
    } else {
      this.close();
    }
  };

  /**
   * Invoked by the DataSource when an error is encountered. If a receive is in
   * progress and the error's offset equals the number of bytes received so
   * far, the receive is rejected right away; otherwise the error is stored
   * (replacing any previously stored error) for a later receive() to report.
   * Ignored after shutdown.
   * @param {number} offset The location at which the error occurred.
   * @param {number} error The error that occurred.
   * @private
   */
  DataReceiver.prototype.onError = function(offset, error) {
    if (this.shutDown_)
      return;

    /**
     * @type module:data_receiver~PendingError
     */
    var pendingError = {
      error: error,
      offset: offset,
    };
    if (this.receive_ &&
        this.receive_.dispatchError(pendingError, this.bytesReceived_)) {
      this.receive_ = null;
      this.waiter_.stop();
      this.paused_ = true;
      return;
    }
    this.pendingError_ = pendingError;
  };

  return {DataReceiver: DataReceiver};
});