1/* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17package androidx.lifecycle; 18 19import static org.hamcrest.CoreMatchers.is; 20import static org.hamcrest.MatcherAssert.assertThat; 21import static org.junit.Assert.assertEquals; 22import static org.junit.Assert.fail; 23 24import androidx.annotation.Nullable; 25import androidx.arch.core.executor.testing.InstantTaskExecutorRule; 26 27import org.junit.Before; 28import org.junit.Rule; 29import org.junit.Test; 30import org.junit.rules.TestRule; 31import org.reactivestreams.Subscriber; 32import org.reactivestreams.Subscription; 33 34import java.util.ArrayList; 35import java.util.Arrays; 36import java.util.Collections; 37import java.util.List; 38import java.util.concurrent.TimeUnit; 39 40import io.reactivex.Flowable; 41import io.reactivex.disposables.Disposable; 42import io.reactivex.functions.Consumer; 43import io.reactivex.processors.PublishProcessor; 44import io.reactivex.processors.ReplayProcessor; 45import io.reactivex.schedulers.TestScheduler; 46import io.reactivex.subjects.AsyncSubject; 47 48public class LiveDataReactiveStreamsTest { 49 @Rule public final TestRule instantTaskExecutorRule = new InstantTaskExecutorRule(); 50 51 private LifecycleOwner mLifecycleOwner; 52 53 private final List<String> mLiveDataOutput = new ArrayList<>(); 54 private final Observer<String> mObserver = new Observer<String>() { 55 @Override 56 public void onChanged(@Nullable String s) { 57 mLiveDataOutput.add(s); 58 } 59 }; 60 61 private final ReplayProcessor<String> mOutputProcessor = ReplayProcessor.create(); 62 63 private static final TestScheduler sBackgroundScheduler = new TestScheduler(); 64 65 @Before 66 public void init() { 67 mLifecycleOwner = new LifecycleOwner() { 68 LifecycleRegistry mRegistry = new LifecycleRegistry(this); 69 { 70 mRegistry.handleLifecycleEvent(Lifecycle.Event.ON_RESUME); 71 } 72 73 @Override 74 public Lifecycle getLifecycle() { 75 return mRegistry; 76 } 77 }; 78 } 79 80 @Test 81 public void convertsFromPublisher() { 82 PublishProcessor<String> processor = PublishProcessor.create(); 83 LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor); 84 85 liveData.observe(mLifecycleOwner, mObserver); 86 87 processor.onNext("foo"); 88 processor.onNext("bar"); 89 processor.onNext("baz"); 90 91 assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz"))); 92 } 93 94 @Test 95 public void convertsFromPublisherSubscribeWithDelay() { 96 PublishProcessor<String> processor = PublishProcessor.create(); 97 processor.delaySubscription(100, TimeUnit.SECONDS, sBackgroundScheduler); 98 LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor); 99 100 liveData.observe(mLifecycleOwner, mObserver); 101 102 processor.onNext("foo"); 103 liveData.removeObserver(mObserver); 104 sBackgroundScheduler.triggerActions(); 105 liveData.observe(mLifecycleOwner, mObserver); 106 107 processor.onNext("bar"); 108 processor.onNext("baz"); 109 110 assertThat(mLiveDataOutput, is(Arrays.asList("foo", "foo", "bar", "baz"))); 111 } 112 113 @Test 114 public void convertsFromPublisherThrowsException() { 115 PublishProcessor<String> processor = PublishProcessor.create(); 116 LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor); 117 118 liveData.observe(mLifecycleOwner, mObserver); 119 120 IllegalStateException exception = new IllegalStateException("test exception"); 121 try { 122 processor.onError(exception); 123 fail("Runtime Exception expected"); 124 } catch (RuntimeException ex) { 125 assertEquals(ex.getCause(), exception); 126 } 127 } 128 129 @Test 130 public void convertsFromPublisherWithMultipleObservers() { 131 final List<String> output2 = new ArrayList<>(); 132 PublishProcessor<String> processor = PublishProcessor.create(); 133 LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor); 134 135 liveData.observe(mLifecycleOwner, mObserver); 136 137 processor.onNext("foo"); 138 processor.onNext("bar"); 139 140 // The second observer should only get the newest value and any later values. 141 liveData.observe(mLifecycleOwner, new Observer<String>() { 142 @Override 143 public void onChanged(@Nullable String s) { 144 output2.add(s); 145 } 146 }); 147 148 processor.onNext("baz"); 149 150 assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz"))); 151 assertThat(output2, is(Arrays.asList("bar", "baz"))); 152 } 153 154 @Test 155 public void convertsFromPublisherWithMultipleObserversAfterInactive() { 156 final List<String> output2 = new ArrayList<>(); 157 PublishProcessor<String> processor = PublishProcessor.create(); 158 LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor); 159 160 liveData.observe(mLifecycleOwner, mObserver); 161 162 processor.onNext("foo"); 163 processor.onNext("bar"); 164 165 // The second observer should only get the newest value and any later values. 166 liveData.observe(mLifecycleOwner, new Observer<String>() { 167 @Override 168 public void onChanged(@Nullable String s) { 169 output2.add(s); 170 } 171 }); 172 173 liveData.removeObserver(mObserver); 174 processor.onNext("baz"); 175 176 assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar"))); 177 assertThat(output2, is(Arrays.asList("bar", "baz"))); 178 } 179 180 @Test 181 public void convertsFromPublisherAfterInactive() { 182 PublishProcessor<String> processor = PublishProcessor.create(); 183 LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor); 184 185 liveData.observe(mLifecycleOwner, mObserver); 186 processor.onNext("foo"); 187 liveData.removeObserver(mObserver); 188 processor.onNext("bar"); 189 190 liveData.observe(mLifecycleOwner, mObserver); 191 processor.onNext("baz"); 192 193 assertThat(mLiveDataOutput, is(Arrays.asList("foo", "foo", "baz"))); 194 } 195 196 @Test 197 public void convertsFromPublisherManagesSubscriptions() { 198 PublishProcessor<String> processor = PublishProcessor.create(); 199 LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor); 200 201 assertThat(processor.hasSubscribers(), is(false)); 202 liveData.observe(mLifecycleOwner, mObserver); 203 204 // once the live data is active, there's a subscriber 205 assertThat(processor.hasSubscribers(), is(true)); 206 207 liveData.removeObserver(mObserver); 208 // once the live data is inactive, the subscriber is removed 209 assertThat(processor.hasSubscribers(), is(false)); 210 } 211 212 @Test 213 public void convertsFromAsyncPublisher() { 214 Flowable<String> input = Flowable.just("foo") 215 .concatWith(Flowable.just("bar", "baz").observeOn(sBackgroundScheduler)); 216 LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(input); 217 218 liveData.observe(mLifecycleOwner, mObserver); 219 220 assertThat(mLiveDataOutput, is(Collections.singletonList("foo"))); 221 sBackgroundScheduler.triggerActions(); 222 assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz"))); 223 } 224 225 @Test 226 public void convertsToPublisherWithSyncData() { 227 MutableLiveData<String> liveData = new MutableLiveData<>(); 228 liveData.setValue("foo"); 229 assertThat(liveData.getValue(), is("foo")); 230 231 Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(mLifecycleOwner, liveData)) 232 .subscribe(mOutputProcessor); 233 234 liveData.setValue("bar"); 235 liveData.setValue("baz"); 236 237 assertThat( 238 mOutputProcessor.getValues(new String[]{}), 239 is(new String[]{"foo", "bar", "baz"})); 240 } 241 242 @Test 243 public void convertingToPublisherIsCancelable() { 244 MutableLiveData<String> liveData = new MutableLiveData<>(); 245 liveData.setValue("foo"); 246 assertThat(liveData.getValue(), is("foo")); 247 248 Disposable disposable = Flowable 249 .fromPublisher(LiveDataReactiveStreams.toPublisher(mLifecycleOwner, liveData)) 250 .subscribe(new Consumer<String>() { 251 @Override 252 public void accept(String s) throws Exception { 253 mLiveDataOutput.add(s); 254 } 255 }); 256 257 liveData.setValue("bar"); 258 liveData.setValue("baz"); 259 260 assertThat(liveData.hasObservers(), is(true)); 261 disposable.dispose(); 262 263 liveData.setValue("fizz"); 264 liveData.setValue("buzz"); 265 266 assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz"))); 267 // Canceling disposable should also remove livedata mObserver. 268 assertThat(liveData.hasObservers(), is(false)); 269 } 270 271 @Test 272 public void convertsToPublisherWithBackpressure() { 273 MutableLiveData<String> liveData = new MutableLiveData<>(); 274 275 final AsyncSubject<Subscription> subscriptionSubject = AsyncSubject.create(); 276 277 Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(mLifecycleOwner, liveData)) 278 .subscribe(new Subscriber<String>() { 279 @Override 280 public void onSubscribe(Subscription s) { 281 subscriptionSubject.onNext(s); 282 subscriptionSubject.onComplete(); 283 } 284 285 @Override 286 public void onNext(String s) { 287 mOutputProcessor.onNext(s); 288 } 289 290 @Override 291 public void onError(Throwable t) { 292 throw new RuntimeException(t); 293 } 294 295 @Override 296 public void onComplete() { 297 } 298 }); 299 300 // Subscription should have happened synchronously. If it didn't, this will deadlock. 301 final Subscription subscription = subscriptionSubject.blockingSingle(); 302 303 subscription.request(1); 304 assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[]{})); 305 306 liveData.setValue("foo"); 307 assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[]{"foo"})); 308 309 subscription.request(2); 310 liveData.setValue("baz"); 311 liveData.setValue("fizz"); 312 313 assertThat( 314 mOutputProcessor.getValues(new String[]{}), 315 is(new String[]{"foo", "baz", "fizz"})); 316 317 // 'nyan' will be dropped as there is nothing currently requesting a stream. 318 liveData.setValue("nyan"); 319 liveData.setValue("cat"); 320 321 assertThat( 322 mOutputProcessor.getValues(new String[]{}), 323 is(new String[]{"foo", "baz", "fizz"})); 324 325 // When a new request comes in, the latest value will be pushed. 326 subscription.request(1); 327 assertThat( 328 mOutputProcessor.getValues(new String[]{}), 329 is(new String[]{"foo", "baz", "fizz", "cat"})); 330 } 331 332 @Test 333 public void convertsToPublisherWithAsyncData() { 334 MutableLiveData<String> liveData = new MutableLiveData<>(); 335 336 Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(mLifecycleOwner, liveData)) 337 .observeOn(sBackgroundScheduler) 338 .subscribe(mOutputProcessor); 339 340 liveData.setValue("foo"); 341 342 assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[]{})); 343 sBackgroundScheduler.triggerActions(); 344 assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[]{"foo"})); 345 346 liveData.setValue("bar"); 347 liveData.setValue("baz"); 348 349 assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[]{"foo"})); 350 sBackgroundScheduler.triggerActions(); 351 assertThat(mOutputProcessor.getValues( 352 new String[]{}), 353 is(new String[]{"foo", "bar", "baz"})); 354 } 355} 356