From fef877bc25730c542b0e21464fee91ddbca19886 Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Mon, 8 Nov 2021 12:53:55 +0200 Subject: [PATCH 1/5] Add job status update via webhook --- README.md | 46 ++++------ accounts/service.go | 2 + configs/configs.go | 2 + jobs/jobs.go | 26 +++--- jobs/jobs_test.go | 180 ++++++++++++++++++++++++++++++++++++++++ jobs/notification.go | 50 +++++++++++ jobs/options.go | 24 ++++++ jobs/workerpool.go | 64 ++++++++++++-- main.go | 8 +- transactions/service.go | 2 + 10 files changed, 354 insertions(+), 50 deletions(-) create mode 100644 jobs/jobs_test.go create mode 100644 jobs/notification.go create mode 100644 jobs/options.go diff --git a/README.md b/README.md index 8ba0ad42..b55ad6e7 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,6 @@ The Flow Wallet API is a REST HTTP service that allows a developer to integrate wallet functionality into a larger Flow application infrastructure. This service can be used by an application that needs to manage Flow user accounts and the assets inside them. - ## Features - Create new Flow accounts @@ -16,7 +15,6 @@ This service can be used by an application that needs to manage Flow user accoun View full list of functionality in the [API documentation](https://flow-hydraulics.github.io/flow-wallet-api/). - ## Background Some application developers may wish to manage Flow accounts in a fully-custodial fashion, @@ -28,19 +26,16 @@ For security and/or legal reasons, some developers need to use a custody service running on-premises as part of their existing infrastructure, rather than a hosted 3rd-party solution. - ### Example use cases - **FLOW/FUSD Hot Wallet** — an application that allows users to convert fiat currency to FLOW or FUSD. A single admin account would be used as a hot wallet for outgoing payments, and additional deposit accounts would be created to accept incoming payments. - **Exchange** — a cryptocurrency exchange that is listing FLOW and/or FUSD. Similar to the case above, one or more admin accounts may be used as a hot wallet for outgoing payments, and additional deposit accounts would be created to accept incoming payments. - **Web Wallet** — a user-facing wallet application that is compatible with Flow dapps. Each user account would be created and managed by the wallet service. - ## API Specification View the [Wallet API documentation and OpenAPI (Swagger) specification](https://flow-hydraulics.github.io/flow-wallet-api/). - ## Installation The Wallet API is provided as a Docker image: @@ -49,7 +44,6 @@ The Wallet API is provided as a Docker image: docker pull ghcr.io/flow-hydraulics/flow-wallet-api:latest ``` - ### Basic example usage > This setup requires [Docker](https://docs.docker.com/engine/install/) and the [Flow CLI](https://docs.onflow.org/flow-cli/install/). @@ -84,9 +78,13 @@ Once you're finished, run this to stop the containers: docker-compose down ``` - ## Configuration +### Updates on async requests (webhook) + +If you have the possibility to setup a webhook endpoint, you can set `FLOW_WALLET_JOB_STATUS_WEBHOOK` to receive updates on async requests (requests which return a job). The wallet will send a `POST` request to this URL containing the job whenever the status of the job is updated. + +**NOTE:** The wallet expects a response with status code **200** and will retry if unsuccessful. ### Enabled fungible tokens @@ -100,7 +98,6 @@ Examples: FLOW_WALLET_ENABLED_TOKENS=FlowToken:0x0ae53cb6e3f42a79:flowToken,FUSD:0xf8d6e0586b0a20c7:fusd - ### Database | Config variable | Environment variable | Description | Default | Examples | @@ -122,7 +119,6 @@ For more: https://gorm.io/docs/connecting_to_the_database.html To learn more about database schema versioning and migrations, read [MIGRATIONS.md](MIGRATIONS.MD). - ### Google KMS setup **Note**: In order to use Google KMS for remote key management you'll need a Google Cloud Platform account. @@ -159,7 +155,6 @@ Configure Google KMS as the key storage for `flow-wallet-api` and set the necess | LocationID | `FLOW_WALLET_GOOGLE_KMS_LOCATION_ID` | GCP Location ID | - | `europe-north1`, `us-west1` | | KeyRingID | `FLOW_WALLET_GOOGLE_KMS_KEYRING_ID` | GCP Key Ring ID | - | `example-wallet-keyring` | - ### Google KMS for admin account If you want to use a key stored in Google KMS for the admin account, just pass the resource identifier as the private key (`FLOW_WALLET_ADMIN_PRIVATE_KEY`) and set `FLOW_WALLET_ADMIN_KEY_TYPE` to `google_kms`. @@ -185,7 +180,6 @@ Example environment: NOTE: This will mess up the docker-compose setup (emulator won't start) as it uses `FLOW_WALLET_ADMIN_PRIVATE_KEY` as `FLOW_SERVICEPRIVATEKEY`. It will cause an encoding error on the emulator. - ### Google KMS key for database encryption Before configuring a Google KMS key for database encryption please refer to the official guide for setting up a symmetric encryption key; @@ -194,18 +188,16 @@ https://cloud.google.com/kms/docs/encrypt-decrypt#before_you_begin If you want to use an Google KMS symmetric encryption key for encrypting the stored account keys, please refer to the following configuration settings; -| Config variable | Environment variable | Description | Default | Examples value for Google KMS | -| ----------------- | --------------------------------- | ------------------------------------------------------ | ------- | ----------------------------- | -| EncryptionKeyType | `FLOW_WALLET_ENCRYPTION_KEY_TYPE` | Encryption key type | `local` | `google_kms` | -| EncryptionKey | `FLOW_WALLET_ENCRYPTION_KEY` | KMS encryption key resource name | - | `projects/my-project/locations/us-west1/keyRings/my-keyring/cryptoKeys/my-encryption-key` | - +| Config variable | Environment variable | Description | Default | Examples value for Google KMS | +| ----------------- | --------------------------------- | -------------------------------- | ------- | ----------------------------------------------------------------------------------------- | +| EncryptionKeyType | `FLOW_WALLET_ENCRYPTION_KEY_TYPE` | Encryption key type | `local` | `google_kms` | +| EncryptionKey | `FLOW_WALLET_ENCRYPTION_KEY` | KMS encryption key resource name | - | `projects/my-project/locations/us-west1/keyRings/my-keyring/cryptoKeys/my-encryption-key` | ### AWS KMS setup **Note**: In order to use AWS KMS for remote key management you'll need an AWS account. **Note**: Custom key stores are not supported. - #### Pre-requisites: 1. AWS credentials for an account that has access to KMS @@ -223,15 +215,14 @@ Configure AWS KMS as the key storage for `flow-wallet-api` and set the necessary | --------------- | ------------------------------ | ---------------- | ------- | ----------------- | | DefaultKeyType | `FLOW_WALLET_DEFAULT_KEY_TYPE` | Default key type | `local` | `aws_kms` | - ### AWS KMS for admin account If you want to use a key stored in AWS KMS for the admin account, please refer to the following configuration settings; -| Config variable | Environment variable | Description | Default | Example value for AWS KMS | -| --------------- | ------------------------------- | --------------------- | ------- | ------------------------- | -| AdminKeyType | `FLOW_WALLET_ADMIN_KEY_TYPE` | Admin key type | `local` | `aws_kms` | -| AdminPrivateKey | `FLOW_WALLET_ADMIN_PRIVATE_KEY` | Admin private key ARN | - | `arn:aws:kms:eu-central-1:012345678910:key/00000000-aaaa-bbbb-cccc-12345678910` | +| Config variable | Environment variable | Description | Default | Example value for AWS KMS | +| --------------- | ------------------------------- | --------------------- | ------- | ------------------------------------------------------------------------------- | +| AdminKeyType | `FLOW_WALLET_ADMIN_KEY_TYPE` | Admin key type | `local` | `aws_kms` | +| AdminPrivateKey | `FLOW_WALLET_ADMIN_PRIVATE_KEY` | Admin private key ARN | - | `arn:aws:kms:eu-central-1:012345678910:key/00000000-aaaa-bbbb-cccc-12345678910` | When testing make sure to add the key to the admin account. You can convert the AWS public key (e.g. `aws.pem`) you downloaded/copied from AWS with flow-cli; @@ -239,22 +230,19 @@ When testing make sure to add the key to the admin account. You can convert the flow keys decode pem --from-file=aws.pem --sig-algo "ECDSA_secp256k1" ``` - ### AWS KMS for encrypting stored keys If you want to use an AWS KMS symmetric encryption key for encrypting the stored account keys, please refer to the following configuration settings; -| Config variable | Environment variable | Description | Default | Examples value for AWS KMS | -| ----------------- | --------------------------------- | ------------------------------------------------------ | ------- | -------------------------- | -| EncryptionKeyType | `FLOW_WALLET_ENCRYPTION_KEY_TYPE` | Encryption key type | `local` | `aws_kms` | -| EncryptionKey | `FLOW_WALLET_ENCRYPTION_KEY` | KMS encryption key ARN | - | `arn:aws:kms:eu-central-1:012345678910:key/00000000-aaaa-bbbb-cccc-12345678910` | - +| Config variable | Environment variable | Description | Default | Examples value for AWS KMS | +| ----------------- | --------------------------------- | ---------------------- | ------- | ------------------------------------------------------------------------------- | +| EncryptionKeyType | `FLOW_WALLET_ENCRYPTION_KEY_TYPE` | Encryption key type | `local` | `aws_kms` | +| EncryptionKey | `FLOW_WALLET_ENCRYPTION_KEY` | KMS encryption key ARN | - | `arn:aws:kms:eu-central-1:012345678910:key/00000000-aaaa-bbbb-cccc-12345678910` | ### All possible configuration variables Refer to [configs/configs.go](configs/configs.go) for details and documentation. - ## Credit The Flow Wallet API is developed and maintained by [Equilibrium](https://equilibrium.co/), diff --git a/accounts/service.go b/accounts/service.go index ed77c5d8..0f2cf35a 100644 --- a/accounts/service.go +++ b/accounts/service.go @@ -277,6 +277,8 @@ func (s *Service) executeAccountCreateJob(j *jobs.Job) error { return jobs.ErrInvalidJobType } + j.ShouldSendNotification = true + ctx := context.Background() a, txID, err := s.createAccount(ctx) diff --git a/configs/configs.go b/configs/configs.go index 4313a9c8..35f66ef7 100644 --- a/configs/configs.go +++ b/configs/configs.go @@ -78,6 +78,8 @@ type Config struct { // You can increase the number of workers if you're sending // too many transactions and find that the queue is often backlogged. WorkerCount uint `env:"FLOW_WALLET_WORKER_COUNT" envDefault:"100"` + // Webhook endpoint to receive job status updates + JobStatusWebook string `env:"FLOW_WALLET_JOB_STATUS_WEBHOOK" envDefault:""` // -- Google KMS -- diff --git a/jobs/jobs.go b/jobs/jobs.go index ad514cf4..1b0d20c2 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -13,8 +13,6 @@ type Result struct { TransactionID string } -type Process func(result *Result) error - // State is a type for Job state. type State string @@ -29,22 +27,23 @@ const ( // Job database model type Job struct { - ID uuid.UUID `gorm:"column:id;primary_key;type:uuid;"` - Type string `gorm:"column:type"` - State State `gorm:"column:state;default:INIT"` - Error string `gorm:"column:error"` - Result string `gorm:"column:result"` - TransactionID string `gorm:"column:transaction_id"` - ExecCount int `gorm:"column:exec_count;default:0"` - CreatedAt time.Time `gorm:"column:created_at"` - UpdatedAt time.Time `gorm:"column:updated_at"` - DeletedAt gorm.DeletedAt `gorm:"column:deleted_at;index"` - Do Process `gorm:"-"` + ID uuid.UUID `gorm:"column:id;primary_key;type:uuid;"` + Type string `gorm:"column:type"` + State State `gorm:"column:state;default:INIT"` + Error string `gorm:"column:error"` + Result string `gorm:"column:result"` + TransactionID string `gorm:"column:transaction_id"` + ExecCount int `gorm:"column:exec_count;default:0"` + CreatedAt time.Time `gorm:"column:created_at"` + UpdatedAt time.Time `gorm:"column:updated_at"` + DeletedAt gorm.DeletedAt `gorm:"column:deleted_at;index"` + ShouldSendNotification bool `gorm:"-"` // Whether or not to notify admin (via webhook for example) } // Job HTTP response type JSONResponse struct { ID uuid.UUID `json:"jobId"` + Type string `json:"type"` State State `json:"state"` Error string `json:"error"` Result string `json:"result"` @@ -56,6 +55,7 @@ type JSONResponse struct { func (j Job) ToJSONResponse() JSONResponse { return JSONResponse{ ID: j.ID, + Type: j.Type, State: j.State, Error: j.Error, Result: j.Result, diff --git a/jobs/jobs_test.go b/jobs/jobs_test.go new file mode 100644 index 00000000..09c8bff7 --- /dev/null +++ b/jobs/jobs_test.go @@ -0,0 +1,180 @@ +package jobs + +import ( + "fmt" + "io/ioutil" + "log" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/flow-hydraulics/flow-wallet-api/datastore" + "github.com/google/uuid" +) + +type dummyStore struct{} + +func (*dummyStore) Jobs(datastore.ListOptions) ([]Job, error) { return nil, nil } +func (*dummyStore) Job(id uuid.UUID) (Job, error) { return Job{}, nil } +func (*dummyStore) InsertJob(*Job) error { return nil } +func (*dummyStore) UpdateJob(*Job) error { return nil } +func (*dummyStore) IncreaseExecCount(j *Job) error { return nil } +func (*dummyStore) SchedulableJobs(acceptedGracePeriod, reSchedulableGracePeriod time.Duration, o datastore.ListOptions) ([]Job, error) { + return nil, nil +} + +type dummyWriter struct { + T *testing.T + record []string +} + +func (writer *dummyWriter) Write(p []byte) (n int, err error) { + writer.T.Helper() + writer.record = append(writer.record, string(p)) + return 0, nil +} + +func TestScheduleSendNotification(t *testing.T) { + writer := &dummyWriter{T: t} + + wp := WorkerPool{ + executors: make(map[string]ExecutorFunc), + jobChan: make(chan *Job, 1), + logger: log.New(writer, "", 0), + store: &dummyStore{}, + } + + WithJobStatusWebHook("http://localhost")(&wp) + + sendNotificationCalled := false + + wp.RegisterExecutor(SendJobStatusJobType, func(j *Job) error { + j.ShouldSendNotification = false + sendNotificationCalled = true + return nil + }) + + wp.RegisterExecutor("TestJobType", func(j *Job) error { + j.ShouldSendNotification = true + return nil + }) + + job, err := wp.CreateJob("TestJobType", "") + if err != nil { + t.Fatal(err) + } + + wp.process(job) + + if len(wp.jobChan) == 0 { + t.Fatal("expected job channel to contain a job") + } + + sendNotificationJob := <-wp.jobChan + + if sendNotificationJob.Type != "send_job_status" { + t.Fatalf("expected pool to have a send_job_status job") + } + + wp.process(sendNotificationJob) + + if !sendNotificationCalled { + t.Fatalf("expected 'sendNotificationCalled' to equal true") + } + + if len(writer.record) > 0 { + t.Fatalf("did not expect a warning, got %s", writer.record) + } +} + +func TestExecuteSendNotification(t *testing.T) { + notification := "" + + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + bodyBytes, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Fatal(err) + } + notification = string(bodyBytes) + fmt.Fprintf(w, "ok") + })) + defer svr.Close() + + writer := &dummyWriter{T: t} + + wp := WorkerPool{ + executors: make(map[string]ExecutorFunc), + jobChan: make(chan *Job, 1), + logger: log.New(writer, "", 0), + store: &dummyStore{}, + } + + WithJobStatusWebHook(svr.URL)(&wp) + + wp.RegisterExecutor(SendJobStatusJobType, wp.executeSendJobStatus) + + wp.RegisterExecutor("TestJobType", func(j *Job) error { + j.ShouldSendNotification = true + return nil + }) + + job, err := wp.CreateJob("TestJobType", "") + if err != nil { + t.Fatal(err) + } + + wp.process(job) + wp.process(<-wp.jobChan) + + if notification == "" || !strings.Contains(notification, "TestJobType") { + t.Fatalf("expected webhook endpoint to have received a notification") + } + + if len(writer.record) > 0 { + t.Fatalf("did not expect a warning, got %s", writer.record) + } +} + +func TestExecuteSendNotificationFail(t *testing.T) { + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "test error", http.StatusBadGateway) + })) + defer svr.Close() + + writer := &dummyWriter{T: t} + + wp := WorkerPool{ + executors: make(map[string]ExecutorFunc), + jobChan: make(chan *Job, 1), + logger: log.New(writer, "", 0), + store: &dummyStore{}, + } + + WithJobStatusWebHook(svr.URL)(&wp) + + wp.RegisterExecutor(SendJobStatusJobType, wp.executeSendJobStatus) + + wp.RegisterExecutor("TestJobType", func(j *Job) error { + j.ShouldSendNotification = true + return nil + }) + + job, err := wp.CreateJob("TestJobType", "") + if err != nil { + t.Fatal(err) + } + + wp.process(job) + sendNotificationJob := <-wp.jobChan + wp.process(sendNotificationJob) + + if len(writer.record) != 1 { + t.Errorf("expected there to be one warning, got %s", writer.record) + } + + if sendNotificationJob.State != Error { + t.Errorf("expected send notification job to be in '%s' state, got '%s'", Error, sendNotificationJob.State) + } +} diff --git a/jobs/notification.go b/jobs/notification.go new file mode 100644 index 00000000..41d97c05 --- /dev/null +++ b/jobs/notification.go @@ -0,0 +1,50 @@ +package jobs + +import ( + "bytes" + "fmt" + "net/http" + "net/url" +) + +const SendJobStatusJobType = "send_job_status" + +type NotificationConfig struct { + jobStatusWebHookUrl *url.URL +} + +func (cfg *NotificationConfig) ShouldSendJobStatus() bool { + return cfg.jobStatusWebHookUrl != nil +} + +func (cfg *NotificationConfig) SendJobStatus(content string) error { + // Handle each job status notification endpoint separately + + if err := cfg.SendJobStatusWebHook(content); err != nil { + return err + } + + // if err := cfg.SendSomeOtherWay(content); err != nil { + // return err + // } + + return nil +} + +func (cfg *NotificationConfig) SendJobStatusWebHook(content string) error { + if cfg.jobStatusWebHookUrl == nil { + // Do nothing as config has no 'jobStatusWebHookUrl' + return nil + } + + resp, err := http.Post(cfg.jobStatusWebHookUrl.String(), "application/json", bytes.NewBuffer([]byte(content))) + if err != nil { + return fmt.Errorf("error while sending to webhook: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("webhook endpoint responded with an unexpected status code: %d", resp.StatusCode) + } + + return nil +} diff --git a/jobs/options.go b/jobs/options.go new file mode 100644 index 00000000..c2349365 --- /dev/null +++ b/jobs/options.go @@ -0,0 +1,24 @@ +package jobs + +import "net/url" + +type WorkerPoolOption func(*WorkerPool) + +func WithJobStatusWebHook(u string) WorkerPoolOption { + return func(wp *WorkerPool) { + if u == "" { + return + } + + valid, err := url.ParseRequestURI(u) + if err != nil { + panic(err) + } + + if wp.notificationConfig == nil { + wp.notificationConfig = &NotificationConfig{} + } + + wp.notificationConfig.jobStatusWebHookUrl = valid + } +} diff --git a/jobs/workerpool.go b/jobs/workerpool.go index fab3d2d4..de9eb634 100644 --- a/jobs/workerpool.go +++ b/jobs/workerpool.go @@ -1,6 +1,7 @@ package jobs import ( + "encoding/json" "errors" "fmt" "log" @@ -46,30 +47,42 @@ type WorkerPool struct { workerCount uint dbJobPollInterval time.Duration stopChan chan struct{} + + notificationConfig *NotificationConfig } -func NewWorkerPool(logger *log.Logger, db Store, capacity uint, workerCount uint) *WorkerPool { +func NewWorkerPool(logger *log.Logger, db Store, capacity uint, workerCount uint, opts ...WorkerPoolOption) *WorkerPool { if logger == nil { // Make sure we always have a logger logger = log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile) } - wg := &sync.WaitGroup{} - jobChan := make(chan *Job, capacity) + pool := &WorkerPool{ + wg: &sync.WaitGroup{}, + jobChan: make(chan *Job, capacity), + stopChan: make(chan struct{}), executors: make(map[string]ExecutorFunc), logger: logger, - wg: wg, store: db, - jobChan: jobChan, capacity: capacity, workerCount: workerCount, dbJobPollInterval: defaultDBJobPollInterval, - stopChan: make(chan struct{}), + + notificationConfig: &NotificationConfig{}, } pool.startWorkers() pool.startDBJobScheduler() + + // Register asynchronous job executor. + pool.RegisterExecutor(SendJobStatusJobType, pool.executeSendJobStatus) + + // Go through options + for _, opt := range opts { + opt(pool) + } + return pool } @@ -138,7 +151,7 @@ func (wp *WorkerPool) startDBJobScheduler() { o := datastore.ParseListOptions(0, 0) jobs, err := wp.store.SchedulableJobs(acceptedGracePeriod, reSchedulableGracePeriod, o) if err != nil { - wp.logger.Println("WARNING: Could not fetch schedulable jobs from DB: ", err) + wp.logger.Printf("WARNING: Could not fetch schedulable jobs from DB: %s", err) continue } @@ -216,8 +229,45 @@ func (wp *WorkerPool) process(job *Job) { if err := wp.store.UpdateJob(job); err != nil { wp.logger.Printf("WARNING: Could not update DB entry for Job(id: %q, type: %q): %s\n", job.ID, job.Type, err.Error()) } + + if job.ShouldSendNotification && wp.notificationConfig.ShouldSendJobStatus() { + if err := ScheduleJobStatusNotification(wp, job); err != nil { + wp.logger.Printf("WARNING: Could not schedule a status update notification for Job(id: %q, type: %q): %s\n", job.ID, job.Type, err.Error()) + } + } +} + +func (wp *WorkerPool) executeSendJobStatus(j *Job) error { + if j.Type != SendJobStatusJobType { + return ErrInvalidJobType + } + + j.ShouldSendNotification = false + + return wp.notificationConfig.SendJobStatus(j.Result) } func PermanentFailure(err error) error { return fmt.Errorf("%w: %s", ErrPermanentFailure, err.Error()) } + +func ScheduleJobStatusNotification(wp *WorkerPool, parent *Job) error { + job, err := wp.CreateJob(SendJobStatusJobType, "") + if err != nil { + return err + } + + b, err := json.Marshal(parent.ToJSONResponse()) + if err != nil { + return err + } + + // Store the notification content of the parent job in Result of the new job + job.Result = string(b) + + if err := wp.store.UpdateJob(job); err != nil { + return err + } + + return wp.Schedule(job) +} diff --git a/main.go b/main.go index 2860690d..c120feb9 100644 --- a/main.go +++ b/main.go @@ -101,7 +101,13 @@ func runServer(cfg *configs.Config) { tokenStore := tokens.NewGormStore(db) // Create a worker pool - wp := jobs.NewWorkerPool(lj, jobStore, cfg.WorkerQueueCapacity, cfg.WorkerCount) + wp := jobs.NewWorkerPool( + lj, + jobStore, + cfg.WorkerQueueCapacity, + cfg.WorkerCount, + jobs.WithJobStatusWebHook(cfg.JobStatusWebook), + ) defer func() { ls.Println("Stopping worker pool..") wp.Stop() diff --git a/transactions/service.go b/transactions/service.go index 39e71b3f..3088cebf 100644 --- a/transactions/service.go +++ b/transactions/service.go @@ -324,6 +324,8 @@ func (s *Service) executeTransactionJob(j *jobs.Job) error { return jobs.ErrInvalidJobType } + j.ShouldSendNotification = true + ctx := context.Background() tx, err := s.store.Transaction(j.TransactionID) From 5349786a56fcfbc9a4ff3e641ae0d18f06f69317 Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Mon, 8 Nov 2021 13:31:39 +0200 Subject: [PATCH 2/5] Fix typo --- configs/configs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/configs/configs.go b/configs/configs.go index 35f66ef7..9d500d47 100644 --- a/configs/configs.go +++ b/configs/configs.go @@ -79,7 +79,7 @@ type Config struct { // too many transactions and find that the queue is often backlogged. WorkerCount uint `env:"FLOW_WALLET_WORKER_COUNT" envDefault:"100"` // Webhook endpoint to receive job status updates - JobStatusWebook string `env:"FLOW_WALLET_JOB_STATUS_WEBHOOK" envDefault:""` + JobStatusWebhook string `env:"FLOW_WALLET_JOB_STATUS_WEBHOOK" envDefault:""` // -- Google KMS -- From 1e6911773ac627333d98d18198f93541846a21da Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Mon, 8 Nov 2021 13:33:36 +0200 Subject: [PATCH 3/5] Webhook not WebHook --- jobs/jobs_test.go | 6 +++--- jobs/notification.go | 14 +++++++------- jobs/options.go | 4 ++-- main.go | 2 +- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/jobs/jobs_test.go b/jobs/jobs_test.go index 09c8bff7..e77552fa 100644 --- a/jobs/jobs_test.go +++ b/jobs/jobs_test.go @@ -46,7 +46,7 @@ func TestScheduleSendNotification(t *testing.T) { store: &dummyStore{}, } - WithJobStatusWebHook("http://localhost")(&wp) + WithJobStatusWebhook("http://localhost")(&wp) sendNotificationCalled := false @@ -111,7 +111,7 @@ func TestExecuteSendNotification(t *testing.T) { store: &dummyStore{}, } - WithJobStatusWebHook(svr.URL)(&wp) + WithJobStatusWebhook(svr.URL)(&wp) wp.RegisterExecutor(SendJobStatusJobType, wp.executeSendJobStatus) @@ -152,7 +152,7 @@ func TestExecuteSendNotificationFail(t *testing.T) { store: &dummyStore{}, } - WithJobStatusWebHook(svr.URL)(&wp) + WithJobStatusWebhook(svr.URL)(&wp) wp.RegisterExecutor(SendJobStatusJobType, wp.executeSendJobStatus) diff --git a/jobs/notification.go b/jobs/notification.go index 41d97c05..c3173049 100644 --- a/jobs/notification.go +++ b/jobs/notification.go @@ -10,17 +10,17 @@ import ( const SendJobStatusJobType = "send_job_status" type NotificationConfig struct { - jobStatusWebHookUrl *url.URL + jobStatusWebhookUrl *url.URL } func (cfg *NotificationConfig) ShouldSendJobStatus() bool { - return cfg.jobStatusWebHookUrl != nil + return cfg.jobStatusWebhookUrl != nil } func (cfg *NotificationConfig) SendJobStatus(content string) error { // Handle each job status notification endpoint separately - if err := cfg.SendJobStatusWebHook(content); err != nil { + if err := cfg.SendJobStatusWebhook(content); err != nil { return err } @@ -31,13 +31,13 @@ func (cfg *NotificationConfig) SendJobStatus(content string) error { return nil } -func (cfg *NotificationConfig) SendJobStatusWebHook(content string) error { - if cfg.jobStatusWebHookUrl == nil { - // Do nothing as config has no 'jobStatusWebHookUrl' +func (cfg *NotificationConfig) SendJobStatusWebhook(content string) error { + if cfg.jobStatusWebhookUrl == nil { + // Do nothing as config has no 'jobStatusWebhookUrl' return nil } - resp, err := http.Post(cfg.jobStatusWebHookUrl.String(), "application/json", bytes.NewBuffer([]byte(content))) + resp, err := http.Post(cfg.jobStatusWebhookUrl.String(), "application/json", bytes.NewBuffer([]byte(content))) if err != nil { return fmt.Errorf("error while sending to webhook: %w", err) } diff --git a/jobs/options.go b/jobs/options.go index c2349365..44621479 100644 --- a/jobs/options.go +++ b/jobs/options.go @@ -4,7 +4,7 @@ import "net/url" type WorkerPoolOption func(*WorkerPool) -func WithJobStatusWebHook(u string) WorkerPoolOption { +func WithJobStatusWebhook(u string) WorkerPoolOption { return func(wp *WorkerPool) { if u == "" { return @@ -19,6 +19,6 @@ func WithJobStatusWebHook(u string) WorkerPoolOption { wp.notificationConfig = &NotificationConfig{} } - wp.notificationConfig.jobStatusWebHookUrl = valid + wp.notificationConfig.jobStatusWebhookUrl = valid } } diff --git a/main.go b/main.go index c120feb9..c90ea575 100644 --- a/main.go +++ b/main.go @@ -106,7 +106,7 @@ func runServer(cfg *configs.Config) { jobStore, cfg.WorkerQueueCapacity, cfg.WorkerCount, - jobs.WithJobStatusWebHook(cfg.JobStatusWebook), + jobs.WithJobStatusWebhook(cfg.JobStatusWebhook), ) defer func() { ls.Println("Stopping worker pool..") From e1428f017924bbc802e561ed540c3c8611f8e767 Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Mon, 8 Nov 2021 14:04:57 +0200 Subject: [PATCH 4/5] Only send notification when job is failed or completed --- jobs/jobs_test.go | 212 +++++++++++++++++++++++++++++++-------------- jobs/workerpool.go | 2 +- 2 files changed, 147 insertions(+), 67 deletions(-) diff --git a/jobs/jobs_test.go b/jobs/jobs_test.go index e77552fa..946ef68a 100644 --- a/jobs/jobs_test.go +++ b/jobs/jobs_test.go @@ -1,12 +1,11 @@ package jobs import ( + "encoding/json" "fmt" - "io/ioutil" "log" "net/http" "net/http/httptest" - "strings" "testing" "time" @@ -90,91 +89,172 @@ func TestScheduleSendNotification(t *testing.T) { } func TestExecuteSendNotification(t *testing.T) { - notification := "" + t.Run("valid job should send", func(t *testing.T) { + var webhookJob Job + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if err := json.NewDecoder(r.Body).Decode(&webhookJob); err != nil { + t.Fatal(err) + } + })) + defer svr.Close() + + writer := &dummyWriter{T: t} + + wp := WorkerPool{ + executors: make(map[string]ExecutorFunc), + jobChan: make(chan *Job, 1), + logger: log.New(writer, "", 0), + store: &dummyStore{}, + } + + WithJobStatusWebhook(svr.URL)(&wp) + + wp.RegisterExecutor(SendJobStatusJobType, wp.executeSendJobStatus) - svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - bodyBytes, err := ioutil.ReadAll(r.Body) + wp.RegisterExecutor("TestJobType", func(j *Job) error { + j.ShouldSendNotification = true + return nil + }) + + job, err := wp.CreateJob("TestJobType", "") if err != nil { - log.Fatal(err) + t.Fatal(err) } - notification = string(bodyBytes) - fmt.Fprintf(w, "ok") - })) - defer svr.Close() - - writer := &dummyWriter{T: t} - wp := WorkerPool{ - executors: make(map[string]ExecutorFunc), - jobChan: make(chan *Job, 1), - logger: log.New(writer, "", 0), - store: &dummyStore{}, - } + wp.process(job) + wp.process(<-wp.jobChan) - WithJobStatusWebhook(svr.URL)(&wp) + if webhookJob.Type != "TestJobType" { + t.Fatalf("expected webhook endpoint to have received a notification") + } - wp.RegisterExecutor(SendJobStatusJobType, wp.executeSendJobStatus) + if webhookJob.State != Complete { + t.Fatalf("expected job to be in state '%s' got '%s'", Complete, webhookJob.State) + } - wp.RegisterExecutor("TestJobType", func(j *Job) error { - j.ShouldSendNotification = true - return nil + if len(writer.record) > 0 { + t.Fatalf("did not expect a warning, got %s", writer.record) + } }) - job, err := wp.CreateJob("TestJobType", "") - if err != nil { - t.Fatal(err) - } + t.Run("failed job should send", func(t *testing.T) { + var webhookJob Job + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if err := json.NewDecoder(r.Body).Decode(&webhookJob); err != nil { + t.Fatal(err) + } + })) + defer svr.Close() + + writer := &dummyWriter{T: t} + + wp := WorkerPool{ + executors: make(map[string]ExecutorFunc), + jobChan: make(chan *Job, 1), + logger: log.New(writer, "", 0), + store: &dummyStore{}, + } - wp.process(job) - wp.process(<-wp.jobChan) + WithJobStatusWebhook(svr.URL)(&wp) - if notification == "" || !strings.Contains(notification, "TestJobType") { - t.Fatalf("expected webhook endpoint to have received a notification") - } + wp.RegisterExecutor(SendJobStatusJobType, wp.executeSendJobStatus) - if len(writer.record) > 0 { - t.Fatalf("did not expect a warning, got %s", writer.record) - } -} + wp.RegisterExecutor("TestJobType", func(j *Job) error { + j.ShouldSendNotification = true + return ErrPermanentFailure + }) -func TestExecuteSendNotificationFail(t *testing.T) { - svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - http.Error(w, "test error", http.StatusBadGateway) - })) - defer svr.Close() + job, err := wp.CreateJob("TestJobType", "") + if err != nil { + t.Fatal(err) + } - writer := &dummyWriter{T: t} + wp.process(job) + wp.process(<-wp.jobChan) - wp := WorkerPool{ - executors: make(map[string]ExecutorFunc), - jobChan: make(chan *Job, 1), - logger: log.New(writer, "", 0), - store: &dummyStore{}, - } + if webhookJob.Type != "TestJobType" { + t.Fatalf("expected webhook endpoint to have received a notification") + } - WithJobStatusWebhook(svr.URL)(&wp) + if webhookJob.State != Failed { + t.Fatalf("expected job to be in state '%s' got '%s'", Failed, webhookJob.State) + } - wp.RegisterExecutor(SendJobStatusJobType, wp.executeSendJobStatus) + if len(writer.record) == 0 { + t.Fatalf("expected a warning, got %s", writer.record) + } + }) - wp.RegisterExecutor("TestJobType", func(j *Job) error { - j.ShouldSendNotification = true - return nil + t.Run("erroring job should not send", func(t *testing.T) { + writer := &dummyWriter{T: t} + + wp := WorkerPool{ + executors: make(map[string]ExecutorFunc), + jobChan: make(chan *Job, 1), + logger: log.New(writer, "", 0), + store: &dummyStore{}, + } + + WithJobStatusWebhook("http://localhost")(&wp) + + wp.RegisterExecutor(SendJobStatusJobType, wp.executeSendJobStatus) + + wp.RegisterExecutor("TestJobType", func(j *Job) error { + j.ShouldSendNotification = true + return fmt.Errorf("test error") + }) + + job, err := wp.CreateJob("TestJobType", "") + if err != nil { + t.Fatal(err) + } + + wp.process(job) + + if len(wp.jobChan) != 0 { + t.Errorf("did not expect a job to be queued") + } }) - job, err := wp.CreateJob("TestJobType", "") - if err != nil { - t.Fatal(err) - } + t.Run("valid job should send but get an endpoint error and retry send", func(t *testing.T) { + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "test error", http.StatusBadGateway) + })) + defer svr.Close() - wp.process(job) - sendNotificationJob := <-wp.jobChan - wp.process(sendNotificationJob) + writer := &dummyWriter{T: t} - if len(writer.record) != 1 { - t.Errorf("expected there to be one warning, got %s", writer.record) - } + wp := WorkerPool{ + executors: make(map[string]ExecutorFunc), + jobChan: make(chan *Job, 1), + logger: log.New(writer, "", 0), + store: &dummyStore{}, + } - if sendNotificationJob.State != Error { - t.Errorf("expected send notification job to be in '%s' state, got '%s'", Error, sendNotificationJob.State) - } + WithJobStatusWebhook(svr.URL)(&wp) + + wp.RegisterExecutor(SendJobStatusJobType, wp.executeSendJobStatus) + + wp.RegisterExecutor("TestJobType", func(j *Job) error { + j.ShouldSendNotification = true + return nil + }) + + job, err := wp.CreateJob("TestJobType", "") + if err != nil { + t.Fatal(err) + } + + wp.process(job) + sendNotificationJob := <-wp.jobChan + wp.process(sendNotificationJob) + + if len(writer.record) != 1 { + t.Errorf("expected there to be one warning, got %s", writer.record) + } + + if sendNotificationJob.State != Error { + t.Errorf("expected send notification job to be in '%s' state, got '%s'", Error, sendNotificationJob.State) + } + }) } diff --git a/jobs/workerpool.go b/jobs/workerpool.go index de9eb634..3c922d2e 100644 --- a/jobs/workerpool.go +++ b/jobs/workerpool.go @@ -230,7 +230,7 @@ func (wp *WorkerPool) process(job *Job) { wp.logger.Printf("WARNING: Could not update DB entry for Job(id: %q, type: %q): %s\n", job.ID, job.Type, err.Error()) } - if job.ShouldSendNotification && wp.notificationConfig.ShouldSendJobStatus() { + if (job.State == Failed || job.State == Complete) && job.ShouldSendNotification && wp.notificationConfig.ShouldSendJobStatus() { if err := ScheduleJobStatusNotification(wp, job); err != nil { wp.logger.Printf("WARNING: Could not schedule a status update notification for Job(id: %q, type: %q): %s\n", job.ID, job.Type, err.Error()) } From 8f638dbd198c88d08775e58ead261adfdf4a5429 Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Mon, 8 Nov 2021 14:55:11 +0200 Subject: [PATCH 5/5] More helpful panic message [skip ci] --- jobs/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jobs/options.go b/jobs/options.go index 44621479..2433e296 100644 --- a/jobs/options.go +++ b/jobs/options.go @@ -12,7 +12,7 @@ func WithJobStatusWebhook(u string) WorkerPoolOption { valid, err := url.ParseRequestURI(u) if err != nil { - panic(err) + panic("invalid job status webhook url") } if wp.notificationConfig == nil {