ConcurrentBufferQueue.java revision 86b49644dba70ff37d1715feffa4a56a28e3bde7
1/*
2 * Copyright (C) 2014 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.camera.async;
18
19import java.util.ArrayList;
20import java.util.List;
21import java.util.NoSuchElementException;
22import java.util.concurrent.BlockingQueue;
23import java.util.concurrent.LinkedBlockingQueue;
24import java.util.concurrent.TimeUnit;
25import java.util.concurrent.TimeoutException;
26import java.util.concurrent.atomic.AtomicBoolean;
27
28/**
29 * A {@link BufferQueue} implementation useful for thread-safe producer-consumer
30 * interactions.<br>
31 * Unlike a regular {@link java.util.concurrent.BlockingQueue}, this allows
32 * closing the queue from either the producer or consumer side and enables
33 * precise accounting of objects which are never read by the consumer. Notably,
34 * this enables cleanly shutting down producer-consumer interactions without
35 * leaking managed resources which might otherwise be left dangling in the
36 * queue.
37 */
38public class ConcurrentBufferQueue<T> implements BufferQueue<T>, BufferQueueController<T>,
39        SafeCloseable {
40    /**
41     * A callback to be invoked with all of the elements of the sequence which
42     * are added but never retrieved via {@link #getNext}.
43     */
44    public static interface UnusedElementProcessor<T> {
45        /**
46         * Implementations should properly close the discarded element, if
47         * necessary.
48         */
49        public void process(T element);
50    }
51
52    /**
53     * An entry can either be a {@link T} or a special "poison-pill" marker
54     * indicating that the sequence has been closed.
55     */
56    private static class Entry<T> {
57        private final T mValue;
58        private final boolean mClosing;
59
60        private Entry(T value, boolean closing) {
61            mValue = value;
62            mClosing = closing;
63        }
64
65        public boolean isClosingMarker() {
66            return mClosing;
67        }
68
69        public T getValue() {
70            return mValue;
71        }
72    }
73    /**
74     * Lock used for mQueue modification and mClosed.
75     */
76    private final Object mLock;
77    /**
78     * The queue in which to store elements of the sequence as they arrive.
79     */
80    private final BlockingQueue<Entry<T>> mQueue;
81    /**
82     * Whether this sequence is closed.
83     */
84    private final AtomicBoolean mClosed;
85    /**
86     * The callback to use to process all elements which are discarded by the
87     * queue.
88     */
89    private final UnusedElementProcessor<T> mUnusedElementProcessor;
90
91    public ConcurrentBufferQueue(UnusedElementProcessor<T> unusedElementProcessor) {
92        mUnusedElementProcessor = unusedElementProcessor;
93        mLock = new Object();
94        mQueue = new LinkedBlockingQueue<>();
95        mClosed = new AtomicBoolean();
96    }
97
98    public ConcurrentBufferQueue() {
99        // Instantiate with a DiscardedElementProcessor which does nothing.
100        this(new UnusedElementProcessor<T>() {
101            @Override
102            public void process(T element) {
103            }
104        });
105    }
106
107    @Override
108    public void close() {
109        List<Entry<T>> remainingElements = new ArrayList<>();
110        synchronized (mLock) {
111            // Mark as closed so that no more threads wait in getNext().
112            // Any additional calls to close() will return immediately.
113            boolean alreadyClosed = mClosed.getAndSet(true);
114            if (alreadyClosed) {
115                return;
116            }
117
118            mQueue.drainTo(remainingElements);
119
120            // Keep feeding any currently-waiting consumer threads "poison pill"
121            // {@link Entry}s indicating that the sequence has ended so they
122            // wake up. When no more threads are waiting for another value from
123            // mQueue, the call to peek() from this thread will see a value.
124            // Note that this also ensures that there is a poison pill in the
125            // queue
126            // to keep waking-up any threads which manage to block in getNext()
127            // even after marking mClosed.
128            while (mQueue.peek() == null) {
129                mQueue.add(makeClosingMarker());
130            }
131        }
132
133        for (Entry<T> entry : remainingElements) {
134            if (!entry.isClosingMarker()) {
135                mUnusedElementProcessor.process(entry.getValue());
136            }
137        }
138    }
139
140    @Override
141    public void update(T element) {
142        boolean closed = false;
143        synchronized (mLock) {
144            closed = mClosed.get();
145            if (!closed) {
146                mQueue.add(makeEntry(element));
147            }
148        }
149        if (closed) {
150            mUnusedElementProcessor.process(element);
151        }
152    }
153
154    private T doWithNextEntry(Entry<T> nextEntry) throws BufferQueueClosedException {
155        if (nextEntry.isClosingMarker()) {
156            // Always keep a poison-pill in the queue to avoid a race condition
157            // in which a thread reaches the mQueue.take() call after close().
158            mQueue.add(nextEntry);
159            throw new BufferQueueClosedException();
160        } else {
161            return nextEntry.getValue();
162        }
163    }
164
165    @Override
166    public T getNext() throws InterruptedException, BufferQueueClosedException {
167        Entry<T> nextEntry = mQueue.take();
168        return doWithNextEntry(nextEntry);
169    }
170
171    @Override
172    public T getNext(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException,
173            BufferQueueClosedException {
174        Entry<T> nextEntry = mQueue.poll(timeout, unit);
175        if (nextEntry == null) {
176            throw new TimeoutException();
177        }
178        return doWithNextEntry(nextEntry);
179    }
180
181    @Override
182    public T peekNext() {
183        Entry<T> nextEntry = mQueue.peek();
184        if (nextEntry == null) {
185            return null;
186        } else if (nextEntry.isClosingMarker()) {
187            return null;
188        } else {
189            return nextEntry.getValue();
190        }
191    }
192
193    @Override
194    public void discardNext() {
195        try {
196            Entry<T> nextEntry = mQueue.remove();
197            if (nextEntry.isClosingMarker()) {
198                // Always keep a poison-pill in the queue to avoid a race
199                // condition in which a thread reaches the mQueue.take() call
200                // after close().
201                mQueue.add(nextEntry);
202            } else {
203                mUnusedElementProcessor.process(nextEntry.getValue());
204            }
205        } catch (NoSuchElementException e) {
206            // If the queue is already empty, do nothing.
207            return;
208        }
209    }
210
211    @Override
212    public boolean isClosed() {
213        return mClosed.get();
214    }
215
216    private Entry makeEntry(T value) {
217        return new Entry(value, false);
218    }
219
220    private Entry makeClosingMarker() {
221        return new Entry(null, true);
222    }
223}
224