[go: nahoru, domu]

RxDataStore property delegates

Test: See new Delegate Tests
Bug: 173726702
Relnote: Added property delegates for RxDataStore users.
Change-Id: Ied7680e698bc98b385b4bb4f33b6481963e7589a
diff --git a/datastore/datastore-preferences-rxjava2/api/current.txt b/datastore/datastore-preferences-rxjava2/api/current.txt
index f92d774..23d54a0 100644
--- a/datastore/datastore-preferences-rxjava2/api/current.txt
+++ b/datastore/datastore-preferences-rxjava2/api/current.txt
@@ -11,5 +11,12 @@
     method public androidx.datastore.preferences.rxjava2.RxPreferenceDataStoreBuilder setIoScheduler(io.reactivex.Scheduler ioScheduler);
   }
 
+  public final class RxPreferenceDataStoreDelegateKt {
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences>> migrations, optional io.reactivex.Scheduler scheduler);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences>> migrations);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name);
+  }
+
 }
 
diff --git a/datastore/datastore-preferences-rxjava2/api/public_plus_experimental_current.txt b/datastore/datastore-preferences-rxjava2/api/public_plus_experimental_current.txt
index f92d774..23d54a0 100644
--- a/datastore/datastore-preferences-rxjava2/api/public_plus_experimental_current.txt
+++ b/datastore/datastore-preferences-rxjava2/api/public_plus_experimental_current.txt
@@ -11,5 +11,12 @@
     method public androidx.datastore.preferences.rxjava2.RxPreferenceDataStoreBuilder setIoScheduler(io.reactivex.Scheduler ioScheduler);
   }
 
+  public final class RxPreferenceDataStoreDelegateKt {
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences>> migrations, optional io.reactivex.Scheduler scheduler);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences>> migrations);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name);
+  }
+
 }
 
diff --git a/datastore/datastore-preferences-rxjava2/api/restricted_current.txt b/datastore/datastore-preferences-rxjava2/api/restricted_current.txt
index f92d774..23d54a0 100644
--- a/datastore/datastore-preferences-rxjava2/api/restricted_current.txt
+++ b/datastore/datastore-preferences-rxjava2/api/restricted_current.txt
@@ -11,5 +11,12 @@
     method public androidx.datastore.preferences.rxjava2.RxPreferenceDataStoreBuilder setIoScheduler(io.reactivex.Scheduler ioScheduler);
   }
 
+  public final class RxPreferenceDataStoreDelegateKt {
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences>> migrations, optional io.reactivex.Scheduler scheduler);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences>> migrations);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name);
+  }
+
 }
 
