rsThreadIO.cpp revision af12ac6a08651464f8d823add667c706f993b587
1/* 2 * Copyright (C) 2009 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include "rsContext.h" 18 19#include "rsThreadIO.h" 20 21using namespace android; 22using namespace android::renderscript; 23 24ThreadIO::ThreadIO() : mUsingSocket(false) { 25} 26 27ThreadIO::~ThreadIO() { 28} 29 30void ThreadIO::init(bool useSocket) { 31 mUsingSocket = useSocket; 32 mToCore.init(16 * 1024); 33 34 if (mUsingSocket) { 35 mToClientSocket.init(); 36 mToCoreSocket.init(); 37 } else { 38 mToClient.init(1024); 39 } 40} 41 42void ThreadIO::shutdown() { 43 //ALOGE("shutdown 1"); 44 mToCore.shutdown(); 45 //ALOGE("shutdown 2"); 46} 47 48void ThreadIO::coreFlush() { 49 //ALOGE("coreFlush 1"); 50 if (mUsingSocket) { 51 } else { 52 mToCore.flush(); 53 } 54 //ALOGE("coreFlush 2"); 55} 56 57void * ThreadIO::coreHeader(uint32_t cmdID, size_t dataLen) { 58 //ALOGE("coreHeader %i %i", cmdID, dataLen); 59 if (mUsingSocket) { 60 CoreCmdHeader hdr; 61 hdr.bytes = dataLen; 62 hdr.cmdID = cmdID; 63 mToCoreSocket.writeAsync(&hdr, sizeof(hdr)); 64 } else { 65 mCoreCommandSize = dataLen; 66 mCoreCommandID = cmdID; 67 mCoreDataPtr = (uint8_t *)mToCore.reserve(dataLen); 68 mCoreDataBasePtr = mCoreDataPtr; 69 } 70 //ALOGE("coreHeader ret %p", mCoreDataPtr); 71 return mCoreDataPtr; 72} 73 74void ThreadIO::coreData(const void *data, size_t dataLen) { 75 //ALOGE("coreData %p %i", data, dataLen); 76 mToCoreSocket.writeAsync(data, dataLen); 77 //ALOGE("coreData ret %p", mCoreDataPtr); 78} 79 80void ThreadIO::coreCommit() { 81 //ALOGE("coreCommit %p %p %i", mCoreDataPtr, mCoreDataBasePtr, mCoreCommandSize); 82 if (mUsingSocket) { 83 } else { 84 rsAssert((size_t)(mCoreDataPtr - mCoreDataBasePtr) <= mCoreCommandSize); 85 mToCore.commit(mCoreCommandID, mCoreCommandSize); 86 } 87 //ALOGE("coreCommit ret"); 88} 89 90void ThreadIO::coreCommitSync() { 91 //ALOGE("coreCommitSync %p %p %i", mCoreDataPtr, mCoreDataBasePtr, mCoreCommandSize); 92 if (mUsingSocket) { 93 } else { 94 rsAssert((size_t)(mCoreDataPtr - mCoreDataBasePtr) <= mCoreCommandSize); 95 mToCore.commitSync(mCoreCommandID, mCoreCommandSize); 96 } 97 //ALOGE("coreCommitSync ret"); 98} 99 100void ThreadIO::clientShutdown() { 101 //ALOGE("coreShutdown 1"); 102 mToClient.shutdown(); 103 //ALOGE("coreShutdown 2"); 104} 105 106void ThreadIO::coreSetReturn(const void *data, size_t dataLen) { 107 rsAssert(dataLen <= sizeof(mToCoreRet)); 108 memcpy(&mToCoreRet, data, dataLen); 109} 110 111void ThreadIO::coreGetReturn(void *data, size_t dataLen) { 112 memcpy(data, &mToCoreRet, dataLen); 113} 114 115void ThreadIO::setTimoutCallback(void (*cb)(void *), void *dat, uint64_t timeout) { 116 mToCore.setTimoutCallback(cb, dat, timeout); 117} 118 119 120bool ThreadIO::playCoreCommands(Context *con, bool waitForCommand, uint64_t timeToWait) { 121 bool ret = false; 122 uint64_t startTime = con->getTime(); 123 124 while (!mToCore.isEmpty() || waitForCommand) { 125 uint32_t cmdID = 0; 126 uint32_t cmdSize = 0; 127 ret = true; 128 if (con->props.mLogTimes) { 129 con->timerSet(Context::RS_TIMER_IDLE); 130 } 131 132 uint64_t delay = 0; 133 if (waitForCommand) { 134 delay = timeToWait - (con->getTime() - startTime); 135 if (delay > timeToWait) { 136 delay = 0; 137 } 138 } 139 const void * data = mToCore.get(&cmdID, &cmdSize, delay); 140 if (!cmdSize) { 141 // exception or timeout occurred. 142 return false; 143 } 144 if (con->props.mLogTimes) { 145 con->timerSet(Context::RS_TIMER_INTERNAL); 146 } 147 waitForCommand = false; 148 //ALOGV("playCoreCommands 3 %i %i", cmdID, cmdSize); 149 150 if (cmdID >= (sizeof(gPlaybackFuncs) / sizeof(void *))) { 151 rsAssert(cmdID < (sizeof(gPlaybackFuncs) / sizeof(void *))); 152 ALOGE("playCoreCommands error con %p, cmd %i", con, cmdID); 153 mToCore.printDebugData(); 154 } 155 gPlaybackFuncs[cmdID](con, data, cmdSize << 2); 156 mToCore.next(); 157 } 158 return ret; 159} 160 161RsMessageToClientType ThreadIO::getClientHeader(size_t *receiveLen, uint32_t *usrID) { 162 if (mUsingSocket) { 163 mToClientSocket.read(&mLastClientHeader, sizeof(mLastClientHeader)); 164 } else { 165 size_t bytesData = 0; 166 const uint32_t *d = (const uint32_t *)mToClient.get(&mLastClientHeader.cmdID, (uint32_t*)&bytesData); 167 if (bytesData >= sizeof(uint32_t)) { 168 mLastClientHeader.userID = d[0]; 169 mLastClientHeader.bytes = bytesData - sizeof(uint32_t); 170 } else { 171 mLastClientHeader.userID = 0; 172 mLastClientHeader.bytes = 0; 173 } 174 } 175 receiveLen[0] = mLastClientHeader.bytes; 176 usrID[0] = mLastClientHeader.userID; 177 return (RsMessageToClientType)mLastClientHeader.cmdID; 178} 179 180RsMessageToClientType ThreadIO::getClientPayload(void *data, size_t *receiveLen, 181 uint32_t *usrID, size_t bufferLen) { 182 receiveLen[0] = mLastClientHeader.bytes; 183 usrID[0] = mLastClientHeader.userID; 184 if (bufferLen < mLastClientHeader.bytes) { 185 return RS_MESSAGE_TO_CLIENT_RESIZE; 186 } 187 if (mUsingSocket) { 188 if (receiveLen[0]) { 189 mToClientSocket.read(data, receiveLen[0]); 190 } 191 return (RsMessageToClientType)mLastClientHeader.cmdID; 192 } else { 193 uint32_t bytesData = 0; 194 uint32_t commandID = 0; 195 const uint32_t *d = (const uint32_t *)mToClient.get(&commandID, &bytesData); 196 //ALOGE("getMessageToClient 3 %i %i", commandID, bytesData); 197 //ALOGE("getMessageToClient %i %i", commandID, *subID); 198 if (bufferLen >= receiveLen[0]) { 199 memcpy(data, d+1, receiveLen[0]); 200 mToClient.next(); 201 return (RsMessageToClientType)commandID; 202 } 203 } 204 return RS_MESSAGE_TO_CLIENT_RESIZE; 205} 206 207bool ThreadIO::sendToClient(RsMessageToClientType cmdID, uint32_t usrID, const void *data, 208 size_t dataLen, bool waitForSpace) { 209 ClientCmdHeader hdr; 210 hdr.bytes = dataLen; 211 hdr.cmdID = cmdID; 212 hdr.userID = usrID; 213 if (mUsingSocket) { 214 mToClientSocket.writeAsync(&hdr, sizeof(hdr)); 215 if (dataLen) { 216 mToClientSocket.writeAsync(data, dataLen); 217 } 218 return true; 219 } else { 220 if (!waitForSpace) { 221 if (!mToClient.makeSpaceNonBlocking(dataLen + sizeof(hdr))) { 222 // Not enough room, and not waiting. 223 return false; 224 } 225 } 226 227 //ALOGE("sendMessageToClient 2"); 228 uint32_t *p = (uint32_t *)mToClient.reserve(dataLen + sizeof(usrID)); 229 p[0] = usrID; 230 if (dataLen > 0) { 231 memcpy(p+1, data, dataLen); 232 } 233 mToClient.commit(cmdID, dataLen + sizeof(usrID)); 234 //ALOGE("sendMessageToClient 3"); 235 return true; 236 } 237 return false; 238} 239 240