// FdBuffer.cpp revision c23fad2f9079f678eae15338f5e57e2a6bf7e391
1/* 2 * Copyright (C) 2016 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#define LOG_TAG "incidentd" 18 19#include "FdBuffer.h" 20 21#include <cutils/log.h> 22#include <utils/SystemClock.h> 23 24#include <fcntl.h> 25#include <poll.h> 26#include <unistd.h> 27#include <wait.h> 28 29const ssize_t BUFFER_SIZE = 16 * 1024; // 16 KB 30const ssize_t MAX_BUFFER_COUNT = 256; // 4 MB max 31 32FdBuffer::FdBuffer() 33 :mBuffer(BUFFER_SIZE), 34 mStartTime(-1), 35 mFinishTime(-1), 36 mTimedOut(false), 37 mTruncated(false) 38{ 39} 40 41FdBuffer::~FdBuffer() 42{ 43} 44 45status_t 46FdBuffer::read(int fd, int64_t timeout) 47{ 48 struct pollfd pfds = { 49 .fd = fd, 50 .events = POLLIN 51 }; 52 mStartTime = uptimeMillis(); 53 54 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); 55 56 while (true) { 57 if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) { 58 mTruncated = true; 59 break; 60 } 61 if (mBuffer.writeBuffer() == NULL) return NO_MEMORY; 62 63 int64_t remainingTime = (mStartTime + timeout) - uptimeMillis(); 64 if (remainingTime <= 0) { 65 mTimedOut = true; 66 break; 67 } 68 69 int count = poll(&pfds, 1, remainingTime); 70 if (count == 0) { 71 mTimedOut = true; 72 break; 73 } else if (count < 0) { 74 return -errno; 75 } else { 76 if ((pfds.revents & POLLERR) != 0) { 77 return errno != 0 ? 
-errno : UNKNOWN_ERROR; 78 } else { 79 ssize_t amt = ::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite()); 80 if (amt < 0) { 81 if (errno == EAGAIN || errno == EWOULDBLOCK) { 82 continue; 83 } else { 84 return -errno; 85 } 86 } else if (amt == 0) { 87 break; 88 } 89 mBuffer.wp()->move(amt); 90 } 91 } 92 } 93 mFinishTime = uptimeMillis(); 94 return NO_ERROR; 95} 96 97status_t 98FdBuffer::readProcessedDataInStream(int fd, int toFd, int fromFd, int64_t timeoutMs) 99{ 100 struct pollfd pfds[] = { 101 { .fd = fd, .events = POLLIN }, 102 { .fd = toFd, .events = POLLOUT }, 103 { .fd = fromFd, .events = POLLIN }, 104 }; 105 106 mStartTime = uptimeMillis(); 107 108 // mark all fds non blocking 109 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); 110 fcntl(toFd, F_SETFL, fcntl(toFd, F_GETFL, 0) | O_NONBLOCK); 111 fcntl(fromFd, F_SETFL, fcntl(fromFd, F_GETFL, 0) | O_NONBLOCK); 112 113 // A circular buffer holds data read from fd and writes to parsing process 114 uint8_t cirBuf[BUFFER_SIZE]; 115 size_t cirSize = 0; 116 int rpos = 0, wpos = 0; 117 118 // This is the buffer used to store processed data 119 while (true) { 120 if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) { 121 mTruncated = true; 122 break; 123 } 124 if (mBuffer.writeBuffer() == NULL) return NO_MEMORY; 125 126 int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis(); 127 if (remainingTime <= 0) { 128 mTimedOut = true; 129 break; 130 } 131 132 // wait for any pfds to be ready to perform IO 133 int count = poll(pfds, 3, remainingTime); 134 if (count == 0) { 135 mTimedOut = true; 136 break; 137 } else if (count < 0) { 138 return -errno; 139 } 140 141 // make sure no errors occur on any fds 142 for (int i = 0; i < 3; ++i) { 143 if ((pfds[i].revents & POLLERR) != 0) { 144 return errno != 0 ? 
-errno : UNKNOWN_ERROR; 145 } 146 } 147 148 // read from fd 149 if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) { 150 ssize_t amt; 151 if (rpos >= wpos) { 152 amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos); 153 } else { 154 amt = ::read(fd, cirBuf + rpos, wpos - rpos); 155 } 156 if (amt < 0) { 157 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { 158 return -errno; 159 } // otherwise just continue 160 } else if (amt == 0) { // reach EOF so don't have to poll pfds[0]. 161 ::close(pfds[0].fd); 162 pfds[0].fd = -1; 163 } else { 164 rpos += amt; 165 cirSize += amt; 166 } 167 } 168 169 // write to parsing process 170 if (cirSize > 0 && pfds[1].fd != -1) { 171 ssize_t amt; 172 if (rpos > wpos) { 173 amt = ::write(toFd, cirBuf + wpos, rpos - wpos); 174 } else { 175 amt = ::write(toFd, cirBuf + wpos, BUFFER_SIZE - wpos); 176 } 177 if (amt < 0) { 178 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { 179 return -errno; 180 } // otherwise just continue 181 } else { 182 wpos += amt; 183 cirSize -= amt; 184 } 185 } 186 187 // if buffer is empty and fd is closed, close write fd. 188 if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) { 189 ::close(pfds[1].fd); 190 pfds[1].fd = -1; 191 } 192 193 // circular buffer, reset rpos and wpos 194 if (rpos >= BUFFER_SIZE) { 195 rpos = 0; 196 } 197 if (wpos >= BUFFER_SIZE) { 198 wpos = 0; 199 } 200 201 // read from parsing process 202 ssize_t amt = ::read(fromFd, mBuffer.writeBuffer(), mBuffer.currentToWrite()); 203 if (amt < 0) { 204 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { 205 return -errno; 206 } // otherwise just continue 207 } else if (amt == 0) { 208 break; 209 } else { 210 mBuffer.wp()->move(amt); 211 } 212 } 213 214 mFinishTime = uptimeMillis(); 215 return NO_ERROR; 216} 217 218size_t 219FdBuffer::size() const 220{ 221 return mBuffer.size(); 222} 223 224EncodedBuffer::iterator 225FdBuffer::data() const 226{ 227 return mBuffer.begin(); 228} 229