From dbcacb4b1072884e8487a8e4a8bd752a6681d0b1 Mon Sep 17 00:00:00 2001 From: David Kuryakin Date: Tue, 5 Apr 2022 02:32:42 +0300 Subject: [PATCH 1/2] Make some worker & service consts configurable. --- configs/configs.go | 20 ++++++++++++++++++++ main.go | 9 ++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/configs/configs.go b/configs/configs.go index 10ce450a..331b907d 100644 --- a/configs/configs.go +++ b/configs/configs.go @@ -130,6 +130,26 @@ type Config struct { // Max transactions per second, rate at which the service can submit transactions to Flow TransactionMaxSendRate int `env:"MAX_TPS" envDefault:"10"` + + // maxJobErrorCount is the maximum number of times a Job can be tried to + // execute before considering it completely failed. + MaxJobErrorCount int `env:"MAX_JOB_ERROR_COUNT" envDefault:"10"` + + // Poll DB for new schedulable jobs every 30s. + DBJobPollInterval time.Duration `env:"DB_JOB_POLL_INTERVAL" envDefault:"30s"` + + // Grace time period before re-scheduling jobs that are in state INIT or + // ACCEPTED. These are jobs where the executor processing has been + // unexpectedly disrupted (such as bug, dead node, disconnected + // networking etc.). + AcceptedGracePeriod time.Duration `env:"ACCEPTED_GRACE_PERIOD" envDefault:"180s"` + + // Grace time period before re-scheduling jobs that are up for immediate + // restart (such as NO_AVAILABLE_WORKERS or ERROR). + ReSchedulableGracePeriod time.Duration `env:"RESCHEDULABLE_GRACE_PERIOD" envDefault:"60s"` + + // Sleep duration in case of service isHalted + PauseDuration time.Duration `env:"PAUSE_DURATION" envDefault:"60s"` } // Parse parses environment variables and flags to a valid Config. diff --git a/main.go b/main.go index 24333157..0ca7e694 100644 --- a/main.go +++ b/main.go @@ -92,7 +92,10 @@ func runServer(cfg *configs.Config) { } defer gorm.Close(db) - systemService := system.NewService(system.NewGormStore(db)) + systemService := system.NewService( + system.NewGormStore(db), + system.WithPauseDuration(cfg.PauseDuration), + ) // Create a worker pool wp := jobs.NewWorkerPool( @@ -101,6 +104,10 @@ func runServer(cfg *configs.Config) { cfg.WorkerCount, jobs.WithJobStatusWebhook(cfg.JobStatusWebhookUrl, cfg.JobStatusWebhookTimeout), jobs.WithSystemService(systemService), + jobs.WithMaxJobErrorCount(cfg.MaxJobErrorCount), + jobs.WithDbJobPollInterval(cfg.DBJobPollInterval), + jobs.WithAcceptedGracePeriod(cfg.AcceptedGracePeriod), + jobs.WithReSchedulableGracePeriod(cfg.ReSchedulableGracePeriod), ) defer func() { From dc391bd0108bf4052abf1e69bb209a74f925ab9f Mon Sep 17 00:00:00 2001 From: David Kuryakin Date: Fri, 8 Apr 2022 10:38:20 +0300 Subject: [PATCH 2/2] add more logging & pass extra params to env --- chain_events/listener.go | 3 +++ configs/configs.go | 2 ++ jobs/workerpool.go | 3 +++ main.go | 6 +++++- 4 files changed, 13 insertions(+), 1 deletion(-) diff --git a/chain_events/listener.go b/chain_events/listener.go index ff7b5d5a..30b44024 100644 --- a/chain_events/listener.go +++ b/chain_events/listener.go @@ -71,6 +71,8 @@ func NewListener( opt(listener) } + log.Debug(listener) + return listener } @@ -167,6 +169,7 @@ func (l *ListenerImpl) Start() Listener { // Unable to connect to chain, pause system. if l.systemService != nil { entry.Warn("Unable to connect to chain, pausing system") + entry.Warn(err) if err := l.systemService.Pause(); err != nil { entry. WithFields(log.Fields{"error": err}). diff --git a/configs/configs.go b/configs/configs.go index 331b907d..cab02950 100644 --- a/configs/configs.go +++ b/configs/configs.go @@ -150,6 +150,8 @@ type Config struct { // Sleep duration in case of service isHalted PauseDuration time.Duration `env:"PAUSE_DURATION" envDefault:"60s"` + + GrpcMaxCallRecvMsgSize int `env:"GRPC_MAX_CALL_RECV_MSG_SIZE" envDefault:"16777216"` } // Parse parses environment variables and flags to a valid Config. diff --git a/jobs/workerpool.go b/jobs/workerpool.go index 73776ee8..92d0e370 100644 --- a/jobs/workerpool.go +++ b/jobs/workerpool.go @@ -112,6 +112,8 @@ func NewWorkerPool(db Store, capacity uint, workerCount uint, opts ...WorkerPool // Register asynchronous job executor. pool.RegisterExecutor(SendJobStatusJobType, pool.executeSendJobStatus) + pool.logger.Debug(pool) + return pool } @@ -320,6 +322,7 @@ func (wp *WorkerPoolImpl) startWorkers() { if wallet_errors.IsChainConnectionError(err) { if wp.systemService != nil { entry.Warn("Unable to connect to chain, pausing system") + entry.Warn(err) // Unable to connect to chain, pause system. if err := wp.systemService.Pause(); err != nil { entry. diff --git a/main.go b/main.go index 0ca7e694..123883ee 100644 --- a/main.go +++ b/main.go @@ -74,7 +74,11 @@ func runServer(cfg *configs.Config) { // Flow client // TODO: WithInsecure()? - fc, err := client.New(cfg.AccessAPIHost, grpc.WithTransportCredentials(insecure.NewCredentials())) + fc, err := client.New( + cfg.AccessAPIHost, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(cfg.GrpcMaxCallRecvMsgSize)), + ) if err != nil { log.Fatal(err) }