Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to set both RetryPolicy and AsyncRetryHandler #281

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
17 changes: 17 additions & 0 deletions src/Abstractions/TaskOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,30 @@ public TaskOptions(TaskRetryOptions? retry = null)
/// <returns>A <see cref="TaskOptions" /> built from the policy.</returns>
public static TaskOptions FromRetryPolicy(RetryPolicy policy) => new(policy);


/// <summary>
/// Returns a new <see cref="TaskOptions" /> from the provided <see cref="RetryPolicy" /> and <see cref="AsyncRetryHandler" />.
/// </summary>
/// <param name="policy">The policy to convert from.</param>
/// <param name="handler">The handler to convert from.</param>
/// <returns>A <see cref="TaskOptions" /> built from the policy and handler.</returns>
public static TaskOptions FromRetryPolicy(RetryPolicy policy, AsyncRetryHandler handler) => new(new TaskRetryOptions(policy, handler));

/// <summary>
/// Creates a <see cref="TaskOptions" /> whose retry behavior is driven entirely by the
/// supplied <see cref="AsyncRetryHandler" /> (no declarative policy).
/// </summary>
/// <param name="handler">The handler to convert from.</param>
/// <returns>A <see cref="TaskOptions" /> built from the handler.</returns>
public static TaskOptions FromRetryHandler(AsyncRetryHandler handler)
{
    // The handler converts implicitly to TaskRetryOptions for the TaskOptions constructor.
    return new TaskOptions(handler);
}

/// <summary>
/// Returns a new <see cref="TaskOptions" /> from the provided <see cref="AsyncRetryHandler" /> and <see cref="RetryPolicy" />.
/// </summary>
/// <param name="handler">The handler to convert from.</param>
/// <param name="policy">The policy to convert from.</param>
/// <returns>A <see cref="TaskOptions" /> built from the handler and policy.</returns>
public static TaskOptions FromRetryHandler(AsyncRetryHandler handler, RetryPolicy policy) => new(new TaskRetryOptions(policy, handler));

/// <summary>
/// Returns a new <see cref="TaskOptions" /> from the provided <see cref="RetryHandler" />.
/// </summary>
Expand Down
19 changes: 19 additions & 0 deletions src/Abstractions/TaskRetryOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ public TaskRetryOptions(AsyncRetryHandler handler)
this.Handler = Check.NotNull(handler);
}

/// <summary>
/// Initializes a new instance of the <see cref="TaskRetryOptions"/> class with both a
/// declarative retry policy and a custom retry handler.
/// </summary>
/// <param name="policy">The retry policy to use.</param>
/// <param name="handler">The retry handler to use.</param>
/// <remarks>
/// NOTE(review): this constructor sets both <see cref="Policy"/> and <see cref="Handler"/>,
/// but the property docs state each is <c>null</c> when the other is set — confirm those
/// property docs are updated for this combined case.
/// </remarks>
public TaskRetryOptions(RetryPolicy policy, AsyncRetryHandler handler)
{
    this.Policy = Check.NotNull(policy);
    this.Handler = Check.NotNull(handler);
}

/// <summary>
/// Gets the retry policy. <c>null</c> if <see cref="Handler" /> is set.
/// </summary>
Expand Down Expand Up @@ -62,6 +73,14 @@ public TaskRetryOptions(AsyncRetryHandler handler)
/// <returns>A <see cref="TaskRetryOptions" /> built from the policy.</returns>
public static TaskRetryOptions FromRetryPolicy(RetryPolicy policy) => new(policy);

/// <summary>
/// Returns a new <see cref="TaskRetryOptions" /> from the provided <see cref="RetryPolicy" /> and <see cref="AsyncRetryHandler" />.
/// </summary>
/// <param name="policy">The policy to convert from.</param>
/// <param name="handler">The handler to convert from.</param>
/// <returns>A <see cref="TaskRetryOptions" /> built from the policy and handler.</returns>
public static TaskRetryOptions FromRetryPolicy(RetryPolicy policy, AsyncRetryHandler handler) => new(policy, handler);

