1// Copyright 2015 The Chromium OS Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include <brillo/streams/fake_stream.h>
6
7#include <algorithm>
8
9#include <base/bind.h>
10#include <brillo/message_loops/message_loop.h>
11#include <brillo/streams/stream_utils.h>
12
13namespace brillo {
14
15namespace {
16
17// Gets a delta between the two times, makes sure that the delta is positive.
18base::TimeDelta CalculateDelay(const base::Time& now,
19                               const base::Time& delay_until) {
20  const base::TimeDelta zero_delay;
21  if (delay_until.is_null() || now >= delay_until) {
22    return zero_delay;
23  }
24
25  base::TimeDelta delay = delay_until - now;
26  if (delay < zero_delay)
27    delay = zero_delay;
28  return delay;
29}
30
31// Given the current clock time, and expected delays for read and write
32// operations calculates the smaller wait delay of the two and sets the
33// resulting operation to |*mode| and the delay to wait for into |*delay|.
34void GetMinDelayAndMode(const base::Time& now,
35                        bool read, const base::Time& delay_read_until,
36                        bool write, const base::Time& delay_write_until,
37                        Stream::AccessMode* mode, base::TimeDelta* delay) {
38  base::TimeDelta read_delay = base::TimeDelta::Max();
39  base::TimeDelta write_delay = base::TimeDelta::Max();
40
41  if (read)
42    read_delay = CalculateDelay(now, delay_read_until);
43  if (write)
44    write_delay = CalculateDelay(now, delay_write_until);
45
46  if (read_delay > write_delay) {
47    read = false;
48  } else if (read_delay < write_delay) {
49    write = false;
50  }
51  *mode = stream_utils::MakeAccessMode(read, write);
52  *delay = std::min(read_delay, write_delay);
53}
54
55}  // anonymous namespace
56
57FakeStream::FakeStream(Stream::AccessMode mode,
58                       base::Clock* clock)
59    : mode_{mode}, clock_{clock} {}
60
61void FakeStream::AddReadPacketData(base::TimeDelta delay,
62                                   const void* data,
63                                   size_t size) {
64  auto* byte_ptr = static_cast<const uint8_t*>(data);
65  AddReadPacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
66}
67
68void FakeStream::AddReadPacketData(base::TimeDelta delay, brillo::Blob data) {
69  InputDataPacket packet;
70  packet.data = std::move(data);
71  packet.delay_before = delay;
72  incoming_queue_.push(std::move(packet));
73}
74
75void FakeStream::AddReadPacketString(base::TimeDelta delay,
76                                     const std::string& data) {
77  AddReadPacketData(delay, brillo::Blob{data.begin(), data.end()});
78}
79
80void FakeStream::QueueReadError(base::TimeDelta delay) {
81  QueueReadErrorWithMessage(delay, std::string{});
82}
83
84void FakeStream::QueueReadErrorWithMessage(base::TimeDelta delay,
85                                           const std::string& message) {
86  InputDataPacket packet;
87  packet.data.assign(message.begin(), message.end());
88  packet.delay_before = delay;
89  packet.read_error = true;
90  incoming_queue_.push(std::move(packet));
91}
92
93void FakeStream::ClearReadQueue() {
94  std::queue<InputDataPacket>().swap(incoming_queue_);
95  delay_input_until_ = base::Time{};
96  input_buffer_.clear();
97  input_ptr_ = 0;
98  report_read_error_ = 0;
99}
100
101void FakeStream::ExpectWritePacketSize(base::TimeDelta delay,
102                                       size_t data_size) {
103  OutputDataPacket packet;
104  packet.expected_size = data_size;
105  packet.delay_before = delay;
106  outgoing_queue_.push(std::move(packet));
107}
108
109void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
110                                       const void* data,
111                                       size_t size) {
112  auto* byte_ptr = static_cast<const uint8_t*>(data);
113  ExpectWritePacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
114}
115
116void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
117                                       brillo::Blob data) {
118  OutputDataPacket packet;
119  packet.expected_size = data.size();
120  packet.data = std::move(data);
121  packet.delay_before = delay;
122  outgoing_queue_.push(std::move(packet));
123}
124
125void FakeStream::ExpectWritePacketString(base::TimeDelta delay,
126                                         const std::string& data) {
127  ExpectWritePacketData(delay, brillo::Blob{data.begin(), data.end()});
128}
129
130void FakeStream::QueueWriteError(base::TimeDelta delay) {
131  QueueWriteErrorWithMessage(delay, std::string{});
132}
133
134void FakeStream::QueueWriteErrorWithMessage(base::TimeDelta delay,
135                                            const std::string& message) {
136  OutputDataPacket packet;
137  packet.expected_size = 0;
138  packet.data.assign(message.begin(), message.end());
139  packet.delay_before = delay;
140  packet.write_error = true;
141  outgoing_queue_.push(std::move(packet));
142}
143
144void FakeStream::ClearWriteQueue() {
145  std::queue<OutputDataPacket>().swap(outgoing_queue_);
146  delay_output_until_ = base::Time{};
147  output_buffer_.clear();
148  expected_output_data_.clear();
149  max_output_buffer_size_ = 0;
150  all_output_data_.clear();
151  report_write_error_ = 0;
152}
153
154const brillo::Blob& FakeStream::GetFlushedOutputData() const {
155  return all_output_data_;
156}
157
158std::string FakeStream::GetFlushedOutputDataAsString() const {
159  return std::string{all_output_data_.begin(), all_output_data_.end()};
160}
161
162bool FakeStream::CanRead() const {
163  return stream_utils::IsReadAccessMode(mode_);
164}
165
166bool FakeStream::CanWrite() const {
167  return stream_utils::IsWriteAccessMode(mode_);
168}
169
170bool FakeStream::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) {
171  return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
172}
173
174bool FakeStream::Seek(int64_t /* offset */,
175                      Whence /* whence */,
176                      uint64_t* /* new_position */,
177                      ErrorPtr* error) {
178  return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
179}
180
181bool FakeStream::IsReadBufferEmpty() const {
182  return input_ptr_ >= input_buffer_.size();
183}
184
185bool FakeStream::PopReadPacket() {
186  if (incoming_queue_.empty())
187    return false;
188  const InputDataPacket& packet = incoming_queue_.front();
189  input_ptr_ = 0;
190  input_buffer_ = std::move(packet.data);
191  delay_input_until_ = clock_->Now() + packet.delay_before;
192  incoming_queue_.pop();
193  report_read_error_ = packet.read_error;
194  return true;
195}
196
197bool FakeStream::ReadNonBlocking(void* buffer,
198                                 size_t size_to_read,
199                                 size_t* size_read,
200                                 bool* end_of_stream,
201                                 ErrorPtr* error) {
202  if (!CanRead())
203    return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
204
205  if (!IsOpen())
206    return stream_utils::ErrorStreamClosed(FROM_HERE, error);
207
208  for (;;) {
209    if (!delay_input_until_.is_null() && clock_->Now() < delay_input_until_) {
210      *size_read = 0;
211      if (end_of_stream)
212        *end_of_stream = false;
213      break;
214    }
215
216    if (report_read_error_) {
217      report_read_error_ = false;
218      std::string message{input_buffer_.begin(), input_buffer_.end()};
219      if (message.empty())
220        message = "Simulating read error for tests";
221      input_buffer_.clear();
222      Error::AddTo(error, FROM_HERE, "fake_stream", "read_error", message);
223      return false;
224    }
225
226    if (!IsReadBufferEmpty()) {
227      size_to_read = std::min(size_to_read, input_buffer_.size() - input_ptr_);
228      std::memcpy(buffer, input_buffer_.data() + input_ptr_, size_to_read);
229      input_ptr_ += size_to_read;
230      *size_read = size_to_read;
231      if (end_of_stream)
232        *end_of_stream = false;
233      break;
234    }
235
236    if (!PopReadPacket()) {
237      *size_read = 0;
238      if (end_of_stream)
239        *end_of_stream = true;
240      break;
241    }
242  }
243  return true;
244}
245
246bool FakeStream::IsWriteBufferFull() const {
247  return output_buffer_.size() >= max_output_buffer_size_;
248}
249
250bool FakeStream::PopWritePacket() {
251  if (outgoing_queue_.empty())
252    return false;
253  const OutputDataPacket& packet = outgoing_queue_.front();
254  expected_output_data_ = std::move(packet.data);
255  delay_output_until_ = clock_->Now() + packet.delay_before;
256  max_output_buffer_size_ = packet.expected_size;
257  report_write_error_ = packet.write_error;
258  outgoing_queue_.pop();
259  return true;
260}
261
262bool FakeStream::WriteNonBlocking(const void* buffer,
263                                  size_t size_to_write,
264                                  size_t* size_written,
265                                  ErrorPtr* error) {
266  if (!CanWrite())
267    return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
268
269  if (!IsOpen())
270    return stream_utils::ErrorStreamClosed(FROM_HERE, error);
271
272  for (;;) {
273    if (!delay_output_until_.is_null() && clock_->Now() < delay_output_until_) {
274      *size_written = 0;
275      return true;
276    }
277
278    if (report_write_error_) {
279      report_write_error_ = false;
280      std::string message{expected_output_data_.begin(),
281                          expected_output_data_.end()};
282      if (message.empty())
283        message = "Simulating write error for tests";
284      output_buffer_.clear();
285      max_output_buffer_size_ = 0;
286      expected_output_data_.clear();
287      Error::AddTo(error, FROM_HERE, "fake_stream", "write_error", message);
288      return false;
289    }
290
291    if (!IsWriteBufferFull()) {
292      bool success = true;
293      size_to_write = std::min(size_to_write,
294                               max_output_buffer_size_ - output_buffer_.size());
295      auto byte_ptr = static_cast<const uint8_t*>(buffer);
296      output_buffer_.insert(output_buffer_.end(),
297                            byte_ptr, byte_ptr + size_to_write);
298      if (output_buffer_.size()  == max_output_buffer_size_) {
299        if (!expected_output_data_.empty() &&
300            expected_output_data_ != output_buffer_) {
301          // We expected different data to be written, report an error.
302          Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
303                       "Unexpected data written");
304          success = false;
305        }
306
307        all_output_data_.insert(all_output_data_.end(),
308                                output_buffer_.begin(), output_buffer_.end());
309
310        output_buffer_.clear();
311        max_output_buffer_size_ = 0;
312        expected_output_data_.clear();
313      }
314      *size_written = size_to_write;
315      return success;
316    }
317
318    if (!PopWritePacket()) {
319      // No more data expected.
320      Error::AddTo(error, FROM_HERE, "fake_stream", "full",
321                   "No more output data expected");
322      return false;
323    }
324  }
325}
326
327bool FakeStream::FlushBlocking(ErrorPtr* error) {
328  if (!CanWrite())
329    return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
330
331  if (!IsOpen())
332    return stream_utils::ErrorStreamClosed(FROM_HERE, error);
333
334  bool success = true;
335  if (!output_buffer_.empty()) {
336    if (!expected_output_data_.empty() &&
337        expected_output_data_ != output_buffer_) {
338      // We expected different data to be written, report an error.
339      Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
340                   "Unexpected data written");
341      success = false;
342    }
343    all_output_data_.insert(all_output_data_.end(),
344                            output_buffer_.begin(), output_buffer_.end());
345
346    output_buffer_.clear();
347    max_output_buffer_size_ = 0;
348    expected_output_data_.clear();
349  }
350  return success;
351}
352
353bool FakeStream::CloseBlocking(ErrorPtr* /* error */) {
354  is_open_ = false;
355  return true;
356}
357
358bool FakeStream::WaitForData(AccessMode mode,
359                             const base::Callback<void(AccessMode)>& callback,
360                             ErrorPtr* error) {
361  bool read_requested = stream_utils::IsReadAccessMode(mode);
362  bool write_requested = stream_utils::IsWriteAccessMode(mode);
363
364  if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
365    return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
366
367  if (read_requested && IsReadBufferEmpty())
368    PopReadPacket();
369  if (write_requested && IsWriteBufferFull())
370    PopWritePacket();
371
372  base::TimeDelta delay;
373  GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
374                     write_requested, delay_output_until_, &mode, &delay);
375  MessageLoop::current()->PostDelayedTask(
376      FROM_HERE, base::Bind(callback, mode), delay);
377  return true;
378}
379
380bool FakeStream::WaitForDataBlocking(AccessMode in_mode,
381                                     base::TimeDelta timeout,
382                                     AccessMode* out_mode,
383                                     ErrorPtr* error) {
384  const base::TimeDelta zero_delay;
385  bool read_requested = stream_utils::IsReadAccessMode(in_mode);
386  bool write_requested = stream_utils::IsWriteAccessMode(in_mode);
387
388  if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
389    return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
390
391  base::TimeDelta delay;
392  GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
393                     write_requested, delay_output_until_, out_mode, &delay);
394
395  if (timeout < delay)
396    return stream_utils::ErrorOperationTimeout(FROM_HERE, error);
397
398  LOG(INFO) << "TEST: Would have blocked for " << delay.InMilliseconds()
399            << " ms.";
400
401  return true;
402}
403
404}  // namespace brillo
405