diff --git a/datastore/datastore-preferences-rxjava2/src/androidTest/java/androidx/datastore/preferences/rxjava2/RxPreferenceDataStoreDelegateTest.kt b/datastore/datastore-preferences-rxjava2/src/androidTest/java/androidx/datastore/preferences/rxjava2/RxPreferenceDataStoreDelegateTest.kt
new file mode 100644
index 0000000..0051f52
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava2/src/androidTest/java/androidx/datastore/preferences/rxjava2/RxPreferenceDataStoreDelegateTest.kt
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.preferences.rxjava2
+
+import android.content.Context
+import androidx.datastore.core.DataMigration
+import androidx.datastore.core.handlers.ReplaceFileCorruptionHandler
+import androidx.datastore.preferences.core.Preferences
+import androidx.datastore.preferences.core.intPreferencesKey
+import androidx.datastore.preferences.core.mutablePreferencesOf
+import androidx.datastore.preferences.core.preferencesOf
+import androidx.test.core.app.ApplicationProvider
+import com.google.common.truth.Truth.assertThat
+import io.reactivex.Single
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import org.junit.Before
+import org.junit.Rule
+import org.junit.Test
+import org.junit.rules.TemporaryFolder
+import java.io.File
+import java.io.FileOutputStream
+
+val intKey = intPreferencesKey("int_key")
+
+val Context.rxDataStore by rxPreferencesDataStore("file1")
+
+val Context.rxdsWithMigration by rxPreferencesDataStore(
+    "file2",
+    migrations = listOf(object : DataMigration<Preferences> {
+        override suspend fun shouldMigrate(currentData: Preferences) = true
+        override suspend fun migrate(currentData: Preferences) = preferencesOf(intKey to 123)
+        override suspend fun cleanUp() {}
+    })
+)
+
+val Context.rxdsWithCorruptionHandler by rxPreferencesDataStore(
+    "file3",
+    corruptionHandler = ReplaceFileCorruptionHandler { preferencesOf(intKey to 123) }
+)
+
+val Context.rxDataStoreForFileNameCheck by rxPreferencesDataStore("file5")
+
+@ExperimentalCoroutinesApi
+class RxPreferenceDataStoreDelegateTest {
+    @get:Rule
+    val tmp = TemporaryFolder()
+
+    private lateinit var context: Context
+
+    @Before
+    fun setUp() {
+        context = ApplicationProvider.getApplicationContext()
+        File(context.filesDir, "/datastore").deleteRecursively()
+    }
+
+    @Test
+    fun testBasic() {
+        assertThat(context.rxDataStore.data().blockingFirst()).isEqualTo(preferencesOf())
+        assertThat(
+            context.rxDataStore.updateDataAsync {
+                Single.just(mutablePreferencesOf(intKey to 123))
+            }.blockingGet()
+        ).isEqualTo(mutablePreferencesOf(intKey to 123))
+    }
+
+    @Test
+    fun testWithMigration() {
+        assertThat(
+            context.rxdsWithMigration.data().blockingFirst()
+        ).isEqualTo(preferencesOf(intKey to 123))
+    }
+
+    @Test
+    fun testCorruptedRunsCorruptionHandler() {
+        // File needs to exist or we don't actually hit the serializer:
+        File(context.filesDir, "datastore/file3.preferences_pb").let { file ->
+            file.parentFile!!.mkdirs()
+            FileOutputStream(file).use {
+                it.write(0) // 0 first byte isn't a valid proto
+            }
+        }
+
+        assertThat(
+            context.rxdsWithCorruptionHandler.data().blockingFirst()
+        ).isEqualTo(preferencesOf(intKey to 123))
+    }
+
+    @Test
+    fun testCorrectFileNameUsed() {
+        context.rxDataStoreForFileNameCheck.updateDataAsync {
+            Single.just(preferencesOf(intKey to 99))
+        }.blockingGet()
+
+        context.rxDataStoreForFileNameCheck.dispose()
+        context.rxDataStoreForFileNameCheck.shutdownComplete().blockingGet()
+
+        assertThat(
+            RxPreferenceDataStoreBuilder {
+                File(context.filesDir, "datastore/file5.preferences_pb")
+            }.build().data().blockingFirst()
+        ).isEqualTo(preferencesOf(intKey to 99))
+    }
+}
\ No newline at end of file
diff --git a/datastore/datastore-preferences-rxjava2/src/main/java/androidx/datastore/preferences/rxjava2/RxPreferenceDataStoreDelegate.kt b/datastore/datastore-preferences-rxjava2/src/main/java/androidx/datastore/preferences/rxjava2/RxPreferenceDataStoreDelegate.kt
new file mode 100644
index 0000000..087a5f8
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava2/src/main/java/androidx/datastore/preferences/rxjava2/RxPreferenceDataStoreDelegate.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.preferences.rxjava2
+
+import android.content.Context
+import androidx.annotation.GuardedBy
+import androidx.datastore.core.DataMigration
+import androidx.datastore.core.handlers.ReplaceFileCorruptionHandler
+import androidx.datastore.preferences.core.Preferences
+import androidx.datastore.rxjava2.RxDataStore
+import io.reactivex.Scheduler
+import io.reactivex.schedulers.Schedulers
+import kotlin.properties.ReadOnlyProperty
+import kotlin.reflect.KProperty
+
+/**
+ * Creates a property delegate for a single-process Preferences DataStore. This should only be
+ * called once in a file (at the top level), and all usages of the DataStore should use a
+ * reference to the same instance. The receiver type for the property delegate must be an
+ * instance of [Context].
+ *
+ * Example usage:
+ * ```
+ * val Context.myRxDataStore by rxPreferencesDataStore("filename")
+ *
+ * class SomeClass(val context: Context) {
+ *    fun update(): Single<Preferences> = context.myRxDataStore.updateDataAsync {...}
+ * }
+ * ```
+ *
+ *
+ * @param name The name of the preferences. The preferences will be stored in a file obtained
+ * by calling: File(context.filesDir, "datastore/" + name + ".preferences_pb")
+ * @param corruptionHandler The corruptionHandler is invoked if DataStore encounters a
+ * [androidx.datastore.core.CorruptionException] when attempting to read data. CorruptionExceptions
+ * are thrown by serializers when data can not be de-serialized.
+ * @param migrations are run before any access to data can occur. Each producer and migration
+ * may be run more than once whether or not it already succeeded (potentially because another
+ * migration failed or a write to disk failed.)
+ * @param scheduler The scheduler on which IO operations and transform functions will execute.
+ *
+ * @return a property delegate that manages a datastore as a singleton.
+ */
+@JvmOverloads
+public fun rxPreferencesDataStore(
+    name: String,
+    corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
+    migrations: List<DataMigration<Preferences>> = listOf(),
+    scheduler: Scheduler = Schedulers.io()
+): ReadOnlyProperty<Context, RxDataStore<Preferences>> {
+    return RxDataStoreSingletonDelegate(name, corruptionHandler, migrations, scheduler)
+}
+
+/**
+ * Delegate class to manage DataStore as a singleton.
+ */
+internal class RxDataStoreSingletonDelegate internal constructor(
+    private val fileName: String,
+    private val corruptionHandler: ReplaceFileCorruptionHandler<Preferences>?,
+    private val migrations: List<DataMigration<Preferences>>,
+    private val scheduler: Scheduler
+) : ReadOnlyProperty<Context, RxDataStore<Preferences>> {
+
+    private val lock = Any()
+
+    @GuardedBy("lock")
+    @Volatile
+    private var INSTANCE: RxDataStore<Preferences>? = null
+
+    /**
+     * Gets the instance of the DataStore.
+     *
+     * @param thisRef must be an instance of [Context]
+     * @param property not used
+     */
+    override fun getValue(thisRef: Context, property: KProperty<*>): RxDataStore<Preferences> {
+        return INSTANCE ?: synchronized(lock) {
+            if (INSTANCE == null) {
+                INSTANCE = with(RxPreferenceDataStoreBuilder(thisRef, fileName)) {
+                    setIoScheduler(scheduler)
+                    migrations.forEach { addDataMigration(it) }
+                    corruptionHandler?.let { setCorruptionHandler(it) }
+                    build()
+                }
+            }
+            INSTANCE!!
+        }
+    }
+}
\ No newline at end of file
diff --git a/datastore/datastore-preferences-rxjava3/api/current.txt b/datastore/datastore-preferences-rxjava3/api/current.txt
index 768c965..66b6f82 100644
--- a/datastore/datastore-preferences-rxjava3/api/current.txt
+++ b/datastore/datastore-preferences-rxjava3/api/current.txt
@@ -11,5 +11,12 @@
     method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
   }
 
