1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "dbus/bus.h"
6
7#include <stddef.h>
8
9#include "base/bind.h"
10#include "base/logging.h"
11#include "base/message_loop/message_loop.h"
12#include "base/stl_util.h"
13#include "base/strings/stringprintf.h"
14#include "base/threading/thread.h"
15#include "base/threading/thread_restrictions.h"
16#include "base/time/time.h"
17#include "dbus/exported_object.h"
18#include "dbus/message.h"
19#include "dbus/object_manager.h"
20#include "dbus/object_path.h"
21#include "dbus/object_proxy.h"
22#include "dbus/scoped_dbus_error.h"
23
24namespace dbus {
25
26namespace {
27
// The name of the NameOwnerChanged member in org.freedesktop.DBus.
const char kNameOwnerChangedSignal[] = "NameOwnerChanged";

// The match rule used to filter for changes to a given service name owner.
// This is a printf-style format string: the single %s placeholder is filled
// in with the service name, which the rule matches against arg0.
const char kServiceNameOwnerChangeMatchRule[] =
    "type='signal',interface='org.freedesktop.DBus',"
    "member='NameOwnerChanged',path='/org/freedesktop/DBus',"
    "sender='org.freedesktop.DBus',arg0='%s'";
36
37// The class is used for watching the file descriptor used for D-Bus
38// communication.
39class Watch : public base::MessagePumpLibevent::Watcher {
40 public:
41  explicit Watch(DBusWatch* watch)
42      : raw_watch_(watch) {
43    dbus_watch_set_data(raw_watch_, this, NULL);
44  }
45
46  ~Watch() override { dbus_watch_set_data(raw_watch_, NULL, NULL); }
47
48  // Returns true if the underlying file descriptor is ready to be watched.
49  bool IsReadyToBeWatched() {
50    return dbus_watch_get_enabled(raw_watch_);
51  }
52
53  // Starts watching the underlying file descriptor.
54  void StartWatching() {
55    const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_);
56    const int flags = dbus_watch_get_flags(raw_watch_);
57
58    base::MessageLoopForIO::Mode mode = base::MessageLoopForIO::WATCH_READ;
59    if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE))
60      mode = base::MessageLoopForIO::WATCH_READ_WRITE;
61    else if (flags & DBUS_WATCH_READABLE)
62      mode = base::MessageLoopForIO::WATCH_READ;
63    else if (flags & DBUS_WATCH_WRITABLE)
64      mode = base::MessageLoopForIO::WATCH_WRITE;
65    else
66      NOTREACHED();
67
68    const bool persistent = true;  // Watch persistently.
69    const bool success = base::MessageLoopForIO::current()->WatchFileDescriptor(
70        file_descriptor, persistent, mode, &file_descriptor_watcher_, this);
71    CHECK(success) << "Unable to allocate memory";
72  }
73
74  // Stops watching the underlying file descriptor.
75  void StopWatching() {
76    file_descriptor_watcher_.StopWatchingFileDescriptor();
77  }
78
79 private:
80  // Implement MessagePumpLibevent::Watcher.
81  void OnFileCanReadWithoutBlocking(int /* file_descriptor */) override {
82    const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE);
83    CHECK(success) << "Unable to allocate memory";
84  }
85
86  // Implement MessagePumpLibevent::Watcher.
87  void OnFileCanWriteWithoutBlocking(int /* file_descriptor */) override {
88    const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE);
89    CHECK(success) << "Unable to allocate memory";
90  }
91
92  DBusWatch* raw_watch_;
93  base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_;
94};
95
96// The class is used for monitoring the timeout used for D-Bus method
97// calls.
98//
// Unlike Watch, Timeout is a ref counted object, to ensure that |this| of
// the object is alive when HandleTimeout() is called. It's unlikely
101// but it may be possible that HandleTimeout() is called after
102// Bus::OnRemoveTimeout(). That's why we don't simply delete the object in
103// Bus::OnRemoveTimeout().
104class Timeout : public base::RefCountedThreadSafe<Timeout> {
105 public:
106  explicit Timeout(DBusTimeout* timeout)
107      : raw_timeout_(timeout),
108        monitoring_is_active_(false),
109        is_completed(false) {
110    dbus_timeout_set_data(raw_timeout_, this, NULL);
111    AddRef();  // Balanced on Complete().
112  }
113
114  // Returns true if the timeout is ready to be monitored.
115  bool IsReadyToBeMonitored() {
116    return dbus_timeout_get_enabled(raw_timeout_);
117  }
118
119  // Starts monitoring the timeout.
120  void StartMonitoring(Bus* bus) {
121    bus->GetDBusTaskRunner()->PostDelayedTask(
122        FROM_HERE,
123        base::Bind(&Timeout::HandleTimeout, this),
124        GetInterval());
125    monitoring_is_active_ = true;
126  }
127
128  // Stops monitoring the timeout.
129  void StopMonitoring() {
130    // We cannot take back the delayed task we posted in
131    // StartMonitoring(), so we just mark the monitoring is inactive now.
132    monitoring_is_active_ = false;
133  }
134
135  // Returns the interval.
136  base::TimeDelta GetInterval() {
137    return base::TimeDelta::FromMilliseconds(
138        dbus_timeout_get_interval(raw_timeout_));
139  }
140
141  // Cleans up the raw_timeout and marks that timeout is completed.
142  // See the class comment above for why we are doing this.
143  void Complete() {
144    dbus_timeout_set_data(raw_timeout_, NULL, NULL);
145    is_completed = true;
146    Release();
147  }
148
149 private:
150  friend class base::RefCountedThreadSafe<Timeout>;
151  ~Timeout() {
152  }
153
154  // Handles the timeout.
155  void HandleTimeout() {
156    // If the timeout is marked completed, we should do nothing. This can
157    // occur if this function is called after Bus::OnRemoveTimeout().
158    if (is_completed)
159      return;
160    // Skip if monitoring is canceled.
161    if (!monitoring_is_active_)
162      return;
163
164    const bool success = dbus_timeout_handle(raw_timeout_);
165    CHECK(success) << "Unable to allocate memory";
166  }
167
168  DBusTimeout* raw_timeout_;
169  bool monitoring_is_active_;
170  bool is_completed;
171};
172
173}  // namespace
174
175Bus::Options::Options()
176  : bus_type(SESSION),
177    connection_type(PRIVATE) {
178}
179
180Bus::Options::~Options() {
181}
182
183Bus::Bus(const Options& options)
184    : bus_type_(options.bus_type),
185      connection_type_(options.connection_type),
186      dbus_task_runner_(options.dbus_task_runner),
187      on_shutdown_(false /* manual_reset */, false /* initially_signaled */),
188      connection_(NULL),
189      origin_thread_id_(base::PlatformThread::CurrentId()),
190      async_operations_set_up_(false),
191      shutdown_completed_(false),
192      num_pending_watches_(0),
193      num_pending_timeouts_(0),
194      address_(options.address) {
195  // This is safe to call multiple times.
196  dbus_threads_init_default();
197  // The origin message loop is unnecessary if the client uses synchronous
198  // functions only.
199  if (base::MessageLoop::current())
200    origin_task_runner_ = base::MessageLoop::current()->task_runner();
201}
202
// The destructor releases nothing itself. It only verifies (in DCHECK-enabled
// builds) that the connection is gone and that every service name, match
// rule, filter function, object path and watch tracked by this class has
// already been released.
Bus::~Bus() {
  DCHECK(!connection_);
  DCHECK(owned_service_names_.empty());
  DCHECK(match_rules_added_.empty());
  DCHECK(filter_functions_added_.empty());
  DCHECK(registered_object_paths_.empty());
  DCHECK_EQ(0, num_pending_watches_);
  // TODO(satorux): This check fails occasionally in browser_tests for tests
  // that run very quickly. Perhaps something does not have time to clean up.
  // Despite the check failing, the tests seem to run fine. crosbug.com/23416
  // DCHECK_EQ(0, num_pending_timeouts_);
}
215
216ObjectProxy* Bus::GetObjectProxy(const std::string& service_name,
217                                 const ObjectPath& object_path) {
218  return GetObjectProxyWithOptions(service_name, object_path,
219                                   ObjectProxy::DEFAULT_OPTIONS);
220}
221
222ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name,
223                                            const ObjectPath& object_path,
224                                            int options) {
225  AssertOnOriginThread();
226
227  // Check if we already have the requested object proxy.
228  const ObjectProxyTable::key_type key(service_name + object_path.value(),
229                                       options);
230  ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
231  if (iter != object_proxy_table_.end()) {
232    return iter->second.get();
233  }
234
235  scoped_refptr<ObjectProxy> object_proxy =
236      new ObjectProxy(this, service_name, object_path, options);
237  object_proxy_table_[key] = object_proxy;
238
239  return object_proxy.get();
240}
241
242bool Bus::RemoveObjectProxy(const std::string& service_name,
243                            const ObjectPath& object_path,
244                            const base::Closure& callback) {
245  return RemoveObjectProxyWithOptions(service_name, object_path,
246                                      ObjectProxy::DEFAULT_OPTIONS,
247                                      callback);
248}
249
250bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name,
251                                       const ObjectPath& object_path,
252                                       int options,
253                                       const base::Closure& callback) {
254  AssertOnOriginThread();
255
256  // Check if we have the requested object proxy.
257  const ObjectProxyTable::key_type key(service_name + object_path.value(),
258                                       options);
259  ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
260  if (iter != object_proxy_table_.end()) {
261    scoped_refptr<ObjectProxy> object_proxy = iter->second;
262    object_proxy_table_.erase(iter);
263    // Object is present. Remove it now and Detach on the DBus thread.
264    GetDBusTaskRunner()->PostTask(
265        FROM_HERE,
266        base::Bind(&Bus::RemoveObjectProxyInternal,
267                   this, object_proxy, callback));
268    return true;
269  }
270  return false;
271}
272
273void Bus::RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy,
274                                    const base::Closure& callback) {
275  AssertOnDBusThread();
276
277  object_proxy.get()->Detach();
278
279  GetOriginTaskRunner()->PostTask(FROM_HERE, callback);
280}
281
282ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) {
283  AssertOnOriginThread();
284
285  // Check if we already have the requested exported object.
286  ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
287  if (iter != exported_object_table_.end()) {
288    return iter->second.get();
289  }
290
291  scoped_refptr<ExportedObject> exported_object =
292      new ExportedObject(this, object_path);
293  exported_object_table_[object_path] = exported_object;
294
295  return exported_object.get();
296}
297
298void Bus::UnregisterExportedObject(const ObjectPath& object_path) {
299  AssertOnOriginThread();
300
301  // Remove the registered object from the table first, to allow a new
302  // GetExportedObject() call to return a new object, rather than this one.
303  ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
304  if (iter == exported_object_table_.end())
305    return;
306
307  scoped_refptr<ExportedObject> exported_object = iter->second;
308  exported_object_table_.erase(iter);
309
310  // Post the task to perform the final unregistration to the D-Bus thread.
311  // Since the registration also happens on the D-Bus thread in
312  // TryRegisterObjectPath(), and the task runner we post to is a
313  // SequencedTaskRunner, there is a guarantee that this will happen before any
314  // future registration call.
315  GetDBusTaskRunner()->PostTask(
316      FROM_HERE,
317      base::Bind(&Bus::UnregisterExportedObjectInternal,
318                 this, exported_object));
319}
320
321void Bus::UnregisterExportedObjectInternal(
322    scoped_refptr<ExportedObject> exported_object) {
323  AssertOnDBusThread();
324
325  exported_object->Unregister();
326}
327
328ObjectManager* Bus::GetObjectManager(const std::string& service_name,
329                                     const ObjectPath& object_path) {
330  AssertOnOriginThread();
331
332  // Check if we already have the requested object manager.
333  const ObjectManagerTable::key_type key(service_name + object_path.value());
334  ObjectManagerTable::iterator iter = object_manager_table_.find(key);
335  if (iter != object_manager_table_.end()) {
336    return iter->second.get();
337  }
338
339  scoped_refptr<ObjectManager> object_manager =
340      new ObjectManager(this, service_name, object_path);
341  object_manager_table_[key] = object_manager;
342
343  return object_manager.get();
344}
345
346bool Bus::RemoveObjectManager(const std::string& service_name,
347                              const ObjectPath& object_path,
348                              const base::Closure& callback) {
349  AssertOnOriginThread();
350  DCHECK(!callback.is_null());
351
352  const ObjectManagerTable::key_type key(service_name + object_path.value());
353  ObjectManagerTable::iterator iter = object_manager_table_.find(key);
354  if (iter == object_manager_table_.end())
355    return false;
356
357  // ObjectManager is present. Remove it now and CleanUp on the DBus thread.
358  scoped_refptr<ObjectManager> object_manager = iter->second;
359  object_manager_table_.erase(iter);
360
361  GetDBusTaskRunner()->PostTask(
362      FROM_HERE,
363      base::Bind(&Bus::RemoveObjectManagerInternal,
364                 this, object_manager, callback));
365
366  return true;
367}
368
369void Bus::RemoveObjectManagerInternal(
370      scoped_refptr<dbus::ObjectManager> object_manager,
371      const base::Closure& callback) {
372  AssertOnDBusThread();
373  DCHECK(object_manager.get());
374
375  object_manager->CleanUp();
376
377  // The ObjectManager has to be deleted on the origin thread since it was
378  // created there.
379  GetOriginTaskRunner()->PostTask(
380      FROM_HERE,
381      base::Bind(&Bus::RemoveObjectManagerInternalHelper,
382                 this, object_manager, callback));
383}
384
385void Bus::RemoveObjectManagerInternalHelper(
386      scoped_refptr<dbus::ObjectManager> object_manager,
387      const base::Closure& callback) {
388  AssertOnOriginThread();
389  DCHECK(object_manager.get());
390
391  // Release the object manager and run the callback.
392  object_manager = NULL;
393  callback.Run();
394}
395
396void Bus::GetManagedObjects() {
397  for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
398       iter != object_manager_table_.end(); ++iter) {
399    iter->second->GetManagedObjects();
400  }
401}
402
403bool Bus::Connect() {
404  // dbus_bus_get_private() and dbus_bus_get() are blocking calls.
405  AssertOnDBusThread();
406
407  // Check if it's already initialized.
408  if (connection_)
409    return true;
410
411  ScopedDBusError error;
412  if (bus_type_ == CUSTOM_ADDRESS) {
413    if (connection_type_ == PRIVATE) {
414      connection_ = dbus_connection_open_private(address_.c_str(), error.get());
415    } else {
416      connection_ = dbus_connection_open(address_.c_str(), error.get());
417    }
418  } else {
419    const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_);
420    if (connection_type_ == PRIVATE) {
421      connection_ = dbus_bus_get_private(dbus_bus_type, error.get());
422    } else {
423      connection_ = dbus_bus_get(dbus_bus_type, error.get());
424    }
425  }
426  if (!connection_) {
427    LOG(ERROR) << "Failed to connect to the bus: "
428               << (error.is_set() ? error.message() : "");
429    return false;
430  }
431
432  if (bus_type_ == CUSTOM_ADDRESS) {
433    // We should call dbus_bus_register here, otherwise unique name can not be
434    // acquired. According to dbus specification, it is responsible to call
435    // org.freedesktop.DBus.Hello method at the beging of bus connection to
436    // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is
437    // called internally.
438    if (!dbus_bus_register(connection_, error.get())) {
439      LOG(ERROR) << "Failed to register the bus component: "
440                 << (error.is_set() ? error.message() : "");
441      return false;
442    }
443  }
444
445  return true;
446}
447
// Closes |connection_|. Only private connections may be closed this way,
// which the DCHECK below enforces. This does not drop the reference held on
// the connection; callers unref it separately.
void Bus::ClosePrivateConnection() {
  // dbus_connection_close is blocking call.
  AssertOnDBusThread();
  DCHECK_EQ(PRIVATE, connection_type_)
      << "non-private connection should not be closed";
  dbus_connection_close(connection_);
}
455
456void Bus::ShutdownAndBlock() {
457  AssertOnDBusThread();
458
459  if (shutdown_completed_)
460    return;  // Already shutdowned, just return.
461
462  // Unregister the exported objects.
463  for (ExportedObjectTable::iterator iter = exported_object_table_.begin();
464       iter != exported_object_table_.end(); ++iter) {
465    iter->second->Unregister();
466  }
467
468  // Release all service names.
469  for (std::set<std::string>::iterator iter = owned_service_names_.begin();
470       iter != owned_service_names_.end();) {
471    // This is a bit tricky but we should increment the iter here as
472    // ReleaseOwnership() may remove |service_name| from the set.
473    const std::string& service_name = *iter++;
474    ReleaseOwnership(service_name);
475  }
476  if (!owned_service_names_.empty()) {
477    LOG(ERROR) << "Failed to release all service names. # of services left: "
478               << owned_service_names_.size();
479  }
480
481  // Detach from the remote objects.
482  for (ObjectProxyTable::iterator iter = object_proxy_table_.begin();
483       iter != object_proxy_table_.end(); ++iter) {
484    iter->second->Detach();
485  }
486
487  // Clean up the object managers.
488  for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
489       iter != object_manager_table_.end(); ++iter) {
490    iter->second->CleanUp();
491  }
492
493  // Release object proxies and exported objects here. We should do this
494  // here rather than in the destructor to avoid memory leaks due to
495  // cyclic references.
496  object_proxy_table_.clear();
497  exported_object_table_.clear();
498
499  // Private connection should be closed.
500  if (connection_) {
501    // Remove Disconnected watcher.
502    ScopedDBusError error;
503
504    if (connection_type_ == PRIVATE)
505      ClosePrivateConnection();
506    // dbus_connection_close() won't unref.
507    dbus_connection_unref(connection_);
508  }
509
510  connection_ = NULL;
511  shutdown_completed_ = true;
512}
513
514void Bus::ShutdownOnDBusThreadAndBlock() {
515  AssertOnOriginThread();
516  DCHECK(dbus_task_runner_.get());
517
518  GetDBusTaskRunner()->PostTask(
519      FROM_HERE,
520      base::Bind(&Bus::ShutdownOnDBusThreadAndBlockInternal, this));
521
522  // http://crbug.com/125222
523  base::ThreadRestrictions::ScopedAllowWait allow_wait;
524
525  // Wait until the shutdown is complete on the D-Bus thread.
526  // The shutdown should not hang, but set timeout just in case.
527  const int kTimeoutSecs = 3;
528  const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs));
529  const bool signaled = on_shutdown_.TimedWait(timeout);
530  LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus";
531}
532
533void Bus::RequestOwnership(const std::string& service_name,
534                           ServiceOwnershipOptions options,
535                           OnOwnershipCallback on_ownership_callback) {
536  AssertOnOriginThread();
537
538  GetDBusTaskRunner()->PostTask(
539      FROM_HERE,
540      base::Bind(&Bus::RequestOwnershipInternal,
541                 this, service_name, options, on_ownership_callback));
542}
543
544void Bus::RequestOwnershipInternal(const std::string& service_name,
545                                   ServiceOwnershipOptions options,
546                                   OnOwnershipCallback on_ownership_callback) {
547  AssertOnDBusThread();
548
549  bool success = Connect();
550  if (success)
551    success = RequestOwnershipAndBlock(service_name, options);
552
553  GetOriginTaskRunner()->PostTask(FROM_HERE,
554                                  base::Bind(on_ownership_callback,
555                                             service_name,
556                                             success));
557}
558
559bool Bus::RequestOwnershipAndBlock(const std::string& service_name,
560                                   ServiceOwnershipOptions options) {
561  DCHECK(connection_);
562  // dbus_bus_request_name() is a blocking call.
563  AssertOnDBusThread();
564
565  // Check if we already own the service name.
566  if (owned_service_names_.find(service_name) != owned_service_names_.end()) {
567    return true;
568  }
569
570  ScopedDBusError error;
571  const int result = dbus_bus_request_name(connection_,
572                                           service_name.c_str(),
573                                           options,
574                                           error.get());
575  if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
576    LOG(ERROR) << "Failed to get the ownership of " << service_name << ": "
577               << (error.is_set() ? error.message() : "");
578    return false;
579  }
580  owned_service_names_.insert(service_name);
581  return true;
582}
583
584bool Bus::ReleaseOwnership(const std::string& service_name) {
585  DCHECK(connection_);
586  // dbus_bus_request_name() is a blocking call.
587  AssertOnDBusThread();
588
589  // Check if we already own the service name.
590  std::set<std::string>::iterator found =
591      owned_service_names_.find(service_name);
592  if (found == owned_service_names_.end()) {
593    LOG(ERROR) << service_name << " is not owned by the bus";
594    return false;
595  }
596
597  ScopedDBusError error;
598  const int result = dbus_bus_release_name(connection_, service_name.c_str(),
599                                           error.get());
600  if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) {
601    owned_service_names_.erase(found);
602    return true;
603  } else {
604    LOG(ERROR) << "Failed to release the ownership of " << service_name << ": "
605               << (error.is_set() ? error.message() : "")
606               << ", result code: " << result;
607    return false;
608  }
609}
610
611bool Bus::SetUpAsyncOperations() {
612  DCHECK(connection_);
613  AssertOnDBusThread();
614
615  if (async_operations_set_up_)
616    return true;
617
618  // Process all the incoming data if any, so that OnDispatchStatus() will
619  // be called when the incoming data is ready.
620  ProcessAllIncomingDataIfAny();
621
622  bool success = dbus_connection_set_watch_functions(connection_,
623                                                     &Bus::OnAddWatchThunk,
624                                                     &Bus::OnRemoveWatchThunk,
625                                                     &Bus::OnToggleWatchThunk,
626                                                     this,
627                                                     NULL);
628  CHECK(success) << "Unable to allocate memory";
629
630  success = dbus_connection_set_timeout_functions(connection_,
631                                                  &Bus::OnAddTimeoutThunk,
632                                                  &Bus::OnRemoveTimeoutThunk,
633                                                  &Bus::OnToggleTimeoutThunk,
634                                                  this,
635                                                  NULL);
636  CHECK(success) << "Unable to allocate memory";
637
638  dbus_connection_set_dispatch_status_function(
639      connection_,
640      &Bus::OnDispatchStatusChangedThunk,
641      this,
642      NULL);
643
644  async_operations_set_up_ = true;
645
646  return true;
647}
648
649DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request,
650                                        int timeout_ms,
651                                        DBusError* error) {
652  DCHECK(connection_);
653  AssertOnDBusThread();
654
655  return dbus_connection_send_with_reply_and_block(
656      connection_, request, timeout_ms, error);
657}
658
659void Bus::SendWithReply(DBusMessage* request,
660                        DBusPendingCall** pending_call,
661                        int timeout_ms) {
662  DCHECK(connection_);
663  AssertOnDBusThread();
664
665  const bool success = dbus_connection_send_with_reply(
666      connection_, request, pending_call, timeout_ms);
667  CHECK(success) << "Unable to allocate memory";
668}
669
670void Bus::Send(DBusMessage* request, uint32_t* serial) {
671  DCHECK(connection_);
672  AssertOnDBusThread();
673
674  const bool success = dbus_connection_send(connection_, request, serial);
675  CHECK(success) << "Unable to allocate memory";
676}
677
678void Bus::AddFilterFunction(DBusHandleMessageFunction filter_function,
679                            void* user_data) {
680  DCHECK(connection_);
681  AssertOnDBusThread();
682
683  std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
684      std::make_pair(filter_function, user_data);
685  if (filter_functions_added_.find(filter_data_pair) !=
686      filter_functions_added_.end()) {
687    VLOG(1) << "Filter function already exists: " << filter_function
688            << " with associated data: " << user_data;
689    return;
690  }
691
692  const bool success = dbus_connection_add_filter(
693      connection_, filter_function, user_data, NULL);
694  CHECK(success) << "Unable to allocate memory";
695  filter_functions_added_.insert(filter_data_pair);
696}
697
698void Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function,
699                               void* user_data) {
700  DCHECK(connection_);
701  AssertOnDBusThread();
702
703  std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
704      std::make_pair(filter_function, user_data);
705  if (filter_functions_added_.find(filter_data_pair) ==
706      filter_functions_added_.end()) {
707    VLOG(1) << "Requested to remove an unknown filter function: "
708            << filter_function
709            << " with associated data: " << user_data;
710    return;
711  }
712
713  dbus_connection_remove_filter(connection_, filter_function, user_data);
714  filter_functions_added_.erase(filter_data_pair);
715}
716
717void Bus::AddMatch(const std::string& match_rule, DBusError* error) {
718  DCHECK(connection_);
719  AssertOnDBusThread();
720
721  std::map<std::string, int>::iterator iter =
722      match_rules_added_.find(match_rule);
723  if (iter != match_rules_added_.end()) {
724    // The already existing rule's counter is incremented.
725    iter->second++;
726
727    VLOG(1) << "Match rule already exists: " << match_rule;
728    return;
729  }
730
731  dbus_bus_add_match(connection_, match_rule.c_str(), error);
732  match_rules_added_[match_rule] = 1;
733}
734
735bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) {
736  DCHECK(connection_);
737  AssertOnDBusThread();
738
739  std::map<std::string, int>::iterator iter =
740      match_rules_added_.find(match_rule);
741  if (iter == match_rules_added_.end()) {
742    LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule;
743    return false;
744  }
745
746  // The rule's counter is decremented and the rule is deleted when reachs 0.
747  iter->second--;
748  if (iter->second == 0) {
749    dbus_bus_remove_match(connection_, match_rule.c_str(), error);
750    match_rules_added_.erase(match_rule);
751  }
752  return true;
753}
754
755bool Bus::TryRegisterObjectPath(const ObjectPath& object_path,
756                                const DBusObjectPathVTable* vtable,
757                                void* user_data,
758                                DBusError* error) {
759  DCHECK(connection_);
760  AssertOnDBusThread();
761
762  if (registered_object_paths_.find(object_path) !=
763      registered_object_paths_.end()) {
764    LOG(ERROR) << "Object path already registered: " << object_path.value();
765    return false;
766  }
767
768  const bool success = dbus_connection_try_register_object_path(
769      connection_,
770      object_path.value().c_str(),
771      vtable,
772      user_data,
773      error);
774  if (success)
775    registered_object_paths_.insert(object_path);
776  return success;
777}
778
779void Bus::UnregisterObjectPath(const ObjectPath& object_path) {
780  DCHECK(connection_);
781  AssertOnDBusThread();
782
783  if (registered_object_paths_.find(object_path) ==
784      registered_object_paths_.end()) {
785    LOG(ERROR) << "Requested to unregister an unknown object path: "
786               << object_path.value();
787    return;
788  }
789
790  const bool success = dbus_connection_unregister_object_path(
791      connection_,
792      object_path.value().c_str());
793  CHECK(success) << "Unable to allocate memory";
794  registered_object_paths_.erase(object_path);
795}
796
797void Bus::ShutdownOnDBusThreadAndBlockInternal() {
798  AssertOnDBusThread();
799
800  ShutdownAndBlock();
801  on_shutdown_.Signal();
802}
803
804void Bus::ProcessAllIncomingDataIfAny() {
805  AssertOnDBusThread();
806
807  // As mentioned at the class comment in .h file, connection_ can be NULL.
808  if (!connection_)
809    return;
810
811  // It is safe and necessary to call dbus_connection_get_dispatch_status even
812  // if the connection is lost.
813  if (dbus_connection_get_dispatch_status(connection_) ==
814      DBUS_DISPATCH_DATA_REMAINS) {
815    while (dbus_connection_dispatch(connection_) ==
816           DBUS_DISPATCH_DATA_REMAINS) {
817    }
818  }
819}
820
821base::TaskRunner* Bus::GetDBusTaskRunner() {
822  if (dbus_task_runner_.get())
823    return dbus_task_runner_.get();
824  else
825    return GetOriginTaskRunner();
826}
827
828base::TaskRunner* Bus::GetOriginTaskRunner() {
829  DCHECK(origin_task_runner_.get());
830  return origin_task_runner_.get();
831}
832
833bool Bus::HasDBusThread() {
834  return dbus_task_runner_.get() != NULL;
835}
836
837void Bus::AssertOnOriginThread() {
838  DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId());
839}
840
841void Bus::AssertOnDBusThread() {
842  base::ThreadRestrictions::AssertIOAllowed();
843
844  if (dbus_task_runner_.get()) {
845    DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread());
846  } else {
847    AssertOnOriginThread();
848  }
849}
850
851std::string Bus::GetServiceOwnerAndBlock(const std::string& service_name,
852                                         GetServiceOwnerOption options) {
853  AssertOnDBusThread();
854
855  MethodCall get_name_owner_call("org.freedesktop.DBus", "GetNameOwner");
856  MessageWriter writer(&get_name_owner_call);
857  writer.AppendString(service_name);
858  VLOG(1) << "Method call: " << get_name_owner_call.ToString();
859
860  const ObjectPath obj_path("/org/freedesktop/DBus");
861  if (!get_name_owner_call.SetDestination("org.freedesktop.DBus") ||
862      !get_name_owner_call.SetPath(obj_path)) {
863    if (options == REPORT_ERRORS)
864      LOG(ERROR) << "Failed to get name owner.";
865    return "";
866  }
867
868  ScopedDBusError error;
869  DBusMessage* response_message =
870      SendWithReplyAndBlock(get_name_owner_call.raw_message(),
871                            ObjectProxy::TIMEOUT_USE_DEFAULT,
872                            error.get());
873  if (!response_message) {
874    if (options == REPORT_ERRORS) {
875      LOG(ERROR) << "Failed to get name owner. Got " << error.name() << ": "
876                 << error.message();
877    }
878    return "";
879  }
880
881  scoped_ptr<Response> response(Response::FromRawMessage(response_message));
882  MessageReader reader(response.get());
883
884  std::string service_owner;
885  if (!reader.PopString(&service_owner))
886    service_owner.clear();
887  return service_owner;
888}
889
890void Bus::GetServiceOwner(const std::string& service_name,
891                          const GetServiceOwnerCallback& callback) {
892  AssertOnOriginThread();
893
894  GetDBusTaskRunner()->PostTask(
895      FROM_HERE,
896      base::Bind(&Bus::GetServiceOwnerInternal, this, service_name, callback));
897}
898
899void Bus::GetServiceOwnerInternal(const std::string& service_name,
900                                  const GetServiceOwnerCallback& callback) {
901  AssertOnDBusThread();
902
903  std::string service_owner;
904  if (Connect())
905    service_owner = GetServiceOwnerAndBlock(service_name, SUPPRESS_ERRORS);
906  GetOriginTaskRunner()->PostTask(FROM_HERE,
907                                  base::Bind(callback, service_owner));
908}
909
910void Bus::ListenForServiceOwnerChange(
911    const std::string& service_name,
912    const GetServiceOwnerCallback& callback) {
913  AssertOnOriginThread();
914  DCHECK(!service_name.empty());
915  DCHECK(!callback.is_null());
916
917  GetDBusTaskRunner()->PostTask(
918      FROM_HERE,
919      base::Bind(&Bus::ListenForServiceOwnerChangeInternal,
920                 this, service_name, callback));
921}
922
923void Bus::ListenForServiceOwnerChangeInternal(
924    const std::string& service_name,
925    const GetServiceOwnerCallback& callback) {
926  AssertOnDBusThread();
927  DCHECK(!service_name.empty());
928  DCHECK(!callback.is_null());
929
930  if (!Connect() || !SetUpAsyncOperations())
931    return;
932
933  if (service_owner_changed_listener_map_.empty())
934    AddFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
935
936  ServiceOwnerChangedListenerMap::iterator it =
937      service_owner_changed_listener_map_.find(service_name);
938  if (it == service_owner_changed_listener_map_.end()) {
939    // Add a match rule for the new service name.
940    const std::string name_owner_changed_match_rule =
941        base::StringPrintf(kServiceNameOwnerChangeMatchRule,
942                           service_name.c_str());
943    ScopedDBusError error;
944    AddMatch(name_owner_changed_match_rule, error.get());
945    if (error.is_set()) {
946      LOG(ERROR) << "Failed to add match rule for " << service_name
947                 << ". Got " << error.name() << ": " << error.message();
948      return;
949    }
950
951    service_owner_changed_listener_map_[service_name].push_back(callback);
952    return;
953  }
954
955  // Check if the callback has already been added.
956  std::vector<GetServiceOwnerCallback>& callbacks = it->second;
957  for (size_t i = 0; i < callbacks.size(); ++i) {
958    if (callbacks[i].Equals(callback))
959      return;
960  }
961  callbacks.push_back(callback);
962}
963
964void Bus::UnlistenForServiceOwnerChange(
965    const std::string& service_name,
966    const GetServiceOwnerCallback& callback) {
967  AssertOnOriginThread();
968  DCHECK(!service_name.empty());
969  DCHECK(!callback.is_null());
970
971  GetDBusTaskRunner()->PostTask(
972      FROM_HERE,
973      base::Bind(&Bus::UnlistenForServiceOwnerChangeInternal,
974                 this, service_name, callback));
975}
976
977void Bus::UnlistenForServiceOwnerChangeInternal(
978    const std::string& service_name,
979    const GetServiceOwnerCallback& callback) {
980  AssertOnDBusThread();
981  DCHECK(!service_name.empty());
982  DCHECK(!callback.is_null());
983
984  ServiceOwnerChangedListenerMap::iterator it =
985      service_owner_changed_listener_map_.find(service_name);
986  if (it == service_owner_changed_listener_map_.end())
987    return;
988
989  std::vector<GetServiceOwnerCallback>& callbacks = it->second;
990  for (size_t i = 0; i < callbacks.size(); ++i) {
991    if (callbacks[i].Equals(callback)) {
992      callbacks.erase(callbacks.begin() + i);
993      break;  // There can be only one.
994    }
995  }
996  if (!callbacks.empty())
997    return;
998
999  // Last callback for |service_name| has been removed, remove match rule.
1000  const std::string name_owner_changed_match_rule =
1001      base::StringPrintf(kServiceNameOwnerChangeMatchRule,
1002                         service_name.c_str());
1003  ScopedDBusError error;
1004  RemoveMatch(name_owner_changed_match_rule, error.get());
1005  // And remove |service_owner_changed_listener_map_| entry.
1006  service_owner_changed_listener_map_.erase(it);
1007
1008  if (service_owner_changed_listener_map_.empty())
1009    RemoveFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
1010}
1011
1012std::string Bus::GetConnectionName() {
1013  if (!connection_)
1014    return "";
1015  return dbus_bus_get_unique_name(connection_);
1016}
1017
1018dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) {
1019  AssertOnDBusThread();
1020
1021  // watch will be deleted when raw_watch is removed in OnRemoveWatch().
1022  Watch* watch = new Watch(raw_watch);
1023  if (watch->IsReadyToBeWatched()) {
1024    watch->StartWatching();
1025  }
1026  ++num_pending_watches_;
1027  return true;
1028}
1029
1030void Bus::OnRemoveWatch(DBusWatch* raw_watch) {
1031  AssertOnDBusThread();
1032
1033  Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
1034  delete watch;
1035  --num_pending_watches_;
1036}
1037
1038void Bus::OnToggleWatch(DBusWatch* raw_watch) {
1039  AssertOnDBusThread();
1040
1041  Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
1042  if (watch->IsReadyToBeWatched()) {
1043    watch->StartWatching();
1044  } else {
1045    // It's safe to call this if StartWatching() wasn't called, per
1046    // message_pump_libevent.h.
1047    watch->StopWatching();
1048  }
1049}
1050
1051dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) {
1052  AssertOnDBusThread();
1053
1054  // timeout will be deleted when raw_timeout is removed in
1055  // OnRemoveTimeoutThunk().
1056  Timeout* timeout = new Timeout(raw_timeout);
1057  if (timeout->IsReadyToBeMonitored()) {
1058    timeout->StartMonitoring(this);
1059  }
1060  ++num_pending_timeouts_;
1061  return true;
1062}
1063
1064void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) {
1065  AssertOnDBusThread();
1066
1067  Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
1068  timeout->Complete();
1069  --num_pending_timeouts_;
1070}
1071
1072void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) {
1073  AssertOnDBusThread();
1074
1075  Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
1076  if (timeout->IsReadyToBeMonitored()) {
1077    timeout->StartMonitoring(this);
1078  } else {
1079    timeout->StopMonitoring();
1080  }
1081}
1082
1083void Bus::OnDispatchStatusChanged(DBusConnection* connection,
1084                                  DBusDispatchStatus /* status */) {
1085  DCHECK_EQ(connection, connection_);
1086  AssertOnDBusThread();
1087
1088  // We cannot call ProcessAllIncomingDataIfAny() here, as calling
1089  // dbus_connection_dispatch() inside DBusDispatchStatusFunction is
1090  // prohibited by the D-Bus library. Hence, we post a task here instead.
1091  // See comments for dbus_connection_set_dispatch_status_function().
1092  GetDBusTaskRunner()->PostTask(FROM_HERE,
1093                                base::Bind(&Bus::ProcessAllIncomingDataIfAny,
1094                                           this));
1095}
1096
1097void Bus::OnServiceOwnerChanged(DBusMessage* message) {
1098  DCHECK(message);
1099  AssertOnDBusThread();
1100
1101  // |message| will be unrefed on exit of the function. Increment the
1102  // reference so we can use it in Signal::FromRawMessage() below.
1103  dbus_message_ref(message);
1104  scoped_ptr<Signal> signal(Signal::FromRawMessage(message));
1105
1106  // Confirm the validity of the NameOwnerChanged signal.
1107  if (signal->GetMember() != kNameOwnerChangedSignal ||
1108      signal->GetInterface() != DBUS_INTERFACE_DBUS ||
1109      signal->GetSender() != DBUS_SERVICE_DBUS) {
1110    return;
1111  }
1112
1113  MessageReader reader(signal.get());
1114  std::string service_name;
1115  std::string old_owner;
1116  std::string new_owner;
1117  if (!reader.PopString(&service_name) ||
1118      !reader.PopString(&old_owner) ||
1119      !reader.PopString(&new_owner)) {
1120    return;
1121  }
1122
1123  ServiceOwnerChangedListenerMap::const_iterator it =
1124      service_owner_changed_listener_map_.find(service_name);
1125  if (it == service_owner_changed_listener_map_.end())
1126    return;
1127
1128  const std::vector<GetServiceOwnerCallback>& callbacks = it->second;
1129  for (size_t i = 0; i < callbacks.size(); ++i) {
1130    GetOriginTaskRunner()->PostTask(FROM_HERE,
1131                                    base::Bind(callbacks[i], new_owner));
1132  }
1133}
1134
1135// static
1136dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) {
1137  Bus* self = static_cast<Bus*>(data);
1138  return self->OnAddWatch(raw_watch);
1139}
1140
1141// static
1142void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) {
1143  Bus* self = static_cast<Bus*>(data);
1144  self->OnRemoveWatch(raw_watch);
1145}
1146
1147// static
1148void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) {
1149  Bus* self = static_cast<Bus*>(data);
1150  self->OnToggleWatch(raw_watch);
1151}
1152
1153// static
1154dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
1155  Bus* self = static_cast<Bus*>(data);
1156  return self->OnAddTimeout(raw_timeout);
1157}
1158
1159// static
1160void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
1161  Bus* self = static_cast<Bus*>(data);
1162  self->OnRemoveTimeout(raw_timeout);
1163}
1164
1165// static
1166void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
1167  Bus* self = static_cast<Bus*>(data);
1168  self->OnToggleTimeout(raw_timeout);
1169}
1170
1171// static
1172void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection,
1173                                       DBusDispatchStatus status,
1174                                       void* data) {
1175  Bus* self = static_cast<Bus*>(data);
1176  self->OnDispatchStatusChanged(connection, status);
1177}
1178
1179// static
1180DBusHandlerResult Bus::OnServiceOwnerChangedFilter(
1181    DBusConnection* /* connection */,
1182    DBusMessage* message,
1183    void* data) {
1184  if (dbus_message_is_signal(message,
1185                             DBUS_INTERFACE_DBUS,
1186                             kNameOwnerChangedSignal)) {
1187    Bus* self = static_cast<Bus*>(data);
1188    self->OnServiceOwnerChanged(message);
1189  }
1190  // Always return unhandled to let others, e.g. ObjectProxies, handle the same
1191  // signal.
1192  return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
1193}
1194
1195}  // namespace dbus
1196