/*
 * Copyright (C) 2016 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef android_hardware_automotive_vehicle_V2_0_ConcurrentQueue_H_
#define android_hardware_automotive_vehicle_V2_0_ConcurrentQueue_H_

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace android {

/*
 * FIFO queue guarded by a mutex, with a condition variable that lets
 * consumers block until there is something to read.
 *
 * The queue starts active. Once deactivate() has been called, push() drops
 * its argument, flush() returns an empty vector and waitForItems() returns
 * immediately.
 */
template<typename T>
class ConcurrentQueue {
public:
    /* Blocks until the queue holds at least one item or has been deactivated. */
    void waitForItems() {
        std::unique_lock<std::mutex> g(mLock);
        // The predicate form re-checks the condition after every wake-up, so
        // spurious wake-ups are handled exactly like the explicit loop would.
        mCond.wait(g, [this] { return !mQueue.empty() || !mIsActive; });
    }

    /*
     * Moves every queued item, in FIFO order, into the returned vector.
     *
     * Returns an empty vector when the queue is empty or deactivated; in the
     * deactivated case any items still queued are left where they are.
     */
    std::vector<T> flush() {
        std::vector<T> items;

        MuxGuard g(mLock);
        if (mQueue.empty() || !mIsActive) {
            return items;
        }
        items.reserve(mQueue.size());  // One allocation for the whole batch.
        while (!mQueue.empty()) {
            items.push_back(std::move(mQueue.front()));
            mQueue.pop();
        }
        return items;
    }

    /*
     * Appends the item and wakes one waiting consumer. The item is silently
     * dropped (and nobody is notified) when the queue has been deactivated.
     */
    void push(T&& item) {
        {
            MuxGuard g(mLock);
            if (!mIsActive) {
                return;
            }
            mQueue.push(std::move(item));
        }
        // Notify after releasing the lock so the woken thread can take it
        // right away.
        mCond.notify_one();
    }

    /* Deactivates the queue, thus no one can push items to it, also
     * notifies all waiting threads.
     */
    void deactivate() {
        {
            MuxGuard g(mLock);
            mIsActive = false;
        }
        mCond.notify_all();  // To unblock all waiting consumers.
    }

    ConcurrentQueue() = default;

    ConcurrentQueue(const ConcurrentQueue &) = delete;
    ConcurrentQueue &operator=(const ConcurrentQueue &) = delete;
private:
    using MuxGuard = std::lock_guard<std::mutex>;

    bool mIsActive = true;  // Guarded by mLock.
    mutable std::mutex mLock;
    std::condition_variable mCond;
    std::queue<T> mQueue;  // Guarded by mLock.
};

/*
 * Runs a worker thread that drains a ConcurrentQueue in batches: it waits for
 * items, sleeps for the batch interval, flushes the queue and hands the
 * resulting vector to a callback.
 */
template<typename T>
class BatchingConsumer {
private:
    enum class State {
        INIT = 0,
        RUNNING = 1,
        STOP_REQUESTED = 2,
        STOPPED = 3,
    };

public:
    BatchingConsumer() : mState(State::INIT) {}

    BatchingConsumer(const BatchingConsumer &) = delete;
    BatchingConsumer &operator=(const BatchingConsumer &) = delete;

    using OnBatchReceivedFunc = std::function<void(const std::vector<T>& vec)>;

    /*
     * Starts the worker thread.
     *
     * queue          queue to consume; only the pointer is stored, so the
     *                queue must stay alive while the worker is running.
     * batchInterval  time the worker sleeps after items become available and
     *                before it flushes the queue.
     * func           callback that receives each non-empty batch; it is
     *                invoked on the worker thread.
     */
    void run(ConcurrentQueue<T>* queue,
             std::chrono::nanoseconds batchInterval,
             const OnBatchReceivedFunc& func) {
        mQueue = queue;
        mBatchInterval = batchInterval;

        // std::thread stores its own copy of func, so the caller's object
        // does not have to outlive this call.
        mWorkerThread = std::thread(
            &BatchingConsumer<T>::runInternal, this, func);
    }

    /*
     * Asks the worker to stop. This only updates the state flag; a worker
     * blocked in ConcurrentQueue::waitForItems() is not woken by this call
     * and keeps waiting until an item is pushed or the queue is deactivated.
     */
    void requestStop() {
        mState = State::STOP_REQUESTED;
    }

    /* Joins the worker thread if one was started and has not been joined. */
    void waitStopped() {
        if (mWorkerThread.joinable()) {
            mWorkerThread.join();
        }
    }

private:
    /* Worker thread body; see the class comment for the loop it runs. */
    void runInternal(const OnBatchReceivedFunc& onBatchReceived) {
        // Enter the loop only on a clean INIT -> RUNNING transition; if a
        // stop was requested before the thread got here, skip straight to
        // STOPPED.
        if (mState.exchange(State::RUNNING) == State::INIT) {
            while (State::RUNNING == mState) {
                mQueue->waitForItems();
                if (State::STOP_REQUESTED == mState) break;

                // Let more items accumulate so they are delivered together.
                std::this_thread::sleep_for(mBatchInterval);
                if (State::STOP_REQUESTED == mState) break;

                std::vector<T> items = mQueue->flush();

                if (!items.empty()) {
                    onBatchReceived(items);
                }
            }
        }

        mState = State::STOPPED;
    }

private:
    std::thread mWorkerThread;

    std::atomic<State> mState;
    std::chrono::nanoseconds mBatchInterval{0};
    ConcurrentQueue<T>* mQueue = nullptr;
};

}  // namespace android

#endif  // android_hardware_automotive_vehicle_V2_0_ConcurrentQueue_H_