+  public final class RxPreferenceDataStoreDelegateKt {
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences>> migrations, optional io.reactivex.rxjava3.core.Scheduler scheduler);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences>> migrations);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name);
+  }
+
 }
 
diff --git a/datastore/datastore-preferences-rxjava3/api/public_plus_experimental_current.txt b/datastore/datastore-preferences-rxjava3/api/public_plus_experimental_current.txt
index 768c965..66b6f82 100644
--- a/datastore/datastore-preferences-rxjava3/api/public_plus_experimental_current.txt
+++ b/datastore/datastore-preferences-rxjava3/api/public_plus_experimental_current.txt
@@ -11,5 +11,12 @@
     method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
   }
 
+  public final class RxPreferenceDataStoreDelegateKt {
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences>> migrations, optional io.reactivex.rxjava3.core.Scheduler scheduler);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences>> migrations);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name);
+  }
+
 }
 
diff --git a/datastore/datastore-preferences-rxjava3/api/restricted_current.txt b/datastore/datastore-preferences-rxjava3/api/restricted_current.txt
index 768c965..66b6f82 100644
--- a/datastore/datastore-preferences-rxjava3/api/restricted_current.txt
+++ b/datastore/datastore-preferences-rxjava3/api/restricted_current.txt
@@ -11,5 +11,12 @@
     method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
   }
 
+  public final class RxPreferenceDataStoreDelegateKt {
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences>> migrations, optional io.reactivex.rxjava3.core.Scheduler scheduler);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences>> migrations);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences>? corruptionHandler);
+    method public static kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<androidx.datastore.preferences.core.Preferences>> rxPreferencesDataStore(String name);
+  }
+
 }
 