/// <summary>
/// Returns a new <see cref="TaskRetryOptions" /> from the provided <see cref="AsyncRetryHandler" />.
/// </summary>
Expand Down
148 changes: 138 additions & 10 deletions src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Diagnostics;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;
using DurableTask.Core;
using DurableTask.Core.Common;
using DurableTask.Core.Entities.OperationFormat;
using DurableTask.Core.Serializing.Internal;
using DurableTask.Core.Tracing;
using Microsoft.DurableTask.Entities;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -115,11 +118,23 @@ public override async Task<T> CallActivityAsync<T>(
// TODO: Cancellation (https://github.com/microsoft/durabletask-dotnet/issues/7)
if (options?.Retry?.Policy is RetryPolicy policy)
{
return await this.innerContext.ScheduleWithRetry<T>(
name.Name,
name.Version,
policy.ToDurableTaskCoreRetryOptions(),
input);
if (options?.Retry?.Handler is AsyncRetryHandler handler)
{
return await this.InvokeWithCustomRetryHandler(
() => this.innerContext.ScheduleTask<T>(name.Name, name.Version, input),
name.Name,
policy,
handler,
default);
}
else
{
return await this.innerContext.ScheduleWithRetry<T>(
name.Name,
name.Version,
policy.ToDurableTaskCoreRetryOptions(),
input);
}
}
else if (options?.Retry?.Handler is AsyncRetryHandler handler)
{
Expand Down Expand Up @@ -164,12 +179,28 @@ public override async Task<TResult> CallSubOrchestratorAsync<TResult>(
{
if (options?.Retry?.Policy is RetryPolicy policy)
{
return await this.innerContext.CreateSubOrchestrationInstanceWithRetry<TResult>(
if (options?.Retry?.Handler is AsyncRetryHandler handler)
{
return await this.InvokeWithCustomRetryHandler(
() => this.innerContext.CreateSubOrchestrationInstance<TResult>(
orchestratorName.Name,
orchestratorName.Version,
instanceId,
input),
orchestratorName.Name,
orchestratorName.Version,
instanceId,
policy.ToDurableTaskCoreRetryOptions(),
input);
policy,
handler,
default);
}
else
{
return await this.innerContext.CreateSubOrchestrationInstanceWithRetry<TResult>(
orchestratorName.Name,
orchestratorName.Version,
instanceId,
policy.ToDurableTaskCoreRetryOptions(),
input);
}
}
else if (options?.Retry?.Handler is AsyncRetryHandler handler)
{
Expand Down Expand Up @@ -452,4 +483,101 @@ async Task<T> InvokeWithCustomRetryHandler<T>(
}
}
}

// Runs "action" with retries driven jointly by a declarative RetryPolicy and a
// user-supplied AsyncRetryHandler: a failed attempt is retried only when the handler
// approves AND the policy's attempt-count/timeout budget still allows another attempt.
async Task<T> InvokeWithCustomRetryHandler<T>(
    Func<Task<T>> action,
    string taskName,
    RetryPolicy retryPolicy,
    AsyncRetryHandler retryHandler,
    CancellationToken cancellationToken)
{
    // CurrentUtcDateTime is the orchestration's replay-safe clock.
    DateTime startTime = this.CurrentUtcDateTime;
    int failureCount = 0;

    for (int retryCount = 0; retryCount < retryPolicy.MaxNumberOfAttempts; retryCount++)
    {
        try
        {
            return await action();
        }
        catch (global::DurableTask.Core.Exceptions.OrchestrationException e)
        {
            // Some failures are not retryable, like failures for missing activities or sub-orchestrations
            if (e.FailureDetails?.IsNonRetriable == true)
            {
                throw;
            }

            failureCount++;

            this.logger.RetryingTask(
                this.InstanceId,
                taskName,
                attempt: failureCount);

            // Give the custom handler the failure details and elapsed time so it can
            // decide whether this particular failure should be retried at all.
            RetryContext retryContext = new(
                this,
                failureCount,
                TaskFailureDetails.FromException(e),
                this.CurrentUtcDateTime.Subtract(startTime),
                cancellationToken);

            bool keepRetrying = await retryHandler(retryContext);
            if (!keepRetrying)
            {
                throw;
            }

            // NOTE(review): nextDelay is computed here but never awaited — no durable
            // timer is scheduled before the next attempt, so retries appear to fire
            // back-to-back. Confirm whether a timer (CreateTimer) was intended.
            TimeSpan nextDelay = this.ComputeNextDelay(retryPolicy, failureCount, startTime);

            // If not more retries and integer overflow safety check
            // (ComputeNextDelay returns Zero when the attempt limit or the policy's
            // RetryTimeout budget is exhausted).
            if (nextDelay == TimeSpan.Zero)
            {
                throw;
            }

            if (failureCount == int.MaxValue)
            {
                throw;
            }
        }
    }

    // Should never execute.
    // NOTE(review): this IS reachable when MaxNumberOfAttempts <= 0 — the loop body
    // never runs and default! is returned silently. Confirm RetryPolicy validates
    // MaxNumberOfAttempts >= 1.
    return default!;
}

// Computes the exponential-backoff delay before the next retry attempt, or
// TimeSpan.Zero when no further retry is permitted (attempt limit reached or the
// policy's RetryTimeout window has elapsed since the first attempt).
TimeSpan ComputeNextDelay(
    RetryPolicy retryPolicy,
    int attempt,
    DateTime firstAttempt)
{
    TimeSpan delay = TimeSpan.Zero;
    try
    {
        if (attempt < retryPolicy.MaxNumberOfAttempts)
        {
            // An infinite RetryTimeout means the window never closes.
            DateTime deadline = retryPolicy.RetryTimeout != Timeout.InfiniteTimeSpan
                ? firstAttempt.Add(retryPolicy.RetryTimeout)
                : DateTime.MaxValue;

            if (this.CurrentUtcDateTime < deadline)
            {
                // firstRetryInterval * backoffCoefficient^attempt, capped at MaxRetryInterval.
                double candidateMs =
                    retryPolicy.FirstRetryInterval.TotalMilliseconds *
                    Math.Pow(retryPolicy.BackoffCoefficient, attempt);

                delay = candidateMs < retryPolicy.MaxRetryInterval.TotalMilliseconds
                    ? TimeSpan.FromMilliseconds(candidateMs)
                    : retryPolicy.MaxRetryInterval;
            }
        }
    }
    catch (Exception e) when (!Utils.IsFatal(e))
    {
        // Catch any exceptions during ComputeNextDelay so we don't override original error with new error
        TraceHelper.TraceExceptionInstance(TraceEventType.Error, "TaskOrchestrationContextWrapper-ComputeNextDelayException", this.innerContext.OrchestrationInstance, e);
    }

    return delay;
}
}
137 changes: 137 additions & 0 deletions test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,73 @@ public async Task RetryActivityFailuresCustomLogic(int expectedNumberOfAttempts,
Assert.Equal(expectedNumberOfAttempts, actualNumberOfAttempts);
}

// Verifies that when BOTH a RetryPolicy and an AsyncRetryHandler are configured for an
// activity call, retries stop as soon as either one says so (policy attempt/timeout
// limits, or the handler returning false for the observed exception type).
[Theory]
[InlineData(10, typeof(ApplicationException), false, 2, 1)] // 1 attempt since retry timeout expired.
[InlineData(2, typeof(ApplicationException), false, null, 1)] // 1 attempt since handler specifies no retry.
[InlineData(2, typeof(CustomException), true, null, 2)] // 2 attempts, custom exception type
[InlineData(10, typeof(XunitException), true, null, 10)] // 10 attempts, 3rd party exception type
public async Task RetryActivityFailuresCustomLogicAndPolicy(int maxNumberOfAttempts, Type exceptionType, bool isRetryException, int? retryTimeout, int expectedNumberOfAttempts)
{
    string errorMessage = "Kah-BOOOOOM!!!"; // Use an obviously fake error message to avoid confusion when debugging

    int retryHandlerCalls = 0;
    RetryPolicy retryPolicy = new(
        maxNumberOfAttempts,
        firstRetryInterval: TimeSpan.FromMilliseconds(1),
        backoffCoefficient: 2,
        retryTimeout: retryTimeout.HasValue ? TimeSpan.FromMilliseconds(retryTimeout.Value) : null);
    TaskOptions taskOptions = TaskOptions.FromRetryPolicy(retryPolicy, retryContext =>
    {
        // This is technically orchestrator code that gets replayed, like everything else
        if (!retryContext.OrchestrationContext.IsReplaying)
        {
            retryHandlerCalls++;
        }

        // IsCausedBy is supposed to handle exception inheritance; fail if it doesn't
        if (!retryContext.LastFailure.IsCausedBy<Exception>())
        {
            return Task.FromResult(false);
        }

        // For the parameterized exception type, retry only when the test case says it is retryable
        if (retryContext.LastFailure.IsCausedBy(exceptionType))
        {
            return Task.FromResult(isRetryException);
        }

        return Task.FromResult(true);
    });

    int actualNumberOfAttempts = 0;

    TaskName orchestratorName = "BustedOrchestration";
    await using HostTestLifetime server = await this.StartWorkerAsync(b =>
    {
        b.AddTasks(tasks =>
            tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
            {
                await ctx.CallActivityAsync("Foo", options: taskOptions);
            })
            .AddActivityFunc("Foo", (TaskActivityContext context) =>
            {
                // Always throws, so every attempt fails and exercises the retry path
                actualNumberOfAttempts++;
                throw MakeException(exceptionType, errorMessage);
            }));
    });

    string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
    OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
        instanceId, getInputsAndOutputs: true, this.TimeoutToken);

    Assert.NotNull(metadata);
    Assert.Equal(instanceId, metadata.InstanceId);
    Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);
    Assert.Equal(expectedNumberOfAttempts, retryHandlerCalls);
    Assert.Equal(expectedNumberOfAttempts, actualNumberOfAttempts);
}


