/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.lifecycle;

import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.arch.core.executor.ArchTaskExecutor;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.atomic.AtomicReference;

/**
 * Adapts {@link LiveData} input and output to the ReactiveStreams spec.
 */
@SuppressWarnings("WeakerAccess")
public final class LiveDataReactiveStreams {
    private LiveDataReactiveStreams() {
    }

    /**
     * Adapts the given {@link LiveData} stream to a ReactiveStreams {@link Publisher}.
     *
     * <p>
     * By using a good publisher implementation such as RxJava 2.x Flowables, most consumers will
     * be able to let the library deal with backpressure using operators and not need to worry about
     * ever manually calling {@link Subscription#request}.
     *
     * <p>
     * On subscription to the publisher, the subscriber is handed a {@link Subscription} bound to
     * the given {@link LiveData}. Once {@link Subscription#request} is called on the subscription
     * object, an observer will be connected to the data stream. Calling request(Long.MAX_VALUE) is
     * equivalent to creating an unbounded stream with no backpressure. If request with a finite
     * count reaches 0, the observer will buffer the latest item and emit it to the subscriber when
     * data is again requested. Any other items emitted during the time there was no backpressure
     * requested will be dropped.
     *
     * @param lifecycle The owner whose lifecycle the internal observer is registered with.
     * @param liveData  The source of the items that are forwarded to subscribers.
     * @param <T>       The type of data held by the {@link LiveData}.
     * @return A {@link Publisher} that creates a fresh subscription for every subscriber.
     */
    @NonNull
    public static <T> Publisher<T> toPublisher(
            @NonNull LifecycleOwner lifecycle, @NonNull LiveData<T> liveData) {

        return new LiveDataPublisher<>(lifecycle, liveData);
    }

    /**
     * {@link Publisher} that hands every subscriber its own {@link LiveDataSubscription}.
     */
    private static final class LiveDataPublisher<T> implements Publisher<T> {
        final LifecycleOwner mLifecycle;
        final LiveData<T> mLiveData;

        LiveDataPublisher(LifecycleOwner lifecycle, LiveData<T> liveData) {
            this.mLifecycle = lifecycle;
            this.mLiveData = liveData;
        }

        @Override
        public void subscribe(Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new LiveDataSubscription<T>(subscriber, mLifecycle, mLiveData));
        }

        /**
         * Bridges one {@link Subscriber} to the {@link LiveData}: it is both the
         * {@link Subscription} given to the subscriber and the {@link Observer} registered on the
         * {@link LiveData}. State changes requested through {@link #request} and {@link #cancel}
         * are dispatched through {@link ArchTaskExecutor#executeOnMainThread}.
         */
        static final class LiveDataSubscription<T> implements Subscription, Observer<T> {
            final Subscriber<? super T> mSubscriber;
            final LifecycleOwner mLifecycle;
            final LiveData<T> mLiveData;

            // Written by cancel() on the caller's thread, hence volatile.
            volatile boolean mCanceled;
            // used on main thread only
            boolean mObserving;
            // Outstanding demand; touched only from onChanged and the runnable posted by request.
            long mRequested;
            // used on main thread only
            @Nullable
            T mLatest;

            LiveDataSubscription(final Subscriber<? super T> subscriber,
                    final LifecycleOwner lifecycle, final LiveData<T> liveData) {
                this.mSubscriber = subscriber;
                this.mLifecycle = lifecycle;
                this.mLiveData = liveData;
            }

            /**
             * Forwards the value to the subscriber when there is outstanding demand; otherwise
             * remembers it as the latest value, replacing any previously buffered one.
             */
            @Override
            public void onChanged(@Nullable T t) {
                if (mCanceled) {
                    return;
                }
                if (mRequested > 0) {
                    mLatest = null;
                    mSubscriber.onNext(t);
                    // Long.MAX_VALUE means unbounded demand and is never decremented.
                    if (mRequested != Long.MAX_VALUE) {
                        mRequested--;
                    }
                } else {
                    mLatest = t;
                }
            }

