1//
2// Copyright 2010 The Android Open Source Project
3//
4// A looper implementation based on epoll().
5//
6#define LOG_TAG "Looper"
7
8//#define LOG_NDEBUG 0
9
10// Debugs poll and wake interactions.
11#define DEBUG_POLL_AND_WAKE 0
12
13// Debugs callback registration and invocation.
14#define DEBUG_CALLBACKS 0
15
16#include <cutils/log.h>
17#include <utils/Looper.h>
18#include <utils/Timers.h>
19
20#include <unistd.h>
21#include <fcntl.h>
22#include <limits.h>
23
24
25namespace android {
26
27// --- WeakMessageHandler ---
28
29WeakMessageHandler::WeakMessageHandler(const wp<MessageHandler>& handler) :
30        mHandler(handler) {
31}
32
33WeakMessageHandler::~WeakMessageHandler() {
34}
35
36void WeakMessageHandler::handleMessage(const Message& message) {
37    sp<MessageHandler> handler = mHandler.promote();
38    if (handler != NULL) {
39        handler->handleMessage(message);
40    }
41}
42
43
44// --- SimpleLooperCallback ---
45
46SimpleLooperCallback::SimpleLooperCallback(ALooper_callbackFunc callback) :
47        mCallback(callback) {
48}
49
50SimpleLooperCallback::~SimpleLooperCallback() {
51}
52
53int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {
54    return mCallback(fd, events, data);
55}
56
57
58// --- Looper ---
59
// Size hint handed to epoll_create(): the number of file descriptors we expect
// to associate with the epoll instance.
static const int EPOLL_SIZE_HINT = 8;

// Upper bound on the number of events fetched by one epoll_wait() call, i.e. per
// poll iteration.
static const int EPOLL_MAX_EVENTS = 16;

// Thread-local storage slot used by getForThread()/setForThread(), together with
// the pthread_once guard that ensures the key is created exactly once.
static pthread_key_t gTLSKey = 0;
static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
68
69Looper::Looper(bool allowNonCallbacks) :
70        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
71        mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
72    int wakeFds[2];
73    int result = pipe(wakeFds);
74    LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe.  errno=%d", errno);
75
76    mWakeReadPipeFd = wakeFds[0];
77    mWakeWritePipeFd = wakeFds[1];
78
79    result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
80    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking.  errno=%d",
81            errno);
82
83    result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
84    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking.  errno=%d",
85            errno);
86
87    // Allocate the epoll instance and register the wake pipe.
88    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
89    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);
90
91    struct epoll_event eventItem;
92    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
93    eventItem.events = EPOLLIN;
94    eventItem.data.fd = mWakeReadPipeFd;
95    result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
96    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance.  errno=%d",
97            errno);
98}
99
100Looper::~Looper() {
101    close(mWakeReadPipeFd);
102    close(mWakeWritePipeFd);
103    close(mEpollFd);
104}
105
106void Looper::initTLSKey() {
107    int result = pthread_key_create(& gTLSKey, threadDestructor);
108    LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key.");
109}
110
111void Looper::threadDestructor(void *st) {
112    Looper* const self = static_cast<Looper*>(st);
113    if (self != NULL) {
114        self->decStrong((void*)threadDestructor);
115    }
116}
117
118void Looper::setForThread(const sp<Looper>& looper) {
119    sp<Looper> old = getForThread(); // also has side-effect of initializing TLS
120
121    if (looper != NULL) {
122        looper->incStrong((void*)threadDestructor);
123    }
124
125    pthread_setspecific(gTLSKey, looper.get());
126
127    if (old != NULL) {
128        old->decStrong((void*)threadDestructor);
129    }
130}
131
132sp<Looper> Looper::getForThread() {
133    int result = pthread_once(& gTLSOnce, initTLSKey);
134    LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
135
136    return (Looper*)pthread_getspecific(gTLSKey);
137}
138
139sp<Looper> Looper::prepare(int opts) {
140    bool allowNonCallbacks = opts & ALOOPER_PREPARE_ALLOW_NON_CALLBACKS;
141    sp<Looper> looper = Looper::getForThread();
142    if (looper == NULL) {
143        looper = new Looper(allowNonCallbacks);
144        Looper::setForThread(looper);
145    }
146    if (looper->getAllowNonCallbacks() != allowNonCallbacks) {
147        ALOGW("Looper already prepared for this thread with a different value for the "
148                "ALOOPER_PREPARE_ALLOW_NON_CALLBACKS option.");
149    }
150    return looper;
151}
152
153bool Looper::getAllowNonCallbacks() const {
154    return mAllowNonCallbacks;
155}
156
157int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
158    int result = 0;
159    for (;;) {
160        while (mResponseIndex < mResponses.size()) {
161            const Response& response = mResponses.itemAt(mResponseIndex++);
162            int ident = response.request.ident;
163            if (ident >= 0) {
164                int fd = response.request.fd;
165                int events = response.events;
166                void* data = response.request.data;
167#if DEBUG_POLL_AND_WAKE
168                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
169                        "fd=%d, events=0x%x, data=%p",
170                        this, ident, fd, events, data);
171#endif
172                if (outFd != NULL) *outFd = fd;
173                if (outEvents != NULL) *outEvents = events;
174                if (outData != NULL) *outData = data;
175                return ident;
176            }
177        }
178
179        if (result != 0) {
180#if DEBUG_POLL_AND_WAKE
181            ALOGD("%p ~ pollOnce - returning result %d", this, result);
182#endif
183            if (outFd != NULL) *outFd = 0;
184            if (outEvents != NULL) *outEvents = 0;
185            if (outData != NULL) *outData = NULL;
186            return result;
187        }
188
189        result = pollInner(timeoutMillis);
190    }
191}
192
193int Looper::pollInner(int timeoutMillis) {
194#if DEBUG_POLL_AND_WAKE
195    ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
196#endif
197
198    // Adjust the timeout based on when the next message is due.
199    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
200        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
201        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
202        if (messageTimeoutMillis >= 0
203                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
204            timeoutMillis = messageTimeoutMillis;
205        }
206#if DEBUG_POLL_AND_WAKE
207        ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
208                this, mNextMessageUptime - now, timeoutMillis);
209#endif
210    }
211
212    // Poll.
213    int result = ALOOPER_POLL_WAKE;
214    mResponses.clear();
215    mResponseIndex = 0;
216
217    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
218    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
219
220    // Acquire lock.
221    mLock.lock();
222
223    // Check for poll error.
224    if (eventCount < 0) {
225        if (errno == EINTR) {
226            goto Done;
227        }
228        ALOGW("Poll failed with an unexpected error, errno=%d", errno);
229        result = ALOOPER_POLL_ERROR;
230        goto Done;
231    }
232
233    // Check for poll timeout.
234    if (eventCount == 0) {
235#if DEBUG_POLL_AND_WAKE
236        ALOGD("%p ~ pollOnce - timeout", this);
237#endif
238        result = ALOOPER_POLL_TIMEOUT;
239        goto Done;
240    }
241
242    // Handle all events.
243#if DEBUG_POLL_AND_WAKE
244    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
245#endif
246
247    for (int i = 0; i < eventCount; i++) {
248        int fd = eventItems[i].data.fd;
249        uint32_t epollEvents = eventItems[i].events;
250        if (fd == mWakeReadPipeFd) {
251            if (epollEvents & EPOLLIN) {
252                awoken();
253            } else {
254                ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
255            }
256        } else {
257            ssize_t requestIndex = mRequests.indexOfKey(fd);
258            if (requestIndex >= 0) {
259                int events = 0;
260                if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
261                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
262                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
263                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
264                pushResponse(events, mRequests.valueAt(requestIndex));
265            } else {
266                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
267                        "no longer registered.", epollEvents, fd);
268            }
269        }
270    }
271Done: ;
272
273    // Invoke pending message callbacks.
274    mNextMessageUptime = LLONG_MAX;
275    while (mMessageEnvelopes.size() != 0) {
276        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
277        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
278        if (messageEnvelope.uptime <= now) {
279            // Remove the envelope from the list.
280            // We keep a strong reference to the handler until the call to handleMessage
281            // finishes.  Then we drop it so that the handler can be deleted *before*
282            // we reacquire our lock.
283            { // obtain handler
284                sp<MessageHandler> handler = messageEnvelope.handler;
285                Message message = messageEnvelope.message;
286                mMessageEnvelopes.removeAt(0);
287                mSendingMessage = true;
288                mLock.unlock();
289
290#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
291                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
292                        this, handler.get(), message.what);
293#endif
294                handler->handleMessage(message);
295            } // release handler
296
297            mLock.lock();
298            mSendingMessage = false;
299            result = ALOOPER_POLL_CALLBACK;
300        } else {
301            // The last message left at the head of the queue determines the next wakeup time.
302            mNextMessageUptime = messageEnvelope.uptime;
303            break;
304        }
305    }
306
307    // Release lock.
308    mLock.unlock();
309
310    // Invoke all response callbacks.
311    for (size_t i = 0; i < mResponses.size(); i++) {
312        Response& response = mResponses.editItemAt(i);
313        if (response.request.ident == ALOOPER_POLL_CALLBACK) {
314            int fd = response.request.fd;
315            int events = response.events;
316            void* data = response.request.data;
317#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
318            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
319                    this, response.request.callback.get(), fd, events, data);
320#endif
321            int callbackResult = response.request.callback->handleEvent(fd, events, data);
322            if (callbackResult == 0) {
323                removeFd(fd);
324            }
325            // Clear the callback reference in the response structure promptly because we
326            // will not clear the response vector itself until the next poll.
327            response.request.callback.clear();
328            result = ALOOPER_POLL_CALLBACK;
329        }
330    }
331    return result;
332}
333
334int Looper::pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
335    if (timeoutMillis <= 0) {
336        int result;
337        do {
338            result = pollOnce(timeoutMillis, outFd, outEvents, outData);
339        } while (result == ALOOPER_POLL_CALLBACK);
340        return result;
341    } else {
342        nsecs_t endTime = systemTime(SYSTEM_TIME_MONOTONIC)
343                + milliseconds_to_nanoseconds(timeoutMillis);
344
345        for (;;) {
346            int result = pollOnce(timeoutMillis, outFd, outEvents, outData);
347            if (result != ALOOPER_POLL_CALLBACK) {
348                return result;
349            }
350
351            nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
352            timeoutMillis = toMillisecondTimeoutDelay(now, endTime);
353            if (timeoutMillis == 0) {
354                return ALOOPER_POLL_TIMEOUT;
355            }
356        }
357    }
358}
359
360void Looper::wake() {
361#if DEBUG_POLL_AND_WAKE
362    ALOGD("%p ~ wake", this);
363#endif
364
365    ssize_t nWrite;
366    do {
367        nWrite = write(mWakeWritePipeFd, "W", 1);
368    } while (nWrite == -1 && errno == EINTR);
369
370    if (nWrite != 1) {
371        if (errno != EAGAIN) {
372            ALOGW("Could not write wake signal, errno=%d", errno);
373        }
374    }
375}
376
377void Looper::awoken() {
378#if DEBUG_POLL_AND_WAKE
379    ALOGD("%p ~ awoken", this);
380#endif
381
382    char buffer[16];
383    ssize_t nRead;
384    do {
385        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
386    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
387}
388
389void Looper::pushResponse(int events, const Request& request) {
390    Response response;
391    response.events = events;
392    response.request = request;
393    mResponses.push(response);
394}
395
396int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) {
397    return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
398}
399
400int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
401#if DEBUG_CALLBACKS
402    ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
403            events, callback.get(), data);
404#endif
405
406    if (!callback.get()) {
407        if (! mAllowNonCallbacks) {
408            ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
409            return -1;
410        }
411
412        if (ident < 0) {
413            ALOGE("Invalid attempt to set NULL callback with ident < 0.");
414            return -1;
415        }
416    } else {
417        ident = ALOOPER_POLL_CALLBACK;
418    }
419
420    int epollEvents = 0;
421    if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
422    if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
423
424    { // acquire lock
425        AutoMutex _l(mLock);
426
427        Request request;
428        request.fd = fd;
429        request.ident = ident;
430        request.callback = callback;
431        request.data = data;
432
433        struct epoll_event eventItem;
434        memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
435        eventItem.events = epollEvents;
436        eventItem.data.fd = fd;
437
438        ssize_t requestIndex = mRequests.indexOfKey(fd);
439        if (requestIndex < 0) {
440            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
441            if (epollResult < 0) {
442                ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
443                return -1;
444            }
445            mRequests.add(fd, request);
446        } else {
447            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
448            if (epollResult < 0) {
449                ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
450                return -1;
451            }
452            mRequests.replaceValueAt(requestIndex, request);
453        }
454    } // release lock
455    return 1;
456}
457
458int Looper::removeFd(int fd) {
459#if DEBUG_CALLBACKS
460    ALOGD("%p ~ removeFd - fd=%d", this, fd);
461#endif
462
463    { // acquire lock
464        AutoMutex _l(mLock);
465        ssize_t requestIndex = mRequests.indexOfKey(fd);
466        if (requestIndex < 0) {
467            return 0;
468        }
469
470        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL);
471        if (epollResult < 0) {
472            ALOGE("Error removing epoll events for fd %d, errno=%d", fd, errno);
473            return -1;
474        }
475
476        mRequests.removeItemsAt(requestIndex);
477    } // release lock
478    return 1;
479}
480
481void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
482    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
483    sendMessageAtTime(now, handler, message);
484}
485
486void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
487        const Message& message) {
488    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
489    sendMessageAtTime(now + uptimeDelay, handler, message);
490}
491
492void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
493        const Message& message) {
494#if DEBUG_CALLBACKS
495    ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d",
496            this, uptime, handler.get(), message.what);
497#endif
498
499    size_t i = 0;
500    { // acquire lock
501        AutoMutex _l(mLock);
502
503        size_t messageCount = mMessageEnvelopes.size();
504        while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
505            i += 1;
506        }
507
508        MessageEnvelope messageEnvelope(uptime, handler, message);
509        mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
510
511        // Optimization: If the Looper is currently sending a message, then we can skip
512        // the call to wake() because the next thing the Looper will do after processing
513        // messages is to decide when the next wakeup time should be.  In fact, it does
514        // not even matter whether this code is running on the Looper thread.
515        if (mSendingMessage) {
516            return;
517        }
518    } // release lock
519
520    // Wake the poll loop only when we enqueue a new message at the head.
521    if (i == 0) {
522        wake();
523    }
524}
525
526void Looper::removeMessages(const sp<MessageHandler>& handler) {
527#if DEBUG_CALLBACKS
528    ALOGD("%p ~ removeMessages - handler=%p", this, handler.get());
529#endif
530
531    { // acquire lock
532        AutoMutex _l(mLock);
533
534        for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
535            const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
536            if (messageEnvelope.handler == handler) {
537                mMessageEnvelopes.removeAt(i);
538            }
539        }
540    } // release lock
541}
542
543void Looper::removeMessages(const sp<MessageHandler>& handler, int what) {
544#if DEBUG_CALLBACKS
545    ALOGD("%p ~ removeMessages - handler=%p, what=%d", this, handler.get(), what);
546#endif
547
548    { // acquire lock
549        AutoMutex _l(mLock);
550
551        for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
552            const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
553            if (messageEnvelope.handler == handler
554                    && messageEnvelope.message.what == what) {
555                mMessageEnvelopes.removeAt(i);
556            }
557        }
558    } // release lock
559}
560
561} // namespace android
562