1/* 2 * Copyright (C) 2011, 2012 Google Inc. All rights reserved. 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions are 6 * met: 7 * 8 * * Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above 11 * copyright notice, this list of conditions and the following disclaimer 12 * in the documentation and/or other materials provided with the 13 * distribution. 14 * * Neither the name of Google Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived from 16 * this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 23 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 24 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31#include "config.h" 32 33#include "modules/websockets/WorkerThreadableWebSocketChannel.h" 34 35#include "bindings/core/v8/ScriptCallStackFactory.h" 36#include "core/dom/CrossThreadTask.h" 37#include "core/dom/Document.h" 38#include "core/dom/ExecutionContext.h" 39#include "core/dom/ExecutionContextTask.h" 40#include "core/fileapi/Blob.h" 41#include "core/inspector/ScriptCallFrame.h" 42#include "core/inspector/ScriptCallStack.h" 43#include "core/workers/WorkerGlobalScope.h" 44#include "core/workers/WorkerLoaderProxy.h" 45#include "core/workers/WorkerThread.h" 46#include "modules/websockets/MainThreadWebSocketChannel.h" 47#include "modules/websockets/NewWebSocketChannelImpl.h" 48#include "platform/RuntimeEnabledFeatures.h" 49#include "public/platform/Platform.h" 50#include "public/platform/WebWaitableEvent.h" 51#include "wtf/ArrayBuffer.h" 52#include "wtf/Assertions.h" 53#include "wtf/Functional.h" 54#include "wtf/MainThread.h" 55#include "wtf/text/WTFString.h" 56 57namespace blink { 58 59typedef WorkerThreadableWebSocketChannel::Bridge Bridge; 60typedef WorkerThreadableWebSocketChannel::Peer Peer; 61 62// Created and destroyed on the worker thread. All setters of this class are 63// called on the main thread, while all getters are called on the worker 64// thread. signalWorkerThread() must be called before any getters are called. 65class ThreadableWebSocketChannelSyncHelper : public GarbageCollectedFinalized<ThreadableWebSocketChannelSyncHelper> { 66public: 67 static ThreadableWebSocketChannelSyncHelper* create(PassOwnPtr<WebWaitableEvent> event) 68 { 69 return new ThreadableWebSocketChannelSyncHelper(event); 70 } 71 72 ~ThreadableWebSocketChannelSyncHelper() 73 { 74 } 75 76 // All setters are called on the main thread. 77 void setConnectRequestResult(bool connectRequestResult) 78 { 79 m_connectRequestResult = connectRequestResult; 80 } 81 82 // All getter are called on the worker thread. 83 bool connectRequestResult() const 84 { 85 return m_connectRequestResult; 86 } 87 88 // This should be called after all setters are called and before any 89 // getters are called. 90 void signalWorkerThread() 91 { 92 m_event->signal(); 93 } 94 void wait() 95 { 96 m_event->wait(); 97 } 98 99 void trace(Visitor* visitor) { } 100 101private: 102 explicit ThreadableWebSocketChannelSyncHelper(PassOwnPtr<WebWaitableEvent> event) 103 : m_event(event) 104 , m_connectRequestResult(false) 105 { 106 } 107 108 OwnPtr<WebWaitableEvent> m_event; 109 bool m_connectRequestResult; 110}; 111 112WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope& workerGlobalScope, WebSocketChannelClient* client, const String& sourceURL, unsigned lineNumber) 113 : m_bridge(new Bridge(client, workerGlobalScope)) 114 , m_sourceURLAtConnection(sourceURL) 115 , m_lineNumberAtConnection(lineNumber) 116{ 117 m_bridge->initialize(sourceURL, lineNumber); 118} 119 120WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel() 121{ 122 ASSERT(!m_bridge); 123} 124 125bool WorkerThreadableWebSocketChannel::connect(const KURL& url, const String& protocol) 126{ 127 ASSERT(m_bridge); 128 return m_bridge->connect(url, protocol); 129} 130 131void WorkerThreadableWebSocketChannel::send(const String& message) 132{ 133 ASSERT(m_bridge); 134 m_bridge->send(message); 135} 136 137void WorkerThreadableWebSocketChannel::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength) 138{ 139 ASSERT(m_bridge); 140 m_bridge->send(binaryData, byteOffset, byteLength); 141} 142 143void WorkerThreadableWebSocketChannel::send(PassRefPtr<BlobDataHandle> blobData) 144{ 145 ASSERT(m_bridge); 146 m_bridge->send(blobData); 147} 148 149void WorkerThreadableWebSocketChannel::close(int code, const String& reason) 150{ 151 ASSERT(m_bridge); 152 m_bridge->close(code, reason); 153} 154 155void WorkerThreadableWebSocketChannel::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber) 156{ 157 if (!m_bridge) 158 return; 159 160 RefPtrWillBeRawPtr<ScriptCallStack> callStack = createScriptCallStack(1, true); 161 if (callStack && callStack->size()) { 162 // In order to emulate the ConsoleMessage behavior, 163 // we should ignore the specified url and line number if 164 // we can get the JavaScript context. 165 m_bridge->fail(reason, level, callStack->at(0).sourceURL(), callStack->at(0).lineNumber()); 166 } else if (sourceURL.isEmpty() && !lineNumber) { 167 // No information is specified by the caller - use the url 168 // and the line number at the connection. 169 m_bridge->fail(reason, level, m_sourceURLAtConnection, m_lineNumberAtConnection); 170 } else { 171 // Use the specified information. 172 m_bridge->fail(reason, level, sourceURL, lineNumber); 173 } 174} 175 176void WorkerThreadableWebSocketChannel::disconnect() 177{ 178 m_bridge->disconnect(); 179 m_bridge.clear(); 180} 181 182void WorkerThreadableWebSocketChannel::trace(Visitor* visitor) 183{ 184 visitor->trace(m_bridge); 185 WebSocketChannel::trace(visitor); 186} 187 188Peer::Peer(Bridge* bridge, WorkerLoaderProxy& loaderProxy, ThreadableWebSocketChannelSyncHelper* syncHelper) 189 : m_bridge(bridge) 190 , m_loaderProxy(loaderProxy) 191 , m_mainWebSocketChannel(nullptr) 192 , m_syncHelper(syncHelper) 193{ 194 ASSERT(!isMainThread()); 195} 196 197Peer::~Peer() 198{ 199 ASSERT(!isMainThread()); 200} 201 202void Peer::initializeInternal(ExecutionContext* context, const String& sourceURL, unsigned lineNumber) 203{ 204 ASSERT(isMainThread()); 205 Document* document = toDocument(context); 206 if (RuntimeEnabledFeatures::experimentalWebSocketEnabled()) { 207 m_mainWebSocketChannel = NewWebSocketChannelImpl::create(document, this, sourceURL, lineNumber); 208 } else { 209 m_mainWebSocketChannel = MainThreadWebSocketChannel::create(document, this, sourceURL, lineNumber); 210 } 211 m_syncHelper->signalWorkerThread(); 212} 213 214void Peer::connect(const KURL& url, const String& protocol) 215{ 216 ASSERT(isMainThread()); 217 ASSERT(m_syncHelper); 218 if (!m_mainWebSocketChannel) { 219 m_syncHelper->setConnectRequestResult(false); 220 } else { 221 bool connectRequestResult = m_mainWebSocketChannel->connect(url, protocol); 222 m_syncHelper->setConnectRequestResult(connectRequestResult); 223 } 224 m_syncHelper->signalWorkerThread(); 225} 226 227void Peer::send(const String& message) 228{ 229 ASSERT(isMainThread()); 230 if (m_mainWebSocketChannel) 231 m_mainWebSocketChannel->send(message); 232} 233 234void Peer::sendArrayBuffer(PassOwnPtr<Vector<char> > data) 235{ 236 ASSERT(isMainThread()); 237 if (m_mainWebSocketChannel) 238 m_mainWebSocketChannel->send(data); 239} 240 241void Peer::sendBlob(PassRefPtr<BlobDataHandle> blobData) 242{ 243 ASSERT(isMainThread()); 244 if (m_mainWebSocketChannel) 245 m_mainWebSocketChannel->send(blobData); 246} 247 248void Peer::close(int code, const String& reason) 249{ 250 ASSERT(isMainThread()); 251 ASSERT(m_syncHelper); 252 if (!m_mainWebSocketChannel) 253 return; 254 m_mainWebSocketChannel->close(code, reason); 255} 256 257void Peer::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber) 258{ 259 ASSERT(isMainThread()); 260 ASSERT(m_syncHelper); 261 if (!m_mainWebSocketChannel) 262 return; 263 m_mainWebSocketChannel->fail(reason, level, sourceURL, lineNumber); 264} 265 266void Peer::disconnect() 267{ 268 ASSERT(isMainThread()); 269 ASSERT(m_syncHelper); 270 if (m_mainWebSocketChannel) { 271 m_mainWebSocketChannel->disconnect(); 272 m_mainWebSocketChannel = nullptr; 273 } 274 m_syncHelper->signalWorkerThread(); 275} 276 277static void workerGlobalScopeDidConnect(ExecutionContext* context, Bridge* bridge, const String& subprotocol, const String& extensions) 278{ 279 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 280 if (bridge->client()) 281 bridge->client()->didConnect(subprotocol, extensions); 282} 283 284void Peer::didConnect(const String& subprotocol, const String& extensions) 285{ 286 ASSERT(isMainThread()); 287 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidConnect, m_bridge, subprotocol, extensions)); 288} 289 290static void workerGlobalScopeDidReceiveMessage(ExecutionContext* context, Bridge* bridge, const String& message) 291{ 292 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 293 if (bridge->client()) 294 bridge->client()->didReceiveMessage(message); 295} 296 297void Peer::didReceiveMessage(const String& message) 298{ 299 ASSERT(isMainThread()); 300 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidReceiveMessage, m_bridge, message)); 301} 302 303static void workerGlobalScopeDidReceiveBinaryData(ExecutionContext* context, Bridge* bridge, PassOwnPtr<Vector<char> > binaryData) 304{ 305 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 306 if (bridge->client()) 307 bridge->client()->didReceiveBinaryData(binaryData); 308} 309 310void Peer::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData) 311{ 312 ASSERT(isMainThread()); 313 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidReceiveBinaryData, m_bridge, binaryData)); 314} 315 316static void workerGlobalScopeDidConsumeBufferedAmount(ExecutionContext* context, Bridge* bridge, unsigned long consumed) 317{ 318 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 319 if (bridge->client()) 320 bridge->client()->didConsumeBufferedAmount(consumed); 321} 322 323void Peer::didConsumeBufferedAmount(unsigned long consumed) 324{ 325 ASSERT(isMainThread()); 326 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidConsumeBufferedAmount, m_bridge, consumed)); 327} 328 329static void workerGlobalScopeDidStartClosingHandshake(ExecutionContext* context, Bridge* bridge) 330{ 331 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 332 if (bridge->client()) 333 bridge->client()->didStartClosingHandshake(); 334} 335 336void Peer::didStartClosingHandshake() 337{ 338 ASSERT(isMainThread()); 339 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidStartClosingHandshake, m_bridge)); 340} 341 342static void workerGlobalScopeDidClose(ExecutionContext* context, Bridge* bridge, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason) 343{ 344 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 345 if (bridge->client()) 346 bridge->client()->didClose(closingHandshakeCompletion, code, reason); 347} 348 349void Peer::didClose(ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason) 350{ 351 ASSERT(isMainThread()); 352 if (m_mainWebSocketChannel) { 353 m_mainWebSocketChannel->disconnect(); 354 m_mainWebSocketChannel = nullptr; 355 } 356 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidClose, m_bridge, closingHandshakeCompletion, code, reason)); 357} 358 359static void workerGlobalScopeDidReceiveMessageError(ExecutionContext* context, Bridge* bridge) 360{ 361 ASSERT_UNUSED(context, context->isWorkerGlobalScope()); 362 if (bridge->client()) 363 bridge->client()->didReceiveMessageError(); 364} 365 366void Peer::didReceiveMessageError() 367{ 368 ASSERT(isMainThread()); 369 m_loaderProxy.postTaskToWorkerGlobalScope(createCrossThreadTask(&workerGlobalScopeDidReceiveMessageError, m_bridge)); 370} 371 372void Peer::trace(Visitor* visitor) 373{ 374 visitor->trace(m_bridge); 375 visitor->trace(m_mainWebSocketChannel); 376 visitor->trace(m_syncHelper); 377 WebSocketChannelClient::trace(visitor); 378} 379 380Bridge::Bridge(WebSocketChannelClient* client, WorkerGlobalScope& workerGlobalScope) 381 : m_client(client) 382 , m_workerGlobalScope(workerGlobalScope) 383 , m_loaderProxy(m_workerGlobalScope->thread()->workerLoaderProxy()) 384 , m_syncHelper(ThreadableWebSocketChannelSyncHelper::create(adoptPtr(Platform::current()->createWaitableEvent()))) 385 , m_peer(new Peer(this, m_loaderProxy, m_syncHelper)) 386{ 387} 388 389Bridge::~Bridge() 390{ 391 ASSERT(!m_peer); 392} 393 394void Bridge::initialize(const String& sourceURL, unsigned lineNumber) 395{ 396 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::initialize, AllowCrossThreadAccess(m_peer.get()), sourceURL, lineNumber))) { 397 // The worker thread has been signalled to shutdown before method completion. 398 disconnect(); 399 } 400} 401 402bool Bridge::connect(const KURL& url, const String& protocol) 403{ 404 if (!m_peer) 405 return false; 406 407 if (!waitForMethodCompletion(createCrossThreadTask(&Peer::connect, m_peer.get(), url, protocol))) 408 return false; 409 410 return m_syncHelper->connectRequestResult(); 411} 412 413void Bridge::send(const String& message) 414{ 415 ASSERT(m_peer); 416 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::send, m_peer.get(), message)); 417} 418 419void Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength) 420{ 421 ASSERT(m_peer); 422 // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>. 423 OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength)); 424 if (binaryData.byteLength()) 425 memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength); 426 427 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendArrayBuffer, m_peer.get(), data.release())); 428} 429 430void Bridge::send(PassRefPtr<BlobDataHandle> data) 431{ 432 ASSERT(m_peer); 433 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::sendBlob, m_peer.get(), data)); 434} 435 436void Bridge::close(int code, const String& reason) 437{ 438 ASSERT(m_peer); 439 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::close, m_peer.get(), code, reason)); 440} 441 442void Bridge::fail(const String& reason, MessageLevel level, const String& sourceURL, unsigned lineNumber) 443{ 444 ASSERT(m_peer); 445 m_loaderProxy.postTaskToLoader(createCrossThreadTask(&Peer::fail, m_peer.get(), reason, level, sourceURL, lineNumber)); 446} 447 448void Bridge::disconnect() 449{ 450 if (!m_peer) 451 return; 452 453 waitForMethodCompletion(createCrossThreadTask(&Peer::disconnect, m_peer.get())); 454 // Here |m_peer| is detached from the main thread and we can delete it. 455 456 m_client = nullptr; 457 m_peer = nullptr; 458 m_syncHelper = nullptr; 459 // We won't use this any more. 460 m_workerGlobalScope.clear(); 461} 462 463// Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end, 464// which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference. 465bool Bridge::waitForMethodCompletion(PassOwnPtr<ExecutionContextTask> task) 466{ 467 ASSERT(m_workerGlobalScope); 468 ASSERT(m_syncHelper); 469 470 m_loaderProxy.postTaskToLoader(task); 471 472 // We wait for the syncHelper event even if a shutdown event is fired. 473 // See https://codereview.chromium.org/267323004/#msg43 for why we need to wait this. 474 ThreadState::SafePointScope scope(ThreadState::HeapPointersOnStack); 475 m_syncHelper->wait(); 476 // This is checking whether a shutdown event is fired or not. 477 return !m_workerGlobalScope->thread()->terminated(); 478} 479 480void Bridge::trace(Visitor* visitor) 481{ 482 visitor->trace(m_client); 483 visitor->trace(m_workerGlobalScope); 484 visitor->trace(m_syncHelper); 485 visitor->trace(m_peer); 486} 487 488} // namespace blink 489