[go: nahoru, domu]

Add buildLiveData for LiveData#coroutines integration

This CL adds a new `buildLiveData` function that runs a suspend
block when LiveData is active and cancels it with a timeout
if the LiveData becomes inactive.

Bug: 122740811
Test: BuildLiveDataTest

Change-Id: I727dcfd2386f42c3784d343c579fd58fc9a7af84
diff --git a/buildSrc/src/main/kotlin/androidx/build/PublishDocsRules.kt b/buildSrc/src/main/kotlin/androidx/build/PublishDocsRules.kt
index d5bf69e..899cdb4 100644
--- a/buildSrc/src/main/kotlin/androidx/build/PublishDocsRules.kt
+++ b/buildSrc/src/main/kotlin/androidx/build/PublishDocsRules.kt
@@ -66,6 +66,7 @@
     ignore(LibraryGroups.LIFECYCLE.group, "lifecycle-compiler")
     ignore(LibraryGroups.LIFECYCLE.group, "lifecycle-common-eap")
     ignore(LibraryGroups.LIFECYCLE.group, "lifecycle-runtime-eap")
+    ignore(LibraryGroups.LIFECYCLE.group, "lifecycle-livedata-eap")
     prebuilts(LibraryGroups.LIFECYCLE, "lifecycle-viewmodel-savedstate", "1.0.0-alpha01")
     prebuilts(LibraryGroups.LIFECYCLE, "2.1.0-alpha03")
     prebuilts(LibraryGroups.LOADER, "1.1.0-beta01")
