Add buildLiveData for LiveData#coroutines integration
This CL adds a new `liveData` builder function that runs a suspend
block while the LiveData is active and cancels it after a timeout
if the LiveData becomes inactive.
Bug: 122740811
Test: BuildLiveDataTest
Change-Id: I727dcfd2386f42c3784d343c579fd58fc9a7af84
diff --git a/buildSrc/src/main/kotlin/androidx/build/PublishDocsRules.kt b/buildSrc/src/main/kotlin/androidx/build/PublishDocsRules.kt
index d5bf69e..899cdb4 100644
--- a/buildSrc/src/main/kotlin/androidx/build/PublishDocsRules.kt
+++ b/buildSrc/src/main/kotlin/androidx/build/PublishDocsRules.kt
@@ -66,6 +66,7 @@
ignore(LibraryGroups.LIFECYCLE.group, "lifecycle-compiler")
ignore(LibraryGroups.LIFECYCLE.group, "lifecycle-common-eap")
ignore(LibraryGroups.LIFECYCLE.group, "lifecycle-runtime-eap")
+ ignore(LibraryGroups.LIFECYCLE.group, "lifecycle-livedata-eap")
prebuilts(LibraryGroups.LIFECYCLE, "lifecycle-viewmodel-savedstate", "1.0.0-alpha01")
prebuilts(LibraryGroups.LIFECYCLE, "2.1.0-alpha03")
prebuilts(LibraryGroups.LOADER, "1.1.0-beta01")
diff --git a/lifecycle/livedata/eap/api/1.0.0-alpha01.txt b/lifecycle/livedata/eap/api/1.0.0-alpha01.txt
new file mode 100644
index 0000000..89968c5
--- /dev/null
+++ b/lifecycle/livedata/eap/api/1.0.0-alpha01.txt
@@ -0,0 +1,17 @@
+// Signature format: 3.0
+package androidx.lifecycle {
+
+ public final class CoroutineLiveDataKt {
+ ctor public CoroutineLiveDataKt();
+ method public static <T> androidx.lifecycle.LiveData<T> liveData(kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L, kotlin.jvm.functions.Function2<? super androidx.lifecycle.LiveDataScope<T>,? super kotlin.coroutines.experimental.Continuation<? super kotlin.Unit>,?> block);
+ }
+
+ public interface LiveDataScope<T> {
+ method public T? getInitialValue();
+ method public suspend Object? yield(T! value, kotlin.coroutines.experimental.Continuation<? super kotlin.Unit> p);
+ method public suspend Object? yieldSource(androidx.lifecycle.LiveData<T> source, kotlin.coroutines.experimental.Continuation<? super kotlin.Unit> p);
+ property public abstract T? initialValue;
+ }
+
+}
+
diff --git a/lifecycle/livedata/eap/api/current.txt b/lifecycle/livedata/eap/api/current.txt
new file mode 100644
index 0000000..89968c5
--- /dev/null
+++ b/lifecycle/livedata/eap/api/current.txt
@@ -0,0 +1,17 @@
+// Signature format: 3.0
+package androidx.lifecycle {
+
+ public final class CoroutineLiveDataKt {
+ ctor public CoroutineLiveDataKt();
+ method public static <T> androidx.lifecycle.LiveData<T> liveData(kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L, kotlin.jvm.functions.Function2<? super androidx.lifecycle.LiveDataScope<T>,? super kotlin.coroutines.experimental.Continuation<? super kotlin.Unit>,?> block);
+ }
+
+ public interface LiveDataScope<T> {
+ method public T? getInitialValue();
+ method public suspend Object? yield(T! value, kotlin.coroutines.experimental.Continuation<? super kotlin.Unit> p);
+ method public suspend Object? yieldSource(androidx.lifecycle.LiveData<T> source, kotlin.coroutines.experimental.Continuation<? super kotlin.Unit> p);
+ property public abstract T? initialValue;
+ }
+
+}
+
diff --git a/lifecycle/livedata/eap/api/res-1.0.0-alpha01.txt b/lifecycle/livedata/eap/api/res-1.0.0-alpha01.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lifecycle/livedata/eap/api/res-1.0.0-alpha01.txt
diff --git a/lifecycle/livedata/eap/api/restricted_1.0.0-alpha01.txt b/lifecycle/livedata/eap/api/restricted_1.0.0-alpha01.txt
new file mode 100644
index 0000000..da4f6cc
--- /dev/null
+++ b/lifecycle/livedata/eap/api/restricted_1.0.0-alpha01.txt
@@ -0,0 +1 @@
+// Signature format: 3.0
diff --git a/lifecycle/livedata/eap/api/restricted_current.txt b/lifecycle/livedata/eap/api/restricted_current.txt
new file mode 100644
index 0000000..da4f6cc
--- /dev/null
+++ b/lifecycle/livedata/eap/api/restricted_current.txt
@@ -0,0 +1 @@
+// Signature format: 3.0
diff --git a/lifecycle/livedata/eap/build.gradle b/lifecycle/livedata/eap/build.gradle
new file mode 100644
index 0000000..e00aabe
--- /dev/null
+++ b/lifecycle/livedata/eap/build.gradle
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static androidx.build.dependencies.DependenciesKt.*
+import androidx.build.LibraryGroups
+import androidx.build.LibraryVersions
+
+plugins {
+ id("SupportAndroidLibraryPlugin")
+ id("org.jetbrains.kotlin.android")
+}
+
+android {
+ buildTypes {
+ debug {
+ testCoverageEnabled = false // Breaks Kotlin compiler.
+ }
+ }
+}
+
+dependencies {
+ api(project(":lifecycle:lifecycle-livedata"))
+ api(project(":lifecycle:lifecycle-livedata-core-ktx"))
+ api(KOTLIN_STDLIB)
+ api(KOTLIN_COROUTINES_CORE)
+ testImplementation(project(":lifecycle:lifecycle-runtime"))
+ testImplementation(project(":arch:core-testing"))
+ testImplementation(project(":lifecycle:lifecycle-livedata-ktx"))
+ testImplementation(JUNIT)
+ testImplementation(TRUTH)
+ testImplementation(TEST_EXT_JUNIT)
+ testImplementation(TEST_CORE)
+ testImplementation(TEST_RUNNER)
+ testImplementation(TEST_RULES)
+ testImplementation(KOTLIN_COROUTINES_TEST)
+}
+
+supportLibrary {
+ name = "LiveData Coroutine Extensions EAP"
+ publish = false
+ mavenVersion = LibraryVersions.LIFECYCLES_COROUTINES
+ mavenGroup = LibraryGroups.LIFECYCLE
+ inceptionYear = "2018"
+ description = "Coroutines extensions for 'livedata' artifact"
+ useMetalava = true
+}
diff --git a/lifecycle/livedata/eap/src/main/AndroidManifest.xml b/lifecycle/livedata/eap/src/main/AndroidManifest.xml
new file mode 100644
index 0000000..9443d70
--- /dev/null
+++ b/lifecycle/livedata/eap/src/main/AndroidManifest.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+ ~ Copyright 2018 The Android Open Source Project
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<manifest package="androidx.lifecycle.livedata.eap"/>
diff --git a/lifecycle/livedata/eap/src/main/java/androidx/lifecycle/CoroutineLiveData.kt b/lifecycle/livedata/eap/src/main/java/androidx/lifecycle/CoroutineLiveData.kt
new file mode 100644
index 0000000..1eab6ec
--- /dev/null
+++ b/lifecycle/livedata/eap/src/main/java/androidx/lifecycle/CoroutineLiveData.kt
@@ -0,0 +1,306 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.lifecycle
+
+import androidx.annotation.MainThread
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.SupervisorJob
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.EmptyCoroutineContext
+import kotlin.experimental.ExperimentalTypeInference
+
+internal const val DEFAULT_TIMEOUT = 5000L
+/**
+ * Interface that allows controlling a [LiveData] from a coroutine block.
+ *
+ * @see liveData
+ */
+interface LiveDataScope<T> {
+ /**
+ * Sets the [LiveData]'s value to the given [value]. If you've called [yieldSource] previously,
+ * calling [yield] will remove that source.
+ *
+ * Note that this function suspends until the value is set on the [LiveData].
+ *
+ * @param value The new value for the [LiveData]
+ *
+ * @see yieldSource
+ */
+ suspend fun yield(value: T)
+
+ /**
+ * Adds the given [LiveData] as a source, similar to [MediatorLiveData.addSource]. Calling this
+ * method will remove any source that was yielded before via [yieldSource].
+ *
+ * @param source The [LiveData] instance whose values will be dispatched from the current
+ * [LiveData].
+ *
+ * @see yield
+ * @see MediatorLiveData.addSource
+ * @see MediatorLiveData.removeSource
+ */
+ suspend fun yieldSource(source: LiveData<T>)
+
+ /**
+ * Denotes the value of the [LiveData] when this block is started.
+ *
+ * If this is the first time the block is running, [initialValue] will be `null`. You can use
+ * this value to check the latest value `yield`ed by your `block` before it got cancelled.
+ *
+ * Note that if the block called [yieldSource], then `initialValue` will be the last value
+ * dispatched by the `source` [LiveData].
+ */
+ val initialValue: T?
+}
+
+internal class LiveDataScopeImpl<T>(
+ private val target: CoroutineLiveData<T>, // never reassigned, hence `val`
+ context: CoroutineContext,
+ override val initialValue: T? = target.value // by default, the target's value at construction
+) : LiveDataScope<T> {
+ // Use the `liveData` provided context + main dispatcher to communicate with the target
+ // LiveData. This gives us main thread safety as well as cancellation cooperation.
+ private val coroutineContext = context + Dispatchers.Main
+
+ override suspend fun yieldSource(source: LiveData<T>) = withContext(coroutineContext) {
+ target.yieldSource(source)
+ }
+
+ override suspend fun yield(value: T) = withContext(coroutineContext) {
+ target.clearYieldedSource() // a plain value replaces any previously yielded source
+ target.value = value
+ }
+}
+
+internal typealias Block<T> = suspend LiveDataScope<T>.() -> Unit
+
+/**
+ * Handles running a block at most once to completion.
+ */
+internal class BlockRunner<T>(
+ private val liveData: CoroutineLiveData<T>,
+ private val block: Block<T>,
+ private val timeoutInMs: Long,
+ private val scope: CoroutineScope,
+ private val onDone: () -> Unit
+) {
+ // Job of the latest block run; only reset to null when the timeout in cancel() cancels it.
+ private var runningJob: Job? = null
+
+ // Timeout job launched by cancel(); cancelled and cleared by the next maybeRun().
+ private var cancellationJob: Job? = null
+
+ @MainThread
+ fun maybeRun() {
+ cancellationJob?.cancel() // abort a pending timeout, if any
+ cancellationJob = null
+ if (runningJob != null) {
+ return // a run was already launched and has not been reset by the timeout
+ }
+ runningJob = scope.launch {
+ val liveDataScope = LiveDataScopeImpl(liveData, coroutineContext)
+ block(liveDataScope)
+ onDone() // only reached when the block returns normally
+ }
+ }
+
+ @MainThread
+ fun cancel() {
+ if (cancellationJob != null) {
+ error("Cancel call cannot happen without a maybeRun")
+ }
+ cancellationJob = scope.launch(Dispatchers.Main) {
+ delay(timeoutInMs)
+ if (!liveData.hasActiveObservers()) {
+ // one last check on active observers to avoid any race condition between starting
+ // a running coroutine and cancellation
+ runningJob?.cancel()
+ runningJob = null
+ }
+ }
+ }
+}
+
+internal class CoroutineLiveData<T>(
+ context: CoroutineContext = EmptyCoroutineContext,
+ timeoutInMs: Long = DEFAULT_TIMEOUT,
+ block: Block<T>
+) : MediatorLiveData<T>() {
+ private var blockRunner: BlockRunner<T>? // set to null by onDone once the block returns normally
+
+ init {
+ // use an intermediate supervisor job so that if we cancel individual block runs due to losing
+ // observers, it won't cancel the given context as we only cancel w/ the intention of possibly
+ // relaunching using the same parent context.
+ val supervisorJob = SupervisorJob(context[Job])
+
+ // The scope for this LiveData where we launch every block Job.
+ // We default to Main dispatcher but developer can override it.
+ // The supervisor job is added last to isolate block runs.
+ val scope = CoroutineScope(Dispatchers.Main + context + supervisorJob)
+ blockRunner = BlockRunner(
+ liveData = this,
+ block = block,
+ timeoutInMs = timeoutInMs,
+ scope = scope
+ ) {
+ blockRunner = null // onActive/onInactive become no-ops from here on
+ }
+ }
+
+ // The source we are delegated to, sent from LiveDataScope#yieldSource
+ // TODO We track this specifically since [MediatorLiveData] does not provide access to it.
+ // We should eventually get rid of this and provide the internal API from MediatorLiveData after
+ // the EAP.
+ private var yieldedSource: LiveData<T>? = null
+
+ @MainThread
+ internal fun yieldSource(source: LiveData<T>) {
+ clearYieldedSource() // at most one yielded source is attached at a time
+ yieldedSource = source
+ addSource(source) {
+ value = it
+ }
+ }
+
+ @MainThread
+ internal fun clearYieldedSource() {
+ yieldedSource?.let {
+ removeSource(it)
+ yieldedSource = null
+ }
+ }
+
+ override fun onActive() {
+ super.onActive()
+ blockRunner?.maybeRun()
+ }
+
+ override fun onInactive() {
+ super.onInactive()
+ blockRunner?.cancel()
+ }
+}
+
+/**
+ * Builds a LiveData that has values yielded from the given [block] that executes on a
+ * [LiveDataScope].
+ *
+ * The [block] starts executing when the returned [LiveData] becomes active ([LiveData.onActive]).
+ * If the [LiveData] becomes inactive ([LiveData.onInactive]) while the [block] is executing, it
+ * will be cancelled after [timeoutInMs] milliseconds unless the [LiveData] becomes active again
+ * before that timeout (to gracefully handle cases like Activity rotation). Any value
+ * [LiveDataScope.yield]ed from a cancelled [block] will be ignored.
+ *
+ * After a cancellation, if the [LiveData] becomes active again, the [block] will be re-executed
+ * from the beginning. If you would like to continue the operations based on where it was stopped
+ * last, you can use the [LiveDataScope.initialValue] property to get the last
+ * [LiveDataScope.yield]ed value.
+ *
+ * If the [block] completes successfully *or* is cancelled due to reasons other than [LiveData]
+ * becoming inactive, it *will not* be re-executed even after [LiveData] goes through an
+ * active/inactive cycle.
+ *
+ * As a best practice, it is important for the [block] to cooperate in cancellation. See kotlin
+ * coroutines documentation for details
+ * https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html.
+ *
+ * ```
+ * // a simple LiveData that receives value 3, 3 seconds after being observed for the first time.
+ * val data : LiveData<Int> = liveData {
+ * delay(3000)
+ * yield(3)
+ * }
+ *
+ *
+ * // a LiveData that fetches a `User` object based on a `userId` and refreshes it every 30 seconds
+ * // as long as it is observed
+ * val userId : LiveData<String> = ...
+ * val user = userId.switchMap { id ->
+ * liveData {
+ * while(true) {
+ * // note that `while(true)` is fine because the `delay(30_000)` below will cooperate in
+ * // cancellation if LiveData is not actively observed anymore
+ * val data = api.fetch(id) // errors are ignored for brevity
+ * yield(data)
+ * delay(30_000)
+ * }
+ * }
+ * }
+ *
+ * // A retrying data fetcher with doubling back-off
+ * val user = liveData {
+ * var backOffTime = 1_000L
+ * var succeeded = false
+ * while(!succeeded) {
+ * try {
+ * yield(api.fetch(id))
+ * succeeded = true
+ * } catch(ioError : IOException) {
+ * delay(backOffTime)
+ * backOffTime = minOf(backOffTime * 2, 60_000L)
+ * }
+ * }
+ * }
+ *
+ * // a LiveData that tries to load the `User` from local cache first and then tries to fetch
+ * // from the server and also yields the updated value
+ * val user = liveData {
+ * // dispatch loading first
+ * yield(LOADING(id))
+ * // check local storage
+ * val cached = cache.loadUser(id)
+ * if (cached != null) {
+ * yield(cached)
+ * }
+ * if (cached == null || cached.isStale()) {
+ * val fresh = api.fetch(id) // errors are ignored for brevity
+ * cache.save(fresh)
+ * yield(fresh)
+ * }
+ * }
+ *
+ * // a LiveData that immediately receives a LiveData<User> from the database and yields it as a
+ * // source but also tries to back-fill the database from the server
+ * val user = liveData {
+ * val fromDb: LiveData<User> = roomDatabase.loadUser(id)
+ * yieldSource(fromDb)
+ * val updated = api.fetch(id) // errors are ignored for brevity
+ * // Since we are using Room here, updating the database will update the `fromDb` LiveData
+ * // that was obtained above. See Room's documentation for more details.
+ * // https://developer.android.com/training/data-storage/room/accessing-data#query-observable
+ * roomDatabase.insert(updated)
+ * }
+ * ```
+ *
+ * @param context The CoroutineContext to run the given block in. Defaults to
+ * [EmptyCoroutineContext] combined with [Dispatchers.Main]
+ * @param timeoutInMs The timeout in ms before cancelling the block if there are no active observers
+ * ([LiveData.hasActiveObservers]). Defaults to [DEFAULT_TIMEOUT].
+ * @param block The block to run when the [LiveData] has active observers.
+ */
+@UseExperimental(ExperimentalTypeInference::class)
+fun <T> liveData(
+ context: CoroutineContext = EmptyCoroutineContext,
+ timeoutInMs: Long = DEFAULT_TIMEOUT,
+ @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
+): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)
\ No newline at end of file
diff --git a/lifecycle/livedata/eap/src/test/java/androidx/lifecycle/BuildLiveDataTest.kt b/lifecycle/livedata/eap/src/test/java/androidx/lifecycle/BuildLiveDataTest.kt
new file mode 100644
index 0000000..15bcff4
--- /dev/null
+++ b/lifecycle/livedata/eap/src/test/java/androidx/lifecycle/BuildLiveDataTest.kt
@@ -0,0 +1,465 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.lifecycle
+
+import androidx.arch.core.executor.ArchTaskExecutor
+import androidx.arch.core.executor.TaskExecutor
+import com.google.common.truth.Truth.assertThat
+import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.CoroutineExceptionHandler
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.ObsoleteCoroutinesApi
+import kotlinx.coroutines.async
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.sync.Mutex
+import kotlinx.coroutines.sync.withLock
+import kotlinx.coroutines.test.TestCoroutineContext
+import kotlinx.coroutines.test.resetMain
+import kotlinx.coroutines.test.setMain
+import org.junit.After
+import org.junit.Before
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicReference
+import kotlin.coroutines.ContinuationInterceptor
+import kotlin.coroutines.coroutineContext
+
+@ObsoleteCoroutinesApi
+@RunWith(JUnit4::class)
+class BuildLiveDataTest {
+ @ObsoleteCoroutinesApi
+ private val mainContext = TestCoroutineContext("test-main-context")
+ private val mainScope = CoroutineScope(mainContext)
+ @ObsoleteCoroutinesApi
+ private val testContext = TestCoroutineContext("test-other-context")
+
+ @ExperimentalCoroutinesApi
+ @Before
+ fun initMain() {
+ lateinit var mainThread: Thread
+ runBlocking(mainContext) {
+ mainThread = Thread.currentThread()
+ }
+ Dispatchers.setMain(
+ mainContext[ContinuationInterceptor.Key] as CoroutineDispatcher
+ )
+ ArchTaskExecutor.getInstance().setDelegate(
+ object : TaskExecutor() {
+ override fun executeOnDiskIO(runnable: Runnable) {
+ error("unsupported")
+ }
+
+ override fun postToMainThread(runnable: Runnable) {
+ mainScope.launch {
+ runnable.run()
+ }
+ }
+
+ override fun isMainThread(): Boolean {
+ return mainThread == Thread.currentThread()
+ }
+ }
+ )
+ }
+
+ @ExperimentalCoroutinesApi
+ @After
+ fun clear() {
+ advanceTimeBy(100000)
+ mainContext.assertExceptions("shouldn't have any exceptions") {
+ it.isEmpty()
+ }
+ testContext.assertExceptions("shouldn't have any exceptions") {
+ it.isEmpty()
+ }
+ ArchTaskExecutor.getInstance().setDelegate(null)
+ Dispatchers.resetMain()
+ }
+
+ @Test
+ fun oneShot() {
+ val liveData = liveData {
+ yield(3)
+ }
+ triggerAllActions()
+ assertThat(liveData.value).isNull()
+ liveData.addObserver().assertItems(3)
+ }
+
+ @Test
+ fun removeObserverInBetween() {
+ val ld = liveData(timeoutInMs = 10) {
+ yield(1)
+ yield(2)
+ delay(1000)
+ yield(3)
+ }
+ ld.addObserver().apply {
+ assertItems(1, 2)
+ unsubscribe()
+ }
+ // trigger cancellation
+ mainContext.advanceTimeBy(100)
+ assertThat(ld.hasActiveObservers()).isFalse()
+ ld.addObserver().apply {
+ triggerAllActions()
+ assertItems(2, 1, 2)
+ mainContext.advanceTimeBy(1001)
+ assertItems(2, 1, 2, 3)
+ }
+ }
+
+ @Test
+ fun removeObserverInBetween_largeTimeout() {
+ val ld = liveData(timeoutInMs = 10000) {
+ yield(1)
+ yield(2)
+ delay(1000)
+ yield(3)
+ }
+ ld.addObserver().apply {
+ assertItems(1, 2)
+ unsubscribe()
+ }
+ // advance some but not enough to cover the delay
+ mainContext.advanceTimeBy(500)
+ assertThat(ld.hasActiveObservers()).isFalse()
+ assertThat(ld.value).isEqualTo(2)
+ ld.addObserver().apply {
+ assertItems(2) // the 10000 ms timeout has not elapsed, so the block was not restarted
+ // advance enough to cover the rest of the delay
+ mainContext.advanceTimeBy(501)
+ assertItems(2, 3)
+ }
+ }
+
+ @Test
+ fun ignoreCancelledYields() {
+ val cancelMutex = Mutex(true)
+ val ld = liveData(timeoutInMs = 0, context = testContext) {
+ yield(1)
+ cancelMutex.withLock {
+ yield(2)
+ }
+ }
+ ld.addObserver().apply {
+ triggerAllActions()
+ assertItems(1)
+ unsubscribe()
+ cancelMutex.unlock()
+ }
+ // let cancellation take place
+ triggerAllActions()
+ // yield should immediately trigger cancellation to happen
+ assertThat(ld.value).isEqualTo(1)
+ assertThat(ld.hasActiveObservers()).isFalse()
+ // now because it was cancelled, re-observing should dispatch 1,1,2
+ ld.addObserver().apply {
+ triggerAllActions()
+ assertItems(1, 1, 2)
+ }
+ }
+
+ @Test
+ fun readInitialValue() {
+ val initial = AtomicReference<Int?>()
+ val ld = liveData<Int>(testContext) {
+ initial.set(initialValue)
+ }
+ runOnMain {
+ ld.value = 3
+ }
+ ld.addObserver()
+ triggerAllActions()
+ assertThat(initial.get()).isEqualTo(3)
+ }
+
+ @Test
+ fun readInitialValue_ignoreYielded() {
+ val initial = AtomicReference<Int?>()
+ val ld = liveData<Int>(testContext) {
+ yield(5)
+ initial.set(initialValue)
+ }
+ ld.addObserver()
+ triggerAllActions()
+ assertThat(initial.get()).isNull()
+ }
+
+ @Test
+ fun readInitialValue_keepYieldedFromBefore() {
+ val initial = AtomicReference<Int?>()
+ val ld = liveData<Int>(testContext, 10) {
+ if (initialValue == null) {
+ yield(5)
+ delay(500000) // wait for cancellation
+ }
+
+ initial.set(initialValue)
+ }
+ ld.addObserver().apply {
+ triggerAllActions()
+ assertItems(5)
+ unsubscribe()
+ }
+ triggerAllActions()
+ // wait for it to be cancelled
+ advanceTimeBy(10)
+ assertThat(initial.get()).isNull()
+ ld.addObserver()
+ triggerAllActions()
+ assertThat(initial.get()).isEqualTo(5)
+ }
+
+ @Test
+ fun yieldSource_simple() {
+ val odds = liveData {
+ (1..9 step 2).forEach {
+ yield(it)
+ }
+ }
+ val ld = liveData {
+ yieldSource(odds)
+ }
+ ld.addObserver().apply {
+ assertItems(1, 3, 5, 7, 9)
+ }
+ }
+
+ @Test
+ fun yieldSource_switchTwo() {
+ val doneOddsYield = Mutex(true)
+ val odds = liveData {
+ (1..9 step 2).forEach {
+ yield(it)
+ }
+ doneOddsYield.unlock()
+ delay(1)
+ yield(-1)
+ }
+ val evens = liveData {
+ (2..10 step 2).forEach {
+ yield(it)
+ }
+ }
+ val ld = liveData(testContext) {
+ yieldSource(odds)
+ doneOddsYield.lock()
+ yieldSource(evens)
+ }
+ ld.addObserver().apply {
+ triggerAllActions()
+ assertItems(1, 3, 5, 7, 9, 2, 4, 6, 8, 10)
+ }
+ }
+
+ @Test
+ fun yieldSource_yieldValue() {
+ val doneOddsYield = Mutex(true)
+ val odds = liveData(timeoutInMs = 0) {
+ (1..9 step 2).forEach {
+ yield(it)
+ }
+ doneOddsYield.unlock()
+ delay(1)
+ yield(-1)
+ }
+ val ld = liveData(testContext) {
+ yieldSource(odds)
+ doneOddsYield.lock()
+ yield(10)
+ }
+ ld.addObserver().apply {
+ triggerAllActions()
+ advanceTimeBy(100)
+ assertItems(1, 3, 5, 7, 9, 10)
+ }
+ }
+
+ @Test
+ fun blockThrows() {
+ // use an exception handler instead of the test context exception handler to ensure that
+ // we do not re-run the block if its exception is gracefully caught
+ // TODO should we consider doing that? But if we do, what is the rule? do we retry when
+ // it becomes active again or do we retry ourselves? better not to do anything to be consistent.
+ val exception = CompletableDeferred<Throwable>()
+ val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
+ exception.complete(throwable)
+ }
+ val ld = liveData(testContext + exceptionHandler, 10) {
+ if (exception.isActive) {
+ throw IllegalArgumentException("i like to fail")
+ } else {
+ yield(3)
+ }
+ }
+ ld.addObserver().apply {
+ triggerAllActions()
+ assertItems()
+ runBlocking {
+ assertThat(exception.await()).hasMessageThat().contains("i like to fail")
+ }
+ unsubscribe()
+ }
+ triggerAllActions()
+ ld.addObserver().apply {
+ triggerAllActions()
+ assertItems()
+ }
+ }
+
+ @Test
+ fun blockCancelsItself() {
+ val didCancel = AtomicBoolean(false)
+ val unexpected = AtomicBoolean(false)
+
+ val ld = liveData<Int>(testContext, 10) {
+ if (didCancel.compareAndSet(false, true)) {
+ coroutineContext.cancel()
+ } else {
+ unexpected.set(true)
+ }
+ }
+ ld.addObserver().apply {
+ triggerAllActions()
+ assertItems()
+ unsubscribe()
+ }
+ assertThat(didCancel.get()).isTrue()
+ ld.addObserver()
+ // trigger cancelation
+ advanceTimeBy(11)
+ assertThat(unexpected.get()).isFalse()
+ }
+
+ @Test
+ fun blockThrows_switchMap() {
+ val exception = CompletableDeferred<Throwable>()
+ val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
+ exception.complete(throwable)
+ }
+ val src = MutableLiveData<Int>()
+ val ld = src.switchMap {
+ liveData(testContext + exceptionHandler) {
+ if (exception.isActive) {
+ throw IllegalArgumentException("i like to fail")
+ } else {
+ yield(3)
+ }
+ }
+ }
+ ld.addObserver().apply {
+ assertItems()
+ runOnMain {
+ src.value = 1
+ }
+ triggerAllActions()
+ runBlocking {
+ assertThat(exception.await()).hasMessageThat().contains("i like to fail")
+ }
+ runOnMain {
+ src.value = 2
+ }
+ triggerAllActions()
+ assertItems(3)
+ }
+ }
+
+ private fun triggerAllActions() {
+ do {
+ mainContext.triggerActions()
+ testContext.triggerActions()
+ val allIdle = listOf(mainContext, testContext).all {
+ it.isIdle()
+ }
+ } while (!allIdle)
+ }
+
+ private fun advanceTimeBy(time: Long) {
+ mainContext.advanceTimeBy(time)
+ testContext.advanceTimeBy(time)
+ triggerAllActions()
+ }
+
+ private fun TestCoroutineContext.isIdle(): Boolean {
+ val queueField = this::class.java
+ .getDeclaredField("queue")
+ queueField.isAccessible = true
+ val queue = queueField.get(this)
+ val peekMethod = queue::class.java
+ .getDeclaredMethod("peek")
+ val nextTask = peekMethod.invoke(queue) ?: return true
+ val timeField = nextTask::class.java.getDeclaredField("time")
+ timeField.isAccessible = true
+ val time = timeField.getLong(nextTask)
+ return time > now()
+ }
+
+ private fun <T> runOnMain(block: () -> T): T {
+ return runBlocking {
+ val async = mainScope.async {
+ block()
+ }
+ mainContext.triggerActions()
+ async.await()
+ }
+ }
+
+ private fun <T> LiveData<T>.addObserver(): CollectingObserver<T> {
+ return runOnMain {
+ val observer = CollectingObserver(this)
+ observeForever(observer)
+ observer
+ }
+ }
+
+ @Test
+ fun multipleValuesAndObservers() {
+ val ld = liveData {
+ yield(3)
+ yield(4)
+ }
+ ld.addObserver().assertItems(3, 4)
+ // re-observe, get latest value only
+ ld.addObserver().assertItems(4)
+ }
+
+ inner class CollectingObserver<T>( // Observer that records every dispatched value, in order
+ private val liveData: LiveData<T>
+ ) : Observer<T> {
+ private val items = mutableListOf<T>() // the list is mutated in place, never reassigned
+ override fun onChanged(t: T) {
+ items.add(t)
+ }
+
+ fun assertItems(vararg expected: T) { // expects exactly these values in dispatch order
+ assertThat(items).containsExactly(*expected).inOrder()
+ }
+
+ fun unsubscribe() = runOnMain {
+ liveData.removeObserver(this)
+ }
+ }
+}
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 7976ae5..eb9ed0a 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -95,6 +95,7 @@
includeProject(":lifecycle:lifecycle-livedata-core-ktx", "lifecycle/livedata-core/ktx")
includeProject(":lifecycle:lifecycle-livedata", "lifecycle/livedata")
includeProject(":lifecycle:lifecycle-livedata-ktx", "lifecycle/livedata/ktx")
+includeProject(":lifecycle:lifecycle-livedata-eap", "lifecycle/livedata/eap")
includeProject(":lifecycle:lifecycle-process", "lifecycle/process")
includeProject(":lifecycle:lifecycle-reactivestreams", "lifecycle/reactivestreams")
includeProject(":lifecycle:lifecycle-reactivestreams-ktx", "lifecycle/reactivestreams/ktx")