Skip to content

Commit

Permalink
fix(datastore) Race condition fix and other stability-related fixes (#…
Browse files Browse the repository at this point in the history
…599)

* Don't try to initialize orchestrator in beforeOperation
* Update aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java
* Refactored some stop/start logic
* Retry logic for subscription processor. Modified start/stop sequence
* Add new hub event message types
* Changes related to subscription connectivity problems (#615)
* Adding basic unit tests for retry handler
  • Loading branch information
rjuliano authored Jul 16, 2020
1 parent 8a6e133 commit 0d560a6
Show file tree
Hide file tree
Showing 17 changed files with 768 additions and 226 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

final class SubscriptionOperation<T> extends GraphQLOperation<T> {
private static final Logger LOG = Amplify.Logging.forNamespace("amplify:aws-api");
Expand All @@ -38,9 +40,10 @@ final class SubscriptionOperation<T> extends GraphQLOperation<T> {
private final Consumer<GraphQLResponse<T>> onNextItem;
private final Consumer<ApiException> onSubscriptionError;
private final Action onSubscriptionComplete;
private final AtomicBoolean canceled;

private String subscriptionId;
private boolean canceled;
private Future<?> subscriptionFuture;

@SuppressWarnings("ParameterNumber")
private SubscriptionOperation(
Expand All @@ -59,8 +62,7 @@ private SubscriptionOperation(
this.onSubscriptionError = onSubscriptionError;
this.onSubscriptionComplete = onSubscriptionComplete;
this.executorService = executorService;
this.subscriptionId = null;
this.canceled = false;
this.canceled = new AtomicBoolean(false);
}

@NonNull
Expand All @@ -70,13 +72,13 @@ static <T> SubscriptionManagerStep<T> builder() {

@Override
public synchronized void start() {
if (canceled) {
if (canceled.get()) {
onSubscriptionError.accept(new ApiException(
"Operation already canceled.", "Don't cancel the subscription before starting it!"
));
return;
}
executorService.submit(() -> {
subscriptionFuture = executorService.submit(() -> {
LOG.debug("Requesting subscription: " + getRequest().getContent());
subscriptionEndpoint.requestSubscription(
getRequest(),
Expand All @@ -85,21 +87,31 @@ public synchronized void start() {
onSubscriptionStart.accept(subscriptionId);
},
onNextItem,
onSubscriptionError,
apiException -> {
// Guard against calling something that's been cancelled already.
if (!canceled.get()) {
canceled.set(true);
onSubscriptionError.accept(apiException);
}
},
onSubscriptionComplete
);
});
}

@Override
public synchronized void cancel() {
if (subscriptionId != null && !canceled) {
canceled = true;
if (subscriptionId != null && !canceled.get()) {
canceled.set(true);
try {
subscriptionEndpoint.releaseSubscription(subscriptionId);
} catch (ApiException exception) {
onSubscriptionError.accept(exception);
}
} else if (subscriptionFuture != null && subscriptionFuture.cancel(true)) {
LOG.debug("Subscription attempt was canceled.");
} else {
LOG.debug("Nothing to cancel. Subscription not yet created.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,18 @@
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import io.reactivex.Completable;
import io.reactivex.schedulers.Schedulers;

/**
* An AWS implementation of the {@link DataStorePlugin}.
*/
public final class AWSDataStorePlugin extends DataStorePlugin<Void> {
private static final Logger LOG = Amplify.Logging.forNamespace("amplify:aws-datastore");
private static final long PLUGIN_INIT_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);
private static final long PLUGIN_TERMINATE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);
// Reference to an implementation of the Local Storage Adapter that
// manages the persistence of data on-device.
private final LocalStorageAdapter sqliteStorageAdapter;
Expand All @@ -72,6 +75,8 @@ public final class AWSDataStorePlugin extends DataStorePlugin<Void> {
// Keeps track of whether of not the category is initialized yet
private final CountDownLatch categoryInitializationsPending;

private final AtomicBoolean isOrchestratorReady;

// Used to interrogate plugins, to understand if sync should be automatically turned on
private final ApiCategory api;

Expand All @@ -90,6 +95,7 @@ private AWSDataStorePlugin(
@Nullable DataStoreConfiguration userProvidedConfiguration) {
this.sqliteStorageAdapter = SQLiteStorageAdapter.forModels(modelSchemaRegistry, modelProvider);
this.categoryInitializationsPending = new CountDownLatch(1);
this.isOrchestratorReady = new AtomicBoolean(false);
this.api = api;
this.orchestrator = new Orchestrator(
modelProvider,
Expand Down Expand Up @@ -192,13 +198,18 @@ public void configure(
@Override
public void initialize(@NonNull Context context) throws AmplifyException {
Throwable initError = initializeStorageAdapter(context)
.andThen(initializeOrchestrator())
.blockingGet(PLUGIN_INIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (initError != null) {
throw new AmplifyException("Failed to initialize the local storage adapter for the DataStore plugin.",
initError,
AmplifyException.TODO_RECOVERY_SUGGESTION);
}
// Kick off orchestrator asynchronously.
synchronized (isOrchestratorReady) {
initializeOrchestrator()
.subscribeOn(Schedulers.io())
.subscribe();
}
}

/**
Expand All @@ -219,8 +230,13 @@ private Completable initializeStorageAdapter(Context context) {
*/
@SuppressWarnings("unused")
synchronized void terminate() throws AmplifyException {
orchestrator.stop();
sqliteStorageAdapter.terminate();
Throwable throwable = orchestrator.stop()
.andThen(
Completable.fromAction(sqliteStorageAdapter::terminate)
).blockingGet(PLUGIN_TERMINATE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (throwable != null) {
LOG.warn("An error occurred while terminating the DataStore plugin.", throwable);
}
}

/**
Expand Down Expand Up @@ -425,31 +441,66 @@ public <T extends Model> void observe(
@Override
public void clear(@NonNull Action onComplete,
@NonNull Consumer<DataStoreException> onError) {
beforeOperation(() -> {
orchestrator.stop();
sqliteStorageAdapter.clear(onComplete, onError);
});
// We shouldn't call beforeOperation when clearing the DataStore. The
// only thing we have to wait for is the category initialization latch.
boolean isCategoryInitialized = false;
try {
isCategoryInitialized = categoryInitializationsPending.await(PLUGIN_INIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException exception) {
LOG.warn("Execution interrupted while waiting for DataStore to be initialized.");
}
if (!isCategoryInitialized) {
onError.accept(new DataStoreException("DataStore not ready to be cleared.", "Retry your request."));
return;
}
isOrchestratorReady.set(false);
orchestrator.stop()
.subscribeOn(Schedulers.io())
.andThen(Completable.fromAction(() -> sqliteStorageAdapter.clear(() -> {
// Invoke the consumer's callback once the clear operation is finished.
onComplete.call();
// Kick off the orchestrator asynchronously.
initializeOrchestrator()
.doOnError(throwable -> LOG.warn("Failed to restart orchestrator after clearing.", throwable))
.doOnComplete(() -> LOG.info("Orchestrator restarted after clear operation."))
.subscribe();
}, onError)))
.doOnError(throwable -> LOG.warn("Clear operation failed", throwable))
.doOnComplete(() -> LOG.debug("Clear operation completed."))
.subscribe();
}

private void beforeOperation(@NonNull final Runnable runnable) {
Completable opCompletable = Completable.fromAction(categoryInitializationsPending::await);
if (!orchestrator.isStarted()) {
opCompletable = opCompletable
.andThen(initializeOrchestrator());
}
Throwable throwable = opCompletable
Throwable throwable = Completable.fromAction(categoryInitializationsPending::await)
.repeatUntil(() -> {
// Repeat until this is true or the blockingGet call times out.
synchronized (isOrchestratorReady) {
return isOrchestratorReady.get();
}
})
.andThen(Completable.fromRunnable(runnable))
.blockingGet(PLUGIN_INIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (throwable != null) {
LOG.warn("Failed to execute request due to an unexpected error.", throwable);
if (!(throwable == null && isOrchestratorReady.get())) {
if (!isOrchestratorReady.get()) {
LOG.warn("Failed to execute request because DataStore is not fully initialized.");
} else {
LOG.warn("Failed to execute request due to an unexpected error.", throwable);
}
}
}

private Completable initializeOrchestrator() {
if (api.getPlugins().isEmpty()) {
return Completable.complete();
} else {
return orchestrator.start();
// Let's prevent the orchestrator startup from possibly running in main.
return orchestrator.start(() -> {
// This callback is invoked when the local storage observer gets initialized.
isOrchestratorReady.set(true);
})
.repeatUntil(() -> isOrchestratorReady.get())
.observeOn(Schedulers.io())
.subscribeOn(Schedulers.io());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,27 @@
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;

import com.amplifyframework.AmplifyException;
import com.amplifyframework.api.graphql.GraphQLResponse;
import com.amplifyframework.core.Amplify;
import com.amplifyframework.core.Consumer;
import com.amplifyframework.core.async.Cancelable;
import com.amplifyframework.core.model.Model;
import com.amplifyframework.datastore.appsync.ModelWithMetadata;
import com.amplifyframework.logging.Logger;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.ObservableEmitter;
import io.reactivex.disposables.Disposable;

/**
* A utility for building Rx {@link Disposable}s from Amplify entities,
* e.g. the {@link Cancelable}.
*/
public final class AmplifyDisposables {
private static final Logger LOG = Amplify.Logging.forNamespace("amplify:aws-datastore");

private AmplifyDisposables() {}

/**
Expand Down Expand Up @@ -61,4 +71,47 @@ public boolean isDisposed() {
}
};
}

/**
* This function that creates a {@link Consumer} which wraps the {@link ObservableEmitter#onError(Throwable)}
* to prevent it from calling observers that have already been disposed.
*
* <p>
* The scenario is that we have multiple event sources (1 AppSync subscription
* for each model+operation type combination) being consumed by a single downstream
* oberserver. Once the first subscription emits an error, the downstream subscriber
* is placed in a disposed state and will not receive any further notifications.
* In a situation such as loss of connectivity, it's innevitable that multiple subscriptions will fail.
* With that said, after the first failure, the other events sources (AppSync subscriptions)
* will attempt to invoke the downstream onError handler which then results in an
* {@link io.reactivex.exceptions.UndeliverableException} being thrown.
* </p>
*
* <p>
* This method takes a reference of the observable that represents the AppSync subscription,
* wraps it and returns a {@link Consumer} that is used as the onError parameter. The returned
* {@link Consumer} function will delegate the onError call to the downstream observers if it's
* still available, otherwise it logs a warning.
* </p>
*
* @param realEmitter The emitter which will have it's onError function proxied by the return
* value of this function.
* @param <T> The type of model handled by the emitter.
* @param <E> The type of exception for the onError consumer
* @return A {@link Consumer} that proxies the {@link ObservableEmitter#onError(Throwable)} call
* to the {@code realEmitter} if it's not disposed or logs a warning.
* @see <a href="https://github.com/aws-amplify/amplify-android/issues/541">GitHub issue #541</a>
*
*/
@NonNull
public static <T extends Model, E extends AmplifyException> Consumer<E> onErrorConsumerWrapperFor(
@NonNull ObservableEmitter<GraphQLResponse<ModelWithMetadata<T>>> realEmitter) {
return dataStoreException -> {
if (realEmitter.isDisposed()) {
LOG.warn("Attempted to invoke an emitter that is already disposed", dataStoreException);
} else {
realEmitter.onError(dataStoreException);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@
/**
* Just a ~type-alias for a consumer of DataStoreException.
*/
interface DataStoreErrorHandler extends Consumer<DataStoreException> {
public interface DataStoreErrorHandler extends Consumer<DataStoreException> {
}
Loading

0 comments on commit 0d560a6

Please sign in to comment.