diff --git a/chain_events/listener.go b/chain_events/listener.go index ff7b5d5a..30b44024 100644 --- a/chain_events/listener.go +++ b/chain_events/listener.go @@ -71,6 +71,8 @@ func NewListener( opt(listener) } + log.Debug(listener) + return listener } @@ -167,6 +169,7 @@ func (l *ListenerImpl) Start() Listener { // Unable to connect to chain, pause system. if l.systemService != nil { entry.Warn("Unable to connect to chain, pausing system") + entry.Warn(err) if err := l.systemService.Pause(); err != nil { entry. WithFields(log.Fields{"error": err}). diff --git a/configs/configs.go b/configs/configs.go index 10ce450a..cab02950 100644 --- a/configs/configs.go +++ b/configs/configs.go @@ -130,6 +130,28 @@ type Config struct { // Max transactions per second, rate at which the service can submit transactions to Flow TransactionMaxSendRate int `env:"MAX_TPS" envDefault:"10"` + + // maxJobErrorCount is the maximum number of times a Job can be tried to + // execute before considering it completely failed. + MaxJobErrorCount int `env:"MAX_JOB_ERROR_COUNT" envDefault:"10"` + + // Poll DB for new schedulable jobs every 30s. + DBJobPollInterval time.Duration `env:"DB_JOB_POLL_INTERVAL" envDefault:"30s"` + + // Grace time period before re-scheduling jobs that are in state INIT or + // ACCEPTED. These are jobs where the executor processing has been + // unexpectedly disrupted (such as bug, dead node, disconnected + // networking etc.). + AcceptedGracePeriod time.Duration `env:"ACCEPTED_GRACE_PERIOD" envDefault:"180s"` + + // Grace time period before re-scheduling jobs that are up for immediate + // restart (such as NO_AVAILABLE_WORKERS or ERROR). + ReSchedulableGracePeriod time.Duration `env:"RESCHEDULABLE_GRACE_PERIOD" envDefault:"60s"` + + // Sleep duration in case of service isHalted + PauseDuration time.Duration `env:"PAUSE_DURATION" envDefault:"60s"` + + GrpcMaxCallRecvMsgSize int `env:"GRPC_MAX_CALL_RECV_MSG_SIZE" envDefault:"16777216"` } // Parse parses environment variables and flags to a valid Config. diff --git a/custom_account_setup_emulator.cdc b/custom_account_setup_emulator.cdc index c7237951..6e1192a4 100644 --- a/custom_account_setup_emulator.cdc +++ b/custom_account_setup_emulator.cdc @@ -1,29 +1,32 @@ import NonFungibleToken from 0xf8d6e0586b0a20c7 import Electables from 0xf8d6e0586b0a20c7 +import Crypto -transaction(publicKeys: [String], contracts: {String: String}) { +transaction(publicKeys: [Crypto.KeyListEntry], contracts: {String: String}) { prepare(signer: AuthAccount) { - let acct = AuthAccount(payer: signer) + let account = AuthAccount(payer: signer) + // add all the keys to the account for key in publicKeys { - acct.addPublicKey(key.decodeHex()) + account.keys.add(publicKey: key.publicKey, hashAlgorithm: key.hashAlgorithm, weight: key.weight) } + // add contracts if provided for contract in contracts.keys { - acct.contracts.add(name: contract, code: contracts[contract]!.decodeHex()) + account.contracts.add(name: contract, code: contracts[contract]!.decodeHex()) } - if acct.borrow<&Electables.Collection>(from: Electables.CollectionStoragePath) == nil { + if account.borrow<&Electables.Collection>(from: Electables.CollectionStoragePath) == nil { // create a new empty collection let collection <- Electables.createEmptyCollection() // save it to the account - acct.save(<- collection, to: Electables.CollectionStoragePath) + account.save(<- collection, to: Electables.CollectionStoragePath) // Creates a public capability for the collection so that other users can publicly access electable attributes. // The pieces inside of the brackets specify the type of the linked object, and only expose the fields and // functions on those types. - acct.link<&Electables.Collection{NonFungibleToken.CollectionPublic, Electables.ElectablesPublicCollection}>( + account.link<&Electables.Collection{NonFungibleToken.CollectionPublic, Electables.ElectablesPublicCollection}>( Electables.CollectionPublicPath, target: Electables.CollectionStoragePath ) } diff --git a/custom_account_setup_qa.cdc b/custom_account_setup_qa.cdc index c6c96d44..549225bc 100644 --- a/custom_account_setup_qa.cdc +++ b/custom_account_setup_qa.cdc @@ -1,29 +1,32 @@ import NonFungibleToken from 0x631e88ae7f1d7c20 import Electables from 0x4c05c3d3499ca274 +import Crypto -transaction(publicKeys: [String], contracts: {String: String}) { +transaction(publicKeys: [Crypto.KeyListEntry], contracts: {String: String}) { prepare(signer: AuthAccount) { - let acct = AuthAccount(payer: signer) + let account = AuthAccount(payer: signer) + // add all the keys to the account for key in publicKeys { - acct.addPublicKey(key.decodeHex()) + account.keys.add(publicKey: key.publicKey, hashAlgorithm: key.hashAlgorithm, weight: key.weight) } + // add contracts if provided for contract in contracts.keys { - acct.contracts.add(name: contract, code: contracts[contract]!.decodeHex()) + account.contracts.add(name: contract, code: contracts[contract]!.decodeHex()) } - if acct.borrow<&Electables.Collection>(from: Electables.CollectionStoragePath) == nil { + if account.borrow<&Electables.Collection>(from: Electables.CollectionStoragePath) == nil { // create a new empty collection let collection <- Electables.createEmptyCollection() // save it to the account - acct.save(<- collection, to: Electables.CollectionStoragePath) + account.save(<- collection, to: Electables.CollectionStoragePath) // Creates a public capability for the collection so that other users can publicly access electable attributes. // The pieces inside of the brackets specify the type of the linked object, and only expose the fields and // functions on those types. - acct.link<&Electables.Collection{NonFungibleToken.CollectionPublic, Electables.ElectablesPublicCollection}>( + account.link<&Electables.Collection{NonFungibleToken.CollectionPublic, Electables.ElectablesPublicCollection}>( Electables.CollectionPublicPath, target: Electables.CollectionStoragePath ) } diff --git a/flow_helpers/flow_helpers.go b/flow_helpers/flow_helpers.go index c96b36f4..f460b6c8 100644 --- a/flow_helpers/flow_helpers.go +++ b/flow_helpers/flow_helpers.go @@ -32,7 +32,7 @@ const hexPrefix = "0x" // LatestBlockId retuns the flow.Identifier for the latest block in the chain. func LatestBlockId(ctx context.Context, flowClient FlowClient) (*flow.Identifier, error) { - block, err := flowClient.GetLatestBlockHeader(ctx, true) + block, err := flowClient.GetLatestBlockHeader(ctx, false) if err != nil { return nil, err } diff --git a/jobs/workerpool.go b/jobs/workerpool.go index 73776ee8..92d0e370 100644 --- a/jobs/workerpool.go +++ b/jobs/workerpool.go @@ -112,6 +112,8 @@ func NewWorkerPool(db Store, capacity uint, workerCount uint, opts ...WorkerPool // Register asynchronous job executor. pool.RegisterExecutor(SendJobStatusJobType, pool.executeSendJobStatus) + pool.logger.Debug(pool) + return pool } @@ -320,6 +322,7 @@ func (wp *WorkerPoolImpl) startWorkers() { if wallet_errors.IsChainConnectionError(err) { if wp.systemService != nil { entry.Warn("Unable to connect to chain, pausing system") + entry.Warn(err) // Unable to connect to chain, pause system. if err := wp.systemService.Pause(); err != nil { entry. diff --git a/main.go b/main.go index 24333157..123883ee 100644 --- a/main.go +++ b/main.go @@ -74,7 +74,11 @@ func runServer(cfg *configs.Config) { // Flow client // TODO: WithInsecure()? - fc, err := client.New(cfg.AccessAPIHost, grpc.WithTransportCredentials(insecure.NewCredentials())) + fc, err := client.New( + cfg.AccessAPIHost, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(cfg.GrpcMaxCallRecvMsgSize)), + ) if err != nil { log.Fatal(err) } @@ -92,7 +96,10 @@ func runServer(cfg *configs.Config) { } defer gorm.Close(db) - systemService := system.NewService(system.NewGormStore(db)) + systemService := system.NewService( + system.NewGormStore(db), + system.WithPauseDuration(cfg.PauseDuration), + ) // Create a worker pool wp := jobs.NewWorkerPool( @@ -101,6 +108,10 @@ func runServer(cfg *configs.Config) { cfg.WorkerCount, jobs.WithJobStatusWebhook(cfg.JobStatusWebhookUrl, cfg.JobStatusWebhookTimeout), jobs.WithSystemService(systemService), + jobs.WithMaxJobErrorCount(cfg.MaxJobErrorCount), + jobs.WithDbJobPollInterval(cfg.DBJobPollInterval), + jobs.WithAcceptedGracePeriod(cfg.AcceptedGracePeriod), + jobs.WithReSchedulableGracePeriod(cfg.ReSchedulableGracePeriod), ) defer func() {