1/*
2 * Copyright (C) 2016 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef ANDROID_QUEUE_WORKER_H_
18#define ANDROID_QUEUE_WORKER_H_
19
20#include "worker.h"
21
22#include <queue>
23
24namespace android {
25
26template <typename T>
27class QueueWorker : public Worker {
28 public:
29  static const size_t kDefaultMaxQueueSize = 2;
30  static const int64_t kTimeoutDisabled = -1;
31
32  QueueWorker(const char *name, int priority)
33      : Worker(name, priority),
34        max_queue_size_(kDefaultMaxQueueSize),
35        queue_timeout_ms_(kTimeoutDisabled),
36        idle_timeout_ms_(kTimeoutDisabled),
37        idled_out_(false) {
38  }
39
40  int QueueWork(std::unique_ptr<T> workitem);
41
42  bool IsWorkPending() const {
43    return !queue_.empty();
44  }
45  bool idle() const {
46    return idled_out_;
47  }
48
49  int64_t idle_timeout() {
50    return idle_timeout_ms_;
51  }
52  void set_idle_timeout(int64_t timeout_ms) {
53    idle_timeout_ms_ = timeout_ms;
54  }
55
56  int64_t queue_timeout() {
57    return queue_timeout_ms_;
58  }
59  void set_queue_timeout(int64_t timeout_ms) {
60    queue_timeout_ms_ = timeout_ms;
61  }
62
63  size_t max_queue_size() const {
64    return max_queue_size_;
65  }
66  void set_max_queue_size(size_t size) {
67    max_queue_size_ = size;
68  }
69
70 protected:
71  virtual void ProcessWork(std::unique_ptr<T> workitem) = 0;
72  virtual void ProcessIdle(){}
73  virtual void Routine();
74
75  template <typename Predicate>
76  int WaitCond(std::unique_lock<std::mutex> &lock, Predicate pred,
77               int64_t max_msecs);
78
79 private:
80  std::queue<std::unique_ptr<T>> queue_;
81  size_t max_queue_size_;
82  int64_t queue_timeout_ms_;
83  int64_t idle_timeout_ms_;
84  bool idled_out_;
85};
86
87template <typename T>
88template <typename Predicate>
89int QueueWorker<T>::WaitCond(std::unique_lock<std::mutex> &lock, Predicate pred,
90                             int64_t max_msecs) {
91  bool ret = true;
92  auto wait_func = [&] { return pred() || should_exit(); };
93
94  if (max_msecs < 0) {
95    cond_.wait(lock, wait_func);
96  } else {
97    auto timeout = std::chrono::milliseconds(max_msecs);
98    ret = cond_.wait_for(lock, timeout, wait_func);
99  }
100
101  if (!ret)
102    return -ETIMEDOUT;
103  else if (should_exit())
104    return -EINTR;
105
106  return 0;
107}
108
109template <typename T>
110void QueueWorker<T>::Routine() {
111  std::unique_lock<std::mutex> lk(mutex_);
112  std::unique_ptr<T> workitem;
113
114  auto wait_func = [&] { return !queue_.empty(); };
115  int ret =
116      WaitCond(lk, wait_func, idled_out_ ? kTimeoutDisabled : idle_timeout_ms_);
117  switch (ret) {
118    case 0:
119      break;
120    case -ETIMEDOUT:
121      ProcessIdle();
122      idled_out_ = true;
123      return;
124    case -EINTR:
125    default:
126      return;
127  }
128
129  if (!queue_.empty()) {
130    workitem = std::move(queue_.front());
131    queue_.pop();
132  }
133  lk.unlock();
134  cond_.notify_all();
135
136  idled_out_ = false;
137  ProcessWork(std::move(workitem));
138}
139
140template <typename T>
141int QueueWorker<T>::QueueWork(std::unique_ptr<T> workitem) {
142  std::unique_lock<std::mutex> lk(mutex_);
143
144  auto wait_func = [&] { return queue_.size() < max_queue_size_; };
145  int ret = WaitCond(lk, wait_func, queue_timeout_ms_);
146  if (ret)
147    return ret;
148
149  queue_.push(std::move(workitem));
150  lk.unlock();
151
152  cond_.notify_one();
153
154  return 0;
155}
156};
157#endif
158