1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "dbus/bus.h"
6
7#include <stddef.h>
8
9#include "base/bind.h"
10#include "base/logging.h"
11#include "base/message_loop/message_loop.h"
12#include "base/stl_util.h"
13#include "base/strings/stringprintf.h"
14#include "base/threading/thread.h"
15#include "base/threading/thread_restrictions.h"
16#include "base/threading/thread_task_runner_handle.h"
17#include "base/time/time.h"
18#include "dbus/exported_object.h"
19#include "dbus/message.h"
20#include "dbus/object_manager.h"
21#include "dbus/object_path.h"
22#include "dbus/object_proxy.h"
23#include "dbus/scoped_dbus_error.h"
24
25namespace dbus {
26
27namespace {
28
29// The NameOwnerChanged member in org.freedesktop.DBus
30const char kNameOwnerChangedSignal[] = "NameOwnerChanged";
31
32// The match rule used to filter for changes to a given service name owner.
33const char kServiceNameOwnerChangeMatchRule[] =
34    "type='signal',interface='org.freedesktop.DBus',"
35    "member='NameOwnerChanged',path='/org/freedesktop/DBus',"
36    "sender='org.freedesktop.DBus',arg0='%s'";
37
38// The class is used for watching the file descriptor used for D-Bus
39// communication.
40class Watch : public base::MessagePumpLibevent::Watcher {
41 public:
42  explicit Watch(DBusWatch* watch)
43      : raw_watch_(watch) {
44    dbus_watch_set_data(raw_watch_, this, NULL);
45  }
46
47  ~Watch() override { dbus_watch_set_data(raw_watch_, NULL, NULL); }
48
49  // Returns true if the underlying file descriptor is ready to be watched.
50  bool IsReadyToBeWatched() {
51    return dbus_watch_get_enabled(raw_watch_);
52  }
53
54  // Starts watching the underlying file descriptor.
55  void StartWatching() {
56    const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_);
57    const int flags = dbus_watch_get_flags(raw_watch_);
58
59    base::MessageLoopForIO::Mode mode = base::MessageLoopForIO::WATCH_READ;
60    if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE))
61      mode = base::MessageLoopForIO::WATCH_READ_WRITE;
62    else if (flags & DBUS_WATCH_READABLE)
63      mode = base::MessageLoopForIO::WATCH_READ;
64    else if (flags & DBUS_WATCH_WRITABLE)
65      mode = base::MessageLoopForIO::WATCH_WRITE;
66    else
67      NOTREACHED();
68
69    const bool persistent = true;  // Watch persistently.
70    const bool success = base::MessageLoopForIO::current()->WatchFileDescriptor(
71        file_descriptor, persistent, mode, &file_descriptor_watcher_, this);
72    CHECK(success) << "Unable to allocate memory";
73  }
74
75  // Stops watching the underlying file descriptor.
76  void StopWatching() {
77    file_descriptor_watcher_.StopWatchingFileDescriptor();
78  }
79
80 private:
81  // Implement MessagePumpLibevent::Watcher.
82  void OnFileCanReadWithoutBlocking(int /*file_descriptor*/) override {
83    const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE);
84    CHECK(success) << "Unable to allocate memory";
85  }
86
87  // Implement MessagePumpLibevent::Watcher.
88  void OnFileCanWriteWithoutBlocking(int /*file_descriptor*/) override {
89    const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE);
90    CHECK(success) << "Unable to allocate memory";
91  }
92
93  DBusWatch* raw_watch_;
94  base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_;
95};
96
97// The class is used for monitoring the timeout used for D-Bus method
98// calls.
99//
100// Unlike Watch, Timeout is a ref counted object, to ensure that |this| of
101// the object is is alive when HandleTimeout() is called. It's unlikely
102// but it may be possible that HandleTimeout() is called after
103// Bus::OnRemoveTimeout(). That's why we don't simply delete the object in
104// Bus::OnRemoveTimeout().
105class Timeout : public base::RefCountedThreadSafe<Timeout> {
106 public:
107  explicit Timeout(DBusTimeout* timeout)
108      : raw_timeout_(timeout),
109        monitoring_is_active_(false),
110        is_completed(false) {
111    dbus_timeout_set_data(raw_timeout_, this, NULL);
112    AddRef();  // Balanced on Complete().
113  }
114
115  // Returns true if the timeout is ready to be monitored.
116  bool IsReadyToBeMonitored() {
117    return dbus_timeout_get_enabled(raw_timeout_);
118  }
119
120  // Starts monitoring the timeout.
121  void StartMonitoring(Bus* bus) {
122    bus->GetDBusTaskRunner()->PostDelayedTask(
123        FROM_HERE,
124        base::Bind(&Timeout::HandleTimeout, this),
125        GetInterval());
126    monitoring_is_active_ = true;
127  }
128
129  // Stops monitoring the timeout.
130  void StopMonitoring() {
131    // We cannot take back the delayed task we posted in
132    // StartMonitoring(), so we just mark the monitoring is inactive now.
133    monitoring_is_active_ = false;
134  }
135
136  // Returns the interval.
137  base::TimeDelta GetInterval() {
138    return base::TimeDelta::FromMilliseconds(
139        dbus_timeout_get_interval(raw_timeout_));
140  }
141
142  // Cleans up the raw_timeout and marks that timeout is completed.
143  // See the class comment above for why we are doing this.
144  void Complete() {
145    dbus_timeout_set_data(raw_timeout_, NULL, NULL);
146    is_completed = true;
147    Release();
148  }
149
150 private:
151  friend class base::RefCountedThreadSafe<Timeout>;
152  ~Timeout() {
153  }
154
155  // Handles the timeout.
156  void HandleTimeout() {
157    // If the timeout is marked completed, we should do nothing. This can
158    // occur if this function is called after Bus::OnRemoveTimeout().
159    if (is_completed)
160      return;
161    // Skip if monitoring is canceled.
162    if (!monitoring_is_active_)
163      return;
164
165    const bool success = dbus_timeout_handle(raw_timeout_);
166    CHECK(success) << "Unable to allocate memory";
167  }
168
169  DBusTimeout* raw_timeout_;
170  bool monitoring_is_active_;
171  bool is_completed;
172};
173
174}  // namespace
175
176Bus::Options::Options()
177  : bus_type(SESSION),
178    connection_type(PRIVATE) {
179}
180
181Bus::Options::~Options() {
182}
183
184Bus::Bus(const Options& options)
185    : bus_type_(options.bus_type),
186      connection_type_(options.connection_type),
187      dbus_task_runner_(options.dbus_task_runner),
188      on_shutdown_(base::WaitableEvent::ResetPolicy::AUTOMATIC,
189                   base::WaitableEvent::InitialState::NOT_SIGNALED),
190      connection_(NULL),
191      origin_thread_id_(base::PlatformThread::CurrentId()),
192      async_operations_set_up_(false),
193      shutdown_completed_(false),
194      num_pending_watches_(0),
195      num_pending_timeouts_(0),
196      address_(options.address) {
197  // This is safe to call multiple times.
198  dbus_threads_init_default();
199  // The origin message loop is unnecessary if the client uses synchronous
200  // functions only.
201  if (base::ThreadTaskRunnerHandle::IsSet())
202    origin_task_runner_ = base::ThreadTaskRunnerHandle::Get();
203}
204
205Bus::~Bus() {
206  DCHECK(!connection_);
207  DCHECK(owned_service_names_.empty());
208  DCHECK(match_rules_added_.empty());
209  DCHECK(filter_functions_added_.empty());
210  DCHECK(registered_object_paths_.empty());
211  DCHECK_EQ(0, num_pending_watches_);
212  // TODO(satorux): This check fails occasionally in browser_tests for tests
213  // that run very quickly. Perhaps something does not have time to clean up.
214  // Despite the check failing, the tests seem to run fine. crosbug.com/23416
215  // DCHECK_EQ(0, num_pending_timeouts_);
216}
217
218ObjectProxy* Bus::GetObjectProxy(const std::string& service_name,
219                                 const ObjectPath& object_path) {
220  return GetObjectProxyWithOptions(service_name, object_path,
221                                   ObjectProxy::DEFAULT_OPTIONS);
222}
223
224ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name,
225                                            const ObjectPath& object_path,
226                                            int options) {
227  AssertOnOriginThread();
228
229  // Check if we already have the requested object proxy.
230  const ObjectProxyTable::key_type key(service_name + object_path.value(),
231                                       options);
232  ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
233  if (iter != object_proxy_table_.end()) {
234    return iter->second.get();
235  }
236
237  scoped_refptr<ObjectProxy> object_proxy =
238      new ObjectProxy(this, service_name, object_path, options);
239  object_proxy_table_[key] = object_proxy;
240
241  return object_proxy.get();
242}
243
244bool Bus::RemoveObjectProxy(const std::string& service_name,
245                            const ObjectPath& object_path,
246                            const base::Closure& callback) {
247  return RemoveObjectProxyWithOptions(service_name, object_path,
248                                      ObjectProxy::DEFAULT_OPTIONS,
249                                      callback);
250}
251
252bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name,
253                                       const ObjectPath& object_path,
254                                       int options,
255                                       const base::Closure& callback) {
256  AssertOnOriginThread();
257
258  // Check if we have the requested object proxy.
259  const ObjectProxyTable::key_type key(service_name + object_path.value(),
260                                       options);
261  ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
262  if (iter != object_proxy_table_.end()) {
263    scoped_refptr<ObjectProxy> object_proxy = iter->second;
264    object_proxy_table_.erase(iter);
265    // Object is present. Remove it now and Detach on the DBus thread.
266    GetDBusTaskRunner()->PostTask(
267        FROM_HERE,
268        base::Bind(&Bus::RemoveObjectProxyInternal,
269                   this, object_proxy, callback));
270    return true;
271  }
272  return false;
273}
274
275void Bus::RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy,
276                                    const base::Closure& callback) {
277  AssertOnDBusThread();
278
279  object_proxy.get()->Detach();
280
281  GetOriginTaskRunner()->PostTask(FROM_HERE, callback);
282}
283
284ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) {
285  AssertOnOriginThread();
286
287  // Check if we already have the requested exported object.
288  ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
289  if (iter != exported_object_table_.end()) {
290    return iter->second.get();
291  }
292
293  scoped_refptr<ExportedObject> exported_object =
294      new ExportedObject(this, object_path);
295  exported_object_table_[object_path] = exported_object;
296
297  return exported_object.get();
298}
299
300void Bus::UnregisterExportedObject(const ObjectPath& object_path) {
301  AssertOnOriginThread();
302
303  // Remove the registered object from the table first, to allow a new
304  // GetExportedObject() call to return a new object, rather than this one.
305  ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
306  if (iter == exported_object_table_.end())
307    return;
308
309  scoped_refptr<ExportedObject> exported_object = iter->second;
310  exported_object_table_.erase(iter);
311
312  // Post the task to perform the final unregistration to the D-Bus thread.
313  // Since the registration also happens on the D-Bus thread in
314  // TryRegisterObjectPath(), and the task runner we post to is a
315  // SequencedTaskRunner, there is a guarantee that this will happen before any
316  // future registration call.
317  GetDBusTaskRunner()->PostTask(
318      FROM_HERE,
319      base::Bind(&Bus::UnregisterExportedObjectInternal,
320                 this, exported_object));
321}
322
323void Bus::UnregisterExportedObjectInternal(
324    scoped_refptr<ExportedObject> exported_object) {
325  AssertOnDBusThread();
326
327  exported_object->Unregister();
328}
329
330ObjectManager* Bus::GetObjectManager(const std::string& service_name,
331                                     const ObjectPath& object_path) {
332  AssertOnOriginThread();
333
334  // Check if we already have the requested object manager.
335  const ObjectManagerTable::key_type key(service_name + object_path.value());
336  ObjectManagerTable::iterator iter = object_manager_table_.find(key);
337  if (iter != object_manager_table_.end()) {
338    return iter->second.get();
339  }
340
341  scoped_refptr<ObjectManager> object_manager =
342      new ObjectManager(this, service_name, object_path);
343  object_manager_table_[key] = object_manager;
344
345  return object_manager.get();
346}
347
348bool Bus::RemoveObjectManager(const std::string& service_name,
349                              const ObjectPath& object_path,
350                              const base::Closure& callback) {
351  AssertOnOriginThread();
352  DCHECK(!callback.is_null());
353
354  const ObjectManagerTable::key_type key(service_name + object_path.value());
355  ObjectManagerTable::iterator iter = object_manager_table_.find(key);
356  if (iter == object_manager_table_.end())
357    return false;
358
359  // ObjectManager is present. Remove it now and CleanUp on the DBus thread.
360  scoped_refptr<ObjectManager> object_manager = iter->second;
361  object_manager_table_.erase(iter);
362
363  GetDBusTaskRunner()->PostTask(
364      FROM_HERE,
365      base::Bind(&Bus::RemoveObjectManagerInternal,
366                 this, object_manager, callback));
367
368  return true;
369}
370
371void Bus::RemoveObjectManagerInternal(
372      scoped_refptr<dbus::ObjectManager> object_manager,
373      const base::Closure& callback) {
374  AssertOnDBusThread();
375  DCHECK(object_manager.get());
376
377  object_manager->CleanUp();
378
379  // The ObjectManager has to be deleted on the origin thread since it was
380  // created there.
381  GetOriginTaskRunner()->PostTask(
382      FROM_HERE,
383      base::Bind(&Bus::RemoveObjectManagerInternalHelper,
384                 this, object_manager, callback));
385}
386
387void Bus::RemoveObjectManagerInternalHelper(
388      scoped_refptr<dbus::ObjectManager> object_manager,
389      const base::Closure& callback) {
390  AssertOnOriginThread();
391  DCHECK(object_manager.get());
392
393  // Release the object manager and run the callback.
394  object_manager = NULL;
395  callback.Run();
396}
397
398void Bus::GetManagedObjects() {
399  for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
400       iter != object_manager_table_.end(); ++iter) {
401    iter->second->GetManagedObjects();
402  }
403}
404
405bool Bus::Connect() {
406  // dbus_bus_get_private() and dbus_bus_get() are blocking calls.
407  AssertOnDBusThread();
408
409  // Check if it's already initialized.
410  if (connection_)
411    return true;
412
413  ScopedDBusError error;
414  if (bus_type_ == CUSTOM_ADDRESS) {
415    if (connection_type_ == PRIVATE) {
416      connection_ = dbus_connection_open_private(address_.c_str(), error.get());
417    } else {
418      connection_ = dbus_connection_open(address_.c_str(), error.get());
419    }
420  } else {
421    const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_);
422    if (connection_type_ == PRIVATE) {
423      connection_ = dbus_bus_get_private(dbus_bus_type, error.get());
424    } else {
425      connection_ = dbus_bus_get(dbus_bus_type, error.get());
426    }
427  }
428  if (!connection_) {
429    LOG(ERROR) << "Failed to connect to the bus: "
430               << (error.is_set() ? error.message() : "");
431    return false;
432  }
433
434  if (bus_type_ == CUSTOM_ADDRESS) {
435    // We should call dbus_bus_register here, otherwise unique name can not be
436    // acquired. According to dbus specification, it is responsible to call
437    // org.freedesktop.DBus.Hello method at the beging of bus connection to
438    // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is
439    // called internally.
440    if (!dbus_bus_register(connection_, error.get())) {
441      LOG(ERROR) << "Failed to register the bus component: "
442                 << (error.is_set() ? error.message() : "");
443      return false;
444    }
445  }
446
447  return true;
448}
449
450void Bus::ClosePrivateConnection() {
451  // dbus_connection_close is blocking call.
452  AssertOnDBusThread();
453  DCHECK_EQ(PRIVATE, connection_type_)
454      << "non-private connection should not be closed";
455  dbus_connection_close(connection_);
456}
457
458void Bus::ShutdownAndBlock() {
459  AssertOnDBusThread();
460
461  if (shutdown_completed_)
462    return;  // Already shutdowned, just return.
463
464  // Unregister the exported objects.
465  for (ExportedObjectTable::iterator iter = exported_object_table_.begin();
466       iter != exported_object_table_.end(); ++iter) {
467    iter->second->Unregister();
468  }
469
470  // Release all service names.
471  for (std::set<std::string>::iterator iter = owned_service_names_.begin();
472       iter != owned_service_names_.end();) {
473    // This is a bit tricky but we should increment the iter here as
474    // ReleaseOwnership() may remove |service_name| from the set.
475    const std::string& service_name = *iter++;
476    ReleaseOwnership(service_name);
477  }
478  if (!owned_service_names_.empty()) {
479    LOG(ERROR) << "Failed to release all service names. # of services left: "
480               << owned_service_names_.size();
481  }
482
483  // Detach from the remote objects.
484  for (ObjectProxyTable::iterator iter = object_proxy_table_.begin();
485       iter != object_proxy_table_.end(); ++iter) {
486    iter->second->Detach();
487  }
488
489  // Clean up the object managers.
490  for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
491       iter != object_manager_table_.end(); ++iter) {
492    iter->second->CleanUp();
493  }
494
495  // Release object proxies and exported objects here. We should do this
496  // here rather than in the destructor to avoid memory leaks due to
497  // cyclic references.
498  object_proxy_table_.clear();
499  exported_object_table_.clear();
500
501  // Private connection should be closed.
502  if (connection_) {
503    // Remove Disconnected watcher.
504    ScopedDBusError error;
505
506    if (connection_type_ == PRIVATE)
507      ClosePrivateConnection();
508    // dbus_connection_close() won't unref.
509    dbus_connection_unref(connection_);
510  }
511
512  connection_ = NULL;
513  shutdown_completed_ = true;
514}
515
516void Bus::ShutdownOnDBusThreadAndBlock() {
517  AssertOnOriginThread();
518  DCHECK(dbus_task_runner_.get());
519
520  GetDBusTaskRunner()->PostTask(
521      FROM_HERE,
522      base::Bind(&Bus::ShutdownOnDBusThreadAndBlockInternal, this));
523
524  // http://crbug.com/125222
525  base::ThreadRestrictions::ScopedAllowWait allow_wait;
526
527  // Wait until the shutdown is complete on the D-Bus thread.
528  // The shutdown should not hang, but set timeout just in case.
529  const int kTimeoutSecs = 3;
530  const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs));
531  const bool signaled = on_shutdown_.TimedWait(timeout);
532  LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus";
533}
534
535void Bus::RequestOwnership(const std::string& service_name,
536                           ServiceOwnershipOptions options,
537                           OnOwnershipCallback on_ownership_callback) {
538  AssertOnOriginThread();
539
540  GetDBusTaskRunner()->PostTask(
541      FROM_HERE,
542      base::Bind(&Bus::RequestOwnershipInternal,
543                 this, service_name, options, on_ownership_callback));
544}
545
546void Bus::RequestOwnershipInternal(const std::string& service_name,
547                                   ServiceOwnershipOptions options,
548                                   OnOwnershipCallback on_ownership_callback) {
549  AssertOnDBusThread();
550
551  bool success = Connect();
552  if (success)
553    success = RequestOwnershipAndBlock(service_name, options);
554
555  GetOriginTaskRunner()->PostTask(FROM_HERE,
556                                  base::Bind(on_ownership_callback,
557                                             service_name,
558                                             success));
559}
560
561bool Bus::RequestOwnershipAndBlock(const std::string& service_name,
562                                   ServiceOwnershipOptions options) {
563  DCHECK(connection_);
564  // dbus_bus_request_name() is a blocking call.
565  AssertOnDBusThread();
566
567  // Check if we already own the service name.
568  if (owned_service_names_.find(service_name) != owned_service_names_.end()) {
569    return true;
570  }
571
572  ScopedDBusError error;
573  const int result = dbus_bus_request_name(connection_,
574                                           service_name.c_str(),
575                                           options,
576                                           error.get());
577  if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
578    LOG(ERROR) << "Failed to get the ownership of " << service_name << ": "
579               << (error.is_set() ? error.message() : "");
580    return false;
581  }
582  owned_service_names_.insert(service_name);
583  return true;
584}
585
586bool Bus::ReleaseOwnership(const std::string& service_name) {
587  DCHECK(connection_);
588  // dbus_bus_request_name() is a blocking call.
589  AssertOnDBusThread();
590
591  // Check if we already own the service name.
592  std::set<std::string>::iterator found =
593      owned_service_names_.find(service_name);
594  if (found == owned_service_names_.end()) {
595    LOG(ERROR) << service_name << " is not owned by the bus";
596    return false;
597  }
598
599  ScopedDBusError error;
600  const int result = dbus_bus_release_name(connection_, service_name.c_str(),
601                                           error.get());
602  if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) {
603    owned_service_names_.erase(found);
604    return true;
605  } else {
606    LOG(ERROR) << "Failed to release the ownership of " << service_name << ": "
607               << (error.is_set() ? error.message() : "")
608               << ", result code: " << result;
609    return false;
610  }
611}
612
613bool Bus::SetUpAsyncOperations() {
614  DCHECK(connection_);
615  AssertOnDBusThread();
616
617  if (async_operations_set_up_)
618    return true;
619
620  // Process all the incoming data if any, so that OnDispatchStatus() will
621  // be called when the incoming data is ready.
622  ProcessAllIncomingDataIfAny();
623
624  bool success = dbus_connection_set_watch_functions(connection_,
625                                                     &Bus::OnAddWatchThunk,
626                                                     &Bus::OnRemoveWatchThunk,
627                                                     &Bus::OnToggleWatchThunk,
628                                                     this,
629                                                     NULL);
630  CHECK(success) << "Unable to allocate memory";
631
632  success = dbus_connection_set_timeout_functions(connection_,
633                                                  &Bus::OnAddTimeoutThunk,
634                                                  &Bus::OnRemoveTimeoutThunk,
635                                                  &Bus::OnToggleTimeoutThunk,
636                                                  this,
637                                                  NULL);
638  CHECK(success) << "Unable to allocate memory";
639
640  dbus_connection_set_dispatch_status_function(
641      connection_,
642      &Bus::OnDispatchStatusChangedThunk,
643      this,
644      NULL);
645
646  async_operations_set_up_ = true;
647
648  return true;
649}
650
651DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request,
652                                        int timeout_ms,
653                                        DBusError* error) {
654  DCHECK(connection_);
655  AssertOnDBusThread();
656
657  return dbus_connection_send_with_reply_and_block(
658      connection_, request, timeout_ms, error);
659}
660
661void Bus::SendWithReply(DBusMessage* request,
662                        DBusPendingCall** pending_call,
663                        int timeout_ms) {
664  DCHECK(connection_);
665  AssertOnDBusThread();
666
667  const bool success = dbus_connection_send_with_reply(
668      connection_, request, pending_call, timeout_ms);
669  CHECK(success) << "Unable to allocate memory";
670}
671
672void Bus::Send(DBusMessage* request, uint32_t* serial) {
673  DCHECK(connection_);
674  AssertOnDBusThread();
675
676  const bool success = dbus_connection_send(connection_, request, serial);
677  CHECK(success) << "Unable to allocate memory";
678}
679
680void Bus::AddFilterFunction(DBusHandleMessageFunction filter_function,
681                            void* user_data) {
682  DCHECK(connection_);
683  AssertOnDBusThread();
684
685  std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
686      std::make_pair(filter_function, user_data);
687  if (filter_functions_added_.find(filter_data_pair) !=
688      filter_functions_added_.end()) {
689    VLOG(1) << "Filter function already exists: " << filter_function
690            << " with associated data: " << user_data;
691    return;
692  }
693
694  const bool success = dbus_connection_add_filter(
695      connection_, filter_function, user_data, NULL);
696  CHECK(success) << "Unable to allocate memory";
697  filter_functions_added_.insert(filter_data_pair);
698}
699
700void Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function,
701                               void* user_data) {
702  DCHECK(connection_);
703  AssertOnDBusThread();
704
705  std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
706      std::make_pair(filter_function, user_data);
707  if (filter_functions_added_.find(filter_data_pair) ==
708      filter_functions_added_.end()) {
709    VLOG(1) << "Requested to remove an unknown filter function: "
710            << filter_function
711            << " with associated data: " << user_data;
712    return;
713  }
714
715  dbus_connection_remove_filter(connection_, filter_function, user_data);
716  filter_functions_added_.erase(filter_data_pair);
717}
718
719void Bus::AddMatch(const std::string& match_rule, DBusError* error) {
720  DCHECK(connection_);
721  AssertOnDBusThread();
722
723  std::map<std::string, int>::iterator iter =
724      match_rules_added_.find(match_rule);
725  if (iter != match_rules_added_.end()) {
726    // The already existing rule's counter is incremented.
727    iter->second++;
728
729    VLOG(1) << "Match rule already exists: " << match_rule;
730    return;
731  }
732
733  dbus_bus_add_match(connection_, match_rule.c_str(), error);
734  match_rules_added_[match_rule] = 1;
735}
736
737bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) {
738  DCHECK(connection_);
739  AssertOnDBusThread();
740
741  std::map<std::string, int>::iterator iter =
742      match_rules_added_.find(match_rule);
743  if (iter == match_rules_added_.end()) {
744    LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule;
745    return false;
746  }
747
748  // The rule's counter is decremented and the rule is deleted when reachs 0.
749  iter->second--;
750  if (iter->second == 0) {
751    dbus_bus_remove_match(connection_, match_rule.c_str(), error);
752    match_rules_added_.erase(match_rule);
753  }
754  return true;
755}
756
757bool Bus::TryRegisterObjectPath(const ObjectPath& object_path,
758                                const DBusObjectPathVTable* vtable,
759                                void* user_data,
760                                DBusError* error) {
761  DCHECK(connection_);
762  AssertOnDBusThread();
763
764  if (registered_object_paths_.find(object_path) !=
765      registered_object_paths_.end()) {
766    LOG(ERROR) << "Object path already registered: " << object_path.value();
767    return false;
768  }
769
770  const bool success = dbus_connection_try_register_object_path(
771      connection_,
772      object_path.value().c_str(),
773      vtable,
774      user_data,
775      error);
776  if (success)
777    registered_object_paths_.insert(object_path);
778  return success;
779}
780
781void Bus::UnregisterObjectPath(const ObjectPath& object_path) {
782  DCHECK(connection_);
783  AssertOnDBusThread();
784
785  if (registered_object_paths_.find(object_path) ==
786      registered_object_paths_.end()) {
787    LOG(ERROR) << "Requested to unregister an unknown object path: "
788               << object_path.value();
789    return;
790  }
791
792  const bool success = dbus_connection_unregister_object_path(
793      connection_,
794      object_path.value().c_str());
795  CHECK(success) << "Unable to allocate memory";
796  registered_object_paths_.erase(object_path);
797}
798
799void Bus::ShutdownOnDBusThreadAndBlockInternal() {
800  AssertOnDBusThread();
801
802  ShutdownAndBlock();
803  on_shutdown_.Signal();
804}
805
806void Bus::ProcessAllIncomingDataIfAny() {
807  AssertOnDBusThread();
808
809  // As mentioned at the class comment in .h file, connection_ can be NULL.
810  if (!connection_)
811    return;
812
813  // It is safe and necessary to call dbus_connection_get_dispatch_status even
814  // if the connection is lost.
815  if (dbus_connection_get_dispatch_status(connection_) ==
816      DBUS_DISPATCH_DATA_REMAINS) {
817    while (dbus_connection_dispatch(connection_) ==
818           DBUS_DISPATCH_DATA_REMAINS) {
819    }
820  }
821}
822
823base::TaskRunner* Bus::GetDBusTaskRunner() {
824  if (dbus_task_runner_.get())
825    return dbus_task_runner_.get();
826  else
827    return GetOriginTaskRunner();
828}
829
830base::TaskRunner* Bus::GetOriginTaskRunner() {
831  DCHECK(origin_task_runner_.get());
832  return origin_task_runner_.get();
833}
834
835bool Bus::HasDBusThread() {
836  return dbus_task_runner_.get() != NULL;
837}
838
839void Bus::AssertOnOriginThread() {
840  DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId());
841}
842
843void Bus::AssertOnDBusThread() {
844  base::ThreadRestrictions::AssertIOAllowed();
845
846  if (dbus_task_runner_.get()) {
847    DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread());
848  } else {
849    AssertOnOriginThread();
850  }
851}
852
853std::string Bus::GetServiceOwnerAndBlock(const std::string& service_name,
854                                         GetServiceOwnerOption options) {
855  AssertOnDBusThread();
856
857  MethodCall get_name_owner_call("org.freedesktop.DBus", "GetNameOwner");
858  MessageWriter writer(&get_name_owner_call);
859  writer.AppendString(service_name);
860  VLOG(1) << "Method call: " << get_name_owner_call.ToString();
861
862  const ObjectPath obj_path("/org/freedesktop/DBus");
863  if (!get_name_owner_call.SetDestination("org.freedesktop.DBus") ||
864      !get_name_owner_call.SetPath(obj_path)) {
865    if (options == REPORT_ERRORS)
866      LOG(ERROR) << "Failed to get name owner.";
867    return "";
868  }
869
870  ScopedDBusError error;
871  DBusMessage* response_message =
872      SendWithReplyAndBlock(get_name_owner_call.raw_message(),
873                            ObjectProxy::TIMEOUT_USE_DEFAULT,
874                            error.get());
875  if (!response_message) {
876    if (options == REPORT_ERRORS) {
877      LOG(ERROR) << "Failed to get name owner. Got " << error.name() << ": "
878                 << error.message();
879    }
880    return "";
881  }
882
883  std::unique_ptr<Response> response(
884      Response::FromRawMessage(response_message));
885  MessageReader reader(response.get());
886
887  std::string service_owner;
888  if (!reader.PopString(&service_owner))
889    service_owner.clear();
890  return service_owner;
891}
892
893void Bus::GetServiceOwner(const std::string& service_name,
894                          const GetServiceOwnerCallback& callback) {
895  AssertOnOriginThread();
896
897  GetDBusTaskRunner()->PostTask(
898      FROM_HERE,
899      base::Bind(&Bus::GetServiceOwnerInternal, this, service_name, callback));
900}
901
902void Bus::GetServiceOwnerInternal(const std::string& service_name,
903                                  const GetServiceOwnerCallback& callback) {
904  AssertOnDBusThread();
905
906  std::string service_owner;
907  if (Connect())
908    service_owner = GetServiceOwnerAndBlock(service_name, SUPPRESS_ERRORS);
909  GetOriginTaskRunner()->PostTask(FROM_HERE,
910                                  base::Bind(callback, service_owner));
911}
912
913void Bus::ListenForServiceOwnerChange(
914    const std::string& service_name,
915    const GetServiceOwnerCallback& callback) {
916  AssertOnOriginThread();
917  DCHECK(!service_name.empty());
918  DCHECK(!callback.is_null());
919
920  GetDBusTaskRunner()->PostTask(
921      FROM_HERE,
922      base::Bind(&Bus::ListenForServiceOwnerChangeInternal,
923                 this, service_name, callback));
924}
925
926void Bus::ListenForServiceOwnerChangeInternal(
927    const std::string& service_name,
928    const GetServiceOwnerCallback& callback) {
929  AssertOnDBusThread();
930  DCHECK(!service_name.empty());
931  DCHECK(!callback.is_null());
932
933  if (!Connect() || !SetUpAsyncOperations())
934    return;
935
936  if (service_owner_changed_listener_map_.empty())
937    AddFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
938
939  ServiceOwnerChangedListenerMap::iterator it =
940      service_owner_changed_listener_map_.find(service_name);
941  if (it == service_owner_changed_listener_map_.end()) {
942    // Add a match rule for the new service name.
943    const std::string name_owner_changed_match_rule =
944        base::StringPrintf(kServiceNameOwnerChangeMatchRule,
945                           service_name.c_str());
946    ScopedDBusError error;
947    AddMatch(name_owner_changed_match_rule, error.get());
948    if (error.is_set()) {
949      LOG(ERROR) << "Failed to add match rule for " << service_name
950                 << ". Got " << error.name() << ": " << error.message();
951      return;
952    }
953
954    service_owner_changed_listener_map_[service_name].push_back(callback);
955    return;
956  }
957
958  // Check if the callback has already been added.
959  std::vector<GetServiceOwnerCallback>& callbacks = it->second;
960  for (size_t i = 0; i < callbacks.size(); ++i) {
961    if (callbacks[i].Equals(callback))
962      return;
963  }
964  callbacks.push_back(callback);
965}
966
967void Bus::UnlistenForServiceOwnerChange(
968    const std::string& service_name,
969    const GetServiceOwnerCallback& callback) {
970  AssertOnOriginThread();
971  DCHECK(!service_name.empty());
972  DCHECK(!callback.is_null());
973
974  GetDBusTaskRunner()->PostTask(
975      FROM_HERE,
976      base::Bind(&Bus::UnlistenForServiceOwnerChangeInternal,
977                 this, service_name, callback));
978}
979
980void Bus::UnlistenForServiceOwnerChangeInternal(
981    const std::string& service_name,
982    const GetServiceOwnerCallback& callback) {
983  AssertOnDBusThread();
984  DCHECK(!service_name.empty());
985  DCHECK(!callback.is_null());
986
987  ServiceOwnerChangedListenerMap::iterator it =
988      service_owner_changed_listener_map_.find(service_name);
989  if (it == service_owner_changed_listener_map_.end())
990    return;
991
992  std::vector<GetServiceOwnerCallback>& callbacks = it->second;
993  for (size_t i = 0; i < callbacks.size(); ++i) {
994    if (callbacks[i].Equals(callback)) {
995      callbacks.erase(callbacks.begin() + i);
996      break;  // There can be only one.
997    }
998  }
999  if (!callbacks.empty())
1000    return;
1001
1002  // Last callback for |service_name| has been removed, remove match rule.
1003  const std::string name_owner_changed_match_rule =
1004      base::StringPrintf(kServiceNameOwnerChangeMatchRule,
1005                         service_name.c_str());
1006  ScopedDBusError error;
1007  RemoveMatch(name_owner_changed_match_rule, error.get());
1008  // And remove |service_owner_changed_listener_map_| entry.
1009  service_owner_changed_listener_map_.erase(it);
1010
1011  if (service_owner_changed_listener_map_.empty())
1012    RemoveFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
1013}
1014
1015std::string Bus::GetConnectionName() {
1016  if (!connection_)
1017    return "";
1018  return dbus_bus_get_unique_name(connection_);
1019}
1020
1021dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) {
1022  AssertOnDBusThread();
1023
1024  // watch will be deleted when raw_watch is removed in OnRemoveWatch().
1025  Watch* watch = new Watch(raw_watch);
1026  if (watch->IsReadyToBeWatched()) {
1027    watch->StartWatching();
1028  }
1029  ++num_pending_watches_;
1030  return true;
1031}
1032
1033void Bus::OnRemoveWatch(DBusWatch* raw_watch) {
1034  AssertOnDBusThread();
1035
1036  Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
1037  delete watch;
1038  --num_pending_watches_;
1039}
1040
1041void Bus::OnToggleWatch(DBusWatch* raw_watch) {
1042  AssertOnDBusThread();
1043
1044  Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
1045  if (watch->IsReadyToBeWatched()) {
1046    watch->StartWatching();
1047  } else {
1048    // It's safe to call this if StartWatching() wasn't called, per
1049    // message_pump_libevent.h.
1050    watch->StopWatching();
1051  }
1052}
1053
1054dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) {
1055  AssertOnDBusThread();
1056
1057  // timeout will be deleted when raw_timeout is removed in
1058  // OnRemoveTimeoutThunk().
1059  Timeout* timeout = new Timeout(raw_timeout);
1060  if (timeout->IsReadyToBeMonitored()) {
1061    timeout->StartMonitoring(this);
1062  }
1063  ++num_pending_timeouts_;
1064  return true;
1065}
1066
1067void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) {
1068  AssertOnDBusThread();
1069
1070  Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
1071  timeout->Complete();
1072  --num_pending_timeouts_;
1073}
1074
1075void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) {
1076  AssertOnDBusThread();
1077
1078  Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
1079  if (timeout->IsReadyToBeMonitored()) {
1080    timeout->StartMonitoring(this);
1081  } else {
1082    timeout->StopMonitoring();
1083  }
1084}
1085
1086void Bus::OnDispatchStatusChanged(DBusConnection* connection,
1087                                  DBusDispatchStatus /*status*/) {
1088  DCHECK_EQ(connection, connection_);
1089  AssertOnDBusThread();
1090
1091  // We cannot call ProcessAllIncomingDataIfAny() here, as calling
1092  // dbus_connection_dispatch() inside DBusDispatchStatusFunction is
1093  // prohibited by the D-Bus library. Hence, we post a task here instead.
1094  // See comments for dbus_connection_set_dispatch_status_function().
1095  GetDBusTaskRunner()->PostTask(FROM_HERE,
1096                                base::Bind(&Bus::ProcessAllIncomingDataIfAny,
1097                                           this));
1098}
1099
1100void Bus::OnServiceOwnerChanged(DBusMessage* message) {
1101  DCHECK(message);
1102  AssertOnDBusThread();
1103
1104  // |message| will be unrefed on exit of the function. Increment the
1105  // reference so we can use it in Signal::FromRawMessage() below.
1106  dbus_message_ref(message);
1107  std::unique_ptr<Signal> signal(Signal::FromRawMessage(message));
1108
1109  // Confirm the validity of the NameOwnerChanged signal.
1110  if (signal->GetMember() != kNameOwnerChangedSignal ||
1111      signal->GetInterface() != DBUS_INTERFACE_DBUS ||
1112      signal->GetSender() != DBUS_SERVICE_DBUS) {
1113    return;
1114  }
1115
1116  MessageReader reader(signal.get());
1117  std::string service_name;
1118  std::string old_owner;
1119  std::string new_owner;
1120  if (!reader.PopString(&service_name) ||
1121      !reader.PopString(&old_owner) ||
1122      !reader.PopString(&new_owner)) {
1123    return;
1124  }
1125
1126  ServiceOwnerChangedListenerMap::const_iterator it =
1127      service_owner_changed_listener_map_.find(service_name);
1128  if (it == service_owner_changed_listener_map_.end())
1129    return;
1130
1131  const std::vector<GetServiceOwnerCallback>& callbacks = it->second;
1132  for (size_t i = 0; i < callbacks.size(); ++i) {
1133    GetOriginTaskRunner()->PostTask(FROM_HERE,
1134                                    base::Bind(callbacks[i], new_owner));
1135  }
1136}
1137
1138// static
1139dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) {
1140  Bus* self = static_cast<Bus*>(data);
1141  return self->OnAddWatch(raw_watch);
1142}
1143
1144// static
1145void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) {
1146  Bus* self = static_cast<Bus*>(data);
1147  self->OnRemoveWatch(raw_watch);
1148}
1149
1150// static
1151void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) {
1152  Bus* self = static_cast<Bus*>(data);
1153  self->OnToggleWatch(raw_watch);
1154}
1155
1156// static
1157dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
1158  Bus* self = static_cast<Bus*>(data);
1159  return self->OnAddTimeout(raw_timeout);
1160}
1161
1162// static
1163void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
1164  Bus* self = static_cast<Bus*>(data);
1165  self->OnRemoveTimeout(raw_timeout);
1166}
1167
1168// static
1169void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
1170  Bus* self = static_cast<Bus*>(data);
1171  self->OnToggleTimeout(raw_timeout);
1172}
1173
1174// static
1175void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection,
1176                                       DBusDispatchStatus status,
1177                                       void* data) {
1178  Bus* self = static_cast<Bus*>(data);
1179  self->OnDispatchStatusChanged(connection, status);
1180}
1181
1182// static
1183DBusHandlerResult Bus::OnServiceOwnerChangedFilter(
1184    DBusConnection* /*connection*/,
1185    DBusMessage* message,
1186    void* data) {
1187  if (dbus_message_is_signal(message,
1188                             DBUS_INTERFACE_DBUS,
1189                             kNameOwnerChangedSignal)) {
1190    Bus* self = static_cast<Bus*>(data);
1191    self->OnServiceOwnerChanged(message);
1192  }
1193  // Always return unhandled to let others, e.g. ObjectProxies, handle the same
1194  // signal.
1195  return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
1196}
1197
1198}  // namespace dbus
1199