1/*
2 * Copyright (C) 2015 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.camera.one.v2.sharedimagereader.ringbuffer;
18
19import com.android.camera.async.BufferQueue;
20import com.android.camera.async.BufferQueueController;
21import com.android.camera.async.ConcurrentState;
22import com.android.camera.async.CountableBufferQueue;
23import com.android.camera.async.Observable;
24import com.android.camera.one.v2.camera2proxy.ImageProxy;
25import com.android.camera.one.v2.sharedimagereader.ticketpool.Ticket;
26import com.android.camera.one.v2.sharedimagereader.ticketpool.TicketPool;
27import com.android.camera.one.v2.sharedimagereader.util.ImageCloser;
28import com.android.camera.one.v2.sharedimagereader.util.TicketImageProxy;
29import com.google.common.base.Preconditions;
30
31import java.util.Arrays;
32import java.util.Collection;
33import java.util.concurrent.TimeUnit;
34import java.util.concurrent.TimeoutException;
35import java.util.concurrent.atomic.AtomicInteger;
36
37import javax.annotation.Nonnull;
38import javax.annotation.Nullable;
39import javax.annotation.ParametersAreNonnullByDefault;
40import javax.annotation.concurrent.ThreadSafe;
41
42/**
43 * A dynamically-sized ring-buffer, implementing BufferQueue (output) and
44 * BufferQueueController (input).
45 * <p>
46 * The size of the buffer is implicitly defined by the number of "Tickets"
47 * available from the parent {@link TicketPool} at any given time. When the
48 * number of available tickets decreases, the buffer shrinks, discarding old
49 * elements. When the number of available tickets increases, the buffer expands,
50 * retaining old elements when new elements are added.
51 * <p>
52 * The ring-buffer is also a TicketPool, which allows higher-priority requests
53 * to reserve "Tickets" (representing ImageReader capacity) to evict images from
54 * the ring-buffer.
55 * <p>
56 * See docs for {@link DynamicRingBufferFactory} for more information.
57 */
58@ThreadSafe
59@ParametersAreNonnullByDefault
60final class DynamicRingBuffer implements TicketPool, BufferQueue<ImageProxy>,
61        BufferQueueController<ImageProxy> {
62    private final CountableBufferQueue<ImageProxy> mQueue;
63    private final TicketPool mTicketPool;
64    private final AtomicInteger mTicketWaiterCount;
65    private final AvailableTicketCounter mAvailableTicketCount;
66    private final AtomicInteger mMaxSize;
67    private final ConcurrentState<Integer> mQueueSize;
68
69    /**
70     * @param parentTickets The parent ticket pool which implicitly determines
71     *            how much capacity is available at any given time.
72     */
73    DynamicRingBuffer(TicketPool parentTickets) {
74        mQueueSize = new ConcurrentState<>(0);
75        mQueue = new CountableBufferQueue<>(mQueueSize, new ImageCloser());
76        mAvailableTicketCount = new AvailableTicketCounter(Arrays.asList(mQueueSize, parentTickets
77                .getAvailableTicketCount()));
78        mTicketPool = parentTickets;
79        mTicketWaiterCount = new AtomicInteger(0);
80        mMaxSize = new AtomicInteger(Integer.MAX_VALUE);
81    }
82
83    @Override
84    public void update(@Nonnull ImageProxy image) {
85        // Try to acquire a ticket to expand the ring-buffer and save the image.
86        Ticket ticket = null;
87
88        // Counting is hard. {@link mAvailableTicketCount} must reflect the sum
89        // of mTicketPool.getAvailableTicketCount() and the number of images in
90        // mQueue. However, for a brief moment, we acquire a ticket from
91        // mTicketPool, but have yet added it to mQueue. During this period,
92        // mAvailableTicketCount would appear to be 1 less than it should.
93        // To fix this, we must lock it to the current value, perform the
94        // transaction, and then unlock it, marking it as "valid" again, which
95        // also notifies listeners of the change.
96        mAvailableTicketCount.freeze();
97        try {
98            ticket = tryAcquireLowPriorityTicket();
99            if (ticket == null) {
100                // If we cannot expand the ring-buffer, remove the last element
101                // (decreasing the size), and then try to increase the size
102                // again.
103                mQueue.discardNext();
104                ticket = tryAcquireLowPriorityTicket();
105            }
106            if (ticket != null) {
107                mQueue.update(new TicketImageProxy(image, ticket));
108            } else {
109                image.close();
110            }
111            shrinkToFitMaxSize();
112        } finally {
113            mAvailableTicketCount.unfreeze();
114        }
115    }
116
117    @Nullable
118    private Ticket tryAcquireLowPriorityTicket() {
119        if (mTicketWaiterCount.get() != 0) {
120            return null;
121        }
122        return mTicketPool.tryAcquire();
123    }
124
125    @Override
126    public void close() {
127        mQueue.close();
128    }
129
130    @Override
131    public ImageProxy getNext() throws InterruptedException, BufferQueueClosedException {
132        return mQueue.getNext();
133    }
134
135    @Override
136    public ImageProxy getNext(long timeout, TimeUnit unit) throws InterruptedException,
137            TimeoutException, BufferQueueClosedException {
138        return mQueue.getNext(timeout, unit);
139    }
140
141    @Override
142    public ImageProxy peekNext() {
143        return mQueue.peekNext();
144    }
145
146    @Override
147    public void discardNext() {
148        mQueue.discardNext();
149    }
150
151    @Override
152    public boolean isClosed() {
153        return mQueue.isClosed();
154    }
155
156    @Nonnull
157    @Override
158    public Collection<Ticket> acquire(int tickets) throws InterruptedException,
159            NoCapacityAvailableException {
160        mTicketWaiterCount.incrementAndGet();
161        try {
162            while (mQueue.peekNext() != null) {
163                mQueue.discardNext();
164            }
165            return mTicketPool.acquire(tickets);
166        } finally {
167            mTicketWaiterCount.decrementAndGet();
168        }
169    }
170
171    @Nonnull
172    @Override
173    public Observable<Integer> getAvailableTicketCount() {
174        return mAvailableTicketCount;
175    }
176
177    @Nullable
178    @Override
179    public Ticket tryAcquire() {
180        mTicketWaiterCount.incrementAndGet();
181        try {
182            while (mQueue.peekNext() != null) {
183                mQueue.discardNext();
184            }
185            return mTicketPool.tryAcquire();
186        } finally {
187            mTicketWaiterCount.decrementAndGet();
188        }
189    }
190
191    public void setMaxSize(int newMaxSize) {
192        Preconditions.checkArgument(newMaxSize >= 0);
193        mMaxSize.set(newMaxSize);
194        // Shrink the queue to meet this new constraint.
195        shrinkToFitMaxSize();
196    }
197
198    private void shrinkToFitMaxSize() {
199        // To ensure that the available ticket count never "flickers" when we
200        // logically move the ticket from the queue into the parent ticket pool,
201        // lock the available ticket count.
202        mAvailableTicketCount.freeze();
203        try {
204            // Note that to maintain the invariant of eventual-consistency
205            // (since this class is inherently shared between multiple threads),
206            // we must repeatedly poll these values each time.
207            while (mQueueSize.get() > mMaxSize.get()) {
208                mQueue.discardNext();
209            }
210        } finally {
211            mAvailableTicketCount.unfreeze();
212        }
213    }
214}
215