diff --git a/datastore/datastore-preferences-rxjava3/src/androidTest/java/androidx/datastore/preferences/rxjava3/RxPreferenceDataStoreDelegateTest.kt b/datastore/datastore-preferences-rxjava3/src/androidTest/java/androidx/datastore/preferences/rxjava3/RxPreferenceDataStoreDelegateTest.kt
new file mode 100644
index 0000000..55d77da
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/src/androidTest/java/androidx/datastore/preferences/rxjava3/RxPreferenceDataStoreDelegateTest.kt
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.preferences.rxjava3
+
+import android.content.Context
+import androidx.datastore.core.DataMigration
+import androidx.datastore.core.handlers.ReplaceFileCorruptionHandler
+import androidx.datastore.preferences.core.Preferences
+import androidx.datastore.preferences.core.intPreferencesKey
+import androidx.datastore.preferences.core.mutablePreferencesOf
+import androidx.datastore.preferences.core.preferencesOf
+import androidx.test.core.app.ApplicationProvider
+import com.google.common.truth.Truth.assertThat
+import io.reactivex.rxjava3.core.Single
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import org.junit.Before
+import org.junit.Rule
+import org.junit.Test
+import org.junit.rules.TemporaryFolder
+import java.io.File
+import java.io.FileOutputStream
+
+val intKey = intPreferencesKey("int_key")
+
+val Context.rxDataStore by rxPreferencesDataStore("file1")
+
+val Context.rxdsWithMigration by rxPreferencesDataStore(
+    "file2",
+    migrations = listOf(object : DataMigration<Preferences> {
+        override suspend fun shouldMigrate(currentData: Preferences) = true
+        override suspend fun migrate(currentData: Preferences) = preferencesOf(intKey to 123)
+        override suspend fun cleanUp() {}
+    })
+)
+
+val Context.rxdsWithCorruptionHandler by rxPreferencesDataStore(
+    "file3",
+    corruptionHandler = ReplaceFileCorruptionHandler { preferencesOf(intKey to 123) }
+)
+
+val Context.rxDataStoreForFileNameCheck by rxPreferencesDataStore("file5")
+
+@ExperimentalCoroutinesApi
+class RxPreferenceDataStoreDelegateTest {
+    @get:Rule
+    val tmp = TemporaryFolder()
+
+    private lateinit var context: Context
+
+    @Before
+    fun setUp() {
+        context = ApplicationProvider.getApplicationContext()
+        File(context.filesDir, "/datastore").deleteRecursively()
+    }
+
+    @Test
+    fun testBasic() {
+        assertThat(context.rxDataStore.data().blockingFirst()).isEqualTo(preferencesOf())
+        assertThat(
+            context.rxDataStore.updateDataAsync {
+                Single.just(mutablePreferencesOf(intKey to 123))
+            }.blockingGet()
+        ).isEqualTo(mutablePreferencesOf(intKey to 123))
+    }
+
+    @Test
+    fun testWithMigration() {
+        assertThat(
+            context.rxdsWithMigration.data().blockingFirst()
+        ).isEqualTo(preferencesOf(intKey to 123))
+    }
+
+    @Test
+    fun testCorruptedRunsCorruptionHandler() {
+        // File needs to exist or we don't actually hit the serializer:
+        File(context.filesDir, "datastore/file3.preferences_pb").let { file ->
+            file.parentFile!!.mkdirs()
+            FileOutputStream(file).use {
+                it.write(0) // 0 first byte isn't a valid proto
+            }
+        }
+
+        assertThat(
+            context.rxdsWithCorruptionHandler.data().blockingFirst()
+        ).isEqualTo(preferencesOf(intKey to 123))
+    }
+
+    @Test
+    fun testCorrectFileNameUsed() {
+        context.rxDataStoreForFileNameCheck.updateDataAsync {
+            Single.just(preferencesOf(intKey to 99))
+        }.blockingGet()
+
+        context.rxDataStoreForFileNameCheck.dispose()
+        context.rxDataStoreForFileNameCheck.shutdownComplete().blockingAwait()
+
+        assertThat(
+            RxPreferenceDataStoreBuilder {
+                File(context.filesDir, "datastore/file5.preferences_pb")
+            }.build().data().blockingFirst()
+        ).isEqualTo(preferencesOf(intKey to 99))
+    }
+}
\ No newline at end of file
diff --git a/datastore/datastore-preferences-rxjava3/src/main/java/androidx/datastore/preferences/rxjava3/RxPreferenceDataStoreDelegate.kt b/datastore/datastore-preferences-rxjava3/src/main/java/androidx/datastore/preferences/rxjava3/RxPreferenceDataStoreDelegate.kt
new file mode 100644
index 0000000..344a256
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/src/main/java/androidx/datastore/preferences/rxjava3/RxPreferenceDataStoreDelegate.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.preferences.rxjava3
+
+import android.content.Context
+import androidx.annotation.GuardedBy
+import androidx.datastore.core.DataMigration
+import androidx.datastore.core.handlers.ReplaceFileCorruptionHandler
+import androidx.datastore.preferences.core.Preferences
+import androidx.datastore.rxjava3.RxDataStore
+import io.reactivex.rxjava3.core.Scheduler
+import io.reactivex.rxjava3.schedulers.Schedulers
+import kotlin.properties.ReadOnlyProperty
+import kotlin.reflect.KProperty
+
+/**
+ * Creates a property delegate for a single-process Preferences DataStore. This should only be
+ * called once in a file (at the top level), and all usages of the DataStore should use a
+ * reference to the same instance. The receiver type for the property delegate must be an
+ * instance of [Context].
+ *
+ * Example usage:
+ * ```
+ * val Context.myRxDataStore by rxPreferencesDataStore("filename")
+ *
+ * class SomeClass(val context: Context) {
+ *    fun update(): Single<Preferences> = context.myRxDataStore.updateDataAsync {...}
+ * }
+ * ```
+ *
+ *
+ * @param name The name of the preferences. The preferences will be stored in a file obtained
+ * by calling: File(context.filesDir, "datastore/" + name + ".preferences_pb")
+ * @param corruptionHandler The corruptionHandler is invoked if DataStore encounters a
+ * [androidx.datastore.core.CorruptionException] when attempting to read data. CorruptionExceptions
+ * are thrown by serializers when data can not be de-serialized.
+ * @param migrations are run before any access to data can occur. Each producer and migration
+ * may be run more than once whether or not it already succeeded (potentially because another
+ * migration failed or a write to disk failed.)
+ * @param scheduler The scheduler on which IO operations and transform functions will execute.
+ *
+ * @return a property delegate that manages a datastore as a singleton.
+ */
+@JvmOverloads
+public fun rxPreferencesDataStore(
+    name: String,
+    corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
+    migrations: List<DataMigration<Preferences>> = listOf(),
+    scheduler: Scheduler = Schedulers.io()
+): ReadOnlyProperty<Context, RxDataStore<Preferences>> {
+    return RxDataStoreSingletonDelegate(name, corruptionHandler, migrations, scheduler)
+}
+
+/**
+ * Delegate class to manage DataStore as a singleton.
+ */
+internal class RxDataStoreSingletonDelegate internal constructor(
+    private val fileName: String,
+    private val corruptionHandler: ReplaceFileCorruptionHandler<Preferences>?,
+    private val migrations: List<DataMigration<Preferences>>,
+    private val scheduler: Scheduler
+) : ReadOnlyProperty<Context, RxDataStore<Preferences>> {
+
+    private val lock = Any()
+
+    @GuardedBy("lock")
+    @Volatile
+    private var INSTANCE: RxDataStore<Preferences>? = null
+
+    /**
+     * Gets the instance of the DataStore.
+     *
+     * @param thisRef must be an instance of [Context]
+     * @param property not used
+     */
+    override fun getValue(thisRef: Context, property: KProperty<*>): RxDataStore<Preferences> {
+        return INSTANCE ?: synchronized(lock) {
+            if (INSTANCE == null) {
+                INSTANCE = with(RxPreferenceDataStoreBuilder(thisRef, fileName)) {
+                    setIoScheduler(scheduler)
+                    migrations.forEach { addDataMigration(it) }
+                    corruptionHandler?.let { setCorruptionHandler(it) }
+                    build()
+                }
+            }
+            INSTANCE!!
+        }
+    }
+}
\ No newline at end of file
diff --git a/datastore/datastore-rxjava2/api/current.txt b/datastore/datastore-rxjava2/api/current.txt
index b55ea08..7d801c2 100644
--- a/datastore/datastore-rxjava2/api/current.txt
+++ b/datastore/datastore-rxjava2/api/current.txt
@@ -25,6 +25,13 @@
     method public androidx.datastore.rxjava2.RxDataStoreBuilder<T> setIoScheduler(io.reactivex.Scheduler ioScheduler);
   }
 
+  public final class RxDataStoreDelegateKt {
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<T>> migrations, optional io.reactivex.Scheduler scheduler);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<T>> migrations);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer);
+  }
+
   public interface RxSharedPreferencesMigration<T> {
     method public io.reactivex.Single<T> migrate(androidx.datastore.migrations.SharedPreferencesView sharedPreferencesView, T? currentData);
     method public default io.reactivex.Single<java.lang.Boolean> shouldMigrate(T? currentData);
diff --git a/datastore/datastore-rxjava2/api/public_plus_experimental_current.txt b/datastore/datastore-rxjava2/api/public_plus_experimental_current.txt
index b55ea08..7d801c2 100644
--- a/datastore/datastore-rxjava2/api/public_plus_experimental_current.txt
+++ b/datastore/datastore-rxjava2/api/public_plus_experimental_current.txt
@@ -25,6 +25,13 @@
     method public androidx.datastore.rxjava2.RxDataStoreBuilder<T> setIoScheduler(io.reactivex.Scheduler ioScheduler);
   }
 
