1/*
2 * Copyright (C) 2016 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#define DEBUG false
17#include "Log.h"
18
19#include "FdBuffer.h"
20
21#include <cutils/log.h>
22#include <utils/SystemClock.h>
23
24#include <fcntl.h>
25#include <poll.h>
26#include <unistd.h>
27#include <wait.h>
28
29namespace android {
30namespace os {
31namespace incidentd {
32
// Size in bytes of one I/O chunk: it is the value handed to mBuffer's
// constructor and the capacity of the circular staging buffer used by
// readProcessedDataInStream().
const ssize_t BUFFER_SIZE = 16 * 1024;  // 16 KB
// Cap, expressed in multiples of BUFFER_SIZE, on how much data a read call
// accumulates in mBuffer before it stops and flags the result as truncated.
const ssize_t MAX_BUFFER_COUNT = 256;   // 4 MB max
35
36FdBuffer::FdBuffer()
37    : mBuffer(BUFFER_SIZE), mStartTime(-1), mFinishTime(-1), mTimedOut(false), mTruncated(false) {}
38
39FdBuffer::~FdBuffer() {}
40
41status_t FdBuffer::read(int fd, int64_t timeout) {
42    struct pollfd pfds = {.fd = fd, .events = POLLIN};
43    mStartTime = uptimeMillis();
44
45    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
46
47    while (true) {
48        if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
49            mTruncated = true;
50            break;
51        }
52        if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
53
54        int64_t remainingTime = (mStartTime + timeout) - uptimeMillis();
55        if (remainingTime <= 0) {
56            VLOG("timed out due to long read");
57            mTimedOut = true;
58            break;
59        }
60
61        int count = poll(&pfds, 1, remainingTime);
62        if (count == 0) {
63            VLOG("timed out due to block calling poll");
64            mTimedOut = true;
65            break;
66        } else if (count < 0) {
67            VLOG("poll failed: %s", strerror(errno));
68            return -errno;
69        } else {
70            if ((pfds.revents & POLLERR) != 0) {
71                VLOG("return event has error %s", strerror(errno));
72                return errno != 0 ? -errno : UNKNOWN_ERROR;
73            } else {
74                ssize_t amt = ::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite());
75                if (amt < 0) {
76                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
77                        continue;
78                    } else {
79                        VLOG("Fail to read %d: %s", fd, strerror(errno));
80                        return -errno;
81                    }
82                } else if (amt == 0) {
83                    VLOG("Reached EOF of fd=%d", fd);
84                    break;
85                }
86                mBuffer.wp()->move(amt);
87            }
88        }
89    }
90    mFinishTime = uptimeMillis();
91    return NO_ERROR;
92}
93
94status_t FdBuffer::readFully(int fd) {
95    mStartTime = uptimeMillis();
96
97    while (true) {
98        if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
99            // Don't let it get too big.
100            mTruncated = true;
101            VLOG("Truncating data");
102            break;
103        }
104        if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
105
106        ssize_t amt =
107                TEMP_FAILURE_RETRY(::read(fd, mBuffer.writeBuffer(), mBuffer.currentToWrite()));
108        if (amt < 0) {
109            VLOG("Fail to read %d: %s", fd, strerror(errno));
110            return -errno;
111        } else if (amt == 0) {
112            VLOG("Done reading %zu bytes", mBuffer.size());
113            // We're done.
114            break;
115        }
116        mBuffer.wp()->move(amt);
117    }
118
119    mFinishTime = uptimeMillis();
120    return NO_ERROR;
121}
122
123status_t FdBuffer::readProcessedDataInStream(int fd, unique_fd toFd, unique_fd fromFd,
124                                             int64_t timeoutMs, const bool isSysfs) {
125    struct pollfd pfds[] = {
126            {.fd = fd, .events = POLLIN},
127            {.fd = toFd.get(), .events = POLLOUT},
128            {.fd = fromFd.get(), .events = POLLIN},
129    };
130
131    mStartTime = uptimeMillis();
132
133    // mark all fds non blocking
134    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
135    fcntl(toFd.get(), F_SETFL, fcntl(toFd.get(), F_GETFL, 0) | O_NONBLOCK);
136    fcntl(fromFd.get(), F_SETFL, fcntl(fromFd.get(), F_GETFL, 0) | O_NONBLOCK);
137
138    // A circular buffer holds data read from fd and writes to parsing process
139    uint8_t cirBuf[BUFFER_SIZE];
140    size_t cirSize = 0;
141    int rpos = 0, wpos = 0;
142
143    // This is the buffer used to store processed data
144    while (true) {
145        if (mBuffer.size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
146            mTruncated = true;
147            break;
148        }
149        if (mBuffer.writeBuffer() == NULL) return NO_MEMORY;
150
151        int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis();
152        if (remainingTime <= 0) {
153            VLOG("timed out due to long read");
154            mTimedOut = true;
155            break;
156        }
157
158        // wait for any pfds to be ready to perform IO
159        int count = poll(pfds, 3, remainingTime);
160        if (count == 0) {
161            VLOG("timed out due to block calling poll");
162            mTimedOut = true;
163            break;
164        } else if (count < 0) {
165            VLOG("Fail to poll: %s", strerror(errno));
166            return -errno;
167        }
168
169        // make sure no errors occur on any fds
170        for (int i = 0; i < 3; ++i) {
171            if ((pfds[i].revents & POLLERR) != 0) {
172                if (i == 0 && isSysfs) {
173                    VLOG("fd %d is sysfs, ignore its POLLERR return value", fd);
174                    continue;
175                }
176                VLOG("fd[%d]=%d returns error events: %s", i, fd, strerror(errno));
177                return errno != 0 ? -errno : UNKNOWN_ERROR;
178            }
179        }
180
181        // read from fd
182        if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) {
183            ssize_t amt;
184            if (rpos >= wpos) {
185                amt = ::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos);
186            } else {
187                amt = ::read(fd, cirBuf + rpos, wpos - rpos);
188            }
189            if (amt < 0) {
190                if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
191                    VLOG("Fail to read fd %d: %s", fd, strerror(errno));
192                    return -errno;
193                }  // otherwise just continue
194            } else if (amt == 0) {
195                VLOG("Reached EOF of input file %d", fd);
196                pfds[0].fd = -1;  // reach EOF so don't have to poll pfds[0].
197            } else {
198                rpos += amt;
199                cirSize += amt;
200            }
201        }
202
203        // write to parsing process
204        if (cirSize > 0 && pfds[1].fd != -1) {
205            ssize_t amt;
206            if (rpos > wpos) {
207                amt = ::write(toFd.get(), cirBuf + wpos, rpos - wpos);
208            } else {
209                amt = ::write(toFd.get(), cirBuf + wpos, BUFFER_SIZE - wpos);
210            }
211            if (amt < 0) {
212                if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
213                    VLOG("Fail to write toFd %d: %s", toFd.get(), strerror(errno));
214                    return -errno;
215                }  // otherwise just continue
216            } else {
217                wpos += amt;
218                cirSize -= amt;
219            }
220        }
221
222        // if buffer is empty and fd is closed, close write fd.
223        if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) {
224            VLOG("Close write pipe %d", toFd.get());
225            toFd.reset();
226            pfds[1].fd = -1;
227        }
228
229        // circular buffer, reset rpos and wpos
230        if (rpos >= BUFFER_SIZE) {
231            rpos = 0;
232        }
233        if (wpos >= BUFFER_SIZE) {
234            wpos = 0;
235        }
236
237        // read from parsing process
238        ssize_t amt = ::read(fromFd.get(), mBuffer.writeBuffer(), mBuffer.currentToWrite());
239        if (amt < 0) {
240            if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
241                VLOG("Fail to read fromFd %d: %s", fromFd.get(), strerror(errno));
242                return -errno;
243            }  // otherwise just continue
244        } else if (amt == 0) {
245            VLOG("Reached EOF of fromFd %d", fromFd.get());
246            break;
247        } else {
248            mBuffer.wp()->move(amt);
249        }
250    }
251
252    mFinishTime = uptimeMillis();
253    return NO_ERROR;
254}
255
256size_t FdBuffer::size() const { return mBuffer.size(); }
257
258EncodedBuffer::iterator FdBuffer::data() const { return mBuffer.begin(); }
259
260}  // namespace incidentd
261}  // namespace os
262}  // namespace android
263