/// <summary>
/// Tests retry policies for sub-orchestration calls.
/// </summary>
Expand Down Expand Up @@ -269,6 +336,76 @@ public async Task RetrySubOrchestrationFailures(int expectedNumberOfAttempts, Ty
Assert.True(metadata.FailureDetails.IsCausedBy<TaskFailedException>());
}

// Sub-orchestration counterpart of RetryActivityFailuresCustomLogicAndPolicy: verifies
// that a combined RetryPolicy + AsyncRetryHandler governs sub-orchestrator retries, and
// that the root orchestration surfaces the failure as a TaskFailedException.
[Theory]
[InlineData(10, typeof(ApplicationException), false, 2, 1)] // 1 attempt since retry timeout expired.
[InlineData(2, typeof(ApplicationException), false, null, 1)] // 1 attempt since handler specifies no retry.
[InlineData(2, typeof(CustomException), true, null, 2)] // 2 attempts, custom exception type
[InlineData(10, typeof(XunitException), true, null, 10)] // 10 attempts, 3rd party exception type
public async Task RetrySubOrchestratorFailuresCustomLogicAndPolicy(int maxNumberOfAttempts, Type exceptionType, bool isRetryException, int? retryTimeout, int expectedNumberOfAttempts)
{
    string errorMessage = "Kah-BOOOOOM!!!"; // Use an obviously fake error message to avoid confusion when debugging

    int retryHandlerCalls = 0;
    RetryPolicy retryPolicy = new(
        maxNumberOfAttempts,
        firstRetryInterval: TimeSpan.FromMilliseconds(1),
        backoffCoefficient: 2,
        retryTimeout: retryTimeout.HasValue ? TimeSpan.FromMilliseconds(retryTimeout.Value) : null);
    TaskOptions taskOptions = TaskOptions.FromRetryPolicy(retryPolicy, retryContext =>
    {
        // This is technically orchestrator code that gets replayed, like everything else
        if (!retryContext.OrchestrationContext.IsReplaying)
        {
            retryHandlerCalls++;
        }

        // IsCausedBy is supposed to handle exception inheritance; fail if it doesn't
        if (!retryContext.LastFailure.IsCausedBy<Exception>())
        {
            return Task.FromResult(false);
        }

        // For the parameterized exception type, retry only when the test case says it is retryable
        if (retryContext.LastFailure.IsCausedBy(exceptionType))
        {
            return Task.FromResult(isRetryException);
        }

        return Task.FromResult(true);
    });

    int actualNumberOfAttempts = 0;

    TaskName orchestratorName = "OrchestrationWithBustedSubOrchestrator";
    await using HostTestLifetime server = await this.StartWorkerAsync(b =>
    {
        b.AddTasks(tasks =>
            tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
            {
                await ctx.CallSubOrchestratorAsync("BustedSubOrchestrator", options: taskOptions);
            })
            .AddOrchestratorFunc("BustedSubOrchestrator", context =>
            {
                // Always throws, so every attempt fails and exercises the retry path
                actualNumberOfAttempts++;
                throw MakeException(exceptionType, errorMessage);
            }));
    });

    string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
    OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
        instanceId, getInputsAndOutputs: true, this.TimeoutToken);

    Assert.NotNull(metadata);
    Assert.Equal(instanceId, metadata.InstanceId);
    Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);
    Assert.Equal(expectedNumberOfAttempts, retryHandlerCalls);
    Assert.Equal(expectedNumberOfAttempts, actualNumberOfAttempts);

    // The root orchestration failed due to a failure with the sub-orchestration, resulting in a TaskFailedException
    Assert.NotNull(metadata.FailureDetails);
    Assert.True(metadata.FailureDetails!.IsCausedBy<TaskFailedException>());
}

[Theory]
[InlineData(1, typeof(ApplicationException))] // 1 attempt, built-in exception type
[InlineData(2, typeof(CustomException))] // 2 attempts, custom exception type
Expand Down