Is your feature request related to a problem? Please describe.
Tasks started in one run of an eternal orchestration cannot be awaited by a later run.
Why this is bad: we often need to process queues of items coming from a database with a limited degree of parallelism. Using the async-await pattern, we could run an eternal loop that fetches some items, starts a pool of tasks, uses WhenAny to wait for a task to complete, and reuses the released slot to schedule more work.
static async Task Main(string[] args)
{
    var queue = new ConcurrentQueue<int>(Enumerable.Range(1, 50)); // Simulated items to process
    int maxParallelism = 10; // Maximum parallel tasks
    var tasks = new List<Task>();

    while (queue.Any() || tasks.Any())
    {
        // Fill up to the max parallelism with new tasks
        while (tasks.Count < maxParallelism && queue.TryDequeue(out var item))
        {
            tasks.Add(Task.Run(async () =>
            {
                await ProcessItemAsync(item);
            }));
        }

        // Wait for any task to complete
        var completedTask = await Task.WhenAny(tasks);

        // Remove the completed task from the list
        tasks.Remove(completedTask);
    }

    Console.WriteLine("All items processed.");
}
This pattern does not work well in Durable Functions: if the queue holds, say, millions of items, the replay history becomes very long.
To avoid a huge replay history, we can use context.ContinueAsNew. However, if we do, we lose the Task references and can't await them in the new run.
What I would like to propose is the ability to instantiate a Task reference from an InstanceId. With this, new runs of an orchestrator could get references to tasks started by previous runs. Eternal orchestrations would then be able to pass the most important artefacts they have, task references, to their future selves.
public static async Task LongQueueProcessorOrchestrator(
    [OrchestrationTrigger] TaskOrchestrationContext context, EternalOrchestratorArgs args)
{
    const int maxParallelism = 10;

    // TaskFromInstanceId is the PROPOSED API (it does not exist today): it would
    // rebuild a Task reference from an instance ID handed over by the previous run.
    var allTasks = args.instanceIds
        .Select(instanceId => (instanceId, task: context.TaskFromInstanceId(instanceId)))
        .ToList();

    var availableParallelism = maxParallelism - allTasks.Count;
    var workload = await context.CallActivityAsync<int[]>("GetSomeWorkload");
    var tasksToStart = Math.Min(availableParallelism, workload.Length);

    foreach (var work in workload.Take(tasksToStart))
    {
        // context.NewGuid() is replay-safe; Guid.NewGuid() is not deterministic.
        var instanceId = context.NewGuid().ToString();
        var task = context.CallSubOrchestratorAsync(
            "DoWork",
            work,
            new TaskOptions().WithInstanceId(instanceId));
        allTasks.Add((instanceId, task));
    }

    // Wait for any task to complete and release its slot
    if (allTasks.Count > 0)
    {
        var completedTask = await Task.WhenAny(allTasks.Select(t => t.task));
        allTasks.RemoveAll(t => t.task == completedTask);
    }

    // Workload completed, as well as tasks?
    if (workload.Length == 0 && allTasks.Count == 0)
        return;

    // If not, continue looping and pass the remaining instance IDs to the next run
    context.ContinueAsNew(
        new EternalOrchestratorArgs(allTasks.Select(t => t.instanceId).ToArray()));
}
vgpro54321 changed the title from "Add a feature to await tasks started by eternal orchestration in preceeding runs" to "Add a feature to await tasks started by eternal orchestration in preceding runs (AKA maintain task references after ContinueAsNew)" on Nov 22, 2024