FdBuffer.cpp revision c23fad2f9079f678eae15338f5e57e2a6bf7e391
1/*
2 * Copyright (C) 2016 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "incidentd"
18
#include "FdBuffer.h"

#include <cutils/log.h>
#include <utils/SystemClock.h>

#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <unistd.h>
#include <wait.h>
28
// Size handed to mBuffer at construction and also the capacity of the
// circular staging buffer used when streaming through a helper: 16 KB.
const ssize_t BUFFER_SIZE = 16 * 1024;

// Reads stop (and are flagged as truncated) once the stored data reaches
// MAX_BUFFER_COUNT * BUFFER_SIZE bytes, i.e. 4 MB.
const ssize_t MAX_BUFFER_COUNT = 256;
31
32FdBuffer::FdBuffer()
33    :mBuffer(BUFFER_SIZE),
34     mStartTime(-1),
35     mFinishTime(-1),
36     mTimedOut(false),
37     mTruncated(false)
38{
39}
40
41FdBuffer::~FdBuffer()
42{
43}
44
45status_t
46FdBuffer::read(int fd, int64_t timeout)
47{
48    struct pollfd pfds = {
49        .fd = fd,
50        .events = POLLIN
51    };
52    mStartTime = uptimeMillis();
53
54    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
55
56    while (true) {
57        if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
58            mTruncated = true;
59            break;
60        }
61        if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
62
63        int64_t remainingTime = (mStartTime + timeout) - uptimeMillis();
64        if (remainingTime <= 0) {
65            mTimedOut = true;
66            break;
67        }
68
69        int count = poll(&pfds, 1, remainingTime);
70        if (count == 0) {
71            mTimedOut = true;
72            break;
73        } else if (count < 0) {
74            return -errno;
75        } else {
76            if ((pfds.revents & POLLERR) != 0) {
77                return errno != 0 ? -errno : UNKNOWN_ERROR;
78            } else {
79                ssize_t amt = ::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite());
80                if (amt < 0) {
81                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
82                        continue;
83                    } else {
84                        return -errno;
85                    }
86                } else if (amt == 0) {
87                    break;
88                }
89                mBuffer.wp()->move(amt);
90            }
91        }
92    }
93    mFinishTime = uptimeMillis();
94    return NO_ERROR;
95}
96
97status_t
98FdBuffer::readProcessedDataInStream(int fd, int toFd, int fromFd, int64_t timeoutMs)
99{
100    struct pollfd pfds[] = {
101        { .fd = fd,     .events = POLLIN  },
102        { .fd = toFd,   .events = POLLOUT },
103        { .fd = fromFd, .events = POLLIN  },
104    };
105
106    mStartTime = uptimeMillis();
107
108    // mark all fds non blocking
109    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
110    fcntl(toFd, F_SETFL, fcntl(toFd, F_GETFL, 0) | O_NONBLOCK);
111    fcntl(fromFd, F_SETFL, fcntl(fromFd, F_GETFL, 0) | O_NONBLOCK);
112
113    // A circular buffer holds data read from fd and writes to parsing process
114    uint8_t cirBuf[BUFFER_SIZE];
115    size_t cirSize = 0;
116    int rpos = 0, wpos = 0;
117
118    // This is the buffer used to store processed data
119    while (true) {
120        if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
121            mTruncated = true;
122            break;
123        }
124        if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
125
126        int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis();
127        if (remainingTime <= 0) {
128            mTimedOut = true;
129            break;
130        }
131
132        // wait for any pfds to be ready to perform IO
133        int count = poll(pfds, 3, remainingTime);
134        if (count == 0) {
135            mTimedOut = true;
136            break;
137        } else if (count < 0) {
138            return -errno;
139        }
140
141        // make sure no errors occur on any fds
142        for (int i = 0; i < 3; ++i) {
143            if ((pfds[i].revents & POLLERR) != 0) {
144                return errno != 0 ? -errno : UNKNOWN_ERROR;
145            }
146        }
147
148        // read from fd
149        if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) {
150            ssize_t amt;
151            if (rpos >= wpos) {
152                amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos);
153            } else {
154                amt = ::read(fd, cirBuf + rpos, wpos - rpos);
155            }
156            if (amt < 0) {
157                if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
158                    return -errno;
159                } // otherwise just continue
160            } else if (amt == 0) {  // reach EOF so don't have to poll pfds[0].
161                ::close(pfds[0].fd);
162                pfds[0].fd = -1;
163            } else {
164                rpos += amt;
165                cirSize += amt;
166            }
167        }
168
169        // write to parsing process
170        if (cirSize > 0 && pfds[1].fd != -1) {
171            ssize_t amt;
172            if (rpos > wpos) {
173                amt = ::write(toFd, cirBuf + wpos, rpos - wpos);
174            } else {
175                amt = ::write(toFd, cirBuf + wpos, BUFFER_SIZE - wpos);
176            }
177            if (amt < 0) {
178                if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
179                    return -errno;
180                } // otherwise just continue
181            } else {
182                wpos += amt;
183                cirSize -= amt;
184            }
185        }
186
187        // if buffer is empty and fd is closed, close write fd.
188        if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) {
189            ::close(pfds[1].fd);
190            pfds[1].fd = -1;
191        }
192
193        // circular buffer, reset rpos and wpos
194        if (rpos >= BUFFER_SIZE) {
195            rpos = 0;
196        }
197        if (wpos >= BUFFER_SIZE) {
198            wpos = 0;
199        }
200
201        // read from parsing process
202        ssize_t amt = ::read(fromFd, mBuffer.writeBuffer(), mBuffer.currentToWrite());
203        if (amt < 0) {
204            if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
205                return -errno;
206            } // otherwise just continue
207        } else if (amt == 0) {
208            break;
209        } else {
210            mBuffer.wp()->move(amt);
211        }
212    }
213
214    mFinishTime = uptimeMillis();
215    return NO_ERROR;
216}
217
218size_t
219FdBuffer::size() const
220{
221    return mBuffer.size();
222}
223
224EncodedBuffer::iterator
225FdBuffer::data() const
226{
227    return mBuffer.begin();
228}
229