FdBuffer.cpp revision eadd123d68850cb27aa6d030ade6190e30991b19
1/*
2 * Copyright (C) 2016 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#define DEBUG false
#include "Log.h"

#include "FdBuffer.h"

#include <cutils/log.h>
#include <utils/SystemClock.h>

#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <unistd.h>
#include <wait.h>
28
// Size handed to mBuffer on construction and size of the on-stack circular buffer: 16 KB.
constexpr ssize_t BUFFER_SIZE = 16 * 1024;
// The read loops stop (and flag truncation) once MAX_BUFFER_COUNT * BUFFER_SIZE bytes,
// i.e. 4 MB, have been collected.
constexpr ssize_t MAX_BUFFER_COUNT = 256;
31
32FdBuffer::FdBuffer()
33    : mBuffer(BUFFER_SIZE), mStartTime(-1), mFinishTime(-1), mTimedOut(false), mTruncated(false) {}
34
35FdBuffer::~FdBuffer() {}
36
37status_t FdBuffer::read(int fd, int64_t timeout) {
38    struct pollfd pfds = {.fd = fd, .events = POLLIN};
39    mStartTime = uptimeMillis();
40
41    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
42
43    while (true) {
44        if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
45            mTruncated = true;
46            break;
47        }
48        if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
49
50        int64_t remainingTime = (mStartTime + timeout) - uptimeMillis();
51        if (remainingTime <= 0) {
52            VLOG("timed out due to long read");
53            mTimedOut = true;
54            break;
55        }
56
57        int count = poll(&pfds, 1, remainingTime);
58        if (count == 0) {
59            VLOG("timed out due to block calling poll");
60            mTimedOut = true;
61            break;
62        } else if (count < 0) {
63            VLOG("poll failed: %s", strerror(errno));
64            return -errno;
65        } else {
66            if ((pfds.revents & POLLERR) != 0) {
67                VLOG("return event has error %s", strerror(errno));
68                return errno != 0 ? -errno : UNKNOWN_ERROR;
69            } else {
70                ssize_t amt = ::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite());
71                if (amt < 0) {
72                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
73                        continue;
74                    } else {
75                        VLOG("Fail to read %d: %s", fd, strerror(errno));
76                        return -errno;
77                    }
78                } else if (amt == 0) {
79                    VLOG("Reached EOF of fd=%d", fd);
80                    break;
81                }
82                mBuffer.wp()->move(amt);
83            }
84        }
85    }
86    mFinishTime = uptimeMillis();
87    return NO_ERROR;
88}
89
90status_t FdBuffer::readFully(int fd) {
91    mStartTime = uptimeMillis();
92
93    while (true) {
94        if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
95            // Don't let it get too big.
96            mTruncated = true;
97            VLOG("Truncating data");
98            break;
99        }
100        if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
101
102        ssize_t amt =
103                TEMP_FAILURE_RETRY(::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite()));
104        if (amt < 0) {
105            VLOG("Fail to read %d: %s", fd, strerror(errno));
106            return -errno;
107        } else if (amt == 0) {
108            VLOG("Done reading %zu bytes", mBuffer.size());
109            // We're done.
110            break;
111        }
112        mBuffer.wp()->move(amt);
113    }
114
115    mFinishTime = uptimeMillis();
116    return NO_ERROR;
117}
118
119status_t FdBuffer::readProcessedDataInStream(int fd, int toFd, int fromFd, int64_t timeoutMs,
120                                             const bool isSysfs) {
121    struct pollfd pfds[] = {
122            {.fd = fd, .events = POLLIN},
123            {.fd = toFd, .events = POLLOUT},
124            {.fd = fromFd, .events = POLLIN},
125    };
126
127    mStartTime = uptimeMillis();
128
129    // mark all fds non blocking
130    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
131    fcntl(toFd, F_SETFL, fcntl(toFd, F_GETFL, 0) | O_NONBLOCK);
132    fcntl(fromFd, F_SETFL, fcntl(fromFd, F_GETFL, 0) | O_NONBLOCK);
133
134    // A circular buffer holds data read from fd and writes to parsing process
135    uint8_t cirBuf[BUFFER_SIZE];
136    size_t cirSize = 0;
137    int rpos = 0, wpos = 0;
138
139    // This is the buffer used to store processed data
140    while (true) {
141        if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
142            mTruncated = true;
143            break;
144        }
145        if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
146
147        int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis();
148        if (remainingTime <= 0) {
149            VLOG("timed out due to long read");
150            mTimedOut = true;
151            break;
152        }
153
154        // wait for any pfds to be ready to perform IO
155        int count = poll(pfds, 3, remainingTime);
156        if (count == 0) {
157            VLOG("timed out due to block calling poll");
158            mTimedOut = true;
159            break;
160        } else if (count < 0) {
161            VLOG("Fail to poll: %s", strerror(errno));
162            return -errno;
163        }
164
165        // make sure no errors occur on any fds
166        for (int i = 0; i < 3; ++i) {
167            if ((pfds[i].revents & POLLERR) != 0) {
168                if (i == 0 && isSysfs) {
169                    VLOG("fd %d is sysfs, ignore its POLLERR return value", fd);
170                    continue;
171                }
172                VLOG("fd[%d]=%d returns error events: %s", i, fd, strerror(errno));
173                return errno != 0 ? -errno : UNKNOWN_ERROR;
174            }
175        }
176
177        // read from fd
178        if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) {
179            ssize_t amt;
180            if (rpos >= wpos) {
181                amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos);
182            } else {
183                amt = ::read(fd, cirBuf + rpos, wpos - rpos);
184            }
185            if (amt < 0) {
186                if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
187                    VLOG("Fail to read fd %d: %s", fd, strerror(errno));
188                    return -errno;
189                }  // otherwise just continue
190            } else if (amt == 0) {
191                VLOG("Reached EOF of input file %d", fd);
192                pfds[0].fd = -1;  // reach EOF so don't have to poll pfds[0].
193            } else {
194                rpos += amt;
195                cirSize += amt;
196            }
197        }
198
199        // write to parsing process
200        if (cirSize > 0 && pfds[1].fd != -1) {
201            ssize_t amt;
202            if (rpos > wpos) {
203                amt = ::write(toFd, cirBuf + wpos, rpos - wpos);
204            } else {
205                amt = ::write(toFd, cirBuf + wpos, BUFFER_SIZE - wpos);
206            }
207            if (amt < 0) {
208                if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
209                    VLOG("Fail to write toFd %d: %s", toFd, strerror(errno));
210                    return -errno;
211                }  // otherwise just continue
212            } else {
213                wpos += amt;
214                cirSize -= amt;
215            }
216        }
217
218        // if buffer is empty and fd is closed, close write fd.
219        if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) {
220            VLOG("Close write pipe %d", toFd);
221            ::close(pfds[1].fd);
222            pfds[1].fd = -1;
223        }
224
225        // circular buffer, reset rpos and wpos
226        if (rpos >= BUFFER_SIZE) {
227            rpos = 0;
228        }
229        if (wpos >= BUFFER_SIZE) {
230            wpos = 0;
231        }
232
233        // read from parsing process
234        ssize_t amt = ::read(fromFd, mBuffer.writeBuffer(), mBuffer.currentToWrite());
235        if (amt < 0) {
236            if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
237                VLOG("Fail to read fromFd %d: %s", fromFd, strerror(errno));
238                return -errno;
239            }  // otherwise just continue
240        } else if (amt == 0) {
241            VLOG("Reached EOF of fromFd %d", fromFd);
242            break;
243        } else {
244            mBuffer.wp()->move(amt);
245        }
246    }
247
248    mFinishTime = uptimeMillis();
249    return NO_ERROR;
250}
251
252size_t FdBuffer::size() const { return mBuffer.size(); }
253
254EncodedBuffer::iterator FdBuffer::data() const { return mBuffer.begin(); }
255