1/*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package androidx.lifecycle;
18
19import static org.hamcrest.CoreMatchers.is;
20import static org.hamcrest.MatcherAssert.assertThat;
21import static org.junit.Assert.assertEquals;
22import static org.junit.Assert.fail;
23
24import androidx.annotation.Nullable;
25import androidx.arch.core.executor.testing.InstantTaskExecutorRule;
26
27import org.junit.Before;
28import org.junit.Rule;
29import org.junit.Test;
30import org.junit.rules.TestRule;
31import org.reactivestreams.Subscriber;
32import org.reactivestreams.Subscription;
33
34import java.util.ArrayList;
35import java.util.Arrays;
36import java.util.Collections;
37import java.util.List;
38import java.util.concurrent.TimeUnit;
39
40import io.reactivex.Flowable;
41import io.reactivex.disposables.Disposable;
42import io.reactivex.functions.Consumer;
43import io.reactivex.processors.PublishProcessor;
44import io.reactivex.processors.ReplayProcessor;
45import io.reactivex.schedulers.TestScheduler;
46import io.reactivex.subjects.AsyncSubject;
47
48public class LiveDataReactiveStreamsTest {
49    @Rule public final TestRule instantTaskExecutorRule = new InstantTaskExecutorRule();
50
51    private LifecycleOwner mLifecycleOwner;
52
53    private final List<String> mLiveDataOutput = new ArrayList<>();
54    private final Observer<String> mObserver = new Observer<String>() {
55        @Override
56        public void onChanged(@Nullable String s) {
57            mLiveDataOutput.add(s);
58        }
59    };
60
61    private final ReplayProcessor<String> mOutputProcessor = ReplayProcessor.create();
62
63    private static final TestScheduler sBackgroundScheduler = new TestScheduler();
64
65    @Before
66    public void init() {
67        mLifecycleOwner = new LifecycleOwner() {
68            LifecycleRegistry mRegistry = new LifecycleRegistry(this);
69            {
70                mRegistry.handleLifecycleEvent(Lifecycle.Event.ON_RESUME);
71            }
72
73            @Override
74            public Lifecycle getLifecycle() {
75                return mRegistry;
76            }
77        };
78    }
79
80    @Test
81    public void convertsFromPublisher() {
82        PublishProcessor<String> processor = PublishProcessor.create();
83        LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
84
85        liveData.observe(mLifecycleOwner, mObserver);
86
87        processor.onNext("foo");
88        processor.onNext("bar");
89        processor.onNext("baz");
90
91        assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
92    }
93
94    @Test
95    public void convertsFromPublisherSubscribeWithDelay() {
96        PublishProcessor<String> processor = PublishProcessor.create();
97        processor.delaySubscription(100, TimeUnit.SECONDS, sBackgroundScheduler);
98        LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
99
100        liveData.observe(mLifecycleOwner, mObserver);
101
102        processor.onNext("foo");
103        liveData.removeObserver(mObserver);
104        sBackgroundScheduler.triggerActions();
105        liveData.observe(mLifecycleOwner, mObserver);
106
107        processor.onNext("bar");
108        processor.onNext("baz");
109
110        assertThat(mLiveDataOutput, is(Arrays.asList("foo", "foo", "bar", "baz")));
111    }
112
113    @Test
114    public void convertsFromPublisherThrowsException() {
115        PublishProcessor<String> processor = PublishProcessor.create();
116        LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
117
118        liveData.observe(mLifecycleOwner, mObserver);
119
120        IllegalStateException exception = new IllegalStateException("test exception");
121        try {
122            processor.onError(exception);
123            fail("Runtime Exception expected");
124        } catch (RuntimeException ex) {
125            assertEquals(ex.getCause(), exception);
126        }
127    }
128
129    @Test
130    public void convertsFromPublisherWithMultipleObservers() {
131        final List<String> output2 = new ArrayList<>();
132        PublishProcessor<String> processor = PublishProcessor.create();
133        LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
134
135        liveData.observe(mLifecycleOwner, mObserver);
136
137        processor.onNext("foo");
138        processor.onNext("bar");
139
140        // The second observer should only get the newest value and any later values.
141        liveData.observe(mLifecycleOwner, new Observer<String>() {
142            @Override
143            public void onChanged(@Nullable String s) {
144                output2.add(s);
145            }
146        });
147
148        processor.onNext("baz");
149
150        assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
151        assertThat(output2, is(Arrays.asList("bar", "baz")));
152    }
153
154    @Test
155    public void convertsFromPublisherWithMultipleObserversAfterInactive() {
156        final List<String> output2 = new ArrayList<>();
157        PublishProcessor<String> processor = PublishProcessor.create();
158        LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
159
160        liveData.observe(mLifecycleOwner, mObserver);
161
162        processor.onNext("foo");
163        processor.onNext("bar");
164
165        // The second observer should only get the newest value and any later values.
166        liveData.observe(mLifecycleOwner, new Observer<String>() {
167            @Override
168            public void onChanged(@Nullable String s) {
169                output2.add(s);
170            }
171        });
172
173        liveData.removeObserver(mObserver);
174        processor.onNext("baz");
175
176        assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar")));
177        assertThat(output2, is(Arrays.asList("bar", "baz")));
178    }
179
180    @Test
181    public void convertsFromPublisherAfterInactive() {
182        PublishProcessor<String> processor = PublishProcessor.create();
183        LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
184
185        liveData.observe(mLifecycleOwner, mObserver);
186        processor.onNext("foo");
187        liveData.removeObserver(mObserver);
188        processor.onNext("bar");
189
190        liveData.observe(mLifecycleOwner, mObserver);
191        processor.onNext("baz");
192
193        assertThat(mLiveDataOutput, is(Arrays.asList("foo", "foo", "baz")));
194    }
195
196    @Test
197    public void convertsFromPublisherManagesSubscriptions() {
198        PublishProcessor<String> processor = PublishProcessor.create();
199        LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
200
201        assertThat(processor.hasSubscribers(), is(false));
202        liveData.observe(mLifecycleOwner, mObserver);
203
204        // once the live data is active, there's a subscriber
205        assertThat(processor.hasSubscribers(), is(true));
206
207        liveData.removeObserver(mObserver);
208        // once the live data is inactive, the subscriber is removed
209        assertThat(processor.hasSubscribers(), is(false));
210    }
211
212    @Test
213    public void convertsFromAsyncPublisher() {
214        Flowable<String> input = Flowable.just("foo")
215                .concatWith(Flowable.just("bar", "baz").observeOn(sBackgroundScheduler));
216        LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(input);
217
218        liveData.observe(mLifecycleOwner, mObserver);
219
220        assertThat(mLiveDataOutput, is(Collections.singletonList("foo")));
221        sBackgroundScheduler.triggerActions();
222        assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
223    }
224
225    @Test
226    public void convertsToPublisherWithSyncData() {
227        MutableLiveData<String> liveData = new MutableLiveData<>();
228        liveData.setValue("foo");
229        assertThat(liveData.getValue(), is("foo"));
230
231        Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(mLifecycleOwner, liveData))
232                .subscribe(mOutputProcessor);
233
234        liveData.setValue("bar");
235        liveData.setValue("baz");
236
237        assertThat(
238                mOutputProcessor.getValues(new String[]{}),
239                is(new String[]{"foo", "bar", "baz"}));
240    }
241
242    @Test
243    public void convertingToPublisherIsCancelable() {
244        MutableLiveData<String> liveData = new MutableLiveData<>();
245        liveData.setValue("foo");
246        assertThat(liveData.getValue(), is("foo"));
247
248        Disposable disposable = Flowable
249                .fromPublisher(LiveDataReactiveStreams.toPublisher(mLifecycleOwner, liveData))
250                .subscribe(new Consumer<String>() {
251                    @Override
252                    public void accept(String s) throws Exception {
253                        mLiveDataOutput.add(s);
254                    }
255                });
256
257        liveData.setValue("bar");
258        liveData.setValue("baz");
259
260        assertThat(liveData.hasObservers(), is(true));
261        disposable.dispose();
262
263        liveData.setValue("fizz");
264        liveData.setValue("buzz");
265
266        assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
267        // Canceling disposable should also remove livedata mObserver.
268        assertThat(liveData.hasObservers(), is(false));
269    }
270
271    @Test
272    public void convertsToPublisherWithBackpressure() {
273        MutableLiveData<String> liveData = new MutableLiveData<>();
274
275        final AsyncSubject<Subscription> subscriptionSubject = AsyncSubject.create();
276
277        Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(mLifecycleOwner, liveData))
278                .subscribe(new Subscriber<String>() {
279                    @Override
280                    public void onSubscribe(Subscription s) {
281                        subscriptionSubject.onNext(s);
282                        subscriptionSubject.onComplete();
283                    }
284
285                    @Override
286                    public void onNext(String s) {
287                        mOutputProcessor.onNext(s);
288                    }
289
290                    @Override
291                    public void onError(Throwable t) {
292                        throw new RuntimeException(t);
293                    }
294
295                    @Override
296                    public void onComplete() {
297                    }
298                });
299
300        // Subscription should have happened synchronously. If it didn't, this will deadlock.
301        final Subscription subscription = subscriptionSubject.blockingSingle();
302
303        subscription.request(1);
304        assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[]{}));
305
306        liveData.setValue("foo");
307        assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[]{"foo"}));
308
309        subscription.request(2);
310        liveData.setValue("baz");
311        liveData.setValue("fizz");
312
313        assertThat(
314                mOutputProcessor.getValues(new String[]{}),
315                is(new String[]{"foo", "baz", "fizz"}));
316
317        // 'nyan' will be dropped as there is nothing currently requesting a stream.
318        liveData.setValue("nyan");
319        liveData.setValue("cat");
320
321        assertThat(
322                mOutputProcessor.getValues(new String[]{}),
323                is(new String[]{"foo", "baz", "fizz"}));
324
325        // When a new request comes in, the latest value will be pushed.
326        subscription.request(1);
327        assertThat(
328                mOutputProcessor.getValues(new String[]{}),
329                is(new String[]{"foo", "baz", "fizz", "cat"}));
330    }
331
332    @Test
333    public void convertsToPublisherWithAsyncData() {
334        MutableLiveData<String> liveData = new MutableLiveData<>();
335
336        Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(mLifecycleOwner, liveData))
337                .observeOn(sBackgroundScheduler)
338                .subscribe(mOutputProcessor);
339
340        liveData.setValue("foo");
341
342        assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[]{}));
343        sBackgroundScheduler.triggerActions();
344        assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[]{"foo"}));
345
346        liveData.setValue("bar");
347        liveData.setValue("baz");
348
349        assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[]{"foo"}));
350        sBackgroundScheduler.triggerActions();
351        assertThat(mOutputProcessor.getValues(
352                new String[]{}),
353                is(new String[]{"foo", "bar", "baz"}));
354    }
355}
356