Skip to content

Commit

Permalink
Support recursive cascade terminate recursive
Browse files Browse the repository at this point in the history
Signed-off-by: Shivam Kumar <[email protected]>
  • Loading branch information
shivamkm07 committed Dec 6, 2023
1 parent ba82393 commit 1f7746b
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 37 deletions.
47 changes: 47 additions & 0 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,50 @@ func UnmarshalHistoryEvent(bytes []byte) (*HistoryEvent, error) {
}
return e, nil
}

func GetSubOrchestrationInstances(ctx context.Context, be Backend, iid api.InstanceID) ([]api.InstanceID, error) {
owi := &OrchestrationWorkItem{
InstanceID: iid,
}
state, err := be.GetOrchestrationRuntimeState(ctx, owi)
if err != nil {
return nil, fmt.Errorf("failed to fetch orchestration state for '%s': %w", iid, err)
}
oldEvents := state.OldEvents()
newEvents := state.NewEvents()

subOrchestrationInstancesTotal := make([]api.InstanceID, 0)
subOrchestrationMap := make(map[int32]api.InstanceID)
for _, e := range oldEvents {
if created := e.GetSubOrchestrationInstanceCreated(); created != nil {
childSubOrchestrationInstances, err := GetSubOrchestrationInstances(ctx, be, api.InstanceID(created.InstanceId))
if err != nil {
return nil, fmt.Errorf("failed to fetch sub-orchestration instances for '%s': %w", created.InstanceId, err)
}
subOrchestrationMap[e.EventId] = api.InstanceID(created.InstanceId)
subOrchestrationInstancesTotal = append(subOrchestrationInstancesTotal, childSubOrchestrationInstances...)
} else if completed := e.GetSubOrchestrationInstanceCompleted(); completed != nil {
delete(subOrchestrationMap, completed.TaskScheduledId)
} else if failed := e.GetSubOrchestrationInstanceFailed(); failed != nil {
delete(subOrchestrationMap, failed.TaskScheduledId)
}
}
for _, e := range newEvents {
if created := e.GetSubOrchestrationInstanceCreated(); created != nil {
childSubOrchestrationInstances, err := GetSubOrchestrationInstances(ctx, be, api.InstanceID(created.InstanceId))
if err != nil {
return nil, fmt.Errorf("failed to fetch sub-orchestration instances for '%s': %w", created.InstanceId, err)
}
subOrchestrationInstancesTotal = append(subOrchestrationInstancesTotal, childSubOrchestrationInstances...)
subOrchestrationMap[e.EventId] = api.InstanceID(created.InstanceId)
} else if completed := e.GetSubOrchestrationInstanceCompleted(); completed != nil {
delete(subOrchestrationMap, completed.TaskScheduledId)
} else if failed := e.GetSubOrchestrationInstanceFailed(); failed != nil {
delete(subOrchestrationMap, failed.TaskScheduledId)
}
}
for _, iid := range subOrchestrationMap {
subOrchestrationInstancesTotal = append(subOrchestrationInstancesTotal, iid)
}
return subOrchestrationInstancesTotal, nil
}
14 changes: 12 additions & 2 deletions backend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,18 @@ func (c *backendClient) TerminateOrchestration(ctx context.Context, id api.Insta
}

e := helpers.NewExecutionTerminatedEvent(req.Output, req.Recursive)
if err := c.be.AddNewOrchestrationEvent(ctx, id, e); err != nil {
return fmt.Errorf("failed to add terminate event: %w", err)
instancesToTerminate := []api.InstanceID{id}
if req.Recursive {
subOrchestrationInstances, err := GetSubOrchestrationInstances(ctx, c.be, id)
if err != nil {
return fmt.Errorf("failed to fetch sub-orchestration instances: %w", err)
}
instancesToTerminate = append(instancesToTerminate, subOrchestrationInstances...)
}
for _, iid := range instancesToTerminate {
if err := c.be.AddNewOrchestrationEvent(ctx, iid, e); err != nil {
return fmt.Errorf("failed to add terminate event to workflow with instanceId %s: %w", iid, err)
}
}
return nil
}
Expand Down
15 changes: 12 additions & 3 deletions backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,20 @@ func (g *grpcExecutor) StartInstance(ctx context.Context, req *protos.CreateInst

// TerminateInstance implements protos.TaskHubSidecarServiceServer
func (g *grpcExecutor) TerminateInstance(ctx context.Context, req *protos.TerminateRequest) (*protos.TerminateResponse, error) {
instancesToTerminate := []api.InstanceID{api.InstanceID(req.InstanceId)}
if req.Recursive {
subOrchestrationInstances, err := GetSubOrchestrationInstances(ctx, g.backend, api.InstanceID(req.InstanceId))
if err != nil {
return nil, fmt.Errorf("failed to fetch sub-orchestration instances: %w", err)
}
instancesToTerminate = append(instancesToTerminate, subOrchestrationInstances...)
}
e := helpers.NewExecutionTerminatedEvent(req.Output, req.Recursive)
if err := g.backend.AddNewOrchestrationEvent(ctx, api.InstanceID(req.InstanceId), e); err != nil {
return nil, err
for _, iid := range instancesToTerminate {
if err := g.backend.AddNewOrchestrationEvent(ctx, iid, e); err != nil {
return nil, fmt.Errorf("failed to add terminate event to workflow with instanceId %s: %w", iid, err)
}
}

return &protos.TerminateResponse{}, nil
}

Expand Down
32 changes: 0 additions & 32 deletions task/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,38 +538,6 @@ func (ctx *OrchestrationContext) onExecutionResumed(er *protos.ExecutionResumedE
}

func (ctx *OrchestrationContext) onExecutionTerminated(et *protos.ExecutionTerminatedEvent) error {
if et.Recurse {
// Use a map to track which sub-orchestrations have been created but not completed
instancesToTerminate := make(map[int32]string)
for _, e := range ctx.oldEvents {
if created := e.GetSubOrchestrationInstanceCreated(); created != nil {
instancesToTerminate[e.EventId] = created.InstanceId
} else if completed := e.GetSubOrchestrationInstanceCompleted(); completed != nil {
delete(instancesToTerminate, completed.TaskScheduledId)
} else if failed := e.GetSubOrchestrationInstanceFailed(); failed != nil {
delete(instancesToTerminate, failed.TaskScheduledId)
}
}
for _, e := range ctx.newEvents {
if created := e.GetSubOrchestrationInstanceCreated(); created != nil {
instancesToTerminate[e.EventId] = created.InstanceId
} else if completed := e.GetSubOrchestrationInstanceCompleted(); completed != nil {
delete(instancesToTerminate, completed.TaskScheduledId)
} else if failed := e.GetSubOrchestrationInstanceFailed(); failed != nil {
delete(instancesToTerminate, failed.TaskScheduledId)
}
}

// Create a terminate action for each sub-orchestration that has not yet completed
for _, instanceID := range instancesToTerminate {
terminateAction := helpers.NewTerminateOrchestrationAction(
ctx.getNextSequenceNumber(),
instanceID,
et.Recurse,
et.Input)
ctx.pendingActions[terminateAction.Id] = terminateAction
}
}
if err := ctx.setCompleteInternal(et.Input, protos.OrchestrationStatus_ORCHESTRATION_STATUS_TERMINATED, nil); err != nil {
return err
}
Expand Down

0 comments on commit 1f7746b

Please sign in to comment.