1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "ipc/ipc_channel_nacl.h"
6
7#include <errno.h>
8#include <stddef.h>
9#include <sys/types.h>
10
11#include <algorithm>
12
13#include "base/bind.h"
14#include "base/logging.h"
15#include "base/message_loop/message_loop_proxy.h"
16#include "base/synchronization/lock.h"
17#include "base/task_runner_util.h"
18#include "base/threading/simple_thread.h"
19#include "ipc/file_descriptor_set_posix.h"
20#include "ipc/ipc_logging.h"
21#include "native_client/src/public/imc_syscalls.h"
22#include "native_client/src/public/imc_types.h"
23
24namespace IPC {
25
// One unit of input produced by a single receive on the pipe: the raw bytes
// together with any file descriptors that arrived alongside them.
struct MessageContents {
  // Raw bytes received from the pipe.
  std::vector<char> data;
  // File descriptors received with |data|.
  std::vector<int> fds;
};
30
31namespace {
32
33bool ReadDataOnReaderThread(int pipe, MessageContents* contents) {
34  DCHECK(pipe >= 0);
35  if (pipe < 0)
36    return false;
37
38  contents->data.resize(Channel::kReadBufferSize);
39  contents->fds.resize(FileDescriptorSet::kMaxDescriptorsPerMessage);
40
41  NaClAbiNaClImcMsgIoVec iov = { &contents->data[0], contents->data.size() };
42  NaClAbiNaClImcMsgHdr msg = {
43    &iov, 1, &contents->fds[0], contents->fds.size()
44  };
45
46  int bytes_read = imc_recvmsg(pipe, &msg, 0);
47
48  if (bytes_read <= 0) {
49    // NaClIPCAdapter::BlockingReceive returns -1 when the pipe closes (either
50    // due to error or for regular shutdown).
51    contents->data.clear();
52    contents->fds.clear();
53    return false;
54  }
55  DCHECK(bytes_read);
56  // Resize the buffers down to the number of bytes and fds we actually read.
57  contents->data.resize(bytes_read);
58  contents->fds.resize(msg.desc_length);
59  return true;
60}
61
62}  // namespace
63
64class Channel::ChannelImpl::ReaderThreadRunner
65    : public base::DelegateSimpleThread::Delegate {
66 public:
67  // |pipe|: A file descriptor from which we will read using imc_recvmsg.
68  // |data_read_callback|: A callback we invoke (on the main thread) when we
69  //                       have read data.
70  // |failure_callback|: A callback we invoke when we have a failure reading
71  //                     from |pipe|.
72  // |main_message_loop|: A proxy for the main thread, where we will invoke the
73  //                      above callbacks.
74  ReaderThreadRunner(
75      int pipe,
76      base::Callback<void (scoped_ptr<MessageContents>)> data_read_callback,
77      base::Callback<void ()> failure_callback,
78      scoped_refptr<base::MessageLoopProxy> main_message_loop);
79
80  // DelegateSimpleThread implementation. Reads data from the pipe in a loop
81  // until either we are told to quit or a read fails.
82  virtual void Run() OVERRIDE;
83
84 private:
85  int pipe_;
86  base::Callback<void (scoped_ptr<MessageContents>)> data_read_callback_;
87  base::Callback<void ()> failure_callback_;
88  scoped_refptr<base::MessageLoopProxy> main_message_loop_;
89
90  DISALLOW_COPY_AND_ASSIGN(ReaderThreadRunner);
91};
92
93Channel::ChannelImpl::ReaderThreadRunner::ReaderThreadRunner(
94    int pipe,
95    base::Callback<void (scoped_ptr<MessageContents>)> data_read_callback,
96    base::Callback<void ()> failure_callback,
97    scoped_refptr<base::MessageLoopProxy> main_message_loop)
98    : pipe_(pipe),
99      data_read_callback_(data_read_callback),
100      failure_callback_(failure_callback),
101      main_message_loop_(main_message_loop) {
102}
103
104void Channel::ChannelImpl::ReaderThreadRunner::Run() {
105  while (true) {
106    scoped_ptr<MessageContents> msg_contents(new MessageContents);
107    bool success = ReadDataOnReaderThread(pipe_, msg_contents.get());
108    if (success) {
109      main_message_loop_->PostTask(FROM_HERE,
110          base::Bind(data_read_callback_, base::Passed(&msg_contents)));
111    } else {
112      main_message_loop_->PostTask(FROM_HERE, failure_callback_);
113      // Because the read failed, we know we're going to quit. Don't bother
114      // trying to read again.
115      return;
116    }
117  }
118}
119
120Channel::ChannelImpl::ChannelImpl(const IPC::ChannelHandle& channel_handle,
121                                  Mode mode,
122                                  Listener* listener)
123    : ChannelReader(listener),
124      mode_(mode),
125      waiting_connect_(true),
126      pipe_(-1),
127      pipe_name_(channel_handle.name),
128      weak_ptr_factory_(this) {
129  if (!CreatePipe(channel_handle)) {
130    // The pipe may have been closed already.
131    const char *modestr = (mode_ & MODE_SERVER_FLAG) ? "server" : "client";
132    LOG(WARNING) << "Unable to create pipe named \"" << channel_handle.name
133                 << "\" in " << modestr << " mode";
134  }
135}
136
137Channel::ChannelImpl::~ChannelImpl() {
138  Close();
139}
140
141bool Channel::ChannelImpl::Connect() {
142  if (pipe_ == -1) {
143    DLOG(INFO) << "Channel creation failed: " << pipe_name_;
144    return false;
145  }
146
147  // Note that Connect is called on the "Channel" thread (i.e., the same thread
148  // where Channel::Send will be called, and the same thread that should receive
149  // messages). The constructor might be invoked on another thread (see
150  // ChannelProxy for an example of that). Therefore, we must wait until Connect
151  // is called to decide which MessageLoopProxy to pass to ReaderThreadRunner.
152  reader_thread_runner_.reset(
153      new ReaderThreadRunner(
154          pipe_,
155          base::Bind(&Channel::ChannelImpl::DidRecvMsg,
156                     weak_ptr_factory_.GetWeakPtr()),
157          base::Bind(&Channel::ChannelImpl::ReadDidFail,
158                     weak_ptr_factory_.GetWeakPtr()),
159          base::MessageLoopProxy::current()));
160  reader_thread_.reset(
161      new base::DelegateSimpleThread(reader_thread_runner_.get(),
162                                     "ipc_channel_nacl reader thread"));
163  reader_thread_->Start();
164  waiting_connect_ = false;
165  // If there were any messages queued before connection, send them.
166  ProcessOutgoingMessages();
167  return true;
168}
169
170void Channel::ChannelImpl::Close() {
171  // For now, we assume that at shutdown, the reader thread will be woken with
172  // a failure (see NaClIPCAdapter::BlockingRead and CloseChannel). Or... we
173  // might simply be killed with no chance to clean up anyway :-).
174  // If untrusted code tries to close the channel prior to shutdown, it's likely
175  // to hang.
176  // TODO(dmichael): Can we do anything smarter here to make sure the reader
177  //                 thread wakes up and quits?
178  reader_thread_->Join();
179  close(pipe_);
180  pipe_ = -1;
181  reader_thread_runner_.reset();
182  reader_thread_.reset();
183  read_queue_.clear();
184  output_queue_.clear();
185}
186
187bool Channel::ChannelImpl::Send(Message* message) {
188  DVLOG(2) << "sending message @" << message << " on channel @" << this
189           << " with type " << message->type();
190  scoped_ptr<Message> message_ptr(message);
191
192#ifdef IPC_MESSAGE_LOG_ENABLED
193  Logging::GetInstance()->OnSendMessage(message_ptr.get(), "");
194#endif  // IPC_MESSAGE_LOG_ENABLED
195
196  message->TraceMessageBegin();
197  output_queue_.push_back(linked_ptr<Message>(message_ptr.release()));
198  if (!waiting_connect_)
199    return ProcessOutgoingMessages();
200
201  return true;
202}
203
204void Channel::ChannelImpl::DidRecvMsg(scoped_ptr<MessageContents> contents) {
205  // Close sets the pipe to -1. It's possible we'll get a buffer sent to us from
206  // the reader thread after Close is called. If so, we ignore it.
207  if (pipe_ == -1)
208    return;
209
210  linked_ptr<std::vector<char> > data(new std::vector<char>);
211  data->swap(contents->data);
212  read_queue_.push_back(data);
213
214  input_fds_.insert(input_fds_.end(),
215                    contents->fds.begin(), contents->fds.end());
216  contents->fds.clear();
217
218  // In POSIX, we would be told when there are bytes to read by implementing
219  // OnFileCanReadWithoutBlocking in MessageLoopForIO::Watcher. In NaCl, we
220  // instead know at this point because the reader thread posted some data to
221  // us.
222  ProcessIncomingMessages();
223}
224
225void Channel::ChannelImpl::ReadDidFail() {
226  Close();
227}
228
229bool Channel::ChannelImpl::CreatePipe(
230    const IPC::ChannelHandle& channel_handle) {
231  DCHECK(pipe_ == -1);
232
233  // There's one possible case in NaCl:
234  // 1) It's a channel wrapping a pipe that is given to us.
235  // We don't support these:
236  // 2) It's for a named channel.
237  // 3) It's for a client that we implement ourself.
238  // 4) It's the initial IPC channel.
239
240  if (channel_handle.socket.fd == -1) {
241    NOTIMPLEMENTED();
242    return false;
243  }
244  pipe_ = channel_handle.socket.fd;
245  return true;
246}
247
248bool Channel::ChannelImpl::ProcessOutgoingMessages() {
249  DCHECK(!waiting_connect_);  // Why are we trying to send messages if there's
250                              // no connection?
251  if (output_queue_.empty())
252    return true;
253
254  if (pipe_ == -1)
255    return false;
256
257  // Write out all the messages. The trusted implementation is guaranteed to not
258  // block. See NaClIPCAdapter::Send for the implementation of imc_sendmsg.
259  while (!output_queue_.empty()) {
260    linked_ptr<Message> msg = output_queue_.front();
261    output_queue_.pop_front();
262
263    int fds[FileDescriptorSet::kMaxDescriptorsPerMessage];
264    const size_t num_fds = msg->file_descriptor_set()->size();
265    DCHECK(num_fds <= FileDescriptorSet::kMaxDescriptorsPerMessage);
266    msg->file_descriptor_set()->GetDescriptors(fds);
267
268    NaClAbiNaClImcMsgIoVec iov = {
269      const_cast<void*>(msg->data()), msg->size()
270    };
271    NaClAbiNaClImcMsgHdr msgh = { &iov, 1, fds, num_fds };
272    ssize_t bytes_written = imc_sendmsg(pipe_, &msgh, 0);
273
274    DCHECK(bytes_written);  // The trusted side shouldn't return 0.
275    if (bytes_written < 0) {
276      // The trusted side should only ever give us an error of EPIPE. We
277      // should never be interrupted, nor should we get EAGAIN.
278      DCHECK(errno == EPIPE);
279      Close();
280      PLOG(ERROR) << "pipe_ error on "
281                  << pipe_
282                  << " Currently writing message of size: "
283                  << msg->size();
284      return false;
285    } else {
286      msg->file_descriptor_set()->CommitAll();
287    }
288
289    // Message sent OK!
290    DVLOG(2) << "sent message @" << msg.get() << " with type " << msg->type()
291             << " on fd " << pipe_;
292  }
293  return true;
294}
295
296Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData(
297    char* buffer,
298    int buffer_len,
299    int* bytes_read) {
300  *bytes_read = 0;
301  if (pipe_ == -1)
302    return READ_FAILED;
303  if (read_queue_.empty())
304    return READ_PENDING;
305  while (!read_queue_.empty() && *bytes_read < buffer_len) {
306    linked_ptr<std::vector<char> > vec(read_queue_.front());
307    size_t bytes_to_read = buffer_len - *bytes_read;
308    if (vec->size() <= bytes_to_read) {
309      // We can read and discard the entire vector.
310      std::copy(vec->begin(), vec->end(), buffer + *bytes_read);
311      *bytes_read += vec->size();
312      read_queue_.pop_front();
313    } else {
314      // Read all the bytes we can and discard them from the front of the
315      // vector. (This can be slowish, since erase has to move the back of the
316      // vector to the front, but it's hopefully a temporary hack and it keeps
317      // the code simple).
318      std::copy(vec->begin(), vec->begin() + bytes_to_read,
319                buffer + *bytes_read);
320      vec->erase(vec->begin(), vec->begin() + bytes_to_read);
321      *bytes_read += bytes_to_read;
322    }
323  }
324  return READ_SUCCEEDED;
325}
326
327bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) {
328  uint16 header_fds = msg->header()->num_fds;
329  CHECK(header_fds == input_fds_.size());
330  if (header_fds == 0)
331    return true;  // Nothing to do.
332
333  // The shenaniganery below with &foo.front() requires input_fds_ to have
334  // contiguous underlying storage (such as a simple array or a std::vector).
335  // This is why the header warns not to make input_fds_ a deque<>.
336  msg->file_descriptor_set()->SetDescriptors(&input_fds_.front(),
337                                             header_fds);
338  input_fds_.clear();
339  return true;
340}
341
342bool Channel::ChannelImpl::DidEmptyInputBuffers() {
343  // When the input data buffer is empty, the fds should be too.
344  return input_fds_.empty();
345}
346
347void Channel::ChannelImpl::HandleHelloMessage(const Message& msg) {
348  // The trusted side IPC::Channel should handle the "hello" handshake; we
349  // should not receive the "Hello" message.
350  NOTREACHED();
351}
352
353//------------------------------------------------------------------------------
354// Channel's methods simply call through to ChannelImpl.
355
356Channel::Channel(const IPC::ChannelHandle& channel_handle,
357                 Mode mode,
358                 Listener* listener)
359    : channel_impl_(new ChannelImpl(channel_handle, mode, listener)) {
360}
361
362Channel::~Channel() {
363  delete channel_impl_;
364}
365
366bool Channel::Connect() {
367  return channel_impl_->Connect();
368}
369
370void Channel::Close() {
371  channel_impl_->Close();
372}
373
374base::ProcessId Channel::peer_pid() const {
375  // This shouldn't actually get used in the untrusted side of the proxy, and we
376  // don't have the real pid anyway.
377  return -1;
378}
379
380bool Channel::Send(Message* message) {
381  return channel_impl_->Send(message);
382}
383
384}  // namespace IPC
385