diff --git a/tests/grpc/grpc_test.go b/tests/grpc/grpc_test.go index adbb3cd..2c8bc77 100644 --- a/tests/grpc/grpc_test.go +++ b/tests/grpc/grpc_test.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "log" - "math/rand" "net" "os" "testing" @@ -253,7 +252,7 @@ func Test_Grpc_ReuseInstanceIDSkip(t *testing.T) { cancelListener := startGrpcListener(t, r) defer cancelListener() instanceIDs := api.InstanceID("SKIP_IF_RUNNING_OR_COMPLETED") - ReuseIdOption := api.OrchestrationIDReuseOption{ + reuseIdOption := api.OrchestrationIDReuseOption{ CreateOrchestrationAction: protos.CreateOrchestrationAction_SKIP, OrchestrationStatuses: []protos.OrchestrationStatus{ protos.OrchestrationStatus_ORCHESTRATION_STATUS_RUNNING, @@ -264,19 +263,21 @@ func Test_Grpc_ReuseInstanceIDSkip(t *testing.T) { id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceIDs)) require.NoError(t, err) - // random sleep a time from 0 to 5 seconds, to allow previous orchestration instance to set in different status - randomeDuration := time.Duration(rand.Intn(6)) * time.Second - time.Sleep(randomeDuration) - // schedule again - id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(id), api.WithOrchestrationReuseOption(ReuseIdOption)) + // wait orchestration to start + grpcClient.WaitForOrchestrationStart(ctx, id) + pivotTime := time.Now() + // schedule again, it should skip creating the new orchestration + id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id), api.WithOrchestrationReuseOption(reuseIdOption)) require.NoError(t, err) timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second) defer cancelTimeout() metadata, err := grpcClient.WaitForOrchestrationCompletion(timeoutCtx, id, api.WithFetchPayloads(true)) require.NoError(t, err) assert.Equal(t, true, metadata.IsComplete()) + // the first orchestration should complete as the second one is skipped assert.Equal(t, `"Hello, 世界!"`, metadata.SerializedOutput) - time.Sleep(1 * time.Second) + // assert the orchestration created timestamp + assert.True(t, pivotTime.After(metadata.CreatedAt)) } func Test_Grpc_ReuseInstanceIDTerminate(t *testing.T) { @@ -303,7 +304,7 @@ func Test_Grpc_ReuseInstanceIDTerminate(t *testing.T) { cancelListener := startGrpcListener(t, r) defer cancelListener() instanceIDs := api.InstanceID("TERMINATE_IF_RUNNING_OR_COMPLETED") - ReuseIdOption := api.OrchestrationIDReuseOption{ + reuseIdOption := api.OrchestrationIDReuseOption{ CreateOrchestrationAction: protos.CreateOrchestrationAction_TERMINATE, OrchestrationStatuses: []protos.OrchestrationStatus{ protos.OrchestrationStatus_ORCHESTRATION_STATUS_RUNNING, @@ -314,19 +315,21 @@ func Test_Grpc_ReuseInstanceIDTerminate(t *testing.T) { id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceIDs)) require.NoError(t, err) - // random sleep a time from 0 to 5 seconds, to allow previous orchestration instance to set in different status - randomeDuration := time.Duration(rand.Intn(6)) * time.Second - time.Sleep(randomeDuration) - // schedule again - id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(id), api.WithOrchestrationReuseOption(ReuseIdOption)) + // wait orchestration to start + grpcClient.WaitForOrchestrationStart(ctx, id) + pivotTime := time.Now() + // schedule again, it should terminate the first orchestration and start a new one + id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id), api.WithOrchestrationReuseOption(reuseIdOption)) require.NoError(t, err) timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second) defer cancelTimeout() metadata, err := grpcClient.WaitForOrchestrationCompletion(timeoutCtx, id, api.WithFetchPayloads(true)) require.NoError(t, err) assert.Equal(t, true, metadata.IsComplete()) - assert.Equal(t, `"Hello, 世界!"`, metadata.SerializedOutput) - time.Sleep(1 * time.Second) + // the second orchestration should complete. + assert.Equal(t, `"Hello, World!"`, metadata.SerializedOutput) + // assert the orchestration created timestamp + assert.True(t, pivotTime.Before(metadata.CreatedAt)) } func Test_Grpc_ReuseInstanceIDThrow(t *testing.T) { @@ -360,5 +363,4 @@ func Test_Grpc_ReuseInstanceIDThrow(t *testing.T) { if assert.Error(t, err) { assert.Contains(t, err.Error(), "orchestration instance already exists") } - time.Sleep(1 * time.Second) }