message_pump_libevent.cc revision 731df977c0511bca2206b5f333555b1205ff1f43
1// Copyright (c) 2010 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/message_pump_libevent.h"
6
7#include <errno.h>
8#include <fcntl.h>
9
10#include "base/auto_reset.h"
11#include "base/eintr_wrapper.h"
12#include "base/logging.h"
13#include "base/mac/scoped_nsautorelease_pool.h"
14#include "base/observer_list.h"
15#include "base/scoped_ptr.h"
16#include "base/time.h"
17#if defined(USE_SYSTEM_LIBEVENT)
18#include <event.h>
19#else
20#include "third_party/libevent/event.h"
21#endif
22
23// Lifecycle of struct event
24// Libevent uses two main data structures:
25// struct event_base (of which there is one per message pump), and
26// struct event (of which there is roughly one per socket).
27// The socket's struct event is created in
28// MessagePumpLibevent::WatchFileDescriptor(),
29// is owned by the FileDescriptorWatcher, and is destroyed in
30// StopWatchingFileDescriptor().
31// It is moved into and out of lists in struct event_base by
32// the libevent functions event_add() and event_del().
33//
34// TODO(dkegel):
35// At the moment bad things happen if a FileDescriptorWatcher
36// is active after its MessagePumpLibevent has been destroyed.
37// See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop
38// Not clear yet whether that situation occurs in practice,
39// but if it does, we need to fix it.
40
41namespace base {
42
// Adds O_NONBLOCK to the file status flags of |fd|.
// Returns the result of the F_SETFL call: 0 on success, -1 on failure.
// If the current flags cannot be read they are treated as empty.
static int SetNonBlocking(int fd) {
  const int current_flags = fcntl(fd, F_GETFL, 0);
  const int base_flags = (current_flags == -1) ? 0 : current_flags;
  return fcntl(fd, F_SETFL, base_flags | O_NONBLOCK);
}
51
52MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
53    : is_persistent_(false),
54      event_(NULL),
55      pump_(NULL),
56      watcher_(NULL) {
57}
58
59MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
60  if (event_) {
61    StopWatchingFileDescriptor();
62  }
63}
64
65void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e,
66                                                      bool is_persistent) {
67  DCHECK(e);
68  DCHECK(event_ == NULL);
69
70  is_persistent_ = is_persistent;
71  event_ = e;
72}
73
74event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
75  struct event *e = event_;
76  event_ = NULL;
77  return e;
78}
79
80bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
81  event* e = ReleaseEvent();
82  if (e == NULL)
83    return true;
84
85  // event_del() is a no-op if the event isn't active.
86  int rv = event_del(e);
87  delete e;
88  pump_ = NULL;
89  watcher_ = NULL;
90  return (rv == 0);
91}
92
93void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking(
94    int fd, MessagePumpLibevent* pump) {
95  pump->WillProcessIOEvent();
96  watcher_->OnFileCanReadWithoutBlocking(fd);
97  pump->DidProcessIOEvent();
98}
99
100void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking(
101    int fd, MessagePumpLibevent* pump) {
102  pump->WillProcessIOEvent();
103  watcher_->OnFileCanWriteWithoutBlocking(fd);
104  pump->DidProcessIOEvent();
105}
106
107// Called if a byte is received on the wakeup pipe.
108void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
109  base::MessagePumpLibevent* that =
110              static_cast<base::MessagePumpLibevent*>(context);
111  DCHECK(that->wakeup_pipe_out_ == socket);
112
113  // Remove and discard the wakeup byte.
114  char buf;
115  int nread = HANDLE_EINTR(read(socket, &buf, 1));
116  DCHECK_EQ(nread, 1);
117  // Tell libevent to break out of inner loop.
118  event_base_loopbreak(that->event_base_);
119}
120
121MessagePumpLibevent::MessagePumpLibevent()
122    : keep_running_(true),
123      in_run_(false),
124      event_base_(event_base_new()),
125      wakeup_pipe_in_(-1),
126      wakeup_pipe_out_(-1) {
127  if (!Init())
128     NOTREACHED();
129}
130
131bool MessagePumpLibevent::Init() {
132  int fds[2];
133  if (pipe(fds)) {
134    DLOG(ERROR) << "pipe() failed, errno: " << errno;
135    return false;
136  }
137  if (SetNonBlocking(fds[0])) {
138    DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno;
139    return false;
140  }
141  if (SetNonBlocking(fds[1])) {
142    DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno;
143    return false;
144  }
145  wakeup_pipe_out_ = fds[0];
146  wakeup_pipe_in_ = fds[1];
147
148  wakeup_event_ = new event;
149  event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
150            OnWakeup, this);
151  event_base_set(event_base_, wakeup_event_);
152
153  if (event_add(wakeup_event_, 0))
154    return false;
155  return true;
156}
157
158MessagePumpLibevent::~MessagePumpLibevent() {
159  DCHECK(wakeup_event_);
160  DCHECK(event_base_);
161  event_del(wakeup_event_);
162  delete wakeup_event_;
163  if (wakeup_pipe_in_ >= 0) {
164    if (HANDLE_EINTR(close(wakeup_pipe_in_)) < 0)
165      PLOG(ERROR) << "close";
166  }
167  if (wakeup_pipe_out_ >= 0) {
168    if (HANDLE_EINTR(close(wakeup_pipe_out_)) < 0)
169      PLOG(ERROR) << "close";
170  }
171  event_base_free(event_base_);
172}
173
174bool MessagePumpLibevent::WatchFileDescriptor(int fd,
175                                              bool persistent,
176                                              Mode mode,
177                                              FileDescriptorWatcher *controller,
178                                              Watcher *delegate) {
179  DCHECK_GE(fd, 0);
180  DCHECK(controller);
181  DCHECK(delegate);
182  DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
183
184  int event_mask = persistent ? EV_PERSIST : 0;
185  if ((mode & WATCH_READ) != 0) {
186    event_mask |= EV_READ;
187  }
188  if ((mode & WATCH_WRITE) != 0) {
189    event_mask |= EV_WRITE;
190  }
191
192  scoped_ptr<event> evt(controller->ReleaseEvent());
193  if (evt.get() == NULL) {
194    // Ownership is transferred to the controller.
195    evt.reset(new event);
196  } else {
197    // Make sure we don't pick up any funky internal libevent masks.
198    int old_interest_mask = evt.get()->ev_events &
199        (EV_READ | EV_WRITE | EV_PERSIST);
200
201    // Combine old/new event masks.
202    event_mask |= old_interest_mask;
203
204    // Must disarm the event before we can reuse it.
205    event_del(evt.get());
206
207    // It's illegal to use this function to listen on 2 separate fds with the
208    // same |controller|.
209    if (EVENT_FD(evt.get()) != fd) {
210      NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd;
211      return false;
212    }
213  }
214
215  // Set current interest mask and message pump for this event.
216  event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller);
217
218  // Tell libevent which message pump this socket will belong to when we add it.
219  if (event_base_set(event_base_, evt.get()) != 0) {
220    return false;
221  }
222
223  // Add this socket to the list of monitored sockets.
224  if (event_add(evt.get(), NULL) != 0) {
225    return false;
226  }
227
228  // Transfer ownership of evt to controller.
229  controller->Init(evt.release(), persistent);
230
231  controller->set_watcher(delegate);
232  controller->set_pump(this);
233
234  return true;
235}
236
237void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
238                                                 void* context) {
239  FileDescriptorWatcher* controller =
240      static_cast<FileDescriptorWatcher*>(context);
241
242  MessagePumpLibevent* pump = controller->pump();
243
244  if (flags & EV_WRITE) {
245    controller->OnFileCanWriteWithoutBlocking(fd, pump);
246  }
247  if (flags & EV_READ) {
248    controller->OnFileCanReadWithoutBlocking(fd, pump);
249  }
250}
251
252// Tell libevent to break out of inner loop.
253static void timer_callback(int fd, short events, void *context)
254{
255  event_base_loopbreak((struct event_base *)context);
256}
257
258// Reentrant!
259void MessagePumpLibevent::Run(Delegate* delegate) {
260  DCHECK(keep_running_) << "Quit must have been called outside of Run!";
261  AutoReset<bool> auto_reset_in_run(&in_run_, true);
262
263  // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641.
264  // Instead, make our own timer and reuse it on each call to event_base_loop().
265  scoped_ptr<event> timer_event(new event);
266
267  for (;;) {
268    mac::ScopedNSAutoreleasePool autorelease_pool;
269
270    bool did_work = delegate->DoWork();
271    if (!keep_running_)
272      break;
273
274    did_work |= delegate->DoDelayedWork(&delayed_work_time_);
275    if (!keep_running_)
276      break;
277
278    if (did_work)
279      continue;
280
281    did_work = delegate->DoIdleWork();
282    if (!keep_running_)
283      break;
284
285    if (did_work)
286      continue;
287
288    // EVLOOP_ONCE tells libevent to only block once,
289    // but to service all pending events when it wakes up.
290    if (delayed_work_time_.is_null()) {
291      event_base_loop(event_base_, EVLOOP_ONCE);
292    } else {
293      TimeDelta delay = delayed_work_time_ - Time::Now();
294      if (delay > TimeDelta()) {
295        struct timeval poll_tv;
296        poll_tv.tv_sec = delay.InSeconds();
297        poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
298        event_set(timer_event.get(), -1, 0, timer_callback, event_base_);
299        event_base_set(event_base_, timer_event.get());
300        event_add(timer_event.get(), &poll_tv);
301        event_base_loop(event_base_, EVLOOP_ONCE);
302        event_del(timer_event.get());
303      } else {
304        // It looks like delayed_work_time_ indicates a time in the past, so we
305        // need to call DoDelayedWork now.
306        delayed_work_time_ = Time();
307      }
308    }
309  }
310
311  keep_running_ = true;
312}
313
314void MessagePumpLibevent::Quit() {
315  DCHECK(in_run_);
316  // Tell both libevent and Run that they should break out of their loops.
317  keep_running_ = false;
318  ScheduleWork();
319}
320
321void MessagePumpLibevent::ScheduleWork() {
322  // Tell libevent (in a threadsafe way) that it should break out of its loop.
323  char buf = 0;
324  int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
325  DCHECK(nwrite == 1 || errno == EAGAIN)
326      << "[nwrite:" << nwrite << "] [errno:" << errno << "]";
327}
328
329void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) {
330  // We know that we can't be blocked on Wait right now since this method can
331  // only be called on the same thread as Run, so we only need to update our
332  // record of how long to sleep when we do sleep.
333  delayed_work_time_ = delayed_work_time;
334}
335
336void MessagePumpLibevent::AddIOObserver(IOObserver *obs) {
337  io_observers_.AddObserver(obs);
338}
339
340void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) {
341  io_observers_.RemoveObserver(obs);
342}
343
344void MessagePumpLibevent::WillProcessIOEvent() {
345  FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent());
346}
347
348void MessagePumpLibevent::DidProcessIOEvent() {
349  FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent());
350}
351
352}  // namespace base
353