1/* 2 * Copyright (C) 2010 Apple Inc. All rights reserved. 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 1. Redistributions of source code must retain the above copyright 8 * notice, this list of conditions and the following disclaimer. 9 * 2. Redistributions in binary form must reproduce the above copyright 10 * notice, this list of conditions and the following disclaimer in the 11 * documentation and/or other materials provided with the distribution. 12 * 13 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' 14 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 15 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 16 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS 17 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 18 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 19 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 20 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 21 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF 23 * THE POSSIBILITY OF SUCH DAMAGE. 24 */ 25 26#include "config.h" 27#include "Connection.h" 28 29#include "BinarySemaphore.h" 30#include "CoreIPCMessageKinds.h" 31#include "RunLoop.h" 32#include "WebProcess.h" 33#include "WorkItem.h" 34#include <wtf/CurrentTime.h> 35 36using namespace std; 37 38namespace CoreIPC { 39 40class Connection::SyncMessageState : public ThreadSafeRefCounted<Connection::SyncMessageState> { 41public: 42 static PassRefPtr<SyncMessageState> getOrCreate(RunLoop*); 43 ~SyncMessageState(); 44 45 void wakeUpClientRunLoop() 46 { 47 m_waitForSyncReplySemaphore.signal(); 48 } 49 50 bool wait(double absoluteTime) 51 { 52 return m_waitForSyncReplySemaphore.wait(absoluteTime); 53 } 54 55#if PLATFORM(WIN) 56 bool waitWhileDispatchingSentWin32Messages(double absoluteTime, const Vector<HWND>& windowsToReceiveMessages) 57 { 58 return RunLoop::dispatchSentMessagesUntil(windowsToReceiveMessages, m_waitForSyncReplySemaphore, absoluteTime); 59 } 60#endif 61 62 // Returns true if this message will be handled on a client thread that is currently 63 // waiting for a reply to a synchronous message. 64 bool processIncomingMessage(Connection*, IncomingMessage&); 65 66 void dispatchMessages(); 67 68private: 69 explicit SyncMessageState(RunLoop*); 70 71 typedef HashMap<RunLoop*, SyncMessageState*> SyncMessageStateMap; 72 static SyncMessageStateMap& syncMessageStateMap() 73 { 74 DEFINE_STATIC_LOCAL(SyncMessageStateMap, syncMessageStateMap, ()); 75 return syncMessageStateMap; 76 } 77 78 static Mutex& syncMessageStateMapMutex() 79 { 80 DEFINE_STATIC_LOCAL(Mutex, syncMessageStateMapMutex, ()); 81 return syncMessageStateMapMutex; 82 } 83 84 void dispatchMessageAndResetDidScheduleDispatchMessagesWork(); 85 86 RunLoop* m_runLoop; 87 BinarySemaphore m_waitForSyncReplySemaphore; 88 89 // Protects m_didScheduleDispatchMessagesWork and m_messagesToDispatchWhileWaitingForSyncReply. 90 Mutex m_mutex; 91 92 bool m_didScheduleDispatchMessagesWork; 93 94 struct ConnectionAndIncomingMessage { 95 RefPtr<Connection> connection; 96 IncomingMessage incomingMessage; 97 }; 98 Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply; 99}; 100 101PassRefPtr<Connection::SyncMessageState> Connection::SyncMessageState::getOrCreate(RunLoop* runLoop) 102{ 103 MutexLocker locker(syncMessageStateMapMutex()); 104 pair<SyncMessageStateMap::iterator, bool> result = syncMessageStateMap().add(runLoop, 0); 105 106 if (!result.second) { 107 ASSERT(result.first->second); 108 return result.first->second; 109 } 110 111 RefPtr<SyncMessageState> syncMessageState = adoptRef(new SyncMessageState(runLoop)); 112 result.first->second = syncMessageState.get(); 113 114 return syncMessageState.release(); 115} 116 117Connection::SyncMessageState::SyncMessageState(RunLoop* runLoop) 118 : m_runLoop(runLoop) 119 , m_didScheduleDispatchMessagesWork(false) 120{ 121} 122 123Connection::SyncMessageState::~SyncMessageState() 124{ 125 MutexLocker locker(syncMessageStateMapMutex()); 126 127 ASSERT(syncMessageStateMap().contains(m_runLoop)); 128 syncMessageStateMap().remove(m_runLoop); 129} 130 131bool Connection::SyncMessageState::processIncomingMessage(Connection* connection, IncomingMessage& incomingMessage) 132{ 133 MessageID messageID = incomingMessage.messageID(); 134 if (!messageID.shouldDispatchMessageWhenWaitingForSyncReply()) 135 return false; 136 137 ConnectionAndIncomingMessage connectionAndIncomingMessage; 138 connectionAndIncomingMessage.connection = connection; 139 connectionAndIncomingMessage.incomingMessage = incomingMessage; 140 141 { 142 MutexLocker locker(m_mutex); 143 144 if (!m_didScheduleDispatchMessagesWork) { 145 m_runLoop->scheduleWork(WorkItem::create(this, &SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesWork)); 146 m_didScheduleDispatchMessagesWork = true; 147 } 148 149 m_messagesToDispatchWhileWaitingForSyncReply.append(connectionAndIncomingMessage); 150 } 151 152 wakeUpClientRunLoop(); 153 154 return true; 155} 156 157void Connection::SyncMessageState::dispatchMessages() 158{ 159 ASSERT(m_runLoop == RunLoop::current()); 160 161 Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply; 162 163 { 164 MutexLocker locker(m_mutex); 165 m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply); 166 } 167 168 for (size_t i = 0; i < messagesToDispatchWhileWaitingForSyncReply.size(); ++i) { 169 ConnectionAndIncomingMessage& connectionAndIncomingMessage = messagesToDispatchWhileWaitingForSyncReply[i]; 170 connectionAndIncomingMessage.connection->dispatchMessage(connectionAndIncomingMessage.incomingMessage); 171 } 172} 173 174void Connection::SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesWork() 175{ 176 { 177 MutexLocker locker(m_mutex); 178 ASSERT(m_didScheduleDispatchMessagesWork); 179 m_didScheduleDispatchMessagesWork = false; 180 } 181 182 dispatchMessages(); 183} 184 185PassRefPtr<Connection> Connection::createServerConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop) 186{ 187 return adoptRef(new Connection(identifier, true, client, clientRunLoop)); 188} 189 190PassRefPtr<Connection> Connection::createClientConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop) 191{ 192 return adoptRef(new Connection(identifier, false, client, clientRunLoop)); 193} 194 195Connection::Connection(Identifier identifier, bool isServer, Client* client, RunLoop* clientRunLoop) 196 : m_client(client) 197 , m_isServer(isServer) 198 , m_syncRequestID(0) 199 , m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(false) 200 , m_shouldExitOnSyncMessageSendFailure(false) 201 , m_didCloseOnConnectionWorkQueueCallback(0) 202 , m_isConnected(false) 203 , m_connectionQueue("com.apple.CoreIPC.ReceiveQueue") 204 , m_clientRunLoop(clientRunLoop) 205 , m_inDispatchMessageCount(0) 206 , m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount(0) 207 , m_didReceiveInvalidMessage(false) 208 , m_defaultSyncMessageTimeout(NoTimeout) 209 , m_syncMessageState(SyncMessageState::getOrCreate(clientRunLoop)) 210 , m_shouldWaitForSyncReplies(true) 211{ 212 ASSERT(m_client); 213 214 platformInitialize(identifier); 215} 216 217Connection::~Connection() 218{ 219 ASSERT(!isValid()); 220 221 m_connectionQueue.invalidate(); 222} 223 224void Connection::setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool flag) 225{ 226 ASSERT(!m_isConnected); 227 228 m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage = flag; 229} 230 231void Connection::setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure) 232{ 233 ASSERT(!m_isConnected); 234 235 m_shouldExitOnSyncMessageSendFailure = shouldExitOnSyncMessageSendFailure; 236} 237 238void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback) 239{ 240 ASSERT(!m_isConnected); 241 242 m_didCloseOnConnectionWorkQueueCallback = callback; 243} 244 245void Connection::invalidate() 246{ 247 if (!isValid()) { 248 // Someone already called invalidate(). 249 return; 250 } 251 252 // Reset the client. 253 m_client = 0; 254 255 m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::platformInvalidate)); 256} 257 258void Connection::markCurrentlyDispatchedMessageAsInvalid() 259{ 260 // This should only be called while processing a message. 261 ASSERT(m_inDispatchMessageCount > 0); 262 263 m_didReceiveInvalidMessage = true; 264} 265 266void Connection::setDefaultSyncMessageTimeout(double defaultSyncMessageTimeout) 267{ 268 ASSERT(defaultSyncMessageTimeout != DefaultTimeout); 269 270 m_defaultSyncMessageTimeout = defaultSyncMessageTimeout; 271} 272 273PassOwnPtr<ArgumentEncoder> Connection::createSyncMessageArgumentEncoder(uint64_t destinationID, uint64_t& syncRequestID) 274{ 275 OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID); 276 277 // Encode the sync request ID. 278 syncRequestID = ++m_syncRequestID; 279 argumentEncoder->encode(syncRequestID); 280 281 return argumentEncoder.release(); 282} 283 284bool Connection::sendMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments, unsigned messageSendFlags) 285{ 286 if (!isValid()) 287 return false; 288 289 if (messageSendFlags & DispatchMessageEvenWhenWaitingForSyncReply 290 && (!m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage 291 || m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount)) 292 messageID = messageID.messageIDWithAddedFlags(MessageID::DispatchMessageWhenWaitingForSyncReply); 293 294 MutexLocker locker(m_outgoingMessagesLock); 295 m_outgoingMessages.append(OutgoingMessage(messageID, arguments)); 296 297 // FIXME: We should add a boolean flag so we don't call this when work has already been scheduled. 298 m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::sendOutgoingMessages)); 299 return true; 300} 301 302bool Connection::sendSyncReply(PassOwnPtr<ArgumentEncoder> arguments) 303{ 304 return sendMessage(MessageID(CoreIPCMessage::SyncMessageReply), arguments); 305} 306 307PassOwnPtr<ArgumentDecoder> Connection::waitForMessage(MessageID messageID, uint64_t destinationID, double timeout) 308{ 309 // First, check if this message is already in the incoming messages queue. 310 { 311 MutexLocker locker(m_incomingMessagesLock); 312 313 for (size_t i = 0; i < m_incomingMessages.size(); ++i) { 314 const IncomingMessage& message = m_incomingMessages[i]; 315 316 if (message.messageID() == messageID && message.arguments()->destinationID() == destinationID) { 317 OwnPtr<ArgumentDecoder> arguments(message.arguments()); 318 319 // Erase the incoming message. 320 m_incomingMessages.remove(i); 321 return arguments.release(); 322 } 323 } 324 } 325 326 double absoluteTime = currentTime() + timeout; 327 328 std::pair<unsigned, uint64_t> messageAndDestination(std::make_pair(messageID.toInt(), destinationID)); 329 330 { 331 MutexLocker locker(m_waitForMessageMutex); 332 333 // We don't support having multiple clients wait for the same message. 334 ASSERT(!m_waitForMessageMap.contains(messageAndDestination)); 335 336 // Insert our pending wait. 337 m_waitForMessageMap.set(messageAndDestination, 0); 338 } 339 340 // Now wait for it to be set. 341 while (true) { 342 MutexLocker locker(m_waitForMessageMutex); 343 344 HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(messageAndDestination); 345 if (it->second) { 346 OwnPtr<ArgumentDecoder> arguments(it->second); 347 m_waitForMessageMap.remove(it); 348 349 return arguments.release(); 350 } 351 352 // Now we wait. 353 if (!m_waitForMessageCondition.timedWait(m_waitForMessageMutex, absoluteTime)) { 354 // We timed out, now remove the pending wait. 355 m_waitForMessageMap.remove(messageAndDestination); 356 357 break; 358 } 359 } 360 361 return PassOwnPtr<ArgumentDecoder>(); 362} 363 364PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uint64_t syncRequestID, PassOwnPtr<ArgumentEncoder> encoder, double timeout) 365{ 366 // We only allow sending sync messages from the client run loop. 367 ASSERT(RunLoop::current() == m_clientRunLoop); 368 369 if (!isValid()) { 370 didFailToSendSyncMessage(); 371 return 0; 372 } 373 374 // Push the pending sync reply information on our stack. 375 { 376 MutexLocker locker(m_syncReplyStateMutex); 377 if (!m_shouldWaitForSyncReplies) { 378 didFailToSendSyncMessage(); 379 return 0; 380 } 381 382 m_pendingSyncReplies.append(PendingSyncReply(syncRequestID)); 383 } 384 385 // First send the message. 386 sendMessage(messageID.messageIDWithAddedFlags(MessageID::SyncMessage), encoder, DispatchMessageEvenWhenWaitingForSyncReply); 387 388 // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so 389 // keep an extra reference to the connection here in case it's invalidated. 390 RefPtr<Connection> protect(this); 391 OwnPtr<ArgumentDecoder> reply = waitForSyncReply(syncRequestID, timeout); 392 393 // Finally, pop the pending sync reply information. 394 { 395 MutexLocker locker(m_syncReplyStateMutex); 396 ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID); 397 m_pendingSyncReplies.removeLast(); 398 } 399 400 if (!reply) 401 didFailToSendSyncMessage(); 402 403 return reply.release(); 404} 405 406PassOwnPtr<ArgumentDecoder> Connection::waitForSyncReply(uint64_t syncRequestID, double timeout) 407{ 408 if (timeout == DefaultTimeout) 409 timeout = m_defaultSyncMessageTimeout; 410 411 // Use a really long timeout. 412 if (timeout == NoTimeout) 413 timeout = 1e10; 414 415 double absoluteTime = currentTime() + timeout; 416 417 bool timedOut = false; 418 while (!timedOut) { 419 // First, check if we have any messages that we need to process. 420 m_syncMessageState->dispatchMessages(); 421 422 { 423 MutexLocker locker(m_syncReplyStateMutex); 424 425 // Second, check if there is a sync reply at the top of the stack. 426 ASSERT(!m_pendingSyncReplies.isEmpty()); 427 428 PendingSyncReply& pendingSyncReply = m_pendingSyncReplies.last(); 429 ASSERT(pendingSyncReply.syncRequestID == syncRequestID); 430 431 // We found the sync reply, or the connection was closed. 432 if (pendingSyncReply.didReceiveReply || !m_shouldWaitForSyncReplies) 433 return pendingSyncReply.releaseReplyDecoder(); 434 } 435 436 // We didn't find a sync reply yet, keep waiting. 437#if PLATFORM(WIN) 438 timedOut = !m_syncMessageState->waitWhileDispatchingSentWin32Messages(absoluteTime, m_client->windowsToReceiveSentMessagesWhileWaitingForSyncReply()); 439#else 440 timedOut = !m_syncMessageState->wait(absoluteTime); 441#endif 442 } 443 444 // We timed out. 445 if (m_client) 446 m_client->syncMessageSendTimedOut(this); 447 448 return 0; 449} 450 451void Connection::processIncomingSyncReply(PassOwnPtr<ArgumentDecoder> arguments) 452{ 453 MutexLocker locker(m_syncReplyStateMutex); 454 ASSERT(!m_pendingSyncReplies.isEmpty()); 455 456 // Go through the stack of sync requests that have pending replies and see which one 457 // this reply is for. 458 for (size_t i = m_pendingSyncReplies.size(); i > 0; --i) { 459 PendingSyncReply& pendingSyncReply = m_pendingSyncReplies[i - 1]; 460 461 if (pendingSyncReply.syncRequestID != arguments->destinationID()) 462 continue; 463 464 ASSERT(!pendingSyncReply.replyDecoder); 465 466 pendingSyncReply.replyDecoder = arguments.leakPtr(); 467 pendingSyncReply.didReceiveReply = true; 468 469 // We got a reply to the last send message, wake up the client run loop so it can be processed. 470 if (i == m_pendingSyncReplies.size()) 471 m_syncMessageState->wakeUpClientRunLoop(); 472 473 return; 474 } 475 476 // We got a reply for a message we never sent. 477 // FIXME: Dispatch a didReceiveInvalidMessage callback on the client. 478 ASSERT_NOT_REACHED(); 479} 480 481void Connection::processIncomingMessage(MessageID messageID, PassOwnPtr<ArgumentDecoder> arguments) 482{ 483 // Check if this is a sync reply. 484 if (messageID == MessageID(CoreIPCMessage::SyncMessageReply)) { 485 processIncomingSyncReply(arguments); 486 return; 487 } 488 489 IncomingMessage incomingMessage(messageID, arguments); 490 491 // Check if this is a sync message or if it's a message that should be dispatched even when waiting for 492 // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched. 493 // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply. 494 if (m_syncMessageState->processIncomingMessage(this, incomingMessage)) 495 return; 496 497 // Check if we're waiting for this message. 498 { 499 MutexLocker locker(m_waitForMessageMutex); 500 501 HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), incomingMessage.destinationID())); 502 if (it != m_waitForMessageMap.end()) { 503 it->second = incomingMessage.releaseArguments().leakPtr(); 504 ASSERT(it->second); 505 506 m_waitForMessageCondition.signal(); 507 return; 508 } 509 } 510 511 enqueueIncomingMessage(incomingMessage); 512} 513 514void Connection::connectionDidClose() 515{ 516 // The connection is now invalid. 517 platformInvalidate(); 518 519 { 520 MutexLocker locker(m_syncReplyStateMutex); 521 522 ASSERT(m_shouldWaitForSyncReplies); 523 m_shouldWaitForSyncReplies = false; 524 525 if (!m_pendingSyncReplies.isEmpty()) 526 m_syncMessageState->wakeUpClientRunLoop(); 527 } 528 529 if (m_didCloseOnConnectionWorkQueueCallback) 530 m_didCloseOnConnectionWorkQueueCallback(m_connectionQueue, this); 531 532 m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchConnectionDidClose)); 533} 534 535void Connection::dispatchConnectionDidClose() 536{ 537 // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called, 538 // then the client will be null here. 539 if (!m_client) 540 return; 541 542 543 // Because we define a connection as being "valid" based on wheter it has a null client, we null out 544 // the client before calling didClose here. Otherwise, sendSync will try to send a message to the connection and 545 // will then wait indefinitely for a reply. 546 Client* client = m_client; 547 m_client = 0; 548 549 client->didClose(this); 550} 551 552bool Connection::canSendOutgoingMessages() const 553{ 554 return m_isConnected && platformCanSendOutgoingMessages(); 555} 556 557void Connection::sendOutgoingMessages() 558{ 559 if (!canSendOutgoingMessages()) 560 return; 561 562 while (true) { 563 OutgoingMessage message; 564 { 565 MutexLocker locker(m_outgoingMessagesLock); 566 if (m_outgoingMessages.isEmpty()) 567 break; 568 message = m_outgoingMessages.takeFirst(); 569 } 570 571 if (!sendOutgoingMessage(message.messageID(), adoptPtr(message.arguments()))) 572 break; 573 } 574} 575 576void Connection::dispatchSyncMessage(MessageID messageID, ArgumentDecoder* arguments) 577{ 578 ASSERT(messageID.isSync()); 579 580 // Decode the sync request ID. 581 uint64_t syncRequestID = 0; 582 583 if (!arguments->decodeUInt64(syncRequestID) || !syncRequestID) { 584 // We received an invalid sync message. 585 arguments->markInvalid(); 586 return; 587 } 588 589 // Create our reply encoder. 590 ArgumentEncoder* replyEncoder = ArgumentEncoder::create(syncRequestID).leakPtr(); 591 592 // Hand off both the decoder and encoder to the client.. 593 SyncReplyMode syncReplyMode = m_client->didReceiveSyncMessage(this, messageID, arguments, replyEncoder); 594 595 // FIXME: If the message was invalid, we should send back a SyncMessageError. 596 ASSERT(!arguments->isInvalid()); 597 598 if (syncReplyMode == ManualReply) { 599 // The client will take ownership of the reply encoder and send it at some point in the future. 600 // We won't do anything here. 601 return; 602 } 603 604 // Send the reply. 605 sendSyncReply(replyEncoder); 606} 607 608void Connection::didFailToSendSyncMessage() 609{ 610 if (!m_shouldExitOnSyncMessageSendFailure) 611 return; 612 613 exit(0); 614} 615 616void Connection::enqueueIncomingMessage(IncomingMessage& incomingMessage) 617{ 618 MutexLocker locker(m_incomingMessagesLock); 619 m_incomingMessages.append(incomingMessage); 620 621 m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages)); 622} 623 624void Connection::dispatchMessage(IncomingMessage& message) 625{ 626 OwnPtr<ArgumentDecoder> arguments = message.releaseArguments(); 627 628 // If there's no client, return. We do this after calling releaseArguments so that 629 // the ArgumentDecoder message will be freed. 630 if (!m_client) 631 return; 632 633 m_inDispatchMessageCount++; 634 635 if (message.messageID().shouldDispatchMessageWhenWaitingForSyncReply()) 636 m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount++; 637 638 bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage; 639 m_didReceiveInvalidMessage = false; 640 641 if (message.messageID().isSync()) 642 dispatchSyncMessage(message.messageID(), arguments.get()); 643 else 644 m_client->didReceiveMessage(this, message.messageID(), arguments.get()); 645 646 m_didReceiveInvalidMessage |= arguments->isInvalid(); 647 m_inDispatchMessageCount--; 648 649 if (message.messageID().shouldDispatchMessageWhenWaitingForSyncReply()) 650 m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount--; 651 652 if (m_didReceiveInvalidMessage && m_client) 653 m_client->didReceiveInvalidMessage(this, message.messageID()); 654 655 m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage; 656} 657 658void Connection::dispatchMessages() 659{ 660 Vector<IncomingMessage> incomingMessages; 661 662 { 663 MutexLocker locker(m_incomingMessagesLock); 664 m_incomingMessages.swap(incomingMessages); 665 } 666 667 for (size_t i = 0; i < incomingMessages.size(); ++i) 668 dispatchMessage(incomingMessages[i]); 669} 670 671} // namespace CoreIPC 672