1/* 2 * Copyright (C) 2016 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16#define DEBUG false 17#include "Log.h" 18 19#include "FdBuffer.h" 20 21#include <cutils/log.h> 22#include <utils/SystemClock.h> 23 24#include <fcntl.h> 25#include <poll.h> 26#include <unistd.h> 27#include <wait.h> 28 29namespace android { 30namespace os { 31namespace incidentd { 32 33const ssize_t BUFFER_SIZE = 16 * 1024; // 16 KB 34const ssize_t MAX_BUFFER_COUNT = 256; // 4 MB max 35 36FdBuffer::FdBuffer() 37 : mBuffer(BUFFER_SIZE), mStartTime(-1), mFinishTime(-1), mTimedOut(false), mTruncated(false) {} 38 39FdBuffer::~FdBuffer() {} 40 41status_t FdBuffer::read(int fd, int64_t timeout) { 42 struct pollfd pfds = {.fd = fd, .events = POLLIN}; 43 mStartTime = uptimeMillis(); 44 45 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); 46 47 while (true) { 48 if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) { 49 mTruncated = true; 50 break; 51 } 52 if (mBuffer.writeBuffer() == NULL) return NO_MEMORY; 53 54 int64_t remainingTime = (mStartTime + timeout) - uptimeMillis(); 55 if (remainingTime <= 0) { 56 VLOG("timed out due to long read"); 57 mTimedOut = true; 58 break; 59 } 60 61 int count = poll(&pfds, 1, remainingTime); 62 if (count == 0) { 63 VLOG("timed out due to block calling poll"); 64 mTimedOut = true; 65 break; 66 } else if (count < 0) { 67 VLOG("poll failed: %s", strerror(errno)); 68 return -errno; 69 } else { 70 if ((pfds.revents & POLLERR) != 0) { 71 VLOG("return event has error %s", strerror(errno)); 72 return errno != 0 ? -errno : UNKNOWN_ERROR; 73 } else { 74 ssize_t amt = ::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite()); 75 if (amt < 0) { 76 if (errno == EAGAIN || errno == EWOULDBLOCK) { 77 continue; 78 } else { 79 VLOG("Fail to read %d: %s", fd, strerror(errno)); 80 return -errno; 81 } 82 } else if (amt == 0) { 83 VLOG("Reached EOF of fd=%d", fd); 84 break; 85 } 86 mBuffer.wp()->move(amt); 87 } 88 } 89 } 90 mFinishTime = uptimeMillis(); 91 return NO_ERROR; 92} 93 94status_t FdBuffer::readFully(int fd) { 95 mStartTime = uptimeMillis(); 96 97 while (true) { 98 if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) { 99 // Don't let it get too big. 100 mTruncated = true; 101 VLOG("Truncating data"); 102 break; 103 } 104 if (mBuffer.writeBuffer() == NULL) return NO_MEMORY; 105 106 ssize_t amt = 107 TEMP_FAILURE_RETRY(::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite())); 108 if (amt < 0) { 109 VLOG("Fail to read %d: %s", fd, strerror(errno)); 110 return -errno; 111 } else if (amt == 0) { 112 VLOG("Done reading %zu bytes", mBuffer.size()); 113 // We're done. 114 break; 115 } 116 mBuffer.wp()->move(amt); 117 } 118 119 mFinishTime = uptimeMillis(); 120 return NO_ERROR; 121} 122 123status_t FdBuffer::readProcessedDataInStream(int fd, unique_fd toFd, unique_fd fromFd, 124 int64_t timeoutMs, const bool isSysfs) { 125 struct pollfd pfds[] = { 126 {.fd = fd, .events = POLLIN}, 127 {.fd = toFd.get(), .events = POLLOUT}, 128 {.fd = fromFd.get(), .events = POLLIN}, 129 }; 130 131 mStartTime = uptimeMillis(); 132 133 // mark all fds non blocking 134 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); 135 fcntl(toFd.get(), F_SETFL, fcntl(toFd.get(), F_GETFL, 0) | O_NONBLOCK); 136 fcntl(fromFd.get(), F_SETFL, fcntl(fromFd.get(), F_GETFL, 0) | O_NONBLOCK); 137 138 // A circular buffer holds data read from fd and writes to parsing process 139 uint8_t cirBuf[BUFFER_SIZE]; 140 size_t cirSize = 0; 141 int rpos = 0, wpos = 0; 142 143 // This is the buffer used to store processed data 144 while (true) { 145 if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) { 146 mTruncated = true; 147 break; 148 } 149 if (mBuffer.writeBuffer() == NULL) return NO_MEMORY; 150 151 int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis(); 152 if (remainingTime <= 0) { 153 VLOG("timed out due to long read"); 154 mTimedOut = true; 155 break; 156 } 157 158 // wait for any pfds to be ready to perform IO 159 int count = poll(pfds, 3, remainingTime); 160 if (count == 0) { 161 VLOG("timed out due to block calling poll"); 162 mTimedOut = true; 163 break; 164 } else if (count < 0) { 165 VLOG("Fail to poll: %s", strerror(errno)); 166 return -errno; 167 } 168 169 // make sure no errors occur on any fds 170 for (int i = 0; i < 3; ++i) { 171 if ((pfds[i].revents & POLLERR) != 0) { 172 if (i == 0 && isSysfs) { 173 VLOG("fd %d is sysfs, ignore its POLLERR return value", fd); 174 continue; 175 } 176 VLOG("fd[%d]=%d returns error events: %s", i, fd, strerror(errno)); 177 return errno != 0 ? -errno : UNKNOWN_ERROR; 178 } 179 } 180 181 // read from fd 182 if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) { 183 ssize_t amt; 184 if (rpos >= wpos) { 185 amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos); 186 } else { 187 amt = ::read(fd, cirBuf + rpos, wpos - rpos); 188 } 189 if (amt < 0) { 190 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { 191 VLOG("Fail to read fd %d: %s", fd, strerror(errno)); 192 return -errno; 193 } // otherwise just continue 194 } else if (amt == 0) { 195 VLOG("Reached EOF of input file %d", fd); 196 pfds[0].fd = -1; // reach EOF so don't have to poll pfds[0]. 197 } else { 198 rpos += amt; 199 cirSize += amt; 200 } 201 } 202 203 // write to parsing process 204 if (cirSize > 0 && pfds[1].fd != -1) { 205 ssize_t amt; 206 if (rpos > wpos) { 207 amt = ::write(toFd.get(), cirBuf + wpos, rpos - wpos); 208 } else { 209 amt = ::write(toFd.get(), cirBuf + wpos, BUFFER_SIZE - wpos); 210 } 211 if (amt < 0) { 212 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { 213 VLOG("Fail to write toFd %d: %s", toFd.get(), strerror(errno)); 214 return -errno; 215 } // otherwise just continue 216 } else { 217 wpos += amt; 218 cirSize -= amt; 219 } 220 } 221 222 // if buffer is empty and fd is closed, close write fd. 223 if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) { 224 VLOG("Close write pipe %d", toFd.get()); 225 toFd.reset(); 226 pfds[1].fd = -1; 227 } 228 229 // circular buffer, reset rpos and wpos 230 if (rpos >= BUFFER_SIZE) { 231 rpos = 0; 232 } 233 if (wpos >= BUFFER_SIZE) { 234 wpos = 0; 235 } 236 237 // read from parsing process 238 ssize_t amt = ::read(fromFd.get(), mBuffer.writeBuffer(), mBuffer.currentToWrite()); 239 if (amt < 0) { 240 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { 241 VLOG("Fail to read fromFd %d: %s", fromFd.get(), strerror(errno)); 242 return -errno; 243 } // otherwise just continue 244 } else if (amt == 0) { 245 VLOG("Reached EOF of fromFd %d", fromFd.get()); 246 break; 247 } else { 248 mBuffer.wp()->move(amt); 249 } 250 } 251 252 mFinishTime = uptimeMillis(); 253 return NO_ERROR; 254} 255 256size_t FdBuffer::size() const { return mBuffer.size(); } 257 258EncodedBuffer::iterator FdBuffer::data() const { return mBuffer.begin(); } 259 260} // namespace incidentd 261} // namespace os 262} // namespace android 263