16e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko// Copyright 2015 The Chromium OS Authors. All rights reserved. 26e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko// Use of this source code is governed by a BSD-style license that can be 36e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko// found in the LICENSE file. 46e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 59ed0cab99f18acb3570a35e9408f24355f6b8324Alex Vakulenko#include <brillo/streams/input_stream_set.h> 66e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 76e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko#include <base/bind.h> 89ed0cab99f18acb3570a35e9408f24355f6b8324Alex Vakulenko#include <brillo/message_loops/message_loop.h> 99ed0cab99f18acb3570a35e9408f24355f6b8324Alex Vakulenko#include <brillo/streams/stream_errors.h> 109ed0cab99f18acb3570a35e9408f24355f6b8324Alex Vakulenko#include <brillo/streams/stream_utils.h> 116e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 129ed0cab99f18acb3570a35e9408f24355f6b8324Alex Vakulenkonamespace brillo { 136e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 146e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex VakulenkoInputStreamSet::InputStreamSet( 156e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko std::vector<Stream*> source_streams, 166e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko std::vector<StreamPtr> owned_source_streams, 176e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko uint64_t initial_stream_size) 186e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko : source_streams_{std::move(source_streams)}, 196e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko owned_source_streams_{std::move(owned_source_streams)}, 206e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko initial_stream_size_{initial_stream_size} {} 216e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 226e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex VakulenkoStreamPtr InputStreamSet::Create(std::vector<Stream*> source_streams, 236e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko std::vector<StreamPtr> owned_source_streams, 246e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko ErrorPtr* error) { 256e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko StreamPtr stream; 266e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 276e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko if (source_streams.empty()) { 286e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko Error::AddTo(error, FROM_HERE, errors::stream::kDomain, 299ed0cab99f18acb3570a35e9408f24355f6b8324Alex Vakulenko errors::stream::kInvalidParameter, 309ed0cab99f18acb3570a35e9408f24355f6b8324Alex Vakulenko "Source stream list is empty"); 316e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return stream; 326e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko } 336e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 346e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko // Make sure we have only readable streams. 356e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko for (Stream* src_stream : source_streams) { 366e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko if (!src_stream->CanRead()) { 376e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko Error::AddTo(error, FROM_HERE, errors::stream::kDomain, 386e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko errors::stream::kInvalidParameter, 396e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko "The stream list must contain only readable streams"); 406e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return stream; 416e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko } 426e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko } 436e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 446e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko // We are using remaining size here because the multiplexed stream is not 456e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko // seekable and the bytes already read are essentially "lost" as far as this 466e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko // stream is concerned. 476e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko uint64_t initial_stream_size = 0; 486e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko for (const Stream* stream : source_streams) 496e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko initial_stream_size += stream->GetRemainingSize(); 506e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 516e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko stream.reset(new InputStreamSet{std::move(source_streams), 526e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko std::move(owned_source_streams), 536e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko initial_stream_size}); 546e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return stream; 556e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko} 566e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 576e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex VakulenkoStreamPtr InputStreamSet::Create(std::vector<Stream*> source_streams, 586e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko ErrorPtr* error) { 596e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return Create(std::move(source_streams), {}, error); 606e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko} 616e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 626e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex VakulenkoStreamPtr InputStreamSet::Create(std::vector<StreamPtr> owned_source_streams, 636e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko ErrorPtr* error) { 646e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko std::vector<Stream*> source_streams; 656e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko source_streams.reserve(owned_source_streams.size()); 666e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko for (const StreamPtr& stream : owned_source_streams) 676e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko source_streams.push_back(stream.get()); 686e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return Create(std::move(source_streams), std::move(owned_source_streams), 696e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko error); 706e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko} 716e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 726e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenkobool InputStreamSet::IsOpen() const { 736e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return !closed_; 746e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko} 756e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 766e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenkobool InputStreamSet::CanGetSize() const { 776e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko bool can_get_size = IsOpen(); 786e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko for (const Stream* stream : source_streams_) { 796e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko if (!stream->CanGetSize()) { 806e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko can_get_size = false; 816e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko break; 826e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko } 836e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko } 846e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return can_get_size; 856e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko} 866e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 876e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenkouint64_t InputStreamSet::GetSize() const { 886e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return initial_stream_size_; 896e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko} 906e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 912fd46ba1458275cd16b0949675bff70cc8abcdadChristopher Wileybool InputStreamSet::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) { 926e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); 936e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko} 946e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 956e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenkouint64_t InputStreamSet::GetRemainingSize() const { 966e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko uint64_t size = 0; 976e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko for (const Stream* stream : source_streams_) 986e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko size += stream->GetRemainingSize(); 996e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return size; 1006e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko} 1016e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 1022fd46ba1458275cd16b0949675bff70cc8abcdadChristopher Wileybool InputStreamSet::Seek(int64_t /* offset */, 1032fd46ba1458275cd16b0949675bff70cc8abcdadChristopher Wiley Whence /* whence */, 1042fd46ba1458275cd16b0949675bff70cc8abcdadChristopher Wiley uint64_t* /* new_position */, 1056e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko ErrorPtr* error) { 1066e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); 1076e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko} 1086e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 1096e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenkobool InputStreamSet::ReadNonBlocking(void* buffer, 1106e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko size_t size_to_read, 1116e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko size_t* size_read, 1126e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko bool* end_of_stream, 1136e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko ErrorPtr* error) { 1146e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko if (!IsOpen()) 1156e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return stream_utils::ErrorStreamClosed(FROM_HERE, error); 1166e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 1176e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko while (!source_streams_.empty()) { 1186e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko Stream* stream = source_streams_.front(); 1196e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko bool eos = false; 1206e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko if (!stream->ReadNonBlocking(buffer, size_to_read, size_read, &eos, error)) 1216e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return false; 1226e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 1236e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko if (*size_read > 0 || !eos) { 1246e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko if (end_of_stream) 1256e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko *end_of_stream = false; 1266e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return true; 1276e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko } 1286e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 1296e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko source_streams_.erase(source_streams_.begin()); 1306e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko } 1316e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko *size_read = 0; 1326e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko if (end_of_stream) 1336e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko *end_of_stream = true; 1346e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return true; 1356e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko} 1366e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 1372fd46ba1458275cd16b0949675bff70cc8abcdadChristopher Wileybool InputStreamSet::WriteNonBlocking(const void* /* buffer */, 1382fd46ba1458275cd16b0949675bff70cc8abcdadChristopher Wiley size_t /* size_to_write */, 1392fd46ba1458275cd16b0949675bff70cc8abcdadChristopher Wiley size_t* /* size_written */, 1406e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko ErrorPtr* error) { 1416e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); 1426e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko} 1436e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 1446e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenkobool InputStreamSet::CloseBlocking(ErrorPtr* error) { 1456e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko bool success = true; 1466e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko // We want to close only the owned streams. 1476e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko for (StreamPtr& stream_ptr : owned_source_streams_) { 1486e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko if (!stream_ptr->CloseBlocking(error)) 1496e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko success = false; // Keep going for other streams... 1506e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko } 1516e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko owned_source_streams_.clear(); 1526e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko source_streams_.clear(); 1536e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko initial_stream_size_ = 0; 1546e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko closed_ = true; 1556e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return success; 1566e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko} 1576e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 1586e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenkobool InputStreamSet::WaitForData( 1596e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko AccessMode mode, 1606e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko const base::Callback<void(AccessMode)>& callback, 1616e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko ErrorPtr* error) { 1626e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko if (!IsOpen()) 1636e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return stream_utils::ErrorStreamClosed(FROM_HERE, error); 1646e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 16538bb5d630348b1443c8341b939b3f2586273516dAlex Vakulenko if (stream_utils::IsWriteAccessMode(mode)) 1666e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); 1676e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 1686e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko if (!source_streams_.empty()) { 1696e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko Stream* stream = source_streams_.front(); 1706e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return stream->WaitForData(mode, callback, error); 1716e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko } 1726e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 173000726053bddc605a6c78b5dece37bcb2c67f291Alex Deymo MessageLoop::current()->PostTask(FROM_HERE, base::Bind(callback, mode)); 1746e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return true; 1756e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko} 1766e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 1776e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenkobool InputStreamSet::WaitForDataBlocking(AccessMode in_mode, 1781b79239785bf964fd5f1a607a6ed9c9bbb57a4b1Alex Vakulenko base::TimeDelta timeout, 1796e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko AccessMode* out_mode, 1806e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko ErrorPtr* error) { 1816e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko if (!IsOpen()) 1826e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return stream_utils::ErrorStreamClosed(FROM_HERE, error); 1836e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 18438bb5d630348b1443c8341b939b3f2586273516dAlex Vakulenko if (stream_utils::IsWriteAccessMode(in_mode)) 1856e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return stream_utils::ErrorOperationNotSupported(FROM_HERE, error); 1866e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 1876e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko if (!source_streams_.empty()) { 1886e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko Stream* stream = source_streams_.front(); 1891b79239785bf964fd5f1a607a6ed9c9bbb57a4b1Alex Vakulenko return stream->WaitForDataBlocking(in_mode, timeout, out_mode, error); 1906e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko } 1916e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 1926e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko if (out_mode) 1936e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko *out_mode = in_mode; 1946e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko return true; 1956e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko} 1966e6ff673d18612fbd59b2284a15c871d7f92c7d1Alex Vakulenko 1978cb41343cddbab496e7ca90bca4dc95f07af64bdAlex Vakulenkovoid InputStreamSet::CancelPendingAsyncOperations() { 1988cb41343cddbab496e7ca90bca4dc95f07af64bdAlex Vakulenko if (IsOpen() && !source_streams_.empty()) { 1998cb41343cddbab496e7ca90bca4dc95f07af64bdAlex Vakulenko Stream* stream = source_streams_.front(); 2008cb41343cddbab496e7ca90bca4dc95f07af64bdAlex Vakulenko stream->CancelPendingAsyncOperations(); 2018cb41343cddbab496e7ca90bca4dc95f07af64bdAlex Vakulenko } 2029988ee08f2bb57095b1c2adecce764b1bd612a93Alex Vakulenko Stream::CancelPendingAsyncOperations(); 2038cb41343cddbab496e7ca90bca4dc95f07af64bdAlex Vakulenko} 2048cb41343cddbab496e7ca90bca4dc95f07af64bdAlex Vakulenko 2059ed0cab99f18acb3570a35e9408f24355f6b8324Alex Vakulenko} // namespace brillo 206