/**
 * Copyright (C) 2010 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
#include "logging.h"
#include "status.h"
#include "worker.h"

#include <stdint.h>
#include <time.h>
22
// Uncomment the next line to route DBG() trace messages to ALOGD.
//#define WORKER_DEBUG
#ifdef  WORKER_DEBUG

#define DBG(...) ALOGD(__VA_ARGS__)

#else

// Expand to a real (empty) statement rather than to nothing, so that
// "if (cond) DBG(...);" never degenerates into an empty-bodied "if".
// The arguments are discarded unevaluated, exactly as before.
#define DBG(...) do { } while (0)

#endif
33
34void * WorkerThread::Work(void *param) {
35    WorkerThread *t = (WorkerThread *)param;
36    android_atomic_acquire_store(STATE_RUNNING, &t->state_);
37    void * v = t->Worker(t->workerParam_);
38    android_atomic_acquire_store(STATE_STOPPED, &t->state_);
39    return v;
40}
41
42bool WorkerThread::isRunning() {
43    DBG("WorkerThread::isRunning E");
44    bool ret_value = android_atomic_acquire_load(&state_) == STATE_RUNNING;
45    DBG("WorkerThread::isRunning X ret_value=%d", ret_value);
46    return ret_value;
47}
48
49WorkerThread::WorkerThread() {
50    DBG("WorkerThread::WorkerThread E");
51    state_ = STATE_INITIALIZED;
52    pthread_mutex_init(&mutex_, NULL);
53    pthread_cond_init(&cond_, NULL);
54    DBG("WorkerThread::WorkerThread X");
55}
56
57WorkerThread::~WorkerThread() {
58    DBG("WorkerThread::~WorkerThread E");
59    Stop();
60    pthread_mutex_destroy(&mutex_);
61    DBG("WorkerThread::~WorkerThread X");
62}
63
64// Return true if changed from STATE_RUNNING to STATE_STOPPING
65bool WorkerThread::BeginStopping() {
66    DBG("WorkerThread::BeginStopping E");
67    bool ret_value = (android_atomic_acquire_cas(STATE_RUNNING, STATE_STOPPING, &state_) == 0);
68    DBG("WorkerThread::BeginStopping X ret_value=%d", ret_value);
69    return ret_value;
70}
71
72// Wait until state is not STATE_STOPPING
73void WorkerThread::WaitUntilStopped() {
74    DBG("WorkerThread::WaitUntilStopped E");
75    pthread_cond_signal(&cond_);
76    while(android_atomic_release_load(&state_) == STATE_STOPPING) {
77        usleep(200000);
78    }
79    DBG("WorkerThread::WaitUntilStopped X");
80}
81
82void WorkerThread::Stop() {
83    DBG("WorkerThread::Stop E");
84    if (BeginStopping()) {
85        WaitUntilStopped();
86    }
87    DBG("WorkerThread::Stop X");
88}
89
90int WorkerThread::Run(void *workerParam) {
91    DBG("WorkerThread::Run E workerParam=%p", workerParam);
92    int status;
93    int ret;
94
95    workerParam_ = workerParam;
96
97    ret = pthread_attr_init(&attr_);
98    if (ret != 0) {
99        ALOGE("RIL_Init X: pthread_attr_init failed err=%s", strerror(ret));
100        return STATUS_ERR;
101    }
102    ret = pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_DETACHED);
103    if (ret != 0) {
104        ALOGE("RIL_Init X: pthread_attr_setdetachstate failed err=%s",
105                strerror(ret));
106        return STATUS_ERR;
107    }
108    ret = pthread_create(&tid_, &attr_,
109                (void * (*)(void *))&WorkerThread::Work, this);
110    if (ret != 0) {
111        ALOGE("RIL_Init X: pthread_create failed err=%s", strerror(ret));
112        return STATUS_ERR;
113    }
114
115    // Wait until worker is running
116    while (android_atomic_acquire_load(&state_) == STATE_INITIALIZED) {
117        usleep(200000);
118    }
119
120    DBG("WorkerThread::Run X workerParam=%p", workerParam);
121    return STATUS_OK;
122}
123
124
125class WorkerQueueThread : public WorkerThread {
126  private:
127    friend class WorkerQueue;
128
129  public:
130    WorkerQueueThread() {
131    }
132
133    virtual ~WorkerQueueThread() {
134        Stop();
135    }
136
137    void * Worker(void *param) {
138        DBG("WorkerQueueThread::Worker E");
139        WorkerQueue *wq = (WorkerQueue *)param;
140
141        // Do the work until we're told to stop
142        while (isRunning()) {
143            pthread_mutex_lock(&mutex_);
144            while (isRunning() && wq->q_.size() == 0) {
145                if (wq->delayed_q_.size() == 0) {
146                    // Both queue's are empty so wait
147                    pthread_cond_wait(&cond_, &mutex_);
148                } else {
149                    // delayed_q_ is not empty, move any
150                    // timed out records to q_.
151                    int64_t now = android::elapsedRealtime();
152                    while((wq->delayed_q_.size() != 0) &&
153                            ((wq->delayed_q_.top()->time - now) <= 0)) {
154                        struct WorkerQueue::Record *r = wq->delayed_q_.top();
155                        DBG("WorkerQueueThread::Worker move p=%p time=%lldms",
156                                r->p, r->time);
157                        wq->delayed_q_.pop();
158                        wq->q_.push_back(r);
159                    }
160
161                    if ((wq->q_.size() == 0) && (wq->delayed_q_.size() != 0)) {
162                        // We need to do a timed wait
163                        struct timeval tv;
164                        struct timespec ts;
165                        struct WorkerQueue::Record *r = wq->delayed_q_.top();
166                        int64_t delay_ms = r->time - now;
167                        DBG("WorkerQueueThread::Worker wait"
168                            " p=%p time=%lldms delay_ms=%lldms",
169                                r->p, r->time, delay_ms);
170                        gettimeofday(&tv, NULL);
171                        ts.tv_sec = tv.tv_sec + (delay_ms / 1000);
172                        ts.tv_nsec = (tv.tv_usec +
173                                        ((delay_ms % 1000) * 1000)) * 1000;
174                        pthread_cond_timedwait(&cond_, &mutex_, &ts);
175                    }
176                }
177            }
178            if (isRunning()) {
179                struct WorkerQueue::Record *r = wq->q_.front();
180                wq->q_.pop_front();
181                void *p = r->p;
182                wq->release_record(r);
183                pthread_mutex_unlock(&mutex_);
184                wq->Process(r->p);
185            } else {
186                pthread_mutex_unlock(&mutex_);
187            }
188        }
189        DBG("WorkerQueueThread::Worker X");
190        return NULL;
191    }
192};
193
194WorkerQueue::WorkerQueue() {
195    DBG("WorkerQueue::WorkerQueue E");
196    wqt_ = new WorkerQueueThread();
197    DBG("WorkerQueue::WorkerQueue X");
198}
199
200WorkerQueue::~WorkerQueue() {
201    DBG("WorkerQueue::~WorkerQueue E");
202    Stop();
203
204    Record *r;
205    pthread_mutex_lock(&wqt_->mutex_);
206    while(free_list_.size() != 0) {
207        r = free_list_.front();
208        free_list_.pop_front();
209        DBG("WorkerQueue::~WorkerQueue delete free_list_ r=%p", r);
210        delete r;
211    }
212    while(delayed_q_.size() != 0) {
213        r = delayed_q_.top();
214        delayed_q_.pop();
215        DBG("WorkerQueue::~WorkerQueue delete delayed_q_ r=%p", r);
216        delete r;
217    }
218    pthread_mutex_unlock(&wqt_->mutex_);
219
220    delete wqt_;
221    DBG("WorkerQueue::~WorkerQueue X");
222}
223
224int WorkerQueue::Run() {
225    return wqt_->Run(this);
226}
227
228void WorkerQueue::Stop() {
229    wqt_->Stop();
230}
231
232/**
233 * Obtain a record from free_list if it is not empty, fill in the record with provided
234 * information: *p and delay_in_ms
235 */
236struct WorkerQueue::Record *WorkerQueue::obtain_record(void *p, int delay_in_ms) {
237    struct Record *r;
238    if (free_list_.size() == 0) {
239        r = new Record();
240        DBG("WorkerQueue::obtain_record new r=%p", r);
241    } else {
242        r = free_list_.front();
243        DBG("WorkerQueue::obtain_record reuse r=%p", r);
244        free_list_.pop_front();
245    }
246    r->p = p;
247    if (delay_in_ms != 0) {
248        r->time = android::elapsedRealtime() + delay_in_ms;
249    } else {
250        r->time = 0;
251    }
252    return r;
253}
254
255/**
256 * release a record and insert into the front of the free_list
257 */
258void WorkerQueue::release_record(struct Record *r) {
259    DBG("WorkerQueue::release_record r=%p", r);
260    free_list_.push_front(r);
261}
262
263/**
264 * Add a record to processing queue q_
265 */
266void WorkerQueue::Add(void *p) {
267    DBG("WorkerQueue::Add E:");
268    pthread_mutex_lock(&wqt_->mutex_);
269    struct Record *r = obtain_record(p, 0);
270    q_.push_back(r);
271    if (q_.size() == 1) {
272        pthread_cond_signal(&wqt_->cond_);
273    }
274    pthread_mutex_unlock(&wqt_->mutex_);
275    DBG("WorkerQueue::Add X:");
276}
277
278void WorkerQueue::AddDelayed(void *p, int delay_in_ms) {
279    DBG("WorkerQueue::AddDelayed E:");
280    if (delay_in_ms <= 0) {
281        Add(p);
282    } else {
283        pthread_mutex_lock(&wqt_->mutex_);
284        struct Record *r = obtain_record(p, delay_in_ms);
285        delayed_q_.push(r);
286#ifdef WORKER_DEBUG
287        int64_t now = android::elapsedRealtime();
288        DBG("WorkerQueue::AddDelayed"
289            " p=%p delay_in_ms=%d now=%lldms top->p=%p"
290            " top->time=%lldms diff=%lldms",
291                p, delay_in_ms, now, delayed_q_.top()->p,
292                delayed_q_.top()->time, delayed_q_.top()->time - now);
293#endif
294        if ((q_.size() == 0) && (delayed_q_.top() == r)) {
295            // q_ is empty and the new record is at delayed_q_.top
296            // so we signal the waiting thread so it can readjust
297            // the wait time.
298            DBG("WorkerQueue::AddDelayed signal");
299            pthread_cond_signal(&wqt_->cond_);
300        }
301        pthread_mutex_unlock(&wqt_->mutex_);
302    }
303    DBG("WorkerQueue::AddDelayed X:");
304}
305
306
307class TestWorkerQueue : public WorkerQueue {
308    virtual void Process(void *p) {
309        ALOGD("TestWorkerQueue::Process: EX p=%p", p);
310    }
311};
312
313class TesterThread : public WorkerThread {
314  public:
315    void * Worker(void *param)
316    {
317        ALOGD("TesterThread::Worker E param=%p", param);
318        WorkerQueue *wq = (WorkerQueue *)param;
319
320        // Test AddDelayed
321        wq->AddDelayed((void *)1000, 1000);
322        wq->Add((void *)0);
323        wq->Add((void *)0);
324        wq->Add((void *)0);
325        wq->Add((void *)0);
326        wq->AddDelayed((void *)100, 100);
327        wq->AddDelayed((void *)2000, 2000);
328
329        for (int i = 1; isRunning(); i++) {
330            ALOGD("TesterThread: looping %d", i);
331            wq->Add((void *)i);
332            wq->Add((void *)i);
333            wq->Add((void *)i);
334            wq->Add((void *)i);
335            sleep(1);
336        }
337
338        ALOGD("TesterThread::Worker X param=%p", param);
339
340        return NULL;
341    }
342};
343
344void testWorker() {
345    ALOGD("testWorker E: ********");
346
347    // Test we can create a thread and delete it
348    TesterThread *tester = new TesterThread();
349    delete tester;
350
351    TestWorkerQueue *wq = new TestWorkerQueue();
352    if (wq->Run() == STATUS_OK) {
353        ALOGD("testWorker WorkerQueue %p running", wq);
354
355        // Test we can run a thread, stop it then delete it
356        tester = new TesterThread();
357        tester->Run(wq);
358        ALOGD("testWorker tester %p running", tester);
359        sleep(10);
360        ALOGD("testWorker tester %p stopping", tester);
361        tester->Stop();
362        ALOGD("testWorker tester %p stopped", tester);
363        wq->Stop();
364        ALOGD("testWorker wq %p stopped", wq);
365    }
366    ALOGD("testWorker X: ********\n");
367}
368