diff --git a/lifecycle/livedata/eap/api/1.0.0-alpha01.txt b/lifecycle/livedata/eap/api/1.0.0-alpha01.txt
new file mode 100644
index 0000000..89968c5
--- /dev/null
+++ b/lifecycle/livedata/eap/api/1.0.0-alpha01.txt
@@ -0,0 +1,17 @@
+// Signature format: 3.0
+package androidx.lifecycle {
+
+  public final class CoroutineLiveDataKt {
+    ctor public CoroutineLiveDataKt();
+    method public static <T> androidx.lifecycle.LiveData<T> liveData(kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L, kotlin.jvm.functions.Function2<? super androidx.lifecycle.LiveDataScope<T>,? super kotlin.coroutines.experimental.Continuation<? super kotlin.Unit>,?> block);
+  }
+
+  public interface LiveDataScope<T> {
+    method public T? getInitialValue();
+    method public suspend Object? yield(T! value, kotlin.coroutines.experimental.Continuation<? super kotlin.Unit> p);
+    method public suspend Object? yieldSource(androidx.lifecycle.LiveData<T> source, kotlin.coroutines.experimental.Continuation<? super kotlin.Unit> p);
+    property public abstract T? initialValue;
+  }
+
+}
+
diff --git a/lifecycle/livedata/eap/api/current.txt b/lifecycle/livedata/eap/api/current.txt
new file mode 100644
index 0000000..89968c5
--- /dev/null
+++ b/lifecycle/livedata/eap/api/current.txt
@@ -0,0 +1,17 @@
+// Signature format: 3.0
+package androidx.lifecycle {
+
+  public final class CoroutineLiveDataKt {
+    ctor public CoroutineLiveDataKt();
+    method public static <T> androidx.lifecycle.LiveData<T> liveData(kotlin.coroutines.CoroutineContext context = EmptyCoroutineContext, long timeoutInMs = 5000L, kotlin.jvm.functions.Function2<? super androidx.lifecycle.LiveDataScope<T>,? super kotlin.coroutines.experimental.Continuation<? super kotlin.Unit>,?> block);
+  }
+
+  public interface LiveDataScope<T> {
+    method public T? getInitialValue();
+    method public suspend Object? yield(T! value, kotlin.coroutines.experimental.Continuation<? super kotlin.Unit> p);
+    method public suspend Object? yieldSource(androidx.lifecycle.LiveData<T> source, kotlin.coroutines.experimental.Continuation<? super kotlin.Unit> p);
+    property public abstract T? initialValue;
+  }
+
+}
+
diff --git a/lifecycle/livedata/eap/api/res-1.0.0-alpha01.txt b/lifecycle/livedata/eap/api/res-1.0.0-alpha01.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lifecycle/livedata/eap/api/res-1.0.0-alpha01.txt
diff --git a/lifecycle/livedata/eap/api/restricted_1.0.0-alpha01.txt b/lifecycle/livedata/eap/api/restricted_1.0.0-alpha01.txt
new file mode 100644
index 0000000..da4f6cc
--- /dev/null
+++ b/lifecycle/livedata/eap/api/restricted_1.0.0-alpha01.txt
@@ -0,0 +1 @@
+// Signature format: 3.0
diff --git a/lifecycle/livedata/eap/api/restricted_current.txt b/lifecycle/livedata/eap/api/restricted_current.txt
new file mode 100644
index 0000000..da4f6cc
--- /dev/null
+++ b/lifecycle/livedata/eap/api/restricted_current.txt
@@ -0,0 +1 @@
+// Signature format: 3.0
diff --git a/lifecycle/livedata/eap/build.gradle b/lifecycle/livedata/eap/build.gradle
new file mode 100644
index 0000000..e00aabe
--- /dev/null
+++ b/lifecycle/livedata/eap/build.gradle
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static androidx.build.dependencies.DependenciesKt.*
+import androidx.build.LibraryGroups
+import androidx.build.LibraryVersions
+
+plugins {
+    id("SupportAndroidLibraryPlugin")
+    id("org.jetbrains.kotlin.android")
+}
+
+android {
+    buildTypes {
+        debug {
+            testCoverageEnabled = false // Breaks Kotlin compiler.
+        }
+    }
+}
+
+dependencies {
+    api(project(":lifecycle:lifecycle-livedata"))
+    api(project(":lifecycle:lifecycle-livedata-core-ktx"))
+    api(KOTLIN_STDLIB)
+    api(KOTLIN_COROUTINES_CORE)
+    testImplementation(project(":lifecycle:lifecycle-runtime"))
+    testImplementation(project(":arch:core-testing"))
+    testImplementation(project(":lifecycle:lifecycle-livedata-ktx"))
+    testImplementation(JUNIT)
+    testImplementation(TRUTH)
+    testImplementation(TEST_EXT_JUNIT)
+    testImplementation(TEST_CORE)
+    testImplementation(TEST_RUNNER)
+    testImplementation(TEST_RULES)
+    testImplementation(KOTLIN_COROUTINES_TEST)
+}
+
+supportLibrary {
+    name = "LiveData Coroutine Extensions EAP"
+    publish = false
+    mavenVersion = LibraryVersions.LIFECYCLES_COROUTINES
+    mavenGroup = LibraryGroups.LIFECYCLE
+    inceptionYear = "2018"
+    description = "Coroutines extensions for 'livedata' artifact"
+    useMetalava = true
+}
diff --git a/lifecycle/livedata/eap/src/main/AndroidManifest.xml b/lifecycle/livedata/eap/src/main/AndroidManifest.xml
new file mode 100644
index 0000000..9443d70
--- /dev/null
+++ b/lifecycle/livedata/eap/src/main/AndroidManifest.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+  ~ Copyright 2018 The Android Open Source Project
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<manifest package="androidx.lifecycle.livedata.eap"/>
diff --git a/lifecycle/livedata/eap/src/main/java/androidx/lifecycle/CoroutineLiveData.kt b/lifecycle/livedata/eap/src/main/java/androidx/lifecycle/CoroutineLiveData.kt
new file mode 100644
index 0000000..1eab6ec
--- /dev/null
+++ b/lifecycle/livedata/eap/src/main/java/androidx/lifecycle/CoroutineLiveData.kt
@@ -0,0 +1,306 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.lifecycle
+
+import androidx.annotation.MainThread
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.SupervisorJob
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.EmptyCoroutineContext
+import kotlin.experimental.ExperimentalTypeInference
+
+internal const val DEFAULT_TIMEOUT = 5000L
+/**
+ * Interface that allows controlling a [LiveData] from a coroutine block.
+ *
+ * @see liveData
+ */
+interface LiveDataScope<T> {
+    /**
+     * Sets the [LiveData]'s value to the given [value]. If you've called [yieldSource] previously,
+     * calling [yield] will remove that source.
+     *
+     * Note that this function suspends until the value is set on the [LiveData].
+     *
+     * @param value The new value for the [LiveData]
+     *
+     * @see yieldSource
+     */
+    suspend fun yield(value: T)
+
+    /**
+     * Adds the given [LiveData] as a source, similar to [MediatorLiveData.addSource]. Calling this
+     * method will remove any source that was yielded before via [yieldSource].
+     *
+     * @param source The [LiveData] instance whose values will be dispatched from the current
+     * [LiveData].
+     *
+     * @see yield
+     * @see MediatorLiveData.addSource
+     * @see MediatorLiveData.removeSource
+     */
+    suspend fun yieldSource(source: LiveData<T>)
+
+    /**
+     * Denotes the value of the [LiveData] when this block is started.
+     *
+     * If it is the first time block is running, [initialValue] will be `null`. You can use this
+     * value to check what was the latest value `yield`ed by your `block` before it got cancelled.
+     *
+     * Note that if the block called [yieldSource], then `initialValue` will be the last value
+     * dispatched by the `source` [LiveData].
+     */
+    val initialValue: T?
+}
+
+/** [LiveDataScope] implementation that applies the block's calls to [target]. */
+internal class LiveDataScopeImpl<T>(
+    private val target: CoroutineLiveData<T>, // never reassigned, hence `val`
+    context: CoroutineContext,
+    override val initialValue: T? = target.value // read once, when this scope is constructed
+) : LiveDataScope<T> {
+    // `liveData`'s context + main dispatcher: main-thread safety and cancellation cooperation.
+    private val coroutineContext = context + Dispatchers.Main
+
+    override suspend fun yieldSource(source: LiveData<T>) = withContext(coroutineContext) {
+        target.yieldSource(source)
+    }
+
+    override suspend fun yield(value: T) = withContext(coroutineContext) {
+        target.clearYieldedSource() // a direct value replaces any previously yielded source
+        target.value = value
+    }
+}
+
+internal typealias Block<T> = suspend LiveDataScope<T>.() -> Unit
+
+/**
+ * Handles running a block at most once to completion.
+ *
+ * [maybeRun] launches the block in [scope] and [cancel] schedules its cancellation after
+ * [timeoutInMs]; [onDone] is invoked when the block returns normally.
+ */
+internal class BlockRunner<T>(
+    private val liveData: CoroutineLiveData<T>,
+    private val block: Block<T>,
+    private val timeoutInMs: Long,
+    private val scope: CoroutineScope,
+    private val onDone: () -> Unit
+) {
+    // Job of the block run that was launched by maybeRun, if any.
+    private var runningJob: Job? = null
+
+    // Job of the delayed cancellation scheduled by cancel, if any.
+    private var cancellationJob: Job? = null
+
+    @MainThread
+    fun maybeRun() {
+        // a pending delayed cancellation is no longer wanted
+        cancellationJob?.cancel()
+        cancellationJob = null
+        // a job was already launched and has not been cleared by a cancellation; keep it
+        if (runningJob != null) return
+        runningJob = scope.launch {
+            val blockScope = LiveDataScopeImpl(liveData, coroutineContext)
+            block(blockScope)
+            onDone()
+        }
+    }
+
+    @MainThread
+    fun cancel() {
+        check(cancellationJob == null) { "Cancel call cannot happen without a maybeRun" }
+        cancellationJob = scope.launch(Dispatchers.Main) {
+            delay(timeoutInMs)
+            // re-check for active observers after the delay to avoid any race condition between
+            // starting a running coroutine and cancellation
+            if (liveData.hasActiveObservers()) return@launch
+            runningJob?.cancel()
+            runningJob = null
+        }
+    }
+}
+
+/**
+ * [MediatorLiveData] that delegates running and cancelling its block to a [BlockRunner].
+ */
+internal class CoroutineLiveData<T>(
+    context: CoroutineContext = EmptyCoroutineContext,
+    timeoutInMs: Long = DEFAULT_TIMEOUT,
+    block: Block<T>
+) : MediatorLiveData<T>() {
+    private var blockRunner: BlockRunner<T>?
+
+    init {
+        // use an intermediate supervisor job so that if we cancel individual block runs due to losing
+        // observers, it won't cancel the given context as we only cancel w/ the intention of possibly
+        // relaunching using the same parent context.
+        val supervisorJob = SupervisorJob(context[Job])
+        // The scope for this LiveData where we launch every block Job.
+        // We default to Main dispatcher but developer can override it.
+        // The supervisor job is added last to isolate block runs.
+        val scope = CoroutineScope(Dispatchers.Main + context + supervisorJob)
+        blockRunner = BlockRunner(
+            liveData = this,
+            block = block,
+            timeoutInMs = timeoutInMs,
+            scope = scope
+        ) {
+            blockRunner = null // the block returned normally: drop the runner so it never re-runs
+        }
+    }
+
+    // The source we are delegated to, sent from LiveDataScope#yieldSource
+    // TODO We track this specifically since [MediatorLiveData] does not provide access to it.
+    // We should eventually get rid of this and provide the internal API from MediatorLiveData after
+    // the EAP.
+    private var yieldedSource: LiveData<T>? = null
+
+    @MainThread
+    internal fun yieldSource(source: LiveData<T>) {
+        clearYieldedSource()
+        yieldedSource = source
+        addSource(source) { value = it }
+    }
+
+    @MainThread
+    internal fun clearYieldedSource() {
+        yieldedSource?.let {
+            removeSource(it)
+            yieldedSource = null
+        }
+    }
+
+    override fun onActive() {
+        super.onActive()
+        blockRunner?.maybeRun()
+    }
+
+    override fun onInactive() {
+        super.onInactive()
+        blockRunner?.cancel()
+    }
+}
+
+/**
+ * Builds a LiveData that has values yielded from the given [block] that executes on a
+ * [LiveDataScope].
+ *
+ * The [block] starts executing when the returned [LiveData] becomes active ([LiveData.onActive]).
+ * If the [LiveData] becomes inactive ([LiveData.onInactive]) while the [block] is executing, it
+ * will be cancelled after [timeoutInMs] milliseconds unless the [LiveData] becomes active again
+ * before that timeout (to gracefully handle cases like Activity rotation). Any value
+ * [LiveDataScope.yield]ed from a cancelled [block] will be ignored.
+ *
+ * After a cancellation, if the [LiveData] becomes active again, the [block] will be re-executed
+ * from the beginning. If you would like to continue the operations based on where it was stopped
+ * last, you can use the [LiveDataScope.initialValue] property to get the last
+ * [LiveDataScope.yield]ed value.
+ *
+ * If the [block] completes successfully *or* is cancelled due to reasons other than [LiveData]
+ * becoming inactive, it *will not* be re-executed even after [LiveData] goes through an
+ * active/inactive cycle.
+ *
+ * As a best practice, it is important for the [block] to cooperate in cancellation. See kotlin
+ * coroutines documentation for details
+ * https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html.
+ *
+ * ```
+ * // a simple LiveData that receives value 3, 3 seconds after being observed for the first time.
+ * val data : LiveData<Int> = liveData {
+ *     delay(3000)
+ *     yield(3)
+ * }
+ *
+ *
+ * // a LiveData that fetches a `User` object based on a `userId` and refreshes it every 30 seconds
+ * // as long as it is observed
+ * val userId : LiveData<String> = ...
+ * val user = userId.switchMap { id ->
+ *     liveData {
+ *       while(true) {
+ *         // note that `while(true)` is fine because the `delay(30_000)` below will cooperate in
+ *         // cancellation if LiveData is not actively observed anymore
+ *         val data = api.fetch(id) // errors are ignored for brevity
+ *         yield(data)
+ *         delay(30_000)
+ *       }
+ *     }
+ * }
+ *
+ * // A retrying data fetcher with doubling back-off
+ * val user = liveData {
+ *     var backOffTime = 1_000L
+ *     var succeeded = false
+ *     while(!succeeded) {
+ *         try {
+ *             yield(api.fetch(id))
+ *             succeeded = true
+ *         } catch(ioError : IOException) {
+ *             delay(backOffTime)
+ *             backOffTime = minOf(backOffTime * 2, 60_000L)
+ *         }
+ *     }
+ * }
+ *
+ * // a LiveData that tries to load the `User` from local cache first and then tries to fetch
+ * // from the server and also yields the updated value
+ * val user = liveData {
+ *     // dispatch loading first
+ *     yield(LOADING(id))
+ *     // check local storage
+ *     val cached = cache.loadUser(id)
+ *     if (cached != null) {
+ *         yield(cached)
+ *     }
+ *     if (cached == null || cached.isStale()) {
+ *         val fresh = api.fetch(id) // errors are ignored for brevity
+ *         cache.save(fresh)
+ *         yield(fresh)
+ *     }
+ * }
+ *
+ * // a LiveData that immediately receives a LiveData<User> from the database and yields it as a
+ * // source but also tries to back-fill the database from the server
+ * val user = liveData {
+ *     val fromDb: LiveData<User> = roomDatabase.loadUser(id)
+ *     yieldSource(fromDb)
+ *     val updated = api.fetch(id) // errors are ignored for brevity
+ *     // Since we are using Room here, updating the database will update the `fromDb` LiveData
+ *     // that was obtained above. See Room's documentation for more details.
+ *     // https://developer.android.com/training/data-storage/room/accessing-data#query-observable
+ *     roomDatabase.insert(updated)
+ * }
+ * ```
+ *
+ * @param context The CoroutineContext to run the given block in. Defaults to
+ * [EmptyCoroutineContext] combined with [Dispatchers.Main]
+ * @param timeoutInMs The timeout in ms before cancelling the block if there are no active observers
+ * ([LiveData.hasActiveObservers]). Defaults to [DEFAULT_TIMEOUT].
+ * @param block The block to run when the [LiveData] has active observers.
+ */
+@UseExperimental(ExperimentalTypeInference::class)
+fun <T> liveData(
+    context: CoroutineContext = EmptyCoroutineContext,
+    timeoutInMs: Long = DEFAULT_TIMEOUT,
+    @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
+): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)
\ No newline at end of file
diff --git a/lifecycle/livedata/eap/src/test/java/androidx/lifecycle/BuildLiveDataTest.kt b/lifecycle/livedata/eap/src/test/java/androidx/lifecycle/BuildLiveDataTest.kt
new file mode 100644
index 0000000..15bcff4
--- /dev/null
+++ b/lifecycle/livedata/eap/src/test/java/androidx/lifecycle/BuildLiveDataTest.kt
@@ -0,0 +1,465 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.lifecycle
+
+import androidx.arch.core.executor.ArchTaskExecutor
+import androidx.arch.core.executor.TaskExecutor
+import com.google.common.truth.Truth.assertThat
+import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.CoroutineExceptionHandler
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.ObsoleteCoroutinesApi
+import kotlinx.coroutines.async
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.sync.Mutex
+import kotlinx.coroutines.sync.withLock
+import kotlinx.coroutines.test.TestCoroutineContext
+import kotlinx.coroutines.test.resetMain
+import kotlinx.coroutines.test.setMain
+import org.junit.After
+import org.junit.Before
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicReference
+import kotlin.coroutines.ContinuationInterceptor
+import kotlin.coroutines.coroutineContext
+
+@ObsoleteCoroutinesApi
+@RunWith(JUnit4::class)
+class BuildLiveDataTest {
+    @ObsoleteCoroutinesApi
+    private val mainContext = TestCoroutineContext("test-main-context")
+    private val mainScope = CoroutineScope(mainContext)
+    @ObsoleteCoroutinesApi
+    private val testContext = TestCoroutineContext("test-other-context")
+
+    @ExperimentalCoroutinesApi
+    @Before
+    fun initMain() {
+        lateinit var mainThread: Thread
+        runBlocking(mainContext) {
+            mainThread = Thread.currentThread()
+        }
+        Dispatchers.setMain(
+            mainContext[ContinuationInterceptor.Key] as CoroutineDispatcher
+        )
+        ArchTaskExecutor.getInstance().setDelegate(
+            object : TaskExecutor() {
+                override fun executeOnDiskIO(runnable: Runnable) {
+                    error("unsupported")
+                }
+
+                override fun postToMainThread(runnable: Runnable) {
+                    mainScope.launch {
+                        runnable.run()
+                    }
+                }
+
+                override fun isMainThread(): Boolean {
+                    return mainThread == Thread.currentThread()
+                }
+            }
+        )
+    }
+
+    @ExperimentalCoroutinesApi
+    @After
+    fun clear() {
+        advanceTimeBy(100000)
+        mainContext.assertExceptions("shouldn't have any exceptions") {
+            it.isEmpty()
+        }
+        testContext.assertExceptions("shouldn't have any exceptions") {
+            it.isEmpty()
+        }
+        ArchTaskExecutor.getInstance().setDelegate(null)
+        Dispatchers.resetMain()
+    }
+
+    @Test
+    fun oneShot() {
+        val liveData = liveData {
+            yield(3)
+        }
+        triggerAllActions()
+        assertThat(liveData.value).isNull()
+        liveData.addObserver().assertItems(3)
+    }
+
+    @Test
+    fun removeObserverInBetween() {
+        val ld = liveData(timeoutInMs = 10) {
+            yield(1)
+            yield(2)
+            delay(1000)
+            yield(3)
+        }
+        ld.addObserver().apply {
+            assertItems(1, 2)
+            unsubscribe()
+        }
+        // trigger cancellation
+        mainContext.advanceTimeBy(100)
+        assertThat(ld.hasActiveObservers()).isFalse()
+        ld.addObserver().apply {
+            triggerAllActions()
+            assertItems(2, 1, 2)
+            mainContext.advanceTimeBy(1001)
+            assertItems(2, 1, 2, 3)
+        }
+    }
+
+    @Test
+    fun removeObserverInBetween_largeTimeout() {
+        val ld = liveData(timeoutInMs = 10000) {
+            yield(1)
+            yield(2)
+            delay(1000)
+            yield(3)
+        }
+        ld.addObserver().apply {
+            assertItems(1, 2)
+            unsubscribe()
+        }
+        // advance some but not enough to cover the delay
+        mainContext.advanceTimeBy(500)
+        assertThat(ld.hasActiveObservers()).isFalse()
+        assertThat(ld.value).isEqualTo(2)
+        ld.addObserver().apply {
+            assertItems(2)
+            // advance enough to cover the rest of the delay
+            mainContext.advanceTimeBy(501)
+            assertItems(2, 3)
+        }
+    }
+
+    @Test
+    fun ignoreCancelledYields() {
+        val cancelMutex = Mutex(true)
+        val ld = liveData(timeoutInMs = 0, context = testContext) {
+            yield(1)
+            cancelMutex.withLock {
+                yield(2)
+            }
+        }
+        ld.addObserver().apply {
+            triggerAllActions()
+            assertItems(1)
+            unsubscribe()
+            cancelMutex.unlock()
+        }
+        // let cancellation take place
+        triggerAllActions()
+        // yield should immediately trigger cancellation to happen
+        assertThat(ld.value).isEqualTo(1)
+        assertThat(ld.hasActiveObservers()).isFalse()
+        // now because it was cancelled, re-observing should dispatch 1,1,2
+        ld.addObserver().apply {
+            triggerAllActions()
+            assertItems(1, 1, 2)
+        }
+    }
+
+    @Test
+    fun readInitialValue() {
+        val initial = AtomicReference<Int?>()
+        val ld = liveData<Int>(testContext) {
+            initial.set(initialValue)
+        }
+        runOnMain {
+            ld.value = 3
+        }
+        ld.addObserver()
+        triggerAllActions()
+        assertThat(initial.get()).isEqualTo(3)
+    }
+
+    @Test
+    fun readInitialValue_ignoreYielded() {
+        val initial = AtomicReference<Int?>()
+        val ld = liveData<Int>(testContext) {
+            yield(5)
+            initial.set(initialValue)
+        }
+        ld.addObserver()
+        triggerAllActions()
+        assertThat(initial.get()).isNull()
+    }
+
+    @Test
+    fun readInitialValue_keepYieldedFromBefore() {
+        val initial = AtomicReference<Int?>()
+        val ld = liveData<Int>(testContext, 10) {
+            if (initialValue == null) {
+                yield(5)
+                delay(500000) // wait for cancellation
+            }
+
+            initial.set(initialValue)
+        }
+        ld.addObserver().apply {
+            triggerAllActions()
+            assertItems(5)
+            unsubscribe()
+        }
+        triggerAllActions()
+        // wait for it to be cancelled
+        advanceTimeBy(10)
+        assertThat(initial.get()).isNull()
+        ld.addObserver()
+        triggerAllActions()
+        assertThat(initial.get()).isEqualTo(5)
+    }
+
+    @Test
+    fun yieldSource_simple() {
+        val odds = liveData {
+            (1..9 step 2).forEach {
+                yield(it)
+            }
+        }
+        val ld = liveData {
+            yieldSource(odds)
+        }
+        ld.addObserver().apply {
+            assertItems(1, 3, 5, 7, 9)
+        }
+    }
+
+    @Test
+    fun yieldSource_switchTwo() {
+        val doneOddsYield = Mutex(true)
+        val odds = liveData {
+            (1..9 step 2).forEach {
+                yield(it)
+            }
+            doneOddsYield.unlock()
+            delay(1)
+            yield(-1)
+        }
+        val evens = liveData {
+            (2..10 step 2).forEach {
+                yield(it)
+            }
+        }
+        val ld = liveData(testContext) {
+            yieldSource(odds)
+            doneOddsYield.lock()
+            yieldSource(evens)
+        }
+        ld.addObserver().apply {
+            triggerAllActions()
+            assertItems(1, 3, 5, 7, 9, 2, 4, 6, 8, 10)
+        }
+    }
+
+    @Test
+    fun yieldSource_yieldValue() {
+        val doneOddsYield = Mutex(true)
+        val odds = liveData(timeoutInMs = 0) {
+            (1..9 step 2).forEach {
+                yield(it)
+            }
+            doneOddsYield.unlock()
+            delay(1)
+            yield(-1)
+        }
+        val ld = liveData(testContext) {
+            yieldSource(odds)
+            doneOddsYield.lock()
+            yield(10)
+        }
+        ld.addObserver().apply {
+            triggerAllActions()
+            advanceTimeBy(100)
+            assertItems(1, 3, 5, 7, 9, 10)
+        }
+    }
+
+    @Test
+    fun blockThrows() {
+        // use an exception handler instead of the test context exception handler to ensure that
+        // we do not re-run the block if its exception is gracefully caught
+        // TODO should we consider doing that? But if we do, what is the rule? do we retry when it
+        // becomes active again or do we retry ourselves? better not do anything to be consistent.
+        val exception = CompletableDeferred<Throwable>()
+        val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
+            exception.complete(throwable)
+        }
+        val ld = liveData(testContext + exceptionHandler, 10) {
+            if (exception.isActive) {
+                throw IllegalArgumentException("i like to fail")
+            } else {
+                yield(3)
+            }
+        }
+        ld.addObserver().apply {
+            triggerAllActions()
+            assertItems()
+            runBlocking {
+                assertThat(exception.await()).hasMessageThat().contains("i like to fail")
+            }
+            unsubscribe()
+        }
+        triggerAllActions()
+        ld.addObserver().apply {
+            triggerAllActions()
+            assertItems()
+        }
+    }
+
+    @Test
+    fun blockCancelsItself() {
+        val didCancel = AtomicBoolean(false)
+        val unexpected = AtomicBoolean(false)
+
+        val ld = liveData<Int>(testContext, 10) {
+            if (didCancel.compareAndSet(false, true)) {
+                coroutineContext.cancel()
+            } else {
+                unexpected.set(true)
+            }
+        }
+        ld.addObserver().apply {
+            triggerAllActions()
+            assertItems()
+            unsubscribe()
+        }
+        assertThat(didCancel.get()).isTrue()
+        ld.addObserver()
+        // trigger cancelation
+        advanceTimeBy(11)
+        assertThat(unexpected.get()).isFalse()
+    }
+
+    @Test
+    fun blockThrows_switchMap() {
+        val exception = CompletableDeferred<Throwable>()
+        val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
+            exception.complete(throwable)
+        }
+        val src = MutableLiveData<Int>()
+        val ld = src.switchMap {
+            liveData(testContext + exceptionHandler) {
+                if (exception.isActive) {
+                    throw IllegalArgumentException("i like to fail")
+                } else {
+                    yield(3)
+                }
+            }
+        }
+        ld.addObserver().apply {
+            assertItems()
+            runOnMain {
+                src.value = 1
+            }
+            triggerAllActions()
+            runBlocking {
+                assertThat(exception.await()).hasMessageThat().contains("i like to fail")
+            }
+            runOnMain {
+                src.value = 2
+            }
+            triggerAllActions()
+            assertItems(3)
+        }
+    }
+
+    private fun triggerAllActions() {
+        do {
+            mainContext.triggerActions()
+            testContext.triggerActions()
+            val allIdle = listOf(mainContext, testContext).all {
+                it.isIdle()
+            }
+        } while (!allIdle)
+    }
+
+    private fun advanceTimeBy(time: Long) {
+        mainContext.advanceTimeBy(time)
+        testContext.advanceTimeBy(time)
+        triggerAllActions()
+    }
+
+    private fun TestCoroutineContext.isIdle(): Boolean {
+        val queueField = this::class.java
+            .getDeclaredField("queue")
+        queueField.isAccessible = true
+        val queue = queueField.get(this)
+        val peekMethod = queue::class.java
+            .getDeclaredMethod("peek")
+        val nextTask = peekMethod.invoke(queue) ?: return true
+        val timeField = nextTask::class.java.getDeclaredField("time")
+        timeField.isAccessible = true
+        val time = timeField.getLong(nextTask)
+        return time > now()
+    }
+
+    private fun <T> runOnMain(block: () -> T): T {
+        return runBlocking {
+            val async = mainScope.async {
+                block()
+            }
+            mainContext.triggerActions()
+            async.await()
+        }
+    }
+
+    private fun <T> LiveData<T>.addObserver(): CollectingObserver<T> {
+        return runOnMain {
+            val observer = CollectingObserver(this)
+            observeForever(observer)
+            observer
+        }
+    }
+
+    @Test
+    fun multipleValuesAndObservers() {
+        val ld = liveData {
+            yield(3)
+            yield(4)
+        }
+        ld.addObserver().assertItems(3, 4)
+        // re-observe, get latest value only
+        ld.addObserver().assertItems(4)
+    }
+
+    inner class CollectingObserver<T>(
+        private val liveData: LiveData<T>
+    ) : Observer<T> {
+        private var items = mutableListOf<T>()
+        override fun onChanged(t: T) {
+            items.add(t)
+        }
+
+        fun assertItems(vararg expected: T) {
+            assertThat(items).containsExactly(*expected)
+        }
+
+        fun unsubscribe() = runOnMain {
+            liveData.removeObserver(this)
+        }
+    }
+}
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 7976ae5..eb9ed0a 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -95,6 +95,7 @@
 includeProject(":lifecycle:lifecycle-livedata-core-ktx", "lifecycle/livedata-core/ktx")
 includeProject(":lifecycle:lifecycle-livedata", "lifecycle/livedata")
 includeProject(":lifecycle:lifecycle-livedata-ktx", "lifecycle/livedata/ktx")
+includeProject(":lifecycle:lifecycle-livedata-eap", "lifecycle/livedata/eap")
 includeProject(":lifecycle:lifecycle-process", "lifecycle/process")
 includeProject(":lifecycle:lifecycle-reactivestreams", "lifecycle/reactivestreams")
 includeProject(":lifecycle:lifecycle-reactivestreams-ktx", "lifecycle/reactivestreams/ktx")