android_os_MessageQueue.cpp revision fa9e7c05c7be6891a6cf85a11dc635a6e6853078
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "MQNative"
18
19#include "JNIHelp.h"
20
21#include <sys/socket.h>
22#include <sys/select.h>
23#include <sys/time.h>
24#include <fcntl.h>
25
26#include <android_runtime/AndroidRuntime.h>
27#include <utils/SystemClock.h>
28#include <utils/Vector.h>
29#include <utils/Log.h>
30
31using namespace android;
32
33// ----------------------------------------------------------------------------
34
35static struct {
36    jclass mClass;
37
38    jfieldID mObject;   // native object attached to the DVM MessageQueue
39} gMessageQueueOffsets;
40
41static struct {
42    jclass mClass;
43    jmethodID mConstructor;
44} gKeyEventOffsets;
45
46// TODO: also MotionEvent offsets etc. a la gKeyEventOffsets
47
48static struct {
49    jclass mClass;
50    jmethodID mObtain;      // obtain(Handler h, int what, Object obj)
51} gMessageOffsets;
52
53// ----------------------------------------------------------------------------
54
55static void doThrow(JNIEnv* env, const char* exc, const char* msg = NULL)
56{
57    if (jniThrowException(env, exc, msg) != 0)
58        assert(false);
59}
60
61// ----------------------------------------------------------------------------
62
63class MessageQueueNative {
64public:
65    MessageQueueNative(int readSocket, int writeSocket);
66    ~MessageQueueNative();
67
68    // select on all FDs until the designated time; forever if wakeupTime is < 0
69    int waitForSignal(jobject mqueue, jlong wakeupTime);
70
71    // signal the queue-ready pipe
72    void signalQueuePipe();
73
74    // Specify a new input pipe, passing in responsibility for the socket fd and
75    // ashmem region
76    int registerInputPipe(JNIEnv* env, int socketFd, int memRegionFd, jobject handler);
77
78    // Forget about this input pipe, closing the socket and ashmem region as well
79    int unregisterInputPipe(JNIEnv* env, int socketFd);
80
81    size_t numRegisteredPipes() const { return mInputPipes.size(); }
82
83private:
84    struct InputPipe {
85        int fd;
86        int region;
87        jobject handler;
88
89        InputPipe() {}
90        InputPipe(int _fd, int _r, jobject _h) : fd(_fd), region(_r), handler(_h) {}
91    };
92
93    // consume an event from a socket, put it on the DVM MessageQueue indicated,
94    // and notify the other end of the pipe that we've consumed it.
95    void queueEventFromPipe(const InputPipe& pipe, jobject mqueue);
96
97    int mQueueReadFd, mQueueWriteFd;
98    Vector<InputPipe> mInputPipes;
99};
100
101MessageQueueNative::MessageQueueNative(int readSocket, int writeSocket)
102        : mQueueReadFd(readSocket), mQueueWriteFd(writeSocket) {
103}
104
105MessageQueueNative::~MessageQueueNative() {
106}
107
108int MessageQueueNative::waitForSignal(jobject mqueue, jlong timeoutMillis) {
109    struct timeval tv, *timeout;
110    fd_set fdset;
111
112    if (timeoutMillis < 0) {
113        timeout = NULL;
114    } else {
115        if (timeoutMillis == 0) {
116            tv.tv_sec = 0;
117            tv.tv_usec = 0;
118        } else {
119            tv.tv_sec = (timeoutMillis / 1000);
120            tv.tv_usec = (timeoutMillis - (1000 * tv.tv_sec)) * 1000;
121        }
122        timeout = &tv;
123    }
124
125    // always rebuild the fd set from scratch
126    FD_ZERO(&fdset);
127
128    // the queue signalling pipe
129    FD_SET(mQueueReadFd, &fdset);
130    int maxFd = mQueueReadFd;
131
132    // and the input sockets themselves
133    for (size_t i = 0; i < mInputPipes.size(); i++) {
134        FD_SET(mInputPipes[i].fd, &fdset);
135        if (maxFd < mInputPipes[i].fd) {
136            maxFd = mInputPipes[i].fd;
137        }
138    }
139
140    // now wait
141    int res = select(maxFd + 1, &fdset, NULL, NULL, timeout);
142
143    // Error?  Just return it and bail
144    if (res < 0) return res;
145
146    // What happened -- timeout or actual data arrived?
147    if (res == 0) {
148        // select() returned zero, which means we timed out, which means that it's time
149        // to deliver the head element that was already on the queue.  Just fall through
150        // without doing anything else.
151    } else {
152        // Data (or a queue signal) arrived!
153        //
154        // If it's data, pull the data off the pipe, build a new Message with it, put it on
155        // the DVM-side MessageQueue (pointed to by the 'mqueue' parameter), then proceed
156        // into the queue-signal case.
157        //
158        // If a queue signal arrived, just consume any data pending in that pipe and
159        // fall out.
160        bool queue_signalled = (FD_ISSET(mQueueReadFd, &fdset) != 0);
161
162        for (size_t i = 0; i < mInputPipes.size(); i++) {
163            if (FD_ISSET(mInputPipes[i].fd, &fdset)) {
164                queueEventFromPipe(mInputPipes[i], mqueue);
165                queue_signalled = true;     // we know a priori that queueing the event does this
166            }
167        }
168
169        // Okay, stuff went on the queue.  Consume the contents of the signal pipe
170        // now that we're awake and about to start dispatching messages again.
171        if (queue_signalled) {
172            uint8_t buf[16];
173            ssize_t nRead;
174            do {
175                nRead = read(mQueueReadFd, buf, sizeof(buf));
176            } while (nRead > 0); // in nonblocking mode we'll get -1 when it's drained
177        }
178    }
179
180    return 0;
181}
182
183// signals to the queue pipe are one undefined byte.  it's just a "data has arrived" token
184// and the pipe is drained on receipt of at least one signal
185void MessageQueueNative::signalQueuePipe() {
186    int dummy[1];
187    write(mQueueWriteFd, dummy, 1);
188}
189
190void MessageQueueNative::queueEventFromPipe(const InputPipe& inPipe, jobject mqueue) {
191    // !!! TODO: read the event data from the InputPipe's ashmem region, convert it to a DVM
192    // event object of the proper type [MotionEvent or KeyEvent], create a Message holding
193    // it as appropriate, point the Message to the Handler associated with this InputPipe,
194    // and call up to the DVM MessageQueue implementation to enqueue it for delivery.
195}
196
197// the number of registered pipes on success; < 0 on error
198int MessageQueueNative::registerInputPipe(JNIEnv* env,
199        int socketFd, int memRegionFd, jobject handler) {
200    // make sure this fd is not already known to us
201    for (size_t i = 0; i < mInputPipes.size(); i++) {
202        if (mInputPipes[i].fd == socketFd) {
203            LOGE("Attempt to re-register input fd %d", socketFd);
204            return -1;
205        }
206    }
207
208    mInputPipes.push( InputPipe(socketFd, memRegionFd, env->NewGlobalRef(handler)) );
209    return mInputPipes.size();
210}
211
212// Remove an input pipe from our bookkeeping.  Also closes the socket and ashmem
213// region file descriptor!
214//
215// returns the number of remaining input pipes on success; < 0 on error
216int MessageQueueNative::unregisterInputPipe(JNIEnv* env, int socketFd) {
217    for (size_t i = 0; i < mInputPipes.size(); i++) {
218        if (mInputPipes[i].fd == socketFd) {
219            close(mInputPipes[i].fd);
220            close(mInputPipes[i].region);
221            env->DeleteGlobalRef(mInputPipes[i].handler);
222            mInputPipes.removeAt(i);
223            return mInputPipes.size();
224        }
225    }
226    LOGW("Tried to unregister input pipe %d but not found!", socketFd);
227    return -1;
228}
229
230// ----------------------------------------------------------------------------
231
232namespace android {
233
234static void android_os_MessageQueue_init(JNIEnv* env, jobject obj) {
235    // Create the pipe
236    int fds[2];
237    int err = socketpair(AF_LOCAL, SOCK_STREAM, 0, fds);
238    if (err != 0) {
239        doThrow(env, "java/lang/RuntimeException", "Unable to create socket pair");
240    }
241
242    MessageQueueNative *mqn = new MessageQueueNative(fds[0], fds[1]);
243    if (mqn == NULL) {
244        close(fds[0]);
245        close(fds[1]);
246        doThrow(env, "java/lang/RuntimeException", "Unable to allocate native queue");
247    }
248
249    int flags = fcntl(fds[0], F_GETFL);
250    fcntl(fds[0], F_SETFL, flags | O_NONBLOCK);
251    flags = fcntl(fds[1], F_GETFL);
252    fcntl(fds[1], F_SETFL, flags | O_NONBLOCK);
253
254    env->SetIntField(obj, gMessageQueueOffsets.mObject, (jint)mqn);
255}
256
257static void android_os_MessageQueue_signal(JNIEnv* env, jobject obj) {
258    MessageQueueNative *mqn = (MessageQueueNative*) env->GetIntField(obj, gMessageQueueOffsets.mObject);
259    if (mqn != NULL) {
260        mqn->signalQueuePipe();
261    } else {
262        doThrow(env, "java/lang/IllegalStateException", "Queue not initialized");
263    }
264}
265
266static int android_os_MessageQueue_waitForNext(JNIEnv* env, jobject obj, jlong when) {
267    MessageQueueNative *mqn = (MessageQueueNative*) env->GetIntField(obj, gMessageQueueOffsets.mObject);
268    if (mqn != NULL) {
269        int res = mqn->waitForSignal(obj, when);
270        return res; // the DVM event, if any, has been constructed and queued now
271    }
272
273    return -1;
274}
275
276static void android_os_MessageQueue_registerInputStream(JNIEnv* env, jobject obj,
277        jint socketFd, jint regionFd, jobject handler) {
278    MessageQueueNative *mqn = (MessageQueueNative*) env->GetIntField(obj, gMessageQueueOffsets.mObject);
279    if (mqn != NULL) {
280        mqn->registerInputPipe(env, socketFd, regionFd, handler);
281    } else {
282        doThrow(env, "java/lang/IllegalStateException", "Queue not initialized");
283    }
284}
285
286static void android_os_MessageQueue_unregisterInputStream(JNIEnv* env, jobject obj,
287        jint socketFd) {
288    MessageQueueNative *mqn = (MessageQueueNative*) env->GetIntField(obj, gMessageQueueOffsets.mObject);
289    if (mqn != NULL) {
290        mqn->unregisterInputPipe(env, socketFd);
291    } else {
292        doThrow(env, "java/lang/IllegalStateException", "Queue not initialized");
293    }
294}
295
296// ----------------------------------------------------------------------------
297
298const char* const kKeyEventPathName = "android/view/KeyEvent";
299const char* const kMessagePathName = "android/os/Message";
300const char* const kMessageQueuePathName = "android/os/MessageQueue";
301
302static JNINativeMethod gMessageQueueMethods[] = {
303    /* name, signature, funcPtr */
304    { "nativeInit", "()V", (void*)android_os_MessageQueue_init },
305    { "nativeSignal", "()V", (void*)android_os_MessageQueue_signal },
306    { "nativeWaitForNext", "(J)I", (void*)android_os_MessageQueue_waitForNext },
307    { "nativeRegisterInputStream", "(IILandroid/os/Handler;)V", (void*)android_os_MessageQueue_registerInputStream },
308    { "nativeUnregisterInputStream", "(I)V", (void*)android_os_MessageQueue_unregisterInputStream },
309};
310
311int register_android_os_MessageQueue(JNIEnv* env) {
312    jclass clazz;
313
314    clazz = env->FindClass(kMessageQueuePathName);
315    LOG_FATAL_IF(clazz == NULL, "Unable to find class android.os.MessageQueue");
316    gMessageQueueOffsets.mClass = (jclass) env->NewGlobalRef(clazz);
317    gMessageQueueOffsets.mObject = env->GetFieldID(clazz, "mObject", "I");
318    assert(gMessageQueueOffsets.mObject);
319
320    clazz = env->FindClass(kMessagePathName);
321    LOG_FATAL_IF(clazz == NULL, "Unable to find class android.os.Message");
322    gMessageOffsets.mClass = (jclass) env->NewGlobalRef(clazz);
323    gMessageOffsets.mObtain = env->GetStaticMethodID(clazz, "obtain",
324            "(Landroid/os/Handler;ILjava/lang/Object;)Landroid/os/Message;");
325    assert(gMessageOffsets.mObtain);
326
327    clazz = env->FindClass(kKeyEventPathName);
328    LOG_FATAL_IF(clazz == NULL, "Unable to find class android.view.KeyEvent");
329    gKeyEventOffsets.mClass = (jclass) env->NewGlobalRef(clazz);
330    gKeyEventOffsets.mConstructor = env->GetMethodID(clazz, "<init>", "(JJIIIIIII)V");
331    assert(gKeyEventOffsets.mConstructor);
332
333    return AndroidRuntime::registerNativeMethods(env, kMessageQueuePathName,
334            gMessageQueueMethods, NELEM(gMessageQueueMethods));
335}
336
337
338}; // end of namespace android
339