+  public final class RxDataStoreDelegateKt {
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<T>> migrations, optional io.reactivex.Scheduler scheduler);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<T>> migrations);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer);
+  }
+
   public interface RxSharedPreferencesMigration<T> {
     method public io.reactivex.Single<T> migrate(androidx.datastore.migrations.SharedPreferencesView sharedPreferencesView, T? currentData);
     method public default io.reactivex.Single<java.lang.Boolean> shouldMigrate(T? currentData);
diff --git a/datastore/datastore-rxjava2/api/restricted_current.txt b/datastore/datastore-rxjava2/api/restricted_current.txt
index b55ea08..7d801c2 100644
--- a/datastore/datastore-rxjava2/api/restricted_current.txt
+++ b/datastore/datastore-rxjava2/api/restricted_current.txt
@@ -25,6 +25,13 @@
     method public androidx.datastore.rxjava2.RxDataStoreBuilder<T> setIoScheduler(io.reactivex.Scheduler ioScheduler);
   }
 
+  public final class RxDataStoreDelegateKt {
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<T>> migrations, optional io.reactivex.Scheduler scheduler);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<T>> migrations);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava2.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer);
+  }
+
   public interface RxSharedPreferencesMigration<T> {
     method public io.reactivex.Single<T> migrate(androidx.datastore.migrations.SharedPreferencesView sharedPreferencesView, T? currentData);
     method public default io.reactivex.Single<java.lang.Boolean> shouldMigrate(T? currentData);
diff --git a/datastore/datastore-rxjava2/src/androidTest/java/androidx/datastore/rxjava2/RxDataStoreDelegateTest.kt b/datastore/datastore-rxjava2/src/androidTest/java/androidx/datastore/rxjava2/RxDataStoreDelegateTest.kt
new file mode 100644
index 0000000..1c2ae15
--- /dev/null
+++ b/datastore/datastore-rxjava2/src/androidTest/java/androidx/datastore/rxjava2/RxDataStoreDelegateTest.kt
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.rxjava2
+
+import android.content.Context
+import androidx.datastore.core.DataMigration
+import androidx.datastore.core.handlers.ReplaceFileCorruptionHandler
+import androidx.test.core.app.ApplicationProvider
+import com.google.common.truth.Truth.assertThat
+import io.reactivex.Single
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import org.junit.Before
+import org.junit.Rule
+import org.junit.Test
+import org.junit.rules.TemporaryFolder
+import java.io.File
+import java.io.FileOutputStream
+
+// Store on "file1" with no migrations and no corruption handler.
+val Context.rxDataStore by rxDataStore("file1", TestingSerializer())
+// Store on "file2" whose only migration always applies and replaces the data with 123.
+val Context.rxdsWithMigration by rxDataStore(
+    "file2", TestingSerializer(),
+    migrations = listOf(object : DataMigration<Byte> {
+        override suspend fun shouldMigrate(currentData: Byte): Boolean = true
+        override suspend fun migrate(currentData: Byte): Byte = 123
+        override suspend fun cleanUp() = Unit
+    })
+)
+// Store on "file3": failReadWithCorruptionException serializer, handler replaces data with 123.
+val Context.rxdsWithCorruptionHandler by rxDataStore(
+    "file3", TestingSerializer(failReadWithCorruptionException = true),
+    corruptionHandler = ReplaceFileCorruptionHandler { 123 }
+)
+// Store on "file4"; testCorrectFileNameUsed reopens that file through RxDataStoreBuilder.
+val Context.rxDataStoreForFileNameCheck by rxDataStore("file4", TestingSerializer())
+
+@ExperimentalCoroutinesApi
+class RxDataStoreDelegateTest {
+    @get:Rule
+    val tmp = TemporaryFolder()
+
+    private lateinit var context: Context
+
+    @Before
+    fun setUp() {
+        context = ApplicationProvider.getApplicationContext()
+        // Start each test without files left under <filesDir>/datastore by a previous one.
+        File(context.filesDir, "/datastore").deleteRecursively()
+    }
+
+    @Test
+    fun testBasic() {
+        val store = context.rxDataStore
+        assertThat(store.data().blockingFirst()).isEqualTo(0)
+        assertThat(store.updateDataAsync { Single.just(it.inc()) }.blockingGet()).isEqualTo(1)
+    }
+
+    @Test
+    fun testWithMigration() {
+        assertThat(context.rxdsWithMigration.data().blockingFirst()).isEqualTo(123)
+    }
+
+    @Test
+    fun testCorruptedRunsCorruptionHandler() {
+        // File needs to exist or we don't actually hit the serializer:
+        val file = File(context.filesDir, "datastore/file3")
+        file.parentFile!!.mkdirs()
+        FileOutputStream(file).use { it.write(0) }
+
+        assertThat(context.rxdsWithCorruptionHandler.data().blockingFirst()).isEqualTo(123)
+    }
+
+    @Test
+    fun testCorrectFileNameUsed() {
+        val store = context.rxDataStoreForFileNameCheck
+        store.updateDataAsync { Single.just(it.inc()) }.blockingGet()
+        // Shut the store down before a second DataStore is opened on the same file.
+        store.dispose()
+        // blockingAwait() rethrows a shutdown failure; blockingGet() would only return it.
+        store.shutdownComplete().blockingAwait()
+
+        // A store built directly on <filesDir>/datastore/file4 must see the write made above.
+        val file4 = File(context.filesDir, "datastore/file4")
+        val reopened = RxDataStoreBuilder({ file4 }, TestingSerializer()).build()
+        assertThat(reopened.data().blockingFirst()).isEqualTo(1)
+    }
+}
\ No newline at end of file
diff --git a/datastore/datastore-rxjava2/src/main/java/androidx/datastore/rxjava2/RxDataStoreDelegate.kt b/datastore/datastore-rxjava2/src/main/java/androidx/datastore/rxjava2/RxDataStoreDelegate.kt
new file mode 100644
index 0000000..238528c
--- /dev/null
+++ b/datastore/datastore-rxjava2/src/main/java/androidx/datastore/rxjava2/RxDataStoreDelegate.kt
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.rxjava2
+
+import android.content.Context
+import androidx.annotation.GuardedBy
+import androidx.datastore.core.DataMigration
+import androidx.datastore.core.Serializer
+import androidx.datastore.core.handlers.ReplaceFileCorruptionHandler
+import io.reactivex.Scheduler
+import io.reactivex.schedulers.Schedulers
+import kotlin.properties.ReadOnlyProperty
+import kotlin.reflect.KProperty
+
+/**
+ * Creates a property delegate for a single process DataStore. This should only be called once
+ * in a file (at the top level), and all usages of the DataStore should use a reference to the
+ * same instance. The receiver type for the property delegate must be an instance of [Context].
+ *
+ * Calling this function does not create the DataStore; the returned delegate builds it the
+ * first time the property is read and returns that same instance on every later read.
+ *
+ * Example usage:
+ * ```
+ * val Context.myRxDataStore by rxDataStore("filename", serializer)
+ *
+ * class SomeClass(val context: Context) {
+ *    fun update(): Single<Settings> = context.myRxDataStore.updateDataAsync {...}
+ * }
+ * ```
+ *
+ * @param fileName the filename relative to Context.filesDir that DataStore acts on. The File is
+ * obtained by calling File(context.filesDir, "datastore/$fileName"). No two instances of DataStore
+ * should act on the same file at the same time.
+ * @param serializer The [Serializer] for the type [T] that this DataStore holds.
+ * @param corruptionHandler The corruptionHandler is invoked if DataStore encounters a
+ * [androidx.datastore.core.CorruptionException] when attempting to read data. CorruptionExceptions
+ * are thrown by serializers when data can not be de-serialized. When null (the default), no
+ * corruption handler is set on the DataStore.
+ * @param migrations are run before any access to data can occur. Each producer and migration
+ * may be run more than once whether or not it already succeeded (potentially because another
+ * migration failed or a write to disk failed.)
+ * @param scheduler The scheduler in which IO operations and transform functions will execute.
+ * Defaults to [Schedulers.io].
+ *
+ * @return a property delegate that manages a datastore as a singleton.
+ */
+@JvmOverloads
+public fun <T : Any> rxDataStore(
+    fileName: String,
+    serializer: Serializer<T>,
+    corruptionHandler: ReplaceFileCorruptionHandler<T>? = null,
+    migrations: List<DataMigration<T>> = listOf(),
+    scheduler: Scheduler = Schedulers.io()
+): ReadOnlyProperty<Context, RxDataStore<T>> =
+    // Just bundles the arguments; the delegate itself builds and caches the store on demand.
+    RxDataStoreSingletonDelegate(fileName, serializer, corruptionHandler, migrations, scheduler)
+
+/**
+ * Delegate that builds one [RxDataStore] on first read and then keeps returning it.
+ */
+internal class RxDataStoreSingletonDelegate<T : Any> internal constructor(
+    private val fileName: String,
+    private val serializer: Serializer<T>,
+    private val corruptionHandler: ReplaceFileCorruptionHandler<T>?,
+    private val migrations: List<DataMigration<T>>,
+    private val scheduler: Scheduler
+) : ReadOnlyProperty<Context, RxDataStore<T>> {
+
+    private val lock = Any()
+
+    // Assigned only while holding `lock`; @Volatile covers the unlocked first read in getValue.
+    @GuardedBy("lock")
+    @Volatile
+    private var INSTANCE: RxDataStore<T>? = null
+
+    /**
+     * Returns the cached DataStore, building it on the first call.
+     *
+     * @param thisRef only its application context is handed to the builder
+     * @param property not used
+     */
+    override fun getValue(thisRef: Context, property: KProperty<*>): RxDataStore<T> {
+        return INSTANCE ?: synchronized(lock) {
+            // The store stays cached, so build it from the application Context, not thisRef.
+            val appContext = thisRef.applicationContext
+            INSTANCE ?: with(RxDataStoreBuilder(appContext, fileName, serializer)) {
+                setIoScheduler(scheduler)
+                migrations.forEach { addDataMigration(it) }
+                corruptionHandler?.let { setCorruptionHandler(it) }
+                build()
+            }.also { INSTANCE = it }
+        }
+    }
+}
\ No newline at end of file
diff --git a/datastore/datastore-rxjava3/api/current.txt b/datastore/datastore-rxjava3/api/current.txt
index 2a6f395..a74eecb 100644
--- a/datastore/datastore-rxjava3/api/current.txt
+++ b/datastore/datastore-rxjava3/api/current.txt
@@ -25,6 +25,13 @@
     method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
   }
 
