bus.cc revision eb525c5499e34cc9c4b825d6d9e75bb07cc06ace
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "dbus/bus.h"
6
7#include "base/bind.h"
8#include "base/logging.h"
9#include "base/message_loop.h"
10#include "base/message_loop/message_loop_proxy.h"
11#include "base/stl_util.h"
12#include "base/strings/stringprintf.h"
13#include "base/threading/thread.h"
14#include "base/threading/thread_restrictions.h"
15#include "base/time/time.h"
16#include "dbus/exported_object.h"
17#include "dbus/message.h"
18#include "dbus/object_manager.h"
19#include "dbus/object_path.h"
20#include "dbus/object_proxy.h"
21#include "dbus/scoped_dbus_error.h"
22
23namespace dbus {
24
25namespace {
26
// Member name of the Disconnected signal on org.freedesktop.DBus.Local.
const char kDisconnectedSignal[] = "Disconnected";

// Match rule selecting the Disconnected signal described above.
const char kDisconnectedMatchRule[] =
    "type='signal', "
    "path='/org/freedesktop/DBus/Local',"
    "interface='org.freedesktop.DBus.Local', "
    "member='Disconnected'";

// Member name of the NameOwnerChanged signal on org.freedesktop.DBus.
const char kNameOwnerChangedSignal[] = "NameOwnerChanged";

// printf-style template of the match rule used to filter owner changes of a
// single service name; the '%s' placeholder receives the service name.
const char kServiceNameOwnerChangeMatchRule[] =
    "type='signal',"
    "interface='org.freedesktop.DBus',"
    "member='NameOwnerChanged',"
    "path='/org/freedesktop/DBus',"
    "sender='org.freedesktop.DBus',"
    "arg0='%s'";
40
41// The class is used for watching the file descriptor used for D-Bus
42// communication.
43class Watch : public base::MessagePumpLibevent::Watcher {
44 public:
45  explicit Watch(DBusWatch* watch)
46      : raw_watch_(watch) {
47    dbus_watch_set_data(raw_watch_, this, NULL);
48  }
49
50  virtual ~Watch() {
51    dbus_watch_set_data(raw_watch_, NULL, NULL);
52  }
53
54  // Returns true if the underlying file descriptor is ready to be watched.
55  bool IsReadyToBeWatched() {
56    return dbus_watch_get_enabled(raw_watch_);
57  }
58
59  // Starts watching the underlying file descriptor.
60  void StartWatching() {
61    const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_);
62    const int flags = dbus_watch_get_flags(raw_watch_);
63
64    base::MessageLoopForIO::Mode mode = base::MessageLoopForIO::WATCH_READ;
65    if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE))
66      mode = base::MessageLoopForIO::WATCH_READ_WRITE;
67    else if (flags & DBUS_WATCH_READABLE)
68      mode = base::MessageLoopForIO::WATCH_READ;
69    else if (flags & DBUS_WATCH_WRITABLE)
70      mode = base::MessageLoopForIO::WATCH_WRITE;
71    else
72      NOTREACHED();
73
74    const bool persistent = true;  // Watch persistently.
75    const bool success = base::MessageLoopForIO::current()->WatchFileDescriptor(
76        file_descriptor, persistent, mode, &file_descriptor_watcher_, this);
77    CHECK(success) << "Unable to allocate memory";
78  }
79
80  // Stops watching the underlying file descriptor.
81  void StopWatching() {
82    file_descriptor_watcher_.StopWatchingFileDescriptor();
83  }
84
85 private:
86  // Implement MessagePumpLibevent::Watcher.
87  virtual void OnFileCanReadWithoutBlocking(int file_descriptor) OVERRIDE {
88    const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE);
89    CHECK(success) << "Unable to allocate memory";
90  }
91
92  // Implement MessagePumpLibevent::Watcher.
93  virtual void OnFileCanWriteWithoutBlocking(int file_descriptor) OVERRIDE {
94    const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE);
95    CHECK(success) << "Unable to allocate memory";
96  }
97
98  DBusWatch* raw_watch_;
99  base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_;
100};
101
102// The class is used for monitoring the timeout used for D-Bus method
103// calls.
104//
105// Unlike Watch, Timeout is a ref counted object, to ensure that |this| of
106// the object is is alive when HandleTimeout() is called. It's unlikely
107// but it may be possible that HandleTimeout() is called after
108// Bus::OnRemoveTimeout(). That's why we don't simply delete the object in
109// Bus::OnRemoveTimeout().
110class Timeout : public base::RefCountedThreadSafe<Timeout> {
111 public:
112  explicit Timeout(DBusTimeout* timeout)
113      : raw_timeout_(timeout),
114        monitoring_is_active_(false),
115        is_completed(false) {
116    dbus_timeout_set_data(raw_timeout_, this, NULL);
117    AddRef();  // Balanced on Complete().
118  }
119
120  // Returns true if the timeout is ready to be monitored.
121  bool IsReadyToBeMonitored() {
122    return dbus_timeout_get_enabled(raw_timeout_);
123  }
124
125  // Starts monitoring the timeout.
126  void StartMonitoring(Bus* bus) {
127    bus->PostDelayedTaskToDBusThread(FROM_HERE,
128                                     base::Bind(&Timeout::HandleTimeout,
129                                                this),
130                                     GetInterval());
131    monitoring_is_active_ = true;
132  }
133
134  // Stops monitoring the timeout.
135  void StopMonitoring() {
136    // We cannot take back the delayed task we posted in
137    // StartMonitoring(), so we just mark the monitoring is inactive now.
138    monitoring_is_active_ = false;
139  }
140
141  // Returns the interval.
142  base::TimeDelta GetInterval() {
143    return base::TimeDelta::FromMilliseconds(
144        dbus_timeout_get_interval(raw_timeout_));
145  }
146
147  // Cleans up the raw_timeout and marks that timeout is completed.
148  // See the class comment above for why we are doing this.
149  void Complete() {
150    dbus_timeout_set_data(raw_timeout_, NULL, NULL);
151    is_completed = true;
152    Release();
153  }
154
155 private:
156  friend class base::RefCountedThreadSafe<Timeout>;
157  ~Timeout() {
158  }
159
160  // Handles the timeout.
161  void HandleTimeout() {
162    // If the timeout is marked completed, we should do nothing. This can
163    // occur if this function is called after Bus::OnRemoveTimeout().
164    if (is_completed)
165      return;
166    // Skip if monitoring is canceled.
167    if (!monitoring_is_active_)
168      return;
169
170    const bool success = dbus_timeout_handle(raw_timeout_);
171    CHECK(success) << "Unable to allocate memory";
172  }
173
174  DBusTimeout* raw_timeout_;
175  bool monitoring_is_active_;
176  bool is_completed;
177};
178
179}  // namespace
180
181Bus::Options::Options()
182  : bus_type(SESSION),
183    connection_type(PRIVATE) {
184}
185
186Bus::Options::~Options() {
187}
188
189Bus::Bus(const Options& options)
190    : bus_type_(options.bus_type),
191      connection_type_(options.connection_type),
192      dbus_task_runner_(options.dbus_task_runner),
193      on_shutdown_(false /* manual_reset */, false /* initially_signaled */),
194      connection_(NULL),
195      origin_thread_id_(base::PlatformThread::CurrentId()),
196      async_operations_set_up_(false),
197      shutdown_completed_(false),
198      num_pending_watches_(0),
199      num_pending_timeouts_(0),
200      address_(options.address),
201      on_disconnected_closure_(options.disconnected_callback) {
202  // This is safe to call multiple times.
203  dbus_threads_init_default();
204  // The origin message loop is unnecessary if the client uses synchronous
205  // functions only.
206  if (base::MessageLoop::current())
207    origin_task_runner_ = base::MessageLoop::current()->message_loop_proxy();
208}
209
// Verifies (in debug builds) that the bus was shut down before destruction:
// the connection must be gone and every bookkeeping container must be empty.
Bus::~Bus() {
  DCHECK(!connection_);
  DCHECK(owned_service_names_.empty());
  DCHECK(match_rules_added_.empty());
  DCHECK(filter_functions_added_.empty());
  DCHECK(registered_object_paths_.empty());
  DCHECK_EQ(0, num_pending_watches_);
  // TODO(satorux): This check fails occasionally in browser_tests for tests
  // that run very quickly. Perhaps something does not have time to clean up.
  // Despite the check failing, the tests seem to run fine. crosbug.com/23416
  // DCHECK_EQ(0, num_pending_timeouts_);
}
222
223ObjectProxy* Bus::GetObjectProxy(const std::string& service_name,
224                                 const ObjectPath& object_path) {
225  return GetObjectProxyWithOptions(service_name, object_path,
226                                   ObjectProxy::DEFAULT_OPTIONS);
227}
228
229ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name,
230                                            const ObjectPath& object_path,
231                                            int options) {
232  AssertOnOriginThread();
233
234  // Check if we already have the requested object proxy.
235  const ObjectProxyTable::key_type key(service_name + object_path.value(),
236                                       options);
237  ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
238  if (iter != object_proxy_table_.end()) {
239    return iter->second.get();
240  }
241
242  scoped_refptr<ObjectProxy> object_proxy =
243      new ObjectProxy(this, service_name, object_path, options);
244  object_proxy_table_[key] = object_proxy;
245
246  return object_proxy.get();
247}
248
249bool Bus::RemoveObjectProxy(const std::string& service_name,
250                            const ObjectPath& object_path,
251                            const base::Closure& callback) {
252  return RemoveObjectProxyWithOptions(service_name, object_path,
253                                      ObjectProxy::DEFAULT_OPTIONS,
254                                      callback);
255}
256
257bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name,
258                                       const ObjectPath& object_path,
259                                       int options,
260                                       const base::Closure& callback) {
261  AssertOnOriginThread();
262
263  // Check if we have the requested object proxy.
264  const ObjectProxyTable::key_type key(service_name + object_path.value(),
265                                       options);
266  ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
267  if (iter != object_proxy_table_.end()) {
268    // Object is present. Remove it now and Detach in the DBus thread.
269    PostTaskToDBusThread(FROM_HERE, base::Bind(
270        &Bus::RemoveObjectProxyInternal,
271        this, iter->second, callback));
272
273    object_proxy_table_.erase(iter);
274    return true;
275  }
276  return false;
277}
278
279void Bus::RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy,
280                                    const base::Closure& callback) {
281  AssertOnDBusThread();
282
283  object_proxy.get()->Detach();
284
285  PostTaskToOriginThread(FROM_HERE, callback);
286}
287
288ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) {
289  AssertOnOriginThread();
290
291  // Check if we already have the requested exported object.
292  ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
293  if (iter != exported_object_table_.end()) {
294    return iter->second.get();
295  }
296
297  scoped_refptr<ExportedObject> exported_object =
298      new ExportedObject(this, object_path);
299  exported_object_table_[object_path] = exported_object;
300
301  return exported_object.get();
302}
303
304void Bus::UnregisterExportedObject(const ObjectPath& object_path) {
305  AssertOnOriginThread();
306
307  // Remove the registered object from the table first, to allow a new
308  // GetExportedObject() call to return a new object, rather than this one.
309  ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
310  if (iter == exported_object_table_.end())
311    return;
312
313  scoped_refptr<ExportedObject> exported_object = iter->second;
314  exported_object_table_.erase(iter);
315
316  // Post the task to perform the final unregistration to the D-Bus thread.
317  // Since the registration also happens on the D-Bus thread in
318  // TryRegisterObjectPath(), and the task runner we post to is a
319  // SequencedTaskRunner, there is a guarantee that this will happen before any
320  // future registration call.
321  PostTaskToDBusThread(FROM_HERE,
322                       base::Bind(&Bus::UnregisterExportedObjectInternal,
323                                  this, exported_object));
324}
325
326void Bus::UnregisterExportedObjectInternal(
327    scoped_refptr<ExportedObject> exported_object) {
328  AssertOnDBusThread();
329
330  exported_object->Unregister();
331}
332
333ObjectManager* Bus::GetObjectManager(const std::string& service_name,
334                                     const ObjectPath& object_path) {
335  AssertOnOriginThread();
336
337  // Check if we already have the requested object manager.
338  const ObjectManagerTable::key_type key(service_name + object_path.value());
339  ObjectManagerTable::iterator iter = object_manager_table_.find(key);
340  if (iter != object_manager_table_.end()) {
341    return iter->second.get();
342  }
343
344  scoped_refptr<ObjectManager> object_manager =
345      new ObjectManager(this, service_name, object_path);
346  object_manager_table_[key] = object_manager;
347
348  return object_manager.get();
349}
350
351void Bus::RemoveObjectManager(const std::string& service_name,
352                              const ObjectPath& object_path) {
353  AssertOnOriginThread();
354
355  const ObjectManagerTable::key_type key(service_name + object_path.value());
356  ObjectManagerTable::iterator iter = object_manager_table_.find(key);
357  if (iter == object_manager_table_.end())
358    return;
359
360  scoped_refptr<ObjectManager> object_manager = iter->second;
361  object_manager_table_.erase(iter);
362}
363
364void Bus::GetManagedObjects() {
365  for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
366       iter != object_manager_table_.end(); ++iter) {
367    iter->second->GetManagedObjects();
368  }
369}
370
371bool Bus::Connect() {
372  // dbus_bus_get_private() and dbus_bus_get() are blocking calls.
373  AssertOnDBusThread();
374
375  // Check if it's already initialized.
376  if (connection_)
377    return true;
378
379  ScopedDBusError error;
380  if (bus_type_ == CUSTOM_ADDRESS) {
381    if (connection_type_ == PRIVATE) {
382      connection_ = dbus_connection_open_private(address_.c_str(), error.get());
383    } else {
384      connection_ = dbus_connection_open(address_.c_str(), error.get());
385    }
386  } else {
387    const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_);
388    if (connection_type_ == PRIVATE) {
389      connection_ = dbus_bus_get_private(dbus_bus_type, error.get());
390    } else {
391      connection_ = dbus_bus_get(dbus_bus_type, error.get());
392    }
393  }
394  if (!connection_) {
395    LOG(ERROR) << "Failed to connect to the bus: "
396               << (error.is_set() ? error.message() : "");
397    return false;
398  }
399
400  if (bus_type_ == CUSTOM_ADDRESS) {
401    // We should call dbus_bus_register here, otherwise unique name can not be
402    // acquired. According to dbus specification, it is responsible to call
403    // org.freedesktop.DBus.Hello method at the beging of bus connection to
404    // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is
405    // called internally.
406    if (!dbus_bus_register(connection_, error.get())) {
407      LOG(ERROR) << "Failed to register the bus component: "
408                 << (error.is_set() ? error.message() : "");
409      return false;
410    }
411  }
412  // We shouldn't exit on the disconnected signal.
413  dbus_connection_set_exit_on_disconnect(connection_, false);
414
415  // Watch Disconnected signal.
416  AddFilterFunction(Bus::OnConnectionDisconnectedFilter, this);
417  AddMatch(kDisconnectedMatchRule, error.get());
418
419  return true;
420}
421
422void Bus::ClosePrivateConnection() {
423  // dbus_connection_close is blocking call.
424  AssertOnDBusThread();
425  DCHECK_EQ(PRIVATE, connection_type_)
426      << "non-private connection should not be closed";
427  dbus_connection_close(connection_);
428}
429
430void Bus::ShutdownAndBlock() {
431  AssertOnDBusThread();
432
433  if (shutdown_completed_)
434    return;  // Already shutdowned, just return.
435
436  // Unregister the exported objects.
437  for (ExportedObjectTable::iterator iter = exported_object_table_.begin();
438       iter != exported_object_table_.end(); ++iter) {
439    iter->second->Unregister();
440  }
441
442  // Release all service names.
443  for (std::set<std::string>::iterator iter = owned_service_names_.begin();
444       iter != owned_service_names_.end();) {
445    // This is a bit tricky but we should increment the iter here as
446    // ReleaseOwnership() may remove |service_name| from the set.
447    const std::string& service_name = *iter++;
448    ReleaseOwnership(service_name);
449  }
450  if (!owned_service_names_.empty()) {
451    LOG(ERROR) << "Failed to release all service names. # of services left: "
452               << owned_service_names_.size();
453  }
454
455  // Detach from the remote objects.
456  for (ObjectProxyTable::iterator iter = object_proxy_table_.begin();
457       iter != object_proxy_table_.end(); ++iter) {
458    iter->second->Detach();
459  }
460
461  // Release object proxies and exported objects here. We should do this
462  // here rather than in the destructor to avoid memory leaks due to
463  // cyclic references.
464  object_proxy_table_.clear();
465  exported_object_table_.clear();
466
467  // Private connection should be closed.
468  if (connection_) {
469    // Remove Disconnected watcher.
470    ScopedDBusError error;
471    RemoveFilterFunction(Bus::OnConnectionDisconnectedFilter, this);
472    RemoveMatch(kDisconnectedMatchRule, error.get());
473
474    if (connection_type_ == PRIVATE)
475      ClosePrivateConnection();
476    // dbus_connection_close() won't unref.
477    dbus_connection_unref(connection_);
478  }
479
480  connection_ = NULL;
481  shutdown_completed_ = true;
482}
483
484void Bus::ShutdownOnDBusThreadAndBlock() {
485  AssertOnOriginThread();
486  DCHECK(dbus_task_runner_.get());
487
488  PostTaskToDBusThread(FROM_HERE, base::Bind(
489      &Bus::ShutdownOnDBusThreadAndBlockInternal,
490      this));
491
492  // http://crbug.com/125222
493  base::ThreadRestrictions::ScopedAllowWait allow_wait;
494
495  // Wait until the shutdown is complete on the D-Bus thread.
496  // The shutdown should not hang, but set timeout just in case.
497  const int kTimeoutSecs = 3;
498  const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs));
499  const bool signaled = on_shutdown_.TimedWait(timeout);
500  LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus";
501}
502
503void Bus::RequestOwnership(const std::string& service_name,
504                           OnOwnershipCallback on_ownership_callback) {
505  AssertOnOriginThread();
506
507  PostTaskToDBusThread(FROM_HERE, base::Bind(
508      &Bus::RequestOwnershipInternal,
509      this, service_name, on_ownership_callback));
510}
511
512void Bus::RequestOwnershipInternal(const std::string& service_name,
513                                   OnOwnershipCallback on_ownership_callback) {
514  AssertOnDBusThread();
515
516  bool success = Connect();
517  if (success)
518    success = RequestOwnershipAndBlock(service_name);
519
520  PostTaskToOriginThread(FROM_HERE,
521                         base::Bind(on_ownership_callback,
522                                    service_name,
523                                    success));
524}
525
526bool Bus::RequestOwnershipAndBlock(const std::string& service_name) {
527  DCHECK(connection_);
528  // dbus_bus_request_name() is a blocking call.
529  AssertOnDBusThread();
530
531  // Check if we already own the service name.
532  if (owned_service_names_.find(service_name) != owned_service_names_.end()) {
533    return true;
534  }
535
536  ScopedDBusError error;
537  const int result = dbus_bus_request_name(connection_,
538                                           service_name.c_str(),
539                                           DBUS_NAME_FLAG_DO_NOT_QUEUE,
540                                           error.get());
541  if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
542    LOG(ERROR) << "Failed to get the ownership of " << service_name << ": "
543               << (error.is_set() ? error.message() : "");
544    return false;
545  }
546  owned_service_names_.insert(service_name);
547  return true;
548}
549
550bool Bus::ReleaseOwnership(const std::string& service_name) {
551  DCHECK(connection_);
552  // dbus_bus_request_name() is a blocking call.
553  AssertOnDBusThread();
554
555  // Check if we already own the service name.
556  std::set<std::string>::iterator found =
557      owned_service_names_.find(service_name);
558  if (found == owned_service_names_.end()) {
559    LOG(ERROR) << service_name << " is not owned by the bus";
560    return false;
561  }
562
563  ScopedDBusError error;
564  const int result = dbus_bus_release_name(connection_, service_name.c_str(),
565                                           error.get());
566  if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) {
567    owned_service_names_.erase(found);
568    return true;
569  } else {
570    LOG(ERROR) << "Failed to release the ownership of " << service_name << ": "
571               << (error.is_set() ? error.message() : "");
572    return false;
573  }
574}
575
576bool Bus::SetUpAsyncOperations() {
577  DCHECK(connection_);
578  AssertOnDBusThread();
579
580  if (async_operations_set_up_)
581    return true;
582
583  // Process all the incoming data if any, so that OnDispatchStatus() will
584  // be called when the incoming data is ready.
585  ProcessAllIncomingDataIfAny();
586
587  bool success = dbus_connection_set_watch_functions(connection_,
588                                                     &Bus::OnAddWatchThunk,
589                                                     &Bus::OnRemoveWatchThunk,
590                                                     &Bus::OnToggleWatchThunk,
591                                                     this,
592                                                     NULL);
593  CHECK(success) << "Unable to allocate memory";
594
595  success = dbus_connection_set_timeout_functions(connection_,
596                                                  &Bus::OnAddTimeoutThunk,
597                                                  &Bus::OnRemoveTimeoutThunk,
598                                                  &Bus::OnToggleTimeoutThunk,
599                                                  this,
600                                                  NULL);
601  CHECK(success) << "Unable to allocate memory";
602
603  dbus_connection_set_dispatch_status_function(
604      connection_,
605      &Bus::OnDispatchStatusChangedThunk,
606      this,
607      NULL);
608
609  async_operations_set_up_ = true;
610
611  return true;
612}
613
614DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request,
615                                        int timeout_ms,
616                                        DBusError* error) {
617  DCHECK(connection_);
618  AssertOnDBusThread();
619
620  return dbus_connection_send_with_reply_and_block(
621      connection_, request, timeout_ms, error);
622}
623
624void Bus::SendWithReply(DBusMessage* request,
625                        DBusPendingCall** pending_call,
626                        int timeout_ms) {
627  DCHECK(connection_);
628  AssertOnDBusThread();
629
630  const bool success = dbus_connection_send_with_reply(
631      connection_, request, pending_call, timeout_ms);
632  CHECK(success) << "Unable to allocate memory";
633}
634
635void Bus::Send(DBusMessage* request, uint32* serial) {
636  DCHECK(connection_);
637  AssertOnDBusThread();
638
639  const bool success = dbus_connection_send(connection_, request, serial);
640  CHECK(success) << "Unable to allocate memory";
641}
642
643bool Bus::AddFilterFunction(DBusHandleMessageFunction filter_function,
644                            void* user_data) {
645  DCHECK(connection_);
646  AssertOnDBusThread();
647
648  std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
649      std::make_pair(filter_function, user_data);
650  if (filter_functions_added_.find(filter_data_pair) !=
651      filter_functions_added_.end()) {
652    VLOG(1) << "Filter function already exists: " << filter_function
653            << " with associated data: " << user_data;
654    return false;
655  }
656
657  const bool success = dbus_connection_add_filter(
658      connection_, filter_function, user_data, NULL);
659  CHECK(success) << "Unable to allocate memory";
660  filter_functions_added_.insert(filter_data_pair);
661  return true;
662}
663
664bool Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function,
665                               void* user_data) {
666  DCHECK(connection_);
667  AssertOnDBusThread();
668
669  std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
670      std::make_pair(filter_function, user_data);
671  if (filter_functions_added_.find(filter_data_pair) ==
672      filter_functions_added_.end()) {
673    VLOG(1) << "Requested to remove an unknown filter function: "
674            << filter_function
675            << " with associated data: " << user_data;
676    return false;
677  }
678
679  dbus_connection_remove_filter(connection_, filter_function, user_data);
680  filter_functions_added_.erase(filter_data_pair);
681  return true;
682}
683
684void Bus::AddMatch(const std::string& match_rule, DBusError* error) {
685  DCHECK(connection_);
686  AssertOnDBusThread();
687
688  std::map<std::string, int>::iterator iter =
689      match_rules_added_.find(match_rule);
690  if (iter != match_rules_added_.end()) {
691    // The already existing rule's counter is incremented.
692    iter->second++;
693
694    VLOG(1) << "Match rule already exists: " << match_rule;
695    return;
696  }
697
698  dbus_bus_add_match(connection_, match_rule.c_str(), error);
699  match_rules_added_[match_rule] = 1;
700}
701
702bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) {
703  DCHECK(connection_);
704  AssertOnDBusThread();
705
706  std::map<std::string, int>::iterator iter =
707      match_rules_added_.find(match_rule);
708  if (iter == match_rules_added_.end()) {
709    LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule;
710    return false;
711  }
712
713  // The rule's counter is decremented and the rule is deleted when reachs 0.
714  iter->second--;
715  if (iter->second == 0) {
716    dbus_bus_remove_match(connection_, match_rule.c_str(), error);
717    match_rules_added_.erase(match_rule);
718  }
719  return true;
720}
721
722bool Bus::TryRegisterObjectPath(const ObjectPath& object_path,
723                                const DBusObjectPathVTable* vtable,
724                                void* user_data,
725                                DBusError* error) {
726  DCHECK(connection_);
727  AssertOnDBusThread();
728
729  if (registered_object_paths_.find(object_path) !=
730      registered_object_paths_.end()) {
731    LOG(ERROR) << "Object path already registered: " << object_path.value();
732    return false;
733  }
734
735  const bool success = dbus_connection_try_register_object_path(
736      connection_,
737      object_path.value().c_str(),
738      vtable,
739      user_data,
740      error);
741  if (success)
742    registered_object_paths_.insert(object_path);
743  return success;
744}
745
746void Bus::UnregisterObjectPath(const ObjectPath& object_path) {
747  DCHECK(connection_);
748  AssertOnDBusThread();
749
750  if (registered_object_paths_.find(object_path) ==
751      registered_object_paths_.end()) {
752    LOG(ERROR) << "Requested to unregister an unknown object path: "
753               << object_path.value();
754    return;
755  }
756
757  const bool success = dbus_connection_unregister_object_path(
758      connection_,
759      object_path.value().c_str());
760  CHECK(success) << "Unable to allocate memory";
761  registered_object_paths_.erase(object_path);
762}
763
764void Bus::ShutdownOnDBusThreadAndBlockInternal() {
765  AssertOnDBusThread();
766
767  ShutdownAndBlock();
768  on_shutdown_.Signal();
769}
770
771void Bus::ProcessAllIncomingDataIfAny() {
772  AssertOnDBusThread();
773
774  // As mentioned at the class comment in .h file, connection_ can be NULL.
775  if (!connection_)
776    return;
777
778  // It is safe and necessary to call dbus_connection_get_dispatch_status even
779  // if the connection is lost. Otherwise we will miss "Disconnected" signal.
780  // (crbug.com/174431)
781  if (dbus_connection_get_dispatch_status(connection_) ==
782      DBUS_DISPATCH_DATA_REMAINS) {
783    while (dbus_connection_dispatch(connection_) ==
784           DBUS_DISPATCH_DATA_REMAINS) {
785    }
786  }
787}
788
789void Bus::PostTaskToDBusThreadAndReply(
790    const tracked_objects::Location& from_here,
791    const base::Closure& task,
792    const base::Closure& reply) {
793  AssertOnOriginThread();
794
795  if (dbus_task_runner_.get()) {
796    if (!dbus_task_runner_->PostTaskAndReply(from_here, task, reply)) {
797      LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop";
798    }
799  } else {
800    DCHECK(origin_task_runner_.get());
801    if (!origin_task_runner_->PostTaskAndReply(from_here, task, reply)) {
802      LOG(WARNING) << "Failed to post a task to the origin message loop";
803    }
804  }
805}
806
807void Bus::PostTaskToOriginThread(const tracked_objects::Location& from_here,
808                                 const base::Closure& task) {
809  DCHECK(origin_task_runner_.get());
810  if (!origin_task_runner_->PostTask(from_here, task)) {
811    LOG(WARNING) << "Failed to post a task to the origin message loop";
812  }
813}
814
815void Bus::PostTaskToDBusThread(const tracked_objects::Location& from_here,
816                               const base::Closure& task) {
817  if (dbus_task_runner_.get()) {
818    if (!dbus_task_runner_->PostTask(from_here, task)) {
819      LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop";
820    }
821  } else {
822    DCHECK(origin_task_runner_.get());
823    if (!origin_task_runner_->PostTask(from_here, task)) {
824      LOG(WARNING) << "Failed to post a task to the origin message loop";
825    }
826  }
827}
828
829void Bus::PostDelayedTaskToDBusThread(
830    const tracked_objects::Location& from_here,
831    const base::Closure& task,
832    base::TimeDelta delay) {
833  if (dbus_task_runner_.get()) {
834    if (!dbus_task_runner_->PostDelayedTask(
835            from_here, task, delay)) {
836      LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop";
837    }
838  } else {
839    DCHECK(origin_task_runner_.get());
840    if (!origin_task_runner_->PostDelayedTask(from_here, task, delay)) {
841      LOG(WARNING) << "Failed to post a task to the origin message loop";
842    }
843  }
844}
845
846bool Bus::HasDBusThread() {
847  return dbus_task_runner_.get() != NULL;
848}
849
// Debug-checks that the caller runs on the thread whose id was recorded in
// |origin_thread_id_| when the bus was constructed.
void Bus::AssertOnOriginThread() {
  DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId());
}
853
854void Bus::AssertOnDBusThread() {
855  base::ThreadRestrictions::AssertIOAllowed();
856
857  if (dbus_task_runner_.get()) {
858    DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread());
859  } else {
860    AssertOnOriginThread();
861  }
862}
863
864std::string Bus::GetServiceOwnerAndBlock(const std::string& service_name,
865                                         GetServiceOwnerOption options) {
866  AssertOnDBusThread();
867
868  MethodCall get_name_owner_call("org.freedesktop.DBus", "GetNameOwner");
869  MessageWriter writer(&get_name_owner_call);
870  writer.AppendString(service_name);
871  VLOG(1) << "Method call: " << get_name_owner_call.ToString();
872
873  const ObjectPath obj_path("/org/freedesktop/DBus");
874  if (!get_name_owner_call.SetDestination("org.freedesktop.DBus") ||
875      !get_name_owner_call.SetPath(obj_path)) {
876    if (options == REPORT_ERRORS)
877      LOG(ERROR) << "Failed to get name owner.";
878    return "";
879  }
880
881  ScopedDBusError error;
882  DBusMessage* response_message =
883      SendWithReplyAndBlock(get_name_owner_call.raw_message(),
884                            ObjectProxy::TIMEOUT_USE_DEFAULT,
885                            error.get());
886  if (!response_message) {
887    if (options == REPORT_ERRORS) {
888      LOG(ERROR) << "Failed to get name owner. Got " << error.name() << ": "
889                 << error.message();
890    }
891    return "";
892  }
893
894  scoped_ptr<Response> response(Response::FromRawMessage(response_message));
895  MessageReader reader(response.get());
896
897  std::string service_owner;
898  if (!reader.PopString(&service_owner))
899    service_owner.clear();
900  return service_owner;
901}
902
903void Bus::GetServiceOwner(const std::string& service_name,
904                          const GetServiceOwnerCallback& callback) {
905  AssertOnOriginThread();
906
907  PostTaskToDBusThread(
908      FROM_HERE,
909      base::Bind(&Bus::GetServiceOwnerInternal, this, service_name, callback));
910}
911
912void Bus::GetServiceOwnerInternal(const std::string& service_name,
913                                  const GetServiceOwnerCallback& callback) {
914  AssertOnDBusThread();
915
916  std::string service_owner;
917  if (Connect())
918    service_owner = GetServiceOwnerAndBlock(service_name, SUPPRESS_ERRORS);
919  PostTaskToOriginThread(FROM_HERE, base::Bind(callback, service_owner));
920}
921
922void Bus::ListenForServiceOwnerChange(
923    const std::string& service_name,
924    const GetServiceOwnerCallback& callback) {
925  AssertOnOriginThread();
926  DCHECK(!service_name.empty());
927  DCHECK(!callback.is_null());
928
929  PostTaskToDBusThread(FROM_HERE,
930                       base::Bind(&Bus::ListenForServiceOwnerChangeInternal,
931                                  this, service_name, callback));
932}
933
934void Bus::ListenForServiceOwnerChangeInternal(
935    const std::string& service_name,
936    const GetServiceOwnerCallback& callback) {
937  AssertOnDBusThread();
938  DCHECK(!service_name.empty());
939  DCHECK(!callback.is_null());
940
941  if (!Connect() || !SetUpAsyncOperations())
942    return;
943
944  if (service_owner_changed_listener_map_.empty()) {
945    bool filter_added =
946        AddFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
947    DCHECK(filter_added);
948  }
949
950  ServiceOwnerChangedListenerMap::iterator it =
951      service_owner_changed_listener_map_.find(service_name);
952  if (it == service_owner_changed_listener_map_.end()) {
953    // Add a match rule for the new service name.
954    const std::string name_owner_changed_match_rule =
955        base::StringPrintf(kServiceNameOwnerChangeMatchRule,
956                           service_name.c_str());
957    ScopedDBusError error;
958    AddMatch(name_owner_changed_match_rule, error.get());
959    if (error.is_set()) {
960      LOG(ERROR) << "Failed to add match rule for " << service_name
961                 << ". Got " << error.name() << ": " << error.message();
962      return;
963    }
964
965    service_owner_changed_listener_map_[service_name].push_back(callback);
966    return;
967  }
968
969  // Check if the callback has already been added.
970  std::vector<GetServiceOwnerCallback>& callbacks = it->second;
971  for (size_t i = 0; i < callbacks.size(); ++i) {
972    if (callbacks[i].Equals(callback))
973      return;
974  }
975  callbacks.push_back(callback);
976}
977
978void Bus::UnlistenForServiceOwnerChange(
979    const std::string& service_name,
980    const GetServiceOwnerCallback& callback) {
981  AssertOnOriginThread();
982  DCHECK(!service_name.empty());
983  DCHECK(!callback.is_null());
984
985  PostTaskToDBusThread(FROM_HERE,
986                       base::Bind(&Bus::UnlistenForServiceOwnerChangeInternal,
987                                  this, service_name, callback));
988}
989
990void Bus::UnlistenForServiceOwnerChangeInternal(
991    const std::string& service_name,
992    const GetServiceOwnerCallback& callback) {
993  AssertOnDBusThread();
994  DCHECK(!service_name.empty());
995  DCHECK(!callback.is_null());
996
997  ServiceOwnerChangedListenerMap::iterator it =
998      service_owner_changed_listener_map_.find(service_name);
999  if (it == service_owner_changed_listener_map_.end())
1000    return;
1001
1002  std::vector<GetServiceOwnerCallback>& callbacks = it->second;
1003  for (size_t i = 0; i < callbacks.size(); ++i) {
1004    if (callbacks[i].Equals(callback)) {
1005      callbacks.erase(callbacks.begin() + i);
1006      break;  // There can be only one.
1007    }
1008  }
1009  if (!callbacks.empty())
1010    return;
1011
1012  // Last callback for |service_name| has been removed, remove match rule.
1013  const std::string name_owner_changed_match_rule =
1014      base::StringPrintf(kServiceNameOwnerChangeMatchRule,
1015                         service_name.c_str());
1016  ScopedDBusError error;
1017  RemoveMatch(name_owner_changed_match_rule, error.get());
1018  // And remove |service_owner_changed_listener_map_| entry.
1019  service_owner_changed_listener_map_.erase(it);
1020
1021  if (service_owner_changed_listener_map_.empty()) {
1022    bool filter_removed =
1023        RemoveFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
1024    DCHECK(filter_removed);
1025  }
1026}
1027
1028dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) {
1029  AssertOnDBusThread();
1030
1031  // watch will be deleted when raw_watch is removed in OnRemoveWatch().
1032  Watch* watch = new Watch(raw_watch);
1033  if (watch->IsReadyToBeWatched()) {
1034    watch->StartWatching();
1035  }
1036  ++num_pending_watches_;
1037  return true;
1038}
1039
1040void Bus::OnRemoveWatch(DBusWatch* raw_watch) {
1041  AssertOnDBusThread();
1042
1043  Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
1044  delete watch;
1045  --num_pending_watches_;
1046}
1047
1048void Bus::OnToggleWatch(DBusWatch* raw_watch) {
1049  AssertOnDBusThread();
1050
1051  Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
1052  if (watch->IsReadyToBeWatched()) {
1053    watch->StartWatching();
1054  } else {
1055    // It's safe to call this if StartWatching() wasn't called, per
1056    // message_pump_libevent.h.
1057    watch->StopWatching();
1058  }
1059}
1060
1061dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) {
1062  AssertOnDBusThread();
1063
1064  // timeout will be deleted when raw_timeout is removed in
1065  // OnRemoveTimeoutThunk().
1066  Timeout* timeout = new Timeout(raw_timeout);
1067  if (timeout->IsReadyToBeMonitored()) {
1068    timeout->StartMonitoring(this);
1069  }
1070  ++num_pending_timeouts_;
1071  return true;
1072}
1073
1074void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) {
1075  AssertOnDBusThread();
1076
1077  Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
1078  timeout->Complete();
1079  --num_pending_timeouts_;
1080}
1081
1082void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) {
1083  AssertOnDBusThread();
1084
1085  Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
1086  if (timeout->IsReadyToBeMonitored()) {
1087    timeout->StartMonitoring(this);
1088  } else {
1089    timeout->StopMonitoring();
1090  }
1091}
1092
1093void Bus::OnDispatchStatusChanged(DBusConnection* connection,
1094                                  DBusDispatchStatus status) {
1095  DCHECK_EQ(connection, connection_);
1096  AssertOnDBusThread();
1097
1098  // We cannot call ProcessAllIncomingDataIfAny() here, as calling
1099  // dbus_connection_dispatch() inside DBusDispatchStatusFunction is
1100  // prohibited by the D-Bus library. Hence, we post a task here instead.
1101  // See comments for dbus_connection_set_dispatch_status_function().
1102  PostTaskToDBusThread(FROM_HERE,
1103                       base::Bind(&Bus::ProcessAllIncomingDataIfAny,
1104                                  this));
1105}
1106
1107void Bus::OnConnectionDisconnected(DBusConnection* connection) {
1108  AssertOnDBusThread();
1109
1110  if (!on_disconnected_closure_.is_null())
1111    PostTaskToOriginThread(FROM_HERE, on_disconnected_closure_);
1112
1113  if (!connection)
1114    return;
1115  DCHECK(!dbus_connection_get_is_connected(connection));
1116
1117  ShutdownAndBlock();
1118}
1119
1120void Bus::OnServiceOwnerChanged(DBusMessage* message) {
1121  DCHECK(message);
1122  AssertOnDBusThread();
1123
1124  // |message| will be unrefed on exit of the function. Increment the
1125  // reference so we can use it in Signal::FromRawMessage() below.
1126  dbus_message_ref(message);
1127  scoped_ptr<Signal> signal(Signal::FromRawMessage(message));
1128
1129  // Confirm the validity of the NameOwnerChanged signal.
1130  if (signal->GetMember() != kNameOwnerChangedSignal ||
1131      signal->GetInterface() != DBUS_INTERFACE_DBUS ||
1132      signal->GetSender() != DBUS_SERVICE_DBUS) {
1133    return;
1134  }
1135
1136  MessageReader reader(signal.get());
1137  std::string service_name;
1138  std::string old_owner;
1139  std::string new_owner;
1140  if (!reader.PopString(&service_name) ||
1141      !reader.PopString(&old_owner) ||
1142      !reader.PopString(&new_owner)) {
1143    return;
1144  }
1145
1146  ServiceOwnerChangedListenerMap::const_iterator it =
1147      service_owner_changed_listener_map_.find(service_name);
1148  if (it == service_owner_changed_listener_map_.end())
1149    return;
1150
1151  const std::vector<GetServiceOwnerCallback>& callbacks = it->second;
1152  for (size_t i = 0; i < callbacks.size(); ++i) {
1153    PostTaskToOriginThread(FROM_HERE,
1154                           base::Bind(callbacks[i], new_owner));
1155  }
1156}
1157
1158// static
1159dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) {
1160  Bus* self = static_cast<Bus*>(data);
1161  return self->OnAddWatch(raw_watch);
1162}
1163
1164// static
1165void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) {
1166  Bus* self = static_cast<Bus*>(data);
1167  self->OnRemoveWatch(raw_watch);
1168}
1169
1170// static
1171void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) {
1172  Bus* self = static_cast<Bus*>(data);
1173  self->OnToggleWatch(raw_watch);
1174}
1175
1176// static
1177dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
1178  Bus* self = static_cast<Bus*>(data);
1179  return self->OnAddTimeout(raw_timeout);
1180}
1181
1182// static
1183void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
1184  Bus* self = static_cast<Bus*>(data);
1185  self->OnRemoveTimeout(raw_timeout);
1186}
1187
1188// static
1189void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
1190  Bus* self = static_cast<Bus*>(data);
1191  self->OnToggleTimeout(raw_timeout);
1192}
1193
1194// static
1195void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection,
1196                                       DBusDispatchStatus status,
1197                                       void* data) {
1198  Bus* self = static_cast<Bus*>(data);
1199  self->OnDispatchStatusChanged(connection, status);
1200}
1201
1202// static
1203DBusHandlerResult Bus::OnConnectionDisconnectedFilter(
1204    DBusConnection* connection,
1205    DBusMessage* message,
1206    void* data) {
1207  if (dbus_message_is_signal(message,
1208                             DBUS_INTERFACE_LOCAL,
1209                             kDisconnectedSignal)) {
1210    Bus* self = static_cast<Bus*>(data);
1211    self->OnConnectionDisconnected(connection);
1212    return DBUS_HANDLER_RESULT_HANDLED;
1213  }
1214  return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
1215}
1216
1217// static
1218DBusHandlerResult Bus::OnServiceOwnerChangedFilter(
1219    DBusConnection* connection,
1220    DBusMessage* message,
1221    void* data) {
1222  if (dbus_message_is_signal(message,
1223                             DBUS_INTERFACE_DBUS,
1224                             kNameOwnerChangedSignal)) {
1225    Bus* self = static_cast<Bus*>(data);
1226    self->OnServiceOwnerChanged(message);
1227  }
1228  // Always return unhandled to let others, e.g. ObjectProxies, handle the same
1229  // signal.
1230  return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
1231}
1232
1233}  // namespace dbus
1234