From dc957cf36f1d3fdc8f873287b1844ee895b36533 Mon Sep 17 00:00:00 2001
From: Tom Seida
Date: Thu, 4 Apr 2024 09:00:28 -0400
Subject: [PATCH 1/2] Add ability to set both RetryPolicy and AsyncRetryHandler
 for activity functions and sub-orchestrations.

---
 src/Abstractions/TaskOptions.cs               |  17 ++
 src/Abstractions/TaskRetryOptions.cs          |  19 +++
 .../Shims/TaskOrchestrationContextWrapper.cs  | 148 ++++++++++++++++--
 .../OrchestrationErrorHandling.cs             | 137 ++++++++++++++++
 4 files changed, 311 insertions(+), 10 deletions(-)

diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs
index 63f94382..4ed688a6 100644
--- a/src/Abstractions/TaskOptions.cs
+++ b/src/Abstractions/TaskOptions.cs
@@ -29,6 +29,15 @@ public TaskOptions(TaskRetryOptions? retry = null)
     /// <returns>A <see cref="TaskOptions"/> built from the policy.</returns>
     public static TaskOptions FromRetryPolicy(RetryPolicy policy) => new(policy);
 
+    /// <summary>
+    /// Returns a new <see cref="TaskOptions"/> from the provided <see cref="RetryPolicy"/> and <see cref="AsyncRetryHandler"/>.
+    /// </summary>
+    /// <param name="policy">The policy to convert from.</param>
+    /// <param name="handler">The handler to convert from.</param>
+    /// <returns>A <see cref="TaskOptions"/> built from the policy and handler.</returns>
+    public static TaskOptions FromRetryPolicy(RetryPolicy policy, AsyncRetryHandler handler) => new(new TaskRetryOptions(policy, handler));
+
     /// <summary>
     /// Returns a new <see cref="TaskOptions"/> from the provided <see cref="AsyncRetryHandler"/>.
     /// </summary>
@@ -36,6 +45,14 @@ public TaskOptions(TaskRetryOptions? retry = null)
     /// <returns>A <see cref="TaskOptions"/> built from the handler.</returns>
     public static TaskOptions FromRetryHandler(AsyncRetryHandler handler) => new(handler);
 
+    /// <summary>
+    /// Returns a new <see cref="TaskOptions"/> from the provided <see cref="AsyncRetryHandler"/> and <see cref="RetryPolicy"/>.
+    /// </summary>
+    /// <param name="handler">The handler to convert from.</param>
+    /// <param name="policy">The policy to convert from.</param>
+    /// <returns>A <see cref="TaskOptions"/> built from the handler and policy.</returns>
+    public static TaskOptions FromRetryHandler(AsyncRetryHandler handler, RetryPolicy policy) => new(new TaskRetryOptions(policy, handler));
+
     /// <summary>
     /// Returns a new <see cref="TaskOptions"/> from the provided <see cref="TaskRetryOptions"/>.
     /// </summary>
diff --git a/src/Abstractions/TaskRetryOptions.cs b/src/Abstractions/TaskRetryOptions.cs
index ed2493e4..86e3dd02 100644
--- a/src/Abstractions/TaskRetryOptions.cs
+++ b/src/Abstractions/TaskRetryOptions.cs
@@ -27,6 +27,17 @@ public TaskRetryOptions(AsyncRetryHandler handler)
         this.Handler = Check.NotNull(handler);
     }
 
+    /// <summary>
+    /// Initializes a new instance of the <see cref="TaskRetryOptions"/> class.
+    /// </summary>
+    /// <param name="policy">The retry policy to use.</param>
+    /// <param name="handler">The retry handler to use.</param>
+    public TaskRetryOptions(RetryPolicy policy, AsyncRetryHandler handler)
+    {
+        this.Policy = Check.NotNull(policy);
+        this.Handler = Check.NotNull(handler);
+    }
+
     /// <summary>
     /// Gets the retry policy. <c>null</c> if <see cref="Handler"/> is set.
     /// </summary>
@@ -62,6 +73,14 @@ public TaskRetryOptions(AsyncRetryHandler handler)
     /// <returns>A <see cref="TaskRetryOptions"/> built from the policy.</returns>
     public static TaskRetryOptions FromRetryPolicy(RetryPolicy policy) => new(policy);
 
+    /// <summary>
+    /// Returns a new <see cref="TaskRetryOptions"/> from the provided <see cref="RetryPolicy"/> and <see cref="AsyncRetryHandler"/>.
+    /// </summary>
+    /// <param name="policy">The policy to convert from.</param>
+    /// <param name="handler">The handler to convert from.</param>
+    /// <returns>A <see cref="TaskRetryOptions"/> built from the policy and handler.</returns>
+    public static TaskRetryOptions FromRetryPolicy(RetryPolicy policy, AsyncRetryHandler handler) => new(policy, handler);
+
     /// <summary>
     /// Returns a new <see cref="TaskRetryOptions"/> from the provided <see cref="AsyncRetryHandler"/>.
     /// </summary>
diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
index 608a7038..b23b8d54 100644
--- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
+++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
@@ -1,12 +1,15 @@
 // Copyright (c) Microsoft Corporation.
 // Licensed under the MIT License.
 
+using System.Diagnostics;
 using System.Globalization;
 using System.Security.Cryptography;
 using System.Text;
 using DurableTask.Core;
+using DurableTask.Core.Common;
 using DurableTask.Core.Entities.OperationFormat;
 using DurableTask.Core.Serializing.Internal;
+using DurableTask.Core.Tracing;
 using Microsoft.DurableTask.Entities;
 using Microsoft.Extensions.Logging;
 
@@ -115,11 +118,23 @@ public override async Task<TResult> CallActivityAsync<TResult>(
         // TODO: Cancellation (https://github.com/microsoft/durabletask-dotnet/issues/7)
         if (options?.Retry?.Policy is RetryPolicy policy)
         {
-            return await this.innerContext.ScheduleWithRetry<TResult>(
-                name.Name,
-                name.Version,
-                policy.ToDurableTaskCoreRetryOptions(),
-                input);
+            if (options?.Retry?.Handler is AsyncRetryHandler handler)
+            {
+                return await this.InvokeWithCustomRetryHandler(
+                    () => this.innerContext.ScheduleTask<TResult>(name.Name, name.Version, input),
+                    name.Name,
+                    policy,
+                    handler,
+                    default);
+            }
+            else
+            {
+                return await this.innerContext.ScheduleWithRetry<TResult>(
+                    name.Name,
+                    name.Version,
+                    policy.ToDurableTaskCoreRetryOptions(),
+                    input);
+            }
         }
         else if (options?.Retry?.Handler is AsyncRetryHandler handler)
         {
@@ -164,12 +179,28 @@ public override async Task<TResult> CallSubOrchestratorAsync<TResult>(
     {
         if (options?.Retry?.Policy is RetryPolicy policy)
         {
-            return await this.innerContext.CreateSubOrchestrationInstanceWithRetry<TResult>(
+            if (options?.Retry?.Handler is AsyncRetryHandler handler)
+            {
+                return await this.InvokeWithCustomRetryHandler(
+                    () => this.innerContext.CreateSubOrchestrationInstance<TResult>(
+                        orchestratorName.Name,
+                        orchestratorName.Version,
+                        instanceId,
+                        input),
                     orchestratorName.Name,
-                orchestratorName.Version,
-                instanceId,
-                policy.ToDurableTaskCoreRetryOptions(),
-                input);
+                    policy,
+                    handler,
+                    default);
+            }
+            else
+            {
+                return await this.innerContext.CreateSubOrchestrationInstanceWithRetry<TResult>(
+                    orchestratorName.Name,
+                    orchestratorName.Version,
+                    instanceId,
+                    policy.ToDurableTaskCoreRetryOptions(),
+                    input);
+            }
         }
         else if (options?.Retry?.Handler is AsyncRetryHandler handler)
         {
@@ -449,4 +480,101 @@ async Task<T> InvokeWithCustomRetryHandler<T>(
             }
         }
     }
+
+    async Task<T> InvokeWithCustomRetryHandler<T>(
+        Func<Task<T>> action,
+        string taskName,
+        RetryPolicy retryPolicy,
+        AsyncRetryHandler retryHandler,
+        CancellationToken cancellationToken)
+    {
+        DateTime startTime = this.CurrentUtcDateTime;
+        int failureCount = 0;
+
+        for (int retryCount = 0; retryCount < retryPolicy.MaxNumberOfAttempts; retryCount++)
+        {
+            try
+            {
+                return await action();
+            }
+            catch (global::DurableTask.Core.Exceptions.OrchestrationException e)
+            {
+                // Some failures are not retryable, like failures for missing activities or sub-orchestrations
+                if (e.FailureDetails?.IsNonRetriable == true)
+                {
+                    throw;
+                }
+
+                failureCount++;
+
+                this.logger.RetryingTask(
+                    this.InstanceId,
+                    taskName,
+                    attempt: failureCount);
+
+                RetryContext retryContext = new(
+                    this,
+                    failureCount,
+                    TaskFailureDetails.FromException(e),
+                    this.CurrentUtcDateTime.Subtract(startTime),
+                    cancellationToken);
+
+                bool keepRetrying = await retryHandler(retryContext);
+                if (!keepRetrying)
+                {
+                    throw;
+                }
+
+                TimeSpan nextDelay = this.ComputeNextDelay(retryPolicy, failureCount, startTime);
+
+                // A zero delay means no further retries are allowed (e.g. the retry timeout expired).
+                if (nextDelay == TimeSpan.Zero)
+                {
+                    throw;
+                }
+
+                // Integer overflow safety check.
+                if (failureCount == int.MaxValue)
+                {
+                    throw;
+                }
+            }
+        }
+
+        // Should never execute.
+        return default!;
+    }
+
+    TimeSpan ComputeNextDelay(
+        RetryPolicy retryPolicy,
+        int attempt,
+        DateTime firstAttempt)
+    {
+        TimeSpan nextDelay = TimeSpan.Zero;
+        try
+        {
+            if (attempt < retryPolicy.MaxNumberOfAttempts)
+            {
+                DateTime retryExpiration = (retryPolicy.RetryTimeout != Timeout.InfiniteTimeSpan)
+                    ? firstAttempt.Add(retryPolicy.RetryTimeout)
+                    : DateTime.MaxValue;
+                if (this.CurrentUtcDateTime < retryExpiration)
+                {
+                    double nextDelayInMilliseconds =
+                        retryPolicy.FirstRetryInterval.TotalMilliseconds *
+                        Math.Pow(retryPolicy.BackoffCoefficient, attempt);
+
+                    nextDelay = nextDelayInMilliseconds < retryPolicy.MaxRetryInterval.TotalMilliseconds
+                        ? TimeSpan.FromMilliseconds(nextDelayInMilliseconds)
+                        : retryPolicy.MaxRetryInterval;
+                }
+            }
+        }
+        catch (Exception e) when (!Utils.IsFatal(e))
+        {
+            // Catch any exceptions during ComputeNextDelay so we don't override original error with new error
+            TraceHelper.TraceExceptionInstance(TraceEventType.Error, "RetryInterceptor-ComputeNextDelayException", this.innerContext.OrchestrationInstance, e);
+        }
+
+        return nextDelay;
+    }
 }
diff --git a/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs b/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs
index f1281e9f..28af442c 100644
--- a/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs
+++ b/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs
@@ -211,6 +211,73 @@ public async Task RetryActivityFailuresCustomLogic(int expectedNumberOfAttempts,
         Assert.Equal(expectedNumberOfAttempts, actualNumberOfAttempts);
     }
 
+    [Theory]
+    [InlineData(10, typeof(ApplicationException), false, 2, 1)] // 1 attempt since retry timeout expired.
+    [InlineData(2, typeof(ApplicationException), false, null, 1)] // 1 attempt since handler specifies no retry.
+    [InlineData(2, typeof(CustomException), true, null, 2)] // 2 attempts, custom exception type
+    [InlineData(10, typeof(XunitException), true, null, 10)] // 10 attempts, 3rd party exception type
+    public async Task RetryActivityFailuresCustomLogicAndPolicy(int maxNumberOfAttempts, Type exceptionType, bool isRetryException, int? retryTimeout, int expectedNumberOfAttempts)
+    {
+        string errorMessage = "Kah-BOOOOOM!!!"; // Use an obviously fake error message to avoid confusion when debugging
+
+        int retryHandlerCalls = 0;
+        RetryPolicy retryPolicy = new(
+            maxNumberOfAttempts,
+            firstRetryInterval: TimeSpan.FromMilliseconds(1),
+            backoffCoefficient: 2,
+            retryTimeout: retryTimeout.HasValue ? TimeSpan.FromMilliseconds(retryTimeout.Value) : null);
+        TaskOptions taskOptions = TaskOptions.FromRetryPolicy(retryPolicy, retryContext =>
+        {
+            // This is technically orchestrator code that gets replayed, like everything else
+            if (!retryContext.OrchestrationContext.IsReplaying)
+            {
+                retryHandlerCalls++;
+            }
+
+            // IsCausedBy<T> is supposed to handle exception inheritance; fail if it doesn't
+            if (!retryContext.LastFailure.IsCausedBy<Exception>())
+            {
+                return Task.FromResult(false);
+            }
+
+            // When the failure matches the expected exception type, the test case decides whether to keep retrying
+            if (retryContext.LastFailure.IsCausedBy(exceptionType))
+            {
+                return Task.FromResult(isRetryException);
+            }
+
+            return Task.FromResult(true);
+        });
+
+        int actualNumberOfAttempts = 0;
+
+        TaskName orchestratorName = "BustedOrchestration";
+        await using HostTestLifetime server = await this.StartWorkerAsync(b =>
+        {
+            b.AddTasks(tasks =>
+                tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
+                {
+                    await ctx.CallActivityAsync("Foo", options: taskOptions);
+                })
+                .AddActivityFunc("Foo", (TaskActivityContext context) =>
+                {
+                    actualNumberOfAttempts++;
+                    throw MakeException(exceptionType, errorMessage);
+                }));
+        });
+
+        string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
+        OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
+            instanceId, getInputsAndOutputs: true, this.TimeoutToken);
+
+        Assert.NotNull(metadata);
+        Assert.Equal(instanceId, metadata.InstanceId);
+        Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);
+        Assert.Equal(expectedNumberOfAttempts, retryHandlerCalls);
+        Assert.Equal(expectedNumberOfAttempts, actualNumberOfAttempts);
+    }
+
     /// <summary>
     /// Tests retry policies for sub-orchestration calls.
     /// </summary>
@@ -258,6 +325,76 @@ public async Task RetrySubOrchestrationFailures(int expectedNumberOfAttempts, Ty
         Assert.True(metadata.FailureDetails.IsCausedBy<TaskFailedException>());
     }
 
+    [Theory]
+    [InlineData(10, typeof(ApplicationException), false, 2, 1)] // 1 attempt since retry timeout expired.
+    [InlineData(2, typeof(ApplicationException), false, null, 1)] // 1 attempt since handler specifies no retry.
+    [InlineData(2, typeof(CustomException), true, null, 2)] // 2 attempts, custom exception type
+    [InlineData(10, typeof(XunitException), true, null, 10)] // 10 attempts, 3rd party exception type
+    public async Task RetrySubOrchestratorFailuresCustomLogicAndPolicy(int maxNumberOfAttempts, Type exceptionType, bool isRetryException, int? retryTimeout, int expectedNumberOfAttempts)
+    {
+        string errorMessage = "Kah-BOOOOOM!!!"; // Use an obviously fake error message to avoid confusion when debugging
+
+        int retryHandlerCalls = 0;
+        RetryPolicy retryPolicy = new(
+            maxNumberOfAttempts,
+            firstRetryInterval: TimeSpan.FromMilliseconds(1),
+            backoffCoefficient: 2,
+            retryTimeout: retryTimeout.HasValue ? TimeSpan.FromMilliseconds(retryTimeout.Value) : null);
+        TaskOptions taskOptions = TaskOptions.FromRetryPolicy(retryPolicy, retryContext =>
+        {
+            // This is technically orchestrator code that gets replayed, like everything else
+            if (!retryContext.OrchestrationContext.IsReplaying)
+            {
+                retryHandlerCalls++;
+            }
+
+            // IsCausedBy<T> is supposed to handle exception inheritance; fail if it doesn't
+            if (!retryContext.LastFailure.IsCausedBy<Exception>())
+            {
+                return Task.FromResult(false);
+            }
+
+            // When the failure matches the expected exception type, the test case decides whether to keep retrying
+            if (retryContext.LastFailure.IsCausedBy(exceptionType))
+            {
+                return Task.FromResult(isRetryException);
+            }
+
+            return Task.FromResult(true);
+        });
+
+        int actualNumberOfAttempts = 0;
+
+        TaskName orchestratorName = "OrchestrationWithBustedSubOrchestrator";
+        await using HostTestLifetime server = await this.StartWorkerAsync(b =>
+        {
+            b.AddTasks(tasks =>
+                tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
+                {
+                    await ctx.CallSubOrchestratorAsync("BustedSubOrchestrator", options: taskOptions);
+                })
+                .AddOrchestratorFunc("BustedSubOrchestrator", context =>
+                {
+                    actualNumberOfAttempts++;
+                    throw MakeException(exceptionType, errorMessage);
+                }));
+        });
+
+        string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
+        OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
+            instanceId, getInputsAndOutputs: true, this.TimeoutToken);
+
+        Assert.NotNull(metadata);
+        Assert.Equal(instanceId, metadata.InstanceId);
+        Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);
+        Assert.Equal(expectedNumberOfAttempts, retryHandlerCalls);
+        Assert.Equal(expectedNumberOfAttempts, actualNumberOfAttempts);
+
+        // The root orchestration failed due to a failure with the sub-orchestration, resulting in a TaskFailedException
+        Assert.NotNull(metadata.FailureDetails);
+        Assert.True(metadata.FailureDetails!.IsCausedBy<TaskFailedException>());
+    }
+
     [Theory]
     [InlineData(1, typeof(ApplicationException))] // 1 attempt, built-in exception type
     [InlineData(2, typeof(CustomException))] // 2 attempts, custom exception type

From 86c2230fe9d6572c24113ceba015b86f9045dd93 Mon Sep 17 00:00:00 2001
From: Tom Seida
Date: Thu, 4 Apr 2024 09:07:49 -0400
Subject: [PATCH 2/2] Correct TraceHelper.TraceExceptionInstance() eventType
 parameter.

---
 src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
index b23b8d54..13e8f6a7 100644
--- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
+++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
@@ -572,7 +572,7 @@ TimeSpan ComputeNextDelay(
         catch (Exception e) when (!Utils.IsFatal(e))
         {
             // Catch any exceptions during ComputeNextDelay so we don't override original error with new error
-            TraceHelper.TraceExceptionInstance(TraceEventType.Error, "RetryInterceptor-ComputeNextDelayException", this.innerContext.OrchestrationInstance, e);
+            TraceHelper.TraceExceptionInstance(TraceEventType.Error, "TaskOrchestrationContextWrapper-ComputeNextDelayException", this.innerContext.OrchestrationInstance, e);
         }
 
         return nextDelay;
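
Usage sketch (illustrative only, not part of either commit): with PATCH 1/2 applied, a single activity or sub-orchestration call can carry both a declarative RetryPolicy and a custom AsyncRetryHandler. The policy supplies the attempt cap, backoff, and retry timeout, while the handler is consulted after each failure and can stop retrying early. TaskOptions.FromRetryPolicy(RetryPolicy, AsyncRetryHandler), RetryPolicy, and TaskFailureDetails.IsCausedBy<T>() are the APIs introduced or exercised by the patch; the activity name "ProcessOrder", its input, and the InvalidOperationException check below are placeholder assumptions.

    // Sketch of orchestrator-side usage; assumes it runs inside an orchestrator function
    // whose TaskOrchestrationContext parameter is named "ctx".
    TaskOptions options = TaskOptions.FromRetryPolicy(
        new RetryPolicy(
            maxNumberOfAttempts: 5,
            firstRetryInterval: TimeSpan.FromSeconds(1),
            backoffCoefficient: 2),
        retryContext =>
        {
            // The policy caps attempts and computes backoff; the handler can veto further
            // retries, e.g. for failures that retrying cannot fix (placeholder exception type).
            bool keepRetrying = !retryContext.LastFailure.IsCausedBy<InvalidOperationException>();
            return Task.FromResult(keepRetrying);
        });

    // Both the policy limits and the handler decision now apply to this call.
    string result = await ctx.CallActivityAsync<string>("ProcessOrder", "order-123", options);

This mirrors the behavior exercised by the RetryActivityFailuresCustomLogicAndPolicy test above: retries stop as soon as either the handler returns false or the policy's attempt count or retry timeout is exhausted.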