1/* 2 * Copyright (C) 2009, 2010 Google Inc. All rights reserved. 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions are 6 * met: 7 * 8 * * Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above 11 * copyright notice, this list of conditions and the following disclaimer 12 * in the documentation and/or other materials provided with the 13 * distribution. 14 * * Neither the name of Google Inc. nor the names of its 15 * contributors may be used to endorse or promote products derived from 16 * this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 23 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 24 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 */ 30 31#include "config.h" 32 33#if ENABLE(WEB_SOCKETS) && ENABLE(WORKERS) 34 35#include "WorkerThreadableWebSocketChannel.h" 36 37#include "CrossThreadTask.h" 38#include "PlatformString.h" 39#include "ScriptExecutionContext.h" 40#include "ThreadableWebSocketChannelClientWrapper.h" 41#include "WebSocketChannel.h" 42#include "WebSocketChannelClient.h" 43#include "WorkerContext.h" 44#include "WorkerLoaderProxy.h" 45#include "WorkerRunLoop.h" 46#include "WorkerThread.h" 47 48#include <wtf/PassRefPtr.h> 49 50namespace WebCore { 51 52WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerContext* context, WebSocketChannelClient* client, const String& taskMode, const KURL& url, const String& protocol) 53 : m_workerContext(context) 54 , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client)) 55 , m_bridge(Bridge::create(m_workerClientWrapper, m_workerContext, taskMode, url, protocol)) 56{ 57} 58 59WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel() 60{ 61 if (m_bridge) 62 m_bridge->disconnect(); 63} 64 65void WorkerThreadableWebSocketChannel::connect() 66{ 67 if (m_bridge) 68 m_bridge->connect(); 69} 70 71bool WorkerThreadableWebSocketChannel::send(const String& message) 72{ 73 if (!m_bridge) 74 return false; 75 return m_bridge->send(message); 76} 77 78unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const 79{ 80 if (!m_bridge) 81 return 0; 82 return m_bridge->bufferedAmount(); 83} 84 85void WorkerThreadableWebSocketChannel::close() 86{ 87 if (m_bridge) 88 m_bridge->close(); 89} 90 91void WorkerThreadableWebSocketChannel::disconnect() 92{ 93 m_bridge->disconnect(); 94 m_bridge.clear(); 95} 96 97void WorkerThreadableWebSocketChannel::suspend() 98{ 99 m_workerClientWrapper->suspend(); 100 if (m_bridge) 101 m_bridge->suspend(); 102} 103 104void WorkerThreadableWebSocketChannel::resume() 105{ 106 m_workerClientWrapper->resume(); 107 if (m_bridge) 108 m_bridge->resume(); 109} 110 111WorkerThreadableWebSocketChannel::Peer::Peer(RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode, const KURL& url, const String& protocol) 112 : m_workerClientWrapper(clientWrapper) 113 , m_loaderProxy(loaderProxy) 114 , m_mainWebSocketChannel(WebSocketChannel::create(context, this, url, protocol)) 115 , m_taskMode(taskMode) 116{ 117 ASSERT(isMainThread()); 118} 119 120WorkerThreadableWebSocketChannel::Peer::~Peer() 121{ 122 ASSERT(isMainThread()); 123 if (m_mainWebSocketChannel) 124 m_mainWebSocketChannel->disconnect(); 125} 126 127void WorkerThreadableWebSocketChannel::Peer::connect() 128{ 129 ASSERT(isMainThread()); 130 if (!m_mainWebSocketChannel) 131 return; 132 m_mainWebSocketChannel->connect(); 133} 134 135static void workerContextDidSend(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, bool sent) 136{ 137 ASSERT_UNUSED(context, context->isWorkerContext()); 138 workerClientWrapper->setSent(sent); 139} 140 141void WorkerThreadableWebSocketChannel::Peer::send(const String& message) 142{ 143 ASSERT(isMainThread()); 144 if (!m_mainWebSocketChannel || !m_workerClientWrapper) 145 return; 146 bool sent = m_mainWebSocketChannel->send(message); 147 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidSend, m_workerClientWrapper, sent), m_taskMode); 148} 149 150static void workerContextDidGetBufferedAmount(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount) 151{ 152 ASSERT_UNUSED(context, context->isWorkerContext()); 153 workerClientWrapper->setBufferedAmount(bufferedAmount); 154} 155 156void WorkerThreadableWebSocketChannel::Peer::bufferedAmount() 157{ 158 ASSERT(isMainThread()); 159 if (!m_mainWebSocketChannel || !m_workerClientWrapper) 160 return; 161 unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount(); 162 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidGetBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode); 163} 164 165void WorkerThreadableWebSocketChannel::Peer::close() 166{ 167 ASSERT(isMainThread()); 168 if (!m_mainWebSocketChannel) 169 return; 170 m_mainWebSocketChannel->close(); 171 m_mainWebSocketChannel = 0; 172} 173 174void WorkerThreadableWebSocketChannel::Peer::disconnect() 175{ 176 ASSERT(isMainThread()); 177 if (!m_mainWebSocketChannel) 178 return; 179 m_mainWebSocketChannel->disconnect(); 180 m_mainWebSocketChannel = 0; 181} 182 183void WorkerThreadableWebSocketChannel::Peer::suspend() 184{ 185 ASSERT(isMainThread()); 186 if (!m_mainWebSocketChannel) 187 return; 188 m_mainWebSocketChannel->suspend(); 189} 190 191void WorkerThreadableWebSocketChannel::Peer::resume() 192{ 193 ASSERT(isMainThread()); 194 if (!m_mainWebSocketChannel) 195 return; 196 m_mainWebSocketChannel->resume(); 197} 198 199static void workerContextDidConnect(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper) 200{ 201 ASSERT_UNUSED(context, context->isWorkerContext()); 202 workerClientWrapper->didConnect(); 203} 204 205void WorkerThreadableWebSocketChannel::Peer::didConnect() 206{ 207 ASSERT(isMainThread()); 208 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidConnect, m_workerClientWrapper), m_taskMode); 209} 210 211static void workerContextDidReceiveMessage(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message) 212{ 213 ASSERT_UNUSED(context, context->isWorkerContext()); 214 workerClientWrapper->didReceiveMessage(message); 215} 216 217void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message) 218{ 219 ASSERT(isMainThread()); 220 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidReceiveMessage, m_workerClientWrapper, message), m_taskMode); 221} 222 223static void workerContextDidClose(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount) 224{ 225 ASSERT_UNUSED(context, context->isWorkerContext()); 226 workerClientWrapper->didClose(unhandledBufferedAmount); 227} 228 229void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount) 230{ 231 ASSERT(isMainThread()); 232 m_mainWebSocketChannel = 0; 233 m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidClose, m_workerClientWrapper, unhandledBufferedAmount), m_taskMode); 234} 235 236void WorkerThreadableWebSocketChannel::Bridge::setWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, Peer* peer, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper) 237{ 238 ASSERT_UNUSED(context, context->isWorkerContext()); 239 thisPtr->m_peer = peer; 240 workerClientWrapper->setSyncMethodDone(); 241} 242 243void WorkerThreadableWebSocketChannel::Bridge::mainThreadCreateWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, const String& taskMode, const KURL& url, const String& protocol) 244{ 245 ASSERT(isMainThread()); 246 ASSERT_UNUSED(context, context->isDocument()); 247 248 Peer* peer = Peer::create(clientWrapper, thisPtr->m_loaderProxy, context, taskMode, url, protocol); 249 thisPtr->m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&Bridge::setWebSocketChannel, thisPtr, peer, clientWrapper), taskMode); 250} 251 252WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerContext> workerContext, const String& taskMode, const KURL& url, const String& protocol) 253 : m_workerClientWrapper(workerClientWrapper) 254 , m_workerContext(workerContext) 255 , m_loaderProxy(m_workerContext->thread()->workerLoaderProxy()) 256 , m_taskMode(taskMode) 257 , m_peer(0) 258{ 259 ASSERT(m_workerClientWrapper.get()); 260 setMethodNotCompleted(); 261 m_loaderProxy.postTaskToLoader(createCallbackTask(&Bridge::mainThreadCreateWebSocketChannel, this, m_workerClientWrapper, m_taskMode, url, protocol)); 262 waitForMethodCompletion(); 263 ASSERT(m_peer); 264} 265 266WorkerThreadableWebSocketChannel::Bridge::~Bridge() 267{ 268 disconnect(); 269} 270 271void WorkerThreadableWebSocketChannel::mainThreadConnect(ScriptExecutionContext* context, Peer* peer) 272{ 273 ASSERT(isMainThread()); 274 ASSERT_UNUSED(context, context->isDocument()); 275 ASSERT(peer); 276 277 peer->connect(); 278} 279 280void WorkerThreadableWebSocketChannel::Bridge::connect() 281{ 282 ASSERT(m_workerClientWrapper); 283 ASSERT(m_peer); 284 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadConnect, m_peer)); 285} 286 287void WorkerThreadableWebSocketChannel::mainThreadSend(ScriptExecutionContext* context, Peer* peer, const String& message) 288{ 289 ASSERT(isMainThread()); 290 ASSERT_UNUSED(context, context->isDocument()); 291 ASSERT(peer); 292 293 peer->send(message); 294} 295 296bool WorkerThreadableWebSocketChannel::Bridge::send(const String& message) 297{ 298 if (!m_workerClientWrapper) 299 return false; 300 ASSERT(m_peer); 301 setMethodNotCompleted(); 302 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSend, m_peer, message)); 303 RefPtr<Bridge> protect(this); 304 waitForMethodCompletion(); 305 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); 306 return clientWrapper && clientWrapper->sent(); 307} 308 309void WorkerThreadableWebSocketChannel::mainThreadBufferedAmount(ScriptExecutionContext* context, Peer* peer) 310{ 311 ASSERT(isMainThread()); 312 ASSERT_UNUSED(context, context->isDocument()); 313 ASSERT(peer); 314 315 peer->bufferedAmount(); 316} 317 318unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount() 319{ 320 if (!m_workerClientWrapper) 321 return 0; 322 ASSERT(m_peer); 323 setMethodNotCompleted(); 324 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadBufferedAmount, m_peer)); 325 RefPtr<Bridge> protect(this); 326 waitForMethodCompletion(); 327 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); 328 if (clientWrapper) 329 return clientWrapper->bufferedAmount(); 330 return 0; 331} 332 333void WorkerThreadableWebSocketChannel::mainThreadClose(ScriptExecutionContext* context, Peer* peer) 334{ 335 ASSERT(isMainThread()); 336 ASSERT_UNUSED(context, context->isDocument()); 337 ASSERT(peer); 338 339 peer->close(); 340} 341 342void WorkerThreadableWebSocketChannel::Bridge::close() 343{ 344 ASSERT(m_peer); 345 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadClose, m_peer)); 346} 347 348void WorkerThreadableWebSocketChannel::mainThreadDestroy(ScriptExecutionContext* context, Peer* peer) 349{ 350 ASSERT(isMainThread()); 351 ASSERT_UNUSED(context, context->isDocument()); 352 ASSERT(peer); 353 354 delete peer; 355} 356 357void WorkerThreadableWebSocketChannel::Bridge::disconnect() 358{ 359 clearClientWrapper(); 360 if (m_peer) { 361 Peer* peer = m_peer; 362 m_peer = 0; 363 m_loaderProxy.postTaskToLoader(createCallbackTask(&mainThreadDestroy, peer)); 364 } 365 m_workerContext = 0; 366} 367 368void WorkerThreadableWebSocketChannel::mainThreadSuspend(ScriptExecutionContext* context, Peer* peer) 369{ 370 ASSERT(isMainThread()); 371 ASSERT_UNUSED(context, context->isDocument()); 372 ASSERT(peer); 373 374 peer->suspend(); 375} 376 377void WorkerThreadableWebSocketChannel::Bridge::suspend() 378{ 379 ASSERT(m_peer); 380 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSuspend, m_peer)); 381} 382 383void WorkerThreadableWebSocketChannel::mainThreadResume(ScriptExecutionContext* context, Peer* peer) 384{ 385 ASSERT(isMainThread()); 386 ASSERT_UNUSED(context, context->isDocument()); 387 ASSERT(peer); 388 389 peer->resume(); 390} 391 392void WorkerThreadableWebSocketChannel::Bridge::resume() 393{ 394 ASSERT(m_peer); 395 m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadResume, m_peer)); 396} 397 398void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper() 399{ 400 m_workerClientWrapper->clearClient(); 401} 402 403void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted() 404{ 405 ASSERT(m_workerClientWrapper); 406 m_workerClientWrapper->clearSyncMethodDone(); 407} 408 409// Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end, 410// which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference. 411void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion() 412{ 413 if (!m_workerContext) 414 return; 415 WorkerRunLoop& runLoop = m_workerContext->thread()->runLoop(); 416 MessageQueueWaitResult result = MessageQueueMessageReceived; 417 ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get(); 418 while (m_workerContext && clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) { 419 result = runLoop.runInMode(m_workerContext.get(), m_taskMode); // May cause this bridge to get disconnected, which makes m_workerContext become null. 420 clientWrapper = m_workerClientWrapper.get(); 421 } 422} 423 424} // namespace WebCore 425 426#endif // ENABLE(WEB_SOCKETS) 427