+  public final class RxDataStoreDelegateKt {
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<T>> migrations, optional io.reactivex.rxjava3.core.Scheduler scheduler);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<T>> migrations);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer);
+  }
+
   public interface RxSharedPreferencesMigration<T> {
     method public io.reactivex.rxjava3.core.Single<T> migrate(androidx.datastore.migrations.SharedPreferencesView sharedPreferencesView, T? currentData);
     method public default io.reactivex.rxjava3.core.Single<java.lang.Boolean> shouldMigrate(T? currentData);
diff --git a/datastore/datastore-rxjava3/api/public_plus_experimental_current.txt b/datastore/datastore-rxjava3/api/public_plus_experimental_current.txt
index 2a6f395..a74eecb 100644
--- a/datastore/datastore-rxjava3/api/public_plus_experimental_current.txt
+++ b/datastore/datastore-rxjava3/api/public_plus_experimental_current.txt
@@ -25,6 +25,13 @@
     method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
   }
 
+  public final class RxDataStoreDelegateKt {
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<T>> migrations, optional io.reactivex.rxjava3.core.Scheduler scheduler);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<T>> migrations);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer);
+  }
+
   public interface RxSharedPreferencesMigration<T> {
     method public io.reactivex.rxjava3.core.Single<T> migrate(androidx.datastore.migrations.SharedPreferencesView sharedPreferencesView, T? currentData);
     method public default io.reactivex.rxjava3.core.Single<java.lang.Boolean> shouldMigrate(T? currentData);
diff --git a/datastore/datastore-rxjava3/api/restricted_current.txt b/datastore/datastore-rxjava3/api/restricted_current.txt
index 2a6f395..a74eecb 100644
--- a/datastore/datastore-rxjava3/api/restricted_current.txt
+++ b/datastore/datastore-rxjava3/api/restricted_current.txt
@@ -25,6 +25,13 @@
     method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
   }
 
