1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef SINGLE_STATE_QUEUE_H
18#define SINGLE_STATE_QUEUE_H
19
20// Non-blocking single element state queue, or
21// Non-blocking single-reader / single-writer multi-word atomic load / store
22
23#include <stdint.h>
24#include <cutils/atomic.h>
25
26namespace android {
27
28template<typename T> class SingleStateQueue {
29
30public:
31
32    class Mutator;
33    class Observer;
34
35    enum SSQ_STATUS {
36        SSQ_PENDING, /* = 0 */
37        SSQ_READ,
38        SSQ_DONE,
39    };
40
41    struct Shared {
42        // needs to be part of a union so don't define constructor or destructor
43
44        friend class Mutator;
45        friend class Observer;
46
47private:
48        void                init() { mAck = 0; mSequence = 0; }
49
50        volatile int32_t    mAck;
51        volatile int32_t    mSequence;
52        T                   mValue;
53    };
54
55    class Mutator {
56    public:
57        Mutator(Shared *shared)
58            : mSequence(0), mShared(shared)
59        {
60            // exactly one of Mutator and Observer must initialize, currently it is Observer
61            // shared->init();
62        }
63
64        // push new value onto state queue, overwriting previous value;
65        // returns a sequence number which can be used with ack()
66        int32_t push(const T& value)
67        {
68            Shared *shared = mShared;
69            int32_t sequence = mSequence;
70            sequence++;
71            android_atomic_acquire_store(sequence, &shared->mSequence);
72            shared->mValue = value;
73            sequence++;
74            android_atomic_release_store(sequence, &shared->mSequence);
75            mSequence = sequence;
76            // consider signalling a futex here, if we know that observer is waiting
77            return sequence;
78        }
79
80        // returns the status of the last state push.  This may be a stale value.
81        //
82        // SSQ_PENDING, or 0, means it has not been observed
83        // SSQ_READ means it has been read
84        // SSQ_DONE means it has been acted upon, after Observer::done() is called
85        enum SSQ_STATUS ack() const
86        {
87            // in the case of SSQ_DONE, prevent any subtle data-races of subsequent reads
88            // being performed (out-of-order) before the ack read, should the caller be
89            // depending on sequentiality of reads.
90            const int32_t ack = android_atomic_acquire_load(&mShared->mAck);
91            return ack - mSequence & ~1 ? SSQ_PENDING /* seq differ */ :
92                    ack & 1 ? SSQ_DONE : SSQ_READ;
93        }
94
95        // return true if a push with specified sequence number or later has been observed
96        bool ack(int32_t sequence) const
97        {
98            // this relies on 2's complement rollover to detect an ancient sequence number
99            return mShared->mAck - sequence >= 0;
100        }
101
102    private:
103        int32_t     mSequence;
104        Shared * const mShared;
105    };
106
107    class Observer {
108    public:
109        Observer(Shared *shared)
110            : mSequence(0), mSeed(1), mShared(shared)
111        {
112            // exactly one of Mutator and Observer must initialize, currently it is Observer
113            shared->init();
114        }
115
116        // return true if value has changed
117        bool poll(T& value)
118        {
119            Shared *shared = mShared;
120            int32_t before = shared->mSequence;
121            if (before == mSequence) {
122                return false;
123            }
124            for (int tries = 0; ; ) {
125                const int MAX_TRIES = 5;
126                if (before & 1) {
127                    if (++tries >= MAX_TRIES) {
128                        return false;
129                    }
130                    before = shared->mSequence;
131                } else {
132                    android_memory_barrier();
133                    T temp = shared->mValue;
134                    int32_t after = android_atomic_release_load(&shared->mSequence);
135                    if (after == before) {
136                        value = temp;
137                        shared->mAck = before;
138                        mSequence = before; // mSequence is even after poll success
139                        return true;
140                    }
141                    if (++tries >= MAX_TRIES) {
142                        return false;
143                    }
144                    before = after;
145                }
146            }
147        }
148
149        // (optional) used to indicate to the Mutator that the state that has been polled
150        // has also been acted upon.
151        void done()
152        {
153            const int32_t ack = mShared->mAck + 1;
154            // ensure all previous writes have been performed.
155            android_atomic_release_store(ack, &mShared->mAck); // mSequence is odd after "done"
156        }
157
158    private:
159        int32_t     mSequence;
160        int         mSeed;  // for PRNG
161        Shared * const mShared;
162    };
163
164#if 0
165    SingleStateQueue(void /*Shared*/ *shared);
166    /*virtual*/ ~SingleStateQueue() { }
167
168    static size_t size() { return sizeof(Shared); }
169#endif
170
171};
172
173}   // namespace android
174
175#endif  // SINGLE_STATE_QUEUE_H
176