diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go
index 390cad33d43..5edd2629ee2 100644
--- a/cmd/access/node_builder/access_node_builder.go
+++ b/cmd/access/node_builder/access_node_builder.go
@@ -1191,11 +1191,15 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
 		return nil, fmt.Errorf("could not create connection manager: %w", err)
 	}
 
-	meshTracer := tracer.NewGossipSubMeshTracer(
-		builder.Logger,
-		networkMetrics,
-		builder.IdentityProvider,
-		builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval)
+	meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
+		Logger:                       builder.Logger,
+		Metrics:                      networkMetrics,
+		IDProvider:                   builder.IdentityProvider,
+		LoggerInterval:               builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
+		RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
+		RpcSentTrackerCacheSize:      builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
+	}
+	meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)
 
 	libp2pNode, err := p2pbuilder.NewNodeBuilder(
 		builder.Logger,
diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go
index 6f825421278..78ddc464fb7 100644
--- a/cmd/observer/node_builder/observer_builder.go
+++ b/cmd/observer/node_builder/observer_builder.go
@@ -702,11 +702,15 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
 		pis = append(pis, pi)
 	}
 
-	meshTracer := tracer.NewGossipSubMeshTracer(
-		builder.Logger,
-		builder.Metrics.Network,
-		builder.IdentityProvider,
-		builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval)
+	meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
+		Logger:                       builder.Logger,
+		Metrics:                      builder.Metrics.Network,
+		IDProvider:                   builder.IdentityProvider,
+		LoggerInterval:               builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
+		RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
+		RpcSentTrackerCacheSize:      builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
+	}
+	meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)
 
 	node, err := p2pbuilder.NewNodeBuilder(
 		builder.Logger,
diff --git a/config/default-config.yml b/config/default-config.yml
index 371fc4c385c..9834694b0e2 100644
--- a/config/default-config.yml
+++ b/config/default-config.yml
@@ -63,6 +63,9 @@ network-config:
   # The default interval at which the gossipsub score tracer logs the peer scores. This is used for debugging and forensics purposes.
   # Note that we purposefully choose this logging interval high enough to avoid spamming the logs.
   gossipsub-score-tracer-interval: 1m
+  # The default RPC sent tracker cache size. The RPC sent tracker is used to track RPC control messages sent from the local node.
+  # Note: this cache size must be large enough to retain a history of sent messages over a reasonable time window.
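+  # As a rough, hypothetical sizing illustration (an assumption, not a measured figure): if the local node
+  # advertises on the order of 1,000 iHave message ids per second, a 1,000,000-entry cache retains roughly
+  # 16-17 minutes of send history before LRU ejection begins evicting the oldest entries.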
+  gossipsub-rpc-sent-tracker-cache-size: 1_000_000
   # Peer scoring is the default value for enabling peer scoring
   gossipsub-peer-scoring-enabled: true
   # Gossipsub rpc inspectors configs
diff --git a/follower/follower_builder.go b/follower/follower_builder.go
index b8f3eaa5bc4..e2eb43cb49c 100644
--- a/follower/follower_builder.go
+++ b/follower/follower_builder.go
@@ -604,11 +604,15 @@ func (builder *FollowerServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
 		pis = append(pis, pi)
 	}
 
-	meshTracer := tracer.NewGossipSubMeshTracer(
-		builder.Logger,
-		builder.Metrics.Network,
-		builder.IdentityProvider,
-		builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval)
+	meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
+		Logger:                       builder.Logger,
+		Metrics:                      builder.Metrics.Network,
+		IDProvider:                   builder.IdentityProvider,
+		LoggerInterval:               builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
+		RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
+		RpcSentTrackerCacheSize:      builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
+	}
+	meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)
 
 	node, err := p2pbuilder.NewNodeBuilder(
 		builder.Logger,
diff --git a/module/metrics/herocache.go b/module/metrics/herocache.go
index 54e287bdb1b..f3a88341c87 100644
--- a/module/metrics/herocache.go
+++ b/module/metrics/herocache.go
@@ -146,6 +146,15 @@ func GossipSubRPCInspectorQueueMetricFactory(f HeroCacheMetricsFactory, networkT
 	return f(namespaceNetwork, r)
}
 
+func GossipSubRPCSentTrackerMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics {
+	// keep the private network metric name unprefixed for the sake of backward compatibility; only the public network variant below is prefixed.
+ r := ResourceNetworkingRPCSentTrackerCache + if networkType == network.PublicNetwork { + r = PrependPublicPrefix(r) + } + return f(namespaceNetwork, r) +} + func RpcInspectorNotificationQueueMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics { r := ResourceNetworkingRpcInspectorNotificationQueue if networkType == network.PublicNetwork { diff --git a/module/metrics/labels.go b/module/metrics/labels.go index 353e1b3ca25..9febc9ab391 100644 --- a/module/metrics/labels.go +++ b/module/metrics/labels.go @@ -92,6 +92,7 @@ const ( ResourceNetworkingApplicationLayerSpamReportQueue = "application_layer_spam_report_queue" ResourceNetworkingRpcClusterPrefixReceivedCache = "rpc_cluster_prefixed_received_cache" ResourceNetworkingDisallowListCache = "disallow_list_cache" + ResourceNetworkingRPCSentTrackerCache = "gossipsub_rpc_sent_tracker_cache" ResourceFollowerPendingBlocksCache = "follower_pending_block_cache" // follower engine ResourceFollowerLoopCertifiedBlocksChannel = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel diff --git a/network/internal/p2pfixtures/fixtures.go b/network/internal/p2pfixtures/fixtures.go index 40229337dfa..29ee0509fbb 100644 --- a/network/internal/p2pfixtures/fixtures.go +++ b/network/internal/p2pfixtures/fixtures.go @@ -102,11 +102,16 @@ func CreateNode(t *testing.T, networkKey crypto.PrivateKey, sporkID flow.Identif idProvider := id.NewFixedIdentityProvider(nodeIds) defaultFlowConfig, err := config.DefaultConfig() require.NoError(t, err) - meshTracer := tracer.NewGossipSubMeshTracer( - logger, - metrics.NewNoopCollector(), - idProvider, - defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval) + + meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ + Logger: logger, + Metrics: metrics.NewNoopCollector(), + IDProvider: idProvider, + LoggerInterval: defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, + RpcSentTrackerCacheCollector: metrics.NewNoopCollector(), + RpcSentTrackerCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + } + meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) builder := p2pbuilder.NewNodeBuilder( logger, diff --git a/network/netconf/flags.go b/network/netconf/flags.go index 3d8e6357e76..bdf821aa60b 100644 --- a/network/netconf/flags.go +++ b/network/netconf/flags.go @@ -37,9 +37,10 @@ const ( gracePeriod = "libp2p-grace-period" silencePeriod = "libp2p-silence-period" // gossipsub - peerScoring = "gossipsub-peer-scoring-enabled" - localMeshLogInterval = "gossipsub-local-mesh-logging-interval" - scoreTracerInterval = "gossipsub-score-tracer-interval" + peerScoring = "gossipsub-peer-scoring-enabled" + localMeshLogInterval = "gossipsub-local-mesh-logging-interval" + rpcSentTrackerCacheSize = "gossipsub-rpc-sent-tracker-cache-size" + scoreTracerInterval = "gossipsub-score-tracer-interval" // gossipsub validation inspector gossipSubRPCInspectorNotificationCacheSize = "gossipsub-rpc-inspector-notification-cache-size" validationInspectorNumberOfWorkers = "gossipsub-rpc-validation-inspector-workers" @@ -66,7 +67,7 @@ func AllFlagNames() []string { return []string{ networkingConnectionPruning, preferredUnicastsProtocols, receivedMessageCacheSize, peerUpdateInterval, unicastMessageTimeout, unicastCreateStreamRetryDelay, dnsCacheTTL, disallowListNotificationCacheSize, dryRun, lockoutDuration, messageRateLimit, bandwidthRateLimit, bandwidthBurstLimit, memoryLimitRatio, - fileDescriptorsRatio, 
peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, scoreTracerInterval, + fileDescriptorsRatio, peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, rpcSentTrackerCacheSize, scoreTracerInterval, gossipSubRPCInspectorNotificationCacheSize, validationInspectorNumberOfWorkers, validationInspectorInspectMessageQueueCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheDecay, validationInspectorClusterPrefixHardThreshold, ihaveSyncSampleSizePercentage, ihaveAsyncSampleSizePercentage, ihaveMaxSampleSize, metricsInspectorNumberOfWorkers, metricsInspectorCacheSize, alspDisabled, alspSpamRecordCacheSize, alspSpamRecordQueueSize, alspHearBeatInterval, @@ -107,6 +108,7 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) { flags.Bool(peerScoring, config.GossipSubConfig.PeerScoring, "enabling peer scoring on pubsub network") flags.Duration(localMeshLogInterval, config.GossipSubConfig.LocalMeshLogInterval, "logging interval for local mesh in gossipsub") flags.Duration(scoreTracerInterval, config.GossipSubConfig.ScoreTracerInterval, "logging interval for peer score tracer in gossipsub, set to 0 to disable") + flags.Uint32(rpcSentTrackerCacheSize, config.GossipSubConfig.RPCSentTrackerCacheSize, "cache size of the rpc sent tracker used by the gossipsub mesh tracer.") // gossipsub RPC control message validation limits used for validation configuration and rate limiting flags.Int(validationInspectorNumberOfWorkers, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.NumberOfWorkers, "number of gossupsub RPC control message validation inspector component workers") flags.Uint32(validationInspectorInspectMessageQueueCacheSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.CacheSize, "cache size for gossipsub RPC validation inspector events worker pool queue.") diff --git a/network/p2p/inspector/internal/cache/cache.go b/network/p2p/inspector/internal/cache/cache.go index 133fd0a9ac7..82d8f781a98 100644 --- a/network/p2p/inspector/internal/cache/cache.go +++ b/network/p2p/inspector/internal/cache/cache.go @@ -40,9 +40,7 @@ type RecordCache struct { // NewRecordCache creates a new *RecordCache. // Args: -// - sizeLimit: the maximum number of records that the cache can hold. -// - logger: the logger used by the cache. -// - collector: the metrics collector used by the cache. +// - config: record cache config. // - recordEntityFactory: a factory function that creates a new spam record. // Returns: // - *RecordCache, the created cache. 
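All four call sites above (access, observer, follower, and the test fixtures) migrate from positional arguments to the new `GossipSubMeshTracerConfig` struct. As a minimal wiring sketch (assuming the default Flow config and no-op collectors, mirroring the p2pfixtures change above; `buildMeshTracer` is a hypothetical helper, not part of this diff):

```go
package example

import (
	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/config"
	"github.com/onflow/flow-go/module"
	"github.com/onflow/flow-go/module/metrics"
	"github.com/onflow/flow-go/network/p2p/tracer"
)

// buildMeshTracer shows the new config-struct construction pattern.
func buildMeshTracer(logger zerolog.Logger, idProvider module.IdentityProvider) (*tracer.GossipSubMeshTracer, error) {
	defaultFlowConfig, err := config.DefaultConfig()
	if err != nil {
		return nil, err
	}
	cfg := &tracer.GossipSubMeshTracerConfig{
		Logger:                       logger,
		Metrics:                      metrics.NewNoopCollector(), // no-op collectors for the sketch
		IDProvider:                   idProvider,
		LoggerInterval:               defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
		RpcSentTrackerCacheCollector: metrics.NewNoopCollector(),
		RpcSentTrackerCacheSize:      defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
	}
	return tracer.NewGossipSubMeshTracer(cfg), nil
}
```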
diff --git a/network/p2p/p2pbuilder/libp2pNodeBuilder.go b/network/p2p/p2pbuilder/libp2pNodeBuilder.go
index c0a6412297c..8e550b4fa94 100644
--- a/network/p2p/p2pbuilder/libp2pNodeBuilder.go
+++ b/network/p2p/p2pbuilder/libp2pNodeBuilder.go
@@ -26,6 +26,7 @@ import (
 	"github.com/onflow/flow-go/module"
 	"github.com/onflow/flow-go/module/component"
 	"github.com/onflow/flow-go/module/irrecoverable"
+	"github.com/onflow/flow-go/module/metrics"
 	flownet "github.com/onflow/flow-go/network"
 	"github.com/onflow/flow-go/network/netconf"
 	"github.com/onflow/flow-go/network/p2p"
@@ -494,7 +495,16 @@ func DefaultNodeBuilder(
 		builder.EnableGossipSubPeerScoring(nil)
 	}
 
-	meshTracer := tracer.NewGossipSubMeshTracer(logger, metricsCfg.Metrics, idProvider, gossipCfg.LocalMeshLogInterval)
+	meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
+		Logger:                       logger,
+		Metrics:                      metricsCfg.Metrics,
+		IDProvider:                   idProvider,
+		LoggerInterval:               gossipCfg.LocalMeshLogInterval,
+		RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(metricsCfg.HeroCacheFactory, flownet.PrivateNetwork),
+		RpcSentTrackerCacheSize:      gossipCfg.RPCSentTrackerCacheSize,
+	}
+	meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)
+
 	builder.SetGossipSubTracer(meshTracer)
 	builder.SetGossipSubScoreTracerInterval(gossipCfg.ScoreTracerInterval)
 
diff --git a/network/p2p/p2pconf/gossipsub.go b/network/p2p/p2pconf/gossipsub.go
index f9155129efd..d297f5cba8b 100644
--- a/network/p2p/p2pconf/gossipsub.go
+++ b/network/p2p/p2pconf/gossipsub.go
@@ -21,6 +21,8 @@ type GossipSubConfig struct {
 	LocalMeshLogInterval time.Duration `mapstructure:"gossipsub-local-mesh-logging-interval"`
 	// ScoreTracerInterval is the interval at which the score tracer logs the peer scores.
 	ScoreTracerInterval time.Duration `mapstructure:"gossipsub-score-tracer-interval"`
+	// RPCSentTrackerCacheSize is the cache size of the RPC sent tracker used by the GossipSub mesh tracer.
+	RPCSentTrackerCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-cache-size"`
 	// PeerScoring is whether to enable GossipSub peer scoring.
 	PeerScoring bool `mapstructure:"gossipsub-peer-scoring-enabled"`
 }
diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go
index 7cd4dd2b692..1cc25fd2565 100644
--- a/network/p2p/tracer/gossipSubMeshTracer.go
+++ b/network/p2p/tracer/gossipSubMeshTracer.go
@@ -14,6 +14,7 @@ import (
 	"github.com/onflow/flow-go/module/component"
 	"github.com/onflow/flow-go/module/irrecoverable"
 	"github.com/onflow/flow-go/network/p2p"
+	"github.com/onflow/flow-go/network/p2p/tracer/internal"
 	"github.com/onflow/flow-go/utils/logging"
 )
 
@@ -43,23 +44,35 @@ type GossipSubMeshTracer struct {
 	idProvider     module.IdentityProvider
 	loggerInterval time.Duration
 	metrics        module.GossipSubLocalMeshMetrics
+	rpcSentTracker *internal.RPCSentTracker
 }
 
 var _ p2p.PubSubTracer = (*GossipSubMeshTracer)(nil)
 
-func NewGossipSubMeshTracer(
-	logger zerolog.Logger,
-	metrics module.GossipSubLocalMeshMetrics,
-	idProvider module.IdentityProvider,
-	loggerInterval time.Duration) *GossipSubMeshTracer {
+type GossipSubMeshTracerConfig struct {
+	Logger                       zerolog.Logger
+	Metrics                      module.GossipSubLocalMeshMetrics
+	IDProvider                   module.IdentityProvider
+	LoggerInterval               time.Duration
+	RpcSentTrackerCacheCollector module.HeroCacheMetrics
+	RpcSentTrackerCacheSize      uint32
+}
+
+// NewGossipSubMeshTracer creates a new *GossipSubMeshTracer.
+// Args:
+// - *GossipSubMeshTracerConfig: the mesh tracer config.
+// Returns:
+// - *GossipSubMeshTracer: new mesh tracer.
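+// Note that the returned tracer embeds a component.Component: the local mesh logging loop (logLoop below)
+// only runs once the tracer has been started through its component lifecycle.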
+func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) *GossipSubMeshTracer {
+	rpcSentTracker := internal.NewRPCSentTracker(config.Logger, config.RpcSentTrackerCacheSize, config.RpcSentTrackerCacheCollector)
 	g := &GossipSubMeshTracer{
 		RawTracer:      NewGossipSubNoopTracer(),
 		topicMeshMap:   make(map[string]map[peer.ID]struct{}),
-		idProvider:     idProvider,
-		metrics:        metrics,
-		logger:         logger.With().Str("component", "gossip_sub_topology_tracer").Logger(),
-		loggerInterval: loggerInterval,
+		idProvider:     config.IDProvider,
+		metrics:        config.Metrics,
+		logger:         config.Logger.With().Str("component", "gossipsub_topology_tracer").Logger(),
+		loggerInterval: config.LoggerInterval,
+		rpcSentTracker: rpcSentTracker,
 	}
 
 	g.Component = component.NewComponentManagerBuilder().
@@ -139,6 +152,15 @@ func (t *GossipSubMeshTracer) Prune(p peer.ID, topic string) {
 	lg.Info().Hex("flow_id", logging.ID(id.NodeID)).Str("role", id.Role.String()).Msg("pruned peer")
 }
 
+// SendRPC is called when an RPC is sent. Currently, the GossipSubMeshTracer tracks iHave RPC messages that have been sent.
+// This function can be updated to track other control messages in the future as required.
+func (t *GossipSubMeshTracer) SendRPC(rpc *pubsub.RPC, _ peer.ID) {
+	switch {
+	case len(rpc.GetControl().GetIhave()) > 0:
+		t.rpcSentTracker.OnIHaveRPCSent(rpc.GetControl().GetIhave())
+	}
+}
+
 // logLoop logs the mesh peers of the local node for each topic at a regular interval.
 func (t *GossipSubMeshTracer) logLoop(ctx irrecoverable.SignalerContext) {
 	ticker := time.NewTicker(t.loggerInterval)
diff --git a/network/p2p/tracer/gossipSubMeshTracer_test.go b/network/p2p/tracer/gossipSubMeshTracer_test.go
index fc14b280282..a2da0584f94 100644
--- a/network/p2p/tracer/gossipSubMeshTracer_test.go
+++ b/network/p2p/tracer/gossipSubMeshTracer_test.go
@@ -11,8 +11,10 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
 
+	"github.com/onflow/flow-go/config"
 	"github.com/onflow/flow-go/model/flow"
 	"github.com/onflow/flow-go/module/irrecoverable"
+	"github.com/onflow/flow-go/module/metrics"
 	mockmodule "github.com/onflow/flow-go/module/mock"
 	"github.com/onflow/flow-go/network/channels"
 	"github.com/onflow/flow-go/network/p2p"
@@ -29,6 +31,8 @@ import (
 // One of the nodes is running with an unknown peer id, which the identity provider is mocked to return an error and
 // the mesh tracer should log a warning message.
 func TestGossipSubMeshTracer(t *testing.T) {
+	defaultConfig, err := config.DefaultConfig()
+	require.NoError(t, err)
 	ctx, cancel := context.WithCancel(context.Background())
 	signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
 	sporkId := unittest.IdentifierFixture()
@@ -61,7 +65,15 @@ func TestGossipSubMeshTracer(t *testing.T) {
 	// we only need one node with a meshTracer to test the meshTracer.
 	// meshTracer logs at 1 second intervals for sake of testing.
 	collector := mockmodule.NewGossipSubLocalMeshMetrics(t)
-	meshTracer := tracer.NewGossipSubMeshTracer(logger, collector, idProvider, 1*time.Second)
+	meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
+		Logger:                       logger,
+		Metrics:                      collector,
+		IDProvider:                   idProvider,
+		LoggerInterval:               time.Second,
+		RpcSentTrackerCacheCollector: metrics.NewNoopCollector(),
+		RpcSentTrackerCacheSize:      defaultConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
+	}
+	meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)
 	tracerNode, tracerId := p2ptest.NodeFixture(
 		t,
 		sporkId,
diff --git a/network/p2p/tracer/internal/cache.go b/network/p2p/tracer/internal/cache.go
new file mode 100644
index 00000000000..b916133b270
--- /dev/null
+++ b/network/p2p/tracer/internal/cache.go
@@ -0,0 +1,86 @@
+package internal
+
+import (
+	"fmt"
+
+	"github.com/rs/zerolog"
+
+	"github.com/onflow/flow-go/model/flow"
+	"github.com/onflow/flow-go/module"
+	herocache "github.com/onflow/flow-go/module/mempool/herocache/backdata"
+	"github.com/onflow/flow-go/module/mempool/herocache/backdata/heropool"
+	"github.com/onflow/flow-go/module/mempool/stdmap"
+	p2pmsg "github.com/onflow/flow-go/network/p2p/message"
+)
+
+// rpcCtrlMsgSentCacheConfig configuration for the rpc sent cache.
+type rpcCtrlMsgSentCacheConfig struct {
+	logger    zerolog.Logger
+	sizeLimit uint32
+	collector module.HeroCacheMetrics
+}
+
+// rpcSentCache cache that stores rpcSentEntity. These entities represent RPC control messages sent from the local node.
+type rpcSentCache struct {
+	// c is the underlying cache.
+	c *stdmap.Backend
+}
+
+// newRPCSentCache creates a new *rpcSentCache.
+// Args:
+// - config: record cache config.
+// Returns:
+// - *rpcSentCache: the created cache.
+// Note that this cache is intended to track control messages sent by the local node;
+// it stores an rpcSentEntity under an ID that uniquely identifies the message being tracked.
+func newRPCSentCache(config *rpcCtrlMsgSentCacheConfig) *rpcSentCache {
+	backData := herocache.NewCache(config.sizeLimit,
+		herocache.DefaultOversizeFactor,
+		heropool.LRUEjection,
+		config.logger.With().Str("mempool", "gossipsub-rpc-control-messages-sent").Logger(),
+		config.collector)
+	return &rpcSentCache{
+		c: stdmap.NewBackend(stdmap.WithBackData(backData)),
+	}
+}
+
+// add caches a record for the given topic, message ID and control message type if one does not already exist.
+// Args:
+// - topic: the topic ID.
+// - messageId: the message ID.
+// - controlMsgType: the rpc control message type.
+// Returns:
+// - bool: true if the record was added, false otherwise (i.e.: the record already exists).
+// Note that if add is called multiple times for the same entity ID, the record is added only once, and
+// subsequent calls return false and do not change the record (i.e.: the record is not re-added).
+func (r *rpcSentCache) add(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) bool {
+	return r.c.Add(newRPCSentEntity(r.rpcSentEntityID(topic, messageId, controlMsgType), controlMsgType))
+}
+
+// has checks if the RPC message has been cached, indicating it has been sent.
+// Args:
+// - topic: the topic ID.
+// - messageId: the message ID.
+// - controlMsgType: the rpc control message type.
+// Returns:
+// - bool: true if the RPC has been cached, indicating it was sent from the local node.
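+// This lookup is safe for concurrent use: the underlying stdmap.Backend synchronizes access to the
+// HeroCache back data (concurrent add/has behavior is exercised by the tests in cache_test.go below).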
+func (r *rpcSentCache) has(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) bool {
+	return r.c.Has(r.rpcSentEntityID(topic, messageId, controlMsgType))
+}
+
+// size returns the number of records in the cache.
+func (r *rpcSentCache) size() uint {
+	return r.c.Size()
+}
+
+// rpcSentEntityID creates an entity ID from the topic, messageID and control message type.
+// Args:
+// - topic: the topic ID.
+// - messageId: the message ID.
+// - controlMsgType: the rpc control message type.
+// Returns:
+// - flow.Identifier: the entity ID.
+func (r *rpcSentCache) rpcSentEntityID(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) flow.Identifier {
+	return flow.MakeIDFromFingerPrint([]byte(fmt.Sprintf("%s%s%s", topic, messageId, controlMsgType)))
+}
diff --git a/network/p2p/tracer/internal/cache_test.go b/network/p2p/tracer/internal/cache_test.go
new file mode 100644
index 00000000000..c92b42b5e02
--- /dev/null
+++ b/network/p2p/tracer/internal/cache_test.go
@@ -0,0 +1,122 @@
+package internal
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/rs/zerolog"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
+
+	"github.com/onflow/flow-go/model/flow"
+	"github.com/onflow/flow-go/module"
+	"github.com/onflow/flow-go/module/metrics"
+	"github.com/onflow/flow-go/network/channels"
+	p2pmsg "github.com/onflow/flow-go/network/p2p/message"
+	"github.com/onflow/flow-go/utils/unittest"
+)
+
+// TestCache_Add tests the add method of the rpcSentCache.
+// It ensures that the method returns true when a new record is added
+// and false when the record already exists.
+func TestCache_Add(t *testing.T) {
+	cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector())
+	controlMsgType := p2pmsg.CtrlMsgIHave
+	topic := channels.PushBlocks.String()
+	messageID1 := unittest.IdentifierFixture().String()
+	messageID2 := unittest.IdentifierFixture().String()
+
+	// test adding a record for an ID that doesn't exist in the cache
+	initialized := cache.add(topic, messageID1, controlMsgType)
+	require.True(t, initialized, "expected record to be added")
+	require.True(t, cache.has(topic, messageID1, controlMsgType), "expected record to exist")
+
+	// test adding a record for an ID that already exists in the cache
+	initialized = cache.add(topic, messageID1, controlMsgType)
+	require.False(t, initialized, "expected record not to be added")
+	require.True(t, cache.has(topic, messageID1, controlMsgType), "expected record to exist")
+
+	// test adding a record for another ID
+	initialized = cache.add(topic, messageID2, controlMsgType)
+	require.True(t, initialized, "expected record to be added")
+	require.True(t, cache.has(topic, messageID2, controlMsgType), "expected record to exist")
+}
+
+// TestCache_ConcurrentAdd tests the concurrent addition of records.
+// The test covers the following scenarios:
+// 1. Multiple goroutines adding records for different ids.
+// 2. Ensuring that all records are correctly added.
+func TestCache_ConcurrentAdd(t *testing.T) {
+	cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector())
+	controlMsgType := p2pmsg.CtrlMsgIHave
+	topic := channels.PushBlocks.String()
+	messageIds := unittest.IdentifierListFixture(10)
+
+	var wg sync.WaitGroup
+	wg.Add(len(messageIds))
+
+	for _, id := range messageIds {
+		go func(id flow.Identifier) {
+			defer wg.Done()
+			cache.add(topic, id.String(), controlMsgType)
+		}(id)
+	}
+
+	unittest.RequireReturnsBefore(t, wg.Wait, 100*time.Millisecond, "timed out waiting for goroutines to finish")
+
+	// ensure that all records are correctly added
+	for _, id := range messageIds {
+		require.True(t, cache.has(topic, id.String(), controlMsgType))
+	}
+}
+
+// TestCache_ConcurrentSameRecordAdd tests the concurrent addition of the same record.
+// The test covers the following scenarios:
+// 1. Multiple goroutines attempting to add the same record concurrently.
+// 2. Only one goroutine successfully adds the record, and others receive false on the add attempt.
+// 3. The record is correctly stored in the cache and can be queried using the has method.
+func TestCache_ConcurrentSameRecordAdd(t *testing.T) {
+	cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector())
+	controlMsgType := p2pmsg.CtrlMsgIHave
+	topic := channels.PushBlocks.String()
+	messageID := unittest.IdentifierFixture().String()
+	const concurrentAttempts = 10
+
+	var wg sync.WaitGroup
+	wg.Add(concurrentAttempts)
+
+	successGauge := atomic.Int32{}
+
+	for i := 0; i < concurrentAttempts; i++ {
+		go func() {
+			defer wg.Done()
+			initSuccess := cache.add(topic, messageID, controlMsgType)
+			if initSuccess {
+				successGauge.Inc()
+			}
+		}()
+	}
+
+	unittest.RequireReturnsBefore(t, wg.Wait, 100*time.Millisecond, "timed out waiting for goroutines to finish")
+
+	// ensure that only one goroutine successfully added the record
+	require.Equal(t, int32(1), successGauge.Load())
+
+	// ensure that the record is correctly stored in the cache
+	require.True(t, cache.has(topic, messageID, controlMsgType))
+}
+
+// cacheFixture returns a new *rpcSentCache.
+func cacheFixture(t *testing.T, sizeLimit uint32, logger zerolog.Logger, collector module.HeroCacheMetrics) *rpcSentCache {
+	config := &rpcCtrlMsgSentCacheConfig{
+		sizeLimit: sizeLimit,
+		logger:    logger,
+		collector: collector,
+	}
+	r := newRPCSentCache(config)
+	// expect cache to be empty
+	require.Equalf(t, uint(0), r.size(), "cache size must be 0")
+	require.NotNil(t, r)
+	return r
+}
diff --git a/network/p2p/tracer/internal/rpc_send_entity.go b/network/p2p/tracer/internal/rpc_send_entity.go
new file mode 100644
index 00000000000..b3f56ce0b55
--- /dev/null
+++ b/network/p2p/tracer/internal/rpc_send_entity.go
@@ -0,0 +1,37 @@
+package internal
+
+import (
+	"github.com/onflow/flow-go/model/flow"
+	p2pmsg "github.com/onflow/flow-go/network/p2p/message"
+)
+
+// rpcSentEntity struct representing an RPC control message sent from the local node.
+// This struct implements the flow.Entity interface and uses the messageID field for deduplication.
+type rpcSentEntity struct {
+	// messageID the messageID of the rpc control message.
+	messageID flow.Identifier
+	// controlMsgType the control message type.
+	controlMsgType p2pmsg.ControlMessageType
+}
+
+var _ flow.Entity = (*rpcSentEntity)(nil)
+
+// ID returns the unique identifier of the sent RPC control message (derived from the topic, message ID, and
+// control message type), which is used for maintenance and deduplication purposes in the cache.
+func (r rpcSentEntity) ID() flow.Identifier {
+	return r.messageID
+}
+
+// Checksum returns the same identifier as ID; it does not have any purpose in the cache.
+// It is implemented to satisfy the flow.Entity interface.
+func (r rpcSentEntity) Checksum() flow.Identifier {
+	return r.messageID
+}
+
+// newRPCSentEntity returns a new rpcSentEntity.
+func newRPCSentEntity(id flow.Identifier, controlMessageType p2pmsg.ControlMessageType) rpcSentEntity {
+	return rpcSentEntity{
+		messageID:      id,
+		controlMsgType: controlMessageType,
+	}
+}
diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go
new file mode 100644
index 00000000000..6d44ac984a3
--- /dev/null
+++ b/network/p2p/tracer/internal/rpc_sent_tracker.go
@@ -0,0 +1,47 @@
+package internal
+
+import (
+	pb "github.com/libp2p/go-libp2p-pubsub/pb"
+	"github.com/rs/zerolog"
+
+	"github.com/onflow/flow-go/module"
+	p2pmsg "github.com/onflow/flow-go/network/p2p/message"
+)
+
+// RPCSentTracker tracks RPC messages that are sent.
+type RPCSentTracker struct {
+	cache *rpcSentCache
+}
+
+// NewRPCSentTracker returns a new *RPCSentTracker.
+func NewRPCSentTracker(logger zerolog.Logger, sizeLimit uint32, collector module.HeroCacheMetrics) *RPCSentTracker {
+	config := &rpcCtrlMsgSentCacheConfig{
+		sizeLimit: sizeLimit,
+		logger:    logger,
+		collector: collector,
+	}
+	return &RPCSentTracker{cache: newRPCSentCache(config)}
+}
+
+// OnIHaveRPCSent caches a unique entity ID for each message ID included in each iHave control message.
+// Args:
+// - iHaves: the iHave control messages included in the sent RPC.
+func (t *RPCSentTracker) OnIHaveRPCSent(iHaves []*pb.ControlIHave) {
+	controlMsgType := p2pmsg.CtrlMsgIHave
+	for _, iHave := range iHaves {
+		topicID := iHave.GetTopicID()
+		for _, messageID := range iHave.GetMessageIDs() {
+			t.cache.add(topicID, messageID, controlMsgType)
+		}
+	}
+}
+
+// WasIHaveRPCSent checks if an iHave control message with the provided message ID was sent.
+// Args:
+// - topicID: the topic ID of the iHave RPC.
+// - messageID: the message ID of the iHave RPC.
+// Returns:
+// - bool: true if the iHave rpc with the provided message ID was sent.
+func (t *RPCSentTracker) WasIHaveRPCSent(topicID, messageID string) bool {
+	return t.cache.has(topicID, messageID, p2pmsg.CtrlMsgIHave)
+}
diff --git a/network/p2p/tracer/internal/rpc_sent_tracker_test.go b/network/p2p/tracer/internal/rpc_sent_tracker_test.go
new file mode 100644
index 00000000000..7b9c4ec9acb
--- /dev/null
+++ b/network/p2p/tracer/internal/rpc_sent_tracker_test.go
@@ -0,0 +1,89 @@
+package internal
+
+import (
+	"testing"
+
+	pubsub "github.com/libp2p/go-libp2p-pubsub"
+	pb "github.com/libp2p/go-libp2p-pubsub/pb"
+	"github.com/rs/zerolog"
+	"github.com/stretchr/testify/require"
+
+	"github.com/onflow/flow-go/config"
+	"github.com/onflow/flow-go/module/metrics"
+	"github.com/onflow/flow-go/network/channels"
+	"github.com/onflow/flow-go/utils/unittest"
+)
+
+// TestNewRPCSentTracker ensures *RPCSentTracker is created as expected.
+func TestNewRPCSentTracker(t *testing.T) {
+	tracker := mockTracker(t)
+	require.NotNil(t, tracker)
+}
+
+// TestRPCSentTracker_IHave ensures *RPCSentTracker tracks sent iHave control messages as expected.
+func TestRPCSentTracker_IHave(t *testing.T) { + tracker := mockTracker(t) + require.NotNil(t, tracker) + + t.Run("WasIHaveRPCSent should return false for iHave message Id that has not been tracked", func(t *testing.T) { + require.False(t, tracker.WasIHaveRPCSent("topic_id", "message_id")) + }) + + t.Run("WasIHaveRPCSent should return true for iHave message after it is tracked with OnIHaveRPCSent", func(t *testing.T) { + numOfMsgIds := 100 + testCases := []struct { + topic string + messageIDS []string + }{ + {channels.PushBlocks.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()}, + {channels.ReceiveApprovals.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()}, + {channels.SyncCommittee.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()}, + {channels.RequestChunks.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()}, + } + iHaves := make([]*pb.ControlIHave, len(testCases)) + for i, testCase := range testCases { + testCase := testCase + iHaves[i] = &pb.ControlIHave{ + TopicID: &testCase.topic, + MessageIDs: testCase.messageIDS, + } + } + rpc := rpcFixture(withIhaves(iHaves)) + tracker.OnIHaveRPCSent(rpc.GetControl().GetIhave()) + + for _, testCase := range testCases { + for _, messageID := range testCase.messageIDS { + require.True(t, tracker.WasIHaveRPCSent(testCase.topic, messageID)) + } + } + }) +} + +func mockTracker(t *testing.T) *RPCSentTracker { + logger := zerolog.Nop() + cfg, err := config.DefaultConfig() + require.NoError(t, err) + collector := metrics.NewNoopCollector() + tracker := NewRPCSentTracker(logger, cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, collector) + return tracker +} + +type rpcFixtureOpt func(*pubsub.RPC) + +func withIhaves(iHave []*pb.ControlIHave) rpcFixtureOpt { + return func(rpc *pubsub.RPC) { + rpc.Control.Ihave = iHave + } +} + +func rpcFixture(opts ...rpcFixtureOpt) *pubsub.RPC { + rpc := &pubsub.RPC{ + RPC: pb.RPC{ + Control: &pb.ControlMessage{}, + }, + } + for _, opt := range opts { + opt(rpc) + } + return rpc +}
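Pulled out of the diff, the tracker's API surface is small: record iHave message IDs on send, query them later. The sketch below is a hypothetical in-package example test (the tracker lives in an internal package, so it mirrors the style of rpc_sent_tracker_test.go; the topic and message ID strings are made-up values):

```go
package internal

import (
	"fmt"

	pb "github.com/libp2p/go-libp2p-pubsub/pb"
	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/module/metrics"
)

// ExampleRPCSentTracker demonstrates recording and querying sent iHave message IDs.
func ExampleRPCSentTracker() {
	// a small cache suffices for the example; production uses the configured default (1_000_000).
	tracker := NewRPCSentTracker(zerolog.Nop(), 10_000, metrics.NewNoopCollector())

	// record two message IDs advertised in a sent iHave control message.
	topic := "example-topic" // hypothetical topic ID
	tracker.OnIHaveRPCSent([]*pb.ControlIHave{
		{TopicID: &topic, MessageIDs: []string{"msg-1", "msg-2"}},
	})

	fmt.Println(tracker.WasIHaveRPCSent(topic, "msg-1"))
	fmt.Println(tracker.WasIHaveRPCSent(topic, "msg-3"))
	// Output:
	// true
	// false
}
```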