1#ifndef _DEBLOCKBUFFER_HPP 2#define _DEBLOCKBUFFER_HPP 3/*------------------------------------------------------------------------- 4 * drawElements C++ Base Library 5 * ----------------------------- 6 * 7 * Copyright 2014 The Android Open Source Project 8 * 9 * Licensed under the Apache License, Version 2.0 (the "License"); 10 * you may not use this file except in compliance with the License. 11 * You may obtain a copy of the License at 12 * 13 * http://www.apache.org/licenses/LICENSE-2.0 14 * 15 * Unless required by applicable law or agreed to in writing, software 16 * distributed under the License is distributed on an "AS IS" BASIS, 17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 18 * See the License for the specific language governing permissions and 19 * limitations under the License. 20 * 21 *//*! 22 * \file 23 * \brief Block-based thread-safe queue. 24 *//*--------------------------------------------------------------------*/ 25 26#include "deBlockBuffer.hpp" 27#include "deMutex.hpp" 28#include "deSemaphore.h" 29 30#include <exception> 31 32namespace de 33{ 34 35void BlockBuffer_selfTest (void); 36 37class BufferCanceledException : public std::exception 38{ 39public: 40 inline BufferCanceledException (void) {} 41 inline ~BufferCanceledException (void) throw() {} 42 43 const char* what (void) const throw() { return "BufferCanceledException"; } 44}; 45 46template <typename T> 47class BlockBuffer 48{ 49public: 50 typedef BufferCanceledException CanceledException; 51 52 BlockBuffer (int blockSize, int numBlocks); 53 ~BlockBuffer (void); 54 55 void clear (void); //!< Resets buffer. Will block until pending writes and reads have completed. 56 57 void write (int numElements, const T* elements); 58 int tryWrite (int numElements, const T* elements); 59 void flush (void); 60 bool tryFlush (void); 61 62 void read (int numElements, T* elements); 63 int tryRead (int numElements, T* elements); 64 65 void cancel (void); //!< Sets buffer in canceled state. All (including pending) writes and reads will result in CanceledException. 66 bool isCanceled (void) const { return !!m_canceled; } 67 68private: 69 BlockBuffer (const BlockBuffer& other); 70 BlockBuffer& operator= (const BlockBuffer& other); 71 72 int writeToCurrentBlock (int numElements, const T* elements, bool blocking); 73 int readFromCurrentBlock(int numElements, T* elements, bool blocking); 74 75 void flushWriteBlock (void); 76 77 deSemaphore m_fill; //!< Block fill count. 78 deSemaphore m_empty; //!< Block empty count. 79 80 int m_writeBlock; //!< Current write block ndx. 81 int m_writePos; //!< Position in block. 0 if block is not yet acquired. 82 83 int m_readBlock; //!< Current read block ndx. 84 int m_readPos; //!< Position in block. 0 if block is not yet acquired. 85 86 int m_blockSize; 87 int m_numBlocks; 88 89 T* m_elements; 90 int* m_numUsedInBlock; 91 92 Mutex m_writeLock; 93 Mutex m_readLock; 94 95 volatile deUint32 m_canceled; 96}; 97 98template <typename T> 99BlockBuffer<T>::BlockBuffer (int blockSize, int numBlocks) 100 : m_fill (0) 101 , m_empty (0) 102 , m_writeBlock (0) 103 , m_writePos (0) 104 , m_readBlock (0) 105 , m_readPos (0) 106 , m_blockSize (blockSize) 107 , m_numBlocks (numBlocks) 108 , m_elements (DE_NULL) 109 , m_numUsedInBlock (DE_NULL) 110 , m_writeLock () 111 , m_readLock () 112 , m_canceled (DE_FALSE) 113{ 114 DE_ASSERT(blockSize > 0); 115 DE_ASSERT(numBlocks > 0); 116 117 try 118 { 119 m_elements = new T[m_numBlocks*m_blockSize]; 120 m_numUsedInBlock = new int[m_numBlocks]; 121 } 122 catch (...) 123 { 124 delete[] m_elements; 125 delete[] m_numUsedInBlock; 126 throw; 127 } 128 129 m_fill = deSemaphore_create(0, DE_NULL); 130 m_empty = deSemaphore_create(numBlocks, DE_NULL); 131 DE_ASSERT(m_fill && m_empty); 132} 133 134template <typename T> 135BlockBuffer<T>::~BlockBuffer (void) 136{ 137 delete[] m_elements; 138 delete[] m_numUsedInBlock; 139 140 deSemaphore_destroy(m_fill); 141 deSemaphore_destroy(m_empty); 142} 143 144template <typename T> 145void BlockBuffer<T>::clear (void) 146{ 147 ScopedLock readLock (m_readLock); 148 ScopedLock writeLock (m_writeLock); 149 150 deSemaphore_destroy(m_fill); 151 deSemaphore_destroy(m_empty); 152 153 m_fill = deSemaphore_create(0, DE_NULL); 154 m_empty = deSemaphore_create(m_numBlocks, DE_NULL); 155 m_writeBlock = 0; 156 m_writePos = 0; 157 m_readBlock = 0; 158 m_readPos = 0; 159 m_canceled = DE_FALSE; 160 161 DE_ASSERT(m_fill && m_empty); 162} 163 164template <typename T> 165void BlockBuffer<T>::cancel (void) 166{ 167 DE_ASSERT(!m_canceled); 168 m_canceled = DE_TRUE; 169 170 deSemaphore_increment(m_empty); 171 deSemaphore_increment(m_fill); 172} 173 174template <typename T> 175int BlockBuffer<T>::writeToCurrentBlock (int numElements, const T* elements, bool blocking) 176{ 177 DE_ASSERT(numElements > 0 && elements != DE_NULL); 178 179 if (m_writePos == 0) 180 { 181 /* Write thread doesn't own current block - need to acquire. */ 182 if (blocking) 183 deSemaphore_decrement(m_empty); 184 else 185 { 186 if (!deSemaphore_tryDecrement(m_empty)) 187 return 0; 188 } 189 190 /* Check for canceled bit. */ 191 if (m_canceled) 192 { 193 // \todo [2012-07-06 pyry] A bit hackish to assume that write lock is not freed if exception is thrown out here. 194 deSemaphore_increment(m_empty); 195 m_writeLock.unlock(); 196 throw CanceledException(); 197 } 198 } 199 200 /* Write thread owns current block. */ 201 T* block = m_elements + m_writeBlock*m_blockSize; 202 int numToWrite = de::min(numElements, m_blockSize-m_writePos); 203 204 DE_ASSERT(numToWrite > 0); 205 206 for (int ndx = 0; ndx < numToWrite; ndx++) 207 block[m_writePos+ndx] = elements[ndx]; 208 209 m_writePos += numToWrite; 210 211 if (m_writePos == m_blockSize) 212 flushWriteBlock(); /* Flush current write block. */ 213 214 return numToWrite; 215} 216 217template <typename T> 218int BlockBuffer<T>::readFromCurrentBlock (int numElements, T* elements, bool blocking) 219{ 220 DE_ASSERT(numElements > 0 && elements != DE_NULL); 221 222 if (m_readPos == 0) 223 { 224 /* Read thread doesn't own current block - need to acquire. */ 225 if (blocking) 226 deSemaphore_decrement(m_fill); 227 else 228 { 229 if (!deSemaphore_tryDecrement(m_fill)) 230 return 0; 231 } 232 233 /* Check for canceled bit. */ 234 if (m_canceled) 235 { 236 // \todo [2012-07-06 pyry] A bit hackish to assume that read lock is not freed if exception is thrown out here. 237 deSemaphore_increment(m_fill); 238 m_readLock.unlock(); 239 throw CanceledException(); 240 } 241 } 242 243 /* Read thread now owns current block. */ 244 const T* block = m_elements + m_readBlock*m_blockSize; 245 int numUsedInBlock = m_numUsedInBlock[m_readBlock]; 246 int numToRead = de::min(numElements, numUsedInBlock-m_readPos); 247 248 DE_ASSERT(numToRead > 0); 249 250 for (int ndx = 0; ndx < numToRead; ndx++) 251 elements[ndx] = block[m_readPos+ndx]; 252 253 m_readPos += numToRead; 254 255 if (m_readPos == numUsedInBlock) 256 { 257 /* Free current read block and advance. */ 258 m_readBlock = (m_readBlock+1) % m_numBlocks; 259 m_readPos = 0; 260 deSemaphore_increment(m_empty); 261 } 262 263 return numToRead; 264} 265 266template <typename T> 267int BlockBuffer<T>::tryWrite (int numElements, const T* elements) 268{ 269 int numWritten = 0; 270 271 DE_ASSERT(numElements > 0 && elements != DE_NULL); 272 273 if (m_canceled) 274 throw CanceledException(); 275 276 if (!m_writeLock.tryLock()) 277 return numWritten; 278 279 while (numWritten < numElements) 280 { 281 int ret = writeToCurrentBlock(numElements-numWritten, elements+numWritten, false /* non-blocking */); 282 283 if (ret == 0) 284 break; /* Write failed. */ 285 286 numWritten += ret; 287 } 288 289 m_writeLock.unlock(); 290 291 return numWritten; 292} 293 294template <typename T> 295void BlockBuffer<T>::write (int numElements, const T* elements) 296{ 297 DE_ASSERT(numElements > 0 && elements != DE_NULL); 298 299 if (m_canceled) 300 throw CanceledException(); 301 302 m_writeLock.lock(); 303 304 int numWritten = 0; 305 while (numWritten < numElements) 306 numWritten += writeToCurrentBlock(numElements-numWritten, elements+numWritten, true /* blocking */); 307 308 m_writeLock.unlock(); 309} 310 311template <typename T> 312void BlockBuffer<T>::flush (void) 313{ 314 m_writeLock.lock(); 315 316 if (m_writePos > 0) 317 flushWriteBlock(); 318 319 m_writeLock.unlock(); 320} 321 322template <typename T> 323bool BlockBuffer<T>::tryFlush (void) 324{ 325 if (!m_writeLock.tryLock()) 326 return false; 327 328 if (m_writePos > 0) 329 flushWriteBlock(); 330 331 m_writeLock.unlock(); 332 333 return true; 334} 335 336template <typename T> 337void BlockBuffer<T>::flushWriteBlock (void) 338{ 339 DE_ASSERT(de::inRange(m_writePos, 1, m_blockSize)); 340 341 m_numUsedInBlock[m_writeBlock] = m_writePos; 342 m_writeBlock = (m_writeBlock+1) % m_numBlocks; 343 m_writePos = 0; 344 deSemaphore_increment(m_fill); 345} 346 347template <typename T> 348int BlockBuffer<T>::tryRead (int numElements, T* elements) 349{ 350 int numRead = 0; 351 352 if (m_canceled) 353 throw CanceledException(); 354 355 if (!m_readLock.tryLock()) 356 return numRead; 357 358 while (numRead < numElements) 359 { 360 int ret = readFromCurrentBlock(numElements-numRead, &elements[numRead], false /* non-blocking */); 361 362 if (ret == 0) 363 break; /* Failed. */ 364 365 numRead += ret; 366 } 367 368 m_readLock.unlock(); 369 370 return numRead; 371} 372 373template <typename T> 374void BlockBuffer<T>::read (int numElements, T* elements) 375{ 376 DE_ASSERT(numElements > 0 && elements != DE_NULL); 377 378 if (m_canceled) 379 throw CanceledException(); 380 381 m_readLock.lock(); 382 383 int numRead = 0; 384 while (numRead < numElements) 385 numRead += readFromCurrentBlock(numElements-numRead, &elements[numRead], true /* blocking */); 386 387 m_readLock.unlock(); 388} 389 390} // de 391 392#endif // _DEBLOCKBUFFER_HPP 393