1/*
2 * Copyright (C) 2009 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "rsContext.h"
18#include "rsThreadIO.h"
19#include "rsgApiStructs.h"
20
21#include <unistd.h>
22#include <sys/types.h>
23#include <sys/socket.h>
24
25#include <fcntl.h>
26#include <poll.h>
27
28
29using namespace android;
30using namespace android::renderscript;
31
32ThreadIO::ThreadIO() {
33    mRunning = true;
34    mMaxInlineSize = 1024;
35}
36
37ThreadIO::~ThreadIO() {
38}
39
40void ThreadIO::init() {
41    mToClient.init();
42    mToCore.init();
43}
44
45void ThreadIO::shutdown() {
46    mRunning = false;
47    mToCore.shutdown();
48}
49
50void * ThreadIO::coreHeader(uint32_t cmdID, size_t dataLen) {
51    //ALOGE("coreHeader %i %i", cmdID, dataLen);
52    CoreCmdHeader *hdr = (CoreCmdHeader *)&mSendBuffer[0];
53    hdr->bytes = dataLen;
54    hdr->cmdID = cmdID;
55    mSendLen = dataLen + sizeof(CoreCmdHeader);
56    //mToCoreSocket.writeAsync(&hdr, sizeof(hdr));
57    //ALOGE("coreHeader ret ");
58    return &mSendBuffer[sizeof(CoreCmdHeader)];
59}
60
61void ThreadIO::coreCommit() {
62    mToCore.writeAsync(&mSendBuffer, mSendLen);
63}
64
65void ThreadIO::clientShutdown() {
66    mToClient.shutdown();
67}
68
69void ThreadIO::coreWrite(const void *data, size_t len) {
70    //ALOGV("core write %p %i", data, (int)len);
71    mToCore.writeAsync(data, len, true);
72}
73
74void ThreadIO::coreRead(void *data, size_t len) {
75    //ALOGV("core read %p %i", data, (int)len);
76    mToCore.read(data, len);
77}
78
79void ThreadIO::coreSetReturn(const void *data, size_t dataLen) {
80    uint32_t buf;
81    if (data == nullptr) {
82        data = &buf;
83        dataLen = sizeof(buf);
84    }
85
86    mToCore.readReturn(data, dataLen);
87}
88
89void ThreadIO::coreGetReturn(void *data, size_t dataLen) {
90    uint32_t buf;
91    if (data == nullptr) {
92        data = &buf;
93        dataLen = sizeof(buf);
94    }
95
96    mToCore.writeWaitReturn(data, dataLen);
97}
98
99void ThreadIO::setTimeoutCallback(void (*cb)(void *), void *dat, uint64_t timeout) {
100    //mToCore.setTimeoutCallback(cb, dat, timeout);
101}
102
103bool ThreadIO::playCoreCommands(Context *con, int waitFd) {
104    bool ret = false;
105
106    uint8_t buf[2 * 1024];
107    const CoreCmdHeader *cmd = (const CoreCmdHeader *)&buf[0];
108    const void * data = (const void *)&buf[sizeof(CoreCmdHeader)];
109
110    struct pollfd p[2];
111    p[0].fd = mToCore.getReadFd();
112    p[0].events = POLLIN;
113    p[0].revents = 0;
114    p[1].fd = waitFd;
115    p[1].events = POLLIN;
116    p[1].revents = 0;
117    int pollCount = 1;
118    if (waitFd >= 0) {
119        pollCount = 2;
120    }
121
122    if (con->props.mLogTimes) {
123        con->timerSet(Context::RS_TIMER_IDLE);
124    }
125
126    int waitTime = -1;
127    while (mRunning) {
128        int pr = poll(p, pollCount, waitTime);
129        if (pr <= 0) {
130            break;
131        }
132
133        if (p[0].revents) {
134            size_t r = 0;
135            r = mToCore.read(&buf[0], sizeof(CoreCmdHeader));
136            mToCore.read(&buf[sizeof(CoreCmdHeader)], cmd->bytes);
137            if (r != sizeof(CoreCmdHeader)) {
138              // exception or timeout occurred.
139              break;
140            }
141
142            ret = true;
143            if (con->props.mLogTimes) {
144                con->timerSet(Context::RS_TIMER_INTERNAL);
145            }
146            //ALOGV("playCoreCommands 3 %i %i", cmd->cmdID, cmd->bytes);
147
148            if (cmd->cmdID >= (sizeof(gPlaybackFuncs) / sizeof(void *))) {
149                rsAssert(cmd->cmdID < (sizeof(gPlaybackFuncs) / sizeof(void *)));
150                ALOGE("playCoreCommands error con %p, cmd %i", con, cmd->cmdID);
151            }
152
153            gPlaybackFuncs[cmd->cmdID](con, data, cmd->bytes);
154
155            if (con->props.mLogTimes) {
156                con->timerSet(Context::RS_TIMER_IDLE);
157            }
158
159            if (waitFd < 0) {
160                // If we don't have a secondary wait object we should stop blocking now
161                // that at least one command has been processed.
162                waitTime = 0;
163            }
164        }
165
166        if (p[1].revents && !p[0].revents) {
167            // We want to finish processing fifo events before processing the vsync.
168            // Otherwise we can end up falling behind and having tremendous lag.
169            break;
170        }
171    }
172    return ret;
173}
174
175RsMessageToClientType ThreadIO::getClientHeader(size_t *receiveLen, uint32_t *usrID) {
176    //ALOGE("getClientHeader");
177    mToClient.read(&mLastClientHeader, sizeof(mLastClientHeader));
178
179    receiveLen[0] = mLastClientHeader.bytes;
180    usrID[0] = mLastClientHeader.userID;
181    //ALOGE("getClientHeader %i %i %i", mLastClientHeader.cmdID, usrID[0], receiveLen[0]);
182    return (RsMessageToClientType)mLastClientHeader.cmdID;
183}
184
185RsMessageToClientType ThreadIO::getClientPayload(void *data, size_t *receiveLen,
186                                uint32_t *usrID, size_t bufferLen) {
187    //ALOGE("getClientPayload");
188    receiveLen[0] = mLastClientHeader.bytes;
189    usrID[0] = mLastClientHeader.userID;
190    if (bufferLen < mLastClientHeader.bytes) {
191        return RS_MESSAGE_TO_CLIENT_RESIZE;
192    }
193    if (receiveLen[0]) {
194        mToClient.read(data, receiveLen[0]);
195    }
196    //ALOGE("getClientPayload x");
197    return (RsMessageToClientType)mLastClientHeader.cmdID;
198}
199
200bool ThreadIO::sendToClient(RsMessageToClientType cmdID, uint32_t usrID, const void *data,
201                            size_t dataLen, bool waitForSpace) {
202
203    //ALOGE("sendToClient %i %i %i", cmdID, usrID, (int)dataLen);
204    ClientCmdHeader hdr;
205    hdr.bytes = (uint32_t)dataLen;
206    hdr.cmdID = cmdID;
207    hdr.userID = usrID;
208
209    mToClient.writeAsync(&hdr, sizeof(hdr));
210    if (dataLen) {
211        mToClient.writeAsync(data, dataLen);
212    }
213
214    //ALOGE("sendToClient x");
215    return true;
216}
217
218