ConcurrentBufferQueue.java revision 9c94ab32a69a1ad3642a0f1e38e68bcfd97d3511
/*
 * Copyright (C) 2014 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.camera.async;

import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A {@link BufferQueue} implementation useful for thread-safe producer-consumer
 * interactions.<br>
 * Unlike a regular {@link java.util.concurrent.BlockingQueue}, this allows
 * closing the queue from either the producer or consumer side and enables
 * precise accounting of objects which are never read by the consumer. Notably,
 * this enables cleanly shutting down producer-consumer interactions without
 * leaking managed resources which might otherwise be left dangling in the
 * queue.
 */
public class ConcurrentBufferQueue<T> implements BufferQueue<T>, BufferQueueController<T>,
        SafeCloseable {
    /**
     * An entry can either be a {@code T} or a special "poison-pill" marker
     * indicating that the sequence has been closed.
     */
    private static class Entry<T> {
        /** The wrapped element; {@code null} for a closing marker. */
        private final T mValue;
        /** {@code true} if this entry is the closing marker rather than an element. */
        private final boolean mClosing;

        private Entry(T value, boolean closing) {
            mValue = value;
            mClosing = closing;
        }

        /**
         * @return {@code true} if this entry marks the end of the sequence.
         */
        public boolean isClosingMarker() {
            return mClosing;
        }

        /**
         * @return The wrapped element ({@code null} for a closing marker).
         */
        public T getValue() {
            return mValue;
        }
    }

    /**
     * Lock used for mQueue modification and mClosed.
     */
    private final Object mLock;
    /**
     * The queue in which to store elements of the sequence as they arrive.
     */
    private final BlockingQueue<Entry<T>> mQueue;
    /**
     * Whether this sequence is closed.
     */
    private final AtomicBoolean mClosed;
    /**
     * The callback to use to process all elements which are discarded by the
     * queue.
     */
    private final UnusedElementProcessor<T> mUnusedElementProcessor;

    /**
     * @param unusedElementProcessor A callback to process elements which are
     *            never retrieved via {@link #getNext}.
     */
    public ConcurrentBufferQueue(UnusedElementProcessor<T> unusedElementProcessor) {
        mLock = new Object();
        mQueue = new LinkedBlockingQueue<>();
        mUnusedElementProcessor = unusedElementProcessor;
        mClosed = new AtomicBoolean();
    }

    /**
     * Creates a queue which silently drops any elements that are never
     * retrieved.
     */
    public ConcurrentBufferQueue() {
        // Instantiate with an UnusedElementProcessor which does nothing.
        this(new UnusedElementProcessor<T>() {
            @Override
            public void process(T element) {
            }
        });
    }

    /**
     * Closes the queue. Every element still queued is handed to the
     * {@link UnusedElementProcessor}, and a closing marker is left in the
     * queue so that blocked consumers wake up. Calling this more than once
     * has no further effect.
     */
    @Override
    public void close() {
        List<Entry<T>> remainingElements = new ArrayList<>();
        synchronized (mLock) {
            // Mark as closed so that no more threads wait in getNext().
            // Any additional calls to close() will return immediately.
            boolean alreadyClosed = mClosed.getAndSet(true);
            if (alreadyClosed) {
                return;
            }

            mQueue.drainTo(remainingElements);

            // Keep feeding any currently-waiting consumer threads "poison
            // pill" {@link Entry}s indicating that the sequence has ended so
            // they wake up. When no more threads are waiting for another
            // value from mQueue, the call to peek() from this thread will see
            // a value. Note that this also ensures that there is a poison
            // pill in the queue to keep waking-up any threads which manage to
            // block in getNext() even after marking mClosed.
            while (mQueue.peek() == null) {
                mQueue.add(makeClosingMarker());
            }
        }

        // Process the drained elements outside of the lock so that the
        // callback never runs while mLock is held.
        for (Entry<T> entry : remainingElements) {
            if (!entry.isClosingMarker()) {
                mUnusedElementProcessor.process(entry.getValue());
            }
        }
    }

    /**
     * Appends an element to the queue. If the queue has already been closed,
     * the element is instead passed directly to the
     * {@link UnusedElementProcessor}.
     *
     * @param element The element to enqueue.
     */
    @Override
    public void update(T element) {
        final boolean closed;
        synchronized (mLock) {
            closed = mClosed.get();
            if (!closed) {
                mQueue.add(makeEntry(element));
            }
        }
        // As in close(), invoke the callback only after releasing the lock.
        if (closed) {
            mUnusedElementProcessor.process(element);
        }
    }

    /**
     * Blocks until the next element is available and returns it.
     *
     * @return The next element of the sequence.
     * @throws InterruptedException If interrupted while waiting.
     * @throws BufferQueueClosedException If the queue is closed.
     */
    @Override
    public T getNext() throws InterruptedException, BufferQueueClosedException {
        if (mClosed.get()) {
            throw new BufferQueueClosedException();
        }
        Entry<T> nextEntry = mQueue.take();
        if (nextEntry.isClosingMarker()) {
            // Always keep a poison-pill in the queue to avoid a race condition
            // in which a thread reaches the mQueue.take() call after close().
            mQueue.add(nextEntry);
            throw new BufferQueueClosedException();
        } else {
            return nextEntry.getValue();
        }
    }

    /**
     * Blocks for at most the given time until the next element is available
     * and returns it.
     *
     * @param timeout How long to wait before giving up, in units of
     *            {@code unit}.
     * @param unit The unit of {@code timeout}.
     * @return The next element of the sequence.
     * @throws InterruptedException If interrupted while waiting.
     * @throws TimeoutException If no element became available in time.
     * @throws BufferQueueClosedException If the queue is closed.
     */
    @Override
    public T getNext(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException,
            BufferQueueClosedException {
        if (mClosed.get()) {
            throw new BufferQueueClosedException();
        }
        Entry<T> nextEntry = mQueue.poll(timeout, unit);
        if (nextEntry == null) {
            // poll() signals an elapsed timeout by returning null; report it
            // through the declared exception rather than dereferencing null.
            throw new TimeoutException();
        }
        if (nextEntry.isClosingMarker()) {
            // Always keep a poison-pill in the queue to avoid a race condition
            // in which a thread reaches the mQueue.poll() call after close().
            mQueue.add(nextEntry);
            throw new BufferQueueClosedException();
        }
        return nextEntry.getValue();
    }

    /**
     * Returns the next element without removing it.
     *
     * @return The element at the head of the queue, or {@code null} if the
     *         queue is empty or its head is the closing marker.
     */
    @Override
    public T peekNext() {
        Entry<T> nextEntry = mQueue.peek();
        if (nextEntry == null) {
            return null;
        } else if (nextEntry.isClosingMarker()) {
            return null;
        } else {
            return nextEntry.getValue();
        }
    }

    /**
     * Removes the element at the head of the queue, if any, and hands it to
     * the {@link UnusedElementProcessor}. Does nothing if the queue is empty.
     */
    @Override
    public void discardNext() {
        try {
            Entry<T> nextEntry = mQueue.remove();
            if (nextEntry.isClosingMarker()) {
                // Always keep a poison-pill in the queue to avoid a race
                // condition in which a thread reaches the mQueue.take() call
                // after close().
                mQueue.add(nextEntry);
            } else {
                // A real element was removed without being consumed, so it
                // must be processed as unused (and must not be re-queued).
                mUnusedElementProcessor.process(nextEntry.getValue());
            }
        } catch (NoSuchElementException e) {
            // If the queue is already empty, do nothing.
            return;
        }
    }

    /**
     * @return {@code true} once {@link #close} has been called.
     */
    @Override
    public boolean isClosed() {
        return mClosed.get();
    }

    /** Wraps a regular element in a queue entry. */
    private Entry<T> makeEntry(T value) {
        return new Entry<T>(value, false);
    }

    /** Creates the "poison-pill" entry which marks the end of the sequence. */
    private Entry<T> makeClosingMarker() {
        return new Entry<T>(null, true);
    }

    /**
     * A callback to be invoked with all of the elements of the sequence which
     * are added but never retrieved via {@link #getNext}.
     */
    public static interface UnusedElementProcessor<T> {
        /**
         * Implementations should properly close the discarded element, if
         * necessary.
         */
        public void process(T element);
    }
}