1/*
2 * Copyright (C) 2010 Apple Inc. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 * 1. Redistributions of source code must retain the above copyright
8 *    notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 *    notice, this list of conditions and the following disclaimer in the
11 *    documentation and/or other materials provided with the distribution.
12 *
13 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23 * THE POSSIBILITY OF SUCH DAMAGE.
24 */
25
26#include "config.h"
27#include "Connection.h"
28
29#include "BinarySemaphore.h"
30#include "CoreIPCMessageKinds.h"
31#include "RunLoop.h"
32#include "WebProcess.h"
33#include "WorkItem.h"
34#include <wtf/CurrentTime.h>
35
36using namespace std;
37
38namespace CoreIPC {
39
40class Connection::SyncMessageState : public ThreadSafeRefCounted<Connection::SyncMessageState> {
41public:
42    static PassRefPtr<SyncMessageState> getOrCreate(RunLoop*);
43    ~SyncMessageState();
44
45    void wakeUpClientRunLoop()
46    {
47        m_waitForSyncReplySemaphore.signal();
48    }
49
50    bool wait(double absoluteTime)
51    {
52        return m_waitForSyncReplySemaphore.wait(absoluteTime);
53    }
54
55#if PLATFORM(WIN)
56    bool waitWhileDispatchingSentWin32Messages(double absoluteTime, const Vector<HWND>& windowsToReceiveMessages)
57    {
58        return RunLoop::dispatchSentMessagesUntil(windowsToReceiveMessages, m_waitForSyncReplySemaphore, absoluteTime);
59    }
60#endif
61
62    // Returns true if this message will be handled on a client thread that is currently
63    // waiting for a reply to a synchronous message.
64    bool processIncomingMessage(Connection*, IncomingMessage&);
65
66    void dispatchMessages();
67
68private:
69    explicit SyncMessageState(RunLoop*);
70
71    typedef HashMap<RunLoop*, SyncMessageState*> SyncMessageStateMap;
72    static SyncMessageStateMap& syncMessageStateMap()
73    {
74        DEFINE_STATIC_LOCAL(SyncMessageStateMap, syncMessageStateMap, ());
75        return syncMessageStateMap;
76    }
77
78    static Mutex& syncMessageStateMapMutex()
79    {
80        DEFINE_STATIC_LOCAL(Mutex, syncMessageStateMapMutex, ());
81        return syncMessageStateMapMutex;
82    }
83
84    void dispatchMessageAndResetDidScheduleDispatchMessagesWork();
85
86    RunLoop* m_runLoop;
87    BinarySemaphore m_waitForSyncReplySemaphore;
88
89    // Protects m_didScheduleDispatchMessagesWork and m_messagesToDispatchWhileWaitingForSyncReply.
90    Mutex m_mutex;
91
92    bool m_didScheduleDispatchMessagesWork;
93
94    struct ConnectionAndIncomingMessage {
95        RefPtr<Connection> connection;
96        IncomingMessage incomingMessage;
97    };
98    Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply;
99};
100
101PassRefPtr<Connection::SyncMessageState> Connection::SyncMessageState::getOrCreate(RunLoop* runLoop)
102{
103    MutexLocker locker(syncMessageStateMapMutex());
104    pair<SyncMessageStateMap::iterator, bool> result = syncMessageStateMap().add(runLoop, 0);
105
106    if (!result.second) {
107        ASSERT(result.first->second);
108        return result.first->second;
109    }
110
111    RefPtr<SyncMessageState> syncMessageState = adoptRef(new SyncMessageState(runLoop));
112    result.first->second = syncMessageState.get();
113
114    return syncMessageState.release();
115}
116
117Connection::SyncMessageState::SyncMessageState(RunLoop* runLoop)
118    : m_runLoop(runLoop)
119    , m_didScheduleDispatchMessagesWork(false)
120{
121}
122
123Connection::SyncMessageState::~SyncMessageState()
124{
125    MutexLocker locker(syncMessageStateMapMutex());
126
127    ASSERT(syncMessageStateMap().contains(m_runLoop));
128    syncMessageStateMap().remove(m_runLoop);
129}
130
131bool Connection::SyncMessageState::processIncomingMessage(Connection* connection, IncomingMessage& incomingMessage)
132{
133    MessageID messageID = incomingMessage.messageID();
134    if (!messageID.shouldDispatchMessageWhenWaitingForSyncReply())
135        return false;
136
137    ConnectionAndIncomingMessage connectionAndIncomingMessage;
138    connectionAndIncomingMessage.connection = connection;
139    connectionAndIncomingMessage.incomingMessage = incomingMessage;
140
141    {
142        MutexLocker locker(m_mutex);
143
144        if (!m_didScheduleDispatchMessagesWork) {
145            m_runLoop->scheduleWork(WorkItem::create(this, &SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesWork));
146            m_didScheduleDispatchMessagesWork = true;
147        }
148
149        m_messagesToDispatchWhileWaitingForSyncReply.append(connectionAndIncomingMessage);
150    }
151
152    wakeUpClientRunLoop();
153
154    return true;
155}
156
157void Connection::SyncMessageState::dispatchMessages()
158{
159    ASSERT(m_runLoop == RunLoop::current());
160
161    Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply;
162
163    {
164        MutexLocker locker(m_mutex);
165        m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply);
166    }
167
168    for (size_t i = 0; i < messagesToDispatchWhileWaitingForSyncReply.size(); ++i) {
169        ConnectionAndIncomingMessage& connectionAndIncomingMessage = messagesToDispatchWhileWaitingForSyncReply[i];
170        connectionAndIncomingMessage.connection->dispatchMessage(connectionAndIncomingMessage.incomingMessage);
171    }
172}
173
174void Connection::SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesWork()
175{
176    {
177        MutexLocker locker(m_mutex);
178        ASSERT(m_didScheduleDispatchMessagesWork);
179        m_didScheduleDispatchMessagesWork = false;
180    }
181
182    dispatchMessages();
183}
184
185PassRefPtr<Connection> Connection::createServerConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
186{
187    return adoptRef(new Connection(identifier, true, client, clientRunLoop));
188}
189
190PassRefPtr<Connection> Connection::createClientConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
191{
192    return adoptRef(new Connection(identifier, false, client, clientRunLoop));
193}
194
195Connection::Connection(Identifier identifier, bool isServer, Client* client, RunLoop* clientRunLoop)
196    : m_client(client)
197    , m_isServer(isServer)
198    , m_syncRequestID(0)
199    , m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(false)
200    , m_shouldExitOnSyncMessageSendFailure(false)
201    , m_didCloseOnConnectionWorkQueueCallback(0)
202    , m_isConnected(false)
203    , m_connectionQueue("com.apple.CoreIPC.ReceiveQueue")
204    , m_clientRunLoop(clientRunLoop)
205    , m_inDispatchMessageCount(0)
206    , m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount(0)
207    , m_didReceiveInvalidMessage(false)
208    , m_defaultSyncMessageTimeout(NoTimeout)
209    , m_syncMessageState(SyncMessageState::getOrCreate(clientRunLoop))
210    , m_shouldWaitForSyncReplies(true)
211{
212    ASSERT(m_client);
213
214    platformInitialize(identifier);
215}
216
217Connection::~Connection()
218{
219    ASSERT(!isValid());
220
221    m_connectionQueue.invalidate();
222}
223
224void Connection::setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool flag)
225{
226    ASSERT(!m_isConnected);
227
228    m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage = flag;
229}
230
231void Connection::setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure)
232{
233    ASSERT(!m_isConnected);
234
235    m_shouldExitOnSyncMessageSendFailure = shouldExitOnSyncMessageSendFailure;
236}
237
238void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback)
239{
240    ASSERT(!m_isConnected);
241
242    m_didCloseOnConnectionWorkQueueCallback = callback;
243}
244
245void Connection::invalidate()
246{
247    if (!isValid()) {
248        // Someone already called invalidate().
249        return;
250    }
251
252    // Reset the client.
253    m_client = 0;
254
255    m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::platformInvalidate));
256}
257
258void Connection::markCurrentlyDispatchedMessageAsInvalid()
259{
260    // This should only be called while processing a message.
261    ASSERT(m_inDispatchMessageCount > 0);
262
263    m_didReceiveInvalidMessage = true;
264}
265
266void Connection::setDefaultSyncMessageTimeout(double defaultSyncMessageTimeout)
267{
268    ASSERT(defaultSyncMessageTimeout != DefaultTimeout);
269
270    m_defaultSyncMessageTimeout = defaultSyncMessageTimeout;
271}
272
273PassOwnPtr<ArgumentEncoder> Connection::createSyncMessageArgumentEncoder(uint64_t destinationID, uint64_t& syncRequestID)
274{
275    OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID);
276
277    // Encode the sync request ID.
278    syncRequestID = ++m_syncRequestID;
279    argumentEncoder->encode(syncRequestID);
280
281    return argumentEncoder.release();
282}
283
284bool Connection::sendMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments, unsigned messageSendFlags)
285{
286    if (!isValid())
287        return false;
288
289    if (messageSendFlags & DispatchMessageEvenWhenWaitingForSyncReply
290        && (!m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage
291            || m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount))
292        messageID = messageID.messageIDWithAddedFlags(MessageID::DispatchMessageWhenWaitingForSyncReply);
293
294    MutexLocker locker(m_outgoingMessagesLock);
295    m_outgoingMessages.append(OutgoingMessage(messageID, arguments));
296
297    // FIXME: We should add a boolean flag so we don't call this when work has already been scheduled.
298    m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::sendOutgoingMessages));
299    return true;
300}
301
302bool Connection::sendSyncReply(PassOwnPtr<ArgumentEncoder> arguments)
303{
304    return sendMessage(MessageID(CoreIPCMessage::SyncMessageReply), arguments);
305}
306
307PassOwnPtr<ArgumentDecoder> Connection::waitForMessage(MessageID messageID, uint64_t destinationID, double timeout)
308{
309    // First, check if this message is already in the incoming messages queue.
310    {
311        MutexLocker locker(m_incomingMessagesLock);
312
313        for (size_t i = 0; i < m_incomingMessages.size(); ++i) {
314            const IncomingMessage& message = m_incomingMessages[i];
315
316            if (message.messageID() == messageID && message.arguments()->destinationID() == destinationID) {
317                OwnPtr<ArgumentDecoder> arguments(message.arguments());
318
319                // Erase the incoming message.
320                m_incomingMessages.remove(i);
321                return arguments.release();
322            }
323        }
324    }
325
326    double absoluteTime = currentTime() + timeout;
327
328    std::pair<unsigned, uint64_t> messageAndDestination(std::make_pair(messageID.toInt(), destinationID));
329
330    {
331        MutexLocker locker(m_waitForMessageMutex);
332
333        // We don't support having multiple clients wait for the same message.
334        ASSERT(!m_waitForMessageMap.contains(messageAndDestination));
335
336        // Insert our pending wait.
337        m_waitForMessageMap.set(messageAndDestination, 0);
338    }
339
340    // Now wait for it to be set.
341    while (true) {
342        MutexLocker locker(m_waitForMessageMutex);
343
344        HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(messageAndDestination);
345        if (it->second) {
346            OwnPtr<ArgumentDecoder> arguments(it->second);
347            m_waitForMessageMap.remove(it);
348
349            return arguments.release();
350        }
351
352        // Now we wait.
353        if (!m_waitForMessageCondition.timedWait(m_waitForMessageMutex, absoluteTime)) {
354            // We timed out, now remove the pending wait.
355            m_waitForMessageMap.remove(messageAndDestination);
356
357            break;
358        }
359    }
360
361    return PassOwnPtr<ArgumentDecoder>();
362}
363
364PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uint64_t syncRequestID, PassOwnPtr<ArgumentEncoder> encoder, double timeout)
365{
366    // We only allow sending sync messages from the client run loop.
367    ASSERT(RunLoop::current() == m_clientRunLoop);
368
369    if (!isValid()) {
370        didFailToSendSyncMessage();
371        return 0;
372    }
373
374    // Push the pending sync reply information on our stack.
375    {
376        MutexLocker locker(m_syncReplyStateMutex);
377        if (!m_shouldWaitForSyncReplies) {
378            didFailToSendSyncMessage();
379            return 0;
380        }
381
382        m_pendingSyncReplies.append(PendingSyncReply(syncRequestID));
383    }
384
385    // First send the message.
386    sendMessage(messageID.messageIDWithAddedFlags(MessageID::SyncMessage), encoder, DispatchMessageEvenWhenWaitingForSyncReply);
387
388    // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so
389    // keep an extra reference to the connection here in case it's invalidated.
390    RefPtr<Connection> protect(this);
391    OwnPtr<ArgumentDecoder> reply = waitForSyncReply(syncRequestID, timeout);
392
393    // Finally, pop the pending sync reply information.
394    {
395        MutexLocker locker(m_syncReplyStateMutex);
396        ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID);
397        m_pendingSyncReplies.removeLast();
398    }
399
400    if (!reply)
401        didFailToSendSyncMessage();
402
403    return reply.release();
404}
405
406PassOwnPtr<ArgumentDecoder> Connection::waitForSyncReply(uint64_t syncRequestID, double timeout)
407{
408    if (timeout == DefaultTimeout)
409        timeout = m_defaultSyncMessageTimeout;
410
411    // Use a really long timeout.
412    if (timeout == NoTimeout)
413        timeout = 1e10;
414
415    double absoluteTime = currentTime() + timeout;
416
417    bool timedOut = false;
418    while (!timedOut) {
419        // First, check if we have any messages that we need to process.
420        m_syncMessageState->dispatchMessages();
421
422        {
423            MutexLocker locker(m_syncReplyStateMutex);
424
425            // Second, check if there is a sync reply at the top of the stack.
426            ASSERT(!m_pendingSyncReplies.isEmpty());
427
428            PendingSyncReply& pendingSyncReply = m_pendingSyncReplies.last();
429            ASSERT(pendingSyncReply.syncRequestID == syncRequestID);
430
431            // We found the sync reply, or the connection was closed.
432            if (pendingSyncReply.didReceiveReply || !m_shouldWaitForSyncReplies)
433                return pendingSyncReply.releaseReplyDecoder();
434        }
435
436        // We didn't find a sync reply yet, keep waiting.
437#if PLATFORM(WIN)
438        timedOut = !m_syncMessageState->waitWhileDispatchingSentWin32Messages(absoluteTime, m_client->windowsToReceiveSentMessagesWhileWaitingForSyncReply());
439#else
440        timedOut = !m_syncMessageState->wait(absoluteTime);
441#endif
442    }
443
444    // We timed out.
445    if (m_client)
446        m_client->syncMessageSendTimedOut(this);
447
448    return 0;
449}
450
451void Connection::processIncomingSyncReply(PassOwnPtr<ArgumentDecoder> arguments)
452{
453    MutexLocker locker(m_syncReplyStateMutex);
454    ASSERT(!m_pendingSyncReplies.isEmpty());
455
456    // Go through the stack of sync requests that have pending replies and see which one
457    // this reply is for.
458    for (size_t i = m_pendingSyncReplies.size(); i > 0; --i) {
459        PendingSyncReply& pendingSyncReply = m_pendingSyncReplies[i - 1];
460
461        if (pendingSyncReply.syncRequestID != arguments->destinationID())
462            continue;
463
464        ASSERT(!pendingSyncReply.replyDecoder);
465
466        pendingSyncReply.replyDecoder = arguments.leakPtr();
467        pendingSyncReply.didReceiveReply = true;
468
469        // We got a reply to the last send message, wake up the client run loop so it can be processed.
470        if (i == m_pendingSyncReplies.size())
471            m_syncMessageState->wakeUpClientRunLoop();
472
473        return;
474    }
475
476    // We got a reply for a message we never sent.
477    // FIXME: Dispatch a didReceiveInvalidMessage callback on the client.
478    ASSERT_NOT_REACHED();
479}
480
481void Connection::processIncomingMessage(MessageID messageID, PassOwnPtr<ArgumentDecoder> arguments)
482{
483    // Check if this is a sync reply.
484    if (messageID == MessageID(CoreIPCMessage::SyncMessageReply)) {
485        processIncomingSyncReply(arguments);
486        return;
487    }
488
489    IncomingMessage incomingMessage(messageID, arguments);
490
491    // Check if this is a sync message or if it's a message that should be dispatched even when waiting for
492    // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched.
493    // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply.
494    if (m_syncMessageState->processIncomingMessage(this, incomingMessage))
495        return;
496
497    // Check if we're waiting for this message.
498    {
499        MutexLocker locker(m_waitForMessageMutex);
500
501        HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), incomingMessage.destinationID()));
502        if (it != m_waitForMessageMap.end()) {
503            it->second = incomingMessage.releaseArguments().leakPtr();
504            ASSERT(it->second);
505
506            m_waitForMessageCondition.signal();
507            return;
508        }
509    }
510
511    enqueueIncomingMessage(incomingMessage);
512}
513
514void Connection::connectionDidClose()
515{
516    // The connection is now invalid.
517    platformInvalidate();
518
519    {
520        MutexLocker locker(m_syncReplyStateMutex);
521
522        ASSERT(m_shouldWaitForSyncReplies);
523        m_shouldWaitForSyncReplies = false;
524
525        if (!m_pendingSyncReplies.isEmpty())
526            m_syncMessageState->wakeUpClientRunLoop();
527    }
528
529    if (m_didCloseOnConnectionWorkQueueCallback)
530        m_didCloseOnConnectionWorkQueueCallback(m_connectionQueue, this);
531
532    m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchConnectionDidClose));
533}
534
535void Connection::dispatchConnectionDidClose()
536{
537    // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called,
538    // then the client will be null here.
539    if (!m_client)
540        return;
541
542
543    // Because we define a connection as being "valid" based on wheter it has a null client, we null out
544    // the client before calling didClose here. Otherwise, sendSync will try to send a message to the connection and
545    // will then wait indefinitely for a reply.
546    Client* client = m_client;
547    m_client = 0;
548
549    client->didClose(this);
550}
551
552bool Connection::canSendOutgoingMessages() const
553{
554    return m_isConnected && platformCanSendOutgoingMessages();
555}
556
557void Connection::sendOutgoingMessages()
558{
559    if (!canSendOutgoingMessages())
560        return;
561
562    while (true) {
563        OutgoingMessage message;
564        {
565            MutexLocker locker(m_outgoingMessagesLock);
566            if (m_outgoingMessages.isEmpty())
567                break;
568            message = m_outgoingMessages.takeFirst();
569        }
570
571        if (!sendOutgoingMessage(message.messageID(), adoptPtr(message.arguments())))
572            break;
573    }
574}
575
576void Connection::dispatchSyncMessage(MessageID messageID, ArgumentDecoder* arguments)
577{
578    ASSERT(messageID.isSync());
579
580    // Decode the sync request ID.
581    uint64_t syncRequestID = 0;
582
583    if (!arguments->decodeUInt64(syncRequestID) || !syncRequestID) {
584        // We received an invalid sync message.
585        arguments->markInvalid();
586        return;
587    }
588
589    // Create our reply encoder.
590    ArgumentEncoder* replyEncoder = ArgumentEncoder::create(syncRequestID).leakPtr();
591
592    // Hand off both the decoder and encoder to the client..
593    SyncReplyMode syncReplyMode = m_client->didReceiveSyncMessage(this, messageID, arguments, replyEncoder);
594
595    // FIXME: If the message was invalid, we should send back a SyncMessageError.
596    ASSERT(!arguments->isInvalid());
597
598    if (syncReplyMode == ManualReply) {
599        // The client will take ownership of the reply encoder and send it at some point in the future.
600        // We won't do anything here.
601        return;
602    }
603
604    // Send the reply.
605    sendSyncReply(replyEncoder);
606}
607
608void Connection::didFailToSendSyncMessage()
609{
610    if (!m_shouldExitOnSyncMessageSendFailure)
611        return;
612
613    exit(0);
614}
615
616void Connection::enqueueIncomingMessage(IncomingMessage& incomingMessage)
617{
618    MutexLocker locker(m_incomingMessagesLock);
619    m_incomingMessages.append(incomingMessage);
620
621    m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages));
622}
623
624void Connection::dispatchMessage(IncomingMessage& message)
625{
626    OwnPtr<ArgumentDecoder> arguments = message.releaseArguments();
627
628    // If there's no client, return. We do this after calling releaseArguments so that
629    // the ArgumentDecoder message will be freed.
630    if (!m_client)
631        return;
632
633    m_inDispatchMessageCount++;
634
635    if (message.messageID().shouldDispatchMessageWhenWaitingForSyncReply())
636        m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount++;
637
638    bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage;
639    m_didReceiveInvalidMessage = false;
640
641    if (message.messageID().isSync())
642        dispatchSyncMessage(message.messageID(), arguments.get());
643    else
644        m_client->didReceiveMessage(this, message.messageID(), arguments.get());
645
646    m_didReceiveInvalidMessage |= arguments->isInvalid();
647    m_inDispatchMessageCount--;
648
649    if (message.messageID().shouldDispatchMessageWhenWaitingForSyncReply())
650        m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount--;
651
652    if (m_didReceiveInvalidMessage && m_client)
653        m_client->didReceiveInvalidMessage(this, message.messageID());
654
655    m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage;
656}
657
658void Connection::dispatchMessages()
659{
660    Vector<IncomingMessage> incomingMessages;
661
662    {
663        MutexLocker locker(m_incomingMessagesLock);
664        m_incomingMessages.swap(incomingMessages);
665    }
666
667    for (size_t i = 0; i < incomingMessages.size(); ++i)
668        dispatchMessage(incomingMessages[i]);
669}
670
671} // namespace CoreIPC
672