ConcurrentBufferQueue.java revision de15a5ad14da2e9069642e6f616b66b4ae660e01
1/*
2 * Copyright (C) 2014 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.camera.async;
18
19import java.util.ArrayList;
20import java.util.List;
21import java.util.NoSuchElementException;
22import java.util.concurrent.BlockingQueue;
23import java.util.concurrent.LinkedBlockingQueue;
24import java.util.concurrent.TimeUnit;
25import java.util.concurrent.TimeoutException;
26import java.util.concurrent.atomic.AtomicBoolean;
27
28/**
29 * A {@link BufferQueue} implementation useful for thread-safe producer-consumer
30 * interactions.<br>
31 * Unlike a regular {@link java.util.concurrent.BlockingQueue}, this allows
32 * closing the queue from either the producer or consumer side and enables
33 * precise accounting of objects which are never read by the consumer. Notably,
34 * this enables cleanly shutting down producer-consumer interactions without
35 * leaking managed resources which might otherwise be left dangling in the
36 * queue.
37 */
38public class ConcurrentBufferQueue<T> implements BufferQueue<T>, BufferQueueController<T>, SafeCloseable {
39    /**
40     * An entry can either be a {@link T} or a special "poison-pill" marker
41     * indicating that the sequence has been closed.
42     */
43    private static class Entry<T> {
44        private final T mValue;
45        private final boolean mClosing;
46
47        private Entry(T value, boolean closing) {
48            mValue = value;
49            mClosing = closing;
50        }
51
52        public boolean isClosingMarker() {
53            return mClosing;
54        }
55
56        public T getValue() {
57            return mValue;
58        }
59    }
60
61    /**
62     * Lock used for mQueue modification and mClosed.
63     */
64    private final Object mLock;
65    /**
66     * The queue in which to store elements of the sequence as they arrive.
67     */
68    private final BlockingQueue<Entry<T>> mQueue;
69    /**
70     * Whether this sequence is closed.
71     */
72    private final AtomicBoolean mClosed;
73    /**
74     * The callback to use to process all elements which are discarded by the
75     * queue.
76     */
77    private final UnusedElementProcessor<T> mUnusedElementProcessor;
78
79    /**
80     * @param unusedElementProcessor A callback to process elements which are
81     *            never retrieved via {@link #getNext}.
82     */
83    public ConcurrentBufferQueue(UnusedElementProcessor<T> unusedElementProcessor) {
84        mLock = new Object();
85        mQueue = new LinkedBlockingQueue<>();
86        mUnusedElementProcessor = unusedElementProcessor;
87        mClosed = new AtomicBoolean();
88    }
89
90    public ConcurrentBufferQueue() {
91        // Instantiate with a DiscardedElementProcessor which does nothing.
92        this(new UnusedElementProcessor<T>() {
93            @Override
94            public void process(T element) {
95            }
96        });
97    }
98
99    @Override
100    public void close() {
101        List<Entry<T>> remainingElements = new ArrayList<>();
102        synchronized (mLock) {
103            // Mark as closed so that no more threads wait in getNext().
104            // Any additional calls to close() will return immediately.
105            boolean alreadyClosed = mClosed.getAndSet(true);
106            if (alreadyClosed) {
107                return;
108            }
109
110            mQueue.drainTo(remainingElements);
111
112            // Keep feeding any currently-waiting consumer threads "poison pill"
113            // {@link Entry}s indicating that the sequence has ended so they
114            // wake up. When no more threads are waiting for another value from
115            // mQueue, the call to peek() from this thread will see a value.
116            // Note that this also ensures that there is a poison pill in the
117            // queue
118            // to keep waking-up any threads which manage to block in getNext()
119            // even after marking mClosed.
120            while (mQueue.peek() == null) {
121                mQueue.add(makeClosingMarker());
122            }
123        }
124
125        for (Entry<T> entry : remainingElements) {
126            if (!entry.isClosingMarker()) {
127                mUnusedElementProcessor.process(entry.getValue());
128            }
129        }
130    }
131
132    @Override
133    public void append(T element) {
134        boolean closed = false;
135        synchronized (mLock) {
136            closed = mClosed.get();
137            if (!closed) {
138                mQueue.add(makeEntry(element));
139            }
140        }
141        if (closed) {
142            mUnusedElementProcessor.process(element);
143        }
144    }
145
146    @Override
147    public T getNext() throws InterruptedException, BufferQueueClosedException {
148        if (mClosed.get()) {
149            throw new BufferQueueClosedException();
150        }
151        Entry<T> nextEntry = mQueue.take();
152        if (nextEntry.isClosingMarker()) {
153            // Always keep a poisson-pill in the queue to avoid a race condition
154            // in which a
155            // thread reaches the mQueue.take() call after close().
156            mQueue.add(nextEntry);
157            throw new BufferQueueClosedException();
158        } else {
159            return nextEntry.getValue();
160        }
161    }
162
163    @Override
164    public T getNext(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException,
165            BufferQueueClosedException {
166        if (mClosed.get()) {
167            throw new BufferQueueClosedException();
168        }
169        Entry<T> nextEntry = mQueue.poll(timeout, unit);
170        if (nextEntry.isClosingMarker()) {
171            // Always keep a poisson-pill in the queue to avoid a race condition
172            // in which a thread reaches the mQueue.take() call after close().
173            mQueue.add(nextEntry);
174            throw new BufferQueueClosedException();
175        }
176        return nextEntry.getValue();
177    }
178
179    @Override
180    public T peekNext() {
181        Entry<T> nextEntry = mQueue.peek();
182        if (nextEntry == null) {
183            return null;
184        } else if (nextEntry.isClosingMarker()) {
185            return null;
186        } else {
187            return nextEntry.getValue();
188        }
189    }
190
191    @Override
192    public void discardNext() {
193        try {
194            Entry<T> nextEntry = mQueue.remove();
195            if (!nextEntry.isClosingMarker()) {
196                // Always keep a poisson-pill in the queue to avoid a race
197                // condition in which a thread reaches the mQueue.take() call
198                // after close().
199                mQueue.add(nextEntry);
200                mUnusedElementProcessor.process(nextEntry.getValue());
201            }
202        } catch (NoSuchElementException e) {
203            // If the queue is already empty, do nothing.
204            return;
205        }
206    }
207
208    @Override
209    public boolean isClosed() {
210        return mClosed.get();
211    }
212
213    private Entry makeEntry(T value) {
214        return new Entry(value, false);
215    }
216
217    private Entry makeClosingMarker() {
218        return new Entry(null, true);
219    }
220
221    /**
222     * A callback to be invoked with all of the elements of the sequence which
223     * are added but never retrieved via {@link #getNext}.
224     */
225    public static interface UnusedElementProcessor<T> {
226        /**
227         * Implementations should properly close the discarded element, if
228         * necessary.
229         */
230        public void process(T element);
231    }
232}
233