diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
index 62183b622..4dd87b4d2 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
@@ -790,7 +790,7 @@ async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(boo
                         TraceContext = currentRequestTraceContext,
                     };
 
-                    if (!this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, out string warningMessage))
+                    if (!this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, settings.AllowReplayingTerminalInstances, out string warningMessage))
                     {
                         // If all messages belong to the same execution ID, then all of them need to be discarded.
                         // However, it's also possible to have messages for *any* execution ID batched together with messages
@@ -1046,7 +1046,7 @@ internal static void TraceMessageReceived(AzureStorageOrchestrationServiceSettin
                 data.Episode.GetValueOrDefault(-1));
         }
 
-        bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList<TaskMessage> newMessages, out string message)
+        bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList<TaskMessage> newMessages, bool allowReplayingTerminalInstances, out string message)
         {
             if (runtimeState.ExecutionStartedEvent == null && !newMessages.Any(msg => msg.Event is ExecutionStartedEvent))
             {
@@ -1072,6 +1072,7 @@ bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList<TaskMess
             }
 
             if (runtimeState.ExecutionStartedEvent != null &&
+                !allowReplayingTerminalInstances &&
                 runtimeState.OrchestrationStatus != OrchestrationStatus.Running &&
                 runtimeState.OrchestrationStatus != OrchestrationStatus.Pending &&
                 runtimeState.OrchestrationStatus != OrchestrationStatus.Suspended)
diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
index 71121daf8..2c41bce57 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
@@ -156,6 +156,20 @@ public class AzureStorageOrchestrationServiceSettings
         /// </summary>
         public bool UseAppLease { get; set; } = true;
 
+        /// <summary>
+        /// When false, when an orchestrator is in a terminal state (e.g. Completed, Failed, Terminated), events for that orchestrator are discarded.
+        /// Otherwise, events for a terminal orchestrator induce a replay. This may be used to recompute the state of the orchestrator in the "Instances Table".
+        /// </summary>
+        /// <remarks>
+        /// Transactions across Azure Tables are not possible, so we independently update the "History table" and then the "Instances table"
+        /// to set the state of the orchestrator.
+        /// If a crash were to occur between these two updates, the state of the orchestrator in the "Instances table" would be incorrect.
+        /// By setting this configuration to true, you can recover from these inconsistencies by forcing a replay of the orchestrator in response
+        /// to a client event like a termination request or an external event, which gives the framework another opportunity to update the state of
+        /// the orchestrator in the "Instances table". To force a replay after enabling this configuration, just send any external event to the affected instanceId.
+        /// </remarks>
+        public bool AllowReplayingTerminalInstances { get; set; } = false;
+
         /// <summary>
         /// If UseAppLease is true, gets or sets the AppLeaseOptions used for acquiring the lease to start the application.
         /// </summary>
diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
index ab0dd8c06..1c505cc2f 100644
--- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
+++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
@@ -2312,6 +2312,105 @@ await Task.WhenAll(
             }
         }
 
