1/* 2 * Copyright (C) 2016 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include <android-base/logging.h> 18#include <condition_variable> 19#include <memory> 20#include <mutex> 21#include <queue> 22 23#include "AsyncIO.h" 24 25namespace { 26 27void read_func(struct aiocb *aiocbp) { 28 aiocbp->ret = TEMP_FAILURE_RETRY(pread(aiocbp->aio_fildes, 29 aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset)); 30 if (aiocbp->ret == -1) aiocbp->error = errno; 31} 32 33void write_func(struct aiocb *aiocbp) { 34 aiocbp->ret = TEMP_FAILURE_RETRY(pwrite(aiocbp->aio_fildes, 35 aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset)); 36 if (aiocbp->ret == -1) aiocbp->error = errno; 37} 38 39void splice_read_func(struct aiocb *aiocbp) { 40 loff_t long_offset = aiocbp->aio_offset; 41 aiocbp->ret = TEMP_FAILURE_RETRY(splice(aiocbp->aio_fildes, 42 &long_offset, aiocbp->aio_sink, 43 NULL, aiocbp->aio_nbytes, 0)); 44 if (aiocbp->ret == -1) aiocbp->error = errno; 45} 46 47void splice_write_func(struct aiocb *aiocbp) { 48 loff_t long_offset = aiocbp->aio_offset; 49 aiocbp->ret = TEMP_FAILURE_RETRY(splice(aiocbp->aio_fildes, NULL, 50 aiocbp->aio_sink, &long_offset, 51 aiocbp->aio_nbytes, 0)); 52 if (aiocbp->ret == -1) aiocbp->error = errno; 53} 54 55std::queue<std::unique_ptr<struct aiocb>> queue; 56std::mutex queue_lock; 57std::condition_variable queue_cond; 58std::condition_variable write_cond; 59int done = 1; 60void splice_write_pool_func(int) { 61 while(1) { 62 std::unique_lock<std::mutex> lk(queue_lock); 63 queue_cond.wait(lk, []{return !queue.empty() || done;}); 64 if (queue.empty() && done) { 65 return; 66 } 67 std::unique_ptr<struct aiocb> aiocbp = std::move(queue.front()); 68 queue.pop(); 69 lk.unlock(); 70 write_cond.notify_one(); 71 splice_write_func(aiocbp.get()); 72 close(aiocbp->aio_fildes); 73 } 74} 75 76void write_pool_func(int) { 77 while(1) { 78 std::unique_lock<std::mutex> lk(queue_lock); 79 queue_cond.wait(lk, []{return !queue.empty() || done;}); 80 if (queue.empty() && done) { 81 return; 82 } 83 std::unique_ptr<struct aiocb> aiocbp = std::move(queue.front()); 84 queue.pop(); 85 lk.unlock(); 86 write_cond.notify_one(); 87 aiocbp->ret = TEMP_FAILURE_RETRY(pwrite(aiocbp->aio_fildes, 88 aiocbp->aio_pool_buf.get(), aiocbp->aio_nbytes, aiocbp->aio_offset)); 89 if (aiocbp->ret == -1) aiocbp->error = errno; 90 } 91} 92 93constexpr int NUM_THREADS = 1; 94constexpr int MAX_QUEUE_SIZE = 10; 95std::thread pool[NUM_THREADS]; 96 97} // end anonymous namespace 98 99aiocb::~aiocb() { 100 CHECK(!thread.joinable()); 101} 102 103void aio_pool_init(void(f)(int)) { 104 CHECK(done == 1); 105 done = 0; 106 for (int i = 0; i < NUM_THREADS; i++) { 107 pool[i] = std::thread(f, i); 108 } 109} 110 111void aio_pool_splice_init() { 112 aio_pool_init(splice_write_pool_func); 113} 114 115void aio_pool_write_init() { 116 aio_pool_init(write_pool_func); 117} 118 119void aio_pool_end() { 120 done = 1; 121 for (int i = 0; i < NUM_THREADS; i++) { 122 std::unique_lock<std::mutex> lk(queue_lock); 123 lk.unlock(); 124 queue_cond.notify_one(); 125 } 126 127 for (int i = 0; i < NUM_THREADS; i++) { 128 pool[i].join(); 129 } 130} 131 132// used for both writes and splices depending on which init was used before. 133int aio_pool_write(struct aiocb *aiocbp) { 134 std::unique_lock<std::mutex> lk(queue_lock); 135 write_cond.wait(lk, []{return queue.size() < MAX_QUEUE_SIZE;}); 136 queue.push(std::unique_ptr<struct aiocb>(aiocbp)); 137 lk.unlock(); 138 queue_cond.notify_one(); 139 return 0; 140} 141 142int aio_read(struct aiocb *aiocbp) { 143 aiocbp->thread = std::thread(read_func, aiocbp); 144 return 0; 145} 146 147int aio_write(struct aiocb *aiocbp) { 148 aiocbp->thread = std::thread(write_func, aiocbp); 149 return 0; 150} 151 152int aio_splice_read(struct aiocb *aiocbp) { 153 aiocbp->thread = std::thread(splice_read_func, aiocbp); 154 return 0; 155} 156 157int aio_splice_write(struct aiocb *aiocbp) { 158 aiocbp->thread = std::thread(splice_write_func, aiocbp); 159 return 0; 160} 161 162int aio_error(const struct aiocb *aiocbp) { 163 return aiocbp->error; 164} 165 166ssize_t aio_return(struct aiocb *aiocbp) { 167 return aiocbp->ret; 168} 169 170int aio_suspend(struct aiocb *aiocbp[], int n, 171 const struct timespec *) { 172 for (int i = 0; i < n; i++) { 173 aiocbp[i]->thread.join(); 174 } 175 return 0; 176} 177 178int aio_cancel(int, struct aiocb *) { 179 // Not implemented 180 return -1; 181} 182 183