bus.cc revision eb525c5499e34cc9c4b825d6d9e75bb07cc06ace
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "dbus/bus.h" 6 7#include "base/bind.h" 8#include "base/logging.h" 9#include "base/message_loop.h" 10#include "base/message_loop/message_loop_proxy.h" 11#include "base/stl_util.h" 12#include "base/strings/stringprintf.h" 13#include "base/threading/thread.h" 14#include "base/threading/thread_restrictions.h" 15#include "base/time/time.h" 16#include "dbus/exported_object.h" 17#include "dbus/message.h" 18#include "dbus/object_manager.h" 19#include "dbus/object_path.h" 20#include "dbus/object_proxy.h" 21#include "dbus/scoped_dbus_error.h" 22 23namespace dbus { 24 25namespace { 26 27const char kDisconnectedSignal[] = "Disconnected"; 28const char kDisconnectedMatchRule[] = 29 "type='signal', path='/org/freedesktop/DBus/Local'," 30 "interface='org.freedesktop.DBus.Local', member='Disconnected'"; 31 32// The NameOwnerChanged member in org.freedesktop.DBus 33const char kNameOwnerChangedSignal[] = "NameOwnerChanged"; 34 35// The match rule used to filter for changes to a given service name owner. 36const char kServiceNameOwnerChangeMatchRule[] = 37 "type='signal',interface='org.freedesktop.DBus'," 38 "member='NameOwnerChanged',path='/org/freedesktop/DBus'," 39 "sender='org.freedesktop.DBus',arg0='%s'"; 40 41// The class is used for watching the file descriptor used for D-Bus 42// communication. 43class Watch : public base::MessagePumpLibevent::Watcher { 44 public: 45 explicit Watch(DBusWatch* watch) 46 : raw_watch_(watch) { 47 dbus_watch_set_data(raw_watch_, this, NULL); 48 } 49 50 virtual ~Watch() { 51 dbus_watch_set_data(raw_watch_, NULL, NULL); 52 } 53 54 // Returns true if the underlying file descriptor is ready to be watched. 55 bool IsReadyToBeWatched() { 56 return dbus_watch_get_enabled(raw_watch_); 57 } 58 59 // Starts watching the underlying file descriptor. 60 void StartWatching() { 61 const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_); 62 const int flags = dbus_watch_get_flags(raw_watch_); 63 64 base::MessageLoopForIO::Mode mode = base::MessageLoopForIO::WATCH_READ; 65 if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE)) 66 mode = base::MessageLoopForIO::WATCH_READ_WRITE; 67 else if (flags & DBUS_WATCH_READABLE) 68 mode = base::MessageLoopForIO::WATCH_READ; 69 else if (flags & DBUS_WATCH_WRITABLE) 70 mode = base::MessageLoopForIO::WATCH_WRITE; 71 else 72 NOTREACHED(); 73 74 const bool persistent = true; // Watch persistently. 75 const bool success = base::MessageLoopForIO::current()->WatchFileDescriptor( 76 file_descriptor, persistent, mode, &file_descriptor_watcher_, this); 77 CHECK(success) << "Unable to allocate memory"; 78 } 79 80 // Stops watching the underlying file descriptor. 81 void StopWatching() { 82 file_descriptor_watcher_.StopWatchingFileDescriptor(); 83 } 84 85 private: 86 // Implement MessagePumpLibevent::Watcher. 87 virtual void OnFileCanReadWithoutBlocking(int file_descriptor) OVERRIDE { 88 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE); 89 CHECK(success) << "Unable to allocate memory"; 90 } 91 92 // Implement MessagePumpLibevent::Watcher. 93 virtual void OnFileCanWriteWithoutBlocking(int file_descriptor) OVERRIDE { 94 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE); 95 CHECK(success) << "Unable to allocate memory"; 96 } 97 98 DBusWatch* raw_watch_; 99 base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_; 100}; 101 102// The class is used for monitoring the timeout used for D-Bus method 103// calls. 104// 105// Unlike Watch, Timeout is a ref counted object, to ensure that |this| of 106// the object is is alive when HandleTimeout() is called. It's unlikely 107// but it may be possible that HandleTimeout() is called after 108// Bus::OnRemoveTimeout(). That's why we don't simply delete the object in 109// Bus::OnRemoveTimeout(). 110class Timeout : public base::RefCountedThreadSafe<Timeout> { 111 public: 112 explicit Timeout(DBusTimeout* timeout) 113 : raw_timeout_(timeout), 114 monitoring_is_active_(false), 115 is_completed(false) { 116 dbus_timeout_set_data(raw_timeout_, this, NULL); 117 AddRef(); // Balanced on Complete(). 118 } 119 120 // Returns true if the timeout is ready to be monitored. 121 bool IsReadyToBeMonitored() { 122 return dbus_timeout_get_enabled(raw_timeout_); 123 } 124 125 // Starts monitoring the timeout. 126 void StartMonitoring(Bus* bus) { 127 bus->PostDelayedTaskToDBusThread(FROM_HERE, 128 base::Bind(&Timeout::HandleTimeout, 129 this), 130 GetInterval()); 131 monitoring_is_active_ = true; 132 } 133 134 // Stops monitoring the timeout. 135 void StopMonitoring() { 136 // We cannot take back the delayed task we posted in 137 // StartMonitoring(), so we just mark the monitoring is inactive now. 138 monitoring_is_active_ = false; 139 } 140 141 // Returns the interval. 142 base::TimeDelta GetInterval() { 143 return base::TimeDelta::FromMilliseconds( 144 dbus_timeout_get_interval(raw_timeout_)); 145 } 146 147 // Cleans up the raw_timeout and marks that timeout is completed. 148 // See the class comment above for why we are doing this. 149 void Complete() { 150 dbus_timeout_set_data(raw_timeout_, NULL, NULL); 151 is_completed = true; 152 Release(); 153 } 154 155 private: 156 friend class base::RefCountedThreadSafe<Timeout>; 157 ~Timeout() { 158 } 159 160 // Handles the timeout. 161 void HandleTimeout() { 162 // If the timeout is marked completed, we should do nothing. This can 163 // occur if this function is called after Bus::OnRemoveTimeout(). 164 if (is_completed) 165 return; 166 // Skip if monitoring is canceled. 167 if (!monitoring_is_active_) 168 return; 169 170 const bool success = dbus_timeout_handle(raw_timeout_); 171 CHECK(success) << "Unable to allocate memory"; 172 } 173 174 DBusTimeout* raw_timeout_; 175 bool monitoring_is_active_; 176 bool is_completed; 177}; 178 179} // namespace 180 181Bus::Options::Options() 182 : bus_type(SESSION), 183 connection_type(PRIVATE) { 184} 185 186Bus::Options::~Options() { 187} 188 189Bus::Bus(const Options& options) 190 : bus_type_(options.bus_type), 191 connection_type_(options.connection_type), 192 dbus_task_runner_(options.dbus_task_runner), 193 on_shutdown_(false /* manual_reset */, false /* initially_signaled */), 194 connection_(NULL), 195 origin_thread_id_(base::PlatformThread::CurrentId()), 196 async_operations_set_up_(false), 197 shutdown_completed_(false), 198 num_pending_watches_(0), 199 num_pending_timeouts_(0), 200 address_(options.address), 201 on_disconnected_closure_(options.disconnected_callback) { 202 // This is safe to call multiple times. 203 dbus_threads_init_default(); 204 // The origin message loop is unnecessary if the client uses synchronous 205 // functions only. 206 if (base::MessageLoop::current()) 207 origin_task_runner_ = base::MessageLoop::current()->message_loop_proxy(); 208} 209 210Bus::~Bus() { 211 DCHECK(!connection_); 212 DCHECK(owned_service_names_.empty()); 213 DCHECK(match_rules_added_.empty()); 214 DCHECK(filter_functions_added_.empty()); 215 DCHECK(registered_object_paths_.empty()); 216 DCHECK_EQ(0, num_pending_watches_); 217 // TODO(satorux): This check fails occasionally in browser_tests for tests 218 // that run very quickly. Perhaps something does not have time to clean up. 219 // Despite the check failing, the tests seem to run fine. crosbug.com/23416 220 // DCHECK_EQ(0, num_pending_timeouts_); 221} 222 223ObjectProxy* Bus::GetObjectProxy(const std::string& service_name, 224 const ObjectPath& object_path) { 225 return GetObjectProxyWithOptions(service_name, object_path, 226 ObjectProxy::DEFAULT_OPTIONS); 227} 228 229ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name, 230 const ObjectPath& object_path, 231 int options) { 232 AssertOnOriginThread(); 233 234 // Check if we already have the requested object proxy. 235 const ObjectProxyTable::key_type key(service_name + object_path.value(), 236 options); 237 ObjectProxyTable::iterator iter = object_proxy_table_.find(key); 238 if (iter != object_proxy_table_.end()) { 239 return iter->second.get(); 240 } 241 242 scoped_refptr<ObjectProxy> object_proxy = 243 new ObjectProxy(this, service_name, object_path, options); 244 object_proxy_table_[key] = object_proxy; 245 246 return object_proxy.get(); 247} 248 249bool Bus::RemoveObjectProxy(const std::string& service_name, 250 const ObjectPath& object_path, 251 const base::Closure& callback) { 252 return RemoveObjectProxyWithOptions(service_name, object_path, 253 ObjectProxy::DEFAULT_OPTIONS, 254 callback); 255} 256 257bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name, 258 const ObjectPath& object_path, 259 int options, 260 const base::Closure& callback) { 261 AssertOnOriginThread(); 262 263 // Check if we have the requested object proxy. 264 const ObjectProxyTable::key_type key(service_name + object_path.value(), 265 options); 266 ObjectProxyTable::iterator iter = object_proxy_table_.find(key); 267 if (iter != object_proxy_table_.end()) { 268 // Object is present. Remove it now and Detach in the DBus thread. 269 PostTaskToDBusThread(FROM_HERE, base::Bind( 270 &Bus::RemoveObjectProxyInternal, 271 this, iter->second, callback)); 272 273 object_proxy_table_.erase(iter); 274 return true; 275 } 276 return false; 277} 278 279void Bus::RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy, 280 const base::Closure& callback) { 281 AssertOnDBusThread(); 282 283 object_proxy.get()->Detach(); 284 285 PostTaskToOriginThread(FROM_HERE, callback); 286} 287 288ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) { 289 AssertOnOriginThread(); 290 291 // Check if we already have the requested exported object. 292 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path); 293 if (iter != exported_object_table_.end()) { 294 return iter->second.get(); 295 } 296 297 scoped_refptr<ExportedObject> exported_object = 298 new ExportedObject(this, object_path); 299 exported_object_table_[object_path] = exported_object; 300 301 return exported_object.get(); 302} 303 304void Bus::UnregisterExportedObject(const ObjectPath& object_path) { 305 AssertOnOriginThread(); 306 307 // Remove the registered object from the table first, to allow a new 308 // GetExportedObject() call to return a new object, rather than this one. 309 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path); 310 if (iter == exported_object_table_.end()) 311 return; 312 313 scoped_refptr<ExportedObject> exported_object = iter->second; 314 exported_object_table_.erase(iter); 315 316 // Post the task to perform the final unregistration to the D-Bus thread. 317 // Since the registration also happens on the D-Bus thread in 318 // TryRegisterObjectPath(), and the task runner we post to is a 319 // SequencedTaskRunner, there is a guarantee that this will happen before any 320 // future registration call. 321 PostTaskToDBusThread(FROM_HERE, 322 base::Bind(&Bus::UnregisterExportedObjectInternal, 323 this, exported_object)); 324} 325 326void Bus::UnregisterExportedObjectInternal( 327 scoped_refptr<ExportedObject> exported_object) { 328 AssertOnDBusThread(); 329 330 exported_object->Unregister(); 331} 332 333ObjectManager* Bus::GetObjectManager(const std::string& service_name, 334 const ObjectPath& object_path) { 335 AssertOnOriginThread(); 336 337 // Check if we already have the requested object manager. 338 const ObjectManagerTable::key_type key(service_name + object_path.value()); 339 ObjectManagerTable::iterator iter = object_manager_table_.find(key); 340 if (iter != object_manager_table_.end()) { 341 return iter->second.get(); 342 } 343 344 scoped_refptr<ObjectManager> object_manager = 345 new ObjectManager(this, service_name, object_path); 346 object_manager_table_[key] = object_manager; 347 348 return object_manager.get(); 349} 350 351void Bus::RemoveObjectManager(const std::string& service_name, 352 const ObjectPath& object_path) { 353 AssertOnOriginThread(); 354 355 const ObjectManagerTable::key_type key(service_name + object_path.value()); 356 ObjectManagerTable::iterator iter = object_manager_table_.find(key); 357 if (iter == object_manager_table_.end()) 358 return; 359 360 scoped_refptr<ObjectManager> object_manager = iter->second; 361 object_manager_table_.erase(iter); 362} 363 364void Bus::GetManagedObjects() { 365 for (ObjectManagerTable::iterator iter = object_manager_table_.begin(); 366 iter != object_manager_table_.end(); ++iter) { 367 iter->second->GetManagedObjects(); 368 } 369} 370 371bool Bus::Connect() { 372 // dbus_bus_get_private() and dbus_bus_get() are blocking calls. 373 AssertOnDBusThread(); 374 375 // Check if it's already initialized. 376 if (connection_) 377 return true; 378 379 ScopedDBusError error; 380 if (bus_type_ == CUSTOM_ADDRESS) { 381 if (connection_type_ == PRIVATE) { 382 connection_ = dbus_connection_open_private(address_.c_str(), error.get()); 383 } else { 384 connection_ = dbus_connection_open(address_.c_str(), error.get()); 385 } 386 } else { 387 const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_); 388 if (connection_type_ == PRIVATE) { 389 connection_ = dbus_bus_get_private(dbus_bus_type, error.get()); 390 } else { 391 connection_ = dbus_bus_get(dbus_bus_type, error.get()); 392 } 393 } 394 if (!connection_) { 395 LOG(ERROR) << "Failed to connect to the bus: " 396 << (error.is_set() ? error.message() : ""); 397 return false; 398 } 399 400 if (bus_type_ == CUSTOM_ADDRESS) { 401 // We should call dbus_bus_register here, otherwise unique name can not be 402 // acquired. According to dbus specification, it is responsible to call 403 // org.freedesktop.DBus.Hello method at the beging of bus connection to 404 // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is 405 // called internally. 406 if (!dbus_bus_register(connection_, error.get())) { 407 LOG(ERROR) << "Failed to register the bus component: " 408 << (error.is_set() ? error.message() : ""); 409 return false; 410 } 411 } 412 // We shouldn't exit on the disconnected signal. 413 dbus_connection_set_exit_on_disconnect(connection_, false); 414 415 // Watch Disconnected signal. 416 AddFilterFunction(Bus::OnConnectionDisconnectedFilter, this); 417 AddMatch(kDisconnectedMatchRule, error.get()); 418 419 return true; 420} 421 422void Bus::ClosePrivateConnection() { 423 // dbus_connection_close is blocking call. 424 AssertOnDBusThread(); 425 DCHECK_EQ(PRIVATE, connection_type_) 426 << "non-private connection should not be closed"; 427 dbus_connection_close(connection_); 428} 429 430void Bus::ShutdownAndBlock() { 431 AssertOnDBusThread(); 432 433 if (shutdown_completed_) 434 return; // Already shutdowned, just return. 435 436 // Unregister the exported objects. 437 for (ExportedObjectTable::iterator iter = exported_object_table_.begin(); 438 iter != exported_object_table_.end(); ++iter) { 439 iter->second->Unregister(); 440 } 441 442 // Release all service names. 443 for (std::set<std::string>::iterator iter = owned_service_names_.begin(); 444 iter != owned_service_names_.end();) { 445 // This is a bit tricky but we should increment the iter here as 446 // ReleaseOwnership() may remove |service_name| from the set. 447 const std::string& service_name = *iter++; 448 ReleaseOwnership(service_name); 449 } 450 if (!owned_service_names_.empty()) { 451 LOG(ERROR) << "Failed to release all service names. # of services left: " 452 << owned_service_names_.size(); 453 } 454 455 // Detach from the remote objects. 456 for (ObjectProxyTable::iterator iter = object_proxy_table_.begin(); 457 iter != object_proxy_table_.end(); ++iter) { 458 iter->second->Detach(); 459 } 460 461 // Release object proxies and exported objects here. We should do this 462 // here rather than in the destructor to avoid memory leaks due to 463 // cyclic references. 464 object_proxy_table_.clear(); 465 exported_object_table_.clear(); 466 467 // Private connection should be closed. 468 if (connection_) { 469 // Remove Disconnected watcher. 470 ScopedDBusError error; 471 RemoveFilterFunction(Bus::OnConnectionDisconnectedFilter, this); 472 RemoveMatch(kDisconnectedMatchRule, error.get()); 473 474 if (connection_type_ == PRIVATE) 475 ClosePrivateConnection(); 476 // dbus_connection_close() won't unref. 477 dbus_connection_unref(connection_); 478 } 479 480 connection_ = NULL; 481 shutdown_completed_ = true; 482} 483 484void Bus::ShutdownOnDBusThreadAndBlock() { 485 AssertOnOriginThread(); 486 DCHECK(dbus_task_runner_.get()); 487 488 PostTaskToDBusThread(FROM_HERE, base::Bind( 489 &Bus::ShutdownOnDBusThreadAndBlockInternal, 490 this)); 491 492 // http://crbug.com/125222 493 base::ThreadRestrictions::ScopedAllowWait allow_wait; 494 495 // Wait until the shutdown is complete on the D-Bus thread. 496 // The shutdown should not hang, but set timeout just in case. 497 const int kTimeoutSecs = 3; 498 const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs)); 499 const bool signaled = on_shutdown_.TimedWait(timeout); 500 LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus"; 501} 502 503void Bus::RequestOwnership(const std::string& service_name, 504 OnOwnershipCallback on_ownership_callback) { 505 AssertOnOriginThread(); 506 507 PostTaskToDBusThread(FROM_HERE, base::Bind( 508 &Bus::RequestOwnershipInternal, 509 this, service_name, on_ownership_callback)); 510} 511 512void Bus::RequestOwnershipInternal(const std::string& service_name, 513 OnOwnershipCallback on_ownership_callback) { 514 AssertOnDBusThread(); 515 516 bool success = Connect(); 517 if (success) 518 success = RequestOwnershipAndBlock(service_name); 519 520 PostTaskToOriginThread(FROM_HERE, 521 base::Bind(on_ownership_callback, 522 service_name, 523 success)); 524} 525 526bool Bus::RequestOwnershipAndBlock(const std::string& service_name) { 527 DCHECK(connection_); 528 // dbus_bus_request_name() is a blocking call. 529 AssertOnDBusThread(); 530 531 // Check if we already own the service name. 532 if (owned_service_names_.find(service_name) != owned_service_names_.end()) { 533 return true; 534 } 535 536 ScopedDBusError error; 537 const int result = dbus_bus_request_name(connection_, 538 service_name.c_str(), 539 DBUS_NAME_FLAG_DO_NOT_QUEUE, 540 error.get()); 541 if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) { 542 LOG(ERROR) << "Failed to get the ownership of " << service_name << ": " 543 << (error.is_set() ? error.message() : ""); 544 return false; 545 } 546 owned_service_names_.insert(service_name); 547 return true; 548} 549 550bool Bus::ReleaseOwnership(const std::string& service_name) { 551 DCHECK(connection_); 552 // dbus_bus_request_name() is a blocking call. 553 AssertOnDBusThread(); 554 555 // Check if we already own the service name. 556 std::set<std::string>::iterator found = 557 owned_service_names_.find(service_name); 558 if (found == owned_service_names_.end()) { 559 LOG(ERROR) << service_name << " is not owned by the bus"; 560 return false; 561 } 562 563 ScopedDBusError error; 564 const int result = dbus_bus_release_name(connection_, service_name.c_str(), 565 error.get()); 566 if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) { 567 owned_service_names_.erase(found); 568 return true; 569 } else { 570 LOG(ERROR) << "Failed to release the ownership of " << service_name << ": " 571 << (error.is_set() ? error.message() : ""); 572 return false; 573 } 574} 575 576bool Bus::SetUpAsyncOperations() { 577 DCHECK(connection_); 578 AssertOnDBusThread(); 579 580 if (async_operations_set_up_) 581 return true; 582 583 // Process all the incoming data if any, so that OnDispatchStatus() will 584 // be called when the incoming data is ready. 585 ProcessAllIncomingDataIfAny(); 586 587 bool success = dbus_connection_set_watch_functions(connection_, 588 &Bus::OnAddWatchThunk, 589 &Bus::OnRemoveWatchThunk, 590 &Bus::OnToggleWatchThunk, 591 this, 592 NULL); 593 CHECK(success) << "Unable to allocate memory"; 594 595 success = dbus_connection_set_timeout_functions(connection_, 596 &Bus::OnAddTimeoutThunk, 597 &Bus::OnRemoveTimeoutThunk, 598 &Bus::OnToggleTimeoutThunk, 599 this, 600 NULL); 601 CHECK(success) << "Unable to allocate memory"; 602 603 dbus_connection_set_dispatch_status_function( 604 connection_, 605 &Bus::OnDispatchStatusChangedThunk, 606 this, 607 NULL); 608 609 async_operations_set_up_ = true; 610 611 return true; 612} 613 614DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request, 615 int timeout_ms, 616 DBusError* error) { 617 DCHECK(connection_); 618 AssertOnDBusThread(); 619 620 return dbus_connection_send_with_reply_and_block( 621 connection_, request, timeout_ms, error); 622} 623 624void Bus::SendWithReply(DBusMessage* request, 625 DBusPendingCall** pending_call, 626 int timeout_ms) { 627 DCHECK(connection_); 628 AssertOnDBusThread(); 629 630 const bool success = dbus_connection_send_with_reply( 631 connection_, request, pending_call, timeout_ms); 632 CHECK(success) << "Unable to allocate memory"; 633} 634 635void Bus::Send(DBusMessage* request, uint32* serial) { 636 DCHECK(connection_); 637 AssertOnDBusThread(); 638 639 const bool success = dbus_connection_send(connection_, request, serial); 640 CHECK(success) << "Unable to allocate memory"; 641} 642 643bool Bus::AddFilterFunction(DBusHandleMessageFunction filter_function, 644 void* user_data) { 645 DCHECK(connection_); 646 AssertOnDBusThread(); 647 648 std::pair<DBusHandleMessageFunction, void*> filter_data_pair = 649 std::make_pair(filter_function, user_data); 650 if (filter_functions_added_.find(filter_data_pair) != 651 filter_functions_added_.end()) { 652 VLOG(1) << "Filter function already exists: " << filter_function 653 << " with associated data: " << user_data; 654 return false; 655 } 656 657 const bool success = dbus_connection_add_filter( 658 connection_, filter_function, user_data, NULL); 659 CHECK(success) << "Unable to allocate memory"; 660 filter_functions_added_.insert(filter_data_pair); 661 return true; 662} 663 664bool Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function, 665 void* user_data) { 666 DCHECK(connection_); 667 AssertOnDBusThread(); 668 669 std::pair<DBusHandleMessageFunction, void*> filter_data_pair = 670 std::make_pair(filter_function, user_data); 671 if (filter_functions_added_.find(filter_data_pair) == 672 filter_functions_added_.end()) { 673 VLOG(1) << "Requested to remove an unknown filter function: " 674 << filter_function 675 << " with associated data: " << user_data; 676 return false; 677 } 678 679 dbus_connection_remove_filter(connection_, filter_function, user_data); 680 filter_functions_added_.erase(filter_data_pair); 681 return true; 682} 683 684void Bus::AddMatch(const std::string& match_rule, DBusError* error) { 685 DCHECK(connection_); 686 AssertOnDBusThread(); 687 688 std::map<std::string, int>::iterator iter = 689 match_rules_added_.find(match_rule); 690 if (iter != match_rules_added_.end()) { 691 // The already existing rule's counter is incremented. 692 iter->second++; 693 694 VLOG(1) << "Match rule already exists: " << match_rule; 695 return; 696 } 697 698 dbus_bus_add_match(connection_, match_rule.c_str(), error); 699 match_rules_added_[match_rule] = 1; 700} 701 702bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) { 703 DCHECK(connection_); 704 AssertOnDBusThread(); 705 706 std::map<std::string, int>::iterator iter = 707 match_rules_added_.find(match_rule); 708 if (iter == match_rules_added_.end()) { 709 LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule; 710 return false; 711 } 712 713 // The rule's counter is decremented and the rule is deleted when reachs 0. 714 iter->second--; 715 if (iter->second == 0) { 716 dbus_bus_remove_match(connection_, match_rule.c_str(), error); 717 match_rules_added_.erase(match_rule); 718 } 719 return true; 720} 721 722bool Bus::TryRegisterObjectPath(const ObjectPath& object_path, 723 const DBusObjectPathVTable* vtable, 724 void* user_data, 725 DBusError* error) { 726 DCHECK(connection_); 727 AssertOnDBusThread(); 728 729 if (registered_object_paths_.find(object_path) != 730 registered_object_paths_.end()) { 731 LOG(ERROR) << "Object path already registered: " << object_path.value(); 732 return false; 733 } 734 735 const bool success = dbus_connection_try_register_object_path( 736 connection_, 737 object_path.value().c_str(), 738 vtable, 739 user_data, 740 error); 741 if (success) 742 registered_object_paths_.insert(object_path); 743 return success; 744} 745 746void Bus::UnregisterObjectPath(const ObjectPath& object_path) { 747 DCHECK(connection_); 748 AssertOnDBusThread(); 749 750 if (registered_object_paths_.find(object_path) == 751 registered_object_paths_.end()) { 752 LOG(ERROR) << "Requested to unregister an unknown object path: " 753 << object_path.value(); 754 return; 755 } 756 757 const bool success = dbus_connection_unregister_object_path( 758 connection_, 759 object_path.value().c_str()); 760 CHECK(success) << "Unable to allocate memory"; 761 registered_object_paths_.erase(object_path); 762} 763 764void Bus::ShutdownOnDBusThreadAndBlockInternal() { 765 AssertOnDBusThread(); 766 767 ShutdownAndBlock(); 768 on_shutdown_.Signal(); 769} 770 771void Bus::ProcessAllIncomingDataIfAny() { 772 AssertOnDBusThread(); 773 774 // As mentioned at the class comment in .h file, connection_ can be NULL. 775 if (!connection_) 776 return; 777 778 // It is safe and necessary to call dbus_connection_get_dispatch_status even 779 // if the connection is lost. Otherwise we will miss "Disconnected" signal. 780 // (crbug.com/174431) 781 if (dbus_connection_get_dispatch_status(connection_) == 782 DBUS_DISPATCH_DATA_REMAINS) { 783 while (dbus_connection_dispatch(connection_) == 784 DBUS_DISPATCH_DATA_REMAINS) { 785 } 786 } 787} 788 789void Bus::PostTaskToDBusThreadAndReply( 790 const tracked_objects::Location& from_here, 791 const base::Closure& task, 792 const base::Closure& reply) { 793 AssertOnOriginThread(); 794 795 if (dbus_task_runner_.get()) { 796 if (!dbus_task_runner_->PostTaskAndReply(from_here, task, reply)) { 797 LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop"; 798 } 799 } else { 800 DCHECK(origin_task_runner_.get()); 801 if (!origin_task_runner_->PostTaskAndReply(from_here, task, reply)) { 802 LOG(WARNING) << "Failed to post a task to the origin message loop"; 803 } 804 } 805} 806 807void Bus::PostTaskToOriginThread(const tracked_objects::Location& from_here, 808 const base::Closure& task) { 809 DCHECK(origin_task_runner_.get()); 810 if (!origin_task_runner_->PostTask(from_here, task)) { 811 LOG(WARNING) << "Failed to post a task to the origin message loop"; 812 } 813} 814 815void Bus::PostTaskToDBusThread(const tracked_objects::Location& from_here, 816 const base::Closure& task) { 817 if (dbus_task_runner_.get()) { 818 if (!dbus_task_runner_->PostTask(from_here, task)) { 819 LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop"; 820 } 821 } else { 822 DCHECK(origin_task_runner_.get()); 823 if (!origin_task_runner_->PostTask(from_here, task)) { 824 LOG(WARNING) << "Failed to post a task to the origin message loop"; 825 } 826 } 827} 828 829void Bus::PostDelayedTaskToDBusThread( 830 const tracked_objects::Location& from_here, 831 const base::Closure& task, 832 base::TimeDelta delay) { 833 if (dbus_task_runner_.get()) { 834 if (!dbus_task_runner_->PostDelayedTask( 835 from_here, task, delay)) { 836 LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop"; 837 } 838 } else { 839 DCHECK(origin_task_runner_.get()); 840 if (!origin_task_runner_->PostDelayedTask(from_here, task, delay)) { 841 LOG(WARNING) << "Failed to post a task to the origin message loop"; 842 } 843 } 844} 845 846bool Bus::HasDBusThread() { 847 return dbus_task_runner_.get() != NULL; 848} 849 850void Bus::AssertOnOriginThread() { 851 DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId()); 852} 853 854void Bus::AssertOnDBusThread() { 855 base::ThreadRestrictions::AssertIOAllowed(); 856 857 if (dbus_task_runner_.get()) { 858 DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread()); 859 } else { 860 AssertOnOriginThread(); 861 } 862} 863 864std::string Bus::GetServiceOwnerAndBlock(const std::string& service_name, 865 GetServiceOwnerOption options) { 866 AssertOnDBusThread(); 867 868 MethodCall get_name_owner_call("org.freedesktop.DBus", "GetNameOwner"); 869 MessageWriter writer(&get_name_owner_call); 870 writer.AppendString(service_name); 871 VLOG(1) << "Method call: " << get_name_owner_call.ToString(); 872 873 const ObjectPath obj_path("/org/freedesktop/DBus"); 874 if (!get_name_owner_call.SetDestination("org.freedesktop.DBus") || 875 !get_name_owner_call.SetPath(obj_path)) { 876 if (options == REPORT_ERRORS) 877 LOG(ERROR) << "Failed to get name owner."; 878 return ""; 879 } 880 881 ScopedDBusError error; 882 DBusMessage* response_message = 883 SendWithReplyAndBlock(get_name_owner_call.raw_message(), 884 ObjectProxy::TIMEOUT_USE_DEFAULT, 885 error.get()); 886 if (!response_message) { 887 if (options == REPORT_ERRORS) { 888 LOG(ERROR) << "Failed to get name owner. Got " << error.name() << ": " 889 << error.message(); 890 } 891 return ""; 892 } 893 894 scoped_ptr<Response> response(Response::FromRawMessage(response_message)); 895 MessageReader reader(response.get()); 896 897 std::string service_owner; 898 if (!reader.PopString(&service_owner)) 899 service_owner.clear(); 900 return service_owner; 901} 902 903void Bus::GetServiceOwner(const std::string& service_name, 904 const GetServiceOwnerCallback& callback) { 905 AssertOnOriginThread(); 906 907 PostTaskToDBusThread( 908 FROM_HERE, 909 base::Bind(&Bus::GetServiceOwnerInternal, this, service_name, callback)); 910} 911 912void Bus::GetServiceOwnerInternal(const std::string& service_name, 913 const GetServiceOwnerCallback& callback) { 914 AssertOnDBusThread(); 915 916 std::string service_owner; 917 if (Connect()) 918 service_owner = GetServiceOwnerAndBlock(service_name, SUPPRESS_ERRORS); 919 PostTaskToOriginThread(FROM_HERE, base::Bind(callback, service_owner)); 920} 921 922void Bus::ListenForServiceOwnerChange( 923 const std::string& service_name, 924 const GetServiceOwnerCallback& callback) { 925 AssertOnOriginThread(); 926 DCHECK(!service_name.empty()); 927 DCHECK(!callback.is_null()); 928 929 PostTaskToDBusThread(FROM_HERE, 930 base::Bind(&Bus::ListenForServiceOwnerChangeInternal, 931 this, service_name, callback)); 932} 933 934void Bus::ListenForServiceOwnerChangeInternal( 935 const std::string& service_name, 936 const GetServiceOwnerCallback& callback) { 937 AssertOnDBusThread(); 938 DCHECK(!service_name.empty()); 939 DCHECK(!callback.is_null()); 940 941 if (!Connect() || !SetUpAsyncOperations()) 942 return; 943 944 if (service_owner_changed_listener_map_.empty()) { 945 bool filter_added = 946 AddFilterFunction(Bus::OnServiceOwnerChangedFilter, this); 947 DCHECK(filter_added); 948 } 949 950 ServiceOwnerChangedListenerMap::iterator it = 951 service_owner_changed_listener_map_.find(service_name); 952 if (it == service_owner_changed_listener_map_.end()) { 953 // Add a match rule for the new service name. 954 const std::string name_owner_changed_match_rule = 955 base::StringPrintf(kServiceNameOwnerChangeMatchRule, 956 service_name.c_str()); 957 ScopedDBusError error; 958 AddMatch(name_owner_changed_match_rule, error.get()); 959 if (error.is_set()) { 960 LOG(ERROR) << "Failed to add match rule for " << service_name 961 << ". Got " << error.name() << ": " << error.message(); 962 return; 963 } 964 965 service_owner_changed_listener_map_[service_name].push_back(callback); 966 return; 967 } 968 969 // Check if the callback has already been added. 970 std::vector<GetServiceOwnerCallback>& callbacks = it->second; 971 for (size_t i = 0; i < callbacks.size(); ++i) { 972 if (callbacks[i].Equals(callback)) 973 return; 974 } 975 callbacks.push_back(callback); 976} 977 978void Bus::UnlistenForServiceOwnerChange( 979 const std::string& service_name, 980 const GetServiceOwnerCallback& callback) { 981 AssertOnOriginThread(); 982 DCHECK(!service_name.empty()); 983 DCHECK(!callback.is_null()); 984 985 PostTaskToDBusThread(FROM_HERE, 986 base::Bind(&Bus::UnlistenForServiceOwnerChangeInternal, 987 this, service_name, callback)); 988} 989 990void Bus::UnlistenForServiceOwnerChangeInternal( 991 const std::string& service_name, 992 const GetServiceOwnerCallback& callback) { 993 AssertOnDBusThread(); 994 DCHECK(!service_name.empty()); 995 DCHECK(!callback.is_null()); 996 997 ServiceOwnerChangedListenerMap::iterator it = 998 service_owner_changed_listener_map_.find(service_name); 999 if (it == service_owner_changed_listener_map_.end()) 1000 return; 1001 1002 std::vector<GetServiceOwnerCallback>& callbacks = it->second; 1003 for (size_t i = 0; i < callbacks.size(); ++i) { 1004 if (callbacks[i].Equals(callback)) { 1005 callbacks.erase(callbacks.begin() + i); 1006 break; // There can be only one. 1007 } 1008 } 1009 if (!callbacks.empty()) 1010 return; 1011 1012 // Last callback for |service_name| has been removed, remove match rule. 1013 const std::string name_owner_changed_match_rule = 1014 base::StringPrintf(kServiceNameOwnerChangeMatchRule, 1015 service_name.c_str()); 1016 ScopedDBusError error; 1017 RemoveMatch(name_owner_changed_match_rule, error.get()); 1018 // And remove |service_owner_changed_listener_map_| entry. 1019 service_owner_changed_listener_map_.erase(it); 1020 1021 if (service_owner_changed_listener_map_.empty()) { 1022 bool filter_removed = 1023 RemoveFilterFunction(Bus::OnServiceOwnerChangedFilter, this); 1024 DCHECK(filter_removed); 1025 } 1026} 1027 1028dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) { 1029 AssertOnDBusThread(); 1030 1031 // watch will be deleted when raw_watch is removed in OnRemoveWatch(). 1032 Watch* watch = new Watch(raw_watch); 1033 if (watch->IsReadyToBeWatched()) { 1034 watch->StartWatching(); 1035 } 1036 ++num_pending_watches_; 1037 return true; 1038} 1039 1040void Bus::OnRemoveWatch(DBusWatch* raw_watch) { 1041 AssertOnDBusThread(); 1042 1043 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); 1044 delete watch; 1045 --num_pending_watches_; 1046} 1047 1048void Bus::OnToggleWatch(DBusWatch* raw_watch) { 1049 AssertOnDBusThread(); 1050 1051 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); 1052 if (watch->IsReadyToBeWatched()) { 1053 watch->StartWatching(); 1054 } else { 1055 // It's safe to call this if StartWatching() wasn't called, per 1056 // message_pump_libevent.h. 1057 watch->StopWatching(); 1058 } 1059} 1060 1061dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) { 1062 AssertOnDBusThread(); 1063 1064 // timeout will be deleted when raw_timeout is removed in 1065 // OnRemoveTimeoutThunk(). 1066 Timeout* timeout = new Timeout(raw_timeout); 1067 if (timeout->IsReadyToBeMonitored()) { 1068 timeout->StartMonitoring(this); 1069 } 1070 ++num_pending_timeouts_; 1071 return true; 1072} 1073 1074void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) { 1075 AssertOnDBusThread(); 1076 1077 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); 1078 timeout->Complete(); 1079 --num_pending_timeouts_; 1080} 1081 1082void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) { 1083 AssertOnDBusThread(); 1084 1085 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); 1086 if (timeout->IsReadyToBeMonitored()) { 1087 timeout->StartMonitoring(this); 1088 } else { 1089 timeout->StopMonitoring(); 1090 } 1091} 1092 1093void Bus::OnDispatchStatusChanged(DBusConnection* connection, 1094 DBusDispatchStatus status) { 1095 DCHECK_EQ(connection, connection_); 1096 AssertOnDBusThread(); 1097 1098 // We cannot call ProcessAllIncomingDataIfAny() here, as calling 1099 // dbus_connection_dispatch() inside DBusDispatchStatusFunction is 1100 // prohibited by the D-Bus library. Hence, we post a task here instead. 1101 // See comments for dbus_connection_set_dispatch_status_function(). 1102 PostTaskToDBusThread(FROM_HERE, 1103 base::Bind(&Bus::ProcessAllIncomingDataIfAny, 1104 this)); 1105} 1106 1107void Bus::OnConnectionDisconnected(DBusConnection* connection) { 1108 AssertOnDBusThread(); 1109 1110 if (!on_disconnected_closure_.is_null()) 1111 PostTaskToOriginThread(FROM_HERE, on_disconnected_closure_); 1112 1113 if (!connection) 1114 return; 1115 DCHECK(!dbus_connection_get_is_connected(connection)); 1116 1117 ShutdownAndBlock(); 1118} 1119 1120void Bus::OnServiceOwnerChanged(DBusMessage* message) { 1121 DCHECK(message); 1122 AssertOnDBusThread(); 1123 1124 // |message| will be unrefed on exit of the function. Increment the 1125 // reference so we can use it in Signal::FromRawMessage() below. 1126 dbus_message_ref(message); 1127 scoped_ptr<Signal> signal(Signal::FromRawMessage(message)); 1128 1129 // Confirm the validity of the NameOwnerChanged signal. 1130 if (signal->GetMember() != kNameOwnerChangedSignal || 1131 signal->GetInterface() != DBUS_INTERFACE_DBUS || 1132 signal->GetSender() != DBUS_SERVICE_DBUS) { 1133 return; 1134 } 1135 1136 MessageReader reader(signal.get()); 1137 std::string service_name; 1138 std::string old_owner; 1139 std::string new_owner; 1140 if (!reader.PopString(&service_name) || 1141 !reader.PopString(&old_owner) || 1142 !reader.PopString(&new_owner)) { 1143 return; 1144 } 1145 1146 ServiceOwnerChangedListenerMap::const_iterator it = 1147 service_owner_changed_listener_map_.find(service_name); 1148 if (it == service_owner_changed_listener_map_.end()) 1149 return; 1150 1151 const std::vector<GetServiceOwnerCallback>& callbacks = it->second; 1152 for (size_t i = 0; i < callbacks.size(); ++i) { 1153 PostTaskToOriginThread(FROM_HERE, 1154 base::Bind(callbacks[i], new_owner)); 1155 } 1156} 1157 1158// static 1159dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) { 1160 Bus* self = static_cast<Bus*>(data); 1161 return self->OnAddWatch(raw_watch); 1162} 1163 1164// static 1165void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) { 1166 Bus* self = static_cast<Bus*>(data); 1167 self->OnRemoveWatch(raw_watch); 1168} 1169 1170// static 1171void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) { 1172 Bus* self = static_cast<Bus*>(data); 1173 self->OnToggleWatch(raw_watch); 1174} 1175 1176// static 1177dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1178 Bus* self = static_cast<Bus*>(data); 1179 return self->OnAddTimeout(raw_timeout); 1180} 1181 1182// static 1183void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1184 Bus* self = static_cast<Bus*>(data); 1185 self->OnRemoveTimeout(raw_timeout); 1186} 1187 1188// static 1189void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) { 1190 Bus* self = static_cast<Bus*>(data); 1191 self->OnToggleTimeout(raw_timeout); 1192} 1193 1194// static 1195void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection, 1196 DBusDispatchStatus status, 1197 void* data) { 1198 Bus* self = static_cast<Bus*>(data); 1199 self->OnDispatchStatusChanged(connection, status); 1200} 1201 1202// static 1203DBusHandlerResult Bus::OnConnectionDisconnectedFilter( 1204 DBusConnection* connection, 1205 DBusMessage* message, 1206 void* data) { 1207 if (dbus_message_is_signal(message, 1208 DBUS_INTERFACE_LOCAL, 1209 kDisconnectedSignal)) { 1210 Bus* self = static_cast<Bus*>(data); 1211 self->OnConnectionDisconnected(connection); 1212 return DBUS_HANDLER_RESULT_HANDLED; 1213 } 1214 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; 1215} 1216 1217// static 1218DBusHandlerResult Bus::OnServiceOwnerChangedFilter( 1219 DBusConnection* connection, 1220 DBusMessage* message, 1221 void* data) { 1222 if (dbus_message_is_signal(message, 1223 DBUS_INTERFACE_DBUS, 1224 kNameOwnerChangedSignal)) { 1225 Bus* self = static_cast<Bus*>(data); 1226 self->OnServiceOwnerChanged(message); 1227 } 1228 // Always return unhandled to let others, e.g. ObjectProxies, handle the same 1229 // signal. 1230 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; 1231} 1232 1233} // namespace dbus 1234