+        /// <summary>
+        /// End-to-end test validating the <see cref="AzureStorageOrchestrationServiceSettings.AllowReplayingTerminalInstances"/> setting.
+        /// </summary>
+        /// <remarks>
+        /// The `AllowReplayingTerminalInstances` setting was introduced to fix a gap in
+        /// the Azure Storage provider where the History table and Instance table may get out of sync.
+        ///
+        /// Namely, suppose we're updating an orchestrator from the Running state to Completed. If the DTFx process crashes
+        /// after updating the History table but before updating the Instance table, the orchestrator will be in a terminal
+        /// state according to the History table, but not the Instance table. Since the History claims the orchestrator is terminal,
+        /// we will *discard* any new events that try to reach the orchestrator, including "Terminate" requests. Therefore, the data
+        /// in the Instace table will remain incorrect until it is manually edited.
+        ///
+        /// To recover from this, users may set `AllowReplayingTerminalInstances` to true. When this is set, DTFx will not discard
+        /// events for terminal orchestrators, forcing a replay which eventually updates the instances table to the right state.
+        /// </remarks>
+        [DataTestMethod]
+        [DataRow(true, true, true)]
+        [DataRow(true, true, false)]
+        [DataRow(true, false, true)]
+        [DataRow(true, false, false)]
+        [DataRow(false, true, true)]
+        [DataRow(false, true, false)]
+        [DataRow(false, false, true)]
+        [DataRow(false, false, false)]
+        public async Task TestAllowReplayingTerminalInstances(bool enableExtendedSessions, bool sendTerminateEvent, bool allowReplayingTerminalInstances)
+        {
+            using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(
+                enableExtendedSessions,
+                allowReplayingTerminalInstances: allowReplayingTerminalInstances))
+            {
+                await host.StartAsync();
+
+                // Run simple orchestrator to completion, this will help us obtain a valid terminal history for the orchestrator
+                var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), "hello!");
+                var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10));
+                Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);
+
+                // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running".
+                // This simulates the scenario where the History table was updated, but not the Instance table.
+                var instanceId = client.InstanceId;
+                AzureStorageOrchestrationServiceSettings settings = TestHelpers.GetTestAzureStorageOrchestrationServiceSettings(
+                    enableExtendedSessions,
+                    allowReplayingTerminalInstances: allowReplayingTerminalInstances);
+                AzureStorageClient azureStorageClient = new AzureStorageClient(settings);
+
+                Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName);
+                TableEntity entity = new TableEntity(instanceId, "")
+                {
+                    ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G")
+                };
+                await instanceTable.MergeEntityAsync(entity, Azure.ETag.All);
+
+                // Assert that the status in the Instance table reads "Running"
+                IList<OrchestrationState> state = await client.GetStateAsync(instanceId);
+                OrchestrationStatus forcedStatus = state.First().OrchestrationStatus;
+                Assert.AreEqual(OrchestrationStatus.Running, forcedStatus);
+
+
+                // Send event (either terminate or external event) and wait for the event to be processed.
+                // The wait is possibly flakey, but unit testing this directly is difficult today
+                if (sendTerminateEvent)
+                {
+                    // we want to test the "Terminate" event explicitly because it's the first thing users
+                    // should try when an orchestrator is in a bad state
+                    await client.TerminateAsync("Foo");
+                }
+                else
+                {
+                    // we test the raise event case because, if `AllowReplayingTerminalInstances` is set to true,
+                    // an "unregistered" external event (one that the orchestrator is not expected) allows the orchestrator
+                    // to update the Instance table to the correct state without forcing it to end up as "Terminated".
+                    await client.RaiseEventAsync("Foo", "Bar");
+                }
+                await Task.Delay(TimeSpan.FromSeconds(30));
+
+                if (allowReplayingTerminalInstances)
+                {
+                    // A replay should have occurred, forcing the instance table to be updated with a terminal status
+                    state = await client.GetStateAsync(instanceId);
+                    Assert.AreEqual(1, state.Count);
+
+                    status = state.First();
+                    OrchestrationStatus expectedStatus = sendTerminateEvent ? OrchestrationStatus.Terminated : OrchestrationStatus.Completed;
+                    Assert.AreEqual(expectedStatus, status.OrchestrationStatus);
+                }
+                else
+                {
+                    // A replay should not have occurred, the instance table should still have the "Running" status
+                    state = await client.GetStateAsync(instanceId);
+                    Assert.AreEqual(1, state.Count);
+
+                    status = state.First();
+                    Assert.AreEqual(OrchestrationStatus.Running, status.OrchestrationStatus);
+                }
+                await host.StopAsync();
+            }
+        }
+
 #if !NET462
         /// <summary>
         /// End-to-end test which validates a simple orchestrator function that calls an activity function
diff --git a/test/DurableTask.AzureStorage.Tests/TestHelpers.cs b/test/DurableTask.AzureStorage.Tests/TestHelpers.cs
index fac50044b..80bd79e0d 100644
--- a/test/DurableTask.AzureStorage.Tests/TestHelpers.cs
+++ b/test/DurableTask.AzureStorage.Tests/TestHelpers.cs
@@ -27,7 +27,25 @@ public static TestOrchestrationHost GetTestOrchestrationHost(
             bool enableExtendedSessions,
             int extendedSessionTimeoutInSeconds = 30,
             bool fetchLargeMessages = true,
+            bool allowReplayingTerminalInstances = false,
             Action<AzureStorageOrchestrationServiceSettings>? modifySettingsAction = null)
+        {
+            AzureStorageOrchestrationServiceSettings settings = GetTestAzureStorageOrchestrationServiceSettings(
+                enableExtendedSessions,
+                extendedSessionTimeoutInSeconds,
+                fetchLargeMessages,
+                allowReplayingTerminalInstances);
+            // Give the caller a chance to make test-specific changes to the settings
+            modifySettingsAction?.Invoke(settings);
+
+            return new TestOrchestrationHost(settings);
+        }
+
+        public static AzureStorageOrchestrationServiceSettings GetTestAzureStorageOrchestrationServiceSettings(
+            bool enableExtendedSessions,
+            int extendedSessionTimeoutInSeconds = 30,
+            bool fetchLargeMessages = true,
+            bool allowReplayingTerminalInstances = false)
         {
             string storageConnectionString = GetTestStorageAccountConnectionString();
 
@@ -43,15 +61,13 @@ public static TestOrchestrationHost GetTestOrchestrationHost(
                 FetchLargeMessageDataEnabled = fetchLargeMessages,
                 StorageAccountClientProvider = new StorageAccountClientProvider(storageConnectionString),
                 TaskHubName = GetTestTaskHubName(),
+                AllowReplayingTerminalInstances = allowReplayingTerminalInstances,
 
                 // Setting up a logger factory to enable the new DurableTask.Core logs
                 LoggerFactory = loggerFactory,
             };
 
-            // Give the caller a chance to make test-specific changes to the settings
-            modifySettingsAction?.Invoke(settings);
-
-            return new TestOrchestrationHost(settings);
+            return settings;
         }
 
         public static string GetTestStorageAccountConnectionString()