[go: nahoru, domu]

Add Flow<PagedData<T>>.cachedIn

This CL adds caching to Paging such that any flow of PagedData can
be cached in any layer, making any downstream share the same data.

The given scope keeps the PagedData alive, which means the scope must be
cancelled; otherwise, the PagedData will stay in memory forever.

Since Flow library does not provide sharing functionality yet, this
CL uses a port from Dropbox/Store/Multiplexer. When Multiplexer
starts supporting keepAlive, we should start depending on it.
When Flow implements share, we should try to migrate if possible.

Currently, this CL indefinitely buffers page events. A followup
CL will include an intermediate class to flatten pages so that
we don't keep dropped pages in memory.

Bug: 146677974
Test: CachingTest
Change-Id: I12ae2125599315bf58f5fcde313f3d3611e90671
diff --git a/paging/common/api/3.0.0-alpha01.txt b/paging/common/api/3.0.0-alpha01.txt
index c32e398..d6597dd 100644
--- a/paging/common/api/3.0.0-alpha01.txt
+++ b/paging/common/api/3.0.0-alpha01.txt
@@ -1,6 +1,10 @@
 // Signature format: 3.0
 package androidx.paging {
 
+  public final class CachedPagedDataKt {
+    method public static <T> kotlinx.coroutines.flow.Flow<androidx.paging.PagedData<T>> cachedIn(kotlinx.coroutines.flow.Flow<androidx.paging.PagedData<T>>, kotlinx.coroutines.CoroutineScope scope);
+  }
+
   public abstract class DataSource<Key, Value> {
     method @AnyThread public void addInvalidatedCallback(androidx.paging.DataSource.InvalidatedCallback onInvalidatedCallback);
     method @AnyThread public final void addInvalidatedCallback(kotlin.jvm.functions.Function0<kotlin.Unit> onInvalidatedCallback);
@@ -329,3 +333,10 @@
 
 }
 
+package androidx.paging.multicast {
+
+  public final class ChannelManagerKt {
+  }
+
+}
+
diff --git a/paging/common/api/current.txt b/paging/common/api/current.txt
index c32e398..d6597dd 100644
--- a/paging/common/api/current.txt
+++ b/paging/common/api/current.txt
@@ -1,6 +1,10 @@
 // Signature format: 3.0
 package androidx.paging {
 
+  public final class CachedPagedDataKt {
+    method public static <T> kotlinx.coroutines.flow.Flow<androidx.paging.PagedData<T>> cachedIn(kotlinx.coroutines.flow.Flow<androidx.paging.PagedData<T>>, kotlinx.coroutines.CoroutineScope scope);
+  }
+
   public abstract class DataSource<Key, Value> {
     method @AnyThread public void addInvalidatedCallback(androidx.paging.DataSource.InvalidatedCallback onInvalidatedCallback);
     method @AnyThread public final void addInvalidatedCallback(kotlin.jvm.functions.Function0<kotlin.Unit> onInvalidatedCallback);
@@ -329,3 +333,10 @@
 
 }
 
+package androidx.paging.multicast {
+
+  public final class ChannelManagerKt {
+  }
+
+}
+
diff --git a/paging/common/api/public_plus_experimental_3.0.0-alpha01.txt b/paging/common/api/public_plus_experimental_3.0.0-alpha01.txt
index c32e398..d6597dd 100644
--- a/paging/common/api/public_plus_experimental_3.0.0-alpha01.txt
+++ b/paging/common/api/public_plus_experimental_3.0.0-alpha01.txt
@@ -1,6 +1,10 @@
 // Signature format: 3.0
 package androidx.paging {
 
+  public final class CachedPagedDataKt {
+    method public static <T> kotlinx.coroutines.flow.Flow<androidx.paging.PagedData<T>> cachedIn(kotlinx.coroutines.flow.Flow<androidx.paging.PagedData<T>>, kotlinx.coroutines.CoroutineScope scope);
+  }
+
   public abstract class DataSource<Key, Value> {
     method @AnyThread public void addInvalidatedCallback(androidx.paging.DataSource.InvalidatedCallback onInvalidatedCallback);
     method @AnyThread public final void addInvalidatedCallback(kotlin.jvm.functions.Function0<kotlin.Unit> onInvalidatedCallback);
@@ -329,3 +333,10 @@
 
 }
 
+package androidx.paging.multicast {
+
+  public final class ChannelManagerKt {
+  }
+
+}
+
diff --git a/paging/common/api/public_plus_experimental_current.txt b/paging/common/api/public_plus_experimental_current.txt
index c32e398..d6597dd 100644
--- a/paging/common/api/public_plus_experimental_current.txt
+++ b/paging/common/api/public_plus_experimental_current.txt
@@ -1,6 +1,10 @@
 // Signature format: 3.0
 package androidx.paging {
 
+  public final class CachedPagedDataKt {
+    method public static <T> kotlinx.coroutines.flow.Flow<androidx.paging.PagedData<T>> cachedIn(kotlinx.coroutines.flow.Flow<androidx.paging.PagedData<T>>, kotlinx.coroutines.CoroutineScope scope);
+  }
+
   public abstract class DataSource<Key, Value> {
     method @AnyThread public void addInvalidatedCallback(androidx.paging.DataSource.InvalidatedCallback onInvalidatedCallback);
     method @AnyThread public final void addInvalidatedCallback(kotlin.jvm.functions.Function0<kotlin.Unit> onInvalidatedCallback);
@@ -329,3 +333,10 @@
 
 }
 
+package androidx.paging.multicast {
+
+  public final class ChannelManagerKt {
+  }
+
+}
+
diff --git a/paging/common/api/restricted_3.0.0-alpha01.txt b/paging/common/api/restricted_3.0.0-alpha01.txt
index 5a9cfaa..9906480 100644
--- a/paging/common/api/restricted_3.0.0-alpha01.txt
+++ b/paging/common/api/restricted_3.0.0-alpha01.txt
@@ -1,6 +1,10 @@
 // Signature format: 3.0
 package androidx.paging {
 
+  public final class CachedPagedDataKt {
+    method public static <T> kotlinx.coroutines.flow.Flow<androidx.paging.PagedData<T>> cachedIn(kotlinx.coroutines.flow.Flow<androidx.paging.PagedData<T>>, kotlinx.coroutines.CoroutineScope scope);
+  }
+
   public abstract class DataSource<Key, Value> {
     method @AnyThread public void addInvalidatedCallback(androidx.paging.DataSource.InvalidatedCallback onInvalidatedCallback);
     method @AnyThread public final void addInvalidatedCallback(kotlin.jvm.functions.Function0<kotlin.Unit> onInvalidatedCallback);
@@ -381,3 +385,10 @@
 
 }
 
+package androidx.paging.multicast {
+
+  public final class ChannelManagerKt {
+  }
+
+}
+
diff --git a/paging/common/api/restricted_current.txt b/paging/common/api/restricted_current.txt
index 5a9cfaa..9906480 100644
--- a/paging/common/api/restricted_current.txt
+++ b/paging/common/api/restricted_current.txt
@@ -1,6 +1,10 @@
 // Signature format: 3.0
 package androidx.paging {
 
+  public final class CachedPagedDataKt {
+    method public static <T> kotlinx.coroutines.flow.Flow<androidx.paging.PagedData<T>> cachedIn(kotlinx.coroutines.flow.Flow<androidx.paging.PagedData<T>>, kotlinx.coroutines.CoroutineScope scope);
+  }
+
   public abstract class DataSource<Key, Value> {
     method @AnyThread public void addInvalidatedCallback(androidx.paging.DataSource.InvalidatedCallback onInvalidatedCallback);
     method @AnyThread public final void addInvalidatedCallback(kotlin.jvm.functions.Function0<kotlin.Unit> onInvalidatedCallback);
@@ -381,3 +385,10 @@
 
 }
 
+package androidx.paging.multicast {
+
+  public final class ChannelManagerKt {
+  }
+
+}
+
diff --git a/paging/common/src/main/kotlin/androidx/paging/CachedPagedData.kt b/paging/common/src/main/kotlin/androidx/paging/CachedPagedData.kt
new file mode 100644
index 0000000..caa5029
--- /dev/null
+++ b/paging/common/src/main/kotlin/androidx/paging/CachedPagedData.kt
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.paging
+
+import androidx.annotation.VisibleForTesting
+import androidx.paging.ActiveFlowTracker.FlowType.PAGED_DATA_FLOW
+import androidx.paging.ActiveFlowTracker.FlowType.PAGE_EVENT_FLOW
+import androidx.paging.multicast.Multicaster
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.FlowPreview
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.map
+import kotlinx.coroutines.flow.mapNotNull
+import kotlinx.coroutines.flow.onCompletion
+import kotlinx.coroutines.flow.onStart
+import kotlinx.coroutines.flow.scan
+
+/** Shares one [PagedData]'s event flow between collectors via a [Multicaster] run in [scope]. */
+private class MulticastedPagedData<T : Any>(
+    val scope: CoroutineScope,
+    val parent: PagedData<T>,
+    // used in tests
+    val tracker: ActiveFlowTracker? = null
+) {
+    // TODO: instead of relying on paging cache, use something like FlowList that can consolidate
+    //  multiple pages into 1 and also avoid caching dropped pages.
+    @ExperimentalCoroutinesApi
+    @FlowPreview
+    private val multicasted = Multicaster(
+        scope = scope,
+        bufferSize = 100_000,
+        piggybackingDownstream = false,
+        source = parent.flow
+            .onStart { tracker?.onStart(PAGE_EVENT_FLOW) }
+            .onCompletion { tracker?.onComplete(PAGE_EVENT_FLOW) },
+        // Multicaster's onEach has no default; nothing needs to run per dispatched event.
+        onEach = {},
+        keepUpstreamAlive = true
+    )
+
+    @FlowPreview
+    @ExperimentalCoroutinesApi
+    fun asPagedData() = PagedData(
+        flow = multicasted.flow,
+        hintReceiver = parent.hintReceiver
+    )
+
+    @FlowPreview
+    @ExperimentalCoroutinesApi
+    suspend fun close() = multicasted.close()
+}
+
+/**
+ * Caches the PagedData such that any downstream collection from this flow will share the same
+ * paging data.
+ *
+ * The flow is kept active as long as the given [scope] is active. To avoid leaks, make sure to
+ * use a [scope] that is already managed (like a ViewModel scope) or manually cancel it when you
+ * don't need paging anymore.
+ *
+ * A common use case for this caching is to cache PagedData in a ViewModel. This can ensure that,
+ * upon configuration change (e.g. rotation), the new Activity will receive the existing data
+ * immediately rather than fetching it from scratch.
+ *
+ * Note that this does not turn the `Flow<PagedData>` into a hot stream. It won't execute any
+ * unnecessary code unless it is being collected.
+ *
+ * ```
+ * class MyViewModel : ViewModel() {
+ *     val pagedData : Flow<PagedData<Item>> = PagedDataFlowBuilder(
+ *         pagedSourceFactory = <factory>,
+ *         config = <config>
+ *     ).build()
+ *     .cachedIn(viewModelScope)
+ * }
+ *
+ * class MyActivity : Activity() {
+ *     override fun onCreate() {
+ *         val pages = myViewModel.pagedData
+ *     }
+ * }
+ * ```
+ *
+ * @param scope The coroutine scope where this page cache will be kept alive.
+ */
+@ExperimentalCoroutinesApi
+@FlowPreview
+fun <T : Any> Flow<PagedData<T>>.cachedIn(
+    scope: CoroutineScope
+) = cachedIn(scope, null)
+
+/** Backs the public [cachedIn]; [tracker] (tests only) is told when flows start/complete. */
+@FlowPreview
+@ExperimentalCoroutinesApi
+internal fun <T : Any> Flow<PagedData<T>>.cachedIn(
+    scope: CoroutineScope,
+    // used in tests
+    tracker: ActiveFlowTracker? = null
+): Flow<PagedData<T>> {
+    val multicastedFlow = this.map {
+        // forward the tracker so PAGE_EVENT_FLOW start/completion is reported as well
+        MulticastedPagedData(scope = scope, parent = it, tracker = tracker)
+    }.scan(null as MulticastedPagedData<T>?) { prev, next ->
+        // a new PagedData generation arrived: shut down the multicaster of the previous one
+        prev?.close()
+        next
+    }.mapNotNull {
+        it?.asPagedData()
+    }.onStart {
+        tracker?.onStart(PAGED_DATA_FLOW)
+    }.onCompletion {
+        tracker?.onComplete(PAGED_DATA_FLOW)
+    }
+    return Multicaster(
+        scope = scope,
+        bufferSize = 1,
+        source = multicastedFlow,
+        onEach = {},
+        keepUpstreamAlive = true
+    ).flow
+}
+
+/**
+ * Test-only hook that is told when a cached flow starts and when it completes (leak detection).
+ */
+@VisibleForTesting
+internal interface ActiveFlowTracker {
+    enum class FlowType {
+        PAGED_DATA_FLOW,
+        PAGE_EVENT_FLOW
+    }
+
+    suspend fun onStart(flowType: FlowType)
+    suspend fun onComplete(flowType: FlowType)
+}
\ No newline at end of file
diff --git a/paging/common/src/main/kotlin/androidx/paging/Pager.kt b/paging/common/src/main/kotlin/androidx/paging/Pager.kt
index 84fdadd..67652f4 100644
--- a/paging/common/src/main/kotlin/androidx/paging/Pager.kt
+++ b/paging/common/src/main/kotlin/androidx/paging/Pager.kt
@@ -47,6 +47,7 @@
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.sync.Mutex
 import kotlinx.coroutines.sync.withLock
+import java.util.concurrent.atomic.AtomicBoolean
 
 /**
  * Holds a generation of pageable data, a snapshot of data loaded by [PagedSource]. An instance
@@ -71,8 +72,11 @@
         lastHint = hint
         hintChannel.offer(hint)
     }
-
+    private val created = AtomicBoolean(false)
     fun create(): Flow<PageEvent<Value>> = channelFlow {
+        check(created.compareAndSet(false, true)) {
+            "cannot collect twice from pager"
+        }
         launch { pageEventCh.consumeAsFlow().collect { send(it) } }
         state.doInitialLoad()
 
diff --git a/paging/common/src/main/kotlin/androidx/paging/multicast/ChannelManager.kt b/paging/common/src/main/kotlin/androidx/paging/multicast/ChannelManager.kt
new file mode 100644
index 0000000..c32d277
--- /dev/null
+++ b/paging/common/src/main/kotlin/androidx/paging/multicast/ChannelManager.kt
@@ -0,0 +1,398 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.paging.multicast
+
+import androidx.paging.multicast.ChannelManager.Message
+import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.channels.SendChannel
+import kotlinx.coroutines.flow.Flow
+import java.util.ArrayDeque
+import java.util.Collections
+
+/**
+ * Tracks active downstream channels and dispatches incoming upstream values to each of them in
+ * parallel. The upstream is suspended after producing a value until at least one of the downstreams
+ * acknowledges receiving it via [Message.Dispatch.Value.delivered].
+ *
+ * The [ChannelManager] will start the upstream from the given [upstream] [Flow] if there
+ * is no active upstream and there's at least one downstream that has not received a value.
+ *
+ */
+@ExperimentalCoroutinesApi
+internal class ChannelManager<T>(
+    /**
+     * The scope in which ChannelManager actor runs
+     */
+    private val scope: CoroutineScope,
+    /**
+     * The buffer size that is used while the upstream is active
+     */
+    private val bufferSize: Int,
+    /**
+     * If true, downstream is never closed by the ChannelManager unless upstream throws an error.
+     * Instead, it is kept open and if a new downstream shows up that causes us to restart the flow,
+     * it will receive values as well.
+     */
+    private val piggybackingDownstream: Boolean = false,
+    /**
+     * Called when a value is dispatched
+     */
+    private val onEach: suspend (T) -> Unit,
+    /** If true, the producer is not cancelled when the last downstream channel is removed. */
+    private val keepUpstreamAlive: Boolean = false,
+    /** The flow handed to each new [SharedFlowProducer]; its values go to the downstreams. */
+    private val upstream: Flow<T>
+) {
+
+    suspend fun addDownstream(channel: SendChannel<Message.Dispatch.Value<T>>) =
+        actor.send(Message.AddChannel(channel))
+
+    suspend fun removeDownstream(channel: SendChannel<Message.Dispatch.Value<T>>) =
+        actor.send(Message.RemoveChannel(channel))
+
+    suspend fun close() = actor.close()
+
+    private val actor = Actor()
+
+    /**
+     * Actor that does all the work. Any state and functionality should go here.
+     */
+    private inner class Actor : StoreRealActor<Message<T>>(scope) {
+
+        private val buffer = Buffer<T>(bufferSize)
+        /**
+         * The current producer
+         */
+        private var producer: SharedFlowProducer<T>? = null
+
+        /**
+         * Tracks whether we've ever dispatched value or error from the current producer.
+         * Reset when a new producer is started.
+         */
+        private var dispatchedValue: Boolean = false
+
+        /**
+         * The ack for the very last message we've delivered.
+         * When a new downstream comes and buffer is 0, we ack this message so that new downstream
+         * can immediately start receiving values instead of waiting for values that it'll never
+         * receive.
+         */
+        private var lastDeliveryAck: CompletableDeferred<Unit>? = null
+
+        /**
+         * List of downstream collectors.
+         */
+        private val channels = mutableListOf<ChannelEntry<T>>()
+
+        override suspend fun handle(msg: Message<T>) {
+            when (msg) {
+                is Message.AddChannel -> doAdd(msg)
+                is Message.RemoveChannel -> doRemove(msg.channel)
+                is Message.Dispatch.Value -> doDispatchValue(msg)
+                is Message.Dispatch.Error -> doDispatchError(msg)
+                is Message.Dispatch.UpstreamFinished -> doHandleUpstreamClose(msg.producer)
+            }
+        }
+
+        /**
+         * Called when the channel manager is active (e.g. it has downstream collectors and needs a
+         * producer)
+         */
+        private fun newProducer() =
+            SharedFlowProducer(scope, upstream, ::send)
+
+        /**
+         * The current producer finished. Close or keep existing channels and collect the leftovers
+         * (channels that got no value even though one was dispatched), which restart the upstream.
+         */
+        private fun doHandleUpstreamClose(producer: SharedFlowProducer<T>?) {
+            if (this.producer !== producer) {
+                return
+            }
+            val piggyBacked = mutableListOf<ChannelEntry<T>>()
+            val leftovers = mutableListOf<ChannelEntry<T>>()
+            channels.forEach {
+                when {
+                    it.receivedValue -> {
+                        if (!piggybackingDownstream) {
+                            it.close()
+                        } else {
+                            piggyBacked.add(it)
+                        }
+                    }
+                    dispatchedValue ->
+                        // we dispatched a value but this channel didn't receive so put it into
+                        // leftovers
+                        leftovers.add(it)
+                    else -> { // upstream didn't dispatch
+                        if (!piggybackingDownstream) {
+                            it.close()
+                        } else {
+                            piggyBacked.add(it)
+                        }
+                    }
+                }
+            }
+            channels.clear() // empty references
+            channels.addAll(leftovers)
+            channels.addAll(piggyBacked)
+            this.producer = null
+            // we only reactivate if leftovers is not empty
+            if (leftovers.isNotEmpty()) {
+                activateIfNecessary()
+            }
+        }
+
+        override fun onClosed() {
+            channels.forEach {
+                it.close()
+            }
+            channels.clear()
+            producer?.cancel()
+        }
+
+        /**
+         * Dispatch value to all downstream collectors.
+         */
+        private suspend fun doDispatchValue(msg: Message.Dispatch.Value<T>) {
+            onEach(msg.value)
+            buffer.add(msg)
+            dispatchedValue = true
+            if (buffer.isEmpty()) {
+                // if a new downstream arrives, we need to ack this so that it won't wait for
+                // values that it'll never receive
+                lastDeliveryAck = msg.delivered
+            }
+            channels.forEach {
+                it.dispatchValue(msg)
+            }
+        }
+
+        /**
+         * Dispatch an upstream error to downstream collectors.
+         */
+        private fun doDispatchError(msg: Message.Dispatch.Error<T>) {
+            // dispatching error is as good as dispatching value
+            dispatchedValue = true
+            channels.forEach {
+                it.dispatchError(msg.error)
+            }
+        }
+
+        /**
+         * Remove a downstream collector.
+         */
+        private suspend fun doRemove(channel: SendChannel<Message.Dispatch.Value<T>>) {
+            val index = channels.indexOfFirst {
+                it.hasChannel(channel)
+            }
+            if (index >= 0) {
+                channels.removeAt(index)
+                if (channels.isEmpty() && !keepUpstreamAlive) {
+                    producer?.cancelAndJoin()
+                }
+            }
+        }
+
+        /**
+         * Add a new downstream collector
+         */
+        private suspend fun doAdd(msg: Message.AddChannel<T>) {
+            addEntry(
+                entry = ChannelEntry(
+                    channel = msg.channel
+                )
+            )
+            activateIfNecessary()
+        }
+
+        private fun activateIfNecessary() {
+            if (producer == null) {
+                producer = newProducer()
+                dispatchedValue = false
+                producer!!.start()
+            }
+        }
+
+        /**
+         * Internally add the new downstream collector to our list, send it anything buffered.
+         */
+        private suspend fun addEntry(entry: ChannelEntry<T>) {
+            val new = channels.none {
+                it.hasChannel(entry)
+            }
+            check(new) {
+                "$entry is already in the list."
+            }
+            check(!entry.receivedValue) {
+                "$entry already received a value"
+            }
+            channels.add(entry)
+            if (buffer.items.isNotEmpty()) {
+                // if there is anything in the buffer, send it
+                buffer.items.forEach {
+                    entry.dispatchValue(it)
+                }
+            } else {
+                // unlock upstream since we now have a downstream that needs values
+                lastDeliveryAck?.complete(Unit)
+            }
+        }
+    }
+
+    /**
+     * Holder for each downstream collector
+     */
+    internal data class ChannelEntry<T>(
+        /**
+         * The channel used by the collector
+         */
+        private val channel: SendChannel<Message.Dispatch.Value<T>>,
+        /**
+         * Tracking whether we've ever dispatched a value or an error to downstream
+         */
+        private var _receivedValue: Boolean = false
+    ) {
+        val receivedValue
+            get() = _receivedValue
+
+        suspend fun dispatchValue(value: Message.Dispatch.Value<T>) {
+            _receivedValue = true
+            channel.send(value)
+        }
+
+        fun dispatchError(error: Throwable) {
+            _receivedValue = true
+            channel.close(error)
+        }
+
+        fun close() {
+            channel.close()
+        }
+
+        fun hasChannel(channel: SendChannel<Message.Dispatch.Value<T>>) = this.channel === channel
+
+        fun hasChannel(entry: ChannelEntry<T>) = this.channel === entry.channel
+    }
+
+    /**
+     * Messages accepted by the [ChannelManager].
+     */
+    sealed class Message<T> {
+        /**
+         * Add a new channel, that means a new downstream subscriber
+         */
+        class AddChannel<T>(
+            val channel: SendChannel<Dispatch.Value<T>>
+        ) : Message<T>()
+
+        /**
+         * Remove a downstream subscriber, that means it completed
+         */
+        class RemoveChannel<T>(val channel: SendChannel<Dispatch.Value<T>>) : Message<T>()
+
+        sealed class Dispatch<T> : Message<T>() {
+            /**
+             * Upstream dispatched a new value, send it to all downstream items
+             */
+            class Value<T>(
+                /**
+                 * The value dispatched by the upstream
+                 */
+                val value: T,
+                /**
+                 * Ack that is completed once a receiver has emitted the value. Upstream producer
+                 * will await this before asking for a new value from upstream
+                 */
+                val delivered: CompletableDeferred<Unit>
+            ) : Dispatch<T>()
+
+            /**
+             * Upstream dispatched an error, send it to all downstream items
+             */
+            class Error<T>(
+                /**
+                 * The error sent by the upstream
+                 */
+                val error: Throwable
+            ) : Dispatch<T>()
+
+            class UpstreamFinished<T>(
+                /**
+                 * SharedFlowProducer finished emitting
+                 */
+                val producer: SharedFlowProducer<T>
+            ) : Dispatch<T>()
+        }
+    }
+}
+
+/**
+ * Holds dispatched values for downstreams that arrive late.
+ */
+@ExperimentalCoroutinesApi
+private interface Buffer<T> {
+    val items: Collection<ChannelManager.Message.Dispatch.Value<T>>
+    fun isEmpty(): Boolean = items.isEmpty()
+    fun add(item: ChannelManager.Message.Dispatch.Value<T>)
+}
+
+/**
+ * A [Buffer] that never retains anything: [add] is a no-op and [items] is always empty.
+ */
+@ExperimentalCoroutinesApi
+private class NoBuffer<T> : Buffer<T> {
+    // intentionally drops the item
+    override fun add(item: ChannelManager.Message.Dispatch.Value<T>) = Unit
+
+    override val items: Collection<ChannelManager.Message.Dispatch.Value<T>>
+        get() = Collections.emptyList()
+}
+
+/**
+ * Creates the [Buffer] for the given [limit]: a bounded FIFO buffer when the limit is positive,
+ * otherwise a buffer that stores nothing.
+ */
+@Suppress("FunctionName")
+@ExperimentalCoroutinesApi
+private fun <T> Buffer(limit: Int): Buffer<T> = when {
+    limit > 0 -> BufferImpl(limit)
+    else -> NoBuffer()
+}
+
+/**
+ * FIFO [Buffer] that keeps at most [limit] values, evicting the oldest ones first.
+ */
+@ExperimentalCoroutinesApi
+private class BufferImpl<T>(private val limit: Int) : Buffer<T> {
+    // initial capacity only; the deque itself may grow up to [limit] entries
+    override val items =
+        ArrayDeque<ChannelManager.Message.Dispatch.Value<T>>(minOf(limit, 10))
+
+    override fun add(item: ChannelManager.Message.Dispatch.Value<T>) {
+        while (limit <= items.size) {
+            items.pollFirst() // drop the oldest value to make room
+        }
+        items.offerLast(item)
+    }
+}
+
+@ExperimentalCoroutinesApi
+internal fun <T> ChannelManager.Message.Dispatch.Value<T>.markDelivered(): Boolean =
+    this.delivered.complete(Unit)
diff --git a/paging/common/src/main/kotlin/androidx/paging/multicast/Multicaster.kt b/paging/common/src/main/kotlin/androidx/paging/multicast/Multicaster.kt
new file mode 100644
index 0000000..1556137
--- /dev/null
+++ b/paging/common/src/main/kotlin/androidx/paging/multicast/Multicaster.kt
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.paging.multicast
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.FlowPreview
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.consumeAsFlow
+import kotlinx.coroutines.flow.emitAll
+import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.flow.onCompletion
+import kotlinx.coroutines.flow.onStart
+import kotlinx.coroutines.flow.transform
+
+/**
+ * Like a publish, shares 1 upstream value with multiple downstream receivers.
+ * It has one store specific behavior where upstream flow is suspended until at least 1 downstream
+ * flow emits the value to ensure we don't abuse the upstream flow if downstream cannot keep up.
+ */
+@FlowPreview
+@ExperimentalCoroutinesApi
+internal class Multicaster<T>(
+    /**
+     * The [CoroutineScope] to use for upstream subscription
+     */
+    private val scope: CoroutineScope,
+    /**
+     * The buffer size that is used only if the upstream has not completed yet.
+     * Defaults to 0.
+     */
+    bufferSize: Int = 0,
+    /**
+     * The upstream flow; it is handed to the [ChannelManager], which collects it when necessary.
+     */
+    private val source: Flow<T>,
+
+    /**
+     * If true, downstream is never closed by the multicaster unless upstream throws an error.
+     * Instead, it is kept open and if a new downstream shows up that causes us to restart the flow,
+     * it will receive values as well.
+     */
+    private val piggybackingDownstream: Boolean = false,
+    /**
+     * Called when upstream dispatches a value.
+     */
+    private val onEach: suspend (T) -> Unit,
+    /** If true, upstream is not cancelled when the last downstream collector goes away. */
+    private val keepUpstreamAlive: Boolean = false
+) {
+
+    private val channelManager by lazy(LazyThreadSafetyMode.SYNCHRONIZED) {
+        ChannelManager(
+            scope = scope,
+            bufferSize = bufferSize,
+            upstream = source,
+            piggybackingDownstream = piggybackingDownstream,
+            onEach = onEach,
+            keepUpstreamAlive = keepUpstreamAlive
+        )
+    }
+
+    /** Cold flow; every collection registers its own channel as a downstream and drains it. */
+    val flow = flow<T> {
+        val channel = Channel<ChannelManager.Message.Dispatch.Value<T>>(Channel.UNLIMITED)
+        val subFlow = channel.consumeAsFlow()
+            .onStart { channelManager.addDownstream(channel) }
+            .transform {
+                emit(it.value)
+                it.delivered.complete(Unit) // ack only after the collector took the value
+            }.onCompletion {
+                channelManager.removeDownstream(channel)
+            }
+        emitAll(subFlow)
+    }
+
+    /** Closes the underlying [ChannelManager]. */
+    suspend fun close() {
+        channelManager.close()
+    }
+}
diff --git a/paging/common/src/main/kotlin/androidx/paging/multicast/README.md b/paging/common/src/main/kotlin/androidx/paging/multicast/README.md
new file mode 100644
index 0000000..0317847
--- /dev/null
+++ b/paging/common/src/main/kotlin/androidx/paging/multicast/README.md
@@ -0,0 +1,2 @@
+This code is copied from dropbox/store.
+We should make it a real dependency once Store's multicast has proper keep-alive behavior.
\ No newline at end of file
diff --git a/paging/common/src/main/kotlin/androidx/paging/multicast/SharedFlowProducer.kt b/paging/common/src/main/kotlin/androidx/paging/multicast/SharedFlowProducer.kt
new file mode 100644
index 0000000..3c1a6ca
--- /dev/null
+++ b/paging/common/src/main/kotlin/androidx/paging/multicast/SharedFlowProducer.kt
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package androidx.paging.multicast
+
+import androidx.paging.multicast.ChannelManager.Message.Dispatch.UpstreamFinished
+import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.cancelAndJoin
+import kotlinx.coroutines.channels.ClosedSendChannelException
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.catch
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.launch
+
+/**
+ * A flow collector that works with a [ChannelManager] to collect values from an upstream flow
+ * and dispatch them to [sendUpsteamMessage], which then dispatches to downstream collectors.
+ *
+ * They work in sync such that this producer always expects an ack from the [ChannelManager] after
+ * sending an event.
+ *
+ * Cancellation of the collection might be triggered by both this producer (e.g. upstream completes)
+ * or the [ChannelManager] (e.g. all active collectors complete).
+ *
+ * @param scope scope in which the upstream collection and the coroutine watching it are launched
+ * @param src the upstream flow to collect
+ * @param sendUpsteamMessage receives every upstream value, any upstream error and the final
+ * [UpstreamFinished] message
+ */
+@ExperimentalCoroutinesApi
+internal class SharedFlowProducer<T>(
+    private val scope: CoroutineScope,
+    private val src: Flow<T>,
+    private val sendUpsteamMessage: suspend (ChannelManager.Message.Dispatch<T>) -> Unit
+) {
+    /**
+     * The job that collects [src].
+     *
+     * It is created together with the producer but only started lazily (by the `join()` in
+     * [start]). This guarantees that [cancel] and [cancelAndJoin] always have a job to act on,
+     * even when they are called before the coroutine launched by [start] had a chance to run.
+     * With a `lateinit` property assigned inside that coroutine, such an early call would fail
+     * with an `UninitializedPropertyAccessException`.
+     */
+    private val collectionJob: Job = scope.launch(
+        start = kotlinx.coroutines.CoroutineStart.LAZY
+    ) {
+        try {
+            src.catch {
+                sendUpsteamMessage(ChannelManager.Message.Dispatch.Error(it))
+            }.collect {
+                val ack = CompletableDeferred<Unit>()
+                sendUpsteamMessage(
+                    ChannelManager.Message.Dispatch.Value(
+                        it,
+                        ack
+                    )
+                )
+                // suspend until at least 1 receives the new value
+                ack.await()
+            }
+        } catch (closed: ClosedSendChannelException) {
+            // ignore. if consumers are gone, it might close itself.
+        }
+    }
+
+    /**
+     * Starts the collection of the upstream flow.
+     *
+     * Launches a coroutine that starts [collectionJob], waits for it to end and then always
+     * reports [UpstreamFinished] through [sendUpsteamMessage].
+     */
+    fun start() {
+        scope.launch {
+            try {
+                // join() starts the lazily created collection job and waits until collection
+                // ends, either due to an error or ordered by the channel manager
+                collectionJob.join()
+            } finally {
+                // cleanup the channel manager so that downstreams can be closed if they are not
+                // closed already and leftovers can be moved to a new producer if necessary.
+                try {
+                    sendUpsteamMessage(UpstreamFinished(this@SharedFlowProducer))
+                } catch (closed: ClosedSendChannelException) {
+                    // it might close before us, its fine.
+                }
+            }
+        }
+    }
+
+    /**
+     * Cancels the upstream collection and suspends until it has completed.
+     */
+    suspend fun cancelAndJoin() {
+        collectionJob.cancelAndJoin()
+    }
+
+    /**
+     * Cancels the upstream collection without waiting for it to complete.
+     */
+    fun cancel() {
+        collectionJob.cancel()
+    }
+}
diff --git a/paging/common/src/main/kotlin/androidx/paging/multicast/StoreRealActor.kt b/paging/common/src/main/kotlin/androidx/paging/multicast/StoreRealActor.kt
new file mode 100644
index 0000000..7a33e57
--- /dev/null
+++ b/paging/common/src/main/kotlin/androidx/paging/multicast/StoreRealActor.kt
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package androidx.paging.multicast
+
+import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.channels.SendChannel
+import kotlinx.coroutines.channels.actor
+import java.util.concurrent.atomic.AtomicBoolean
+
+/**
+ * Simple actor implementation abstracting away Coroutine.actor since it is deprecated.
+ * It also enforces a 0 capacity buffer.
+ *
+ * Messages passed to [send] are delivered to [handle] one at a time. [close] sends a special
+ * token through the same channel so that messages sent before it are handled first.
+ *
+ * @param scope the scope in which the actor coroutine is launched
+ */
+@Suppress("EXPERIMENTAL_API_USAGE")
+@ExperimentalCoroutinesApi
+internal abstract class StoreRealActor<T>(
+    scope: CoroutineScope
+) {
+    private val inboundChannel: SendChannel<Any?>
+    // completed once doClose() has finished, so that close() can wait for it
+    private val closeCompleted = CompletableDeferred<Unit>()
+    // guards doClose() so that its body runs only once
+    private val didClose = AtomicBoolean(false)
+
+    init {
+        inboundChannel = scope.actor(
+            capacity = 0
+        ) {
+            try {
+                for (msg in channel) {
+                    if (msg === CLOSE_TOKEN) {
+                        doClose()
+                        break
+                    } else {
+                        @Suppress("UNCHECKED_CAST")
+                        handle(msg as T)
+                    }
+                }
+            } finally {
+                // also covers the actor ending for any reason other than the close token
+                doClose()
+            }
+        }
+    }
+
+    /**
+     * Runs [onClosed] once, then closes the inbound channel and completes [closeCompleted].
+     * Subsequent calls are no-ops.
+     */
+    private fun doClose() {
+        if (didClose.compareAndSet(false, true)) {
+            try {
+                onClosed()
+            } finally {
+                inboundChannel.close()
+                closeCompleted.complete(Unit)
+            }
+        }
+    }
+
+    /**
+     * Called once when the actor is closing, before the inbound channel is closed.
+     */
+    open fun onClosed() = Unit
+
+    /**
+     * Handles a single message that was passed to [send].
+     */
+    abstract suspend fun handle(msg: T)
+
+    /**
+     * Sends [msg] to the actor, suspending until the actor receives it (the channel has no
+     * buffer).
+     */
+    suspend fun send(msg: T) {
+        inboundChannel.send(msg)
+    }
+
+    /**
+     * Asks the actor to close and suspends until closing has finished.
+     *
+     * Calling this on an actor that is already closed (or is being closed by another caller)
+     * does not throw; it just waits for the close to complete.
+     */
+    suspend fun close() {
+        try {
+            // using a custom token to close so that we can gracefully close the downstream
+            inboundChannel.send(CLOSE_TOKEN)
+        } catch (closed: kotlinx.coroutines.channels.ClosedSendChannelException) {
+            // The inbound channel is only closed by doClose(), so a close is already done or
+            // in flight; there is nothing left to send.
+        }
+        // wait until close is done
+        closeCompleted.await()
+    }
+
+    companion object {
+        val CLOSE_TOKEN = Any()
+    }
+}
diff --git a/paging/common/src/test/kotlin/androidx/paging/CachingTest.kt b/paging/common/src/test/kotlin/androidx/paging/CachingTest.kt
new file mode 100644
index 0000000..40defc0
--- /dev/null
+++ b/paging/common/src/test/kotlin/androidx/paging/CachingTest.kt
@@ -0,0 +1,474 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.paging
+
+import androidx.paging.ActiveFlowTracker.FlowType
+import androidx.paging.ActiveFlowTracker.FlowType.PAGED_DATA_FLOW
+import androidx.paging.ActiveFlowTracker.FlowType.PAGE_EVENT_FLOW
+import com.google.common.truth.Truth.assertThat
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.FlowPreview
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.SupervisorJob
+import kotlinx.coroutines.cancelAndJoin
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.filterIsInstance
+import kotlinx.coroutines.flow.first
+import kotlinx.coroutines.flow.map
+import kotlinx.coroutines.flow.onEach
+import kotlinx.coroutines.flow.take
+import kotlinx.coroutines.flow.toList
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.test.runBlockingTest
+import org.junit.After
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * Tests for `cachedIn`.
+ *
+ * Items carry the version of the paged source that produced them, which lets the tests tell
+ * whether two collections were served by the same paged source (cached) or by separate ones
+ * (not cached).
+ */
+@FlowPreview
+@ExperimentalCoroutinesApi
+@RunWith(JUnit4::class)
+class CachingTest {
+    private val testScope = TestCoroutineScope()
+
+    private val tracker = ActiveFlowTrackerImpl()
+
+    /**
+     * Verifies after each test that no coroutine is left behind in [testScope] and that no page
+     * event flow is still counted as active.
+     */
+    @After
+    fun checkResources() {
+        testScope.cleanupTestCoroutines()
+        assertThat(tracker.pageEventFlowCount()).isEqualTo(0)
+    }
+
+    /** Without caching, the second collection gets items from a new paged source version. */
+    @Test
+    fun noSharing() = testScope.runBlockingTest {
+        val pageFlow = buildPageFlow()
+        val firstCollect = pageFlow.collectItems(2)
+        val secondCollect = pageFlow.collectItems(3)
+        assertThat(firstCollect).isEqualTo(
+            buildItems(
+                version = 0,
+                generation = 0,
+                start = 0,
+                size = 6
+            )
+        )
+
+        assertThat(secondCollect).isEqualTo(
+            buildItems(
+                version = 1,
+                generation = 0,
+                start = 0,
+                size = 9
+            )
+        )
+    }
+
+    /** With caching, both collections get items from the same paged source version. */
+    @Test
+    fun cached() = testScope.runBlockingTest {
+        val pageFlow = buildPageFlow().cachedIn(testScope, tracker)
+        val firstCollect = pageFlow.collectItems(2)
+        val secondCollect = pageFlow.collectItems(3)
+        assertThat(firstCollect).isEqualTo(
+            buildItems(
+                version = 0,
+                generation = 0,
+                start = 0,
+                size = 6
+            )
+        )
+
+        assertThat(secondCollect).isEqualTo(
+            buildItems(
+                version = 0,
+                generation = 0,
+                start = 0,
+                size = 9
+            )
+        )
+    }
+
+    /** A mapping applied before `cachedIn` runs once and its result is shared. */
+    @Test
+    fun cached_afterMapping() = testScope.runBlockingTest {
+        var mappingCnt = 0
+        val pageFlow = buildPageFlow().map { pagedData ->
+            val mappingIndex = mappingCnt++
+            pagedData.map {
+                it.copy(metadata = mappingIndex.toString())
+            }
+        }.cachedIn(testScope, tracker)
+        val firstCollect = pageFlow.collectItems(2)
+        val secondCollect = pageFlow.collectItems(3)
+        assertThat(firstCollect).isEqualTo(
+            buildItems(
+                version = 0,
+                generation = 0,
+                start = 0,
+                size = 6
+            ) {
+                it.copy(metadata = "0")
+            }
+        )
+
+        assertThat(secondCollect).isEqualTo(
+            buildItems(
+                version = 0,
+                generation = 0,
+                start = 0,
+                size = 9
+            ) {
+                it.copy(metadata = "0")
+            }
+        )
+    }
+
+    /** A mapping applied after `cachedIn` runs again for every collection. */
+    @Test
+    fun cached_beforeMapping() = testScope.runBlockingTest {
+        var mappingCnt = 0
+        val pageFlow = buildPageFlow().cachedIn(testScope, tracker).map { pagedData ->
+            val mappingIndex = mappingCnt++
+            pagedData.map {
+                it.copy(metadata = mappingIndex.toString())
+            }
+        }
+        val firstCollect = pageFlow.collectItems(2)
+        val secondCollect = pageFlow.collectItems(3)
+        assertThat(firstCollect).isEqualTo(
+            buildItems(
+                version = 0,
+                generation = 0,
+                start = 0,
+                size = 6
+            ) {
+                it.copy(metadata = "0")
+            }
+        )
+
+        assertThat(secondCollect).isEqualTo(
+            buildItems(
+                version = 0,
+                generation = 0,
+                start = 0,
+                size = 9
+            ) {
+                it.copy(metadata = "1")
+            }
+        )
+    }
+
+    /**
+     * Combines both cases: the mapping before `cachedIn` is shared while the mapping after it is
+     * applied again for every collection.
+     */
+    @Test
+    fun cached_afterMapping_withMoreMappingAfterwards() = testScope
+        .runBlockingTest {
+            var mappingCnt = 0
+            val pageFlow = buildPageFlow().map { pagedData ->
+                val mappingIndex = mappingCnt++
+                pagedData.map {
+                    it.copy(metadata = mappingIndex.toString())
+                }
+            }.cachedIn(testScope, tracker).map { pagedData ->
+                val mappingIndex = mappingCnt++
+                pagedData.map {
+                    it.copy(metadata = "${it.metadata}_$mappingIndex")
+                }
+            }
+            val firstCollect = pageFlow.collectItems(2)
+            val secondCollect = pageFlow.collectItems(3)
+            assertThat(firstCollect).isEqualTo(
+                buildItems(
+                    version = 0,
+                    generation = 0,
+                    start = 0,
+                    size = 6
+                ) {
+                    it.copy(metadata = "0_1")
+                }
+            )
+
+            assertThat(secondCollect).isEqualTo(
+                buildItems(
+                    version = 0,
+                    generation = 0,
+                    start = 0,
+                    size = 9
+                ) {
+                    it.copy(metadata = "0_2")
+                }
+            )
+        }
+
+    /**
+     * Checks the tracker counts around the lifetime of the caching scope: the paged data flow
+     * stays counted after the collector is done and is released once the scope's job is
+     * cancelled.
+     */
+    @Test
+    fun pagesAreClosedProperty() {
+        val job = SupervisorJob()
+        val subScope = CoroutineScope(job + Dispatchers.Default)
+        val pageFlow = buildPageFlow().cachedIn(subScope, tracker)
+        assertThat(tracker.pageEventFlowCount()).isEqualTo(0)
+        assertThat(tracker.pageDataFlowCount()).isEqualTo(0)
+        val items = runBlocking {
+            pageFlow.collectItems(3) {
+                // see https://b/146676984
+                delay(10)
+            }
+        }
+        val firstList = buildItems(
+            version = 0,
+            generation = 0,
+            start = 0,
+            size = 9
+        )
+        assertThat(tracker.pageEventFlowCount()).isEqualTo(0)
+        assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
+        assertThat(items).isEqualTo(firstList)
+        runBlocking {
+            job.cancelAndJoin()
+        }
+        assertThat(tracker.pageEventFlowCount()).isEqualTo(0)
+        assertThat(tracker.pageDataFlowCount()).isEqualTo(0)
+    }
+
+    /**
+     * Collectors that never send hints must still observe the pages that get loaded because of
+     * other collectors of the same cached flow.
+     */
+    @Test
+    fun cachedWithPassiveCollector() = testScope.runBlockingTest {
+        val flow = buildPageFlow().cachedIn(testScope, tracker)
+        val passive = ItemCollector(flow)
+        passive.collectIn(testScope)
+        testScope.runCurrent()
+        // collecting on the paged source will trigger initial page
+        assertThat(passive.items()).isEqualTo(
+            buildItems(
+                version = 0,
+                generation = 0,
+                start = 0,
+                size = 3
+            )
+        )
+        val firstList = buildItems(
+            version = 0,
+            generation = 0,
+            start = 0,
+            size = 9
+        )
+        // another collector is causing more items to be loaded, they should be reflected in the
+        // passive one
+        assertThat(flow.collectItems(3)).isEqualTo(firstList)
+        assertThat(passive.items()).isEqualTo(firstList)
+        val passive2 = ItemCollector(flow)
+        passive2.collectIn(testScope)
+        testScope.runCurrent()
+        // a new passive one should receive all existing items immediately
+        assertThat(passive2.items()).isEqualTo(firstList)
+
+        // now we get another collector that'll fetch more pages, it should reflect in passives
+        val secondList = buildItems(
+            version = 0,
+            generation = 0,
+            start = 0,
+            size = 12
+        )
+        // another collector is causing more items to be loaded, they should be reflected in the
+        // passive one
+        assertThat(flow.collectItems(4)).isEqualTo(secondList)
+        assertThat(passive.items()).isEqualTo(secondList)
+        assertThat(passive2.items()).isEqualTo(secondList)
+    }
+
+    /**
+     * Builds an uncached paged data flow backed by a [StringPagedSource.VersionedFactory], using
+     * a page size of 3.
+     */
+    private fun buildPageFlow(): Flow<PagedData<Item>> {
+        return PagedDataFlowBuilder(
+            pagedSourceFactory = StringPagedSource.VersionedFactory()::create,
+            config = PagedList.Config(
+                pageSize = 3,
+                prefetchDistance = 1,
+                enablePlaceholders = false,
+                initialLoadSizeHint = 3,
+                maxSize = 1000
+            )
+        ).build()
+    }
+
+    /**
+     * Takes the first [PagedData] of this flow and collects [pageCnt] insert events from it,
+     * sending a viewport hint for the last loaded item after each event until enough pages have
+     * been loaded.
+     *
+     * @param pageCnt number of insert events to collect
+     * @param onEach optional callback invoked for every insert event before it is processed
+     * @return the items of all collected pages, in order
+     */
+    private suspend fun Flow<PagedData<Item>>.collectItems(
+        pageCnt: Int,
+        onEach: (suspend () -> Unit)? = null
+    ): List<Item> {
+        val pageData = this.first()
+        val hintReceiver = pageData.hintReceiver
+        var loadedPageCnt = 0
+        return pageData.flow.filterIsInstance<PageEvent.Insert<Item>>().take(pageCnt)
+            .onEach {
+                onEach?.invoke()
+            }
+            .map {
+                loadedPageCnt += it.pages.size
+                if (loadedPageCnt < pageCnt) {
+                    hintReceiver(
+                        ViewportHint(
+                            sourcePageIndex = loadedPageCnt - 1,
+                            indexInPage = it.pages.last().data.size - 1
+                        )
+                    )
+                }
+
+                it.pages.flatMap {
+                    it.data
+                }
+            }.toList().flatten()
+    }
+
+    /**
+     * Paged list collector that does not call any hints but always collects
+     */
+    private class ItemCollector(
+        val source: Flow<PagedData<Item>>
+    ) {
+        private var items: List<Item> = emptyList()
+        private var job: Job? = null
+
+        /**
+         * Starts collecting [source] in [scope]. May only be called once per collector.
+         */
+        fun collectIn(scope: CoroutineScope) {
+            check(job == null) {
+                "don't call collect twice"
+            }
+            job = scope.launch {
+                collectPassively()
+            }
+        }
+
+        private suspend fun collectPassively() {
+            source.collect {
+                // clear to latest
+                val list = mutableListOf<Item>()
+                items = list
+                it.flow.filterIsInstance<PageEvent.Insert<Item>>().collect {
+                    it.pages.forEach {
+                        list.addAll(it.data)
+                    }
+                }
+            }
+        }
+
+        /** Returns a snapshot of the items collected so far. */
+        fun items() = items.toList()
+    }
+
+    /**
+     * Paged source that generates [Item]s whose value is their position, tagged with the
+     * [version] of this source and the number of refreshes it has served.
+     */
+    private class StringPagedSource(
+        private val version: Int
+    ) : PagedSource<Int, Item>() {
+        private var generation = -1
+        override suspend fun load(params: LoadParams<Int>): LoadResult<Int, Item> {
+            when (params.loadType) {
+                LoadType.REFRESH -> {
+                    generation++
+                    return doLoad(
+                        position = params.key ?: 0,
+                        size = params.loadSize
+                    )
+                }
+                LoadType.START -> {
+                    val loadSize = minOf(params.key!!, params.pageSize)
+                    // Use the clamped size for the position as well, so that the page ends
+                    // right before the key and never starts at a negative position.
+                    return doLoad(
+                        position = params.key!! - loadSize,
+                        size = loadSize
+                    )
+                }
+                LoadType.END -> {
+                    return doLoad(
+                        position = params.key!!,
+                        size = params.loadSize
+                    )
+                }
+            }
+        }
+
+        /**
+         * Builds the page holding [size] items starting at [position].
+         */
+        private fun doLoad(
+            position: Int,
+            size: Int
+        ): LoadResult<Int, Item> {
+            return LoadResult.Page(
+                data = buildItems(
+                    version = version,
+                    generation = generation,
+                    start = position,
+                    size = size
+                ),
+                prevKey = if (position == 0) null else position,
+                nextKey = position + size
+            )
+        }
+
+        /** Creates paged sources with increasing version numbers, starting at 0. */
+        class VersionedFactory {
+            private var version = 0
+            fun create() = StringPagedSource(version++)
+        }
+    }
+
+    companion object {
+        /**
+         * Builds the items with values `start until start + size`, tagged with the given
+         * paged source [version] and [generation], optionally transformed by [modifier].
+         */
+        private fun buildItems(
+            version: Int,
+            generation: Int,
+            start: Int,
+            size: Int,
+            modifier: ((Item) -> Item)? = null
+        ): List<Item> {
+            return (start until start + size).map { id ->
+                Item(
+                    pagedSourceId = version,
+                    generation = generation,
+                    value = id
+                ).let {
+                    modifier?.invoke(it) ?: it
+                }
+            }
+        }
+    }
+
+    private data class Item(
+        /**
+         * which paged source generated this item
+         */
+        val pagedSourceId: Int,
+        /**
+         * # of refresh counts in the paged source
+         */
+        val generation: Int,
+        /**
+         * Item unique identifier
+         */
+        val value: Int,
+
+        /**
+         * Any additional data by transformations etc
+         */
+        val metadata: String? = null
+    )
+
+    /**
+     * [ActiveFlowTracker] that counts, per flow type, how many flows have started and not yet
+     * completed.
+     */
+    private class ActiveFlowTrackerImpl : ActiveFlowTracker {
+        private val counters = mapOf(
+            PAGED_DATA_FLOW to AtomicInteger(0),
+            PAGE_EVENT_FLOW to AtomicInteger(0)
+        )
+
+        override suspend fun onStart(flowType: FlowType) {
+            counters[flowType]!!.incrementAndGet()
+        }
+
+        override suspend fun onComplete(flowType: FlowType) {
+            counters[flowType]!!.decrementAndGet()
+        }
+
+        fun pageDataFlowCount() = counters[PAGED_DATA_FLOW]!!.get()
+        fun pageEventFlowCount() = counters[PAGE_EVENT_FLOW]!!.get()
+    }
+}
\ No newline at end of file