Skip to content

Commit

Permalink
improve tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kaibocai committed Dec 1, 2023
1 parent 9bbcf92 commit 57ffa30
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions tests/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"log"
"math/rand"
"net"
"os"
"testing"
Expand Down Expand Up @@ -253,7 +252,7 @@ func Test_Grpc_ReuseInstanceIDSkip(t *testing.T) {
cancelListener := startGrpcListener(t, r)
defer cancelListener()
instanceIDs := api.InstanceID("SKIP_IF_RUNNING_OR_COMPLETED")
ReuseIdOption := api.OrchestrationIDReuseOption{
reuseIdOption := api.OrchestrationIDReuseOption{
CreateOrchestrationAction: protos.CreateOrchestrationAction_SKIP,
OrchestrationStatuses: []protos.OrchestrationStatus{
protos.OrchestrationStatus_ORCHESTRATION_STATUS_RUNNING,
Expand All @@ -264,19 +263,21 @@ func Test_Grpc_ReuseInstanceIDSkip(t *testing.T) {

id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceIDs))
require.NoError(t, err)
// random sleep a time from 0 to 5 seconds, to allow previous orchestration instance to set in different status
randomeDuration := time.Duration(rand.Intn(6)) * time.Second
time.Sleep(randomeDuration)
// schedule again
id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(id), api.WithOrchestrationReuseOption(ReuseIdOption))
// wait orchestration to start
grpcClient.WaitForOrchestrationStart(ctx, id)
pivotTime := time.Now()
// schedule again, it should skip creating the new orchestration
id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id), api.WithOrchestrationReuseOption(reuseIdOption))
require.NoError(t, err)
timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second)
defer cancelTimeout()
metadata, err := grpcClient.WaitForOrchestrationCompletion(timeoutCtx, id, api.WithFetchPayloads(true))
require.NoError(t, err)
assert.Equal(t, true, metadata.IsComplete())
// the first orchestration should complete as the second one is skipped
assert.Equal(t, `"Hello, 世界!"`, metadata.SerializedOutput)
time.Sleep(1 * time.Second)
// assert the orchestration created timestamp
assert.True(t, pivotTime.After(metadata.CreatedAt))
}

func Test_Grpc_ReuseInstanceIDTerminate(t *testing.T) {
Expand All @@ -303,7 +304,7 @@ func Test_Grpc_ReuseInstanceIDTerminate(t *testing.T) {
cancelListener := startGrpcListener(t, r)
defer cancelListener()
instanceIDs := api.InstanceID("TERMINATE_IF_RUNNING_OR_COMPLETED")
ReuseIdOption := api.OrchestrationIDReuseOption{
reuseIdOption := api.OrchestrationIDReuseOption{
CreateOrchestrationAction: protos.CreateOrchestrationAction_TERMINATE,
OrchestrationStatuses: []protos.OrchestrationStatus{
protos.OrchestrationStatus_ORCHESTRATION_STATUS_RUNNING,
Expand All @@ -314,19 +315,21 @@ func Test_Grpc_ReuseInstanceIDTerminate(t *testing.T) {

id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceIDs))
require.NoError(t, err)
// random sleep a time from 0 to 5 seconds, to allow previous orchestration instance to set in different status
randomeDuration := time.Duration(rand.Intn(6)) * time.Second
time.Sleep(randomeDuration)
// schedule again
id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(id), api.WithOrchestrationReuseOption(ReuseIdOption))
// wait orchestration to start
grpcClient.WaitForOrchestrationStart(ctx, id)
pivotTime := time.Now()
// schedule again, it should terminate the first orchestration and start a new one
id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id), api.WithOrchestrationReuseOption(reuseIdOption))
require.NoError(t, err)
timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second)
defer cancelTimeout()
metadata, err := grpcClient.WaitForOrchestrationCompletion(timeoutCtx, id, api.WithFetchPayloads(true))
require.NoError(t, err)
assert.Equal(t, true, metadata.IsComplete())
assert.Equal(t, `"Hello, 世界!"`, metadata.SerializedOutput)
time.Sleep(1 * time.Second)
// the second orchestration should complete.
assert.Equal(t, `"Hello, World!"`, metadata.SerializedOutput)
// assert the orchestration created timestamp
assert.True(t, pivotTime.Before(metadata.CreatedAt))
}

func Test_Grpc_ReuseInstanceIDThrow(t *testing.T) {
Expand Down Expand Up @@ -360,5 +363,4 @@ func Test_Grpc_ReuseInstanceIDThrow(t *testing.T) {
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "orchestration instance already exists")
}
time.Sleep(1 * time.Second)
}

0 comments on commit 57ffa30

Please sign in to comment.