1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "mojo/system/data_pipe.h" 6 7#include <string.h> 8 9#include <algorithm> 10#include <limits> 11 12#include "base/compiler_specific.h" 13#include "base/logging.h" 14#include "mojo/system/constants.h" 15#include "mojo/system/memory.h" 16#include "mojo/system/options_validation.h" 17#include "mojo/system/waiter_list.h" 18 19namespace mojo { 20namespace system { 21 22// static 23const MojoCreateDataPipeOptions DataPipe::kDefaultCreateOptions = { 24 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), 25 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1u, 26 static_cast<uint32_t>(kDefaultDataPipeCapacityBytes)}; 27 28// static 29MojoResult DataPipe::ValidateCreateOptions( 30 UserPointer<const MojoCreateDataPipeOptions> in_options, 31 MojoCreateDataPipeOptions* out_options) { 32 const MojoCreateDataPipeOptionsFlags kKnownFlags = 33 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD; 34 35 *out_options = kDefaultCreateOptions; 36 if (in_options.IsNull()) 37 return MOJO_RESULT_OK; 38 39 UserOptionsReader<MojoCreateDataPipeOptions> reader(in_options); 40 if (!reader.is_valid()) 41 return MOJO_RESULT_INVALID_ARGUMENT; 42 43 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateDataPipeOptions, flags, reader)) 44 return MOJO_RESULT_OK; 45 if ((reader.options().flags & ~kKnownFlags)) 46 return MOJO_RESULT_UNIMPLEMENTED; 47 out_options->flags = reader.options().flags; 48 49 // Checks for fields beyond |flags|: 50 51 if (!OPTIONS_STRUCT_HAS_MEMBER( 52 MojoCreateDataPipeOptions, element_num_bytes, reader)) 53 return MOJO_RESULT_OK; 54 if (reader.options().element_num_bytes == 0) 55 return MOJO_RESULT_INVALID_ARGUMENT; 56 out_options->element_num_bytes = reader.options().element_num_bytes; 57 58 if (!OPTIONS_STRUCT_HAS_MEMBER( 59 MojoCreateDataPipeOptions, capacity_num_bytes, reader) || 60 reader.options().capacity_num_bytes == 0) { 61 // Round the default capacity down to a multiple of the element size (but at 62 // least one element). 63 out_options->capacity_num_bytes = 64 std::max(static_cast<uint32_t>(kDefaultDataPipeCapacityBytes - 65 (kDefaultDataPipeCapacityBytes % 66 out_options->element_num_bytes)), 67 out_options->element_num_bytes); 68 return MOJO_RESULT_OK; 69 } 70 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0) 71 return MOJO_RESULT_INVALID_ARGUMENT; 72 if (reader.options().capacity_num_bytes > kMaxDataPipeCapacityBytes) 73 return MOJO_RESULT_RESOURCE_EXHAUSTED; 74 out_options->capacity_num_bytes = reader.options().capacity_num_bytes; 75 76 return MOJO_RESULT_OK; 77} 78 79void DataPipe::ProducerCancelAllWaiters() { 80 base::AutoLock locker(lock_); 81 DCHECK(has_local_producer_no_lock()); 82 producer_waiter_list_->CancelAllWaiters(); 83} 84 85void DataPipe::ProducerClose() { 86 base::AutoLock locker(lock_); 87 DCHECK(producer_open_); 88 producer_open_ = false; 89 DCHECK(has_local_producer_no_lock()); 90 producer_waiter_list_.reset(); 91 // Not a bug, except possibly in "user" code. 92 DVLOG_IF(2, producer_in_two_phase_write_no_lock()) 93 << "Producer closed with active two-phase write"; 94 producer_two_phase_max_num_bytes_written_ = 0; 95 ProducerCloseImplNoLock(); 96 AwakeConsumerWaitersForStateChangeNoLock( 97 ConsumerGetHandleSignalsStateImplNoLock()); 98} 99 100MojoResult DataPipe::ProducerWriteData(UserPointer<const void> elements, 101 UserPointer<uint32_t> num_bytes, 102 bool all_or_none) { 103 base::AutoLock locker(lock_); 104 DCHECK(has_local_producer_no_lock()); 105 106 if (producer_in_two_phase_write_no_lock()) 107 return MOJO_RESULT_BUSY; 108 if (!consumer_open_no_lock()) 109 return MOJO_RESULT_FAILED_PRECONDITION; 110 111 // Returning "busy" takes priority over "invalid argument". 112 uint32_t max_num_bytes_to_write = num_bytes.Get(); 113 if (max_num_bytes_to_write % element_num_bytes_ != 0) 114 return MOJO_RESULT_INVALID_ARGUMENT; 115 116 if (max_num_bytes_to_write == 0) 117 return MOJO_RESULT_OK; // Nothing to do. 118 119 uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0; 120 121 HandleSignalsState old_consumer_state = 122 ConsumerGetHandleSignalsStateImplNoLock(); 123 MojoResult rv = ProducerWriteDataImplNoLock( 124 elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write); 125 HandleSignalsState new_consumer_state = 126 ConsumerGetHandleSignalsStateImplNoLock(); 127 if (!new_consumer_state.equals(old_consumer_state)) 128 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state); 129 return rv; 130} 131 132MojoResult DataPipe::ProducerBeginWriteData( 133 UserPointer<void*> buffer, 134 UserPointer<uint32_t> buffer_num_bytes, 135 bool all_or_none) { 136 base::AutoLock locker(lock_); 137 DCHECK(has_local_producer_no_lock()); 138 139 if (producer_in_two_phase_write_no_lock()) 140 return MOJO_RESULT_BUSY; 141 if (!consumer_open_no_lock()) 142 return MOJO_RESULT_FAILED_PRECONDITION; 143 144 uint32_t min_num_bytes_to_write = 0; 145 if (all_or_none) { 146 min_num_bytes_to_write = buffer_num_bytes.Get(); 147 if (min_num_bytes_to_write % element_num_bytes_ != 0) 148 return MOJO_RESULT_INVALID_ARGUMENT; 149 } 150 151 MojoResult rv = ProducerBeginWriteDataImplNoLock( 152 buffer, buffer_num_bytes, min_num_bytes_to_write); 153 if (rv != MOJO_RESULT_OK) 154 return rv; 155 // Note: No need to awake producer waiters, even though we're going from 156 // writable to non-writable (since you can't wait on non-writability). 157 // Similarly, though this may have discarded data (in "may discard" mode), 158 // making it non-readable, there's still no need to awake consumer waiters. 159 DCHECK(producer_in_two_phase_write_no_lock()); 160 return MOJO_RESULT_OK; 161} 162 163MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) { 164 base::AutoLock locker(lock_); 165 DCHECK(has_local_producer_no_lock()); 166 167 if (!producer_in_two_phase_write_no_lock()) 168 return MOJO_RESULT_FAILED_PRECONDITION; 169 // Note: Allow successful completion of the two-phase write even if the 170 // consumer has been closed. 171 172 HandleSignalsState old_consumer_state = 173 ConsumerGetHandleSignalsStateImplNoLock(); 174 MojoResult rv; 175 if (num_bytes_written > producer_two_phase_max_num_bytes_written_ || 176 num_bytes_written % element_num_bytes_ != 0) { 177 rv = MOJO_RESULT_INVALID_ARGUMENT; 178 producer_two_phase_max_num_bytes_written_ = 0; 179 } else { 180 rv = ProducerEndWriteDataImplNoLock(num_bytes_written); 181 } 182 // Two-phase write ended even on failure. 183 DCHECK(!producer_in_two_phase_write_no_lock()); 184 // If we're now writable, we *became* writable (since we weren't writable 185 // during the two-phase write), so awake producer waiters. 186 HandleSignalsState new_producer_state = 187 ProducerGetHandleSignalsStateImplNoLock(); 188 if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) 189 AwakeProducerWaitersForStateChangeNoLock(new_producer_state); 190 HandleSignalsState new_consumer_state = 191 ConsumerGetHandleSignalsStateImplNoLock(); 192 if (!new_consumer_state.equals(old_consumer_state)) 193 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state); 194 return rv; 195} 196 197HandleSignalsState DataPipe::ProducerGetHandleSignalsState() { 198 base::AutoLock locker(lock_); 199 DCHECK(has_local_producer_no_lock()); 200 return ProducerGetHandleSignalsStateImplNoLock(); 201} 202 203MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter, 204 MojoHandleSignals signals, 205 uint32_t context, 206 HandleSignalsState* signals_state) { 207 base::AutoLock locker(lock_); 208 DCHECK(has_local_producer_no_lock()); 209 210 HandleSignalsState producer_state = ProducerGetHandleSignalsStateImplNoLock(); 211 if (producer_state.satisfies(signals)) { 212 if (signals_state) 213 *signals_state = producer_state; 214 return MOJO_RESULT_ALREADY_EXISTS; 215 } 216 if (!producer_state.can_satisfy(signals)) { 217 if (signals_state) 218 *signals_state = producer_state; 219 return MOJO_RESULT_FAILED_PRECONDITION; 220 } 221 222 producer_waiter_list_->AddWaiter(waiter, signals, context); 223 return MOJO_RESULT_OK; 224} 225 226void DataPipe::ProducerRemoveWaiter(Waiter* waiter, 227 HandleSignalsState* signals_state) { 228 base::AutoLock locker(lock_); 229 DCHECK(has_local_producer_no_lock()); 230 producer_waiter_list_->RemoveWaiter(waiter); 231 if (signals_state) 232 *signals_state = ProducerGetHandleSignalsStateImplNoLock(); 233} 234 235bool DataPipe::ProducerIsBusy() const { 236 base::AutoLock locker(lock_); 237 return producer_in_two_phase_write_no_lock(); 238} 239 240void DataPipe::ConsumerCancelAllWaiters() { 241 base::AutoLock locker(lock_); 242 DCHECK(has_local_consumer_no_lock()); 243 consumer_waiter_list_->CancelAllWaiters(); 244} 245 246void DataPipe::ConsumerClose() { 247 base::AutoLock locker(lock_); 248 DCHECK(consumer_open_); 249 consumer_open_ = false; 250 DCHECK(has_local_consumer_no_lock()); 251 consumer_waiter_list_.reset(); 252 // Not a bug, except possibly in "user" code. 253 DVLOG_IF(2, consumer_in_two_phase_read_no_lock()) 254 << "Consumer closed with active two-phase read"; 255 consumer_two_phase_max_num_bytes_read_ = 0; 256 ConsumerCloseImplNoLock(); 257 AwakeProducerWaitersForStateChangeNoLock( 258 ProducerGetHandleSignalsStateImplNoLock()); 259} 260 261MojoResult DataPipe::ConsumerReadData(UserPointer<void> elements, 262 UserPointer<uint32_t> num_bytes, 263 bool all_or_none) { 264 base::AutoLock locker(lock_); 265 DCHECK(has_local_consumer_no_lock()); 266 267 if (consumer_in_two_phase_read_no_lock()) 268 return MOJO_RESULT_BUSY; 269 270 uint32_t max_num_bytes_to_read = num_bytes.Get(); 271 if (max_num_bytes_to_read % element_num_bytes_ != 0) 272 return MOJO_RESULT_INVALID_ARGUMENT; 273 274 if (max_num_bytes_to_read == 0) 275 return MOJO_RESULT_OK; // Nothing to do. 276 277 uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0; 278 279 HandleSignalsState old_producer_state = 280 ProducerGetHandleSignalsStateImplNoLock(); 281 MojoResult rv = ConsumerReadDataImplNoLock( 282 elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read); 283 HandleSignalsState new_producer_state = 284 ProducerGetHandleSignalsStateImplNoLock(); 285 if (!new_producer_state.equals(old_producer_state)) 286 AwakeProducerWaitersForStateChangeNoLock(new_producer_state); 287 return rv; 288} 289 290MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes, 291 bool all_or_none) { 292 base::AutoLock locker(lock_); 293 DCHECK(has_local_consumer_no_lock()); 294 295 if (consumer_in_two_phase_read_no_lock()) 296 return MOJO_RESULT_BUSY; 297 298 uint32_t max_num_bytes_to_discard = num_bytes.Get(); 299 if (max_num_bytes_to_discard % element_num_bytes_ != 0) 300 return MOJO_RESULT_INVALID_ARGUMENT; 301 302 if (max_num_bytes_to_discard == 0) 303 return MOJO_RESULT_OK; // Nothing to do. 304 305 uint32_t min_num_bytes_to_discard = 306 all_or_none ? max_num_bytes_to_discard : 0; 307 308 HandleSignalsState old_producer_state = 309 ProducerGetHandleSignalsStateImplNoLock(); 310 MojoResult rv = ConsumerDiscardDataImplNoLock( 311 num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard); 312 HandleSignalsState new_producer_state = 313 ProducerGetHandleSignalsStateImplNoLock(); 314 if (!new_producer_state.equals(old_producer_state)) 315 AwakeProducerWaitersForStateChangeNoLock(new_producer_state); 316 return rv; 317} 318 319MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) { 320 base::AutoLock locker(lock_); 321 DCHECK(has_local_consumer_no_lock()); 322 323 if (consumer_in_two_phase_read_no_lock()) 324 return MOJO_RESULT_BUSY; 325 326 // Note: Don't need to validate |*num_bytes| for query. 327 return ConsumerQueryDataImplNoLock(num_bytes); 328} 329 330MojoResult DataPipe::ConsumerBeginReadData( 331 UserPointer<const void*> buffer, 332 UserPointer<uint32_t> buffer_num_bytes, 333 bool all_or_none) { 334 base::AutoLock locker(lock_); 335 DCHECK(has_local_consumer_no_lock()); 336 337 if (consumer_in_two_phase_read_no_lock()) 338 return MOJO_RESULT_BUSY; 339 340 uint32_t min_num_bytes_to_read = 0; 341 if (all_or_none) { 342 min_num_bytes_to_read = buffer_num_bytes.Get(); 343 if (min_num_bytes_to_read % element_num_bytes_ != 0) 344 return MOJO_RESULT_INVALID_ARGUMENT; 345 } 346 347 MojoResult rv = ConsumerBeginReadDataImplNoLock( 348 buffer, buffer_num_bytes, min_num_bytes_to_read); 349 if (rv != MOJO_RESULT_OK) 350 return rv; 351 DCHECK(consumer_in_two_phase_read_no_lock()); 352 return MOJO_RESULT_OK; 353} 354 355MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) { 356 base::AutoLock locker(lock_); 357 DCHECK(has_local_consumer_no_lock()); 358 359 if (!consumer_in_two_phase_read_no_lock()) 360 return MOJO_RESULT_FAILED_PRECONDITION; 361 362 HandleSignalsState old_producer_state = 363 ProducerGetHandleSignalsStateImplNoLock(); 364 MojoResult rv; 365 if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ || 366 num_bytes_read % element_num_bytes_ != 0) { 367 rv = MOJO_RESULT_INVALID_ARGUMENT; 368 consumer_two_phase_max_num_bytes_read_ = 0; 369 } else { 370 rv = ConsumerEndReadDataImplNoLock(num_bytes_read); 371 } 372 // Two-phase read ended even on failure. 373 DCHECK(!consumer_in_two_phase_read_no_lock()); 374 // If we're now readable, we *became* readable (since we weren't readable 375 // during the two-phase read), so awake consumer waiters. 376 HandleSignalsState new_consumer_state = 377 ConsumerGetHandleSignalsStateImplNoLock(); 378 if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE)) 379 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state); 380 HandleSignalsState new_producer_state = 381 ProducerGetHandleSignalsStateImplNoLock(); 382 if (!new_producer_state.equals(old_producer_state)) 383 AwakeProducerWaitersForStateChangeNoLock(new_producer_state); 384 return rv; 385} 386 387HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() { 388 base::AutoLock locker(lock_); 389 DCHECK(has_local_consumer_no_lock()); 390 return ConsumerGetHandleSignalsStateImplNoLock(); 391} 392 393MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter, 394 MojoHandleSignals signals, 395 uint32_t context, 396 HandleSignalsState* signals_state) { 397 base::AutoLock locker(lock_); 398 DCHECK(has_local_consumer_no_lock()); 399 400 HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateImplNoLock(); 401 if (consumer_state.satisfies(signals)) { 402 if (signals_state) 403 *signals_state = consumer_state; 404 return MOJO_RESULT_ALREADY_EXISTS; 405 } 406 if (!consumer_state.can_satisfy(signals)) { 407 if (signals_state) 408 *signals_state = consumer_state; 409 return MOJO_RESULT_FAILED_PRECONDITION; 410 } 411 412 consumer_waiter_list_->AddWaiter(waiter, signals, context); 413 return MOJO_RESULT_OK; 414} 415 416void DataPipe::ConsumerRemoveWaiter(Waiter* waiter, 417 HandleSignalsState* signals_state) { 418 base::AutoLock locker(lock_); 419 DCHECK(has_local_consumer_no_lock()); 420 consumer_waiter_list_->RemoveWaiter(waiter); 421 if (signals_state) 422 *signals_state = ConsumerGetHandleSignalsStateImplNoLock(); 423} 424 425bool DataPipe::ConsumerIsBusy() const { 426 base::AutoLock locker(lock_); 427 return consumer_in_two_phase_read_no_lock(); 428} 429 430DataPipe::DataPipe(bool has_local_producer, 431 bool has_local_consumer, 432 const MojoCreateDataPipeOptions& validated_options) 433 : may_discard_((validated_options.flags & 434 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)), 435 element_num_bytes_(validated_options.element_num_bytes), 436 capacity_num_bytes_(validated_options.capacity_num_bytes), 437 producer_open_(true), 438 consumer_open_(true), 439 producer_waiter_list_(has_local_producer ? new WaiterList() : nullptr), 440 consumer_waiter_list_(has_local_consumer ? new WaiterList() : nullptr), 441 producer_two_phase_max_num_bytes_written_(0), 442 consumer_two_phase_max_num_bytes_read_(0) { 443 // Check that the passed in options actually are validated. 444 MojoCreateDataPipeOptions unused ALLOW_UNUSED = {0}; 445 DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused), 446 MOJO_RESULT_OK); 447} 448 449DataPipe::~DataPipe() { 450 DCHECK(!producer_open_); 451 DCHECK(!consumer_open_); 452 DCHECK(!producer_waiter_list_); 453 DCHECK(!consumer_waiter_list_); 454} 455 456void DataPipe::AwakeProducerWaitersForStateChangeNoLock( 457 const HandleSignalsState& new_producer_state) { 458 lock_.AssertAcquired(); 459 if (!has_local_producer_no_lock()) 460 return; 461 producer_waiter_list_->AwakeWaitersForStateChange(new_producer_state); 462} 463 464void DataPipe::AwakeConsumerWaitersForStateChangeNoLock( 465 const HandleSignalsState& new_consumer_state) { 466 lock_.AssertAcquired(); 467 if (!has_local_consumer_no_lock()) 468 return; 469 consumer_waiter_list_->AwakeWaitersForStateChange(new_consumer_state); 470} 471 472} // namespace system 473} // namespace mojo 474