1/*
2 * Copyright (C) 2014 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.camera.processing.imagebackend;
18
19import android.os.Process;
20
21import com.android.camera.async.AndroidPriorityThread;
22import com.android.camera.debug.Log;
23import com.android.camera.processing.ProcessingTaskConsumer;
24import com.android.camera.processing.memory.ByteBufferDirectPool;
25import com.android.camera.processing.memory.LruResourcePool;
26import com.android.camera.session.CaptureSession;
27import com.android.camera.util.Size;
28import com.google.common.base.Optional;
29
30import java.nio.ByteBuffer;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.Map;
34import java.util.Set;
35import java.util.concurrent.Executor;
36import java.util.concurrent.ExecutorService;
37import java.util.concurrent.Executors;
38import java.util.concurrent.ThreadFactory;
39import java.util.concurrent.locks.Condition;
40import java.util.concurrent.locks.ReentrantLock;
41
42/**
 * This ImageBackend provides a task-running infrastructure with three levels
 * of priority (fast, average and slow) and does the book-keeping needed to
 * keep track of tasks that use Android Images. Android.media.images are
46 * critical system resources that MUST be properly managed in order to maintain
47 * camera application performance. Android.media.images are merely Java handles
48 * to regions of physically contiguous memory used by the camera hardware as a
 * destination for imaging data. In general, this physically contiguous memory
 * is not counted as an application resource, but as a system resource held by
 * the application, and does NOT count against the limits of application memory.
52 * The performance pressures of both computing and memory resources must often
53 * be prioritized in releasing Android.media.images in a timely manner. In order
54 * to properly balance these concerns, most image processing requested should be
55 * routed through this object. This object is also responsible for releasing
56 * Android.media image as soon as possible, so as not to stall the camera
 * hardware subsystem. Tasks that reserve these images are subclasses of the
 * basic Java Runnable with a few conditions placed upon their run()
 * implementation:
60 * <ol>
61 * <li>The task will try to release the image as early as possible by calling
62 * the releaseSemaphoreReference as soon as a reference to the original image is
63 * no longer required.</li>
64 * <li>A set of tasks that require ImageData must only happen on the first
65 * receiveImage call. receiveImage must only be called once per image.</li>
66 * <li>However, the submitted tasks may spawn new tasks via the appendTask with
67 * any image that have had a task submitted, but NOT released via
68 * releaseSemaphoreReference.</li>
 * <li>Computation that is dependent on multiple images should be written into
 * this task framework in a distributed manner where each image task can be
 * computed independently and join its results to a common shared object. This
 * style of implementation allows for the earliest release of Android Images
 * while honoring the resource priorities set by this class. See the Lucky shot
74 * implementation for a concrete example for this shared object and its
75 * respective task {@link TaskLuckyShotSession} {@link LuckyShotSession}</li>
76 * </ol>
77 * To integrate with the ProcessingServiceManager, ImageBackend also signals to
78 * the ProcessingServiceManager its processing state by enqueuing
79 * ImageShadowTasks on each ImageBackend::receiveImage call. These ImageShadow
80 * tasks have no implementation, but emulate the processing delay by blocking
81 * until all tasks submitted and spawned by a particular receiveImage call have
82 * completed their processing. This emulated functionality ensures that other
83 * ProcessingTasks associated with Lens Blur and Panorama are not processing
84 * while the ImageBackend is running. Unfairly, the ImageBackend proceeds with
85 * its own processing regardless of the state of ImageShadowTask.
86 * ImageShadowTasks that are associated with ImageBackend tasks that have
87 * already been completed should return immediately on its process call.
88 */
89public class ImageBackend implements ImageConsumer, ImageTaskManager {
    /** Tag used for all log output produced through {@link #logWrapper}. */
    private static final Log.Tag TAG = new Log.Tag("ImageBackend");

    // Number of worker threads in each fixed-size pool created by the default
    // constructor.
    protected static final int NUM_THREADS_FAST = 2;
    protected static final int NUM_THREADS_AVERAGE = 2;
    protected static final int NUM_THREADS_SLOW = 2;

    // Android thread priorities for the three pools. AVERAGE is one step less
    // favorable than the default priority; SLOW is one step more favorable
    // than the background priority.
    // NOTE(review): presumably applied by the Fast/Average/Slow thread
    // factories, which are not visible in this section -- confirm.
    private static final int FAST_THREAD_PRIORITY = Process.THREAD_PRIORITY_DISPLAY;
    private static final int AVERAGE_THREAD_PRIORITY = Process.THREAD_PRIORITY_DEFAULT
            + Process.THREAD_PRIORITY_LESS_FAVORABLE;
    private static final int SLOW_THREAD_PRIORITY = Process.THREAD_PRIORITY_BACKGROUND
            + Process.THREAD_PRIORITY_MORE_FAVORABLE;

    // Size argument handed to the ByteBufferDirectPool created by the default
    // constructor.
    private static final int IMAGE_BACKEND_HARD_REF_POOL_SIZE = 2;

    /** Consumer onto which an ImageShadowTask is enqueued for every session. */
    protected final ProcessingTaskConsumer mProcessingTaskConsumer;

    /**
     * Map for TaskImageContainer and the release of ImageProxy Book-keeping.
     * This map is also the monitor that guards the mOutstandingImage*
     * counters below.
     */
    protected final Map<ImageToProcess, ImageReleaseProtocol> mImageSemaphoreMap;
    /**
     * Map for ImageShadowTask and release of blocking on
     * ImageShadowTask::process. Accesses are synchronized on the map itself.
     */
    protected final Map<CaptureSession, ImageShadowTask> mShadowTaskMap;

    // The available threadpools for scheduling, one per processing priority.
    protected final ExecutorService mThreadPoolFast;
    protected final ExecutorService mThreadPoolAverage;
    protected final ExecutorService mThreadPoolSlow;

    // Buffer pool handed to the JPEG compression tasks created by this class.
    private final LruResourcePool<Integer, ByteBuffer> mByteBufferDirectPool;

    /**
     * Approximate viewable size (in pixels) for the fast thumbnail in the
     * current UX definition of the product. Note that these values will be the
     * minimum size of FAST_THUMBNAIL target for the CONVERT_TO_RGB_PREVIEW
     * task.
     */
    private final Size mTinyThumbnailTargetSize;

    /**
     * A standard viewable size (in pixels) for the filmstrip thumbnail in the
     * current UX definition of the product. Note that this size is the minimum
     * size for the Preview on the filmstrip associated with
     * COMPRESS_TO_JPEG_AND_WRITE_TO_DISK task.
     */
    private final static Size FILMSTRIP_THUMBNAIL_TARGET_SIZE = new Size(512, 384);

    // Some invariants to know that we're keeping track of everything
    // that reflect the state of mImageSemaphoreMap

    // Sum of the reference counts of all images currently being tracked.
    private int mOutstandingImageRefs = 0;

    // Running total of images accepted for tracking.
    private int mOutstandingImageOpened = 0;

    // Running total of images closed.
    // NOTE(review): the increment is not visible in this section -- confirm.
    private int mOutstandingImageClosed = 0;

    // Objects that may be registered to this objects events. Assigned in both
    // constructors.
    private ImageProcessorProxyListener mProxyListener = null;
149
150    // Default constructor, values are conservatively targeted to the Nexus 6
151    public ImageBackend(ProcessingTaskConsumer processingTaskConsumer, int tinyThumbnailSize) {
152        mThreadPoolFast = Executors.newFixedThreadPool(NUM_THREADS_FAST, new FastThreadFactory());
153        mThreadPoolAverage = Executors.newFixedThreadPool(NUM_THREADS_AVERAGE,
154                new AverageThreadFactory());
155        mThreadPoolSlow = Executors.newFixedThreadPool(NUM_THREADS_SLOW, new SlowThreadFactory());
156        mByteBufferDirectPool = new ByteBufferDirectPool(IMAGE_BACKEND_HARD_REF_POOL_SIZE);
157        mProxyListener = new ImageProcessorProxyListener();
158        mImageSemaphoreMap = new HashMap<>();
159        mShadowTaskMap = new HashMap<>();
160        mProcessingTaskConsumer = processingTaskConsumer;
161        mTinyThumbnailTargetSize = new Size(tinyThumbnailSize, tinyThumbnailSize);
162    }
163
164    /**
165     * Direct Injection Constructor for Testing purposes.
166     *
167     * @param fastService Service where Tasks of FAST Priority are placed.
168     * @param averageService Service where Tasks of AVERAGE Priority are placed.
169     * @param slowService Service where Tasks of SLOW Priority are placed.
170     * @param imageProcessorProxyListener iamge proxy listener to be used
171     */
172    public ImageBackend(ExecutorService fastService,
173            ExecutorService averageService,
174            ExecutorService slowService,
175            LruResourcePool<Integer, ByteBuffer> byteBufferDirectPool,
176            ImageProcessorProxyListener imageProcessorProxyListener,
177            ProcessingTaskConsumer processingTaskConsumer,
178            int tinyThumbnailSize) {
179        mThreadPoolFast = fastService;
180        mThreadPoolAverage = averageService;
181        mThreadPoolSlow = slowService;
182        mByteBufferDirectPool = byteBufferDirectPool;
183        mProxyListener = imageProcessorProxyListener;
184        mImageSemaphoreMap = new HashMap<>();
185        mShadowTaskMap = new HashMap<>();
186        mProcessingTaskConsumer = processingTaskConsumer;
187        mTinyThumbnailTargetSize = new Size(tinyThumbnailSize, tinyThumbnailSize);
188    }
189
190    /**
191     * Simple getter for the associated listener object associated with this
192     * instantiation that handles registration of events listeners.
193     *
194     * @return listener proxy that handles events messaging for this object.
195     */
196    public ImageProcessorProxyListener getProxyListener() {
197        return mProxyListener;
198    }
199
200    /**
201     * Wrapper function for all log messages created by this object. Default
202     * implementation is to send messages to the Android logger. For test
203     * purposes, this method can be overridden to avoid "Stub!" Runtime
204     * exceptions in Unit Tests.
205     */
206    public void logWrapper(String message) {
207        Log.v(TAG, message);
208    }
209
210    /**
211     * @return Number of Image references currently held by this instance
212     */
213    @Override
214    public int getNumberOfReservedOpenImages() {
215        synchronized (mImageSemaphoreMap) {
216            // since mOutstandingImageOpened, mOutstandingImageClosed reflect
217            // the historical state of mImageSemaphoreMap, we need to lock on
218            // before we return a value.
219            return mOutstandingImageOpened - mOutstandingImageClosed;
220        }
221    }
222
223    /**
224     * Returns of the number of receiveImage calls that are currently enqueued
225     * and/or being processed.
226     *
227     * @return The number of receiveImage calls that are currently enqueued
228     *         and/or being processed
229     */
230    @Override
231    public int getNumberOfOutstandingCalls() {
232        synchronized (mShadowTaskMap) {
233            return mShadowTaskMap.size();
234        }
235    }
236
237    /**
238     * Signals the ImageBackend that a tasks has released a reference to the
239     * image. Imagebackend determines whether all references have been released
240     * and applies its specified release protocol of closing image and/or
241     * unblocking the caller. Should ONLY be called by the tasks running on this
242     * class.
243     *
244     * @param img the image to be released by the task.
245     * @param executor the executor on which the image close is run. if null,
246     *            image close is run by the calling thread (usually the main
247     *            task thread).
248     */
249    @Override
250    public void releaseSemaphoreReference(final ImageToProcess img, Executor executor) {
251        synchronized (mImageSemaphoreMap) {
252            ImageReleaseProtocol protocol = mImageSemaphoreMap.get(img);
253            if (protocol == null || protocol.getCount() <= 0) {
254                // That means task implementation has allowed an unbalanced
255                // semaphore release.
256                throw new RuntimeException(
257                        "ERROR: Task implementation did NOT balance its release.");
258            }
259
260            // Normal operation from here.
261            protocol.addCount(-1);
262            mOutstandingImageRefs--;
263            logWrapper("Ref release.  Total refs = " + mOutstandingImageRefs);
264            if (protocol.getCount() == 0) {
265                // Image is ready to be released
266                // Remove the image from the map so that it may be submitted
267                // again.
268                mImageSemaphoreMap.remove(img);
269
270                // Conditionally close the image, specified by initial
271                // receiveImage call
272                if (protocol.closeOnRelease) {
273                    closeImageExecutorSafe(img, executor);
274                    logWrapper("Ref release close.");
275                }
276
277                // Conditionally signal the blocking thread to go.
278                if (protocol.blockUntilRelease) {
279                    protocol.signal();
280                }
281            } else {
282                // Image is still being held by other tasks.
283                // Otherwise, update the semaphore
284                mImageSemaphoreMap.put(img, protocol);
285            }
286        }
287    }
288
289    /**
290     * Spawns dependent tasks from internal implementation of a set of tasks. If
291     * a dependent task does NOT require the image reference, it should be
292     * passed a null pointer as an image reference. In general, this method
293     * should be called after the task has completed its own computations, but
294     * before it has released its own image reference (via the
295     * releaseSemaphoreReference call).
296     *
297     * @param tasks The set of tasks to be run
298     * @return whether tasks are successfully submitted.
299     */
300    @Override
301    public boolean appendTasks(ImageToProcess img, Set<TaskImageContainer> tasks) {
302        // Make sure that referred images are all the same, if it exists.
303        // And count how image references need to be kept track of.
304        int countImageRefs = numPropagatedImageReferences(img, tasks);
305
306        if (img != null) {
307            // If you're still holding onto the reference, make sure you keep
308            // count
309            incrementSemaphoreReferenceCount(img, countImageRefs);
310        }
311
312        // Update the done count on the new tasks.
313        incrementTaskDone(tasks);
314
315        scheduleTasks(tasks);
316        return true;
317    }
318
319    /**
320     * Spawns a single dependent task from internal implementation of a task.
321     *
322     * @param task The task to be run
323     * @return whether tasks are successfully submitted.
324     */
325    @Override
326    public boolean appendTasks(ImageToProcess img, TaskImageContainer task) {
327        Set<TaskImageContainer> tasks = new HashSet<TaskImageContainer>(1);
328        tasks.add(task);
329        return appendTasks(img, tasks);
330    }
331
332    /**
333     * Implements that top-level image single task submission that is defined by
334     * the ImageConsumer interface w/o Runnable to executed.
335     *
336     * @param img Image required by the task
337     * @param task Task to be run
338     * @param blockUntilImageRelease If true, call blocks until the object img
339     *            is no longer referred by any task. If false, call is
340     *            non-blocking
341     * @param closeOnImageRelease If true, images is closed when the object img
342     *            is is no longer referred by any task. If false, After an image
343     *            is submitted, it should never be submitted again to the
344     *            interface until all tasks and their spawned tasks are
345     *            finished.
346     * @return whether jobs were enqueued to the ImageBackend.
347     */
348    @Override
349    public boolean receiveImage(ImageToProcess img, TaskImageContainer task,
350            boolean blockUntilImageRelease, boolean closeOnImageRelease)
351            throws InterruptedException {
352        return receiveImage(img, task, blockUntilImageRelease, closeOnImageRelease,
353                Optional.<Runnable> absent());
354    }
355
356    /**
357     * Implements that top-level image single task submission that is defined by
358     * the ImageConsumer interface.
359     *
360     * @param img Image required by the task
361     * @param task Task to be run
362     * @param blockUntilImageRelease If true, call blocks until the object img
363     *            is no longer referred by any task. If false, call is
364     *            non-blocking
365     * @param closeOnImageRelease If true, images is closed when the object img
366     *            is is no longer referred by any task. If false, After an image
367     *            is submitted, it should never be submitted again to the
368     *            interface until all tasks and their spawned tasks are
369     *            finished.
370     * @param runnableWhenDone Optional runnable to be executed when the set of
371     *            tasks are done.
372     * @return whether jobs were enqueued to the ImageBackend.
373     */
374    @Override
375    public boolean receiveImage(ImageToProcess img, TaskImageContainer task,
376            boolean blockUntilImageRelease, boolean closeOnImageRelease,
377            Optional<Runnable> runnableWhenDone)
378            throws InterruptedException {
379        Set<TaskImageContainer> passTasks = new HashSet<TaskImageContainer>(1);
380        passTasks.add(task);
381        return receiveImage(img, passTasks, blockUntilImageRelease, closeOnImageRelease,
382                runnableWhenDone);
383    }
384
385    /**
386     * Returns an informational string about the current status of ImageBackend,
387     * along with an approximate number of references being held.
388     *
389     * @return an informational string suitable to be dumped into logcat
390     */
391    @Override
392    public String toString() {
393        return "ImageBackend Status BEGIN:\n" +
394                "Shadow Image Map Size = " + mShadowTaskMap.size() + "\n" +
395                "Image Semaphore Map Size = " + mImageSemaphoreMap.size() + "\n" +
396                "OutstandingImageRefs = " + mOutstandingImageRefs + "\n" +
397                "Proxy Listener Map Size = " + mProxyListener.getMapSize() + "\n" +
398                "Proxy Listener = " + mProxyListener.getNumRegisteredListeners() + "\n" +
399                "ImageBackend Status END:\n";
400    }
401
402    /**
403     * Implements that top-level image single task submission that is defined by
404     * the ImageConsumer interface.
405     *
406     * @param img Image required by the task
407     * @param tasks A set of Tasks to be run
408     * @param blockUntilImageRelease If true, call blocks until the object img
409     *            is no longer referred by any task. If false, call is
410     *            non-blocking
411     * @param closeOnImageRelease If true, images is closed when the object img
412     *            is is no longer referred by any task. If false, After an image
413     *            is submitted, it should never be submitted again to the
414     *            interface until all tasks and their spawned tasks are
415     *            finished.
416     * @param runnableWhenDone Optional runnable to be executed when the set of
417     *            tasks are done.
418     * @return whether receiveImage succeeded. Generally, only happens when the
419     *         image reference is null or the task set is empty.
420     * @throws InterruptedException occurs when call is set to be blocking and
421     *             is interrupted.
422     */
423    @Override
424    public boolean receiveImage(ImageToProcess img, Set<TaskImageContainer> tasks,
425            boolean blockUntilImageRelease, boolean closeOnImageRelease,
426            Optional<Runnable> runnableWhenDone)
427            throws InterruptedException {
428
429        // Short circuit if no tasks submitted.
430        if (tasks == null || tasks.size() <= 0) {
431            return false;
432        }
433
434        if (img == null) {
435            // TODO: Determine whether you need to be so strict at the top level
436            throw new RuntimeException("ERROR: Initial call must reference valid Image!");
437        }
438
439        // Make sure that referred images are all the same, if it exists.
440        // And count how image references need to be kept track of.
441        int countImageRefs = numPropagatedImageReferences(img, tasks);
442
443        // Initialize the counters for process-level tasks
444        initializeTaskDone(tasks, runnableWhenDone);
445
446        // Set the semaphore, given that the number of tasks that need to be
447        // scheduled
448        // and the boolean flags for imaging closing and thread blocking
449        ImageReleaseProtocol protocol = setSemaphoreReferenceCount(img, countImageRefs,
450                blockUntilImageRelease, closeOnImageRelease);
451
452        // Put the tasks on their respective queues.
453        scheduleTasks(tasks);
454
455        // Implement blocking if required
456        if (protocol.blockUntilRelease) {
457            protocol.block();
458        }
459
460        return true;
461    }
462
463    /**
464     * Implements that top-level image task submission short-cut that is defined
465     * by the ImageConsumer interface.
466     *
467     * @param img Image required by the task
468     * @param executor Executor to run events and image closes, in case of
469     *            control leakage
470     * @param processingFlags Magical bit vector that specifies jobs to be run
471     *            After an image is submitted, it should never be submitted
472     *            again to the interface until all tasks and their spawned tasks
473     *            are finished.
474     * @param imageProcessorListener Optional listener to automatically register
475     *            at the job task and unregister after all tasks are done
476     * @return whether receiveImage succeeded. Generally, only happens when the
477     *         image reference is null or the task set is empty.
478     * @throws InterruptedException occurs when call is set to be blocking and
479     *             is interrupted.
480     */
481    @Override
482    public boolean receiveImage(ImageToProcess img, Executor executor,
483            Set<ImageTaskFlags> processingFlags, CaptureSession session,
484            Optional<ImageProcessorListener> imageProcessorListener)
485            throws InterruptedException {
486
487        // Uncomment for occasional debugging
488        // Log.v(TAG, toString());
489
490        Set<TaskImageContainer> tasksToExecute = new HashSet<TaskImageContainer>();
491
492        if (img == null) {
493            // No data to process, just pure message.
494            return true;
495        }
496
497        // Now add the pre-mixed versions of the tasks.
498
499        if (processingFlags.contains(ImageTaskFlags.COMPRESS_TO_JPEG_AND_WRITE_TO_DISK)) {
500            if (processingFlags.contains(ImageTaskFlags.CREATE_EARLY_FILMSTRIP_PREVIEW)) {
501                // Request job that creates both filmstrip thumbnail from YUV,
502                // JPEG compression of the YUV Image, and writes the result to
503                // disk
504                tasksToExecute.add(new TaskPreviewChainedJpeg(img, executor, this, session,
505                        FILMSTRIP_THUMBNAIL_TARGET_SIZE, mByteBufferDirectPool));
506            } else {
507                // Request job that only does JPEG compression and writes the
508                // result to disk
509                tasksToExecute.add(new TaskCompressImageToJpeg(img, executor, this, session,
510                      mByteBufferDirectPool));
511            }
512        }
513
514        if (processingFlags.contains(ImageTaskFlags.CONVERT_TO_RGB_PREVIEW)) {
515            // Add an additional type of task to the appropriate queue.
516            tasksToExecute.add(new TaskConvertImageToRGBPreview(img, executor,
517                    this, TaskImageContainer.ProcessingPriority.FAST, session,
518                    mTinyThumbnailTargetSize,
519                    TaskConvertImageToRGBPreview.ThumbnailShape.SQUARE_ASPECT_CIRCULAR_INSET));
520        }
521
522        // Wrap the listener in a runnable that will be fired when all tasks are
523        // complete.
524        final Optional<Runnable> runnableOptional;
525        if (imageProcessorListener.isPresent()) {
526            final ImageProcessorListener finalImageProcessorListener = imageProcessorListener.get();
527            Runnable unregisterRunnable = new Runnable() {
528                @Override
529                public void run() {
530                    getProxyListener().unregisterListener(finalImageProcessorListener);
531                }
532            };
533            runnableOptional = Optional.of(unregisterRunnable);
534        } else {
535            runnableOptional = Optional.<Runnable> absent();
536        }
537
538        if (receiveImage(img, tasksToExecute,
539                processingFlags.contains(ImageTaskFlags.BLOCK_UNTIL_ALL_TASKS_RELEASE),
540                processingFlags.contains(ImageTaskFlags.CLOSE_ON_ALL_TASKS_RELEASE),
541                runnableOptional)) {
542            if (imageProcessorListener.isPresent()) {
543                getProxyListener().registerListener(imageProcessorListener.get(), img.proxy);
544            }
545            return true;
546        } else {
547            return false;
548        }
549    }
550
551    /**
552     * Factory functions, in case, you want some shake and bake functionality.
553     */
554    public TaskConvertImageToRGBPreview createTaskConvertImageToRGBPreview(
555            ImageToProcess image, Executor executor, ImageBackend imageBackend,
556            CaptureSession session, Size targetSize,
557            TaskConvertImageToRGBPreview.ThumbnailShape thumbnailShape) {
558        return new TaskConvertImageToRGBPreview(image, executor, imageBackend,
559                TaskImageContainer.ProcessingPriority.FAST, session,
560                mTinyThumbnailTargetSize, thumbnailShape);
561    }
562
563    public TaskCompressImageToJpeg createTaskCompressImageToJpeg(ImageToProcess image,
564            Executor executor, ImageBackend imageBackend, CaptureSession session) {
565        return new TaskCompressImageToJpeg(image, executor, imageBackend, session,
566              mByteBufferDirectPool);
567    }
568
569    /**
570     * Blocks and waits for all tasks to complete.
571     */
572    @Override
573    public void shutdown() {
574        mThreadPoolSlow.shutdown();
575        mThreadPoolFast.shutdown();
576    }
577
578    /**
579     * For a given set of starting tasks, initialize the associated sessions
580     * with a proper blocking semaphore and value of number of tasks to be run.
581     * For each semaphore, a ImageShadowTask will be instantiated and enqueued
582     * onto the selected ProcessingSerivceManager.
583     *
584     * @param tasks The set of ImageContainer tasks to be run on ImageBackend
585     */
586    protected void initializeTaskDone(Set<TaskImageContainer> tasks,
587            Optional<Runnable> runnableWhenDone) {
588        Set<CaptureSession> sessionSet = new HashSet<>();
589        Map<CaptureSession, Integer> sessionTaskCount = new HashMap<>();
590
591        // Create a set w/ no session duplicates and count them
592        for (TaskImageContainer task : tasks) {
593            sessionSet.add(task.mSession);
594            Integer currentCount = sessionTaskCount.get(task.mSession);
595            if (currentCount == null) {
596                sessionTaskCount.put(task.mSession, 1);
597            } else {
598                sessionTaskCount.put(task.mSession, currentCount + 1);
599            }
600        }
601
602        // Create a new blocking semaphore for each set of tasks on a given
603        // session.
604        synchronized (mShadowTaskMap) {
605            for (CaptureSession captureSession : sessionSet) {
606                BlockSignalProtocol protocol = new BlockSignalProtocol();
607                protocol.setCount(sessionTaskCount.get(captureSession));
608                final ImageShadowTask shadowTask;
609                shadowTask = new ImageShadowTask(protocol, captureSession,
610                            runnableWhenDone);
611                mShadowTaskMap.put(captureSession, shadowTask);
612                mProcessingTaskConsumer.enqueueTask(shadowTask);
613            }
614        }
615    }
616
617    /**
618     * For ImageBackend tasks that spawn their own tasks, increase the semaphore
619     * count to take into account the new tasks being spawned.
620     *
621     * @param tasks The set of tasks to be spawned.
622     */
623    protected void incrementTaskDone(Set<TaskImageContainer> tasks) throws RuntimeException {
624        // TODO: Add invariant test so that all sessions are the same.
625        synchronized (mShadowTaskMap) {
626            for (TaskImageContainer task : tasks) {
627                ImageShadowTask shadowTask = mShadowTaskMap.get(task.mSession);
628                if (shadowTask == null) {
629                    throw new RuntimeException(
630                            "Session NOT previously registered."
631                                    + " ImageShadowTask booking-keeping is incorrect.");
632                }
633                shadowTask.getProtocol().addCount(1);
634            }
635        }
636    }
637
638    /**
639     * Decrement the semaphore count of the ImageShadowTask. Should be called
640     * when a task completes its processing in ImageBackend.
641     *
642     * @param imageShadowTask The ImageShadow task that contains the blocking
643     *            semaphore.
644     * @return whether all the tasks associated with an ImageShadowTask are done
645     */
646    protected boolean decrementTaskDone(ImageShadowTask imageShadowTask) {
647        synchronized (mShadowTaskMap) {
648            int remainingTasks = imageShadowTask.getProtocol().addCount(-1);
649            if (remainingTasks == 0) {
650                mShadowTaskMap.remove(imageShadowTask.getSession());
651                imageShadowTask.getProtocol().signal();
652                return true;
653            } else {
654                return false;
655            }
656        }
657
658    }
659
660    /**
661     * Puts the tasks on the specified queue. May be more complicated in the
662     * future.
663     *
664     * @param tasks The set of tasks to be run
665     */
666    protected void scheduleTasks(Set<TaskImageContainer> tasks) {
667        synchronized (mShadowTaskMap) {
668            for (TaskImageContainer task : tasks) {
669                ImageShadowTask shadowTask = mShadowTaskMap.get(task.mSession);
670                if (shadowTask == null) {
671                    throw new IllegalStateException("Scheduling a task with a unknown session.");
672                }
673                // Before scheduling, wrap TaskImageContainer inside of the
674                // TaskDoneWrapper to add
675                // instrumentation for managing ImageShadowTasks
676                switch (task.getProcessingPriority()) {
677                    case FAST:
678                        mThreadPoolFast.execute(new TaskDoneWrapper(this, shadowTask, task));
679                        break;
680                    case AVERAGE:
681                        mThreadPoolAverage.execute(new TaskDoneWrapper(this, shadowTask, task));
682                        break;
683                    case SLOW:
684                        mThreadPoolSlow.execute(new TaskDoneWrapper(this, shadowTask, task));
685                        break;
686                    default:
687                        mThreadPoolSlow.execute(new TaskDoneWrapper(this, shadowTask, task));
688                        break;
689                }
690            }
691        }
692    }
693
694    /**
695     * Initializes the semaphore count for the image
696     *
697     * @return The protocol object that keeps tracks of the image reference
698     *         count and actions to be taken on release.
699     */
700    protected ImageReleaseProtocol setSemaphoreReferenceCount(ImageToProcess img, int count,
701            boolean blockUntilRelease, boolean closeOnRelease) throws RuntimeException {
702        synchronized (mImageSemaphoreMap) {
703            if (mImageSemaphoreMap.get(img) != null) {
704                throw new RuntimeException(
705                        "ERROR: Rewriting of Semaphore Lock."
706                                + "  Image references may not freed properly");
707            }
708
709            // Create the new booking-keeping object.
710            ImageReleaseProtocol protocol = new ImageReleaseProtocol(blockUntilRelease,
711                    closeOnRelease);
712            protocol.setCount(count);
713
714            mImageSemaphoreMap.put(img, protocol);
715            mOutstandingImageRefs += count;
716            mOutstandingImageOpened++;
717            logWrapper("Received an opened image: " + mOutstandingImageOpened + "/"
718                    + mOutstandingImageClosed);
719            logWrapper("Setting an image reference count of " + count + "   Total refs = "
720                    + mOutstandingImageRefs);
721            return protocol;
722        }
723    }
724
725    /**
726     * Increments the semaphore count for the image. Should ONLY be internally
727     * via appendTasks by internal tasks. Otherwise, image references could get
728     * out of whack.
729     *
730     * @param img The Image associated with the set of tasks running on it.
731     * @param count The number of tasks to be added
732     * @throws RuntimeException Indicates image Closing Bookkeeping is screwed
733     *             up.
734     */
735    protected void incrementSemaphoreReferenceCount(ImageToProcess img, int count)
736            throws RuntimeException {
737        synchronized (mImageSemaphoreMap) {
738            ImageReleaseProtocol protocol = mImageSemaphoreMap.get(img);
739            if (mImageSemaphoreMap.get(img) == null) {
740                throw new RuntimeException(
741                        "Image Reference has already been released or has never been held.");
742            }
743
744            protocol.addCount(count);
745            mImageSemaphoreMap.put(img, protocol);
746
747            mOutstandingImageRefs += count;
748        }
749    }
750
751    /**
752     * Close an Image with a executor if it's available and does the proper
753     * booking keeping on the object.
754     *
755     * @param img Image to be closed
756     * @param executor Executor to be used, if executor is null, the close is
757     *            run on the task thread
758     */
759    private void closeImageExecutorSafe(final ImageToProcess img, Executor executor) {
760        Runnable closeTask = new Runnable() {
761            @Override
762            public void run() {
763                img.proxy.close();
764                mOutstandingImageClosed++;
765                logWrapper("Release of image occurred.  Good fun. " + "Total Images Open/Closed = "
766                        + mOutstandingImageOpened + "/" + mOutstandingImageClosed);
767            }
768        };
769        if (executor == null) {
770            // Just run it on the main thread.
771            closeTask.run();
772        } else {
773            executor.execute(closeTask);
774        }
775    }
776
777    /**
778     * Calculates the number of new Image references in a set of dependent
779     * tasks. Checks to make sure no new image references are being introduced.
780     *
781     * @param tasks The set of dependent tasks to be run
782     */
783    private int numPropagatedImageReferences(ImageToProcess img, Set<TaskImageContainer> tasks)
784            throws RuntimeException {
785        int countImageRefs = 0;
786        for (TaskImageContainer task : tasks) {
787            if (task.mImage != null && task.mImage != img) {
788                throw new RuntimeException("ERROR:  Spawned tasks cannot reference new images!");
789            }
790
791            if (task.mImage != null) {
792                countImageRefs++;
793            }
794        }
795
796        return countImageRefs;
797    }
798
799    /**
800     * Simple wrapper task to instrument when tasks ends so that ImageBackend
801     * can fire events when set of tasks created by a ReceiveImage call have all
802     * completed.
803     */
804    private class TaskDoneWrapper implements Runnable {
805        private final ImageBackend mImageBackend;
806        private final ImageShadowTask mImageShadowTask;
807        private final TaskImageContainer mWrappedTask;
808
809        /**
810         * Constructor
811         *
812         * @param imageBackend ImageBackend that the task is running on
813         * @param imageShadowTask ImageShadowTask that is blocking on the
814         *            completion of the task
815         * @param wrappedTask The task to be run w/o instrumentation
816         */
817        public TaskDoneWrapper(ImageBackend imageBackend, ImageShadowTask imageShadowTask,
818                TaskImageContainer wrappedTask) {
819            mImageBackend = imageBackend;
820            mImageShadowTask = imageShadowTask;
821            mWrappedTask = wrappedTask;
822        }
823
824        /**
825         * Adds instrumentation that runs when a TaskImageContainer completes.
826         */
827        @Override
828        public void run() {
829            mWrappedTask.run();
830            // Decrement count
831            if (mImageBackend.decrementTaskDone(mImageShadowTask)) {
832                // If you're the last one...
833                Runnable doneRunnable = mImageShadowTask.getRunnableWhenDone();
834                if (doneRunnable != null) {
835                    if (mWrappedTask.mExecutor == null) {
836                        doneRunnable.run();
837                    } else {
838                        mWrappedTask.mExecutor.execute(doneRunnable);
839                    }
840                }
841            }
842        }
843    }
844
845    /**
846     * Encapsulates all synchronization for semaphore signaling and blocking.
847     */
848    static public class BlockSignalProtocol {
849        private int count;
850
851        private final ReentrantLock mLock = new ReentrantLock();
852
853        private Condition mSignal;
854
855        public void setCount(int value) {
856            mLock.lock();
857            count = value;
858            mLock.unlock();
859        }
860
861        public int getCount() {
862            int value;
863            mLock.lock();
864            value = count;
865            mLock.unlock();
866            return value;
867        }
868
869        public int addCount(int value) {
870            mLock.lock();
871            try {
872                count += value;
873                return count;
874            } finally {
875                mLock.unlock();
876            }
877        }
878
879        BlockSignalProtocol() {
880            count = 0;
881            mSignal = mLock.newCondition();
882        }
883
884        public void block() throws InterruptedException {
885            mLock.lock();
886            try {
887                while (count != 0) {
888                    // Spin to deal with spurious signals.
889                    mSignal.await();
890                }
891            } catch (InterruptedException e) {
892                // TODO: on interruption, figure out what to do.
893                throw (e);
894            } finally {
895                mLock.unlock();
896            }
897        }
898
899        public void signal() {
900            mLock.lock();
901            mSignal.signal();
902            mLock.unlock();
903        }
904
905    }
906
907    /**
908     * A simple tuple class to keep track of image reference, and whether to
909     * block and/or close on final image release. Instantiated on every task
910     * submission call.
911     */
912    static public class ImageReleaseProtocol extends BlockSignalProtocol {
913
914        public final boolean blockUntilRelease;
915
916        public final boolean closeOnRelease;
917
918        ImageReleaseProtocol(boolean block, boolean close) {
919            super();
920            blockUntilRelease = block;
921            closeOnRelease = close;
922        }
923
924    }
925
926    // Thread factories for a default constructor
927    private class FastThreadFactory implements ThreadFactory {
928        @Override
929        public Thread newThread(Runnable r) {
930            Thread t = new AndroidPriorityThread(FAST_THREAD_PRIORITY, r);
931            return t;
932        }
933    }
934
935    private class AverageThreadFactory implements ThreadFactory {
936        @Override
937        public Thread newThread(Runnable r) {
938            Thread t = new AndroidPriorityThread(AVERAGE_THREAD_PRIORITY, r);
939            return t;
940        }
941    }
942
943    private class SlowThreadFactory implements ThreadFactory {
944        @Override
945        public Thread newThread(Runnable r) {
946            Thread t = new AndroidPriorityThread(SLOW_THREAD_PRIORITY, r);
947            return t;
948        }
949    }
950
951}
952