TimedEventQueue.cpp revision a5750e0dad9e90f2195ce36f2c4457fa04b2b83e
1/*
2 * Copyright (C) 2009 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#undef __STRICT_ANSI__
18#define __STDINT_LIMITS
19#define __STDC_LIMIT_MACROS
20
21#include <inttypes.h>
22#include <stdint.h>
23#include <sys/prctl.h>
24#include <sys/time.h>
25
26//#define LOG_NDEBUG 0
27#define LOG_TAG "TimedEventQueue"
28#include <utils/Log.h>
29#include <utils/threads.h>
30
31#include "include/TimedEventQueue.h"
32
33#include <media/stagefright/foundation/ADebug.h>
34#include <media/stagefright/foundation/ALooper.h>
35#include <binder/IServiceManager.h>
36#include <powermanager/PowerManager.h>
37#include <binder/IPCThreadState.h>
38#include <utils/CallStack.h>
39
40namespace android {
41
// Minimum lead time, in microseconds, before postTimedEvent() takes a wake
// lock for an event: only events scheduled more than this far in the future
// get one. The value is never modified, so it is declared const.
static const int64_t kWakelockMinDelay = 100000ll;  // 100ms
43
44TimedEventQueue::TimedEventQueue()
45    : mNextEventID(1),
46      mRunning(false),
47      mStopped(false),
48      mDeathRecipient(new PMDeathRecipient(this)),
49      mWakeLockCount(0) {
50}
51
52TimedEventQueue::~TimedEventQueue() {
53    stop();
54    if (mPowerManager != 0) {
55        sp<IBinder> binder = mPowerManager->asBinder();
56        binder->unlinkToDeath(mDeathRecipient);
57    }
58}
59
60void TimedEventQueue::start() {
61    if (mRunning) {
62        return;
63    }
64
65    mStopped = false;
66
67    pthread_attr_t attr;
68    pthread_attr_init(&attr);
69    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
70
71    pthread_create(&mThread, &attr, ThreadWrapper, this);
72
73    pthread_attr_destroy(&attr);
74
75    mRunning = true;
76}
77
78void TimedEventQueue::stop(bool flush) {
79    if (!mRunning) {
80        return;
81    }
82
83    if (flush) {
84        postEventToBack(new StopEvent);
85    } else {
86        postTimedEvent(new StopEvent, INT64_MIN);
87    }
88
89    void *dummy;
90    pthread_join(mThread, &dummy);
91
92    // some events may be left in the queue if we did not flush and the wake lock
93    // must be released.
94    releaseWakeLock_l(true /*force*/);
95    mQueue.clear();
96
97    mRunning = false;
98}
99
100TimedEventQueue::event_id TimedEventQueue::postEvent(const sp<Event> &event) {
101    // Reserve an earlier timeslot an INT64_MIN to be able to post
102    // the StopEvent to the absolute head of the queue.
103    return postTimedEvent(event, INT64_MIN + 1);
104}
105
106TimedEventQueue::event_id TimedEventQueue::postEventToBack(
107        const sp<Event> &event) {
108    return postTimedEvent(event, INT64_MAX);
109}
110
111TimedEventQueue::event_id TimedEventQueue::postEventWithDelay(
112        const sp<Event> &event, int64_t delay_us) {
113    CHECK(delay_us >= 0);
114    return postTimedEvent(event, ALooper::GetNowUs() + delay_us);
115}
116
117TimedEventQueue::event_id TimedEventQueue::postTimedEvent(
118        const sp<Event> &event, int64_t realtime_us) {
119    Mutex::Autolock autoLock(mLock);
120
121    event->setEventID(mNextEventID++);
122
123    List<QueueItem>::iterator it = mQueue.begin();
124    while (it != mQueue.end() && realtime_us >= (*it).realtime_us) {
125        ++it;
126    }
127
128    QueueItem item;
129    item.event = event;
130    item.realtime_us = realtime_us;
131    item.has_wakelock = false;
132
133    if (it == mQueue.begin()) {
134        mQueueHeadChangedCondition.signal();
135    }
136
137    if (realtime_us > ALooper::GetNowUs() + kWakelockMinDelay) {
138        acquireWakeLock_l();
139        item.has_wakelock = true;
140    }
141    mQueue.insert(it, item);
142
143    mQueueNotEmptyCondition.signal();
144
145    return event->eventID();
146}
147
148static bool MatchesEventID(
149        void *cookie, const sp<TimedEventQueue::Event> &event) {
150    TimedEventQueue::event_id *id =
151        static_cast<TimedEventQueue::event_id *>(cookie);
152
153    if (event->eventID() != *id) {
154        return false;
155    }
156
157    *id = 0;
158
159    return true;
160}
161
162bool TimedEventQueue::cancelEvent(event_id id) {
163    if (id == 0) {
164        return false;
165    }
166
167    cancelEvents(&MatchesEventID, &id, true /* stopAfterFirstMatch */);
168
169    // if MatchesEventID found a match, it will have set id to 0
170    // (which is not a valid event_id).
171
172    return id == 0;
173}
174
175void TimedEventQueue::cancelEvents(
176        bool (*predicate)(void *cookie, const sp<Event> &event),
177        void *cookie,
178        bool stopAfterFirstMatch) {
179    Mutex::Autolock autoLock(mLock);
180
181    List<QueueItem>::iterator it = mQueue.begin();
182    while (it != mQueue.end()) {
183        if (!(*predicate)(cookie, (*it).event)) {
184            ++it;
185            continue;
186        }
187
188        if (it == mQueue.begin()) {
189            mQueueHeadChangedCondition.signal();
190        }
191
192        ALOGV("cancelling event %d", (*it).event->eventID());
193
194        (*it).event->setEventID(0);
195        if ((*it).has_wakelock) {
196            releaseWakeLock_l();
197        }
198        it = mQueue.erase(it);
199        if (stopAfterFirstMatch) {
200            return;
201        }
202    }
203}
204
205// static
206void *TimedEventQueue::ThreadWrapper(void *me) {
207
208    androidSetThreadPriority(0, ANDROID_PRIORITY_FOREGROUND);
209
210    static_cast<TimedEventQueue *>(me)->threadEntry();
211
212    return NULL;
213}
214
215void TimedEventQueue::threadEntry() {
216    prctl(PR_SET_NAME, (unsigned long)"TimedEventQueue", 0, 0, 0);
217
218    for (;;) {
219        int64_t now_us = 0;
220        sp<Event> event;
221        bool wakeLocked = false;
222
223        {
224            Mutex::Autolock autoLock(mLock);
225
226            if (mStopped) {
227                break;
228            }
229
230            while (mQueue.empty()) {
231                mQueueNotEmptyCondition.wait(mLock);
232            }
233
234            event_id eventID = 0;
235            for (;;) {
236                if (mQueue.empty()) {
237                    // The only event in the queue could have been cancelled
238                    // while we were waiting for its scheduled time.
239                    break;
240                }
241
242                List<QueueItem>::iterator it = mQueue.begin();
243                eventID = (*it).event->eventID();
244
245                now_us = ALooper::GetNowUs();
246                int64_t when_us = (*it).realtime_us;
247
248                int64_t delay_us;
249                if (when_us < 0 || when_us == INT64_MAX) {
250                    delay_us = 0;
251                } else {
252                    delay_us = when_us - now_us;
253                }
254
255                if (delay_us <= 0) {
256                    break;
257                }
258
259                static int64_t kMaxTimeoutUs = 10000000ll;  // 10 secs
260                bool timeoutCapped = false;
261                if (delay_us > kMaxTimeoutUs) {
262                    ALOGW("delay_us exceeds max timeout: %" PRId64 " us", delay_us);
263
264                    // We'll never block for more than 10 secs, instead
265                    // we will split up the full timeout into chunks of
266                    // 10 secs at a time. This will also avoid overflow
267                    // when converting from us to ns.
268                    delay_us = kMaxTimeoutUs;
269                    timeoutCapped = true;
270                }
271
272                status_t err = mQueueHeadChangedCondition.waitRelative(
273                        mLock, delay_us * 1000ll);
274
275                if (!timeoutCapped && err == -ETIMEDOUT) {
276                    // We finally hit the time this event is supposed to
277                    // trigger.
278                    now_us = ALooper::GetNowUs();
279                    break;
280                }
281            }
282
283            // The event w/ this id may have been cancelled while we're
284            // waiting for its trigger-time, in that case
285            // removeEventFromQueue_l will return NULL.
286            // Otherwise, the QueueItem will be removed
287            // from the queue and the referenced event returned.
288            event = removeEventFromQueue_l(eventID, &wakeLocked);
289        }
290
291        if (event != NULL) {
292            // Fire event with the lock NOT held.
293            event->fire(this, now_us);
294            if (wakeLocked) {
295                Mutex::Autolock autoLock(mLock);
296                releaseWakeLock_l();
297            }
298        }
299    }
300}
301
302sp<TimedEventQueue::Event> TimedEventQueue::removeEventFromQueue_l(
303        event_id id, bool *wakeLocked) {
304    for (List<QueueItem>::iterator it = mQueue.begin();
305         it != mQueue.end(); ++it) {
306        if ((*it).event->eventID() == id) {
307            sp<Event> event = (*it).event;
308            event->setEventID(0);
309            *wakeLocked = (*it).has_wakelock;
310            mQueue.erase(it);
311            return event;
312        }
313    }
314
315    ALOGW("Event %d was not found in the queue, already cancelled?", id);
316
317    return NULL;
318}
319
320void TimedEventQueue::acquireWakeLock_l()
321{
322    if (mWakeLockCount == 0) {
323        CHECK(mWakeLockToken == 0);
324        if (mPowerManager == 0) {
325            // use checkService() to avoid blocking if power service is not up yet
326            sp<IBinder> binder =
327                defaultServiceManager()->checkService(String16("power"));
328            if (binder == 0) {
329                ALOGW("cannot connect to the power manager service");
330            } else {
331                mPowerManager = interface_cast<IPowerManager>(binder);
332                binder->linkToDeath(mDeathRecipient);
333            }
334        }
335        if (mPowerManager != 0) {
336            sp<IBinder> binder = new BBinder();
337            int64_t token = IPCThreadState::self()->clearCallingIdentity();
338            status_t status = mPowerManager->acquireWakeLock(POWERMANAGER_PARTIAL_WAKE_LOCK,
339                                                             binder,
340                                                             String16("TimedEventQueue"),
341                                                             String16("media"));
342            IPCThreadState::self()->restoreCallingIdentity(token);
343            if (status == NO_ERROR) {
344                mWakeLockToken = binder;
345                mWakeLockCount++;
346            }
347        }
348    } else {
349        mWakeLockCount++;
350    }
351}
352
353void TimedEventQueue::releaseWakeLock_l(bool force)
354{
355    if (mWakeLockCount == 0) {
356        return;
357    }
358    if (force) {
359        // Force wakelock release below by setting reference count to 1.
360        mWakeLockCount = 1;
361    }
362    if (--mWakeLockCount == 0) {
363        CHECK(mWakeLockToken != 0);
364        if (mPowerManager != 0) {
365            int64_t token = IPCThreadState::self()->clearCallingIdentity();
366            mPowerManager->releaseWakeLock(mWakeLockToken, 0);
367            IPCThreadState::self()->restoreCallingIdentity(token);
368        }
369        mWakeLockToken.clear();
370    }
371}
372
373void TimedEventQueue::clearPowerManager()
374{
375    Mutex::Autolock _l(mLock);
376    releaseWakeLock_l(true /*force*/);
377    mPowerManager.clear();
378}
379
380void TimedEventQueue::PMDeathRecipient::binderDied(
381        const wp<IBinder>& /* who */) {
382    mQueue->clearPowerManager();
383}
384
385}  // namespace android
386
387