1#include "uds/service_dispatcher.h"
2
3#include <errno.h>
4#include <log/log.h>
5#include <sys/epoll.h>
6#include <sys/eventfd.h>
7
8#include "pdx/service.h"
9#include "uds/service_endpoint.h"
10
11static const int kMaxEventsPerLoop = 128;
12
13namespace android {
14namespace pdx {
15namespace uds {
16
17std::unique_ptr<pdx::ServiceDispatcher> ServiceDispatcher::Create() {
18  std::unique_ptr<ServiceDispatcher> dispatcher{new ServiceDispatcher()};
19  if (!dispatcher->epoll_fd_ || !dispatcher->event_fd_) {
20    dispatcher.reset();
21  }
22
23  return std::move(dispatcher);
24}
25
26ServiceDispatcher::ServiceDispatcher() {
27  event_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
28  if (!event_fd_) {
29    ALOGE("Failed to create event fd because: %s\n", strerror(errno));
30    return;
31  }
32
33  epoll_fd_.Reset(epoll_create1(EPOLL_CLOEXEC));
34  if (!epoll_fd_) {
35    ALOGE("Failed to create epoll fd because: %s\n", strerror(errno));
36    return;
37  }
38
39  // Use "this" as a unique pointer to distinguish the event fd from all
40  // the other entries that point to instances of Service.
41  epoll_event event;
42  event.events = EPOLLIN;
43  event.data.ptr = this;
44
45  if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, event_fd_.Get(), &event) < 0) {
46    ALOGE("Failed to add event fd to epoll fd because: %s\n", strerror(errno));
47
48    // Close the fds here and signal failure to the factory method.
49    event_fd_.Close();
50    epoll_fd_.Close();
51  }
52}
53
54ServiceDispatcher::~ServiceDispatcher() { SetCanceled(true); }
55
56int ServiceDispatcher::ThreadEnter() {
57  std::lock_guard<std::mutex> autolock(mutex_);
58
59  if (canceled_)
60    return -EBUSY;
61
62  thread_count_++;
63  return 0;
64}
65
66void ServiceDispatcher::ThreadExit() {
67  std::lock_guard<std::mutex> autolock(mutex_);
68  thread_count_--;
69  condition_.notify_one();
70}
71
72int ServiceDispatcher::AddService(const std::shared_ptr<Service>& service) {
73  if (service->endpoint()->GetIpcTag() != Endpoint::kIpcTag)
74    return -EINVAL;
75
76  std::lock_guard<std::mutex> autolock(mutex_);
77
78  auto* endpoint = static_cast<Endpoint*>(service->endpoint());
79  epoll_event event;
80  event.events = EPOLLIN;
81  event.data.ptr = service.get();
82
83  if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, endpoint->epoll_fd(), &event) <
84      0) {
85    ALOGE("Failed to add service to dispatcher because: %s\n", strerror(errno));
86    return -errno;
87  }
88
89  services_.push_back(service);
90  return 0;
91}
92
93int ServiceDispatcher::RemoveService(const std::shared_ptr<Service>& service) {
94  if (service->endpoint()->GetIpcTag() != Endpoint::kIpcTag)
95    return -EINVAL;
96
97  std::lock_guard<std::mutex> autolock(mutex_);
98
99  // It's dangerous to remove a service while other threads may be using it.
100  if (thread_count_ > 0)
101    return -EBUSY;
102
103  epoll_event dummy;  // See BUGS in man 2 epoll_ctl.
104
105  auto* endpoint = static_cast<Endpoint*>(service->endpoint());
106  if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_DEL, endpoint->epoll_fd(), &dummy) <
107      0) {
108    ALOGE("Failed to remove service from dispatcher because: %s\n",
109          strerror(errno));
110    return -errno;
111  }
112
113  services_.remove(service);
114  return 0;
115}
116
117int ServiceDispatcher::ReceiveAndDispatch() { return ReceiveAndDispatch(-1); }
118
119int ServiceDispatcher::ReceiveAndDispatch(int timeout) {
120  int ret = ThreadEnter();
121  if (ret < 0)
122    return ret;
123
124  epoll_event events[kMaxEventsPerLoop];
125
126  int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, timeout);
127  if (count <= 0) {
128    ALOGE_IF(count < 0, "Failed to wait for epoll events because: %s\n",
129             strerror(errno));
130    ThreadExit();
131    return count < 0 ? -errno : -ETIMEDOUT;
132  }
133
134  for (int i = 0; i < count; i++) {
135    if (events[i].data.ptr == this) {
136      ThreadExit();
137      return -EBUSY;
138    } else {
139      Service* service = static_cast<Service*>(events[i].data.ptr);
140
141      ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
142               static_cast<Endpoint*>(service->endpoint())->epoll_fd());
143      service->ReceiveAndDispatch();
144    }
145  }
146
147  ThreadExit();
148  return 0;
149}
150
151int ServiceDispatcher::EnterDispatchLoop() {
152  int ret = ThreadEnter();
153  if (ret < 0)
154    return ret;
155
156  epoll_event events[kMaxEventsPerLoop];
157
158  while (!IsCanceled()) {
159    int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, -1);
160    if (count < 0 && errno != EINTR) {
161      ALOGE("Failed to wait for epoll events because: %s\n", strerror(errno));
162      ThreadExit();
163      return -errno;
164    }
165
166    for (int i = 0; i < count; i++) {
167      if (events[i].data.ptr == this) {
168        ThreadExit();
169        return -EBUSY;
170      } else {
171        Service* service = static_cast<Service*>(events[i].data.ptr);
172
173        ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
174                 static_cast<Endpoint*>(service->endpoint())->epoll_fd());
175        service->ReceiveAndDispatch();
176      }
177    }
178  }
179
180  ThreadExit();
181  return 0;
182}
183
184void ServiceDispatcher::SetCanceled(bool cancel) {
185  std::unique_lock<std::mutex> lock(mutex_);
186  canceled_ = cancel;
187
188  if (canceled_ && thread_count_ > 0) {
189    eventfd_write(event_fd_.Get(), 1);  // Signal threads to quit.
190
191    condition_.wait(lock, [this] { return !(canceled_ && thread_count_ > 0); });
192
193    eventfd_t value;
194    eventfd_read(event_fd_.Get(), &value);  // Unsignal.
195  }
196}
197
198bool ServiceDispatcher::IsCanceled() const { return canceled_; }
199
200}  // namespace uds
201}  // namespace pdx
202}  // namespace android
203