Skip to content

Commit

Permalink
[#15]: feat: reconnects on ack/nack timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Apr 29, 2022
2 parents e1050fd + d09cbe2 commit 4749124
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 107 deletions.
9 changes: 0 additions & 9 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,3 @@ updates:
- "rustatian"
assignees:
- "rustatian"

- package-ecosystem: "docker"
directory: "/"
schedule:
interval: daily
reviewers:
- "rustatian"
assignees:
- "rustatian"
24 changes: 24 additions & 0 deletions .github/pull_request_template.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Reason for This PR

`[Author TODO: add issue # or explain reasoning.]`

## Description of Changes

`[Author TODO: add description of changes.]`

## License Acceptance

By submitting this pull request, I confirm that my contribution is made under
the terms of the MIT license.

## PR Checklist

`[Author TODO: Meet these criteria.]`
`[Reviewer TODO: Verify that these criteria are met. Request changes if not]`

- [ ] All commits in this PR are signed (`git commit -s`).
- [ ] The reason for this PR is clearly provided (issue no. or explanation).
- [ ] The description of changes is clear and encompassing.
- [ ] Any required documentation changes (code and docs) are included in this PR.
- [ ] Any user-facing changes are mentioned in `CHANGELOG.md`.
- [ ] All added/changed functionality is tested.
90 changes: 64 additions & 26 deletions amqpjobs/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,22 @@ type Consumer struct {

pipeline atomic.Value

// amqp connection
conn *amqp.Connection
notifyConnClose chan *amqp.Error
consumeChan *amqp.Channel
publishChan chan *amqp.Channel
consumeID string
connStr string

retryTimeout time.Duration
//
// prefetch QoS AMQP
//
prefetch int
//
// pipeline's priority
//
// amqp connection notifiers
notifyCloseConnCh chan *amqp.Error
notifyClosePubCh chan *amqp.Error
notifyCloseConsumeCh chan *amqp.Error
notifyCloseStatCh chan *amqp.Error
redialCh chan *amqp.Error

conn *amqp.Connection
consumeChan *amqp.Channel
stateChan chan *amqp.Channel
publishChan chan *amqp.Channel
consumeID string
connStr string

retryTimeout time.Duration
prefetch int
priority int64
exchangeName string
queue string
Expand All @@ -59,6 +59,7 @@ type Consumer struct {
listeners uint32
delayed *int64
stopCh chan struct{}
stopped uint32
}

// NewAMQPConsumer initializes rabbitmq pipeline
Expand Down Expand Up @@ -102,8 +103,15 @@ func NewAMQPConsumer(configKey string, log *zap.Logger, cfg cfgPlugin.Configurer
priority: conf.Priority,
delayed: utils.Int64(0),

publishChan: make(chan *amqp.Channel, 1),
notifyConnClose: make(chan *amqp.Error, 1),
publishChan: make(chan *amqp.Channel, 1),
stateChan: make(chan *amqp.Channel, 1),
redialCh: make(chan *amqp.Error, 5),

notifyCloseConsumeCh: make(chan *amqp.Error, 1),
notifyCloseConnCh: make(chan *amqp.Error, 1),
notifyCloseStatCh: make(chan *amqp.Error, 1),
notifyClosePubCh: make(chan *amqp.Error, 1),

routingKey: conf.RoutingKey,
queue: conf.Queue,
durable: conf.Durable,
Expand All @@ -123,8 +131,6 @@ func NewAMQPConsumer(configKey string, log *zap.Logger, cfg cfgPlugin.Configurer

// save address
jb.connStr = conf.Addr
jb.conn.NotifyClose(jb.notifyConnClose)

err = jb.initRabbitMQ()
if err != nil {
return nil, errors.E(op, err)
Expand All @@ -135,10 +141,21 @@ func NewAMQPConsumer(configKey string, log *zap.Logger, cfg cfgPlugin.Configurer
return nil, errors.E(op, err)
}

stch, err := jb.conn.Channel()
if err != nil {
return nil, errors.E(op, err)
}

jb.conn.NotifyClose(jb.notifyCloseConnCh)
pch.NotifyClose(jb.notifyClosePubCh)
stch.NotifyClose(jb.notifyCloseStatCh)

jb.publishChan <- pch
jb.stateChan <- stch

// run redialer and requeue listener for the connection
jb.redialer()
jb.redialMergeCh()

return jb, nil
}
Expand Down Expand Up @@ -171,8 +188,15 @@ func FromPipeline(pipeline *pipeline.Pipeline, log *zap.Logger, cfg cfgPlugin.Co
retryTimeout: time.Minute,
delayed: utils.Int64(0),

publishChan: make(chan *amqp.Channel, 1),
notifyConnClose: make(chan *amqp.Error, 1),
publishChan: make(chan *amqp.Channel, 1),
stateChan: make(chan *amqp.Channel, 1),

redialCh: make(chan *amqp.Error, 5),
notifyCloseConsumeCh: make(chan *amqp.Error, 1),
notifyCloseConnCh: make(chan *amqp.Error, 1),
notifyCloseStatCh: make(chan *amqp.Error, 1),
notifyClosePubCh: make(chan *amqp.Error, 1),

routingKey: pipeline.String(routingKey, ""),
queue: pipeline.String(queue, "default"),
exchangeType: pipeline.String(exchangeType, "direct"),
Expand All @@ -193,7 +217,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log *zap.Logger, cfg cfgPlugin.Co

// save address
jb.connStr = conf.Addr
jb.conn.NotifyClose(jb.notifyConnClose)

err = jb.initRabbitMQ()
if err != nil {
Expand All @@ -205,14 +228,26 @@ func FromPipeline(pipeline *pipeline.Pipeline, log *zap.Logger, cfg cfgPlugin.Co
return nil, errors.E(op, err)
}

// channel to report amqp states
stch, err := jb.conn.Channel()
if err != nil {
return nil, errors.E(op, err)
}

jb.conn.NotifyClose(jb.notifyCloseConnCh)
pch.NotifyClose(jb.notifyClosePubCh)
stch.NotifyClose(jb.notifyCloseStatCh)

jb.publishChan <- pch
jb.stateChan <- stch

// register the pipeline
// error here is always nil
_ = jb.Register(context.Background(), pipeline)

// run redialer for the connection
jb.redialer()
jb.redialMergeCh()

return jb, nil
}
Expand Down Expand Up @@ -278,6 +313,7 @@ func (c *Consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return errors.E(op, err)
}

c.consumeChan.NotifyClose(c.notifyCloseConsumeCh)
// run listener
c.listener(deliv)

Expand All @@ -289,12 +325,12 @@ func (c *Consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
func (c *Consumer) State(ctx context.Context) (*jobs.State, error) {
const op = errors.Op("amqp_driver_state")
select {
case pch := <-c.publishChan:
case stateCh := <-c.stateChan:
defer func() {
c.publishChan <- pch
c.stateChan <- stateCh
}()

q, err := pch.QueueInspect(c.queue)
q, err := stateCh.QueueInspect(c.queue)
if err != nil {
return nil, errors.E(op, err)
}
Expand Down Expand Up @@ -405,10 +441,12 @@ func (c *Consumer) Resume(_ context.Context, p string) {

func (c *Consumer) Stop(context.Context) error {
start := time.Now()
atomic.StoreUint32(&c.stopped, 1)
c.stopCh <- struct{}{}

pipe := c.pipeline.Load().(*pipeline.Pipeline)
c.log.Debug("pipeline was stopped", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Duration("elapsed", time.Since(start)))
close(c.redialCh)
return nil
}

Expand Down
10 changes: 10 additions & 0 deletions amqpjobs/listener.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package amqpjobs

import (
"sync/atomic"

amqp "github.com/rabbitmq/amqp091-go"
"go.uber.org/zap"
)
Expand All @@ -12,6 +14,14 @@ func (c *Consumer) listener(deliv <-chan amqp.Delivery) {
case msg, ok := <-deliv:
if !ok {
c.log.Debug("delivery channel was closed, leaving the rabbit listener")
// reduce number of listeners
if atomic.LoadUint32(&c.listeners) == 0 {
c.log.Debug("number of listeners", zap.Uint32("listeners", atomic.LoadUint32(&c.listeners)))
return
}

atomic.AddUint32(&c.listeners, ^uint32(0))
c.log.Debug("number of listeners", zap.Uint32("listeners", atomic.LoadUint32(&c.listeners)))
return
}

Expand Down
Loading

0 comments on commit 4749124

Please sign in to comment.