Skip to content

Commit

Permalink
wf test updates
Browse files Browse the repository at this point in the history
Signed-off-by: Cassandra Coyle <[email protected]>
  • Loading branch information
cicoyle committed Nov 19, 2024
1 parent 061cc23 commit c4a4a42
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 73 deletions.
17 changes: 1 addition & 16 deletions scheduler-actor-reminders/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,4 @@ Run the client with:
dapr run --app-id player-actor --app-port 3008 --dapr-http-port 3501 --dapr-grpc-port 50001 --log-level debug --config ../dapr/config.yaml -- go run player-actor-client.go
```

Note the config is using `SchedulerReminders`

Or

Build app images from `scheduler-actor-reminders` directory:
```shell
docker build -t player-actor-server -f Dockerfile-server .
docker build -t player-actor-client -f Dockerfile-client .
```

Run app containers:
```shell
# optionally add -d to both commands to run in background
docker run --name player-actor-server -p 3007:3007 player-actor-server
docker run --name player-actor-client -p 3008:3008 player-actor-client
```
Note the config is using `SchedulerReminders`
8 changes: 0 additions & 8 deletions scheduler-jobs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,4 @@ dapr run \
--dapr-grpc-port 3501 --app-protocol grpc \
--dapr-http-port 3500 --scheduler-host-address=127.0.0.1:50006 --app-channel-address=127.0.0.1 \
-- go run scheduler-jobs.go
```

To run locally as a container:
```shell
docker build -t scheduler-jobs .
docker run -p 8383:8383 --name scheduler-jobs scheduler-jobs # optionally add -d to run in background
# check container is running
docker ps
```
8 changes: 0 additions & 8 deletions scheduler-workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,4 @@ The long-haul test workflow performs the following operations repeatedly:
Run the server with:
```shell
dapr run --app-id scheduler-workflow --app-port 8484 --dapr-http-port 3502 --log-level debug --config dapr/config.yaml -- go run workflow.go
```

To run locally as a container, assuming scheduler + dapr are running as well:
```shell
docker build -t scheduler-workflow .
docker run -p 8383:8383 --name scheduler-jobs scheduler-workflow # optionally add -d to run in background
# check container is running
docker ps
```
95 changes: 54 additions & 41 deletions scheduler-workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,27 @@ import (
http2 "net/http"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"

dapr "github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/service/common"
"github.com/dapr/go-sdk/service/http"
"github.com/dapr/go-sdk/workflow"
)

const appPort = ":8484"

var stage int
var stage atomic.Int64

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client, err := dapr.NewClient()
if err != nil {
panic(err)
log.Fatalf("Error getting dapr client: %v", err)
}
defer client.Close()

Expand All @@ -41,29 +43,23 @@ func main() {
if err != nil {
log.Fatal(err)
}
fmt.Println("Workflow worker initialized")

defer func() {
if err := worker.Shutdown(); err != nil {
log.Printf("Failed to shutdown workflow worker: %v", err)
}
}()
log.Println("Workflow worker initialized")

if err := worker.RegisterWorkflow(TestWorkflow); err != nil {
log.Fatalf("Failed to register workflow: %v", err)
log.Printf("Failed to register workflow: %v", err)
}
if err := worker.RegisterActivity(TestActivity); err != nil {
log.Fatalf("Failed to register activity: %v", err)
log.Printf("Failed to register activity: %v", err)
}

// Start workflow runner
if err := worker.Start(); err != nil {
log.Fatalf("Failed to start worker: %v", err)
log.Printf("Failed to start worker: %v", err)
}
fmt.Println("Workflow worker started")
log.Println("Workflow worker started")

go startLonghaulWorkflow(ctx, client)
waitForShutdown(cancel)
waitForShutdown(daprService, worker, cancel)
}

func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
Expand Down Expand Up @@ -94,12 +90,12 @@ func TestActivity(ctx workflow.ActivityContext) (any, error) {
return "", err
}

if stage >= 100 {
stage = 0
if stage.Load() >= 100 {
stage.Store(0)
}

stage++
return fmt.Sprintf("Stage: %d", stage), nil
stage.Add(1)
return fmt.Sprintf("Stage: %d", stage.Load()), nil
}

