1//
2// Copyright 2010 The Android Open Source Project
3//
4// A looper implementation based on epoll().
5//
6#define LOG_TAG "Looper"
7
8//#define LOG_NDEBUG 0
9
10// Debugs poll and wake interactions.
11#define DEBUG_POLL_AND_WAKE 0
12
13// Debugs callback registration and invocation.
14#define DEBUG_CALLBACKS 0
15
16#include <cutils/log.h>
17#include <utils/Looper.h>
18#include <utils/Timers.h>
19
20#include <unistd.h>
21#include <fcntl.h>
22#include <limits.h>
23
24
25namespace android {
26
27// --- WeakMessageHandler ---
28
29WeakMessageHandler::WeakMessageHandler(const wp<MessageHandler>& handler) :
30        mHandler(handler) {
31}
32
33WeakMessageHandler::~WeakMessageHandler() {
34}
35
36void WeakMessageHandler::handleMessage(const Message& message) {
37    sp<MessageHandler> handler = mHandler.promote();
38    if (handler != NULL) {
39        handler->handleMessage(message);
40    }
41}
42
43
44// --- SimpleLooperCallback ---
45
46SimpleLooperCallback::SimpleLooperCallback(ALooper_callbackFunc callback) :
47        mCallback(callback) {
48}
49
50SimpleLooperCallback::~SimpleLooperCallback() {
51}
52
53int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {
54    return mCallback(fd, events, data);
55}
56
57
58// --- Looper ---
59
60// Hint for number of file descriptors to be associated with the epoll instance.
61static const int EPOLL_SIZE_HINT = 8;
62
63// Maximum number of file descriptors for which to retrieve poll events each iteration.
64static const int EPOLL_MAX_EVENTS = 16;
65
66static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
67static pthread_key_t gTLSKey = 0;
68
69Looper::Looper(bool allowNonCallbacks) :
70        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
71        mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
72    int wakeFds[2];
73    int result = pipe(wakeFds);
74    LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe.  errno=%d", errno);
75
76    mWakeReadPipeFd = wakeFds[0];
77    mWakeWritePipeFd = wakeFds[1];
78
79    result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
80    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking.  errno=%d",
81            errno);
82
83    result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
84    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking.  errno=%d",
85            errno);
86
87    mIdling = false;
88
89    // Allocate the epoll instance and register the wake pipe.
90    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
91    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);
92
93    struct epoll_event eventItem;
94    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
95    eventItem.events = EPOLLIN;
96    eventItem.data.fd = mWakeReadPipeFd;
97    result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
98    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance.  errno=%d",
99            errno);
100}
101
102Looper::~Looper() {
103    close(mWakeReadPipeFd);
104    close(mWakeWritePipeFd);
105    close(mEpollFd);
106}
107
108void Looper::initTLSKey() {
109    int result = pthread_key_create(& gTLSKey, threadDestructor);
110    LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key.");
111}
112
113void Looper::threadDestructor(void *st) {
114    Looper* const self = static_cast<Looper*>(st);
115    if (self != NULL) {
116        self->decStrong((void*)threadDestructor);
117    }
118}
119
120void Looper::setForThread(const sp<Looper>& looper) {
121    sp<Looper> old = getForThread(); // also has side-effect of initializing TLS
122
123    if (looper != NULL) {
124        looper->incStrong((void*)threadDestructor);
125    }
126
127    pthread_setspecific(gTLSKey, looper.get());
128
129    if (old != NULL) {
130        old->decStrong((void*)threadDestructor);
131    }
132}
133
134sp<Looper> Looper::getForThread() {
135    int result = pthread_once(& gTLSOnce, initTLSKey);
136    LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
137
138    return (Looper*)pthread_getspecific(gTLSKey);
139}
140
141sp<Looper> Looper::prepare(int opts) {
142    bool allowNonCallbacks = opts & ALOOPER_PREPARE_ALLOW_NON_CALLBACKS;
143    sp<Looper> looper = Looper::getForThread();
144    if (looper == NULL) {
145        looper = new Looper(allowNonCallbacks);
146        Looper::setForThread(looper);
147    }
148    if (looper->getAllowNonCallbacks() != allowNonCallbacks) {
149        ALOGW("Looper already prepared for this thread with a different value for the "
150                "ALOOPER_PREPARE_ALLOW_NON_CALLBACKS option.");
151    }
152    return looper;
153}
154
155bool Looper::getAllowNonCallbacks() const {
156    return mAllowNonCallbacks;
157}
158
159int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
160    int result = 0;
161    for (;;) {
162        while (mResponseIndex < mResponses.size()) {
163            const Response& response = mResponses.itemAt(mResponseIndex++);
164            int ident = response.request.ident;
165            if (ident >= 0) {
166                int fd = response.request.fd;
167                int events = response.events;
168                void* data = response.request.data;
169#if DEBUG_POLL_AND_WAKE
170                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
171                        "fd=%d, events=0x%x, data=%p",
172                        this, ident, fd, events, data);
173#endif
174                if (outFd != NULL) *outFd = fd;
175                if (outEvents != NULL) *outEvents = events;
176                if (outData != NULL) *outData = data;
177                return ident;
178            }
179        }
180
181        if (result != 0) {
182#if DEBUG_POLL_AND_WAKE
183            ALOGD("%p ~ pollOnce - returning result %d", this, result);
184#endif
185            if (outFd != NULL) *outFd = 0;
186            if (outEvents != NULL) *outEvents = 0;
187            if (outData != NULL) *outData = NULL;
188            return result;
189        }
190
191        result = pollInner(timeoutMillis);
192    }
193}
194
195int Looper::pollInner(int timeoutMillis) {
196#if DEBUG_POLL_AND_WAKE
197    ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
198#endif
199
200    // Adjust the timeout based on when the next message is due.
201    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
202        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
203        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
204        if (messageTimeoutMillis >= 0
205                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
206            timeoutMillis = messageTimeoutMillis;
207        }
208#if DEBUG_POLL_AND_WAKE
209        ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
210                this, mNextMessageUptime - now, timeoutMillis);
211#endif
212    }
213
214    // Poll.
215    int result = ALOOPER_POLL_WAKE;
216    mResponses.clear();
217    mResponseIndex = 0;
218
219    // We are about to idle.
220    mIdling = true;
221
222    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
223    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
224
225    // No longer idling.
226    mIdling = false;
227
228    // Acquire lock.
229    mLock.lock();
230
231    // Check for poll error.
232    if (eventCount < 0) {
233        if (errno == EINTR) {
234            goto Done;
235        }
236        ALOGW("Poll failed with an unexpected error, errno=%d", errno);
237        result = ALOOPER_POLL_ERROR;
238        goto Done;
239    }
240
241    // Check for poll timeout.
242    if (eventCount == 0) {
243#if DEBUG_POLL_AND_WAKE
244        ALOGD("%p ~ pollOnce - timeout", this);
245#endif
246        result = ALOOPER_POLL_TIMEOUT;
247        goto Done;
248    }
249
250    // Handle all events.
251#if DEBUG_POLL_AND_WAKE
252    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
253#endif
254
255    for (int i = 0; i < eventCount; i++) {
256        int fd = eventItems[i].data.fd;
257        uint32_t epollEvents = eventItems[i].events;
258        if (fd == mWakeReadPipeFd) {
259            if (epollEvents & EPOLLIN) {
260                awoken();
261            } else {
262                ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
263            }
264        } else {
265            ssize_t requestIndex = mRequests.indexOfKey(fd);
266            if (requestIndex >= 0) {
267                int events = 0;
268                if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
269                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
270                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
271                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
272                pushResponse(events, mRequests.valueAt(requestIndex));
273            } else {
274                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
275                        "no longer registered.", epollEvents, fd);
276            }
277        }
278    }
279Done: ;
280
281    // Invoke pending message callbacks.
282    mNextMessageUptime = LLONG_MAX;
283    while (mMessageEnvelopes.size() != 0) {
284        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
285        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
286        if (messageEnvelope.uptime <= now) {
287            // Remove the envelope from the list.
288            // We keep a strong reference to the handler until the call to handleMessage
289            // finishes.  Then we drop it so that the handler can be deleted *before*
290            // we reacquire our lock.
291            { // obtain handler
292                sp<MessageHandler> handler = messageEnvelope.handler;
293                Message message = messageEnvelope.message;
294                mMessageEnvelopes.removeAt(0);
295                mSendingMessage = true;
296                mLock.unlock();
297
298#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
299                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
300                        this, handler.get(), message.what);
301#endif
302                handler->handleMessage(message);
303            } // release handler
304
305            mLock.lock();
306            mSendingMessage = false;
307            result = ALOOPER_POLL_CALLBACK;
308        } else {
309            // The last message left at the head of the queue determines the next wakeup time.
310            mNextMessageUptime = messageEnvelope.uptime;
311            break;
312        }
313    }
314
315    // Release lock.
316    mLock.unlock();
317
318    // Invoke all response callbacks.
319    for (size_t i = 0; i < mResponses.size(); i++) {
320        Response& response = mResponses.editItemAt(i);
321        if (response.request.ident == ALOOPER_POLL_CALLBACK) {
322            int fd = response.request.fd;
323            int events = response.events;
324            void* data = response.request.data;
325#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
326            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
327                    this, response.request.callback.get(), fd, events, data);
328#endif
329            int callbackResult = response.request.callback->handleEvent(fd, events, data);
330            if (callbackResult == 0) {
331                removeFd(fd);
332            }
333            // Clear the callback reference in the response structure promptly because we
334            // will not clear the response vector itself until the next poll.
335            response.request.callback.clear();
336            result = ALOOPER_POLL_CALLBACK;
337        }
338    }
339    return result;
340}
341
342int Looper::pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
343    if (timeoutMillis <= 0) {
344        int result;
345        do {
346            result = pollOnce(timeoutMillis, outFd, outEvents, outData);
347        } while (result == ALOOPER_POLL_CALLBACK);
348        return result;
349    } else {
350        nsecs_t endTime = systemTime(SYSTEM_TIME_MONOTONIC)
351                + milliseconds_to_nanoseconds(timeoutMillis);
352
353        for (;;) {
354            int result = pollOnce(timeoutMillis, outFd, outEvents, outData);
355            if (result != ALOOPER_POLL_CALLBACK) {
356                return result;
357            }
358
359            nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
360            timeoutMillis = toMillisecondTimeoutDelay(now, endTime);
361            if (timeoutMillis == 0) {
362                return ALOOPER_POLL_TIMEOUT;
363            }
364        }
365    }
366}
367
368void Looper::wake() {
369#if DEBUG_POLL_AND_WAKE
370    ALOGD("%p ~ wake", this);
371#endif
372
373    ssize_t nWrite;
374    do {
375        nWrite = write(mWakeWritePipeFd, "W", 1);
376    } while (nWrite == -1 && errno == EINTR);
377
378    if (nWrite != 1) {
379        if (errno != EAGAIN) {
380            ALOGW("Could not write wake signal, errno=%d", errno);
381        }
382    }
383}
384
385void Looper::awoken() {
386#if DEBUG_POLL_AND_WAKE
387    ALOGD("%p ~ awoken", this);
388#endif
389
390    char buffer[16];
391    ssize_t nRead;
392    do {
393        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
394    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
395}
396
397void Looper::pushResponse(int events, const Request& request) {
398    Response response;
399    response.events = events;
400    response.request = request;
401    mResponses.push(response);
402}
403
404int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) {
405    return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
406}
407
408int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
409#if DEBUG_CALLBACKS
410    ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
411            events, callback.get(), data);
412#endif
413
414    if (!callback.get()) {
415        if (! mAllowNonCallbacks) {
416            ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
417            return -1;
418        }
419
420        if (ident < 0) {
421            ALOGE("Invalid attempt to set NULL callback with ident < 0.");
422            return -1;
423        }
424    } else {
425        ident = ALOOPER_POLL_CALLBACK;
426    }
427
428    int epollEvents = 0;
429    if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
430    if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
431
432    { // acquire lock
433        AutoMutex _l(mLock);
434
435        Request request;
436        request.fd = fd;
437        request.ident = ident;
438        request.callback = callback;
439        request.data = data;
440
441        struct epoll_event eventItem;
442        memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
443        eventItem.events = epollEvents;
444        eventItem.data.fd = fd;
445
446        ssize_t requestIndex = mRequests.indexOfKey(fd);
447        if (requestIndex < 0) {
448            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
449            if (epollResult < 0) {
450                ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
451                return -1;
452            }
453            mRequests.add(fd, request);
454        } else {
455            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
456            if (epollResult < 0) {
457                ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
458                return -1;
459            }
460            mRequests.replaceValueAt(requestIndex, request);
461        }
462    } // release lock
463    return 1;
464}
465
466int Looper::removeFd(int fd) {
467#if DEBUG_CALLBACKS
468    ALOGD("%p ~ removeFd - fd=%d", this, fd);
469#endif
470
471    { // acquire lock
472        AutoMutex _l(mLock);
473        ssize_t requestIndex = mRequests.indexOfKey(fd);
474        if (requestIndex < 0) {
475            return 0;
476        }
477
478        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL);
479        if (epollResult < 0) {
480            ALOGE("Error removing epoll events for fd %d, errno=%d", fd, errno);
481            return -1;
482        }
483
484        mRequests.removeItemsAt(requestIndex);
485    } // release lock
486    return 1;
487}
488
489void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
490    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
491    sendMessageAtTime(now, handler, message);
492}
493
494void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
495        const Message& message) {
496    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
497    sendMessageAtTime(now + uptimeDelay, handler, message);
498}
499
500void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
501        const Message& message) {
502#if DEBUG_CALLBACKS
503    ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d",
504            this, uptime, handler.get(), message.what);
505#endif
506
507    size_t i = 0;
508    { // acquire lock
509        AutoMutex _l(mLock);
510
511        size_t messageCount = mMessageEnvelopes.size();
512        while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
513            i += 1;
514        }
515
516        MessageEnvelope messageEnvelope(uptime, handler, message);
517        mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
518
519        // Optimization: If the Looper is currently sending a message, then we can skip
520        // the call to wake() because the next thing the Looper will do after processing
521        // messages is to decide when the next wakeup time should be.  In fact, it does
522        // not even matter whether this code is running on the Looper thread.
523        if (mSendingMessage) {
524            return;
525        }
526    } // release lock
527
528    // Wake the poll loop only when we enqueue a new message at the head.
529    if (i == 0) {
530        wake();
531    }
532}
533
534void Looper::removeMessages(const sp<MessageHandler>& handler) {
535#if DEBUG_CALLBACKS
536    ALOGD("%p ~ removeMessages - handler=%p", this, handler.get());
537#endif
538
539    { // acquire lock
540        AutoMutex _l(mLock);
541
542        for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
543            const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
544            if (messageEnvelope.handler == handler) {
545                mMessageEnvelopes.removeAt(i);
546            }
547        }
548    } // release lock
549}
550
551void Looper::removeMessages(const sp<MessageHandler>& handler, int what) {
552#if DEBUG_CALLBACKS
553    ALOGD("%p ~ removeMessages - handler=%p, what=%d", this, handler.get(), what);
554#endif
555
556    { // acquire lock
557        AutoMutex _l(mLock);
558
559        for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
560            const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
561            if (messageEnvelope.handler == handler
562                    && messageEnvelope.message.what == what) {
563                mMessageEnvelopes.removeAt(i);
564            }
565        }
566    } // release lock
567}
568
569bool Looper::isIdling() const {
570    return mIdling;
571}
572
573} // namespace android
574