Skip to content

Commit

Permalink
feat: handle pipeline shutdown with select
Browse files Browse the repository at this point in the history
  • Loading branch information
1995parham committed Aug 17, 2022
1 parent ced440a commit 057bac9
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions 20-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/rand"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
Expand All @@ -15,28 +16,41 @@ func main() {
var ch chan int
ch = make(chan int)

var wg sync.WaitGroup

// create 2 process to process data from the channel
for id := 0; id < processRoutines; id++ {
wg.Add(1)
go func(id int) {
// each processor prints the data besides its id
for {
i, ok := <-ch
if !ok {
// if channel is closed, we will close the processor
fmt.Printf("closed the processosr %d\n", id)
wg.Done()
return
}
fmt.Printf("process %d in %d\n", i, id)
}
}(id)
}

shutdown := make(chan int)

// produce data evey 1 second into the channel
go func() {
for {
time.Sleep(1 * time.Second)
fmt.Println("we have a new input")
ch <- rand.Intn(10)
// if we have any data in shutdonw channel then
// close ch or write new data to it.
select {
case <-shutdown:
close(ch)
default:
ch <- rand.Intn(10)
}
}
}()

Expand All @@ -45,8 +59,7 @@ func main() {
signal.Notify(quit, syscall.SIGTERM, syscall.SIGINT)
<-quit

close(ch)
close(shutdown)

// how we can wait for processors to quit?
time.Sleep(time.Second)
wg.Wait()
}

0 comments on commit 057bac9

Please sign in to comment.