// startLonghaulWorkflow performs the following operations on a workflow:
Expand All @@ -111,7 +107,7 @@ func startLonghaulWorkflow(ctx context.Context, client dapr.Client) {
case <-ctx.Done():
return
default:
fmt.Printf("Starting workflow iteration %d\n", i)
log.Printf("Starting workflow iteration %d\n", i)
instanceID := fmt.Sprintf("longhaul-instance-%d", i)
workflowReq := &dapr.StartWorkflowRequest{
InstanceID: instanceID,
Expand All @@ -121,65 +117,73 @@ func startLonghaulWorkflow(ctx context.Context, client dapr.Client) {
SendRawInput: false,
}

respStart, err := client.StartWorkflowBeta1(ctx, workflowReq)
startWfCtx, startWfCancel := context.WithTimeout(ctx, 5*time.Second)
respStart, err := client.StartWorkflowBeta1(startWfCtx, workflowReq)
startWfCancel()
if err != nil {
log.Printf("Iteration %d: Failed to start workflow: %v\n", i, err)
continue
}
fmt.Printf("Workflow started with ID: '%s'\n", respStart.InstanceID)
log.Printf("Workflow started with ID: '%s'\n", respStart.InstanceID)

err = client.PauseWorkflowBeta1(ctx, &dapr.PauseWorkflowRequest{
pauseWfCtx, pauseWfCancel := context.WithTimeout(ctx, 5*time.Second)
err = client.PauseWorkflowBeta1(pauseWfCtx, &dapr.PauseWorkflowRequest{
InstanceID: instanceID,
WorkflowComponent: "",
})
pauseWfCancel()
if err != nil {
log.Fatalf("Failed to pause workflow: %v\n", err)
log.Printf("Failed to pause workflow: %v\n", err)
}
fmt.Printf("Workflow '%s' paused\n", instanceID)

err = client.ResumeWorkflowBeta1(ctx, &dapr.ResumeWorkflowRequest{
log.Printf("Workflow '%s' paused\n", instanceID)
resumeWfCtx, resumeWfCancel := context.WithTimeout(ctx, 5*time.Second)
err = client.ResumeWorkflowBeta1(resumeWfCtx, &dapr.ResumeWorkflowRequest{
InstanceID: instanceID,
WorkflowComponent: "",
})
resumeWfCancel()
if err != nil {
log.Fatalf("Failed to resume workflow: %v\n", err)
log.Printf("Failed to resume workflow: %v\n", err)
}
fmt.Printf("Workflow '%s' resumed\n", instanceID)

log.Printf("Workflow '%s' resumed\n", instanceID)
raiseEventWfCtx, raiseEventWfCancel := context.WithTimeout(ctx, 5*time.Second)
// Raise event to advance the workflow
err = client.RaiseEventWorkflowBeta1(ctx, &dapr.RaiseEventWorkflowRequest{
err = client.RaiseEventWorkflowBeta1(raiseEventWfCtx, &dapr.RaiseEventWorkflowRequest{
InstanceID: instanceID,
WorkflowComponent: "",
EventName: "testEvent",
EventData: "testData",
})
raiseEventWfCancel()
if err != nil {
log.Fatalf("Failed to raise event: %v\n", err)
log.Printf("Failed to raise event: %v\n", err)
}
fmt.Printf("Workflow '%s' event raised\n", instanceID)
fmt.Printf("[wfclient] stage: %d\n", stage)
log.Printf("Workflow '%s' event raised\n", instanceID)
log.Printf("[wfclient] stage: %d\n", stage.Load())

// Wait for workflow to complete
// Poll every 5 seconds to check the workflow status
waitForWorkflowCompletion(ctx, client, instanceID)

terminateWfCtx, terminateWfCancel := context.WithTimeout(ctx, 5*time.Second)
// Terminate and purge after completion
err = client.TerminateWorkflowBeta1(ctx, &dapr.TerminateWorkflowRequest{
err = client.TerminateWorkflowBeta1(terminateWfCtx, &dapr.TerminateWorkflowRequest{
InstanceID: instanceID,
})
terminateWfCancel()
if err != nil {
log.Printf("Failed to terminate workflow %s: %v\n", instanceID, err)
} else {
fmt.Printf("Workflow '%s' terminated\n", instanceID)
log.Printf("Workflow '%s' terminated\n", instanceID)
}

err = client.PurgeWorkflowBeta1(ctx, &dapr.PurgeWorkflowRequest{
purgeWfCtx, purgeWfCancel := context.WithTimeout(ctx, 5*time.Second)
err = client.PurgeWorkflowBeta1(purgeWfCtx, &dapr.PurgeWorkflowRequest{
InstanceID: instanceID,
})
purgeWfCancel()
if err != nil {
log.Printf("Failed to purge workflow %s: %v\n", instanceID, err)
} else {
fmt.Printf("Workflow '%s' purged\n", instanceID)
log.Printf("Workflow '%s' purged\n", instanceID)
}

i++
Expand All @@ -203,7 +207,7 @@ func waitForWorkflowCompletion(ctx context.Context, client dapr.Client, instance

switch respGet.RuntimeStatus {
case workflow.StatusCompleted.String():
fmt.Printf("Workflow '%s' completed\n", instanceID)
log.Printf("Workflow '%s' completed\n", instanceID)
return
case workflow.StatusFailed.String():
log.Printf("Workflow '%s' failed\n", instanceID)
Expand All @@ -214,12 +218,21 @@ func waitForWorkflowCompletion(ctx context.Context, client dapr.Client, instance
}

// waitForShutdown keeps the app alive until an interrupt or termination signal is received
func waitForShutdown(cancelFunc context.CancelFunc) {
func waitForShutdown(daprService common.Service, worker *workflow.WorkflowWorker, cancelFunc context.CancelFunc) {
sigCh := make(chan os.Signal, 1)
// Notify the channel on Interrupt (Ctrl+C) or SIGTERM (for Docker/K8s graceful shutdown)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh

log.Println("Shutting down...")

if err := daprService.GracefulStop(); err != nil {
log.Printf("Failed to gracefully shutdown dapr service: %v", err)
}

if err := worker.Shutdown(); err != nil {
log.Printf("Failed to shutdown workflow worker: %v", err)
}

cancelFunc()
}

0 comments on commit c4a4a42

Please sign in to comment.