Skip to content

Commit

Permalink
Add support for new barong events (#40)
Browse files Browse the repository at this point in the history
* Upgrade golang to v1.13

* Ability to bind signer to exchange (closes #43)

EventAPI providers can use multiple exchanges

* Changed yaml configuration of exchanges. (Breaking!)
* Added new events to default configuration.
  • Loading branch information
shal authored Sep 23, 2019
1 parent 4b4991d commit 642e37c
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 54 deletions.
4 changes: 2 additions & 2 deletions .drone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: default

steps:
- name: Go test
image: golang:1.11
image: golang:1.13
environment:
GO111MODULE: on
commands:
Expand All @@ -14,7 +14,7 @@ steps:
- push

- name: Bump version
image: golang:1.11
image: golang:1.13
environment:
GITHUB_API_KEY:
from_secret: kite_bot_key
Expand Down
50 changes: 43 additions & 7 deletions config/postmaster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,29 @@ languages:
- code: RU
name: Russian

exchanges:
barong: barong.events.system
peatio: peatio.events.model

keychain:
barong:
algorithm: RS256
value: "changeme"

peatio:
algorithm: RS256
value: "changeme"

exchanges:
barong_system:
name: barong.events.system
signer: barong
barong_model:
name: barong.events.model
signer: barong
peatio:
name: peatio.events.model
signer: peatio

events:
- name: Email Confirmation
key: user.email.confirmation.token
exchange: barong
exchange: barong_system
templates:
EN:
subject: Registration Confirmation
Expand All @@ -31,7 +37,7 @@ events:

- name: Password Reset
key: user.password.reset.token
exchange: barong
exchange: barong_system
templates:
EN:
subject: Password Reset
Expand All @@ -40,6 +46,28 @@ events:
subject: Сброс Пароля
template_path: templates/ru/password_reset.tpl

- name: Label Created
key: label.created
exchange: barong_model
expression: |
record.key in ["phone", "profile", "document"] &&
record.value in ["verified", "rejected"]
templates:
EN:
subject: Account Details Updated
template_path: templates/en/label_created.tpl

- name: Label Updated
key: label.updated
exchange: barong_model
expression: |
record.key in ["phone", "profile", "document"] &&
record.value in ["verified", "rejected"]
templates:
EN:
subject: Account Details Updated
template_path: templates/en/label_created.tpl

- name: Deposit Accepted
key: deposit.updated
exchange: peatio
Expand All @@ -49,6 +77,14 @@ events:
subject: Deposit Accepted
template_path: templates/en/deposit_accepted.tpl

- name: Session Create
key: session.create
exchange: barong_system
templates:
EN:
subject: New Login
template_path: templates/en/session_create.tpl

- name: Withdrawal Succeed
key: withdraw.updated
exchange: peatio
Expand Down
8 changes: 6 additions & 2 deletions docs/using_postmaster.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,12 @@ Each Event API provider uses own AMQP exchange and algorithm to sign payload.
```yaml
exchanges:
barong: barong.events.system
peatio: peatio.events.model
barong:
name: barong.events.system
signer: peatio
peatio:
name: peatio.events.model
signer: peatio
```
Using keychain algorithms and defined public keys for each provider postmaster will validate the data.
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ require (
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
)

go 1.13
38 changes: 28 additions & 10 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@ import (
"github.com/openware/postmaster/pkg/eventapi"
)

// Language represents configuration for every language registered for further usage.
type Language struct {
Code string `yaml:"code"`
Name string `yaml:"name"`
}

// Template represents email massage content and subject.
type Template struct {
Subject string `yaml:"subject"`
TemplatePath string `yaml:"template_path,omitempty"`
Template string `yaml:"template,omitempty"`
}

// Event represent configuration for listening an message from RabbitMQ.
type Event struct {
Name string `yaml:"name"`
Key string `yaml:"key"`
Expand All @@ -29,18 +32,27 @@ type Event struct {
Expression string `yaml:"expression"`
}

// General application configuration.
// Exchange contains exchange name and signer unique identifier.
type Exchange struct {
Name string `yaml:"name"`
Signer string `yaml:"signer"`
}

// Config represents application configuration model.
type Config struct {
Languages []Language `yaml:"languages"`
Keychain map[string]eventapi.Validator `yaml:"keychain"`
Exchanges map[string]string `yaml:"exchanges"`
Exchanges map[string]Exchange `yaml:"exchanges"`
Events []Event `yaml:"events"`
}

// Template returns Template model for given unique key.
func (e *Event) Template(key string) Template {
return e.Templates[strings.ToUpper(key)]
}

// Content returns ready to go message with specified data.
// Note: "template" has bigger priority, than "template_path".
func (t *Template) Content(data interface{}) ([]byte, error) {
var err error

Expand All @@ -64,6 +76,7 @@ func (t *Template) Content(data interface{}) ([]byte, error) {
return buff.Bytes(), nil
}

// ContainsLanguage reports whether the language code is known.
func (config *Config) ContainsLanguage(code string) bool {
for _, lang := range config.Languages {
if strings.EqualFold(lang.Code, code) {
Expand All @@ -74,16 +87,19 @@ func (config *Config) ContainsLanguage(code string) bool {
return false
}

// ContainsExchange reports whether the exchange with specified key exist.
func (config *Config) ContainsExchange(id string) bool {
_, ok := config.Exchanges[id]
return ok
}

// ContainsKey reports whether the keychain key exist.
func (config *Config) ContainsKey(id string) bool {
_, ok := config.Keychain[id]
return ok
}

// Valid reports whether configuration is valid or not.
func (lang *Language) Valid() bool {
notEmpty := len(strings.TrimSpace(lang.Code)) != 0
isUp := lang.Code == strings.ToUpper(lang.Code)
Expand All @@ -101,28 +117,30 @@ func (config *Config) validateLanguages() (bool, error) {
return true, nil
}

// ValidateExchanges validates exchanges config.
func (config *Config) ValidateExchanges() error {
if len(config.Exchanges) < 1 {
return errors.New("no exchanges was specified")
}

for k, v := range config.Exchanges {
if v == "" {
return fmt.Errorf("exchange %s can not have empty value", k)
if v.Name == "" {
return fmt.Errorf("exchange name can not be empty: %s", k)
}

// Check, that signer is not empty and exist in keychain.
if v.Signer == "" {
return fmt.Errorf("signer %s of exchange %s can not be empty", v.Signer, k)
} else if _, ok := config.Keychain[v.Signer]; !ok {
return fmt.Errorf("signer %s is not registered", v.Signer)
}
}

return nil
}

// ValidateKeychain validates keychain config.
func (config *Config) ValidateKeychain() error {
for id := range config.Exchanges {
if !config.ContainsKey(id) {
return fmt.Errorf("exchange %s doesn't have a key", id)
}
}

for k, v := range config.Keychain {
if v.Value == "" {
return fmt.Errorf("key for %s has an empty value", k)
Expand Down
43 changes: 10 additions & 33 deletions pkg/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/streadway/amqp"

"github.com/openware/postmaster/internal/config"
"github.com/openware/postmaster/internal/log"
"github.com/openware/postmaster/pkg/eventapi"
)
Expand All @@ -23,7 +24,7 @@ type muxEntry struct {
}

type ServeMux struct {
exchanges map[string]string
exchanges map[string]config.Exchange
keychain map[string]eventapi.Validator

tag string
Expand All @@ -34,7 +35,7 @@ type ServeMux struct {
retries uint8
}

func NewServeMux(addr, tag string, exchanges map[string]string, keychain map[string]eventapi.Validator) *ServeMux {
func NewServeMux(addr, tag string, exchanges map[string]config.Exchange, keychain map[string]eventapi.Validator) *ServeMux {
return &ServeMux{
addr: addr,
tag: tag,
Expand Down Expand Up @@ -86,7 +87,7 @@ func (mux *ServeMux) declareExchange(name string, channel *amqp.Channel) error {
func (mux *ServeMux) ListenQueue(
deliveries <-chan amqp.Delivery,
handler Handler,
key, exchangeID string,
key, signer string,
) {
for {
delivery, ok := <-deliveries
Expand All @@ -111,7 +112,7 @@ func (mux *ServeMux) ListenQueue(
return
}

validator := mux.keychain[exchangeID]
validator := mux.keychain[signer]
claims, err := eventapi.ParseJWT(string(jwt), validator.ValidateJWT)
if err != nil {
log.Debug().
Expand Down Expand Up @@ -147,20 +148,20 @@ func (mux *ServeMux) listen() error {

// Declare exchanges using one channel.
for id := range mux.m {
if err != mux.declareExchange(mux.exchanges[id], channel) {
if err != mux.declareExchange(mux.exchanges[id].Name, channel) {
return fmt.Errorf("exchange: %s", err.Error())
}
}

// Bind queue to exchange and listen.
// Bind queue to an exchange and register listener.
for id, events := range mux.m {
for key, event := range events {
channel, err := conn.Channel()
if err != nil {
return fmt.Errorf("channel: %s", err.Error())
}

queue, err := mux.declareQueue(channel, key, mux.exchanges[id])
queue, err := mux.declareQueue(channel, key, mux.exchanges[id].Name)
if err != nil {
return fmt.Errorf("queue: %s", err.Error())
}
Expand All @@ -170,7 +171,7 @@ func (mux *ServeMux) listen() error {
return err
}

go mux.ListenQueue(deliveries, event.h, event.routingKey, id)
go mux.ListenQueue(deliveries, event.h, event.routingKey, mux.exchanges[id].Signer)
}
}

Expand Down Expand Up @@ -230,31 +231,7 @@ func (mux *ServeMux) Handle(routingKey, exchangeID string, handler Handler) {
}

func (mux *ServeMux) HandleFunc(routingKey, exchangeID string, handler func(raw eventapi.RawEvent)) {
mux.mu.Lock()
defer mux.mu.Unlock()

if routingKey == "" {
log.Panic().
Msgf("pattern %s is not valid", routingKey)
}
if handler == nil {
log.Panic().
Msgf("handler with key %s can not be nil ", routingKey)
}
if _, exist := mux.m[routingKey]; exist {
log.Panic().
Msgf("multiple registrations for %s", routingKey)
}

if mux.m == nil {
mux.m = make(map[string]map[string]muxEntry)
}

if mux.m[exchangeID] == nil {
mux.m[exchangeID] = make(map[string]muxEntry)
}

mux.m[exchangeID][routingKey] = muxEntry{h: HandlerFunc(handler), routingKey: routingKey}
mux.Handle(routingKey, exchangeID, HandlerFunc(handler))
}

type Handler interface {
Expand Down
Loading

0 comments on commit 642e37c

Please sign in to comment.