channel_posix.cc revision 645501c2ab19a559ce82a1d5a29ced159a4c30fb
1// Copyright 2016 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "mojo/edk/system/channel.h"
6
7#include <errno.h>
8#include <sys/socket.h>
9
10#include <algorithm>
11#include <deque>
12#include <limits>
13#include <memory>
14
15#include "base/bind.h"
16#include "base/location.h"
17#include "base/macros.h"
18#include "base/memory/ref_counted.h"
19#include "base/message_loop/message_loop.h"
20#include "base/synchronization/lock.h"
21#include "base/task_runner.h"
22#include "mojo/edk/embedder/platform_channel_utils_posix.h"
23#include "mojo/edk/embedder/platform_handle_vector.h"
24
25#if !defined(OS_NACL)
26#include <sys/uio.h>
27#endif
28
29namespace mojo {
30namespace edk {
31
32namespace {
33
34const size_t kMaxBatchReadCapacity = 256 * 1024;
35
36// A view over a Channel::Message object. The write queue uses these since
37// large messages may need to be sent in chunks.
38class MessageView {
39 public:
40  // Owns |message|. |offset| indexes the first unsent byte in the message.
41  MessageView(Channel::MessagePtr message, size_t offset)
42      : message_(std::move(message)),
43        offset_(offset),
44        handles_(message_->TakeHandlesForTransport()) {
45    DCHECK_GT(message_->data_num_bytes(), offset_);
46  }
47
48  MessageView(MessageView&& other) { *this = std::move(other); }
49
50  MessageView& operator=(MessageView&& other) {
51    message_ = std::move(other.message_);
52    offset_ = other.offset_;
53    handles_ = std::move(other.handles_);
54    return *this;
55  }
56
57  ~MessageView() {}
58
59  const void* data() const {
60    return static_cast<const char*>(message_->data()) + offset_;
61  }
62
63  size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; }
64
65  size_t data_offset() const { return offset_; }
66  void advance_data_offset(size_t num_bytes) {
67    DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes);
68    offset_ += num_bytes;
69  }
70
71  ScopedPlatformHandleVectorPtr TakeHandles() { return std::move(handles_); }
72  Channel::MessagePtr TakeMessage() { return std::move(message_); }
73
74  void SetHandles(ScopedPlatformHandleVectorPtr handles) {
75    handles_ = std::move(handles);
76  }
77
78 private:
79  Channel::MessagePtr message_;
80  size_t offset_;
81  ScopedPlatformHandleVectorPtr handles_;
82
83  DISALLOW_COPY_AND_ASSIGN(MessageView);
84};
85
86class ChannelPosix : public Channel,
87                     public base::MessageLoop::DestructionObserver,
88                     public base::MessageLoopForIO::Watcher {
89 public:
90  ChannelPosix(Delegate* delegate,
91               ScopedPlatformHandle handle,
92               scoped_refptr<base::TaskRunner> io_task_runner)
93      : Channel(delegate),
94        self_(this),
95        handle_(std::move(handle)),
96        io_task_runner_(io_task_runner)
97#if defined(OS_MACOSX)
98        ,
99        handles_to_close_(new PlatformHandleVector)
100#endif
101  {
102  }
103
104  void Start() override {
105    if (io_task_runner_->RunsTasksOnCurrentThread()) {
106      StartOnIOThread();
107    } else {
108      io_task_runner_->PostTask(
109          FROM_HERE, base::Bind(&ChannelPosix::StartOnIOThread, this));
110    }
111  }
112
113  void ShutDownImpl() override {
114    // Always shut down asynchronously when called through the public interface.
115    io_task_runner_->PostTask(
116        FROM_HERE, base::Bind(&ChannelPosix::ShutDownOnIOThread, this));
117  }
118
119  void Write(MessagePtr message) override {
120    bool write_error = false;
121    {
122      base::AutoLock lock(write_lock_);
123      if (reject_writes_)
124        return;
125      if (outgoing_messages_.empty()) {
126        if (!WriteNoLock(MessageView(std::move(message), 0)))
127          reject_writes_ = write_error = true;
128      } else {
129        outgoing_messages_.emplace_back(std::move(message), 0);
130      }
131    }
132    if (write_error) {
133      // Do not synchronously invoke OnError(). Write() may have been called by
134      // the delegate and we don't want to re-enter it.
135      io_task_runner_->PostTask(FROM_HERE,
136                                base::Bind(&ChannelPosix::OnError, this));
137    }
138  }
139
140  void LeakHandle() override {
141    DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
142    leak_handle_ = true;
143  }
144
145  bool GetReadPlatformHandles(
146      size_t num_handles,
147      const void* extra_header,
148      size_t extra_header_size,
149      ScopedPlatformHandleVectorPtr* handles) override {
150    if (num_handles > std::numeric_limits<uint16_t>::max())
151      return false;
152#if defined(OS_MACOSX) && !defined(OS_IOS)
153    // On OSX, we can have mach ports which are located in the extra header
154    // section.
155    using MachPortsEntry = Channel::Message::MachPortsEntry;
156    using MachPortsExtraHeader = Channel::Message::MachPortsExtraHeader;
157    CHECK(extra_header_size >=
158          sizeof(MachPortsExtraHeader) + num_handles * sizeof(MachPortsEntry));
159    const MachPortsExtraHeader* mach_ports_header =
160        reinterpret_cast<const MachPortsExtraHeader*>(extra_header);
161    size_t num_mach_ports = mach_ports_header->num_ports;
162    CHECK(num_mach_ports <= num_handles);
163    if (incoming_platform_handles_.size() + num_mach_ports < num_handles) {
164      handles->reset();
165      return true;
166    }
167
168    handles->reset(new PlatformHandleVector(num_handles));
169    const MachPortsEntry* mach_ports = mach_ports_header->entries;
170    for (size_t i = 0, mach_port_index = 0; i < num_handles; ++i) {
171      if (mach_port_index < num_mach_ports &&
172          mach_ports[mach_port_index].index == i) {
173        (*handles)->at(i) = PlatformHandle(
174            static_cast<mach_port_t>(mach_ports[mach_port_index].mach_port));
175        CHECK((*handles)->at(i).type == PlatformHandle::Type::MACH);
176        // These are actually just Mach port names until they're resolved from
177        // the remote process.
178        (*handles)->at(i).type = PlatformHandle::Type::MACH_NAME;
179        mach_port_index++;
180      } else {
181        CHECK(!incoming_platform_handles_.empty());
182        (*handles)->at(i) = incoming_platform_handles_.front();
183        incoming_platform_handles_.pop_front();
184      }
185    }
186#else
187    if (incoming_platform_handles_.size() < num_handles) {
188      handles->reset();
189      return true;
190    }
191
192    handles->reset(new PlatformHandleVector(num_handles));
193    for (size_t i = 0; i < num_handles; ++i) {
194      (*handles)->at(i) = incoming_platform_handles_.front();
195      incoming_platform_handles_.pop_front();
196    }
197#endif
198
199    return true;
200  }
201
202 private:
203  ~ChannelPosix() override {
204    DCHECK(!read_watcher_);
205    DCHECK(!write_watcher_);
206    for (auto handle : incoming_platform_handles_)
207      handle.CloseIfNecessary();
208  }
209
210  void StartOnIOThread() {
211    DCHECK(!read_watcher_);
212    DCHECK(!write_watcher_);
213    read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher);
214    write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher);
215    base::MessageLoopForIO::current()->WatchFileDescriptor(
216        handle_.get().handle, true /* persistent */,
217        base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this);
218    base::MessageLoop::current()->AddDestructionObserver(this);
219  }
220
221  void WaitForWriteOnIOThread() {
222    base::AutoLock lock(write_lock_);
223    WaitForWriteOnIOThreadNoLock();
224  }
225
226  void WaitForWriteOnIOThreadNoLock() {
227    if (pending_write_)
228      return;
229    if (!write_watcher_)
230      return;
231    if (io_task_runner_->RunsTasksOnCurrentThread()) {
232      pending_write_ = true;
233      base::MessageLoopForIO::current()->WatchFileDescriptor(
234          handle_.get().handle, false /* persistent */,
235          base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this);
236    } else {
237      io_task_runner_->PostTask(
238          FROM_HERE, base::Bind(&ChannelPosix::WaitForWriteOnIOThread, this));
239    }
240  }
241
242  void ShutDownOnIOThread() {
243    base::MessageLoop::current()->RemoveDestructionObserver(this);
244
245    read_watcher_.reset();
246    write_watcher_.reset();
247    if (leak_handle_)
248      ignore_result(handle_.release());
249    handle_.reset();
250#if defined(OS_MACOSX)
251    handles_to_close_.reset();
252#endif
253
254    // May destroy the |this| if it was the last reference.
255    self_ = nullptr;
256  }
257
258  // base::MessageLoop::DestructionObserver:
259  void WillDestroyCurrentMessageLoop() override {
260    DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
261    if (self_)
262      ShutDownOnIOThread();
263  }
264
265  // base::MessageLoopForIO::Watcher:
266  void OnFileCanReadWithoutBlocking(int fd) override {
267    CHECK_EQ(fd, handle_.get().handle);
268
269    bool read_error = false;
270    size_t next_read_size = 0;
271    size_t buffer_capacity = 0;
272    size_t total_bytes_read = 0;
273    size_t bytes_read = 0;
274    do {
275      buffer_capacity = next_read_size;
276      char* buffer = GetReadBuffer(&buffer_capacity);
277      DCHECK_GT(buffer_capacity, 0u);
278
279      ssize_t read_result = PlatformChannelRecvmsg(
280          handle_.get(),
281          buffer,
282          buffer_capacity,
283          &incoming_platform_handles_);
284
285      if (read_result > 0) {
286        bytes_read = static_cast<size_t>(read_result);
287        total_bytes_read += bytes_read;
288        if (!OnReadComplete(bytes_read, &next_read_size)) {
289          read_error = true;
290          break;
291        }
292      } else if (read_result == 0 ||
293                 (errno != EAGAIN && errno != EWOULDBLOCK)) {
294        read_error = true;
295        break;
296      }
297    } while (bytes_read == buffer_capacity &&
298             total_bytes_read < kMaxBatchReadCapacity &&
299             next_read_size > 0);
300    if (read_error) {
301      // Stop receiving read notifications.
302      read_watcher_.reset();
303
304      OnError();
305    }
306  }
307
308  void OnFileCanWriteWithoutBlocking(int fd) override {
309    bool write_error = false;
310    {
311      base::AutoLock lock(write_lock_);
312      pending_write_ = false;
313      if (!FlushOutgoingMessagesNoLock())
314        reject_writes_ = write_error = true;
315    }
316    if (write_error)
317      OnError();
318  }
319
320  // Attempts to write a message directly to the channel. If the full message
321  // cannot be written, it's queued and a wait is initiated to write the message
322  // ASAP on the I/O thread.
323  bool WriteNoLock(MessageView message_view) {
324    size_t bytes_written = 0;
325    do {
326      message_view.advance_data_offset(bytes_written);
327
328      ssize_t result;
329      ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles();
330      if (handles && handles->size()) {
331        iovec iov = {
332          const_cast<void*>(message_view.data()),
333          message_view.data_num_bytes()
334        };
335        // TODO: Handle lots of handles.
336        result = PlatformChannelSendmsgWithHandles(
337            handle_.get(), &iov, 1, handles->data(), handles->size());
338        if (result >= 0) {
339#if defined(OS_MACOSX)
340          // There is a bug on OSX which makes it dangerous to close
341          // a file descriptor while it is in transit. So instead we
342          // store the file descriptor in a set and send a message to
343          // the recipient, which is queued AFTER the message that
344          // sent the FD. The recipient will reply to the message,
345          // letting us know that it is now safe to close the file
346          // descriptor. For more information, see:
347          // http://crbug.com/298276
348          std::vector<int> fds;
349          for (auto& handle : *handles)
350            fds.push_back(handle.handle);
351          {
352            base::AutoLock l(handles_to_close_lock_);
353            for (auto& handle : *handles)
354              handles_to_close_->push_back(handle);
355          }
356          MessagePtr fds_message(
357              new Channel::Message(sizeof(fds[0]) * fds.size(), 0,
358                                   Message::Header::MessageType::HANDLES_SENT));
359          memcpy(fds_message->mutable_payload(), fds.data(),
360                 sizeof(fds[0]) * fds.size());
361          outgoing_messages_.emplace_back(std::move(fds_message), 0);
362          handles->clear();
363#else
364          handles.reset();
365#endif  // defined(OS_MACOSX)
366        }
367      } else {
368        result = PlatformChannelWrite(handle_.get(), message_view.data(),
369                                      message_view.data_num_bytes());
370      }
371
372      if (result < 0) {
373        if (errno != EAGAIN && errno != EWOULDBLOCK)
374          return false;
375        message_view.SetHandles(std::move(handles));
376        outgoing_messages_.emplace_front(std::move(message_view));
377        WaitForWriteOnIOThreadNoLock();
378        return true;
379      }
380
381      bytes_written = static_cast<size_t>(result);
382    } while (bytes_written < message_view.data_num_bytes());
383
384    return FlushOutgoingMessagesNoLock();
385  }
386
387  bool FlushOutgoingMessagesNoLock() {
388    std::deque<MessageView> messages;
389    std::swap(outgoing_messages_, messages);
390
391    while (!messages.empty()) {
392      if (!WriteNoLock(std::move(messages.front())))
393        return false;
394
395      messages.pop_front();
396      if (!outgoing_messages_.empty()) {
397        // The message was requeued by WriteNoLock(), so we have to wait for
398        // pipe to become writable again. Repopulate the message queue and exit.
399        // If sending the message triggered any control messages, they may be
400        // in |outgoing_messages_| in addition to or instead of the message
401        // being sent.
402        std::swap(messages, outgoing_messages_);
403        while (!messages.empty()) {
404          outgoing_messages_.push_front(std::move(messages.back()));
405          messages.pop_back();
406        }
407        return true;
408      }
409    }
410
411    return true;
412  }
413
414#if defined(OS_MACOSX)
415  bool OnControlMessage(Message::Header::MessageType message_type,
416                        const void* payload,
417                        size_t payload_size,
418                        ScopedPlatformHandleVectorPtr handles) override {
419    switch (message_type) {
420      case Message::Header::MessageType::HANDLES_SENT: {
421        if (payload_size == 0)
422          break;
423        MessagePtr message(new Channel::Message(
424            payload_size, 0, Message::Header::MessageType::HANDLES_SENT_ACK));
425        memcpy(message->mutable_payload(), payload, payload_size);
426        Write(std::move(message));
427        return true;
428      }
429
430      case Message::Header::MessageType::HANDLES_SENT_ACK: {
431        size_t num_fds = payload_size / sizeof(int);
432        if (num_fds == 0 || payload_size % sizeof(int) != 0)
433          break;
434
435        const int* fds = reinterpret_cast<const int*>(payload);
436        if (!CloseHandles(fds, num_fds))
437          break;
438        return true;
439      }
440
441      default:
442        break;
443    }
444
445    return false;
446  }
447
448  // Closes handles referenced by |fds|. Returns false if |num_fds| is 0, or if
449  // |fds| does not match a sequence of handles in |handles_to_close_|.
450  bool CloseHandles(const int* fds, size_t num_fds) {
451    base::AutoLock l(handles_to_close_lock_);
452    if (!num_fds)
453      return false;
454
455    auto start =
456        std::find_if(handles_to_close_->begin(), handles_to_close_->end(),
457                     [&fds](const PlatformHandle& handle) {
458                       return handle.handle == fds[0];
459                     });
460    if (start == handles_to_close_->end())
461      return false;
462
463    auto it = start;
464    size_t i = 0;
465    // The FDs in the message should match a sequence of handles in
466    // |handles_to_close_|.
467    for (; i < num_fds && it != handles_to_close_->end(); i++, ++it) {
468      if (it->handle != fds[i])
469        return false;
470
471      it->CloseIfNecessary();
472    }
473    if (i != num_fds)
474      return false;
475
476    handles_to_close_->erase(start, it);
477    return true;
478  }
479#endif  // defined(OS_MACOSX)
480
481  // Keeps the Channel alive at least until explicit shutdown on the IO thread.
482  scoped_refptr<Channel> self_;
483
484  ScopedPlatformHandle handle_;
485  scoped_refptr<base::TaskRunner> io_task_runner_;
486
487  // These watchers must only be accessed on the IO thread.
488  std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
489  std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
490
491  std::deque<PlatformHandle> incoming_platform_handles_;
492
493  // Protects |pending_write_| and |outgoing_messages_|.
494  base::Lock write_lock_;
495  bool pending_write_ = false;
496  bool reject_writes_ = false;
497  std::deque<MessageView> outgoing_messages_;
498
499  bool leak_handle_ = false;
500
501#if defined(OS_MACOSX)
502  base::Lock handles_to_close_lock_;
503  ScopedPlatformHandleVectorPtr handles_to_close_;
504#endif
505
506  DISALLOW_COPY_AND_ASSIGN(ChannelPosix);
507};
508
509}  // namespace
510
511// static
512scoped_refptr<Channel> Channel::Create(
513    Delegate* delegate,
514    ScopedPlatformHandle platform_handle,
515    scoped_refptr<base::TaskRunner> io_task_runner) {
516  return new ChannelPosix(delegate, std::move(platform_handle), io_task_runner);
517}
518
519}  // namespace edk
520}  // namespace mojo
521