FdBuffer.cpp revision 0ed9b68a3fa8f6eab536a93cb18ce75d7d22b757
1/*
2 * Copyright (C) 2016 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "incidentd"
18
19#include "FdBuffer.h"
20
21#include <cutils/log.h>
22#include <utils/SystemClock.h>
23
24#include <fcntl.h>
25#include <poll.h>
26#include <unistd.h>
27#include <wait.h>
28
29const ssize_t BUFFER_SIZE = 16 * 1024; // 16 KB
30const ssize_t MAX_BUFFER_COUNT = 256; // 4 MB max
31
32FdBuffer::FdBuffer()
33    :mBuffers(),
34     mStartTime(-1),
35     mFinishTime(-1),
36     mCurrentWritten(-1),
37     mTimedOut(false),
38     mTruncated(false)
39{
40}
41
42FdBuffer::~FdBuffer()
43{
44    const int N = mBuffers.size();
45    for (int i=0; i<N; i++) {
46        uint8_t* buf = mBuffers[i];
47        free(buf);
48    }
49}
50
51status_t
52FdBuffer::read(int fd, int64_t timeout)
53{
54    struct pollfd pfds = {
55        .fd = fd,
56        .events = POLLIN
57    };
58    mStartTime = uptimeMillis();
59
60    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
61
62    uint8_t* buf = NULL;
63    while (true) {
64        if (mCurrentWritten >= BUFFER_SIZE || mCurrentWritten < 0) {
65            if (mBuffers.size() == MAX_BUFFER_COUNT) {
66                mTruncated = true;
67                break;
68            }
69            buf = (uint8_t*)malloc(BUFFER_SIZE);
70            if (buf == NULL) {
71                return NO_MEMORY;
72            }
73            mBuffers.push_back(buf);
74            mCurrentWritten = 0;
75        }
76
77        int64_t remainingTime = (mStartTime + timeout) - uptimeMillis();
78        if (remainingTime <= 0) {
79            mTimedOut = true;
80            break;
81        }
82
83        int count = poll(&pfds, 1, remainingTime);
84        if (count == 0) {
85            mTimedOut = true;
86            break;
87        } else if (count < 0) {
88            return -errno;
89        } else {
90            if ((pfds.revents & POLLERR) != 0) {
91                return errno != 0 ? -errno : UNKNOWN_ERROR;
92            } else {
93                ssize_t amt = ::read(fd, buf + mCurrentWritten, BUFFER_SIZE - mCurrentWritten);
94                if (amt < 0) {
95                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
96                        continue;
97                    } else {
98                        return -errno;
99                    }
100                } else if (amt == 0) {
101                    break;
102                }
103                mCurrentWritten += amt;
104            }
105        }
106    }
107
108    mFinishTime = uptimeMillis();
109    return NO_ERROR;
110}
111
112status_t
113FdBuffer::readProcessedDataInStream(int fd, int toFd, int fromFd, int64_t timeoutMs)
114{
115    struct pollfd pfds[] = {
116        { .fd = fd,     .events = POLLIN  },
117        { .fd = toFd,   .events = POLLOUT },
118        { .fd = fromFd, .events = POLLIN  },
119    };
120
121    mStartTime = uptimeMillis();
122
123    // mark all fds non blocking
124    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
125    fcntl(toFd, F_SETFL, fcntl(toFd, F_GETFL, 0) | O_NONBLOCK);
126    fcntl(fromFd, F_SETFL, fcntl(fromFd, F_GETFL, 0) | O_NONBLOCK);
127
128    // A circular buffer holds data read from fd and writes to parsing process
129    uint8_t cirBuf[BUFFER_SIZE];
130    size_t cirSize = 0;
131    int rpos = 0, wpos = 0;
132
133    // This is the buffer used to store processed data
134    uint8_t* buf = NULL;
135    while (true) {
136        if (mCurrentWritten >= BUFFER_SIZE || mCurrentWritten < 0) {
137            if (mBuffers.size() == MAX_BUFFER_COUNT) {
138                mTruncated = true;
139                break;
140            }
141            buf = (uint8_t*)malloc(BUFFER_SIZE);
142            if (buf == NULL) {
143                return NO_MEMORY;
144            }
145            mBuffers.push_back(buf);
146            mCurrentWritten = 0;
147        }
148
149        int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis();
150        if (remainingTime <= 0) {
151            mTimedOut = true;
152            break;
153        }
154
155        // wait for any pfds to be ready to perform IO
156        int count = poll(pfds, 3, remainingTime);
157        if (count == 0) {
158            mTimedOut = true;
159            break;
160        } else if (count < 0) {
161            return -errno;
162        }
163
164        // make sure no errors occur on any fds
165        for (int i = 0; i < 3; ++i) {
166            if ((pfds[i].revents & POLLERR) != 0) {
167                return errno != 0 ? -errno : UNKNOWN_ERROR;
168            }
169        }
170
171        // read from fd
172        if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) {
173            ssize_t amt;
174            if (rpos >= wpos) {
175                amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos);
176            } else {
177                amt = ::read(fd, cirBuf + rpos, wpos - rpos);
178            }
179            if (amt < 0) {
180                if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
181                    return -errno;
182                } // otherwise just continue
183            } else if (amt == 0) {  // reach EOF so don't have to poll pfds[0].
184                ::close(pfds[0].fd);
185                pfds[0].fd = -1;
186            } else {
187                rpos += amt;
188                cirSize += amt;
189            }
190        }
191
192        // write to parsing process
193        if (cirSize > 0 && pfds[1].fd != -1) {
194            ssize_t amt;
195            if (rpos > wpos) {
196                amt = ::write(toFd, cirBuf + wpos, rpos - wpos);
197            } else {
198                amt = ::write(toFd, cirBuf + wpos, BUFFER_SIZE - wpos);
199            }
200            if (amt < 0) {
201                if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
202                    return -errno;
203                } // otherwise just continue
204            } else {
205                wpos += amt;
206                cirSize -= amt;
207            }
208        }
209
210        // if buffer is empty and fd is closed, close write fd.
211        if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) {
212            ::close(pfds[1].fd);
213            pfds[1].fd = -1;
214        }
215
216        // circular buffer, reset rpos and wpos
217        if (rpos >= BUFFER_SIZE) {
218            rpos = 0;
219        }
220        if (wpos >= BUFFER_SIZE) {
221            wpos = 0;
222        }
223
224        // read from parsing process
225        ssize_t amt = ::read(fromFd, buf + mCurrentWritten, BUFFER_SIZE - mCurrentWritten);
226        if (amt < 0) {
227            if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
228                return -errno;
229            } // otherwise just continue
230        } else if (amt == 0) {
231            break;
232        } else {
233            mCurrentWritten += amt;
234        }
235    }
236
237    mFinishTime = uptimeMillis();
238    return NO_ERROR;
239}
240
241size_t
242FdBuffer::size()
243{
244    if (mBuffers.empty()) return 0;
245    return ((mBuffers.size() - 1) * BUFFER_SIZE) + mCurrentWritten;
246}
247
248status_t
249FdBuffer::write(ReportRequestSet* reporter)
250{
251    const int N = mBuffers.size() - 1;
252    for (int i=0; i<N; i++) {
253        reporter->write(mBuffers[i], BUFFER_SIZE);
254    }
255    reporter->write(mBuffers[N], mCurrentWritten);
256    return NO_ERROR;
257}
258
259FdBuffer::iterator
260FdBuffer::end()
261{
262    if (mBuffers.empty() || mCurrentWritten < 0) return begin();
263    if (mCurrentWritten == BUFFER_SIZE)
264        // FdBuffer doesn't allocate another buf since no more bytes to read.
265        return FdBuffer::iterator(*this, mBuffers.size(), 0);
266    return FdBuffer::iterator(*this, mBuffers.size() - 1, mCurrentWritten);
267}
268
269FdBuffer::iterator&
270FdBuffer::iterator::operator+(size_t offset)
271{
272    size_t newOffset = mOffset + offset;
273    while (newOffset >= BUFFER_SIZE) {
274        mIndex++;
275        newOffset -= BUFFER_SIZE;
276    }
277    mOffset = newOffset;
278    return *this;
279}
280
281size_t
282FdBuffer::iterator::bytesRead()
283{
284    return mIndex * BUFFER_SIZE + mOffset;
285}
286