Skip to content

Commit

Permalink
Add orchestration session data sample
Browse files Browse the repository at this point in the history
  • Loading branch information
jviau committed May 28, 2020
1 parent bafa239 commit 63de36a
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 2 deletions.
21 changes: 19 additions & 2 deletions samples/DurableTask.Samples/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the APACHE 2.0. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.IO;
using System.Reflection;
using System.Runtime.InteropServices;
Expand All @@ -12,6 +13,7 @@
using DurableTask.Emulator;
using DurableTask.Hosting;
using DurableTask.Samples.Greetings;
using Dynamitey.DynamicObjects;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
Expand Down Expand Up @@ -46,6 +48,13 @@ public static Task Main(string[] args)
builder.WithOrchestrationService(orchestrationService);

builder.AddClient();

builder.UseOrchestrationMiddleware<OrchestrationInstanceExMiddleware>();
builder.UseOrchestrationMiddleware<SampleMiddleware>();

builder.UseActivityMiddleware<ActivityInstanceExMiddleware>();
builder.UseActivityMiddleware<SampleMiddleware>();

builder.AddOrchestration<GreetingsOrchestration>();
builder
.AddActivity<GetUserTask>()
Expand All @@ -63,7 +72,7 @@ public static Task Main(string[] args)

private static HostBuilder CreateDefaultBuilder(string[] args)
{
// Host.CreateDefaultBuilder() is not available before .netcore 3.0.
// Host.CreateDefaultBuilder() is not available before Microsoft.Extensions.Hosting 3.0.
// So this is a copied from https://github.com/dotnet/extensions
var builder = new HostBuilder();

Expand Down Expand Up @@ -173,7 +182,15 @@ public TaskEnqueuer(TaskHubClient client, IConsole console)
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
OrchestrationInstance instance = await _client.CreateOrchestrationInstanceAsync(
typeof(GreetingsOrchestration), _instanceId, null);
NameVersionHelper.GetDefaultName(typeof(GreetingsOrchestration)),
NameVersionHelper.GetDefaultVersion(typeof(GreetingsOrchestration)),
_instanceId,
null,
new Dictionary<string, string>()
{
["CorrelationId"] = Guid.NewGuid().ToString(),
});

OrchestrationState result = await _client.WaitForOrchestrationAsync(
instance, TimeSpan.FromSeconds(60));

Expand Down
31 changes: 31 additions & 0 deletions samples/DurableTask.Samples/SampleMiddleware.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;
using DurableTask.Core.Middleware;
using DurableTask.DependencyInjection;

