file_stream_posix.cc revision ddb351dbec246cf1fab5ec20d2d5520909041de1
1// Copyright (c) 2011 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// For 64-bit file access (off_t = off64_t, lseek64, etc).
6#define _FILE_OFFSET_BITS 64
7
8#include "net/base/file_stream.h"
9
10#include <sys/types.h>
11#include <sys/stat.h>
12#include <fcntl.h>
13#include <unistd.h>
14#include <errno.h>
15
16#include "base/basictypes.h"
17#include "base/callback.h"
18#include "base/eintr_wrapper.h"
19#include "base/file_path.h"
20#include "base/logging.h"
21#include "base/message_loop.h"
22#include "base/metrics/histogram.h"
23#include "base/string_util.h"
24#include "base/task.h"
25#include "base/threading/thread_restrictions.h"
26#include "base/threading/worker_pool.h"
27#include "base/synchronization/waitable_event.h"
28#include "net/base/net_errors.h"
29
30namespace net {
31
32// We cast back and forth, so make sure it's the size we're expecting.
33#if defined(__BIONIC__) && defined(ANDROID)
34COMPILE_ASSERT(sizeof(int32) == sizeof(off_t), off_t_32_bit);
35#else
36COMPILE_ASSERT(sizeof(int64) == sizeof(off_t), off_t_64_bit);
37#endif
38
39// Make sure our Whence mappings match the system headers.
40COMPILE_ASSERT(FROM_BEGIN   == SEEK_SET &&
41               FROM_CURRENT == SEEK_CUR &&
42               FROM_END     == SEEK_END, whence_matches_system);
43
44namespace {
45
46// Map from errno to net error codes.
47int64 MapErrorCode(int err) {
48  switch (err) {
49    case ENOENT:
50      return ERR_FILE_NOT_FOUND;
51    case EACCES:
52      return ERR_ACCESS_DENIED;
53    default:
54      LOG(WARNING) << "Unknown error " << err << " mapped to net::ERR_FAILED";
55      return ERR_FAILED;
56  }
57}
58
59// ReadFile() is a simple wrapper around read() that handles EINTR signals and
60// calls MapErrorCode() to map errno to net error codes.
61int ReadFile(base::PlatformFile file, char* buf, int buf_len) {
62  base::ThreadRestrictions::AssertIOAllowed();
63  // read(..., 0) returns 0 to indicate end-of-file.
64
65  // Loop in the case of getting interrupted by a signal.
66  ssize_t res = HANDLE_EINTR(read(file, buf, static_cast<size_t>(buf_len)));
67  if (res == static_cast<ssize_t>(-1))
68    return MapErrorCode(errno);
69  return static_cast<int>(res);
70}
71
72void ReadFileTask(base::PlatformFile file,
73                  char* buf,
74                  int buf_len,
75                  CompletionCallback* callback) {
76  callback->Run(ReadFile(file, buf, buf_len));
77}
78
79// WriteFile() is a simple wrapper around write() that handles EINTR signals and
80// calls MapErrorCode() to map errno to net error codes.  It tries to write to
81// completion.
82int WriteFile(base::PlatformFile file, const char* buf, int buf_len) {
83  base::ThreadRestrictions::AssertIOAllowed();
84  ssize_t res = HANDLE_EINTR(write(file, buf, buf_len));
85  if (res == -1)
86    return MapErrorCode(errno);
87  return res;
88}
89
90void WriteFileTask(base::PlatformFile file,
91                   const char* buf,
92                   int buf_len,
93                   CompletionCallback* callback) {
94  callback->Run(WriteFile(file, buf, buf_len));
95}
96
97// FlushFile() is a simple wrapper around fsync() that handles EINTR signals and
98// calls MapErrorCode() to map errno to net error codes.  It tries to flush to
99// completion.
100int FlushFile(base::PlatformFile file) {
101  base::ThreadRestrictions::AssertIOAllowed();
102  ssize_t res = HANDLE_EINTR(fsync(file));
103  if (res == -1)
104    return MapErrorCode(errno);
105  return res;
106}
107
108}  // namespace
109
110// CancelableCallbackTask takes ownership of the Callback.  This task gets
111// posted to the MessageLoopForIO instance.
112class CancelableCallbackTask : public CancelableTask {
113 public:
114  explicit CancelableCallbackTask(Callback0::Type* callback)
115      : canceled_(false), callback_(callback) {}
116
117  virtual void Run() {
118    if (!canceled_)
119      callback_->Run();
120  }
121
122  virtual void Cancel() {
123    canceled_ = true;
124  }
125
126 private:
127  bool canceled_;
128  scoped_ptr<Callback0::Type> callback_;
129};
130
131// FileStream::AsyncContext ----------------------------------------------
132
133class FileStream::AsyncContext {
134 public:
135  AsyncContext();
136  ~AsyncContext();
137
138  // These methods post synchronous read() and write() calls to a WorkerThread.
139  void InitiateAsyncRead(
140      base::PlatformFile file, char* buf, int buf_len,
141      CompletionCallback* callback);
142  void InitiateAsyncWrite(
143      base::PlatformFile file, const char* buf, int buf_len,
144      CompletionCallback* callback);
145
146  CompletionCallback* callback() const { return callback_; }
147
148  // Called by the WorkerPool thread executing the IO after the IO completes.
149  // This method queues RunAsynchronousCallback() on the MessageLoop and signals
150  // |background_io_completed_callback_|, in case the destructor is waiting.  In
151  // that case, the destructor will call RunAsynchronousCallback() instead, and
152  // cancel |message_loop_task_|.
153  // |result| is the result of the Read/Write task.
154  void OnBackgroundIOCompleted(int result);
155
156 private:
157  // Always called on the IO thread, either directly by a task on the
158  // MessageLoop or by ~AsyncContext().
159  void RunAsynchronousCallback();
160
161  // The MessageLoopForIO that this AsyncContext is running on.
162  MessageLoopForIO* const message_loop_;
163  CompletionCallback* callback_;  // The user provided callback.
164
165  // A callback wrapper around OnBackgroundIOCompleted().  Run by the WorkerPool
166  // thread doing the background IO on our behalf.
167  CompletionCallbackImpl<AsyncContext> background_io_completed_callback_;
168
169  // This is used to synchronize between the AsyncContext destructor (which runs
170  // on the IO thread and OnBackgroundIOCompleted() which runs on the WorkerPool
171  // thread.
172  base::WaitableEvent background_io_completed_;
173
174  // These variables are only valid when background_io_completed is signaled.
175  int result_;
176  CancelableCallbackTask* message_loop_task_;
177
178  bool is_closing_;
179
180  DISALLOW_COPY_AND_ASSIGN(AsyncContext);
181};
182
183FileStream::AsyncContext::AsyncContext()
184    : message_loop_(MessageLoopForIO::current()),
185      callback_(NULL),
186      background_io_completed_callback_(
187          this, &AsyncContext::OnBackgroundIOCompleted),
188      background_io_completed_(true, false),
189      message_loop_task_(NULL),
190      is_closing_(false) {}
191
192FileStream::AsyncContext::~AsyncContext() {
193  is_closing_ = true;
194  if (callback_) {
195    // If |callback_| is non-NULL, that implies either the worker thread is
196    // still running the IO task, or the completion callback is queued up on the
197    // MessageLoopForIO, but AsyncContext() got deleted before then.
198    const bool need_to_wait = !background_io_completed_.IsSignaled();
199    base::TimeTicks start = base::TimeTicks::Now();
200    RunAsynchronousCallback();
201    if (need_to_wait) {
202      // We want to see if we block the message loop for too long.
203      UMA_HISTOGRAM_TIMES("AsyncIO.FileStreamClose",
204                          base::TimeTicks::Now() - start);
205    }
206  }
207}
208
209void FileStream::AsyncContext::InitiateAsyncRead(
210    base::PlatformFile file, char* buf, int buf_len,
211    CompletionCallback* callback) {
212  DCHECK(!callback_);
213  callback_ = callback;
214
215  base::WorkerPool::PostTask(FROM_HERE,
216                             NewRunnableFunction(
217                                 &ReadFileTask,
218                                 file, buf, buf_len,
219                                 &background_io_completed_callback_),
220                             true /* task_is_slow */);
221}
222
223void FileStream::AsyncContext::InitiateAsyncWrite(
224    base::PlatformFile file, const char* buf, int buf_len,
225    CompletionCallback* callback) {
226  DCHECK(!callback_);
227  callback_ = callback;
228
229  base::WorkerPool::PostTask(FROM_HERE,
230                             NewRunnableFunction(
231                                 &WriteFileTask,
232                                 file, buf, buf_len,
233                                 &background_io_completed_callback_),
234                             true /* task_is_slow */);
235}
236
237void FileStream::AsyncContext::OnBackgroundIOCompleted(int result) {
238  result_ = result;
239  message_loop_task_ = new CancelableCallbackTask(
240      NewCallback(this, &AsyncContext::RunAsynchronousCallback));
241  message_loop_->PostTask(FROM_HERE, message_loop_task_);
242  background_io_completed_.Signal();
243}
244
245void FileStream::AsyncContext::RunAsynchronousCallback() {
246  // Wait() here ensures that all modifications from the WorkerPool thread are
247  // now visible.
248  background_io_completed_.Wait();
249
250  // Either we're in the MessageLoop's task, in which case Cancel() doesn't do
251  // anything, or we're in ~AsyncContext(), in which case this prevents the call
252  // from happening again.  Must do it here after calling Wait().
253  message_loop_task_->Cancel();
254  message_loop_task_ = NULL;
255
256  if (is_closing_) {
257    callback_ = NULL;
258    return;
259  }
260
261  DCHECK(callback_);
262  CompletionCallback* temp = NULL;
263  std::swap(temp, callback_);
264  background_io_completed_.Reset();
265  temp->Run(result_);
266}
267
268// FileStream ------------------------------------------------------------
269
270FileStream::FileStream()
271    : file_(base::kInvalidPlatformFileValue),
272      open_flags_(0),
273      auto_closed_(true) {
274  DCHECK(!IsOpen());
275}
276
277FileStream::FileStream(base::PlatformFile file, int flags)
278    : file_(file),
279      open_flags_(flags),
280      auto_closed_(false) {
281  // If the file handle is opened with base::PLATFORM_FILE_ASYNC, we need to
282  // make sure we will perform asynchronous File IO to it.
283  if (flags & base::PLATFORM_FILE_ASYNC) {
284    async_context_.reset(new AsyncContext());
285  }
286}
287
288FileStream::~FileStream() {
289  if (auto_closed_)
290    Close();
291}
292
293void FileStream::Close() {
294  // Abort any existing asynchronous operations.
295  async_context_.reset();
296
297  if (file_ != base::kInvalidPlatformFileValue) {
298    if (close(file_) != 0) {
299      NOTREACHED();
300    }
301    file_ = base::kInvalidPlatformFileValue;
302  }
303}
304
305int FileStream::Open(const FilePath& path, int open_flags) {
306  if (IsOpen()) {
307    DLOG(FATAL) << "File is already open!";
308    return ERR_UNEXPECTED;
309  }
310
311  open_flags_ = open_flags;
312  file_ = base::CreatePlatformFile(path, open_flags_, NULL, NULL);
313  if (file_ == base::kInvalidPlatformFileValue) {
314    return MapErrorCode(errno);
315  }
316
317  if (open_flags_ & base::PLATFORM_FILE_ASYNC) {
318    async_context_.reset(new AsyncContext());
319  }
320
321  return OK;
322}
323
324bool FileStream::IsOpen() const {
325  return file_ != base::kInvalidPlatformFileValue;
326}
327
328int64 FileStream::Seek(Whence whence, int64 offset) {
329  base::ThreadRestrictions::AssertIOAllowed();
330
331  if (!IsOpen())
332    return ERR_UNEXPECTED;
333
334  // If we're in async, make sure we don't have a request in flight.
335  DCHECK(!async_context_.get() || !async_context_->callback());
336
337  off_t res = lseek(file_, static_cast<off_t>(offset),
338                    static_cast<int>(whence));
339  if (res == static_cast<off_t>(-1))
340    return MapErrorCode(errno);
341
342  return res;
343}
344
345int64 FileStream::Available() {
346  base::ThreadRestrictions::AssertIOAllowed();
347
348  if (!IsOpen())
349    return ERR_UNEXPECTED;
350
351  int64 cur_pos = Seek(FROM_CURRENT, 0);
352  if (cur_pos < 0)
353    return cur_pos;
354
355  struct stat info;
356  if (fstat(file_, &info) != 0)
357    return MapErrorCode(errno);
358
359  int64 size = static_cast<int64>(info.st_size);
360  DCHECK_GT(size, cur_pos);
361
362  return size - cur_pos;
363}
364
365int FileStream::Read(
366    char* buf, int buf_len, CompletionCallback* callback) {
367  if (!IsOpen())
368    return ERR_UNEXPECTED;
369
370  // read(..., 0) will return 0, which indicates end-of-file.
371  DCHECK(buf_len > 0);
372  DCHECK(open_flags_ & base::PLATFORM_FILE_READ);
373
374  if (async_context_.get()) {
375    DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC);
376    // If we're in async, make sure we don't have a request in flight.
377    DCHECK(!async_context_->callback());
378    async_context_->InitiateAsyncRead(file_, buf, buf_len, callback);
379    return ERR_IO_PENDING;
380  } else {
381    return ReadFile(file_, buf, buf_len);
382  }
383}
384
385int FileStream::ReadUntilComplete(char *buf, int buf_len) {
386  int to_read = buf_len;
387  int bytes_total = 0;
388
389  do {
390    int bytes_read = Read(buf, to_read, NULL);
391    if (bytes_read <= 0) {
392      if (bytes_total == 0)
393        return bytes_read;
394
395      return bytes_total;
396    }
397
398    bytes_total += bytes_read;
399    buf += bytes_read;
400    to_read -= bytes_read;
401  } while (bytes_total < buf_len);
402
403  return bytes_total;
404}
405
406int FileStream::Write(
407    const char* buf, int buf_len, CompletionCallback* callback) {
408  // write(..., 0) will return 0, which indicates end-of-file.
409  DCHECK_GT(buf_len, 0);
410
411  if (!IsOpen())
412    return ERR_UNEXPECTED;
413
414  if (async_context_.get()) {
415    DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC);
416    // If we're in async, make sure we don't have a request in flight.
417    DCHECK(!async_context_->callback());
418    async_context_->InitiateAsyncWrite(file_, buf, buf_len, callback);
419    return ERR_IO_PENDING;
420  } else {
421    return WriteFile(file_, buf, buf_len);
422  }
423}
424
425int64 FileStream::Truncate(int64 bytes) {
426  base::ThreadRestrictions::AssertIOAllowed();
427
428  if (!IsOpen())
429    return ERR_UNEXPECTED;
430
431  // We better be open for reading.
432  DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE);
433
434  // Seek to the position to truncate from.
435  int64 seek_position = Seek(FROM_BEGIN, bytes);
436  if (seek_position != bytes)
437    return ERR_UNEXPECTED;
438
439  // And truncate the file.
440  int result = ftruncate(file_, bytes);
441  return result == 0 ? seek_position : MapErrorCode(errno);
442}
443
444int FileStream::Flush() {
445  if (!IsOpen())
446    return ERR_UNEXPECTED;
447
448  return FlushFile(file_);
449}
450
451}  // namespace net
452