1// Copyright 2015 The Chromium OS Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include <brillo/message_loops/base_message_loop.h> 6 7#include <fcntl.h> 8#include <sys/stat.h> 9#include <sys/types.h> 10#include <unistd.h> 11 12#ifndef __ANDROID_HOST__ 13// Used for MISC_MAJOR. Only required for the target and not always available 14// for the host. 15#include <linux/major.h> 16#endif 17 18#include <vector> 19 20#include <base/bind.h> 21#include <base/files/file_path.h> 22#include <base/files/file_util.h> 23#include <base/run_loop.h> 24#include <base/strings/string_number_conversions.h> 25#include <base/strings/string_split.h> 26 27#include <brillo/location_logging.h> 28#include <brillo/strings/string_utils.h> 29 30using base::Closure; 31 32namespace { 33 34const char kMiscMinorPath[] = "/proc/misc"; 35const char kBinderDriverName[] = "binder"; 36 37} // namespace 38 39namespace brillo { 40 41const int BaseMessageLoop::kInvalidMinor = -1; 42const int BaseMessageLoop::kUninitializedMinor = -2; 43 44BaseMessageLoop::BaseMessageLoop(base::MessageLoopForIO* base_loop) 45 : base_loop_(base_loop), 46 weak_ptr_factory_(this) {} 47 48BaseMessageLoop::~BaseMessageLoop() { 49 for (auto& io_task : io_tasks_) { 50 DVLOG_LOC(io_task.second.location(), 1) 51 << "Removing file descriptor watcher task_id " << io_task.first 52 << " leaked on BaseMessageLoop, scheduled from this location."; 53 io_task.second.StopWatching(); 54 } 55 56 // Note all pending canceled delayed tasks when destroying the message loop. 57 size_t lazily_deleted_tasks = 0; 58 for (const auto& delayed_task : delayed_tasks_) { 59 if (delayed_task.second.closure.is_null()) { 60 lazily_deleted_tasks++; 61 } else { 62 DVLOG_LOC(delayed_task.second.location, 1) 63 << "Removing delayed task_id " << delayed_task.first 64 << " leaked on BaseMessageLoop, scheduled from this location."; 65 } 66 } 67 if (lazily_deleted_tasks) { 68 LOG(INFO) << "Leaking " << lazily_deleted_tasks << " canceled tasks."; 69 } 70} 71 72MessageLoop::TaskId BaseMessageLoop::PostDelayedTask( 73 const tracked_objects::Location& from_here, 74 const Closure &task, 75 base::TimeDelta delay) { 76 TaskId task_id = NextTaskId(); 77 bool base_scheduled = base_loop_->task_runner()->PostDelayedTask( 78 from_here, 79 base::Bind(&BaseMessageLoop::OnRanPostedTask, 80 weak_ptr_factory_.GetWeakPtr(), 81 task_id), 82 delay); 83 DVLOG_LOC(from_here, 1) << "Scheduling delayed task_id " << task_id 84 << " to run in " << delay << "."; 85 if (!base_scheduled) 86 return MessageLoop::kTaskIdNull; 87 88 delayed_tasks_.emplace(task_id, 89 DelayedTask{from_here, task_id, std::move(task)}); 90 return task_id; 91} 92 93MessageLoop::TaskId BaseMessageLoop::WatchFileDescriptor( 94 const tracked_objects::Location& from_here, 95 int fd, 96 WatchMode mode, 97 bool persistent, 98 const Closure &task) { 99 // base::MessageLoopForIO CHECKS that "fd >= 0", so we handle that case here. 100 if (fd < 0) 101 return MessageLoop::kTaskIdNull; 102 103 base::MessageLoopForIO::Mode base_mode = base::MessageLoopForIO::WATCH_READ; 104 switch (mode) { 105 case MessageLoop::kWatchRead: 106 base_mode = base::MessageLoopForIO::WATCH_READ; 107 break; 108 case MessageLoop::kWatchWrite: 109 base_mode = base::MessageLoopForIO::WATCH_WRITE; 110 break; 111 default: 112 return MessageLoop::kTaskIdNull; 113 } 114 115 TaskId task_id = NextTaskId(); 116 auto it_bool = io_tasks_.emplace( 117 std::piecewise_construct, 118 std::forward_as_tuple(task_id), 119 std::forward_as_tuple( 120 from_here, this, task_id, fd, base_mode, persistent, task)); 121 // This should always insert a new element. 122 DCHECK(it_bool.second); 123 bool scheduled = it_bool.first->second.StartWatching(); 124 DVLOG_LOC(from_here, 1) 125 << "Watching fd " << fd << " for " 126 << (mode == MessageLoop::kWatchRead ? "reading" : "writing") 127 << (persistent ? " persistently" : " just once") 128 << " as task_id " << task_id 129 << (scheduled ? " successfully" : " failed."); 130 131 if (!scheduled) { 132 io_tasks_.erase(task_id); 133 return MessageLoop::kTaskIdNull; 134 } 135 136#ifndef __ANDROID_HOST__ 137 // Determine if the passed fd is the binder file descriptor. For that, we need 138 // to check that is a special char device and that the major and minor device 139 // numbers match. The binder file descriptor can't be removed and added back 140 // to an epoll group when there's work available to be done by the file 141 // descriptor due to bugs in the binder driver (b/26524111) when used with 142 // epoll. Therefore, we flag the binder fd and never attempt to remove it. 143 // This may cause the binder file descriptor to be attended with higher 144 // priority and cause starvation of other events. 145 struct stat buf; 146 if (fstat(fd, &buf) == 0 && 147 S_ISCHR(buf.st_mode) && 148 major(buf.st_rdev) == MISC_MAJOR && 149 minor(buf.st_rdev) == GetBinderMinor()) { 150 it_bool.first->second.RunImmediately(); 151 } 152#endif 153 154 return task_id; 155} 156 157bool BaseMessageLoop::CancelTask(TaskId task_id) { 158 if (task_id == kTaskIdNull) 159 return false; 160 auto delayed_task_it = delayed_tasks_.find(task_id); 161 if (delayed_task_it == delayed_tasks_.end()) { 162 // This might be an IOTask then. 163 auto io_task_it = io_tasks_.find(task_id); 164 if (io_task_it == io_tasks_.end()) 165 return false; 166 return io_task_it->second.CancelTask(); 167 } 168 // A DelayedTask was found for this task_id at this point. 169 170 // Check if the callback was already canceled but we have the entry in 171 // delayed_tasks_ since it didn't fire yet in the message loop. 172 if (delayed_task_it->second.closure.is_null()) 173 return false; 174 175 DVLOG_LOC(delayed_task_it->second.location, 1) 176 << "Removing task_id " << task_id << " scheduled from this location."; 177 // We reset to closure to a null Closure to release all the resources 178 // used by this closure at this point, but we don't remove the task_id from 179 // delayed_tasks_ since we can't tell base::MessageLoopForIO to not run it. 180 delayed_task_it->second.closure = Closure(); 181 182 return true; 183} 184 185bool BaseMessageLoop::RunOnce(bool may_block) { 186 run_once_ = true; 187 base::RunLoop run_loop; // Uses the base::MessageLoopForIO implicitly. 188 base_run_loop_ = &run_loop; 189 if (!may_block) 190 run_loop.RunUntilIdle(); 191 else 192 run_loop.Run(); 193 base_run_loop_ = nullptr; 194 // If the flag was reset to false, it means a closure was run. 195 if (!run_once_) 196 return true; 197 198 run_once_ = false; 199 return false; 200} 201 202void BaseMessageLoop::Run() { 203 base::RunLoop run_loop; // Uses the base::MessageLoopForIO implicitly. 204 base_run_loop_ = &run_loop; 205 run_loop.Run(); 206 base_run_loop_ = nullptr; 207} 208 209void BaseMessageLoop::BreakLoop() { 210 if (base_run_loop_ == nullptr) { 211 DVLOG(1) << "Message loop not running, ignoring BreakLoop()."; 212 return; // Message loop not running, nothing to do. 213 } 214 base_run_loop_->Quit(); 215} 216 217Closure BaseMessageLoop::QuitClosure() const { 218 if (base_run_loop_ == nullptr) 219 return base::Bind(&base::DoNothing); 220 return base_run_loop_->QuitClosure(); 221} 222 223MessageLoop::TaskId BaseMessageLoop::NextTaskId() { 224 TaskId res; 225 do { 226 res = ++last_id_; 227 // We would run out of memory before we run out of task ids. 228 } while (!res || 229 delayed_tasks_.find(res) != delayed_tasks_.end() || 230 io_tasks_.find(res) != io_tasks_.end()); 231 return res; 232} 233 234void BaseMessageLoop::OnRanPostedTask(MessageLoop::TaskId task_id) { 235 auto task_it = delayed_tasks_.find(task_id); 236 DCHECK(task_it != delayed_tasks_.end()); 237 if (!task_it->second.closure.is_null()) { 238 DVLOG_LOC(task_it->second.location, 1) 239 << "Running delayed task_id " << task_id 240 << " scheduled from this location."; 241 // Mark the task as canceled while we are running it so CancelTask returns 242 // false. 243 Closure closure = std::move(task_it->second.closure); 244 task_it->second.closure = Closure(); 245 closure.Run(); 246 247 // If the |run_once_| flag is set, it is because we are instructed to run 248 // only once callback. 249 if (run_once_) { 250 run_once_ = false; 251 BreakLoop(); 252 } 253 } 254 delayed_tasks_.erase(task_it); 255} 256 257void BaseMessageLoop::OnFileReadyPostedTask(MessageLoop::TaskId task_id) { 258 auto task_it = io_tasks_.find(task_id); 259 // Even if this task was canceled while we were waiting in the message loop 260 // for this method to run, the entry in io_tasks_ should still be present, but 261 // won't do anything. 262 DCHECK(task_it != io_tasks_.end()); 263 task_it->second.OnFileReadyPostedTask(); 264} 265 266int BaseMessageLoop::ParseBinderMinor( 267 const std::string& file_contents) { 268 int result = kInvalidMinor; 269 // Split along '\n', then along the ' '. Note that base::SplitString trims all 270 // white spaces at the beginning and end after splitting. 271 std::vector<std::string> lines = 272 base::SplitString(file_contents, "\n", base::TRIM_WHITESPACE, 273 base::SPLIT_WANT_ALL); 274 for (const std::string& line : lines) { 275 if (line.empty()) 276 continue; 277 std::string number; 278 std::string name; 279 if (!string_utils::SplitAtFirst(line, " ", &number, &name, false)) 280 continue; 281 282 if (name == kBinderDriverName && base::StringToInt(number, &result)) 283 break; 284 } 285 return result; 286} 287 288unsigned int BaseMessageLoop::GetBinderMinor() { 289 if (binder_minor_ != kUninitializedMinor) 290 return binder_minor_; 291 292 std::string proc_misc; 293 if (!base::ReadFileToString(base::FilePath(kMiscMinorPath), &proc_misc)) 294 return binder_minor_; 295 binder_minor_ = ParseBinderMinor(proc_misc); 296 return binder_minor_; 297} 298 299BaseMessageLoop::IOTask::IOTask(const tracked_objects::Location& location, 300 BaseMessageLoop* loop, 301 MessageLoop::TaskId task_id, 302 int fd, 303 base::MessageLoopForIO::Mode base_mode, 304 bool persistent, 305 const Closure& task) 306 : location_(location), loop_(loop), task_id_(task_id), 307 fd_(fd), base_mode_(base_mode), persistent_(persistent), closure_(task) {} 308 309bool BaseMessageLoop::IOTask::StartWatching() { 310 return loop_->base_loop_->WatchFileDescriptor( 311 fd_, persistent_, base_mode_, &fd_watcher_, this); 312} 313 314void BaseMessageLoop::IOTask::StopWatching() { 315 // This is safe to call even if we are not watching for it. 316 fd_watcher_.StopWatchingFileDescriptor(); 317} 318 319void BaseMessageLoop::IOTask::OnFileCanReadWithoutBlocking(int /* fd */) { 320 OnFileReady(); 321} 322 323void BaseMessageLoop::IOTask::OnFileCanWriteWithoutBlocking(int /* fd */) { 324 OnFileReady(); 325} 326 327void BaseMessageLoop::IOTask::OnFileReady() { 328 // For file descriptors marked with the immediate_run flag, we don't call 329 // StopWatching() and wait, instead we dispatch the callback immediately. 330 if (immediate_run_) { 331 posted_task_pending_ = true; 332 OnFileReadyPostedTask(); 333 return; 334 } 335 336 // When the file descriptor becomes available we stop watching for it and 337 // schedule a task to run the callback from the main loop. The callback will 338 // run using the same scheduler used to run other delayed tasks, avoiding 339 // starvation of the available posted tasks if there are file descriptors 340 // always available. The new posted task will use the same TaskId as the 341 // current file descriptor watching task an could be canceled in either state, 342 // when waiting for the file descriptor or waiting in the main loop. 343 StopWatching(); 344 bool base_scheduled = loop_->base_loop_->task_runner()->PostTask( 345 location_, 346 base::Bind(&BaseMessageLoop::OnFileReadyPostedTask, 347 loop_->weak_ptr_factory_.GetWeakPtr(), 348 task_id_)); 349 posted_task_pending_ = true; 350 if (base_scheduled) { 351 DVLOG_LOC(location_, 1) 352 << "Dispatching task_id " << task_id_ << " for " 353 << (base_mode_ == base::MessageLoopForIO::WATCH_READ ? 354 "reading" : "writing") 355 << " file descriptor " << fd_ << ", scheduled from this location."; 356 } else { 357 // In the rare case that PostTask() fails, we fall back to run it directly. 358 // This would indicate a bigger problem with the message loop setup. 359 LOG(ERROR) << "Error on base::MessageLoopForIO::PostTask()."; 360 OnFileReadyPostedTask(); 361 } 362} 363 364void BaseMessageLoop::IOTask::OnFileReadyPostedTask() { 365 // We can't access |this| after running the |closure_| since it could call 366 // CancelTask on its own task_id, so we copy the members we need now. 367 BaseMessageLoop* loop_ptr = loop_; 368 DCHECK(posted_task_pending_ = true); 369 posted_task_pending_ = false; 370 371 // If this task was already canceled, the closure will be null and there is 372 // nothing else to do here. This execution doesn't count a step for RunOnce() 373 // unless we have a callback to run. 374 if (closure_.is_null()) { 375 loop_->io_tasks_.erase(task_id_); 376 return; 377 } 378 379 DVLOG_LOC(location_, 1) 380 << "Running task_id " << task_id_ << " for " 381 << (base_mode_ == base::MessageLoopForIO::WATCH_READ ? 382 "reading" : "writing") 383 << " file descriptor " << fd_ << ", scheduled from this location."; 384 385 if (persistent_) { 386 // In the persistent case we just run the callback. If this callback cancels 387 // the task id, we can't access |this| anymore, so we re-start watching the 388 // file descriptor before running the callback, unless this is a fd where 389 // we didn't stop watching the file descriptor when it became available. 390 if (!immediate_run_) 391 StartWatching(); 392 closure_.Run(); 393 } else { 394 // This will destroy |this|, the fd_watcher and therefore stop watching this 395 // file descriptor. 396 Closure closure_copy = std::move(closure_); 397 loop_->io_tasks_.erase(task_id_); 398 // Run the closure from the local copy we just made. 399 closure_copy.Run(); 400 } 401 402 if (loop_ptr->run_once_) { 403 loop_ptr->run_once_ = false; 404 loop_ptr->BreakLoop(); 405 } 406} 407 408bool BaseMessageLoop::IOTask::CancelTask() { 409 if (closure_.is_null()) 410 return false; 411 412 DVLOG_LOC(location_, 1) 413 << "Removing task_id " << task_id_ << " scheduled from this location."; 414 415 if (!posted_task_pending_) { 416 // Destroying the FileDescriptorWatcher implicitly stops watching the file 417 // descriptor. This will delete our instance. 418 loop_->io_tasks_.erase(task_id_); 419 return true; 420 } 421 // The IOTask is waiting for the message loop to run its delayed task, so 422 // it is not watching for the file descriptor. We release the closure 423 // resources now but keep the IOTask instance alive while we wait for the 424 // callback to run and delete the IOTask. 425 closure_ = Closure(); 426 return true; 427} 428 429} // namespace brillo 430