            /**
             * Adds {@code n} to the outstanding demand. The first valid request starts observing
             * the {@link LiveData}; later ones flush the buffered value, if any. A non-positive
             * {@code n} cancels the subscription and signals an {@link IllegalArgumentException}
             * through {@link Subscriber#onError}.
             */
            @Override
            public void request(final long n) {
                if (mCanceled) {
                    return;
                }
                ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
                    @Override
                    public void run() {
                        // Re-check: cancel() may have been called before this runnable ran.
                        if (mCanceled) {
                            return;
                        }
                        if (n <= 0L) {
                            mCanceled = true;
                            stopObserving();
                            mSubscriber.onError(
                                    new IllegalArgumentException("Non-positive request"));
                            return;
                        }

                        // Prevent overflowage.
                        mRequested = mRequested + n >= mRequested
                                ? mRequested + n : Long.MAX_VALUE;
                        if (!mObserving) {
                            mObserving = true;
                            mLiveData.observe(mLifecycle, LiveDataSubscription.this);
                        } else if (mLatest != null) {
                            onChanged(mLatest);
                            mLatest = null;
                        }
                    }
                });
            }

            /**
             * Marks the subscription as canceled immediately and schedules removal of the
             * {@link LiveData} observer.
             */
            @Override
            public void cancel() {
                if (mCanceled) {
                    return;
                }
                mCanceled = true;
                ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
                    @Override
                    public void run() {
                        stopObserving();
                    }
                });
            }

            /**
             * Removes this observer from the {@link LiveData} if it is registered and drops the
             * buffered value. Only called from runnables passed to
             * {@link ArchTaskExecutor#executeOnMainThread}.
             */
            void stopObserving() {
                if (mObserving) {
                    mLiveData.removeObserver(this);
                    mObserving = false;
                }
                mLatest = null;
            }
        }
    }

    /**
     * Creates an observable {@link LiveData} stream from a ReactiveStreams {@link Publisher}.
     *
     * <p>
     * When the LiveData becomes active, it subscribes to the emissions from the Publisher.
     *
     * <p>
     * When the LiveData becomes inactive, the subscription is cleared.
     * LiveData holds the last value emitted by the Publisher when the LiveData was active.
     * <p>
     * Therefore, in the case of a hot RxJava Observable, when a new LiveData {@link Observer} is
     * added, it will automatically notify with the last value held in LiveData,
     * which might not be the last value emitted by the Publisher.
     * <p>
     * Note that LiveData does NOT handle errors and it expects that errors are treated as states
     * in the data that's held. In case of an error being emitted by the publisher, an error will
     * be propagated to the main thread and the app will crash.
     *
     * @param publisher The publisher to subscribe to while the LiveData is active.
     * @param <T>       The type of data held by this instance.
     * @return A {@link LiveData} that mirrors the items emitted by {@code publisher}.
     */
    @NonNull
    public static <T> LiveData<T> fromPublisher(@NonNull Publisher<T> publisher) {
        return new PublisherLiveData<>(publisher);
    }

    /**
     * Defines a {@link LiveData} object that wraps a {@link Publisher}.
     *
     * <p>
     * When the LiveData becomes active, it subscribes to the emissions from the Publisher.
     *
     * <p>
     * When the LiveData becomes inactive, the subscription is cleared.
     * LiveData holds the last value emitted by the Publisher when the LiveData was active.
     * <p>
     * Therefore, in the case of a hot RxJava Observable, when a new LiveData {@link Observer} is
     * added, it will automatically notify with the last value held in LiveData,
     * which might not be the last value emitted by the Publisher.
     *
     * <p>
     * Note that LiveData does NOT handle errors and it expects that errors are treated as states
     * in the data that's held. In case of an error being emitted by the publisher, an error will
     * be propagated to the main thread and the app will crash.
     *
     * @param <T> The type of data held by this instance.
     */
    private static class PublisherLiveData<T> extends LiveData<T> {
        private final Publisher<T> mPublisher;
        // Subscriber of the current active period, or null when there is none.
        final AtomicReference<LiveDataSubscriber> mSubscriber;

        PublisherLiveData(@NonNull Publisher<T> publisher) {
            mPublisher = publisher;
            mSubscriber = new AtomicReference<>();
        }

        @Override
        protected void onActive() {
            super.onActive();
            // A fresh subscriber is created every time the LiveData becomes active.
            LiveDataSubscriber s = new LiveDataSubscriber();
            mSubscriber.set(s);
            mPublisher.subscribe(s);
        }

        @Override
        protected void onInactive() {
            super.onInactive();
            LiveDataSubscriber s = mSubscriber.getAndSet(null);
            if (s != null) {
                s.cancelSubscription();
            }
        }

        /**
         * {@link Subscriber} that posts every item into the enclosing LiveData. The
         * {@link AtomicReference} it extends holds the {@link Subscription} received in
         * {@link #onSubscribe}.
         */
        final class LiveDataSubscriber extends AtomicReference<Subscription>
                implements Subscriber<T> {

            @Override
            public void onSubscribe(Subscription s) {
                // Only the first subscription is accepted; any later one is canceled right away.
                if (compareAndSet(null, s)) {
                    s.request(Long.MAX_VALUE);
                } else {
                    s.cancel();
                }
            }

            @Override
            public void onNext(T item) {
                postValue(item);
            }

            @Override
            public void onError(final Throwable ex) {
                // Clear the reference only if it still points at this subscriber.
                mSubscriber.compareAndSet(this, null);

                ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
                    @Override
                    public void run() {
                        // Errors should be handled upstream, so propagate as a crash.
                        throw new RuntimeException("LiveData does not handle errors. Errors from "
                                + "publishers should be handled upstream and propagated as "
                                + "state", ex);
                    }
                });
            }

            @Override
            public void onComplete() {
                mSubscriber.compareAndSet(this, null);
            }

            /**
             * Cancels the stored {@link Subscription}, if one has been received.
             */
            public void cancelSubscription() {
                Subscription s = get();
                if (s != null) {
                    s.cancel();
                }
            }
        }
    }
}