Skip to content

A tiny wrapper over amqp exchanges and queues 🚌 ✨

License

Notifications You must be signed in to change notification settings

stone-payments/rabbus

 
 

Repository files navigation

Rabbus 🚌 ✨

  • A tiny wrapper over amqp exchanges and queues.
  • Automatic retries and exponential backoff for sending messages.
  • Makes use of gobreaker.
  • Automatic reconnect to RabbitMQ broker.
  • Golang channel API.

Installation

go get -u github.com/rafaeljesus/rabbus

Usage

The rabbus package exposes an interface for emitting and listening RabbitMQ messages.

Emit

import (
  "github.com/rafaeljesus/rabbus"
)

func main() {
  r, err := rabbus.NewRabbus(rabbus.Config{
    Dsn           : "amqp://guest:guest@localhost:5672",
    Durable       : true,
    Retry         : rabbus.Retry {
      Attempts    : 5,
      Sleep       : time.Second * 2,
    },
    CircuitBreaker: rabbus.CircuitBreaker {
      Threshold: 3,
      OnStateChange: func(name, from, to string) {
        // do something when state is changed
      }
    },
  })

  select {
    case r.EmitAsync() <- Message{
      Exchange  : "test_ex",
      Kind      : "topic",
      Key       : "test_key",
      Payload   : []byte(`foo`),
    }
    case r.EmitOk():
     // message was sent
    case r.EmitErr():
     // failed to send message
  }
}

Listen

import (
  "encoding/json"

  "github.com/rafaeljesus/rabbus"
)

func main() {
  r, err := rabbus.NewRabbus(rabbus.Config{
    Dsn           : "amqp://guest:guest@localhost:5672",
    Durable       : true,
    Retry         : rabbus.Retry {
      Attempts    : 3,
      Sleep       : time.Second * 2,
    },
    CircuitBreaker: rabbus.CircuitBreaker {
      Threshold: 3,
      OnStateChange: func(name, from, to string) {
        // do something when state is changed
      }
    },
  })

  messages, err := r.Listen(rabbus.ListenConfig{
    Exchange: "events_ex",
    Kind:     "topic",
    Key:      "events_key",
    Queue:    "events_q",
  })
  if err != nil {
    // handle errors during adding listener
  }

  go func() {
    for m := range messages {
      m.Ack(false)
    }
  }()
}

Contributing

  • Fork it
  • Create your feature branch (git checkout -b my-new-feature)
  • Commit your changes (git commit -am 'Add some feature')
  • Push to the branch (git push origin my-new-feature)
  • Create new Pull Request

Badges

Build Status Go Report Card Go Doc


GitHub @rafaeljesus  Β·  Medium @_jesus_rafael  Β·  Twitter @_jesus_rafael

About

A tiny wrapper over amqp exchanges and queues 🚌 ✨

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Go 92.9%
  • Shell 4.5%
  • Makefile 2.6%