Skip to content

Commit

Permalink
Add test to sendoToMany
Browse files Browse the repository at this point in the history
  • Loading branch information
Maelkum committed Jul 22, 2024
1 parent 1c0fbba commit e64f98e
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 59 deletions.
4 changes: 3 additions & 1 deletion node/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (n *Node) sendToMany(ctx context.Context, peers []peer.ID, msg blockless.Me
errGroup.Go(func() error {
err := n.host.SendMessage(ctx, peer, payload)
if err != nil {
return fmt.Errorf("peer %v/%v send error (peer: %v): %w", i, len(peers), peer.String(), err)
return fmt.Errorf("peer %v/%v send error (peer: %v): %w", i+1, len(peers), peer.String(), err)
}

return nil
Expand All @@ -105,6 +105,8 @@ func (n *Node) sendToMany(ctx context.Context, peers []peer.ID, msg blockless.Me
return fmt.Errorf("some sends failed: %w", retErr)
}

n.log.Warn().Err(retErr).Msg("some sends failed, proceeding")

return nil
}
}
Expand Down
159 changes: 101 additions & 58 deletions node/message_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,102 +3,137 @@ package node
import (
"context"
"encoding/json"
"math/rand"
"sync"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"

"github.com/blocklessnetwork/b7s/host"
"github.com/blocklessnetwork/b7s/models/blockless"
"github.com/blocklessnetwork/b7s/testing/mocks"
)

func TestNode_Messaging(t *testing.T) {
func TestNode_SendMessage(t *testing.T) {

const (
topic = DefaultTopic
)
client, err := host.New(mocks.NoopLogger, loopback, 0)
require.NoError(t, err)

node := createNode(t, blockless.HeadNode)
hostAddNewPeer(t, node.host, client)

rec := newDummyRecord()

var wg sync.WaitGroup
wg.Add(1)

client.SetStreamHandler(blockless.ProtocolID, func(stream network.Stream) {
defer wg.Done()
defer stream.Close()

from := stream.Conn().RemotePeer()
require.Equal(t, node.host.ID(), from)

var received dummyRecord
getStreamPayload(t, stream, &received)

require.Equal(t, rec, received)
})

err = node.send(context.Background(), client.ID(), rec)
require.NoError(t, err)

wg.Wait()
}

func TestNode_Publish(t *testing.T) {

var (
rec = dummyRecord{
ID: mocks.GenericUUID.String(),
Value: 19846,
Description: "dummy-description",
}
rec = newDummyRecord()
ctx = context.Background()
topic = DefaultTopic
)

client, err := host.New(mocks.NoopLogger, loopback, 0)
require.NoError(t, err)

err = client.InitPubSub(ctx)
require.NoError(t, err)

node := createNode(t, blockless.HeadNode)
hostAddNewPeer(t, node.host, client)

t.Run("sending single message", func(t *testing.T) {
t.Parallel()

var wg sync.WaitGroup
wg.Add(1)

client.SetStreamHandler(blockless.ProtocolID, func(stream network.Stream) {
defer wg.Done()
defer stream.Close()
err = node.subscribeToTopics(ctx)
require.NoError(t, err)

from := stream.Conn().RemotePeer()
require.Equal(t, node.host.ID(), from)
// Establish a connection between peers.
clientInfo := hostGetAddrInfo(t, client)
err = node.host.Connect(ctx, *clientInfo)
require.NoError(t, err)

var received dummyRecord
getStreamPayload(t, stream, &received)
// Have both client and node subscribe to the same topic.
_, subscription, err := client.Subscribe(topic)
require.NoError(t, err)

require.Equal(t, rec, received)
})
time.Sleep(subscriptionDiseminationPause)

err := node.send(context.Background(), client.ID(), rec)
require.NoError(t, err)
err = node.publish(ctx, rec)
require.NoError(t, err)

wg.Wait()
})
t.Run("publishing to a topic", func(t *testing.T) {
t.Parallel()
deadlineCtx, cancel := context.WithTimeout(ctx, publishTimeout)
defer cancel()
msg, err := subscription.Next(deadlineCtx)
require.NoError(t, err)

ctx := context.Background()
from := msg.ReceivedFrom
require.Equal(t, node.host.ID(), from)
require.NotNil(t, msg.Topic)
require.Equal(t, topic, *msg.Topic)

err = client.InitPubSub(ctx)
require.NoError(t, err)
var received dummyRecord
err = json.Unmarshal(msg.Data, &received)
require.NoError(t, err)
require.Equal(t, rec, received)
}

// Establish a connection between peers.
clientInfo := hostGetAddrInfo(t, client)
err = node.host.Connect(ctx, *clientInfo)
require.NoError(t, err)
func TestNode_SendMessageToMany(t *testing.T) {

// Have both client and node subscribe to the same topic.
_, subscription, err := client.Subscribe(topic)
require.NoError(t, err)
client1, err := host.New(mocks.NoopLogger, loopback, 0)
require.NoError(t, err)

err = node.subscribeToTopics(ctx)
require.NoError(t, err)
client2, err := host.New(mocks.NoopLogger, loopback, 0)
require.NoError(t, err)

time.Sleep(subscriptionDiseminationPause)
node := createNode(t, blockless.HeadNode)
hostAddNewPeer(t, node.host, client1)
hostAddNewPeer(t, node.host, client2)

err = node.publish(ctx, rec)
require.NoError(t, err)
client1.SetStreamHandler(blockless.ProtocolID, func(network.Stream) {})
client2.SetStreamHandler(blockless.ProtocolID, func(network.Stream) {})

deadlineCtx, cancel := context.WithTimeout(ctx, publishTimeout)
defer cancel()
msg, err := subscription.Next(deadlineCtx)
// NOTE: These subtests are sequential.
t.Run("nominal case - sending to two online peers is ok", func(t *testing.T) {
err = node.sendToMany(context.Background(), []peer.ID{client1.ID(), client2.ID()}, newDummyRecord(), true)
require.NoError(t, err)

from := msg.ReceivedFrom
require.Equal(t, node.host.ID(), from)
require.NotNil(t, msg.Topic)
require.Equal(t, topic, *msg.Topic)

var received dummyRecord
err = json.Unmarshal(msg.Data, &received)
})
t.Run("peer is down with requireAll is an error", func(t *testing.T) {
client1.Close()
err = node.sendToMany(context.Background(), []peer.ID{client1.ID(), client2.ID()}, newDummyRecord(), true)
require.Error(t, err)
})
t.Run("peer is down with partial delivery is ok", func(t *testing.T) {
client1.Close()
err = node.sendToMany(context.Background(), []peer.ID{client1.ID(), client2.ID()}, newDummyRecord(), false)
require.NoError(t, err)

require.Equal(t, rec, received)
})
t.Run("all sends failing produces an error", func(t *testing.T) {
client1.Close()
client2.Close()
err = node.sendToMany(context.Background(), []peer.ID{client1.ID(), client2.ID()}, newDummyRecord(), false)
require.Error(t, err)
})
}

Expand All @@ -111,3 +146,11 @@ type dummyRecord struct {
func (dummyRecord) Type() string {
return "MessageDummyRecord"
}

func newDummyRecord() dummyRecord {
return dummyRecord{
ID: mocks.GenericUUID.String(),
Value: rand.Uint64(),
Description: "dummy-description",
}
}

0 comments on commit e64f98e

Please sign in to comment.