Skip to content

Commit

Permalink
update ctx + cleanup + readme
Browse files Browse the repository at this point in the history
Signed-off-by: Cassandra Coyle <[email protected]>
  • Loading branch information
cicoyle committed Nov 19, 2024
1 parent 16c161c commit 061cc23
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 52 deletions.
25 changes: 18 additions & 7 deletions scheduler-jobs/README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
# How-To Run Locally:
# Scheduler Jobs

To run locally as a process: `go run scheduler-jobs.go`, assuming you have a scheduler and dapr running accordingly (see below).
## Overview

Run Scheduler:
![scheduler_debugger_config.png](scheduler_debugger_config.png)
- schedule 100 oneshot jobs indefinitely (repeat = 1)
- schedule 100 indefinite jobs indefinitely (repeat not set, trigger every 30s)
- schedule repeat-job job indefinitely (repeat = 5, trigger every 1s due immediately)
- indefinitely schedule and delete a create-delete-job job (repeat = 1, trigger every 1s)

Run Dapr sidecar:
![sidecar_debugger_config.png](sidecar_debugger_config.png)
## How-To Run Locally:

Run with dapr:
```shell
dapr run \
--app-id scheduler-jobs \
--app-port 8383 \
--dapr-grpc-port 3501 --app-protocol grpc \
--dapr-http-port 3500 --scheduler-host-address=127.0.0.1:50006 --app-channel-address=127.0.0.1 \
-- go run scheduler-jobs.go
```

To run locally as a container:
```shell
docker build -t scheduler-jobs .
docker run -p 3006:3006 --name scheduler-jobs scheduler-jobs # optionally add -d to run in background
docker run -p 8383:8383 --name scheduler-jobs scheduler-jobs # optionally add -d to run in background
# check container is running
docker ps
```
118 changes: 73 additions & 45 deletions scheduler-jobs/scheduler-jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import (
"github.com/dapr/go-sdk/client"
)

const appPort = ":3006"
const daprGRPCPort = "3501"
const appPort = ":8383"

var oneshot atomic.Int64
var indefinite atomic.Int64
Expand Down Expand Up @@ -121,8 +120,15 @@ func (s *appServer) OnJobEventAlpha1(ctx context.Context, in *rtv1.JobEventReque
return nil, nil
}

