// data_receiver.js revision 03b57e008b61dfcb1fbad3aea950ae0e001748b0
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

define('data_receiver', [
    'async_waiter',
    'device/serial/data_stream.mojom',
    'mojo/public/js/bindings/core',
    'mojo/public/js/bindings/router',
], function(asyncWaiter, dataStream, core, router) {
  /**
   * @module data_receiver
   */

  /**
   * @typedef module:data_receiver~PendingError
   * @type {Object}
   * @property {number} error - the error
   * @property {number} offset - the location of the error
   * @private
   */

  /**
   * A pending receive operation. Wraps a promise together with the callbacks
   * that settle it, so that the DataReceiver can complete the operation later.
   * @constructor
   * @alias module:data_receiver~PendingReceive
   * @private
   */
  function PendingReceive() {
    /**
     * The promise that will be resolved or rejected when this receive completes
     * or fails, respectively.
     * @type {Promise.<ArrayBuffer>}
     * @private
     */
    this.promise_ = new Promise(function(resolve, reject) {
      /**
       * The callback to call with the data received on success.
       * @type {Function}
       * @private
       */
      this.dataCallback_ = resolve;
      /**
       * The callback to call with the error on failure.
       * @type {Function}
       * @private
       */
      this.errorCallback_ = reject;
    }.bind(this));
  }

  /**
   * Returns the promise that will be resolved when this operation completes or
   * rejected if an error occurs.
   * @return {Promise.<ArrayBuffer>} A promise to the data received.
   */
  PendingReceive.prototype.getPromise = function() {
    return this.promise_;
  };

  /**
   * Dispatches received data to the promise returned by
   * [getPromise]{@link module:data_receiver~PendingReceive#getPromise}.
   * @param {ArrayBuffer} data The data to dispatch.
   */
  PendingReceive.prototype.dispatchData = function(data) {
    this.dataCallback_(data);
  };

  /**
   * Dispatches an error if the offset of the error has been reached. The
   * promise is rejected with an Error whose |error| property holds the error
   * code.
   * @param {module:data_receiver~PendingError} error The error to dispatch.
   * @param {number} bytesReceived The number of bytes that have been received.
   * @return {boolean} Whether the error was dispatched, i.e. whether
   *     |bytesReceived| equals the offset of the error.
   */
  PendingReceive.prototype.dispatchError = function(error, bytesReceived) {
    // The error only applies once every byte preceding it has been delivered.
    if (bytesReceived !== error.offset)
      return false;

    var e = new Error();
    e.error = error.error;
    this.errorCallback_(e);
    return true;
  };

  /**
   * Unconditionally dispatches an error.
   * @param {number} error The error to dispatch.
   */
  PendingReceive.prototype.dispatchFatalError = function(error) {
    var e = new Error();
    e.error = error;
    this.errorCallback_(e);
  };

  /**
   * A DataReceiver that receives data from a DataSource.
   * @param {MojoHandle} handle The handle to the DataSource.
   * @param {number} bufferSize How large a buffer the data pipe should use.
   * @param {number} fatalErrorValue The receive error value to report in the
   *     event of a fatal error.
   * @constructor
   * @alias module:data_receiver.DataReceiver
   */
  function DataReceiver(handle, bufferSize, fatalErrorValue) {
    /**
     * The [Router]{@link module:mojo/public/js/bindings/router.Router} for the
     * connection to the DataSource.
     * @private
     */
    this.router_ = new router.Router(handle);
    /**
     * The connection to the DataSource.
     * @private
     */
    this.source_ = new dataStream.DataSourceProxy(this.router_);
    this.router_.setIncomingReceiver(this);
    var dataPipeOptions = {
      flags: core.CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,
      elementNumBytes: 1,
      capacityNumBytes: bufferSize,
    };
    var receivePipe = core.createDataPipe(dataPipeOptions);
    // The producer end is handed to the DataSource; the consumer end is kept
    // for reading.
    this.source_.init(receivePipe.producerHandle);
    /**
     * The handle to the data pipe to use for receiving data.
     * @private
     */
    this.receivePipe_ = receivePipe.consumerHandle;
    /**
     * The current receive operation.
     * @type {module:data_receiver~PendingReceive}
     * @private
     */
    this.receive_ = null;
    /**
     * The error to be dispatched in the event of a fatal error.
     * @type {number}
     * @private
     */
    this.fatalErrorValue_ = fatalErrorValue;
    /**
     * The async waiter used to wait for
     * {@link module:data_receiver.DataReceiver#receivePipe_} to be readable.
     * @type module:async_waiter.AsyncWaiter
     * @private
     */
    this.waiter_ = new asyncWaiter.AsyncWaiter(this.receivePipe_,
                                               core.HANDLE_SIGNAL_READABLE,
                                               this.onHandleReady_.bind(this));
    /**
     * The number of bytes received from the DataSource.
     * @type {number}
     * @private
     */
    this.bytesReceived_ = 0;
    /**
     * The pending error if there is one.
     * @type module:data_receiver~PendingError
     * @private
     */
    this.pendingError_ = null;
    /**
     * Whether the DataSource is paused.
     * @type {boolean}
     * @private
     */
    this.paused_ = false;
    /**
     * Whether this DataReceiver has shut down.
     * @type {boolean}
     * @private
     */
    this.shutDown_ = false;
  }

  DataReceiver.prototype =
      $Object.create(dataStream.DataSourceClientStub.prototype);

  /**
   * Closes this DataReceiver. Any receive in progress is rejected with the
   * fatal error value. Calling this more than once has no further effect.
   */
  DataReceiver.prototype.close = function() {
    if (this.shutDown_)
      return;
    this.shutDown_ = true;
    this.router_.close();
    this.waiter_.stop();
    core.close(this.receivePipe_);
    if (this.receive_) {
      this.receive_.dispatchFatalError(this.fatalErrorValue_);
      this.receive_ = null;
    }
  };

  /**
   * Receive data from the DataSource.
   * @return {Promise.<ArrayBuffer>} A promise to the received data. If an error
   *     occurs, the promise will reject with an Error object with a property
   *     error containing the error code.
   * @throws Will throw if this has encountered a fatal error or another receive
   *     is in progress.
   */
  DataReceiver.prototype.receive = function() {
    if (this.shutDown_)
      throw new Error('System error');
    if (this.receive_)
      throw new Error('Receive already in progress.');
    var receive = new PendingReceive();
    var promise = receive.getPromise();
    // An error reported earlier is delivered now if all of the data preceding
    // it has already been received.
    if (this.pendingError_ &&
        receive.dispatchError(this.pendingError_, this.bytesReceived_)) {
      this.pendingError_ = null;
      this.paused_ = true;
      return promise;
    }
    if (this.paused_) {
      this.source_.resume();
      this.paused_ = false;
    }
    this.receive_ = receive;
    this.waiter_.start();
    return promise;
  };

  /**
   * Invoked when |receivePipe_| is ready to read. Reads from the data pipe if
   * the wait is successful; closes this DataReceiver if the wait or the read
   * fails, or if no receive is in progress.
   * @param {number} waitResult The result of the asynchronous wait.
   * @private
   */
  DataReceiver.prototype.onHandleReady_ = function(waitResult) {
    if (waitResult !== core.RESULT_OK || !this.receive_) {
      this.close();
      return;
    }
    var result = core.readData(this.receivePipe_, core.READ_DATA_FLAG_NONE);
    if (result.result === core.RESULT_OK) {
      // TODO(sammc): Handle overflow in the same fashion as the C++ receiver.
      this.bytesReceived_ += result.buffer.byteLength;
      this.receive_.dispatchData(result.buffer);
      this.receive_ = null;
    } else if (result.result === core.RESULT_SHOULD_WAIT) {
      // Nothing to read yet; wait for the pipe to become readable again.
      this.waiter_.start();
    } else {
      this.close();
    }
  };

  /**
   * Invoked by the DataSource when an error is encountered. If a receive is in
   * progress and the error offset has been reached, the receive is rejected
   * immediately; otherwise the error is stored until a later receive reaches
   * its offset.
   * @param {number} offset The location at which the error occurred.
   * @param {number} error The error that occurred.
   * @private
   */
  DataReceiver.prototype.onError = function(offset, error) {
    if (this.shutDown_)
      return;

    /**
     * @type module:data_receiver~PendingError
     */
    var pendingError = {
      error: error,
      offset: offset,
    };
    if (this.receive_ &&
        this.receive_.dispatchError(pendingError, this.bytesReceived_)) {
      this.receive_ = null;
      this.waiter_.stop();
      this.paused_ = true;
      return;
    }
    this.pendingError_ = pendingError;
  };

  return {DataReceiver: DataReceiver};
});