Skip to content

Commit

Permalink
apply functional pattern to receive and send
Browse files Browse the repository at this point in the history
Signed-off-by: Jose Silva <[email protected]>
  • Loading branch information
cx-joses committed Jul 22, 2024
1 parent d173978 commit 0480cd0
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 19 deletions.
29 changes: 24 additions & 5 deletions protocol/amqp/v2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import (
// Option is the function signature required to be considered an amqp.Option.
type Option func(*Protocol) error

// WithConnOpt sets a connection option for amqp
type SendOption func(sender *sender)
type ReceiveOption func(receiver *receiver)

// WithConnOpts sets a connection option for amqp
func WithConnOpts(opt *amqp.ConnOptions) Option {
return func(t *Protocol) error {
t.connOpts = opt
Expand All @@ -26,43 +29,59 @@ func WithConnSASLPlain(opt *amqp.ConnOptions, username, password string) Option
return WithConnOpts(opt)
}

// WithSessionOpt sets a session option for amqp
// WithSessionOpts sets a session option for amqp
func WithSessionOpts(opt *amqp.SessionOptions) Option {
return func(t *Protocol) error {
t.sessionOpts = opt
return nil
}
}

// WithSenderLinkOption sets a link option for amqp
// WithSenderOpts sets a link option for amqp
func WithSenderOpts(opt *amqp.SenderOptions) Option {
return func(t *Protocol) error {
t.senderOpts = opt
return nil
}
}

// WithReceiverLinkOption sets a link option for amqp
// WithReceiverOpts sets a link option for amqp
func WithReceiverOpts(opt *amqp.ReceiverOptions) Option {
return func(t *Protocol) error {
t.receiverOpts = opt
return nil
}
}

func WithReceiveOpts(opt amqp.ReceiveOptions) Option {
// WithReceiveOpts sets a receive option for amqp
func WithReceiveOpts(opt *amqp.ReceiveOptions) Option {
return func(t *Protocol) error {
t.receiveOpts = opt
return nil
}
}

// WithSendOpts sets a send option for amqp
func WithSendOpts(opt *amqp.SendOptions) Option {
return func(t *Protocol) error {
t.sendOpts = opt
return nil
}
}

// WithSendOptions sets send options for amqp
func WithSendOptions(opts *amqp.SendOptions) SendOption {
return func(t *sender) {
t.options = opts
}
}

// WithReceiveOptions sets receive options for amqp
func WithReceiveOptions(opts *amqp.ReceiveOptions) ReceiveOption {
return func(t *receiver) {
t.options = opts
}
}

// SenderOptionFunc is the type of amqp.Sender options
type SenderOptionFunc func(sender *sender)
11 changes: 6 additions & 5 deletions protocol/amqp/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Protocol struct {
senderOpts *amqp.SenderOptions
receiverOpts *amqp.ReceiverOptions
sendOpts *amqp.SendOptions
receiveOpts amqp.ReceiveOptions
receiveOpts *amqp.ReceiveOptions

// Sender
Sender *sender
Expand Down Expand Up @@ -60,14 +60,14 @@ func NewProtocolFromClient(
_ = session.Close(context.Background())
return nil, err
}
t.Sender = NewSender(amqpSender, t.sendOpts).(*sender)
t.Sender = NewSender(amqpSender, WithSendOptions(t.sendOpts)).(*sender)
t.SenderContextDecorators = []func(context.Context) context.Context{}

amqpReceiver, err := t.Session.NewReceiver(ctx, t.Node, t.receiverOpts)
if err != nil {
return nil, err
}
t.Receiver = NewReceiver(amqpReceiver, t.receiveOpts).(*receiver)
t.Receiver = NewReceiver(amqpReceiver, WithReceiveOptions(t.receiveOpts)).(*receiver)
return t, nil
}

Expand Down Expand Up @@ -124,7 +124,8 @@ func NewSenderProtocolFromClient(
_ = session.Close(context.Background())
return nil, err
}
t.Sender = NewSender(amqpSender, t.sendOpts).(*sender)
t.Sender = NewSender(amqpSender).(*sender)

t.SenderContextDecorators = []func(context.Context) context.Context{}

return t, nil
Expand Down Expand Up @@ -152,7 +153,7 @@ func NewReceiverProtocolFromClient(
if err != nil {
return nil, err
}
t.Receiver = NewReceiver(amqpReceiver, t.receiveOpts).(*receiver)
t.Receiver = NewReceiver(amqpReceiver, WithReceiveOptions(t.receiveOpts)).(*receiver)
return t, nil
}

Expand Down
17 changes: 13 additions & 4 deletions protocol/amqp/v2/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ const serverDown = "session ended by server"
// receiver wraps an amqp.Receiver as a binding.Receiver
type receiver struct {
amqp *amqp.Receiver
options amqp.ReceiveOptions
options *amqp.ReceiveOptions
}

func (r *receiver) Receive(ctx context.Context) (binding.Message, error) {
m, err := r.amqp.Receive(ctx, &r.options)
m, err := r.amqp.Receive(ctx, r.options)
if err != nil {
if err == ctx.Err() {
return nil, io.EOF
Expand All @@ -41,6 +41,15 @@ func (r *receiver) Receive(ctx context.Context) (binding.Message, error) {
}

// NewReceiver create a new Receiver which wraps an amqp.Receiver in a binding.Receiver
func NewReceiver(amqp *amqp.Receiver, options amqp.ReceiveOptions) protocol.Receiver {
return &receiver{amqp: amqp, options: options}
func NewReceiver(amqp *amqp.Receiver, opts ...ReceiveOption) protocol.Receiver {
r := &receiver{amqp: amqp, options: nil}
applyReceiveOptions(r, opts...)
return r
}

func applyReceiveOptions(s *receiver, opts ...ReceiveOption) *receiver {
for _, o := range opts {
o(s)
}
return s
}
11 changes: 9 additions & 2 deletions protocol/amqp/v2/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,15 @@ func (s *sender) Send(ctx context.Context, in binding.Message, transformers ...b
}

// NewSender creates a new Sender which wraps an amqp.Sender in a binding.Sender
func NewSender(amqpSender *amqp.Sender, options *amqp.SendOptions) protocol.Sender {
s := &sender{amqp: amqpSender, options: options}
func NewSender(amqpSender *amqp.Sender, opts ...SendOption) protocol.Sender {
s := &sender{amqp: amqpSender, options: nil}
applySenderOptions(s, opts...)
return s
}

func applySenderOptions(s *sender, opts ...SendOption) *sender {
for _, o := range opts {
o(s)
}
return s
}
6 changes: 3 additions & 3 deletions test/integration/amqp_binding/amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"net/url"
"os"
"strings"
"testing"

"github.com/Azure/go-amqp"
Expand Down Expand Up @@ -95,7 +96,7 @@ func testClient(t testing.TB) (client *amqp.Conn, session *amqp.Session, addr st
addr = "test"
s := os.Getenv("TEST_AMQP_URL")
if u, err := url.Parse(s); err == nil && u.Path != "" {
addr = u.Path
addr = strings.TrimPrefix(u.Path, "/")
}
client, err := amqp.Dial(context.Background(), s, &amqp.ConnOptions{})
if err != nil {
Expand All @@ -105,7 +106,6 @@ func testClient(t testing.TB) (client *amqp.Conn, session *amqp.Session, addr st
require.NoError(t, err)

return client, session, addr

}

func testSenderReceiver(t testing.TB) (io.Closer, bindings.Sender, bindings.Receiver) {
Expand All @@ -114,7 +114,7 @@ func testSenderReceiver(t testing.TB) (io.Closer, bindings.Sender, bindings.Rece
require.NoError(t, err)
s, err := ss.NewSender(context.Background(), a, nil)
require.NoError(t, err)
return c, protocolamqp.NewSender(s, nil), protocolamqp.NewReceiver(r, amqp.ReceiveOptions{})
return c, protocolamqp.NewSender(s), protocolamqp.NewReceiver(r)
}

func BenchmarkSendReceive(b *testing.B) {
Expand Down

0 comments on commit 0480cd0

Please sign in to comment.