Skip to content

Commit

Permalink
Merge pull request #5234 from onflow/yahya/6927-duplicate-ihave-topics
Browse files Browse the repository at this point in the history
[Networking] Enhance RPC Inspection with Configurable Thresholds and Granular Metrics Collection
  • Loading branch information
thep2p authored Jan 22, 2024
2 parents 5573867 + fbe37a5 commit 89594b2
Show file tree
Hide file tree
Showing 19 changed files with 1,937 additions and 651 deletions.
88 changes: 58 additions & 30 deletions config/default-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,38 +140,66 @@ network-config:
# The size of the queue for notifications about invalid RPC messages
notification-cache-size: 10_000
validation: # RPC control message validation inspector configs
# Rpc validation inspector number of pool workers
workers: 5
# The size of the queue used by worker pool for the control message validation inspector
queue-size: 100
# The max sample size used for RPC message validation. If the total number of RPC messages exceeds this value a sample will be taken but messages will not be truncated
message-max-sample-size: 1000
# Max number of control messages in a sample to be inspected when inspecting GRAFT and PRUNE message types. If the total number of control messages (GRAFT or PRUNE)
# exceeds this max sample size then the respective message will be truncated before being processed.
graft-and-prune-message-max-sample-size: 1000
# The threshold at which an error will be returned if the number of invalid RPC messages exceeds this value
error-threshold: 500
ihave: # Max number of ihave messages in a sample to be inspected. If the number of ihave messages exceeds this configured value
# the control message ihaves will be truncated to the max sample size. This sample is randomly selected.
inspection-queue:
# Rpc validation inspector number of pool workers
workers: 5
# The size of the queue used by worker pool for the control message validation inspector
queue-size: 100
publish-messages:
# The maximum number of messages in a single RPC message that are randomly sampled for async inspection.
# When the size of a single RPC message exceeds this threshold, a random sample is taken for inspection, but the RPC message is not truncated.
max-sample-size: 1000
# Max number of ihave message ids in a sample to be inspected per ihave. Each ihave message includes a list of message ids.
# If the size of the message ids list for a single ihave message exceeds the configured max message id sample size, the list of message ids will be truncated.
max-message-id-sample-size: 1000
# The threshold at which an error will be returned if the number of invalid RPC messages exceeds this value
error-threshold: 500
graft-and-prune:
# The maximum number of GRAFT or PRUNE messages in a single RPC message.
# When the total number of GRAFT or PRUNE messages in a single RPC message exceeds this threshold,
# a random sample of GRAFT or PRUNE messages will be taken and the RPC message will be truncated to this sample size.
message-count-threshold: 1000
# Maximum number of total duplicate topic ids in a single GRAFT or PRUNE message, ideally this should be 0 but we allow for some tolerance
# to avoid penalizing peers that are not malicious but are misbehaving due to bugs or other issues.
# A topic id is considered duplicate if it appears more than once in a single GRAFT or PRUNE message.
duplicate-topic-id-threshold: 50
ihave:
# The maximum allowed number of iHave messages in a single RPC message.
# Each iHave message represents the list of message ids. When the total number of iHave messages
# in a single RPC message exceeds this threshold, a random sample of iHave messages will be taken and the RPC message will be truncated to this sample size.
# The sample size is equal to the configured message-count-threshold.
message-count-threshold: 1000
# The maximum allowed number of message ids in a single iHave message.
# Each iHave message represents the list of message ids for a specific topic, and this parameter controls the maximum number of message ids
# that can be included in a single iHave message. When the total number of message ids in a single iHave message exceeds this threshold,
# a random sample of message ids will be taken and the iHave message will be truncated to this sample size.
# The sample size is equal to the configured message-id-count-threshold.
message-id-count-threshold: 1000
# The tolerance threshold for having duplicate topics in an iHave message under inspection.
# When the total number of duplicate topic ids in a single iHave message exceeds this threshold, the inspection of the message will fail.
# Note that a topic ID is counted as a duplicate only if it is repeated more than once.
duplicate-topic-id-threshold: 50
# Threshold of tolerance for having duplicate message IDs in a single iHave message under inspection.
# Ideally, an iHave message should not have any duplicate message IDs; hence a message ID is considered a duplicate when it is repeated more than once
# within the same iHave message. When the total number of duplicate message IDs in a single iHave message exceeds this threshold, the inspection of the message will fail.
duplicate-message-id-threshold: 100
iwant:
# Max number of iwant messages in a sample to be inspected. If the total number of iWant control messages
# exceeds this max sample size then the respective message will be truncated before being processed.
max-sample-size: 1000
# Max number of iwant message ids in a sample to be inspected per iwant. Each iwant message includes a list of message ids.
# If the size of this list exceeds the configured max message id sample size, the list of message ids will be truncated.
max-message-id-sample-size: 1000
# The allowed threshold of iWant messages received without a corresponding tracked iHave message that was sent. If the cache miss threshold is exceeded an
# invalid control message notification is disseminated and the sender will be penalized.
cache-miss-threshold: .5
# The iWants size at which message id cache misses will be checked.
cache-miss-check-size: 1000
# The max allowed duplicate message IDs in a single iWant control message. If the duplicate message threshold is exceeded an invalid control message
# notification is disseminated and the sender will be penalized.
duplicate-message-id-threshold: .15
# The maximum allowed number of iWant messages in a single RPC message.
# Each iWant message represents the list of message ids. When the total number of iWant messages
# in a single RPC message exceeds this threshold, a random sample of iWant messages will be taken and the RPC message will be truncated to this sample size.
# The sample size is equal to the configured message-count-threshold.
message-count-threshold: 1000
# The maximum allowed number of message ids in a single iWant message.
# Each iWant message carries a list of requested message ids, and this parameter controls the maximum number of message ids
# that can be included in a single iWant message. When the total number of message ids in a single iWant message exceeds this threshold,
# a random sample of message ids will be taken and the iWant message will be truncated to this sample size.
# The sample size is equal to the configured message-id-count-threshold.
message-id-count-threshold: 1000
# The allowed threshold of iWant messages received without a corresponding tracked iHave message that was sent.
# If the cache miss threshold is exceeded an invalid control message notification is disseminated and the sender will be penalized.
cache-miss-threshold: 500
# The max allowed number of duplicate message ids in a single iWant message.
# Note that ideally there should be no duplicate message ids in a single iWant message, but
# we allow for some tolerance to avoid penalizing peers that are not malicious.
duplicate-message-id-threshold: 100
cluster-prefixed-messages:
# Cluster prefixed control message validation configs
# The size of the cache used to track the amount of cluster prefixed topics received by peers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rpc_inspector
import (
"context"
"fmt"
"math"
"os"
"testing"
"time"
Expand Down Expand Up @@ -46,7 +47,7 @@ func TestValidationInspector_InvalidTopicId_Detection(t *testing.T) {
inspectorConfig := flowConfig.NetworkConfig.GossipSub.RpcInspector.Validation

messageCount := 100
inspectorConfig.NumberOfWorkers = 1
inspectorConfig.InspectionQueue.NumberOfWorkers = 1
controlMessageCount := int64(1)

count := atomic.NewUint64(0)
Expand Down Expand Up @@ -179,9 +180,11 @@ func TestValidationInspector_DuplicateTopicId_Detection(t *testing.T) {
require.NoError(t, err)
inspectorConfig := flowConfig.NetworkConfig.GossipSub.RpcInspector.Validation

inspectorConfig.NumberOfWorkers = 1
inspectorConfig.InspectionQueue.NumberOfWorkers = 1

messageCount := 10
// sets the message count to 2 above the larger of the iHave and graft/prune duplicate topic id thresholds to ensure
// a successful attack
messageCount := int(math.Max(float64(inspectorConfig.IHave.DuplicateTopicIdThreshold), float64(inspectorConfig.GraftPrune.DuplicateTopicIdThreshold))) + 2
controlMessageCount := int64(1)

count := atomic.NewInt64(0)
Expand Down Expand Up @@ -289,7 +292,7 @@ func TestValidationInspector_IHaveDuplicateMessageId_Detection(t *testing.T) {
require.NoError(t, err)
inspectorConfig := flowConfig.NetworkConfig.GossipSub.RpcInspector.Validation

inspectorConfig.NumberOfWorkers = 1
inspectorConfig.InspectionQueue.NumberOfWorkers = 1

count := atomic.NewInt64(0)
done := make(chan struct{})
Expand All @@ -301,7 +304,7 @@ func TestValidationInspector_IHaveDuplicateMessageId_Detection(t *testing.T) {
notification, ok := args[0].(*p2p.InvCtrlMsgNotif)
require.True(t, ok)
require.Equal(t, notification.TopicType, p2p.CtrlMsgNonClusterTopicType, "IsClusterPrefixed is expected to be false, no RPC with cluster prefixed topic sent in this test")
require.True(t, validation.IsDuplicateTopicErr(notification.Error))
require.True(t, validation.IsDuplicateMessageIDErr(notification.Error))
require.Equal(t, spammer.SpammerNode.ID(), notification.PeerID)
require.True(t,
notification.MsgType == p2pmsg.CtrlMsgIHave,
Expand Down Expand Up @@ -361,21 +364,25 @@ func TestValidationInspector_IHaveDuplicateMessageId_Detection(t *testing.T) {
validationInspector.Start(signalerCtx)
nodes := []p2p.LibP2PNode{victimNode, spammer.SpammerNode}
startNodesAndEnsureConnected(t, signalerCtx, nodes, sporkID)
// to suppress peers provider not set
p2ptest.RegisterPeerProviders(t, nodes)
spammer.Start(t)
defer stopComponents(t, cancel, nodes, validationInspector)

// generate 2 control messages with iHaves for different topics
ihaveCtlMsgs1 := spammer.GenerateCtlMessages(1, p2ptest.WithIHave(1, 1, pushBlocks.String()))
ihaveCtlMsgs2 := spammer.GenerateCtlMessages(1, p2ptest.WithIHave(1, 1, reqChunks.String()))

// duplicate message ids for a single topic is invalid and will cause an error
ihaveCtlMsgs1[0].Ihave[0].MessageIDs = append(ihaveCtlMsgs1[0].Ihave[0].MessageIDs, ihaveCtlMsgs1[0].Ihave[0].MessageIDs[0])
// duplicate message ids across different topics is valid
ihaveCtlMsgs2[0].Ihave[0].MessageIDs[0] = ihaveCtlMsgs1[0].Ihave[0].MessageIDs[0]
messageIdCount := inspectorConfig.IHave.DuplicateMessageIdThreshold + 2
messageIds := unittest.IdentifierListFixture(1)
for i := 0; i < messageIdCount; i++ {
messageIds = append(messageIds, messageIds[0])
}
// prepares 1 control message carrying iHave messages for 2 different topics, each with duplicate message IDs
ihaveCtlMsgs1 := spammer.GenerateCtlMessages(
1,
p2ptest.WithIHaveMessageIDs(messageIds.Strings(), pushBlocks.String()),
p2ptest.WithIHaveMessageIDs(messageIds.Strings(), reqChunks.String()))

// start spamming the victim peer
spammer.SpamControlMessage(t, victimNode, ihaveCtlMsgs1)
spammer.SpamControlMessage(t, victimNode, ihaveCtlMsgs2)

unittest.RequireCloseBefore(t, done, 5*time.Second, "failed to inspect RPC messages on time")
// ensure we receive the expected number of invalid control message notifications
Expand All @@ -393,7 +400,7 @@ func TestValidationInspector_UnknownClusterId_Detection(t *testing.T) {
// set hard threshold to 0 so that in the case of invalid cluster ID
// we force the inspector to return an error
inspectorConfig.ClusterPrefixedMessage.HardThreshold = 0
inspectorConfig.NumberOfWorkers = 1
inspectorConfig.InspectionQueue.NumberOfWorkers = 1

// SafetyThreshold < messageCount < HardThreshold ensures that the RPC message will be further inspected and topic IDs will be checked
// restricting the message count to 1 allows us to only aggregate a single error when the error is logged in the inspector.
Expand Down Expand Up @@ -502,7 +509,7 @@ func TestValidationInspector_ActiveClusterIdsNotSet_Graft_Detection(t *testing.T
require.NoError(t, err)
inspectorConfig := flowConfig.NetworkConfig.GossipSub.RpcInspector.Validation
inspectorConfig.ClusterPrefixedMessage.HardThreshold = 5
inspectorConfig.NumberOfWorkers = 1
inspectorConfig.InspectionQueue.NumberOfWorkers = 1
controlMessageCount := int64(10)

count := atomic.NewInt64(0)
Expand Down Expand Up @@ -590,7 +597,7 @@ func TestValidationInspector_ActiveClusterIdsNotSet_Prune_Detection(t *testing.T
require.NoError(t, err)
inspectorConfig := flowConfig.NetworkConfig.GossipSub.RpcInspector.Validation
inspectorConfig.ClusterPrefixedMessage.HardThreshold = 5
inspectorConfig.NumberOfWorkers = 1
inspectorConfig.InspectionQueue.NumberOfWorkers = 1
controlMessageCount := int64(10)

count := atomic.NewInt64(0)
Expand Down Expand Up @@ -676,7 +683,7 @@ func TestValidationInspector_UnstakedNode_Detection(t *testing.T) {
// set hard threshold to 0 so that in the case of invalid cluster ID
// we force the inspector to return an error
inspectorConfig.ClusterPrefixedMessage.HardThreshold = 0
inspectorConfig.NumberOfWorkers = 1
inspectorConfig.InspectionQueue.NumberOfWorkers = 1

// SafetyThreshold < messageCount < HardThreshold ensures that the RPC message will be further inspected and topic IDs will be checked
// restricting the message count to 1 allows us to only aggregate a single error when the error is logged in the inspector.
Expand Down Expand Up @@ -771,11 +778,9 @@ func TestValidationInspector_InspectIWants_CacheMissThreshold(t *testing.T) {
flowConfig, err := config.DefaultConfig()
require.NoError(t, err)
inspectorConfig := flowConfig.NetworkConfig.GossipSub.RpcInspector.Validation
// force all cache miss checks
inspectorConfig.IWant.CacheMissCheckSize = 1
inspectorConfig.NumberOfWorkers = 1
inspectorConfig.IWant.CacheMissThreshold = .5 // set cache miss threshold to 50%
messageCount := 1
inspectorConfig.InspectionQueue.NumberOfWorkers = 1
inspectorConfig.IWant.CacheMissThreshold = 10
messageCount := 10
controlMessageCount := int64(1)
cacheMissThresholdNotifCount := atomic.NewUint64(0)
done := make(chan struct{})
Expand Down Expand Up @@ -875,7 +880,7 @@ func TestValidationInspector_InspectRpcPublishMessages(t *testing.T) {
flowConfig, err := config.DefaultConfig()
require.NoError(t, err)
inspectorConfig := flowConfig.NetworkConfig.GossipSub.RpcInspector.Validation
inspectorConfig.NumberOfWorkers = 1
inspectorConfig.InspectionQueue.NumberOfWorkers = 1

idProvider := mock.NewIdentityProvider(t)
spammer := corruptlibp2p.NewGossipSubRouterSpammer(t, sporkID, role, idProvider)
Expand Down Expand Up @@ -968,7 +973,7 @@ func TestValidationInspector_InspectRpcPublishMessages(t *testing.T) {
topicProvider.UpdateTopics(topics)

// after 7 errors encountered disseminate a notification
inspectorConfig.MessageErrorThreshold = 6
inspectorConfig.PublishMessages.ErrorThreshold = 6

require.NoError(t, err)
corruptInspectorFunc := corruptlibp2p.CorruptInspectorFunc(validationInspector)
Expand Down Expand Up @@ -1087,12 +1092,14 @@ func TestGossipSubSpamMitigationIntegration(t *testing.T) {
graftCtlMsgsWithUnknownTopic := spammer.GenerateCtlMessages(int(spamCtrlMsgCount), p2ptest.WithGraft(spamRpcCount, unknownTopic.String()))
graftCtlMsgsWithMalformedTopic := spammer.GenerateCtlMessages(int(spamCtrlMsgCount), p2ptest.WithGraft(spamRpcCount, malformedTopic.String()))
graftCtlMsgsInvalidSporkIDTopic := spammer.GenerateCtlMessages(int(spamCtrlMsgCount), p2ptest.WithGraft(spamRpcCount, invalidSporkIDTopic.String()))
graftCtlMsgsDuplicateTopic := spammer.GenerateCtlMessages(int(spamCtrlMsgCount), p2ptest.WithGraft(3, duplicateTopic.String()))
graftCtlMsgsDuplicateTopic := spammer.GenerateCtlMessages(int(spamCtrlMsgCount), // sets duplicate to +2 above the threshold to ensure that the victim node will penalize the spammer node
p2ptest.WithGraft(cfg.NetworkConfig.GossipSub.RpcInspector.Validation.GraftPrune.DuplicateTopicIdThreshold+2, duplicateTopic.String()))

pruneCtlMsgsWithUnknownTopic := spammer.GenerateCtlMessages(int(spamCtrlMsgCount), p2ptest.WithPrune(spamRpcCount, unknownTopic.String()))
pruneCtlMsgsWithMalformedTopic := spammer.GenerateCtlMessages(int(spamCtrlMsgCount), p2ptest.WithPrune(spamRpcCount, malformedTopic.String()))
pruneCtlMsgsInvalidSporkIDTopic := spammer.GenerateCtlMessages(int(spamCtrlMsgCount), p2ptest.WithGraft(spamRpcCount, invalidSporkIDTopic.String()))
pruneCtlMsgsDuplicateTopic := spammer.GenerateCtlMessages(int(spamCtrlMsgCount), p2ptest.WithPrune(3, duplicateTopic.String()))
pruneCtlMsgsDuplicateTopic := spammer.GenerateCtlMessages(int(spamCtrlMsgCount), // sets duplicate to +2 above the threshold to ensure that the victim node will penalize the spammer node
p2ptest.WithPrune(cfg.NetworkConfig.GossipSub.RpcInspector.Validation.GraftPrune.DuplicateTopicIdThreshold+2, duplicateTopic.String()))

// start spamming the victim peer
spammer.SpamControlMessage(t, victimNode, graftCtlMsgsWithUnknownTopic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,20 @@ func TestGossipSubIHaveBrokenPromises_Above_Threshold(t *testing.T) {
conf, err := config.DefaultConfig()
require.NoError(t, err)
// overcompensate for RPC truncation
conf.NetworkConfig.GossipSub.RpcInspector.Validation.IHave.MaxSampleSize = 10000
conf.NetworkConfig.GossipSub.RpcInspector.Validation.IHave.MaxMessageIDSampleSize = 10000
conf.NetworkConfig.GossipSub.RpcInspector.Validation.IHave.MaxSampleSize = 10000
conf.NetworkConfig.GossipSub.RpcInspector.Validation.IHave.MaxMessageIDSampleSize = 10000
conf.NetworkConfig.GossipSub.RpcInspector.Validation.IHave.MessageCountThreshold = 10000
conf.NetworkConfig.GossipSub.RpcInspector.Validation.IHave.MessageIdCountThreshold = 10000
conf.NetworkConfig.GossipSub.RpcInspector.Validation.IHave.MessageCountThreshold = 10000
conf.NetworkConfig.GossipSub.RpcInspector.Validation.IHave.MessageIdCountThreshold = 10000
// we override the decay interval to 1 second so that the score is updated within 1 second intervals.
conf.NetworkConfig.GossipSub.ScoringParameters.PeerScoring.Internal.DecayInterval = 1 * time.Second
// score tracer interval is set to 500 milliseconds to speed up the test; it should be shorter than the heartbeat interval (1 second) of gossipsub to catch the score updates in time.
conf.NetworkConfig.GossipSub.RpcTracer.ScoreTracerInterval = 500 * time.Millisecond

// relaxing the scoring parameters to fit the test scenario.
conf.NetworkConfig.GossipSub.ScoringParameters.PeerScoring.Internal.Behaviour.PenaltyDecay = 0.99
conf.NetworkConfig.GossipSub.ScoringParameters.PeerScoring.Internal.Behaviour.PenaltyThreshold = 10
conf.NetworkConfig.GossipSub.ScoringParameters.PeerScoring.Internal.Behaviour.PenaltyWeight = -1

ctx, cancel := context.WithCancel(context.Background())
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
// we override some of the default scoring parameters in order to speed up the test in a time-efficient manner.
Expand Down
Loading

0 comments on commit 89594b2

Please sign in to comment.