| /* |
| * Copyright 2022 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package androidx.room.androidx.room.integration.kotlintestapp.test |
| |
| import androidx.paging.Pager |
| import androidx.paging.PagingState |
| import androidx.paging.rxjava2.RxPagingSource |
| import androidx.room.Room |
| import androidx.room.RoomDatabase |
| import androidx.room.androidx.room.integration.kotlintestapp.testutil.ItemStore |
| import androidx.room.androidx.room.integration.kotlintestapp.testutil.PagingDb |
| import androidx.room.androidx.room.integration.kotlintestapp.testutil.PagingEntity |
| import androidx.room.integration.kotlintestapp.test.CONFIG |
| import androidx.room.integration.kotlintestapp.test.createExpected |
| import androidx.room.integration.kotlintestapp.test.createItems |
| import androidx.test.core.app.ApplicationProvider |
| import androidx.test.ext.junit.runners.AndroidJUnit4 |
| import androidx.test.filters.SmallTest |
| import com.google.common.truth.Truth |
| import com.google.common.truth.Truth.assertThat |
| import io.reactivex.Single |
| import kotlin.test.assertFalse |
| import kotlin.test.assertTrue |
| import kotlinx.coroutines.CoroutineScope |
| import kotlinx.coroutines.Dispatchers |
| import kotlinx.coroutines.Job |
| import kotlinx.coroutines.cancel |
| import kotlinx.coroutines.cancelAndJoin |
| import kotlinx.coroutines.delay |
| import kotlinx.coroutines.flow.collectLatest |
| import kotlinx.coroutines.launch |
| import kotlinx.coroutines.runBlocking |
| import org.junit.After |
| import org.junit.Before |
| import org.junit.Test |
| import org.junit.runner.RunWith |
| |
| @RunWith(AndroidJUnit4::class) |
| @SmallTest |
| class Rx2PagingSourceTest { |
| private lateinit var coroutineScope: CoroutineScope |
| private lateinit var db: PagingDb |
| private lateinit var itemStore: ItemStore |
| |
| private val mainThreadQueries = mutableListOf<Pair<String, String>>() |
| private val pagingSources = mutableListOf<RxPagingSourceImpl>() |
| |
| @Before |
| fun init() { |
| coroutineScope = CoroutineScope(Dispatchers.Main) |
| itemStore = ItemStore(coroutineScope) |
| |
| val mainThread: Thread = runBlocking(Dispatchers.Main) { |
| Thread.currentThread() |
| } |
| db = Room.inMemoryDatabaseBuilder( |
| ApplicationProvider.getApplicationContext(), |
| PagingDb::class.java |
| ).setQueryCallback( |
| object : RoomDatabase.QueryCallback { |
| override fun onQuery(sqlQuery: String, bindArgs: List<Any?>) { |
| if (Thread.currentThread() === mainThread) { |
| mainThreadQueries.add( |
| sqlQuery to Throwable().stackTraceToString() |
| ) |
| } |
| } |
| } |
| ) { |
| // instantly execute the log callback so that we can check the thread. |
| it.run() |
| }.build() |
| } |
| |
| @After |
| fun tearDown() { |
| // Check no mainThread queries happened. |
| assertThat(mainThreadQueries).isEmpty() |
| coroutineScope.cancel() |
| pagingSources.clear() |
| } |
| |
| @Test |
| fun refresh_canceledCoroutine_disposesSingle() { |
| val items = createItems(startId = 0, count = 90) |
| db.getDao().insert(items) |
| |
| var isDisposed = false |
| val pager = Pager(CONFIG) { |
| val baseSource = db.getDao().loadItemsRx2() |
| RxPagingSourceImpl( |
| baseSource = baseSource, |
| initialLoadSingle = { params -> |
| baseSource.loadSingle(params) |
| // delay load for refresh so we have time to cancel load |
| .doOnSubscribe { Thread.sleep(500) } |
| .doOnSuccess { Truth.assertWithMessage("Should not succeed").fail() } |
| .doOnDispose { isDisposed = true } |
| }, |
| nonInitialLoadSingle = { params -> baseSource.loadSingle(params) }, |
| ).also { pagingSources.add(it) } |
| } |
| |
| runTest(pager) { collectionJob -> |
| // ensure initial load has started |
| assertFalse(collectionJob.start()) |
| // allow collection to start and return a single |
| delay(200) |
| |
| // make sure it progresses enough to have created the first single |
| assertThat(pagingSources.size).isEqualTo(1) |
| val pagingSource = pagingSources.first() |
| assertThat(pagingSource.singles.size).isEqualTo(1) |
| |
| assertFalse(isDisposed) |
| |
| collectionJob.cancelAndJoin() // this should dispose single |
| |
| assertTrue(isDisposed) |
| assertFalse(pagingSource.invalid) // paging source should still be valid though |
| } |
| } |
| |
| @Test |
| fun append_canceledCoroutine_disposesSingle() { |
| val items = createItems(startId = 0, count = 90) |
| db.getDao().insert(items) |
| |
| var isDisposed = false |
| val pager = Pager(CONFIG) { |
| val baseSource = db.getDao().loadItemsRx2() |
| RxPagingSourceImpl( |
| baseSource = baseSource, |
| initialLoadSingle = { params -> baseSource.loadSingle(params) }, |
| nonInitialLoadSingle = { params -> |
| baseSource.loadSingle(params) |
| // delay load for append/prepend so we have time to cancel load |
| .doOnSubscribe { Thread.sleep(500) } |
| .doOnSuccess { Truth.assertWithMessage("Should not succeed").fail() } |
| .doOnDispose { isDisposed = true } |
| }, |
| ).also { pagingSources.add(it) } |
| } |
| |
| runTest(pager) { collectionJob -> |
| // do initial load first |
| assertThat( |
| itemStore.awaitInitialLoad(2) |
| ).containsExactlyElementsIn( |
| items.createExpected( |
| fromIndex = 0, |
| toIndex = CONFIG.initialLoadSize |
| ) |
| ) |
| |
| // trigger an append and give it time to create second single |
| itemStore.get(30) |
| delay(200) |
| |
| // make sure it progresses enough to have created second single |
| assertThat(pagingSources.size).isEqualTo(1) |
| val pagingSource = pagingSources.first() |
| assertThat(pagingSource.singles.size).isEqualTo(2) |
| |
| assertFalse(isDisposed) |
| |
| collectionJob.cancelAndJoin() // this should now dispose second single |
| |
| assertTrue(isDisposed) |
| assertFalse(pagingSource.invalid) // paging source should still be valid though |
| } |
| } |
| |
| @Test |
| fun prepend_canceledCoroutine_disposesSingle() { |
| val items = createItems(startId = 0, count = 90) |
| db.getDao().insert(items) |
| |
| var isDisposed = false |
| val pager = Pager(config = CONFIG, initialKey = 50) { |
| val baseSource = db.getDao().loadItemsRx2() |
| RxPagingSourceImpl( |
| baseSource = baseSource, |
| initialLoadSingle = { params -> baseSource.loadSingle(params) }, |
| nonInitialLoadSingle = { params -> |
| baseSource.loadSingle(params) |
| // delay load for append/prepend so we have time to cancel load |
| .doOnSubscribe { Thread.sleep(500) } |
| .doOnSuccess { Truth.assertWithMessage("Should not succeed").fail() } |
| .doOnDispose { isDisposed = true } |
| }, |
| ).also { pagingSources.add(it) } |
| } |
| |
| runTest(pager) { collectionJob -> |
| // do initial load first |
| assertThat( |
| itemStore.awaitInitialLoad(2) |
| ).containsExactlyElementsIn( |
| items.createExpected( |
| fromIndex = 50, |
| toIndex = 50 + CONFIG.initialLoadSize |
| ) |
| ) |
| |
| // trigger a prepend and give it time to create second single |
| itemStore.get(30) |
| delay(200) |
| |
| // make sure it progresses enough to have created second single |
| assertThat(pagingSources.size).isEqualTo(1) |
| val pagingSource = pagingSources.first() |
| assertThat(pagingSource.singles.size).isEqualTo(2) |
| |
| assertFalse(isDisposed) |
| |
| collectionJob.cancelAndJoin() // this should now dispose second single |
| |
| assertTrue(isDisposed) |
| assertFalse(pagingSource.invalid) // paging source should still be valid though |
| } |
| } |
| |
| private fun runTest( |
| pager: Pager<Int, PagingEntity>, |
| block: suspend (Job) -> Unit |
| ) { |
| val collection = coroutineScope.launch(Dispatchers.Main) { |
| pager.flow.collectLatest { |
| itemStore.collectFrom(it) |
| } |
| } |
| runBlocking { |
| block(collection) |
| } |
| } |
| } |
| |
| private class RxPagingSourceImpl( |
| private val baseSource: RxPagingSource<Int, PagingEntity>, |
| private val initialLoadSingle: (LoadParams<Int>) -> Single<LoadResult<Int, PagingEntity>>, |
| private val nonInitialLoadSingle: (LoadParams<Int>) -> Single<LoadResult<Int, PagingEntity>>, |
| ) : RxPagingSource<Int, PagingEntity>() { |
| |
| val singles = mutableListOf<Single<LoadResult<Int, PagingEntity>>>() |
| |
| override fun getRefreshKey(state: PagingState<Int, PagingEntity>): Int? { |
| return baseSource.getRefreshKey(state) |
| } |
| |
| override fun loadSingle(params: LoadParams<Int>): Single<LoadResult<Int, PagingEntity>> { |
| return if (singles.isEmpty()) { |
| initialLoadSingle(params) |
| } else { |
| nonInitialLoadSingle(params) |
| }.also { singles.add(it) } |
| } |
| } |