file_posix.cc revision 731df977c0511bca2206b5f333555b1205ff1f43
1// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/disk_cache/file.h"
6
7#include <fcntl.h>
8
9#include <set>
10
11#include "base/logging.h"
12#include "base/message_loop.h"
13#include "base/singleton.h"
14#include "base/waitable_event.h"
15#include "base/worker_pool.h"
16#include "net/disk_cache/disk_cache.h"
17
18namespace {
19
20class InFlightIO;
21
22// This class represents a single asynchronous IO operation while it is being
23// bounced between threads.
24class BackgroundIO : public base::RefCountedThreadSafe<BackgroundIO> {
25 public:
26  // Other than the actual parameters for the IO operation (including the
27  // |callback| that must be notified at the end), we need the controller that
28  // is keeping track of all operations. When done, we notify the controller
29  // (we do NOT invoke the callback), in the worker thead that completed the
30  // operation.
31  BackgroundIO(disk_cache::File* file, const void* buf, size_t buf_len,
32               size_t offset, disk_cache::FileIOCallback* callback,
33               InFlightIO* controller)
34      : io_completed_(true, false), callback_(callback), file_(file), buf_(buf),
35        buf_len_(buf_len), offset_(offset), controller_(controller),
36        bytes_(0) {}
37
38  // Read and Write are the operations that can be performed asynchronously.
39  // The actual parameters for the operation are setup in the constructor of
40  // the object. Both methods should be called from a worker thread, by posting
41  // a task to the WorkerPool (they are RunnableMethods). When finished,
42  // controller->OnIOComplete() is called.
43  void Read();
44  void Write();
45
46  // This method signals the controller that this operation is finished, in the
47  // original thread (presumably the IO-Thread). In practice, this is a
48  // RunableMethod that allows cancellation.
49  void OnIOSignalled();
50
51  // Allows the cancellation of the task to notify the controller (step number 7
52  // in the diagram below). In practice, if the controller waits for the
53  // operation to finish it doesn't have to wait for the final task to be
54  // processed by the message loop so calling this method prevents its delivery.
55  // Note that this method is not intended to cancel the actual IO operation or
56  // to prevent the first notification to take place (OnIOComplete).
57  void Cancel();
58
59  // Retrieves the number of bytes transfered.
60  int Result();
61
62  base::WaitableEvent* io_completed() {
63    return &io_completed_;
64  }
65
66  disk_cache::FileIOCallback* callback() {
67    return callback_;
68  }
69
70  disk_cache::File* file() {
71    return file_;
72  }
73
74 private:
75  friend class base::RefCountedThreadSafe<BackgroundIO>;
76  ~BackgroundIO() {}
77
78  // An event to signal when the operation completes, and the user callback that
79  // has to be invoked. These members are accessed directly by the controller.
80  base::WaitableEvent io_completed_;
81  disk_cache::FileIOCallback* callback_;
82
83  disk_cache::File* file_;
84  const void* buf_;
85  size_t buf_len_;
86  size_t offset_;
87  InFlightIO* controller_;  // The controller that tracks all operations.
88  int bytes_;  // Final operation result.
89
90  DISALLOW_COPY_AND_ASSIGN(BackgroundIO);
91};
92
93// This class keeps track of every asynchronous IO operation. A single instance
94// of this class is meant to be used to start an asynchronous operation (using
95// PostRead/PostWrite). This class will post the operation to a worker thread,
96// hanlde the notification when the operation finishes and perform the callback
97// on the same thread that was used to start the operation.
98//
99// The regular sequence of calls is:
100//                 Thread_1                          Worker_thread
101//    1.     InFlightIO::PostRead()
102//    2.                         -> PostTask ->
103//    3.                                          BackgroundIO::Read()
104//    4.                                         IO operation completes
105//    5.                                       InFlightIO::OnIOComplete()
106//    6.                         <- PostTask <-
107//    7.  BackgroundIO::OnIOSignalled()
108//    8.  InFlightIO::InvokeCallback()
109//    9.       invoke callback
110//
111// Shutdown is a special case that is handled though WaitForPendingIO() instead
112// of just waiting for step 7.
113class InFlightIO {
114 public:
115  InFlightIO() : callback_thread_(MessageLoop::current()) {}
116  ~InFlightIO() {}
117
118  // These methods start an asynchronous operation. The arguments have the same
119  // semantics of the File asynchronous operations, with the exception that the
120  // operation never finishes synchronously.
121  void PostRead(disk_cache::File* file, void* buf, size_t buf_len,
122                size_t offset, disk_cache::FileIOCallback* callback);
123  void PostWrite(disk_cache::File* file, const void* buf, size_t buf_len,
124                 size_t offset, disk_cache::FileIOCallback* callback);
125
126  // Blocks the current thread until all IO operations tracked by this object
127  // complete.
128  void WaitForPendingIO();
129
130  // Called on a worker thread when |operation| completes.
131  void OnIOComplete(BackgroundIO* operation);
132
133  // Invokes the users' completion callback at the end of the IO operation.
134  // |cancel_task| is true if the actual task posted to the thread is still
135  // queued (because we are inside WaitForPendingIO), and false if said task is
136  // the one performing the call.
137  void InvokeCallback(BackgroundIO* operation, bool cancel_task);
138
139 private:
140  typedef std::set<scoped_refptr<BackgroundIO> > IOList;
141
142  IOList io_list_;  // List of pending io operations.
143  MessageLoop* callback_thread_;
144};
145
146// ---------------------------------------------------------------------------
147
148// Runs on a worker thread.
149void BackgroundIO::Read() {
150  if (file_->Read(const_cast<void*>(buf_), buf_len_, offset_)) {
151    bytes_ = static_cast<int>(buf_len_);
152  } else {
153    bytes_ = -1;
154  }
155  controller_->OnIOComplete(this);
156}
157
158int BackgroundIO::Result() {
159  return bytes_;
160}
161
162void BackgroundIO::Cancel() {
163  DCHECK(controller_);
164  controller_ = NULL;
165}
166
167// Runs on a worker thread.
168void BackgroundIO::Write() {
169  bool rv = file_->Write(buf_, buf_len_, offset_);
170
171  bytes_ = rv ? static_cast<int>(buf_len_) : -1;
172  controller_->OnIOComplete(this);
173}
174
175// Runs on the IO thread.
176void BackgroundIO::OnIOSignalled() {
177  if (controller_)
178    controller_->InvokeCallback(this, false);
179}
180
181// ---------------------------------------------------------------------------
182
183void InFlightIO::PostRead(disk_cache::File *file, void* buf, size_t buf_len,
184                          size_t offset, disk_cache::FileIOCallback *callback) {
185  scoped_refptr<BackgroundIO> operation =
186      new BackgroundIO(file, buf, buf_len, offset, callback, this);
187  io_list_.insert(operation.get());
188  file->AddRef();  // Balanced on InvokeCallback()
189
190  if (!callback_thread_)
191      callback_thread_ = MessageLoop::current();
192
193  WorkerPool::PostTask(FROM_HERE,
194                       NewRunnableMethod(operation.get(), &BackgroundIO::Read),
195                       true);
196}
197
198void InFlightIO::PostWrite(disk_cache::File* file, const void* buf,
199                           size_t buf_len, size_t offset,
200                           disk_cache::FileIOCallback* callback) {
201  scoped_refptr<BackgroundIO> operation =
202      new BackgroundIO(file, buf, buf_len, offset, callback, this);
203  io_list_.insert(operation.get());
204  file->AddRef();  // Balanced on InvokeCallback()
205
206  if (!callback_thread_)
207    callback_thread_ = MessageLoop::current();
208
209  WorkerPool::PostTask(FROM_HERE,
210                       NewRunnableMethod(operation.get(), &BackgroundIO::Write),
211                       true);
212}
213
214void InFlightIO::WaitForPendingIO() {
215  while (!io_list_.empty()) {
216    // Block the current thread until all pending IO completes.
217    IOList::iterator it = io_list_.begin();
218    InvokeCallback(*it, true);
219  }
220  // Unit tests can use different threads.
221  callback_thread_ = NULL;
222}
223
224// Runs on a worker thread.
225void InFlightIO::OnIOComplete(BackgroundIO* operation) {
226  callback_thread_->PostTask(FROM_HERE,
227                             NewRunnableMethod(operation,
228                                               &BackgroundIO::OnIOSignalled));
229  operation->io_completed()->Signal();
230}
231
232// Runs on the IO thread.
233void InFlightIO::InvokeCallback(BackgroundIO* operation, bool cancel_task) {
234  operation->io_completed()->Wait();
235
236  if (cancel_task)
237    operation->Cancel();
238
239  disk_cache::FileIOCallback* callback = operation->callback();
240  int bytes = operation->Result();
241
242  // Release the references acquired in PostRead / PostWrite.
243  operation->file()->Release();
244  io_list_.erase(operation);
245  callback->OnFileIOComplete(bytes);
246}
247
248}  // namespace
249
250namespace disk_cache {
251
252File::File(base::PlatformFile file)
253    : init_(true),
254      mixed_(true),
255      platform_file_(file),
256      sync_platform_file_(base::kInvalidPlatformFileValue) {
257}
258
259bool File::Init(const FilePath& name) {
260  if (init_)
261    return false;
262
263  int flags = base::PLATFORM_FILE_OPEN |
264              base::PLATFORM_FILE_READ |
265              base::PLATFORM_FILE_WRITE;
266  platform_file_ = base::CreatePlatformFile(name, flags, NULL, NULL);
267  if (platform_file_ < 0) {
268    platform_file_ = 0;
269    return false;
270  }
271
272  init_ = true;
273  return true;
274}
275
276File::~File() {
277  if (platform_file_)
278    close(platform_file_);
279}
280
281base::PlatformFile File::platform_file() const {
282  return platform_file_;
283}
284
285bool File::IsValid() const {
286  if (!init_)
287    return false;
288  return (base::kInvalidPlatformFileValue != platform_file_);
289}
290
291bool File::Read(void* buffer, size_t buffer_len, size_t offset) {
292  DCHECK(init_);
293  if (buffer_len > ULONG_MAX || offset > LONG_MAX)
294    return false;
295
296  int ret = pread(platform_file_, buffer, buffer_len, offset);
297  return (static_cast<size_t>(ret) == buffer_len);
298}
299
300bool File::Write(const void* buffer, size_t buffer_len, size_t offset) {
301  DCHECK(init_);
302  if (buffer_len > ULONG_MAX || offset > ULONG_MAX)
303    return false;
304
305  int ret = pwrite(platform_file_, buffer, buffer_len, offset);
306  return (static_cast<size_t>(ret) == buffer_len);
307}
308
309// We have to increase the ref counter of the file before performing the IO to
310// prevent the completion to happen with an invalid handle (if the file is
311// closed while the IO is in flight).
312bool File::Read(void* buffer, size_t buffer_len, size_t offset,
313                FileIOCallback* callback, bool* completed) {
314  DCHECK(init_);
315  if (!callback) {
316    if (completed)
317      *completed = true;
318    return Read(buffer, buffer_len, offset);
319  }
320
321  if (buffer_len > ULONG_MAX || offset > ULONG_MAX)
322    return false;
323
324  InFlightIO* io_operations = Singleton<InFlightIO>::get();
325  io_operations->PostRead(this, buffer, buffer_len, offset, callback);
326
327  *completed = false;
328  return true;
329}
330
331bool File::Write(const void* buffer, size_t buffer_len, size_t offset,
332                 FileIOCallback* callback, bool* completed) {
333  DCHECK(init_);
334  if (!callback) {
335    if (completed)
336      *completed = true;
337    return Write(buffer, buffer_len, offset);
338  }
339
340  return AsyncWrite(buffer, buffer_len, offset, callback, completed);
341}
342
343bool File::AsyncWrite(const void* buffer, size_t buffer_len, size_t offset,
344                      FileIOCallback* callback, bool* completed) {
345  DCHECK(init_);
346  if (buffer_len > ULONG_MAX || offset > ULONG_MAX)
347    return false;
348
349  InFlightIO* io_operations = Singleton<InFlightIO>::get();
350  io_operations->PostWrite(this, buffer, buffer_len, offset, callback);
351
352  if (completed)
353    *completed = false;
354  return true;
355}
356
357bool File::SetLength(size_t length) {
358  DCHECK(init_);
359  if (length > ULONG_MAX)
360    return false;
361
362  return 0 == ftruncate(platform_file_, length);
363}
364
365size_t File::GetLength() {
366  DCHECK(init_);
367  size_t ret = lseek(platform_file_, 0, SEEK_END);
368  return ret;
369}
370
371// Static.
372void File::WaitForPendingIO(int* num_pending_io) {
373  // We may be running unit tests so we should allow InFlightIO to reset the
374  // message loop.
375  Singleton<InFlightIO>::get()->WaitForPendingIO();
376}
377
378}  // namespace disk_cache
379