[go: nahoru, domu]

blob: cedf80a2fa0ba4dbb71708d4eb818af9ad799a16 [file] [log] [blame]
/*
* Copyright 2018 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package androidx.paging
import android.annotation.SuppressLint
import androidx.arch.core.executor.ArchTaskExecutor
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.Observable
import io.reactivex.ObservableEmitter
import io.reactivex.ObservableOnSubscribe
import io.reactivex.Scheduler
import io.reactivex.functions.Cancellable
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.rx2.SchedulerCoroutineDispatcher
import kotlinx.coroutines.rx2.asCoroutineDispatcher
import kotlinx.coroutines.withContext
/**
* Builder for `Observable<PagedList>` or `Flowable<PagedList>`, given a [DataSource.Factory] and a
* [PagedList.Config].
*
* The required parameters are in the constructor, so you can simply construct and build, or
* optionally enable extra features (such as initial load key, or BoundaryCallback).
*
* The returned observable/flowable will already be subscribed on the [setFetchScheduler], and will
* perform all loading on that scheduler. It will already be observed on [setNotifyScheduler], and
* will dispatch new PagedLists, as well as their updates to that scheduler.
*
* @param Key Type of input valued used to load data from the [DataSource]. Must be integer if
* you're using [PositionalDataSource].
* @param Value Item type being presented.
*
*/
class RxPagedListBuilder<Key : Any, Value : Any> {
private val pagingSourceFactory: (() -> PagingSource<Key, Value>)?
private val dataSourceFactory: DataSource.Factory<Key, Value>?
@Suppress("DEPRECATION")
private val config: PagedList.Config
private var initialLoadKey: Key? = null
@Suppress("DEPRECATION")
private var boundaryCallback: PagedList.BoundaryCallback<Value>? = null
private var notifyDispatcher: SchedulerCoroutineDispatcher? = null
private var notifyScheduler: Scheduler? = null
private var fetchDispatcher: SchedulerCoroutineDispatcher? = null
private var fetchScheduler: Scheduler? = null
/**
* Creates a [RxPagedListBuilder] with required parameters.
*
* @param pagingSourceFactory DataSource factory providing DataSource generations.
* @param config Paging configuration.
*/
@Deprecated(
message = "PagedList is deprecated and has been replaced by PagingData",
replaceWith = ReplaceWith(
"""PagingDataFlowable(
config = PagingConfig(
config.pageSize,
config.prefetchDistance,
config.enablePlaceholders,
config.initialLoadSizeHint,
config.maxSize
),
initialKey = null,
strategy = BackpressureStrategy.LATEST,
pagingSourceFactory = pagingSourceFactory
)""",
"androidx.paging.PagingConfig",
"androidx.paging.PagingDataFlowable",
"io.reactivex.BackpressureStrategy"
)
)
constructor(
pagingSourceFactory: () -> PagingSource<Key, Value>,
@Suppress("DEPRECATION") config: PagedList.Config
) {
this.pagingSourceFactory = pagingSourceFactory
this.dataSourceFactory = null
this.config = config
}
/**
* Creates a [RxPagedListBuilder] with required parameters.
*
* This method is a convenience for:
* ```
* RxPagedListBuilder(
* pagingSourceFactory,
* PagedList.Config.Builder().setPageSize(pageSize).build()
* )
* ```
*
* @param pagingSourceFactory [PagingSource] factory providing [PagingSource] generations.
* @param pageSize Size of pages to load.
*/
@Suppress("DEPRECATION")
@Deprecated(
message = "PagedList is deprecated and has been replaced by PagingData",
replaceWith = ReplaceWith(
"""PagingDataFlowable(
config = PagingConfig(pageSize),
initialKey = null,
strategy = BackpressureStrategy.LATEST,
pagingSourceFactory = pagingSourceFactory
)""",
"androidx.paging.PagingConfig",
"androidx.paging.PagingDataFlowable",
"io.reactivex.BackpressureStrategy"
)
)
constructor(pagingSourceFactory: () -> PagingSource<Key, Value>, pageSize: Int) : this(
pagingSourceFactory,
PagedList.Config.Builder().setPageSize(pageSize).build()
)
/**
* Creates a [RxPagedListBuilder] with required parameters.
*
* @param dataSourceFactory DataSource factory providing DataSource generations.
* @param config Paging configuration.
*/
@Deprecated(
message = "PagedList is deprecated and has been replaced by PagingData",
replaceWith = ReplaceWith(
"""PagingDataFlowable(
config = PagingConfig(
config.pageSize,
config.prefetchDistance,
config.enablePlaceholders,
config.initialLoadSizeHint,
config.maxSize
),
initialKey = null,
strategy = BackpressureStrategy.LATEST,
this.asPagingSourceFactory(Dispatchers.IO)
)""",
"androidx.paging.PagingConfig",
"androidx.paging.PagingDataFlowable",
"io.reactivex.BackpressureStrategy",
"kotlinx.coroutines.Dispatchers"
)
)
constructor(
dataSourceFactory: DataSource.Factory<Key, Value>,
@Suppress("DEPRECATION") config: PagedList.Config
) {
this.pagingSourceFactory = null
this.dataSourceFactory = dataSourceFactory
this.config = config
}
/**
* Creates a [RxPagedListBuilder] with required parameters.
*
* This method is a convenience for:
* ```
* RxPagedListBuilder(
* dataSourceFactory,
* PagedList.Config.Builder().setPageSize(pageSize).build()
* )
* ```
*
* @param dataSourceFactory [DataSource.Factory] providing DataSource generations.
* @param pageSize Size of pages to load.
*/
@Suppress("DEPRECATION")
@Deprecated(
message = "PagedList is deprecated and has been replaced by PagingData",
replaceWith = ReplaceWith(
"""PagingDataFlowable(
config = PagingConfig(pageSize),
initialKey = null,
strategy = BackpressureStrategy.LATEST,
this.asPagingSourceFactory(Dispatchers.IO)
)""",
"androidx.paging.PagingConfig",
"androidx.paging.PagingDataFlowable",
"io.reactivex.BackpressureStrategy",
"kotlinx.coroutines.Dispatchers"
)
)
constructor(
dataSourceFactory: DataSource.Factory<Key, Value>,
pageSize: Int
) : this(
dataSourceFactory,
PagedList.Config.Builder().setPageSize(pageSize).build()
)
/**
* First loading key passed to the first PagedList/DataSource.
*
* When a new PagedList/DataSource pair is created after the first, it acquires a load key from
* the previous generation so that data is loaded around the position already being observed.
*
* @param key Initial load key passed to the first PagedList/DataSource.
* @return this
*/
fun setInitialLoadKey(key: Key?) = apply {
initialLoadKey = key
}
/**
* Sets a [androidx.paging.PagedList.BoundaryCallback] on each PagedList created,
* typically used to load additional data from network when paging from local storage.
*
* Pass a BoundaryCallback to listen to when the PagedList runs out of data to load. If this
* method is not called, or `null` is passed, you will not be notified when each
* DataSource runs out of data to provide to its PagedList.
*
* If you are paging from a DataSource.Factory backed by local storage, you can set a
* BoundaryCallback to know when there is no more information to page from local storage.
* This is useful to page from the network when local storage is a cache of network data.
*
* Note that when using a BoundaryCallback with a `Observable<PagedList>`, method calls
* on the callback may be dispatched multiple times - one for each PagedList/DataSource
* pair. If loading network data from a BoundaryCallback, you should prevent multiple
* dispatches of the same method from triggering multiple simultaneous network loads.
*
* @param boundaryCallback The boundary callback for listening to PagedList load state.
* @return this
*/
fun setBoundaryCallback(
@Suppress("DEPRECATION") boundaryCallback: PagedList.BoundaryCallback<Value>?
) = apply { this.boundaryCallback = boundaryCallback }
/**
* Sets scheduler which will be used for observing new PagedLists, as well as loading updates
* within the PagedLists.
*
* If not set, defaults to the UI thread.
*
* The built [Observable] / [Flowable] will be observed on this scheduler, so that the thread
* receiving PagedLists will also receive the internal updates to the PagedList.
*
* @param scheduler Scheduler that receives PagedList updates, and where [PagedList.Callback]
* calls are dispatched. Generally, this is the UI/main thread.
* @return this
*/
fun setNotifyScheduler(scheduler: Scheduler) = apply {
notifyScheduler = scheduler
notifyDispatcher = scheduler.asCoroutineDispatcher()
}
/**
* Sets scheduler which will be used for background fetching of PagedLists, as well as on-demand
* fetching of pages inside.
*
* If not set, defaults to the Arch components I/O thread pool.
*
* The built [Observable] / [Flowable] will be subscribed on this scheduler.
*
* @param scheduler [Scheduler] used to fetch from DataSources, generally a background thread
* pool for e.g. I/O or network loading.
* @return this
*/
fun setFetchScheduler(scheduler: Scheduler) = apply {
fetchScheduler = scheduler
fetchDispatcher = scheduler.asCoroutineDispatcher()
}
/**
* Constructs a `Observable<PagedList>`.
*
* The returned Observable will already be observed on the [notifyScheduler], and subscribed on
* the [fetchScheduler].
*
* @return The [Observable] of PagedLists
*/
@Suppress("DEPRECATION")
fun buildObservable(): Observable<PagedList<Value>> {
@SuppressLint("RestrictedApi")
val notifyScheduler = notifyScheduler
?: ScheduledExecutor(ArchTaskExecutor.getMainThreadExecutor())
val notifyDispatcher = notifyDispatcher ?: notifyScheduler.asCoroutineDispatcher()
@SuppressLint("RestrictedApi")
val fetchScheduler = fetchScheduler
?: ScheduledExecutor(ArchTaskExecutor.getIOThreadExecutor())
val fetchDispatcher = fetchDispatcher ?: fetchScheduler.asCoroutineDispatcher()
val pagingSourceFactory = pagingSourceFactory
?: dataSourceFactory?.asPagingSourceFactory(fetchDispatcher)
check(pagingSourceFactory != null) {
"LivePagedList cannot be built without a PagingSourceFactory or DataSource.Factory"
}
return Observable
.create(
PagingObservableOnSubscribe(
initialLoadKey,
config,
boundaryCallback,
pagingSourceFactory,
notifyDispatcher,
fetchDispatcher
)
)
.observeOn(notifyScheduler)
.subscribeOn(fetchScheduler)
}
/**
* Constructs a `Flowable<PagedList>`.
*
* The returned Observable will already be observed on the [notifyScheduler], and subscribed on
* the [fetchScheduler].
*
* @param backpressureStrategy BackpressureStrategy for the [Flowable] to use.
* @return The [Flowable] of PagedLists
*/
@Suppress("DEPRECATION")
fun buildFlowable(backpressureStrategy: BackpressureStrategy): Flowable<PagedList<Value>> {
return buildObservable().toFlowable(backpressureStrategy)
}
@Suppress("DEPRECATION")
internal class PagingObservableOnSubscribe<Key : Any, Value : Any>(
initialLoadKey: Key?,
private val config: PagedList.Config,
private val boundaryCallback: PagedList.BoundaryCallback<Value>?,
private val pagingSourceFactory: () -> PagingSource<Key, Value>,
private val notifyDispatcher: CoroutineDispatcher,
private val fetchDispatcher: CoroutineDispatcher
) : ObservableOnSubscribe<PagedList<Value>>, Cancellable {
private var firstSubscribe = true
private var currentData: PagedList<Value>
private var currentJob: Job? = null
private lateinit var emitter: ObservableEmitter<PagedList<Value>>
private val callback = {
invalidate(true)
}
private val refreshRetryCallback = Runnable { invalidate(true) }
init {
currentData = InitialPagedList(
pagingSourceFactory(),
GlobalScope,
config,
initialLoadKey
)
currentData.setRetryCallback(refreshRetryCallback)
}
override fun subscribe(emitter: ObservableEmitter<PagedList<Value>>) {
this.emitter = emitter
emitter.setCancellable(this)
if (firstSubscribe) {
emitter.onNext(currentData)
firstSubscribe = false
}
invalidate(false)
}
override fun cancel() {
currentData.pagingSource.unregisterInvalidatedCallback(callback)
}
private fun invalidate(force: Boolean) {
// work is already ongoing, not forcing, so skip invalidate
if (currentJob != null && !force) return
currentJob?.cancel()
currentJob = GlobalScope.launch(fetchDispatcher) {
currentData.pagingSource.unregisterInvalidatedCallback(callback)
val pagingSource = pagingSourceFactory()
pagingSource.registerInvalidatedCallback(callback)
withContext(notifyDispatcher) {
currentData.setInitialLoadState(LoadType.REFRESH, LoadState.Loading)
}
@Suppress("UNCHECKED_CAST")
val lastKey = currentData.lastKey as Key?
val params = config.toRefreshLoadParams(lastKey)
when (val initialResult = pagingSource.load(params)) {
is PagingSource.LoadResult.Error -> {
currentData.setInitialLoadState(
LoadType.REFRESH,
LoadState.Error(initialResult.throwable)
)
}
is PagingSource.LoadResult.Page -> {
val pagedList = PagedList.create(
pagingSource,
initialResult,
GlobalScope,
notifyDispatcher,
fetchDispatcher,
boundaryCallback,
config,
lastKey
)
onItemUpdate(currentData, pagedList)
currentData = pagedList
emitter.onNext(pagedList)
}
}
}
}
private fun onItemUpdate(previous: PagedList<Value>, next: PagedList<Value>) {
previous.setRetryCallback(null)
next.setRetryCallback(refreshRetryCallback)
}
}
}