namespace DurableTask.Samples
{
/// <summary>
/// Sample middleware
/// </summary>
public class SampleMiddleware : ITaskMiddleware
{
private readonly IConsole _console;

/// <summary>
/// Initializes a new instance of the <see cref="SampleMiddleware"/> class.
/// </summary>
/// <param name="console">The console output helper.</param>
public SampleMiddleware(IConsole console)
{
_console = console;
}

/// <inheritdoc />
public Task InvokeAsync(DispatchMiddlewareContext context, Func<Task> next)
{
_console.WriteLine("In sample middleware. Dependency Injection works.");
return next();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Middleware;
using DurableTask.DependencyInjection;

namespace DurableTask.Samples
{
/// <summary>
/// This middleware configures the <see cref="OrchestrationInstanceEx"/> for this activity.
/// </summary>
public class ActivityInstanceExMiddleware : ITaskMiddleware
{
/// <inheritdoc />
public Task InvokeAsync(DispatchMiddlewareContext context, Func<Task> next)
{
var customInstance = OrchestrationInstanceEx.Get(context.GetProperty<OrchestrationInstance>());
context.SetProperty<OrchestrationInstance>(customInstance);

// Do something with the session data, such as starting a logging scope with correlation id property.

return next();
}
}
}
111 changes: 111 additions & 0 deletions samples/DurableTask.Samples/SessionData/OrchestrationInstanceEx.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization;
using DurableTask.Core;

namespace DurableTask.Samples
{
/// <summary>
/// This is an example for how to use <see cref="OrchestrationInstance.ExtensionData"/> to hold orchestration-wide session
/// data that can be accessed in all activities within the orchestration. This is done by taking the original
/// <see cref="OrchestrationInstance"/>, "casting" it via serialization to our type, populating the data, then letting the
/// orchestration service serialize it back - putting our extra data in the extension data structure. We can then retrieve
/// that data at a later time via serialization "casting".
///
/// Data is seeded into this session via usage of tags on
/// <see cref="TaskHubClient.CreateOrchestrationInstanceAsync(string, string, string, object, IDictionary{string, string})"/>.
/// </summary>
/// <remarks>
/// The data contract purposefully is identical to <see cref="OrchestrationInstance"/> data contract. This lets the serializers
/// serialize this type as its base type without any <see cref="KnownTypeAttribute"/>.
/// </remarks>
[DataContract(Name = "OrchestrationInstance", Namespace = "http://schemas.datacontract.org/2004/07/DurableTask.Core")]
public class OrchestrationInstanceEx : OrchestrationInstance
{
private static readonly DataContractSerializer s_customSerializer =
new DataContractSerializer(typeof(OrchestrationInstanceEx));

private static readonly DataContractSerializer s_defaultSerializer =
new DataContractSerializer(typeof(OrchestrationInstance));

/// <summary>
/// Gets or sets the correlation id.
/// </summary>
[DataMember]
public string CorrelationId { get; set; }

/// <summary>
/// Creates a <see cref="OrchestrationInstanceEx"/> from the provided
/// <see cref="OrchestrationRuntimeState"/>, using its extension data if available.
/// </summary>
/// <param name="runtimeState">The runtime state to create this instance from.</param>
/// <returns>A new or deserialized instance.</returns>
public static OrchestrationInstanceEx Initialize(OrchestrationRuntimeState runtimeState)
{
if (runtimeState == null)
{
throw new ArgumentNullException(nameof(runtimeState));
}

OrchestrationInstance instance = runtimeState.OrchestrationInstance;
if (instance is OrchestrationInstanceEx custom)
{
return custom;
}

custom = Get(instance);

// Populate values not there.
// In the case of SubOrchestrations, the CustomOrchestrationInstance is not carried
// over, but the tags are! It is possible to carry the session over to the sub orchestration
// as the parent orchestration instance is available here in runtimeState.ParentInstance.
if (string.IsNullOrEmpty(custom.CorrelationId))
{
if (runtimeState.ExecutionStartedEvent.Tags == null)
{
runtimeState.ExecutionStartedEvent.Tags = new Dictionary<string, string>();
}

if (!runtimeState.ExecutionStartedEvent.Tags.TryGetValue(
nameof(CorrelationId), out string correlationId))
{
correlationId = Guid.NewGuid().ToString();
runtimeState.ExecutionStartedEvent.Tags[nameof(CorrelationId)] = correlationId;
}

custom.CorrelationId = correlationId;
}

runtimeState.ExecutionStartedEvent.OrchestrationInstance = custom;
return custom;
}

/// <summary>
/// Gets a <see cref="OrchestrationInstanceEx"/> from a <see cref="OrchestrationInstance"/>,
/// using its extension data if available.
/// </summary>
/// <param name="instance">The orchestration instance. Not null.</param>
/// <returns>The custom orchestration instance.</returns>
public static OrchestrationInstanceEx Get(OrchestrationInstance instance)
{
if (instance == null)
{
throw new ArgumentNullException(nameof(instance));
}

if (instance is OrchestrationInstanceEx custom)
{
return custom;
}

// We need to first get custom extension data by serializing & deserializing.
using (var stream = new MemoryStream())
{
s_defaultSerializer.WriteObject(stream, instance);
stream.Position = 0;
return (OrchestrationInstanceEx)s_customSerializer.ReadObject(stream);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Middleware;
using DurableTask.DependencyInjection;

namespace DurableTask.Samples
{
/// <summary>
/// This middleware configures the <see cref="OrchestrationInstanceEx"/> for this orchestration.
/// </summary>
public class OrchestrationInstanceExMiddleware : ITaskMiddleware
{
/// <inheritdoc />
public Task InvokeAsync(DispatchMiddlewareContext context, Func<Task> next)
{
// Initialize the OrchestrationInstance with our session data. Or if it already exists,
// then set it to the appropriate context properties.
OrchestrationRuntimeState runtimeState = context.GetProperty<OrchestrationRuntimeState>();
var customInstance = OrchestrationInstanceEx.Initialize(runtimeState);
context.SetProperty<OrchestrationInstance>(customInstance);

// Do something with the session data, such as starting a logging scope with correlation id property.

return next();
}
}
}
16 changes: 16 additions & 0 deletions samples/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Samples

The sample includes a basic scenario of using a host builder to configure a durable task host, and register some basic orchestrations and activities.

## Bonus Samples

The sample includes some bonus scenarios I have found useful when working with DurableTask Framework.

### 1. Orchestration Session Data

#### Use Case

Say you want some non-orchestration or activity specific data that is carried through the whole execution of a single orchestration. The orchestration or activities have no use to directly interact with this data, but it is there for some other purpose such as a logging correlation id.

#### Solution
DurableTask offers no official session data. However, the framework does make use of data contracts and `ExtensionData`. Using this functionality, we can store our session data in the `OrchestrationInstance.ExtensionData` and be able to access it for the whole life of the orchestration. See [OrchestrationInstanceEx.cs](./DurableTask.Samples/SessionData/OrchestrationInstanceEx.cs).

0 comments on commit 63de36a

Please sign in to comment.