Skip to content

Commit

Permalink
Merge pull request #141 from flow-hydraulics/navid/multiple-workers
Browse files Browse the repository at this point in the history
Multiple workers
  • Loading branch information
latenssi authored Aug 18, 2021
2 parents ff872c0 + f48a36a commit 2c01f42
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 76 deletions.
11 changes: 10 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,13 @@ ENABLED_TOKENS=FUSD:0xf8d6e0586b0a20c7:fusd,FlowToken:0x0ae53cb6e3f42a79:flowTok
# This sets the number of proposal keys to be used on the admin account.
# You can increase transaction throughput by using multiple proposal keys for
# parallel transaction execution.
ADMIN_PROPOSAL_KEY_COUNT=50
ADMIN_PROPOSAL_KEY_COUNT=50

# Defines the maximum number of active jobs that can be queued before
# new jobs are rejected.
# WORKER_QUEUE_CAPACITY=1000 (default)

# Number of concurrent workers handling incoming jobs.
# You can increase the number of workers if you're sending
# too many transactions and find that the queue is often backlogged.
# WORKER_COUNT=100 (default)
97 changes: 38 additions & 59 deletions jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,12 @@ import (
)

type WorkerPool struct {
log *log.Logger
wg *sync.WaitGroup
workers []*Worker
store Store
}

type Worker struct {
pool *WorkerPool
jobChan *chan *Job
log *log.Logger
wg *sync.WaitGroup
store Store
jobChan chan *Job
capacity uint
workerCount uint
}

