diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index 0607870cf53..d7726213994 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -1291,12 +1291,15 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri } meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ - Logger: builder.Logger, - Metrics: networkMetrics, - IDProvider: builder.IdentityProvider, - LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, - RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), - RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + Logger: builder.Logger, + Metrics: networkMetrics, + IDProvider: builder.IdentityProvider, + LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, + RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, + RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, + HeroCacheMetricsFactory: builder.HeroCacheMetricsFactory(), + NetworkingType: network.PublicNetwork, } meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 8fe2a0f8d91..35c4f5c3a26 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -716,12 +716,15 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr } meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ - Logger: builder.Logger, - Metrics: builder.Metrics.Network, - IDProvider: builder.IdentityProvider, - LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, - RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), - RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + Logger: builder.Logger, + Metrics: builder.Metrics.Network, + IDProvider: builder.IdentityProvider, + LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, + RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, + RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, + HeroCacheMetricsFactory: builder.HeroCacheMetricsFactory(), + NetworkingType: network.PublicNetwork, } meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) diff --git a/config/default-config.yml b/config/default-config.yml index 9834694b0e2..c14f7e13c0b 100644 --- a/config/default-config.yml +++ b/config/default-config.yml @@ -66,6 +66,10 @@ network-config: # The default RPC sent tracker cache size. The RPC sent tracker is used to track RPC control messages sent from the local node. # Note: this cache size must be large enough to keep a history of sent messages in a reasonable time window of past history. 
gossipsub-rpc-sent-tracker-cache-size: 1_000_000
+ # Cache size of the rpc sent tracker queue used for async tracking.
+ gossipsub-rpc-sent-tracker-queue-cache-size: 100_000
+ # Number of workers for rpc sent tracker worker pool.
+ gossipsub-rpc-sent-tracker-workers: 5
# Peer scoring is the default value for enabling peer scoring
gossipsub-peer-scoring-enabled: true
# Gossipsub rpc inspectors configs
diff --git a/follower/follower_builder.go b/follower/follower_builder.go
index e2eb43cb49c..27e69ce039c 100644
--- a/follower/follower_builder.go
+++ b/follower/follower_builder.go
@@ -605,12 +605,15 @@ func (builder *FollowerServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
}
meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
- Logger: builder.Logger,
- Metrics: builder.Metrics.Network,
- IDProvider: builder.IdentityProvider,
- LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
- RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
- RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
+ Logger: builder.Logger,
+ Metrics: builder.Metrics.Network,
+ IDProvider: builder.IdentityProvider,
+ LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
+ RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
+ RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize,
+ RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers,
+ HeroCacheMetricsFactory: builder.HeroCacheMetricsFactory(),
+ NetworkingType: network.PublicNetwork,
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)
diff --git a/module/component/component.go b/module/component/component.go
index 34f8f61cf14..07fc387e077 100644
--- a/module/component/component.go
+++ b/module/component/component.go
@@ -157,10 +157,10 @@ func NewComponentManagerBuilder() ComponentManagerBuilder {
return &componentManagerBuilderImpl{}
}
-// AddWorker adds a ComponentWorker closure to the ComponentManagerBuilder
// All worker functions will be run in parallel when the ComponentManager is started.
// Note: AddWorker is not concurrency-safe, and should only be called on an individual builder
-// within a single goroutine.
+// within a single goroutine.
+// AddWorker adds a ComponentWorker closure to the ComponentManagerBuilder.
func (c *componentManagerBuilderImpl) AddWorker(worker ComponentWorker) ComponentManagerBuilder {
c.workers = append(c.workers, worker)
return c
diff --git a/module/metrics/herocache.go b/module/metrics/herocache.go
index f3a88341c87..f82cd84bb57 100644
--- a/module/metrics/herocache.go
+++ b/module/metrics/herocache.go
@@ -155,6 +155,15 @@ func GossipSubRPCSentTrackerMetricFactory(f HeroCacheMetricsFactory, networkType
return f(namespaceNetwork, r)
}
+func GossipSubRPCSentTrackerQueueMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics {
+ // for backward compatibility of the metric name, the public prefix is prepended only for the public network.
+ r := ResourceNetworkingRPCSentTrackerQueue + if networkType == network.PublicNetwork { + r = PrependPublicPrefix(r) + } + return f(namespaceNetwork, r) +} + func RpcInspectorNotificationQueueMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics { r := ResourceNetworkingRpcInspectorNotificationQueue if networkType == network.PublicNetwork { diff --git a/module/metrics/labels.go b/module/metrics/labels.go index 4efb72b1152..c67b9283f38 100644 --- a/module/metrics/labels.go +++ b/module/metrics/labels.go @@ -97,6 +97,7 @@ const ( ResourceNetworkingRpcClusterPrefixReceivedCache = "rpc_cluster_prefixed_received_cache" ResourceNetworkingDisallowListCache = "disallow_list_cache" ResourceNetworkingRPCSentTrackerCache = "gossipsub_rpc_sent_tracker_cache" + ResourceNetworkingRPCSentTrackerQueue = "gossipsub_rpc_sent_tracker_queue" ResourceFollowerPendingBlocksCache = "follower_pending_block_cache" // follower engine ResourceFollowerLoopCertifiedBlocksChannel = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel diff --git a/network/internal/p2pfixtures/fixtures.go b/network/internal/p2pfixtures/fixtures.go index 29ee0509fbb..7b7365d57ab 100644 --- a/network/internal/p2pfixtures/fixtures.go +++ b/network/internal/p2pfixtures/fixtures.go @@ -104,12 +104,15 @@ func CreateNode(t *testing.T, networkKey crypto.PrivateKey, sporkID flow.Identif require.NoError(t, err) meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ - Logger: logger, - Metrics: metrics.NewNoopCollector(), - IDProvider: idProvider, - LoggerInterval: defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, - RpcSentTrackerCacheCollector: metrics.NewNoopCollector(), - RpcSentTrackerCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + Logger: logger, + Metrics: metrics.NewNoopCollector(), + IDProvider: idProvider, + LoggerInterval: defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, + RpcSentTrackerCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + RpcSentTrackerWorkerQueueCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, + RpcSentTrackerNumOfWorkers: defaultFlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, + HeroCacheMetricsFactory: metrics.NewNoopHeroCacheMetricsFactory(), + NetworkingType: flownet.PublicNetwork, } meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) diff --git a/network/netconf/flags.go b/network/netconf/flags.go index bdf821aa60b..045755fde7b 100644 --- a/network/netconf/flags.go +++ b/network/netconf/flags.go @@ -37,10 +37,12 @@ const ( gracePeriod = "libp2p-grace-period" silencePeriod = "libp2p-silence-period" // gossipsub - peerScoring = "gossipsub-peer-scoring-enabled" - localMeshLogInterval = "gossipsub-local-mesh-logging-interval" - rpcSentTrackerCacheSize = "gossipsub-rpc-sent-tracker-cache-size" - scoreTracerInterval = "gossipsub-score-tracer-interval" + peerScoring = "gossipsub-peer-scoring-enabled" + localMeshLogInterval = "gossipsub-local-mesh-logging-interval" + rpcSentTrackerCacheSize = "gossipsub-rpc-sent-tracker-cache-size" + rpcSentTrackerQueueCacheSize = "gossipsub-rpc-sent-tracker-queue-cache-size" + rpcSentTrackerNumOfWorkers = "gossipsub-rpc-sent-tracker-workers" + scoreTracerInterval = "gossipsub-score-tracer-interval" // gossipsub validation inspector gossipSubRPCInspectorNotificationCacheSize = "gossipsub-rpc-inspector-notification-cache-size" 
validationInspectorNumberOfWorkers = "gossipsub-rpc-validation-inspector-workers" @@ -67,8 +69,8 @@ func AllFlagNames() []string { return []string{ networkingConnectionPruning, preferredUnicastsProtocols, receivedMessageCacheSize, peerUpdateInterval, unicastMessageTimeout, unicastCreateStreamRetryDelay, dnsCacheTTL, disallowListNotificationCacheSize, dryRun, lockoutDuration, messageRateLimit, bandwidthRateLimit, bandwidthBurstLimit, memoryLimitRatio, - fileDescriptorsRatio, peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, rpcSentTrackerCacheSize, scoreTracerInterval, - gossipSubRPCInspectorNotificationCacheSize, validationInspectorNumberOfWorkers, validationInspectorInspectMessageQueueCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheSize, + fileDescriptorsRatio, peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, rpcSentTrackerCacheSize, rpcSentTrackerQueueCacheSize, rpcSentTrackerNumOfWorkers, + scoreTracerInterval, gossipSubRPCInspectorNotificationCacheSize, validationInspectorNumberOfWorkers, validationInspectorInspectMessageQueueCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheDecay, validationInspectorClusterPrefixHardThreshold, ihaveSyncSampleSizePercentage, ihaveAsyncSampleSizePercentage, ihaveMaxSampleSize, metricsInspectorNumberOfWorkers, metricsInspectorCacheSize, alspDisabled, alspSpamRecordCacheSize, alspSpamRecordQueueSize, alspHearBeatInterval, } @@ -109,6 +111,8 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) { flags.Duration(localMeshLogInterval, config.GossipSubConfig.LocalMeshLogInterval, "logging interval for local mesh in gossipsub") flags.Duration(scoreTracerInterval, config.GossipSubConfig.ScoreTracerInterval, "logging interval for peer score tracer in gossipsub, set to 0 to disable") flags.Uint32(rpcSentTrackerCacheSize, config.GossipSubConfig.RPCSentTrackerCacheSize, "cache size of the rpc sent tracker used by the gossipsub mesh tracer.") + flags.Uint32(rpcSentTrackerQueueCacheSize, config.GossipSubConfig.RPCSentTrackerQueueCacheSize, "cache size of the rpc sent tracker worker queue.") + flags.Int(rpcSentTrackerNumOfWorkers, config.GossipSubConfig.RpcSentTrackerNumOfWorkers, "number of workers for the rpc sent tracker worker pool.") // gossipsub RPC control message validation limits used for validation configuration and rate limiting flags.Int(validationInspectorNumberOfWorkers, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.NumberOfWorkers, "number of gossupsub RPC control message validation inspector component workers") flags.Uint32(validationInspectorInspectMessageQueueCacheSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.CacheSize, "cache size for gossipsub RPC validation inspector events worker pool queue.") diff --git a/network/p2p/p2pbuilder/libp2pNodeBuilder.go b/network/p2p/p2pbuilder/libp2pNodeBuilder.go index e27cbc5bedd..ac3f30fd4d0 100644 --- a/network/p2p/p2pbuilder/libp2pNodeBuilder.go +++ b/network/p2p/p2pbuilder/libp2pNodeBuilder.go @@ -26,7 +26,6 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/irrecoverable" - "github.com/onflow/flow-go/module/metrics" flownet "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/netconf" 
"github.com/onflow/flow-go/network/p2p" @@ -491,12 +490,15 @@ func DefaultNodeBuilder( } meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ - Logger: logger, - Metrics: metricsCfg.Metrics, - IDProvider: idProvider, - LoggerInterval: gossipCfg.LocalMeshLogInterval, - RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(metricsCfg.HeroCacheFactory, flownet.PrivateNetwork), - RpcSentTrackerCacheSize: gossipCfg.RPCSentTrackerCacheSize, + Logger: logger, + Metrics: metricsCfg.Metrics, + IDProvider: idProvider, + LoggerInterval: gossipCfg.LocalMeshLogInterval, + RpcSentTrackerCacheSize: gossipCfg.RPCSentTrackerCacheSize, + RpcSentTrackerWorkerQueueCacheSize: gossipCfg.RPCSentTrackerQueueCacheSize, + RpcSentTrackerNumOfWorkers: gossipCfg.RpcSentTrackerNumOfWorkers, + HeroCacheMetricsFactory: metricsCfg.HeroCacheFactory, + NetworkingType: flownet.PrivateNetwork, } meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) diff --git a/network/p2p/p2pconf/gossipsub.go b/network/p2p/p2pconf/gossipsub.go index d297f5cba8b..683dff67fdc 100644 --- a/network/p2p/p2pconf/gossipsub.go +++ b/network/p2p/p2pconf/gossipsub.go @@ -17,12 +17,24 @@ type ResourceManagerConfig struct { type GossipSubConfig struct { // GossipSubRPCInspectorsConfig configuration for all gossipsub RPC control message inspectors. GossipSubRPCInspectorsConfig `mapstructure:",squash"` + + // GossipSubTracerConfig is the configuration for the gossipsub tracer. GossipSub tracer is used to trace the local mesh events and peer scores. + GossipSubTracerConfig `mapstructure:",squash"` + + // PeerScoring is whether to enable GossipSub peer scoring. + PeerScoring bool `mapstructure:"gossipsub-peer-scoring-enabled"` +} + +// GossipSubTracerConfig is the config for the gossipsub tracer. GossipSub tracer is used to trace the local mesh events and peer scores. +type GossipSubTracerConfig struct { // LocalMeshLogInterval is the interval at which the local mesh is logged. - LocalMeshLogInterval time.Duration `mapstructure:"gossipsub-local-mesh-logging-interval"` + LocalMeshLogInterval time.Duration `validate:"gt=0s" mapstructure:"gossipsub-local-mesh-logging-interval"` // ScoreTracerInterval is the interval at which the score tracer logs the peer scores. - ScoreTracerInterval time.Duration `mapstructure:"gossipsub-score-tracer-interval"` + ScoreTracerInterval time.Duration `validate:"gt=0s" mapstructure:"gossipsub-score-tracer-interval"` // RPCSentTrackerCacheSize cache size of the rpc sent tracker used by the gossipsub mesh tracer. - RPCSentTrackerCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-cache-size"` - // PeerScoring is whether to enable GossipSub peer scoring. - PeerScoring bool `mapstructure:"gossipsub-peer-scoring-enabled"` + RPCSentTrackerCacheSize uint32 `validate:"gt=0" mapstructure:"gossipsub-rpc-sent-tracker-cache-size"` + // RPCSentTrackerQueueCacheSize cache size of the rpc sent tracker queue used for async tracking. + RPCSentTrackerQueueCacheSize uint32 `validate:"gt=0" mapstructure:"gossipsub-rpc-sent-tracker-queue-cache-size"` + // RpcSentTrackerNumOfWorkers number of workers for rpc sent tracker worker pool. 
+ RpcSentTrackerNumOfWorkers int `validate:"gt=0" mapstructure:"gossipsub-rpc-sent-tracker-workers"` } diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index 1cc25fd2565..fea310a251d 100644 --- a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -13,6 +13,8 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/module/metrics" + "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/p2p" "github.com/onflow/flow-go/network/p2p/tracer/internal" "github.com/onflow/flow-go/utils/logging" @@ -24,6 +26,12 @@ const ( // MeshLogIntervalWarnMsg is the message logged by the tracer every logInterval if there are unknown peers in the mesh. MeshLogIntervalWarnMsg = "unknown peers in topic mesh peers of local node since last heartbeat" + + // defaultLastHighestIHaveRPCSizeResetInterval is the interval that we reset the tracker of max ihave size sent back + // to a default. We use ihave message max size to determine the health of requested iwants from remote peers. However, + // we don't desire an ihave size anomaly to persist forever, hence, we reset it back to a default every minute. + // The choice of the interval to be a minute is in harmony with the GossipSub decay interval. + defaultLastHighestIHaveRPCSizeResetInterval = time.Minute ) // The GossipSubMeshTracer component in the GossipSub pubsub.RawTracer that is designed to track the local @@ -50,12 +58,15 @@ type GossipSubMeshTracer struct { var _ p2p.PubSubTracer = (*GossipSubMeshTracer)(nil) type GossipSubMeshTracerConfig struct { - Logger zerolog.Logger - Metrics module.GossipSubLocalMeshMetrics - IDProvider module.IdentityProvider - LoggerInterval time.Duration - RpcSentTrackerCacheCollector module.HeroCacheMetrics - RpcSentTrackerCacheSize uint32 + network.NetworkingType + metrics.HeroCacheMetricsFactory + Logger zerolog.Logger + Metrics module.GossipSubLocalMeshMetrics + IDProvider module.IdentityProvider + LoggerInterval time.Duration + RpcSentTrackerCacheSize uint32 + RpcSentTrackerWorkerQueueCacheSize uint32 + RpcSentTrackerNumOfWorkers int } // NewGossipSubMeshTracer creates a new *GossipSubMeshTracer. @@ -64,13 +75,22 @@ type GossipSubMeshTracerConfig struct { // Returns: // - *GossipSubMeshTracer: new mesh tracer. 
func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) *GossipSubMeshTracer {
- rpcSentTracker := internal.NewRPCSentTracker(config.Logger, config.RpcSentTrackerCacheSize, config.RpcSentTrackerCacheCollector)
+ lg := config.Logger.With().Str("component", "gossipsub_topology_tracer").Logger()
+ rpcSentTracker := internal.NewRPCSentTracker(&internal.RPCSentTrackerConfig{
+ Logger: lg,
+ RPCSentCacheSize: config.RpcSentTrackerCacheSize,
+ RPCSentCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(config.HeroCacheMetricsFactory, config.NetworkingType),
+ WorkerQueueCacheCollector: metrics.GossipSubRPCSentTrackerQueueMetricFactory(config.HeroCacheMetricsFactory, config.NetworkingType),
+ WorkerQueueCacheSize: config.RpcSentTrackerWorkerQueueCacheSize,
+ NumOfWorkers: config.RpcSentTrackerNumOfWorkers,
+ LastHighestIhavesSentResetInterval: defaultLastHighestIHaveRPCSizeResetInterval,
+ })
g := &GossipSubMeshTracer{
RawTracer: NewGossipSubNoopTracer(),
topicMeshMap: make(map[string]map[peer.ID]struct{}),
idProvider: config.IDProvider,
metrics: config.Metrics,
- logger: config.Logger.With().Str("component", "gossipsub_topology_tracer").Logger(),
+ logger: lg,
loggerInterval: config.LoggerInterval,
rpcSentTracker: rpcSentTracker,
}
@@ -80,6 +100,15 @@ func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) *GossipSubMeshTra
ready()
g.logLoop(ctx)
}).
+ AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
+ ready()
+ lg.Info().Msg("starting rpc sent tracker")
+ g.rpcSentTracker.Start(ctx)
+ lg.Info().Msg("rpc sent tracker started")
+
+ <-g.rpcSentTracker.Done()
+ lg.Info().Msg("rpc sent tracker stopped")
+ }).
Build()
return g
@@ -155,9 +184,9 @@ func (t *GossipSubMeshTracer) Prune(p peer.ID, topic string) {
// SendRPC is called when a RPC is sent. Currently, the GossipSubMeshTracer tracks iHave RPC messages that have been sent.
// This function can be updated to track other control messages in the future as required.
func (t *GossipSubMeshTracer) SendRPC(rpc *pubsub.RPC, _ peer.ID) {
- switch {
- case len(rpc.GetControl().GetIhave()) > 0:
- t.rpcSentTracker.OnIHaveRPCSent(rpc.GetControl().GetIhave())
+ err := t.rpcSentTracker.Track(rpc)
+ if err != nil {
+ t.logger.Err(err).Bool(logging.KeyNetworkingSecurity, true).Msg("failed to track sent pubsub rpc")
}
}
diff --git a/network/p2p/tracer/gossipSubMeshTracer_test.go b/network/p2p/tracer/gossipSubMeshTracer_test.go
index a2da0584f94..c8a680df53e 100644
--- a/network/p2p/tracer/gossipSubMeshTracer_test.go
+++ b/network/p2p/tracer/gossipSubMeshTracer_test.go
@@ -66,12 +66,14 @@ func TestGossipSubMeshTracer(t *testing.T) {
// meshTracer logs at 1 second intervals for sake of testing.
collector := mockmodule.NewGossipSubLocalMeshMetrics(t) meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ - Logger: logger, - Metrics: collector, - IDProvider: idProvider, - LoggerInterval: time.Second, - RpcSentTrackerCacheCollector: metrics.NewNoopCollector(), - RpcSentTrackerCacheSize: defaultConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + Logger: logger, + Metrics: collector, + IDProvider: idProvider, + LoggerInterval: time.Second, + HeroCacheMetricsFactory: metrics.NewNoopHeroCacheMetricsFactory(), + RpcSentTrackerCacheSize: defaultConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + RpcSentTrackerWorkerQueueCacheSize: defaultConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, + RpcSentTrackerNumOfWorkers: defaultConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, } meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) tracerNode, tracerId := p2ptest.NodeFixture( diff --git a/network/p2p/tracer/internal/cache.go b/network/p2p/tracer/internal/cache.go index b916133b270..655ddf2179f 100644 --- a/network/p2p/tracer/internal/cache.go +++ b/network/p2p/tracer/internal/cache.go @@ -47,26 +47,24 @@ func newRPCSentCache(config *rpcCtrlMsgSentCacheConfig) *rpcSentCache { // add initializes the record cached for the given messageEntityID if it does not exist. // Returns true if the record is initialized, false otherwise (i.e.: the record already exists). // Args: -// - topic: the topic ID. // - messageId: the message ID. // - controlMsgType: the rpc control message type. // Returns: // - bool: true if the record is initialized, false otherwise (i.e.: the record already exists). // Note that if add is called multiple times for the same messageEntityID, the record is initialized only once, and the // subsequent calls return false and do not change the record (i.e.: the record is not re-initialized). -func (r *rpcSentCache) add(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) bool { - return r.c.Add(newRPCSentEntity(r.rpcSentEntityID(topic, messageId, controlMsgType), controlMsgType)) +func (r *rpcSentCache) add(messageId string, controlMsgType p2pmsg.ControlMessageType) bool { + return r.c.Add(newRPCSentEntity(r.rpcSentEntityID(messageId, controlMsgType), controlMsgType)) } // has checks if the RPC message has been cached indicating it has been sent. // Args: -// - topic: the topic ID. // - messageId: the message ID. // - controlMsgType: the rpc control message type. // Returns: // - bool: true if the RPC has been cache indicating it was sent from the local node. -func (r *rpcSentCache) has(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) bool { - return r.c.Has(r.rpcSentEntityID(topic, messageId, controlMsgType)) +func (r *rpcSentCache) has(messageId string, controlMsgType p2pmsg.ControlMessageType) bool { + return r.c.Has(r.rpcSentEntityID(messageId, controlMsgType)) } // size returns the number of records in the cache. @@ -74,13 +72,12 @@ func (r *rpcSentCache) size() uint { return r.c.Size() } -// rpcSentEntityID creates an entity ID from the topic, messageID and control message type. +// rpcSentEntityID creates an entity ID from the messageID and control message type. // Args: -// - topic: the topic ID. // - messageId: the message ID. // - controlMsgType: the rpc control message type. // Returns: // - flow.Identifier: the entity ID. 
-func (r *rpcSentCache) rpcSentEntityID(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) flow.Identifier { - return flow.MakeIDFromFingerPrint([]byte(fmt.Sprintf("%s%s%s", topic, messageId, controlMsgType))) +func (r *rpcSentCache) rpcSentEntityID(messageId string, controlMsgType p2pmsg.ControlMessageType) flow.Identifier { + return flow.MakeIDFromFingerPrint([]byte(fmt.Sprintf("%s%s", messageId, controlMsgType))) } diff --git a/network/p2p/tracer/internal/cache_test.go b/network/p2p/tracer/internal/cache_test.go index c92b42b5e02..10872b7b7ef 100644 --- a/network/p2p/tracer/internal/cache_test.go +++ b/network/p2p/tracer/internal/cache_test.go @@ -12,7 +12,6 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/metrics" - "github.com/onflow/flow-go/network/channels" p2pmsg "github.com/onflow/flow-go/network/p2p/message" "github.com/onflow/flow-go/utils/unittest" ) @@ -23,24 +22,23 @@ import ( func TestCache_Add(t *testing.T) { cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) controlMsgType := p2pmsg.CtrlMsgIHave - topic := channels.PushBlocks.String() messageID1 := unittest.IdentifierFixture().String() messageID2 := unittest.IdentifierFixture().String() // test initializing a record for an ID that doesn't exist in the cache - initialized := cache.add(topic, messageID1, controlMsgType) + initialized := cache.add(messageID1, controlMsgType) require.True(t, initialized, "expected record to be initialized") - require.True(t, cache.has(topic, messageID1, controlMsgType), "expected record to exist") + require.True(t, cache.has(messageID1, controlMsgType), "expected record to exist") // test initializing a record for an ID that already exists in the cache - initialized = cache.add(topic, messageID1, controlMsgType) + initialized = cache.add(messageID1, controlMsgType) require.False(t, initialized, "expected record not to be initialized") - require.True(t, cache.has(topic, messageID1, controlMsgType), "expected record to exist") + require.True(t, cache.has(messageID1, controlMsgType), "expected record to exist") // test initializing a record for another ID - initialized = cache.add(topic, messageID2, controlMsgType) + initialized = cache.add(messageID2, controlMsgType) require.True(t, initialized, "expected record to be initialized") - require.True(t, cache.has(topic, messageID2, controlMsgType), "expected record to exist") + require.True(t, cache.has(messageID2, controlMsgType), "expected record to exist") } // TestCache_ConcurrentInit tests the concurrent initialization of records. 
@@ -50,7 +48,6 @@ func TestCache_Add(t *testing.T) { func TestCache_ConcurrentAdd(t *testing.T) { cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) controlMsgType := p2pmsg.CtrlMsgIHave - topic := channels.PushBlocks.String() messageIds := unittest.IdentifierListFixture(10) var wg sync.WaitGroup @@ -59,7 +56,7 @@ func TestCache_ConcurrentAdd(t *testing.T) { for _, id := range messageIds { go func(id flow.Identifier) { defer wg.Done() - cache.add(topic, id.String(), controlMsgType) + cache.add(id.String(), controlMsgType) }(id) } @@ -67,7 +64,7 @@ func TestCache_ConcurrentAdd(t *testing.T) { // ensure that all records are correctly initialized for _, id := range messageIds { - require.True(t, cache.has(topic, id.String(), controlMsgType)) + require.True(t, cache.has(id.String(), controlMsgType)) } } @@ -79,7 +76,6 @@ func TestCache_ConcurrentAdd(t *testing.T) { func TestCache_ConcurrentSameRecordAdd(t *testing.T) { cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) controlMsgType := p2pmsg.CtrlMsgIHave - topic := channels.PushBlocks.String() messageID := unittest.IdentifierFixture().String() const concurrentAttempts = 10 @@ -91,7 +87,7 @@ func TestCache_ConcurrentSameRecordAdd(t *testing.T) { for i := 0; i < concurrentAttempts; i++ { go func() { defer wg.Done() - initSuccess := cache.add(topic, messageID, controlMsgType) + initSuccess := cache.add(messageID, controlMsgType) if initSuccess { successGauge.Inc() } @@ -104,7 +100,7 @@ func TestCache_ConcurrentSameRecordAdd(t *testing.T) { require.Equal(t, int32(1), successGauge.Load()) // ensure that the record is correctly initialized in the cache - require.True(t, cache.has(topic, messageID, controlMsgType)) + require.True(t, cache.has(messageID, controlMsgType)) } // cacheFixture returns a new *RecordCache. diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 6d44ac984a3..5c2c842a48c 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -1,47 +1,171 @@ package internal import ( + "crypto/rand" + "fmt" + "sync" + "time" + + pubsub "github.com/libp2p/go-libp2p-pubsub" pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/rs/zerolog" + "github.com/onflow/flow-go/engine/common/worker" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/mempool/queue" p2pmsg "github.com/onflow/flow-go/network/p2p/message" ) -// RPCSentTracker tracks RPC messages that are sent. +const ( + iHaveRPCTrackedLog = "ihave rpc tracked successfully" +) + +// trackableRPC is an internal data structure for "temporarily" storing *pubsub.RPC sent in the queue before they are processed +// by the *RPCSentTracker. +type trackableRPC struct { + // Nonce prevents deduplication in the hero store + Nonce []byte + rpc *pubsub.RPC +} + +// lastHighestIHaveRPCSize tracks the last highest rpc control message size the time stamp it was last updated. +type lastHighestIHaveRPCSize struct { + sync.RWMutex + lastSize int64 + lastUpdate time.Time +} + +// RPCSentTracker tracks RPC messages and the size of the last largest iHave rpc control message sent. type RPCSentTracker struct { - cache *rpcSentCache + component.Component + *lastHighestIHaveRPCSize + logger zerolog.Logger + cache *rpcSentCache + workerPool *worker.Pool[trackableRPC] + lastHighestIHaveRPCSizeResetInterval time.Duration +} + +// RPCSentTrackerConfig configuration for the RPCSentTracker. 
+type RPCSentTrackerConfig struct {
+ Logger zerolog.Logger
+ // RPCSentCacheSize size of the *rpcSentCache cache.
+ RPCSentCacheSize uint32
+ // RPCSentCacheCollector metrics collector for the *rpcSentCache cache.
+ RPCSentCacheCollector module.HeroCacheMetrics
+ // WorkerQueueCacheCollector metrics factory for the worker pool.
+ WorkerQueueCacheCollector module.HeroCacheMetrics
+ // WorkerQueueCacheSize the worker pool herostore cache size.
+ WorkerQueueCacheSize uint32
+ // NumOfWorkers number of workers in the worker pool.
+ NumOfWorkers int
+ // LastHighestIhavesSentResetInterval the refresh interval to reset the lastHighestIHaveRPCSize.
+ LastHighestIhavesSentResetInterval time.Duration
}
// NewRPCSentTracker returns a new *NewRPCSentTracker.
-func NewRPCSentTracker(logger zerolog.Logger, sizeLimit uint32, collector module.HeroCacheMetrics) *RPCSentTracker {
- config := &rpcCtrlMsgSentCacheConfig{
- sizeLimit: sizeLimit,
- logger: logger,
- collector: collector,
+func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker {
+ cacheConfig := &rpcCtrlMsgSentCacheConfig{
+ sizeLimit: config.RPCSentCacheSize,
+ logger: config.Logger,
+ collector: config.RPCSentCacheCollector,
+ }
+
+ store := queue.NewHeroStore(
+ config.WorkerQueueCacheSize,
+ config.Logger,
+ config.WorkerQueueCacheCollector)
+
+ tracker := &RPCSentTracker{
+ logger: config.Logger.With().Str("component", "rpc_sent_tracker").Logger(),
+ lastHighestIHaveRPCSize: &lastHighestIHaveRPCSize{sync.RWMutex{}, 0, time.Now()},
+ cache: newRPCSentCache(cacheConfig),
+ lastHighestIHaveRPCSizeResetInterval: config.LastHighestIhavesSentResetInterval,
}
- return &RPCSentTracker{cache: newRPCSentCache(config)}
+ tracker.workerPool = worker.NewWorkerPoolBuilder[trackableRPC](
+ config.Logger,
+ store,
+ tracker.rpcSentWorkerLogic).Build()
+
+ builder := component.NewComponentManagerBuilder()
+ for i := 0; i < config.NumOfWorkers; i++ {
+ builder.AddWorker(tracker.workerPool.WorkerLogic())
+ }
+ tracker.Component = builder.Build()
+ return tracker
}
-// OnIHaveRPCSent caches a unique entity message ID for each message ID included in each rpc iHave control message.
+// Track submits the control message to the worker queue for async tracking.
// Args:
// - *pubsub.RPC: the rpc sent.
-func (t *RPCSentTracker) OnIHaveRPCSent(iHaves []*pb.ControlIHave) {
+// All errors returned from this function can be considered benign.
+func (t *RPCSentTracker) Track(rpc *pubsub.RPC) error {
+ n, err := nonce()
+ if err != nil {
+ return fmt.Errorf("failed to get nonce for tracked rpc work: %w", err)
+ }
+ if ok := t.workerPool.Submit(trackableRPC{Nonce: n, rpc: rpc}); !ok {
+ return fmt.Errorf("failed to track RPC: could not submit work to worker pool")
+ }
+ return nil
+}
+
+// rpcSentWorkerLogic tracks control messages sent in *pubsub.RPC.
+func (t *RPCSentTracker) rpcSentWorkerLogic(work trackableRPC) error {
+ switch {
+ case len(work.rpc.GetControl().GetIhave()) > 0:
+ iHave := work.rpc.GetControl().GetIhave()
+ t.iHaveRPCSent(iHave)
+ t.updateLastHighestIHaveRPCSize(int64(len(iHave)))
+ t.logger.Info().Int("size", len(iHave)).Msg(iHaveRPCTrackedLog)
+ }
+ return nil
+}
+
+func (t *RPCSentTracker) updateLastHighestIHaveRPCSize(size int64) {
+ t.Lock()
+ defer t.Unlock()
+ if t.lastSize < size || time.Since(t.lastUpdate) > t.lastHighestIHaveRPCSizeResetInterval {
+ // The last highest ihave RPC size is updated if the new size is larger than the current size, or if the time elapsed since the last update surpasses the reset interval.
+ t.lastSize = size + t.lastUpdate = time.Now() + } +} + +// iHaveRPCSent caches a unique entity message ID for each message ID included in each rpc iHave control message. +// Args: +// - []*pb.ControlIHave: list of iHave control messages. +func (t *RPCSentTracker) iHaveRPCSent(iHaves []*pb.ControlIHave) { controlMsgType := p2pmsg.CtrlMsgIHave for _, iHave := range iHaves { - topicID := iHave.GetTopicID() for _, messageID := range iHave.GetMessageIDs() { - t.cache.add(topicID, messageID, controlMsgType) + t.cache.add(messageID, controlMsgType) } } } // WasIHaveRPCSent checks if an iHave control message with the provided message ID was sent. // Args: -// - string: the topic ID of the iHave RPC. -// - string: the message ID of the iHave RPC. +// - messageID: the message ID of the iHave RPC. // Returns: // - bool: true if the iHave rpc with the provided message ID was sent. -func (t *RPCSentTracker) WasIHaveRPCSent(topicID, messageID string) bool { - return t.cache.has(topicID, messageID, p2pmsg.CtrlMsgIHave) +func (t *RPCSentTracker) WasIHaveRPCSent(messageID string) bool { + return t.cache.has(messageID, p2pmsg.CtrlMsgIHave) +} + +// LastHighestIHaveRPCSize returns the last highest size of iHaves sent in an rpc. +func (t *RPCSentTracker) LastHighestIHaveRPCSize() int64 { + t.RLock() + defer t.RUnlock() + return t.lastSize +} + +// nonce returns random string that is used to store unique items in herocache. +func nonce() ([]byte, error) { + b := make([]byte, 16) + _, err := rand.Read(b) + if err != nil { + return nil, err + } + return b, nil } diff --git a/network/p2p/tracer/internal/rpc_sent_tracker_test.go b/network/p2p/tracer/internal/rpc_sent_tracker_test.go index 7b9c4ec9acb..9b571e3d5a6 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker_test.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker_test.go @@ -1,70 +1,240 @@ package internal import ( + "context" + "os" "testing" + "time" pubsub "github.com/libp2p/go-libp2p-pubsub" pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/rs/zerolog" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "github.com/onflow/flow-go/config" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" - "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/utils/unittest" ) // TestNewRPCSentTracker ensures *RPCSenTracker is created as expected. func TestNewRPCSentTracker(t *testing.T) { - tracker := mockTracker(t) + tracker := mockTracker(t, time.Minute) require.NotNil(t, tracker) } // TestRPCSentTracker_IHave ensures *RPCSentTracker tracks sent iHave control messages as expected. 
func TestRPCSentTracker_IHave(t *testing.T) { - tracker := mockTracker(t) + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + + tracker := mockTracker(t, time.Minute) require.NotNil(t, tracker) + tracker.Start(signalerCtx) + defer func() { + cancel() + unittest.RequireComponentsDoneBefore(t, time.Second, tracker) + }() + t.Run("WasIHaveRPCSent should return false for iHave message Id that has not been tracked", func(t *testing.T) { - require.False(t, tracker.WasIHaveRPCSent("topic_id", "message_id")) + require.False(t, tracker.WasIHaveRPCSent("message_id")) }) - t.Run("WasIHaveRPCSent should return true for iHave message after it is tracked with OnIHaveRPCSent", func(t *testing.T) { + t.Run("WasIHaveRPCSent should return true for iHave message after it is tracked with iHaveRPCSent", func(t *testing.T) { numOfMsgIds := 100 testCases := []struct { - topic string messageIDS []string }{ - {channels.PushBlocks.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()}, - {channels.ReceiveApprovals.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()}, - {channels.SyncCommittee.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()}, - {channels.RequestChunks.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()}, + {unittest.IdentifierListFixture(numOfMsgIds).Strings()}, + {unittest.IdentifierListFixture(numOfMsgIds).Strings()}, + {unittest.IdentifierListFixture(numOfMsgIds).Strings()}, + {unittest.IdentifierListFixture(numOfMsgIds).Strings()}, } iHaves := make([]*pb.ControlIHave, len(testCases)) for i, testCase := range testCases { testCase := testCase iHaves[i] = &pb.ControlIHave{ - TopicID: &testCase.topic, MessageIDs: testCase.messageIDS, } } rpc := rpcFixture(withIhaves(iHaves)) - tracker.OnIHaveRPCSent(rpc.GetControl().GetIhave()) + require.NoError(t, tracker.Track(rpc)) + + // eventually we should have tracked numOfMsgIds per single topic + require.Eventually(t, func() bool { + return tracker.cache.size() == uint(len(testCases)*numOfMsgIds) + }, time.Second, 100*time.Millisecond) for _, testCase := range testCases { for _, messageID := range testCase.messageIDS { - require.True(t, tracker.WasIHaveRPCSent(testCase.topic, messageID)) + require.True(t, tracker.WasIHaveRPCSent(messageID)) } } }) + +} + +// TestRPCSentTracker_DuplicateMessageID ensures the worker pool of the RPC tracker processes req with the same message ID but different nonce. 
+func TestRPCSentTracker_DuplicateMessageID(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
+
+ processedWorkLogs := atomic.NewInt64(0)
+ hook := zerolog.HookFunc(func(e *zerolog.Event, level zerolog.Level, message string) {
+ if level == zerolog.InfoLevel {
+ if message == iHaveRPCTrackedLog {
+ processedWorkLogs.Inc()
+ }
+ }
+ })
+ logger := zerolog.New(os.Stdout).Level(zerolog.InfoLevel).Hook(hook)
+
+ tracker := mockTracker(t, time.Minute)
+ require.NotNil(t, tracker)
+ tracker.logger = logger
+ tracker.Start(signalerCtx)
+ defer func() {
+ cancel()
+ unittest.RequireComponentsDoneBefore(t, time.Second, tracker)
+ }()
+
+ messageID := unittest.IdentifierFixture().String()
+ rpc := rpcFixture(withIhaves([]*pb.ControlIHave{{
+ MessageIDs: []string{messageID},
+ }}))
+ // track duplicate RPCs; each will be processed by a worker
+ require.NoError(t, tracker.Track(rpc))
+ require.NoError(t, tracker.Track(rpc))
+
+ // eventually we should have processed both RPCs
+ require.Eventually(t, func() bool {
+ return processedWorkLogs.Load() == 2
+ }, time.Second, 100*time.Millisecond)
}
-func mockTracker(t *testing.T) *RPCSentTracker {
- logger := zerolog.Nop()
+// TestRPCSentTracker_ConcurrentTracking ensures that all message IDs in RPCs are tracked as expected when tracked concurrently.
+func TestRPCSentTracker_ConcurrentTracking(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
+
+ tracker := mockTracker(t, time.Minute)
+ require.NotNil(t, tracker)
+
+ tracker.Start(signalerCtx)
+ defer func() {
+ cancel()
+ unittest.RequireComponentsDoneBefore(t, time.Second, tracker)
+ }()
+
+ numOfMsgIds := 100
+ numOfRPCs := 100
+ rpcs := make([]*pubsub.RPC, numOfRPCs)
+ for i := 0; i < numOfRPCs; i++ {
+ i := i
+ go func() {
+ rpc := rpcFixture(withIhaves([]*pb.ControlIHave{{MessageIDs: unittest.IdentifierListFixture(numOfMsgIds).Strings()}}))
+ require.NoError(t, tracker.Track(rpc))
+ rpcs[i] = rpc
+ }()
+ }
+
+ // eventually we should have tracked numOfMsgIds for each of the numOfRPCs tracked RPCs
+ require.Eventually(t, func() bool {
+ return tracker.cache.size() == uint(numOfRPCs*numOfMsgIds)
+ }, time.Second, 100*time.Millisecond)
+
+ for _, rpc := range rpcs {
+ ihaves := rpc.GetControl().GetIhave()
+ for _, messageID := range ihaves[0].GetMessageIDs() {
+ require.True(t, tracker.WasIHaveRPCSent(messageID))
+ }
+ }
+}
+
+// TestRPCSentTracker_LastHighestIHaveRPCSize ensures *RPCSentTracker tracks the last largest iHave size as expected.
+func TestRPCSentTracker_LastHighestIHaveRPCSize(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
+
+ tracker := mockTracker(t, 3*time.Second)
+ require.NotNil(t, tracker)
+
+ tracker.Start(signalerCtx)
+ defer func() {
+ cancel()
+ unittest.RequireComponentsDoneBefore(t, time.Second, tracker)
+ }()
+
+ expectedLastHighestSize := 1000
+ // adding a single message ID to the iHave enables us to track the expected cache size by the amount of iHaves.
+ numOfMessageIds := 1
+ testCases := []struct {
+ rpcFixture *pubsub.RPC
+ numOfIhaves int
+ }{
+ {rpcFixture(withIhaves(mockIHaveFixture(10, numOfMessageIds))), 10},
+ {rpcFixture(withIhaves(mockIHaveFixture(100, numOfMessageIds))), 100},
+ {rpcFixture(withIhaves(mockIHaveFixture(expectedLastHighestSize, numOfMessageIds))), expectedLastHighestSize},
+ {rpcFixture(withIhaves(mockIHaveFixture(999, numOfMessageIds))), 999},
+ {rpcFixture(withIhaves(mockIHaveFixture(23, numOfMessageIds))), 23},
+ }
+
+ expectedCacheSize := 0
+ for _, testCase := range testCases {
+ require.NoError(t, tracker.Track(testCase.rpcFixture))
+ expectedCacheSize += testCase.numOfIhaves
+ }
+
+ // eventually we should have tracked one message ID per iHave across all test cases
+ require.Eventually(t, func() bool {
+ return tracker.cache.size() == uint(expectedCacheSize)
+ }, time.Second, 100*time.Millisecond)
+
+ require.Equal(t, int64(expectedLastHighestSize), tracker.LastHighestIHaveRPCSize())
+
+ // after tracking a larger RPC, lastHighestIHaveRPCSize should eventually be updated to its size
+ largeIhave := 50000
+ require.NoError(t, tracker.Track(rpcFixture(withIhaves(mockIHaveFixture(largeIhave, numOfMessageIds)))))
+ require.Eventually(t, func() bool {
+ return tracker.LastHighestIHaveRPCSize() == int64(largeIhave)
+ }, 1*time.Second, 100*time.Millisecond)
+
+ // we expect lastHighestIHaveRPCSize to be set to the size of the rpc currently being tracked if it has not been updated within the configured lastHighestIHaveRPCSizeResetInterval
+ expectedEventualLastHighest := 8
+ require.Eventually(t, func() bool {
+ require.NoError(t, tracker.Track(rpcFixture(withIhaves(mockIHaveFixture(expectedEventualLastHighest, numOfMessageIds)))))
+ return tracker.LastHighestIHaveRPCSize() == int64(expectedEventualLastHighest)
+ }, 4*time.Second, 100*time.Millisecond)
+}
+
+// mockIHaveFixture generates a list of n iHaves. Each iHave is created with m random message ids.
+func mockIHaveFixture(n, m int) []*pb.ControlIHave {
+ iHaves := make([]*pb.ControlIHave, n)
+ for i := 0; i < n; i++ {
+ // topic does not have to be a valid flow topic, for testing purposes we can use a random string
+ topic := unittest.IdentifierFixture().String()
+ iHaves[i] = &pb.ControlIHave{
+ TopicID: &topic,
+ MessageIDs: unittest.IdentifierListFixture(m).Strings(),
+ }
+ }
+ return iHaves
+}
+
+func mockTracker(t *testing.T, lastHighestIhavesSentResetInterval time.Duration) *RPCSentTracker {
cfg, err := config.DefaultConfig()
require.NoError(t, err)
- collector := metrics.NewNoopCollector()
- tracker := NewRPCSentTracker(logger, cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, collector)
+ tracker := NewRPCSentTracker(&RPCSentTrackerConfig{
+ Logger: zerolog.Nop(),
+ RPCSentCacheSize: cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
+ RPCSentCacheCollector: metrics.NewNoopCollector(),
+ WorkerQueueCacheCollector: metrics.NewNoopCollector(),
+ WorkerQueueCacheSize: cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize,
+ NumOfWorkers: 1,
+ LastHighestIhavesSentResetInterval: lastHighestIhavesSentResetInterval,
+ })
return tracker
}
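
For reference, the sketch below is illustrative only and not part of the diff. It mirrors the wiring performed by the access, observer, and follower builders above: the gossipsub-rpc-sent-tracker-* values flow from the default Flow config into GossipSubMeshTracerConfig, and NewGossipSubMeshTracer starts the async RPC sent tracker workers as part of its component lifecycle. The helper name newMeshTracer, the noop metrics collectors, and the caller-supplied identity provider are assumptions made for the example.

package example

import (
	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/config"
	"github.com/onflow/flow-go/module"
	"github.com/onflow/flow-go/module/metrics"
	flownet "github.com/onflow/flow-go/network"
	"github.com/onflow/flow-go/network/p2p/tracer"
)

// newMeshTracer is a hypothetical helper (not part of this change) that mirrors the
// builder wiring above: it reads the gossipsub tracer settings from the default Flow
// config and constructs a mesh tracer whose RPC sent tracker runs NumOfWorkers async
// workers fed through the HeroStore-backed queue.
func newMeshTracer(logger zerolog.Logger, idProvider module.IdentityProvider) (*tracer.GossipSubMeshTracer, error) {
	cfg, err := config.DefaultConfig()
	if err != nil {
		return nil, err
	}
	gossipCfg := cfg.NetworkConfig.GossipSubConfig

	meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
		Logger:         logger,
		Metrics:        metrics.NewNoopCollector(), // placeholder; the builders pass their network metrics
		IDProvider:     idProvider,
		LoggerInterval: gossipCfg.LocalMeshLogInterval,
		// fields introduced by this change:
		RpcSentTrackerCacheSize:            gossipCfg.RPCSentTrackerCacheSize,      // gossipsub-rpc-sent-tracker-cache-size
		RpcSentTrackerWorkerQueueCacheSize: gossipCfg.RPCSentTrackerQueueCacheSize, // gossipsub-rpc-sent-tracker-queue-cache-size
		RpcSentTrackerNumOfWorkers:         gossipCfg.RpcSentTrackerNumOfWorkers,   // gossipsub-rpc-sent-tracker-workers
		HeroCacheMetricsFactory:            metrics.NewNoopHeroCacheMetricsFactory(),
		NetworkingType:                     flownet.PrivateNetwork,
	}

	// Once started as a component, the tracer's SendRPC callback submits sent RPCs to the
	// worker pool; workers record iHave message IDs in the rpc sent cache and update
	// LastHighestIHaveRPCSize asynchronously.
	return tracer.NewGossipSubMeshTracer(meshTracerCfg), nil
}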