Skip to content

Commit

Permalink
Add Forwarder component (#187)
Browse files Browse the repository at this point in the history
  • Loading branch information
czeslavo authored Jun 4, 2020
1 parent 482e08e commit 5b98ac2
Show file tree
Hide file tree
Showing 8 changed files with 508 additions and 4 deletions.
71 changes: 71 additions & 0 deletions components/forwarder/envelope.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package forwarder

import (
"encoding/json"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
)

// messageEnvelope wraps Watermill message and contains destination topic.
type messageEnvelope struct {
DestinationTopic string `json:"destination_topic"`

UUID string `json:"uuid"`
Payload []byte `json:"payload"`
Metadata map[string]string `json:"metadata"`
}

func newMessageEnvelope(destTopic string, msg *message.Message) (*messageEnvelope, error) {
e := &messageEnvelope{
DestinationTopic: destTopic,
UUID: msg.UUID,
Payload: msg.Payload,
Metadata: msg.Metadata,
}

if err := e.validate(); err != nil {
return nil, errors.Wrap(err, "cannot create a message envelope")
}

return e, nil
}

func (e *messageEnvelope) validate() error {
if e.DestinationTopic == "" {
return errors.New("unknown destination topic")
}

return nil
}

func wrapMessageInEnvelope(destinationTopic string, msg *message.Message) (*message.Message, error) {
envelope, err := newMessageEnvelope(destinationTopic, msg)
if err != nil {
return nil, errors.Wrap(err, "cannot envelope a message")
}

envelopedMessage, err := json.Marshal(envelope)
if err != nil {
return nil, errors.Wrap(err, "cannot marshal a message")
}

return message.NewMessage(watermill.NewUUID(), envelopedMessage), nil
}

func unwrapMessageFromEnvelope(msg *message.Message) (destinationTopic string, unwrappedMsg *message.Message, err error) {
envelopedMsg := messageEnvelope{}
if err := json.Unmarshal(msg.Payload, &envelopedMsg); err != nil {
return "", nil, errors.Wrap(err, "cannot unmarshal message wrapped in an envelope")
}

if err := envelopedMsg.validate(); err != nil {
return "", nil, errors.Wrap(err, "an unmarshalled message envelope is invalid")
}

watermillMessage := message.NewMessage(envelopedMsg.UUID, envelopedMsg.Payload)
watermillMessage.Metadata = envelopedMsg.Metadata

return envelopedMsg.DestinationTopic, watermillMessage, nil
}
32 changes: 32 additions & 0 deletions components/forwarder/envelope_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package forwarder

import (
"testing"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestEnvelope(t *testing.T) {
expectedUUID := watermill.NewUUID()
expectedPayload := message.Payload("msg content")
expectedMetadata := message.Metadata{"key": "value"}
expectedDestinationTopic := "dest_topic"

msg := message.NewMessage(expectedUUID, expectedPayload)
msg.Metadata = expectedMetadata

wrappedMsg, err := wrapMessageInEnvelope(expectedDestinationTopic, msg)
require.NoError(t, err)
require.NotNil(t, wrappedMsg)

destinationTopic, unwrappedMsg, err := unwrapMessageFromEnvelope(wrappedMsg)
require.NoError(t, err)
require.NotNil(t, unwrappedMsg)
assert.Equal(t, expectedUUID, unwrappedMsg.UUID)
assert.Equal(t, expectedPayload, unwrappedMsg.Payload)
assert.Equal(t, expectedMetadata, unwrappedMsg.Metadata)
assert.Equal(t, expectedDestinationTopic, destinationTopic)
}
128 changes: 128 additions & 0 deletions components/forwarder/forwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package forwarder

import (
"context"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
)

const defaultForwarderTopic = "forwarder_topic"

type Config struct {
// ForwarderTopic is a topic on which the forwarder will be listening to enveloped messages to forward.
// Defaults to `forwarder_topic`.
ForwarderTopic string

// Middlewares are used to decorate forwarder's handler function.
Middlewares []message.HandlerMiddleware

// CloseTimeout determines how long router should work for handlers when closing.
CloseTimeout time.Duration

// AckWhenCannotUnwrap enables acking of messages which cannot be unwrapped from an envelope.
AckWhenCannotUnwrap bool
}

func (c *Config) setDefaults() {
if c.CloseTimeout == 0 {
c.CloseTimeout = time.Second * 30
}
if c.ForwarderTopic == "" {
c.ForwarderTopic = defaultForwarderTopic
}
}

func (c *Config) Validate() error {
if c.ForwarderTopic == "" {
return errors.New("empty forwarder topic")
}

return nil
}

// Forwarder subscribes to the topic provided in the config and publishes them to the destination topic embedded in the enveloped message.
type Forwarder struct {
router *message.Router
publisher message.Publisher
logger watermill.LoggerAdapter
config Config
}

// NewForwarder creates a forwarder which will subscribe to the topic provided in the config using the provided subscriber.
// It will publish messages received on this subscription to the destination topic embedded in the enveloped message using the provided publisher.
//
// Provided subscriber and publisher can be from different Watermill Pub/Sub implementations, i.e. MySQL subscriber and Google Pub/Sub publisher.
//
// Note: Keep in mind that by default the forwarder will nack all messages which weren't sent using a decorated publisher.
// You can change this behavior by passing a middleware which will ack them instead.
func NewForwarder(subscriberIn message.Subscriber, publisherOut message.Publisher, logger watermill.LoggerAdapter, config Config) (*Forwarder, error) {
config.setDefaults()

routerConfig := message.RouterConfig{CloseTimeout: config.CloseTimeout}
if err := routerConfig.Validate(); err != nil {
return nil, errors.Wrap(err, "invalid router config")
}

router, err := message.NewRouter(routerConfig, logger)
if err != nil {
return nil, errors.Wrap(err, "cannot create a router")
}

f := &Forwarder{router, publisherOut, logger, config}

router.AddNoPublisherHandler(
"events_forwarder",
config.ForwarderTopic,
subscriberIn,
f.forwardMessage,
)

router.AddMiddleware(config.Middlewares...)

return f, nil
}

// Run runs forwarder's handler responsible for forwarding messages.
// This call is blocking while the forwarder is running.
// ctx will be propagated to the forwarder's subscription.
//
// To stop Run() you should call Close() on the forwarder.
func (f *Forwarder) Run(ctx context.Context) error {
return f.router.Run(ctx)
}

// Close stops forwarder's handler.
func (f *Forwarder) Close() error {
return f.router.Close()
}

// Running returns channel which is closed when the forwarder is running.
func (f *Forwarder) Running() chan struct{} {
return f.router.Running()
}

func (f *Forwarder) forwardMessage(msg *message.Message) error {
destTopic, unwrappedMsg, err := unwrapMessageFromEnvelope(msg)
if err != nil {
f.logger.Error("Could not unwrap a message from an envelope", err, watermill.LogFields{
"uuid": msg.UUID,
"payload": msg.Payload,
"metadata": msg.Metadata,
"acked": f.config.AckWhenCannotUnwrap,
})

if f.config.AckWhenCannotUnwrap {
return nil
}
return errors.Wrap(err, "cannot unwrap message from an envelope")
}

if err := f.publisher.Publish(destTopic, unwrappedMsg); err != nil {
return errors.Wrap(err, "cannot publish a message")
}

return nil
}
Loading

0 comments on commit 5b98ac2

Please sign in to comment.