/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
163c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar
17ba069d50913c3fb250bb60ec310439db36895337Alan Viverettepackage androidx.room;
183c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar
19ba069d50913c3fb250bb60ec310439db36895337Alan Viveretteimport androidx.annotation.RestrictTo;
20ddee2b5170ae257a7b2494f8aaa8459ebed806dcAurimas Liutikasimport androidx.arch.core.executor.ArchTaskExecutor;
213c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar
222ec1285ef79d4849069efe95cfbac2307d291a47Yuichi Arakiimport java.util.Set;
233c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyarimport java.util.concurrent.Callable;
243c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyarimport java.util.concurrent.TimeUnit;
253c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyarimport java.util.concurrent.atomic.AtomicBoolean;
263c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar
273c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyarimport io.reactivex.BackpressureStrategy;
283c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyarimport io.reactivex.Flowable;
293c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyarimport io.reactivex.FlowableEmitter;
303c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyarimport io.reactivex.FlowableOnSubscribe;
318fd4a7bd57e102cda143fa113751e92ff79527e9Jake Whartonimport io.reactivex.Maybe;
328fd4a7bd57e102cda143fa113751e92ff79527e9Jake Whartonimport io.reactivex.MaybeSource;
333c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyarimport io.reactivex.Scheduler;
343c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyarimport io.reactivex.annotations.NonNull;
353c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyarimport io.reactivex.disposables.Disposable;
363c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyarimport io.reactivex.disposables.Disposables;
373c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyarimport io.reactivex.functions.Action;
383c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyarimport io.reactivex.functions.Function;
393c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar
403c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar/**
413c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar * Helper class to add RxJava2 support to Room.
423c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar */
433c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar@SuppressWarnings("WeakerAccess")
443c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyarpublic class RxRoom {
453c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar    /**
463c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     * Data dispatched by the publisher created by {@link #createFlowable(RoomDatabase, String...)}.
473c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     */
483c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar    public static final Object NOTHING = new Object();
493c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar
503c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar    /**
513c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     * Creates a {@link Flowable} that emits at least once and also re-emits whenever one of the
523c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     * observed tables is updated.
533c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     * <p>
543c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     * You can easily chain a database operation to downstream of this {@link Flowable} to ensure
553c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     * that it re-runs when database is modified.
563c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     * <p>
573c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     * Since database invalidation is batched, multiple changes in the database may results in just
583c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     * 1 emission.
593c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     *
603c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     * @param database   The database instance
613c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     * @param tableNames The list of table names that should be observed
623c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     * @return A {@link Flowable} which emits {@link #NOTHING} when one of the observed tables
633c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     * is modified (also once when the invalidation tracker connection is established).
643c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     */
653c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar    public static Flowable<Object> createFlowable(final RoomDatabase database,
663c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar            final String... tableNames) {
673c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        return Flowable.create(new FlowableOnSubscribe<Object>() {
683c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar            @Override
693c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar            public void subscribe(final FlowableEmitter<Object> emitter) throws Exception {
703c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
713c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                        tableNames) {
723c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                    @Override
73ba069d50913c3fb250bb60ec310439db36895337Alan Viverette                    public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
743c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                        if (!emitter.isCancelled()) {
753c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                            emitter.onNext(NOTHING);
763c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                        }
773c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                    }
783c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                };
793c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                if (!emitter.isCancelled()) {
803c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                    database.getInvalidationTracker().addObserver(observer);
813c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                    emitter.setDisposable(Disposables.fromAction(new Action() {
823c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                        @Override
833c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                        public void run() throws Exception {
843c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                            database.getInvalidationTracker().removeObserver(observer);
853c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                        }
863c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                    }));
873c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                }
883c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar
893c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                // emit once to avoid missing any data and also easy chaining
903c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                if (!emitter.isCancelled()) {
913c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                    emitter.onNext(NOTHING);
923c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                }
933c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar            }
943c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        }, BackpressureStrategy.LATEST);
953c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar    }
963c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar
973c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar    /**
983c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     * Helper method used by generated code to bind a Callable such that it will be run in
993c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     * our disk io thread and will automatically block null values since RxJava2 does not like null.
1003c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     *
1013c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     * @hide
1023c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar     */
1033c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar    @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
1043c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar    public static <T> Flowable<T> createFlowable(final RoomDatabase database,
1053c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar            final String[] tableNames, final Callable<T> callable) {
1068fd4a7bd57e102cda143fa113751e92ff79527e9Jake Wharton        final Maybe<T> maybe = Maybe.fromCallable(callable);
1073c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        return createFlowable(database, tableNames).observeOn(sAppToolkitIOScheduler)
1088fd4a7bd57e102cda143fa113751e92ff79527e9Jake Wharton                .flatMapMaybe(new Function<Object, MaybeSource<T>>() {
1093c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                    @Override
1108fd4a7bd57e102cda143fa113751e92ff79527e9Jake Wharton                    public MaybeSource<T> apply(Object o) throws Exception {
1118fd4a7bd57e102cda143fa113751e92ff79527e9Jake Wharton                        return maybe;
1123c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                    }
1133c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                });
1143c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar    }
1153c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar
1163c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar    private static Scheduler sAppToolkitIOScheduler = new Scheduler() {
1173c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        @Override
1183c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        public Worker createWorker() {
1193c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar            final AtomicBoolean mDisposed = new AtomicBoolean(false);
1203c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar            return new Worker() {
1213c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                @Override
1223c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                public Disposable schedule(@NonNull Runnable run, long delay,
1233c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                        @NonNull TimeUnit unit) {
1243c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                    DisposableRunnable disposable = new DisposableRunnable(run, mDisposed);
125ae36c8b11a64d3cdc9ba6e37d9f3d1d250fdc4a8Yigit Boyar                    ArchTaskExecutor.getInstance().executeOnDiskIO(run);
1263c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                    return disposable;
1273c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                }
1283c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar
1293c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                @Override
1303c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                public void dispose() {
1313c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                    mDisposed.set(true);
1323c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                }
1333c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar
1343c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                @Override
1353c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                public boolean isDisposed() {
1363c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                    return mDisposed.get();
1373c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                }
1383c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar            };
1393c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        }
1403c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar    };
1413c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar
1423c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar    private static class DisposableRunnable implements Disposable, Runnable {
1433c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        private final Runnable mActual;
1443c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        private volatile boolean mDisposed = false;
1453c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        private final AtomicBoolean mGlobalDisposed;
1463c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar
1473c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        DisposableRunnable(Runnable actual, AtomicBoolean globalDisposed) {
1483c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar            mActual = actual;
1493c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar            mGlobalDisposed = globalDisposed;
1503c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        }
1513c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar
1523c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        @Override
1533c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        public void dispose() {
1543c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar            mDisposed = true;
1553c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        }
1563c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar
1573c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        @Override
1583c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        public boolean isDisposed() {
1593c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar            return mDisposed || mGlobalDisposed.get();
1603c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        }
1613c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar
1623c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        @Override
1633c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        public void run() {
1643c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar            if (!isDisposed()) {
1653c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar                mActual.run();
1663c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar            }
1673c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar        }
1683c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar    }
16923b05b05914468496d8b39fcf8d8662ca7f3bddbJake Wharton
17023b05b05914468496d8b39fcf8d8662ca7f3bddbJake Wharton    /** @deprecated This type should not be instantiated as it contains only static methods. */
17123b05b05914468496d8b39fcf8d8662ca7f3bddbJake Wharton    @Deprecated
17223b05b05914468496d8b39fcf8d8662ca7f3bddbJake Wharton    @SuppressWarnings("PrivateConstructorForUtilityClass")
17323b05b05914468496d8b39fcf8d8662ca7f3bddbJake Wharton    public RxRoom() {
17423b05b05914468496d8b39fcf8d8662ca7f3bddbJake Wharton    }
1753c592c4ccbc6052b11443b0fa575052c08fefa55Yigit Boyar}
176