CountableBufferQueue.java revision 12f608f3d2089439a108788a1908941eea4277b9
1/* 2 * Copyright (C) 2014 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17package com.android.camera.async; 18 19import java.util.concurrent.TimeUnit; 20import java.util.concurrent.TimeoutException; 21 22/** 23 * Like {@link ConcurrentBufferQueue}, but also tracks the number of objects 24 * currently in the queue. 25 */ 26public class CountableBufferQueue<T> implements BufferQueueController<T>, BufferQueue<T> { 27 private class DecrementingProcessor<T> implements 28 ConcurrentBufferQueue.UnusedElementProcessor<T> { 29 private final ConcurrentBufferQueue.UnusedElementProcessor mProcessor; 30 31 private DecrementingProcessor(ConcurrentBufferQueue.UnusedElementProcessor<T> processor) { 32 mProcessor = processor; 33 } 34 35 @Override 36 public void process(T element) { 37 mProcessor.process(element); 38 int count; 39 synchronized (mCountLock) { 40 mCount--; 41 count = mCount; 42 } 43 mSizeUpdatable.update(count); 44 } 45 } 46 47 private final ConcurrentBufferQueue<T> mBufferQueue; 48 private final Object mCountLock; 49 private final Updatable<Integer> mSizeUpdatable; 50 private int mCount; 51 52 /** 53 * @param sizeUpdatable A thread-safe callback to be updated with the size 54 * of the queue. 55 * @param processor The callback for processing elements discarded from the 56 * queue. 57 */ 58 public CountableBufferQueue(Updatable<Integer> sizeUpdatable, ConcurrentBufferQueue 59 .UnusedElementProcessor<T> processor) { 60 mBufferQueue = new ConcurrentBufferQueue<T>(new DecrementingProcessor<T>(processor)); 61 mCountLock = new Object(); 62 mCount = 0; 63 mSizeUpdatable = sizeUpdatable; 64 } 65 66 public CountableBufferQueue(ConcurrentState<Integer> sizeUpdatable) { 67 this(sizeUpdatable, new ConcurrentBufferQueue.UnusedElementProcessor<T>() { 68 @Override 69 public void process(T element) { 70 // Do nothing by default. 71 } 72 }); 73 } 74 75 @Override 76 public T getNext() throws InterruptedException, BufferQueueClosedException { 77 T result = mBufferQueue.getNext(); 78 int count; 79 synchronized (mCountLock) { 80 mCount--; 81 count = mCount; 82 } 83 mSizeUpdatable.update(count); 84 return result; 85 } 86 87 @Override 88 public T getNext(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, 89 BufferQueueClosedException { 90 T result = mBufferQueue.getNext(); 91 int count; 92 synchronized (mCountLock) { 93 mCount--; 94 count = mCount; 95 } 96 mSizeUpdatable.update(count); 97 return result; 98 } 99 100 @Override 101 public T peekNext() { 102 return mBufferQueue.peekNext(); 103 } 104 105 @Override 106 public void discardNext() { 107 mBufferQueue.discardNext(); 108 } 109 110 @Override 111 public void update(T element) { 112 // This is tricky since mBufferQueue.update() may immediately discard 113 // the element if the queue is closed. Sending redundant updates for 0 114 // size is acceptable, but sending updates indicating that the size has 115 // increased and then decreased, even after the queue is closed, would 116 // be bad. Thus, the following will filter these out. 117 int preCount; 118 int postCount; 119 synchronized (mCountLock) { 120 preCount = mCount; 121 mCount++; 122 mBufferQueue.update(element); 123 postCount = mCount; 124 } 125 if (preCount != postCount) { 126 mSizeUpdatable.update(postCount); 127 } 128 } 129 130 @Override 131 public void close() { 132 mBufferQueue.close(); 133 } 134 135 @Override 136 public boolean isClosed() { 137 return mBufferQueue.isClosed(); 138 } 139} 140