Skip to content

Commit

Permalink
add setting to allow replaying terminal orchestrators (#1159)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmrdavid authored Sep 11, 2024
1 parent b898aa9 commit ed3ece3
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(boo
TraceContext = currentRequestTraceContext,
};

if (!this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, out string warningMessage))
if (!this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, settings.AllowReplayingTerminalInstances, out string warningMessage))
{
// If all messages belong to the same execution ID, then all of them need to be discarded.
// However, it's also possible to have messages for *any* execution ID batched together with messages
Expand Down Expand Up @@ -1046,7 +1046,7 @@ internal static void TraceMessageReceived(AzureStorageOrchestrationServiceSettin
data.Episode.GetValueOrDefault(-1));
}

bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList<TaskMessage> newMessages, out string message)
bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList<TaskMessage> newMessages, bool allowReplayingTerminalInstances, out string message)
{
if (runtimeState.ExecutionStartedEvent == null && !newMessages.Any(msg => msg.Event is ExecutionStartedEvent))
{
Expand All @@ -1072,6 +1072,7 @@ bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList<TaskMess
}

if (runtimeState.ExecutionStartedEvent != null &&
!allowReplayingTerminalInstances &&
runtimeState.OrchestrationStatus != OrchestrationStatus.Running &&
runtimeState.OrchestrationStatus != OrchestrationStatus.Pending &&
runtimeState.OrchestrationStatus != OrchestrationStatus.Suspended)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,20 @@ public class AzureStorageOrchestrationServiceSettings
/// </summary>
public bool UseAppLease { get; set; } = true;

/// <summary>
/// Gets or sets a value indicating whether events sent to an orchestrator that is already in a terminal state
/// (e.g. Completed, Failed, Terminated) induce a replay instead of being discarded. Defaults to false.
/// When false, events for a terminal orchestrator are discarded.
/// When true, events for a terminal orchestrator induce a replay. This may be used to recompute the state of the orchestrator in the "Instances table".
/// </summary>
/// <remarks>
/// Transactions across Azure Tables are not possible, so the "History table" is updated first and the "Instances table"
/// is updated afterwards, as two independent operations, to set the state of the orchestrator.
/// If a crash were to occur between these two updates, the state of the orchestrator in the "Instances table" would be incorrect.
/// By setting this configuration to true, you can recover from these inconsistencies by forcing a replay of the orchestrator in response
/// to a client event like a termination request or an external event, which gives the framework another opportunity to update the state of
/// the orchestrator in the "Instances table". To force a replay after enabling this configuration, send any external event to the affected instance ID.
/// </remarks>
public bool AllowReplayingTerminalInstances { get; set; } = false;

/// <summary>
/// If UseAppLease is true, gets or sets the AppLeaseOptions used for acquiring the lease to start the application.
/// </summary>
Expand Down
99 changes: 99 additions & 0 deletions test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2312,6 +2312,105 @@ await Task.WhenAll(
}
}

