
Add a feature to await tasks started by eternal orchestration in preceding runs (AKA maintain task references after ContinueAsNew) #2973

Open
vgpro54321 opened this issue Nov 22, 2024 · 0 comments
Labels
Enhancement Feature requests.

Comments

@vgpro54321

Is your feature request related to a problem? Please describe.
If tasks are started in one run of an eternal orchestration, it is impossible to await them in a later run, i.e., after the orchestration has called ContinueAsNew.

Why this is bad: we often need to process queues of items coming from a database with a limited degree of parallelism. With the async/await pattern, we could run an endless loop that fetches some items, starts a pool of tasks, uses Task.WhenAny to wait for a task to complete, and reuses each freed slot to schedule another task:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(1, 50)); // Simulated items to process
        int maxParallelism = 10; // Maximum number of tasks in flight
        var tasks = new List<Task>();

        while (!queue.IsEmpty || tasks.Count > 0)
        {
            // Fill up to the max parallelism with new tasks
            while (tasks.Count < maxParallelism && queue.TryDequeue(out var item))
            {
                tasks.Add(Task.Run(() => ProcessItemAsync(item)));
            }

            // Wait for any task to complete and free its slot
            var completedTask = await Task.WhenAny(tasks);
            tasks.Remove(completedTask);

            // Surface any exception thrown by the completed task
            await completedTask;
        }

        Console.WriteLine("All items processed.");
    }

    // Placeholder for the real per-item work
    static Task ProcessItemAsync(int item) => Task.Delay(10);
}

This pattern does not work well in Durable Functions: if the queue holds, say, millions of items, the orchestration history (which is replayed every time the orchestrator resumes) becomes very long.
To keep the history small, we can call context.ContinueAsNew. However, doing so discards the in-memory Task references, so the new run cannot await them.

What I would like to propose is the ability to obtain a Task reference from an InstanceId. With this functionality, new runs of an orchestrator could get references to tasks started by previous runs. Eternal orchestrations would then be able to pass their most important artefacts, their task references, on to their future selves.

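public record EternalOrchestratorArgs(string[] InstanceIds);

public static async Task LongQueueProcessorOrchestrator(
    [OrchestrationTrigger] TaskOrchestrationContext context, EternalOrchestratorArgs args)
{
    // Re-hydrate tasks started by previous runs.
    // TaskFromInstanceId is the PROPOSED API; it does not exist today.
    var previousTasks = args.InstanceIds
        .Select(instanceId => (instanceId, task: context.TaskFromInstanceId(instanceId)))
        .ToList();

    var availableParallelism = 10 - previousTasks.Count;

    // Ask for no more items than there are free slots
    var workload = await context.CallActivityAsync<int[]>("GetSomeWorkload", availableParallelism);

    var tasksToStart = Math.Min(availableParallelism, workload.Length);

    var newTasks = workload.Take(tasksToStart).Select(work =>
    {
        // context.NewGuid() is deterministic, so it is safe across replays
        var instanceId = context.NewGuid().ToString();
        return (instanceId, task: context.CallSubOrchestratorAsync(
            "DoWork",
            work,
            new TaskOptions().WithInstanceId(instanceId)));
    }).ToList();

    var allTasks = previousTasks.Concat(newTasks).ToList();

    // Nothing in flight and nothing new to start
    if (allTasks.Count == 0)
        return;

    // Wait for any task to complete and free its slot
    var completedTask = await Task.WhenAny(allTasks.Select(t => t.task));
    allTasks.RemoveAll(t => t.task == completedTask);

    // Workload completed, as well as tasks?
    if (workload.Length == 0 && allTasks.Count == 0)
        return;

    // If not, continue looping, handing the remaining instance IDs to the next run
    context.ContinueAsNew(
        new EternalOrchestratorArgs(allTasks.Select(t => t.instanceId).ToArray()));
}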


vgpro54321 changed the title from "Add a feature to await tasks started by eternal orchestration in preceeding runs" to "Add a feature to await tasks started by eternal orchestration in preceding runs (AKA maintain task references after ContinueAsNew)" Nov 22, 2024
lilyjma added the Enhancement (Feature requests.) label and removed the Needs: Triage 🔍 label Nov 25, 2024