1/*
2 * Copyright (C) 2009 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "rsContext.h"
18#include "rsThreadIO.h"
19#include "rsgApiStructs.h"
20
21#include <unistd.h>
22#include <sys/types.h>
23#include <sys/socket.h>
24
25#include <fcntl.h>
26#include <poll.h>
27
28
29using namespace android;
30using namespace android::renderscript;
31
32ThreadIO::ThreadIO() {
33    mRunning = true;
34    mPureFifo = false;
35    mMaxInlineSize = 1024;
36}
37
38ThreadIO::~ThreadIO() {
39}
40
41void ThreadIO::init() {
42    mToClient.init();
43    mToCore.init();
44}
45
46void ThreadIO::shutdown() {
47    mRunning = false;
48    mToCore.shutdown();
49}
50
51void * ThreadIO::coreHeader(uint32_t cmdID, size_t dataLen) {
52    //ALOGE("coreHeader %i %i", cmdID, dataLen);
53    CoreCmdHeader *hdr = (CoreCmdHeader *)&mSendBuffer[0];
54    hdr->bytes = dataLen;
55    hdr->cmdID = cmdID;
56    mSendLen = dataLen + sizeof(CoreCmdHeader);
57    //mToCoreSocket.writeAsync(&hdr, sizeof(hdr));
58    //ALOGE("coreHeader ret ");
59    return &mSendBuffer[sizeof(CoreCmdHeader)];
60}
61
62void ThreadIO::coreCommit() {
63    mToCore.writeAsync(&mSendBuffer, mSendLen);
64}
65
66void ThreadIO::clientShutdown() {
67    mToClient.shutdown();
68}
69
70void ThreadIO::coreWrite(const void *data, size_t len) {
71    //ALOGV("core write %p %i", data, (int)len);
72    mToCore.writeAsync(data, len, true);
73}
74
75void ThreadIO::coreRead(void *data, size_t len) {
76    //ALOGV("core read %p %i", data, (int)len);
77    mToCore.read(data, len);
78}
79
80void ThreadIO::coreSetReturn(const void *data, size_t dataLen) {
81    uint32_t buf;
82    if (data == NULL) {
83        data = &buf;
84        dataLen = sizeof(buf);
85    }
86
87    mToCore.readReturn(data, dataLen);
88}
89
90void ThreadIO::coreGetReturn(void *data, size_t dataLen) {
91    uint32_t buf;
92    if (data == NULL) {
93        data = &buf;
94        dataLen = sizeof(buf);
95    }
96
97    mToCore.writeWaitReturn(data, dataLen);
98}
99
100void ThreadIO::setTimeoutCallback(void (*cb)(void *), void *dat, uint64_t timeout) {
101    //mToCore.setTimeoutCallback(cb, dat, timeout);
102}
103
104bool ThreadIO::playCoreCommands(Context *con, int waitFd) {
105    bool ret = false;
106    const bool isLocal = !isPureFifo();
107
108    uint8_t buf[2 * 1024];
109    const CoreCmdHeader *cmd = (const CoreCmdHeader *)&buf[0];
110    const void * data = (const void *)&buf[sizeof(CoreCmdHeader)];
111
112    struct pollfd p[2];
113    p[0].fd = mToCore.getReadFd();
114    p[0].events = POLLIN;
115    p[0].revents = 0;
116    p[1].fd = waitFd;
117    p[1].events = POLLIN;
118    p[1].revents = 0;
119    int pollCount = 1;
120    if (waitFd >= 0) {
121        pollCount = 2;
122    }
123
124    if (con->props.mLogTimes) {
125        con->timerSet(Context::RS_TIMER_IDLE);
126    }
127
128    int waitTime = -1;
129    while (mRunning) {
130        int pr = poll(p, pollCount, waitTime);
131        if (pr <= 0) {
132            break;
133        }
134
135        if (p[0].revents) {
136            size_t r = 0;
137            if (isLocal) {
138                r = mToCore.read(&buf[0], sizeof(CoreCmdHeader));
139                mToCore.read(&buf[sizeof(CoreCmdHeader)], cmd->bytes);
140                if (r != sizeof(CoreCmdHeader)) {
141                    // exception or timeout occurred.
142                    break;
143                }
144            } else {
145                r = mToCore.read((void *)&cmd->cmdID, sizeof(cmd->cmdID));
146            }
147
148
149            ret = true;
150            if (con->props.mLogTimes) {
151                con->timerSet(Context::RS_TIMER_INTERNAL);
152            }
153            //ALOGV("playCoreCommands 3 %i %i", cmd->cmdID, cmd->bytes);
154
155            if (cmd->cmdID >= (sizeof(gPlaybackFuncs) / sizeof(void *))) {
156                rsAssert(cmd->cmdID < (sizeof(gPlaybackFuncs) / sizeof(void *)));
157                ALOGE("playCoreCommands error con %p, cmd %i", con, cmd->cmdID);
158            }
159
160            if (isLocal) {
161                gPlaybackFuncs[cmd->cmdID](con, data, cmd->bytes);
162            } else {
163                gPlaybackRemoteFuncs[cmd->cmdID](con, this);
164            }
165
166            if (con->props.mLogTimes) {
167                con->timerSet(Context::RS_TIMER_IDLE);
168            }
169
170            if (waitFd < 0) {
171                // If we don't have a secondary wait object we should stop blocking now
172                // that at least one command has been processed.
173                waitTime = 0;
174            }
175        }
176
177        if (p[1].revents && !p[0].revents) {
178            // We want to finish processing fifo events before processing the vsync.
179            // Otherwise we can end up falling behind and having tremendous lag.
180            break;
181        }
182    }
183    return ret;
184}
185
186RsMessageToClientType ThreadIO::getClientHeader(size_t *receiveLen, uint32_t *usrID) {
187    //ALOGE("getClientHeader");
188    mToClient.read(&mLastClientHeader, sizeof(mLastClientHeader));
189
190    receiveLen[0] = mLastClientHeader.bytes;
191    usrID[0] = mLastClientHeader.userID;
192    //ALOGE("getClientHeader %i %i %i", mLastClientHeader.cmdID, usrID[0], receiveLen[0]);
193    return (RsMessageToClientType)mLastClientHeader.cmdID;
194}
195
196RsMessageToClientType ThreadIO::getClientPayload(void *data, size_t *receiveLen,
197                                uint32_t *usrID, size_t bufferLen) {
198    //ALOGE("getClientPayload");
199    receiveLen[0] = mLastClientHeader.bytes;
200    usrID[0] = mLastClientHeader.userID;
201    if (bufferLen < mLastClientHeader.bytes) {
202        return RS_MESSAGE_TO_CLIENT_RESIZE;
203    }
204    if (receiveLen[0]) {
205        mToClient.read(data, receiveLen[0]);
206    }
207    //ALOGE("getClientPayload x");
208    return (RsMessageToClientType)mLastClientHeader.cmdID;
209}
210
211bool ThreadIO::sendToClient(RsMessageToClientType cmdID, uint32_t usrID, const void *data,
212                            size_t dataLen, bool waitForSpace) {
213
214    //ALOGE("sendToClient %i %i %i", cmdID, usrID, (int)dataLen);
215    ClientCmdHeader hdr;
216    hdr.bytes = (uint32_t)dataLen;
217    hdr.cmdID = cmdID;
218    hdr.userID = usrID;
219
220    mToClient.writeAsync(&hdr, sizeof(hdr));
221    if (dataLen) {
222        mToClient.writeAsync(data, dataLen);
223    }
224
225    //ALOGE("sendToClient x");
226    return true;
227}
228
229