1/*
2 * Copyright (C) 2014 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.camera.util;
18
19import android.os.Handler;
20import android.util.Pair;
21
22import com.android.camera.debug.Log.Tag;
23
24import java.security.InvalidParameterException;
25import java.util.ArrayList;
26import java.util.Collections;
27import java.util.Map;
28import java.util.TreeMap;
29import java.util.concurrent.Semaphore;
30
31/**
32 * Implements a thread-safe fixed-size pool map of integers to objects such that
33 * the least element may be swapped out for a new element at any time. Elements
34 * may be temporarily "pinned" for processing in separate threads, during which
35 * they will not be swapped out. <br>
36 * This class enforces the invariant that a new element can always be swapped
37 * in. Thus, requests to pin an element for a particular task may be denied if
38 * there are not enough unpinned elements which can be removed. <br>
39 */
40public class ConcurrentSharedRingBuffer<E> {
41    private static final Tag TAG = new Tag("CncrrntShrdRingBuf");
42
43    /**
44     * Callback interface for swapping elements at the head of the buffer.
45     */
46    public static interface SwapTask<E> {
47        /**
48         * Called if the buffer is under-capacity and a new element is being
49         * added.
50         *
51         * @return the new element to add.
52         */
53        public E create();
54
55        /**
56         * Called if the buffer is full and an old element must be swapped out
57         * to make room for the new element.
58         *
59         * @param oldElement the element being removed from the buffer.
60         * @return the new element to add.
61         */
62        public E swap(E oldElement);
63
64        /**
65         * Called if the buffer already has an element with the specified key.
66         * Note that the element may currently be pinned for processing by other
67         * elements. Therefore, implementations must be thread safe with respect
68         * to any other operations which may be applied to pinned tasks.
69         *
70         * @param existingElement the element to be updated.
71         */
72        public void update(E existingElement);
73
74        /**
75         * Returns the key of the element that the ring buffer should prefer
76         * when considering a swapping candidate. If the returned key is not an
77         * unpinned element then ring buffer will replace the element with least
78         * key.
79         *
80         * @return a key of an existing unpinned element or a negative value.
81         */
82        public long getSwapKey();
83    }
84
85    /**
86     * Callback for selecting an element to pin. See
87     * {@link tryPinGreatestSelected}.
88     */
89    public static interface Selector<E> {
90        /**
91         * @param element The element to select or not select.
92         * @return true if the element should be selected, false otherwise.
93         */
94        public boolean select(E element);
95    }
96
97    public static interface PinStateListener {
98        /**
99         * Invoked whenever the ability to pin an element for processing
100         * changes.
101         *
102         * @param pinsAvailable If true, requests to pin elements (e.g. calls to
103         *            pinGreatest()) are less-likely to fail. If false, they are
104         *            more-likely to fail.
105         */
106        public void onPinStateChange(boolean pinsAvailable);
107    }
108
109    /**
110     * Wraps E with reference counting.
111     */
112    private static class Pinnable<E> {
113        private E mElement;
114
115        /** Reference-counting for the number of tasks holding this element. */
116        private int mPins;
117
118        public Pinnable(E element) {
119            mElement = element;
120            mPins = 0;
121        }
122
123        public E getElement() {
124            return mElement;
125        }
126
127        private boolean isPinned() {
128            return mPins > 0;
129        }
130    }
131
132    /**
133     * A Semaphore that allows to reduce permits to negative values.
134     */
135    private static class NegativePermitsSemaphore extends Semaphore {
136        public NegativePermitsSemaphore(int permits) {
137            super(permits);
138        }
139
140        /**
141         * Reduces the number of permits by <code>permits</code>.
142         * <p/>
143         * This method can only be called when number of available permits is
144         * zero.
145         */
146        @Override
147        public void reducePermits(int permits) {
148            if (availablePermits() != 0) {
149                throw new IllegalStateException("Called without draining the semaphore.");
150            }
151            super.reducePermits(permits);
152        }
153    }
154
155    /** Allow only one swapping operation at a time. */
156    private final Object mSwapLock = new Object();
157    /**
158     * Lock all transactions involving mElements, mUnpinnedElements,
159     * mCapacitySemaphore, mPinSemaphore, mClosed, mPinStateHandler, and
160     * mPinStateListener and the state of Pinnable instances. <br>
161     * TODO Replace this with a priority semaphore and allow swapLeast()
162     * operations to run faster at the expense of slower tryPin()/release()
163     * calls.
164     */
165    private final Object mLock = new Object();
166    /** Stores all elements. */
167    private TreeMap<Long, Pinnable<E>> mElements;
168    /** Stores the subset of mElements which is not pinned. */
169    private TreeMap<Long, Pinnable<E>> mUnpinnedElements;
170    /** Used to acquire space in mElements. */
171    private final Semaphore mCapacitySemaphore;
172    /** This must be acquired while an element is pinned. */
173    private final NegativePermitsSemaphore mPinSemaphore;
174    private boolean mClosed = false;
175
176    private Handler mPinStateHandler = null;
177    private PinStateListener mPinStateListener = null;
178
179    /**
180     * Constructs a new ring buffer with the specified capacity.
181     *
182     * @param capacity the maximum number of elements to store.
183     */
184    public ConcurrentSharedRingBuffer(int capacity) {
185        if (capacity <= 0) {
186            throw new IllegalArgumentException("Capacity must be positive.");
187        }
188
189        mElements = new TreeMap<Long, Pinnable<E>>();
190        mUnpinnedElements = new TreeMap<Long, Pinnable<E>>();
191        mCapacitySemaphore = new Semaphore(capacity);
192        // Start with -1 permits to pin elements since we must always have at
193        // least one unpinned
194        // element available to swap out as the head of the buffer.
195        mPinSemaphore = new NegativePermitsSemaphore(-1);
196    }
197
198    /**
199     * Sets or replaces the listener.
200     *
201     * @param handler The handler on which to invoke the listener.
202     * @param listener The listener to be called whenever the ability to pin an
203     *            element changes.
204     */
205    public void setListener(Handler handler, PinStateListener listener) {
206        synchronized (mLock) {
207            mPinStateHandler = handler;
208            mPinStateListener = listener;
209        }
210    }
211
212    /**
213     * Places a new element in the ring buffer, removing the least (by key)
214     * non-pinned element if necessary. The existing element (or {@code null} if
215     * the buffer is under-capacity) is passed to {@code swapper.swap()} and the
216     * result is saved to the buffer. If an entry with {@code newKey} already
217     * exists in the ring-buffer, then {@code swapper.update()} is called and
218     * may modify the element in-place. See {@link SwapTask}. <br>
219     * Note that this method is the only way to add new elements to the buffer
220     * and will never be blocked on pinned tasks.
221     *
222     * @param newKey the key with which to store the swapped-in element.
223     * @param swapper the callback used to perform the swap.
224     * @return true if the swap was successful and the new element was saved to
225     *         the buffer, false if the swap was not possible and the element
226     *         was not saved to the buffer. Note that if the swap failed,
227     *         {@code swapper.create()} may or may not have been invoked.
228     */
229    public boolean swapLeast(long newKey, SwapTask<E> swapper) {
230        synchronized (mSwapLock) {
231            Pinnable<E> existingElement = null;
232
233            synchronized (mLock) {
234                if (mClosed) {
235                    return false;
236                }
237                existingElement = mElements.get(newKey);
238            }
239
240            if (existingElement != null) {
241                swapper.update(existingElement.getElement());
242                return true;
243            }
244
245            if (mCapacitySemaphore.tryAcquire()) {
246                // If we are under capacity, insert the new element and return.
247                Pinnable<E> p = new Pinnable<E>(swapper.create());
248
249                synchronized (mLock) {
250                    if (mClosed) {
251                        return false;
252                    }
253
254                    // Add the new element and release another permit to pin
255                    // allow pinning another element.
256                    mElements.put(newKey, p);
257                    mUnpinnedElements.put(newKey, p);
258                    mPinSemaphore.release();
259                    if (mPinSemaphore.availablePermits() == 1) {
260                        notifyPinStateChange(true);
261                    }
262                }
263
264                return true;
265            } else {
266                Pinnable<E> toSwap;
267
268                // Note that this method must be synchronized to avoid
269                // attempting to remove more than one unpinned element at a
270                // time.
271                synchronized (mLock) {
272                    if (mClosed) {
273                        return false;
274                    }
275                    Pair<Long, Pinnable<E>> toSwapEntry = null;
276                    long swapKey = swapper.getSwapKey();
277                    // If swapKey is same as the inserted key return early.
278                    if (swapKey == newKey) {
279                        return false;
280                    }
281
282                    if (mUnpinnedElements.containsKey(swapKey)) {
283                        toSwapEntry = Pair.create(swapKey, mUnpinnedElements.remove(swapKey));
284                    } else {
285                        // The returned key from getSwapKey was not found in the
286                        // unpinned elements use the least entry from the
287                        // unpinned elements.
288                        Map.Entry<Long, Pinnable<E>> swapEntry = mUnpinnedElements.pollFirstEntry();
289                        if (swapEntry != null) {
290                            toSwapEntry = Pair.create(swapEntry.getKey(), swapEntry.getValue());
291                        }
292                    }
293
294                    if (toSwapEntry == null) {
295                        // We can get here if no unpinned element was found.
296                        return false;
297                    }
298
299                    toSwap = toSwapEntry.second;
300
301                    // We must remove the element from both mElements and
302                    // mUnpinnedElements because it must be re-added after the
303                    // swap to be placed in the correct order with newKey.
304                    mElements.remove(toSwapEntry.first);
305                }
306
307                try {
308                    toSwap.mElement = swapper.swap(toSwap.mElement);
309                } finally {
310                    synchronized (mLock) {
311                        if (mClosed) {
312                            return false;
313                        }
314
315                        mElements.put(newKey, toSwap);
316                        mUnpinnedElements.put(newKey, toSwap);
317                    }
318                }
319                return true;
320            }
321        }
322    }
323
324    /**
325     * Attempts to pin the element with the given key and return it. <br>
326     * Note that, if a non-null pair is returned, the caller <em>must</em> call
327     * {@link #release} with the key.
328     *
329     * @return the key and object of the pinned element, if one could be pinned,
330     *         or null.
331     */
332    public Pair<Long, E> tryPin(long key) {
333
334        boolean acquiredLastPin = false;
335        Pinnable<E> entry = null;
336
337        synchronized (mLock) {
338            if (mClosed) {
339                return null;
340            }
341
342            if (mElements.isEmpty()) {
343                return null;
344            }
345
346            entry = mElements.get(key);
347
348            if (entry == null) {
349                return null;
350            }
351
352            if (entry.isPinned()) {
353                // If the element is already pinned by another task, simply
354                // increment the pin count.
355                entry.mPins++;
356            } else {
357                // We must ensure that there will still be an unpinned element
358                // after we pin this one.
359                if (mPinSemaphore.tryAcquire()) {
360                    mUnpinnedElements.remove(key);
361                    entry.mPins++;
362
363                    acquiredLastPin = mPinSemaphore.availablePermits() <= 0;
364                } else {
365                    return null;
366                }
367            }
368        }
369
370        // If we just grabbed the last permit, we must notify listeners of the
371        // pin
372        // state change.
373        if (acquiredLastPin) {
374            notifyPinStateChange(false);
375        }
376
377        return Pair.create(key, entry.getElement());
378    }
379
380    public void release(long key) {
381        synchronized (mLock) {
382            // Note that this must proceed even if the buffer has been closed.
383
384            Pinnable<E> element = mElements.get(key);
385
386            if (element == null) {
387                throw new InvalidParameterException(
388                        "No entry found for the given key: " + key + ".");
389            }
390
391            if (!element.isPinned()) {
392                throw new IllegalArgumentException("Calling release() with unpinned element.");
393            }
394
395            // Unpin the element
396            element.mPins--;
397
398            if (!element.isPinned()) {
399                // If there are now 0 tasks pinning this element...
400                mUnpinnedElements.put(key, element);
401
402                // Allow pinning another element.
403                mPinSemaphore.release();
404
405                if (mPinSemaphore.availablePermits() == 1) {
406                    notifyPinStateChange(true);
407                }
408            }
409        }
410    }
411
412    /**
413     * Attempts to pin the greatest element and return it. <br>
414     * Note that, if a non-null element is returned, the caller <em>must</em>
415     * call {@link #release} with the element. Furthermore, behavior is
416     * undefined if the element's {@code compareTo} behavior changes between
417     * these calls.
418     *
419     * @return the key and object of the pinned element, if one could be pinned,
420     *         or null.
421     */
422    public Pair<Long, E> tryPinGreatest() {
423        synchronized (mLock) {
424            if (mClosed) {
425                return null;
426            }
427
428            if (mElements.isEmpty()) {
429                return null;
430            }
431
432            return tryPin(mElements.lastKey());
433        }
434    }
435
436    /**
437     * Attempts to pin the greatest element for which {@code selector} returns
438     * true. <br>
439     *
440     * @see #pinGreatest
441     */
442    public Pair<Long, E> tryPinGreatestSelected(Selector<E> selector) {
443        // (Quickly) get the list of elements to search through.
444        ArrayList<Long> keys = new ArrayList<Long>();
445        synchronized (mLock) {
446            if (mClosed) {
447                return null;
448            }
449
450            if (mElements.isEmpty()) {
451                return null;
452            }
453
454            keys.addAll(mElements.keySet());
455        }
456
457        Collections.sort(keys);
458
459        // Pin each element, from greatest key to least, until we find the one
460        // we want (the element with the greatest key for which
461        // selector.selected() returns true).
462        for (int i = keys.size() - 1; i >= 0; i--) {
463            Pair<Long, E> pinnedCandidate = tryPin(keys.get(i));
464            if (pinnedCandidate != null) {
465                boolean selected = false;
466
467                try {
468                    selected = selector.select(pinnedCandidate.second);
469                } finally {
470                    // Don't leak pinnedCandidate if the above select() threw an
471                    // exception.
472                    if (selected) {
473                        return pinnedCandidate;
474                    } else {
475                        release(pinnedCandidate.first);
476                    }
477                }
478            }
479        }
480
481        return null;
482    }
483
484    /**
485     * Removes all elements from the buffer, running {@code task} on each one,
486     * and waiting, if necessary, for all pins to be released.
487     *
488     * @param task
489     * @throws InterruptedException
490     */
491    public void close(Task<E> task) throws InterruptedException {
492        int numPinnedElements;
493
494        // Ensure that any pending swap tasks complete before closing.
495        synchronized (mSwapLock) {
496            synchronized (mLock) {
497                mClosed = true;
498                numPinnedElements = mElements.size() - mUnpinnedElements.size();
499            }
500        }
501
502        notifyPinStateChange(false);
503
504        // Wait for all pinned tasks to complete.
505        if (numPinnedElements > 0) {
506            mPinSemaphore.acquire(numPinnedElements);
507        }
508
509        for (Pinnable<E> element : mElements.values()) {
510            task.run(element.mElement);
511            // Release the capacity permits.
512            mCapacitySemaphore.release();
513        }
514
515        mUnpinnedElements.clear();
516
517        mElements.clear();
518    }
519
520    /**
521     * Attempts to get a pinned element for the given key.
522     *
523     * @param key the key of the pinned element.
524     * @return (key, value) pair if found otherwise null.
525     */
526    public Pair<Long, E> tryGetPinned(long key) {
527        synchronized (mLock) {
528            if (mClosed) {
529                return null;
530            }
531            for (java.util.Map.Entry<Long, Pinnable<E>> element : mElements.entrySet()) {
532                if (element.getKey() == key) {
533                    if (element.getValue().isPinned()) {
534                        return Pair.create(element.getKey(), element.getValue().getElement());
535                    } else {
536                        return null;
537                    }
538                }
539            }
540        }
541        return null;
542    }
543
544    /**
545     * Reopens previously closed buffer.
546     * <p/>
547     * Buffer should be closed before calling this method. If called with an
548     * open buffer an {@link IllegalStateException} is thrown.
549     *
550     * @param unpinnedReservedSlotCount a non-negative integer for number of
551     *            slots to reserve for unpinned elements. These slots can never
552     *            be pinned and will always be available for swapping.
553     * @throws InterruptedException
554     */
555    public void reopenBuffer(int unpinnedReservedSlotCount)
556            throws InterruptedException {
557        if (unpinnedReservedSlotCount < 0
558                || unpinnedReservedSlotCount >= mCapacitySemaphore.availablePermits()) {
559            throw new IllegalArgumentException("Invalid unpinned reserved slot count: " +
560                    unpinnedReservedSlotCount);
561        }
562
563        // Ensure that any pending swap tasks complete before closing.
564        synchronized (mSwapLock) {
565            synchronized (mLock) {
566                if (!mClosed) {
567                    throw new IllegalStateException(
568                            "Attempt to reopen the buffer when it is not closed.");
569                }
570
571                mPinSemaphore.drainPermits();
572                mPinSemaphore.reducePermits(unpinnedReservedSlotCount);
573                mClosed = false;
574            }
575        }
576    }
577
578    /**
579     * Releases a pinned element for the given key.
580     * <p/>
581     * If element is unpinned, it is not released.
582     *
583     * @param key the key of the element, if the element is not present an
584     *            {@link IllegalArgumentException} is thrown.
585     */
586    public void releaseIfPinned(long key) {
587        synchronized (mLock) {
588            Pinnable<E> element = mElements.get(key);
589
590            if (element == null) {
591                throw new IllegalArgumentException("Invalid key." + key);
592            }
593
594            if (element.isPinned()) {
595                release(key);
596            }
597        }
598    }
599
600    /**
601     * Releases all pinned elements in the buffer.
602     * <p/>
603     * Note: it only calls {@link #release(long)} only once on a pinned element.
604     */
605    public void releaseAll() {
606        synchronized (mSwapLock) {
607            synchronized (mLock) {
608                if (mClosed || mElements.isEmpty()
609                        || mElements.size() == mUnpinnedElements.size()) {
610                    return;
611                }
612                for (java.util.Map.Entry<Long, Pinnable<E>> entry : mElements.entrySet()) {
613                    if (entry.getValue().isPinned()) {
614                        release(entry.getKey());
615                    }
616                }
617            }
618        }
619    }
620
621    private void notifyPinStateChange(final boolean pinsAvailable) {
622        synchronized (mLock) {
623            // We must synchronize on mPinStateHandler and mPinStateListener.
624            if (mPinStateHandler != null) {
625                final PinStateListener listener = mPinStateListener;
626                mPinStateHandler.post(new Runnable() {
627                        @Override
628                    public void run() {
629                        listener.onPinStateChange(pinsAvailable);
630                    }
631                });
632            }
633        }
634    }
635}
636