+  public final class RxDataStoreDelegateKt {
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<T>> migrations, optional io.reactivex.rxjava3.core.Scheduler scheduler);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler, optional java.util.List<? extends androidx.datastore.core.DataMigration<T>> migrations);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer, optional androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T>? corruptionHandler);
+    method public static <T> kotlin.properties.ReadOnlyProperty<android.content.Context,androidx.datastore.rxjava3.RxDataStore<T>> rxDataStore(String fileName, androidx.datastore.core.Serializer<T> serializer);
+  }
+
   public interface RxSharedPreferencesMigration<T> {
     method public io.reactivex.rxjava3.core.Single<T> migrate(androidx.datastore.migrations.SharedPreferencesView sharedPreferencesView, T? currentData);
     method public default io.reactivex.rxjava3.core.Single<java.lang.Boolean> shouldMigrate(T? currentData);
diff --git a/datastore/datastore-rxjava3/src/androidTest/java/androidx/datastore/rxjava3/RxDataStoreDelegateTest.kt b/datastore/datastore-rxjava3/src/androidTest/java/androidx/datastore/rxjava3/RxDataStoreDelegateTest.kt
new file mode 100644
index 0000000..2fc4b06
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/androidTest/java/androidx/datastore/rxjava3/RxDataStoreDelegateTest.kt
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.rxjava3
+
+import android.content.Context
+import androidx.datastore.core.DataMigration
+import androidx.datastore.core.handlers.ReplaceFileCorruptionHandler
+import androidx.test.core.app.ApplicationProvider
+import com.google.common.truth.Truth.assertThat
+import io.reactivex.rxjava3.core.Single
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import org.junit.Before
+import org.junit.Rule
+import org.junit.Test
+import org.junit.rules.TemporaryFolder
+import java.io.File
+import java.io.FileOutputStream
+
+// Store on "file1" with no migrations and no corruption handler.
+val Context.rxDataStore by rxDataStore("file1", TestingSerializer())
+// Store on "file2" whose only migration always applies and replaces the data with 123.
+val Context.rxdsWithMigration by rxDataStore(
+    "file2", TestingSerializer(),
+    migrations = listOf(object : DataMigration<Byte> {
+        override suspend fun shouldMigrate(currentData: Byte): Boolean = true
+        override suspend fun migrate(currentData: Byte): Byte = 123
+        override suspend fun cleanUp() = Unit
+    })
+)
+// Store on "file3": failReadWithCorruptionException serializer, handler replaces data with 123.
+val Context.rxdsWithCorruptionHandler by rxDataStore(
+    "file3", TestingSerializer(failReadWithCorruptionException = true),
+    corruptionHandler = ReplaceFileCorruptionHandler { 123 }
+)
+// Store on "file4"; testCorrectFileNameUsed reopens that file through RxDataStoreBuilder.
+val Context.rxDataStoreForFileNameCheck by rxDataStore("file4", TestingSerializer())
+
+@ExperimentalCoroutinesApi
+class RxDataStoreDelegateTest {
+    @get:Rule
+    val tmp = TemporaryFolder()
+
+    private lateinit var context: Context
+
+    @Before
+    fun setUp() {
+        context = ApplicationProvider.getApplicationContext()
+        // Start each test without files left under <filesDir>/datastore by a previous one.
+        File(context.filesDir, "/datastore").deleteRecursively()
+    }
+
+    @Test
+    fun testBasic() {
+        val store = context.rxDataStore
+        assertThat(store.data().blockingFirst()).isEqualTo(0)
+        assertThat(store.updateDataAsync { Single.just(it.inc()) }.blockingGet()).isEqualTo(1)
+    }
+
+    @Test
+    fun testWithMigration() {
+        assertThat(context.rxdsWithMigration.data().blockingFirst()).isEqualTo(123)
+    }
+
+    @Test
+    fun testCorruptedRunsCorruptionHandler() {
+        // File needs to exist or we don't actually hit the serializer:
+        val file = File(context.filesDir, "datastore/file3")
+        file.parentFile!!.mkdirs()
+        FileOutputStream(file).use { it.write(0) }
+
+        assertThat(context.rxdsWithCorruptionHandler.data().blockingFirst()).isEqualTo(123)
+    }
+
+    @Test
+    fun testCorrectFileNameUsed() {
+        val store = context.rxDataStoreForFileNameCheck
+        store.updateDataAsync { Single.just(it.inc()) }.blockingGet()
+        // Shut the store down before a second DataStore is opened on the same file.
+        store.dispose()
+        // Wait for the shutdown to finish before touching the file again.
+        store.shutdownComplete().blockingAwait()
+
+        // A store built directly on <filesDir>/datastore/file4 must see the write made above.
+        val file4 = File(context.filesDir, "datastore/file4")
+        val reopened = RxDataStoreBuilder({ file4 }, TestingSerializer()).build()
+        assertThat(reopened.data().blockingFirst()).isEqualTo(1)
+    }
+}
\ No newline at end of file
diff --git a/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataStoreDelegate.kt b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataStoreDelegate.kt
new file mode 100644
index 0000000..38ef4db
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataStoreDelegate.kt
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.rxjava3
+
+import android.content.Context
+import androidx.annotation.GuardedBy
+import androidx.datastore.core.DataMigration
+import androidx.datastore.core.Serializer
+import androidx.datastore.core.handlers.ReplaceFileCorruptionHandler
+import io.reactivex.rxjava3.core.Scheduler
+import io.reactivex.rxjava3.schedulers.Schedulers
+import kotlin.properties.ReadOnlyProperty
+import kotlin.reflect.KProperty
+
+/**
+ * Creates a property delegate for a single process DataStore. This should only be called once
+ * in a file (at the top level), and all usages of the DataStore should use a reference to the
+ * same instance. The receiver type for the property delegate must be an instance of [Context].
+ *
+ * Calling this function does not create the DataStore; the returned delegate builds it the
+ * first time the property is read and returns that same instance on every later read.
+ *
+ * Example usage:
+ * ```
+ * val Context.myRxDataStore by rxDataStore("filename", serializer)
+ *
+ * class SomeClass(val context: Context) {
+ *    fun update(): Single<Settings> = context.myRxDataStore.updateDataAsync {...}
+ * }
+ * ```
+ *
+ * @param fileName the filename relative to Context.filesDir that DataStore acts on. The File is
+ * obtained by calling File(context.filesDir, "datastore/$fileName"). No two instances of DataStore
+ * should act on the same file at the same time.
+ * @param serializer The [Serializer] for the type [T] that this DataStore holds.
+ * @param corruptionHandler The corruptionHandler is invoked if DataStore encounters a
+ * [androidx.datastore.core.CorruptionException] when attempting to read data. CorruptionExceptions
+ * are thrown by serializers when data can not be de-serialized. When null (the default), no
+ * corruption handler is set on the DataStore.
+ * @param migrations are run before any access to data can occur. Each producer and migration
+ * may be run more than once whether or not it already succeeded (potentially because another
+ * migration failed or a write to disk failed.)
+ * @param scheduler The scheduler in which IO operations and transform functions will execute.
+ * Defaults to [Schedulers.io].
+ *
+ * @return a property delegate that manages a datastore as a singleton.
+ */
+@JvmOverloads
+public fun <T : Any> rxDataStore(
+    fileName: String,
+    serializer: Serializer<T>,
+    corruptionHandler: ReplaceFileCorruptionHandler<T>? = null,
+    migrations: List<DataMigration<T>> = listOf(),
+    scheduler: Scheduler = Schedulers.io()
+): ReadOnlyProperty<Context, RxDataStore<T>> =
+    // Just bundles the arguments; the delegate itself builds and caches the store on demand.
+    RxDataStoreSingletonDelegate(fileName, serializer, corruptionHandler, migrations, scheduler)
+
+/**
+ * Delegate that builds one [RxDataStore] on first read and then keeps returning it.
+ */
+internal class RxDataStoreSingletonDelegate<T : Any> internal constructor(
+    private val fileName: String,
+    private val serializer: Serializer<T>,
+    private val corruptionHandler: ReplaceFileCorruptionHandler<T>?,
+    private val migrations: List<DataMigration<T>>,
+    private val scheduler: Scheduler
+) : ReadOnlyProperty<Context, RxDataStore<T>> {
+
+    private val lock = Any()
+
+    // Assigned only while holding `lock`; @Volatile covers the unlocked first read in getValue.
+    @GuardedBy("lock")
+    @Volatile
+    private var INSTANCE: RxDataStore<T>? = null
+
+    /**
+     * Returns the cached DataStore, building it on the first call.
+     *
+     * @param thisRef only its application context is handed to the builder
+     * @param property not used
+     */
+    override fun getValue(thisRef: Context, property: KProperty<*>): RxDataStore<T> {
+        return INSTANCE ?: synchronized(lock) {
+            // The store stays cached, so build it from the application Context, not thisRef.
+            val appContext = thisRef.applicationContext
+            INSTANCE ?: with(RxDataStoreBuilder(appContext, fileName, serializer)) {
+                setIoScheduler(scheduler)
+                migrations.forEach { addDataMigration(it) }
+                corruptionHandler?.let { setCorruptionHandler(it) }
+                build()
+            }.also { INSTANCE = it }
+        }
+    }
+}
\ No newline at end of file