diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..723ef36 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea \ No newline at end of file diff --git a/example/main.go b/example/main.go index 77ca1b5..6bda764 100644 --- a/example/main.go +++ b/example/main.go @@ -1,18 +1,17 @@ package main import ( - "fmt" "piper" "github.com/streadway/amqp" ) func main() { - connect, err := amqp.Dial("amqp://user:pass@localhost:5672") + connect, err := amqp.Dial("amqp://root:pass@localhost:5672") if err != nil { panic(err) } - c, err := piper.NewReadQueue(connect, "test.exchange", "test", 2, "test") + c, err := piper.NewReadQueue(connect, "test.exchange", "test", 40, "test") if err != nil { panic(err) } @@ -32,14 +31,18 @@ func main() { } }() - c.WithReport("test.report", "test.report") - go c.Run() + rq, err := c.WithReport("test.report", "test.report") if err != nil { panic(err) } - for message := range c.Read() { - fmt.Println(message) - c.Report() <- piper.Report{ + go func() { + err := rq.Run() + if err != nil { + panic(err) + } + }() + for message := range rq.Read() { + rq.Report() <- piper.Report{ Done: &piper.DoneReport{ Status: 1, }, diff --git a/read_queue.go b/read_queue.go index 0ea2d7e..41291b4 100644 --- a/read_queue.go +++ b/read_queue.go @@ -4,9 +4,9 @@ import ( "bytes" "encoding/json" "fmt" - "sync" - "github.com/streadway/amqp" + "piper/wpqueue" + "sync" ) type ReadQueueReport struct { @@ -129,37 +129,35 @@ func (rq *ReadQueue) Run() error { if err != nil { return fmt.Errorf("[Rq][%s][Run]: %s", rq.queue, err) } + wg.Add(1) + pool := wpqueue.NewWorkerPool(rq.routines, rq.queue, deliveries) + go pool.RunWorkerPool() - wg.Add(rq.routines) go func() { - for i := 0; i < rq.routines; i++ { - go func(idx int) { - defer wg.Done() - for { - select { - case delivery, ok := <-deliveries: - if !ok { - fmt.Printf("[Rq][%s][%d][Run][channel closed]\n", rq.queue, idx) - return - } - var message *Message - if err := json.NewDecoder(bytes.NewReader(delivery.Body)).Decode(&message); err != nil { - fmt.Printf("[Rq][%s][%d][Run][failed decode]: %s\n", rq.queue, idx, err) - if err := delivery.Ack(false); err != nil { - fmt.Printf("[Rq][%s][%d][Run][failed ack]: %s", rq.queue, idx, err) - continue - } - continue - } - if err := delivery.Ack(false); err != nil { - fmt.Printf("[Rq][%s][%d][Run][failed ack]: %s", rq.queue, idx, err) - continue - } - fmt.Println(idx) - rq.read <- *message + defer wg.Done() + for { + select { + case result, ok := <-pool.Results(): + if !ok { + fmt.Printf("[Rq][%s][Run][queue result channel closed]\n", rq.queue) + return + } + var message *Message + if err := json.NewDecoder(bytes.NewReader(result.Delivery.Body)).Decode(&message); err != nil { + fmt.Printf("[Rq][%s][%d][Run][failed decode]: %s\n", rq.queue, result.WorkerId, err) + if err := result.Delivery.Ack(false); err != nil { + fmt.Printf("[Rq][%s][%d][Run][failed ack]: %s", rq.queue, result.WorkerId, err) + continue } + continue + } + if err := result.Delivery.Ack(false); err != nil { + fmt.Printf("[Rq][%s][%d][Run][failed ack]: %s", rq.queue, result.WorkerId, err) + continue } - }(i) + fmt.Println(*message) + rq.read <- *message + } } }() diff --git a/wpqueue/type.go b/wpqueue/type.go new file mode 100644 index 0000000..3f1893d --- /dev/null +++ b/wpqueue/type.go @@ -0,0 +1,15 @@ +package wpqueue + +import "github.com/streadway/amqp" + +type QueueWorkerPool struct { + workersCount int + queue string + deliveries <-chan amqp.Delivery + results chan ResultDelivery +} + +type ResultDelivery struct { + WorkerId int + Delivery amqp.Delivery +} diff --git a/wpqueue/wp.go b/wpqueue/wp.go new file mode 100644 index 0000000..6d39630 --- /dev/null +++ b/wpqueue/wp.go @@ -0,0 +1,47 @@ +package wpqueue + +import ( + "fmt" + "github.com/streadway/amqp" + "sync" +) + +func NewWorkerPool(count int, queue string, deliveries <-chan amqp.Delivery) QueueWorkerPool { + return QueueWorkerPool{ + workersCount: count, + queue: queue, + deliveries: deliveries, + results: make(chan ResultDelivery), + } +} + +func (wp QueueWorkerPool) RunWorkerPool() { + var wg sync.WaitGroup + for i := 0; i < wp.workersCount; i++ { + wg.Add(1) + go wp.workerProcessing(i, &wg) + } + wg.Wait() + close(wp.results) +} + +func (wp QueueWorkerPool) Results() <-chan ResultDelivery { + return wp.results +} + +func (wp QueueWorkerPool) workerProcessing(numWorker int, wg *sync.WaitGroup) { + defer wg.Done() + for { + select { + case delivery, ok := <-wp.deliveries: + if !ok { + fmt.Printf("[Rq][%s][%d][Run][channel closed]\n", wp.queue, numWorker) + return + } + wp.results <- ResultDelivery{ + WorkerId: numWorker, + Delivery: delivery, + } + } + } +}