channel_posix.cc revision 645501c2ab19a559ce82a1d5a29ced159a4c30fb
1// Copyright 2016 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/edk/system/channel.h" 6 7#include <errno.h> 8#include <sys/socket.h> 9 10#include <algorithm> 11#include <deque> 12#include <limits> 13#include <memory> 14 15#include "base/bind.h" 16#include "base/location.h" 17#include "base/macros.h" 18#include "base/memory/ref_counted.h" 19#include "base/message_loop/message_loop.h" 20#include "base/synchronization/lock.h" 21#include "base/task_runner.h" 22#include "mojo/edk/embedder/platform_channel_utils_posix.h" 23#include "mojo/edk/embedder/platform_handle_vector.h" 24 25#if !defined(OS_NACL) 26#include <sys/uio.h> 27#endif 28 29namespace mojo { 30namespace edk { 31 32namespace { 33 34const size_t kMaxBatchReadCapacity = 256 * 1024; 35 36// A view over a Channel::Message object. The write queue uses these since 37// large messages may need to be sent in chunks. 38class MessageView { 39 public: 40 // Owns |message|. |offset| indexes the first unsent byte in the message. 41 MessageView(Channel::MessagePtr message, size_t offset) 42 : message_(std::move(message)), 43 offset_(offset), 44 handles_(message_->TakeHandlesForTransport()) { 45 DCHECK_GT(message_->data_num_bytes(), offset_); 46 } 47 48 MessageView(MessageView&& other) { *this = std::move(other); } 49 50 MessageView& operator=(MessageView&& other) { 51 message_ = std::move(other.message_); 52 offset_ = other.offset_; 53 handles_ = std::move(other.handles_); 54 return *this; 55 } 56 57 ~MessageView() {} 58 59 const void* data() const { 60 return static_cast<const char*>(message_->data()) + offset_; 61 } 62 63 size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; } 64 65 size_t data_offset() const { return offset_; } 66 void advance_data_offset(size_t num_bytes) { 67 DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes); 68 offset_ += num_bytes; 69 } 70 71 ScopedPlatformHandleVectorPtr TakeHandles() { return std::move(handles_); } 72 Channel::MessagePtr TakeMessage() { return std::move(message_); } 73 74 void SetHandles(ScopedPlatformHandleVectorPtr handles) { 75 handles_ = std::move(handles); 76 } 77 78 private: 79 Channel::MessagePtr message_; 80 size_t offset_; 81 ScopedPlatformHandleVectorPtr handles_; 82 83 DISALLOW_COPY_AND_ASSIGN(MessageView); 84}; 85 86class ChannelPosix : public Channel, 87 public base::MessageLoop::DestructionObserver, 88 public base::MessageLoopForIO::Watcher { 89 public: 90 ChannelPosix(Delegate* delegate, 91 ScopedPlatformHandle handle, 92 scoped_refptr<base::TaskRunner> io_task_runner) 93 : Channel(delegate), 94 self_(this), 95 handle_(std::move(handle)), 96 io_task_runner_(io_task_runner) 97#if defined(OS_MACOSX) 98 , 99 handles_to_close_(new PlatformHandleVector) 100#endif 101 { 102 } 103 104 void Start() override { 105 if (io_task_runner_->RunsTasksOnCurrentThread()) { 106 StartOnIOThread(); 107 } else { 108 io_task_runner_->PostTask( 109 FROM_HERE, base::Bind(&ChannelPosix::StartOnIOThread, this)); 110 } 111 } 112 113 void ShutDownImpl() override { 114 // Always shut down asynchronously when called through the public interface. 115 io_task_runner_->PostTask( 116 FROM_HERE, base::Bind(&ChannelPosix::ShutDownOnIOThread, this)); 117 } 118 119 void Write(MessagePtr message) override { 120 bool write_error = false; 121 { 122 base::AutoLock lock(write_lock_); 123 if (reject_writes_) 124 return; 125 if (outgoing_messages_.empty()) { 126 if (!WriteNoLock(MessageView(std::move(message), 0))) 127 reject_writes_ = write_error = true; 128 } else { 129 outgoing_messages_.emplace_back(std::move(message), 0); 130 } 131 } 132 if (write_error) { 133 // Do not synchronously invoke OnError(). Write() may have been called by 134 // the delegate and we don't want to re-enter it. 135 io_task_runner_->PostTask(FROM_HERE, 136 base::Bind(&ChannelPosix::OnError, this)); 137 } 138 } 139 140 void LeakHandle() override { 141 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 142 leak_handle_ = true; 143 } 144 145 bool GetReadPlatformHandles( 146 size_t num_handles, 147 const void* extra_header, 148 size_t extra_header_size, 149 ScopedPlatformHandleVectorPtr* handles) override { 150 if (num_handles > std::numeric_limits<uint16_t>::max()) 151 return false; 152#if defined(OS_MACOSX) && !defined(OS_IOS) 153 // On OSX, we can have mach ports which are located in the extra header 154 // section. 155 using MachPortsEntry = Channel::Message::MachPortsEntry; 156 using MachPortsExtraHeader = Channel::Message::MachPortsExtraHeader; 157 CHECK(extra_header_size >= 158 sizeof(MachPortsExtraHeader) + num_handles * sizeof(MachPortsEntry)); 159 const MachPortsExtraHeader* mach_ports_header = 160 reinterpret_cast<const MachPortsExtraHeader*>(extra_header); 161 size_t num_mach_ports = mach_ports_header->num_ports; 162 CHECK(num_mach_ports <= num_handles); 163 if (incoming_platform_handles_.size() + num_mach_ports < num_handles) { 164 handles->reset(); 165 return true; 166 } 167 168 handles->reset(new PlatformHandleVector(num_handles)); 169 const MachPortsEntry* mach_ports = mach_ports_header->entries; 170 for (size_t i = 0, mach_port_index = 0; i < num_handles; ++i) { 171 if (mach_port_index < num_mach_ports && 172 mach_ports[mach_port_index].index == i) { 173 (*handles)->at(i) = PlatformHandle( 174 static_cast<mach_port_t>(mach_ports[mach_port_index].mach_port)); 175 CHECK((*handles)->at(i).type == PlatformHandle::Type::MACH); 176 // These are actually just Mach port names until they're resolved from 177 // the remote process. 178 (*handles)->at(i).type = PlatformHandle::Type::MACH_NAME; 179 mach_port_index++; 180 } else { 181 CHECK(!incoming_platform_handles_.empty()); 182 (*handles)->at(i) = incoming_platform_handles_.front(); 183 incoming_platform_handles_.pop_front(); 184 } 185 } 186#else 187 if (incoming_platform_handles_.size() < num_handles) { 188 handles->reset(); 189 return true; 190 } 191 192 handles->reset(new PlatformHandleVector(num_handles)); 193 for (size_t i = 0; i < num_handles; ++i) { 194 (*handles)->at(i) = incoming_platform_handles_.front(); 195 incoming_platform_handles_.pop_front(); 196 } 197#endif 198 199 return true; 200 } 201 202 private: 203 ~ChannelPosix() override { 204 DCHECK(!read_watcher_); 205 DCHECK(!write_watcher_); 206 for (auto handle : incoming_platform_handles_) 207 handle.CloseIfNecessary(); 208 } 209 210 void StartOnIOThread() { 211 DCHECK(!read_watcher_); 212 DCHECK(!write_watcher_); 213 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher); 214 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher); 215 base::MessageLoopForIO::current()->WatchFileDescriptor( 216 handle_.get().handle, true /* persistent */, 217 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this); 218 base::MessageLoop::current()->AddDestructionObserver(this); 219 } 220 221 void WaitForWriteOnIOThread() { 222 base::AutoLock lock(write_lock_); 223 WaitForWriteOnIOThreadNoLock(); 224 } 225 226 void WaitForWriteOnIOThreadNoLock() { 227 if (pending_write_) 228 return; 229 if (!write_watcher_) 230 return; 231 if (io_task_runner_->RunsTasksOnCurrentThread()) { 232 pending_write_ = true; 233 base::MessageLoopForIO::current()->WatchFileDescriptor( 234 handle_.get().handle, false /* persistent */, 235 base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this); 236 } else { 237 io_task_runner_->PostTask( 238 FROM_HERE, base::Bind(&ChannelPosix::WaitForWriteOnIOThread, this)); 239 } 240 } 241 242 void ShutDownOnIOThread() { 243 base::MessageLoop::current()->RemoveDestructionObserver(this); 244 245 read_watcher_.reset(); 246 write_watcher_.reset(); 247 if (leak_handle_) 248 ignore_result(handle_.release()); 249 handle_.reset(); 250#if defined(OS_MACOSX) 251 handles_to_close_.reset(); 252#endif 253 254 // May destroy the |this| if it was the last reference. 255 self_ = nullptr; 256 } 257 258 // base::MessageLoop::DestructionObserver: 259 void WillDestroyCurrentMessageLoop() override { 260 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 261 if (self_) 262 ShutDownOnIOThread(); 263 } 264 265 // base::MessageLoopForIO::Watcher: 266 void OnFileCanReadWithoutBlocking(int fd) override { 267 CHECK_EQ(fd, handle_.get().handle); 268 269 bool read_error = false; 270 size_t next_read_size = 0; 271 size_t buffer_capacity = 0; 272 size_t total_bytes_read = 0; 273 size_t bytes_read = 0; 274 do { 275 buffer_capacity = next_read_size; 276 char* buffer = GetReadBuffer(&buffer_capacity); 277 DCHECK_GT(buffer_capacity, 0u); 278 279 ssize_t read_result = PlatformChannelRecvmsg( 280 handle_.get(), 281 buffer, 282 buffer_capacity, 283 &incoming_platform_handles_); 284 285 if (read_result > 0) { 286 bytes_read = static_cast<size_t>(read_result); 287 total_bytes_read += bytes_read; 288 if (!OnReadComplete(bytes_read, &next_read_size)) { 289 read_error = true; 290 break; 291 } 292 } else if (read_result == 0 || 293 (errno != EAGAIN && errno != EWOULDBLOCK)) { 294 read_error = true; 295 break; 296 } 297 } while (bytes_read == buffer_capacity && 298 total_bytes_read < kMaxBatchReadCapacity && 299 next_read_size > 0); 300 if (read_error) { 301 // Stop receiving read notifications. 302 read_watcher_.reset(); 303 304 OnError(); 305 } 306 } 307 308 void OnFileCanWriteWithoutBlocking(int fd) override { 309 bool write_error = false; 310 { 311 base::AutoLock lock(write_lock_); 312 pending_write_ = false; 313 if (!FlushOutgoingMessagesNoLock()) 314 reject_writes_ = write_error = true; 315 } 316 if (write_error) 317 OnError(); 318 } 319 320 // Attempts to write a message directly to the channel. If the full message 321 // cannot be written, it's queued and a wait is initiated to write the message 322 // ASAP on the I/O thread. 323 bool WriteNoLock(MessageView message_view) { 324 size_t bytes_written = 0; 325 do { 326 message_view.advance_data_offset(bytes_written); 327 328 ssize_t result; 329 ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles(); 330 if (handles && handles->size()) { 331 iovec iov = { 332 const_cast<void*>(message_view.data()), 333 message_view.data_num_bytes() 334 }; 335 // TODO: Handle lots of handles. 336 result = PlatformChannelSendmsgWithHandles( 337 handle_.get(), &iov, 1, handles->data(), handles->size()); 338 if (result >= 0) { 339#if defined(OS_MACOSX) 340 // There is a bug on OSX which makes it dangerous to close 341 // a file descriptor while it is in transit. So instead we 342 // store the file descriptor in a set and send a message to 343 // the recipient, which is queued AFTER the message that 344 // sent the FD. The recipient will reply to the message, 345 // letting us know that it is now safe to close the file 346 // descriptor. For more information, see: 347 // http://crbug.com/298276 348 std::vector<int> fds; 349 for (auto& handle : *handles) 350 fds.push_back(handle.handle); 351 { 352 base::AutoLock l(handles_to_close_lock_); 353 for (auto& handle : *handles) 354 handles_to_close_->push_back(handle); 355 } 356 MessagePtr fds_message( 357 new Channel::Message(sizeof(fds[0]) * fds.size(), 0, 358 Message::Header::MessageType::HANDLES_SENT)); 359 memcpy(fds_message->mutable_payload(), fds.data(), 360 sizeof(fds[0]) * fds.size()); 361 outgoing_messages_.emplace_back(std::move(fds_message), 0); 362 handles->clear(); 363#else 364 handles.reset(); 365#endif // defined(OS_MACOSX) 366 } 367 } else { 368 result = PlatformChannelWrite(handle_.get(), message_view.data(), 369 message_view.data_num_bytes()); 370 } 371 372 if (result < 0) { 373 if (errno != EAGAIN && errno != EWOULDBLOCK) 374 return false; 375 message_view.SetHandles(std::move(handles)); 376 outgoing_messages_.emplace_front(std::move(message_view)); 377 WaitForWriteOnIOThreadNoLock(); 378 return true; 379 } 380 381 bytes_written = static_cast<size_t>(result); 382 } while (bytes_written < message_view.data_num_bytes()); 383 384 return FlushOutgoingMessagesNoLock(); 385 } 386 387 bool FlushOutgoingMessagesNoLock() { 388 std::deque<MessageView> messages; 389 std::swap(outgoing_messages_, messages); 390 391 while (!messages.empty()) { 392 if (!WriteNoLock(std::move(messages.front()))) 393 return false; 394 395 messages.pop_front(); 396 if (!outgoing_messages_.empty()) { 397 // The message was requeued by WriteNoLock(), so we have to wait for 398 // pipe to become writable again. Repopulate the message queue and exit. 399 // If sending the message triggered any control messages, they may be 400 // in |outgoing_messages_| in addition to or instead of the message 401 // being sent. 402 std::swap(messages, outgoing_messages_); 403 while (!messages.empty()) { 404 outgoing_messages_.push_front(std::move(messages.back())); 405 messages.pop_back(); 406 } 407 return true; 408 } 409 } 410 411 return true; 412 } 413 414#if defined(OS_MACOSX) 415 bool OnControlMessage(Message::Header::MessageType message_type, 416 const void* payload, 417 size_t payload_size, 418 ScopedPlatformHandleVectorPtr handles) override { 419 switch (message_type) { 420 case Message::Header::MessageType::HANDLES_SENT: { 421 if (payload_size == 0) 422 break; 423 MessagePtr message(new Channel::Message( 424 payload_size, 0, Message::Header::MessageType::HANDLES_SENT_ACK)); 425 memcpy(message->mutable_payload(), payload, payload_size); 426 Write(std::move(message)); 427 return true; 428 } 429 430 case Message::Header::MessageType::HANDLES_SENT_ACK: { 431 size_t num_fds = payload_size / sizeof(int); 432 if (num_fds == 0 || payload_size % sizeof(int) != 0) 433 break; 434 435 const int* fds = reinterpret_cast<const int*>(payload); 436 if (!CloseHandles(fds, num_fds)) 437 break; 438 return true; 439 } 440 441 default: 442 break; 443 } 444 445 return false; 446 } 447 448 // Closes handles referenced by |fds|. Returns false if |num_fds| is 0, or if 449 // |fds| does not match a sequence of handles in |handles_to_close_|. 450 bool CloseHandles(const int* fds, size_t num_fds) { 451 base::AutoLock l(handles_to_close_lock_); 452 if (!num_fds) 453 return false; 454 455 auto start = 456 std::find_if(handles_to_close_->begin(), handles_to_close_->end(), 457 [&fds](const PlatformHandle& handle) { 458 return handle.handle == fds[0]; 459 }); 460 if (start == handles_to_close_->end()) 461 return false; 462 463 auto it = start; 464 size_t i = 0; 465 // The FDs in the message should match a sequence of handles in 466 // |handles_to_close_|. 467 for (; i < num_fds && it != handles_to_close_->end(); i++, ++it) { 468 if (it->handle != fds[i]) 469 return false; 470 471 it->CloseIfNecessary(); 472 } 473 if (i != num_fds) 474 return false; 475 476 handles_to_close_->erase(start, it); 477 return true; 478 } 479#endif // defined(OS_MACOSX) 480 481 // Keeps the Channel alive at least until explicit shutdown on the IO thread. 482 scoped_refptr<Channel> self_; 483 484 ScopedPlatformHandle handle_; 485 scoped_refptr<base::TaskRunner> io_task_runner_; 486 487 // These watchers must only be accessed on the IO thread. 488 std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; 489 std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; 490 491 std::deque<PlatformHandle> incoming_platform_handles_; 492 493 // Protects |pending_write_| and |outgoing_messages_|. 494 base::Lock write_lock_; 495 bool pending_write_ = false; 496 bool reject_writes_ = false; 497 std::deque<MessageView> outgoing_messages_; 498 499 bool leak_handle_ = false; 500 501#if defined(OS_MACOSX) 502 base::Lock handles_to_close_lock_; 503 ScopedPlatformHandleVectorPtr handles_to_close_; 504#endif 505 506 DISALLOW_COPY_AND_ASSIGN(ChannelPosix); 507}; 508 509} // namespace 510 511// static 512scoped_refptr<Channel> Channel::Create( 513 Delegate* delegate, 514 ScopedPlatformHandle platform_handle, 515 scoped_refptr<base::TaskRunner> io_task_runner) { 516 return new ChannelPosix(delegate, std::move(platform_handle), io_task_runner); 517} 518 519} // namespace edk 520} // namespace mojo 521