rsThreadIO.cpp revision af12ac6a08651464f8d823add667c706f993b587
1/*
2 * Copyright (C) 2009 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "rsContext.h"
18
19#include "rsThreadIO.h"
20
21using namespace android;
22using namespace android::renderscript;
23
24ThreadIO::ThreadIO() : mUsingSocket(false) {
25}
26
27ThreadIO::~ThreadIO() {
28}
29
30void ThreadIO::init(bool useSocket) {
31    mUsingSocket = useSocket;
32    mToCore.init(16 * 1024);
33
34    if (mUsingSocket) {
35        mToClientSocket.init();
36        mToCoreSocket.init();
37    } else {
38        mToClient.init(1024);
39    }
40}
41
42void ThreadIO::shutdown() {
43    //ALOGE("shutdown 1");
44    mToCore.shutdown();
45    //ALOGE("shutdown 2");
46}
47
48void ThreadIO::coreFlush() {
49    //ALOGE("coreFlush 1");
50    if (mUsingSocket) {
51    } else {
52        mToCore.flush();
53    }
54    //ALOGE("coreFlush 2");
55}
56
57void * ThreadIO::coreHeader(uint32_t cmdID, size_t dataLen) {
58    //ALOGE("coreHeader %i %i", cmdID, dataLen);
59    if (mUsingSocket) {
60        CoreCmdHeader hdr;
61        hdr.bytes = dataLen;
62        hdr.cmdID = cmdID;
63        mToCoreSocket.writeAsync(&hdr, sizeof(hdr));
64    } else {
65        mCoreCommandSize = dataLen;
66        mCoreCommandID = cmdID;
67        mCoreDataPtr = (uint8_t *)mToCore.reserve(dataLen);
68        mCoreDataBasePtr = mCoreDataPtr;
69    }
70    //ALOGE("coreHeader ret %p", mCoreDataPtr);
71    return mCoreDataPtr;
72}
73
74void ThreadIO::coreData(const void *data, size_t dataLen) {
75    //ALOGE("coreData %p %i", data, dataLen);
76    mToCoreSocket.writeAsync(data, dataLen);
77    //ALOGE("coreData ret %p", mCoreDataPtr);
78}
79
80void ThreadIO::coreCommit() {
81    //ALOGE("coreCommit %p %p %i", mCoreDataPtr, mCoreDataBasePtr, mCoreCommandSize);
82    if (mUsingSocket) {
83    } else {
84        rsAssert((size_t)(mCoreDataPtr - mCoreDataBasePtr) <= mCoreCommandSize);
85        mToCore.commit(mCoreCommandID, mCoreCommandSize);
86    }
87    //ALOGE("coreCommit ret");
88}
89
90void ThreadIO::coreCommitSync() {
91    //ALOGE("coreCommitSync %p %p %i", mCoreDataPtr, mCoreDataBasePtr, mCoreCommandSize);
92    if (mUsingSocket) {
93    } else {
94        rsAssert((size_t)(mCoreDataPtr - mCoreDataBasePtr) <= mCoreCommandSize);
95        mToCore.commitSync(mCoreCommandID, mCoreCommandSize);
96    }
97    //ALOGE("coreCommitSync ret");
98}
99
100void ThreadIO::clientShutdown() {
101    //ALOGE("coreShutdown 1");
102    mToClient.shutdown();
103    //ALOGE("coreShutdown 2");
104}
105
106void ThreadIO::coreSetReturn(const void *data, size_t dataLen) {
107    rsAssert(dataLen <= sizeof(mToCoreRet));
108    memcpy(&mToCoreRet, data, dataLen);
109}
110
111void ThreadIO::coreGetReturn(void *data, size_t dataLen) {
112    memcpy(data, &mToCoreRet, dataLen);
113}
114
115void ThreadIO::setTimoutCallback(void (*cb)(void *), void *dat, uint64_t timeout) {
116    mToCore.setTimoutCallback(cb, dat, timeout);
117}
118
119
120bool ThreadIO::playCoreCommands(Context *con, bool waitForCommand, uint64_t timeToWait) {
121    bool ret = false;
122    uint64_t startTime = con->getTime();
123
124    while (!mToCore.isEmpty() || waitForCommand) {
125        uint32_t cmdID = 0;
126        uint32_t cmdSize = 0;
127        ret = true;
128        if (con->props.mLogTimes) {
129            con->timerSet(Context::RS_TIMER_IDLE);
130        }
131
132        uint64_t delay = 0;
133        if (waitForCommand) {
134            delay = timeToWait - (con->getTime() - startTime);
135            if (delay > timeToWait) {
136                delay = 0;
137            }
138        }
139        const void * data = mToCore.get(&cmdID, &cmdSize, delay);
140        if (!cmdSize) {
141            // exception or timeout occurred.
142            return false;
143        }
144        if (con->props.mLogTimes) {
145            con->timerSet(Context::RS_TIMER_INTERNAL);
146        }
147        waitForCommand = false;
148        //ALOGV("playCoreCommands 3 %i %i", cmdID, cmdSize);
149
150        if (cmdID >= (sizeof(gPlaybackFuncs) / sizeof(void *))) {
151            rsAssert(cmdID < (sizeof(gPlaybackFuncs) / sizeof(void *)));
152            ALOGE("playCoreCommands error con %p, cmd %i", con, cmdID);
153            mToCore.printDebugData();
154        }
155        gPlaybackFuncs[cmdID](con, data, cmdSize << 2);
156        mToCore.next();
157    }
158    return ret;
159}
160
161RsMessageToClientType ThreadIO::getClientHeader(size_t *receiveLen, uint32_t *usrID) {
162    if (mUsingSocket) {
163        mToClientSocket.read(&mLastClientHeader, sizeof(mLastClientHeader));
164    } else {
165        size_t bytesData = 0;
166        const uint32_t *d = (const uint32_t *)mToClient.get(&mLastClientHeader.cmdID, (uint32_t*)&bytesData);
167        if (bytesData >= sizeof(uint32_t)) {
168            mLastClientHeader.userID = d[0];
169            mLastClientHeader.bytes = bytesData - sizeof(uint32_t);
170        } else {
171            mLastClientHeader.userID = 0;
172            mLastClientHeader.bytes = 0;
173        }
174    }
175    receiveLen[0] = mLastClientHeader.bytes;
176    usrID[0] = mLastClientHeader.userID;
177    return (RsMessageToClientType)mLastClientHeader.cmdID;
178}
179
180RsMessageToClientType ThreadIO::getClientPayload(void *data, size_t *receiveLen,
181                                uint32_t *usrID, size_t bufferLen) {
182    receiveLen[0] = mLastClientHeader.bytes;
183    usrID[0] = mLastClientHeader.userID;
184    if (bufferLen < mLastClientHeader.bytes) {
185        return RS_MESSAGE_TO_CLIENT_RESIZE;
186    }
187    if (mUsingSocket) {
188        if (receiveLen[0]) {
189            mToClientSocket.read(data, receiveLen[0]);
190        }
191        return (RsMessageToClientType)mLastClientHeader.cmdID;
192    } else {
193        uint32_t bytesData = 0;
194        uint32_t commandID = 0;
195        const uint32_t *d = (const uint32_t *)mToClient.get(&commandID, &bytesData);
196        //ALOGE("getMessageToClient 3    %i  %i", commandID, bytesData);
197        //ALOGE("getMessageToClient  %i %i", commandID, *subID);
198        if (bufferLen >= receiveLen[0]) {
199            memcpy(data, d+1, receiveLen[0]);
200            mToClient.next();
201            return (RsMessageToClientType)commandID;
202        }
203    }
204    return RS_MESSAGE_TO_CLIENT_RESIZE;
205}
206
207bool ThreadIO::sendToClient(RsMessageToClientType cmdID, uint32_t usrID, const void *data,
208                            size_t dataLen, bool waitForSpace) {
209    ClientCmdHeader hdr;
210    hdr.bytes = dataLen;
211    hdr.cmdID = cmdID;
212    hdr.userID = usrID;
213    if (mUsingSocket) {
214        mToClientSocket.writeAsync(&hdr, sizeof(hdr));
215        if (dataLen) {
216            mToClientSocket.writeAsync(data, dataLen);
217        }
218        return true;
219    } else {
220        if (!waitForSpace) {
221            if (!mToClient.makeSpaceNonBlocking(dataLen + sizeof(hdr))) {
222                // Not enough room, and not waiting.
223                return false;
224            }
225        }
226
227        //ALOGE("sendMessageToClient 2");
228        uint32_t *p = (uint32_t *)mToClient.reserve(dataLen + sizeof(usrID));
229        p[0] = usrID;
230        if (dataLen > 0) {
231            memcpy(p+1, data, dataLen);
232        }
233        mToClient.commit(cmdID, dataLen + sizeof(usrID));
234        //ALOGE("sendMessageToClient 3");
235        return true;
236    }
237    return false;
238}
239
240