Skip to content

Commit

Permalink
refactor: extract message constructor out of the Message interface
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Jan 2, 2023
1 parent d847ef3 commit bcad3a4
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 46 deletions.
32 changes: 8 additions & 24 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,16 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
)

// Message is a contract that any user defined message has to satisfy to be sent via Messenger.
type Message interface {
serde.Message

To() peer.ID
// From points to peer the Message was received from.
From() peer.ID

New(from, to peer.ID) Message
}

type PlainMessage struct {
serde.PlainMessage

from, to peer.ID
}

func (p *PlainMessage) To() peer.ID {
return p.to
}

func (p *PlainMessage) From() peer.ID {
return p.from
// To points to Messenger the peer to send the Message to.
To() peer.ID
}

func (p *PlainMessage) New(from, to peer.ID) Message {
return &PlainMessage{
from: from,
to: to,
}
}
// NewMessageFn is a type-parameterized constructor func for Message.
// It is required by Messenger, s.t. it can instantiate user-defined messages
// coming from the network.
type NewMessageFn[M Message] func(from, to peer.ID) M
17 changes: 17 additions & 0 deletions message_base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package msngr

import "github.com/libp2p/go-libp2p/core/peer"

// MessageBase is a helper to struct that users may embed into their custom Message implementations.
// It's sole purpose is to minimize boilerplate code.
type MessageBase struct {
FromPeer, ToPeer peer.ID
}

func (mb *MessageBase) From() peer.ID {
return mb.FromPeer
}

