ObservableCombiner.java revision e919a48fb40b9d6c698a495acf40adbc0e320431
1/* 2 * Copyright (C) 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17package com.android.camera.async; 18 19import com.android.camera.util.Callback; 20import com.google.common.base.Function; 21import com.google.common.collect.ImmutableList; 22 23import java.util.ArrayList; 24import java.util.List; 25import java.util.concurrent.Executor; 26 27import javax.annotation.CheckReturnValue; 28import javax.annotation.Nonnull; 29import javax.annotation.ParametersAreNonnullByDefault; 30import javax.annotation.concurrent.GuardedBy; 31import javax.annotation.concurrent.ThreadSafe; 32 33/** 34 * Enables combining multiple {@link Observable}s together with a given 35 * function. 36 * <p> 37 * Callbacks added to the resulting observable are notified when any of the 38 * dependencies change. 39 */ 40@ThreadSafe 41@ParametersAreNonnullByDefault 42final class ObservableCombiner<I, O> implements Observable<O> { 43 private final ImmutableList<Observable<I>> mInputs; 44 private final Function<List<I>, O> mFunction; 45 46 private final Object mLock; 47 48 @GuardedBy("mLock") 49 private final ConcurrentState<O> mListenerNotifier; 50 51 @GuardedBy("mLock") 52 private final List<SafeCloseable> mInputCallbackHandles; 53 54 @GuardedBy("mLock") 55 private int mNumRegisteredCallbacks; 56 57 /** 58 * The thread-safe callback to be registered with each input. 59 */ 60 private final Updatable<I> mInputCallback = new Updatable<I>() { 61 public void update(I ignored) { 62 mListenerNotifier.update(get()); 63 } 64 }; 65 66 private ObservableCombiner(List<? extends Observable<I>> inputs, 67 Function<List<I>, O> function, O initialValue) { 68 mInputs = ImmutableList.copyOf(inputs); 69 mFunction = function; 70 mListenerNotifier = new ConcurrentState<>(initialValue); 71 mLock = new Object(); 72 mInputCallbackHandles = new ArrayList<>(); 73 mNumRegisteredCallbacks = 0; 74 } 75 76 /** 77 * Transforms a set of input observables with a function. 78 * 79 * @param inputs The input observables. 80 * @param function The function to apply to all of the inputs. 81 * @param <I> The type of all inputs values. 82 * @param <O> The type of the output values. 83 * @return An observable which will reflect the combination of all inputs 84 * with the given function. Changes in the output value will result 85 * in calls to any callbacks registered with the output. 86 */ 87 static <I, O> Observable<O> transform(List<? extends Observable<I>> inputs, 88 Function<List<I>, O> function) { 89 // Compute the initial value. 90 ArrayList<I> deps = new ArrayList<>(); 91 for (Observable<? extends I> input : inputs) { 92 deps.add(input.get()); 93 } 94 O initialValue = function.apply(deps); 95 96 return new ObservableCombiner<>(inputs, function, initialValue); 97 } 98 99 @GuardedBy("mLock") 100 private void addCallbacksToInputs() { 101 for (Observable<I> observable : mInputs) { 102 final SafeCloseable callbackHandle = 103 Observables.addThreadSafeCallback(observable, mInputCallback); 104 105 mInputCallbackHandles.add(callbackHandle); 106 } 107 } 108 109 @GuardedBy("mLock") 110 private void removeCallbacksFromInputs() { 111 for (SafeCloseable callbackHandle : mInputCallbackHandles) { 112 callbackHandle.close(); 113 } 114 } 115 116 @Nonnull 117 @Override 118 @CheckReturnValue 119 public SafeCloseable addCallback(final Callback<O> callback, Executor executor) { 120 // When a callback is added to this, the "output", we must ensure that 121 // callbacks are registered with all of the inputs so that they can be 122 // forwarded properly. 123 // Instead of adding another callback to each input for each callback 124 // registered with the output, callbacks are registered when the first 125 // output callback is added, and removed when the last output callback 126 // is removed. 127 128 synchronized (mLock) { 129 if (mNumRegisteredCallbacks == 0) { 130 addCallbacksToInputs(); 131 } 132 mNumRegisteredCallbacks++; 133 } 134 135 // Wrap the callback in a {@link FilteredCallback} to prevent many 136 // duplicate/cascading updates even if the output does not change. 137 final SafeCloseable resultingCallbackHandle = mListenerNotifier.addCallback( 138 new FilteredCallback<O>(callback), executor); 139 140 return new SafeCloseable() { 141 @Override 142 public void close() { 143 resultingCallbackHandle.close(); 144 145 synchronized (mLock) { 146 mNumRegisteredCallbacks--; 147 if (mNumRegisteredCallbacks == 0) { 148 removeCallbacksFromInputs(); 149 } 150 } 151 } 152 }; 153 } 154 155 @Nonnull 156 @Override 157 public O get() { 158 ArrayList<I> deps = new ArrayList<>(); 159 for (Observable<? extends I> dependency : mInputs) { 160 deps.add(dependency.get()); 161 } 162 return mFunction.apply(deps); 163 } 164} 165