// Looper.cpp revision 588d5c8280c89c646aa7c8c54900225ee04176ea
1// 2// Copyright 2010 The Android Open Source Project 3// 4// A looper implementation based on epoll(). 5// 6#define LOG_TAG "Looper" 7 8//#define LOG_NDEBUG 0 9 10// Debugs poll and wake interactions. 11#define DEBUG_POLL_AND_WAKE 0 12 13// Debugs callback registration and invocation. 14#define DEBUG_CALLBACKS 0 15 16#include <cutils/log.h> 17#include <utils/Looper.h> 18#include <utils/Timers.h> 19 20#include <unistd.h> 21#include <fcntl.h> 22#include <limits.h> 23 24 25namespace android { 26 27// --- WeakMessageHandler --- 28 29WeakMessageHandler::WeakMessageHandler(const wp<MessageHandler>& handler) : 30 mHandler(handler) { 31} 32 33void WeakMessageHandler::handleMessage(const Message& message) { 34 sp<MessageHandler> handler = mHandler.promote(); 35 if (handler != NULL) { 36 handler->handleMessage(message); 37 } 38} 39 40 41// --- Looper --- 42 43// Hint for number of file descriptors to be associated with the epoll instance. 44static const int EPOLL_SIZE_HINT = 8; 45 46// Maximum number of file descriptors for which to retrieve poll events each iteration. 47static const int EPOLL_MAX_EVENTS = 16; 48 49static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT; 50static pthread_key_t gTLSKey = 0; 51 52Looper::Looper(bool allowNonCallbacks) : 53 mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), 54 mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { 55 int wakeFds[2]; 56 int result = pipe(wakeFds); 57 LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno); 58 59 mWakeReadPipeFd = wakeFds[0]; 60 mWakeWritePipeFd = wakeFds[1]; 61 62 result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK); 63 LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d", 64 errno); 65 66 result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK); 67 LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d", 68 errno); 69 70 // Allocate the epoll instance and register the wake pipe. 
71 mEpollFd = epoll_create(EPOLL_SIZE_HINT); 72 LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno); 73 74 struct epoll_event eventItem; 75 memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union 76 eventItem.events = EPOLLIN; 77 eventItem.data.fd = mWakeReadPipeFd; 78 result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); 79 LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d", 80 errno); 81} 82 83Looper::~Looper() { 84 close(mWakeReadPipeFd); 85 close(mWakeWritePipeFd); 86 close(mEpollFd); 87} 88 89void Looper::initTLSKey() { 90 int result = pthread_key_create(& gTLSKey, threadDestructor); 91 LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key."); 92} 93 94void Looper::threadDestructor(void *st) { 95 Looper* const self = static_cast<Looper*>(st); 96 if (self != NULL) { 97 self->decStrong((void*)threadDestructor); 98 } 99} 100 101void Looper::setForThread(const sp<Looper>& looper) { 102 sp<Looper> old = getForThread(); // also has side-effect of initializing TLS 103 104 if (looper != NULL) { 105 looper->incStrong((void*)threadDestructor); 106 } 107 108 pthread_setspecific(gTLSKey, looper.get()); 109 110 if (old != NULL) { 111 old->decStrong((void*)threadDestructor); 112 } 113} 114 115sp<Looper> Looper::getForThread() { 116 int result = pthread_once(& gTLSOnce, initTLSKey); 117 LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed"); 118 119 return (Looper*)pthread_getspecific(gTLSKey); 120} 121 122sp<Looper> Looper::prepare(int opts) { 123 bool allowNonCallbacks = opts & ALOOPER_PREPARE_ALLOW_NON_CALLBACKS; 124 sp<Looper> looper = Looper::getForThread(); 125 if (looper == NULL) { 126 looper = new Looper(allowNonCallbacks); 127 Looper::setForThread(looper); 128 } 129 if (looper->getAllowNonCallbacks() != allowNonCallbacks) { 130 ALOGW("Looper already prepared for this thread with a different value for the " 131 
"ALOOPER_PREPARE_ALLOW_NON_CALLBACKS option."); 132 } 133 return looper; 134} 135 136bool Looper::getAllowNonCallbacks() const { 137 return mAllowNonCallbacks; 138} 139 140int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { 141 int result = 0; 142 for (;;) { 143 while (mResponseIndex < mResponses.size()) { 144 const Response& response = mResponses.itemAt(mResponseIndex++); 145 ALooper_callbackFunc callback = response.request.callback; 146 if (!callback) { 147 int ident = response.request.ident; 148 int fd = response.request.fd; 149 int events = response.events; 150 void* data = response.request.data; 151#if DEBUG_POLL_AND_WAKE 152 ALOGD("%p ~ pollOnce - returning signalled identifier %d: " 153 "fd=%d, events=0x%x, data=%p", 154 this, ident, fd, events, data); 155#endif 156 if (outFd != NULL) *outFd = fd; 157 if (outEvents != NULL) *outEvents = events; 158 if (outData != NULL) *outData = data; 159 return ident; 160 } 161 } 162 163 if (result != 0) { 164#if DEBUG_POLL_AND_WAKE 165 ALOGD("%p ~ pollOnce - returning result %d", this, result); 166#endif 167 if (outFd != NULL) *outFd = 0; 168 if (outEvents != NULL) *outEvents = NULL; 169 if (outData != NULL) *outData = NULL; 170 return result; 171 } 172 173 result = pollInner(timeoutMillis); 174 } 175} 176 177int Looper::pollInner(int timeoutMillis) { 178#if DEBUG_POLL_AND_WAKE 179 ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis); 180#endif 181 182 // Adjust the timeout based on when the next message is due. 
183 if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) { 184 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); 185 int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime); 186 if (messageTimeoutMillis >= 0 187 && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) { 188 timeoutMillis = messageTimeoutMillis; 189 } 190#if DEBUG_POLL_AND_WAKE 191 ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d", 192 this, mNextMessageUptime - now, timeoutMillis); 193#endif 194 } 195 196 // Poll. 197 int result = ALOOPER_POLL_WAKE; 198 mResponses.clear(); 199 mResponseIndex = 0; 200 201 struct epoll_event eventItems[EPOLL_MAX_EVENTS]; 202 int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); 203 204 // Acquire lock. 205 mLock.lock(); 206 207 // Check for poll error. 208 if (eventCount < 0) { 209 if (errno == EINTR) { 210 goto Done; 211 } 212 ALOGW("Poll failed with an unexpected error, errno=%d", errno); 213 result = ALOOPER_POLL_ERROR; 214 goto Done; 215 } 216 217 // Check for poll timeout. 218 if (eventCount == 0) { 219#if DEBUG_POLL_AND_WAKE 220 ALOGD("%p ~ pollOnce - timeout", this); 221#endif 222 result = ALOOPER_POLL_TIMEOUT; 223 goto Done; 224 } 225 226 // Handle all events. 
227#if DEBUG_POLL_AND_WAKE 228 ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount); 229#endif 230 231 for (int i = 0; i < eventCount; i++) { 232 int fd = eventItems[i].data.fd; 233 uint32_t epollEvents = eventItems[i].events; 234 if (fd == mWakeReadPipeFd) { 235 if (epollEvents & EPOLLIN) { 236 awoken(); 237 } else { 238 ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); 239 } 240 } else { 241 ssize_t requestIndex = mRequests.indexOfKey(fd); 242 if (requestIndex >= 0) { 243 int events = 0; 244 if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT; 245 if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT; 246 if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR; 247 if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP; 248 pushResponse(events, mRequests.valueAt(requestIndex)); 249 } else { 250 ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " 251 "no longer registered.", epollEvents, fd); 252 } 253 } 254 } 255Done: ; 256 257 // Invoke pending message callbacks. 258 mNextMessageUptime = LLONG_MAX; 259 while (mMessageEnvelopes.size() != 0) { 260 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); 261 const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); 262 if (messageEnvelope.uptime <= now) { 263 // Remove the envelope from the list. 264 // We keep a strong reference to the handler until the call to handleMessage 265 // finishes. Then we drop it so that the handler can be deleted *before* 266 // we reacquire our lock. 
267 { // obtain handler 268 sp<MessageHandler> handler = messageEnvelope.handler; 269 Message message = messageEnvelope.message; 270 mMessageEnvelopes.removeAt(0); 271 mSendingMessage = true; 272 mLock.unlock(); 273 274#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS 275 ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d", 276 this, handler.get(), message.what); 277#endif 278 handler->handleMessage(message); 279 } // release handler 280 281 mLock.lock(); 282 mSendingMessage = false; 283 result = ALOOPER_POLL_CALLBACK; 284 } else { 285 // The last message left at the head of the queue determines the next wakeup time. 286 mNextMessageUptime = messageEnvelope.uptime; 287 break; 288 } 289 } 290 291 // Release lock. 292 mLock.unlock(); 293 294 // Invoke all response callbacks. 295 for (size_t i = 0; i < mResponses.size(); i++) { 296 const Response& response = mResponses.itemAt(i); 297 ALooper_callbackFunc callback = response.request.callback; 298 if (callback) { 299 int fd = response.request.fd; 300 int events = response.events; 301 void* data = response.request.data; 302#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS 303 ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", 304 this, callback, fd, events, data); 305#endif 306 int callbackResult = callback(fd, events, data); 307 if (callbackResult == 0) { 308 removeFd(fd); 309 } 310 result = ALOOPER_POLL_CALLBACK; 311 } 312 } 313 return result; 314} 315 316int Looper::pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData) { 317 if (timeoutMillis <= 0) { 318 int result; 319 do { 320 result = pollOnce(timeoutMillis, outFd, outEvents, outData); 321 } while (result == ALOOPER_POLL_CALLBACK); 322 return result; 323 } else { 324 nsecs_t endTime = systemTime(SYSTEM_TIME_MONOTONIC) 325 + milliseconds_to_nanoseconds(timeoutMillis); 326 327 for (;;) { 328 int result = pollOnce(timeoutMillis, outFd, outEvents, outData); 329 if (result != ALOOPER_POLL_CALLBACK) { 330 return result; 331 } 
332 333 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); 334 timeoutMillis = toMillisecondTimeoutDelay(now, endTime); 335 if (timeoutMillis == 0) { 336 return ALOOPER_POLL_TIMEOUT; 337 } 338 } 339 } 340} 341 342void Looper::wake() { 343#if DEBUG_POLL_AND_WAKE 344 ALOGD("%p ~ wake", this); 345#endif 346 347 ssize_t nWrite; 348 do { 349 nWrite = write(mWakeWritePipeFd, "W", 1); 350 } while (nWrite == -1 && errno == EINTR); 351 352 if (nWrite != 1) { 353 if (errno != EAGAIN) { 354 ALOGW("Could not write wake signal, errno=%d", errno); 355 } 356 } 357} 358 359void Looper::awoken() { 360#if DEBUG_POLL_AND_WAKE 361 ALOGD("%p ~ awoken", this); 362#endif 363 364 char buffer[16]; 365 ssize_t nRead; 366 do { 367 nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer)); 368 } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer)); 369} 370 371void Looper::pushResponse(int events, const Request& request) { 372 Response response; 373 response.events = events; 374 response.request = request; 375 mResponses.push(response); 376} 377 378int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) { 379#if DEBUG_CALLBACKS 380 ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident, 381 events, callback, data); 382#endif 383 384 if (! callback) { 385 if (! 
mAllowNonCallbacks) { 386 ALOGE("Invalid attempt to set NULL callback but not allowed for this looper."); 387 return -1; 388 } 389 390 if (ident < 0) { 391 ALOGE("Invalid attempt to set NULL callback with ident <= 0."); 392 return -1; 393 } 394 } 395 396 int epollEvents = 0; 397 if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN; 398 if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT; 399 400 { // acquire lock 401 AutoMutex _l(mLock); 402 403 Request request; 404 request.fd = fd; 405 request.ident = ident; 406 request.callback = callback; 407 request.data = data; 408 409 struct epoll_event eventItem; 410 memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union 411 eventItem.events = epollEvents; 412 eventItem.data.fd = fd; 413 414 ssize_t requestIndex = mRequests.indexOfKey(fd); 415 if (requestIndex < 0) { 416 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem); 417 if (epollResult < 0) { 418 ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno); 419 return -1; 420 } 421 mRequests.add(fd, request); 422 } else { 423 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem); 424 if (epollResult < 0) { 425 ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno); 426 return -1; 427 } 428 mRequests.replaceValueAt(requestIndex, request); 429 } 430 } // release lock 431 return 1; 432} 433 434int Looper::removeFd(int fd) { 435#if DEBUG_CALLBACKS 436 ALOGD("%p ~ removeFd - fd=%d", this, fd); 437#endif 438 439 { // acquire lock 440 AutoMutex _l(mLock); 441 ssize_t requestIndex = mRequests.indexOfKey(fd); 442 if (requestIndex < 0) { 443 return 0; 444 } 445 446 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL); 447 if (epollResult < 0) { 448 ALOGE("Error removing epoll events for fd %d, errno=%d", fd, errno); 449 return -1; 450 } 451 452 mRequests.removeItemsAt(requestIndex); 453 } // release lock 454 return 1; 455} 456 457void Looper::sendMessage(const 
sp<MessageHandler>& handler, const Message& message) { 458 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); 459 sendMessageAtTime(now, handler, message); 460} 461 462void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler, 463 const Message& message) { 464 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); 465 sendMessageAtTime(now + uptimeDelay, handler, message); 466} 467 468void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler, 469 const Message& message) { 470#if DEBUG_CALLBACKS 471 ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d", 472 this, uptime, handler.get(), message.what); 473#endif 474 475 size_t i = 0; 476 { // acquire lock 477 AutoMutex _l(mLock); 478 479 size_t messageCount = mMessageEnvelopes.size(); 480 while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) { 481 i += 1; 482 } 483 484 MessageEnvelope messageEnvelope(uptime, handler, message); 485 mMessageEnvelopes.insertAt(messageEnvelope, i, 1); 486 487 // Optimization: If the Looper is currently sending a message, then we can skip 488 // the call to wake() because the next thing the Looper will do after processing 489 // messages is to decide when the next wakeup time should be. In fact, it does 490 // not even matter whether this code is running on the Looper thread. 491 if (mSendingMessage) { 492 return; 493 } 494 } // release lock 495 496 // Wake the poll loop only when we enqueue a new message at the head. 
497 if (i == 0) { 498 wake(); 499 } 500} 501 502void Looper::removeMessages(const sp<MessageHandler>& handler) { 503#if DEBUG_CALLBACKS 504 ALOGD("%p ~ removeMessages - handler=%p", this, handler.get()); 505#endif 506 507 { // acquire lock 508 AutoMutex _l(mLock); 509 510 for (size_t i = mMessageEnvelopes.size(); i != 0; ) { 511 const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i); 512 if (messageEnvelope.handler == handler) { 513 mMessageEnvelopes.removeAt(i); 514 } 515 } 516 } // release lock 517} 518 519void Looper::removeMessages(const sp<MessageHandler>& handler, int what) { 520#if DEBUG_CALLBACKS 521 ALOGD("%p ~ removeMessages - handler=%p, what=%d", this, handler.get(), what); 522#endif 523 524 { // acquire lock 525 AutoMutex _l(mLock); 526 527 for (size_t i = mMessageEnvelopes.size(); i != 0; ) { 528 const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i); 529 if (messageEnvelope.handler == handler 530 && messageEnvelope.message.what == what) { 531 mMessageEnvelopes.removeAt(i); 532 } 533 } 534 } // release lock 535} 536 537} // namespace android 538