1/*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package androidx.room;
18
19import static org.hamcrest.MatcherAssert.assertThat;
20import static org.mockito.Matchers.any;
21import static org.mockito.Mockito.doAnswer;
22import static org.mockito.Mockito.mock;
23import static org.mockito.Mockito.never;
24import static org.mockito.Mockito.times;
25import static org.mockito.Mockito.verify;
26import static org.mockito.Mockito.when;
27
28import androidx.arch.core.executor.JunitTaskExecutorRule;
29
30import org.hamcrest.CoreMatchers;
31import org.junit.Before;
32import org.junit.Rule;
33import org.junit.Test;
34import org.junit.runner.RunWith;
35import org.junit.runners.JUnit4;
36import org.mockito.invocation.InvocationOnMock;
37import org.mockito.stubbing.Answer;
38
39import java.util.ArrayList;
40import java.util.Arrays;
41import java.util.HashSet;
42import java.util.List;
43import java.util.Set;
44import java.util.concurrent.Callable;
45import java.util.concurrent.atomic.AtomicReference;
46
47import io.reactivex.Flowable;
48import io.reactivex.annotations.NonNull;
49import io.reactivex.disposables.Disposable;
50import io.reactivex.functions.Consumer;
51import io.reactivex.subscribers.TestSubscriber;
52
53@RunWith(JUnit4.class)
54public class RxRoomTest {
55    @Rule
56    public JunitTaskExecutorRule mExecutor = new JunitTaskExecutorRule(1, false);
57    private RoomDatabase mDatabase;
58    private InvalidationTracker mInvalidationTracker;
59    private List<InvalidationTracker.Observer> mAddedObservers = new ArrayList<>();
60
61    @Before
62    public void init() {
63        mDatabase = mock(RoomDatabase.class);
64        mInvalidationTracker = mock(InvalidationTracker.class);
65        when(mDatabase.getInvalidationTracker()).thenReturn(mInvalidationTracker);
66        doAnswer(new Answer() {
67            @Override
68            public Object answer(InvocationOnMock invocation) throws Throwable {
69                mAddedObservers.add((InvalidationTracker.Observer) invocation.getArguments()[0]);
70                return null;
71            }
72        }).when(mInvalidationTracker).addObserver(any(InvalidationTracker.Observer.class));
73    }
74
75    @Test
76    public void basicAddRemove() {
77        Flowable<Object> flowable = RxRoom.createFlowable(mDatabase, "a", "b");
78        verify(mInvalidationTracker, never()).addObserver(any(InvalidationTracker.Observer.class));
79        Disposable disposable = flowable.subscribe();
80        verify(mInvalidationTracker).addObserver(any(InvalidationTracker.Observer.class));
81        assertThat(mAddedObservers.size(), CoreMatchers.is(1));
82
83        InvalidationTracker.Observer observer = mAddedObservers.get(0);
84        disposable.dispose();
85
86        verify(mInvalidationTracker).removeObserver(observer);
87
88        disposable = flowable.subscribe();
89        verify(mInvalidationTracker, times(2))
90                .addObserver(any(InvalidationTracker.Observer.class));
91        assertThat(mAddedObservers.size(), CoreMatchers.is(2));
92        assertThat(mAddedObservers.get(1), CoreMatchers.not(CoreMatchers.sameInstance(observer)));
93        InvalidationTracker.Observer observer2 = mAddedObservers.get(1);
94        disposable.dispose();
95        verify(mInvalidationTracker).removeObserver(observer2);
96    }
97
98    @Test
99    public void basicNotify() throws InterruptedException {
100        String[] tables = {"a", "b"};
101        Set<String> tableSet = new HashSet<>(Arrays.asList(tables));
102        Flowable<Object> flowable = RxRoom.createFlowable(mDatabase, tables);
103        CountingConsumer consumer = new CountingConsumer();
104        Disposable disposable = flowable.subscribe(consumer);
105        assertThat(mAddedObservers.size(), CoreMatchers.is(1));
106        InvalidationTracker.Observer observer = mAddedObservers.get(0);
107        assertThat(consumer.mCount, CoreMatchers.is(1));
108        observer.onInvalidated(tableSet);
109        assertThat(consumer.mCount, CoreMatchers.is(2));
110        observer.onInvalidated(tableSet);
111        assertThat(consumer.mCount, CoreMatchers.is(3));
112        disposable.dispose();
113        observer.onInvalidated(tableSet);
114        assertThat(consumer.mCount, CoreMatchers.is(3));
115    }
116
117    @Test
118    public void internalCallable() throws InterruptedException {
119        final AtomicReference<String> value = new AtomicReference<>(null);
120        String[] tables = {"a", "b"};
121        Set<String> tableSet = new HashSet<>(Arrays.asList(tables));
122        final Flowable<String> flowable = RxRoom.createFlowable(mDatabase, tables,
123                new Callable<String>() {
124                    @Override
125                    public String call() throws Exception {
126                        return value.get();
127                    }
128                });
129        final CountingConsumer consumer = new CountingConsumer();
130        flowable.subscribe(consumer);
131        InvalidationTracker.Observer observer = mAddedObservers.get(0);
132        drain();
133        // no value because it is null
134        assertThat(consumer.mCount, CoreMatchers.is(0));
135        value.set("bla");
136        observer.onInvalidated(tableSet);
137        drain();
138        // get value
139        assertThat(consumer.mCount, CoreMatchers.is(1));
140        observer.onInvalidated(tableSet);
141        drain();
142        // get value
143        assertThat(consumer.mCount, CoreMatchers.is(2));
144        value.set(null);
145        observer.onInvalidated(tableSet);
146        drain();
147        // no value
148        assertThat(consumer.mCount, CoreMatchers.is(2));
149    }
150
151    private void drain() throws InterruptedException {
152        mExecutor.drainTasks(2);
153    }
154
155    @Test
156    public void exception() throws InterruptedException {
157        final Flowable<String> flowable = RxRoom.createFlowable(mDatabase, new String[]{"a"},
158                new Callable<String>() {
159                    @Override
160                    public String call() throws Exception {
161                        throw new Exception("i want exception");
162                    }
163                });
164        TestSubscriber<String> subscriber = new TestSubscriber<>();
165        flowable.subscribe(subscriber);
166        drain();
167        assertThat(subscriber.errorCount(), CoreMatchers.is(1));
168        assertThat(subscriber.errors().get(0).getMessage(), CoreMatchers.is("i want exception"));
169    }
170
171    private static class CountingConsumer implements Consumer<Object> {
172        int mCount = 0;
173
174        @Override
175        public void accept(@NonNull Object o) throws Exception {
176            mCount++;
177        }
178    }
179}
180