CountableBufferQueue.java revision 12f608f3d2089439a108788a1908941eea4277b9
1/*
2 * Copyright (C) 2014 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.camera.async;
18
19import java.util.concurrent.TimeUnit;
20import java.util.concurrent.TimeoutException;
21
22/**
23 * Like {@link ConcurrentBufferQueue}, but also tracks the number of objects
24 * currently in the queue.
25 */
26public class CountableBufferQueue<T> implements BufferQueueController<T>, BufferQueue<T> {
27    private class DecrementingProcessor<T> implements
28            ConcurrentBufferQueue.UnusedElementProcessor<T> {
29        private final ConcurrentBufferQueue.UnusedElementProcessor mProcessor;
30
31        private DecrementingProcessor(ConcurrentBufferQueue.UnusedElementProcessor<T> processor) {
32            mProcessor = processor;
33        }
34
35        @Override
36        public void process(T element) {
37            mProcessor.process(element);
38            int count;
39            synchronized (mCountLock) {
40                mCount--;
41                count = mCount;
42            }
43            mSizeUpdatable.update(count);
44        }
45    }
46
47    private final ConcurrentBufferQueue<T> mBufferQueue;
48    private final Object mCountLock;
49    private final Updatable<Integer> mSizeUpdatable;
50    private int mCount;
51
52    /**
53     * @param sizeUpdatable A thread-safe callback to be updated with the size
54     *            of the queue.
55     * @param processor The callback for processing elements discarded from the
56     *            queue.
57     */
58    public CountableBufferQueue(Updatable<Integer> sizeUpdatable, ConcurrentBufferQueue
59            .UnusedElementProcessor<T> processor) {
60        mBufferQueue = new ConcurrentBufferQueue<T>(new DecrementingProcessor<T>(processor));
61        mCountLock = new Object();
62        mCount = 0;
63        mSizeUpdatable = sizeUpdatable;
64    }
65
66    public CountableBufferQueue(ConcurrentState<Integer> sizeUpdatable) {
67        this(sizeUpdatable, new ConcurrentBufferQueue.UnusedElementProcessor<T>() {
68            @Override
69            public void process(T element) {
70                // Do nothing by default.
71            }
72        });
73    }
74
75    @Override
76    public T getNext() throws InterruptedException, BufferQueueClosedException {
77        T result = mBufferQueue.getNext();
78        int count;
79        synchronized (mCountLock) {
80            mCount--;
81            count = mCount;
82        }
83        mSizeUpdatable.update(count);
84        return result;
85    }
86
87    @Override
88    public T getNext(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException,
89            BufferQueueClosedException {
90        T result = mBufferQueue.getNext();
91        int count;
92        synchronized (mCountLock) {
93            mCount--;
94            count = mCount;
95        }
96        mSizeUpdatable.update(count);
97        return result;
98    }
99
100    @Override
101    public T peekNext() {
102        return mBufferQueue.peekNext();
103    }
104
105    @Override
106    public void discardNext() {
107        mBufferQueue.discardNext();
108    }
109
110    @Override
111    public void update(T element) {
112        // This is tricky since mBufferQueue.update() may immediately discard
113        // the element if the queue is closed. Sending redundant updates for 0
114        // size is acceptable, but sending updates indicating that the size has
115        // increased and then decreased, even after the queue is closed, would
116        // be bad. Thus, the following will filter these out.
117        int preCount;
118        int postCount;
119        synchronized (mCountLock) {
120            preCount = mCount;
121            mCount++;
122            mBufferQueue.update(element);
123            postCount = mCount;
124        }
125        if (preCount != postCount) {
126            mSizeUpdatable.update(postCount);
127        }
128    }
129
130    @Override
131    public void close() {
132        mBufferQueue.close();
133    }
134
135    @Override
136    public boolean isClosed() {
137        return mBufferQueue.isClosed();
138    }
139}
140