From d017940a87f36b534358e20bd9bb8f994b198849 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 29 Apr 2022 13:40:52 +0200 Subject: [PATCH 1/2] reconnects in ack timeouts Signed-off-by: Valery Piashchynski --- .github/dependabot.yml | 9 - .github/pull_request_template.md | 24 +++ amqpjobs/consumer.go | 90 +++++++--- amqpjobs/listener.go | 10 ++ amqpjobs/redial.go | 277 +++++++++++++++++++++++-------- 5 files changed, 303 insertions(+), 107 deletions(-) create mode 100644 .github/pull_request_template.md diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 6271660..8cf3909 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -22,12 +22,3 @@ updates: - "rustatian" assignees: - "rustatian" - - - package-ecosystem: "docker" - directory: "/" - schedule: - interval: daily - reviewers: - - "rustatian" - assignees: - - "rustatian" diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 0000000..c346785 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,24 @@ +# Reason for This PR + +`[Author TODO: add issue # or explain reasoning.]` + +## Description of Changes + +`[Author TODO: add description of changes.]` + +## License Acceptance + +By submitting this pull request, I confirm that my contribution is made under +the terms of the MIT license. + +## PR Checklist + +`[Author TODO: Meet these criteria.]` +`[Reviewer TODO: Verify that these criteria are met. Request changes if not]` + +- [ ] All commits in this PR are signed (`git commit -s`). +- [ ] The reason for this PR is clearly provided (issue no. or explanation). +- [ ] The description of changes is clear and encompassing. +- [ ] Any required documentation changes (code and docs) are included in this PR. +- [ ] Any user-facing changes are mentioned in `CHANGELOG.md`. +- [ ] All added/changed functionality is tested. diff --git a/amqpjobs/consumer.go b/amqpjobs/consumer.go index c0b7843..f5bd16c 100644 --- a/amqpjobs/consumer.go +++ b/amqpjobs/consumer.go @@ -29,22 +29,22 @@ type Consumer struct { pipeline atomic.Value - // amqp connection - conn *amqp.Connection - notifyConnClose chan *amqp.Error - consumeChan *amqp.Channel - publishChan chan *amqp.Channel - consumeID string - connStr string - - retryTimeout time.Duration - // - // prefetch QoS AMQP - // - prefetch int - // - // pipeline's priority - // + // amqp connection notifiers + notifyCloseConnCh chan *amqp.Error + notifyClosePubCh chan *amqp.Error + notifyCloseConsumeCh chan *amqp.Error + notifyCloseStatCh chan *amqp.Error + redialCh chan *amqp.Error + + conn *amqp.Connection + consumeChan *amqp.Channel + stateChan chan *amqp.Channel + publishChan chan *amqp.Channel + consumeID string + connStr string + + retryTimeout time.Duration + prefetch int priority int64 exchangeName string queue string @@ -59,6 +59,7 @@ type Consumer struct { listeners uint32 delayed *int64 stopCh chan struct{} + stopped uint32 } // NewAMQPConsumer initializes rabbitmq pipeline @@ -102,8 +103,15 @@ func NewAMQPConsumer(configKey string, log *zap.Logger, cfg cfgPlugin.Configurer priority: conf.Priority, delayed: utils.Int64(0), - publishChan: make(chan *amqp.Channel, 1), - notifyConnClose: make(chan *amqp.Error, 1), + publishChan: make(chan *amqp.Channel, 1), + stateChan: make(chan *amqp.Channel, 1), + redialCh: make(chan *amqp.Error, 5), + + notifyCloseConsumeCh: make(chan *amqp.Error, 1), + notifyCloseConnCh: make(chan *amqp.Error, 1), + notifyCloseStatCh: make(chan *amqp.Error, 1), + notifyClosePubCh: make(chan *amqp.Error, 1), + routingKey: conf.RoutingKey, queue: conf.Queue, durable: conf.Durable, @@ -123,8 +131,6 @@ func NewAMQPConsumer(configKey string, log *zap.Logger, cfg cfgPlugin.Configurer // save address jb.connStr = conf.Addr - jb.conn.NotifyClose(jb.notifyConnClose) - err = jb.initRabbitMQ() if err != nil { return nil, errors.E(op, err) @@ -135,10 +141,21 @@ func NewAMQPConsumer(configKey string, log *zap.Logger, cfg cfgPlugin.Configurer return nil, errors.E(op, err) } + stch, err := jb.conn.Channel() + if err != nil { + return nil, errors.E(op, err) + } + + jb.conn.NotifyClose(jb.notifyCloseConnCh) + pch.NotifyClose(jb.notifyClosePubCh) + stch.NotifyClose(jb.notifyCloseStatCh) + jb.publishChan <- pch + jb.stateChan <- stch // run redialer and requeue listener for the connection jb.redialer() + jb.redialMergeCh() return jb, nil } @@ -171,8 +188,15 @@ func FromPipeline(pipeline *pipeline.Pipeline, log *zap.Logger, cfg cfgPlugin.Co retryTimeout: time.Minute, delayed: utils.Int64(0), - publishChan: make(chan *amqp.Channel, 1), - notifyConnClose: make(chan *amqp.Error, 1), + publishChan: make(chan *amqp.Channel, 1), + stateChan: make(chan *amqp.Channel, 1), + + redialCh: make(chan *amqp.Error, 5), + notifyCloseConsumeCh: make(chan *amqp.Error, 1), + notifyCloseConnCh: make(chan *amqp.Error, 1), + notifyCloseStatCh: make(chan *amqp.Error, 1), + notifyClosePubCh: make(chan *amqp.Error, 1), + routingKey: pipeline.String(routingKey, ""), queue: pipeline.String(queue, "default"), exchangeType: pipeline.String(exchangeType, "direct"), @@ -193,7 +217,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log *zap.Logger, cfg cfgPlugin.Co // save address jb.connStr = conf.Addr - jb.conn.NotifyClose(jb.notifyConnClose) err = jb.initRabbitMQ() if err != nil { @@ -205,7 +228,18 @@ func FromPipeline(pipeline *pipeline.Pipeline, log *zap.Logger, cfg cfgPlugin.Co return nil, errors.E(op, err) } + // channel to report amqp states + stch, err := jb.conn.Channel() + if err != nil { + return nil, errors.E(op, err) + } + + jb.conn.NotifyClose(jb.notifyCloseConnCh) + pch.NotifyClose(jb.notifyClosePubCh) + stch.NotifyClose(jb.notifyCloseStatCh) + jb.publishChan <- pch + jb.stateChan <- stch // register the pipeline // error here is always nil @@ -213,6 +247,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log *zap.Logger, cfg cfgPlugin.Co // run redialer for the connection jb.redialer() + jb.redialMergeCh() return jb, nil } @@ -278,6 +313,7 @@ func (c *Consumer) Run(_ context.Context, p *pipeline.Pipeline) error { return errors.E(op, err) } + c.consumeChan.NotifyClose(c.notifyCloseConsumeCh) // run listener c.listener(deliv) @@ -289,12 +325,12 @@ func (c *Consumer) Run(_ context.Context, p *pipeline.Pipeline) error { func (c *Consumer) State(ctx context.Context) (*jobs.State, error) { const op = errors.Op("amqp_driver_state") select { - case pch := <-c.publishChan: + case stateCh := <-c.stateChan: defer func() { - c.publishChan <- pch + c.stateChan <- stateCh }() - q, err := pch.QueueInspect(c.queue) + q, err := stateCh.QueueInspect(c.queue) if err != nil { return nil, errors.E(op, err) } @@ -405,10 +441,12 @@ func (c *Consumer) Resume(_ context.Context, p string) { func (c *Consumer) Stop(context.Context) error { start := time.Now() + atomic.StoreUint32(&c.stopped, 1) c.stopCh <- struct{}{} pipe := c.pipeline.Load().(*pipeline.Pipeline) c.log.Debug("pipeline was stopped", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Duration("elapsed", time.Since(start))) + close(c.redialCh) return nil } diff --git a/amqpjobs/listener.go b/amqpjobs/listener.go index 017659a..a979c59 100644 --- a/amqpjobs/listener.go +++ b/amqpjobs/listener.go @@ -1,6 +1,8 @@ package amqpjobs import ( + "sync/atomic" + amqp "github.com/rabbitmq/amqp091-go" "go.uber.org/zap" ) @@ -12,6 +14,14 @@ func (c *Consumer) listener(deliv <-chan amqp.Delivery) { case msg, ok := <-deliv: if !ok { c.log.Debug("delivery channel was closed, leaving the rabbit listener") + // reduce number of listeners + if atomic.LoadUint32(&c.listeners) == 0 { + c.log.Debug("number of listeners", zap.Uint32("listeners", atomic.LoadUint32(&c.listeners))) + return + } + + atomic.AddUint32(&c.listeners, ^uint32(0)) + c.log.Debug("number of listeners", zap.Uint32("listeners", atomic.LoadUint32(&c.listeners))) return } diff --git a/amqpjobs/redial.go b/amqpjobs/redial.go index ee8c157..81ccd50 100644 --- a/amqpjobs/redial.go +++ b/amqpjobs/redial.go @@ -1,6 +1,7 @@ package amqpjobs import ( + "sync/atomic" "time" "github.com/cenkalti/backoff/v4" @@ -13,94 +14,81 @@ import ( // redialer used to redial to the rabbitmq in case of the connection interrupts func (c *Consumer) redialer() { //nolint:gocognit,gocyclo go func() { - const op = errors.Op("rabbitmq_redial") - for { select { - case err := <-c.notifyConnClose: + case err := <-c.notifyCloseConnCh: if err == nil { return } - c.Lock() - - // trash the broken publishing channel - <-c.publishChan - - t := time.Now().UTC() - pipe := c.pipeline.Load().(*pipeline.Pipeline) - - c.log.Error("pipeline connection was closed, redialing", zap.Error(err), zap.String("pipeline", pipe.Name()), zap.String("driver", pipe.Driver()), zap.Time("start", t)) - - expb := backoff.NewExponentialBackOff() - // set the retry timeout (minutes) - expb.MaxElapsedTime = c.retryTimeout - operation := func() error { - c.log.Warn("reconnecting", zap.Error(err)) - var dialErr error - c.conn, dialErr = amqp.Dial(c.connStr) - if dialErr != nil { - return errors.E(op, dialErr) - } - - c.log.Info("rabbitmq dial was succeed. trying to redeclare queues and subscribers") + // stopped + if atomic.LoadUint32(&c.stopped) == 1 { + c.log.Debug("redialer stopped") + continue + } - // re-init connection - errInit := c.initRabbitMQ() - if errInit != nil { - c.log.Error("rabbitmq dial", zap.Error(errInit)) - return errInit - } + select { + case c.redialCh <- err: + return + default: + return + } - // redeclare consume channel - var errConnCh error - c.consumeChan, errConnCh = c.conn.Channel() - if errConnCh != nil { - return errors.E(op, errConnCh) - } + case err := <-c.notifyCloseConsumeCh: + if err == nil { + return + } - // redeclare publish channel - pch, errPubCh := c.conn.Channel() - if errPubCh != nil { - return errors.E(op, errPubCh) - } + // stopped + if atomic.LoadUint32(&c.stopped) == 1 { + c.log.Debug("redialer stopped") + continue + } - // start reading messages from the channel - deliv, err := c.consumeChan.Consume( - c.queue, - c.consumeID, - false, - false, - false, - false, - nil, - ) - if err != nil { - return errors.E(op, err) - } + select { + case c.redialCh <- err: + return + default: + return + } + case err := <-c.notifyClosePubCh: + if err == nil { + return + } - // put the fresh publishing channel - c.publishChan <- pch - // restart listener - c.listener(deliv) + // stopped + if atomic.LoadUint32(&c.stopped) == 1 { + c.log.Debug("redialer stopped") + continue + } - c.log.Info("queues and subscribers was redeclared successfully") + select { + case c.redialCh <- err: + return + default: + return + } + case err := <-c.notifyCloseStatCh: + if err == nil { + return + } - return nil + // stopped + if atomic.LoadUint32(&c.stopped) == 1 { + c.log.Debug("redialer stopped") + continue } - retryErr := backoff.Retry(operation, expb) - if retryErr != nil { - c.Unlock() - c.log.Error("backoff operation failed", zap.Error(retryErr)) + select { + case c.redialCh <- err: + return + default: return } - c.log.Info("connection was successfully restored", zap.String("pipeline", pipe.Name()), zap.String("driver", pipe.Driver()), zap.Time("start", t), zap.Duration("elapsed", time.Since(t))) - c.Unlock() - case <-c.stopCh: pch := <-c.publishChan + stCh := <-c.stateChan if c.deleteQueueOnStop { msg, err := pch.QueueDelete(c.queue, false, false, false) @@ -114,6 +102,10 @@ func (c *Consumer) redialer() { //nolint:gocognit,gocyclo if err != nil { c.log.Error("publish channel close", zap.Error(err)) } + err = stCh.Close() + if err != nil { + c.log.Error("state channel close", zap.Error(err)) + } if c.consumeChan != nil { err = c.consumeChan.Close() @@ -122,14 +114,155 @@ func (c *Consumer) redialer() { //nolint:gocognit,gocyclo } } - err = c.conn.Close() - if err != nil { - c.log.Error("amqp connection closed", zap.Error(err)) + if c.conn != nil { + err = c.conn.Close() + if err != nil { + c.log.Error("amqp connection closed", zap.Error(err)) + } } - close(c.publishChan) return } } }() } + +func (c *Consumer) reset() { + pch := <-c.publishChan + stCh := <-c.stateChan + + err := pch.Close() + if err != nil { + c.log.Error("publish channel close", zap.Error(err)) + } + err = stCh.Close() + if err != nil { + c.log.Error("state channel close", zap.Error(err)) + } + + if c.consumeChan != nil { + err = c.consumeChan.Close() + if err != nil { + c.log.Error("consume channel close", zap.Error(err)) + } + } + + if c.conn != nil { + err = c.conn.Close() + if err != nil { + c.log.Error("amqp connection closed", zap.Error(err)) + } + } +} + +func (c *Consumer) redialMergeCh() { + go func() { + for err := range c.redialCh { + c.Lock() + c.redial(err) + c.Unlock() + } + }() +} + +func (c *Consumer) redial(amqpErr *amqp.Error) { + const op = errors.Op("rabbitmq_redial") + // trash the broken publishing channel + c.reset() + + t := time.Now().UTC() + pipe := c.pipeline.Load().(*pipeline.Pipeline) + + c.log.Error("pipeline connection was closed, redialing", zap.Error(amqpErr), zap.String("pipeline", pipe.Name()), zap.String("driver", pipe.Driver()), zap.Time("start", t)) + + expb := backoff.NewExponentialBackOff() + // set the retry timeout (minutes) + expb.MaxElapsedTime = c.retryTimeout + operation := func() error { + var err error + c.conn, err = amqp.Dial(c.connStr) + if err != nil { + return errors.E(op, err) + } + + c.log.Info("rabbitmq dial was succeed. trying to redeclare queues and subscribers") + + // re-init connection + err = c.initRabbitMQ() + if err != nil { + c.log.Error("rabbitmq dial", zap.Error(err)) + return errors.E(op, err) + } + + // redeclare consume channel + c.consumeChan, err = c.conn.Channel() + if err != nil { + return errors.E(op, err) + } + + err = c.consumeChan.Qos(c.prefetch, 0, false) + if err != nil { + c.log.Error("QOS", zap.Error(err)) + return errors.E(op, err) + } + + // redeclare publish channel + pch, err := c.conn.Channel() + if err != nil { + return errors.E(op, err) + } + + sch, err := c.conn.Channel() + if err != nil { + return errors.E(op, err) + } + + // start reading messages from the channel + deliv, err := c.consumeChan.Consume( + c.queue, + c.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + c.notifyClosePubCh = make(chan *amqp.Error, 1) + c.notifyCloseStatCh = make(chan *amqp.Error, 1) + c.notifyCloseConnCh = make(chan *amqp.Error, 1) + c.notifyCloseConsumeCh = make(chan *amqp.Error, 1) + + c.conn.NotifyClose(c.notifyCloseConnCh) + c.consumeChan.NotifyClose(c.notifyCloseConsumeCh) + pch.NotifyClose(c.notifyClosePubCh) + sch.NotifyClose(c.notifyCloseStatCh) + + // put the fresh channels + c.stateChan <- sch + c.publishChan <- pch + + // restart listener + atomic.StoreUint32(&c.listeners, 1) + c.listener(deliv) + + c.log.Info("queues and subscribers was redeclared successfully") + + return nil + } + + retryErr := backoff.Retry(operation, expb) + if retryErr != nil { + c.log.Error("backoff operation failed", zap.Error(retryErr)) + return + } + + c.log.Info("connection was successfully restored", zap.String("pipeline", pipe.Name()), zap.String("driver", pipe.Driver()), zap.Time("start", t), zap.Duration("elapsed", time.Since(t))) + + // restart redialer + c.redialer() + c.log.Info("redialer restarted") +} From d09cbe2ba23d947f5f2826efa02b917ca57424de Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 29 Apr 2022 13:48:50 +0200 Subject: [PATCH 2/2] add more debug logs Signed-off-by: Valery Piashchynski --- amqpjobs/redial.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/amqpjobs/redial.go b/amqpjobs/redial.go index 81ccd50..233c808 100644 --- a/amqpjobs/redial.go +++ b/amqpjobs/redial.go @@ -18,6 +18,7 @@ func (c *Consumer) redialer() { //nolint:gocognit,gocyclo select { case err := <-c.notifyCloseConnCh: if err == nil { + c.log.Debug("exited from redialer") return } @@ -29,13 +30,16 @@ func (c *Consumer) redialer() { //nolint:gocognit,gocyclo select { case c.redialCh <- err: + c.log.Debug("exited from redialer") return default: + c.log.Debug("exited from redialer") return } case err := <-c.notifyCloseConsumeCh: if err == nil { + c.log.Debug("exited from redialer") return } @@ -47,12 +51,15 @@ func (c *Consumer) redialer() { //nolint:gocognit,gocyclo select { case c.redialCh <- err: + c.log.Debug("exited from redialer") return default: + c.log.Debug("exited from redialer") return } case err := <-c.notifyClosePubCh: if err == nil { + c.log.Debug("exited from redialer") return } @@ -64,12 +71,15 @@ func (c *Consumer) redialer() { //nolint:gocognit,gocyclo select { case c.redialCh <- err: + c.log.Debug("exited from redialer") return default: + c.log.Debug("exited from redialer") return } case err := <-c.notifyCloseStatCh: if err == nil { + c.log.Debug("redialer stopped") return } @@ -81,12 +91,16 @@ func (c *Consumer) redialer() { //nolint:gocognit,gocyclo select { case c.redialCh <- err: + c.log.Debug("redialer stopped") return default: + c.log.Debug("redialer stopped") return } case <-c.stopCh: + c.log.Debug("starting stop routine") + pch := <-c.publishChan stCh := <-c.stateChan