Added datastore-rxjava3 and datastore-preferences-rxjava3.
I just copied the rxjava2 modules and replaced the imports with rxjava3 imports.
Test: Copied over tests from rxjava2 modules.
Relnote: This change adds the datastore-preferences-rxjava2 and datastore-preferences-rxjava3 modules
Bug: 170311106
Change-Id: If253fe05877f22b956cda2e2be27387ae76c9b33
diff --git a/datastore/datastore-preferences-rxjava3/api/current.txt b/datastore/datastore-preferences-rxjava3/api/current.txt
new file mode 100644
index 0000000..88fe233
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/api/current.txt
@@ -0,0 +1,15 @@
+// Signature format: 4.0
+package androidx.datastore.preferences.rxjava3 {
+
+ public final class RxPreferenceDataStoreBuilder {
+ ctor public RxPreferenceDataStoreBuilder(java.util.concurrent.Callable<java.io.File> produceFile);
+ ctor public RxPreferenceDataStoreBuilder(android.content.Context context, String name);
+ method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder addDataMigration(androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences> dataMigration);
+ method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder addRxDataMigration(androidx.datastore.rxjava3.RxDataMigration<androidx.datastore.preferences.core.Preferences> rxDataMigration);
+ method public androidx.datastore.core.DataStore<androidx.datastore.preferences.core.Preferences> build();
+ method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder setCorruptionHandler(androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences> corruptionHandler);
+ method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
+ }
+
+}
+
diff --git a/datastore/datastore-preferences-rxjava3/api/public_plus_experimental_current.txt b/datastore/datastore-preferences-rxjava3/api/public_plus_experimental_current.txt
new file mode 100644
index 0000000..88fe233
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/api/public_plus_experimental_current.txt
@@ -0,0 +1,15 @@
+// Signature format: 4.0
+package androidx.datastore.preferences.rxjava3 {
+
+ public final class RxPreferenceDataStoreBuilder {
+ ctor public RxPreferenceDataStoreBuilder(java.util.concurrent.Callable<java.io.File> produceFile);
+ ctor public RxPreferenceDataStoreBuilder(android.content.Context context, String name);
+ method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder addDataMigration(androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences> dataMigration);
+ method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder addRxDataMigration(androidx.datastore.rxjava3.RxDataMigration<androidx.datastore.preferences.core.Preferences> rxDataMigration);
+ method public androidx.datastore.core.DataStore<androidx.datastore.preferences.core.Preferences> build();
+ method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder setCorruptionHandler(androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences> corruptionHandler);
+ method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
+ }
+
+}
+
diff --git a/datastore/datastore-preferences-rxjava3/api/res-current.txt b/datastore/datastore-preferences-rxjava3/api/res-current.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/api/res-current.txt
diff --git a/datastore/datastore-preferences-rxjava3/api/restricted_current.txt b/datastore/datastore-preferences-rxjava3/api/restricted_current.txt
new file mode 100644
index 0000000..88fe233
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/api/restricted_current.txt
@@ -0,0 +1,15 @@
+// Signature format: 4.0
+package androidx.datastore.preferences.rxjava3 {
+
+ public final class RxPreferenceDataStoreBuilder {
+ ctor public RxPreferenceDataStoreBuilder(java.util.concurrent.Callable<java.io.File> produceFile);
+ ctor public RxPreferenceDataStoreBuilder(android.content.Context context, String name);
+ method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder addDataMigration(androidx.datastore.core.DataMigration<androidx.datastore.preferences.core.Preferences> dataMigration);
+ method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder addRxDataMigration(androidx.datastore.rxjava3.RxDataMigration<androidx.datastore.preferences.core.Preferences> rxDataMigration);
+ method public androidx.datastore.core.DataStore<androidx.datastore.preferences.core.Preferences> build();
+ method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder setCorruptionHandler(androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<androidx.datastore.preferences.core.Preferences> corruptionHandler);
+ method public androidx.datastore.preferences.rxjava3.RxPreferenceDataStoreBuilder setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
+ }
+
+}
+
diff --git a/datastore/datastore-preferences-rxjava3/build.gradle b/datastore/datastore-preferences-rxjava3/build.gradle
new file mode 100644
index 0000000..44af2c9
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/build.gradle
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static androidx.build.dependencies.DependenciesKt.*
+import androidx.build.LibraryGroups
+import androidx.build.AndroidXExtension
+import androidx.build.Publish
+import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
+
+plugins {
+ id("AndroidXPlugin")
+ id("com.android.library")
+ id("kotlin-android")
+}
+
+android {
+ sourceSets {
+ test.java.srcDirs += 'src/test-common/java'
+ androidTest.java.srcDirs += 'src/test-common/java'
+ }
+}
+
+dependencies {
+ api(KOTLIN_STDLIB)
+ api(KOTLIN_COROUTINES_CORE)
+ api("androidx.annotation:annotation:1.1.0")
+ api(RX_JAVA3)
+
+ api(project(":datastore:datastore"))
+ api(project(":datastore:datastore-rxjava3"))
+ api(project(":datastore:datastore-preferences"))
+
+ implementation(KOTLIN_COROUTINES_RX3)
+
+ testImplementation(JUNIT)
+ testImplementation(KOTLIN_COROUTINES_TEST)
+ testImplementation(TRUTH)
+ testImplementation(project(":internal-testutils-truth"))
+
+ androidTestImplementation(project(":datastore:datastore-core"))
+ androidTestImplementation(project(":datastore:datastore"))
+ androidTestImplementation(JUNIT)
+ androidTestImplementation(project(":internal-testutils-truth"))
+ androidTestImplementation(ANDROIDX_TEST_RUNNER)
+ androidTestImplementation(ANDROIDX_TEST_CORE)
+}
+
+androidx {
+ name = "Android DataStore Core RxJava2 Wrappers"
+ publish = Publish.SNAPSHOT_AND_RELEASE
+ mavenGroup = LibraryGroups.DATASTORE
+ inceptionYear = "2020"
+ description = "Android DataStore Core - contains wrappers for using DataStore using RxJava2"
+ legacyDisableKotlinStrictApiMode = true
+}
diff --git a/datastore/datastore-preferences-rxjava3/src/androidTest/java/androidx/datastore/preferences/rxjava3/RxPreferencesDataStoreBuilderTest.java b/datastore/datastore-preferences-rxjava3/src/androidTest/java/androidx/datastore/preferences/rxjava3/RxPreferencesDataStoreBuilderTest.java
new file mode 100644
index 0000000..f72a779
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/src/androidTest/java/androidx/datastore/preferences/rxjava3/RxPreferencesDataStoreBuilderTest.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.preferences.rxjava3;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import android.content.Context;
+
+import androidx.annotation.NonNull;
+import androidx.datastore.core.DataStore;
+import androidx.datastore.core.handlers.ReplaceFileCorruptionHandler;
+import androidx.datastore.preferences.core.MutablePreferences;
+import androidx.datastore.preferences.core.Preferences;
+import androidx.datastore.preferences.core.PreferencesFactory;
+import androidx.datastore.preferences.core.PreferencesKeys;
+import androidx.datastore.rxjava3.RxDataMigration;
+import androidx.datastore.rxjava3.RxDataStore;
+import androidx.test.core.app.ApplicationProvider;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Single;
+
+
+public class RxPreferencesDataStoreBuilderTest {
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static final Preferences.Key<Integer> INTEGER_KEY =
+ PreferencesKeys.intKey("int_key");
+
+ private static Single<Preferences> incrementInteger(Preferences preferencesIn) {
+ MutablePreferences prefs = preferencesIn.toMutablePreferences();
+ Integer currentInt = prefs.get(INTEGER_KEY);
+ prefs.set(INTEGER_KEY, currentInt != null ? currentInt + 1 : 1);
+ return Single.just(prefs);
+ }
+
+ @Test
+ public void testConstructWithProduceFile() throws Exception {
+ File file = tempFolder.newFile("temp.preferences_pb");
+
+ DataStore<Preferences> dataStore =
+ new RxPreferenceDataStoreBuilder(() -> file).build();
+
+ Single<Preferences> incrementInt = RxDataStore.updateDataAsync(dataStore,
+ RxPreferencesDataStoreBuilderTest::incrementInteger);
+ assertThat(incrementInt.blockingGet().get(INTEGER_KEY)).isEqualTo(1);
+
+ // Construct it again and confirm that the data is still there:
+ dataStore = new RxPreferenceDataStoreBuilder(() -> file).build();
+
+ assertThat(RxDataStore.data(dataStore).blockingFirst().get(INTEGER_KEY))
+ .isEqualTo(1);
+ }
+
+
+ @Test
+ public void testConstructWithContextAndName() throws Exception {
+
+ Context context = ApplicationProvider.getApplicationContext();
+ String name = "my_data_store";
+
+ File prefsFile = new File(context.getFilesDir().getPath()
+ + "/datastore/" + name + ".preferences_pb");
+ if (prefsFile.exists()) {
+ prefsFile.delete();
+ }
+
+ DataStore<Preferences> dataStore =
+ new RxPreferenceDataStoreBuilder(context, name).build();
+
+ Single<Preferences> set1 = RxDataStore.updateDataAsync(dataStore,
+ RxPreferencesDataStoreBuilderTest::incrementInteger);
+ assertThat(set1.blockingGet().get(INTEGER_KEY)).isEqualTo(1);
+
+ // Construct it again and confirm that the data is still there:
+ dataStore = new RxPreferenceDataStoreBuilder(context, name).build();
+ assertThat(RxDataStore.data(dataStore).blockingFirst().get(INTEGER_KEY)).isEqualTo(1);
+
+ // Construct it again with the expected file path and confirm that the data is there:
+ dataStore =
+ new RxPreferenceDataStoreBuilder(
+ () ->
+ new File(context.getFilesDir().getPath()
+ + "/datastore/" + name + ".preferences_pb")
+ ).build();
+
+ assertThat(RxDataStore.data(dataStore).blockingFirst().get(INTEGER_KEY)).isEqualTo(1);
+ }
+
+ @Test
+ public void testMigrationsAreInstalledAndRun() throws Exception {
+ RxDataMigration<Preferences> plusOneMigration = new RxDataMigration<Preferences>() {
+ @NonNull
+ @Override
+ public Single<Boolean> shouldMigrate(@NonNull Preferences currentData) {
+ return Single.just(true);
+ }
+
+ @NonNull
+ @Override
+ public Single<Preferences> migrate(@NonNull Preferences currentData) {
+ return incrementInteger(currentData);
+ }
+
+ @NonNull
+ @Override
+ public Completable cleanUp() {
+ return Completable.complete();
+ }
+ };
+
+ DataStore<Preferences> dataStore =
+ new RxPreferenceDataStoreBuilder(() ->
+ tempFolder.newFile("temp.preferences_pb"))
+ .addRxDataMigration(plusOneMigration)
+ .build();
+
+ assertThat(RxDataStore.data(dataStore).blockingFirst().get(INTEGER_KEY))
+ .isEqualTo(1);
+ }
+
+
+ @Test
+ public void testCorruptionHandlerIsUser() throws Exception {
+
+ File file = tempFolder.newFile("temp.preferences_pb");
+
+ try (FileOutputStream fileOutputStream = new FileOutputStream(file)) {
+ fileOutputStream.write(0); // will cause corruption exception
+ }
+
+ ReplaceFileCorruptionHandler<Preferences> replaceFileCorruptionHandler =
+ new ReplaceFileCorruptionHandler<Preferences>(exception -> {
+ MutablePreferences mutablePreferences =
+ PreferencesFactory.createMutable();
+ mutablePreferences.set(INTEGER_KEY, 99);
+ return (Preferences) mutablePreferences;
+ });
+
+
+ DataStore<Preferences> dataStore =
+ new RxPreferenceDataStoreBuilder(() -> file)
+ .setCorruptionHandler(replaceFileCorruptionHandler)
+ .build();
+
+ assertThat(RxDataStore.data(dataStore).blockingFirst().get(INTEGER_KEY))
+ .isEqualTo(99);
+ }
+}
diff --git a/datastore/datastore-preferences-rxjava3/src/main/AndroidManifest.xml b/datastore/datastore-preferences-rxjava3/src/main/AndroidManifest.xml
new file mode 100644
index 0000000..13f23ea
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/src/main/AndroidManifest.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+ Copyright 2020 The Android Open Source Project
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<manifest xmlns:android="http://schemas.android.com/apk/res/android"
+ package="androidx.datastore.preferences.rxjava3">
+
+</manifest>
diff --git a/datastore/datastore-preferences-rxjava3/src/main/java/androidx/datastore/preferences/rxjava3/RxPreferenceDataStoreBuilder.kt b/datastore/datastore-preferences-rxjava3/src/main/java/androidx/datastore/preferences/rxjava3/RxPreferenceDataStoreBuilder.kt
new file mode 100644
index 0000000..527cfb3
--- /dev/null
+++ b/datastore/datastore-preferences-rxjava3/src/main/java/androidx/datastore/preferences/rxjava3/RxPreferenceDataStoreBuilder.kt
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.preferences.rxjava3
+
+import android.annotation.SuppressLint
+import android.content.Context
+import androidx.datastore.core.DataMigration
+import androidx.datastore.core.DataStore
+import androidx.datastore.core.handlers.ReplaceFileCorruptionHandler
+import androidx.datastore.preferences.core.PreferenceDataStoreFactory
+import androidx.datastore.preferences.core.Preferences
+import androidx.datastore.preferences.createDataStore
+import androidx.datastore.rxjava3.RxDataMigration
+import io.reactivex.rxjava3.core.Scheduler
+import io.reactivex.rxjava3.schedulers.Schedulers
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.rx3.asCoroutineDispatcher
+import kotlinx.coroutines.rx3.await
+import java.io.File
+import java.util.concurrent.Callable
+
+/**
+ * RxSharedPreferencesMigrationBuilder class for a DataStore that works on a single process.
+ */
+@SuppressLint("TopLevelBuilder")
+public class RxPreferenceDataStoreBuilder {
+
+ // Either produceFile or context & name must be set, but not both.
+ private var produceFile: Callable<File>? = null
+
+ private var context: Context? = null
+ private var name: String? = null
+
+ // Optional
+ private var ioScheduler: Scheduler = Schedulers.io()
+ private var corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null
+ private val dataMigrations: MutableList<DataMigration<Preferences>> = mutableListOf()
+
+ /**
+ * Create a RxPreferenceDataStoreBuilder with the callable which returns the File that
+ * DataStore acts on. The user is responsible for ensuring that there is never more than one
+ * DataStore acting on a file at a time.
+ *
+ * @param produceFile Function which returns the file that the new DataStore will act on. The
+ * function must return the same path every time. No two instances of DataStore should act on
+ * the same file at the same time.
+ */
+ public constructor(produceFile: Callable<File>) {
+ this.produceFile = produceFile
+ }
+
+ /**
+ * Create a RxPreferenceDataStoreBuilder with the Context and name from which to derive the
+ * DataStore file. The file is generated by See [Context.createDataStore] for more info. The
+ * user is responsible for ensuring that there is never more than one DataStore acting on a
+ * file at a time.
+ *
+ * @param context the context from which we retrieve files directory.
+ * @param name the filename relative to Context.filesDir that DataStore acts on. The File is
+ * obtained by calling File(this.filesDir, "datastore/$name.preferences_pb"). No two instances
+ * of DataStore should act on the same file at the same time.
+ */
+ public constructor(context: Context, name: String) {
+ this.context = context
+ this.name = name
+ }
+
+ /**
+ * Set the Scheduler on which to perform IO and transform operations. This is converted into
+ * a CoroutineDispatcher before being added to DataStore.
+ *
+ * This parameter is optional and defaults to Schedulers.io().
+ *
+ * @param ioScheduler the scheduler on which IO and transform operations are run
+ * @return this
+ */
+ @Suppress("MissingGetterMatchingBuilder")
+ public fun setIoScheduler(ioScheduler: Scheduler): RxPreferenceDataStoreBuilder =
+ apply { this.ioScheduler = ioScheduler }
+
+ /**
+ * Sets the corruption handler to install into the DataStore.
+ *
+ * This parameter is optional and defaults to no corruption handler.
+ *
+ * @param corruptionHandler
+ * @return this
+ */
+ @Suppress("MissingGetterMatchingBuilder")
+ public fun setCorruptionHandler(corruptionHandler: ReplaceFileCorruptionHandler<Preferences>):
+ RxPreferenceDataStoreBuilder = apply { this.corruptionHandler = corruptionHandler }
+
+ /**
+ * Add an RxDataMigration to the DataStore. Migrations are run in the order they are added.
+ *
+ * @param rxDataMigration the migration to add.
+ * @return this
+ */
+ @Suppress("MissingGetterMatchingBuilder")
+ public fun addRxDataMigration(rxDataMigration: RxDataMigration<Preferences>):
+ RxPreferenceDataStoreBuilder = apply {
+ this.dataMigrations.add(DataMigrationFromRxDataMigration(rxDataMigration))
+ }
+
+ /**
+ * Add a DataMigration to the Datastore. Migrations are run in the order they are added.
+ *
+ * @param dataMigration the migration to add
+ * @return this
+ */
+ @Suppress("MissingGetterMatchingBuilder")
+ public fun addDataMigration(dataMigration: DataMigration<Preferences>):
+ RxPreferenceDataStoreBuilder = apply {
+ this.dataMigrations.add(dataMigration)
+ }
+
+ /**
+ * Build the DataStore.
+ *
+ * @throws IllegalStateException if serializer is not set or if neither produceFile not
+ * context and name are set.
+ * @return the DataStore with the provided parameters
+ */
+ public fun build(): DataStore<Preferences> {
+ val scope = CoroutineScope(ioScheduler.asCoroutineDispatcher())
+
+ val produceFile: Callable<File>? = this.produceFile
+ val context: Context? = this.context
+ val name: String? = this.name
+
+ return if (produceFile != null) {
+ PreferenceDataStoreFactory.create(
+ produceFile = { produceFile.call() },
+ scope = CoroutineScope(
+ ioScheduler.asCoroutineDispatcher()
+ ),
+ corruptionHandler = corruptionHandler,
+ migrations = dataMigrations
+ )
+ } else if (context != null && name != null) {
+ return context.createDataStore(
+ name = name,
+ scope = scope,
+ corruptionHandler = corruptionHandler,
+ migrations = dataMigrations
+ )
+ } else {
+ error("Either produceFile or context and name must be set. This should never happen.")
+ }
+ }
+}
+
+internal class DataMigrationFromRxDataMigration<T>(private val migration: RxDataMigration<T>) :
+ DataMigration<T> {
+ override suspend fun shouldMigrate(currentData: T): Boolean {
+ return migration.shouldMigrate(currentData).await()
+ }
+
+ override suspend fun migrate(currentData: T): T {
+ return migration.migrate(currentData).await()
+ }
+
+ override suspend fun cleanUp() {
+ migration.cleanUp().await()
+ }
+}
diff --git a/datastore/datastore-rxjava3/api/current.txt b/datastore/datastore-rxjava3/api/current.txt
new file mode 100644
index 0000000..bdba98a
--- /dev/null
+++ b/datastore/datastore-rxjava3/api/current.txt
@@ -0,0 +1,38 @@
+// Signature format: 4.0
+package androidx.datastore.rxjava3 {
+
+ public interface RxDataMigration<T> {
+ method public io.reactivex.rxjava3.core.Completable cleanUp();
+ method public io.reactivex.rxjava3.core.Single<T!> migrate(T?);
+ method public io.reactivex.rxjava3.core.Single<java.lang.Boolean!> shouldMigrate(T?);
+ }
+
+ public final class RxDataStore {
+ method @kotlinx.coroutines.ExperimentalCoroutinesApi public static <T> io.reactivex.rxjava3.core.Flowable<T> data(androidx.datastore.core.DataStore<T>);
+ method @kotlinx.coroutines.ExperimentalCoroutinesApi public static <T> io.reactivex.rxjava3.core.Single<T> updateDataAsync(androidx.datastore.core.DataStore<T>, io.reactivex.rxjava3.functions.Function<T,io.reactivex.rxjava3.core.Single<T>> transform);
+ }
+
+ public final class RxDataStoreBuilder<T> {
+ ctor public RxDataStoreBuilder(java.util.concurrent.Callable<java.io.File> produceFile, androidx.datastore.core.Serializer<T> serializer);
+ ctor public RxDataStoreBuilder(android.content.Context context, String fileName, androidx.datastore.core.Serializer<T> serializer);
+ method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> addDataMigration(androidx.datastore.core.DataMigration<T> dataMigration);
+ method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> addRxDataMigration(androidx.datastore.rxjava3.RxDataMigration<T> rxDataMigration);
+ method public androidx.datastore.core.DataStore<T> build();
+ method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> setCorruptionHandler(androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T> corruptionHandler);
+ method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
+ }
+
+ public interface RxSharedPreferencesMigration<T> {
+ method public io.reactivex.rxjava3.core.Single<T> migrate(androidx.datastore.migrations.SharedPreferencesView sharedPreferencesView, T? currentData);
+ method public default io.reactivex.rxjava3.core.Single<java.lang.Boolean> shouldMigrate(T? currentData);
+ }
+
+ public final class RxSharedPreferencesMigrationBuilder<T> {
+ ctor public RxSharedPreferencesMigrationBuilder(android.content.Context context, String sharedPreferencesName, androidx.datastore.rxjava3.RxSharedPreferencesMigration<T> rxSharedPreferencesMigration);
+ method public androidx.datastore.core.DataMigration<T> build();
+ method public androidx.datastore.rxjava3.RxSharedPreferencesMigrationBuilder<T> setDeleteEmptyPreferences(boolean deleteEmptyPreferences);
+ method public androidx.datastore.rxjava3.RxSharedPreferencesMigrationBuilder<T> setKeysToMigrate(java.lang.String... keys);
+ }
+
+}
+
diff --git a/datastore/datastore-rxjava3/api/public_plus_experimental_current.txt b/datastore/datastore-rxjava3/api/public_plus_experimental_current.txt
new file mode 100644
index 0000000..bdba98a
--- /dev/null
+++ b/datastore/datastore-rxjava3/api/public_plus_experimental_current.txt
@@ -0,0 +1,38 @@
+// Signature format: 4.0
+package androidx.datastore.rxjava3 {
+
+ public interface RxDataMigration<T> {
+ method public io.reactivex.rxjava3.core.Completable cleanUp();
+ method public io.reactivex.rxjava3.core.Single<T!> migrate(T?);
+ method public io.reactivex.rxjava3.core.Single<java.lang.Boolean!> shouldMigrate(T?);
+ }
+
+ public final class RxDataStore {
+ method @kotlinx.coroutines.ExperimentalCoroutinesApi public static <T> io.reactivex.rxjava3.core.Flowable<T> data(androidx.datastore.core.DataStore<T>);
+ method @kotlinx.coroutines.ExperimentalCoroutinesApi public static <T> io.reactivex.rxjava3.core.Single<T> updateDataAsync(androidx.datastore.core.DataStore<T>, io.reactivex.rxjava3.functions.Function<T,io.reactivex.rxjava3.core.Single<T>> transform);
+ }
+
+ public final class RxDataStoreBuilder<T> {
+ ctor public RxDataStoreBuilder(java.util.concurrent.Callable<java.io.File> produceFile, androidx.datastore.core.Serializer<T> serializer);
+ ctor public RxDataStoreBuilder(android.content.Context context, String fileName, androidx.datastore.core.Serializer<T> serializer);
+ method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> addDataMigration(androidx.datastore.core.DataMigration<T> dataMigration);
+ method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> addRxDataMigration(androidx.datastore.rxjava3.RxDataMigration<T> rxDataMigration);
+ method public androidx.datastore.core.DataStore<T> build();
+ method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> setCorruptionHandler(androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T> corruptionHandler);
+ method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
+ }
+
+ public interface RxSharedPreferencesMigration<T> {
+ method public io.reactivex.rxjava3.core.Single<T> migrate(androidx.datastore.migrations.SharedPreferencesView sharedPreferencesView, T? currentData);
+ method public default io.reactivex.rxjava3.core.Single<java.lang.Boolean> shouldMigrate(T? currentData);
+ }
+
+ public final class RxSharedPreferencesMigrationBuilder<T> {
+ ctor public RxSharedPreferencesMigrationBuilder(android.content.Context context, String sharedPreferencesName, androidx.datastore.rxjava3.RxSharedPreferencesMigration<T> rxSharedPreferencesMigration);
+ method public androidx.datastore.core.DataMigration<T> build();
+ method public androidx.datastore.rxjava3.RxSharedPreferencesMigrationBuilder<T> setDeleteEmptyPreferences(boolean deleteEmptyPreferences);
+ method public androidx.datastore.rxjava3.RxSharedPreferencesMigrationBuilder<T> setKeysToMigrate(java.lang.String... keys);
+ }
+
+}
+
diff --git a/datastore/datastore-rxjava3/api/res-current.txt b/datastore/datastore-rxjava3/api/res-current.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/datastore/datastore-rxjava3/api/res-current.txt
diff --git a/datastore/datastore-rxjava3/api/restricted_current.txt b/datastore/datastore-rxjava3/api/restricted_current.txt
new file mode 100644
index 0000000..bdba98a
--- /dev/null
+++ b/datastore/datastore-rxjava3/api/restricted_current.txt
@@ -0,0 +1,38 @@
+// Signature format: 4.0
+package androidx.datastore.rxjava3 {
+
+ public interface RxDataMigration<T> {
+ method public io.reactivex.rxjava3.core.Completable cleanUp();
+ method public io.reactivex.rxjava3.core.Single<T!> migrate(T?);
+ method public io.reactivex.rxjava3.core.Single<java.lang.Boolean!> shouldMigrate(T?);
+ }
+
+ public final class RxDataStore {
+ method @kotlinx.coroutines.ExperimentalCoroutinesApi public static <T> io.reactivex.rxjava3.core.Flowable<T> data(androidx.datastore.core.DataStore<T>);
+ method @kotlinx.coroutines.ExperimentalCoroutinesApi public static <T> io.reactivex.rxjava3.core.Single<T> updateDataAsync(androidx.datastore.core.DataStore<T>, io.reactivex.rxjava3.functions.Function<T,io.reactivex.rxjava3.core.Single<T>> transform);
+ }
+
+ public final class RxDataStoreBuilder<T> {
+ ctor public RxDataStoreBuilder(java.util.concurrent.Callable<java.io.File> produceFile, androidx.datastore.core.Serializer<T> serializer);
+ ctor public RxDataStoreBuilder(android.content.Context context, String fileName, androidx.datastore.core.Serializer<T> serializer);
+ method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> addDataMigration(androidx.datastore.core.DataMigration<T> dataMigration);
+ method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> addRxDataMigration(androidx.datastore.rxjava3.RxDataMigration<T> rxDataMigration);
+ method public androidx.datastore.core.DataStore<T> build();
+ method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> setCorruptionHandler(androidx.datastore.core.handlers.ReplaceFileCorruptionHandler<T> corruptionHandler);
+ method public androidx.datastore.rxjava3.RxDataStoreBuilder<T> setIoScheduler(io.reactivex.rxjava3.core.Scheduler ioScheduler);
+ }
+
+ public interface RxSharedPreferencesMigration<T> {
+ method public io.reactivex.rxjava3.core.Single<T> migrate(androidx.datastore.migrations.SharedPreferencesView sharedPreferencesView, T? currentData);
+ method public default io.reactivex.rxjava3.core.Single<java.lang.Boolean> shouldMigrate(T? currentData);
+ }
+
+ public final class RxSharedPreferencesMigrationBuilder<T> {
+ ctor public RxSharedPreferencesMigrationBuilder(android.content.Context context, String sharedPreferencesName, androidx.datastore.rxjava3.RxSharedPreferencesMigration<T> rxSharedPreferencesMigration);
+ method public androidx.datastore.core.DataMigration<T> build();
+ method public androidx.datastore.rxjava3.RxSharedPreferencesMigrationBuilder<T> setDeleteEmptyPreferences(boolean deleteEmptyPreferences);
+ method public androidx.datastore.rxjava3.RxSharedPreferencesMigrationBuilder<T> setKeysToMigrate(java.lang.String... keys);
+ }
+
+}
+
diff --git a/datastore/datastore-rxjava3/build.gradle b/datastore/datastore-rxjava3/build.gradle
new file mode 100644
index 0000000..9f8e453
--- /dev/null
+++ b/datastore/datastore-rxjava3/build.gradle
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static androidx.build.dependencies.DependenciesKt.*
+import androidx.build.LibraryGroups
+import androidx.build.AndroidXExtension
+import androidx.build.Publish
+import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
+
+plugins {
+ id("AndroidXPlugin")
+ id("com.android.library")
+ id("kotlin-android")
+}
+
+android {
+ sourceSets {
+ test.java.srcDirs += 'src/test-common/java'
+ androidTest.java.srcDirs += 'src/test-common/java'
+ }
+}
+
+dependencies {
+ api(KOTLIN_STDLIB)
+ api(KOTLIN_COROUTINES_CORE)
+ api("androidx.annotation:annotation:1.1.0")
+ api(RX_JAVA3)
+
+ api(project(":datastore:datastore"))
+
+ implementation(KOTLIN_COROUTINES_RX3)
+
+ testImplementation(JUNIT)
+ testImplementation(KOTLIN_COROUTINES_TEST)
+ testImplementation(TRUTH)
+ testImplementation(project(":internal-testutils-truth"))
+
+ androidTestImplementation(JUNIT)
+ androidTestImplementation(project(":internal-testutils-truth"))
+ androidTestImplementation(ANDROIDX_TEST_RUNNER)
+ androidTestImplementation(ANDROIDX_TEST_CORE)
+}
+
+androidx {
+ name = "Android DataStore Core RxJava2 Wrappers"
+ publish = Publish.SNAPSHOT_AND_RELEASE
+ mavenGroup = LibraryGroups.DATASTORE
+ inceptionYear = "2020"
+ description = "Android DataStore Core - contains wrappers for using DataStore using RxJava2"
+ legacyDisableKotlinStrictApiMode = true
+}
diff --git a/datastore/datastore-rxjava3/src/androidTest/AndroidManifest.xml b/datastore/datastore-rxjava3/src/androidTest/AndroidManifest.xml
new file mode 100644
index 0000000..3369992
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/androidTest/AndroidManifest.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+ Copyright 2020 The Android Open Source Project
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<manifest xmlns:android="http://schemas.android.com/apk/res/android"
+ package="androidx.datastore.rxjava3">
+
+</manifest>
diff --git a/datastore/datastore-rxjava3/src/androidTest/java/androidx/datastore/rxjava3/RxDataStoreBuilderTest.java b/datastore/datastore-rxjava3/src/androidTest/java/androidx/datastore/rxjava3/RxDataStoreBuilderTest.java
new file mode 100644
index 0000000..0564f77
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/androidTest/java/androidx/datastore/rxjava3/RxDataStoreBuilderTest.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package androidx.datastore.rxjava3;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import android.content.Context;
+
+import androidx.annotation.NonNull;
+import androidx.datastore.core.DataStore;
+import androidx.datastore.core.handlers.ReplaceFileCorruptionHandler;
+import androidx.test.core.app.ApplicationProvider;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Scheduler;
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.schedulers.Schedulers;
+
+public class RxDataStoreBuilderTest {
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static Single<Byte> incrementByte(Byte byteIn) {
+ return Single.just(++byteIn);
+ }
+
+ @Test
+ public void testConstructWithProduceFile() throws Exception {
+ File file = tempFolder.newFile();
+ DataStore<Byte> dataStore =
+ new RxDataStoreBuilder<Byte>(() -> file, new TestingSerializer())
+ .build();
+ Single<Byte> incrementByte = RxDataStore.updateDataAsync(dataStore,
+ RxDataStoreBuilderTest::incrementByte);
+ assertThat(incrementByte.blockingGet()).isEqualTo(1);
+ // Construct it again and confirm that the data is still there:
+ dataStore =
+ new RxDataStoreBuilder<Byte>(() -> file, new TestingSerializer())
+ .build();
+ assertThat(RxDataStore.data(dataStore).blockingFirst()).isEqualTo(1);
+ }
+
+ @Test
+ public void testConstructWithContextAndName() throws Exception {
+ Context context = ApplicationProvider.getApplicationContext();
+ String name = "my_data_store";
+ DataStore<Byte> dataStore =
+ new RxDataStoreBuilder<Byte>(context, name, new TestingSerializer())
+ .build();
+ Single<Byte> set1 = RxDataStore.updateDataAsync(dataStore, input -> Single.just((byte) 1));
+ assertThat(set1.blockingGet()).isEqualTo(1);
+ // Construct it again and confirm that the data is still there:
+ dataStore =
+ new RxDataStoreBuilder<Byte>(context, name, new TestingSerializer())
+ .build();
+ assertThat(RxDataStore.data(dataStore).blockingFirst()).isEqualTo(1);
+ // Construct it again with the expected file path and confirm that the data is there:
+ dataStore =
+ new RxDataStoreBuilder<Byte>(() -> new File(context.getFilesDir().getPath()
+ + "/datastore/" + name), new TestingSerializer()
+ )
+ .build();
+ assertThat(RxDataStore.data(dataStore).blockingFirst()).isEqualTo(1);
+ }
+
+ @Test
+ public void testMigrationsAreInstalledAndRun() throws Exception {
+ RxDataMigration<Byte> plusOneMigration = new RxDataMigration<Byte>() {
+ @NonNull
+ @Override
+ public Single<Boolean> shouldMigrate(@NonNull Byte currentData) {
+ return Single.just(true);
+ }
+
+ @NonNull
+ @Override
+ public Single<Byte> migrate(@NonNull Byte currentData) {
+ return incrementByte(currentData);
+ }
+
+ @NonNull
+ @Override
+ public Completable cleanUp() {
+ return Completable.complete();
+ }
+ };
+
+ DataStore<Byte> dataStore = new RxDataStoreBuilder<Byte>(
+ () -> tempFolder.newFile(), new TestingSerializer())
+ .addRxDataMigration(plusOneMigration)
+ .build();
+
+ assertThat(RxDataStore.data(dataStore).blockingFirst()).isEqualTo(1);
+ }
+
+ @Test
+ public void testSpecifiedSchedulerIsUser() throws Exception {
+ Scheduler singleThreadedScheduler =
+ Schedulers.from(Executors.newSingleThreadExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "TestingThread");
+ }
+ }));
+
+
+ DataStore<Byte> dataStore = new RxDataStoreBuilder<Byte>(() -> tempFolder.newFile(),
+ new TestingSerializer())
+ .setIoScheduler(singleThreadedScheduler)
+ .build();
+ Single<Byte> update = RxDataStore.updateDataAsync(dataStore, input -> {
+ Thread currentThread = Thread.currentThread();
+ assertThat(currentThread.getName()).isEqualTo("TestingThread");
+ return Single.just(input);
+ });
+ assertThat(update.blockingGet()).isEqualTo((byte) 0);
+ Single<Byte> subsequentUpdate = RxDataStore.updateDataAsync(dataStore, input -> {
+ Thread currentThread = Thread.currentThread();
+ assertThat(currentThread.getName()).isEqualTo("TestingThread");
+ return Single.just(input);
+ });
+ assertThat(subsequentUpdate.blockingGet()).isEqualTo((byte) 0);
+ }
+
+ @Test
+ public void testCorruptionHandlerIsUser() {
+ TestingSerializer testingSerializer = new TestingSerializer();
+ testingSerializer.setFailReadWithCorruptionException(true);
+ ReplaceFileCorruptionHandler<Byte> replaceFileCorruptionHandler =
+ new ReplaceFileCorruptionHandler<Byte>(exception -> (byte) 99);
+
+
+ DataStore<Byte> dataStore = new RxDataStoreBuilder<Byte>(
+ () -> tempFolder.newFile(),
+ testingSerializer)
+ .setCorruptionHandler(replaceFileCorruptionHandler)
+ .build();
+ assertThat(RxDataStore.data(dataStore).blockingFirst()).isEqualTo(99);
+ }
+}
diff --git a/datastore/datastore-rxjava3/src/androidTest/java/androidx/datastore/rxjava3/RxSharedPreferencesMigrationTest.java b/datastore/datastore-rxjava3/src/androidTest/java/androidx/datastore/rxjava3/RxSharedPreferencesMigrationTest.java
new file mode 100644
index 0000000..b66d6ef
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/androidTest/java/androidx/datastore/rxjava3/RxSharedPreferencesMigrationTest.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.rxjava3;
+
+import static androidx.testutils.AssertionsKt.assertThrows;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import android.content.Context;
+import android.content.SharedPreferences;
+
+import androidx.datastore.core.DataMigration;
+import androidx.datastore.core.DataStore;
+import androidx.datastore.migrations.SharedPreferencesView;
+import androidx.test.core.app.ApplicationProvider;
+
+import com.google.common.truth.Truth;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import io.reactivex.rxjava3.core.Single;
+
+public class RxSharedPreferencesMigrationTest {
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private final String mSharedPrefsName = "shared_prefs_name";
+
+
+ private Context mContext;
+ private SharedPreferences mSharedPrefs;
+ private File mDatastoreFile;
+
+ @Before
+ public void setUp() throws Exception {
+ mContext = ApplicationProvider.getApplicationContext();
+ mSharedPrefs = mContext.getSharedPreferences(mSharedPrefsName, Context.MODE_PRIVATE);
+ mDatastoreFile = temporaryFolder.newFile("test_file.preferences_pb");
+
+ assertThat(mSharedPrefs.edit().clear().commit()).isTrue();
+ }
+
+ @Test
+ public void testShouldMigrateSkipsMigration() {
+ RxSharedPreferencesMigration<Byte> skippedMigration =
+ new RxSharedPreferencesMigration<Byte>() {
+ @NotNull
+ @Override
+ public Single<Boolean> shouldMigrate(Byte currentData) {
+ return Single.just(false);
+ }
+
+ @NotNull
+ @Override
+ public Single<Byte> migrate(
+ @NotNull SharedPreferencesView sharedPreferencesView,
+ Byte currentData) {
+ return Single.error(
+ new IllegalStateException("We shouldn't reach this point!"));
+ }
+ };
+
+
+ DataMigration<Byte> spMigration =
+ getSpMigrationBuilder(skippedMigration).build();
+
+ DataStore<Byte> dataStoreWithMigrations = getDataStoreWithMigration(spMigration);
+
+ Truth.assertThat(RxDataStore.data(dataStoreWithMigrations).blockingFirst()).isEqualTo(0);
+ }
+
+ @Test
+ public void testSharedPrefsViewContainsSpecifiedKeys() {
+ String includedKey = "key1";
+ int includedVal = 99;
+ String notMigratedKey = "key2";
+
+ assertThat(mSharedPrefs.edit().putInt(includedKey, includedVal).putInt(notMigratedKey,
+ 123).commit()).isTrue();
+
+ DataMigration<Byte> dataMigration =
+ getSpMigrationBuilder(
+ new DefaultMigration() {
+ @NotNull
+ @Override
+ public Single<Byte> migrate(
+ @NotNull SharedPreferencesView sharedPreferencesView,
+ Byte currentData) {
+ assertThat(sharedPreferencesView.contains(includedKey)).isTrue();
+ assertThat(sharedPreferencesView.getAll().size()).isEqualTo(1);
+ assertThrows(IllegalStateException.class,
+ () -> sharedPreferencesView.getInt(notMigratedKey, -1));
+
+ return Single.just((byte) 50);
+ }
+ }
+ ).setKeysToMigrate(includedKey).build();
+
+ DataStore<Byte> byteStore = getDataStoreWithMigration(dataMigration);
+
+ assertThat(RxDataStore.data(byteStore).blockingFirst()).isEqualTo(50);
+
+ assertThat(mSharedPrefs.contains(includedKey)).isFalse();
+ assertThat(mSharedPrefs.contains(notMigratedKey)).isTrue();
+ }
+
+
+ @Test
+ public void testSharedPrefsViewWithAllKeysSpecified() {
+ String includedKey = "key1";
+ String includedKey2 = "key2";
+ int value = 99;
+
+ assertThat(mSharedPrefs.edit().putInt(includedKey, value).putInt(includedKey2,
+ value).commit()).isTrue();
+
+ DataMigration<Byte> dataMigration =
+ getSpMigrationBuilder(
+ new DefaultMigration() {
+ @NotNull
+ @Override
+ public Single<Byte> migrate(
+ @NotNull SharedPreferencesView sharedPreferencesView,
+ Byte currentData) {
+ assertThat(sharedPreferencesView.contains(includedKey)).isTrue();
+ assertThat(sharedPreferencesView.contains(includedKey2)).isTrue();
+ assertThat(sharedPreferencesView.getAll().size()).isEqualTo(2);
+
+ return Single.just((byte) 50);
+ }
+ }
+ ).build();
+
+ DataStore<Byte> byteStore = getDataStoreWithMigration(dataMigration);
+
+ assertThat(RxDataStore.data(byteStore).blockingFirst()).isEqualTo(50);
+
+ assertThat(mSharedPrefs.contains(includedKey)).isFalse();
+ assertThat(mSharedPrefs.contains(includedKey2)).isFalse();
+ }
+
+ @Test
+ public void testDeletesEmptySharedPreferences() {
+ String key = "key";
+ String value = "value";
+ assertThat(mSharedPrefs.edit().putString(key, value).commit()).isTrue();
+
+ DataMigration<Byte> dataMigration =
+ getSpMigrationBuilder(new DefaultMigration()).setDeleteEmptyPreferences(
+ true).build();
+ DataStore<Byte> byteStore = getDataStoreWithMigration(dataMigration);
+ assertThat(RxDataStore.data(byteStore).blockingFirst()).isEqualTo(0);
+
+ // Check that the shared preferences files are deleted
+ File prefsDir = new File(mContext.getApplicationInfo().dataDir, "shared_prefs");
+ File prefsFile = new File(prefsDir, mSharedPrefsName + ".xml");
+ File backupPrefsFile = new File(prefsFile.getPath() + ".bak");
+ assertThat(prefsFile.exists()).isFalse();
+ assertThat(backupPrefsFile.exists()).isFalse();
+ }
+
+ private RxSharedPreferencesMigrationBuilder<Byte> getSpMigrationBuilder(
+ RxSharedPreferencesMigration<Byte> rxSharedPreferencesMigration) {
+ return new RxSharedPreferencesMigrationBuilder<Byte>(mContext, mSharedPrefsName,
+ rxSharedPreferencesMigration);
+ }
+
+ private DataStore<Byte> getDataStoreWithMigration(DataMigration<Byte> dataMigration) {
+ return new RxDataStoreBuilder<Byte>(() -> mDatastoreFile, new TestingSerializer())
+ .addDataMigration(dataMigration).build();
+ }
+
+
+ private static class DefaultMigration implements RxSharedPreferencesMigration<Byte> {
+
+ @NotNull
+ @Override
+ public Single<Boolean> shouldMigrate(Byte currentData) {
+ return Single.just(true);
+ }
+
+ @NotNull
+ @Override
+ public Single<Byte> migrate(@NotNull SharedPreferencesView sharedPreferencesView,
+ Byte currentData) {
+ return Single.just(currentData);
+ }
+ }
+}
diff --git a/datastore/datastore-rxjava3/src/main/AndroidManifest.xml b/datastore/datastore-rxjava3/src/main/AndroidManifest.xml
new file mode 100644
index 0000000..3369992
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/main/AndroidManifest.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+ Copyright 2020 The Android Open Source Project
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<manifest xmlns:android="http://schemas.android.com/apk/res/android"
+ package="androidx.datastore.rxjava3">
+
+</manifest>
diff --git a/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataMigration.java b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataMigration.java
new file mode 100644
index 0000000..ade2acc
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataMigration.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.rxjava3;
+
+import androidx.annotation.NonNull;
+import androidx.annotation.Nullable;
+
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Single;
+
+/**
+ * Interface for migrations to DataStore. Methods on this migration ([shouldMigrate], [migrate]
+ * and [cleanUp]) may be called multiple times, so their implementations must be idempotent.
+ * These methods may be called multiple times if DataStore encounters issues when writing the
+ * newly migrated data to disk or if any migration installed in the same DataStore throws an
+ * Exception.
+ *
+ * If you're migrating from SharedPreferences see [SharedPreferencesMigration].
+ *
+ * @param <T> the exception type
+ */
+public interface RxDataMigration<T> {
+
+ /**
+ * Return whether this migration needs to be performed. If this returns false, no migration or
+ * cleanup will occur. Apps should do the cheapest possible check to determine if this migration
+ * should run, since this will be called every time the DataStore is initialized. This method
+ * may be run multiple times when any failure is encountered.
+ *
+ * Note that this will always be called before each call to [migrate].
+ *
+ * @param currentData the current data (which might already populated from previous runs of this
+ * or other migrations). Only Nullable if the type used with DataStore is
+ * Nullable.
+ */
+ @NonNull
+ Single<Boolean> shouldMigrate(@Nullable T currentData);
+
+ /**
+ * Perform the migration. Implementations should be idempotent since this may be called
+ * multiple times. If migrate fails, DataStore will not commit any data to disk, cleanUp will
+ * not be called, and the exception will be propagated back to the DataStore call that
+ * triggered the migration. Future calls to DataStore will result in DataMigrations being
+ * attempted again. This method may be run multiple times when any failure is encountered.
+ *
+ * Note that this will always be called before a call to [cleanUp].
+ *
+ * @param currentData the current data (it might be populated from other migrations or from
+ * manual changes before this migration was added to the app). Only
+ * Nullable if the type used with DataStore is Nullable.
+ * @return The migrated data.
+ */
+ @NonNull
+ Single<T> migrate(@Nullable T currentData);
+
+ /**
+ * Clean up any old state/data that was migrated into the DataStore. This will not be called
+ * if the migration fails. If cleanUp throws an exception, the exception will be propagated
+ * back to the DataStore call that triggered the migration and future calls to DataStore will
+ * result in DataMigrations being attempted again. This method may be run multiple times when
+ * any failure is encountered.
+ */
+ @NonNull
+ Completable cleanUp();
+}
diff --git a/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataStore.kt b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataStore.kt
new file mode 100644
index 0000000..467e039
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataStore.kt
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@file:JvmName("RxDataStore")
+
+package androidx.datastore.rxjava3
+
+import androidx.datastore.core.DataStore
+import io.reactivex.rxjava3.core.Flowable
+import io.reactivex.rxjava3.core.Single
+import io.reactivex.rxjava3.functions.Function
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.async
+import kotlinx.coroutines.rx3.asFlowable
+import kotlinx.coroutines.rx3.asSingle
+import kotlinx.coroutines.rx3.await
+
+/**
+ * Gets a reactivex.Flowable of the data from DataStore. See [DataStore.data] for more information.
+ *
+ * Provides efficient, cached (when possible) access to the latest durably persisted state.
+ * The flow will always either emit a value or throw an exception encountered when attempting
+ * to read from disk. If an exception is encountered, collecting again will attempt to read the
+ * data again.
+ *
+ * Do not layer a cache on top of this API: it will be be impossible to guarantee consistency.
+ * Instead, use data.first() to access a single snapshot.
+ *
+ * The Flowable will complete with an IOException when an exception is encountered when reading
+ * data.
+ *
+ * @return a flow representing the current state of the data
+ */
+@ExperimentalCoroutinesApi
+public fun <T : Any> DataStore<T>.data(): Flowable<T> {
+ return this.data.asFlowable()
+}
+
+/**
+ * See [DataStore.updateData]
+ *
+ * Updates the data transactionally in an atomic read-modify-write operation. All operations
+ * are serialized, and the transform itself is a async so it can perform heavy work
+ * such as RPCs.
+ *
+ * The Single completes when the data has been persisted durably to disk (after which
+ * [data] will reflect the update). If the transform or write to disk fails, the
+ * transaction is aborted and the returned Single is completed with the error.
+ *
+ * The transform will be run on the scheduler that DataStore was constructed with.
+ *
+ * @return the snapshot returned by the transform
+ * @throws Exception when thrown by the transform function
+ */
+@ExperimentalCoroutinesApi
+public fun <T : Any> DataStore<T>.updateDataAsync(transform: Function<T, Single<T>>): Single<T> {
+ return CoroutineScope(Dispatchers.Unconfined).async {
+ this@updateDataAsync.updateData {
+ transform.apply(it).await()
+ }
+ }.asSingle(Dispatchers.Unconfined)
+}
diff --git a/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataStoreBuilder.kt b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataStoreBuilder.kt
new file mode 100644
index 0000000..139bbca
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxDataStoreBuilder.kt
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.rxjava3
+
+import android.annotation.SuppressLint
+import android.content.Context
+import androidx.datastore.core.DataMigration
+import androidx.datastore.core.DataStore
+import androidx.datastore.core.DataStoreFactory
+import androidx.datastore.core.Serializer
+import androidx.datastore.createDataStore
+import androidx.datastore.core.handlers.ReplaceFileCorruptionHandler
+import io.reactivex.rxjava3.core.Scheduler
+import io.reactivex.rxjava3.schedulers.Schedulers
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.rx3.asCoroutineDispatcher
+import kotlinx.coroutines.rx3.await
+import java.io.File
+import java.util.concurrent.Callable
+
+/**
+ * RxSharedPreferencesMigrationBuilder class for a DataStore that works on a single process.
+ */
+@SuppressLint("TopLevelBuilder")
+public class RxDataStoreBuilder<T> {
+
+ /**
+ * Create a RxDataStoreBuilder with the callable which returns the File that DataStore acts on.
+ * The user is responsible for ensuring that there is never more than one DataStore acting on
+ * a file at a time.
+ *
+ * @param produceFile Function which returns the file that the new DataStore will act on. The
+ * function must return the same path every time. No two instances of DataStore should act on
+ * the same file at the same time.
+ * @param serializer the serializer for the type that this DataStore acts on.
+ */
+ public constructor(produceFile: Callable<File>, serializer: Serializer<T>) {
+ this.produceFile = produceFile
+ this.serializer = serializer
+ }
+
+ /**
+ * Create a RxDataStoreBuilder with the Context and name from which to derive the DataStore
+ * file. The file is generated by See [Context.createDataStore] for more info. The user is
+ * responsible for ensuring that there is never more than one DataStore acting on a file at a
+ * time.
+ *
+ * @param context the context from which we retrieve files directory.
+ * @param fileName the filename relative to Context.filesDir that DataStore acts on. The File is
+ * obtained by calling File(context.filesDir, fileName). No two instances of DataStore should
+ * act on the same file at the same time.
+ * @param serializer the serializer for the type that this DataStore acts on.
+ */
+ public constructor(context: Context, fileName: String, serializer: Serializer<T>) {
+ this.context = context
+ this.name = fileName
+ this.serializer = serializer
+ }
+
+ // Either produceFile or context & name must be set, but not both. This is enforced by the
+ // two constructors.
+ private var produceFile: Callable<File>? = null
+
+ private var context: Context? = null
+ private var name: String? = null
+
+ // Required. This is enforced by the constructors.
+ private var serializer: Serializer<T>? = null
+
+ // Optional
+ private var ioScheduler: Scheduler = Schedulers.io()
+ private var corruptionHandler: ReplaceFileCorruptionHandler<T>? = null
+ private val dataMigrations: MutableList<DataMigration<T>> = mutableListOf()
+
+ /**
+ * Set the Scheduler on which to perform IO and transform operations. This is converted into
+ * a CoroutineDispatcher before being added to DataStore.
+ *
+ * This parameter is optional and defaults to Schedulers.io().
+ *
+ * @param ioScheduler the scheduler on which IO and transform operations are run
+ * @return this
+ */
+ @Suppress("MissingGetterMatchingBuilder")
+ public fun setIoScheduler(ioScheduler: Scheduler): RxDataStoreBuilder<T> =
+ apply { this.ioScheduler = ioScheduler }
+
+ /**
+ * Sets the corruption handler to install into the DataStore.
+ *
+ * This parameter is optional and defaults to no corruption handler.
+ *
+ * @param corruptionHandler
+ * @return this
+ */
+ @Suppress("MissingGetterMatchingBuilder")
+ public fun setCorruptionHandler(corruptionHandler: ReplaceFileCorruptionHandler<T>):
+ RxDataStoreBuilder<T> = apply { this.corruptionHandler = corruptionHandler }
+
+ /**
+ * Add an RxDataMigration to the DataStore. Migrations are run in the order they are added.
+ *
+ * @param rxDataMigration the migration to add.
+ * @return this
+ */
+ @Suppress("MissingGetterMatchingBuilder")
+ public fun addRxDataMigration(rxDataMigration: RxDataMigration<T>): RxDataStoreBuilder<T> =
+ apply {
+ this.dataMigrations.add(DataMigrationFromRxDataMigration(rxDataMigration))
+ }
+
+ /**
+ * Add a DataMigration to the Datastore. Migrations are run in the order they are added.
+ *
+ * @param dataMigration the migration to add
+ * @return this
+ */
+ @Suppress("MissingGetterMatchingBuilder")
+ public fun addDataMigration(dataMigration: DataMigration<T>): RxDataStoreBuilder<T> = apply {
+ this.dataMigrations.add(dataMigration)
+ }
+
+ /**
+ * Build the DataStore.
+ *
+ * @return the DataStore with the provided parameters
+ */
+ public fun build(): DataStore<T> {
+ val scope = CoroutineScope(ioScheduler.asCoroutineDispatcher())
+
+ return if (produceFile != null) {
+ DataStoreFactory.create(
+ produceFile = { produceFile!!.call() },
+ serializer = serializer!!,
+ scope = CoroutineScope(
+ ioScheduler.asCoroutineDispatcher()
+ ),
+ corruptionHandler = corruptionHandler,
+ migrations = dataMigrations
+ )
+ } else if (context != null && name != null) {
+ return context!!.createDataStore(
+ fileName = name!!,
+ serializer = serializer!!,
+ scope = scope,
+ corruptionHandler = corruptionHandler,
+ migrations = dataMigrations
+ )
+ } else {
+ error(
+ "Either produceFile or context and name must be set. This should never happen."
+ )
+ }
+ }
+}
+
+internal class DataMigrationFromRxDataMigration<T>(private val migration: RxDataMigration<T>) :
+ DataMigration<T> {
+ override suspend fun shouldMigrate(currentData: T): Boolean {
+ return migration.shouldMigrate(currentData).await()
+ }
+
+ override suspend fun migrate(currentData: T): T {
+ return migration.migrate(currentData).await()
+ }
+
+ override suspend fun cleanUp() {
+ migration.cleanUp().await()
+ }
+}
diff --git a/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxSharedPreferencesMigration.kt b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxSharedPreferencesMigration.kt
new file mode 100644
index 0000000..e5f71c6
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/main/java/androidx/datastore/rxjava3/RxSharedPreferencesMigration.kt
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.rxjava3
+
+import android.annotation.SuppressLint
+import android.content.Context
+import androidx.datastore.core.DataMigration
+import androidx.datastore.migrations.SharedPreferencesMigration
+import androidx.datastore.migrations.SharedPreferencesView
+import io.reactivex.rxjava3.core.Single
+import kotlinx.coroutines.rx3.await
+
+/**
+ * Client implemented migration interface.
+ **/
+public interface RxSharedPreferencesMigration<T> {
+ /**
+ * Whether or not the migration should be run. This can be used to skip a read from the
+ * SharedPreferences.
+ *
+ * @param currentData the most recently persisted data
+ * @return a Single indicating whether or not the migration should be run.
+ */
+ public fun shouldMigrate(currentData: T): Single<Boolean> {
+ return Single.just(true)
+ }
+
+ /**
+ * Maps SharedPreferences into T. Implementations should be idempotent
+ * since this may be called multiple times. See [DataMigration.migrate] for more
+ * information. The method accepts a SharedPreferencesView which is the view of the
+ * SharedPreferences to migrate from (limited to [keysToMigrate] and a T which represent
+ * the current data. The function must return the migrated data.
+ *
+ * @param sharedPreferencesView the current state of the SharedPreferences
+ * @param currentData the most recently persisted data
+ * @return a Single of the updated data
+ */
+ public fun migrate(sharedPreferencesView: SharedPreferencesView, currentData: T): Single<T>
+}
+
+/**
+ * RxSharedPreferencesMigrationBuilder for the RxSharedPreferencesMigration.
+ */
+@SuppressLint("TopLevelBuilder")
+public class RxSharedPreferencesMigrationBuilder<T>
+/**
+ * Construct a RxSharedPreferencesMigrationBuilder.
+ *
+ * @param context the Context used for getting the SharedPreferences.
+ * @param sharedPreferencesName the name of the SharedPreference from which to migrate.
+ * @param rxSharedPreferencesMigration the user implemented migration for this SharedPreference.
+ */
+constructor(
+ private val context: Context,
+ private val sharedPreferencesName: String,
+ private val rxSharedPreferencesMigration: RxSharedPreferencesMigration<T>
+) {
+
+ /** Optional */
+ private var deleteEmptyPreference: Boolean = true
+ private var keysToMigrate: Set<String>? = null
+
+ /**
+ * Set the list of keys to migrate. The keys will be mapped to datastore.Preferences with
+ * their same values. If the key is already present in the new Preferences, the key
+ * will not be migrated again. If the key is not present in the SharedPreferences it
+ * will not be migrated.
+ *
+ * This method is optional and if keysToMigrate is not set, all keys will be migrated from the
+ * existing SharedPreferences.
+ *
+ * @param keys the keys to migrate
+ * @return this
+ */
+ @Suppress("MissingGetterMatchingBuilder")
+ public fun setKeysToMigrate(vararg keys: String):
+ RxSharedPreferencesMigrationBuilder<T> = apply {
+ keysToMigrate = setOf(*keys)
+ }
+
+ /**
+ * If enabled and the SharedPreferences are empty (i.e. no remaining
+ * keys) after this migration runs, the leftover SharedPreferences file is deleted. Note that
+ * this cleanup runs only if the migration itself runs, i.e., if the keys were never in
+ * SharedPreferences to begin with then the (potentially) empty SharedPreferences
+ * won't be cleaned up by this option. This functionality is best effort - if there
+ * is an issue deleting the SharedPreferences file it will be silently ignored.
+ *
+ * This method is optional and defaults to true.
+ *
+ * @param deleteEmptyPreferences whether or not to delete the empty shared preferences file
+ * @return this
+ */
+ @Suppress("MissingGetterMatchingBuilder")
+ public fun setDeleteEmptyPreferences(deleteEmptyPreferences: Boolean):
+ RxSharedPreferencesMigrationBuilder<T> = apply {
+ this.deleteEmptyPreference = deleteEmptyPreferences
+ }
+
+ public fun build(): DataMigration<T> {
+ return SharedPreferencesMigration(
+ context = context,
+ sharedPreferencesName = sharedPreferencesName,
+ migrate = { spView, curData ->
+ rxSharedPreferencesMigration.migrate(spView, curData).await()
+ },
+ keysToMigrate = keysToMigrate,
+ deleteEmptyPreferences = deleteEmptyPreference,
+ shouldRunMigration = { curData ->
+ rxSharedPreferencesMigration.shouldMigrate(curData).await()
+ }
+ )
+ }
+}
diff --git a/datastore/datastore-rxjava3/src/test-common/java/androidx/datastore/rxjava3/TestingSerializer.kt b/datastore/datastore-rxjava3/src/test-common/java/androidx/datastore/rxjava3/TestingSerializer.kt
new file mode 100644
index 0000000..f1e02bb
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/test-common/java/androidx/datastore/rxjava3/TestingSerializer.kt
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.rxjava3
+
+import androidx.datastore.core.CorruptionException
+import androidx.datastore.core.Serializer
+import java.io.IOException
+import java.io.InputStream
+import java.io.OutputStream
+
+class TestingSerializer(
+ @Volatile var failReadWithCorruptionException: Boolean = false,
+ @Volatile var failingRead: Boolean = false,
+ @Volatile var failingWrite: Boolean = false
+) : Serializer<Byte> {
+ override fun readFrom(input: InputStream): Byte {
+ if (failReadWithCorruptionException) {
+ throw CorruptionException(
+ "CorruptionException",
+ IOException()
+ )
+ }
+
+ if (failingRead) {
+ throw IOException("I was asked to fail on reads")
+ }
+
+ val read = input.read()
+ if (read == -1) {
+ return 0
+ }
+ return read.toByte()
+ }
+
+ override fun writeTo(t: Byte, output: OutputStream) {
+ if (failingWrite) {
+ throw IOException("I was asked to fail on writes")
+ }
+ output.write(t.toInt())
+ }
+
+ override val defaultValue: Byte = 0
+}
\ No newline at end of file
diff --git a/datastore/datastore-rxjava3/src/test/java/androidx/datastore/rxjava3/RxDataStoreTest.java b/datastore/datastore-rxjava3/src/test/java/androidx/datastore/rxjava3/RxDataStoreTest.java
new file mode 100644
index 0000000..83c05a8
--- /dev/null
+++ b/datastore/datastore-rxjava3/src/test/java/androidx/datastore/rxjava3/RxDataStoreTest.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.datastore.rxjava3;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import androidx.datastore.core.DataStore;
+import androidx.datastore.core.DataStoreFactory;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.subscribers.TestSubscriber;
+import kotlinx.coroutines.CoroutineScopeKt;
+import kotlinx.coroutines.Dispatchers;
+
+public class RxDataStoreTest {
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static Single<Byte> incrementByte(Byte byteIn) {
+ return Single.just(++byteIn);
+ }
+
+ @Test
+ public void testGetSingleValue() throws Exception {
+ File newFile = tempFolder.newFile();
+
+ DataStore<Byte> byteStore = DataStoreFactory.INSTANCE.create(
+ new TestingSerializer(),
+ null,
+ new ArrayList<>(),
+ CoroutineScopeKt.CoroutineScope(Dispatchers.getIO()),
+ () -> newFile);
+
+ Byte firstByte = RxDataStore.data(byteStore).blockingFirst();
+ assertThat(firstByte).isEqualTo(0);
+
+ Single<Byte> incrementByte = RxDataStore.updateDataAsync(byteStore,
+ RxDataStoreTest::incrementByte);
+
+ assertThat(incrementByte.blockingGet()).isEqualTo(1);
+
+ firstByte = RxDataStore.data(byteStore).blockingFirst();
+ assertThat(firstByte).isEqualTo(1);
+ }
+
+ @Test
+ public void testTake3() throws Exception {
+ File newFile = tempFolder.newFile();
+
+ DataStore<Byte> byteStore = DataStoreFactory.INSTANCE.create(
+ new TestingSerializer(),
+ null,
+ new ArrayList<>(),
+ CoroutineScopeKt.CoroutineScope(Dispatchers.getIO()),
+ () -> newFile);
+
+ TestSubscriber<Byte> testSubscriber = RxDataStore.data(byteStore).test();
+
+ RxDataStore.updateDataAsync(byteStore, RxDataStoreTest::incrementByte);
+ RxDataStore.updateDataAsync(byteStore, RxDataStoreTest::incrementByte);
+
+ testSubscriber.awaitCount(3).assertValues((byte) 0, (byte) 1, (byte) 2);
+ }
+
+
+ @Test
+ public void testReadFailure() throws Exception {
+ File newFile = tempFolder.newFile();
+ TestingSerializer testingSerializer = new TestingSerializer();
+
+ DataStore<Byte> byteStore = DataStoreFactory.INSTANCE.create(
+ testingSerializer,
+ null,
+ new ArrayList<>(),
+ CoroutineScopeKt.CoroutineScope(Dispatchers.getIO()),
+ () -> newFile);
+
+ testingSerializer.setFailingRead(true);
+
+ TestSubscriber<Byte> testSubscriber = RxDataStore.data(byteStore).test();
+
+ assertThat(testSubscriber.await(5, TimeUnit.SECONDS)).isTrue();
+
+ testSubscriber.assertError(IOException.class);
+
+ testingSerializer.setFailingRead(false);
+
+ testSubscriber = RxDataStore.data(byteStore).test();
+ testSubscriber.awaitCount(1).assertValues((byte) 0);
+ }
+
+ @Test
+ public void testWriteFailure() throws Exception {
+ File newFile = tempFolder.newFile();
+ TestingSerializer testingSerializer = new TestingSerializer();
+
+ DataStore<Byte> byteStore = DataStoreFactory.INSTANCE.create(
+ testingSerializer,
+ null,
+ new ArrayList<>(),
+ CoroutineScopeKt.CoroutineScope(Dispatchers.getIO()),
+ () -> newFile);
+
+ TestSubscriber<Byte> testSubscriber = RxDataStore.data(byteStore).test();
+
+ testingSerializer.setFailingWrite(true);
+ Single<Byte> incrementByte = RxDataStore.updateDataAsync(byteStore,
+ RxDataStoreTest::incrementByte);
+
+ incrementByte.cache().test().await().assertError(IOException.class);
+
+ testSubscriber.awaitCount(1).assertNoErrors().assertValues((byte) 0);
+ testingSerializer.setFailingWrite(false);
+
+ Single<Byte> incrementByte2 = RxDataStore.updateDataAsync(byteStore,
+ RxDataStoreTest::incrementByte);
+ assertThat(incrementByte2.blockingGet()).isEqualTo((byte) 1);
+
+ testSubscriber.awaitCount(2).assertValues((byte) 0, (byte) 1);
+ }
+}
diff --git a/settings.gradle b/settings.gradle
index 035913c..72c87d0 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -287,8 +287,11 @@
"datastore/datastore-preferences-core/datastore-preferences-proto", [BuildType.MAIN])
includeProject(":datastore:datastore-preferences-rxjava2",
"datastore/datastore-preferences-rxjava2", [BuildType.MAIN])
+includeProject(":datastore:datastore-preferences-rxjava3",
+ "datastore/datastore-preferences-rxjava3", [BuildType.MAIN])
includeProject(":datastore:datastore-proto", "datastore/datastore-proto", [BuildType.MAIN])
includeProject(":datastore:datastore-rxjava2", "datastore/datastore-rxjava2", [BuildType.MAIN])
+includeProject(":datastore:datastore-rxjava3", "datastore/datastore-rxjava3", [BuildType.MAIN])
includeProject(":datastore:datastore-sampleapp", "datastore/datastore-sampleapp", [BuildType.MAIN])
includeProject(":documentfile:documentfile", "documentfile/documentfile", [BuildType.MAIN])
includeProject(":drawerlayout:drawerlayout", "drawerlayout/drawerlayout", [BuildType.MAIN])