From c4a4a422df94f484334d5235dfad4a45f484eb5f Mon Sep 17 00:00:00 2001 From: Cassandra Coyle Date: Tue, 19 Nov 2024 17:12:13 -0600 Subject: [PATCH] wf test updates Signed-off-by: Cassandra Coyle --- scheduler-actor-reminders/README.md | 17 +----- scheduler-jobs/README.md | 8 --- scheduler-workflow/README.md | 8 --- scheduler-workflow/workflow.go | 95 ++++++++++++++++------------- 4 files changed, 55 insertions(+), 73 deletions(-) diff --git a/scheduler-actor-reminders/README.md b/scheduler-actor-reminders/README.md index 2be457ff..8efc0fdd 100644 --- a/scheduler-actor-reminders/README.md +++ b/scheduler-actor-reminders/README.md @@ -33,19 +33,4 @@ Run the client with: dapr run --app-id player-actor --app-port 3008 --dapr-http-port 3501 --dapr-grpc-port 50001 --log-level debug --config ../dapr/config.yaml -- go run player-actor-client.go ``` -Note the config is using `SchedulerReminders` - -Or - -Build app images from `scheduler-actor-reminders` directory: -```shell -docker build -t player-actor-server -f Dockerfile-server . -docker build -t player-actor-client -f Dockerfile-client . -``` - -Run app containers: -```shell -# optionally add -d to both commands to run in background -docker run --name player-actor-server -p 3007:3007 player-actor-server -docker run --name player-actor-client -p 3008:3008 player-actor-client -``` \ No newline at end of file +Note the config is using `SchedulerReminders` \ No newline at end of file diff --git a/scheduler-jobs/README.md b/scheduler-jobs/README.md index 6056cb55..673c0f87 100644 --- a/scheduler-jobs/README.md +++ b/scheduler-jobs/README.md @@ -17,12 +17,4 @@ dapr run \ --dapr-grpc-port 3501 --app-protocol grpc \ --dapr-http-port 3500 --scheduler-host-address=127.0.0.1:50006 --app-channel-address=127.0.0.1 \ -- go run scheduler-jobs.go -``` - -To run locally as a container: -```shell -docker build -t scheduler-jobs . -docker run -p 8383:8383 --name scheduler-jobs scheduler-jobs # optionally add -d to run in background -# check container is running -docker ps ``` \ No newline at end of file diff --git a/scheduler-workflow/README.md b/scheduler-workflow/README.md index df3376db..cc720079 100644 --- a/scheduler-workflow/README.md +++ b/scheduler-workflow/README.md @@ -36,12 +36,4 @@ The long-haul test workflow performs the following operations repeatedly: Run the server with: ```shell dapr run --app-id scheduler-workflow --app-port 8484 --dapr-http-port 3502 --log-level debug --config dapr/config.yaml -- go run workflow.go -``` - -To run locally as a container, assuming scheduler + dapr are running as well: -```shell -docker build -t scheduler-workflow . -docker run -p 8383:8383 --name scheduler-jobs scheduler-workflow # optionally add -d to run in background -# check container is running -docker ps ``` \ No newline at end of file diff --git a/scheduler-workflow/workflow.go b/scheduler-workflow/workflow.go index 1636404a..8a5379aa 100644 --- a/scheduler-workflow/workflow.go +++ b/scheduler-workflow/workflow.go @@ -7,17 +7,19 @@ import ( http2 "net/http" "os" "os/signal" + "sync/atomic" "syscall" "time" dapr "github.com/dapr/go-sdk/client" + "github.com/dapr/go-sdk/service/common" "github.com/dapr/go-sdk/service/http" "github.com/dapr/go-sdk/workflow" ) const appPort = ":8484" -var stage int +var stage atomic.Int64 func main() { ctx, cancel := context.WithCancel(context.Background()) @@ -25,7 +27,7 @@ func main() { client, err := dapr.NewClient() if err != nil { - panic(err) + log.Fatalf("Error getting dapr client: %v", err) } defer client.Close() @@ -41,29 +43,23 @@ func main() { if err != nil { log.Fatal(err) } - fmt.Println("Workflow worker initialized") - - defer func() { - if err := worker.Shutdown(); err != nil { - log.Printf("Failed to shutdown workflow worker: %v", err) - } - }() + log.Println("Workflow worker initialized") if err := worker.RegisterWorkflow(TestWorkflow); err != nil { - log.Fatalf("Failed to register workflow: %v", err) + log.Printf("Failed to register workflow: %v", err) } if err := worker.RegisterActivity(TestActivity); err != nil { - log.Fatalf("Failed to register activity: %v", err) + log.Printf("Failed to register activity: %v", err) } // Start workflow runner if err := worker.Start(); err != nil { - log.Fatalf("Failed to start worker: %v", err) + log.Printf("Failed to start worker: %v", err) } - fmt.Println("Workflow worker started") + log.Println("Workflow worker started") go startLonghaulWorkflow(ctx, client) - waitForShutdown(cancel) + waitForShutdown(daprService, worker, cancel) } func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) { @@ -94,12 +90,12 @@ func TestActivity(ctx workflow.ActivityContext) (any, error) { return "", err } - if stage >= 100 { - stage = 0 + if stage.Load() >= 100 { + stage.Store(0) } - stage++ - return fmt.Sprintf("Stage: %d", stage), nil + stage.Add(1) + return fmt.Sprintf("Stage: %d", stage.Load()), nil } // startLonghaulWorkflow performs the following operations on a workflow: @@ -111,7 +107,7 @@ func startLonghaulWorkflow(ctx context.Context, client dapr.Client) { case <-ctx.Done(): return default: - fmt.Printf("Starting workflow iteration %d\n", i) + log.Printf("Starting workflow iteration %d\n", i) instanceID := fmt.Sprintf("longhaul-instance-%d", i) workflowReq := &dapr.StartWorkflowRequest{ InstanceID: instanceID, @@ -121,65 +117,73 @@ func startLonghaulWorkflow(ctx context.Context, client dapr.Client) { SendRawInput: false, } - respStart, err := client.StartWorkflowBeta1(ctx, workflowReq) + startWfCtx, startWfCancel := context.WithTimeout(ctx, 5*time.Second) + respStart, err := client.StartWorkflowBeta1(startWfCtx, workflowReq) + startWfCancel() if err != nil { log.Printf("Iteration %d: Failed to start workflow: %v\n", i, err) continue } - fmt.Printf("Workflow started with ID: '%s'\n", respStart.InstanceID) + log.Printf("Workflow started with ID: '%s'\n", respStart.InstanceID) - err = client.PauseWorkflowBeta1(ctx, &dapr.PauseWorkflowRequest{ + pauseWfCtx, pauseWfCancel := context.WithTimeout(ctx, 5*time.Second) + err = client.PauseWorkflowBeta1(pauseWfCtx, &dapr.PauseWorkflowRequest{ InstanceID: instanceID, WorkflowComponent: "", }) + pauseWfCancel() if err != nil { - log.Fatalf("Failed to pause workflow: %v\n", err) + log.Printf("Failed to pause workflow: %v\n", err) } - fmt.Printf("Workflow '%s' paused\n", instanceID) - - err = client.ResumeWorkflowBeta1(ctx, &dapr.ResumeWorkflowRequest{ + log.Printf("Workflow '%s' paused\n", instanceID) + resumeWfCtx, resumeWfCancel := context.WithTimeout(ctx, 5*time.Second) + err = client.ResumeWorkflowBeta1(resumeWfCtx, &dapr.ResumeWorkflowRequest{ InstanceID: instanceID, WorkflowComponent: "", }) + resumeWfCancel() if err != nil { - log.Fatalf("Failed to resume workflow: %v\n", err) + log.Printf("Failed to resume workflow: %v\n", err) } - fmt.Printf("Workflow '%s' resumed\n", instanceID) - + log.Printf("Workflow '%s' resumed\n", instanceID) + raiseEventWfCtx, raiseEventWfCancel := context.WithTimeout(ctx, 5*time.Second) // Raise event to advance the workflow - err = client.RaiseEventWorkflowBeta1(ctx, &dapr.RaiseEventWorkflowRequest{ + err = client.RaiseEventWorkflowBeta1(raiseEventWfCtx, &dapr.RaiseEventWorkflowRequest{ InstanceID: instanceID, WorkflowComponent: "", EventName: "testEvent", EventData: "testData", }) + raiseEventWfCancel() if err != nil { - log.Fatalf("Failed to raise event: %v\n", err) + log.Printf("Failed to raise event: %v\n", err) } - fmt.Printf("Workflow '%s' event raised\n", instanceID) - fmt.Printf("[wfclient] stage: %d\n", stage) + log.Printf("Workflow '%s' event raised\n", instanceID) + log.Printf("[wfclient] stage: %d\n", stage.Load()) // Wait for workflow to complete // Poll every 5 seconds to check the workflow status waitForWorkflowCompletion(ctx, client, instanceID) - + terminateWfCtx, terminateWfCancel := context.WithTimeout(ctx, 5*time.Second) // Terminate and purge after completion - err = client.TerminateWorkflowBeta1(ctx, &dapr.TerminateWorkflowRequest{ + err = client.TerminateWorkflowBeta1(terminateWfCtx, &dapr.TerminateWorkflowRequest{ InstanceID: instanceID, }) + terminateWfCancel() if err != nil { log.Printf("Failed to terminate workflow %s: %v\n", instanceID, err) } else { - fmt.Printf("Workflow '%s' terminated\n", instanceID) + log.Printf("Workflow '%s' terminated\n", instanceID) } - - err = client.PurgeWorkflowBeta1(ctx, &dapr.PurgeWorkflowRequest{ + purgeWfCtx, purgeWfCancel := context.WithTimeout(ctx, 5*time.Second) + err = client.PurgeWorkflowBeta1(purgeWfCtx, &dapr.PurgeWorkflowRequest{ InstanceID: instanceID, }) + purgeWfCancel() if err != nil { log.Printf("Failed to purge workflow %s: %v\n", instanceID, err) } else { - fmt.Printf("Workflow '%s' purged\n", instanceID) + log.Printf("Workflow '%s' purged\n", instanceID) } i++ @@ -203,7 +207,7 @@ func waitForWorkflowCompletion(ctx context.Context, client dapr.Client, instance switch respGet.RuntimeStatus { case workflow.StatusCompleted.String(): - fmt.Printf("Workflow '%s' completed\n", instanceID) + log.Printf("Workflow '%s' completed\n", instanceID) return case workflow.StatusFailed.String(): log.Printf("Workflow '%s' failed\n", instanceID) @@ -214,12 +218,21 @@ func waitForWorkflowCompletion(ctx context.Context, client dapr.Client, instance } // waitForShutdown keeps the app alive until an interrupt or termination signal is received -func waitForShutdown(cancelFunc context.CancelFunc) { +func waitForShutdown(daprService common.Service, worker *workflow.WorkflowWorker, cancelFunc context.CancelFunc) { sigCh := make(chan os.Signal, 1) // Notify the channel on Interrupt (Ctrl+C) or SIGTERM (for Docker/K8s graceful shutdown) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) <-sigCh log.Println("Shutting down...") + + if err := daprService.GracefulStop(); err != nil { + log.Printf("Failed to gracefully shutdown dapr service: %v", err) + } + + if err := worker.Shutdown(); err != nil { + log.Printf("Failed to shutdown workflow worker: %v", err) + } + cancelFunc() }