/* * Copyright (C) 2014 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.android.camera.async; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; /** * A {@link BufferQueue} implementation useful for thread-safe producer-consumer * interactions.
* Unlike a regular {@link java.util.concurrent.BlockingQueue}, this allows * closing the queue from either the producer or consumer side and enables * precise accounting of objects which are never read by the consumer. Notably, * this enables cleanly shutting down producer-consumer interactions without * leaking managed resources which might otherwise be left dangling in the * queue. */ public class ConcurrentBufferQueue implements BufferQueue, BufferQueueController, SafeCloseable { /** * A callback to be invoked with all of the elements of the sequence which * are added but never retrieved via {@link #getNext}. */ public static interface UnusedElementProcessor { /** * Implementations should properly close the discarded element, if * necessary. */ public void process(T element); } /** * An entry can either be a {@link T} or a special "poison-pill" marker * indicating that the sequence has been closed. */ private static class Entry { private final T mValue; private final boolean mClosing; private Entry(T value, boolean closing) { mValue = value; mClosing = closing; } public boolean isClosingMarker() { return mClosing; } public T getValue() { return mValue; } } /** * Lock used for mQueue modification and mClosed. */ private final Object mLock; /** * The queue in which to store elements of the sequence as they arrive. */ private final BlockingQueue> mQueue; /** * Whether this sequence is closed. */ private final AtomicBoolean mClosed; /** * The callback to use to process all elements which are discarded by the * queue. */ private final UnusedElementProcessor mUnusedElementProcessor; public ConcurrentBufferQueue(UnusedElementProcessor unusedElementProcessor) { mUnusedElementProcessor = unusedElementProcessor; mLock = new Object(); mQueue = new LinkedBlockingQueue<>(); mClosed = new AtomicBoolean(); } public ConcurrentBufferQueue() { // Instantiate with a DiscardedElementProcessor which does nothing. this(new UnusedElementProcessor() { @Override public void process(T element) { } }); } @Override public void close() { List> remainingElements = new ArrayList<>(); synchronized (mLock) { // Mark as closed so that no more threads wait in getNext(). // Any additional calls to close() will return immediately. boolean alreadyClosed = mClosed.getAndSet(true); if (alreadyClosed) { return; } mQueue.drainTo(remainingElements); // Keep feeding any currently-waiting consumer threads "poison pill" // {@link Entry}s indicating that the sequence has ended so they // wake up. When no more threads are waiting for another value from // mQueue, the call to peek() from this thread will see a value. // Note that this also ensures that there is a poison pill in the // queue // to keep waking-up any threads which manage to block in getNext() // even after marking mClosed. while (mQueue.peek() == null) { mQueue.add(makeClosingMarker()); } } for (Entry entry : remainingElements) { if (!entry.isClosingMarker()) { mUnusedElementProcessor.process(entry.getValue()); } } } @Override public void update(@Nonnull T element) { boolean closed = false; synchronized (mLock) { closed = mClosed.get(); if (!closed) { mQueue.add(makeEntry(element)); } } if (closed) { mUnusedElementProcessor.process(element); } } private T doWithNextEntry(Entry nextEntry) throws BufferQueueClosedException { if (nextEntry.isClosingMarker()) { // Always keep a poison-pill in the queue to avoid a race condition // in which a thread reaches the mQueue.take() call after close(). mQueue.add(nextEntry); throw new BufferQueueClosedException(); } else { return nextEntry.getValue(); } } @Override public T getNext() throws InterruptedException, BufferQueueClosedException { Entry nextEntry = mQueue.take(); return doWithNextEntry(nextEntry); } @Override public T getNext(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, BufferQueueClosedException { Entry nextEntry = mQueue.poll(timeout, unit); if (nextEntry == null) { throw new TimeoutException(); } return doWithNextEntry(nextEntry); } @Override public T peekNext() { Entry nextEntry = mQueue.peek(); if (nextEntry == null) { return null; } else if (nextEntry.isClosingMarker()) { return null; } else { return nextEntry.getValue(); } } @Override public void discardNext() { try { Entry nextEntry = mQueue.remove(); if (nextEntry.isClosingMarker()) { // Always keep a poison-pill in the queue to avoid a race // condition in which a thread reaches the mQueue.take() call // after close(). mQueue.add(nextEntry); } else { mUnusedElementProcessor.process(nextEntry.getValue()); } } catch (NoSuchElementException e) { // If the queue is already empty, do nothing. return; } } @Override public boolean isClosed() { return mClosed.get(); } private Entry makeEntry(T value) { return new Entry(value, false); } private Entry makeClosingMarker() { return new Entry(null, true); } }