From 1ca59f1afb4e180cc5f6dc05f9e600e0ef3437c3 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Fri, 17 Jun 2022 22:52:48 +0800 Subject: [PATCH] chore(executor): wait all groutine done. sync wait with goroutine number not job count. --- internal/executor/executor.go | 37 ++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 2650fbf..1385af1 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -13,36 +13,41 @@ func ExecuteAll(numCPU int, tasks ...TaskFunc) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - wg := sync.WaitGroup{} - wg.Add(len(tasks)) - if numCPU == 0 { numCPU = runtime.NumCPU() } - queue := make(chan TaskFunc, numCPU) + + wg := sync.WaitGroup{} + wg.Add(numCPU) + + queue := make(chan TaskFunc, len(tasks)) + // Add tasks to queue + for _, task := range tasks { + queue <- task + } + close(queue) // Spawn the executer for i := 0; i < numCPU; i++ { go func() { - for task := range queue { - if err == nil { - taskErr := task(ctx) - if taskErr != nil { - err = taskErr + defer wg.Done() + for { + select { + case task, ok := <-queue: + if ctx.Err() != nil || !ok { + return + } + if e := task(ctx); e != nil { + err = e cancel() } + case <-ctx.Done(): + return } - wg.Done() } }() } - // Add tasks to queue - for _, task := range tasks { - queue <- task - } - close(queue) - // wait for all task done wg.Wait() return err