1/*
2 * Copyright (C) 2009 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#undef __STRICT_ANSI__
18#define __STDINT_LIMITS
19#define __STDC_LIMIT_MACROS
20#include <stdint.h>
21
22//#define LOG_NDEBUG 0
23#define LOG_TAG "TimedEventQueue"
24#include <utils/Log.h>
25#include <utils/threads.h>
26
27#include "include/TimedEventQueue.h"
28
29#include <sys/prctl.h>
30#include <sys/time.h>
31
32#include <media/stagefright/foundation/ADebug.h>
33#include <media/stagefright/foundation/ALooper.h>
34
35namespace android {
36
37TimedEventQueue::TimedEventQueue()
38    : mNextEventID(1),
39      mRunning(false),
40      mStopped(false) {
41}
42
43TimedEventQueue::~TimedEventQueue() {
44    stop();
45}
46
47void TimedEventQueue::start() {
48    if (mRunning) {
49        return;
50    }
51
52    mStopped = false;
53
54    pthread_attr_t attr;
55    pthread_attr_init(&attr);
56    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
57
58    pthread_create(&mThread, &attr, ThreadWrapper, this);
59
60    pthread_attr_destroy(&attr);
61
62    mRunning = true;
63}
64
65void TimedEventQueue::stop(bool flush) {
66    if (!mRunning) {
67        return;
68    }
69
70    if (flush) {
71        postEventToBack(new StopEvent);
72    } else {
73        postTimedEvent(new StopEvent, INT64_MIN);
74    }
75
76    void *dummy;
77    pthread_join(mThread, &dummy);
78
79    mQueue.clear();
80
81    mRunning = false;
82}
83
84TimedEventQueue::event_id TimedEventQueue::postEvent(const sp<Event> &event) {
85    // Reserve an earlier timeslot an INT64_MIN to be able to post
86    // the StopEvent to the absolute head of the queue.
87    return postTimedEvent(event, INT64_MIN + 1);
88}
89
90TimedEventQueue::event_id TimedEventQueue::postEventToBack(
91        const sp<Event> &event) {
92    return postTimedEvent(event, INT64_MAX);
93}
94
95TimedEventQueue::event_id TimedEventQueue::postEventWithDelay(
96        const sp<Event> &event, int64_t delay_us) {
97    CHECK(delay_us >= 0);
98    return postTimedEvent(event, ALooper::GetNowUs() + delay_us);
99}
100
101TimedEventQueue::event_id TimedEventQueue::postTimedEvent(
102        const sp<Event> &event, int64_t realtime_us) {
103    Mutex::Autolock autoLock(mLock);
104
105    event->setEventID(mNextEventID++);
106
107    List<QueueItem>::iterator it = mQueue.begin();
108    while (it != mQueue.end() && realtime_us >= (*it).realtime_us) {
109        ++it;
110    }
111
112    QueueItem item;
113    item.event = event;
114    item.realtime_us = realtime_us;
115
116    if (it == mQueue.begin()) {
117        mQueueHeadChangedCondition.signal();
118    }
119
120    mQueue.insert(it, item);
121
122    mQueueNotEmptyCondition.signal();
123
124    return event->eventID();
125}
126
127static bool MatchesEventID(
128        void *cookie, const sp<TimedEventQueue::Event> &event) {
129    TimedEventQueue::event_id *id =
130        static_cast<TimedEventQueue::event_id *>(cookie);
131
132    if (event->eventID() != *id) {
133        return false;
134    }
135
136    *id = 0;
137
138    return true;
139}
140
141bool TimedEventQueue::cancelEvent(event_id id) {
142    if (id == 0) {
143        return false;
144    }
145
146    cancelEvents(&MatchesEventID, &id, true /* stopAfterFirstMatch */);
147
148    // if MatchesEventID found a match, it will have set id to 0
149    // (which is not a valid event_id).
150
151    return id == 0;
152}
153
154void TimedEventQueue::cancelEvents(
155        bool (*predicate)(void *cookie, const sp<Event> &event),
156        void *cookie,
157        bool stopAfterFirstMatch) {
158    Mutex::Autolock autoLock(mLock);
159
160    List<QueueItem>::iterator it = mQueue.begin();
161    while (it != mQueue.end()) {
162        if (!(*predicate)(cookie, (*it).event)) {
163            ++it;
164            continue;
165        }
166
167        if (it == mQueue.begin()) {
168            mQueueHeadChangedCondition.signal();
169        }
170
171        ALOGV("cancelling event %d", (*it).event->eventID());
172
173        (*it).event->setEventID(0);
174        it = mQueue.erase(it);
175
176        if (stopAfterFirstMatch) {
177            return;
178        }
179    }
180}
181
182// static
183void *TimedEventQueue::ThreadWrapper(void *me) {
184
185    androidSetThreadPriority(0, ANDROID_PRIORITY_FOREGROUND);
186
187    static_cast<TimedEventQueue *>(me)->threadEntry();
188
189    return NULL;
190}
191
192void TimedEventQueue::threadEntry() {
193    prctl(PR_SET_NAME, (unsigned long)"TimedEventQueue", 0, 0, 0);
194
195    for (;;) {
196        int64_t now_us = 0;
197        sp<Event> event;
198
199        {
200            Mutex::Autolock autoLock(mLock);
201
202            if (mStopped) {
203                break;
204            }
205
206            while (mQueue.empty()) {
207                mQueueNotEmptyCondition.wait(mLock);
208            }
209
210            event_id eventID = 0;
211            for (;;) {
212                if (mQueue.empty()) {
213                    // The only event in the queue could have been cancelled
214                    // while we were waiting for its scheduled time.
215                    break;
216                }
217
218                List<QueueItem>::iterator it = mQueue.begin();
219                eventID = (*it).event->eventID();
220
221                now_us = ALooper::GetNowUs();
222                int64_t when_us = (*it).realtime_us;
223
224                int64_t delay_us;
225                if (when_us < 0 || when_us == INT64_MAX) {
226                    delay_us = 0;
227                } else {
228                    delay_us = when_us - now_us;
229                }
230
231                if (delay_us <= 0) {
232                    break;
233                }
234
235                static int64_t kMaxTimeoutUs = 10000000ll;  // 10 secs
236                bool timeoutCapped = false;
237                if (delay_us > kMaxTimeoutUs) {
238                    ALOGW("delay_us exceeds max timeout: %lld us", delay_us);
239
240                    // We'll never block for more than 10 secs, instead
241                    // we will split up the full timeout into chunks of
242                    // 10 secs at a time. This will also avoid overflow
243                    // when converting from us to ns.
244                    delay_us = kMaxTimeoutUs;
245                    timeoutCapped = true;
246                }
247
248                status_t err = mQueueHeadChangedCondition.waitRelative(
249                        mLock, delay_us * 1000ll);
250
251                if (!timeoutCapped && err == -ETIMEDOUT) {
252                    // We finally hit the time this event is supposed to
253                    // trigger.
254                    now_us = ALooper::GetNowUs();
255                    break;
256                }
257            }
258
259            // The event w/ this id may have been cancelled while we're
260            // waiting for its trigger-time, in that case
261            // removeEventFromQueue_l will return NULL.
262            // Otherwise, the QueueItem will be removed
263            // from the queue and the referenced event returned.
264            event = removeEventFromQueue_l(eventID);
265        }
266
267        if (event != NULL) {
268            // Fire event with the lock NOT held.
269            event->fire(this, now_us);
270        }
271    }
272}
273
274sp<TimedEventQueue::Event> TimedEventQueue::removeEventFromQueue_l(
275        event_id id) {
276    for (List<QueueItem>::iterator it = mQueue.begin();
277         it != mQueue.end(); ++it) {
278        if ((*it).event->eventID() == id) {
279            sp<Event> event = (*it).event;
280            event->setEventID(0);
281
282            mQueue.erase(it);
283
284            return event;
285        }
286    }
287
288    ALOGW("Event %d was not found in the queue, already cancelled?", id);
289
290    return NULL;
291}
292
293}  // namespace android
294
295