Looper.cpp revision 588d5c8280c89c646aa7c8c54900225ee04176ea
1//
2// Copyright 2010 The Android Open Source Project
3//
4// A looper implementation based on epoll().
5//
6#define LOG_TAG "Looper"
7
8//#define LOG_NDEBUG 0
9
10// Debugs poll and wake interactions.
11#define DEBUG_POLL_AND_WAKE 0
12
13// Debugs callback registration and invocation.
14#define DEBUG_CALLBACKS 0
15
16#include <cutils/log.h>
17#include <utils/Looper.h>
18#include <utils/Timers.h>
19
20#include <unistd.h>
21#include <fcntl.h>
22#include <limits.h>
23
24
25namespace android {
26
27// --- WeakMessageHandler ---
28
29WeakMessageHandler::WeakMessageHandler(const wp<MessageHandler>& handler) :
30        mHandler(handler) {
31}
32
33void WeakMessageHandler::handleMessage(const Message& message) {
34    sp<MessageHandler> handler = mHandler.promote();
35    if (handler != NULL) {
36        handler->handleMessage(message);
37    }
38}
39
40
41// --- Looper ---
42
// Size hint handed to epoll_create(): the number of file descriptors expected
// to be associated with the epoll instance.
static const int EPOLL_SIZE_HINT = 8;

// Upper bound on the number of poll events fetched by one epoll_wait() call,
// i.e. per poll iteration.
static const int EPOLL_MAX_EVENTS = 16;

// One-time initialization guard and key for the per-thread Looper slot.
static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
static pthread_key_t gTLSKey = 0;
51
52Looper::Looper(bool allowNonCallbacks) :
53        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
54        mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
55    int wakeFds[2];
56    int result = pipe(wakeFds);
57    LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe.  errno=%d", errno);
58
59    mWakeReadPipeFd = wakeFds[0];
60    mWakeWritePipeFd = wakeFds[1];
61
62    result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
63    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking.  errno=%d",
64            errno);
65
66    result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
67    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking.  errno=%d",
68            errno);
69
70    // Allocate the epoll instance and register the wake pipe.
71    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
72    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);
73
74    struct epoll_event eventItem;
75    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
76    eventItem.events = EPOLLIN;
77    eventItem.data.fd = mWakeReadPipeFd;
78    result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
79    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance.  errno=%d",
80            errno);
81}
82
83Looper::~Looper() {
84    close(mWakeReadPipeFd);
85    close(mWakeWritePipeFd);
86    close(mEpollFd);
87}
88
89void Looper::initTLSKey() {
90    int result = pthread_key_create(& gTLSKey, threadDestructor);
91    LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key.");
92}
93
94void Looper::threadDestructor(void *st) {
95    Looper* const self = static_cast<Looper*>(st);
96    if (self != NULL) {
97        self->decStrong((void*)threadDestructor);
98    }
99}
100
101void Looper::setForThread(const sp<Looper>& looper) {
102    sp<Looper> old = getForThread(); // also has side-effect of initializing TLS
103
104    if (looper != NULL) {
105        looper->incStrong((void*)threadDestructor);
106    }
107
108    pthread_setspecific(gTLSKey, looper.get());
109
110    if (old != NULL) {
111        old->decStrong((void*)threadDestructor);
112    }
113}
114
115sp<Looper> Looper::getForThread() {
116    int result = pthread_once(& gTLSOnce, initTLSKey);
117    LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
118
119    return (Looper*)pthread_getspecific(gTLSKey);
120}
121
122sp<Looper> Looper::prepare(int opts) {
123    bool allowNonCallbacks = opts & ALOOPER_PREPARE_ALLOW_NON_CALLBACKS;
124    sp<Looper> looper = Looper::getForThread();
125    if (looper == NULL) {
126        looper = new Looper(allowNonCallbacks);
127        Looper::setForThread(looper);
128    }
129    if (looper->getAllowNonCallbacks() != allowNonCallbacks) {
130        ALOGW("Looper already prepared for this thread with a different value for the "
131                "ALOOPER_PREPARE_ALLOW_NON_CALLBACKS option.");
132    }
133    return looper;
134}
135
136bool Looper::getAllowNonCallbacks() const {
137    return mAllowNonCallbacks;
138}
139
140int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
141    int result = 0;
142    for (;;) {
143        while (mResponseIndex < mResponses.size()) {
144            const Response& response = mResponses.itemAt(mResponseIndex++);
145            ALooper_callbackFunc callback = response.request.callback;
146            if (!callback) {
147                int ident = response.request.ident;
148                int fd = response.request.fd;
149                int events = response.events;
150                void* data = response.request.data;
151#if DEBUG_POLL_AND_WAKE
152                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
153                        "fd=%d, events=0x%x, data=%p",
154                        this, ident, fd, events, data);
155#endif
156                if (outFd != NULL) *outFd = fd;
157                if (outEvents != NULL) *outEvents = events;
158                if (outData != NULL) *outData = data;
159                return ident;
160            }
161        }
162
163        if (result != 0) {
164#if DEBUG_POLL_AND_WAKE
165            ALOGD("%p ~ pollOnce - returning result %d", this, result);
166#endif
167            if (outFd != NULL) *outFd = 0;
168            if (outEvents != NULL) *outEvents = NULL;
169            if (outData != NULL) *outData = NULL;
170            return result;
171        }
172
173        result = pollInner(timeoutMillis);
174    }
175}
176
177int Looper::pollInner(int timeoutMillis) {
178#if DEBUG_POLL_AND_WAKE
179    ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
180#endif
181
182    // Adjust the timeout based on when the next message is due.
183    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
184        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
185        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
186        if (messageTimeoutMillis >= 0
187                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
188            timeoutMillis = messageTimeoutMillis;
189        }
190#if DEBUG_POLL_AND_WAKE
191        ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
192                this, mNextMessageUptime - now, timeoutMillis);
193#endif
194    }
195
196    // Poll.
197    int result = ALOOPER_POLL_WAKE;
198    mResponses.clear();
199    mResponseIndex = 0;
200
201    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
202    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
203
204    // Acquire lock.
205    mLock.lock();
206
207    // Check for poll error.
208    if (eventCount < 0) {
209        if (errno == EINTR) {
210            goto Done;
211        }
212        ALOGW("Poll failed with an unexpected error, errno=%d", errno);
213        result = ALOOPER_POLL_ERROR;
214        goto Done;
215    }
216
217    // Check for poll timeout.
218    if (eventCount == 0) {
219#if DEBUG_POLL_AND_WAKE
220        ALOGD("%p ~ pollOnce - timeout", this);
221#endif
222        result = ALOOPER_POLL_TIMEOUT;
223        goto Done;
224    }
225
226    // Handle all events.
227#if DEBUG_POLL_AND_WAKE
228    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
229#endif
230
231    for (int i = 0; i < eventCount; i++) {
232        int fd = eventItems[i].data.fd;
233        uint32_t epollEvents = eventItems[i].events;
234        if (fd == mWakeReadPipeFd) {
235            if (epollEvents & EPOLLIN) {
236                awoken();
237            } else {
238                ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
239            }
240        } else {
241            ssize_t requestIndex = mRequests.indexOfKey(fd);
242            if (requestIndex >= 0) {
243                int events = 0;
244                if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
245                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
246                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
247                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
248                pushResponse(events, mRequests.valueAt(requestIndex));
249            } else {
250                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
251                        "no longer registered.", epollEvents, fd);
252            }
253        }
254    }
255Done: ;
256
257    // Invoke pending message callbacks.
258    mNextMessageUptime = LLONG_MAX;
259    while (mMessageEnvelopes.size() != 0) {
260        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
261        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
262        if (messageEnvelope.uptime <= now) {
263            // Remove the envelope from the list.
264            // We keep a strong reference to the handler until the call to handleMessage
265            // finishes.  Then we drop it so that the handler can be deleted *before*
266            // we reacquire our lock.
267            { // obtain handler
268                sp<MessageHandler> handler = messageEnvelope.handler;
269                Message message = messageEnvelope.message;
270                mMessageEnvelopes.removeAt(0);
271                mSendingMessage = true;
272                mLock.unlock();
273
274#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
275                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
276                        this, handler.get(), message.what);
277#endif
278                handler->handleMessage(message);
279            } // release handler
280
281            mLock.lock();
282            mSendingMessage = false;
283            result = ALOOPER_POLL_CALLBACK;
284        } else {
285            // The last message left at the head of the queue determines the next wakeup time.
286            mNextMessageUptime = messageEnvelope.uptime;
287            break;
288        }
289    }
290
291    // Release lock.
292    mLock.unlock();
293
294    // Invoke all response callbacks.
295    for (size_t i = 0; i < mResponses.size(); i++) {
296        const Response& response = mResponses.itemAt(i);
297        ALooper_callbackFunc callback = response.request.callback;
298        if (callback) {
299            int fd = response.request.fd;
300            int events = response.events;
301            void* data = response.request.data;
302#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
303            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
304                    this, callback, fd, events, data);
305#endif
306            int callbackResult = callback(fd, events, data);
307            if (callbackResult == 0) {
308                removeFd(fd);
309            }
310            result = ALOOPER_POLL_CALLBACK;
311        }
312    }
313    return result;
314}
315
316int Looper::pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
317    if (timeoutMillis <= 0) {
318        int result;
319        do {
320            result = pollOnce(timeoutMillis, outFd, outEvents, outData);
321        } while (result == ALOOPER_POLL_CALLBACK);
322        return result;
323    } else {
324        nsecs_t endTime = systemTime(SYSTEM_TIME_MONOTONIC)
325                + milliseconds_to_nanoseconds(timeoutMillis);
326
327        for (;;) {
328            int result = pollOnce(timeoutMillis, outFd, outEvents, outData);
329            if (result != ALOOPER_POLL_CALLBACK) {
330                return result;
331            }
332
333            nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
334            timeoutMillis = toMillisecondTimeoutDelay(now, endTime);
335            if (timeoutMillis == 0) {
336                return ALOOPER_POLL_TIMEOUT;
337            }
338        }
339    }
340}
341
342void Looper::wake() {
343#if DEBUG_POLL_AND_WAKE
344    ALOGD("%p ~ wake", this);
345#endif
346
347    ssize_t nWrite;
348    do {
349        nWrite = write(mWakeWritePipeFd, "W", 1);
350    } while (nWrite == -1 && errno == EINTR);
351
352    if (nWrite != 1) {
353        if (errno != EAGAIN) {
354            ALOGW("Could not write wake signal, errno=%d", errno);
355        }
356    }
357}
358
359void Looper::awoken() {
360#if DEBUG_POLL_AND_WAKE
361    ALOGD("%p ~ awoken", this);
362#endif
363
364    char buffer[16];
365    ssize_t nRead;
366    do {
367        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
368    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
369}
370
371void Looper::pushResponse(int events, const Request& request) {
372    Response response;
373    response.events = events;
374    response.request = request;
375    mResponses.push(response);
376}
377
378int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) {
379#if DEBUG_CALLBACKS
380    ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
381            events, callback, data);
382#endif
383
384    if (! callback) {
385        if (! mAllowNonCallbacks) {
386            ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
387            return -1;
388        }
389
390        if (ident < 0) {
391            ALOGE("Invalid attempt to set NULL callback with ident <= 0.");
392            return -1;
393        }
394    }
395
396    int epollEvents = 0;
397    if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
398    if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
399
400    { // acquire lock
401        AutoMutex _l(mLock);
402
403        Request request;
404        request.fd = fd;
405        request.ident = ident;
406        request.callback = callback;
407        request.data = data;
408
409        struct epoll_event eventItem;
410        memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
411        eventItem.events = epollEvents;
412        eventItem.data.fd = fd;
413
414        ssize_t requestIndex = mRequests.indexOfKey(fd);
415        if (requestIndex < 0) {
416            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
417            if (epollResult < 0) {
418                ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
419                return -1;
420            }
421            mRequests.add(fd, request);
422        } else {
423            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
424            if (epollResult < 0) {
425                ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
426                return -1;
427            }
428            mRequests.replaceValueAt(requestIndex, request);
429        }
430    } // release lock
431    return 1;
432}
433
434int Looper::removeFd(int fd) {
435#if DEBUG_CALLBACKS
436    ALOGD("%p ~ removeFd - fd=%d", this, fd);
437#endif
438
439    { // acquire lock
440        AutoMutex _l(mLock);
441        ssize_t requestIndex = mRequests.indexOfKey(fd);
442        if (requestIndex < 0) {
443            return 0;
444        }
445
446        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL);
447        if (epollResult < 0) {
448            ALOGE("Error removing epoll events for fd %d, errno=%d", fd, errno);
449            return -1;
450        }
451
452        mRequests.removeItemsAt(requestIndex);
453    } // release lock
454    return 1;
455}
456
457void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
458    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
459    sendMessageAtTime(now, handler, message);
460}
461
462void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
463        const Message& message) {
464    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
465    sendMessageAtTime(now + uptimeDelay, handler, message);
466}
467
468void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
469        const Message& message) {
470#if DEBUG_CALLBACKS
471    ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d",
472            this, uptime, handler.get(), message.what);
473#endif
474
475    size_t i = 0;
476    { // acquire lock
477        AutoMutex _l(mLock);
478
479        size_t messageCount = mMessageEnvelopes.size();
480        while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
481            i += 1;
482        }
483
484        MessageEnvelope messageEnvelope(uptime, handler, message);
485        mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
486
487        // Optimization: If the Looper is currently sending a message, then we can skip
488        // the call to wake() because the next thing the Looper will do after processing
489        // messages is to decide when the next wakeup time should be.  In fact, it does
490        // not even matter whether this code is running on the Looper thread.
491        if (mSendingMessage) {
492            return;
493        }
494    } // release lock
495
496    // Wake the poll loop only when we enqueue a new message at the head.
497    if (i == 0) {
498        wake();
499    }
500}
501
502void Looper::removeMessages(const sp<MessageHandler>& handler) {
503#if DEBUG_CALLBACKS
504    ALOGD("%p ~ removeMessages - handler=%p", this, handler.get());
505#endif
506
507    { // acquire lock
508        AutoMutex _l(mLock);
509
510        for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
511            const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
512            if (messageEnvelope.handler == handler) {
513                mMessageEnvelopes.removeAt(i);
514            }
515        }
516    } // release lock
517}
518
519void Looper::removeMessages(const sp<MessageHandler>& handler, int what) {
520#if DEBUG_CALLBACKS
521    ALOGD("%p ~ removeMessages - handler=%p, what=%d", this, handler.get(), what);
522#endif
523
524    { // acquire lock
525        AutoMutex _l(mLock);
526
527        for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
528            const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
529            if (messageEnvelope.handler == handler
530                    && messageEnvelope.message.what == what) {
531                mMessageEnvelopes.removeAt(i);
532            }
533        }
534    } // release lock
535}
536
537} // namespace android
538