// FdBuffer.cpp revision 99c248feb2d1f863b864bdfd1e3b37af17f18732
1/* 2 * Copyright (C) 2016 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#define LOG_TAG "incidentd" 18 19#include "FdBuffer.h" 20#include "io_util.h" 21 22#include <cutils/log.h> 23#include <utils/SystemClock.h> 24 25#include <fcntl.h> 26#include <poll.h> 27#include <unistd.h> 28#include <wait.h> 29 30const ssize_t BUFFER_SIZE = 16 * 1024; // 16 KB 31const ssize_t MAX_BUFFER_COUNT = 256; // 4 MB max 32 33FdBuffer::FdBuffer() 34 :mBuffers(), 35 mStartTime(-1), 36 mFinishTime(-1), 37 mCurrentWritten(-1), 38 mTimedOut(false), 39 mTruncated(false) 40{ 41} 42 43FdBuffer::~FdBuffer() 44{ 45 const int N = mBuffers.size(); 46 for (int i=0; i<N; i++) { 47 uint8_t* buf = mBuffers[i]; 48 free(buf); 49 } 50} 51 52status_t 53FdBuffer::read(int fd, int64_t timeout) 54{ 55 struct pollfd pfds = { 56 .fd = fd, 57 .events = POLLIN 58 }; 59 mStartTime = uptimeMillis(); 60 61 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); 62 63 uint8_t* buf = NULL; 64 while (true) { 65 if (mCurrentWritten >= BUFFER_SIZE || mCurrentWritten < 0) { 66 if (mBuffers.size() == MAX_BUFFER_COUNT) { 67 mTruncated = true; 68 break; 69 } 70 buf = (uint8_t*)malloc(BUFFER_SIZE); 71 if (buf == NULL) { 72 return NO_MEMORY; 73 } 74 mBuffers.push_back(buf); 75 mCurrentWritten = 0; 76 } 77 78 int64_t remainingTime = (mStartTime + timeout) - uptimeMillis(); 79 if (remainingTime <= 0) { 80 mTimedOut = true; 81 break; 82 } 83 84 int count = poll(&pfds, 1, 
remainingTime); 85 if (count == 0) { 86 mTimedOut = true; 87 break; 88 } else if (count < 0) { 89 return -errno; 90 } else { 91 if ((pfds.revents & POLLERR) != 0) { 92 return errno != 0 ? -errno : UNKNOWN_ERROR; 93 } else { 94 ssize_t amt = ::read(fd, buf + mCurrentWritten, BUFFER_SIZE - mCurrentWritten); 95 if (amt < 0) { 96 if (errno == EAGAIN || errno == EWOULDBLOCK) { 97 continue; 98 } else { 99 return -errno; 100 } 101 } else if (amt == 0) { 102 break; 103 } 104 mCurrentWritten += amt; 105 } 106 } 107 } 108 109 mFinishTime = uptimeMillis(); 110 return NO_ERROR; 111} 112 113status_t 114FdBuffer::readProcessedDataInStream(int fd, int toFd, int fromFd, int64_t timeoutMs) 115{ 116 struct pollfd pfds[] = { 117 { .fd = fd, .events = POLLIN }, 118 { .fd = toFd, .events = POLLOUT }, 119 { .fd = fromFd, .events = POLLIN }, 120 }; 121 122 mStartTime = uptimeMillis(); 123 124 // mark all fds non blocking 125 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); 126 fcntl(toFd, F_SETFL, fcntl(toFd, F_GETFL, 0) | O_NONBLOCK); 127 fcntl(fromFd, F_SETFL, fcntl(fromFd, F_GETFL, 0) | O_NONBLOCK); 128 129 // A circular buffer holds data read from fd and writes to parsing process 130 uint8_t cirBuf[BUFFER_SIZE]; 131 size_t cirSize = 0; 132 int rpos = 0, wpos = 0; 133 134 // This is the buffer used to store processed data 135 uint8_t* buf = NULL; 136 while (true) { 137 if (mCurrentWritten >= BUFFER_SIZE || mCurrentWritten < 0) { 138 if (mBuffers.size() == MAX_BUFFER_COUNT) { 139 mTruncated = true; 140 break; 141 } 142 buf = (uint8_t*)malloc(BUFFER_SIZE); 143 if (buf == NULL) { 144 return NO_MEMORY; 145 } 146 mBuffers.push_back(buf); 147 mCurrentWritten = 0; 148 } 149 150 int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis(); 151 if (remainingTime <= 0) { 152 mTimedOut = true; 153 break; 154 } 155 156 // wait for any pfds to be ready to perform IO 157 int count = poll(pfds, 3, remainingTime); 158 if (count == 0) { 159 mTimedOut = true; 160 break; 161 } else if (count 
< 0) { 162 return -errno; 163 } 164 165 // make sure no errors occur on any fds 166 for (int i = 0; i < 3; ++i) { 167 if ((pfds[i].revents & POLLERR) != 0) { 168 return errno != 0 ? -errno : UNKNOWN_ERROR; 169 } 170 } 171 172 // read from fd 173 if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) { 174 ssize_t amt; 175 if (rpos >= wpos) { 176 amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos); 177 } else { 178 amt = ::read(fd, cirBuf + rpos, wpos - rpos); 179 } 180 if (amt < 0) { 181 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { 182 return -errno; 183 } // otherwise just continue 184 } else if (amt == 0) { // reach EOF so don't have to poll pfds[0]. 185 ::close(pfds[0].fd); 186 pfds[0].fd = -1; 187 } else { 188 rpos += amt; 189 cirSize += amt; 190 } 191 } 192 193 // write to parsing process 194 if (cirSize > 0 && pfds[1].fd != -1) { 195 ssize_t amt; 196 if (rpos > wpos) { 197 amt = ::write(toFd, cirBuf + wpos, rpos - wpos); 198 } else { 199 amt = ::write(toFd, cirBuf + wpos, BUFFER_SIZE - wpos); 200 } 201 if (amt < 0) { 202 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { 203 return -errno; 204 } // otherwise just continue 205 } else { 206 wpos += amt; 207 cirSize -= amt; 208 } 209 } 210 211 // if buffer is empty and fd is closed, close write fd. 
212 if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) { 213 ::close(pfds[1].fd); 214 pfds[1].fd = -1; 215 } 216 217 // circular buffer, reset rpos and wpos 218 if (rpos >= BUFFER_SIZE) { 219 rpos = 0; 220 } 221 if (wpos >= BUFFER_SIZE) { 222 wpos = 0; 223 } 224 225 // read from parsing process 226 ssize_t amt = ::read(fromFd, buf + mCurrentWritten, BUFFER_SIZE - mCurrentWritten); 227 if (amt < 0) { 228 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { 229 return -errno; 230 } // otherwise just continue 231 } else if (amt == 0) { 232 break; 233 } else { 234 mCurrentWritten += amt; 235 } 236 } 237 238 mFinishTime = uptimeMillis(); 239 return NO_ERROR; 240} 241 242size_t 243FdBuffer::size() const 244{ 245 if (mBuffers.empty()) return 0; 246 return ((mBuffers.size() - 1) * BUFFER_SIZE) + mCurrentWritten; 247} 248 249status_t 250FdBuffer::flush(int fd) const 251{ 252 size_t i=0; 253 status_t err = NO_ERROR; 254 for (i=0; i<mBuffers.size()-1; i++) { 255 err = write_all(fd, mBuffers[i], BUFFER_SIZE); 256 if (err != NO_ERROR) return err; 257 } 258 return write_all(fd, mBuffers[i], mCurrentWritten); 259} 260 261FdBuffer::iterator 262FdBuffer::end() const 263{ 264 if (mBuffers.empty() || mCurrentWritten < 0) return begin(); 265 if (mCurrentWritten == BUFFER_SIZE) 266 // FdBuffer doesn't allocate another buf since no more bytes to read. 267 return FdBuffer::iterator(*this, mBuffers.size(), 0); 268 return FdBuffer::iterator(*this, mBuffers.size() - 1, mCurrentWritten); 269} 270 271FdBuffer::iterator& 272FdBuffer::iterator::operator+(size_t offset) 273{ 274 size_t newOffset = mOffset + offset; 275 while (newOffset >= BUFFER_SIZE) { 276 mIndex++; 277 newOffset -= BUFFER_SIZE; 278 } 279 mOffset = newOffset; 280 return *this; 281} 282 283size_t 284FdBuffer::iterator::bytesRead() const 285{ 286 return mIndex * BUFFER_SIZE + mOffset; 287} 288