FdBuffer.cpp revision 99c248feb2d1f863b864bdfd1e3b37af17f18732
1/*
2 * Copyright (C) 2016 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "incidentd"
18
19#include "FdBuffer.h"
20#include "io_util.h"
21
22#include <cutils/log.h>
23#include <utils/SystemClock.h>
24
25#include <fcntl.h>
26#include <poll.h>
27#include <unistd.h>
28#include <wait.h>
29
// Capacity, in bytes, of each individually malloc'ed chunk (16 KB).
const ssize_t BUFFER_SIZE = 16 * 1024;
// Upper bound on the number of chunks one FdBuffer may hold
// (256 chunks * 16 KB = 4 MB max).
const ssize_t MAX_BUFFER_COUNT = 256;
32
33FdBuffer::FdBuffer()
34    :mBuffers(),
35     mStartTime(-1),
36     mFinishTime(-1),
37     mCurrentWritten(-1),
38     mTimedOut(false),
39     mTruncated(false)
40{
41}
42
43FdBuffer::~FdBuffer()
44{
45    const int N = mBuffers.size();
46    for (int i=0; i<N; i++) {
47        uint8_t* buf = mBuffers[i];
48        free(buf);
49    }
50}
51
52status_t
53FdBuffer::read(int fd, int64_t timeout)
54{
55    struct pollfd pfds = {
56        .fd = fd,
57        .events = POLLIN
58    };
59    mStartTime = uptimeMillis();
60
61    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
62
63    uint8_t* buf = NULL;
64    while (true) {
65        if (mCurrentWritten >= BUFFER_SIZE || mCurrentWritten < 0) {
66            if (mBuffers.size() == MAX_BUFFER_COUNT) {
67                mTruncated = true;
68                break;
69            }
70            buf = (uint8_t*)malloc(BUFFER_SIZE);
71            if (buf == NULL) {
72                return NO_MEMORY;
73            }
74            mBuffers.push_back(buf);
75            mCurrentWritten = 0;
76        }
77
78        int64_t remainingTime = (mStartTime + timeout) - uptimeMillis();
79        if (remainingTime <= 0) {
80            mTimedOut = true;
81            break;
82        }
83
84        int count = poll(&pfds, 1, remainingTime);
85        if (count == 0) {
86            mTimedOut = true;
87            break;
88        } else if (count < 0) {
89            return -errno;
90        } else {
91            if ((pfds.revents & POLLERR) != 0) {
92                return errno != 0 ? -errno : UNKNOWN_ERROR;
93            } else {
94                ssize_t amt = ::read(fd, buf + mCurrentWritten, BUFFER_SIZE - mCurrentWritten);
95                if (amt < 0) {
96                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
97                        continue;
98                    } else {
99                        return -errno;
100                    }
101                } else if (amt == 0) {
102                    break;
103                }
104                mCurrentWritten += amt;
105            }
106        }
107    }
108
109    mFinishTime = uptimeMillis();
110    return NO_ERROR;
111}
112
113status_t
114FdBuffer::readProcessedDataInStream(int fd, int toFd, int fromFd, int64_t timeoutMs)
115{
116    struct pollfd pfds[] = {
117        { .fd = fd,     .events = POLLIN  },
118        { .fd = toFd,   .events = POLLOUT },
119        { .fd = fromFd, .events = POLLIN  },
120    };
121
122    mStartTime = uptimeMillis();
123
124    // mark all fds non blocking
125    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
126    fcntl(toFd, F_SETFL, fcntl(toFd, F_GETFL, 0) | O_NONBLOCK);
127    fcntl(fromFd, F_SETFL, fcntl(fromFd, F_GETFL, 0) | O_NONBLOCK);
128
129    // A circular buffer holds data read from fd and writes to parsing process
130    uint8_t cirBuf[BUFFER_SIZE];
131    size_t cirSize = 0;
132    int rpos = 0, wpos = 0;
133
134    // This is the buffer used to store processed data
135    uint8_t* buf = NULL;
136    while (true) {
137        if (mCurrentWritten >= BUFFER_SIZE || mCurrentWritten < 0) {
138            if (mBuffers.size() == MAX_BUFFER_COUNT) {
139                mTruncated = true;
140                break;
141            }
142            buf = (uint8_t*)malloc(BUFFER_SIZE);
143            if (buf == NULL) {
144                return NO_MEMORY;
145            }
146            mBuffers.push_back(buf);
147            mCurrentWritten = 0;
148        }
149
150        int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis();
151        if (remainingTime <= 0) {
152            mTimedOut = true;
153            break;
154        }
155
156        // wait for any pfds to be ready to perform IO
157        int count = poll(pfds, 3, remainingTime);
158        if (count == 0) {
159            mTimedOut = true;
160            break;
161        } else if (count < 0) {
162            return -errno;
163        }
164
165        // make sure no errors occur on any fds
166        for (int i = 0; i < 3; ++i) {
167            if ((pfds[i].revents & POLLERR) != 0) {
168                return errno != 0 ? -errno : UNKNOWN_ERROR;
169            }
170        }
171
172        // read from fd
173        if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) {
174            ssize_t amt;
175            if (rpos >= wpos) {
176                amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos);
177            } else {
178                amt = ::read(fd, cirBuf + rpos, wpos - rpos);
179            }
180            if (amt < 0) {
181                if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
182                    return -errno;
183                } // otherwise just continue
184            } else if (amt == 0) {  // reach EOF so don't have to poll pfds[0].
185                ::close(pfds[0].fd);
186                pfds[0].fd = -1;
187            } else {
188                rpos += amt;
189                cirSize += amt;
190            }
191        }
192
193        // write to parsing process
194        if (cirSize > 0 && pfds[1].fd != -1) {
195            ssize_t amt;
196            if (rpos > wpos) {
197                amt = ::write(toFd, cirBuf + wpos, rpos - wpos);
198            } else {
199                amt = ::write(toFd, cirBuf + wpos, BUFFER_SIZE - wpos);
200            }
201            if (amt < 0) {
202                if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
203                    return -errno;
204                } // otherwise just continue
205            } else {
206                wpos += amt;
207                cirSize -= amt;
208            }
209        }
210
211        // if buffer is empty and fd is closed, close write fd.
212        if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) {
213            ::close(pfds[1].fd);
214            pfds[1].fd = -1;
215        }
216
217        // circular buffer, reset rpos and wpos
218        if (rpos >= BUFFER_SIZE) {
219            rpos = 0;
220        }
221        if (wpos >= BUFFER_SIZE) {
222            wpos = 0;
223        }
224
225        // read from parsing process
226        ssize_t amt = ::read(fromFd, buf + mCurrentWritten, BUFFER_SIZE - mCurrentWritten);
227        if (amt < 0) {
228            if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
229                return -errno;
230            } // otherwise just continue
231        } else if (amt == 0) {
232            break;
233        } else {
234            mCurrentWritten += amt;
235        }
236    }
237
238    mFinishTime = uptimeMillis();
239    return NO_ERROR;
240}
241
242size_t
243FdBuffer::size() const
244{
245    if (mBuffers.empty()) return 0;
246    return ((mBuffers.size() - 1) * BUFFER_SIZE) + mCurrentWritten;
247}
248
249status_t
250FdBuffer::flush(int fd) const
251{
252    size_t i=0;
253    status_t err = NO_ERROR;
254    for (i=0; i<mBuffers.size()-1; i++) {
255        err = write_all(fd, mBuffers[i], BUFFER_SIZE);
256        if (err != NO_ERROR) return err;
257    }
258    return write_all(fd, mBuffers[i], mCurrentWritten);
259}
260
261FdBuffer::iterator
262FdBuffer::end() const
263{
264    if (mBuffers.empty() || mCurrentWritten < 0) return begin();
265    if (mCurrentWritten == BUFFER_SIZE)
266        // FdBuffer doesn't allocate another buf since no more bytes to read.
267        return FdBuffer::iterator(*this, mBuffers.size(), 0);
268    return FdBuffer::iterator(*this, mBuffers.size() - 1, mCurrentWritten);
269}
270
271FdBuffer::iterator&
272FdBuffer::iterator::operator+(size_t offset)
273{
274    size_t newOffset = mOffset + offset;
275    while (newOffset >= BUFFER_SIZE) {
276        mIndex++;
277        newOffset -= BUFFER_SIZE;
278    }
279    mOffset = newOffset;
280    return *this;
281}
282
283size_t
284FdBuffer::iterator::bytesRead() const
285{
286    return mIndex * BUFFER_SIZE + mOffset;
287}
288