file_posix.cc revision 731df977c0511bca2206b5f333555b1205ff1f43
1// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/disk_cache/file.h" 6 7#include <fcntl.h> 8 9#include <set> 10 11#include "base/logging.h" 12#include "base/message_loop.h" 13#include "base/singleton.h" 14#include "base/waitable_event.h" 15#include "base/worker_pool.h" 16#include "net/disk_cache/disk_cache.h" 17 18namespace { 19 20class InFlightIO; 21 22// This class represents a single asynchronous IO operation while it is being 23// bounced between threads. 24class BackgroundIO : public base::RefCountedThreadSafe<BackgroundIO> { 25 public: 26 // Other than the actual parameters for the IO operation (including the 27 // |callback| that must be notified at the end), we need the controller that 28 // is keeping track of all operations. When done, we notify the controller 29 // (we do NOT invoke the callback), in the worker thead that completed the 30 // operation. 31 BackgroundIO(disk_cache::File* file, const void* buf, size_t buf_len, 32 size_t offset, disk_cache::FileIOCallback* callback, 33 InFlightIO* controller) 34 : io_completed_(true, false), callback_(callback), file_(file), buf_(buf), 35 buf_len_(buf_len), offset_(offset), controller_(controller), 36 bytes_(0) {} 37 38 // Read and Write are the operations that can be performed asynchronously. 39 // The actual parameters for the operation are setup in the constructor of 40 // the object. Both methods should be called from a worker thread, by posting 41 // a task to the WorkerPool (they are RunnableMethods). When finished, 42 // controller->OnIOComplete() is called. 43 void Read(); 44 void Write(); 45 46 // This method signals the controller that this operation is finished, in the 47 // original thread (presumably the IO-Thread). In practice, this is a 48 // RunableMethod that allows cancellation. 49 void OnIOSignalled(); 50 51 // Allows the cancellation of the task to notify the controller (step number 7 52 // in the diagram below). In practice, if the controller waits for the 53 // operation to finish it doesn't have to wait for the final task to be 54 // processed by the message loop so calling this method prevents its delivery. 55 // Note that this method is not intended to cancel the actual IO operation or 56 // to prevent the first notification to take place (OnIOComplete). 57 void Cancel(); 58 59 // Retrieves the number of bytes transfered. 60 int Result(); 61 62 base::WaitableEvent* io_completed() { 63 return &io_completed_; 64 } 65 66 disk_cache::FileIOCallback* callback() { 67 return callback_; 68 } 69 70 disk_cache::File* file() { 71 return file_; 72 } 73 74 private: 75 friend class base::RefCountedThreadSafe<BackgroundIO>; 76 ~BackgroundIO() {} 77 78 // An event to signal when the operation completes, and the user callback that 79 // has to be invoked. These members are accessed directly by the controller. 80 base::WaitableEvent io_completed_; 81 disk_cache::FileIOCallback* callback_; 82 83 disk_cache::File* file_; 84 const void* buf_; 85 size_t buf_len_; 86 size_t offset_; 87 InFlightIO* controller_; // The controller that tracks all operations. 88 int bytes_; // Final operation result. 89 90 DISALLOW_COPY_AND_ASSIGN(BackgroundIO); 91}; 92 93// This class keeps track of every asynchronous IO operation. A single instance 94// of this class is meant to be used to start an asynchronous operation (using 95// PostRead/PostWrite). This class will post the operation to a worker thread, 96// hanlde the notification when the operation finishes and perform the callback 97// on the same thread that was used to start the operation. 98// 99// The regular sequence of calls is: 100// Thread_1 Worker_thread 101// 1. InFlightIO::PostRead() 102// 2. -> PostTask -> 103// 3. BackgroundIO::Read() 104// 4. IO operation completes 105// 5. InFlightIO::OnIOComplete() 106// 6. <- PostTask <- 107// 7. BackgroundIO::OnIOSignalled() 108// 8. InFlightIO::InvokeCallback() 109// 9. invoke callback 110// 111// Shutdown is a special case that is handled though WaitForPendingIO() instead 112// of just waiting for step 7. 113class InFlightIO { 114 public: 115 InFlightIO() : callback_thread_(MessageLoop::current()) {} 116 ~InFlightIO() {} 117 118 // These methods start an asynchronous operation. The arguments have the same 119 // semantics of the File asynchronous operations, with the exception that the 120 // operation never finishes synchronously. 121 void PostRead(disk_cache::File* file, void* buf, size_t buf_len, 122 size_t offset, disk_cache::FileIOCallback* callback); 123 void PostWrite(disk_cache::File* file, const void* buf, size_t buf_len, 124 size_t offset, disk_cache::FileIOCallback* callback); 125 126 // Blocks the current thread until all IO operations tracked by this object 127 // complete. 128 void WaitForPendingIO(); 129 130 // Called on a worker thread when |operation| completes. 131 void OnIOComplete(BackgroundIO* operation); 132 133 // Invokes the users' completion callback at the end of the IO operation. 134 // |cancel_task| is true if the actual task posted to the thread is still 135 // queued (because we are inside WaitForPendingIO), and false if said task is 136 // the one performing the call. 137 void InvokeCallback(BackgroundIO* operation, bool cancel_task); 138 139 private: 140 typedef std::set<scoped_refptr<BackgroundIO> > IOList; 141 142 IOList io_list_; // List of pending io operations. 143 MessageLoop* callback_thread_; 144}; 145 146// --------------------------------------------------------------------------- 147 148// Runs on a worker thread. 149void BackgroundIO::Read() { 150 if (file_->Read(const_cast<void*>(buf_), buf_len_, offset_)) { 151 bytes_ = static_cast<int>(buf_len_); 152 } else { 153 bytes_ = -1; 154 } 155 controller_->OnIOComplete(this); 156} 157 158int BackgroundIO::Result() { 159 return bytes_; 160} 161 162void BackgroundIO::Cancel() { 163 DCHECK(controller_); 164 controller_ = NULL; 165} 166 167// Runs on a worker thread. 168void BackgroundIO::Write() { 169 bool rv = file_->Write(buf_, buf_len_, offset_); 170 171 bytes_ = rv ? static_cast<int>(buf_len_) : -1; 172 controller_->OnIOComplete(this); 173} 174 175// Runs on the IO thread. 176void BackgroundIO::OnIOSignalled() { 177 if (controller_) 178 controller_->InvokeCallback(this, false); 179} 180 181// --------------------------------------------------------------------------- 182 183void InFlightIO::PostRead(disk_cache::File *file, void* buf, size_t buf_len, 184 size_t offset, disk_cache::FileIOCallback *callback) { 185 scoped_refptr<BackgroundIO> operation = 186 new BackgroundIO(file, buf, buf_len, offset, callback, this); 187 io_list_.insert(operation.get()); 188 file->AddRef(); // Balanced on InvokeCallback() 189 190 if (!callback_thread_) 191 callback_thread_ = MessageLoop::current(); 192 193 WorkerPool::PostTask(FROM_HERE, 194 NewRunnableMethod(operation.get(), &BackgroundIO::Read), 195 true); 196} 197 198void InFlightIO::PostWrite(disk_cache::File* file, const void* buf, 199 size_t buf_len, size_t offset, 200 disk_cache::FileIOCallback* callback) { 201 scoped_refptr<BackgroundIO> operation = 202 new BackgroundIO(file, buf, buf_len, offset, callback, this); 203 io_list_.insert(operation.get()); 204 file->AddRef(); // Balanced on InvokeCallback() 205 206 if (!callback_thread_) 207 callback_thread_ = MessageLoop::current(); 208 209 WorkerPool::PostTask(FROM_HERE, 210 NewRunnableMethod(operation.get(), &BackgroundIO::Write), 211 true); 212} 213 214void InFlightIO::WaitForPendingIO() { 215 while (!io_list_.empty()) { 216 // Block the current thread until all pending IO completes. 217 IOList::iterator it = io_list_.begin(); 218 InvokeCallback(*it, true); 219 } 220 // Unit tests can use different threads. 221 callback_thread_ = NULL; 222} 223 224// Runs on a worker thread. 225void InFlightIO::OnIOComplete(BackgroundIO* operation) { 226 callback_thread_->PostTask(FROM_HERE, 227 NewRunnableMethod(operation, 228 &BackgroundIO::OnIOSignalled)); 229 operation->io_completed()->Signal(); 230} 231 232// Runs on the IO thread. 233void InFlightIO::InvokeCallback(BackgroundIO* operation, bool cancel_task) { 234 operation->io_completed()->Wait(); 235 236 if (cancel_task) 237 operation->Cancel(); 238 239 disk_cache::FileIOCallback* callback = operation->callback(); 240 int bytes = operation->Result(); 241 242 // Release the references acquired in PostRead / PostWrite. 243 operation->file()->Release(); 244 io_list_.erase(operation); 245 callback->OnFileIOComplete(bytes); 246} 247 248} // namespace 249 250namespace disk_cache { 251 252File::File(base::PlatformFile file) 253 : init_(true), 254 mixed_(true), 255 platform_file_(file), 256 sync_platform_file_(base::kInvalidPlatformFileValue) { 257} 258 259bool File::Init(const FilePath& name) { 260 if (init_) 261 return false; 262 263 int flags = base::PLATFORM_FILE_OPEN | 264 base::PLATFORM_FILE_READ | 265 base::PLATFORM_FILE_WRITE; 266 platform_file_ = base::CreatePlatformFile(name, flags, NULL, NULL); 267 if (platform_file_ < 0) { 268 platform_file_ = 0; 269 return false; 270 } 271 272 init_ = true; 273 return true; 274} 275 276File::~File() { 277 if (platform_file_) 278 close(platform_file_); 279} 280 281base::PlatformFile File::platform_file() const { 282 return platform_file_; 283} 284 285bool File::IsValid() const { 286 if (!init_) 287 return false; 288 return (base::kInvalidPlatformFileValue != platform_file_); 289} 290 291bool File::Read(void* buffer, size_t buffer_len, size_t offset) { 292 DCHECK(init_); 293 if (buffer_len > ULONG_MAX || offset > LONG_MAX) 294 return false; 295 296 int ret = pread(platform_file_, buffer, buffer_len, offset); 297 return (static_cast<size_t>(ret) == buffer_len); 298} 299 300bool File::Write(const void* buffer, size_t buffer_len, size_t offset) { 301 DCHECK(init_); 302 if (buffer_len > ULONG_MAX || offset > ULONG_MAX) 303 return false; 304 305 int ret = pwrite(platform_file_, buffer, buffer_len, offset); 306 return (static_cast<size_t>(ret) == buffer_len); 307} 308 309// We have to increase the ref counter of the file before performing the IO to 310// prevent the completion to happen with an invalid handle (if the file is 311// closed while the IO is in flight). 312bool File::Read(void* buffer, size_t buffer_len, size_t offset, 313 FileIOCallback* callback, bool* completed) { 314 DCHECK(init_); 315 if (!callback) { 316 if (completed) 317 *completed = true; 318 return Read(buffer, buffer_len, offset); 319 } 320 321 if (buffer_len > ULONG_MAX || offset > ULONG_MAX) 322 return false; 323 324 InFlightIO* io_operations = Singleton<InFlightIO>::get(); 325 io_operations->PostRead(this, buffer, buffer_len, offset, callback); 326 327 *completed = false; 328 return true; 329} 330 331bool File::Write(const void* buffer, size_t buffer_len, size_t offset, 332 FileIOCallback* callback, bool* completed) { 333 DCHECK(init_); 334 if (!callback) { 335 if (completed) 336 *completed = true; 337 return Write(buffer, buffer_len, offset); 338 } 339 340 return AsyncWrite(buffer, buffer_len, offset, callback, completed); 341} 342 343bool File::AsyncWrite(const void* buffer, size_t buffer_len, size_t offset, 344 FileIOCallback* callback, bool* completed) { 345 DCHECK(init_); 346 if (buffer_len > ULONG_MAX || offset > ULONG_MAX) 347 return false; 348 349 InFlightIO* io_operations = Singleton<InFlightIO>::get(); 350 io_operations->PostWrite(this, buffer, buffer_len, offset, callback); 351 352 if (completed) 353 *completed = false; 354 return true; 355} 356 357bool File::SetLength(size_t length) { 358 DCHECK(init_); 359 if (length > ULONG_MAX) 360 return false; 361 362 return 0 == ftruncate(platform_file_, length); 363} 364 365size_t File::GetLength() { 366 DCHECK(init_); 367 size_t ret = lseek(platform_file_, 0, SEEK_END); 368 return ret; 369} 370 371// Static. 372void File::WaitForPendingIO(int* num_pending_io) { 373 // We may be running unit tests so we should allow InFlightIO to reset the 374 // message loop. 375 Singleton<InFlightIO>::get()->WaitForPendingIO(); 376} 377 378} // namespace disk_cache 379