1/*
2 * Copyright (C) 2014 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.camera.async;
18
19import java.util.ArrayList;
20import java.util.List;
21import java.util.NoSuchElementException;
22import java.util.concurrent.BlockingQueue;
23import java.util.concurrent.LinkedBlockingQueue;
24import java.util.concurrent.TimeUnit;
25import java.util.concurrent.TimeoutException;
26import java.util.concurrent.atomic.AtomicBoolean;
27
28import javax.annotation.Nonnull;
29
30/**
31 * A {@link BufferQueue} implementation useful for thread-safe producer-consumer
32 * interactions.<br>
33 * Unlike a regular {@link java.util.concurrent.BlockingQueue}, this allows
34 * closing the queue from either the producer or consumer side and enables
35 * precise accounting of objects which are never read by the consumer. Notably,
36 * this enables cleanly shutting down producer-consumer interactions without
37 * leaking managed resources which might otherwise be left dangling in the
38 * queue.
39 */
40public class ConcurrentBufferQueue<T> implements BufferQueue<T>, BufferQueueController<T>,
41        SafeCloseable {
42    /**
43     * A callback to be invoked with all of the elements of the sequence which
44     * are added but never retrieved via {@link #getNext}.
45     */
46    public static interface UnusedElementProcessor<T> {
47        /**
48         * Implementations should properly close the discarded element, if
49         * necessary.
50         */
51        public void process(T element);
52    }
53
54    /**
55     * An entry can either be a {@link T} or a special "poison-pill" marker
56     * indicating that the sequence has been closed.
57     */
58    private static class Entry<T> {
59        private final T mValue;
60        private final boolean mClosing;
61
62        private Entry(T value, boolean closing) {
63            mValue = value;
64            mClosing = closing;
65        }
66
67        public boolean isClosingMarker() {
68            return mClosing;
69        }
70
71        public T getValue() {
72            return mValue;
73        }
74    }
75    /**
76     * Lock used for mQueue modification and mClosed.
77     */
78    private final Object mLock;
79    /**
80     * The queue in which to store elements of the sequence as they arrive.
81     */
82    private final BlockingQueue<Entry<T>> mQueue;
83    /**
84     * Whether this sequence is closed.
85     */
86    private final AtomicBoolean mClosed;
87    /**
88     * The callback to use to process all elements which are discarded by the
89     * queue.
90     */
91    private final UnusedElementProcessor<T> mUnusedElementProcessor;
92
93    public ConcurrentBufferQueue(UnusedElementProcessor<T> unusedElementProcessor) {
94        mUnusedElementProcessor = unusedElementProcessor;
95        mLock = new Object();
96        mQueue = new LinkedBlockingQueue<>();
97        mClosed = new AtomicBoolean();
98    }
99
100    public ConcurrentBufferQueue() {
101        // Instantiate with a DiscardedElementProcessor which does nothing.
102        this(new UnusedElementProcessor<T>() {
103            @Override
104            public void process(T element) {
105            }
106        });
107    }
108
109    @Override
110    public void close() {
111        List<Entry<T>> remainingElements = new ArrayList<>();
112        synchronized (mLock) {
113            // Mark as closed so that no more threads wait in getNext().
114            // Any additional calls to close() will return immediately.
115            boolean alreadyClosed = mClosed.getAndSet(true);
116            if (alreadyClosed) {
117                return;
118            }
119
120            mQueue.drainTo(remainingElements);
121
122            // Keep feeding any currently-waiting consumer threads "poison pill"
123            // {@link Entry}s indicating that the sequence has ended so they
124            // wake up. When no more threads are waiting for another value from
125            // mQueue, the call to peek() from this thread will see a value.
126            // Note that this also ensures that there is a poison pill in the
127            // queue
128            // to keep waking-up any threads which manage to block in getNext()
129            // even after marking mClosed.
130            while (mQueue.peek() == null) {
131                mQueue.add(makeClosingMarker());
132            }
133        }
134
135        for (Entry<T> entry : remainingElements) {
136            if (!entry.isClosingMarker()) {
137                mUnusedElementProcessor.process(entry.getValue());
138            }
139        }
140    }
141
142    @Override
143    public void update(@Nonnull T element) {
144        boolean closed = false;
145        synchronized (mLock) {
146            closed = mClosed.get();
147            if (!closed) {
148                mQueue.add(makeEntry(element));
149            }
150        }
151        if (closed) {
152            mUnusedElementProcessor.process(element);
153        }
154    }
155
156    private T doWithNextEntry(Entry<T> nextEntry) throws BufferQueueClosedException {
157        if (nextEntry.isClosingMarker()) {
158            // Always keep a poison-pill in the queue to avoid a race condition
159            // in which a thread reaches the mQueue.take() call after close().
160            mQueue.add(nextEntry);
161            throw new BufferQueueClosedException();
162        } else {
163            return nextEntry.getValue();
164        }
165    }
166
167    @Override
168    public T getNext() throws InterruptedException, BufferQueueClosedException {
169        Entry<T> nextEntry = mQueue.take();
170        return doWithNextEntry(nextEntry);
171    }
172
173    @Override
174    public T getNext(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException,
175            BufferQueueClosedException {
176        Entry<T> nextEntry = mQueue.poll(timeout, unit);
177        if (nextEntry == null) {
178            throw new TimeoutException();
179        }
180        return doWithNextEntry(nextEntry);
181    }
182
183    @Override
184    public T peekNext() {
185        Entry<T> nextEntry = mQueue.peek();
186        if (nextEntry == null) {
187            return null;
188        } else if (nextEntry.isClosingMarker()) {
189            return null;
190        } else {
191            return nextEntry.getValue();
192        }
193    }
194
195    @Override
196    public void discardNext() {
197        try {
198            Entry<T> nextEntry = mQueue.remove();
199            if (nextEntry.isClosingMarker()) {
200                // Always keep a poison-pill in the queue to avoid a race
201                // condition in which a thread reaches the mQueue.take() call
202                // after close().
203                mQueue.add(nextEntry);
204            } else {
205                mUnusedElementProcessor.process(nextEntry.getValue());
206            }
207        } catch (NoSuchElementException e) {
208            // If the queue is already empty, do nothing.
209            return;
210        }
211    }
212
213    @Override
214    public boolean isClosed() {
215        return mClosed.get();
216    }
217
218    private Entry makeEntry(T value) {
219        return new Entry(value, false);
220    }
221
222    private Entry makeClosingMarker() {
223        return new Entry(null, true);
224    }
225}
226