func (mb *MessageBase) To() peer.ID {
return mb.ToPeer
}
8 changes: 5 additions & 3 deletions messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ var errClosed = errors.New("msngr: closed")
// Messenger provides a simple API to send messages to multiple peers.
type Messenger[M Message] struct {
*options
host host.Host
host host.Host

// fields below are used and protected in processIn
inbound chan M
newStreamsIn chan inet.Stream
deadStreamsIn chan inet.Stream
streamsIn map[peer.ID]map[inet.Stream]context.CancelFunc
new NewMessageFn[M]

// fields below are used and protected by processOut
outbound chan M
Expand All @@ -45,20 +46,21 @@ type Messenger[M Message] struct {
// New instantiates a Messenger.
// WithProtocols option is mandatory for at least one protocol.
// WithMessageType overrides default serde.PlainMessage.
func New[M Message](host host.Host, opts ...Option) (*Messenger[M], error) {
func New[M Message](host host.Host, new NewMessageFn[M], opts ...Option) (*Messenger[M], error) {
o, err := parseOptions(opts...)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
m := &Messenger[M]{
options: o,
options: o,
host: host,
inbound: make(chan M, 32),
newStreamsIn: make(chan inet.Stream, 4),
deadStreamsIn: make(chan inet.Stream, 2),
streamsIn: make(map[peer.ID]map[inet.Stream]context.CancelFunc),
new: new,
outbound: make(chan M, 32),
newStreamsOut: make(chan inet.Stream, 4),
deadStreamsOut: make(chan inet.Stream, 2),
Expand Down
3 changes: 1 addition & 2 deletions messenger_in.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ func (m *Messenger[M]) msgsIn(ctx context.Context, s inet.Stream) {
r := bufio.NewReader(s)

from, to := s.Conn().RemotePeer(), s.Conn().LocalPeer()
var tp M
for {
msg := tp.New(from, to).(M)
msg := m.new(from, to)
_, err := serde.Read(r, msg)
if err != nil {
select {
Expand Down
43 changes: 26 additions & 17 deletions messenger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/celestiaorg/go-libp2p-messenger/serde"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -34,10 +35,10 @@ func TestSend_PeersConnected(t *testing.T) {
mnet, err := mocknet.FullMeshConnected(2)
require.NoError(t, err)

min, err := New[*PlainMessage](mnet.Hosts()[0], WithProtocols(tproto))
min, err := New[*plainMessage](mnet.Hosts()[0], newPlainMessage, WithProtocols(tproto))
require.NoError(t, err)

mout, err := New[*PlainMessage](mnet.Hosts()[1], WithProtocols(tproto))
mout, err := New[*plainMessage](mnet.Hosts()[1], newPlainMessage, WithProtocols(tproto))
require.NoError(t, err)

msgin := randPlainMessage(256, mnet.Peers()[1])
Expand All @@ -64,10 +65,10 @@ func TestSend_PeersDisconnected(t *testing.T) {
mnet, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)

min, err := New[*PlainMessage](mnet.Hosts()[0], WithProtocols(tproto))
min, err := New[*plainMessage](mnet.Hosts()[0], newPlainMessage, WithProtocols(tproto))
require.NoError(t, err)

mout, err := New[*PlainMessage](mnet.Hosts()[1], WithProtocols(tproto))
mout, err := New[*plainMessage](mnet.Hosts()[1], newPlainMessage, WithProtocols(tproto))
require.NoError(t, err)

msgin := randPlainMessage(256, mnet.Peers()[1])
Expand All @@ -91,10 +92,10 @@ func TestReconnect(t *testing.T) {

hosts := realTransportHosts(t, 2)

min, err := New[*PlainMessage](hosts[0], WithProtocols(tproto))
min, err := New[*plainMessage](hosts[0], newPlainMessage, WithProtocols(tproto))
require.NoError(t, err)

mout, err := New[*PlainMessage](hosts[1], WithProtocols(tproto))
mout, err := New[*plainMessage](hosts[1], newPlainMessage, WithProtocols(tproto))
require.NoError(t, err)

err = hosts[0].Connect(ctx, *host.InfoFromHost(hosts[1]))
Expand Down Expand Up @@ -128,10 +129,10 @@ func TestStreamDuplicates(t *testing.T) {

hosts := realTransportHosts(t, 2)

min, err := New[*PlainMessage](hosts[0], WithProtocols(tproto))
min, err := New[*plainMessage](hosts[0], newPlainMessage, WithProtocols(tproto))
require.NoError(t, err)

mout, err := New[*PlainMessage](hosts[1], WithProtocols(tproto))
mout, err := New[*plainMessage](hosts[1], newPlainMessage, WithProtocols(tproto))
require.NoError(t, err)

err = min.Host().Connect(ctx, *host.InfoFromHost(mout.Host()))
Expand Down Expand Up @@ -169,14 +170,14 @@ func TestSend_Events(t *testing.T) {
firstSub, err := firstHst.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
require.NoError(t, err)

first, err := New[*PlainMessage](mnet.Hosts()[0], WithProtocols(tproto))
first, err := New[*plainMessage](mnet.Hosts()[0], newPlainMessage, WithProtocols(tproto))
require.NoError(t, err)

secondHst := mnet.Hosts()[1]
secondSub, err := secondHst.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
require.NoError(t, err)

second, err := New[*PlainMessage](mnet.Hosts()[1], WithProtocols(tproto))
second, err := New[*plainMessage](mnet.Hosts()[1], newPlainMessage, WithProtocols(tproto))
require.NoError(t, err)

_, err = mnet.ConnectPeers(mnet.Peers()[0], mnet.Peers()[1])
Expand Down Expand Up @@ -222,9 +223,9 @@ func TestGroupBroadcast(t *testing.T) {
require.NoError(t, err)

// create messengers according to netSize
ms := make([]*Messenger[*PlainMessage], netSize)
ms := make([]*Messenger[*plainMessage], netSize)
for i, h := range mnet.Hosts() {
ms[i], err = New[*PlainMessage](h, WithProtocols(tproto))
ms[i], err = New[*plainMessage](h, newPlainMessage, WithProtocols(tproto))
require.NoError(t, err)
}

Expand Down Expand Up @@ -263,9 +264,9 @@ func TestPeers(t *testing.T) {
require.NoError(t, err)

// create messengers according to netSize
ms := make([]*Messenger[*PlainMessage], netSize)
ms := make([]*Messenger[*plainMessage], netSize)
for i, h := range mnet.Hosts() {
ms[i], err = New[*PlainMessage](h, WithProtocols(tproto))
ms[i], err = New[*plainMessage](h, newPlainMessage, WithProtocols(tproto))
require.NoError(t, err)
}

Expand All @@ -281,10 +282,18 @@ func TestPeers(t *testing.T) {
}
}

func randPlainMessage(size int, to peer.ID) *PlainMessage {
msg := &PlainMessage{}
type plainMessage struct {
serde.PlainMessage
MessageBase
}

var newPlainMessage NewMessageFn[*plainMessage] = func(from, to peer.ID) *plainMessage {
return &plainMessage{MessageBase: MessageBase{FromPeer: from, ToPeer: to}}
}

func randPlainMessage(size int, to peer.ID) *plainMessage {
msg := &plainMessage{MessageBase: MessageBase{ToPeer: to}}
msg.Data = make([]byte, size)
msg.to = to
rand.Read(msg.Data)
return msg
}
Expand Down

0 comments on commit bcad3a4

Please sign in to comment.