/// <summary>
/// End-to-end coverage for the <see cref="AzureStorageOrchestrationServiceSettings.AllowReplayingTerminalInstances"/> setting.
/// </summary>
/// <remarks>
/// `AllowReplayingTerminalInstances` exists to close a gap in the Azure Storage provider in which
/// the History table and the Instance table can drift out of sync.
///
/// Consider an orchestrator moving from Running to Completed. If the DTFx process crashes after the History table
/// has been written but before the Instance table is written, the History table says the orchestrator is terminal
/// while the Instance table does not. Because the History reports a terminal state, every new event addressed to
/// the orchestrator (including "Terminate" requests) is *discarded*, so the Instance table stays wrong until
/// someone edits it by hand.
///
/// Setting `AllowReplayingTerminalInstances` to true is the recovery path: DTFx stops discarding events for
/// terminal orchestrators, which forces a replay that eventually writes the correct state to the Instance table.
///
/// The data rows cover every combination of extended sessions on/off, terminate vs. external event,
/// and the setting enabled/disabled.
/// </remarks>
[DataTestMethod]
[DataRow(true, true, true)]
[DataRow(true, true, false)]
[DataRow(true, false, true)]
[DataRow(true, false, false)]
[DataRow(false, true, true)]
[DataRow(false, true, false)]
[DataRow(false, false, true)]
[DataRow(false, false, false)]
public async Task TestAllowReplayingTerminalInstances(bool enableExtendedSessions, bool sendTerminateEvent, bool allowReplayingTerminalInstances)
{
    using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(
        enableExtendedSessions,
        allowReplayingTerminalInstances: allowReplayingTerminalInstances))
    {
        await host.StartAsync();

        // Step 1: drive a trivial orchestrator to completion so that a genuine terminal history exists for it.
        var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), "hello!");
        var completedState = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10));
        Assert.AreEqual(OrchestrationStatus.Completed, completedState?.OrchestrationStatus);

        // Step 2: make the Instance table stale by overwriting its runtime status with "Running".
        // This reproduces the case where the History table was updated but the Instance table was not.
        var instanceId = client.InstanceId;
        AzureStorageOrchestrationServiceSettings settings = TestHelpers.GetTestAzureStorageOrchestrationServiceSettings(
            enableExtendedSessions,
            allowReplayingTerminalInstances: allowReplayingTerminalInstances);
        AzureStorageClient azureStorageClient = new AzureStorageClient(settings);
        Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName);

        TableEntity staleEntity = new TableEntity(instanceId, "")
        {
            ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G")
        };

        // ETag.All makes the merge unconditional, regardless of the entity's current version.
        await instanceTable.MergeEntityAsync(staleEntity, Azure.ETag.All);

        // Sanity check: the Instance table must now report "Running".
        IList<OrchestrationState> state = await client.GetStateAsync(instanceId);
        Assert.AreEqual(OrchestrationStatus.Running, state.First().OrchestrationStatus);

        // Step 3: send an event (external event or terminate) and give it time to be processed.
        // The fixed wait is potentially flaky, but unit testing this directly is difficult today.
        if (!sendTerminateEvent)
        {
            // An "unregistered" external event (one the orchestrator is not waiting for) is interesting because,
            // with `AllowReplayingTerminalInstances` enabled, it lets the orchestrator correct the Instance table
            // without forcing it to end up as "Terminated".
            await client.RaiseEventAsync("Foo", "Bar");
        }
        else
        {
            // "Terminate" is tested explicitly because it is the first thing users
            // should try when an orchestrator is in a bad state.
            await client.TerminateAsync("Foo");
        }

        await Task.Delay(TimeSpan.FromSeconds(30));

        // Step 4: work out which status the Instance table should hold now.
        OrchestrationStatus expectedStatus;
        if (!allowReplayingTerminalInstances)
        {
            // No replay should have happened, so the forced "Running" status must still be there.
            expectedStatus = OrchestrationStatus.Running;
        }
        else if (sendTerminateEvent)
        {
            // A replay should have happened and processed the terminate request.
            expectedStatus = OrchestrationStatus.Terminated;
        }
        else
        {
            // A replay should have happened and restored the original terminal status.
            expectedStatus = OrchestrationStatus.Completed;
        }

        state = await client.GetStateAsync(instanceId);
        Assert.AreEqual(1, state.Count);

        var finalState = state.First();
        Assert.AreEqual(expectedStatus, finalState.OrchestrationStatus);

        await host.StopAsync();
    }
}

#if !NET462
/// <summary>
/// End-to-end test which validates a simple orchestrator function that calls an activity function
Expand Down
24 changes: 20 additions & 4 deletions test/DurableTask.AzureStorage.Tests/TestHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,25 @@ public static TestOrchestrationHost GetTestOrchestrationHost(
bool enableExtendedSessions,
int extendedSessionTimeoutInSeconds = 30,
bool fetchLargeMessages = true,
bool allowReplayingTerminalInstances = false,
Action<AzureStorageOrchestrationServiceSettings>? modifySettingsAction = null)
{
AzureStorageOrchestrationServiceSettings settings = GetTestAzureStorageOrchestrationServiceSettings(
enableExtendedSessions,
extendedSessionTimeoutInSeconds,
fetchLargeMessages,
allowReplayingTerminalInstances);
// Give the caller a chance to make test-specific changes to the settings
modifySettingsAction?.Invoke(settings);

return new TestOrchestrationHost(settings);
}

public static AzureStorageOrchestrationServiceSettings GetTestAzureStorageOrchestrationServiceSettings(
bool enableExtendedSessions,
int extendedSessionTimeoutInSeconds = 30,
bool fetchLargeMessages = true,
bool allowReplayingTerminalInstances = false)
{
string storageConnectionString = GetTestStorageAccountConnectionString();

Expand All @@ -43,15 +61,13 @@ public static TestOrchestrationHost GetTestOrchestrationHost(
FetchLargeMessageDataEnabled = fetchLargeMessages,
StorageAccountClientProvider = new StorageAccountClientProvider(storageConnectionString),
TaskHubName = GetTestTaskHubName(),
AllowReplayingTerminalInstances = allowReplayingTerminalInstances,

// Setting up a logger factory to enable the new DurableTask.Core logs
LoggerFactory = loggerFactory,
};

// Give the caller a chance to make test-specific changes to the settings
modifySettingsAction?.Invoke(settings);

return new TestOrchestrationHost(settings);
return settings;
}

public static string GetTestStorageAccountConnectionString()
Expand Down

0 comments on commit ed3ece3

Please sign in to comment.