FdBuffer.cpp revision eadd123d68850cb27aa6d030ade6190e30991b19
1/* 2 * Copyright (C) 2016 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16#define DEBUG false 17#include "Log.h" 18 19#include "FdBuffer.h" 20 21#include <cutils/log.h> 22#include <utils/SystemClock.h> 23 24#include <fcntl.h> 25#include <poll.h> 26#include <unistd.h> 27#include <wait.h> 28 29const ssize_t BUFFER_SIZE = 16 * 1024; // 16 KB 30const ssize_t MAX_BUFFER_COUNT = 256; // 4 MB max 31 32FdBuffer::FdBuffer() 33 : mBuffer(BUFFER_SIZE), mStartTime(-1), mFinishTime(-1), mTimedOut(false), mTruncated(false) {} 34 35FdBuffer::~FdBuffer() {} 36 37status_t FdBuffer::read(int fd, int64_t timeout) { 38 struct pollfd pfds = {.fd = fd, .events = POLLIN}; 39 mStartTime = uptimeMillis(); 40 41 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); 42 43 while (true) { 44 if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) { 45 mTruncated = true; 46 break; 47 } 48 if (mBuffer.writeBuffer() == NULL) return NO_MEMORY; 49 50 int64_t remainingTime = (mStartTime + timeout) - uptimeMillis(); 51 if (remainingTime <= 0) { 52 VLOG("timed out due to long read"); 53 mTimedOut = true; 54 break; 55 } 56 57 int count = poll(&pfds, 1, remainingTime); 58 if (count == 0) { 59 VLOG("timed out due to block calling poll"); 60 mTimedOut = true; 61 break; 62 } else if (count < 0) { 63 VLOG("poll failed: %s", strerror(errno)); 64 return -errno; 65 } else { 66 if ((pfds.revents & POLLERR) != 0) { 67 VLOG("return event has error %s", strerror(errno)); 68 return errno != 0 ? -errno : UNKNOWN_ERROR; 69 } else { 70 ssize_t amt = ::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite()); 71 if (amt < 0) { 72 if (errno == EAGAIN || errno == EWOULDBLOCK) { 73 continue; 74 } else { 75 VLOG("Fail to read %d: %s", fd, strerror(errno)); 76 return -errno; 77 } 78 } else if (amt == 0) { 79 VLOG("Reached EOF of fd=%d", fd); 80 break; 81 } 82 mBuffer.wp()->move(amt); 83 } 84 } 85 } 86 mFinishTime = uptimeMillis(); 87 return NO_ERROR; 88} 89 90status_t FdBuffer::readFully(int fd) { 91 mStartTime = uptimeMillis(); 92 93 while (true) { 94 if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) { 95 // Don't let it get too big. 96 mTruncated = true; 97 VLOG("Truncating data"); 98 break; 99 } 100 if (mBuffer.writeBuffer() == NULL) return NO_MEMORY; 101 102 ssize_t amt = 103 TEMP_FAILURE_RETRY(::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite())); 104 if (amt < 0) { 105 VLOG("Fail to read %d: %s", fd, strerror(errno)); 106 return -errno; 107 } else if (amt == 0) { 108 VLOG("Done reading %zu bytes", mBuffer.size()); 109 // We're done. 110 break; 111 } 112 mBuffer.wp()->move(amt); 113 } 114 115 mFinishTime = uptimeMillis(); 116 return NO_ERROR; 117} 118 119status_t FdBuffer::readProcessedDataInStream(int fd, int toFd, int fromFd, int64_t timeoutMs, 120 const bool isSysfs) { 121 struct pollfd pfds[] = { 122 {.fd = fd, .events = POLLIN}, 123 {.fd = toFd, .events = POLLOUT}, 124 {.fd = fromFd, .events = POLLIN}, 125 }; 126 127 mStartTime = uptimeMillis(); 128 129 // mark all fds non blocking 130 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); 131 fcntl(toFd, F_SETFL, fcntl(toFd, F_GETFL, 0) | O_NONBLOCK); 132 fcntl(fromFd, F_SETFL, fcntl(fromFd, F_GETFL, 0) | O_NONBLOCK); 133 134 // A circular buffer holds data read from fd and writes to parsing process 135 uint8_t cirBuf[BUFFER_SIZE]; 136 size_t cirSize = 0; 137 int rpos = 0, wpos = 0; 138 139 // This is the buffer used to store processed data 140 while (true) { 141 if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) { 142 mTruncated = true; 143 break; 144 } 145 if (mBuffer.writeBuffer() == NULL) return NO_MEMORY; 146 147 int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis(); 148 if (remainingTime <= 0) { 149 VLOG("timed out due to long read"); 150 mTimedOut = true; 151 break; 152 } 153 154 // wait for any pfds to be ready to perform IO 155 int count = poll(pfds, 3, remainingTime); 156 if (count == 0) { 157 VLOG("timed out due to block calling poll"); 158 mTimedOut = true; 159 break; 160 } else if (count < 0) { 161 VLOG("Fail to poll: %s", strerror(errno)); 162 return -errno; 163 } 164 165 // make sure no errors occur on any fds 166 for (int i = 0; i < 3; ++i) { 167 if ((pfds[i].revents & POLLERR) != 0) { 168 if (i == 0 && isSysfs) { 169 VLOG("fd %d is sysfs, ignore its POLLERR return value", fd); 170 continue; 171 } 172 VLOG("fd[%d]=%d returns error events: %s", i, fd, strerror(errno)); 173 return errno != 0 ? -errno : UNKNOWN_ERROR; 174 } 175 } 176 177 // read from fd 178 if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) { 179 ssize_t amt; 180 if (rpos >= wpos) { 181 amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos); 182 } else { 183 amt = ::read(fd, cirBuf + rpos, wpos - rpos); 184 } 185 if (amt < 0) { 186 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { 187 VLOG("Fail to read fd %d: %s", fd, strerror(errno)); 188 return -errno; 189 } // otherwise just continue 190 } else if (amt == 0) { 191 VLOG("Reached EOF of input file %d", fd); 192 pfds[0].fd = -1; // reach EOF so don't have to poll pfds[0]. 193 } else { 194 rpos += amt; 195 cirSize += amt; 196 } 197 } 198 199 // write to parsing process 200 if (cirSize > 0 && pfds[1].fd != -1) { 201 ssize_t amt; 202 if (rpos > wpos) { 203 amt = ::write(toFd, cirBuf + wpos, rpos - wpos); 204 } else { 205 amt = ::write(toFd, cirBuf + wpos, BUFFER_SIZE - wpos); 206 } 207 if (amt < 0) { 208 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { 209 VLOG("Fail to write toFd %d: %s", toFd, strerror(errno)); 210 return -errno; 211 } // otherwise just continue 212 } else { 213 wpos += amt; 214 cirSize -= amt; 215 } 216 } 217 218 // if buffer is empty and fd is closed, close write fd. 219 if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) { 220 VLOG("Close write pipe %d", toFd); 221 ::close(pfds[1].fd); 222 pfds[1].fd = -1; 223 } 224 225 // circular buffer, reset rpos and wpos 226 if (rpos >= BUFFER_SIZE) { 227 rpos = 0; 228 } 229 if (wpos >= BUFFER_SIZE) { 230 wpos = 0; 231 } 232 233 // read from parsing process 234 ssize_t amt = ::read(fromFd, mBuffer.writeBuffer(), mBuffer.currentToWrite()); 235 if (amt < 0) { 236 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { 237 VLOG("Fail to read fromFd %d: %s", fromFd, strerror(errno)); 238 return -errno; 239 } // otherwise just continue 240 } else if (amt == 0) { 241 VLOG("Reached EOF of fromFd %d", fromFd); 242 break; 243 } else { 244 mBuffer.wp()->move(amt); 245 } 246 } 247 248 mFinishTime = uptimeMillis(); 249 return NO_ERROR; 250} 251 252size_t FdBuffer::size() const { return mBuffer.size(); } 253 254EncodedBuffer::iterator FdBuffer::data() const { return mBuffer.begin(); } 255