1/* 2 * Copyright (C) 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17package com.android.camera.one.v2.sharedimagereader.ringbuffer; 18 19import com.android.camera.async.BufferQueue; 20import com.android.camera.async.BufferQueueController; 21import com.android.camera.async.ConcurrentState; 22import com.android.camera.async.CountableBufferQueue; 23import com.android.camera.async.Observable; 24import com.android.camera.one.v2.camera2proxy.ImageProxy; 25import com.android.camera.one.v2.sharedimagereader.ticketpool.Ticket; 26import com.android.camera.one.v2.sharedimagereader.ticketpool.TicketPool; 27import com.android.camera.one.v2.sharedimagereader.util.ImageCloser; 28import com.android.camera.one.v2.sharedimagereader.util.TicketImageProxy; 29import com.google.common.base.Preconditions; 30 31import java.util.Arrays; 32import java.util.Collection; 33import java.util.concurrent.TimeUnit; 34import java.util.concurrent.TimeoutException; 35import java.util.concurrent.atomic.AtomicInteger; 36 37import javax.annotation.Nonnull; 38import javax.annotation.Nullable; 39import javax.annotation.ParametersAreNonnullByDefault; 40import javax.annotation.concurrent.ThreadSafe; 41 42/** 43 * A dynamically-sized ring-buffer, implementing BufferQueue (output) and 44 * BufferQueueController (input). 45 * <p> 46 * The size of the buffer is implicitly defined by the number of "Tickets" 47 * available from the parent {@link TicketPool} at any given time. When the 48 * number of available tickets decreases, the buffer shrinks, discarding old 49 * elements. When the number of available tickets increases, the buffer expands, 50 * retaining old elements when new elements are added. 51 * <p> 52 * The ring-buffer is also a TicketPool, which allows higher-priority requests 53 * to reserve "Tickets" (representing ImageReader capacity) to evict images from 54 * the ring-buffer. 55 * <p> 56 * See docs for {@link DynamicRingBufferFactory} for more information. 57 */ 58@ThreadSafe 59@ParametersAreNonnullByDefault 60final class DynamicRingBuffer implements TicketPool, BufferQueue<ImageProxy>, 61 BufferQueueController<ImageProxy> { 62 private final CountableBufferQueue<ImageProxy> mQueue; 63 private final TicketPool mTicketPool; 64 private final AtomicInteger mTicketWaiterCount; 65 private final AvailableTicketCounter mAvailableTicketCount; 66 private final AtomicInteger mMaxSize; 67 private final ConcurrentState<Integer> mQueueSize; 68 69 /** 70 * @param parentTickets The parent ticket pool which implicitly determines 71 * how much capacity is available at any given time. 72 */ 73 DynamicRingBuffer(TicketPool parentTickets) { 74 mQueueSize = new ConcurrentState<>(0); 75 mQueue = new CountableBufferQueue<>(mQueueSize, new ImageCloser()); 76 mAvailableTicketCount = new AvailableTicketCounter(Arrays.asList(mQueueSize, parentTickets 77 .getAvailableTicketCount())); 78 mTicketPool = parentTickets; 79 mTicketWaiterCount = new AtomicInteger(0); 80 mMaxSize = new AtomicInteger(Integer.MAX_VALUE); 81 } 82 83 @Override 84 public void update(@Nonnull ImageProxy image) { 85 // Try to acquire a ticket to expand the ring-buffer and save the image. 86 Ticket ticket = null; 87 88 // Counting is hard. {@link mAvailableTicketCount} must reflect the sum 89 // of mTicketPool.getAvailableTicketCount() and the number of images in 90 // mQueue. However, for a brief moment, we acquire a ticket from 91 // mTicketPool, but have yet added it to mQueue. During this period, 92 // mAvailableTicketCount would appear to be 1 less than it should. 93 // To fix this, we must lock it to the current value, perform the 94 // transaction, and then unlock it, marking it as "valid" again, which 95 // also notifies listeners of the change. 96 mAvailableTicketCount.freeze(); 97 try { 98 ticket = tryAcquireLowPriorityTicket(); 99 if (ticket == null) { 100 // If we cannot expand the ring-buffer, remove the last element 101 // (decreasing the size), and then try to increase the size 102 // again. 103 mQueue.discardNext(); 104 ticket = tryAcquireLowPriorityTicket(); 105 } 106 if (ticket != null) { 107 mQueue.update(new TicketImageProxy(image, ticket)); 108 } else { 109 image.close(); 110 } 111 shrinkToFitMaxSize(); 112 } finally { 113 mAvailableTicketCount.unfreeze(); 114 } 115 } 116 117 @Nullable 118 private Ticket tryAcquireLowPriorityTicket() { 119 if (mTicketWaiterCount.get() != 0) { 120 return null; 121 } 122 return mTicketPool.tryAcquire(); 123 } 124 125 @Override 126 public void close() { 127 mQueue.close(); 128 } 129 130 @Override 131 public ImageProxy getNext() throws InterruptedException, BufferQueueClosedException { 132 return mQueue.getNext(); 133 } 134 135 @Override 136 public ImageProxy getNext(long timeout, TimeUnit unit) throws InterruptedException, 137 TimeoutException, BufferQueueClosedException { 138 return mQueue.getNext(timeout, unit); 139 } 140 141 @Override 142 public ImageProxy peekNext() { 143 return mQueue.peekNext(); 144 } 145 146 @Override 147 public void discardNext() { 148 mQueue.discardNext(); 149 } 150 151 @Override 152 public boolean isClosed() { 153 return mQueue.isClosed(); 154 } 155 156 @Nonnull 157 @Override 158 public Collection<Ticket> acquire(int tickets) throws InterruptedException, 159 NoCapacityAvailableException { 160 mTicketWaiterCount.incrementAndGet(); 161 try { 162 while (mQueue.peekNext() != null) { 163 mQueue.discardNext(); 164 } 165 return mTicketPool.acquire(tickets); 166 } finally { 167 mTicketWaiterCount.decrementAndGet(); 168 } 169 } 170 171 @Nonnull 172 @Override 173 public Observable<Integer> getAvailableTicketCount() { 174 return mAvailableTicketCount; 175 } 176 177 @Nullable 178 @Override 179 public Ticket tryAcquire() { 180 mTicketWaiterCount.incrementAndGet(); 181 try { 182 while (mQueue.peekNext() != null) { 183 mQueue.discardNext(); 184 } 185 return mTicketPool.tryAcquire(); 186 } finally { 187 mTicketWaiterCount.decrementAndGet(); 188 } 189 } 190 191 public void setMaxSize(int newMaxSize) { 192 Preconditions.checkArgument(newMaxSize >= 0); 193 mMaxSize.set(newMaxSize); 194 // Shrink the queue to meet this new constraint. 195 shrinkToFitMaxSize(); 196 } 197 198 private void shrinkToFitMaxSize() { 199 // To ensure that the available ticket count never "flickers" when we 200 // logically move the ticket from the queue into the parent ticket pool, 201 // lock the available ticket count. 202 mAvailableTicketCount.freeze(); 203 try { 204 // Note that to maintain the invariant of eventual-consistency 205 // (since this class is inherently shared between multiple threads), 206 // we must repeatedly poll these values each time. 207 while (mQueueSize.get() > mMaxSize.get()) { 208 mQueue.discardNext(); 209 } 210 } finally { 211 mAvailableTicketCount.unfreeze(); 212 } 213 } 214} 215