ConcurrentBufferQueue.java revision 86b49644dba70ff37d1715feffa4a56a28e3bde7
1/* 2 * Copyright (C) 2014 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17package com.android.camera.async; 18 19import java.util.ArrayList; 20import java.util.List; 21import java.util.NoSuchElementException; 22import java.util.concurrent.BlockingQueue; 23import java.util.concurrent.LinkedBlockingQueue; 24import java.util.concurrent.TimeUnit; 25import java.util.concurrent.TimeoutException; 26import java.util.concurrent.atomic.AtomicBoolean; 27 28/** 29 * A {@link BufferQueue} implementation useful for thread-safe producer-consumer 30 * interactions.<br> 31 * Unlike a regular {@link java.util.concurrent.BlockingQueue}, this allows 32 * closing the queue from either the producer or consumer side and enables 33 * precise accounting of objects which are never read by the consumer. Notably, 34 * this enables cleanly shutting down producer-consumer interactions without 35 * leaking managed resources which might otherwise be left dangling in the 36 * queue. 37 */ 38public class ConcurrentBufferQueue<T> implements BufferQueue<T>, BufferQueueController<T>, 39 SafeCloseable { 40 /** 41 * A callback to be invoked with all of the elements of the sequence which 42 * are added but never retrieved via {@link #getNext}. 43 */ 44 public static interface UnusedElementProcessor<T> { 45 /** 46 * Implementations should properly close the discarded element, if 47 * necessary. 48 */ 49 public void process(T element); 50 } 51 52 /** 53 * An entry can either be a {@link T} or a special "poison-pill" marker 54 * indicating that the sequence has been closed. 55 */ 56 private static class Entry<T> { 57 private final T mValue; 58 private final boolean mClosing; 59 60 private Entry(T value, boolean closing) { 61 mValue = value; 62 mClosing = closing; 63 } 64 65 public boolean isClosingMarker() { 66 return mClosing; 67 } 68 69 public T getValue() { 70 return mValue; 71 } 72 } 73 /** 74 * Lock used for mQueue modification and mClosed. 75 */ 76 private final Object mLock; 77 /** 78 * The queue in which to store elements of the sequence as they arrive. 79 */ 80 private final BlockingQueue<Entry<T>> mQueue; 81 /** 82 * Whether this sequence is closed. 83 */ 84 private final AtomicBoolean mClosed; 85 /** 86 * The callback to use to process all elements which are discarded by the 87 * queue. 88 */ 89 private final UnusedElementProcessor<T> mUnusedElementProcessor; 90 91 public ConcurrentBufferQueue(UnusedElementProcessor<T> unusedElementProcessor) { 92 mUnusedElementProcessor = unusedElementProcessor; 93 mLock = new Object(); 94 mQueue = new LinkedBlockingQueue<>(); 95 mClosed = new AtomicBoolean(); 96 } 97 98 public ConcurrentBufferQueue() { 99 // Instantiate with a DiscardedElementProcessor which does nothing. 100 this(new UnusedElementProcessor<T>() { 101 @Override 102 public void process(T element) { 103 } 104 }); 105 } 106 107 @Override 108 public void close() { 109 List<Entry<T>> remainingElements = new ArrayList<>(); 110 synchronized (mLock) { 111 // Mark as closed so that no more threads wait in getNext(). 112 // Any additional calls to close() will return immediately. 113 boolean alreadyClosed = mClosed.getAndSet(true); 114 if (alreadyClosed) { 115 return; 116 } 117 118 mQueue.drainTo(remainingElements); 119 120 // Keep feeding any currently-waiting consumer threads "poison pill" 121 // {@link Entry}s indicating that the sequence has ended so they 122 // wake up. When no more threads are waiting for another value from 123 // mQueue, the call to peek() from this thread will see a value. 124 // Note that this also ensures that there is a poison pill in the 125 // queue 126 // to keep waking-up any threads which manage to block in getNext() 127 // even after marking mClosed. 128 while (mQueue.peek() == null) { 129 mQueue.add(makeClosingMarker()); 130 } 131 } 132 133 for (Entry<T> entry : remainingElements) { 134 if (!entry.isClosingMarker()) { 135 mUnusedElementProcessor.process(entry.getValue()); 136 } 137 } 138 } 139 140 @Override 141 public void update(T element) { 142 boolean closed = false; 143 synchronized (mLock) { 144 closed = mClosed.get(); 145 if (!closed) { 146 mQueue.add(makeEntry(element)); 147 } 148 } 149 if (closed) { 150 mUnusedElementProcessor.process(element); 151 } 152 } 153 154 private T doWithNextEntry(Entry<T> nextEntry) throws BufferQueueClosedException { 155 if (nextEntry.isClosingMarker()) { 156 // Always keep a poison-pill in the queue to avoid a race condition 157 // in which a thread reaches the mQueue.take() call after close(). 158 mQueue.add(nextEntry); 159 throw new BufferQueueClosedException(); 160 } else { 161 return nextEntry.getValue(); 162 } 163 } 164 165 @Override 166 public T getNext() throws InterruptedException, BufferQueueClosedException { 167 Entry<T> nextEntry = mQueue.take(); 168 return doWithNextEntry(nextEntry); 169 } 170 171 @Override 172 public T getNext(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, 173 BufferQueueClosedException { 174 Entry<T> nextEntry = mQueue.poll(timeout, unit); 175 if (nextEntry == null) { 176 throw new TimeoutException(); 177 } 178 return doWithNextEntry(nextEntry); 179 } 180 181 @Override 182 public T peekNext() { 183 Entry<T> nextEntry = mQueue.peek(); 184 if (nextEntry == null) { 185 return null; 186 } else if (nextEntry.isClosingMarker()) { 187 return null; 188 } else { 189 return nextEntry.getValue(); 190 } 191 } 192 193 @Override 194 public void discardNext() { 195 try { 196 Entry<T> nextEntry = mQueue.remove(); 197 if (nextEntry.isClosingMarker()) { 198 // Always keep a poison-pill in the queue to avoid a race 199 // condition in which a thread reaches the mQueue.take() call 200 // after close(). 201 mQueue.add(nextEntry); 202 } else { 203 mUnusedElementProcessor.process(nextEntry.getValue()); 204 } 205 } catch (NoSuchElementException e) { 206 // If the queue is already empty, do nothing. 207 return; 208 } 209 } 210 211 @Override 212 public boolean isClosed() { 213 return mClosed.get(); 214 } 215 216 private Entry makeEntry(T value) { 217 return new Entry(value, false); 218 } 219 220 private Entry makeClosingMarker() { 221 return new Entry(null, true); 222 } 223} 224