Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds workflow replay-safe logger #1434

Merged
merged 19 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
4404175
Removed obsolete type
WhitWaldo Oct 27, 2024
662b620
Added missing using
WhitWaldo Oct 27, 2024
cf876c0
Adding interface for IWorkflowContext for replayability concerns
WhitWaldo Oct 27, 2024
be5340a
Removed unused IConfiguration
WhitWaldo Oct 27, 2024
fcdeadf
Added ReplaySafeLogger type
WhitWaldo Oct 27, 2024
bcea844
Building out functionality to expose ReplayLogger in workflow context
WhitWaldo Oct 27, 2024
0a871f5
Added license information to file
WhitWaldo Dec 19, 2024
2a77908
Removed unnecessary file
WhitWaldo Dec 19, 2024
1d11d40
Updated copyright header for different project, made some tweaks for …
WhitWaldo Dec 19, 2024
ed19f04
Added virtual methods that use the already-available ILoggerFactory t…
WhitWaldo Dec 19, 2024
04a77a2
Removed unnecessary registration
WhitWaldo Dec 19, 2024
283e98d
Updated example to demonstrate using ReplaySafeLogger in the orchestr…
WhitWaldo Dec 19, 2024
182d58e
Tweaks on visibility and abstraction so that the methods are availabl…
WhitWaldo Dec 19, 2024
5f4a41b
Merge branch 'workflow-replay-logger' into workflow-logger
WhitWaldo Dec 19, 2024
3d6641e
Removed obsolete type registrations
WhitWaldo Dec 19, 2024
78ad0da
Simplified argument null check
WhitWaldo Dec 19, 2024
1d3f06f
Removed since-removed code leftover from merge
WhitWaldo Dec 19, 2024
4f32540
Added documentation demonstrating how to access the replay-safe logger
WhitWaldo Dec 19, 2024
53da1d8
Removed unnecessary and separate ReplaySafeLogger in favor of method …
WhitWaldo Dec 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,64 @@ builder.Services.AddDaprWorkflow(options => {

var app = builder.Build();
await app.RunAsync();
```
```

## Injecting Services into Workflow Activities
Workflow activities support the same dependency injection that developers have come to expect of modern C# applications. Assuming a proper
registration at startup, any such type can be injected into the constructor of the workflow activity and available to utilize during
the execution of the workflow. This makes it simple to add logging via an injected `ILogger` or access to other Dapr
building blocks by injecting `DaprClient` or `DaprJobsClient`, for example.

```csharp
internal sealed class SquareNumberActivity : WorkflowActivity<int, int>
{
private readonly ILogger _logger;

public MyActivity(ILogger logger)
{
this._logger = logger;
}

public override Task<int> RunAsync(WorkflowActivityContext context, int input)
{
this._logger.LogInformation("Squaring the value {number}", input);
var result = input * input;
this._logger.LogInformation("Got a result of {squareResult}", result);

return Task.FromResult(result);
}
}
```

### Using ILogger in Workflow
Because workflows must be deterministic, it is not possible to inject arbitrary services into them. For example,
if you were able to inject a standard `ILogger` into a workflow and it needed to be replayed because of an error,
subsequent replay from the event source log would result in the log recording additional operations that didn't actually
take place a second or third time because their results were sourced from the log. This has the potential to introduce
a significant amount of confusion. Rather, a replay-safe logger is made available for use within workflows. It will only
log events the first time the workflow runs and will not log anything whenever the workflow is being replaced.

This logger can be retrieved from a method present on the `WorkflowContext` available on your workflow instance and
otherwise used precisely as you might otherwise use an `ILogger` instance.

An end-to-end sample demonstrating this can be seen in the
[.NET SDK repository](https://github.com/dapr/dotnet-sdk/blob/master/examples/Workflow/WorkflowConsoleApp/Workflows/OrderProcessingWorkflow.cs)
but a brief extraction of this sample is available below.

```csharp
public class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
{
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
{
string orderId = context.InstanceId;
var logger = context.CreateReplaySafeLogger<OrderProcessingWorkflow>(); //Use this method to access the logger instance

logger.LogInformation("Received order {orderId} for {quantity} {name} at ${totalCost}", orderId, order.Quantity, order.Name, order.TotalCost);

//...
}
}
```



Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Dapr.Workflow;
using Microsoft.Extensions.Logging;
using WorkflowConsoleApp.Activities;

namespace WorkflowConsoleApp.Workflows
Expand All @@ -16,7 +17,10 @@ public class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
{
string orderId = context.InstanceId;
var logger = context.CreateReplaySafeLogger<OrderProcessingWorkflow>();

logger.LogInformation("Received order {orderId} for {quantity} {name} at ${totalCost}", orderId, order.Quantity, order.Name, order.TotalCost);

// Notify the user that an order has come through
await context.CallActivityAsync(
nameof(NotifyActivity),
Expand All @@ -31,6 +35,8 @@ await context.CallActivityAsync(
// If there is insufficient inventory, fail and let the user know
if (!result.Success)
{
logger.LogError("Insufficient inventory for {orderName}", order.Name);

// End the workflow here since we don't have sufficient inventory
await context.CallActivityAsync(
nameof(NotifyActivity),
Expand All @@ -39,8 +45,10 @@ await context.CallActivityAsync(
}

// Require orders over a certain threshold to be approved
if (order.TotalCost > 50000)
const int threshold = 50000;
if (order.TotalCost > threshold)
{
logger.LogInformation("Requesting manager approval since total cost {totalCost} exceeds threshold {threshold}", order.TotalCost, threshold);
// Request manager approval for the order
await context.CallActivityAsync(nameof(RequestApprovalActivity), order);

Expand All @@ -51,9 +59,13 @@ await context.CallActivityAsync(
ApprovalResult approvalResult = await context.WaitForExternalEventAsync<ApprovalResult>(
eventName: "ManagerApproval",
timeout: TimeSpan.FromSeconds(30));

logger.LogInformation("Approval result: {approvalResult}", approvalResult);
context.SetCustomStatus($"Approval result: {approvalResult}");
if (approvalResult == ApprovalResult.Rejected)
{
logger.LogWarning("Order was rejected by approver");

// The order was rejected, end the workflow here
await context.CallActivityAsync(
nameof(NotifyActivity),
Expand All @@ -63,6 +75,8 @@ await context.CallActivityAsync(
}
catch (TaskCanceledException)
{
logger.LogError("Cancelling order because it didn't receive an approval");

// An approval timeout results in automatic order cancellation
await context.CallActivityAsync(
nameof(NotifyActivity),
Expand All @@ -72,6 +86,7 @@ await context.CallActivityAsync(
}

// There is enough inventory available so the user can purchase the item(s). Process their payment
logger.LogInformation("Processing payment as sufficient inventory is available");
await context.CallActivityAsync(
nameof(ProcessPaymentActivity),
new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost),
Expand All @@ -88,13 +103,15 @@ await context.CallActivityAsync(
catch (WorkflowTaskFailedException e)
{
// Let them know their payment processing failed
logger.LogError("Order {orderId} failed! Details: {errorMessage}", orderId, e.FailureDetails.ErrorMessage);
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Order {orderId} Failed! Details: {e.FailureDetails.ErrorMessage}"));
return new OrderResult(Processed: false);
}

// Let them know their payment was processed
logger.LogError("Order {orderId} has completed!", orderId);
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Order {orderId} has completed!"));
Expand Down
23 changes: 22 additions & 1 deletion src/Dapr.Workflow/DaprWorkflowContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// limitations under the License.
// ------------------------------------------------------------------------

using Microsoft.Extensions.Logging;

namespace Dapr.Workflow
{
using System;
Expand All @@ -34,7 +36,7 @@ internal DaprWorkflowContext(TaskOrchestrationContext innerContext)
public override DateTime CurrentUtcDateTime => this.innerContext.CurrentUtcDateTime;

public override bool IsReplaying => this.innerContext.IsReplaying;

public override Task CallActivityAsync(string name, object? input = null, WorkflowTaskOptions? options = null)
{
return WrapExceptions(this.innerContext.CallActivityAsync(name, input, options?.ToDurableTaskOptions()));
Expand Down Expand Up @@ -95,6 +97,25 @@ public override Guid NewGuid()
return this.innerContext.NewGuid();
}

/// <summary>
/// Returns an instance of <see cref="ILogger"/> that is replay-safe, meaning that the logger only
/// writes logs when the orchestrator is not replaying previous history.
/// </summary>
/// <param name="categoryName">The logger's category name.</param>
/// <returns>An instance of <see cref="ILogger"/> that is replay-safe.</returns>
public override ILogger CreateReplaySafeLogger(string categoryName) =>
this.innerContext.CreateReplaySafeLogger(categoryName);

/// <inheritdoc cref="CreateReplaySafeLogger(string)" />
/// <param name="type">The type to derive the category name from.</param>
public override ILogger CreateReplaySafeLogger(Type type) =>
this.innerContext.CreateReplaySafeLogger(type);

/// <inheritdoc cref="CreateReplaySafeLogger(string)" />
/// <typeparam name="T">The type to derive category name from.</typeparam>
public override ILogger CreateReplaySafeLogger<T>() =>
this.innerContext.CreateReplaySafeLogger<T>();

static async Task WrapExceptions(Task task)
{
try
Expand Down
21 changes: 21 additions & 0 deletions src/Dapr.Workflow/IWorkflowContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
namespace Dapr.Workflow;

/// <summary>
/// Provides functionality available to orchestration code.
/// </summary>
public interface IWorkflowContext
{
/// <summary>
/// Gets a value indicating whether the orchestration or operation is currently replaying itself.
/// </summary>
/// <remarks>
/// This property is useful when there is logic that needs to run only when *not* replaying. For example,
/// certain types of application logging may become too noisy when duplicated as part of replay. The
/// application code could check to see whether the function is being replayed and then issue
/// the log statements when this value is <c>false</c>.
/// </remarks>
/// <value>
/// <c>true</c> if the orchestration or operation is currently being replayed; otherwise <c>false</c>.
/// </value>
bool IsReplaying { get; }
}
22 changes: 20 additions & 2 deletions src/Dapr.Workflow/WorkflowContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// limitations under the License.
// ------------------------------------------------------------------------

using Microsoft.Extensions.Logging;

namespace Dapr.Workflow
{
using System;
Expand All @@ -21,13 +23,13 @@ namespace Dapr.Workflow
/// Context object used by workflow implementations to perform actions such as scheduling activities, durable timers, waiting for
/// external events, and for getting basic information about the current workflow instance.
/// </summary>
public abstract class WorkflowContext
public abstract class WorkflowContext : IWorkflowContext
{
/// <summary>
/// Gets the name of the current workflow.
/// </summary>
public abstract string Name { get; }

/// <summary>
/// Gets the instance ID of the current workflow.
/// </summary>
Expand Down Expand Up @@ -271,6 +273,22 @@ public virtual Task CallChildWorkflowAsync(
{
return this.CallChildWorkflowAsync<object>(workflowName, input, options);
}

/// <summary>
/// Returns an instance of <see cref="ILogger"/> that is replay-safe, meaning that the logger only
/// writes logs when the orchestrator is not replaying previous history.
/// </summary>
/// <param name="categoryName">The logger's category name.</param>
/// <returns>An instance of <see cref="ILogger"/> that is replay-safe.</returns>
public abstract ILogger CreateReplaySafeLogger(string categoryName);

/// <inheritdoc cref="CreateReplaySafeLogger(string)" />
/// <param name="type">The type to derive the category name from.</param>
public abstract ILogger CreateReplaySafeLogger(Type type);

/// <inheritdoc cref="CreateReplaySafeLogger(string)" />
/// <typeparam name="T">The type to derive category name from.</typeparam>
public abstract ILogger CreateReplaySafeLogger<T>();

/// <summary>
/// Restarts the workflow with a new input and clears its history.
Expand Down
34 changes: 0 additions & 34 deletions src/Dapr.Workflow/WorkflowEngineClient.cs

This file was deleted.

3 changes: 1 addition & 2 deletions src/Dapr.Workflow/WorkflowLoggingService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ internal sealed class WorkflowLoggingService : IHostedService
private static readonly HashSet<string> registeredWorkflows = new();
private static readonly HashSet<string> registeredActivities = new();

public WorkflowLoggingService(ILogger<WorkflowLoggingService> logger, IConfiguration configuration)
public WorkflowLoggingService(ILogger<WorkflowLoggingService> logger)
{
this.logger = logger;

}
public Task StartAsync(CancellationToken cancellationToken)
{
Expand Down
22 changes: 5 additions & 17 deletions src/Dapr.Workflow/WorkflowServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,50 +35,38 @@ public static IServiceCollection AddDaprWorkflow(
Action<WorkflowRuntimeOptions> configure,
ServiceLifetime lifetime = ServiceLifetime.Singleton)
{
if (serviceCollection == null)
{
throw new ArgumentNullException(nameof(serviceCollection));
}
ArgumentNullException.ThrowIfNull(serviceCollection, nameof(serviceCollection));

serviceCollection.AddDaprClient(lifetime: lifetime);
serviceCollection.AddHttpClient();
serviceCollection.AddHostedService<WorkflowLoggingService>();

switch (lifetime)
{
case ServiceLifetime.Singleton:
#pragma warning disable CS0618 // Type or member is obsolete - keeping around temporarily - replaced by DaprWorkflowClient
serviceCollection.TryAddSingleton<WorkflowEngineClient>();
#pragma warning restore CS0618 // Type or member is obsolete
serviceCollection.TryAddSingleton<DaprWorkflowClient>();
serviceCollection.TryAddSingleton<WorkflowRuntimeOptions>();
break;
case ServiceLifetime.Scoped:
#pragma warning disable CS0618 // Type or member is obsolete - keeping around temporarily - replaced by DaprWorkflowClient
serviceCollection.TryAddScoped<WorkflowEngineClient>();
#pragma warning restore CS0618 // Type or member is obsolete
serviceCollection.TryAddScoped<DaprWorkflowClient>();
serviceCollection.TryAddScoped<WorkflowRuntimeOptions>();
break;
case ServiceLifetime.Transient:
#pragma warning disable CS0618 // Type or member is obsolete - keeping around temporarily - replaced by DaprWorkflowClient
serviceCollection.TryAddTransient<WorkflowEngineClient>();
#pragma warning restore CS0618 // Type or member is obsolete
serviceCollection.TryAddTransient<DaprWorkflowClient>();
serviceCollection.TryAddTransient<WorkflowRuntimeOptions>();
break;
default:
throw new ArgumentOutOfRangeException(nameof(lifetime), lifetime, null);
}

serviceCollection.AddOptions<WorkflowRuntimeOptions>().Configure(configure);

//Register the factory and force resolution so the Durable Task client and worker can be registered
using (var scope = serviceCollection.BuildServiceProvider().CreateScope())
{
var httpClientFactory = scope.ServiceProvider.GetRequiredService<IHttpClientFactory>();
var configuration = scope.ServiceProvider.GetService<IConfiguration>();

var factory = new DaprWorkflowClientBuilderFactory(configuration, httpClientFactory);
factory.CreateClientBuilder(serviceCollection, configure);
}
Expand Down
Loading