1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/embedder/embedder.h"
6
7#include "base/bind.h"
8#include "base/location.h"
9#include "base/logging.h"
10#include "base/memory/scoped_ptr.h"
11#include "mojo/embedder/platform_support.h"
12#include "mojo/system/channel.h"
13#include "mojo/system/channel_endpoint.h"
14#include "mojo/system/core.h"
15#include "mojo/system/entrypoints.h"
16#include "mojo/system/message_in_transit.h"
17#include "mojo/system/message_pipe_dispatcher.h"
18#include "mojo/system/platform_handle_dispatcher.h"
19#include "mojo/system/raw_channel.h"
20
21namespace mojo {
22namespace embedder {
23
24// This is defined here (instead of a header file), since it's opaque to the
25// outside world. But we need to define it before our (internal-only) functions
26// that use it.
27struct ChannelInfo {
28  ChannelInfo() {}
29  ~ChannelInfo() {}
30
31  scoped_refptr<system::Channel> channel;
32
33  // May be null, in which case |DestroyChannelOnIOThread()| must be used (from
34  // the IO thread), instead of |DestroyChannel()|.
35  scoped_refptr<base::TaskRunner> io_thread_task_runner;
36};
37
38namespace {
39
40// Helper for |CreateChannel...()|. (Note: May return null for some failures.)
41scoped_refptr<system::Channel> MakeChannel(
42    system::Core* core,
43    ScopedPlatformHandle platform_handle,
44    scoped_refptr<system::ChannelEndpoint> channel_endpoint) {
45  DCHECK(platform_handle.is_valid());
46
47  // Create and initialize a |system::Channel|.
48  scoped_refptr<system::Channel> channel =
49      new system::Channel(core->platform_support());
50  if (!channel->Init(system::RawChannel::Create(platform_handle.Pass()))) {
51    // This is very unusual (e.g., maybe |platform_handle| was invalid or we
52    // reached some system resource limit).
53    LOG(ERROR) << "Channel::Init() failed";
54    // Return null, since |Shutdown()| shouldn't be called in this case.
55    return scoped_refptr<system::Channel>();
56  }
57  // Once |Init()| has succeeded, we have to return |channel| (since
58  // |Shutdown()| will have to be called on it).
59
60  // Attach the endpoint.
61  system::MessageInTransit::EndpointId endpoint_id =
62      channel->AttachEndpoint(channel_endpoint);
63  if (endpoint_id == system::MessageInTransit::kInvalidEndpointId) {
64    // This means that, e.g., the other endpoint of the message pipe was closed
65    // first. But it's not necessarily an error per se.
66    DVLOG(2) << "Channel::AttachEndpoint() failed";
67    return channel;
68  }
69  CHECK_EQ(endpoint_id, system::Channel::kBootstrapEndpointId);
70
71  if (!channel->RunMessagePipeEndpoint(system::Channel::kBootstrapEndpointId,
72                                       system::Channel::kBootstrapEndpointId)) {
73    // Currently, there's no reason for this to fail.
74    NOTREACHED() << "Channel::RunMessagePipeEndpoint() failed";
75    return channel;
76  }
77
78  return channel;
79}
80
81void CreateChannelHelper(
82    system::Core* core,
83    ScopedPlatformHandle platform_handle,
84    scoped_ptr<ChannelInfo> channel_info,
85    scoped_refptr<system::ChannelEndpoint> channel_endpoint,
86    DidCreateChannelCallback callback,
87    scoped_refptr<base::TaskRunner> callback_thread_task_runner) {
88  channel_info->channel =
89      MakeChannel(core, platform_handle.Pass(), channel_endpoint);
90
91  // Hand the channel back to the embedder.
92  if (callback_thread_task_runner.get()) {
93    callback_thread_task_runner->PostTask(
94        FROM_HERE, base::Bind(callback, channel_info.release()));
95  } else {
96    callback.Run(channel_info.release());
97  }
98}
99
100}  // namespace
101
102void Init(scoped_ptr<PlatformSupport> platform_support) {
103  system::entrypoints::SetCore(new system::Core(platform_support.Pass()));
104}
105
106// TODO(vtl): Write tests for this.
107ScopedMessagePipeHandle CreateChannelOnIOThread(
108    ScopedPlatformHandle platform_handle,
109    ChannelInfo** channel_info) {
110  DCHECK(platform_handle.is_valid());
111  DCHECK(channel_info);
112
113  scoped_refptr<system::ChannelEndpoint> channel_endpoint;
114  scoped_refptr<system::MessagePipeDispatcher> dispatcher =
115      system::MessagePipeDispatcher::CreateRemoteMessagePipe(&channel_endpoint);
116
117  system::Core* core = system::entrypoints::GetCore();
118  DCHECK(core);
119  ScopedMessagePipeHandle rv(
120      MessagePipeHandle(core->AddDispatcher(dispatcher)));
121
122  *channel_info = new ChannelInfo();
123  (*channel_info)->channel =
124      MakeChannel(core, platform_handle.Pass(), channel_endpoint);
125
126  return rv.Pass();
127}
128
129ScopedMessagePipeHandle CreateChannel(
130    ScopedPlatformHandle platform_handle,
131    scoped_refptr<base::TaskRunner> io_thread_task_runner,
132    DidCreateChannelCallback callback,
133    scoped_refptr<base::TaskRunner> callback_thread_task_runner) {
134  DCHECK(platform_handle.is_valid());
135
136  scoped_refptr<system::ChannelEndpoint> channel_endpoint;
137  scoped_refptr<system::MessagePipeDispatcher> dispatcher =
138      system::MessagePipeDispatcher::CreateRemoteMessagePipe(&channel_endpoint);
139
140  system::Core* core = system::entrypoints::GetCore();
141  DCHECK(core);
142  ScopedMessagePipeHandle rv(
143      MessagePipeHandle(core->AddDispatcher(dispatcher)));
144
145  scoped_ptr<ChannelInfo> channel_info(new ChannelInfo());
146  channel_info->io_thread_task_runner = io_thread_task_runner;
147
148  if (rv.is_valid()) {
149    io_thread_task_runner->PostTask(FROM_HERE,
150                                    base::Bind(&CreateChannelHelper,
151                                               base::Unretained(core),
152                                               base::Passed(&platform_handle),
153                                               base::Passed(&channel_info),
154                                               channel_endpoint,
155                                               callback,
156                                               callback_thread_task_runner));
157  } else {
158    (callback_thread_task_runner.get() ? callback_thread_task_runner
159                                       : io_thread_task_runner)
160        ->PostTask(FROM_HERE, base::Bind(callback, channel_info.release()));
161  }
162
163  return rv.Pass();
164}
165
166void DestroyChannelOnIOThread(ChannelInfo* channel_info) {
167  DCHECK(channel_info);
168  if (!channel_info->channel.get()) {
169    // Presumably, |Init()| on the channel failed.
170    return;
171  }
172
173  channel_info->channel->Shutdown();
174  delete channel_info;
175}
176
177// TODO(vtl): Write tests for this.
178void DestroyChannel(ChannelInfo* channel_info) {
179  DCHECK(channel_info);
180  DCHECK(channel_info->io_thread_task_runner.get());
181
182  if (!channel_info->channel.get()) {
183    // Presumably, |Init()| on the channel failed.
184    return;
185  }
186
187  channel_info->channel->WillShutdownSoon();
188  channel_info->io_thread_task_runner->PostTask(
189      FROM_HERE, base::Bind(&DestroyChannelOnIOThread, channel_info));
190}
191
192void WillDestroyChannelSoon(ChannelInfo* channel_info) {
193  DCHECK(channel_info);
194  channel_info->channel->WillShutdownSoon();
195}
196
197MojoResult CreatePlatformHandleWrapper(
198    ScopedPlatformHandle platform_handle,
199    MojoHandle* platform_handle_wrapper_handle) {
200  DCHECK(platform_handle_wrapper_handle);
201
202  scoped_refptr<system::Dispatcher> dispatcher(
203      new system::PlatformHandleDispatcher(platform_handle.Pass()));
204
205  system::Core* core = system::entrypoints::GetCore();
206  DCHECK(core);
207  MojoHandle h = core->AddDispatcher(dispatcher);
208  if (h == MOJO_HANDLE_INVALID) {
209    LOG(ERROR) << "Handle table full";
210    dispatcher->Close();
211    return MOJO_RESULT_RESOURCE_EXHAUSTED;
212  }
213
214  *platform_handle_wrapper_handle = h;
215  return MOJO_RESULT_OK;
216}
217
218MojoResult PassWrappedPlatformHandle(MojoHandle platform_handle_wrapper_handle,
219                                     ScopedPlatformHandle* platform_handle) {
220  DCHECK(platform_handle);
221
222  system::Core* core = system::entrypoints::GetCore();
223  DCHECK(core);
224  scoped_refptr<system::Dispatcher> dispatcher(
225      core->GetDispatcher(platform_handle_wrapper_handle));
226  if (!dispatcher.get())
227    return MOJO_RESULT_INVALID_ARGUMENT;
228
229  if (dispatcher->GetType() != system::Dispatcher::kTypePlatformHandle)
230    return MOJO_RESULT_INVALID_ARGUMENT;
231
232  *platform_handle =
233      static_cast<system::PlatformHandleDispatcher*>(dispatcher.get())
234          ->PassPlatformHandle()
235          .Pass();
236  return MOJO_RESULT_OK;
237}
238
239}  // namespace embedder
240}  // namespace mojo
241