/* * Copyright (C) 2017 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package androidx.room; import androidx.annotation.RestrictTo; import androidx.arch.core.executor.ArchTaskExecutor; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; import io.reactivex.FlowableEmitter; import io.reactivex.FlowableOnSubscribe; import io.reactivex.Maybe; import io.reactivex.MaybeSource; import io.reactivex.Scheduler; import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposables; import io.reactivex.functions.Action; import io.reactivex.functions.Function; /** * Helper class to add RxJava2 support to Room. */ @SuppressWarnings("WeakerAccess") public class RxRoom { /** * Data dispatched by the publisher created by {@link #createFlowable(RoomDatabase, String...)}. */ public static final Object NOTHING = new Object(); /** * Creates a {@link Flowable} that emits at least once and also re-emits whenever one of the * observed tables is updated. *

* You can easily chain a database operation to downstream of this {@link Flowable} to ensure * that it re-runs when database is modified. *

* Since database invalidation is batched, multiple changes in the database may results in just * 1 emission. * * @param database The database instance * @param tableNames The list of table names that should be observed * @return A {@link Flowable} which emits {@link #NOTHING} when one of the observed tables * is modified (also once when the invalidation tracker connection is established). */ public static Flowable createFlowable(final RoomDatabase database, final String... tableNames) { return Flowable.create(new FlowableOnSubscribe() { @Override public void subscribe(final FlowableEmitter emitter) throws Exception { final InvalidationTracker.Observer observer = new InvalidationTracker.Observer( tableNames) { @Override public void onInvalidated(@androidx.annotation.NonNull Set tables) { if (!emitter.isCancelled()) { emitter.onNext(NOTHING); } } }; if (!emitter.isCancelled()) { database.getInvalidationTracker().addObserver(observer); emitter.setDisposable(Disposables.fromAction(new Action() { @Override public void run() throws Exception { database.getInvalidationTracker().removeObserver(observer); } })); } // emit once to avoid missing any data and also easy chaining if (!emitter.isCancelled()) { emitter.onNext(NOTHING); } } }, BackpressureStrategy.LATEST); } /** * Helper method used by generated code to bind a Callable such that it will be run in * our disk io thread and will automatically block null values since RxJava2 does not like null. * * @hide */ @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP) public static Flowable createFlowable(final RoomDatabase database, final String[] tableNames, final Callable callable) { final Maybe maybe = Maybe.fromCallable(callable); return createFlowable(database, tableNames).observeOn(sAppToolkitIOScheduler) .flatMapMaybe(new Function>() { @Override public MaybeSource apply(Object o) throws Exception { return maybe; } }); } private static Scheduler sAppToolkitIOScheduler = new Scheduler() { @Override public Worker createWorker() { final AtomicBoolean mDisposed = new AtomicBoolean(false); return new Worker() { @Override public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { DisposableRunnable disposable = new DisposableRunnable(run, mDisposed); ArchTaskExecutor.getInstance().executeOnDiskIO(run); return disposable; } @Override public void dispose() { mDisposed.set(true); } @Override public boolean isDisposed() { return mDisposed.get(); } }; } }; private static class DisposableRunnable implements Disposable, Runnable { private final Runnable mActual; private volatile boolean mDisposed = false; private final AtomicBoolean mGlobalDisposed; DisposableRunnable(Runnable actual, AtomicBoolean globalDisposed) { mActual = actual; mGlobalDisposed = globalDisposed; } @Override public void dispose() { mDisposed = true; } @Override public boolean isDisposed() { return mDisposed || mGlobalDisposed.get(); } @Override public void run() { if (!isDisposed()) { mActual.run(); } } } /** @deprecated This type should not be instantiated as it contains only static methods. */ @Deprecated @SuppressWarnings("PrivateConstructorForUtilityClass") public RxRoom() { } }