1/* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17package androidx.room; 18 19import androidx.annotation.RestrictTo; 20import androidx.arch.core.executor.ArchTaskExecutor; 21 22import java.util.Set; 23import java.util.concurrent.Callable; 24import java.util.concurrent.TimeUnit; 25import java.util.concurrent.atomic.AtomicBoolean; 26 27import io.reactivex.BackpressureStrategy; 28import io.reactivex.Flowable; 29import io.reactivex.FlowableEmitter; 30import io.reactivex.FlowableOnSubscribe; 31import io.reactivex.Maybe; 32import io.reactivex.MaybeSource; 33import io.reactivex.Scheduler; 34import io.reactivex.annotations.NonNull; 35import io.reactivex.disposables.Disposable; 36import io.reactivex.disposables.Disposables; 37import io.reactivex.functions.Action; 38import io.reactivex.functions.Function; 39 40/** 41 * Helper class to add RxJava2 support to Room. 42 */ 43@SuppressWarnings("WeakerAccess") 44public class RxRoom { 45 /** 46 * Data dispatched by the publisher created by {@link #createFlowable(RoomDatabase, String...)}. 47 */ 48 public static final Object NOTHING = new Object(); 49 50 /** 51 * Creates a {@link Flowable} that emits at least once and also re-emits whenever one of the 52 * observed tables is updated. 53 * <p> 54 * You can easily chain a database operation to downstream of this {@link Flowable} to ensure 55 * that it re-runs when database is modified. 56 * <p> 57 * Since database invalidation is batched, multiple changes in the database may results in just 58 * 1 emission. 59 * 60 * @param database The database instance 61 * @param tableNames The list of table names that should be observed 62 * @return A {@link Flowable} which emits {@link #NOTHING} when one of the observed tables 63 * is modified (also once when the invalidation tracker connection is established). 64 */ 65 public static Flowable<Object> createFlowable(final RoomDatabase database, 66 final String... tableNames) { 67 return Flowable.create(new FlowableOnSubscribe<Object>() { 68 @Override 69 public void subscribe(final FlowableEmitter<Object> emitter) throws Exception { 70 final InvalidationTracker.Observer observer = new InvalidationTracker.Observer( 71 tableNames) { 72 @Override 73 public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) { 74 if (!emitter.isCancelled()) { 75 emitter.onNext(NOTHING); 76 } 77 } 78 }; 79 if (!emitter.isCancelled()) { 80 database.getInvalidationTracker().addObserver(observer); 81 emitter.setDisposable(Disposables.fromAction(new Action() { 82 @Override 83 public void run() throws Exception { 84 database.getInvalidationTracker().removeObserver(observer); 85 } 86 })); 87 } 88 89 // emit once to avoid missing any data and also easy chaining 90 if (!emitter.isCancelled()) { 91 emitter.onNext(NOTHING); 92 } 93 } 94 }, BackpressureStrategy.LATEST); 95 } 96 97 /** 98 * Helper method used by generated code to bind a Callable such that it will be run in 99 * our disk io thread and will automatically block null values since RxJava2 does not like null. 100 * 101 * @hide 102 */ 103 @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP) 104 public static <T> Flowable<T> createFlowable(final RoomDatabase database, 105 final String[] tableNames, final Callable<T> callable) { 106 final Maybe<T> maybe = Maybe.fromCallable(callable); 107 return createFlowable(database, tableNames).observeOn(sAppToolkitIOScheduler) 108 .flatMapMaybe(new Function<Object, MaybeSource<T>>() { 109 @Override 110 public MaybeSource<T> apply(Object o) throws Exception { 111 return maybe; 112 } 113 }); 114 } 115 116 private static Scheduler sAppToolkitIOScheduler = new Scheduler() { 117 @Override 118 public Worker createWorker() { 119 final AtomicBoolean mDisposed = new AtomicBoolean(false); 120 return new Worker() { 121 @Override 122 public Disposable schedule(@NonNull Runnable run, long delay, 123 @NonNull TimeUnit unit) { 124 DisposableRunnable disposable = new DisposableRunnable(run, mDisposed); 125 ArchTaskExecutor.getInstance().executeOnDiskIO(run); 126 return disposable; 127 } 128 129 @Override 130 public void dispose() { 131 mDisposed.set(true); 132 } 133 134 @Override 135 public boolean isDisposed() { 136 return mDisposed.get(); 137 } 138 }; 139 } 140 }; 141 142 private static class DisposableRunnable implements Disposable, Runnable { 143 private final Runnable mActual; 144 private volatile boolean mDisposed = false; 145 private final AtomicBoolean mGlobalDisposed; 146 147 DisposableRunnable(Runnable actual, AtomicBoolean globalDisposed) { 148 mActual = actual; 149 mGlobalDisposed = globalDisposed; 150 } 151 152 @Override 153 public void dispose() { 154 mDisposed = true; 155 } 156 157 @Override 158 public boolean isDisposed() { 159 return mDisposed || mGlobalDisposed.get(); 160 } 161 162 @Override 163 public void run() { 164 if (!isDisposed()) { 165 mActual.run(); 166 } 167 } 168 } 169 170 /** @deprecated This type should not be instantiated as it contains only static methods. */ 171 @Deprecated 172 @SuppressWarnings("PrivateConstructorForUtilityClass") 173 public RxRoom() { 174 } 175} 176