/*
 * Copyright (C) 2016 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
161cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org
171cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org#ifndef android_hardware_automotive_vehicle_V2_0_ConcurrentQueue_H_
181cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org#define android_hardware_automotive_vehicle_V2_0_ConcurrentQueue_H_
191cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
251cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org
261cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.orgnamespace android {
271cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org
/**
 * Unbounded FIFO queue shared between threads, protected by a mutex and a
 * condition variable.
 *
 * Producers add items with push(). A consumer blocks in waitForItems() and
 * then takes everything that has accumulated with flush(). Once deactivate()
 * has been called the queue rejects new items, flush() returns nothing, and
 * waitForItems() no longer blocks.
 */
template<typename T>
class ConcurrentQueue {
public:
    ConcurrentQueue() = default;

    ConcurrentQueue(const ConcurrentQueue &) = delete;
    ConcurrentQueue &operator=(const ConcurrentQueue &) = delete;

    /* Blocks the caller until the queue holds at least one item or has been
     * deactivated. Returns immediately if either is already true.
     */
    void waitForItems() {
        std::unique_lock<std::mutex> g(mLock);
        mCond.wait(g, [this] { return !mQueue.empty() || !mIsActive; });
    }

    /* Removes and returns all queued items in insertion order.
     *
     * Returns an empty vector when the queue is empty or deactivated; items
     * that were queued before deactivation are left in place, not returned.
     */
    std::vector<T> flush() {
        std::queue<T> drained;
        {
            MuxGuard g(mLock);
            if (mQueue.empty() || !mIsActive) {
                return {};
            }
            // Take the whole backlog in O(1) so producers are not blocked
            // while the elements are moved into the result vector.
            drained.swap(mQueue);
        }

        std::vector<T> items;
        items.reserve(drained.size());
        while (!drained.empty()) {
            items.push_back(std::move(drained.front()));
            drained.pop();
        }
        return items;
    }

    /* Appends an item and wakes one waiting consumer. The item is dropped
     * (left untouched in the caller's object) if the queue is deactivated.
     */
    void push(T&& item) {
        {
            MuxGuard g(mLock);
            if (!mIsActive) {
                return;
            }
            mQueue.push(std::move(item));
        }
        // Notify after releasing the lock so the woken thread can take it.
        mCond.notify_one();
    }

    /* Deactivates the queue, thus no one can push items to it, also
     * notifies all waiting threads.
     */
    void deactivate() {
        {
            MuxGuard g(mLock);
            mIsActive = false;
        }
        mCond.notify_all();  // To unblock all waiting consumers.
    }

private:
    using MuxGuard = std::lock_guard<std::mutex>;

    bool mIsActive = true;  // Guarded by mLock.
    mutable std::mutex mLock;
    std::condition_variable mCond;
    std::queue<T> mQueue;   // Guarded by mLock.
};
8690c07ea1d0aa6b7f20252c43fe23ee5ddc1d23cbreed@google.com
8790c07ea1d0aa6b7f20252c43fe23ee5ddc1d23cbreed@google.comtemplate<typename T>
8890c07ea1d0aa6b7f20252c43fe23ee5ddc1d23cbreed@google.comclass BatchingConsumer {
8990c07ea1d0aa6b7f20252c43fe23ee5ddc1d23cbreed@google.comprivate:
9090c07ea1d0aa6b7f20252c43fe23ee5ddc1d23cbreed@google.com    enum class State {
9190c07ea1d0aa6b7f20252c43fe23ee5ddc1d23cbreed@google.com        INIT = 0,
9290c07ea1d0aa6b7f20252c43fe23ee5ddc1d23cbreed@google.com        RUNNING = 1,
931cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org        STOP_REQUESTED = 2,
941cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org        STOPPED = 3,
951cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org    };
96d6176b0dcacb124539e0cfd051e6d93a9782f020rmistry@google.com
971cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.orgpublic:
981cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org    BatchingConsumer() : mState(State::INIT) {}
991cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org
1001cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org    BatchingConsumer(const BatchingConsumer &) = delete;
1011cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org    BatchingConsumer &operator=(const BatchingConsumer &) = delete;
1021cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org
1031cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org    using OnBatchReceivedFunc = std::function<void(const std::vector<T>& vec)>;
104d6176b0dcacb124539e0cfd051e6d93a9782f020rmistry@google.com
1051cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org    void run(ConcurrentQueue<T>* queue,
106d6176b0dcacb124539e0cfd051e6d93a9782f020rmistry@google.com             std::chrono::nanoseconds batchInterval,
1071cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org             const OnBatchReceivedFunc& func) {
108d6176b0dcacb124539e0cfd051e6d93a9782f020rmistry@google.com        mQueue = queue;
1091cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org        mBatchInterval = batchInterval;
110d6176b0dcacb124539e0cfd051e6d93a9782f020rmistry@google.com
1111cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org        mWorkerThread = std::thread(
112d6176b0dcacb124539e0cfd051e6d93a9782f020rmistry@google.com            &BatchingConsumer<T>::runInternal, this, func);
1131cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org    }
114d6176b0dcacb124539e0cfd051e6d93a9782f020rmistry@google.com
1151cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org    void requestStop() {
116d6176b0dcacb124539e0cfd051e6d93a9782f020rmistry@google.com        mState = State::STOP_REQUESTED;
1171cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org    }
118d6176b0dcacb124539e0cfd051e6d93a9782f020rmistry@google.com
1191cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org    void waitStopped() {
120d6176b0dcacb124539e0cfd051e6d93a9782f020rmistry@google.com        if (mWorkerThread.joinable()) {
1211cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org            mWorkerThread.join();
122d6176b0dcacb124539e0cfd051e6d93a9782f020rmistry@google.com        }
1231cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org    }
124d6176b0dcacb124539e0cfd051e6d93a9782f020rmistry@google.com
1251cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.orgprivate:
126cff01c5b60d4969d817c34d002752cd379b1862ajunov@chromium.org    void runInternal(const OnBatchReceivedFunc& onBatchReceived) {
127cff01c5b60d4969d817c34d002752cd379b1862ajunov@chromium.org        if (mState.exchange(State::RUNNING) == State::INIT) {
128d6176b0dcacb124539e0cfd051e6d93a9782f020rmistry@google.com            while (State::RUNNING == mState) {
1291cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org                mQueue->waitForItems();
1301cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org                if (State::STOP_REQUESTED == mState) break;
1311cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org
1321cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org                std::this_thread::sleep_for(mBatchInterval);
1331cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org                if (State::STOP_REQUESTED == mState) break;
1341cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org
1351cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org                std::vector<T> items = mQueue->flush();
1361cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org
1371cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org                if (items.size() > 0) {
1381cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org                    onBatchReceived(items);
1391cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org                }
1401cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org            }
1411cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org        }
1421cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org
1431cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org        mState = State::STOPPED;
1441cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org    }
1451cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org
1461cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.orgprivate:
1471cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org    std::thread mWorkerThread;
1481cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org
1491cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org    std::atomic<State> mState;
1501cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org    std::chrono::nanoseconds mBatchInterval;
1511cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org    ConcurrentQueue<T>* mQueue;
1521cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org};
1531cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org
1541cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org}  // namespace android
1551cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org
1561cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org#endif //android_hardware_automotive_vehicle_V2_0_ConcurrentQueue_H_
1571cc8f6f3c48b33430d0e39a4a36601ac0d1de04ajunov@chromium.org