diff --git a/.github/env/global.env b/.github/env/global.env index 70f766058..366dc03d7 100644 --- a/.github/env/global.env +++ b/.github/env/global.env @@ -1,5 +1,5 @@ - DAPR_CLI_VERSION: 1.13.0 - DAPR_RUNTIME_VERSION: 1.13.5 - DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v${DAPR_CLI_VERSION}/install/ - DAPR_DEFAULT_IMAGE_REGISTRY: ghcr - MACOS_PYTHON_VERSION: 3.10 +DAPR_CLI_VERSION: 1.14.0-rc.6 +DAPR_RUNTIME_VERSION: 1.14.0-rc.5 +DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v${DAPR_CLI_VERSION}/install/ +DAPR_DEFAULT_IMAGE_REGISTRY: ghcr +MACOS_PYTHON_VERSION: 3.10 diff --git a/README.md b/README.md index d78097694..4c78adb06 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ Pick a building block API (for example, PubSub, state management, etc) and rapid | [Cryptography](./cryptography) | Perform cryptographic operations without exposing keys to your application | | [Resiliency](./resiliency) | Define and apply fault-tolerant policies (retries/back-offs, timeouts and circuit breakers) to your Dapr API requests | | [Workflow](./workflows) | Dapr Workflow enables you to create long running, fault-tolerant, stateful applications | +| [Jobs](./jobs) | Dapr Jobs enable you to manage and schedule tasks | ### Tutorials diff --git a/jobs/go/http/README.md b/jobs/go/http/README.md new file mode 100644 index 000000000..f775d43df --- /dev/null +++ b/jobs/go/http/README.md @@ -0,0 +1,152 @@ +# Dapr Jobs + +In this quickstart, you'll schedule, get, and delete a job using Dapr's Job API. This API is responsible for scheduling and running jobs at a specific time or interval. + +Visit [this](https://v1-14.docs.dapr.io/developing-applications/building-blocks/jobs/) link for more information about Dapr and the Jobs API. + +> **Note:** This example leverages HTTP `requests` only. If you are looking for the example using the Dapr Client SDK (recommended) [click here](../sdk/). + +This quickstart includes two apps: + +- `job-scheduler.go`, responsible for scheduling, retrieving and deleting jobs. +- `job-service.go`, responsible for handling the triggered jobs. + +## Run the app with the template file + +This section shows how to run both applications at once using [multi-app run template files](https://docs.dapr.io/developing-applications/local-development/multi-app-dapr-run/multi-app-overview/) with `dapr run -f .`. This enables to you test the interactions between multiple applications and will `schedule`, `run`, `get`, and `delete` jobs within a single process. + +Open a new terminal window and run the multi app run template: + + + +```bash +dapr run -f . +``` + +The terminal console output should look similar to this, where: + +- The `R2-D2` job is being scheduled. +- The `R2-D2` job is being executed after 2 seconds. +- The `C-3PO` job is being scheduled. +- The `C-3PO` job is being retrieved. + +```text +== APP - job-scheduler == Job Scheduled: R2-D2 +== APP - job-service == Received job request... +== APP - job-service == Starting droid: R2-D2 +== APP - job-service == Executing maintenance job: Oil Change +== APP - job-scheduler == Job Scheduled: C-3PO +== APP - job-scheduler == Job details: {"name":"C-3PO", "dueTime":"30s", "data":{"@type":"ttype.googleapis.com/google.protobuf.StringValue", "expression":"C-3PO:Limb Calibration"}} +``` + +After 30 seconds, the terminal output should present the `C-3PO` job being processed: + +```text +== APP - job-service == Received job request... +== APP - job-service == Starting droid: C-3PO +== APP - job-service == Executing maintenance job: Limb Calibration +``` + +2. Stop and clean up application processes + +```bash +dapr stop -f . +``` + + + +## Run the Jobs APIs individually + +### Schedule Jobs + +1. Open a terminal and run the `job-service` app: + +```bash +dapr run --app-id job-service --app-port 6200 --dapr-http-port 6280 -- go run . +``` + +2. On a new terminal window, schedule the `R2-D2` Job using the Jobs API. + +```bash +curl -X POST \ + http://localhost:6280/v1.0-alpha1/jobs/jobforjabba \ + -H "Content-Type: application/json" \ + -d '{ + "data": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "R2-D2:Oil Change" + }, + "dueTime": "2s" + }' + ``` + +Back at the `job-service` app terminal window, the output should be: + +```text +== APP - job-app == Received job request... +== APP - job-app == Starting droid: R2-D2 +== APP - job-app == Executing maintenance job: Oil Change +``` + +3. On the same terminal window, schedule the `C-3PO` Job using the Jobs API. + +```bash +curl -X POST \ + http://localhost:6280/v1.0-alpha1/jobs/c-3po \ + -H "Content-Type: application/json" \ + -d '{ + "data": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "C-3PO:Limb Calibration" + }, + "dueTime": "30s" + }' +``` + +### Get a scheduled job + +1. On the same terminal window, run the command below to get the recently scheduled `C-3PO` job. + +```bash +curl -X GET http://localhost:6280/v1.0-alpha1/jobs/c-3po -H "Content-Type: application/json" +``` + +You should see the following: + +```text +{"name":"C-3PO", "dueTime":"30s", "data":{"@type":"type.googleapis.com/google.protobuf.StringValue", "expression":"C-3PO:Limb Calibration"}} +``` + +### Delete a scheduled job + +1. On the same terminal window, run the command below to deleted the recently scheduled `C-3PO` job. + +```bash +curl -X DELETE http://localhost:6280/v1.0-alpha1/jobs/c-3po -H "Content-Type: application/json" +``` + +2. Run the command below to attempt to retrieve the deleted job: + +```bash +curl -X GET http://localhost:6280/v1.0-alpha1/jobs/c-3po -H "Content-Type: application/json" +``` + +Back at the `job-service` app terminal window, the output should be: + +```text +ERRO[0249] Error getting job c-3po due to: rpc error: code = Unknown desc = job not found: app||default||job-service||c-3po instance=diagrid.local scope=dapr.api type=log ver=1.14.0-rc.2 +``` \ No newline at end of file diff --git a/jobs/go/http/dapr.yaml b/jobs/go/http/dapr.yaml new file mode 100644 index 000000000..c21872677 --- /dev/null +++ b/jobs/go/http/dapr.yaml @@ -0,0 +1,12 @@ +version: 1 +apps: + - appDirPath: ./job-service/ + appID: job-service + appPort: 6200 + daprHTTPPort: 6280 + command: ["go", "run", "."] + - appDirPath: ./job-scheduler/ + appID: job-scheduler + appPort: 6300 + daprHTTPPort: 6380 + command: ["go", "run", "."] \ No newline at end of file diff --git a/jobs/go/http/job-scheduler/go.mod b/jobs/go/http/job-scheduler/go.mod new file mode 100644 index 000000000..0cb787cbc --- /dev/null +++ b/jobs/go/http/job-scheduler/go.mod @@ -0,0 +1,3 @@ +module job-scheduler + +go 1.21 diff --git a/jobs/go/http/job-scheduler/go.sum b/jobs/go/http/job-scheduler/go.sum new file mode 100644 index 000000000..e69de29bb diff --git a/jobs/go/http/job-scheduler/job-scheduler.go b/jobs/go/http/job-scheduler/job-scheduler.go new file mode 100644 index 000000000..2b7827d4b --- /dev/null +++ b/jobs/go/http/job-scheduler/job-scheduler.go @@ -0,0 +1,113 @@ +package main + +import ( + "fmt" + "io" + "log" + "net/http" + "os" + "strings" + "time" +) + +var c3poJobBody = `{ + "data": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "C-3PO:Limb Calibration" + }, + "dueTime": "30s" + }` + +var r2d2JobBody = `{ + "data": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "R2-D2:Oil Change" + }, + "dueTime": "2s" + }` + +func main() { + //Sleep for 5 seconds to wait for job-service to start + time.Sleep(5 * time.Second) + + daprHost := os.Getenv("DAPR_HOST") + if daprHost == "" { + daprHost = "http://localhost" + } + + schedulerDaprHttpPort := "6280" + + client := http.Client{ + Timeout: 15 * time.Second, + } + + // Schedule a job using the Dapr Jobs API with short dueTime + jobName := "R2-D2" + reqURL := daprHost + ":" + schedulerDaprHttpPort + "/v1.0-alpha1/jobs/" + jobName + + req, err := http.NewRequest("POST", reqURL, strings.NewReader(r2d2JobBody)) + if err != nil { + log.Fatal(err.Error()) + } + + req.Header.Set("Content-Type", "application/json") + + // Schedule a job using the Dapr Jobs API + res, err := client.Do(req) + if err != nil { + log.Fatal(err) + } + + if res.StatusCode != http.StatusNoContent { + log.Fatalf("failed to register job event handler. status code: %v", res.StatusCode) + } + + defer res.Body.Close() + + fmt.Println("Job Scheduled:", jobName) + + time.Sleep(5 * time.Second) + + // Schedule a job using the Dapr Jobs API with long dueTime + jobName = "C-3PO" + + reqURL = daprHost + ":" + schedulerDaprHttpPort + "/v1.0-alpha1/jobs/" + jobName + + req, err = http.NewRequest("POST", reqURL, strings.NewReader(c3poJobBody)) + if err != nil { + log.Fatal(err.Error()) + } + + req.Header.Set("Content-Type", "application/json") + + // Schedule a job using the Dapr Jobs API + res, err = client.Do(req) + if err != nil { + log.Fatal(err) + } + defer res.Body.Close() + + fmt.Println("Job Scheduled:", jobName) + + time.Sleep(5 * time.Second) + + // Gets a job using the Dapr Jobs API + jobName = "C-3PO" + reqURL = daprHost + ":" + schedulerDaprHttpPort + "/v1.0-alpha1/jobs/" + jobName + + res, err = http.Get(reqURL) + if err != nil { + log.Fatal(err.Error()) + } + defer res.Body.Close() + + resBody, err := io.ReadAll(res.Body) + if err != nil { + log.Fatal(err.Error()) + + } + + fmt.Println("Job details:", string(resBody)) + + time.Sleep(5 * time.Second) +} diff --git a/jobs/go/http/job-service/go.mod b/jobs/go/http/job-service/go.mod new file mode 100644 index 000000000..f9bf57220 --- /dev/null +++ b/jobs/go/http/job-service/go.mod @@ -0,0 +1,3 @@ +module job-service + +go 1.21 diff --git a/jobs/go/http/job-service/go.sum b/jobs/go/http/job-service/go.sum new file mode 100644 index 000000000..e69de29bb diff --git a/jobs/go/http/job-service/job-service.go b/jobs/go/http/job-service/job-service.go new file mode 100644 index 000000000..0eca44437 --- /dev/null +++ b/jobs/go/http/job-service/job-service.go @@ -0,0 +1,92 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "strings" +) + +type Job struct { + TypeURL string `json:"type_url"` + Value string `json:"value"` +} + +type DroidJob struct { + Droid string `json:"droid"` + Task string `json:"task"` +} + +func main() { + appPort := os.Getenv("APP_PORT") + if appPort == "" { + appPort = "6200" + } + + // Setup job handler + http.HandleFunc("/job/", handleJob) + + fmt.Printf("Server started on port %v\n", appPort) + err := http.ListenAndServe(":"+appPort, nil) + if err != nil { + log.Fatal(err) + } + +} + +func handleJob(w http.ResponseWriter, r *http.Request) { + fmt.Println("Received job request...") + rawBody, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, fmt.Sprintf("error reading request body: %v", err), http.StatusBadRequest) + return + } + + var jobData Job + if err := json.Unmarshal(rawBody, &jobData); err != nil { + http.Error(w, fmt.Sprintf("error decoding JSON: %v", err), http.StatusBadRequest) + return + } + + // Decoding job data + decodedValue, err := base64.RawStdEncoding.DecodeString(jobData.Value) + if err != nil { + fmt.Printf("Error decoding base64: %v", err) + http.Error(w, fmt.Sprintf("error decoding base64: %v", err), http.StatusBadRequest) + return + } + + // Creating Droid Job from decoded value + droidJob := setDroidJob(string(decodedValue)) + + fmt.Println("Starting droid:", droidJob.Droid) + fmt.Println("Executing maintenance job:", droidJob.Task) + + w.WriteHeader(http.StatusOK) +} + +func setDroidJob(decodedValue string) DroidJob { + // Removing new lines from decoded value - Workaround for base64 encoding issue + droidStr := strings.ReplaceAll(decodedValue, "\n", "") + droidArray := strings.Split(droidStr, ":") + + droidJob := DroidJob{Droid: droidArray[0], Task: droidArray[1]} + return droidJob +} diff --git a/jobs/go/http/makefile b/jobs/go/http/makefile new file mode 100644 index 000000000..e7a8826bf --- /dev/null +++ b/jobs/go/http/makefile @@ -0,0 +1,2 @@ +include ../../../docker.mk +include ../../../validate.mk \ No newline at end of file diff --git a/jobs/go/sdk/README.md b/jobs/go/sdk/README.md new file mode 100644 index 000000000..4321e9b77 --- /dev/null +++ b/jobs/go/sdk/README.md @@ -0,0 +1,145 @@ +# Dapr Jobs + +In this quickstart, you'll schedule, get, and delete a job using Dapr's Job API. This API is responsible for scheduling and running jobs at a specific time or interval. + +Visit [this](https://v1-14.docs.dapr.io/developing-applications/building-blocks/jobs/) link for more information about Dapr and the Job API. + +> **Note:** This example leverages the SDK only. If you are looking for the example using the HTTP requests [click here](../http/). + +This quickstart includes two apps: + +- `job-scheduler.go`, responsible for scheduling, retrieving and deleting jobs. +- `job-service.go`, responsible for handling the scheduled jobs. + +## Run the app with the template file + +This section shows how to run both applications at once using [multi-app run template files](https://docs.dapr.io/developing-applications/local-development/multi-app-dapr-run/multi-app-overview/) with `dapr run -f .`. This enables to you test the interactions between multiple applications and will `schedule`, `run`, `get`, and `delete` jobs within a single process. + +Open a new terminal window and run the multi app run template: + + + +```bash +dapr run -f . +``` + +The terminal console output should look similar to this, where: + +- The `R2-D2` job is being scheduled. +- The `C-3PO` job is being scheduled. +- The `C-3PO` job is being retrieved. +- The `BB-8` job is being scheduled. +- The `BB-8` job is being retrieved. +- The `BB-8` job is being deleted. +- The `R2-D2` job is being executed after 5 seconds. +- The `R2-D2` job is being executed after 10 seconds. + + +```text +== APP - job-service == dapr client initializing for: 127.0.0.1:6281 +== APP - job-service == Registered job handler for: R2-D2 +== APP - job-service == Registered job handler for: C-3PO +== APP - job-service == Registered job handler for: BB-8 +== APP - job-service == Starting server on port: 6200 +== APP - job-service == Job scheduled: R2-D2 +== APP - job-service == Job scheduled: C-3PO +== APP - job-service == 2024/07/17 18:09:59 job:{name:"C-3PO" due_time:"10s" data:{value:"{\"droid\":\"C-3PO\",\"Task\":\"Memory Wipe\"}"}} +== APP - job-scheduler == Get job response: {"droid":"C-3PO","Task":"Memory Wipe"} +== APP - job-service == Job scheduled: BB-8 +== APP - job-service == 2024/07/17 18:09:59 job:{name:"BB-8" due_time:"15s" data:{value:"{\"droid\":\"BB-8\",\"Task\":\"Internal Gyroscope Check\"}"}} +== APP - job-scheduler == Get job response: {"droid":"BB-8","Task":"Internal Gyroscope Check"} +== APP - job-scheduler == Deleted job: BB-8 +``` + +After 5 seconds, the terminal output should present the `R2-D2` job being processed: + +```text +== APP - job-service == Starting droid: R2-D2 +== APP - job-service == Executing maintenance job: Oil Change +``` + +After 10 seconds, the terminal output should present the `C3-PO` job being processed: + +```text +== APP - job-service == Starting droid: C-3PO +== APP - job-service == Executing maintenance job: Memory Wipe +``` + +2. Stop and clean up application processes + +```bash +dapr stop -f . +``` + + + +## Run the Jobs APIs individually + +### Schedule Jobs + +1. Open a terminal and run the `job-service` app: + +```bash +dapr run --app-id job-service --app-port 6200 --dapr-http-port 6280 --dapr-grpc-port 6281 --app-protocol grpc -- go run . +``` + +The output should be: + +```text +== APP == dapr client initializing for: 127.0.0.1:6281 +== APP == Registered job handler for: R2-D2 +== APP == Registered job handler for: C-3PO +== APP == Registered job handler for: BB-8 +== APP == Starting server on port: 6200 +``` + +2. On a new terminal window, run the `job-scheduler` app: + +```bash +dapr run --app-id job-scheduler --app-port 6300 -- go run . +``` + +The output should be: + +```text +== APP == dapr client initializing for: +== APP == Get job response: {"droid":"C-3PO","Task":"Memory Wipe"} +== APP == Get job response: {"droid":"BB-8","Task":"Internal Gyroscope Check"} +== APP == Job deleted: BB-8 +``` + +Back at the `job-service` app terminal window, the output should be: + +```text +== APP == Job scheduled: R2-D2 +== APP == Job scheduled: C-3PO +== APP == 2024/07/17 18:25:36 job:{name:"C-3PO" due_time:"10s" data:{value:"{\"droid\":\"C-3PO\",\"Task\":\"Memory Wipe\"}"}} +== APP == Job scheduled: BB-8 +== APP == 2024/07/17 18:25:36 job:{name:"BB-8" due_time:"15s" data:{value:"{\"droid\":\"BB-8\",\"Task\":\"Internal Gyroscope Check\"}"}} +== APP == Starting droid: R2-D2 +== APP == Executing maintenance job: Oil Change +== APP == Starting droid: C-3PO +== APP == Executing maintenance job: Memory Wipe +``` diff --git a/jobs/go/sdk/dapr.yaml b/jobs/go/sdk/dapr.yaml new file mode 100644 index 000000000..8ef33902a --- /dev/null +++ b/jobs/go/sdk/dapr.yaml @@ -0,0 +1,13 @@ +version: 1 +apps: + - appDirPath: ./job-service/ + appID: job-service + appPort: 6200 + daprHTTPPort: 6280 + daprGRPCPort: 6281 + appProtocol: grpc + command: ["go", "run", "."] + - appDirPath: ./job-scheduler/ + appID: job-scheduler + appPort: 6300 + command: ["go", "run", "."] \ No newline at end of file diff --git a/jobs/go/sdk/job-scheduler/go.mod b/jobs/go/sdk/job-scheduler/go.mod new file mode 100644 index 000000000..0660c25b8 --- /dev/null +++ b/jobs/go/sdk/job-scheduler/go.mod @@ -0,0 +1,19 @@ +module dapr_job_example + +go 1.22.5 + +require github.com/dapr/go-sdk v1.10.0-rc-1.0.20240722191953-77c213de6164 + +require ( + github.com/dapr/dapr v1.14.0-rc.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/kr/pretty v0.3.1 // indirect + go.opentelemetry.io/otel v1.27.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d // indirect + google.golang.org/grpc v1.65.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/jobs/go/sdk/job-scheduler/go.sum b/jobs/go/sdk/job-scheduler/go.sum new file mode 100644 index 000000000..2d3213c68 --- /dev/null +++ b/jobs/go/sdk/job-scheduler/go.sum @@ -0,0 +1,43 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/dapr/dapr v1.14.0-rc.2 h1:wuXninZLTyokeztCinVIVAc9mpVYJS8QyxecPCLdlY8= +github.com/dapr/dapr v1.14.0-rc.2/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8= +github.com/dapr/go-sdk v1.10.0-rc-1.0.20240722191953-77c213de6164 h1:zFl/d1LAz/dwCApp5HRsRC8eabB0SkAqEAqyB5f4XYA= +github.com/dapr/go-sdk v1.10.0-rc-1.0.20240722191953-77c213de6164/go.mod h1:Xit2/1Go+fYy/TXrpf8oEefluvXtuvmC+nKEd3wJdQE= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= +go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d h1:k3zyW3BYYR30e8v3x0bTDdE9vpYFjZHK+HcyqkrppWk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/jobs/go/sdk/job-scheduler/job-scheduler.go b/jobs/go/sdk/job-scheduler/job-scheduler.go new file mode 100644 index 000000000..19364c04d --- /dev/null +++ b/jobs/go/sdk/job-scheduler/job-scheduler.go @@ -0,0 +1,161 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "time" + + daprc "github.com/dapr/go-sdk/client" +) + +type DroidJob struct { + Name string `json:"name"` + Job string `json:"job"` + DueTime string `json:"dueTime"` +} + +type App struct { + daprClient daprc.Client +} + +var app App + +func main() { + // Waiting 5 seconds for the job-service to start + time.Sleep(5 * time.Second) + + droidJobs := []DroidJob{ + {Name: "R2-D2", Job: "Oil Change", DueTime: "5s"}, + {Name: "C-3PO", Job: "Memory Wipe", DueTime: "15s"}, + {Name: "BB-8", Job: "Internal Gyroscope Check", DueTime: "30s"}, + } + + //Create new Dapr client + daprClient, err := daprc.NewClient() + if err != nil { + panic(err) + } + defer daprClient.Close() + + app = App{ + daprClient: daprClient, + } + + // Schedule R2-D2 job + err = schedule(droidJobs[0]) + if err != nil { + log.Fatalln("Error scheduling job: ", err) + } + + time.Sleep(3 * time.Second) + + // Schedule C-3PO job + err = schedule(droidJobs[1]) + if err != nil { + log.Fatalln("Error scheduling job: ", err) + } + + time.Sleep(5 * time.Second) + + // Get C-3PO job + resp, err := get(droidJobs[1]) + if err != nil { + log.Fatalln("Error retrieving job: ", err) + } + fmt.Println("Get job response: ", resp) + + // Schedule BB-8 job + err = schedule(droidJobs[2]) + if err != nil { + log.Fatalln("Error scheduling job: ", err) + } + + time.Sleep(5 * time.Second) + + // Get BB-8 job + resp, err = get(droidJobs[2]) + if err != nil { + log.Fatalln("Error retrieving job: ", err) + } + fmt.Println("Get job response: ", resp) + + time.Sleep(5 * time.Second) + + // Delete BB-8 job + err = delete(droidJobs[2]) + if err != nil { + log.Fatalln("Error deleting job: ", err) + } + fmt.Println("Job deleted: ", droidJobs[2].Name) +} + +// Schedules a job by invoking grpc service from job-service passing a DroidJob as an argument +func schedule(droidJob DroidJob) error { + jobData, err := json.Marshal(droidJob) + if err != nil { + fmt.Println("Error marshalling job content") + return err + } + + content := &daprc.DataContent{ + ContentType: "application/json", + Data: []byte(jobData), + } + + // Schedule Job + _, err = app.daprClient.InvokeMethodWithContent(context.Background(), "job-service", "scheduleJob", "POST", content) + if err != nil { + fmt.Println("Error invoking method: ", err) + return err + } + + return nil +} + +// Gets a job by invoking grpc service from job-service passing a job name as an argument +func get(droidJob DroidJob) (string, error) { + content := &daprc.DataContent{ + ContentType: "text/plain", + Data: []byte(droidJob.Name), + } + + //get job + resp, err := app.daprClient.InvokeMethodWithContent(context.Background(), "job-service", "getJob", "GET", content) + if err != nil { + fmt.Println("Error invoking method: ", err) + return "", err + } + + return string(resp), nil +} + +// Deletes a job by invoking grpc service from job-service passing a job name as an argument +func delete(droidJob DroidJob) error { + content := &daprc.DataContent{ + ContentType: "text/plain", + Data: []byte(droidJob.Name), + } + + _, err := app.daprClient.InvokeMethodWithContent(context.Background(), "job-service", "deleteJob", "DELETE", content) + if err != nil { + fmt.Println("Error invoking method: ", err) + return err + } + + return nil +} diff --git a/jobs/go/sdk/job-service/go.mod b/jobs/go/sdk/job-service/go.mod new file mode 100644 index 000000000..2245c2f13 --- /dev/null +++ b/jobs/go/sdk/job-service/go.mod @@ -0,0 +1,21 @@ +module dapr_job_example + +go 1.22.5 + +require ( + github.com/dapr/go-sdk v1.10.0-rc-1.0.20240722191953-77c213de6164 + google.golang.org/protobuf v1.34.2 +) + +require ( + github.com/dapr/dapr v1.14.0-rc.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/kr/pretty v0.3.1 // indirect + go.opentelemetry.io/otel v1.27.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d // indirect + google.golang.org/grpc v1.65.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/jobs/go/sdk/job-service/go.sum b/jobs/go/sdk/job-service/go.sum new file mode 100644 index 000000000..c5e81fe8a --- /dev/null +++ b/jobs/go/sdk/job-service/go.sum @@ -0,0 +1,45 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/dapr/dapr v1.14.0-rc.2 h1:wuXninZLTyokeztCinVIVAc9mpVYJS8QyxecPCLdlY8= +github.com/dapr/dapr v1.14.0-rc.2/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8= +github.com/dapr/go-sdk v1.10.0-rc-1.0.20240722191953-77c213de6164 h1:zFl/d1LAz/dwCApp5HRsRC8eabB0SkAqEAqyB5f4XYA= +github.com/dapr/go-sdk v1.10.0-rc-1.0.20240722191953-77c213de6164/go.mod h1:Xit2/1Go+fYy/TXrpf8oEefluvXtuvmC+nKEd3wJdQE= +github.com/dapr/go-sdk v1.10.1 h1:g6mM2RXyGkrzsqWFfCy8rw+UAt1edQEgRaQXT+XP4PE= +github.com/dapr/go-sdk v1.10.1/go.mod h1:lPjyF/xubh35fbdNdKkxBbFxFNCmta4zmvsk0JxuUG0= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= +go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d h1:k3zyW3BYYR30e8v3x0bTDdE9vpYFjZHK+HcyqkrppWk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/jobs/go/sdk/job-service/job-service.go b/jobs/go/sdk/job-service/job-service.go new file mode 100644 index 000000000..f056f2236 --- /dev/null +++ b/jobs/go/sdk/job-service/job-service.go @@ -0,0 +1,221 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +/* +dapr run --app-id maintenance-scheduler --app-port 5200 --dapr-http-port 5280 --dapr-grpc-port 5281 --scheduler-host-address=127.0.0.1:50006 -- go run . +*/ + +import ( + "context" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "log" + "os" + + "github.com/dapr/go-sdk/service/common" + "google.golang.org/protobuf/types/known/anypb" + + daprc "github.com/dapr/go-sdk/client" + daprs "github.com/dapr/go-sdk/service/grpc" +) + +type App struct { + daprClient daprc.Client +} + +type DroidJob struct { + Name string `json:"name"` + Job string `json:"job"` + DueTime string `json:"dueTime"` +} + +type JobData struct { + Droid string `json:"droid"` + Task string `json:"Task"` +} + +var jobNames = []string{"R2-D2", "C-3PO", "BB-8"} + +var app App + +func main() { + + appPort := os.Getenv("APP_PORT") + if appPort == "" { + appPort = "6200" + } + + //Create new Dapr client + daprClient, err := daprc.NewClient() + if err != nil { + panic(err) + } + defer daprClient.Close() + + app = App{ + daprClient: daprClient, + } + + // Create a new Dapr service + server, err := daprs.NewService(":" + appPort) + if err != nil { + log.Fatalf("failed to start server: %v", err) + } + + // Creates handlers for the service + if err := server.AddServiceInvocationHandler("scheduleJob", scheduleJob); err != nil { + log.Fatalf("error adding invocation handler: %v", err) + } + + if err := server.AddServiceInvocationHandler("getJob", getJob); err != nil { + log.Fatalf("error adding invocation handler: %v", err) + } + + if err := server.AddServiceInvocationHandler("deleteJob", deleteJob); err != nil { + log.Fatalf("error adding invocation handler: %v", err) + } + + // Register job event handler for all jobs + for _, jobName := range jobNames { + if err := server.AddJobEventHandler(jobName, handleJob); err != nil { + log.Fatalf("failed to register job event handler: %v", err) + } + fmt.Println("Registered job handler for: ", jobName) + } + + fmt.Println("Starting server on port: " + appPort) + if err = server.Start(); err != nil { + log.Fatalf("failed to start server: %v", err) + } +} + +// Handler that schedules a DroidJob +func scheduleJob(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error) { + + if in == nil { + err = errors.New("no invocation parameter") + return + } + + droidJob := DroidJob{} + err = json.Unmarshal(in.Data, &droidJob) + if err != nil { + fmt.Println("failed to unmarshal job: ", err) + return nil, err + } + + jobData := JobData{ + Droid: droidJob.Name, + Task: droidJob.Job, + } + + content, err := json.Marshal(jobData) + if err != nil { + fmt.Printf("Error marshalling job content") + return nil, err + } + + // schedule job + job := daprc.Job{ + Name: droidJob.Name, + DueTime: droidJob.DueTime, + Data: &anypb.Any{ + Value: content, + }, + } + + err = app.daprClient.ScheduleJobAlpha1(ctx, &job) + if err != nil { + fmt.Println("failed to schedule job. err: ", err) + return nil, err + } + + fmt.Println("Job scheduled: ", droidJob.Name) + + out = &common.Content{ + Data: in.Data, + ContentType: in.ContentType, + DataTypeURL: in.DataTypeURL, + } + + return out, err + +} + +// Handler that gets a job by name +func getJob(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error) { + + if in == nil { + err = errors.New("no invocation parameter") + return nil, err + } + + job, err := app.daprClient.GetJobAlpha1(ctx, string(in.Data)) + if err != nil { + fmt.Println("failed to get job. err: ", err) + } + + out = &common.Content{ + Data: job.Data.Value, + ContentType: in.ContentType, + DataTypeURL: in.DataTypeURL, + } + + return out, err +} + +// Handler that deletes a job by name +func deleteJob(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error) { + if in == nil { + err = errors.New("no invocation parameter") + return nil, err + } + + err = app.daprClient.DeleteJobAlpha1(ctx, string(in.Data)) + if err != nil { + fmt.Println("failed to delete job. err: ", err) + } + + out = &common.Content{ + Data: in.Data, + ContentType: in.ContentType, + DataTypeURL: in.DataTypeURL, + } + + return out, err +} + +// Handler that handles job events +func handleJob(ctx context.Context, job *common.JobEvent) error { + var jobData common.Job + if err := json.Unmarshal(job.Data, &jobData); err != nil { + return fmt.Errorf("failed to unmarshal job: %v", err) + } + decodedPayload, err := base64.StdEncoding.DecodeString(jobData.Value) + if err != nil { + return fmt.Errorf("failed to decode job payload: %v", err) + } + var jobPayload JobData + if err := json.Unmarshal(decodedPayload, &jobPayload); err != nil { + return fmt.Errorf("failed to unmarshal payload: %v", err) + } + + fmt.Println("Starting droid:", jobPayload.Droid) + fmt.Println("Executing maintenance job:", jobPayload.Task) + + return nil +} diff --git a/jobs/go/sdk/makefile b/jobs/go/sdk/makefile new file mode 100644 index 000000000..e7a8826bf --- /dev/null +++ b/jobs/go/sdk/makefile @@ -0,0 +1,2 @@ +include ../../../docker.mk +include ../../../validate.mk \ No newline at end of file