Skip to content

Commit

Permalink
Adding new test for recursive terminate
Browse files Browse the repository at this point in the history
Signed-off-by: Shivam Kumar <[email protected]>
  • Loading branch information
shivamkm07 committed Dec 6, 2023
1 parent 1f7746b commit 2d9b9d9
Showing 1 changed file with 64 additions and 0 deletions.
64 changes: 64 additions & 0 deletions tests/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,67 @@ func Test_Grpc_Terminate_Recursive(t *testing.T) {
})
}
}

func Test_Grpc_Terminate_Recursive2(t *testing.T) {
rootDelayTime := 5 * time.Second
L1DelayTime := 1 * time.Second
executedActivity := false
r := task.NewTaskRegistry()
r.AddOrchestratorN("Root", func(ctx *task.OrchestrationContext) (any, error) {
tasks := []task.Task{}
for i := 0; i < 5; i++ {
task := ctx.CallSubOrchestrator("L1")
tasks = append(tasks, task)
}
for _, task := range tasks {
task.Await(nil)
}
ctx.CreateTimer(rootDelayTime).Await(nil)
return nil, nil
})
r.AddOrchestratorN("L1", func(ctx *task.OrchestrationContext) (any, error) {
ctx.CallSubOrchestrator("L2")
ctx.CreateTimer(L1DelayTime).Await(nil)
return nil, nil
})
r.AddOrchestratorN("L2", func(ctx *task.OrchestrationContext) (any, error) {
ctx.CreateTimer(rootDelayTime).Await(nil)
ctx.CallActivity("Fail").Await(nil)
return nil, nil
})
r.AddActivityN("Fail", func(ctx task.ActivityContext) (any, error) {
executedActivity = true
return nil, errors.New("Failed: Should not have executed the activity")
})

cancelListener := startGrpcListener(t, r)
defer cancelListener()

// Test terminating with and without recursion
for _, recurse := range []bool{true, false} {
t.Run(fmt.Sprintf("Recurse = %v", recurse), func(t *testing.T) {
// Run the orchestration, which will block waiting for external events
id, err := grpcClient.ScheduleNewOrchestration(ctx, "Root")
require.NoError(t, err)

// Wait long enough to ensure that all L1 orchestrations have completed but Root and L2 are still running
time.Sleep(3 * time.Second)

// Terminate the root orchestration and mark whether a recursive termination
output := fmt.Sprintf("Recursive termination = %v", recurse)
opts := []api.TerminateOptions{api.WithOutput(output), api.WithRecursive(recurse)}
require.NoError(t, grpcClient.TerminateOrchestration(ctx, id, opts...))

// Wait for the root orchestration to complete and verify its terminated status
metadata, err := grpcClient.WaitForOrchestrationCompletion(ctx, id)
require.NoError(t, err)
require.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_TERMINATED, metadata.RuntimeStatus)
require.Equal(t, fmt.Sprintf("\"%s\"", output), metadata.SerializedOutput)

// Wait longer to ensure that none of the sub-orchestrations continued to the next step
// of executing the activity function.
time.Sleep(rootDelayTime)
assert.NotEqual(t, recurse, executedActivity)
})
}
}

0 comments on commit 2d9b9d9

Please sign in to comment.