From ed3ece34ba4a22b6945388acf3972e1440ea8881 Mon Sep 17 00:00:00 2001 From: David Justo Date: Wed, 11 Sep 2024 09:34:40 -0700 Subject: [PATCH] add setting to allow replaying terminal orchestrators (#1159) --- .../AzureStorageOrchestrationService.cs | 5 +- ...zureStorageOrchestrationServiceSettings.cs | 14 +++ .../AzureStorageScenarioTests.cs | 99 +++++++++++++++++++ .../TestHelpers.cs | 24 ++++- 4 files changed, 136 insertions(+), 6 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 62183b622..4dd87b4d2 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -790,7 +790,7 @@ async Task LockNextTaskOrchestrationWorkItemAsync(boo TraceContext = currentRequestTraceContext, }; - if (!this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, out string warningMessage)) + if (!this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, settings.AllowReplayingTerminalInstances, out string warningMessage)) { // If all messages belong to the same execution ID, then all of them need to be discarded. // However, it's also possible to have messages for *any* execution ID batched together with messages @@ -1046,7 +1046,7 @@ internal static void TraceMessageReceived(AzureStorageOrchestrationServiceSettin data.Episode.GetValueOrDefault(-1)); } - bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList newMessages, out string message) + bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList newMessages, bool allowReplayingTerminalInstances, out string message) { if (runtimeState.ExecutionStartedEvent == null && !newMessages.Any(msg => msg.Event is ExecutionStartedEvent)) { @@ -1072,6 +1072,7 @@ bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList public bool UseAppLease { get; set; } = true; + /// + /// When false, when an orchestrator is in a terminal state (e.g. Completed, Failed, Terminated), events for that orchestrator are discarded. + /// Otherwise, events for a terminal orchestrator induce a replay. This may be used to recompute the state of the orchestrator in the "Instances Table". + /// + /// + /// Transactions across Azure Tables are not possible, so we independently update the "History table" and then the "Instances table" + /// to set the state of the orchestrator. + /// If a crash were to occur between these two updates, the state of the orchestrator in the "Instances table" would be incorrect. + /// By setting this configuration to true, you can recover from these inconsistencies by forcing a replay of the orchestrator in response + /// to a client event like a termination request or an external event, which gives the framework another opportunity to update the state of + /// the orchestrator in the "Instances table". To force a replay after enabling this configuration, just send any external event to the affected instanceId. + /// + public bool AllowReplayingTerminalInstances { get; set; } = false; + /// /// If UseAppLease is true, gets or sets the AppLeaseOptions used for acquiring the lease to start the application. /// diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index ab0dd8c06..1c505cc2f 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -2312,6 +2312,105 @@ await Task.WhenAll( } } + /// + /// End-to-end test validating the setting. + /// + /// + /// The `AllowReplayingTerminalInstances` setting was introduced to fix a gap in + /// the Azure Storage provider where the History table and Instance table may get out of sync. + /// + /// Namely, suppose we're updating an orchestrator from the Running state to Completed. If the DTFx process crashes + /// after updating the History table but before updating the Instance table, the orchestrator will be in a terminal + /// state according to the History table, but not the Instance table. Since the History claims the orchestrator is terminal, + /// we will *discard* any new events that try to reach the orchestrator, including "Terminate" requests. Therefore, the data + /// in the Instace table will remain incorrect until it is manually edited. + /// + /// To recover from this, users may set `AllowReplayingTerminalInstances` to true. When this is set, DTFx will not discard + /// events for terminal orchestrators, forcing a replay which eventually updates the instances table to the right state. + /// + [DataTestMethod] + [DataRow(true, true, true)] + [DataRow(true, true, false)] + [DataRow(true, false, true)] + [DataRow(true, false, false)] + [DataRow(false, true, true)] + [DataRow(false, true, false)] + [DataRow(false, false, true)] + [DataRow(false, false, false)] + public async Task TestAllowReplayingTerminalInstances(bool enableExtendedSessions, bool sendTerminateEvent, bool allowReplayingTerminalInstances) + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost( + enableExtendedSessions, + allowReplayingTerminalInstances: allowReplayingTerminalInstances)) + { + await host.StartAsync(); + + // Run simple orchestrator to completion, this will help us obtain a valid terminal history for the orchestrator + var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), "hello!"); + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + + // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running". + // This simulates the scenario where the History table was updated, but not the Instance table. + var instanceId = client.InstanceId; + AzureStorageOrchestrationServiceSettings settings = TestHelpers.GetTestAzureStorageOrchestrationServiceSettings( + enableExtendedSessions, + allowReplayingTerminalInstances: allowReplayingTerminalInstances); + AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + TableEntity entity = new TableEntity(instanceId, "") + { + ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G") + }; + await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); + + // Assert that the status in the Instance table reads "Running" + IList state = await client.GetStateAsync(instanceId); + OrchestrationStatus forcedStatus = state.First().OrchestrationStatus; + Assert.AreEqual(OrchestrationStatus.Running, forcedStatus); + + + // Send event (either terminate or external event) and wait for the event to be processed. + // The wait is possibly flakey, but unit testing this directly is difficult today + if (sendTerminateEvent) + { + // we want to test the "Terminate" event explicitly because it's the first thing users + // should try when an orchestrator is in a bad state + await client.TerminateAsync("Foo"); + } + else + { + // we test the raise event case because, if `AllowReplayingTerminalInstances` is set to true, + // an "unregistered" external event (one that the orchestrator is not expected) allows the orchestrator + // to update the Instance table to the correct state without forcing it to end up as "Terminated". + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + if (allowReplayingTerminalInstances) + { + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + OrchestrationStatus expectedStatus = sendTerminateEvent ? OrchestrationStatus.Terminated : OrchestrationStatus.Completed; + Assert.AreEqual(expectedStatus, status.OrchestrationStatus); + } + else + { + // A replay should not have occurred, the instance table should still have the "Running" status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Running, status.OrchestrationStatus); + } + await host.StopAsync(); + } + } + #if !NET462 /// /// End-to-end test which validates a simple orchestrator function that calls an activity function diff --git a/test/DurableTask.AzureStorage.Tests/TestHelpers.cs b/test/DurableTask.AzureStorage.Tests/TestHelpers.cs index fac50044b..80bd79e0d 100644 --- a/test/DurableTask.AzureStorage.Tests/TestHelpers.cs +++ b/test/DurableTask.AzureStorage.Tests/TestHelpers.cs @@ -27,7 +27,25 @@ public static TestOrchestrationHost GetTestOrchestrationHost( bool enableExtendedSessions, int extendedSessionTimeoutInSeconds = 30, bool fetchLargeMessages = true, + bool allowReplayingTerminalInstances = false, Action? modifySettingsAction = null) + { + AzureStorageOrchestrationServiceSettings settings = GetTestAzureStorageOrchestrationServiceSettings( + enableExtendedSessions, + extendedSessionTimeoutInSeconds, + fetchLargeMessages, + allowReplayingTerminalInstances); + // Give the caller a chance to make test-specific changes to the settings + modifySettingsAction?.Invoke(settings); + + return new TestOrchestrationHost(settings); + } + + public static AzureStorageOrchestrationServiceSettings GetTestAzureStorageOrchestrationServiceSettings( + bool enableExtendedSessions, + int extendedSessionTimeoutInSeconds = 30, + bool fetchLargeMessages = true, + bool allowReplayingTerminalInstances = false) { string storageConnectionString = GetTestStorageAccountConnectionString(); @@ -43,15 +61,13 @@ public static TestOrchestrationHost GetTestOrchestrationHost( FetchLargeMessageDataEnabled = fetchLargeMessages, StorageAccountClientProvider = new StorageAccountClientProvider(storageConnectionString), TaskHubName = GetTestTaskHubName(), + AllowReplayingTerminalInstances = allowReplayingTerminalInstances, // Setting up a logger factory to enable the new DurableTask.Core logs LoggerFactory = loggerFactory, }; - // Give the caller a chance to make test-specific changes to the settings - modifySettingsAction?.Invoke(settings); - - return new TestOrchestrationHost(settings); + return settings; } public static string GetTestStorageAccountConnectionString()