Skip to content

Commit

Permalink
test: messages surpassing MConnection's receiving buffer capacity are…
Browse files Browse the repository at this point in the history
… dropped (#1199)

Addresses the first item of #1190
Closes #1162
  • Loading branch information
staheri14 authored Feb 13, 2024
1 parent 938fb01 commit edd9b9d
Showing 1 changed file with 96 additions and 0 deletions.
96 changes: 96 additions & 0 deletions p2p/conn/connection_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func sendMessages(mc *MConnection,
case <-ticker.C:
// generate message
if mc.Send(chIDs[i], msgs[i]) {
log.TestingLogger().Info("Sent message ", i, " on channel ",
chIDs[i])
i++
if i >= total {
log.TestingLogger().Info("Completed the message generation as the" +
Expand Down Expand Up @@ -717,3 +719,97 @@ func TestMConnection_Message_Order_ChannelID(t *testing.T) {

require.Equal(t, chIDs, recvChIds) // assert that the order of received messages is the same as the order of sent messages
}

func TestMConnection_Failing_Large_Messages(t *testing.T) {
// This test evaluates how MConnection handles messages exceeding channel
// ID's receive message capacity i.e., `RecvMessageCapacity`.
// It involves two connections, each with two channels: Channel ID 1 (
// capacity 1024 bytes) and Channel ID 2 (capacity 1023 bytes).
// All the other channel ID's and MConnection's configurations are set high
// enough to not be a limiting factor.
// A 1KB message is sent over the first and second channels in succession.
// Message on Channel ID 1 (capacity equal to message size) is received,
// while the message on Channel ID 2 (capacity less than message size) is dropped.

totalMsgs := 2
msgSize := 1 * kibibyte
sendRate := 50 * kibibyte
recRate := 50 * kibibyte
chDesc := []*ChannelDescriptor{
{ID: 0x01, Priority: 1, SendQueueCapacity: 50,
RecvMessageCapacity: msgSize,
RecvBufferCapacity: defaultRecvBufferCapacity},
{ID: 0x02, Priority: 1, SendQueueCapacity: 50,
RecvMessageCapacity: msgSize - 1,
RecvBufferCapacity: defaultRecvBufferCapacity},
}

// prepare messages and channel IDs
// 1 message on channel ID 1 and 1 message on channel ID 2
msgs := make([][]byte, totalMsgs)
chIDs := make([]byte, totalMsgs)
msgs[0] = bytes.Repeat([]byte{'x'}, msgSize)
chIDs[0] = 0x01
msgs[1] = bytes.Repeat([]byte{'y'}, msgSize)
chIDs[1] = 0x02

// set up two networked connections
// server, client := NetPipe() // can alternatively use this and comment out the line below
server, client := tcpNetPipe()
defer server.Close()
defer client.Close()

// prepare callback to receive messages
allReceived := make(chan bool)
recvChIds := make(chan byte, totalMsgs)
onReceive := func(chID byte, msgBytes []byte) {
recvChIds <- chID
if len(recvChIds) >= totalMsgs {
allReceived <- true
}
}

cnfg := DefaultMConnConfig()
cnfg.SendRate = int64(sendRate)
cnfg.RecvRate = int64(recRate)

// mount the channel descriptors to the connections
clientMconn := NewMConnectionWithConfig(client, chDesc,
func(chID byte, msgBytes []byte) {},
func(r interface{}) {},
cnfg)
serverMconn := NewMConnectionWithConfig(server, chDesc,
onReceive,
func(r interface{}) {},
cnfg)
clientMconn.SetLogger(log.TestingLogger())
serverMconn.SetLogger(log.TestingLogger())

err := clientMconn.Start()
require.Nil(t, err)
defer func() {
_ = clientMconn.Stop()
}()
err = serverMconn.Start()
require.Nil(t, err)
defer func() {
_ = serverMconn.Stop()
}()

// start sending messages
go sendMessages(clientMconn,
time.Millisecond,
1*time.Second,
msgs, chIDs)

// wait for messages to be received
select {
case <-allReceived:
require.Fail(t, "All messages should not have been received") // the message sent
// on channel ID 2 should have been dropped
case <-time.After(500 * time.Millisecond):
require.Equal(t, 1, len(recvChIds))
require.Equal(t, chIDs[0], <-recvChIds) // the first message should be received
require.True(t, !serverMconn.IsRunning()) // the serverMconn should have stopped due to the error
}
}

0 comments on commit edd9b9d

Please sign in to comment.