CountableBufferQueue.java revision 4961ad31d9a877e3a68566fb5d4b33b7f79ce44e
1/*
2 * Copyright (C) 2014 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.camera.async;
18
19import java.util.concurrent.TimeUnit;
20import java.util.concurrent.TimeoutException;
21
22import javax.annotation.Nonnull;
23
24/**
25 * Like {@link ConcurrentBufferQueue}, but also tracks the number of objects
26 * currently in the queue.
27 */
28public class CountableBufferQueue<T> implements BufferQueueController<T>, BufferQueue<T> {
29    private class DecrementingProcessor<T> implements
30            ConcurrentBufferQueue.UnusedElementProcessor<T> {
31        private final ConcurrentBufferQueue.UnusedElementProcessor mProcessor;
32
33        private DecrementingProcessor(ConcurrentBufferQueue.UnusedElementProcessor<T> processor) {
34            mProcessor = processor;
35        }
36
37        @Override
38        public void process(T element) {
39            mProcessor.process(element);
40            decrementSize();
41        }
42    }
43
44    private final ConcurrentBufferQueue<T> mBufferQueue;
45    private final Object mCountLock;
46    private final Updatable<Integer> mSizeCallback;
47    private int mCount;
48
49    /**
50     * @param sizeCallback A thread-safe callback to be updated with the size
51     *            of the queue.
52     * @param processor The callback for processing elements discarded from the
53     *            queue.
54     */
55    public CountableBufferQueue(Updatable<Integer> sizeCallback, ConcurrentBufferQueue
56            .UnusedElementProcessor<T> processor) {
57        mBufferQueue = new ConcurrentBufferQueue<T>(new DecrementingProcessor<T>(processor));
58        mCountLock = new Object();
59        mCount = 0;
60        mSizeCallback = sizeCallback;
61    }
62
63    public CountableBufferQueue(ConcurrentState<Integer> sizeCallback) {
64        this(sizeCallback, new ConcurrentBufferQueue.UnusedElementProcessor<T>() {
65            @Override
66            public void process(T element) {
67                // Do nothing by default.
68            }
69        });
70    }
71
72    private void decrementSize() {
73        int count;
74        synchronized (mCountLock) {
75            mCount--;
76            count = mCount;
77        }
78        mSizeCallback.update(count);
79    }
80
81    @Override
82    public T getNext() throws InterruptedException, BufferQueueClosedException {
83        T result = mBufferQueue.getNext();
84        decrementSize();
85        return result;
86    }
87
88    @Override
89    public T getNext(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException,
90            BufferQueueClosedException {
91        T result = mBufferQueue.getNext(timeout, unit);
92        decrementSize();
93        return result;
94    }
95
96    @Override
97    public T peekNext() {
98        return mBufferQueue.peekNext();
99    }
100
101    @Override
102    public void discardNext() {
103        mBufferQueue.discardNext();
104    }
105
106    @Override
107    public void update(@Nonnull T element) {
108        // This is tricky since mBufferQueue.update() may immediately discard
109        // the element if the queue is closed. Sending redundant updates for 0
110        // size is acceptable, but sending updates indicating that the size has
111        // increased and then decreased, even after the queue is closed, would
112        // be bad. Thus, the following will filter these out.
113        int preCount;
114        int postCount;
115        synchronized (mCountLock) {
116            preCount = mCount;
117            mCount++;
118            mBufferQueue.update(element);
119            postCount = mCount;
120        }
121        if (preCount != postCount) {
122            mSizeCallback.update(postCount);
123        }
124    }
125
126    @Override
127    public void close() {
128        mBufferQueue.close();
129    }
130
131    @Override
132    public boolean isClosed() {
133        return mBufferQueue.isClosed();
134    }
135}
136