1// Copyright 2015 The Chromium OS Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include <brillo/message_loops/base_message_loop.h>
6
7#include <fcntl.h>
8#include <sys/stat.h>
9#include <sys/types.h>
10#include <unistd.h>
11
12#ifndef __ANDROID_HOST__
13// Used for MISC_MAJOR. Only required for the target and not always available
14// for the host.
15#include <linux/major.h>
16#endif
17
18#include <vector>
19
20#include <base/bind.h>
21#include <base/files/file_path.h>
22#include <base/files/file_util.h>
23#include <base/run_loop.h>
24#include <base/strings/string_number_conversions.h>
25#include <base/strings/string_split.h>
26
27#include <brillo/location_logging.h>
28#include <brillo/strings/string_utils.h>
29
30using base::Closure;
31
namespace {

// File read by BaseMessageLoop::GetBinderMinor() to look up the binder minor
// device number.
constexpr char kMiscMinorPath[] = "/proc/misc";

// Entry name that BaseMessageLoop::ParseBinderMinor() searches for in the
// contents of the file above.
constexpr char kBinderDriverName[] = "binder";

}  // namespace
38
39namespace brillo {
40
// Starting value of ParseBinderMinor()'s result; it is what that function
// returns when no "binder" entry is found in the parsed contents.
const int BaseMessageLoop::kInvalidMinor = -1;
// Value GetBinderMinor() compares |binder_minor_| against to decide whether
// /proc/misc still has to be read and parsed.
const int BaseMessageLoop::kUninitializedMinor = -2;
43
44BaseMessageLoop::BaseMessageLoop(base::MessageLoopForIO* base_loop)
45    : base_loop_(base_loop),
46      weak_ptr_factory_(this) {}
47
48BaseMessageLoop::~BaseMessageLoop() {
49  for (auto& io_task : io_tasks_) {
50    DVLOG_LOC(io_task.second.location(), 1)
51        << "Removing file descriptor watcher task_id " << io_task.first
52        << " leaked on BaseMessageLoop, scheduled from this location.";
53    io_task.second.StopWatching();
54  }
55
56  // Note all pending canceled delayed tasks when destroying the message loop.
57  size_t lazily_deleted_tasks = 0;
58  for (const auto& delayed_task : delayed_tasks_) {
59    if (delayed_task.second.closure.is_null()) {
60      lazily_deleted_tasks++;
61    } else {
62      DVLOG_LOC(delayed_task.second.location, 1)
63          << "Removing delayed task_id " << delayed_task.first
64          << " leaked on BaseMessageLoop, scheduled from this location.";
65    }
66  }
67  if (lazily_deleted_tasks) {
68    LOG(INFO) << "Leaking " << lazily_deleted_tasks << " canceled tasks.";
69  }
70}
71
72MessageLoop::TaskId BaseMessageLoop::PostDelayedTask(
73    const tracked_objects::Location& from_here,
74    const Closure &task,
75    base::TimeDelta delay) {
76  TaskId task_id =  NextTaskId();
77  bool base_scheduled = base_loop_->task_runner()->PostDelayedTask(
78      from_here,
79      base::Bind(&BaseMessageLoop::OnRanPostedTask,
80                 weak_ptr_factory_.GetWeakPtr(),
81                 task_id),
82      delay);
83  DVLOG_LOC(from_here, 1) << "Scheduling delayed task_id " << task_id
84                          << " to run in " << delay << ".";
85  if (!base_scheduled)
86    return MessageLoop::kTaskIdNull;
87
88  delayed_tasks_.emplace(task_id,
89                         DelayedTask{from_here, task_id, std::move(task)});
90  return task_id;
91}
92
93MessageLoop::TaskId BaseMessageLoop::WatchFileDescriptor(
94    const tracked_objects::Location& from_here,
95    int fd,
96    WatchMode mode,
97    bool persistent,
98    const Closure &task) {
99  // base::MessageLoopForIO CHECKS that "fd >= 0", so we handle that case here.
100  if (fd < 0)
101    return MessageLoop::kTaskIdNull;
102
103  base::MessageLoopForIO::Mode base_mode = base::MessageLoopForIO::WATCH_READ;
104  switch (mode) {
105    case MessageLoop::kWatchRead:
106      base_mode = base::MessageLoopForIO::WATCH_READ;
107      break;
108    case MessageLoop::kWatchWrite:
109      base_mode = base::MessageLoopForIO::WATCH_WRITE;
110      break;
111    default:
112      return MessageLoop::kTaskIdNull;
113  }
114
115  TaskId task_id =  NextTaskId();
116  auto it_bool = io_tasks_.emplace(
117      std::piecewise_construct,
118      std::forward_as_tuple(task_id),
119      std::forward_as_tuple(
120          from_here, this, task_id, fd, base_mode, persistent, task));
121  // This should always insert a new element.
122  DCHECK(it_bool.second);
123  bool scheduled = it_bool.first->second.StartWatching();
124  DVLOG_LOC(from_here, 1)
125      << "Watching fd " << fd << " for "
126      << (mode == MessageLoop::kWatchRead ? "reading" : "writing")
127      << (persistent ? " persistently" : " just once")
128      << " as task_id " << task_id
129      << (scheduled ? " successfully" : " failed.");
130
131  if (!scheduled) {
132    io_tasks_.erase(task_id);
133    return MessageLoop::kTaskIdNull;
134  }
135
136#ifndef __ANDROID_HOST__
137  // Determine if the passed fd is the binder file descriptor. For that, we need
138  // to check that is a special char device and that the major and minor device
139  // numbers match. The binder file descriptor can't be removed and added back
140  // to an epoll group when there's work available to be done by the file
141  // descriptor due to bugs in the binder driver (b/26524111) when used with
142  // epoll. Therefore, we flag the binder fd and never attempt to remove it.
143  // This may cause the binder file descriptor to be attended with higher
144  // priority and cause starvation of other events.
145  struct stat buf;
146  if (fstat(fd, &buf) == 0 &&
147      S_ISCHR(buf.st_mode) &&
148      major(buf.st_rdev) == MISC_MAJOR &&
149      minor(buf.st_rdev) == GetBinderMinor()) {
150    it_bool.first->second.RunImmediately();
151  }
152#endif
153
154  return task_id;
155}
156
157bool BaseMessageLoop::CancelTask(TaskId task_id) {
158  if (task_id == kTaskIdNull)
159    return false;
160  auto delayed_task_it = delayed_tasks_.find(task_id);
161  if (delayed_task_it == delayed_tasks_.end()) {
162    // This might be an IOTask then.
163    auto io_task_it = io_tasks_.find(task_id);
164    if (io_task_it == io_tasks_.end())
165      return false;
166    return io_task_it->second.CancelTask();
167  }
168  // A DelayedTask was found for this task_id at this point.
169
170  // Check if the callback was already canceled but we have the entry in
171  // delayed_tasks_ since it didn't fire yet in the message loop.
172  if (delayed_task_it->second.closure.is_null())
173    return false;
174
175  DVLOG_LOC(delayed_task_it->second.location, 1)
176      << "Removing task_id " << task_id << " scheduled from this location.";
177  // We reset to closure to a null Closure to release all the resources
178  // used by this closure at this point, but we don't remove the task_id from
179  // delayed_tasks_ since we can't tell base::MessageLoopForIO to not run it.
180  delayed_task_it->second.closure = Closure();
181
182  return true;
183}
184
185bool BaseMessageLoop::RunOnce(bool may_block) {
186  run_once_ = true;
187  base::RunLoop run_loop;  // Uses the base::MessageLoopForIO implicitly.
188  base_run_loop_ = &run_loop;
189  if (!may_block)
190    run_loop.RunUntilIdle();
191  else
192    run_loop.Run();
193  base_run_loop_ = nullptr;
194  // If the flag was reset to false, it means a closure was run.
195  if (!run_once_)
196    return true;
197
198  run_once_ = false;
199  return false;
200}
201
202void BaseMessageLoop::Run() {
203  base::RunLoop run_loop;  // Uses the base::MessageLoopForIO implicitly.
204  base_run_loop_ = &run_loop;
205  run_loop.Run();
206  base_run_loop_ = nullptr;
207}
208
209void BaseMessageLoop::BreakLoop() {
210  if (base_run_loop_ == nullptr) {
211    DVLOG(1) << "Message loop not running, ignoring BreakLoop().";
212    return;  // Message loop not running, nothing to do.
213  }
214  base_run_loop_->Quit();
215}
216
217Closure BaseMessageLoop::QuitClosure() const {
218  if (base_run_loop_ == nullptr)
219    return base::Bind(&base::DoNothing);
220  return base_run_loop_->QuitClosure();
221}
222
223MessageLoop::TaskId BaseMessageLoop::NextTaskId() {
224  TaskId res;
225  do {
226    res = ++last_id_;
227    // We would run out of memory before we run out of task ids.
228  } while (!res ||
229           delayed_tasks_.find(res) != delayed_tasks_.end() ||
230           io_tasks_.find(res) != io_tasks_.end());
231  return res;
232}
233
234void BaseMessageLoop::OnRanPostedTask(MessageLoop::TaskId task_id) {
235  auto task_it = delayed_tasks_.find(task_id);
236  DCHECK(task_it != delayed_tasks_.end());
237  if (!task_it->second.closure.is_null()) {
238    DVLOG_LOC(task_it->second.location, 1)
239        << "Running delayed task_id " << task_id
240        << " scheduled from this location.";
241    // Mark the task as canceled while we are running it so CancelTask returns
242    // false.
243    Closure closure = std::move(task_it->second.closure);
244    task_it->second.closure = Closure();
245    closure.Run();
246
247    // If the |run_once_| flag is set, it is because we are instructed to run
248    // only once callback.
249    if (run_once_) {
250      run_once_ = false;
251      BreakLoop();
252    }
253  }
254  delayed_tasks_.erase(task_it);
255}
256
257void BaseMessageLoop::OnFileReadyPostedTask(MessageLoop::TaskId task_id) {
258  auto task_it = io_tasks_.find(task_id);
259  // Even if this task was canceled while we were waiting in the message loop
260  // for this method to run, the entry in io_tasks_ should still be present, but
261  // won't do anything.
262  DCHECK(task_it != io_tasks_.end());
263  task_it->second.OnFileReadyPostedTask();
264}
265
266int BaseMessageLoop::ParseBinderMinor(
267    const std::string& file_contents) {
268  int result = kInvalidMinor;
269  // Split along '\n', then along the ' '. Note that base::SplitString trims all
270  // white spaces at the beginning and end after splitting.
271  std::vector<std::string> lines =
272      base::SplitString(file_contents, "\n", base::TRIM_WHITESPACE,
273                        base::SPLIT_WANT_ALL);
274  for (const std::string& line : lines) {
275    if (line.empty())
276      continue;
277    std::string number;
278    std::string name;
279    if (!string_utils::SplitAtFirst(line, " ", &number, &name, false))
280      continue;
281
282    if (name == kBinderDriverName && base::StringToInt(number, &result))
283      break;
284  }
285  return result;
286}
287
288unsigned int BaseMessageLoop::GetBinderMinor() {
289  if (binder_minor_ != kUninitializedMinor)
290    return binder_minor_;
291
292  std::string proc_misc;
293  if (!base::ReadFileToString(base::FilePath(kMiscMinorPath), &proc_misc))
294    return binder_minor_;
295  binder_minor_ = ParseBinderMinor(proc_misc);
296  return binder_minor_;
297}
298
299BaseMessageLoop::IOTask::IOTask(const tracked_objects::Location& location,
300                                BaseMessageLoop* loop,
301                                MessageLoop::TaskId task_id,
302                                int fd,
303                                base::MessageLoopForIO::Mode base_mode,
304                                bool persistent,
305                                const Closure& task)
306    : location_(location), loop_(loop), task_id_(task_id),
307      fd_(fd), base_mode_(base_mode), persistent_(persistent), closure_(task) {}
308
309bool BaseMessageLoop::IOTask::StartWatching() {
310  return loop_->base_loop_->WatchFileDescriptor(
311      fd_, persistent_, base_mode_, &fd_watcher_, this);
312}
313
// Stops watching the file descriptor through |fd_watcher_|.
void BaseMessageLoop::IOTask::StopWatching() {
  // This is safe to call even if we are not watching for it.
  fd_watcher_.StopWatchingFileDescriptor();
}
318
// Watcher callback (this object is passed as the delegate in
// StartWatching()); presumably invoked by the base loop when the descriptor
// becomes readable. The fd argument is ignored since the task already knows
// its |fd_|.
void BaseMessageLoop::IOTask::OnFileCanReadWithoutBlocking(int /* fd */) {
  OnFileReady();
}
322
// Watcher callback (this object is passed as the delegate in
// StartWatching()); presumably invoked by the base loop when the descriptor
// becomes writable. The fd argument is ignored since the task already knows
// its |fd_|.
void BaseMessageLoop::IOTask::OnFileCanWriteWithoutBlocking(int /* fd */) {
  OnFileReady();
}
326
327void BaseMessageLoop::IOTask::OnFileReady() {
328  // For file descriptors marked with the immediate_run flag, we don't call
329  // StopWatching() and wait, instead we dispatch the callback immediately.
330  if (immediate_run_) {
331    posted_task_pending_ = true;
332    OnFileReadyPostedTask();
333    return;
334  }
335
336  // When the file descriptor becomes available we stop watching for it and
337  // schedule a task to run the callback from the main loop. The callback will
338  // run using the same scheduler used to run other delayed tasks, avoiding
339  // starvation of the available posted tasks if there are file descriptors
340  // always available. The new posted task will use the same TaskId as the
341  // current file descriptor watching task an could be canceled in either state,
342  // when waiting for the file descriptor or waiting in the main loop.
343  StopWatching();
344  bool base_scheduled = loop_->base_loop_->task_runner()->PostTask(
345      location_,
346      base::Bind(&BaseMessageLoop::OnFileReadyPostedTask,
347                 loop_->weak_ptr_factory_.GetWeakPtr(),
348                 task_id_));
349  posted_task_pending_ = true;
350  if (base_scheduled) {
351    DVLOG_LOC(location_, 1)
352        << "Dispatching task_id " << task_id_ << " for "
353        << (base_mode_ == base::MessageLoopForIO::WATCH_READ ?
354            "reading" : "writing")
355        << " file descriptor " << fd_ << ", scheduled from this location.";
356  } else {
357    // In the rare case that PostTask() fails, we fall back to run it directly.
358    // This would indicate a bigger problem with the message loop setup.
359    LOG(ERROR) << "Error on base::MessageLoopForIO::PostTask().";
360    OnFileReadyPostedTask();
361  }
362}
363
364void BaseMessageLoop::IOTask::OnFileReadyPostedTask() {
365  // We can't access |this| after running the |closure_| since it could call
366  // CancelTask on its own task_id, so we copy the members we need now.
367  BaseMessageLoop* loop_ptr = loop_;
368  DCHECK(posted_task_pending_ = true);
369  posted_task_pending_ = false;
370
371  // If this task was already canceled, the closure will be null and there is
372  // nothing else to do here. This execution doesn't count a step for RunOnce()
373  // unless we have a callback to run.
374  if (closure_.is_null()) {
375    loop_->io_tasks_.erase(task_id_);
376    return;
377  }
378
379  DVLOG_LOC(location_, 1)
380      << "Running task_id " << task_id_ << " for "
381      << (base_mode_ == base::MessageLoopForIO::WATCH_READ ?
382          "reading" : "writing")
383      << " file descriptor " << fd_ << ", scheduled from this location.";
384
385  if (persistent_) {
386    // In the persistent case we just run the callback. If this callback cancels
387    // the task id, we can't access |this| anymore, so we re-start watching the
388    // file descriptor before running the callback, unless this is a fd where
389    // we didn't stop watching the file descriptor when it became available.
390    if (!immediate_run_)
391      StartWatching();
392    closure_.Run();
393  } else {
394    // This will destroy |this|, the fd_watcher and therefore stop watching this
395    // file descriptor.
396    Closure closure_copy = std::move(closure_);
397    loop_->io_tasks_.erase(task_id_);
398    // Run the closure from the local copy we just made.
399    closure_copy.Run();
400  }
401
402  if (loop_ptr->run_once_) {
403    loop_ptr->run_once_ = false;
404    loop_ptr->BreakLoop();
405  }
406}
407
408bool BaseMessageLoop::IOTask::CancelTask() {
409  if (closure_.is_null())
410    return false;
411
412  DVLOG_LOC(location_, 1)
413      << "Removing task_id " << task_id_ << " scheduled from this location.";
414
415  if (!posted_task_pending_) {
416    // Destroying the FileDescriptorWatcher implicitly stops watching the file
417    // descriptor. This will delete our instance.
418    loop_->io_tasks_.erase(task_id_);
419    return true;
420  }
421  // The IOTask is waiting for the message loop to run its delayed task, so
422  // it is not watching for the file descriptor. We release the closure
423  // resources now but keep the IOTask instance alive while we wait for the
424  // callback to run and delete the IOTask.
425  closure_ = Closure();
426  return true;
427}
428
429}  // namespace brillo
430