type Job struct {
Expand Down Expand Up @@ -52,19 +49,30 @@ func (j *Job) Wait(wait bool) error {
return nil
}

func NewWorkerPool(l *log.Logger, db Store) *WorkerPool {
return &WorkerPool{l, &sync.WaitGroup{}, []*Worker{}, db}
func NewWorkerPool(l *log.Logger, db Store, capacity uint, workerCount uint) *WorkerPool {
wg := &sync.WaitGroup{}
jobChan := make(chan *Job, capacity)

pool := &WorkerPool{l, wg, db, jobChan, capacity, workerCount}

pool.startWorkers()

return pool
}

func (p *WorkerPool) AddWorker(capacity uint) {
if len(p.workers) > 0 {
panic("multiple workers not supported yet")
func (p *WorkerPool) startWorkers() {
for i := uint(0); i < p.workerCount; i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for job := range p.jobChan {
if job == nil {
break
}
p.process(job)
}
}()
}
p.wg.Add(1)
jobChan := make(chan *Job, capacity)
worker := &Worker{p, &jobChan}
p.workers = append(p.workers, worker)
go worker.start()
}

func (p *WorkerPool) AddJob(do func() (string, error)) (*Job, error) {
Expand All @@ -73,16 +81,7 @@ func (p *WorkerPool) AddJob(do func() (string, error)) (*Job, error) {
return job, err
}

worker, err := p.AvailableWorker()
if err != nil {
job.Status = NoAvailableWorkers
if err := p.store.UpdateJob(job); err != nil {
p.log.Println("WARNING: Could not update DB entry for Job", job.ID)
}
return job, &errors.JobQueueFull{Err: fmt.Errorf(job.Status.String())}
}

if !worker.tryEnqueue(job) {
if !p.tryEnqueue(job) {
job.Status = QueueFull
if err := p.store.UpdateJob(job); err != nil {
p.log.Println("WARNING: Could not update DB entry for Job", job.ID)
Expand All @@ -98,56 +97,36 @@ func (p *WorkerPool) AddJob(do func() (string, error)) (*Job, error) {
return job, nil
}

func (p *WorkerPool) AvailableWorker() (*Worker, error) {
// TODO: support multiple workers, use load balancing
if len(p.workers) < 1 {
return nil, fmt.Errorf("no available workers")
}
return p.workers[0], nil
}

func (p *WorkerPool) Stop() {
for _, w := range p.workers {
close(*w.jobChan)
}
close(p.jobChan)
p.wg.Wait()
}

func (w *Worker) start() {
defer w.pool.wg.Done()
for job := range *w.jobChan {
if job == nil {
return
}
w.process(job)
}
}

func (w *Worker) tryEnqueue(job *Job) bool {
func (p *WorkerPool) tryEnqueue(job *Job) bool {
select {
case *w.jobChan <- job:
case p.jobChan <- job:
return true
default:
return false
}
}

func (w *Worker) process(job *Job) {
func (p *WorkerPool) process(job *Job) {
result, err := job.Do()
job.Result = result
if err != nil {
if w.pool.log != nil {
w.pool.log.Printf("[Job %s] Error while processing job: %s\n", job.ID, err)
if p.log != nil {
p.log.Printf("[Job %s] Error while processing job: %s\n", job.ID, err)
}
job.Status = Error
job.Error = err.Error()
if err := w.pool.store.UpdateJob(job); err != nil {
w.pool.log.Println("WARNING: Could not update DB entry for Job", job.ID)
if err := p.store.UpdateJob(job); err != nil {
p.log.Println("WARNING: Could not update DB entry for Job", job.ID)
}
return
}
job.Status = Complete
if err := w.pool.store.UpdateJob(job); err != nil {
w.pool.log.Println("WARNING: Could not update DB entry for Job", job.ID)
if err := p.store.UpdateJob(job); err != nil {
p.log.Println("WARNING: Could not update DB entry for Job", job.ID)
}
}
8 changes: 4 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type Config struct {
Port int `env:"PORT" envDefault:"3000"`
AccessAPIHost string `env:"ACCESS_API_HOST,notEmpty"`
ChainID flow.ChainID `env:"CHAIN_ID" envDefault:"flow-emulator"`

WorkerQueueCapacity uint `env:"WORKER_QUEUE_CAPACITY" envDefault:"1000"`
WorkerCount uint `env:"WORKER_COUNT" envDefault:"100"`
}

func main() {
Expand Down Expand Up @@ -119,15 +122,12 @@ func runServer(disableRawTx, disableFt, disableNft, disableChainEvents bool) {
tokenStore := tokens.NewGormStore(db)

// Create a worker pool
wp := jobs.NewWorkerPool(lj, jobStore)
wp := jobs.NewWorkerPool(lj, jobStore, cfg.WorkerQueueCapacity, cfg.WorkerCount)
defer func() {
ls.Println("Stopping worker pool..")
wp.Stop()
}()

// TODO: make this configurable
wp.AddWorker(100) // Add a worker with capacity of 100

// Key manager
km := basic.NewKeyManager(keyStore, fc)

Expand Down
18 changes: 6 additions & 12 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,8 @@ func TestAccountServices(t *testing.T) {

km := basic.NewKeyManager(keyStore, fc)

wp := jobs.NewWorkerPool(nil, jobStore)
wp := jobs.NewWorkerPool(nil, jobStore, 100, 1)
defer wp.Stop()
wp.AddWorker(1)

service := accounts.NewService(accountStore, km, fc, wp, nil, templateService)
txService := transactions.NewService(txStore, km, fc, wp)
Expand Down Expand Up @@ -283,9 +282,8 @@ func TestAccountHandlers(t *testing.T) {

km := basic.NewKeyManager(keyStore, fc)

wp := jobs.NewWorkerPool(nil, jobStore)
wp := jobs.NewWorkerPool(nil, jobStore, 100, 1)
defer wp.Stop()
wp.AddWorker(1)

store := accounts.NewGormStore(db)
service := accounts.NewService(store, km, fc, wp, nil, templateService)
Expand Down Expand Up @@ -434,9 +432,8 @@ func TestTransactionHandlers(t *testing.T) {

km := basic.NewKeyManager(keyStore, fc)

wp := jobs.NewWorkerPool(nil, jobStore)
wp := jobs.NewWorkerPool(nil, jobStore, 100, 1)
defer wp.Stop()
wp.AddWorker(1)

store := transactions.NewGormStore(db)
service := transactions.NewService(store, km, fc, wp)
Expand Down Expand Up @@ -792,9 +789,8 @@ func TestTokenServices(t *testing.T) {

km := basic.NewKeyManager(keyStore, fc)

wp := jobs.NewWorkerPool(nil, jobStore)
wp := jobs.NewWorkerPool(nil, jobStore, 100, 1)
defer wp.Stop()
wp.AddWorker(1)

transactionService := transactions.NewService(transactionStore, km, fc, wp)
accountService := accounts.NewService(accountStore, km, fc, wp, transactionService, templateService)
Expand Down Expand Up @@ -984,9 +980,8 @@ func TestTokenHandlers(t *testing.T) {

km := basic.NewKeyManager(keyStore, fc)

wp := jobs.NewWorkerPool(nil, jobStore)
wp := jobs.NewWorkerPool(nil, jobStore, 100, 1)
defer wp.Stop()
wp.AddWorker(1)

transactionService := transactions.NewService(transactionStore, km, fc, wp)
accountService := accounts.NewService(accountStore, km, fc, wp, transactionService, templateService)
Expand Down Expand Up @@ -1526,9 +1521,8 @@ func TestNFTDeployment(t *testing.T) {

km := basic.NewKeyManager(keyStore, fc)

wp := jobs.NewWorkerPool(nil, jobStore)
wp := jobs.NewWorkerPool(nil, jobStore, 100, 1)
defer wp.Stop()
wp.AddWorker(1)

transactionService := transactions.NewService(transactionStore, km, fc, wp)
accountService := accounts.NewService(accountStore, km, fc, wp, transactionService, templateService)
Expand Down

0 comments on commit 2c01f42

Please sign in to comment.