1/*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package androidx.room;
18
19import androidx.annotation.RestrictTo;
20import androidx.arch.core.executor.ArchTaskExecutor;
21
22import java.util.Set;
23import java.util.concurrent.Callable;
24import java.util.concurrent.TimeUnit;
25import java.util.concurrent.atomic.AtomicBoolean;
26
27import io.reactivex.BackpressureStrategy;
28import io.reactivex.Flowable;
29import io.reactivex.FlowableEmitter;
30import io.reactivex.FlowableOnSubscribe;
31import io.reactivex.Maybe;
32import io.reactivex.MaybeSource;
33import io.reactivex.Scheduler;
34import io.reactivex.annotations.NonNull;
35import io.reactivex.disposables.Disposable;
36import io.reactivex.disposables.Disposables;
37import io.reactivex.functions.Action;
38import io.reactivex.functions.Function;
39
40/**
41 * Helper class to add RxJava2 support to Room.
42 */
43@SuppressWarnings("WeakerAccess")
44public class RxRoom {
45    /**
46     * Data dispatched by the publisher created by {@link #createFlowable(RoomDatabase, String...)}.
47     */
48    public static final Object NOTHING = new Object();
49
50    /**
51     * Creates a {@link Flowable} that emits at least once and also re-emits whenever one of the
52     * observed tables is updated.
53     * <p>
54     * You can easily chain a database operation to downstream of this {@link Flowable} to ensure
55     * that it re-runs when database is modified.
56     * <p>
57     * Since database invalidation is batched, multiple changes in the database may results in just
58     * 1 emission.
59     *
60     * @param database   The database instance
61     * @param tableNames The list of table names that should be observed
62     * @return A {@link Flowable} which emits {@link #NOTHING} when one of the observed tables
63     * is modified (also once when the invalidation tracker connection is established).
64     */
65    public static Flowable<Object> createFlowable(final RoomDatabase database,
66            final String... tableNames) {
67        return Flowable.create(new FlowableOnSubscribe<Object>() {
68            @Override
69            public void subscribe(final FlowableEmitter<Object> emitter) throws Exception {
70                final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
71                        tableNames) {
72                    @Override
73                    public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
74                        if (!emitter.isCancelled()) {
75                            emitter.onNext(NOTHING);
76                        }
77                    }
78                };
79                if (!emitter.isCancelled()) {
80                    database.getInvalidationTracker().addObserver(observer);
81                    emitter.setDisposable(Disposables.fromAction(new Action() {
82                        @Override
83                        public void run() throws Exception {
84                            database.getInvalidationTracker().removeObserver(observer);
85                        }
86                    }));
87                }
88
89                // emit once to avoid missing any data and also easy chaining
90                if (!emitter.isCancelled()) {
91                    emitter.onNext(NOTHING);
92                }
93            }
94        }, BackpressureStrategy.LATEST);
95    }
96
97    /**
98     * Helper method used by generated code to bind a Callable such that it will be run in
99     * our disk io thread and will automatically block null values since RxJava2 does not like null.
100     *
101     * @hide
102     */
103    @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
104    public static <T> Flowable<T> createFlowable(final RoomDatabase database,
105            final String[] tableNames, final Callable<T> callable) {
106        final Maybe<T> maybe = Maybe.fromCallable(callable);
107        return createFlowable(database, tableNames).observeOn(sAppToolkitIOScheduler)
108                .flatMapMaybe(new Function<Object, MaybeSource<T>>() {
109                    @Override
110                    public MaybeSource<T> apply(Object o) throws Exception {
111                        return maybe;
112                    }
113                });
114    }
115
116    private static Scheduler sAppToolkitIOScheduler = new Scheduler() {
117        @Override
118        public Worker createWorker() {
119            final AtomicBoolean mDisposed = new AtomicBoolean(false);
120            return new Worker() {
121                @Override
122                public Disposable schedule(@NonNull Runnable run, long delay,
123                        @NonNull TimeUnit unit) {
124                    DisposableRunnable disposable = new DisposableRunnable(run, mDisposed);
125                    ArchTaskExecutor.getInstance().executeOnDiskIO(run);
126                    return disposable;
127                }
128
129                @Override
130                public void dispose() {
131                    mDisposed.set(true);
132                }
133
134                @Override
135                public boolean isDisposed() {
136                    return mDisposed.get();
137                }
138            };
139        }
140    };
141
142    private static class DisposableRunnable implements Disposable, Runnable {
143        private final Runnable mActual;
144        private volatile boolean mDisposed = false;
145        private final AtomicBoolean mGlobalDisposed;
146
147        DisposableRunnable(Runnable actual, AtomicBoolean globalDisposed) {
148            mActual = actual;
149            mGlobalDisposed = globalDisposed;
150        }
151
152        @Override
153        public void dispose() {
154            mDisposed = true;
155        }
156
157        @Override
158        public boolean isDisposed() {
159            return mDisposed || mGlobalDisposed.get();
160        }
161
162        @Override
163        public void run() {
164            if (!isDisposed()) {
165                mActual.run();
166            }
167        }
168    }
169
170    /** @deprecated This type should not be instantiated as it contains only static methods. */
171    @Deprecated
172    @SuppressWarnings("PrivateConstructorForUtilityClass")
173    public RxRoom() {
174    }
175}
176