func scheduleOneshotJobs(daprClient client.Client) {
func scheduleOneshotJobs(ctx context.Context, daprClient client.Client) {
for i := 0; i < 100; i++ {
select {
case <-ctx.Done():
log.Println("context canceled, stopping scheduleOneshotJobs.")
return
default:
}
jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
name := "oneshot-job-" + strconv.Itoa(i)
req := &client.Job{
Name: name,
Expand All @@ -132,15 +138,23 @@ func scheduleOneshotJobs(daprClient client.Client) {
TTL: "40s",
Data: nil,
}
err := daprClient.ScheduleJobAlpha1(context.Background(), req)
err := daprClient.ScheduleJobAlpha1(jobCtx, req)
cancel()
if err != nil {
log.Printf("Error scheduling oneshot job '%s': %s\n", name, err)
}
}
}

func scheduleIndefiniteJobs(daprClient client.Client) {
func scheduleIndefiniteJobs(ctx context.Context, daprClient client.Client) {
for i := 0; i < 100; i++ {
select {
case <-ctx.Done():
log.Println("context canceled, stopping scheduleOneshotJobs.")
return
default:
}
jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
name := "indefinite-job-" + strconv.Itoa(i)
req := &client.Job{
Name: name,
Expand All @@ -149,14 +163,15 @@ func scheduleIndefiniteJobs(daprClient client.Client) {
TTL: "40s",
Data: nil,
}
err := daprClient.ScheduleJobAlpha1(context.Background(), req)
err := daprClient.ScheduleJobAlpha1(jobCtx, req)
cancel()
if err != nil {
log.Printf("Error scheduling indefinite job '%s': %s\n", name, err)
}
}
}

func scheduleRepeatedJob(daprClient client.Client) {
func scheduleRepeatedJob(ctx context.Context, daprClient client.Client) {
name := "repeat-job"
req := &client.Job{
Name: name,
Expand All @@ -166,13 +181,15 @@ func scheduleRepeatedJob(daprClient client.Client) {
TTL: "10s",
Data: nil,
}
err := daprClient.ScheduleJobAlpha1(context.Background(), req)
jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
err := daprClient.ScheduleJobAlpha1(jobCtx, req)
if err != nil {
log.Printf("Error scheduling repeat job '%s': %s\n", name, err)
}
}

func scheduleSingleJob(daprClient client.Client) {
func scheduleSingleJob(ctx context.Context, daprClient client.Client) {
name := "create-delete-job"
req := &client.Job{
Name: name,
Expand All @@ -182,15 +199,19 @@ func scheduleSingleJob(daprClient client.Client) {
TTL: "3s",
Data: nil,
}
err := daprClient.ScheduleJobAlpha1(context.Background(), req)
jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
err := daprClient.ScheduleJobAlpha1(jobCtx, req)
if err != nil {
log.Printf("Error scheduling single job '%s': %s\n", name, err)
}
}

func deleteSingleJob(daprClient client.Client) {
func deleteSingleJob(ctx context.Context, daprClient client.Client) {
name := "create-delete-job"
err := daprClient.DeleteJobAlpha1(context.Background(), name)
jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
err := daprClient.DeleteJobAlpha1(jobCtx, name)
if err != nil {
log.Printf("Error deleting single job '%s': %s\n", name, err)
}
Expand All @@ -209,8 +230,7 @@ func main() {
}
}(ctx)

// --dapr-grpc-port=3501
daprClient, err := client.NewClientWithPort(daprGRPCPort)
daprClient, err := client.NewClient()
if err != nil {
log.Fatalf("Error getting dapr client: %v", err)
}
Expand All @@ -222,46 +242,54 @@ func main() {
time.Sleep(5 * time.Second)

// Schedule initial batch of jobs
go scheduleOneshotJobs(daprClient)
go scheduleIndefiniteJobs(daprClient)
go scheduleRepeatedJob(daprClient)
go scheduleOneshotJobs(ctx, daprClient)
go scheduleIndefiniteJobs(ctx, daprClient)
go scheduleRepeatedJob(ctx, daprClient)

// Schedule additional oneshot jobs once 100 are triggered
go func(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-oneshotDoneCh:
log.Println("Received input that maxOneshotTriggerCount was reached. Sleeping...")
time.Sleep(10 * time.Second)
log.Println("Scheduling next batch of oneshot jobs...")
go scheduleOneshotJobs(daprClient)
for {
select {
case <-ctx.Done():
log.Println("context canceled, stopping oneshot scheduling goroutine.")
return
case <-oneshotDoneCh:
log.Println("Received input that maxOneshotTriggerCount was reached. Sleeping...")
time.Sleep(10 * time.Second)
log.Println("Scheduling next batch of oneshot jobs...")
go scheduleOneshotJobs(ctx, daprClient)
}
}
}(ctx)

// Schedule additional indefinite jobs once 100 are triggered
go func(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-indefiniteDoneCh:
log.Println("Received input that maxIndefiniteTriggerCount was reached. Sleeping...")
time.Sleep(10 * time.Second)
log.Println("Scheduling next batch of indefinite jobs...")
go scheduleIndefiniteJobs(daprClient)
for {
select {
case <-ctx.Done():
log.Println("context canceled, stopping indefinite scheduling goroutine.")
return
case <-indefiniteDoneCh:
log.Println("Received input that maxIndefiniteTriggerCount was reached. Sleeping...")
time.Sleep(10 * time.Second)
log.Println("Scheduling next batch of indefinite jobs...")
go scheduleIndefiniteJobs(ctx, daprClient)
}
}
}(ctx)

// Schedule job to trigger immediately every second for 1s
// Schedule job to trigger immediately every second for 1s for 5 times max (repeats)
go func(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-repeatDoneCh:
log.Println("Received input that maxRepeatTriggerCount was reached. Sleeping...")
time.Sleep(60 * time.Second)
log.Println("Scheduling next repeated job...")
go scheduleRepeatedJob(daprClient)
for {
select {
case <-ctx.Done():
return
case <-repeatDoneCh:
log.Println("Received input that maxRepeatTriggerCount was reached. Sleeping...")
time.Sleep(60 * time.Second)
log.Println("Scheduling next repeated job...")
go scheduleRepeatedJob(ctx, daprClient)
}
}
}(ctx)

Expand All @@ -274,13 +302,13 @@ func main() {
case <-receivedSingleJobDoneCh:
log.Println("Received input that the create-delete-job triggered, now deleting the job...")
// received triggered job, now delete it & set atomic int to 0
deleteSingleJob(daprClient)
deleteSingleJob(ctx, daprClient)
log.Println("Successfully deleted create-delete-job.")
}
}
}(ctx)

go scheduleSingleJob(daprClient)
go scheduleSingleJob(ctx, daprClient)

// Reschedule the create-delete job after deletion, ensure triggers once
go func(ctx context.Context) {
Expand All @@ -292,7 +320,7 @@ func main() {
log.Println("Received input that the create-delete-job was deleted. Sleeping for 5 seconds...")
time.Sleep(5 * time.Second)
log.Println("Scheduling create-delete-job...")
scheduleSingleJob(daprClient)
scheduleSingleJob(ctx, daprClient)
log.Println("Successfully scheduled create-delete-job.")
}
}
Expand Down
Binary file removed scheduler-jobs/scheduler_debugger_config.png
Binary file not shown.
Binary file removed scheduler-jobs/sidecar_debugger_config.png
Binary file not shown.

0 comments on commit 061cc23

Please sign in to comment.