Looper.cpp revision 8d15c74d50fd01d6e63970aadd261a9d3bed27e7
1//
2// Copyright 2010 The Android Open Source Project
3//
4// A looper implementation based on epoll().
5//
6#define LOG_TAG "Looper"
7
8//#define LOG_NDEBUG 0
9
10// Debugs poll and wake interactions.
11#define DEBUG_POLL_AND_WAKE 0
12
13// Debugs callback registration and invocation.
14#define DEBUG_CALLBACKS 0
15
16#include <cutils/log.h>
17#include <utils/Looper.h>
18#include <utils/Timers.h>
19
20#include <unistd.h>
21#include <fcntl.h>
22
23
24namespace android {
25
26#ifdef LOOPER_USES_EPOLL
27// Hint for number of file descriptors to be associated with the epoll instance.
28static const int EPOLL_SIZE_HINT = 8;
29
30// Maximum number of file descriptors for which to retrieve poll events each iteration.
31static const int EPOLL_MAX_EVENTS = 16;
32#endif
33
34static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
35static pthread_key_t gTLSKey = 0;
36
37Looper::Looper(bool allowNonCallbacks) :
38        mAllowNonCallbacks(allowNonCallbacks),
39        mResponseIndex(0) {
40    int wakeFds[2];
41    int result = pipe(wakeFds);
42    LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe.  errno=%d", errno);
43
44    mWakeReadPipeFd = wakeFds[0];
45    mWakeWritePipeFd = wakeFds[1];
46
47    result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
48    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking.  errno=%d",
49            errno);
50
51    result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
52    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking.  errno=%d",
53            errno);
54
55#ifdef LOOPER_USES_EPOLL
56    // Allocate the epoll instance and register the wake pipe.
57    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
58    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);
59
60    struct epoll_event eventItem;
61    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
62    eventItem.events = EPOLLIN;
63    eventItem.data.fd = mWakeReadPipeFd;
64    result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
65    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance.  errno=%d",
66            errno);
67#else
68    // Add the wake pipe to the head of the request list with a null callback.
69    struct pollfd requestedFd;
70    requestedFd.fd = mWakeReadPipeFd;
71    requestedFd.events = POLLIN;
72    mRequestedFds.push(requestedFd);
73
74    Request request;
75    request.fd = mWakeReadPipeFd;
76    request.callback = NULL;
77    request.ident = 0;
78    request.data = NULL;
79    mRequests.push(request);
80
81    mPolling = false;
82    mWaiters = 0;
83#endif
84
85#ifdef LOOPER_STATISTICS
86    mPendingWakeTime = -1;
87    mPendingWakeCount = 0;
88    mSampledWakeCycles = 0;
89    mSampledWakeCountSum = 0;
90    mSampledWakeLatencySum = 0;
91
92    mSampledPolls = 0;
93    mSampledZeroPollCount = 0;
94    mSampledZeroPollLatencySum = 0;
95    mSampledTimeoutPollCount = 0;
96    mSampledTimeoutPollLatencySum = 0;
97#endif
98}
99
100Looper::~Looper() {
101    close(mWakeReadPipeFd);
102    close(mWakeWritePipeFd);
103#ifdef LOOPER_USES_EPOLL
104    close(mEpollFd);
105#endif
106}
107
108void Looper::initTLSKey() {
109    int result = pthread_key_create(& gTLSKey, threadDestructor);
110    LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key.");
111}
112
113void Looper::threadDestructor(void *st) {
114    Looper* const self = static_cast<Looper*>(st);
115    if (self != NULL) {
116        self->decStrong((void*)threadDestructor);
117    }
118}
119
120void Looper::setForThread(const sp<Looper>& looper) {
121    sp<Looper> old = getForThread(); // also has side-effect of initializing TLS
122
123    if (looper != NULL) {
124        looper->incStrong((void*)threadDestructor);
125    }
126
127    pthread_setspecific(gTLSKey, looper.get());
128
129    if (old != NULL) {
130        old->decStrong((void*)threadDestructor);
131    }
132}
133
134sp<Looper> Looper::getForThread() {
135    int result = pthread_once(& gTLSOnce, initTLSKey);
136    LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
137
138    return (Looper*)pthread_getspecific(gTLSKey);
139}
140
141sp<Looper> Looper::prepare(int opts) {
142    bool allowNonCallbacks = opts & ALOOPER_PREPARE_ALLOW_NON_CALLBACKS;
143    sp<Looper> looper = Looper::getForThread();
144    if (looper == NULL) {
145        looper = new Looper(allowNonCallbacks);
146        Looper::setForThread(looper);
147    }
148    if (looper->getAllowNonCallbacks() != allowNonCallbacks) {
149        LOGW("Looper already prepared for this thread with a different value for the "
150                "ALOOPER_PREPARE_ALLOW_NON_CALLBACKS option.");
151    }
152    return looper;
153}
154
155bool Looper::getAllowNonCallbacks() const {
156    return mAllowNonCallbacks;
157}
158
159int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
160    int result = 0;
161    for (;;) {
162        while (mResponseIndex < mResponses.size()) {
163            const Response& response = mResponses.itemAt(mResponseIndex++);
164            if (! response.request.callback) {
165#if DEBUG_POLL_AND_WAKE
166                LOGD("%p ~ pollOnce - returning signalled identifier %d: "
167                        "fd=%d, events=0x%x, data=%p", this,
168                        response.request.ident, response.request.fd,
169                        response.events, response.request.data);
170#endif
171                if (outFd != NULL) *outFd = response.request.fd;
172                if (outEvents != NULL) *outEvents = response.events;
173                if (outData != NULL) *outData = response.request.data;
174                return response.request.ident;
175            }
176        }
177
178        if (result != 0) {
179#if DEBUG_POLL_AND_WAKE
180            LOGD("%p ~ pollOnce - returning result %d", this, result);
181#endif
182            if (outFd != NULL) *outFd = 0;
183            if (outEvents != NULL) *outEvents = NULL;
184            if (outData != NULL) *outData = NULL;
185            return result;
186        }
187
188        result = pollInner(timeoutMillis);
189    }
190}
191
192int Looper::pollInner(int timeoutMillis) {
193#if DEBUG_POLL_AND_WAKE
194    LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
195#endif
196
197    int result = ALOOPER_POLL_WAKE;
198    mResponses.clear();
199    mResponseIndex = 0;
200
201#ifdef LOOPER_STATISTICS
202    nsecs_t pollStartTime = systemTime(SYSTEM_TIME_MONOTONIC);
203#endif
204
205#ifdef LOOPER_USES_EPOLL
206    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
207    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
208    bool acquiredLock = false;
209#else
210    // Wait for wakeAndLock() waiters to run then set mPolling to true.
211    mLock.lock();
212    while (mWaiters != 0) {
213        mResume.wait(mLock);
214    }
215    mPolling = true;
216    mLock.unlock();
217
218    size_t requestedCount = mRequestedFds.size();
219    int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis);
220#endif
221
222    if (eventCount < 0) {
223        if (errno == EINTR) {
224            goto Done;
225        }
226
227        LOGW("Poll failed with an unexpected error, errno=%d", errno);
228        result = ALOOPER_POLL_ERROR;
229        goto Done;
230    }
231
232    if (eventCount == 0) {
233#if DEBUG_POLL_AND_WAKE
234        LOGD("%p ~ pollOnce - timeout", this);
235#endif
236        result = ALOOPER_POLL_TIMEOUT;
237        goto Done;
238    }
239
240#if DEBUG_POLL_AND_WAKE
241    LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
242#endif
243
244#ifdef LOOPER_USES_EPOLL
245    for (int i = 0; i < eventCount; i++) {
246        int fd = eventItems[i].data.fd;
247        uint32_t epollEvents = eventItems[i].events;
248        if (fd == mWakeReadPipeFd) {
249            if (epollEvents & EPOLLIN) {
250                awoken();
251            } else {
252                LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
253            }
254        } else {
255            if (! acquiredLock) {
256                mLock.lock();
257                acquiredLock = true;
258            }
259
260            ssize_t requestIndex = mRequests.indexOfKey(fd);
261            if (requestIndex >= 0) {
262                int events = 0;
263                if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
264                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
265                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
266                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
267                pushResponse(events, mRequests.valueAt(requestIndex));
268            } else {
269                LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
270                        "no longer registered.", epollEvents, fd);
271            }
272        }
273    }
274    if (acquiredLock) {
275        mLock.unlock();
276    }
277Done: ;
278#else
279    for (size_t i = 0; i < requestedCount; i++) {
280        const struct pollfd& requestedFd = mRequestedFds.itemAt(i);
281
282        short pollEvents = requestedFd.revents;
283        if (pollEvents) {
284            if (requestedFd.fd == mWakeReadPipeFd) {
285                if (pollEvents & POLLIN) {
286                    awoken();
287                } else {
288                    LOGW("Ignoring unexpected poll events 0x%x on wake read pipe.", pollEvents);
289                }
290            } else {
291                int events = 0;
292                if (pollEvents & POLLIN) events |= ALOOPER_EVENT_INPUT;
293                if (pollEvents & POLLOUT) events |= ALOOPER_EVENT_OUTPUT;
294                if (pollEvents & POLLERR) events |= ALOOPER_EVENT_ERROR;
295                if (pollEvents & POLLHUP) events |= ALOOPER_EVENT_HANGUP;
296                if (pollEvents & POLLNVAL) events |= ALOOPER_EVENT_INVALID;
297                pushResponse(events, mRequests.itemAt(i));
298            }
299            if (--eventCount == 0) {
300                break;
301            }
302        }
303    }
304
305Done:
306    // Set mPolling to false and wake up the wakeAndLock() waiters.
307    mLock.lock();
308    mPolling = false;
309    if (mWaiters != 0) {
310        mAwake.broadcast();
311    }
312    mLock.unlock();
313#endif
314
315#ifdef LOOPER_STATISTICS
316    nsecs_t pollEndTime = systemTime(SYSTEM_TIME_MONOTONIC);
317    mSampledPolls += 1;
318    if (timeoutMillis == 0) {
319        mSampledZeroPollCount += 1;
320        mSampledZeroPollLatencySum += pollEndTime - pollStartTime;
321    } else if (timeoutMillis > 0 && result == ALOOPER_POLL_TIMEOUT) {
322        mSampledTimeoutPollCount += 1;
323        mSampledTimeoutPollLatencySum += pollEndTime - pollStartTime
324                - milliseconds_to_nanoseconds(timeoutMillis);
325    }
326    if (mSampledPolls == SAMPLED_POLLS_TO_AGGREGATE) {
327        LOGD("%p ~ poll latency statistics: %0.3fms zero timeout, %0.3fms non-zero timeout", this,
328                0.000001f * float(mSampledZeroPollLatencySum) / mSampledZeroPollCount,
329                0.000001f * float(mSampledTimeoutPollLatencySum) / mSampledTimeoutPollCount);
330        mSampledPolls = 0;
331        mSampledZeroPollCount = 0;
332        mSampledZeroPollLatencySum = 0;
333        mSampledTimeoutPollCount = 0;
334        mSampledTimeoutPollLatencySum = 0;
335    }
336#endif
337
338    for (size_t i = 0; i < mResponses.size(); i++) {
339        const Response& response = mResponses.itemAt(i);
340        if (response.request.callback) {
341#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
342            LOGD("%p ~ pollOnce - invoking callback: fd=%d, events=0x%x, data=%p", this,
343                    response.request.fd, response.events, response.request.data);
344#endif
345            int callbackResult = response.request.callback(
346                    response.request.fd, response.events, response.request.data);
347            if (callbackResult == 0) {
348                removeFd(response.request.fd);
349            }
350
351            result = ALOOPER_POLL_CALLBACK;
352        }
353    }
354    return result;
355}
356
357int Looper::pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
358    if (timeoutMillis <= 0) {
359        int result;
360        do {
361            result = pollOnce(timeoutMillis, outFd, outEvents, outData);
362        } while (result == ALOOPER_POLL_CALLBACK);
363        return result;
364    } else {
365        nsecs_t endTime = systemTime(SYSTEM_TIME_MONOTONIC)
366                + milliseconds_to_nanoseconds(timeoutMillis);
367
368        for (;;) {
369            int result = pollOnce(timeoutMillis, outFd, outEvents, outData);
370            if (result != ALOOPER_POLL_CALLBACK) {
371                return result;
372            }
373
374            nsecs_t timeoutNanos = endTime - systemTime(SYSTEM_TIME_MONOTONIC);
375            if (timeoutNanos <= 0) {
376                return ALOOPER_POLL_TIMEOUT;
377            }
378
379            timeoutMillis = int(nanoseconds_to_milliseconds(timeoutNanos + 999999LL));
380        }
381    }
382}
383
384void Looper::wake() {
385#if DEBUG_POLL_AND_WAKE
386    LOGD("%p ~ wake", this);
387#endif
388
389#ifdef LOOPER_STATISTICS
390    // FIXME: Possible race with awoken() but this code is for testing only and is rarely enabled.
391    if (mPendingWakeCount++ == 0) {
392        mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC);
393    }
394#endif
395
396    ssize_t nWrite;
397    do {
398        nWrite = write(mWakeWritePipeFd, "W", 1);
399    } while (nWrite == -1 && errno == EINTR);
400
401    if (nWrite != 1) {
402        if (errno != EAGAIN) {
403            LOGW("Could not write wake signal, errno=%d", errno);
404        }
405    }
406}
407
408void Looper::awoken() {
409#if DEBUG_POLL_AND_WAKE
410    LOGD("%p ~ awoken", this);
411#endif
412
413#ifdef LOOPER_STATISTICS
414    if (mPendingWakeCount == 0) {
415        LOGD("%p ~ awoken: spurious!", this);
416    } else {
417        mSampledWakeCycles += 1;
418        mSampledWakeCountSum += mPendingWakeCount;
419        mSampledWakeLatencySum += systemTime(SYSTEM_TIME_MONOTONIC) - mPendingWakeTime;
420        mPendingWakeCount = 0;
421        mPendingWakeTime = -1;
422        if (mSampledWakeCycles == SAMPLED_WAKE_CYCLES_TO_AGGREGATE) {
423            LOGD("%p ~ wake statistics: %0.3fms wake latency, %0.3f wakes per cycle", this,
424                    0.000001f * float(mSampledWakeLatencySum) / mSampledWakeCycles,
425                    float(mSampledWakeCountSum) / mSampledWakeCycles);
426            mSampledWakeCycles = 0;
427            mSampledWakeCountSum = 0;
428            mSampledWakeLatencySum = 0;
429        }
430    }
431#endif
432
433    char buffer[16];
434    ssize_t nRead;
435    do {
436        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
437    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
438}
439
440void Looper::pushResponse(int events, const Request& request) {
441    Response response;
442    response.events = events;
443    response.request = request;
444    mResponses.push(response);
445}
446
447int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) {
448#if DEBUG_CALLBACKS
449    LOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
450            events, callback, data);
451#endif
452
453    if (! callback) {
454        if (! mAllowNonCallbacks) {
455            LOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
456            return -1;
457        }
458
459        if (ident < 0) {
460            LOGE("Invalid attempt to set NULL callback with ident <= 0.");
461            return -1;
462        }
463    }
464
465#ifdef LOOPER_USES_EPOLL
466    int epollEvents = 0;
467    if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
468    if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
469
470    { // acquire lock
471        AutoMutex _l(mLock);
472
473        Request request;
474        request.fd = fd;
475        request.ident = ident;
476        request.callback = callback;
477        request.data = data;
478
479        struct epoll_event eventItem;
480        memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
481        eventItem.events = epollEvents;
482        eventItem.data.fd = fd;
483
484        ssize_t requestIndex = mRequests.indexOfKey(fd);
485        if (requestIndex < 0) {
486            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
487            if (epollResult < 0) {
488                LOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
489                return -1;
490            }
491            mRequests.add(fd, request);
492        } else {
493            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
494            if (epollResult < 0) {
495                LOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
496                return -1;
497            }
498            mRequests.replaceValueAt(requestIndex, request);
499        }
500    } // release lock
501#else
502    int pollEvents = 0;
503    if (events & ALOOPER_EVENT_INPUT) pollEvents |= POLLIN;
504    if (events & ALOOPER_EVENT_OUTPUT) pollEvents |= POLLOUT;
505
506    wakeAndLock(); // acquire lock
507
508    struct pollfd requestedFd;
509    requestedFd.fd = fd;
510    requestedFd.events = pollEvents;
511
512    Request request;
513    request.fd = fd;
514    request.ident = ident;
515    request.callback = callback;
516    request.data = data;
517    ssize_t index = getRequestIndexLocked(fd);
518    if (index < 0) {
519        mRequestedFds.push(requestedFd);
520        mRequests.push(request);
521    } else {
522        mRequestedFds.replaceAt(requestedFd, size_t(index));
523        mRequests.replaceAt(request, size_t(index));
524    }
525
526    mLock.unlock(); // release lock
527#endif
528    return 1;
529}
530
531int Looper::removeFd(int fd) {
532#if DEBUG_CALLBACKS
533    LOGD("%p ~ removeFd - fd=%d", this, fd);
534#endif
535
536#ifdef LOOPER_USES_EPOLL
537    { // acquire lock
538        AutoMutex _l(mLock);
539        ssize_t requestIndex = mRequests.indexOfKey(fd);
540        if (requestIndex < 0) {
541            return 0;
542        }
543
544        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL);
545        if (epollResult < 0) {
546            LOGE("Error removing epoll events for fd %d, errno=%d", fd, errno);
547            return -1;
548        }
549
550        mRequests.removeItemsAt(requestIndex);
551    } // release lock
552    return 1;
553#else
554    wakeAndLock(); // acquire lock
555
556    ssize_t index = getRequestIndexLocked(fd);
557    if (index >= 0) {
558        mRequestedFds.removeAt(size_t(index));
559        mRequests.removeAt(size_t(index));
560    }
561
562    mLock.unlock(); // release lock
563    return index >= 0;
564#endif
565}
566
567#ifndef LOOPER_USES_EPOLL
568ssize_t Looper::getRequestIndexLocked(int fd) {
569    size_t requestCount = mRequestedFds.size();
570
571    for (size_t i = 0; i < requestCount; i++) {
572        if (mRequestedFds.itemAt(i).fd == fd) {
573            return i;
574        }
575    }
576
577    return -1;
578}
579
580void Looper::wakeAndLock() {
581    mLock.lock();
582
583    mWaiters += 1;
584    while (mPolling) {
585        wake();
586        mAwake.wait(mLock);
587    }
588
589    mWaiters -= 1;
590    if (mWaiters == 0) {
591        mResume.signal();
592    }
593}
594#endif
595
596} // namespace android
597