[go: nahoru, domu]

Added datastore-rxjava3 and datastore-preferences-rxjava3.

I just copied the rxjava2 modules and replaced the imports with rxjava3 imports.

Test: Copied over tests from rxjava2 modules.
Relnote: This change adds the datastore-preferences-rxjava2 and datastore-preferences-rxjava3 modules
Bug: 170311106
Change-Id: If253fe05877f22b956cda2e2be27387ae76c9b33
diff --git a/datastore/datastore-preferences-rxjava3/api/current.txt b/datastore/datastore-preferences-rxjava3/api/current.txt
new file mode 100644
index 0000000..88fe233
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/api/current.txt
@@ -0,0 +1,15 @@
+// Signature format: 4.0
+package androidx.datastore.preferences.rxjava3 {
+
+  public final class RxPreferenceDataStoreBuilder {
+    ctor public RxPreferenceDataStoreBuilder(java.util.concurrent.Callable<java.io.File> produceFile);
+    ctor public RxPreferenceDataStoreBuilder(android.content.Context context, String name);
+    method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder addDataMigration(androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences> dataMigration);
+    method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder addRxDataMigration(androidx.datastore.rxjava3.RxDataMigration<androidx.datastore.preferences.core.Preferences> rxDataMigration);
+    method public androidx.datastore.core.DataStore<androidx.datastore.preferences.core.Preferences> build();
+    method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder setCorruptionHandler(androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences> corruptionHandler);
+    method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
+  }
+
+}
+
diff --git a/datastore/datastore-preferences-rxjava3/api/public_plus_experimental_current.txt b/datastore/datastore-preferences-rxjava3/api/public_plus_experimental_current.txt
new file mode 100644
index 0000000..88fe233
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/api/public_plus_experimental_current.txt
@@ -0,0 +1,15 @@
+// Signature format: 4.0
+package androidx.datastore.preferences.rxjava3 {
+
+  public final class RxPreferenceDataStoreBuilder {
+    ctor public RxPreferenceDataStoreBuilder(java.util.concurrent.Callable<java.io.File> produceFile);
+    ctor public RxPreferenceDataStoreBuilder(android.content.Context context, String name);
+    method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder addDataMigration(androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences> dataMigration);
+    method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder addRxDataMigration(androidx.datastore.rxjava3.RxDataMigration<androidx.datastore.preferences.core.Preferences> rxDataMigration);
+    method public androidx.datastore.core.DataStore<androidx.datastore.preferences.core.Preferences> build();
+    method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder setCorruptionHandler(androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences> corruptionHandler);
+    method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
+  }
+
+}
+
diff --git a/datastore/datastore-preferences-rxjava3/api/res-current.txt b/datastore/datastore-preferences-rxjava3/api/res-current.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/api/res-current.txt
diff --git a/datastore/datastore-preferences-rxjava3/api/restricted_current.txt b/datastore/datastore-preferences-rxjava3/api/restricted_current.txt
new file mode 100644
index 0000000..88fe233
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/api/restricted_current.txt
@@ -0,0 +1,15 @@
+// Signature format: 4.0
+package androidx.datastore.preferences.rxjava3 {
+
+  public final class RxPreferenceDataStoreBuilder {
+    ctor public RxPreferenceDataStoreBuilder(java.util.concurrent.Callable<java.io.File> produceFile);
+    ctor public RxPreferenceDataStoreBuilder(android.content.Context context, String name);
+    method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder addDataMigration(androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences> dataMigration);
+    method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder addRxDataMigration(androidx.datastore.rxjava3.RxDataMigration<androidx.datastore.preferences.core.Preferences> rxDataMigration);
+    method public androidx.datastore.core.DataStore<androidx.datastore.preferences.core.Preferences> build();
+    method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder setCorruptionHandler(androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences> corruptionHandler);
+    method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
+  }
+
+}
+
diff --git a/datastore/datastore-preferences-rxjava3/build.gradle b/datastore/datastore-preferences-rxjava3/build.gradle
new file mode 100644
index 0000000..44af2c9
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/build.gradle
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static androidx.build.dependencies.DependenciesKt.*
+import androidx.build.LibraryGroups
+import androidx.build.AndroidXExtension
+import androidx.build.Publish
+import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
+
+plugins {
+    id("AndroidXPlugin")
+    id("com.android.library")
+    id("kotlin-android")
+}
+
+android {
+    sourceSets {
+        test.java.srcDirs += 'src/test-common/java'
+        androidTest.java.srcDirs += 'src/test-common/java'
+    }
+}
+
+dependencies {
+    api(KOTLIN_STDLIB)
+    api(KOTLIN_COROUTINES_CORE)
+    api("androidx.annotation:annotation:1.1.0")
+    api(RX_JAVA3)
+
+    api(project(":datastore:datastore"))
+    api(project(":datastore:datastore-rxjava3"))
+    api(project(":datastore:datastore-preferences"))
+
+    implementation(KOTLIN_COROUTINES_RX3)
+
+    testImplementation(JUNIT)
+    testImplementation(KOTLIN_COROUTINES_TEST)
+    testImplementation(TRUTH)
+    testImplementation(project(":internal-testutils-truth"))
+
+    androidTestImplementation(project(":datastore:datastore-core"))
+    androidTestImplementation(project(":datastore:datastore"))
+    androidTestImplementation(JUNIT)
+    androidTestImplementation(project(":internal-testutils-truth"))
+    androidTestImplementation(ANDROIDX_TEST_RUNNER)
+    androidTestImplementation(ANDROIDX_TEST_CORE)
+}
+
+androidx {
+    name = "Android DataStore Core RxJava2 Wrappers"
+    publish = Publish.SNAPSHOT_AND_RELEASE
+    mavenGroup = LibraryGroups.DATASTORE
+    inceptionYear = "2020"
+    description = "Android DataStore Core - contains wrappers for using DataStore using RxJava2"
+    legacyDisableKotlinStrictApiMode = true
+}
diff --git a/datastore/datastore-preferences-rxjava3/src/androidTest/java/androidx/datastore/preferences/rxjava3/RxPreferencesDataStoreBuilderTest.java b/datastore/datastore-preferences-rxjava3/src/androidTest/java/androidx/datastore/preferences/rxjava3/RxPreferencesDataStoreBuilderTest.java
new file mode 100644
index 0000000..f72a779
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/src/androidTest/java/androidx/datastore/preferences/rxjava3/RxPreferencesDataStoreBuilderTest.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.preferences.rxjava3;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import android.content.Context;
+
+import androidx.annotation.NonNull;
+import androidx.datastore.core.DataStore;
+import androidx.datastore.core.handlers.ReplaceFileCorruptionHandler;
+import androidx.datastore.preferences.core.MutablePreferences;
+import androidx.datastore.preferences.core.Preferences;
+import androidx.datastore.preferences.core.PreferencesFactory;
+import androidx.datastore.preferences.core.PreferencesKeys;
+import androidx.datastore.rxjava3.RxDataMigration;
+import androidx.datastore.rxjava3.RxDataStore;
+import androidx.test.core.app.ApplicationProvider;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Single;
+
+
+public class RxPreferencesDataStoreBuilderTest {
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private static final Preferences.Key<Integer> INTEGER_KEY =
+            PreferencesKeys.intKey("int_key");
+
+    private static Single<Preferences> incrementInteger(Preferences preferencesIn) {
+        MutablePreferences prefs = preferencesIn.toMutablePreferences();
+        Integer currentInt = prefs.get(INTEGER_KEY);
+        prefs.set(INTEGER_KEY, currentInt != null ? currentInt + 1 : 1);
+        return Single.just(prefs);
+    }
+
+    @Test
+    public void testConstructWithProduceFile() throws Exception {
+        File file = tempFolder.newFile("temp.preferences_pb");
+
+        DataStore<Preferences> dataStore =
+                new RxPreferenceDataStoreBuilder(() -> file).build();
+
+        Single<Preferences> incrementInt = RxDataStore.updateDataAsync(dataStore,
+                RxPreferencesDataStoreBuilderTest::incrementInteger);
+        assertThat(incrementInt.blockingGet().get(INTEGER_KEY)).isEqualTo(1);
+
+        // Construct it again and confirm that the data is still there:
+        dataStore = new RxPreferenceDataStoreBuilder(() -> file).build();
+
+        assertThat(RxDataStore.data(dataStore).blockingFirst().get(INTEGER_KEY))
+                .isEqualTo(1);
+    }
+
+
+    @Test
+    public void testConstructWithContextAndName() throws Exception {
+
+        Context context = ApplicationProvider.getApplicationContext();
+        String name = "my_data_store";
+
+        File prefsFile = new File(context.getFilesDir().getPath()
+                + "/datastore/" + name + ".preferences_pb");
+        if (prefsFile.exists()) {
+            prefsFile.delete();
+        }
+
+        DataStore<Preferences> dataStore =
+                new RxPreferenceDataStoreBuilder(context, name).build();
+
+        Single<Preferences> set1 = RxDataStore.updateDataAsync(dataStore,
+                RxPreferencesDataStoreBuilderTest::incrementInteger);
+        assertThat(set1.blockingGet().get(INTEGER_KEY)).isEqualTo(1);
+
+        // Construct it again and confirm that the data is still there:
+        dataStore = new RxPreferenceDataStoreBuilder(context, name).build();
+        assertThat(RxDataStore.data(dataStore).blockingFirst().get(INTEGER_KEY)).isEqualTo(1);
+
+        // Construct it again with the expected file path and confirm that the data is there:
+        dataStore =
+                new RxPreferenceDataStoreBuilder(
+                        () ->
+                                new File(context.getFilesDir().getPath()
+                                        + "/datastore/" + name + ".preferences_pb")
+                ).build();
+
+        assertThat(RxDataStore.data(dataStore).blockingFirst().get(INTEGER_KEY)).isEqualTo(1);
+    }
+
+    @Test
+    public void testMigrationsAreInstalledAndRun() throws Exception {
+        RxDataMigration<Preferences> plusOneMigration = new RxDataMigration<Preferences>() {
+            @NonNull
+            @Override
+            public Single<Boolean> shouldMigrate(@NonNull Preferences currentData) {
+                return Single.just(true);
+            }
+
+            @NonNull
+            @Override
+            public Single<Preferences> migrate(@NonNull Preferences currentData) {
+                return incrementInteger(currentData);
+            }
+
+            @NonNull
+            @Override
+            public Completable cleanUp() {
+                return Completable.complete();
+            }
+        };
+
+        DataStore<Preferences> dataStore =
+                new RxPreferenceDataStoreBuilder(() ->
+                        tempFolder.newFile("temp.preferences_pb"))
+                        .addRxDataMigration(plusOneMigration)
+                        .build();
+
+        assertThat(RxDataStore.data(dataStore).blockingFirst().get(INTEGER_KEY))
+                .isEqualTo(1);
+    }
+
+
+    @Test
+    public void testCorruptionHandlerIsUser() throws Exception {
+
+        File file = tempFolder.newFile("temp.preferences_pb");
+
+        try (FileOutputStream fileOutputStream = new FileOutputStream(file)) {
+            fileOutputStream.write(0); // will cause corruption exception
+        }
+
+        ReplaceFileCorruptionHandler<Preferences> replaceFileCorruptionHandler =
+                new ReplaceFileCorruptionHandler<Preferences>(exception -> {
+                    MutablePreferences mutablePreferences =
+                            PreferencesFactory.createMutable();
+                    mutablePreferences.set(INTEGER_KEY, 99);
+                    return (Preferences) mutablePreferences;
+                });
+
+
+        DataStore<Preferences> dataStore =
+                new RxPreferenceDataStoreBuilder(() -> file)
+                        .setCorruptionHandler(replaceFileCorruptionHandler)
+                        .build();
+
+        assertThat(RxDataStore.data(dataStore).blockingFirst().get(INTEGER_KEY))
+                .isEqualTo(99);
+    }
+}
diff --git a/datastore/datastore-preferences-rxjava3/src/main/AndroidManifest.xml b/datastore/datastore-preferences-rxjava3/src/main/AndroidManifest.xml
new file mode 100644
index 0000000..13f23ea
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/src/main/AndroidManifest.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+  Copyright 2020 The Android Open Source Project
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<manifest xmlns:android="http://schemas.android.com/apk/res/android"
+    package="androidx.datastore.preferences.rxjava3">
+
+</manifest>
diff --git a/datastore/datastore-preferences-rxjava3/src/main/java/androidx/datastore/preferences/rxjava3/RxPreferenceDataStoreBuilder.kt b/datastore/datastore-preferences-rxjava3/src/main/java/androidx/datastore/preferences/rxjava3/RxPreferenceDataStoreBuilder.kt
new file mode 100644
index 0000000..527cfb3
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/src/main/java/androidx/datastore/preferences/rxjava3/RxPreferenceDataStoreBuilder.kt
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.preferences.rxjava3
+
+import android.annotation.SuppressLint
+import android.content.Context
+import androidx.datastore.core.DataMigration
+import androidx.datastore.core.DataStore
+import androidx.datastore.core.handlers.ReplaceFileCorruptionHandler
+import androidx.datastore.preferences.core.PreferenceDataStoreFactory
+import androidx.datastore.preferences.core.Preferences
+import androidx.datastore.preferences.createDataStore
+import androidx.datastore.rxjava3.RxDataMigration
+import io.reactivex.rxjava3.core.Scheduler
+import io.reactivex.rxjava3.schedulers.Schedulers
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.rx3.asCoroutineDispatcher
+import kotlinx.coroutines.rx3.await
+import java.io.File
+import java.util.concurrent.Callable
+
+/**
+ * RxSharedPreferencesMigrationBuilder class for a DataStore that works on a single process.
+ */
+@SuppressLint("TopLevelBuilder")
+public class RxPreferenceDataStoreBuilder {
+
+    // Either produceFile or context & name must be set, but not both.
+    private var produceFile: Callable<File>? = null
+
+    private var context: Context? = null
+    private var name: String? = null
+
+    // Optional
+    private var ioScheduler: Scheduler = Schedulers.io()
+    private var corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null
+    private val dataMigrations: MutableList<DataMigration<Preferences>> = mutableListOf()
+
+    /**
+     * Create a RxPreferenceDataStoreBuilder with the callable which returns the File that
+     * DataStore acts on. The user is responsible for ensuring that there is never more than one
+     * DataStore acting on a file at a time.
+     *
+     * @param produceFile Function which returns the file that the new DataStore will act on. The
+     * function must return the same path every time. No two instances of DataStore should act on
+     * the same file at the same time.
+     */
+    public constructor(produceFile: Callable<File>) {
+        this.produceFile = produceFile
+    }
+
+    /**
+     * Create a RxPreferenceDataStoreBuilder with the Context and name from which to derive the
+     * DataStore file. The file is generated by See [Context.createDataStore] for more info. The
+     * user is responsible for ensuring that there is never more than one DataStore acting on a
+     * file at a time.
+     *
+     * @param context the context from which we retrieve files directory.
+     * @param name the filename relative to Context.filesDir that DataStore acts on. The File is
+     * obtained by calling File(this.filesDir, "datastore/$name.preferences_pb"). No two instances
+     * of DataStore should act on the same file at the same time.
+     */
+    public constructor(context: Context, name: String) {
+        this.context = context
+        this.name = name
+    }
+
+    /**
+     * Set the Scheduler on which to perform IO and transform operations. This is converted into
+     * a CoroutineDispatcher before being added to DataStore.
+     *
+     * This parameter is optional and defaults to Schedulers.io().
+     *
+     * @param ioScheduler the scheduler on which IO and transform operations are run
+     * @return this
+     */
+    @Suppress("MissingGetterMatchingBuilder")
+    public fun setIoScheduler(ioScheduler: Scheduler): RxPreferenceDataStoreBuilder =
+        apply { this.ioScheduler = ioScheduler }
+
+    /**
+     * Sets the corruption handler to install into the DataStore.
+     *
+     * This parameter is optional and defaults to no corruption handler.
+     *
+     * @param corruptionHandler
+     * @return this
+     */
+    @Suppress("MissingGetterMatchingBuilder")
+    public fun setCorruptionHandler(corruptionHandler: ReplaceFileCorruptionHandler<Preferences>):
+        RxPreferenceDataStoreBuilder = apply { this.corruptionHandler = corruptionHandler }
+
+    /**
+     * Add an RxDataMigration to the DataStore. Migrations are run in the order they are added.
+     *
+     * @param rxDataMigration the migration to add.
+     * @return this
+     */
+    @Suppress("MissingGetterMatchingBuilder")
+    public fun addRxDataMigration(rxDataMigration: RxDataMigration<Preferences>):
+        RxPreferenceDataStoreBuilder = apply {
+            this.dataMigrations.add(DataMigrationFromRxDataMigration(rxDataMigration))
+        }
+
+    /**
+     * Add a DataMigration to the Datastore. Migrations are run in the order they are added.
+     *
+     * @param dataMigration the migration to add
+     * @return this
+     */
+    @Suppress("MissingGetterMatchingBuilder")
+    public fun addDataMigration(dataMigration: DataMigration<Preferences>):
+        RxPreferenceDataStoreBuilder = apply {
+            this.dataMigrations.add(dataMigration)
+        }
+
+    /**
+     * Build the DataStore.
+     *
+     * @throws IllegalStateException if serializer is not set or if neither produceFile not
+     * context and name are set.
+     * @return the DataStore with the provided parameters
+     */
+    public fun build(): DataStore<Preferences> {
+        val scope = CoroutineScope(ioScheduler.asCoroutineDispatcher())
+
+        val produceFile: Callable<File>? = this.produceFile
+        val context: Context? = this.context
+        val name: String? = this.name
+
+        return if (produceFile != null) {
+            PreferenceDataStoreFactory.create(
+                produceFile = { produceFile.call() },
+                scope = CoroutineScope(
+                    ioScheduler.asCoroutineDispatcher()
+                ),
+                corruptionHandler = corruptionHandler,
+                migrations = dataMigrations
+            )
+        } else if (context != null && name != null) {
+            return context.createDataStore(
+                name = name,
+                scope = scope,
+                corruptionHandler = corruptionHandler,
+                migrations = dataMigrations
+            )
+        } else {
+            error("Either produceFile or context and name must be set. This should never happen.")
+        }
+    }
+}
+
+internal class DataMigrationFromRxDataMigration<T>(private val migration: RxDataMigration<T>) :
+    DataMigration<T> {
+    override suspend fun shouldMigrate(currentData: T): Boolean {
+        return migration.shouldMigrate(currentData).await()
+    }
+
+    override suspend fun migrate(currentData: T): T {
+        return migration.migrate(currentData).await()
+    }
+
+    override suspend fun cleanUp() {
+        migration.cleanUp().await()
+    }
+}
diff --git a/datastore/datastore-rxjava3/api/current.txt b/datastore/datastore-rxjava3/api/current.txt
new file mode 100644
index 0000000..bdba98a
--- /dev/null
+++ b/datastore/datastore-rxjava3/api/current.txt
@@ -0,0 +1,38 @@
+// Signature format: 4.0
+package androidx.datastore.rxjava3 {
+
+  public interface RxDataMigration<T> {
+    method public io.reactivex.rxjava3.core.Completable cleanUp();
+    method public io.reactivex.rxjava3.core.Single<T!> migrate(T?);
+    method public io.reactivex.rxjava3.core.Single<java.lang.Boolean!> shouldMigrate(T?);
+  }
+
+  public final class RxDataStore {
+    method @kotlinx.coroutines.ExperimentalCoroutinesApi public static <T> io.reactivex.rxjava3.core.Flowable<T> data(androidx.datastore.core.DataStore<T>);
+    method @kotlinx.coroutines.ExperimentalCoroutinesApi public static <T> io.reactivex.rxjava3.core.Single<T> updateDataAsync(androidx.datastore.core.DataStore<T>, io.reactivex.rxjava3.functions.Function<T,io.reactivex.rxjava3.core.Single<T>> transform);
+  }
+
+  public final class RxDataStoreBuilder<T> {
+    ctor public RxDataStoreBuilder(java.util.concurrent.Callable<java.io.File> produceFile, androidx.datastore.core.Serializer<T> serializer);
+    ctor public RxDataStoreBuilder(android.content.Context context, String fileName, androidx.datastore.core.Serializer<T> serializer);
+    method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> addDataMigration(androidx.datastore.core.DataMigration<T> dataMigration);
+    method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> addRxDataMigration(androidx.datastore.rxjava3.RxDataMigration<T> rxDataMigration);
+    method public androidx.datastore.core.DataStore<T> build();
+    method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> setCorruptionHandler(androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T> corruptionHandler);
+    method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
+  }
+
+  public interface RxSharedPreferencesMigration<T> {
+    method public io.reactivex.rxjava3.core.Single<T> migrate(androidx.datastore.migrations.SharedPreferencesView sharedPreferencesView, T? currentData);
+    method public default io.reactivex.rxjava3.core.Single<java.lang.Boolean> shouldMigrate(T? currentData);
+  }
+
+  public final class RxSharedPreferencesMigrationBuilder<T> {
+    ctor public RxSharedPreferencesMigrationBuilder(android.content.Context context, String sharedPreferencesName, androidx.datastore.rxjava3.RxSharedPreferencesMigration<T> rxSharedPreferencesMigration);
+    method public androidx.datastore.core.DataMigration<T> build();
+    method public androidx.datastore.rxjava3.RxSharedPreferencesMigrationBuilder<T> setDeleteEmptyPreferences(boolean deleteEmptyPreferences);
+    method public androidx.datastore.rxjava3.RxSharedPreferencesMigrationBuilder<T> setKeysToMigrate(java.lang.String... keys);
+  }
+
+}
+
diff --git a/datastore/datastore-rxjava3/api/public_plus_experimental_current.txt b/datastore/datastore-rxjava3/api/public_plus_experimental_current.txt
new file mode 100644
index 0000000..bdba98a
--- /dev/null
+++ b/datastore/datastore-rxjava3/api/public_plus_experimental_current.txt
@@ -0,0 +1,38 @@
+// Signature format: 4.0
+package androidx.datastore.rxjava3 {
+
+  public interface RxDataMigration<T> {
+    method public io.reactivex.rxjava3.core.Completable cleanUp();
+    method public io.reactivex.rxjava3.core.Single<T!> migrate(T?);
+    method public io.reactivex.rxjava3.core.Single<java.lang.Boolean!> shouldMigrate(T?);
+  }
+
+  public final class RxDataStore {
+    method @kotlinx.coroutines.ExperimentalCoroutinesApi public static <T> io.reactivex.rxjava3.core.Flowable<T> data(androidx.datastore.core.DataStore<T>);
+    method @kotlinx.coroutines.ExperimentalCoroutinesApi public static <T> io.reactivex.rxjava3.core.Single<T> updateDataAsync(androidx.datastore.core.DataStore<T>, io.reactivex.rxjava3.functions.Function<T,io.reactivex.rxjava3.core.Single<T>> transform);
+  }
+
+  public final class RxDataStoreBuilder<T> {
+    ctor public RxDataStoreBuilder(java.util.concurrent.Callable<java.io.File> produceFile, androidx.datastore.core.Serializer<T> serializer);
+    ctor public RxDataStoreBuilder(android.content.Context context, String fileName, androidx.datastore.core.Serializer<T> serializer);
+    method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> addDataMigration(androidx.datastore.core.DataMigration<T> dataMigration);
+    method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> addRxDataMigration(androidx.datastore.rxjava3.RxDataMigration<T> rxDataMigration);
+    method public androidx.datastore.core.DataStore<T> build();
+    method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> setCorruptionHandler(androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T> corruptionHandler);
+    method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
+  }
+
+  public interface RxSharedPreferencesMigration<T> {
+    method public io.reactivex.rxjava3.core.Single<T> migrate(androidx.datastore.migrations.SharedPreferencesView sharedPreferencesView, T? currentData);
+    method public default io.reactivex.rxjava3.core.Single<java.lang.Boolean> shouldMigrate(T? currentData);
+  }
+
+  public final class RxSharedPreferencesMigrationBuilder<T> {
+    ctor public RxSharedPreferencesMigrationBuilder(android.content.Context context, String sharedPreferencesName, androidx.datastore.rxjava3.RxSharedPreferencesMigration<T> rxSharedPreferencesMigration);
+    method public androidx.datastore.core.DataMigration<T> build();
+    method public androidx.datastore.rxjava3.RxSharedPreferencesMigrationBuilder<T> setDeleteEmptyPreferences(boolean deleteEmptyPreferences);
+    method public androidx.datastore.rxjava3.RxSharedPreferencesMigrationBuilder<T> setKeysToMigrate(java.lang.String... keys);
+  }
+
+}
+
diff --git a/datastore/datastore-rxjava3/api/res-current.txt b/datastore/datastore-rxjava3/api/res-current.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/datastore/datastore-rxjava3/api/res-current.txt
diff --git a/datastore/datastore-rxjava3/api/restricted_current.txt b/datastore/datastore-rxjava3/api/restricted_current.txt
new file mode 100644
index 0000000..bdba98a
--- /dev/null
+++ b/datastore/datastore-rxjava3/api/restricted_current.txt
@@ -0,0 +1,38 @@
+// Signature format: 4.0
+package androidx.datastore.rxjava3 {
+
+  public interface RxDataMigration<T> {
+    method public io.reactivex.rxjava3.core.Completable cleanUp();
+    method public io.reactivex.rxjava3.core.Single<T!> migrate(T?);
+    method public io.reactivex.rxjava3.core.Single<java.lang.Boolean!> shouldMigrate(T?);
+  }
+
+  public final class RxDataStore {
+    method @kotlinx.coroutines.ExperimentalCoroutinesApi public static <T> io.reactivex.rxjava3.core.Flowable<T> data(androidx.datastore.core.DataStore<T>);
+    method @kotlinx.coroutines.ExperimentalCoroutinesApi public static <T> io.reactivex.rxjava3.core.Single<T> updateDataAsync(androidx.datastore.core.DataStore<T>, io.reactivex.rxjava3.functions.Function<T,io.reactivex.rxjava3.core.Single<T>> transform);
+  }
+
+  public final class RxDataStoreBuilder<T> {
+    ctor public RxDataStoreBuilder(java.util.concurrent.Callable<java.io.File> produceFile, androidx.datastore.core.Serializer<T> serializer);
+    ctor public RxDataStoreBuilder(android.content.Context context, String fileName, androidx.datastore.core.Serializer<T> serializer);
+    method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> addDataMigration(androidx.datastore.core.DataMigration<T> dataMigration);
+    method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> addRxDataMigration(androidx.datastore.rxjava3.RxDataMigration<T> rxDataMigration);
+    method public androidx.datastore.core.DataStore<T> build();
+    method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> setCorruptionHandler(androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T> corruptionHandler);
+    method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
+  }
+
+  public interface RxSharedPreferencesMigration<T> {
+    method public io.reactivex.rxjava3.core.Single<T> migrate(androidx.datastore.migrations.SharedPreferencesView sharedPreferencesView, T? currentData);
+    method public default io.reactivex.rxjava3.core.Single<java.lang.Boolean> shouldMigrate(T? currentData);
+  }
+
+  public final class RxSharedPreferencesMigrationBuilder<T> {
+    ctor public RxSharedPreferencesMigrationBuilder(android.content.Context context, String sharedPreferencesName, androidx.datastore.rxjava3.RxSharedPreferencesMigration<T> rxSharedPreferencesMigration);
+    method public androidx.datastore.core.DataMigration<T> build();
+    method public androidx.datastore.rxjava3.RxSharedPreferencesMigrationBuilder<T> setDeleteEmptyPreferences(boolean deleteEmptyPreferences);
+    method public androidx.datastore.rxjava3.RxSharedPreferencesMigrationBuilder<T> setKeysToMigrate(java.lang.String... keys);
+  }
+
+}
+
diff --git a/datastore/datastore-rxjava3/build.gradle b/datastore/datastore-rxjava3/build.gradle
new file mode 100644
index 0000000..9f8e453
--- /dev/null
+++ b/datastore/datastore-rxjava3/build.gradle
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static androidx.build.dependencies.DependenciesKt.*
+import androidx.build.LibraryGroups
+import androidx.build.AndroidXExtension
+import androidx.build.Publish
+import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
+
+plugins {
+    id("AndroidXPlugin")
+    id("com.android.library")
+    id("kotlin-android")
+}
+
+android {
+    sourceSets {
+        test.java.srcDirs += 'src/test-common/java'
+        androidTest.java.srcDirs += 'src/test-common/java'
+    }
+}
+
+dependencies {
+    api(KOTLIN_STDLIB)
+    api(KOTLIN_COROUTINES_CORE)
+    api("androidx.annotation:annotation:1.1.0")
+    api(RX_JAVA3)
+
+    api(project(":datastore:datastore"))
+
+    implementation(KOTLIN_COROUTINES_RX3)
+
+    testImplementation(JUNIT)
+    testImplementation(KOTLIN_COROUTINES_TEST)
+    testImplementation(TRUTH)
+    testImplementation(project(":internal-testutils-truth"))
+
+    androidTestImplementation(JUNIT)
+    androidTestImplementation(project(":internal-testutils-truth"))
+    androidTestImplementation(ANDROIDX_TEST_RUNNER)
+    androidTestImplementation(ANDROIDX_TEST_CORE)
+}
+
+androidx {
+    name = "Android DataStore Core RxJava2 Wrappers"
+    publish = Publish.SNAPSHOT_AND_RELEASE
+    mavenGroup = LibraryGroups.DATASTORE
+    inceptionYear = "2020"
+    description = "Android DataStore Core - contains wrappers for using DataStore using RxJava2"
+    legacyDisableKotlinStrictApiMode = true
+}
diff --git a/datastore/datastore-rxjava3/src/androidTest/AndroidManifest.xml b/datastore/datastore-rxjava3/src/androidTest/AndroidManifest.xml
new file mode 100644
index 0000000..3369992
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/androidTest/AndroidManifest.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+  Copyright 2020 The Android Open Source Project
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<manifest xmlns:android="http://schemas.android.com/apk/res/android"
+    package="androidx.datastore.rxjava3">
+
+</manifest>
diff --git a/datastore/datastore-rxjava3/src/androidTest/java/androidx/datastore/rxjava3/RxDataStoreBuilderTest.java b/datastore/datastore-rxjava3/src/androidTest/java/androidx/datastore/rxjava3/RxDataStoreBuilderTest.java
new file mode 100644
index 0000000..0564f77
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/androidTest/java/androidx/datastore/rxjava3/RxDataStoreBuilderTest.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package androidx.datastore.rxjava3;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import android.content.Context;
+
+import androidx.annotation.NonNull;
+import androidx.datastore.core.DataStore;
+import androidx.datastore.core.handlers.ReplaceFileCorruptionHandler;
+import androidx.test.core.app.ApplicationProvider;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Scheduler;
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.schedulers.Schedulers;
+
+public class RxDataStoreBuilderTest {
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private static Single<Byte> incrementByte(Byte byteIn) {
+        return Single.just(++byteIn);
+    }
+
+    @Test
+    public void testConstructWithProduceFile() throws Exception {
+        File file = tempFolder.newFile();
+        DataStore<Byte> dataStore =
+                new RxDataStoreBuilder<Byte>(() -> file, new TestingSerializer())
+                        .build();
+        Single<Byte> incrementByte = RxDataStore.updateDataAsync(dataStore,
+                RxDataStoreBuilderTest::incrementByte);
+        assertThat(incrementByte.blockingGet()).isEqualTo(1);
+        // Construct it again and confirm that the data is still there:
+        dataStore =
+                new RxDataStoreBuilder<Byte>(() -> file, new TestingSerializer())
+                        .build();
+        assertThat(RxDataStore.data(dataStore).blockingFirst()).isEqualTo(1);
+    }
+
+    @Test
+    public void testConstructWithContextAndName() throws Exception {
+        Context context = ApplicationProvider.getApplicationContext();
+        String name = "my_data_store";
+        DataStore<Byte> dataStore =
+                new RxDataStoreBuilder<Byte>(context, name, new TestingSerializer())
+                        .build();
+        Single<Byte> set1 = RxDataStore.updateDataAsync(dataStore, input -> Single.just((byte) 1));
+        assertThat(set1.blockingGet()).isEqualTo(1);
+        // Construct it again and confirm that the data is still there:
+        dataStore =
+                new RxDataStoreBuilder<Byte>(context, name, new TestingSerializer())
+                        .build();
+        assertThat(RxDataStore.data(dataStore).blockingFirst()).isEqualTo(1);
+        // Construct it again with the expected file path and confirm that the data is there:
+        dataStore =
+                new RxDataStoreBuilder<Byte>(() -> new File(context.getFilesDir().getPath()
+                        + "/datastore/" + name), new TestingSerializer()
+                )
+                        .build();
+        assertThat(RxDataStore.data(dataStore).blockingFirst()).isEqualTo(1);
+    }
+
+    @Test
+    public void testMigrationsAreInstalledAndRun() throws Exception {
+        RxDataMigration<Byte> plusOneMigration = new RxDataMigration<Byte>() {
+            @NonNull
+            @Override
+            public Single<Boolean> shouldMigrate(@NonNull Byte currentData) {
+                return Single.just(true);
+            }
+
+            @NonNull
+            @Override
+            public Single<Byte> migrate(@NonNull Byte currentData) {
+                return incrementByte(currentData);
+            }
+
+            @NonNull
+            @Override
+            public Completable cleanUp() {
+                return Completable.complete();
+            }
+        };
+
+        DataStore<Byte> dataStore = new RxDataStoreBuilder<Byte>(
+                () -> tempFolder.newFile(), new TestingSerializer())
+                .addRxDataMigration(plusOneMigration)
+                .build();
+
+        assertThat(RxDataStore.data(dataStore).blockingFirst()).isEqualTo(1);
+    }
+
+    @Test
+    public void testSpecifiedSchedulerIsUser() throws Exception {
+        Scheduler singleThreadedScheduler =
+                Schedulers.from(Executors.newSingleThreadExecutor(new ThreadFactory() {
+                    @Override
+                    public Thread newThread(Runnable r) {
+                        return new Thread(r, "TestingThread");
+                    }
+                }));
+
+
+        DataStore<Byte> dataStore = new RxDataStoreBuilder<Byte>(() -> tempFolder.newFile(),
+                new TestingSerializer())
+                .setIoScheduler(singleThreadedScheduler)
+                .build();
+        Single<Byte> update = RxDataStore.updateDataAsync(dataStore, input -> {
+            Thread currentThread = Thread.currentThread();
+            assertThat(currentThread.getName()).isEqualTo("TestingThread");
+            return Single.just(input);
+        });
+        assertThat(update.blockingGet()).isEqualTo((byte) 0);
+        Single<Byte> subsequentUpdate = RxDataStore.updateDataAsync(dataStore, input -> {
+            Thread currentThread = Thread.currentThread();
+            assertThat(currentThread.getName()).isEqualTo("TestingThread");
+            return Single.just(input);
+        });
+        assertThat(subsequentUpdate.blockingGet()).isEqualTo((byte) 0);
+    }
+
+    @Test
+    public void testCorruptionHandlerIsUser() {
+        TestingSerializer testingSerializer = new TestingSerializer();
+        testingSerializer.setFailReadWithCorruptionException(true);
+        ReplaceFileCorruptionHandler<Byte> replaceFileCorruptionHandler =
+                new ReplaceFileCorruptionHandler<Byte>(exception -> (byte) 99);
+
+
+        DataStore<Byte> dataStore = new RxDataStoreBuilder<Byte>(
+                () -> tempFolder.newFile(),
+                testingSerializer)
+                .setCorruptionHandler(replaceFileCorruptionHandler)
+                .build();
+        assertThat(RxDataStore.data(dataStore).blockingFirst()).isEqualTo(99);
+    }
+}
diff --git a/datastore/datastore-rxjava3/src/androidTest/java/androidx/datastore/rxjava3/RxSharedPreferencesMigrationTest.java b/datastore/datastore-rxjava3/src/androidTest/java/androidx/datastore/rxjava3/RxSharedPreferencesMigrationTest.java
new file mode 100644
index 0000000..b66d6ef
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/androidTest/java/androidx/datastore/rxjava3/RxSharedPreferencesMigrationTest.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.rxjava3;
+
+import static androidx.testutils.AssertionsKt.assertThrows;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import android.content.Context;
+import android.content.SharedPreferences;
+
+import androidx.datastore.core.DataMigration;
+import androidx.datastore.core.DataStore;
+import androidx.datastore.migrations.SharedPreferencesView;
+import androidx.test.core.app.ApplicationProvider;
+
+import com.google.common.truth.Truth;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import io.reactivex.rxjava3.core.Single;
+
+public class RxSharedPreferencesMigrationTest {
+    @Rule
+    public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    private final String mSharedPrefsName = "shared_prefs_name";
+
+
+    private Context mContext;
+    private SharedPreferences mSharedPrefs;
+    private File mDatastoreFile;
+
+    @Before
+    public void setUp() throws Exception {
+        mContext = ApplicationProvider.getApplicationContext();
+        mSharedPrefs = mContext.getSharedPreferences(mSharedPrefsName, Context.MODE_PRIVATE);
+        mDatastoreFile = temporaryFolder.newFile("test_file.preferences_pb");
+
+        assertThat(mSharedPrefs.edit().clear().commit()).isTrue();
+    }
+
+    @Test
+    public void testShouldMigrateSkipsMigration() {
+        RxSharedPreferencesMigration<Byte> skippedMigration =
+                new RxSharedPreferencesMigration<Byte>() {
+                    @NotNull
+                    @Override
+                    public Single<Boolean> shouldMigrate(Byte currentData) {
+                        return Single.just(false);
+                    }
+
+                    @NotNull
+                    @Override
+                    public Single<Byte> migrate(
+                            @NotNull SharedPreferencesView sharedPreferencesView,
+                            Byte currentData) {
+                        return Single.error(
+                                new IllegalStateException("We shouldn't reach this point!"));
+                    }
+                };
+
+
+        DataMigration<Byte> spMigration =
+                getSpMigrationBuilder(skippedMigration).build();
+
+        DataStore<Byte> dataStoreWithMigrations = getDataStoreWithMigration(spMigration);
+
+        Truth.assertThat(RxDataStore.data(dataStoreWithMigrations).blockingFirst()).isEqualTo(0);
+    }
+
+    @Test
+    public void testSharedPrefsViewContainsSpecifiedKeys() {
+        String includedKey = "key1";
+        int includedVal = 99;
+        String notMigratedKey = "key2";
+
+        assertThat(mSharedPrefs.edit().putInt(includedKey, includedVal).putInt(notMigratedKey,
+                123).commit()).isTrue();
+
+        DataMigration<Byte> dataMigration =
+                getSpMigrationBuilder(
+                        new DefaultMigration() {
+                            @NotNull
+                            @Override
+                            public Single<Byte> migrate(
+                                    @NotNull SharedPreferencesView sharedPreferencesView,
+                                    Byte currentData) {
+                                assertThat(sharedPreferencesView.contains(includedKey)).isTrue();
+                                assertThat(sharedPreferencesView.getAll().size()).isEqualTo(1);
+                                assertThrows(IllegalStateException.class,
+                                        () -> sharedPreferencesView.getInt(notMigratedKey, -1));
+
+                                return Single.just((byte) 50);
+                            }
+                        }
+                ).setKeysToMigrate(includedKey).build();
+
+        DataStore<Byte> byteStore = getDataStoreWithMigration(dataMigration);
+
+        assertThat(RxDataStore.data(byteStore).blockingFirst()).isEqualTo(50);
+
+        assertThat(mSharedPrefs.contains(includedKey)).isFalse();
+        assertThat(mSharedPrefs.contains(notMigratedKey)).isTrue();
+    }
+
+
+    @Test
+    public void testSharedPrefsViewWithAllKeysSpecified() {
+        String includedKey = "key1";
+        String includedKey2 = "key2";
+        int value = 99;
+
+        assertThat(mSharedPrefs.edit().putInt(includedKey, value).putInt(includedKey2,
+                value).commit()).isTrue();
+
+        DataMigration<Byte> dataMigration =
+                getSpMigrationBuilder(
+                        new DefaultMigration() {
+                            @NotNull
+                            @Override
+                            public Single<Byte> migrate(
+                                    @NotNull SharedPreferencesView sharedPreferencesView,
+                                    Byte currentData) {
+                                assertThat(sharedPreferencesView.contains(includedKey)).isTrue();
+                                assertThat(sharedPreferencesView.contains(includedKey2)).isTrue();
+                                assertThat(sharedPreferencesView.getAll().size()).isEqualTo(2);
+
+                                return Single.just((byte) 50);
+                            }
+                        }
+                ).build();
+
+        DataStore<Byte> byteStore = getDataStoreWithMigration(dataMigration);
+
+        assertThat(RxDataStore.data(byteStore).blockingFirst()).isEqualTo(50);
+
+        assertThat(mSharedPrefs.contains(includedKey)).isFalse();
+        assertThat(mSharedPrefs.contains(includedKey2)).isFalse();
+    }
+
+    @Test
+    public void testDeletesEmptySharedPreferences() {
+        String key = "key";
+        String value = "value";
+        assertThat(mSharedPrefs.edit().putString(key, value).commit()).isTrue();
+
+        DataMigration<Byte> dataMigration =
+                getSpMigrationBuilder(new DefaultMigration()).setDeleteEmptyPreferences(
+                        true).build();
+        DataStore<Byte> byteStore = getDataStoreWithMigration(dataMigration);
+        assertThat(RxDataStore.data(byteStore).blockingFirst()).isEqualTo(0);
+
+        // Check that the shared preferences files are deleted
+        File prefsDir = new File(mContext.getApplicationInfo().dataDir, "shared_prefs");
+        File prefsFile = new File(prefsDir, mSharedPrefsName + ".xml");
+        File backupPrefsFile = new File(prefsFile.getPath() + ".bak");
+        assertThat(prefsFile.exists()).isFalse();
+        assertThat(backupPrefsFile.exists()).isFalse();
+    }
+
+    private RxSharedPreferencesMigrationBuilder<Byte> getSpMigrationBuilder(
+            RxSharedPreferencesMigration<Byte> rxSharedPreferencesMigration) {
+        return new RxSharedPreferencesMigrationBuilder<Byte>(mContext, mSharedPrefsName,
+                rxSharedPreferencesMigration);
+    }
+
+    private DataStore<Byte> getDataStoreWithMigration(DataMigration<Byte> dataMigration) {
+        return new RxDataStoreBuilder<Byte>(() -> mDatastoreFile, new TestingSerializer())
+                .addDataMigration(dataMigration).build();
+    }
+
+
+    private static class DefaultMigration implements RxSharedPreferencesMigration<Byte> {
+
+        @NotNull
+        @Override
+        public Single<Boolean> shouldMigrate(Byte currentData) {
+            return Single.just(true);
+        }
+
+        @NotNull
+        @Override
+        public Single<Byte> migrate(@NotNull SharedPreferencesView sharedPreferencesView,
+                Byte currentData) {
+            return Single.just(currentData);
+        }
+    }
+}
diff --git a/datastore/datastore-rxjava3/src/main/AndroidManifest.xml b/datastore/datastore-rxjava3/src/main/AndroidManifest.xml
new file mode 100644
index 0000000..3369992
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/main/AndroidManifest.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+  Copyright 2020 The Android Open Source Project
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<manifest xmlns:android="http://schemas.android.com/apk/res/android"
+    package="androidx.datastore.rxjava3">
+
+</manifest>
diff --git a/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataMigration.java b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataMigration.java
new file mode 100644
index 0000000..ade2acc
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataMigration.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.rxjava3;
+
+import androidx.annotation.NonNull;
+import androidx.annotation.Nullable;
+
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Single;
+
+/**
+ * Interface for migrations to DataStore. Methods on this migration ([shouldMigrate], [migrate]
+ * and [cleanUp]) may be called multiple times, so their implementations must be idempotent.
+ * These methods may be called multiple times if DataStore encounters issues when writing the
+ * newly migrated data to disk or if any migration installed in the same DataStore throws an
+ * Exception.
+ *
+ * If you're migrating from SharedPreferences see [SharedPreferencesMigration].
+ *
+ * @param <T> the exception type
+ */
+public interface RxDataMigration<T> {
+
+    /**
+     * Return whether this migration needs to be performed. If this returns false, no migration or
+     * cleanup will occur. Apps should do the cheapest possible check to determine if this migration
+     * should run, since this will be called every time the DataStore is initialized. This method
+     * may be run multiple times when any failure is encountered.
+     *
+     * Note that this will always be called before each call to [migrate].
+     *
+     * @param currentData the current data (which might already populated from previous runs of this
+     *                    or other migrations). Only Nullable if the type used with DataStore is
+     *                    Nullable.
+     */
+    @NonNull
+    Single<Boolean> shouldMigrate(@Nullable T currentData);
+
+    /**
+     * Perform the migration. Implementations should be idempotent since this may be called
+     * multiple times. If migrate fails, DataStore will not commit any data to disk, cleanUp will
+     * not be called, and the exception will be propagated back to the DataStore call that
+     * triggered the migration. Future calls to DataStore will result in DataMigrations being
+     * attempted again. This method may be run multiple times when any failure is encountered.
+     *
+     * Note that this will always be called before a call to [cleanUp].
+     *
+     * @param currentData the current data (it might be populated from other migrations or from
+     *                    manual changes before this migration was added to the app). Only
+     *                    Nullable if the type used with DataStore is Nullable.
+     * @return The migrated data.
+     */
+    @NonNull
+    Single<T> migrate(@Nullable T currentData);
+
+    /**
+     * Clean up any old state/data that was migrated into the DataStore. This will not be called
+     * if the migration fails. If cleanUp throws an exception, the exception will be propagated
+     * back to the DataStore call that triggered the migration and future calls to DataStore will
+     * result in DataMigrations being attempted again. This method may be run multiple times when
+     * any failure is encountered.
+     */
+    @NonNull
+    Completable cleanUp();
+}
diff --git a/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataStore.kt b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataStore.kt
new file mode 100644
index 0000000..467e039
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataStore.kt
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@file:JvmName("RxDataStore")
+
+package androidx.datastore.rxjava3
+
+import androidx.datastore.core.DataStore
+import io.reactivex.rxjava3.core.Flowable
+import io.reactivex.rxjava3.core.Single
+import io.reactivex.rxjava3.functions.Function
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.async
+import kotlinx.coroutines.rx3.asFlowable
+import kotlinx.coroutines.rx3.asSingle
+import kotlinx.coroutines.rx3.await
+
+/**
+ * Gets a reactivex.Flowable of the data from DataStore. See [DataStore.data] for more information.
+ *
+ * Provides efficient, cached (when possible) access to the latest durably persisted state.
+ * The flow will always either emit a value or throw an exception encountered when attempting
+ * to read from disk. If an exception is encountered, collecting again will attempt to read the
+ * data again.
+ *
+ * Do not layer a cache on top of this API: it will be be impossible to guarantee consistency.
+ * Instead, use data.first() to access a single snapshot.
+ *
+ * The Flowable will complete with an IOException when an exception is encountered when reading
+ * data.
+ *
+ * @return a flow representing the current state of the data
+ */
+@ExperimentalCoroutinesApi
+public fun <T : Any> DataStore<T>.data(): Flowable<T> {
+    return this.data.asFlowable()
+}
+
+/**
+ * See [DataStore.updateData]
+ *
+ * Updates the data transactionally in an atomic read-modify-write operation. All operations
+ * are serialized, and the transform itself is a async so it can perform heavy work
+ * such as RPCs.
+ *
+ * The Single completes when the data has been persisted durably to disk (after which
+ * [data] will reflect the update). If the transform or write to disk fails, the
+ * transaction is aborted and the returned Single is completed with the error.
+ *
+ * The transform will be run on the scheduler that DataStore was constructed with.
+ *
+ * @return the snapshot returned by the transform
+ * @throws Exception when thrown by the transform function
+ */
+@ExperimentalCoroutinesApi
+public fun <T : Any> DataStore<T>.updateDataAsync(transform: Function<T, Single<T>>): Single<T> {
+    return CoroutineScope(Dispatchers.Unconfined).async {
+        this@updateDataAsync.updateData {
+            transform.apply(it).await()
+        }
+    }.asSingle(Dispatchers.Unconfined)
+}
diff --git a/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataStoreBuilder.kt b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataStoreBuilder.kt
new file mode 100644
index 0000000..139bbca
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataStoreBuilder.kt
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.rxjava3
+
+import android.annotation.SuppressLint
+import android.content.Context
+import androidx.datastore.core.DataMigration
+import androidx.datastore.core.DataStore
+import androidx.datastore.core.DataStoreFactory
+import androidx.datastore.core.Serializer
+import androidx.datastore.createDataStore
+import androidx.datastore.core.handlers.ReplaceFileCorruptionHandler
+import io.reactivex.rxjava3.core.Scheduler
+import io.reactivex.rxjava3.schedulers.Schedulers
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.rx3.asCoroutineDispatcher
+import kotlinx.coroutines.rx3.await
+import java.io.File
+import java.util.concurrent.Callable
+
+/**
+ * RxSharedPreferencesMigrationBuilder class for a DataStore that works on a single process.
+ */
+@SuppressLint("TopLevelBuilder")
+public class RxDataStoreBuilder<T> {
+
+    /**
+     * Create a RxDataStoreBuilder with the callable which returns the File that DataStore acts on.
+     * The user is responsible for ensuring that there is never more than one DataStore acting on
+     * a file at a time.
+     *
+     * @param produceFile Function which returns the file that the new DataStore will act on. The
+     * function must return the same path every time. No two instances of DataStore should act on
+     * the same file at the same time.
+     * @param serializer the serializer for the type that this DataStore acts on.
+     */
+    public constructor(produceFile: Callable<File>, serializer: Serializer<T>) {
+        this.produceFile = produceFile
+        this.serializer = serializer
+    }
+
+    /**
+     * Create a RxDataStoreBuilder with the Context and name from which to derive the DataStore
+     * file. The file is generated by See [Context.createDataStore] for more info. The user is
+     * responsible for ensuring that there is never more than one DataStore acting on a file at a
+     * time.
+     *
+     * @param context the context from which we retrieve files directory.
+     * @param fileName the filename relative to Context.filesDir that DataStore acts on. The File is
+     * obtained by calling File(context.filesDir, fileName). No two instances of DataStore should
+     * act on the same file at the same time.
+     * @param serializer the serializer for the type that this DataStore acts on.
+     */
+    public constructor(context: Context, fileName: String, serializer: Serializer<T>) {
+        this.context = context
+        this.name = fileName
+        this.serializer = serializer
+    }
+
+    // Either produceFile or context & name must be set, but not both. This is enforced by the
+    // two constructors.
+    private var produceFile: Callable<File>? = null
+
+    private var context: Context? = null
+    private var name: String? = null
+
+    // Required. This is enforced by the constructors.
+    private var serializer: Serializer<T>? = null
+
+    // Optional
+    private var ioScheduler: Scheduler = Schedulers.io()
+    private var corruptionHandler: ReplaceFileCorruptionHandler<T>? = null
+    private val dataMigrations: MutableList<DataMigration<T>> = mutableListOf()
+
+    /**
+     * Set the Scheduler on which to perform IO and transform operations. This is converted into
+     * a CoroutineDispatcher before being added to DataStore.
+     *
+     * This parameter is optional and defaults to Schedulers.io().
+     *
+     * @param ioScheduler the scheduler on which IO and transform operations are run
+     * @return this
+     */
+    @Suppress("MissingGetterMatchingBuilder")
+    public fun setIoScheduler(ioScheduler: Scheduler): RxDataStoreBuilder<T> =
+        apply { this.ioScheduler = ioScheduler }
+
+    /**
+     * Sets the corruption handler to install into the DataStore.
+     *
+     * This parameter is optional and defaults to no corruption handler.
+     *
+     * @param corruptionHandler
+     * @return this
+     */
+    @Suppress("MissingGetterMatchingBuilder")
+    public fun setCorruptionHandler(corruptionHandler: ReplaceFileCorruptionHandler<T>):
+        RxDataStoreBuilder<T> = apply { this.corruptionHandler = corruptionHandler }
+
+    /**
+     * Add an RxDataMigration to the DataStore. Migrations are run in the order they are added.
+     *
+     * @param rxDataMigration the migration to add.
+     * @return this
+     */
+    @Suppress("MissingGetterMatchingBuilder")
+    public fun addRxDataMigration(rxDataMigration: RxDataMigration<T>): RxDataStoreBuilder<T> =
+        apply {
+            this.dataMigrations.add(DataMigrationFromRxDataMigration(rxDataMigration))
+        }
+
+    /**
+     * Add a DataMigration to the Datastore. Migrations are run in the order they are added.
+     *
+     * @param dataMigration the migration to add
+     * @return this
+     */
+    @Suppress("MissingGetterMatchingBuilder")
+    public fun addDataMigration(dataMigration: DataMigration<T>): RxDataStoreBuilder<T> = apply {
+        this.dataMigrations.add(dataMigration)
+    }
+
+    /**
+     * Build the DataStore.
+     *
+     * @return the DataStore with the provided parameters
+     */
+    public fun build(): DataStore<T> {
+        val scope = CoroutineScope(ioScheduler.asCoroutineDispatcher())
+
+        return if (produceFile != null) {
+            DataStoreFactory.create(
+                produceFile = { produceFile!!.call() },
+                serializer = serializer!!,
+                scope = CoroutineScope(
+                    ioScheduler.asCoroutineDispatcher()
+                ),
+                corruptionHandler = corruptionHandler,
+                migrations = dataMigrations
+            )
+        } else if (context != null && name != null) {
+            return context!!.createDataStore(
+                fileName = name!!,
+                serializer = serializer!!,
+                scope = scope,
+                corruptionHandler = corruptionHandler,
+                migrations = dataMigrations
+            )
+        } else {
+            error(
+                "Either produceFile or context and name must be set. This should never happen."
+            )
+        }
+    }
+}
+
+internal class DataMigrationFromRxDataMigration<T>(private val migration: RxDataMigration<T>) :
+    DataMigration<T> {
+    override suspend fun shouldMigrate(currentData: T): Boolean {
+        return migration.shouldMigrate(currentData).await()
+    }
+
+    override suspend fun migrate(currentData: T): T {
+        return migration.migrate(currentData).await()
+    }
+
+    override suspend fun cleanUp() {
+        migration.cleanUp().await()
+    }
+}
diff --git a/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxSharedPreferencesMigration.kt b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxSharedPreferencesMigration.kt
new file mode 100644
index 0000000..e5f71c6
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxSharedPreferencesMigration.kt
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.rxjava3
+
+import android.annotation.SuppressLint
+import android.content.Context
+import androidx.datastore.core.DataMigration
+import androidx.datastore.migrations.SharedPreferencesMigration
+import androidx.datastore.migrations.SharedPreferencesView
+import io.reactivex.rxjava3.core.Single
+import kotlinx.coroutines.rx3.await
+
+/**
+ * Client implemented migration interface.
+ **/
+public interface RxSharedPreferencesMigration<T> {
+    /**
+     * Whether or not the migration should be run. This can be used to skip a read from the
+     * SharedPreferences.
+     *
+     * @param currentData the most recently persisted data
+     * @return a Single indicating whether or not the migration should be run.
+     */
+    public fun shouldMigrate(currentData: T): Single<Boolean> {
+        return Single.just(true)
+    }
+
+    /**
+     * Maps SharedPreferences into T. Implementations should be idempotent
+     * since this may be called multiple times. See [DataMigration.migrate] for more
+     * information. The method accepts a SharedPreferencesView which is the view of the
+     * SharedPreferences to migrate from (limited to [keysToMigrate] and a T which represent
+     * the current data. The function must return the migrated data.
+     *
+     * @param sharedPreferencesView the current state of the SharedPreferences
+     * @param currentData the most recently persisted data
+     * @return a Single of the updated data
+     */
+    public fun migrate(sharedPreferencesView: SharedPreferencesView, currentData: T): Single<T>
+}
+
+/**
+ * RxSharedPreferencesMigrationBuilder for the RxSharedPreferencesMigration.
+ */
+@SuppressLint("TopLevelBuilder")
+public class RxSharedPreferencesMigrationBuilder<T>
+/**
+ * Construct a RxSharedPreferencesMigrationBuilder.
+ *
+ * @param context the Context used for getting the SharedPreferences.
+ * @param sharedPreferencesName the name of the SharedPreference from which to migrate.
+ * @param rxSharedPreferencesMigration the user implemented migration for this SharedPreference.
+ */
+constructor(
+    private val context: Context,
+    private val sharedPreferencesName: String,
+    private val rxSharedPreferencesMigration: RxSharedPreferencesMigration<T>
+) {
+
+    /** Optional */
+    private var deleteEmptyPreference: Boolean = true
+    private var keysToMigrate: Set<String>? = null
+
+    /**
+     * Set the list of keys to migrate. The keys will be mapped to datastore.Preferences with
+     * their same values. If the key is already present in the new Preferences, the key
+     * will not be migrated again. If the key is not present in the SharedPreferences it
+     * will not be migrated.
+     *
+     * This method is optional and if keysToMigrate is not set, all keys will be migrated from the
+     * existing SharedPreferences.
+     *
+     * @param keys the keys to migrate
+     * @return this
+     */
+    @Suppress("MissingGetterMatchingBuilder")
+    public fun setKeysToMigrate(vararg keys: String):
+        RxSharedPreferencesMigrationBuilder<T> = apply {
+            keysToMigrate = setOf(*keys)
+        }
+
+    /**
+     * If enabled and the SharedPreferences are empty (i.e. no remaining
+     * keys) after this migration runs, the leftover SharedPreferences file is deleted. Note that
+     * this cleanup runs only if the migration itself runs, i.e., if the keys were never in
+     * SharedPreferences to begin with then the (potentially) empty SharedPreferences
+     * won't be cleaned up by this option. This functionality is best effort - if there
+     * is an issue deleting the SharedPreferences file it will be silently ignored.
+     *
+     * This method is optional and defaults to true.
+     *
+     * @param deleteEmptyPreferences whether or not to delete the empty shared preferences file
+     * @return this
+     */
+    @Suppress("MissingGetterMatchingBuilder")
+    public fun setDeleteEmptyPreferences(deleteEmptyPreferences: Boolean):
+        RxSharedPreferencesMigrationBuilder<T> = apply {
+            this.deleteEmptyPreference = deleteEmptyPreferences
+        }
+
+    public fun build(): DataMigration<T> {
+        return SharedPreferencesMigration(
+            context = context,
+            sharedPreferencesName = sharedPreferencesName,
+            migrate = { spView, curData ->
+                rxSharedPreferencesMigration.migrate(spView, curData).await()
+            },
+            keysToMigrate = keysToMigrate,
+            deleteEmptyPreferences = deleteEmptyPreference,
+            shouldRunMigration = { curData ->
+                rxSharedPreferencesMigration.shouldMigrate(curData).await()
+            }
+        )
+    }
+}
diff --git a/datastore/datastore-rxjava3/src/test-common/java/androidx/datastore/rxjava3/TestingSerializer.kt b/datastore/datastore-rxjava3/src/test-common/java/androidx/datastore/rxjava3/TestingSerializer.kt
new file mode 100644
index 0000000..f1e02bb
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/test-common/java/androidx/datastore/rxjava3/TestingSerializer.kt
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.rxjava3
+
+import androidx.datastore.core.CorruptionException
+import androidx.datastore.core.Serializer
+import java.io.IOException
+import java.io.InputStream
+import java.io.OutputStream
+
+class TestingSerializer(
+    @Volatile var failReadWithCorruptionException: Boolean = false,
+    @Volatile var failingRead: Boolean = false,
+    @Volatile var failingWrite: Boolean = false
+) : Serializer<Byte> {
+    override fun readFrom(input: InputStream): Byte {
+        if (failReadWithCorruptionException) {
+            throw CorruptionException(
+                "CorruptionException",
+                IOException()
+            )
+        }
+
+        if (failingRead) {
+            throw IOException("I was asked to fail on reads")
+        }
+
+        val read = input.read()
+        if (read == -1) {
+            return 0
+        }
+        return read.toByte()
+    }
+
+    override fun writeTo(t: Byte, output: OutputStream) {
+        if (failingWrite) {
+            throw IOException("I was asked to fail on writes")
+        }
+        output.write(t.toInt())
+    }
+
+    override val defaultValue: Byte = 0
+}
\ No newline at end of file
diff --git a/datastore/datastore-rxjava3/src/test/java/androidx/datastore/rxjava3/RxDataStoreTest.java b/datastore/datastore-rxjava3/src/test/java/androidx/datastore/rxjava3/RxDataStoreTest.java
new file mode 100644
index 0000000..83c05a8
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/test/java/androidx/datastore/rxjava3/RxDataStoreTest.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.rxjava3;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import androidx.datastore.core.DataStore;
+import androidx.datastore.core.DataStoreFactory;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.subscribers.TestSubscriber;
+import kotlinx.coroutines.CoroutineScopeKt;
+import kotlinx.coroutines.Dispatchers;
+
+public class RxDataStoreTest {
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private static Single<Byte> incrementByte(Byte byteIn) {
+        return Single.just(++byteIn);
+    }
+
+    @Test
+    public void testGetSingleValue() throws Exception {
+        File newFile = tempFolder.newFile();
+
+        DataStore<Byte> byteStore = DataStoreFactory.INSTANCE.create(
+                new TestingSerializer(),
+                null,
+                new ArrayList<>(),
+                CoroutineScopeKt.CoroutineScope(Dispatchers.getIO()),
+                () -> newFile);
+
+        Byte firstByte = RxDataStore.data(byteStore).blockingFirst();
+        assertThat(firstByte).isEqualTo(0);
+
+        Single<Byte> incrementByte = RxDataStore.updateDataAsync(byteStore,
+                RxDataStoreTest::incrementByte);
+
+        assertThat(incrementByte.blockingGet()).isEqualTo(1);
+
+        firstByte = RxDataStore.data(byteStore).blockingFirst();
+        assertThat(firstByte).isEqualTo(1);
+    }
+
+    @Test
+    public void testTake3() throws Exception {
+        File newFile = tempFolder.newFile();
+
+        DataStore<Byte> byteStore = DataStoreFactory.INSTANCE.create(
+                new TestingSerializer(),
+                null,
+                new ArrayList<>(),
+                CoroutineScopeKt.CoroutineScope(Dispatchers.getIO()),
+                () -> newFile);
+
+        TestSubscriber<Byte> testSubscriber = RxDataStore.data(byteStore).test();
+
+        RxDataStore.updateDataAsync(byteStore, RxDataStoreTest::incrementByte);
+        RxDataStore.updateDataAsync(byteStore, RxDataStoreTest::incrementByte);
+
+        testSubscriber.awaitCount(3).assertValues((byte) 0, (byte) 1, (byte) 2);
+    }
+
+
+    @Test
+    public void testReadFailure() throws Exception {
+        File newFile = tempFolder.newFile();
+        TestingSerializer testingSerializer = new TestingSerializer();
+
+        DataStore<Byte> byteStore = DataStoreFactory.INSTANCE.create(
+                testingSerializer,
+                null,
+                new ArrayList<>(),
+                CoroutineScopeKt.CoroutineScope(Dispatchers.getIO()),
+                () -> newFile);
+
+        testingSerializer.setFailingRead(true);
+
+        TestSubscriber<Byte> testSubscriber = RxDataStore.data(byteStore).test();
+
+        assertThat(testSubscriber.await(5, TimeUnit.SECONDS)).isTrue();
+
+        testSubscriber.assertError(IOException.class);
+
+        testingSerializer.setFailingRead(false);
+
+        testSubscriber = RxDataStore.data(byteStore).test();
+        testSubscriber.awaitCount(1).assertValues((byte) 0);
+    }
+
+    @Test
+    public void testWriteFailure() throws Exception {
+        File newFile = tempFolder.newFile();
+        TestingSerializer testingSerializer = new TestingSerializer();
+
+        DataStore<Byte> byteStore = DataStoreFactory.INSTANCE.create(
+                testingSerializer,
+                null,
+                new ArrayList<>(),
+                CoroutineScopeKt.CoroutineScope(Dispatchers.getIO()),
+                () -> newFile);
+
+        TestSubscriber<Byte> testSubscriber = RxDataStore.data(byteStore).test();
+
+        testingSerializer.setFailingWrite(true);
+        Single<Byte> incrementByte = RxDataStore.updateDataAsync(byteStore,
+                RxDataStoreTest::incrementByte);
+
+        incrementByte.cache().test().await().assertError(IOException.class);
+
+        testSubscriber.awaitCount(1).assertNoErrors().assertValues((byte) 0);
+        testingSerializer.setFailingWrite(false);
+
+        Single<Byte> incrementByte2 = RxDataStore.updateDataAsync(byteStore,
+                RxDataStoreTest::incrementByte);
+        assertThat(incrementByte2.blockingGet()).isEqualTo((byte) 1);
+
+        testSubscriber.awaitCount(2).assertValues((byte) 0, (byte) 1);
+    }
+}
diff --git a/settings.gradle b/settings.gradle
index 035913c..72c87d0 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -287,8 +287,11 @@
         "datastore/datastore-preferences-core/datastore-preferences-proto", [BuildType.MAIN])
 includeProject(":datastore:datastore-preferences-rxjava2",
         "datastore/datastore-preferences-rxjava2", [BuildType.MAIN])
+includeProject(":datastore:datastore-preferences-rxjava3",
+        "datastore/datastore-preferences-rxjava3", [BuildType.MAIN])
 includeProject(":datastore:datastore-proto", "datastore/datastore-proto", [BuildType.MAIN])
 includeProject(":datastore:datastore-rxjava2", "datastore/datastore-rxjava2", [BuildType.MAIN])
+includeProject(":datastore:datastore-rxjava3", "datastore/datastore-rxjava3", [BuildType.MAIN])
 includeProject(":datastore:datastore-sampleapp", "datastore/datastore-sampleapp", [BuildType.MAIN])
 includeProject(":documentfile:documentfile", "documentfile/documentfile", [BuildType.MAIN])
 includeProject(":drawerlayout:drawerlayout", "drawerlayout/drawerlayout", [BuildType.MAIN])