1/*
2 * Copyright (C) 2011 Igalia S.L.
3 * Copyright (C) 2010 Apple Inc. All rights reserved.
4 * Portions Copyright (c) 2010 Motorola Mobility, Inc.  All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 *    notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
16 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
19 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
25 * THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include "config.h"
29#include "WorkQueue.h"
30
31#include "WKBase.h"
32#include <WebCore/NotImplemented.h>
33#include <gio/gio.h>
34#include <glib.h>
35#include <wtf/gobject/GRefPtr.h>
36
37// WorkQueue::EventSource
38class WorkQueue::EventSource {
39public:
40    EventSource(PassOwnPtr<WorkItem> workItem, WorkQueue* workQueue, GCancellable* cancellable)
41        : m_workItem(workItem)
42        , m_workQueue(workQueue)
43        , m_cancellable(cancellable)
44    {
45    }
46
47    void cancel()
48    {
49        if (!m_cancellable)
50            return;
51        g_cancellable_cancel(m_cancellable);
52    }
53
54    static void executeEventSource(EventSource* eventSource)
55    {
56        ASSERT(eventSource);
57        WorkQueue* queue = eventSource->m_workQueue;
58        {
59            MutexLocker locker(queue->m_isValidMutex);
60            if (!queue->m_isValid)
61                return;
62        }
63
64        eventSource->m_workItem->execute();
65    }
66
67    static gboolean performWorkOnce(EventSource* eventSource)
68    {
69        executeEventSource(eventSource);
70        return FALSE;
71    }
72
73    static gboolean performWork(GSocket* socket, GIOCondition condition, EventSource* eventSource)
74    {
75        if (!(condition & G_IO_IN) && !(condition & G_IO_HUP) && !(condition & G_IO_ERR)) {
76            // EventSource has been cancelled, return FALSE to destroy the source.
77            return FALSE;
78        }
79
80        executeEventSource(eventSource);
81        return TRUE;
82    }
83
84    static gboolean performWorkOnTermination(GPid, gint, EventSource* eventSource)
85    {
86        executeEventSource(eventSource);
87        return FALSE;
88    }
89
90    static void deleteEventSource(EventSource* eventSource)
91    {
92        ASSERT(eventSource);
93        delete eventSource;
94    }
95
96public:
97    PassOwnPtr<WorkItem> m_workItem;
98    WorkQueue* m_workQueue;
99    GCancellable* m_cancellable;
100};
101
102// WorkQueue
103void WorkQueue::platformInitialize(const char* name)
104{
105    m_eventContext = g_main_context_new();
106    ASSERT(m_eventContext);
107    m_eventLoop = g_main_loop_new(m_eventContext, FALSE);
108    ASSERT(m_eventLoop);
109    m_workQueueThread = createThread(reinterpret_cast<WTF::ThreadFunction>(&WorkQueue::startWorkQueueThread), this, name);
110}
111
112void WorkQueue::platformInvalidate()
113{
114    MutexLocker locker(m_eventLoopLock);
115
116    if (m_eventLoop) {
117        if (g_main_loop_is_running(m_eventLoop))
118            g_main_loop_quit(m_eventLoop);
119
120        g_main_loop_unref(m_eventLoop);
121        m_eventLoop = 0;
122    }
123
124    if (m_eventContext) {
125        g_main_context_unref(m_eventContext);
126        m_eventContext = 0;
127    }
128}
129
130void* WorkQueue::startWorkQueueThread(WorkQueue* workQueue)
131{
132    workQueue->workQueueThreadBody();
133    return 0;
134}
135
136void WorkQueue::workQueueThreadBody()
137{
138    g_main_loop_run(m_eventLoop);
139}
140
141void WorkQueue::registerEventSourceHandler(int fileDescriptor, int condition, PassOwnPtr<WorkItem> item)
142{
143    GRefPtr<GSocket> socket = adoptGRef(g_socket_new_from_fd(fileDescriptor, 0));
144    ASSERT(socket);
145    GRefPtr<GCancellable> cancellable = adoptGRef(g_cancellable_new());
146    GRefPtr<GSource> dispatchSource = adoptGRef(g_socket_create_source(socket.get(), static_cast<GIOCondition>(condition), cancellable.get()));
147    ASSERT(dispatchSource);
148    EventSource* eventSource = new EventSource(item, this, cancellable.get());
149    ASSERT(eventSource);
150
151    g_source_set_callback(dispatchSource.get(), reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWork),
152        eventSource, reinterpret_cast<GDestroyNotify>(&WorkQueue::EventSource::deleteEventSource));
153
154    // Set up the event sources under the mutex since this is shared across multiple threads.
155    {
156        MutexLocker locker(m_eventSourcesLock);
157        Vector<EventSource*> sources;
158        EventSourceIterator it = m_eventSources.find(fileDescriptor);
159        if (it != m_eventSources.end())
160            sources = it->second;
161
162        sources.append(eventSource);
163        m_eventSources.set(fileDescriptor, sources);
164    }
165
166    g_source_attach(dispatchSource.get(), m_eventContext);
167}
168
169void WorkQueue::unregisterEventSourceHandler(int fileDescriptor)
170{
171    ASSERT(fileDescriptor);
172
173    MutexLocker locker(m_eventSourcesLock);
174
175    EventSourceIterator it = m_eventSources.find(fileDescriptor);
176    ASSERT(it != m_eventSources.end());
177    ASSERT(m_eventSources.contains(fileDescriptor));
178
179    if (it != m_eventSources.end()) {
180        Vector<EventSource*> sources = it->second;
181        for (unsigned i = 0; i < sources.size(); i++)
182            sources[i]->cancel();
183
184        m_eventSources.remove(it);
185    }
186}
187
188void WorkQueue::scheduleWorkOnSource(GSource* dispatchSource, PassOwnPtr<WorkItem> item, GSourceFunc sourceCallback)
189{
190    EventSource* eventSource = new EventSource(item, this, 0);
191
192    g_source_set_callback(dispatchSource, sourceCallback, eventSource,
193                          reinterpret_cast<GDestroyNotify>(&WorkQueue::EventSource::deleteEventSource));
194
195    g_source_attach(dispatchSource, m_eventContext);
196}
197
198void WorkQueue::scheduleWork(PassOwnPtr<WorkItem> item)
199{
200    GRefPtr<GSource> dispatchSource = adoptGRef(g_idle_source_new());
201    ASSERT(dispatchSource);
202    g_source_set_priority(dispatchSource.get(), G_PRIORITY_DEFAULT);
203
204    scheduleWorkOnSource(dispatchSource.get(), item, reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWorkOnce));
205}
206
207void WorkQueue::scheduleWorkAfterDelay(PassOwnPtr<WorkItem> item, double delay)
208{
209    GRefPtr<GSource> dispatchSource = adoptGRef(g_timeout_source_new(static_cast<guint>(delay * 1000)));
210    ASSERT(dispatchSource);
211
212    scheduleWorkOnSource(dispatchSource.get(), item, reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWorkOnce));
213}
214
215void WorkQueue::scheduleWorkOnTermination(WebKit::PlatformProcessIdentifier process, PassOwnPtr<WorkItem> item)
216{
217    GRefPtr<GSource> dispatchSource = adoptGRef(g_child_watch_source_new(process));
218    ASSERT(dispatchSource);
219
220    scheduleWorkOnSource(dispatchSource.get(), item, reinterpret_cast<GSourceFunc>(&WorkQueue::EventSource::performWorkOnTermination));
221}
222