15349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld/*
25349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld * Copyright (C) 2017 The Android Open Source Project
35349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld *
45349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld * Licensed under the Apache License, Version 2.0 (the "License");
55349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld * you may not use this file except in compliance with the License.
65349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld * You may obtain a copy of the License at
75349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld *
85349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld *      http://www.apache.org/licenses/LICENSE-2.0
95349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld *
105349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld * Unless required by applicable law or agreed to in writing, software
115349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld * distributed under the License is distributed on an "AS IS" BASIS,
125349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
135349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld * See the License for the specific language governing permissions and
145349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld * limitations under the License.
155349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld */
165349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld
17ba069d50913c3fb250bb60ec310439db36895337Alan Viverettepackage androidx.lifecycle;
185349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld
19ba069d50913c3fb250bb60ec310439db36895337Alan Viveretteimport androidx.annotation.NonNull;
20ba069d50913c3fb250bb60ec310439db36895337Alan Viveretteimport androidx.annotation.Nullable;
21ddee2b5170ae257a7b2494f8aaa8459ebed806dcAurimas Liutikasimport androidx.arch.core.executor.ArchTaskExecutor;
225349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld
235349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeldimport org.reactivestreams.Publisher;
245349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeldimport org.reactivestreams.Subscriber;
255349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeldimport org.reactivestreams.Subscription;
265349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld
2705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescuimport java.util.concurrent.atomic.AtomicReference;
285349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld
295349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld/**
305349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld * Adapts {@link LiveData} input and output to the ReactiveStreams spec.
315349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld */
32459caadc8f6875fc78a36ae716193bf991f0808cSergey Vasilinets@SuppressWarnings("WeakerAccess")
335349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeldpublic final class LiveDataReactiveStreams {
345349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld    private LiveDataReactiveStreams() {
355349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld    }
365349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld
375349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld    /**
385349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld     * Adapts the given {@link LiveData} stream to a ReactiveStreams {@link Publisher}.
395349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld     *
405349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld     * <p>
415349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld     * By using a good publisher implementation such as RxJava 2.x Flowables, most consumers will
425349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld     * be able to let the library deal with backpressure using operators and not need to worry about
435349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld     * ever manually calling {@link Subscription#request}.
445349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld     *
455349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld     * <p>
465349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld     * On subscription to the publisher, the observer will attach to the given {@link LiveData}.
475349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld     * Once {@link Subscription#request) is called on the subscription object, an observer will be
485349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld     * connected to the data stream. Calling request(Long.MAX_VALUE) is equivalent to creating an
495349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld     * unbounded stream with no backpressure. If request with a finite count reaches 0, the observer
505349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld     * will buffer the latest item and emit it to the subscriber when data is again requested. Any
515349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld     * other items emitted during the time there was no backpressure requested will be dropped.
525349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld     */
53b70f749a999b642ca984c667cd6fa83e3206a48bJake Wharton    @NonNull
545349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld    public static <T> Publisher<T> toPublisher(
55b70f749a999b642ca984c667cd6fa83e3206a48bJake Wharton            @NonNull LifecycleOwner lifecycle, @NonNull LiveData<T> liveData) {
565349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld
5705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu        return new LiveDataPublisher<>(lifecycle, liveData);
5805d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu    }
5905d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu
6005d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu    private static final class LiveDataPublisher<T> implements Publisher<T> {
6105d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu        final LifecycleOwner mLifecycle;
6205d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu        final LiveData<T> mLiveData;
6305d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu
64b70f749a999b642ca984c667cd6fa83e3206a48bJake Wharton        LiveDataPublisher(LifecycleOwner lifecycle, LiveData<T> liveData) {
6505d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            this.mLifecycle = lifecycle;
6605d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            this.mLiveData = liveData;
6705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu        }
6805d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu
6905d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu        @Override
7005d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu        public void subscribe(Subscriber<? super T> subscriber) {
7105d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            subscriber.onSubscribe(new LiveDataSubscription<T>(subscriber, mLifecycle, mLiveData));
7205d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu        }
7305d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu
7405d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu        static final class LiveDataSubscription<T> implements Subscription, Observer<T> {
7505d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            final Subscriber<? super T> mSubscriber;
7605d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            final LifecycleOwner mLifecycle;
7705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            final LiveData<T> mLiveData;
7805d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu
7905d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            volatile boolean mCanceled;
8005d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            // used on main thread only
815349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld            boolean mObserving;
825349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld            long mRequested;
8305d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            // used on main thread only
845349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld            @Nullable
855349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld            T mLatest;
865349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld
8705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            LiveDataSubscription(final Subscriber<? super T> subscriber,
8805d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                    final LifecycleOwner lifecycle, final LiveData<T> liveData) {
8905d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                this.mSubscriber = subscriber;
9005d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                this.mLifecycle = lifecycle;
9105d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                this.mLiveData = liveData;
9205d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            }
9305d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu
9405d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            @Override
95b70f749a999b642ca984c667cd6fa83e3206a48bJake Wharton            public void onChanged(@Nullable T t) {
9605d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                if (mCanceled) {
9705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                    return;
9805d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                }
9905d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                if (mRequested > 0) {
10005d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                    mLatest = null;
10105d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                    mSubscriber.onNext(t);
10205d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                    if (mRequested != Long.MAX_VALUE) {
10305d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                        mRequested--;
10405d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                    }
10505d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                } else {
10605d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                    mLatest = t;
10705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                }
10805d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            }
10905d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu
1105349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld            @Override
11105d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            public void request(final long n) {
11205d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                if (mCanceled) {
11305d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                    return;
11405d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                }
11505d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
1165349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld                    @Override
11705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                    public void run() {
1185349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld                        if (mCanceled) {
1195349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld                            return;
1205349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld                        }
12105d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                        if (n <= 0L) {
12205d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                            mCanceled = true;
12305d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                            if (mObserving) {
12405d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                                mLiveData.removeObserver(LiveDataSubscription.this);
12505d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                                mObserving = false;
1265349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld                            }
12705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                            mLatest = null;
12805d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                            mSubscriber.onError(
12905d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                                    new IllegalArgumentException("Non-positive request"));
13005d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                            return;
1315349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld                        }
1325349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld
13305d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                        // Prevent overflowage.
13405d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                        mRequested = mRequested + n >= mRequested
13505d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                                ? mRequested + n : Long.MAX_VALUE;
13605d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                        if (!mObserving) {
13705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                            mObserving = true;
13805d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                            mLiveData.observe(mLifecycle, LiveDataSubscription.this);
13905d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                        } else if (mLatest != null) {
14005d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                            onChanged(mLatest);
14105d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                            mLatest = null;
1425349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld                        }
1435349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld                    }
14405d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                });
14505d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            }
1465349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld
14705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            @Override
14805d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            public void cancel() {
14905d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                if (mCanceled) {
15005d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                    return;
15105d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                }
15205d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                mCanceled = true;
15305d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
1545349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld                    @Override
15505d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                    public void run() {
15605d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                        if (mObserving) {
15705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                            mLiveData.removeObserver(LiveDataSubscription.this);
15805d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                            mObserving = false;
1597f1587498cc64d12ad79ccd343c1a0e83ae3863dJason Neufeld                        }
16005d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                        mLatest = null;
1615349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld                    }
1625349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld                });
1635349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld            }
16405d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu        }
1655349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld    }
1665349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld
1675349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld    /**
168c57cff3120eb77417ceb2647b0345fbf505f9f7dJake Wharton     * Creates an observable {@link LiveData} stream from a ReactiveStreams {@link Publisher}}.
169f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     *
170f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * <p>
171f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * When the LiveData becomes active, it subscribes to the emissions from the Publisher.
172f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     *
173f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * <p>
174f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * When the LiveData becomes inactive, the subscription is cleared.
175f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * LiveData holds the last value emitted by the Publisher when the LiveData was active.
176f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * <p>
177f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * Therefore, in the case of a hot RxJava Observable, when a new LiveData {@link Observer} is
178f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * added, it will automatically notify with the last value held in LiveData,
179f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * which might not be the last value emitted by the Publisher.
18005d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu     * <p>
18105d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu     * Note that LiveData does NOT handle errors and it expects that errors are treated as states
18205d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu     * in the data that's held. In case of an error being emitted by the publisher, an error will
18305d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu     * be propagated to the main thread and the app will crash.
184f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     *
185f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * @param <T> The type of data hold by this instance.
1865349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld     */
187b70f749a999b642ca984c667cd6fa83e3206a48bJake Wharton    @NonNull
188b70f749a999b642ca984c667cd6fa83e3206a48bJake Wharton    public static <T> LiveData<T> fromPublisher(@NonNull Publisher<T> publisher) {
189f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu        return new PublisherLiveData<>(publisher);
190f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu    }
1915349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld
192f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu    /**
193f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * Defines a {@link LiveData} object that wraps a {@link Publisher}.
194f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     *
195f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * <p>
196f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * When the LiveData becomes active, it subscribes to the emissions from the Publisher.
197f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     *
198f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * <p>
199f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * When the LiveData becomes inactive, the subscription is cleared.
200f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * LiveData holds the last value emitted by the Publisher when the LiveData was active.
201f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * <p>
202f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * Therefore, in the case of a hot RxJava Observable, when a new LiveData {@link Observer} is
203f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * added, it will automatically notify with the last value held in LiveData,
204f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * which might not be the last value emitted by the Publisher.
205f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     *
20605d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu     * <p>
20705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu     * Note that LiveData does NOT handle errors and it expects that errors are treated as states
20805d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu     * in the data that's held. In case of an error being emitted by the publisher, an error will
20905d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu     * be propagated to the main thread and the app will crash.
21005d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu     *
211f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     * @param <T> The type of data hold by this instance.
212f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu     */
213f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu    private static class PublisherLiveData<T> extends LiveData<T> {
2142ef38b7a8103ff827d04d5c5f9c047f013646d68Jake Wharton        private final Publisher<T> mPublisher;
21505d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu        final AtomicReference<LiveDataSubscriber> mSubscriber;
216f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu
2172ef38b7a8103ff827d04d5c5f9c047f013646d68Jake Wharton        PublisherLiveData(@NonNull Publisher<T> publisher) {
218f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu            mPublisher = publisher;
21905d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            mSubscriber = new AtomicReference<>();
220f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu        }
221f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu
222f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu        @Override
223f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu        protected void onActive() {
224f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu            super.onActive();
22505d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            LiveDataSubscriber s = new LiveDataSubscriber();
22605d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            mSubscriber.set(s);
22705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            mPublisher.subscribe(s);
22805d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu        }
229f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu
23005d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu        @Override
23105d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu        protected void onInactive() {
23205d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            super.onInactive();
23305d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            LiveDataSubscriber s = mSubscriber.getAndSet(null);
23405d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            if (s != null) {
23505d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                s.cancelSubscription();
23605d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            }
23705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu        }
2385349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld
23905d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu        final class LiveDataSubscriber extends AtomicReference<Subscription>
24005d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                implements Subscriber<T> {
2415349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld
24205d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            @Override
24305d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            public void onSubscribe(Subscription s) {
24405d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                if (compareAndSet(null, s)) {
24505d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                    s.request(Long.MAX_VALUE);
24605d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                } else {
24705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                    s.cancel();
248f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu                }
24905d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            }
2505349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld
25105d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            @Override
25205d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            public void onNext(T item) {
25305d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                postValue(item);
25405d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            }
255f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu
25605d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            @Override
25705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            public void onError(final Throwable ex) {
25805d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                mSubscriber.compareAndSet(this, null);
259f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu
26005d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
26105d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                    @Override
26205d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                    public void run() {
26305d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                        // Errors should be handled upstream, so propagate as a crash.
26405d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                        throw new RuntimeException("LiveData does not handle errors. Errors from "
26505d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                                + "publishers should be handled upstream and propagated as "
26605d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                                + "state", ex);
267f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu                    }
26805d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                });
26905d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            }
27005d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu
27105d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            @Override
27205d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            public void onComplete() {
27305d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                mSubscriber.compareAndSet(this, null);
27405d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            }
27505d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu
27605d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu            public void cancelSubscription() {
27705d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                Subscription s = get();
27805d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                if (s != null) {
27905d35760ac09f603c170c5065b9c0163077e5f05Florina Muntenescu                    s.cancel();
280f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu                }
2815349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld            }
282f6d7c687c9e2635d441ff73430b86b8aa0d56476Florina Muntenescu        }
2835349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld    }
2845349f0eff19efb337fb4585a96449fca3a7300a6Jason Neufeld}
285