1/* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17package androidx.room; 18 19import static org.hamcrest.MatcherAssert.assertThat; 20import static org.mockito.Matchers.any; 21import static org.mockito.Mockito.doAnswer; 22import static org.mockito.Mockito.mock; 23import static org.mockito.Mockito.never; 24import static org.mockito.Mockito.times; 25import static org.mockito.Mockito.verify; 26import static org.mockito.Mockito.when; 27 28import androidx.arch.core.executor.JunitTaskExecutorRule; 29 30import org.hamcrest.CoreMatchers; 31import org.junit.Before; 32import org.junit.Rule; 33import org.junit.Test; 34import org.junit.runner.RunWith; 35import org.junit.runners.JUnit4; 36import org.mockito.invocation.InvocationOnMock; 37import org.mockito.stubbing.Answer; 38 39import java.util.ArrayList; 40import java.util.Arrays; 41import java.util.HashSet; 42import java.util.List; 43import java.util.Set; 44import java.util.concurrent.Callable; 45import java.util.concurrent.atomic.AtomicReference; 46 47import io.reactivex.Flowable; 48import io.reactivex.annotations.NonNull; 49import io.reactivex.disposables.Disposable; 50import io.reactivex.functions.Consumer; 51import io.reactivex.subscribers.TestSubscriber; 52 53@RunWith(JUnit4.class) 54public class RxRoomTest { 55 @Rule 56 public JunitTaskExecutorRule mExecutor = new JunitTaskExecutorRule(1, false); 57 private RoomDatabase mDatabase; 58 private InvalidationTracker mInvalidationTracker; 59 private List<InvalidationTracker.Observer> mAddedObservers = new ArrayList<>(); 60 61 @Before 62 public void init() { 63 mDatabase = mock(RoomDatabase.class); 64 mInvalidationTracker = mock(InvalidationTracker.class); 65 when(mDatabase.getInvalidationTracker()).thenReturn(mInvalidationTracker); 66 doAnswer(new Answer() { 67 @Override 68 public Object answer(InvocationOnMock invocation) throws Throwable { 69 mAddedObservers.add((InvalidationTracker.Observer) invocation.getArguments()[0]); 70 return null; 71 } 72 }).when(mInvalidationTracker).addObserver(any(InvalidationTracker.Observer.class)); 73 } 74 75 @Test 76 public void basicAddRemove() { 77 Flowable<Object> flowable = RxRoom.createFlowable(mDatabase, "a", "b"); 78 verify(mInvalidationTracker, never()).addObserver(any(InvalidationTracker.Observer.class)); 79 Disposable disposable = flowable.subscribe(); 80 verify(mInvalidationTracker).addObserver(any(InvalidationTracker.Observer.class)); 81 assertThat(mAddedObservers.size(), CoreMatchers.is(1)); 82 83 InvalidationTracker.Observer observer = mAddedObservers.get(0); 84 disposable.dispose(); 85 86 verify(mInvalidationTracker).removeObserver(observer); 87 88 disposable = flowable.subscribe(); 89 verify(mInvalidationTracker, times(2)) 90 .addObserver(any(InvalidationTracker.Observer.class)); 91 assertThat(mAddedObservers.size(), CoreMatchers.is(2)); 92 assertThat(mAddedObservers.get(1), CoreMatchers.not(CoreMatchers.sameInstance(observer))); 93 InvalidationTracker.Observer observer2 = mAddedObservers.get(1); 94 disposable.dispose(); 95 verify(mInvalidationTracker).removeObserver(observer2); 96 } 97 98 @Test 99 public void basicNotify() throws InterruptedException { 100 String[] tables = {"a", "b"}; 101 Set<String> tableSet = new HashSet<>(Arrays.asList(tables)); 102 Flowable<Object> flowable = RxRoom.createFlowable(mDatabase, tables); 103 CountingConsumer consumer = new CountingConsumer(); 104 Disposable disposable = flowable.subscribe(consumer); 105 assertThat(mAddedObservers.size(), CoreMatchers.is(1)); 106 InvalidationTracker.Observer observer = mAddedObservers.get(0); 107 assertThat(consumer.mCount, CoreMatchers.is(1)); 108 observer.onInvalidated(tableSet); 109 assertThat(consumer.mCount, CoreMatchers.is(2)); 110 observer.onInvalidated(tableSet); 111 assertThat(consumer.mCount, CoreMatchers.is(3)); 112 disposable.dispose(); 113 observer.onInvalidated(tableSet); 114 assertThat(consumer.mCount, CoreMatchers.is(3)); 115 } 116 117 @Test 118 public void internalCallable() throws InterruptedException { 119 final AtomicReference<String> value = new AtomicReference<>(null); 120 String[] tables = {"a", "b"}; 121 Set<String> tableSet = new HashSet<>(Arrays.asList(tables)); 122 final Flowable<String> flowable = RxRoom.createFlowable(mDatabase, tables, 123 new Callable<String>() { 124 @Override 125 public String call() throws Exception { 126 return value.get(); 127 } 128 }); 129 final CountingConsumer consumer = new CountingConsumer(); 130 flowable.subscribe(consumer); 131 InvalidationTracker.Observer observer = mAddedObservers.get(0); 132 drain(); 133 // no value because it is null 134 assertThat(consumer.mCount, CoreMatchers.is(0)); 135 value.set("bla"); 136 observer.onInvalidated(tableSet); 137 drain(); 138 // get value 139 assertThat(consumer.mCount, CoreMatchers.is(1)); 140 observer.onInvalidated(tableSet); 141 drain(); 142 // get value 143 assertThat(consumer.mCount, CoreMatchers.is(2)); 144 value.set(null); 145 observer.onInvalidated(tableSet); 146 drain(); 147 // no value 148 assertThat(consumer.mCount, CoreMatchers.is(2)); 149 } 150 151 private void drain() throws InterruptedException { 152 mExecutor.drainTasks(2); 153 } 154 155 @Test 156 public void exception() throws InterruptedException { 157 final Flowable<String> flowable = RxRoom.createFlowable(mDatabase, new String[]{"a"}, 158 new Callable<String>() { 159 @Override 160 public String call() throws Exception { 161 throw new Exception("i want exception"); 162 } 163 }); 164 TestSubscriber<String> subscriber = new TestSubscriber<>(); 165 flowable.subscribe(subscriber); 166 drain(); 167 assertThat(subscriber.errorCount(), CoreMatchers.is(1)); 168 assertThat(subscriber.errors().get(0).getMessage(), CoreMatchers.is("i want exception")); 169 } 170 171 private static class CountingConsumer implements Consumer<Object> { 172 int mCount = 0; 173 174 @Override 175 public void accept(@NonNull Object o) throws Exception { 176 mCount++; 177 } 178 } 179} 180