file_stream.cc revision 9ed0cab99f18acb3570a35e9408f24355f6b8324
1// Copyright 2015 The Chromium OS Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include <brillo/streams/file_stream.h>
6
7#include <algorithm>
8#include <fcntl.h>
9#include <sys/stat.h>
10#include <unistd.h>
11
12#include <base/bind.h>
13#include <base/files/file_util.h>
14#include <base/posix/eintr_wrapper.h>
15#include <brillo/errors/error_codes.h>
16#include <brillo/message_loops/message_loop.h>
17#include <brillo/streams/stream_errors.h>
18#include <brillo/streams/stream_utils.h>
19
20namespace brillo {
21
22// FileDescriptor is a helper class that serves two purposes:
23// 1. It wraps low-level system APIs (as FileDescriptorInterface) to allow
24//    mocking calls to them in tests.
25// 2. It provides file descriptor watching services using FileDescriptorWatcher
26//    and MessageLoopForIO::Watcher interface.
27// The real FileStream uses this class to perform actual file I/O on the
28// contained file descriptor.
29class FileDescriptor : public FileStream::FileDescriptorInterface {
30 public:
31  FileDescriptor(int fd, bool own) : fd_{fd}, own_{own} {}
32  ~FileDescriptor() override {
33    if (IsOpen()) {
34      Close();
35    }
36  }
37
38  // Overrides for FileStream::FileDescriptorInterface methods.
39  bool IsOpen() const override { return fd_ >= 0; }
40
41  ssize_t Read(void* buf, size_t nbyte) override {
42    return HANDLE_EINTR(read(fd_, buf, nbyte));
43  }
44
45  ssize_t Write(const void* buf, size_t nbyte) override {
46    return HANDLE_EINTR(write(fd_, buf, nbyte));
47  }
48
49  off64_t Seek(off64_t offset, int whence) override {
50    return lseek64(fd_, offset, whence);
51  }
52
53  mode_t GetFileMode() const override {
54    struct stat file_stat;
55    if (fstat(fd_, &file_stat) < 0)
56      return 0;
57    return file_stat.st_mode;
58  }
59
60  uint64_t GetSize() const override {
61    struct stat file_stat;
62    if (fstat(fd_, &file_stat) < 0)
63      return 0;
64    return file_stat.st_size;
65  }
66
67  int Truncate(off64_t length) const override {
68    return HANDLE_EINTR(ftruncate(fd_, length));
69  }
70
71  int Close() override {
72    int fd = -1;
73    // The stream may or may not own the file descriptor stored in |fd_|.
74    // Despite that, we will need to set |fd_| to -1 when Close() finished.
75    // So, here we set it to -1 first and if we own the old descriptor, close
76    // it before exiting.
77    std::swap(fd, fd_);
78    CancelPendingAsyncOperations();
79    return own_ ? IGNORE_EINTR(close(fd)) : 0;
80  }
81
82  bool WaitForData(Stream::AccessMode mode,
83                   const DataCallback& data_callback,
84                   ErrorPtr* error) override {
85    if (stream_utils::IsReadAccessMode(mode)) {
86      CHECK(read_data_callback_.is_null());
87      MessageLoop::current()->CancelTask(read_watcher_);
88      read_watcher_ = MessageLoop::current()->WatchFileDescriptor(
89          FROM_HERE,
90          fd_,
91          MessageLoop::WatchMode::kWatchRead,
92          false,  // persistent
93          base::Bind(&FileDescriptor::OnFileCanReadWithoutBlocking,
94                     base::Unretained(this)));
95      if (read_watcher_ == MessageLoop::kTaskIdNull) {
96        Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
97                     errors::stream::kInvalidParameter,
98                     "File descriptor doesn't support watching for reading.");
99        return false;
100      }
101      read_data_callback_ = data_callback;
102    }
103    if (stream_utils::IsWriteAccessMode(mode)) {
104      CHECK(write_data_callback_.is_null());
105      MessageLoop::current()->CancelTask(write_watcher_);
106      write_watcher_ = MessageLoop::current()->WatchFileDescriptor(
107          FROM_HERE,
108          fd_,
109          MessageLoop::WatchMode::kWatchWrite,
110          false,  // persistent
111          base::Bind(&FileDescriptor::OnFileCanWriteWithoutBlocking,
112                     base::Unretained(this)));
113      if (write_watcher_ == MessageLoop::kTaskIdNull) {
114        Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
115                     errors::stream::kInvalidParameter,
116                     "File descriptor doesn't support watching for writing.");
117        return false;
118      }
119      write_data_callback_ = data_callback;
120    }
121    return true;
122  }
123
124  int WaitForDataBlocking(Stream::AccessMode in_mode,
125                          base::TimeDelta timeout,
126                          Stream::AccessMode* out_mode) override {
127    fd_set read_fds;
128    fd_set write_fds;
129    fd_set error_fds;
130
131    FD_ZERO(&read_fds);
132    FD_ZERO(&write_fds);
133    FD_ZERO(&error_fds);
134
135    if (stream_utils::IsReadAccessMode(in_mode))
136      FD_SET(fd_, &read_fds);
137
138    if (stream_utils::IsWriteAccessMode(in_mode))
139      FD_SET(fd_, &write_fds);
140
141    FD_SET(fd_, &error_fds);
142    timeval timeout_val = {};
143    if (!timeout.is_max()) {
144      const timespec ts = timeout.ToTimeSpec();
145      TIMESPEC_TO_TIMEVAL(&timeout_val, &ts);
146    }
147    int res = HANDLE_EINTR(select(fd_ + 1, &read_fds, &write_fds, &error_fds,
148                                  timeout.is_max() ? nullptr : &timeout_val));
149    if (res > 0 && out_mode) {
150      *out_mode = stream_utils::MakeAccessMode(FD_ISSET(fd_, &read_fds),
151                                               FD_ISSET(fd_, &write_fds));
152    }
153    return res;
154  }
155
156  void CancelPendingAsyncOperations() override {
157    read_data_callback_.Reset();
158    if (read_watcher_ != MessageLoop::kTaskIdNull) {
159      MessageLoop::current()->CancelTask(read_watcher_);
160      read_watcher_ = MessageLoop::kTaskIdNull;
161    }
162
163    write_data_callback_.Reset();
164    if (write_watcher_ != MessageLoop::kTaskIdNull) {
165      MessageLoop::current()->CancelTask(write_watcher_);
166      write_watcher_ = MessageLoop::kTaskIdNull;
167    }
168  }
169
170  // Called from the brillo::MessageLoop when the file descriptor is available
171  // for reading.
172  void OnFileCanReadWithoutBlocking() {
173    CHECK(!read_data_callback_.is_null());
174    DataCallback cb = read_data_callback_;
175    read_data_callback_.Reset();
176    cb.Run(Stream::AccessMode::READ);
177  }
178
179  void OnFileCanWriteWithoutBlocking() {
180    CHECK(!write_data_callback_.is_null());
181    DataCallback cb = write_data_callback_;
182    write_data_callback_.Reset();
183    cb.Run(Stream::AccessMode::WRITE);
184  }
185
186 private:
187  // The actual file descriptor we are working with. Will contain -1 if the
188  // file stream has been closed.
189  int fd_;
190
191  // |own_| is set to true if the file stream owns the file descriptor |fd_| and
192  // must close it when the stream is closed. This will be false for file
193  // descriptors that shouldn't be closed (e.g. stdin, stdout, stderr).
194  bool own_;
195
196  // Stream callbacks to be called when read and/or write operations can be
197  // performed on the file descriptor without blocking.
198  DataCallback read_data_callback_;
199  DataCallback write_data_callback_;
200
201  // MessageLoop tasks monitoring read/write operations on the file descriptor.
202  MessageLoop::TaskId read_watcher_{MessageLoop::kTaskIdNull};
203  MessageLoop::TaskId write_watcher_{MessageLoop::kTaskIdNull};
204
205  DISALLOW_COPY_AND_ASSIGN(FileDescriptor);
206};
207
208StreamPtr FileStream::Open(const base::FilePath& path,
209                           AccessMode mode,
210                           Disposition disposition,
211                           ErrorPtr* error) {
212  StreamPtr stream;
213  int open_flags = O_CLOEXEC;
214  switch (mode) {
215    case AccessMode::READ:
216      open_flags |= O_RDONLY;
217      break;
218    case AccessMode::WRITE:
219      open_flags |= O_WRONLY;
220      break;
221    case AccessMode::READ_WRITE:
222      open_flags |= O_RDWR;
223      break;
224  }
225
226  switch (disposition) {
227    case Disposition::OPEN_EXISTING:
228      // Nothing else to do.
229      break;
230    case Disposition::CREATE_ALWAYS:
231      open_flags |= O_CREAT | O_TRUNC;
232      break;
233    case Disposition::CREATE_NEW_ONLY:
234      open_flags |= O_CREAT | O_EXCL;
235      break;
236    case Disposition::TRUNCATE_EXISTING:
237      open_flags |= O_TRUNC;
238      break;
239  }
240
241  mode_t creation_mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
242  int fd = HANDLE_EINTR(open(path.value().c_str(), open_flags, creation_mode));
243  if (fd < 0) {
244    brillo::errors::system::AddSystemError(error, FROM_HERE, errno);
245    return stream;
246  }
247  if (HANDLE_EINTR(fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK)) < 0) {
248    brillo::errors::system::AddSystemError(error, FROM_HERE, errno);
249    IGNORE_EINTR(close(fd));
250    return stream;
251  }
252
253  std::unique_ptr<FileDescriptorInterface> fd_interface{
254      new FileDescriptor{fd, true}};
255
256  stream.reset(new FileStream{std::move(fd_interface), mode});
257  return stream;
258}
259
260StreamPtr FileStream::CreateTemporary(ErrorPtr* error) {
261  StreamPtr stream;
262  base::FilePath path;
263  // The "proper" solution would be here to add O_TMPFILE flag to |open_flags|
264  // below and pass just the temp directory path to open(), so the actual file
265  // name isn't even needed. However this is supported only as of Linux kernel
266  // 3.11 and not all our configurations have that. So, for now just create
267  // a temp file first and then open it.
268  if (!base::CreateTemporaryFile(&path)) {
269    brillo::errors::system::AddSystemError(error, FROM_HERE, errno);
270    return stream;
271  }
272  int open_flags = O_CLOEXEC | O_RDWR | O_CREAT | O_TRUNC;
273  mode_t creation_mode = S_IRUSR | S_IWUSR;
274  int fd = HANDLE_EINTR(open(path.value().c_str(), open_flags, creation_mode));
275  if (fd < 0) {
276    brillo::errors::system::AddSystemError(error, FROM_HERE, errno);
277    return stream;
278  }
279  unlink(path.value().c_str());
280
281  stream = FromFileDescriptor(fd, true, error);
282  if (!stream)
283    IGNORE_EINTR(close(fd));
284  return stream;
285}
286
287StreamPtr FileStream::FromFileDescriptor(int file_descriptor,
288                                         bool own_descriptor,
289                                         ErrorPtr* error) {
290  StreamPtr stream;
291  if (file_descriptor < 0 || file_descriptor >= FD_SETSIZE) {
292    Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
293                 errors::stream::kInvalidParameter,
294                 "Invalid file descriptor value");
295    return stream;
296  }
297
298  int fd_flags = HANDLE_EINTR(fcntl(file_descriptor, F_GETFL));
299  if (fd_flags < 0) {
300    brillo::errors::system::AddSystemError(error, FROM_HERE, errno);
301    return stream;
302  }
303  int file_access_mode = (fd_flags & O_ACCMODE);
304  AccessMode access_mode = AccessMode::READ_WRITE;
305  if (file_access_mode == O_RDONLY)
306    access_mode = AccessMode::READ;
307  else if (file_access_mode == O_WRONLY)
308    access_mode = AccessMode::WRITE;
309
310  // Make sure the file descriptor is set to perform non-blocking operations
311  // if not enabled already.
312  if ((fd_flags & O_NONBLOCK) == 0) {
313    fd_flags |= O_NONBLOCK;
314    if (HANDLE_EINTR(fcntl(file_descriptor, F_SETFL, fd_flags)) < 0) {
315      brillo::errors::system::AddSystemError(error, FROM_HERE, errno);
316      return stream;
317    }
318  }
319
320  std::unique_ptr<FileDescriptorInterface> fd_interface{
321      new FileDescriptor{file_descriptor, own_descriptor}};
322
323  stream.reset(new FileStream{std::move(fd_interface), access_mode});
324  return stream;
325}
326
327FileStream::FileStream(std::unique_ptr<FileDescriptorInterface> fd_interface,
328                       AccessMode mode)
329    : fd_interface_(std::move(fd_interface)),
330      access_mode_(mode) {
331  switch (fd_interface_->GetFileMode() & S_IFMT) {
332    case S_IFCHR:  // Character device
333    case S_IFSOCK:  // Socket
334    case S_IFIFO:  // FIFO/pipe
335      // We know that these devices are not seekable and stream size is unknown.
336      seekable_ = false;
337      can_get_size_ = false;
338      break;
339
340    case S_IFBLK:  // Block device
341    case S_IFDIR:  // Directory
342    case S_IFREG:  // Normal file
343    case S_IFLNK:  // Symbolic link
344    default:
345      // The above devices support seek. Also, if not sure/in doubt, err on the
346      // side of "allowable".
347      seekable_ = true;
348      can_get_size_ = true;
349      break;
350  }
351}
352
353bool FileStream::IsOpen() const {
354  return fd_interface_->IsOpen();
355}
356
357bool FileStream::CanRead() const {
358  return IsOpen() && stream_utils::IsReadAccessMode(access_mode_);
359}
360
361bool FileStream::CanWrite() const {
362  return IsOpen() && stream_utils::IsWriteAccessMode(access_mode_);
363}
364
365bool FileStream::CanSeek() const {
366  return IsOpen() && seekable_;
367}
368
369bool FileStream::CanGetSize() const {
370  return IsOpen() && can_get_size_;
371}
372
373uint64_t FileStream::GetSize() const {
374  return IsOpen() ? fd_interface_->GetSize() : 0;
375}
376
377bool FileStream::SetSizeBlocking(uint64_t size, ErrorPtr* error) {
378  if (!IsOpen())
379    return stream_utils::ErrorStreamClosed(FROM_HERE, error);
380
381  if (!stream_utils::CheckInt64Overflow(FROM_HERE, size, 0, error))
382    return false;
383
384  if (fd_interface_->Truncate(size) >= 0)
385    return true;
386
387  errors::system::AddSystemError(error, FROM_HERE, errno);
388  return false;
389}
390
391uint64_t FileStream::GetRemainingSize() const {
392  if (!CanGetSize())
393    return 0;
394  uint64_t pos = GetPosition();
395  uint64_t size = GetSize();
396  return (pos < size) ? (size - pos) : 0;
397}
398
399uint64_t FileStream::GetPosition() const {
400  if (!CanSeek())
401    return 0;
402
403  off64_t pos = fd_interface_->Seek(0, SEEK_CUR);
404  const off64_t min_pos = 0;
405  return std::max(min_pos, pos);
406}
407
408bool FileStream::Seek(int64_t offset,
409                      Whence whence,
410                      uint64_t* new_position,
411                      ErrorPtr* error) {
412  if (!IsOpen())
413    return stream_utils::ErrorStreamClosed(FROM_HERE, error);
414
415  int raw_whence = 0;
416  switch (whence) {
417    case Whence::FROM_BEGIN:
418      raw_whence = SEEK_SET;
419      break;
420    case Whence::FROM_CURRENT:
421      raw_whence = SEEK_CUR;
422      break;
423    case Whence::FROM_END:
424      raw_whence = SEEK_END;
425      break;
426    default:
427      Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
428                   errors::stream::kInvalidParameter, "Invalid whence");
429      return false;
430  }
431  off64_t pos = fd_interface_->Seek(offset, raw_whence);
432  if (pos < 0) {
433    errors::system::AddSystemError(error, FROM_HERE, errno);
434    return false;
435  }
436
437  if (new_position)
438    *new_position = static_cast<uint64_t>(pos);
439  return true;
440}
441
442bool FileStream::ReadNonBlocking(void* buffer,
443                                 size_t size_to_read,
444                                 size_t* size_read,
445                                 bool* end_of_stream,
446                                 ErrorPtr* error) {
447  if (!IsOpen())
448    return stream_utils::ErrorStreamClosed(FROM_HERE, error);
449
450  ssize_t read = fd_interface_->Read(buffer, size_to_read);
451  if (read < 0) {
452    // If read() fails, check if this is due to no data being currently
453    // available and we do non-blocking I/O.
454    if (errno == EWOULDBLOCK || errno == EAGAIN) {
455      if (end_of_stream)
456        *end_of_stream = false;
457      *size_read = 0;
458      return true;
459    }
460    // Otherwise a real problem occurred.
461    errors::system::AddSystemError(error, FROM_HERE, errno);
462    return false;
463  }
464  if (end_of_stream)
465    *end_of_stream = (read == 0 && size_to_read != 0);
466  *size_read = read;
467  return true;
468}
469
470bool FileStream::WriteNonBlocking(const void* buffer,
471                                  size_t size_to_write,
472                                  size_t* size_written,
473                                  ErrorPtr* error) {
474  if (!IsOpen())
475    return stream_utils::ErrorStreamClosed(FROM_HERE, error);
476
477  ssize_t written = fd_interface_->Write(buffer, size_to_write);
478  if (written < 0) {
479    // If write() fails, check if this is due to the fact that no data
480    // can be presently written and we do non-blocking I/O.
481    if (errno == EWOULDBLOCK || errno == EAGAIN) {
482      *size_written = 0;
483      return true;
484    }
485    // Otherwise a real problem occurred.
486    errors::system::AddSystemError(error, FROM_HERE, errno);
487    return false;
488  }
489  *size_written = written;
490  return true;
491}
492
493bool FileStream::FlushBlocking(ErrorPtr* error) {
494  if (!IsOpen())
495    return stream_utils::ErrorStreamClosed(FROM_HERE, error);
496
497  // File descriptors don't have an internal buffer to flush.
498  return true;
499}
500
501bool FileStream::CloseBlocking(ErrorPtr* error) {
502  if (!IsOpen())
503    return true;
504
505  if (fd_interface_->Close() < 0) {
506    errors::system::AddSystemError(error, FROM_HERE, errno);
507    return false;
508  }
509
510  return true;
511}
512
513bool FileStream::WaitForData(
514    AccessMode mode,
515    const base::Callback<void(AccessMode)>& callback,
516    ErrorPtr* error) {
517  if (!IsOpen())
518    return stream_utils::ErrorStreamClosed(FROM_HERE, error);
519
520  return fd_interface_->WaitForData(mode, callback, error);
521}
522
523bool FileStream::WaitForDataBlocking(AccessMode in_mode,
524                                     base::TimeDelta timeout,
525                                     AccessMode* out_mode,
526                                     ErrorPtr* error) {
527  if (!IsOpen())
528    return stream_utils::ErrorStreamClosed(FROM_HERE, error);
529
530  int ret = fd_interface_->WaitForDataBlocking(in_mode, timeout, out_mode);
531  if (ret < 0) {
532    errors::system::AddSystemError(error, FROM_HERE, errno);
533    return false;
534  }
535  if (ret == 0)
536    return stream_utils::ErrorOperationTimeout(FROM_HERE, error);
537
538  return true;
539}
540
541void FileStream::CancelPendingAsyncOperations() {
542  if (IsOpen()) {
543    fd_interface_->CancelPendingAsyncOperations();
544  }
545  Stream::CancelPendingAsyncOperations();
546}
547
548}  // namespace brillo
549