diff --git a/node/message.go b/node/message.go index 655e9de7..03fde885 100644 --- a/node/message.go +++ b/node/message.go @@ -81,7 +81,7 @@ func (n *Node) sendToMany(ctx context.Context, peers []peer.ID, msg blockless.Me errGroup.Go(func() error { err := n.host.SendMessage(ctx, peer, payload) if err != nil { - return fmt.Errorf("peer %v/%v send error (peer: %v): %w", i, len(peers), peer.String(), err) + return fmt.Errorf("peer %v/%v send error (peer: %v): %w", i+1, len(peers), peer.String(), err) } return nil @@ -105,6 +105,8 @@ func (n *Node) sendToMany(ctx context.Context, peers []peer.ID, msg blockless.Me return fmt.Errorf("some sends failed: %w", retErr) } + n.log.Warn().Err(retErr).Msg("some sends failed, proceeding") + return nil } } diff --git a/node/message_internal_test.go b/node/message_internal_test.go index 077d86ea..6ba823bf 100644 --- a/node/message_internal_test.go +++ b/node/message_internal_test.go @@ -3,11 +3,13 @@ package node import ( "context" "encoding/json" + "math/rand" "sync" "testing" "time" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" "github.com/blocklessnetwork/b7s/host" @@ -15,90 +17,123 @@ import ( "github.com/blocklessnetwork/b7s/testing/mocks" ) -func TestNode_Messaging(t *testing.T) { +func TestNode_SendMessage(t *testing.T) { - const ( - topic = DefaultTopic - ) + client, err := host.New(mocks.NoopLogger, loopback, 0) + require.NoError(t, err) + + node := createNode(t, blockless.HeadNode) + hostAddNewPeer(t, node.host, client) + + rec := newDummyRecord() + + var wg sync.WaitGroup + wg.Add(1) + + client.SetStreamHandler(blockless.ProtocolID, func(stream network.Stream) { + defer wg.Done() + defer stream.Close() + + from := stream.Conn().RemotePeer() + require.Equal(t, node.host.ID(), from) + + var received dummyRecord + getStreamPayload(t, stream, &received) + + require.Equal(t, rec, received) + }) + + err = node.send(context.Background(), client.ID(), rec) + require.NoError(t, err) + + wg.Wait() +} + +func TestNode_Publish(t *testing.T) { var ( - rec = dummyRecord{ - ID: mocks.GenericUUID.String(), - Value: 19846, - Description: "dummy-description", - } + rec = newDummyRecord() + ctx = context.Background() + topic = DefaultTopic ) client, err := host.New(mocks.NoopLogger, loopback, 0) require.NoError(t, err) + err = client.InitPubSub(ctx) + require.NoError(t, err) + node := createNode(t, blockless.HeadNode) hostAddNewPeer(t, node.host, client) - t.Run("sending single message", func(t *testing.T) { - t.Parallel() - - var wg sync.WaitGroup - wg.Add(1) - - client.SetStreamHandler(blockless.ProtocolID, func(stream network.Stream) { - defer wg.Done() - defer stream.Close() + err = node.subscribeToTopics(ctx) + require.NoError(t, err) - from := stream.Conn().RemotePeer() - require.Equal(t, node.host.ID(), from) + // Establish a connection between peers. + clientInfo := hostGetAddrInfo(t, client) + err = node.host.Connect(ctx, *clientInfo) + require.NoError(t, err) - var received dummyRecord - getStreamPayload(t, stream, &received) + // Have both client and node subscribe to the same topic. + _, subscription, err := client.Subscribe(topic) + require.NoError(t, err) - require.Equal(t, rec, received) - }) + time.Sleep(subscriptionDiseminationPause) - err := node.send(context.Background(), client.ID(), rec) - require.NoError(t, err) + err = node.publish(ctx, rec) + require.NoError(t, err) - wg.Wait() - }) - t.Run("publishing to a topic", func(t *testing.T) { - t.Parallel() + deadlineCtx, cancel := context.WithTimeout(ctx, publishTimeout) + defer cancel() + msg, err := subscription.Next(deadlineCtx) + require.NoError(t, err) - ctx := context.Background() + from := msg.ReceivedFrom + require.Equal(t, node.host.ID(), from) + require.NotNil(t, msg.Topic) + require.Equal(t, topic, *msg.Topic) - err = client.InitPubSub(ctx) - require.NoError(t, err) + var received dummyRecord + err = json.Unmarshal(msg.Data, &received) + require.NoError(t, err) + require.Equal(t, rec, received) +} - // Establish a connection between peers. - clientInfo := hostGetAddrInfo(t, client) - err = node.host.Connect(ctx, *clientInfo) - require.NoError(t, err) +func TestNode_SendMessageToMany(t *testing.T) { - // Have both client and node subscribe to the same topic. - _, subscription, err := client.Subscribe(topic) - require.NoError(t, err) + client1, err := host.New(mocks.NoopLogger, loopback, 0) + require.NoError(t, err) - err = node.subscribeToTopics(ctx) - require.NoError(t, err) + client2, err := host.New(mocks.NoopLogger, loopback, 0) + require.NoError(t, err) - time.Sleep(subscriptionDiseminationPause) + node := createNode(t, blockless.HeadNode) + hostAddNewPeer(t, node.host, client1) + hostAddNewPeer(t, node.host, client2) - err = node.publish(ctx, rec) - require.NoError(t, err) + client1.SetStreamHandler(blockless.ProtocolID, func(network.Stream) {}) + client2.SetStreamHandler(blockless.ProtocolID, func(network.Stream) {}) - deadlineCtx, cancel := context.WithTimeout(ctx, publishTimeout) - defer cancel() - msg, err := subscription.Next(deadlineCtx) + // NOTE: These subtests are sequential. + t.Run("nominal case - sending to two online peers is ok", func(t *testing.T) { + err = node.sendToMany(context.Background(), []peer.ID{client1.ID(), client2.ID()}, newDummyRecord(), true) require.NoError(t, err) - - from := msg.ReceivedFrom - require.Equal(t, node.host.ID(), from) - require.NotNil(t, msg.Topic) - require.Equal(t, topic, *msg.Topic) - - var received dummyRecord - err = json.Unmarshal(msg.Data, &received) + }) + t.Run("peer is down with requireAll is an error", func(t *testing.T) { + client1.Close() + err = node.sendToMany(context.Background(), []peer.ID{client1.ID(), client2.ID()}, newDummyRecord(), true) + require.Error(t, err) + }) + t.Run("peer is down with partial delivery is ok", func(t *testing.T) { + client1.Close() + err = node.sendToMany(context.Background(), []peer.ID{client1.ID(), client2.ID()}, newDummyRecord(), false) require.NoError(t, err) - - require.Equal(t, rec, received) + }) + t.Run("all sends failing produces an error", func(t *testing.T) { + client1.Close() + client2.Close() + err = node.sendToMany(context.Background(), []peer.ID{client1.ID(), client2.ID()}, newDummyRecord(), false) + require.Error(t, err) }) } @@ -111,3 +146,11 @@ type dummyRecord struct { func (dummyRecord) Type() string { return "MessageDummyRecord" } + +func newDummyRecord() dummyRecord { + return dummyRecord{ + ID: mocks.GenericUUID.String(), + Value: rand.Uint64(), + Description: "dummy-description", + } +}