From 398b49c0b7dcbb4839dd64e570297ee4e2b86ab6 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 12 Jul 2023 11:19:44 -0400 Subject: [PATCH 01/29] add component and worker pool to rpc sent tracker --- config/default-config.yml | 4 + module/metrics/herocache.go | 9 +++ module/metrics/labels.go | 1 + network/netconf/flags.go | 16 ++-- network/p2p/p2pconf/gossipsub.go | 4 + network/p2p/tracer/gossipSubMeshTracer.go | 41 +++++++--- .../p2p/tracer/internal/rpc_sent_tracker.go | 79 ++++++++++++++++--- 7 files changed, 127 insertions(+), 27 deletions(-) diff --git a/config/default-config.yml b/config/default-config.yml index 9834694b0e2..896668301df 100644 --- a/config/default-config.yml +++ b/config/default-config.yml @@ -66,6 +66,10 @@ network-config: # The default RPC sent tracker cache size. The RPC sent tracker is used to track RPC control messages sent from the local node. # Note: this cache size must be large enough to keep a history of sent messages in a reasonable time window of past history. gossipsub-rpc-sent-tracker-cache-size: 1_000_000 + # Cache size of the rpc sent tracker queue used for async tracking. + gossipsub-rpc-sent-tracker-queue-cache-size: 1000 + # Number of workers for rpc sent tracker worker pool. + gossipsub-rpc-sent-tracker-workers: 5 # Peer scoring is the default value for enabling peer scoring gossipsub-peer-scoring-enabled: true # Gossipsub rpc inspectors configs diff --git a/module/metrics/herocache.go b/module/metrics/herocache.go index f3a88341c87..f82cd84bb57 100644 --- a/module/metrics/herocache.go +++ b/module/metrics/herocache.go @@ -155,6 +155,15 @@ func GossipSubRPCSentTrackerMetricFactory(f HeroCacheMetricsFactory, networkType return f(namespaceNetwork, r) } +func GossipSubRPCSentTrackerQueueMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics { + // we don't use the public prefix for the metrics here for sake of backward compatibility of metric name. 
+ r := ResourceNetworkingRPCSentTrackerQueue + if networkType == network.PublicNetwork { + r = PrependPublicPrefix(r) + } + return f(namespaceNetwork, r) +} + func RpcInspectorNotificationQueueMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics { r := ResourceNetworkingRpcInspectorNotificationQueue if networkType == network.PublicNetwork { diff --git a/module/metrics/labels.go b/module/metrics/labels.go index 9febc9ab391..8000576b4d9 100644 --- a/module/metrics/labels.go +++ b/module/metrics/labels.go @@ -93,6 +93,7 @@ const ( ResourceNetworkingRpcClusterPrefixReceivedCache = "rpc_cluster_prefixed_received_cache" ResourceNetworkingDisallowListCache = "disallow_list_cache" ResourceNetworkingRPCSentTrackerCache = "gossipsub_rpc_sent_tracker_cache" + ResourceNetworkingRPCSentTrackerQueue = "gossipsub_rpc_sent_tracker_queue" ResourceFollowerPendingBlocksCache = "follower_pending_block_cache" // follower engine ResourceFollowerLoopCertifiedBlocksChannel = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel diff --git a/network/netconf/flags.go b/network/netconf/flags.go index bdf821aa60b..045755fde7b 100644 --- a/network/netconf/flags.go +++ b/network/netconf/flags.go @@ -37,10 +37,12 @@ const ( gracePeriod = "libp2p-grace-period" silencePeriod = "libp2p-silence-period" // gossipsub - peerScoring = "gossipsub-peer-scoring-enabled" - localMeshLogInterval = "gossipsub-local-mesh-logging-interval" - rpcSentTrackerCacheSize = "gossipsub-rpc-sent-tracker-cache-size" - scoreTracerInterval = "gossipsub-score-tracer-interval" + peerScoring = "gossipsub-peer-scoring-enabled" + localMeshLogInterval = "gossipsub-local-mesh-logging-interval" + rpcSentTrackerCacheSize = "gossipsub-rpc-sent-tracker-cache-size" + rpcSentTrackerQueueCacheSize = "gossipsub-rpc-sent-tracker-queue-cache-size" + rpcSentTrackerNumOfWorkers = "gossipsub-rpc-sent-tracker-workers" + scoreTracerInterval = 
"gossipsub-score-tracer-interval" // gossipsub validation inspector gossipSubRPCInspectorNotificationCacheSize = "gossipsub-rpc-inspector-notification-cache-size" validationInspectorNumberOfWorkers = "gossipsub-rpc-validation-inspector-workers" @@ -67,8 +69,8 @@ func AllFlagNames() []string { return []string{ networkingConnectionPruning, preferredUnicastsProtocols, receivedMessageCacheSize, peerUpdateInterval, unicastMessageTimeout, unicastCreateStreamRetryDelay, dnsCacheTTL, disallowListNotificationCacheSize, dryRun, lockoutDuration, messageRateLimit, bandwidthRateLimit, bandwidthBurstLimit, memoryLimitRatio, - fileDescriptorsRatio, peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, rpcSentTrackerCacheSize, scoreTracerInterval, - gossipSubRPCInspectorNotificationCacheSize, validationInspectorNumberOfWorkers, validationInspectorInspectMessageQueueCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheSize, + fileDescriptorsRatio, peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, rpcSentTrackerCacheSize, rpcSentTrackerQueueCacheSize, rpcSentTrackerNumOfWorkers, + scoreTracerInterval, gossipSubRPCInspectorNotificationCacheSize, validationInspectorNumberOfWorkers, validationInspectorInspectMessageQueueCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheDecay, validationInspectorClusterPrefixHardThreshold, ihaveSyncSampleSizePercentage, ihaveAsyncSampleSizePercentage, ihaveMaxSampleSize, metricsInspectorNumberOfWorkers, metricsInspectorCacheSize, alspDisabled, alspSpamRecordCacheSize, alspSpamRecordQueueSize, alspHearBeatInterval, } @@ -109,6 +111,8 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) { flags.Duration(localMeshLogInterval, config.GossipSubConfig.LocalMeshLogInterval, "logging interval for local mesh in gossipsub") 
flags.Duration(scoreTracerInterval, config.GossipSubConfig.ScoreTracerInterval, "logging interval for peer score tracer in gossipsub, set to 0 to disable") flags.Uint32(rpcSentTrackerCacheSize, config.GossipSubConfig.RPCSentTrackerCacheSize, "cache size of the rpc sent tracker used by the gossipsub mesh tracer.") + flags.Uint32(rpcSentTrackerQueueCacheSize, config.GossipSubConfig.RPCSentTrackerQueueCacheSize, "cache size of the rpc sent tracker worker queue.") + flags.Int(rpcSentTrackerNumOfWorkers, config.GossipSubConfig.RpcSentTrackerNumOfWorkers, "number of workers for the rpc sent tracker worker pool.") // gossipsub RPC control message validation limits used for validation configuration and rate limiting flags.Int(validationInspectorNumberOfWorkers, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.NumberOfWorkers, "number of gossupsub RPC control message validation inspector component workers") flags.Uint32(validationInspectorInspectMessageQueueCacheSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.CacheSize, "cache size for gossipsub RPC validation inspector events worker pool queue.") diff --git a/network/p2p/p2pconf/gossipsub.go b/network/p2p/p2pconf/gossipsub.go index d297f5cba8b..b76ff9f4c9b 100644 --- a/network/p2p/p2pconf/gossipsub.go +++ b/network/p2p/p2pconf/gossipsub.go @@ -23,6 +23,10 @@ type GossipSubConfig struct { ScoreTracerInterval time.Duration `mapstructure:"gossipsub-score-tracer-interval"` // RPCSentTrackerCacheSize cache size of the rpc sent tracker used by the gossipsub mesh tracer. RPCSentTrackerCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-cache-size"` + // RPCSentTrackerQueueCacheSize cache size of the rpc sent tracker queue used for async tracking. + RPCSentTrackerQueueCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-queue-cache-size"` + // RpcSentTrackerNumOfWorkers number of workers for rpc sent tracker worker pool. 
+ RpcSentTrackerNumOfWorkers int `mapstructure:"gossipsub-rpc-sent-tracker-workers"` // PeerScoring is whether to enable GossipSub peer scoring. PeerScoring bool `mapstructure:"gossipsub-peer-scoring-enabled"` } diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index 1cc25fd2565..27b9ebc8f0f 100644 --- a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -50,12 +50,15 @@ type GossipSubMeshTracer struct { var _ p2p.PubSubTracer = (*GossipSubMeshTracer)(nil) type GossipSubMeshTracerConfig struct { - Logger zerolog.Logger - Metrics module.GossipSubLocalMeshMetrics - IDProvider module.IdentityProvider - LoggerInterval time.Duration - RpcSentTrackerCacheCollector module.HeroCacheMetrics - RpcSentTrackerCacheSize uint32 + Logger zerolog.Logger + Metrics module.GossipSubLocalMeshMetrics + IDProvider module.IdentityProvider + LoggerInterval time.Duration + RpcSentTrackerCacheCollector module.HeroCacheMetrics + RpcSentTrackerCacheSize uint32 + RpcSentTrackerWorkerQueueCacheCollector module.HeroCacheMetrics + RpcSentTrackerWorkerQueueCacheSize uint32 + RpcSentTrackerNumOfWorkers int } // NewGossipSubMeshTracer creates a new *GossipSubMeshTracer. @@ -64,13 +67,21 @@ type GossipSubMeshTracerConfig struct { // Returns: // - *GossipSubMeshTracer: new mesh tracer. 
func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) *GossipSubMeshTracer { - rpcSentTracker := internal.NewRPCSentTracker(config.Logger, config.RpcSentTrackerCacheSize, config.RpcSentTrackerCacheCollector) + lg := config.Logger.With().Str("component", "gossipsub_topology_tracer").Logger() + rpcSentTracker := internal.NewRPCSentTracker(&internal.RPCSentTrackerConfig{ + Logger: lg, + RPCSentCacheSize: config.RpcSentTrackerCacheSize, + RPCSentCacheCollector: config.RpcSentTrackerCacheCollector, + WorkerQueueCacheCollector: config.RpcSentTrackerWorkerQueueCacheCollector, + WorkerQueueCacheSize: config.RpcSentTrackerWorkerQueueCacheSize, + NumOfWorkers: config.RpcSentTrackerNumOfWorkers, + }) g := &GossipSubMeshTracer{ RawTracer: NewGossipSubNoopTracer(), topicMeshMap: make(map[string]map[peer.ID]struct{}), idProvider: config.IDProvider, metrics: config.Metrics, - logger: config.Logger.With().Str("component", "gossipsub_topology_tracer").Logger(), + logger: lg, loggerInterval: config.LoggerInterval, rpcSentTracker: rpcSentTracker, } @@ -80,6 +91,15 @@ func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) *GossipSubMeshTra ready() g.logLoop(ctx) }). + AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + ready() + lg.Debug().Msg("starting rpc sent tracker") + g.rpcSentTracker.Start(ctx) + lg.Debug().Msg("rpc sent tracker started") + + <-g.rpcSentTracker.Done() + lg.Debug().Msg("rpc sent tracker stopped") + }). Build() return g @@ -155,10 +175,7 @@ func (t *GossipSubMeshTracer) Prune(p peer.ID, topic string) { // SendRPC is called when a RPC is sent. Currently, the GossipSubMeshTracer tracks iHave RPC messages that have been sent. // This function can be updated to track other control messages in the future as required. 
func (t *GossipSubMeshTracer) SendRPC(rpc *pubsub.RPC, _ peer.ID) { - switch { - case len(rpc.GetControl().GetIhave()) > 0: - t.rpcSentTracker.OnIHaveRPCSent(rpc.GetControl().GetIhave()) - } + t.rpcSentTracker.RPCSent(rpc) } // logLoop logs the mesh peers of the local node for each topic at a regular interval. diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 6d44ac984a3..bb707883cae 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -1,32 +1,93 @@ package internal import ( + pubsub "github.com/libp2p/go-libp2p-pubsub" pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/rs/zerolog" + "github.com/onflow/flow-go/engine/common/worker" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/mempool/queue" p2pmsg "github.com/onflow/flow-go/network/p2p/message" ) +// trackRpcSentWork is an internal data structure for "temporarily" storing *pubsub.RPC sent in the queue before they are processed +// by the *RPCSentTracker. +type trackRpcSentWork struct { + rpc *pubsub.RPC +} + // RPCSentTracker tracks RPC messages that are sent. type RPCSentTracker struct { - cache *rpcSentCache + component.Component + cache *rpcSentCache + workerPool *worker.Pool[trackRpcSentWork] +} + +// RPCSentTrackerConfig configuration for the RPCSentTracker. +type RPCSentTrackerConfig struct { + Logger zerolog.Logger + //RPCSentCacheSize size of the *rpcSentCache cache. + RPCSentCacheSize uint32 + // RPCSentCacheCollector metrics collector for the *rpcSentCache cache. + RPCSentCacheCollector module.HeroCacheMetrics + // WorkerQueueCacheCollector metrics factory for the worker pool. + WorkerQueueCacheCollector module.HeroCacheMetrics + // WorkerQueueCacheSize the worker pool herostore cache size. + WorkerQueueCacheSize uint32 + // NumOfWorkers number of workers in the worker pool. 
+ NumOfWorkers int } // NewRPCSentTracker returns a new *NewRPCSentTracker. -func NewRPCSentTracker(logger zerolog.Logger, sizeLimit uint32, collector module.HeroCacheMetrics) *RPCSentTracker { - config := &rpcCtrlMsgSentCacheConfig{ - sizeLimit: sizeLimit, - logger: logger, - collector: collector, +func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { + cacheConfig := &rpcCtrlMsgSentCacheConfig{ + sizeLimit: config.RPCSentCacheSize, + logger: config.Logger, + collector: config.RPCSentCacheCollector, + } + + store := queue.NewHeroStore( + config.WorkerQueueCacheSize, + config.Logger, + config.WorkerQueueCacheCollector) + + tracker := &RPCSentTracker{cache: newRPCSentCache(cacheConfig)} + tracker.workerPool = worker.NewWorkerPoolBuilder[trackRpcSentWork]( + config.Logger, + store, + tracker.rpcSent).Build() + + builder := component.NewComponentManagerBuilder() + for i := 0; i < config.NumOfWorkers; i++ { + builder.AddWorker(tracker.workerPool.WorkerLogic()) } - return &RPCSentTracker{cache: newRPCSentCache(config)} + tracker.Component = builder.Build() + + return tracker } -// OnIHaveRPCSent caches a unique entity message ID for each message ID included in each rpc iHave control message. +// RPCSent submits the control message to the worker queue for async tracking. // Args: // - *pubsub.RPC: the rpc sent. -func (t *RPCSentTracker) OnIHaveRPCSent(iHaves []*pb.ControlIHave) { +func (t *RPCSentTracker) RPCSent(rpc *pubsub.RPC) { + t.workerPool.Submit(trackRpcSentWork{rpc}) +} + +// rpcSent tracks control messages sent in *pubsub.RPC. +func (t *RPCSentTracker) rpcSent(work trackRpcSentWork) error { + switch { + case len(work.rpc.GetControl().GetIhave()) > 0: + t.iHaveRPCSent(work.rpc.GetControl().GetIhave()) + } + return nil +} + +// iHaveRPCSent caches a unique entity message ID for each message ID included in each rpc iHave control message. +// Args: +// - []*pb.ControlIHave: list of iHave control messages. 
+func (t *RPCSentTracker) iHaveRPCSent(iHaves []*pb.ControlIHave) { controlMsgType := p2pmsg.CtrlMsgIHave for _, iHave := range iHaves { topicID := iHave.GetTopicID() From e373b014b0e83fef093530c699d38492486c0e89 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 12 Jul 2023 11:23:43 -0400 Subject: [PATCH 02/29] update all usages of NewGossipSubMeshTracer --- cmd/access/node_builder/access_node_builder.go | 15 +++++++++------ cmd/observer/node_builder/observer_builder.go | 15 +++++++++------ follower/follower_builder.go | 15 +++++++++------ network/internal/p2pfixtures/fixtures.go | 15 +++++++++------ network/p2p/p2pbuilder/libp2pNodeBuilder.go | 15 +++++++++------ network/p2p/tracer/gossipSubMeshTracer_test.go | 15 +++++++++------ 6 files changed, 54 insertions(+), 36 deletions(-) diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index 5edd2629ee2..5e0d790af8e 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -1192,12 +1192,15 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri } meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ - Logger: builder.Logger, - Metrics: networkMetrics, - IDProvider: builder.IdentityProvider, - LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, - RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), - RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + Logger: builder.Logger, + Metrics: networkMetrics, + IDProvider: builder.IdentityProvider, + LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, + RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), + RpcSentTrackerCacheSize: 
builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, + RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, + RpcSentTrackerWorkerQueueCacheCollector: metrics.GossipSubRPCSentTrackerQueueMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), } meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 78ddc464fb7..4f3c5ea7738 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -703,12 +703,15 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr } meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ - Logger: builder.Logger, - Metrics: builder.Metrics.Network, - IDProvider: builder.IdentityProvider, - LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, - RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), - RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + Logger: builder.Logger, + Metrics: builder.Metrics.Network, + IDProvider: builder.IdentityProvider, + LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, + RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), + RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, + RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, + 
RpcSentTrackerWorkerQueueCacheCollector: metrics.GossipSubRPCSentTrackerQueueMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), } meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) diff --git a/follower/follower_builder.go b/follower/follower_builder.go index e2eb43cb49c..06626555855 100644 --- a/follower/follower_builder.go +++ b/follower/follower_builder.go @@ -605,12 +605,15 @@ func (builder *FollowerServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr } meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ - Logger: builder.Logger, - Metrics: builder.Metrics.Network, - IDProvider: builder.IdentityProvider, - LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, - RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), - RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + Logger: builder.Logger, + Metrics: builder.Metrics.Network, + IDProvider: builder.IdentityProvider, + LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, + RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), + RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, + RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, + RpcSentTrackerWorkerQueueCacheCollector: metrics.GossipSubRPCSentTrackerQueueMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), } meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) diff --git a/network/internal/p2pfixtures/fixtures.go b/network/internal/p2pfixtures/fixtures.go index 29ee0509fbb..5a8e60932c8 100644 --- 
a/network/internal/p2pfixtures/fixtures.go +++ b/network/internal/p2pfixtures/fixtures.go @@ -104,12 +104,15 @@ func CreateNode(t *testing.T, networkKey crypto.PrivateKey, sporkID flow.Identif require.NoError(t, err) meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ - Logger: logger, - Metrics: metrics.NewNoopCollector(), - IDProvider: idProvider, - LoggerInterval: defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, - RpcSentTrackerCacheCollector: metrics.NewNoopCollector(), - RpcSentTrackerCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + Logger: logger, + Metrics: metrics.NewNoopCollector(), + IDProvider: idProvider, + LoggerInterval: defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, + RpcSentTrackerCacheCollector: metrics.NewNoopCollector(), + RpcSentTrackerCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + RpcSentTrackerWorkerQueueCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, + RpcSentTrackerNumOfWorkers: defaultFlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, + RpcSentTrackerWorkerQueueCacheCollector: metrics.NewNoopCollector(), } meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) diff --git a/network/p2p/p2pbuilder/libp2pNodeBuilder.go b/network/p2p/p2pbuilder/libp2pNodeBuilder.go index 8e550b4fa94..2034b9971f6 100644 --- a/network/p2p/p2pbuilder/libp2pNodeBuilder.go +++ b/network/p2p/p2pbuilder/libp2pNodeBuilder.go @@ -496,12 +496,15 @@ func DefaultNodeBuilder( } meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ - Logger: logger, - Metrics: metricsCfg.Metrics, - IDProvider: idProvider, - LoggerInterval: gossipCfg.LocalMeshLogInterval, - RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(metricsCfg.HeroCacheFactory, flownet.PrivateNetwork), - RpcSentTrackerCacheSize: gossipCfg.RPCSentTrackerCacheSize, + Logger: logger, + Metrics: metricsCfg.Metrics, + 
IDProvider: idProvider, + LoggerInterval: gossipCfg.LocalMeshLogInterval, + RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(metricsCfg.HeroCacheFactory, flownet.PrivateNetwork), + RpcSentTrackerCacheSize: gossipCfg.RPCSentTrackerCacheSize, + RpcSentTrackerWorkerQueueCacheSize: gossipCfg.RPCSentTrackerQueueCacheSize, + RpcSentTrackerNumOfWorkers: gossipCfg.RpcSentTrackerNumOfWorkers, + RpcSentTrackerWorkerQueueCacheCollector: metrics.GossipSubRPCSentTrackerQueueMetricFactory(metricsCfg.HeroCacheFactory, flownet.PrivateNetwork), } meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) diff --git a/network/p2p/tracer/gossipSubMeshTracer_test.go b/network/p2p/tracer/gossipSubMeshTracer_test.go index a2da0584f94..6fb4e96316f 100644 --- a/network/p2p/tracer/gossipSubMeshTracer_test.go +++ b/network/p2p/tracer/gossipSubMeshTracer_test.go @@ -66,12 +66,15 @@ func TestGossipSubMeshTracer(t *testing.T) { // meshTracer logs at 1 second intervals for sake of testing. collector := mockmodule.NewGossipSubLocalMeshMetrics(t) meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ - Logger: logger, - Metrics: collector, - IDProvider: idProvider, - LoggerInterval: time.Second, - RpcSentTrackerCacheCollector: metrics.NewNoopCollector(), - RpcSentTrackerCacheSize: defaultConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + Logger: logger, + Metrics: collector, + IDProvider: idProvider, + LoggerInterval: time.Second, + RpcSentTrackerCacheCollector: metrics.NewNoopCollector(), + RpcSentTrackerCacheSize: defaultConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + RpcSentTrackerWorkerQueueCacheSize: defaultConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, + RpcSentTrackerNumOfWorkers: defaultConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, + RpcSentTrackerWorkerQueueCacheCollector: metrics.NewNoopCollector(), } meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) tracerNode, tracerId := 
p2ptest.NodeFixture( From 2a83452ad7136d3fbece08f7160d30fd6420b61b Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 12 Jul 2023 11:39:01 -0400 Subject: [PATCH 03/29] update rpc sent tracker test --- .../tracer/internal/rpc_sent_tracker_test.go | 32 ++++++++++++++++--- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker_test.go b/network/p2p/tracer/internal/rpc_sent_tracker_test.go index 7b9c4ec9acb..3293dcafd5c 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker_test.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker_test.go @@ -1,7 +1,9 @@ package internal import ( + "context" "testing" + "time" pubsub "github.com/libp2p/go-libp2p-pubsub" pb "github.com/libp2p/go-libp2p-pubsub/pb" @@ -9,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/onflow/flow-go/config" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/utils/unittest" @@ -22,14 +25,23 @@ func TestNewRPCSentTracker(t *testing.T) { // TestRPCSentTracker_IHave ensures *RPCSentTracker tracks sent iHave control messages as expected. 
func TestRPCSentTracker_IHave(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + tracker := mockTracker(t) require.NotNil(t, tracker) + tracker.Start(signalerCtx) + defer func() { + cancel() + unittest.RequireComponentsDoneBefore(t, time.Second, tracker) + }() + t.Run("WasIHaveRPCSent should return false for iHave message Id that has not been tracked", func(t *testing.T) { require.False(t, tracker.WasIHaveRPCSent("topic_id", "message_id")) }) - t.Run("WasIHaveRPCSent should return true for iHave message after it is tracked with OnIHaveRPCSent", func(t *testing.T) { + t.Run("WasIHaveRPCSent should return true for iHave message after it is tracked with iHaveRPCSent", func(t *testing.T) { numOfMsgIds := 100 testCases := []struct { topic string @@ -49,7 +61,12 @@ func TestRPCSentTracker_IHave(t *testing.T) { } } rpc := rpcFixture(withIhaves(iHaves)) - tracker.OnIHaveRPCSent(rpc.GetControl().GetIhave()) + tracker.RPCSent(rpc) + + // eventually we should have tracked numOfMsgIds per single topic + require.Eventually(t, func() bool { + return tracker.cache.size() == uint(len(testCases)*numOfMsgIds) + }, time.Second, 100*time.Millisecond) for _, testCase := range testCases { for _, messageID := range testCase.messageIDS { @@ -60,11 +77,16 @@ func TestRPCSentTracker_IHave(t *testing.T) { } func mockTracker(t *testing.T) *RPCSentTracker { - logger := zerolog.Nop() cfg, err := config.DefaultConfig() require.NoError(t, err) - collector := metrics.NewNoopCollector() - tracker := NewRPCSentTracker(logger, cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, collector) + tracker := NewRPCSentTracker(&RPCSentTrackerConfig{ + Logger: zerolog.Nop(), + RPCSentCacheSize: cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + RPCSentCacheCollector: metrics.NewNoopCollector(), + WorkerQueueCacheCollector: metrics.NewNoopCollector(), + WorkerQueueCacheSize: 
cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, + NumOfWorkers: 1, + }) return tracker } From 14ef77dc8784e243a03e4e60c85ac36b0aceb712 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 12 Jul 2023 11:46:26 -0400 Subject: [PATCH 04/29] Update rpc_sent_tracker.go --- network/p2p/tracer/internal/rpc_sent_tracker.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index bb707883cae..8e7195d1db0 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -12,9 +12,9 @@ import ( p2pmsg "github.com/onflow/flow-go/network/p2p/message" ) -// trackRpcSentWork is an internal data structure for "temporarily" storing *pubsub.RPC sent in the queue before they are processed +// trackRPC is an internal data structure for "temporarily" storing *pubsub.RPC sent in the queue before they are processed // by the *RPCSentTracker. -type trackRpcSentWork struct { +type trackRPC struct { rpc *pubsub.RPC } @@ -22,7 +22,7 @@ type trackRpcSentWork struct { type RPCSentTracker struct { component.Component cache *rpcSentCache - workerPool *worker.Pool[trackRpcSentWork] + workerPool *worker.Pool[trackRPC] } // RPCSentTrackerConfig configuration for the RPCSentTracker. @@ -54,7 +54,7 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { config.WorkerQueueCacheCollector) tracker := &RPCSentTracker{cache: newRPCSentCache(cacheConfig)} - tracker.workerPool = worker.NewWorkerPoolBuilder[trackRpcSentWork]( + tracker.workerPool = worker.NewWorkerPoolBuilder[trackRPC]( config.Logger, store, tracker.rpcSent).Build() @@ -72,11 +72,11 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { // Args: // - *pubsub.RPC: the rpc sent. 
func (t *RPCSentTracker) RPCSent(rpc *pubsub.RPC) { - t.workerPool.Submit(trackRpcSentWork{rpc}) + t.workerPool.Submit(trackRPC{rpc}) } // rpcSent tracks control messages sent in *pubsub.RPC. -func (t *RPCSentTracker) rpcSent(work trackRpcSentWork) error { +func (t *RPCSentTracker) rpcSent(work trackRPC) error { switch { case len(work.rpc.GetControl().GetIhave()) > 0: t.iHaveRPCSent(work.rpc.GetControl().GetIhave()) From e38c31e39f83c2a6b65c794010744d2da87a41a7 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 12 Jul 2023 17:11:13 -0400 Subject: [PATCH 05/29] track last highest iHaves size --- module/component/component.go | 15 ++++- .../mock/component_manager_builder.go | 16 ++++++ network/p2p/tracer/gossipSubMeshTracer.go | 5 +- .../p2p/tracer/internal/rpc_sent_tracker.go | 57 +++++++++++++++---- .../tracer/internal/rpc_sent_tracker_test.go | 56 ++++++++++++++++++ 5 files changed, 135 insertions(+), 14 deletions(-) diff --git a/module/component/component.go b/module/component/component.go index 34f8f61cf14..a75534d386a 100644 --- a/module/component/component.go +++ b/module/component/component.go @@ -143,7 +143,8 @@ func NoopWorker(ctx irrecoverable.SignalerContext, ready ReadyFunc) { type ComponentManagerBuilder interface { // AddWorker adds a worker routine for the ComponentManager AddWorker(ComponentWorker) ComponentManagerBuilder - + // AddWorkers adds n number of worker routines for the ComponentManager. + AddWorkers(int, ComponentWorker) ComponentManagerBuilder // Build builds and returns a new ComponentManager instance Build() *ComponentManager } @@ -157,15 +158,23 @@ func NewComponentManagerBuilder() ComponentManagerBuilder { return &componentManagerBuilderImpl{} } -// AddWorker adds a ComponentWorker closure to the ComponentManagerBuilder // All worker functions will be run in parallel when the ComponentManager is started. // Note: AddWorker is not concurrency-safe, and should only be called on an individual builder -// within a single goroutine. 
+// within a single goroutine. AddWorker adds a ComponentWorker closure to the ComponentManagerBuilder. + func (c *componentManagerBuilderImpl) AddWorker(worker ComponentWorker) ComponentManagerBuilder { c.workers = append(c.workers, worker) return c } +// AddWorkers adds n number of workers for the ComponentManager. +func (c *componentManagerBuilderImpl) AddWorkers(n int, worker ComponentWorker) ComponentManagerBuilder { + for i := 0; i < n; i++ { + c.workers = append(c.workers, worker) + } + return c +} + // Build returns a new ComponentManager instance with the configured workers // Build may be called multiple times to create multiple individual ComponentManagers. This will // result in the worker routines being called multiple times. If this is unsafe, do not call it diff --git a/module/component/mock/component_manager_builder.go b/module/component/mock/component_manager_builder.go index c414ddc6663..570ecda9269 100644 --- a/module/component/mock/component_manager_builder.go +++ b/module/component/mock/component_manager_builder.go @@ -28,6 +28,22 @@ func (_m *ComponentManagerBuilder) AddWorker(_a0 component.ComponentWorker) comp return r0 } +// AddWorkers provides a mock function with given fields: _a0, _a1 +func (_m *ComponentManagerBuilder) AddWorkers(_a0 int, _a1 component.ComponentWorker) component.ComponentManagerBuilder { + ret := _m.Called(_a0, _a1) + + var r0 component.ComponentManagerBuilder + if rf, ok := ret.Get(0).(func(int, component.ComponentWorker) component.ComponentManagerBuilder); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(component.ComponentManagerBuilder) + } + } + + return r0 +} + // Build provides a mock function with given fields: func (_m *ComponentManagerBuilder) Build() *component.ComponentManager { ret := _m.Called() diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index 27b9ebc8f0f..cd0d548e3c0 100644 ---
a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -175,7 +175,10 @@ func (t *GossipSubMeshTracer) Prune(p peer.ID, topic string) { // SendRPC is called when a RPC is sent. Currently, the GossipSubMeshTracer tracks iHave RPC messages that have been sent. // This function can be updated to track other control messages in the future as required. func (t *GossipSubMeshTracer) SendRPC(rpc *pubsub.RPC, _ peer.ID) { - t.rpcSentTracker.RPCSent(rpc) + err := t.rpcSentTracker.RPCSent(rpc) + if err != nil { + t.logger.Err(err).Msg("failed to track sent pubsub rpc") + } } // logLoop logs the mesh peers of the local node for each topic at a regular interval. diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 8e7195d1db0..09f29436c49 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -1,9 +1,13 @@ package internal import ( + "crypto/rand" + "fmt" + pubsub "github.com/libp2p/go-libp2p-pubsub" pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/rs/zerolog" + "go.uber.org/atomic" "github.com/onflow/flow-go/engine/common/worker" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" @@ -15,7 +19,9 @@ import ( // trackRPC is an internal data structure for "temporarily" storing *pubsub.RPC sent in the queue before they are processed // by the *RPCSentTracker. type trackRPC struct { - rpc *pubsub.RPC + // Nonce prevents deduplication in the hero store + Nonce []byte + rpc *pubsub.RPC } // RPCSentTracker tracks RPC messages that are sent. @@ -23,6 +29,8 @@ type RPCSentTracker struct { component.Component cache *rpcSentCache workerPool *worker.Pool[trackRPC] + // lastHighestIHaveRPCSize tracks the size of the last largest iHave rpc control message sent. + lastHighestIHaveRPCSize *atomic.Int64 } // RPCSentTrackerConfig configuration for the RPCSentTracker.
@@ -53,17 +61,18 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { config.Logger, config.WorkerQueueCacheCollector) - tracker := &RPCSentTracker{cache: newRPCSentCache(cacheConfig)} + tracker := &RPCSentTracker{ + cache: newRPCSentCache(cacheConfig), + lastHighestIHaveRPCSize: atomic.NewInt64(0), + } tracker.workerPool = worker.NewWorkerPoolBuilder[trackRPC]( config.Logger, store, tracker.rpcSent).Build() - builder := component.NewComponentManagerBuilder() - for i := 0; i < config.NumOfWorkers; i++ { - builder.AddWorker(tracker.workerPool.WorkerLogic()) - } - tracker.Component = builder.Build() + tracker.Component = component.NewComponentManagerBuilder(). + AddWorkers(config.NumOfWorkers, tracker.workerPool.WorkerLogic()). + Build() return tracker } @@ -71,19 +80,32 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { // RPCSent submits the control message to the worker queue for async tracking. // Args: // - *pubsub.RPC: the rpc sent. -func (t *RPCSentTracker) RPCSent(rpc *pubsub.RPC) { - t.workerPool.Submit(trackRPC{rpc}) +func (t *RPCSentTracker) RPCSent(rpc *pubsub.RPC) error { + n, err := nonce() + if err != nil { + return fmt.Errorf("failed to get track rpc work nonce: %w", err) + } + t.workerPool.Submit(trackRPC{Nonce: n, rpc: rpc}) + return nil } // rpcSent tracks control messages sent in *pubsub.RPC. func (t *RPCSentTracker) rpcSent(work trackRPC) error { switch { case len(work.rpc.GetControl().GetIhave()) > 0: - t.iHaveRPCSent(work.rpc.GetControl().GetIhave()) + iHave := work.rpc.GetControl().GetIhave() + t.iHaveRPCSent(iHave) + t.updateLastHighestIHaveRPCSize(int64(len(iHave))) } return nil } +func (t *RPCSentTracker) updateLastHighestIHaveRPCSize(size int64) { + if t.lastHighestIHaveRPCSize.Load() < size { + t.lastHighestIHaveRPCSize.Store(size) + } +} + // iHaveRPCSent caches a unique entity message ID for each message ID included in each rpc iHave control message. 
// Args: // - []*pb.ControlIHave: list of iHave control messages. @@ -106,3 +128,18 @@ func (t *RPCSentTracker) iHaveRPCSent(iHaves []*pb.ControlIHave) { func (t *RPCSentTracker) WasIHaveRPCSent(topicID, messageID string) bool { return t.cache.has(topicID, messageID, p2pmsg.CtrlMsgIHave) } + +// LastHighestIHaveRPCSize returns the last highest size of iHaves sent in an rpc. +func (t *RPCSentTracker) LastHighestIHaveRPCSize() int64 { + return t.lastHighestIHaveRPCSize.Load() +} + +// nonce returns random string that is used to store unique items in herocache. +func nonce() ([]byte, error) { + b := make([]byte, 16) + _, err := rand.Read(b) + if err != nil { + return nil, err + } + return b, nil +} diff --git a/network/p2p/tracer/internal/rpc_sent_tracker_test.go b/network/p2p/tracer/internal/rpc_sent_tracker_test.go index 3293dcafd5c..47e897442f8 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker_test.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker_test.go @@ -76,6 +76,62 @@ }) } +// TestRPCSentTracker_LastHighestIHaveRPCSize ensures *RPCSentTracker tracks the last largest iHave size as expected. +func TestRPCSentTracker_LastHighestIHaveRPCSize(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + + tracker := mockTracker(t) + require.NotNil(t, tracker) + + tracker.Start(signalerCtx) + defer func() { + cancel() + unittest.RequireComponentsDoneBefore(t, time.Second, tracker) + }() + + expectedLastHighestSize := 1000 + // adding a single message ID to the iHave enables us to track the expected cache size by the amount of iHaves.
+ numOfMessageIds := 1 + testCases := []struct { + rpcFixture *pubsub.RPC + numOfIhaves int + }{ + {rpcFixture(withIhaves(mockIHaveFixture(10, numOfMessageIds))), 10}, + {rpcFixture(withIhaves(mockIHaveFixture(100, numOfMessageIds))), 100}, + {rpcFixture(withIhaves(mockIHaveFixture(expectedLastHighestSize, numOfMessageIds))), expectedLastHighestSize}, + {rpcFixture(withIhaves(mockIHaveFixture(999, numOfMessageIds))), 999}, + {rpcFixture(withIhaves(mockIHaveFixture(23, numOfMessageIds))), 23}, + } + + expectedCacheSize := 0 + for _, testCase := range testCases { + require.NoError(t, tracker.RPCSent(testCase.rpcFixture)) + expectedCacheSize += testCase.numOfIhaves + } + + // eventually we should have tracked numOfMsgIds per single topic + require.Eventually(t, func() bool { + return tracker.cache.size() == uint(expectedCacheSize) + }, time.Second, 100*time.Millisecond) + + require.Equal(t, int64(expectedLastHighestSize), tracker.LastHighestIHaveRPCSize()) +} + +// mockIHaveFixture generate list of iHaves of size n. Each iHave will be created with m number of random message ids. 
+func mockIHaveFixture(n, m int) []*pb.ControlIHave { + iHaves := make([]*pb.ControlIHave, n) + for i := 0; i < n; i++ { + // topic does not have to be a valid flow topic, for testing purposes we can use a random string + topic := unittest.IdentifierFixture().String() + iHaves[i] = &pb.ControlIHave{ + TopicID: &topic, + MessageIDs: unittest.IdentifierListFixture(m).Strings(), + } + } + return iHaves +} + func mockTracker(t *testing.T) *RPCSentTracker { cfg, err := config.DefaultConfig() require.NoError(t, err) From 20b917e20457aa33e48c943418a82842a2d8b186 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 12 Jul 2023 17:48:39 -0400 Subject: [PATCH 06/29] reset LastHighestIhavesSent every 1 minute --- network/p2p/tracer/gossipSubMeshTracer.go | 16 +++++---- .../p2p/tracer/internal/rpc_sent_tracker.go | 36 +++++++++++++++++-- .../tracer/internal/rpc_sent_tracker_test.go | 32 +++++++++++------ 3 files changed, 65 insertions(+), 19 deletions(-) diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index cd0d548e3c0..21bc9e5de6c 100644 --- a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -24,6 +24,9 @@ const ( // MeshLogIntervalWarnMsg is the message logged by the tracer every logInterval if there are unknown peers in the mesh. MeshLogIntervalWarnMsg = "unknown peers in topic mesh peers of local node since last heartbeat" + + // defaultLastHighestIHaveRPCSizeResetInterval this default interval should always be equal to the gossipsub heart beat interval.
+ defaultLastHighestIHaveRPCSizeResetInterval = time.Minute ) // The GossipSubMeshTracer component in the GossipSub pubsub.RawTracer that is designed to track the local @@ -69,12 +72,13 @@ type GossipSubMeshTracerConfig struct { func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) *GossipSubMeshTracer { lg := config.Logger.With().Str("component", "gossipsub_topology_tracer").Logger() rpcSentTracker := internal.NewRPCSentTracker(&internal.RPCSentTrackerConfig{ - Logger: lg, - RPCSentCacheSize: config.RpcSentTrackerCacheSize, - RPCSentCacheCollector: config.RpcSentTrackerCacheCollector, - WorkerQueueCacheCollector: config.RpcSentTrackerWorkerQueueCacheCollector, - WorkerQueueCacheSize: config.RpcSentTrackerWorkerQueueCacheSize, - NumOfWorkers: config.RpcSentTrackerNumOfWorkers, + Logger: lg, + RPCSentCacheSize: config.RpcSentTrackerCacheSize, + RPCSentCacheCollector: config.RpcSentTrackerCacheCollector, + WorkerQueueCacheCollector: config.RpcSentTrackerWorkerQueueCacheCollector, + WorkerQueueCacheSize: config.RpcSentTrackerWorkerQueueCacheSize, + NumOfWorkers: config.RpcSentTrackerNumOfWorkers, + LastHighestIhavesSentResetInterval: defaultLastHighestIHaveRPCSizeResetInterval, }) g := &GossipSubMeshTracer{ RawTracer: NewGossipSubNoopTracer(), diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 09f29436c49..63ae18edb81 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -3,6 +3,7 @@ package internal import ( "crypto/rand" "fmt" + "time" pubsub "github.com/libp2p/go-libp2p-pubsub" pb "github.com/libp2p/go-libp2p-pubsub/pb" @@ -12,6 +13,7 @@ import ( "github.com/onflow/flow-go/engine/common/worker" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/mempool/queue" p2pmsg "github.com/onflow/flow-go/network/p2p/message" ) 
@@ -30,7 +32,8 @@ type RPCSentTracker struct { cache *rpcSentCache workerPool *worker.Pool[trackRPC] // lastHighestIHaveRPCSize tracks the size of the last largest iHave rpc control message sent. - lastHighestIHaveRPCSize *atomic.Int64 + lastHighestIHaveRPCSize *atomic.Int64 + lastHighestIHaveRPCSizeResetInterval time.Duration } // RPCSentTrackerConfig configuration for the RPCSentTracker. @@ -46,6 +49,8 @@ type RPCSentTrackerConfig struct { WorkerQueueCacheSize uint32 // NumOfWorkers number of workers in the worker pool. NumOfWorkers int + // LastHighestIhavesSentResetInterval the refresh interval to reset the lastHighestIHaveRPCSize. + LastHighestIhavesSentResetInterval time.Duration } // NewRPCSentTracker returns a new *NewRPCSentTracker. @@ -62,8 +67,9 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { config.WorkerQueueCacheCollector) tracker := &RPCSentTracker{ - cache: newRPCSentCache(cacheConfig), - lastHighestIHaveRPCSize: atomic.NewInt64(0), + cache: newRPCSentCache(cacheConfig), + lastHighestIHaveRPCSize: atomic.NewInt64(0), + lastHighestIHaveRPCSizeResetInterval: config.LastHighestIhavesSentResetInterval, } tracker.workerPool = worker.NewWorkerPoolBuilder[trackRPC]( config.Logger, @@ -71,6 +77,10 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { tracker.rpcSent).Build() tracker.Component = component.NewComponentManagerBuilder(). + AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + ready() + tracker.lastHighestIHaveRPCSizeResetLoop(ctx) + }). AddWorkers(config.NumOfWorkers, tracker.workerPool.WorkerLogic()). Build() @@ -134,6 +144,26 @@ func (t *RPCSentTracker) LastHighestIHaveRPCSize() int64 { return t.lastHighestIHaveRPCSize.Load() } +// lastHighestIHaveRPCSizeResetLoop resets the lastHighestIHaveRPCSize to 0 on each interval tick. 
+func (t *RPCSentTracker) lastHighestIHaveRPCSizeResetLoop(ctx irrecoverable.SignalerContext) { + ticker := time.NewTicker(t.lastHighestIHaveRPCSizeResetInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + default: + } + + select { + case <-ctx.Done(): + return + case <-ticker.C: + t.lastHighestIHaveRPCSize.Store(0) + } + } +} + // nonce returns random string that is used to store unique items in herocache. func nonce() ([]byte, error) { b := make([]byte, 16) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker_test.go b/network/p2p/tracer/internal/rpc_sent_tracker_test.go index 47e897442f8..7d2e9d57a6a 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker_test.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker_test.go @@ -19,7 +19,7 @@ import ( // TestNewRPCSentTracker ensures *RPCSenTracker is created as expected. func TestNewRPCSentTracker(t *testing.T) { - tracker := mockTracker(t) + tracker := mockTracker(t, time.Minute) require.NotNil(t, tracker) } @@ -28,7 +28,7 @@ func TestRPCSentTracker_IHave(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) - tracker := mockTracker(t) + tracker := mockTracker(t, time.Minute) require.NotNil(t, tracker) tracker.Start(signalerCtx) @@ -81,7 +81,7 @@ func TestRPCSentTracker_LastHighestIHaveRPCSize(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) - tracker := mockTracker(t) + tracker := mockTracker(t, 3*time.Second) require.NotNil(t, tracker) tracker.Start(signalerCtx) @@ -116,6 +116,17 @@ func TestRPCSentTracker_LastHighestIHaveRPCSize(t *testing.T) { }, time.Second, 100*time.Millisecond) require.Equal(t, int64(expectedLastHighestSize), tracker.LastHighestIHaveRPCSize()) + + // after setting sending large RPC lastHighestIHaveRPCSize should reset to 0 after lastHighestIHaveRPCSize reset loop tick + largeIhave := 50000 + 
require.NoError(t, tracker.RPCSent(rpcFixture(withIhaves(mockIHaveFixture(largeIhave, numOfMessageIds))))) + require.Eventually(t, func() bool { + return tracker.LastHighestIHaveRPCSize() == int64(largeIhave) + }, 1*time.Second, 100*time.Millisecond) + + require.Eventually(t, func() bool { + return tracker.LastHighestIHaveRPCSize() == 0 + }, 4*time.Second, 100*time.Millisecond) } // mockIHaveFixture generate list of iHaves of size n. Each iHave will be created with m number of random message ids. @@ -132,16 +143,17 @@ func mockIHaveFixture(n, m int) []*pb.ControlIHave { return iHaves } -func mockTracker(t *testing.T) *RPCSentTracker { +func mockTracker(t *testing.T, lastHighestIhavesSentResetInterval time.Duration) *RPCSentTracker { cfg, err := config.DefaultConfig() require.NoError(t, err) tracker := NewRPCSentTracker(&RPCSentTrackerConfig{ - Logger: zerolog.Nop(), - RPCSentCacheSize: cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, - RPCSentCacheCollector: metrics.NewNoopCollector(), - WorkerQueueCacheCollector: metrics.NewNoopCollector(), - WorkerQueueCacheSize: cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, - NumOfWorkers: 1, + Logger: zerolog.Nop(), + RPCSentCacheSize: cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + RPCSentCacheCollector: metrics.NewNoopCollector(), + WorkerQueueCacheCollector: metrics.NewNoopCollector(), + WorkerQueueCacheSize: cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, + NumOfWorkers: 1, + LastHighestIhavesSentResetInterval: lastHighestIhavesSentResetInterval, }) return tracker } From f19483fde61fd2eba129b4749b99815acbfb9336 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 12 Jul 2023 17:52:49 -0400 Subject: [PATCH 07/29] Update rpc_sent_tracker_test.go --- network/p2p/tracer/internal/rpc_sent_tracker_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker_test.go 
b/network/p2p/tracer/internal/rpc_sent_tracker_test.go index 7d2e9d57a6a..5324be9d459 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker_test.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker_test.go @@ -61,7 +61,7 @@ func TestRPCSentTracker_IHave(t *testing.T) { } } rpc := rpcFixture(withIhaves(iHaves)) - tracker.RPCSent(rpc) + require.NoError(t, tracker.RPCSent(rpc)) // eventually we should have tracked numOfMsgIds per single topic require.Eventually(t, func() bool { From bc369b2a6ee7fa3b4ea2b0097052298e3dbdf4e5 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 18 Jul 2023 16:40:04 -0400 Subject: [PATCH 08/29] Update network/p2p/tracer/gossipSubMeshTracer.go Co-authored-by: Yahya Hassanzadeh, Ph.D. --- network/p2p/tracer/gossipSubMeshTracer.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index 21bc9e5de6c..c79e12ab14b 100644 --- a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -25,7 +25,10 @@ const ( // MeshLogIntervalWarnMsg is the message logged by the tracer every logInterval if there are unknown peers in the mesh. MeshLogIntervalWarnMsg = "unknown peers in topic mesh peers of local node since last heartbeat" - // defaultLastHighestIHaveRPCSizeResetInterval this default interval should always be equal to the gossipsub heart beat interval. + // defaultLastHighestIHaveRPCSizeResetInterval is the interval that we reset the tracker of max ihave size sent back + // to a default. We use ihave message max size to determine the health of requested iwants from remote peers. However, + // we don't desire an ihave size anomaly to persist forever, hence, we reset it back to a default every minute. + // The choice of the interval to be a minute is in harmony with the GossipSub decay interval. 
defaultLastHighestIHaveRPCSizeResetInterval = time.Minute ) From 7b52ab18decbfceb530800a79ae21b0255731e46 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 18 Jul 2023 16:43:27 -0400 Subject: [PATCH 09/29] encapsulate tracer-related config parameters --- network/p2p/p2pconf/gossipsub.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/network/p2p/p2pconf/gossipsub.go b/network/p2p/p2pconf/gossipsub.go index b76ff9f4c9b..4578131a6ab 100644 --- a/network/p2p/p2pconf/gossipsub.go +++ b/network/p2p/p2pconf/gossipsub.go @@ -17,6 +17,16 @@ type ResourceManagerConfig struct { type GossipSubConfig struct { // GossipSubRPCInspectorsConfig configuration for all gossipsub RPC control message inspectors. GossipSubRPCInspectorsConfig `mapstructure:",squash"` + + // GossipSubTracerConfig is the configuration for the gossipsub tracer. GossipSub tracer is used to trace the local mesh events and peer scores. + GossipSubTracerConfig `mapstructure:",squash"` + + // PeerScoring is whether to enable GossipSub peer scoring. + PeerScoring bool `mapstructure:"gossipsub-peer-scoring-enabled"` +} + +// GossipSubTracerConfig is the config for the gossipsub tracer. GossipSub tracer is used to trace the local mesh events and peer scores. +type GossipSubTracerConfig struct { // LocalMeshLogInterval is the interval at which the local mesh is logged. LocalMeshLogInterval time.Duration `mapstructure:"gossipsub-local-mesh-logging-interval"` // ScoreTracerInterval is the interval at which the score tracer logs the peer scores. @@ -27,6 +37,4 @@ type GossipSubConfig struct { RPCSentTrackerQueueCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-queue-cache-size"` // RpcSentTrackerNumOfWorkers number of workers for rpc sent tracker worker pool. RpcSentTrackerNumOfWorkers int `mapstructure:"gossipsub-rpc-sent-tracker-workers"` - // PeerScoring is whether to enable GossipSub peer scoring. 
- PeerScoring bool `mapstructure:"gossipsub-peer-scoring-enabled"` } From b82247df433b790f467f716fd494df77cb9c02ed Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 18 Jul 2023 16:43:34 -0400 Subject: [PATCH 10/29] Update config/default-config.yml Co-authored-by: Yahya Hassanzadeh, Ph.D. --- config/default-config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/default-config.yml b/config/default-config.yml index 896668301df..c14f7e13c0b 100644 --- a/config/default-config.yml +++ b/config/default-config.yml @@ -67,7 +67,7 @@ network-config: # Note: this cache size must be large enough to keep a history of sent messages in a reasonable time window of past history. gossipsub-rpc-sent-tracker-cache-size: 1_000_000 # Cache size of the rpc sent tracker queue used for async tracking. - gossipsub-rpc-sent-tracker-queue-cache-size: 1000 + gossipsub-rpc-sent-tracker-queue-cache-size: 100_000 # Number of workers for rpc sent tracker worker pool. gossipsub-rpc-sent-tracker-workers: 5 # Peer scoring is the default value for enabling peer scoring From b8c72cc07ff349c19e6215cc3cf1d06382a84453 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 18 Jul 2023 16:43:50 -0400 Subject: [PATCH 11/29] Update network/p2p/p2pconf/gossipsub.go Co-authored-by: Yahya Hassanzadeh, Ph.D. --- network/p2p/p2pconf/gossipsub.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/network/p2p/p2pconf/gossipsub.go b/network/p2p/p2pconf/gossipsub.go index b76ff9f4c9b..9ca737e7feb 100644 --- a/network/p2p/p2pconf/gossipsub.go +++ b/network/p2p/p2pconf/gossipsub.go @@ -20,13 +20,13 @@ type GossipSubConfig struct { // LocalMeshLogInterval is the interval at which the local mesh is logged. LocalMeshLogInterval time.Duration `mapstructure:"gossipsub-local-mesh-logging-interval"` // ScoreTracerInterval is the interval at which the score tracer logs the peer scores. 
- ScoreTracerInterval time.Duration `mapstructure:"gossipsub-score-tracer-interval"` + ScoreTracerInterval time.Duration `validate:"gt=0s" mapstructure:"gossipsub-score-tracer-interval"` // RPCSentTrackerCacheSize cache size of the rpc sent tracker used by the gossipsub mesh tracer. - RPCSentTrackerCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-cache-size"` + RPCSentTrackerCacheSize uint32 `validate:"gt=0" mapstructure:"gossipsub-rpc-sent-tracker-cache-size"` // RPCSentTrackerQueueCacheSize cache size of the rpc sent tracker queue used for async tracking. - RPCSentTrackerQueueCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-queue-cache-size"` + RPCSentTrackerQueueCacheSize uint32 `validate:"gt=0" mapstructure:"gossipsub-rpc-sent-tracker-queue-cache-size"` // RpcSentTrackerNumOfWorkers number of workers for rpc sent tracker worker pool. - RpcSentTrackerNumOfWorkers int `mapstructure:"gossipsub-rpc-sent-tracker-workers"` + RpcSentTrackerNumOfWorkers int `validate:"gt=0" mapstructure:"gossipsub-rpc-sent-tracker-workers"` // PeerScoring is whether to enable GossipSub peer scoring. PeerScoring bool `mapstructure:"gossipsub-peer-scoring-enabled"` } From bea0fff1ef901e0fc2d4e7ca5f6a8a515d4a23f7 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 18 Jul 2023 16:46:22 -0400 Subject: [PATCH 12/29] Update gossipsub.go --- network/p2p/p2pconf/gossipsub.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/network/p2p/p2pconf/gossipsub.go b/network/p2p/p2pconf/gossipsub.go index 4578131a6ab..683dff67fdc 100644 --- a/network/p2p/p2pconf/gossipsub.go +++ b/network/p2p/p2pconf/gossipsub.go @@ -28,13 +28,13 @@ type GossipSubConfig struct { // GossipSubTracerConfig is the config for the gossipsub tracer. GossipSub tracer is used to trace the local mesh events and peer scores. type GossipSubTracerConfig struct { // LocalMeshLogInterval is the interval at which the local mesh is logged. 
- LocalMeshLogInterval time.Duration `mapstructure:"gossipsub-local-mesh-logging-interval"` + LocalMeshLogInterval time.Duration `validate:"gt=0s" mapstructure:"gossipsub-local-mesh-logging-interval"` // ScoreTracerInterval is the interval at which the score tracer logs the peer scores. - ScoreTracerInterval time.Duration `mapstructure:"gossipsub-score-tracer-interval"` + ScoreTracerInterval time.Duration `validate:"gt=0s" mapstructure:"gossipsub-score-tracer-interval"` // RPCSentTrackerCacheSize cache size of the rpc sent tracker used by the gossipsub mesh tracer. - RPCSentTrackerCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-cache-size"` + RPCSentTrackerCacheSize uint32 `validate:"gt=0" mapstructure:"gossipsub-rpc-sent-tracker-cache-size"` // RPCSentTrackerQueueCacheSize cache size of the rpc sent tracker queue used for async tracking. - RPCSentTrackerQueueCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-queue-cache-size"` + RPCSentTrackerQueueCacheSize uint32 `validate:"gt=0" mapstructure:"gossipsub-rpc-sent-tracker-queue-cache-size"` // RpcSentTrackerNumOfWorkers number of workers for rpc sent tracker worker pool. 
- RpcSentTrackerNumOfWorkers int `mapstructure:"gossipsub-rpc-sent-tracker-workers"` + RpcSentTrackerNumOfWorkers int `validate:"gt=0" mapstructure:"gossipsub-rpc-sent-tracker-workers"` } From 4bacc83da04645ac3950596b8c126831e9227124 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 18 Jul 2023 17:36:36 -0400 Subject: [PATCH 13/29] internally construct metrics for the tracker --- .../node_builder/access_node_builder.go | 18 ++++++------ cmd/observer/node_builder/observer_builder.go | 18 ++++++------ follower/follower_builder.go | 18 ++++++------ network/internal/p2pfixtures/fixtures.go | 18 ++++++------ network/p2p/p2pbuilder/libp2pNodeBuilder.go | 19 ++++++------- network/p2p/tracer/gossipSubMeshTracer.go | 28 ++++++++++--------- .../p2p/tracer/gossipSubMeshTracer_test.go | 17 +++++------ 7 files changed, 67 insertions(+), 69 deletions(-) diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index 5e0d790af8e..b12ae8f08f8 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -1192,15 +1192,15 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri } meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ - Logger: builder.Logger, - Metrics: networkMetrics, - IDProvider: builder.IdentityProvider, - LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, - RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), - RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, - RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, - RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, - RpcSentTrackerWorkerQueueCacheCollector: 
metrics.GossipSubRPCSentTrackerQueueMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), + Logger: builder.Logger, + Metrics: networkMetrics, + IDProvider: builder.IdentityProvider, + LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, + RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, + RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, + HeroCacheMetricsFactory: builder.HeroCacheMetricsFactory(), + NetworkingType: network.PublicNetwork, } meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 4f3c5ea7738..a869e24332c 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -703,15 +703,15 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr } meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ - Logger: builder.Logger, - Metrics: builder.Metrics.Network, - IDProvider: builder.IdentityProvider, - LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, - RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), - RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, - RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, - RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, - RpcSentTrackerWorkerQueueCacheCollector: metrics.GossipSubRPCSentTrackerQueueMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), + Logger: 
builder.Logger, + Metrics: builder.Metrics.Network, + IDProvider: builder.IdentityProvider, + LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, + RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, + RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, + HeroCacheMetricsFactory: builder.HeroCacheMetricsFactory(), + NetworkingType: network.PublicNetwork, } meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) diff --git a/follower/follower_builder.go b/follower/follower_builder.go index 06626555855..27e69ce039c 100644 --- a/follower/follower_builder.go +++ b/follower/follower_builder.go @@ -605,15 +605,15 @@ func (builder *FollowerServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr } meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ - Logger: builder.Logger, - Metrics: builder.Metrics.Network, - IDProvider: builder.IdentityProvider, - LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, - RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), - RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, - RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, - RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, - RpcSentTrackerWorkerQueueCacheCollector: metrics.GossipSubRPCSentTrackerQueueMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), + Logger: builder.Logger, + Metrics: builder.Metrics.Network, + IDProvider: builder.IdentityProvider, + LoggerInterval: 
builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, + RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, + RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, + HeroCacheMetricsFactory: builder.HeroCacheMetricsFactory(), + NetworkingType: network.PublicNetwork, } meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) diff --git a/network/internal/p2pfixtures/fixtures.go b/network/internal/p2pfixtures/fixtures.go index 5a8e60932c8..7b7365d57ab 100644 --- a/network/internal/p2pfixtures/fixtures.go +++ b/network/internal/p2pfixtures/fixtures.go @@ -104,15 +104,15 @@ func CreateNode(t *testing.T, networkKey crypto.PrivateKey, sporkID flow.Identif require.NoError(t, err) meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ - Logger: logger, - Metrics: metrics.NewNoopCollector(), - IDProvider: idProvider, - LoggerInterval: defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, - RpcSentTrackerCacheCollector: metrics.NewNoopCollector(), - RpcSentTrackerCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, - RpcSentTrackerWorkerQueueCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, - RpcSentTrackerNumOfWorkers: defaultFlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, - RpcSentTrackerWorkerQueueCacheCollector: metrics.NewNoopCollector(), + Logger: logger, + Metrics: metrics.NewNoopCollector(), + IDProvider: idProvider, + LoggerInterval: defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, + RpcSentTrackerCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + RpcSentTrackerWorkerQueueCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, + 
RpcSentTrackerNumOfWorkers: defaultFlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, + HeroCacheMetricsFactory: metrics.NewNoopHeroCacheMetricsFactory(), + NetworkingType: flownet.PublicNetwork, } meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) diff --git a/network/p2p/p2pbuilder/libp2pNodeBuilder.go b/network/p2p/p2pbuilder/libp2pNodeBuilder.go index 2034b9971f6..9c9fa5f6713 100644 --- a/network/p2p/p2pbuilder/libp2pNodeBuilder.go +++ b/network/p2p/p2pbuilder/libp2pNodeBuilder.go @@ -26,7 +26,6 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/irrecoverable" - "github.com/onflow/flow-go/module/metrics" flownet "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/netconf" "github.com/onflow/flow-go/network/p2p" @@ -496,15 +495,15 @@ func DefaultNodeBuilder( } meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ - Logger: logger, - Metrics: metricsCfg.Metrics, - IDProvider: idProvider, - LoggerInterval: gossipCfg.LocalMeshLogInterval, - RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(metricsCfg.HeroCacheFactory, flownet.PrivateNetwork), - RpcSentTrackerCacheSize: gossipCfg.RPCSentTrackerCacheSize, - RpcSentTrackerWorkerQueueCacheSize: gossipCfg.RPCSentTrackerQueueCacheSize, - RpcSentTrackerNumOfWorkers: gossipCfg.RpcSentTrackerNumOfWorkers, - RpcSentTrackerWorkerQueueCacheCollector: metrics.GossipSubRPCSentTrackerQueueMetricFactory(metricsCfg.HeroCacheFactory, flownet.PrivateNetwork), + Logger: logger, + Metrics: metricsCfg.Metrics, + IDProvider: idProvider, + LoggerInterval: gossipCfg.LocalMeshLogInterval, + RpcSentTrackerCacheSize: gossipCfg.RPCSentTrackerCacheSize, + RpcSentTrackerWorkerQueueCacheSize: gossipCfg.RPCSentTrackerQueueCacheSize, + RpcSentTrackerNumOfWorkers: gossipCfg.RpcSentTrackerNumOfWorkers, + HeroCacheMetricsFactory: metricsCfg.HeroCacheFactory, + NetworkingType: flownet.PrivateNetwork, } 
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index c79e12ab14b..232b0eeadb5 100644 --- a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -13,6 +13,8 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/module/metrics" + "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/p2p" "github.com/onflow/flow-go/network/p2p/tracer/internal" "github.com/onflow/flow-go/utils/logging" @@ -25,9 +27,9 @@ const ( // MeshLogIntervalWarnMsg is the message logged by the tracer every logInterval if there are unknown peers in the mesh. MeshLogIntervalWarnMsg = "unknown peers in topic mesh peers of local node since last heartbeat" - // defaultLastHighestIHaveRPCSizeResetInterval is the interval that we reset the tracker of max ihave size sent back + // defaultLastHighestIHaveRPCSizeResetInterval is the interval that we reset the tracker of max ihave size sent back // to a default. We use ihave message max size to determine the health of requested iwants from remote peers. However, - // we don't desire an ihave size anomaly to persist forever, hence, we reset it back to a default every minute. + // we don't desire an ihave size anomaly to persist forever, hence, we reset it back to a default every minute. // The choice of the interval to be a minute is in harmony with the GossipSub decay interval. 
defaultLastHighestIHaveRPCSizeResetInterval = time.Minute ) @@ -56,15 +58,15 @@ type GossipSubMeshTracer struct { var _ p2p.PubSubTracer = (*GossipSubMeshTracer)(nil) type GossipSubMeshTracerConfig struct { - Logger zerolog.Logger - Metrics module.GossipSubLocalMeshMetrics - IDProvider module.IdentityProvider - LoggerInterval time.Duration - RpcSentTrackerCacheCollector module.HeroCacheMetrics - RpcSentTrackerCacheSize uint32 - RpcSentTrackerWorkerQueueCacheCollector module.HeroCacheMetrics - RpcSentTrackerWorkerQueueCacheSize uint32 - RpcSentTrackerNumOfWorkers int + network.NetworkingType + metrics.HeroCacheMetricsFactory + Logger zerolog.Logger + Metrics module.GossipSubLocalMeshMetrics + IDProvider module.IdentityProvider + LoggerInterval time.Duration + RpcSentTrackerCacheSize uint32 + RpcSentTrackerWorkerQueueCacheSize uint32 + RpcSentTrackerNumOfWorkers int } // NewGossipSubMeshTracer creates a new *GossipSubMeshTracer. @@ -77,8 +79,8 @@ func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) *GossipSubMeshTra rpcSentTracker := internal.NewRPCSentTracker(&internal.RPCSentTrackerConfig{ Logger: lg, RPCSentCacheSize: config.RpcSentTrackerCacheSize, - RPCSentCacheCollector: config.RpcSentTrackerCacheCollector, - WorkerQueueCacheCollector: config.RpcSentTrackerWorkerQueueCacheCollector, + RPCSentCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(config.HeroCacheMetricsFactory, config.NetworkingType), + WorkerQueueCacheCollector: metrics.GossipSubRPCSentTrackerQueueMetricFactory(config.HeroCacheMetricsFactory, config.NetworkingType), WorkerQueueCacheSize: config.RpcSentTrackerWorkerQueueCacheSize, NumOfWorkers: config.RpcSentTrackerNumOfWorkers, LastHighestIhavesSentResetInterval: defaultLastHighestIHaveRPCSizeResetInterval, diff --git a/network/p2p/tracer/gossipSubMeshTracer_test.go b/network/p2p/tracer/gossipSubMeshTracer_test.go index 6fb4e96316f..2f0d9581a2b 100644 --- a/network/p2p/tracer/gossipSubMeshTracer_test.go +++ 
b/network/p2p/tracer/gossipSubMeshTracer_test.go @@ -14,7 +14,6 @@ import ( "github.com/onflow/flow-go/config" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/irrecoverable" - "github.com/onflow/flow-go/module/metrics" mockmodule "github.com/onflow/flow-go/module/mock" "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/network/p2p" @@ -66,15 +65,13 @@ func TestGossipSubMeshTracer(t *testing.T) { // meshTracer logs at 1 second intervals for sake of testing. collector := mockmodule.NewGossipSubLocalMeshMetrics(t) meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ - Logger: logger, - Metrics: collector, - IDProvider: idProvider, - LoggerInterval: time.Second, - RpcSentTrackerCacheCollector: metrics.NewNoopCollector(), - RpcSentTrackerCacheSize: defaultConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, - RpcSentTrackerWorkerQueueCacheSize: defaultConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, - RpcSentTrackerNumOfWorkers: defaultConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, - RpcSentTrackerWorkerQueueCacheCollector: metrics.NewNoopCollector(), + Logger: logger, + Metrics: collector, + IDProvider: idProvider, + LoggerInterval: time.Second, + RpcSentTrackerCacheSize: defaultConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + RpcSentTrackerWorkerQueueCacheSize: defaultConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, + RpcSentTrackerNumOfWorkers: defaultConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers, } meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) tracerNode, tracerId := p2ptest.NodeFixture( From 87c07e07e6d7f8804faa0a91cf1366107fa41fbc Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 18 Jul 2023 17:50:29 -0400 Subject: [PATCH 14/29] remove AddWorkers --- module/component/component.go | 3 +-- .../component/mock/component_manager_builder.go | 16 ---------------- network/p2p/tracer/internal/rpc_sent_tracker.go | 
11 ++++++----- 3 files changed, 7 insertions(+), 23 deletions(-) diff --git a/module/component/component.go b/module/component/component.go index a75534d386a..a887d149270 100644 --- a/module/component/component.go +++ b/module/component/component.go @@ -143,8 +143,7 @@ func NoopWorker(ctx irrecoverable.SignalerContext, ready ReadyFunc) { type ComponentManagerBuilder interface { // AddWorker adds a worker routine for the ComponentManager AddWorker(ComponentWorker) ComponentManagerBuilder - // AddWorkers adds n number of worker routines for the ComponentManager. - AddWorkers(int, ComponentWorker) ComponentManagerBuilder + // Build builds and returns a new ComponentManager instance Build() *ComponentManager } diff --git a/module/component/mock/component_manager_builder.go b/module/component/mock/component_manager_builder.go index 570ecda9269..c414ddc6663 100644 --- a/module/component/mock/component_manager_builder.go +++ b/module/component/mock/component_manager_builder.go @@ -28,22 +28,6 @@ func (_m *ComponentManagerBuilder) AddWorker(_a0 component.ComponentWorker) comp return r0 } -// AddWorkers provides a mock function with given fields: _a0, _a1 -func (_m *ComponentManagerBuilder) AddWorkers(_a0 int, _a1 component.ComponentWorker) component.ComponentManagerBuilder { - ret := _m.Called(_a0, _a1) - - var r0 component.ComponentManagerBuilder - if rf, ok := ret.Get(0).(func(int, component.ComponentWorker) component.ComponentManagerBuilder); ok { - r0 = rf(_a0, _a1) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(component.ComponentManagerBuilder) - } - } - - return r0 -} - // Build provides a mock function with given fields: func (_m *ComponentManagerBuilder) Build() *component.ComponentManager { ret := _m.Called() diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 63ae18edb81..0d5fb1a2260 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ 
b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -76,14 +76,15 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { store, tracker.rpcSent).Build() - tracker.Component = component.NewComponentManagerBuilder(). + builder := component.NewComponentManagerBuilder(). AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { ready() tracker.lastHighestIHaveRPCSizeResetLoop(ctx) - }). - AddWorkers(config.NumOfWorkers, tracker.workerPool.WorkerLogic()). - Build() - + }) + for i := 0; i < config.NumOfWorkers; i++ { + builder.AddWorker(tracker.workerPool.WorkerLogic()) + } + tracker.Component = builder.Build() return tracker } From 13382794a6e431fe4f6e8c4121dde4f811e39d72 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 18 Jul 2023 17:56:15 -0400 Subject: [PATCH 15/29] change debug -> info --- network/p2p/tracer/gossipSubMeshTracer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index 232b0eeadb5..ac4bb60dbf0 100644 --- a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -102,12 +102,12 @@ func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) *GossipSubMeshTra }). AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { ready() - lg.Debug().Msg("starting rpc sent tracker") + lg.Info().Msg("starting rpc sent tracker") g.rpcSentTracker.Start(ctx) - lg.Debug().Msg("rpc sent tracker started") + lg.Info().Msg("rpc sent tracker started") <-g.rpcSentTracker.Done() - lg.Debug().Msg("rpc sent tracker stopped") + lg.Info().Msg("rpc sent tracker stopped") }). 
Build() From bec095c25ce8c5715cd4e1d52932daf4bd6dfc27 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 18 Jul 2023 17:57:21 -0400 Subject: [PATCH 16/29] rename trackRPC -> trackableRPC --- network/p2p/tracer/internal/rpc_sent_tracker.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 0d5fb1a2260..98dfdfaee7e 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -18,9 +18,9 @@ import ( p2pmsg "github.com/onflow/flow-go/network/p2p/message" ) -// trackRPC is an internal data structure for "temporarily" storing *pubsub.RPC sent in the queue before they are processed +// trackableRPC is an internal data structure for "temporarily" storing *pubsub.RPC sent in the queue before they are processed // by the *RPCSentTracker. -type trackRPC struct { +type trackableRPC struct { // Nonce prevents deduplication in the hero store Nonce []byte rpc *pubsub.RPC @@ -30,7 +30,7 @@ type trackRPC struct { type RPCSentTracker struct { component.Component cache *rpcSentCache - workerPool *worker.Pool[trackRPC] + workerPool *worker.Pool[trackableRPC] // lastHighestIHaveRPCSize tracks the size of the last largest iHave rpc control message sent. 
lastHighestIHaveRPCSize *atomic.Int64 lastHighestIHaveRPCSizeResetInterval time.Duration @@ -71,7 +71,7 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { lastHighestIHaveRPCSize: atomic.NewInt64(0), lastHighestIHaveRPCSizeResetInterval: config.LastHighestIhavesSentResetInterval, } - tracker.workerPool = worker.NewWorkerPoolBuilder[trackRPC]( + tracker.workerPool = worker.NewWorkerPoolBuilder[trackableRPC]( config.Logger, store, tracker.rpcSent).Build() @@ -96,12 +96,12 @@ func (t *RPCSentTracker) RPCSent(rpc *pubsub.RPC) error { if err != nil { return fmt.Errorf("failed to get track rpc work nonce: %w", err) } - t.workerPool.Submit(trackRPC{Nonce: n, rpc: rpc}) + t.workerPool.Submit(trackableRPC{Nonce: n, rpc: rpc}) return nil } // rpcSent tracks control messages sent in *pubsub.RPC. -func (t *RPCSentTracker) rpcSent(work trackRPC) error { +func (t *RPCSentTracker) rpcSent(work trackableRPC) error { switch { case len(work.rpc.GetControl().GetIhave()) > 0: iHave := work.rpc.GetControl().GetIhave() From 119f946a5b8f902eb2e30fd190cff68ba9a94752 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 18 Jul 2023 17:58:24 -0400 Subject: [PATCH 17/29] rename RPCSent -> Track --- network/p2p/tracer/gossipSubMeshTracer.go | 2 +- network/p2p/tracer/internal/rpc_sent_tracker.go | 4 ++-- network/p2p/tracer/internal/rpc_sent_tracker_test.go | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index ac4bb60dbf0..f0e2d035c59 100644 --- a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -184,7 +184,7 @@ func (t *GossipSubMeshTracer) Prune(p peer.ID, topic string) { // SendRPC is called when a RPC is sent. Currently, the GossipSubMeshTracer tracks iHave RPC messages that have been sent. // This function can be updated to track other control messages in the future as required. 
func (t *GossipSubMeshTracer) SendRPC(rpc *pubsub.RPC, _ peer.ID) { - err := t.rpcSentTracker.RPCSent(rpc) + err := t.rpcSentTracker.Track(rpc) if err != nil { t.logger.Err(err).Msg("failed to track sent pubsbub rpc") } diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 98dfdfaee7e..e93ec2c53dc 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -88,10 +88,10 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { return tracker } -// RPCSent submits the control message to the worker queue for async tracking. +// Track submits the control message to the worker queue for async tracking. // Args: // - *pubsub.RPC: the rpc sent. -func (t *RPCSentTracker) RPCSent(rpc *pubsub.RPC) error { +func (t *RPCSentTracker) Track(rpc *pubsub.RPC) error { n, err := nonce() if err != nil { return fmt.Errorf("failed to get track rpc work nonce: %w", err) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker_test.go b/network/p2p/tracer/internal/rpc_sent_tracker_test.go index 5324be9d459..9445391c615 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker_test.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker_test.go @@ -61,7 +61,7 @@ func TestRPCSentTracker_IHave(t *testing.T) { } } rpc := rpcFixture(withIhaves(iHaves)) - require.NoError(t, tracker.RPCSent(rpc)) + require.NoError(t, tracker.Track(rpc)) // eventually we should have tracked numOfMsgIds per single topic require.Eventually(t, func() bool { @@ -106,7 +106,7 @@ func TestRPCSentTracker_LastHighestIHaveRPCSize(t *testing.T) { expectedCacheSize := 0 for _, testCase := range testCases { - require.NoError(t, tracker.RPCSent(testCase.rpcFixture)) + require.NoError(t, tracker.Track(testCase.rpcFixture)) expectedCacheSize += testCase.numOfIhaves } @@ -119,7 +119,7 @@ func TestRPCSentTracker_LastHighestIHaveRPCSize(t *testing.T) { // after setting sending large RPC 
lastHighestIHaveRPCSize should reset to 0 after lastHighestIHaveRPCSize reset loop tick largeIhave := 50000 - require.NoError(t, tracker.RPCSent(rpcFixture(withIhaves(mockIHaveFixture(largeIhave, numOfMessageIds))))) + require.NoError(t, tracker.Track(rpcFixture(withIhaves(mockIHaveFixture(largeIhave, numOfMessageIds))))) require.Eventually(t, func() bool { return tracker.LastHighestIHaveRPCSize() == int64(largeIhave) }, 1*time.Second, 100*time.Millisecond) From 23c6cfe60db822d2945041faf701f1f125b4c511 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 18 Jul 2023 18:05:50 -0400 Subject: [PATCH 18/29] document errors as benign --- network/p2p/tracer/internal/rpc_sent_tracker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index e93ec2c53dc..47ba3d16439 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -91,6 +91,7 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { // Track submits the control message to the worker queue for async tracking. // Args: // - *pubsub.RPC: the rpc sent. +// All errors returned from this function can be considered benign. 
func (t *RPCSentTracker) Track(rpc *pubsub.RPC) error { n, err := nonce() if err != nil { From e1c57c6011024d00d7bbf6e678fe3e67ebdfdf00 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 18 Jul 2023 18:08:08 -0400 Subject: [PATCH 19/29] return error if submit returns false --- network/p2p/tracer/internal/rpc_sent_tracker.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 47ba3d16439..eb860f569bf 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -97,7 +97,9 @@ func (t *RPCSentTracker) Track(rpc *pubsub.RPC) error { if err != nil { return fmt.Errorf("failed to get track rpc work nonce: %w", err) } - t.workerPool.Submit(trackableRPC{Nonce: n, rpc: rpc}) + if ok := t.workerPool.Submit(trackableRPC{Nonce: n, rpc: rpc}); !ok { + return fmt.Errorf("failed to track RPC could not submit work to worker pool") + } return nil } From 344b30f2fdf69a2f44da8245751dfa2a4c4d0906 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 18 Jul 2023 18:08:53 -0400 Subject: [PATCH 20/29] rename rpcSent -> rpcSentWorkerLogic --- network/p2p/tracer/internal/rpc_sent_tracker.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index eb860f569bf..27dae125f4a 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -74,7 +74,7 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { tracker.workerPool = worker.NewWorkerPoolBuilder[trackableRPC]( config.Logger, store, - tracker.rpcSent).Build() + tracker.rpcSentWorkerLogic).Build() builder := component.NewComponentManagerBuilder(). 
AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { @@ -103,8 +103,8 @@ func (t *RPCSentTracker) Track(rpc *pubsub.RPC) error { return nil } -// rpcSent tracks control messages sent in *pubsub.RPC. -func (t *RPCSentTracker) rpcSent(work trackableRPC) error { +// rpcSentWorkerLogic tracks control messages sent in *pubsub.RPC. +func (t *RPCSentTracker) rpcSentWorkerLogic(work trackableRPC) error { switch { case len(work.rpc.GetControl().GetIhave()) > 0: iHave := work.rpc.GetControl().GetIhave() From e6a0a33b3651649bc7806ab87f51fe373b06aa5b Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 18 Jul 2023 18:26:06 -0400 Subject: [PATCH 21/29] add lastUpdate to lastHighestIHaveRPCSize tracking --- .../p2p/tracer/internal/rpc_sent_tracker.go | 47 ++++++------------- .../tracer/internal/rpc_sent_tracker_test.go | 5 +- 2 files changed, 18 insertions(+), 34 deletions(-) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 27dae125f4a..c3907208c70 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -13,7 +13,6 @@ import ( "github.com/onflow/flow-go/engine/common/worker" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" - "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/mempool/queue" p2pmsg "github.com/onflow/flow-go/network/p2p/message" ) @@ -26,13 +25,18 @@ type trackableRPC struct { rpc *pubsub.RPC } -// RPCSentTracker tracks RPC messages that are sent. +// lastHighestIHaveRPCSize tracks the last highest rpc control message size the time stamp it was last updated. +type lastHighestIHaveRPCSize struct { + *atomic.Int64 + lastUpdate time.Time +} + +// RPCSentTracker tracks RPC messages and the size of the last largest iHave rpc control message sent. 
type RPCSentTracker struct { component.Component - cache *rpcSentCache - workerPool *worker.Pool[trackableRPC] - // lastHighestIHaveRPCSize tracks the size of the last largest iHave rpc control message sent. - lastHighestIHaveRPCSize *atomic.Int64 + *lastHighestIHaveRPCSize + cache *rpcSentCache + workerPool *worker.Pool[trackableRPC] lastHighestIHaveRPCSizeResetInterval time.Duration } @@ -67,8 +71,8 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { config.WorkerQueueCacheCollector) tracker := &RPCSentTracker{ + lastHighestIHaveRPCSize: &lastHighestIHaveRPCSize{atomic.NewInt64(0), time.Now()}, cache: newRPCSentCache(cacheConfig), - lastHighestIHaveRPCSize: atomic.NewInt64(0), lastHighestIHaveRPCSizeResetInterval: config.LastHighestIhavesSentResetInterval, } tracker.workerPool = worker.NewWorkerPoolBuilder[trackableRPC]( @@ -76,11 +80,7 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { store, tracker.rpcSentWorkerLogic).Build() - builder := component.NewComponentManagerBuilder(). - AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { - ready() - tracker.lastHighestIHaveRPCSizeResetLoop(ctx) - }) + builder := component.NewComponentManagerBuilder() for i := 0; i < config.NumOfWorkers; i++ { builder.AddWorker(tracker.workerPool.WorkerLogic()) } @@ -115,7 +115,8 @@ func (t *RPCSentTracker) rpcSentWorkerLogic(work trackableRPC) error { } func (t *RPCSentTracker) updateLastHighestIHaveRPCSize(size int64) { - if t.lastHighestIHaveRPCSize.Load() < size { + if t.lastHighestIHaveRPCSize.Load() < size || time.Since(t.lastHighestIHaveRPCSize.lastUpdate) > t.lastHighestIHaveRPCSizeResetInterval { + // The last highest ihave RPC size is updated if the new size is larger than the current size, or if the time elapsed since the last update surpasses the reset interval. 
t.lastHighestIHaveRPCSize.Store(size) } } @@ -148,26 +149,6 @@ func (t *RPCSentTracker) LastHighestIHaveRPCSize() int64 { return t.lastHighestIHaveRPCSize.Load() } -// lastHighestIHaveRPCSizeResetLoop resets the lastHighestIHaveRPCSize to 0 on each interval tick. -func (t *RPCSentTracker) lastHighestIHaveRPCSizeResetLoop(ctx irrecoverable.SignalerContext) { - ticker := time.NewTicker(t.lastHighestIHaveRPCSizeResetInterval) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - default: - } - - select { - case <-ctx.Done(): - return - case <-ticker.C: - t.lastHighestIHaveRPCSize.Store(0) - } - } -} - // nonce returns random string that is used to store unique items in herocache. func nonce() ([]byte, error) { b := make([]byte, 16) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker_test.go b/network/p2p/tracer/internal/rpc_sent_tracker_test.go index 9445391c615..05fce5f8552 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker_test.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker_test.go @@ -124,8 +124,11 @@ func TestRPCSentTracker_LastHighestIHaveRPCSize(t *testing.T) { return tracker.LastHighestIHaveRPCSize() == int64(largeIhave) }, 1*time.Second, 100*time.Millisecond) + // we expect lastHighestIHaveRPCSize to be set to the current rpc size being tracked if it hasn't been updated since the configured lastHighestIHaveRPCSizeResetInterval + expectedEventualLastHighest := 8 require.Eventually(t, func() bool { - return tracker.LastHighestIHaveRPCSize() == 0 + require.NoError(t, tracker.Track(rpcFixture(withIhaves(mockIHaveFixture(expectedEventualLastHighest, numOfMessageIds))))) + return tracker.LastHighestIHaveRPCSize() == int64(expectedEventualLastHighest) }, 4*time.Second, 100*time.Millisecond) } From 640f9bfcd9e8e7653652611fd9cd590a55fc518f Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 18 Jul 2023 18:30:24 -0400 Subject: [PATCH 22/29] add KeyNetworkingSecurity when err returned during tracking --- 
network/p2p/tracer/gossipSubMeshTracer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index f0e2d035c59..fea310a251d 100644 --- a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -186,7 +186,7 @@ func (t *GossipSubMeshTracer) Prune(p peer.ID, topic string) { func (t *GossipSubMeshTracer) SendRPC(rpc *pubsub.RPC, _ peer.ID) { err := t.rpcSentTracker.Track(rpc) if err != nil { - t.logger.Err(err).Msg("failed to track sent pubsbub rpc") + t.logger.Err(err).Bool(logging.KeyNetworkingSecurity, true).Msg("failed to track sent pubsbub rpc") } } From 999eb9bdeaf15b420e76b8d76546dfd62f445611 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Thu, 20 Jul 2023 18:05:34 -0400 Subject: [PATCH 23/29] add lock for last size and last size update --- .../p2p/tracer/internal/rpc_sent_tracker.go | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index c3907208c70..09b8cf8e48a 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -3,18 +3,17 @@ package internal import ( "crypto/rand" "fmt" + "sync" "time" pubsub "github.com/libp2p/go-libp2p-pubsub" pb "github.com/libp2p/go-libp2p-pubsub/pb" - "github.com/rs/zerolog" - "go.uber.org/atomic" - "github.com/onflow/flow-go/engine/common/worker" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/mempool/queue" p2pmsg "github.com/onflow/flow-go/network/p2p/message" + "github.com/rs/zerolog" ) // trackableRPC is an internal data structure for "temporarily" storing *pubsub.RPC sent in the queue before they are processed @@ -27,7 +26,8 @@ type trackableRPC struct { // lastHighestIHaveRPCSize tracks the last highest rpc control message size 
the time stamp it was last updated. type lastHighestIHaveRPCSize struct { - *atomic.Int64 + sync.RWMutex + lastSize int64 lastUpdate time.Time } @@ -71,7 +71,7 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { config.WorkerQueueCacheCollector) tracker := &RPCSentTracker{ - lastHighestIHaveRPCSize: &lastHighestIHaveRPCSize{atomic.NewInt64(0), time.Now()}, + lastHighestIHaveRPCSize: &lastHighestIHaveRPCSize{sync.RWMutex{}, 0, time.Now()}, cache: newRPCSentCache(cacheConfig), lastHighestIHaveRPCSizeResetInterval: config.LastHighestIhavesSentResetInterval, } @@ -115,9 +115,12 @@ func (t *RPCSentTracker) rpcSentWorkerLogic(work trackableRPC) error { } func (t *RPCSentTracker) updateLastHighestIHaveRPCSize(size int64) { - if t.lastHighestIHaveRPCSize.Load() < size || time.Since(t.lastHighestIHaveRPCSize.lastUpdate) > t.lastHighestIHaveRPCSizeResetInterval { + t.Lock() + defer t.Unlock() + if t.lastSize < size || time.Since(t.lastUpdate) > t.lastHighestIHaveRPCSizeResetInterval { // The last highest ihave RPC size is updated if the new size is larger than the current size, or if the time elapsed since the last update surpasses the reset interval. - t.lastHighestIHaveRPCSize.Store(size) + t.lastSize = size + t.lastUpdate = time.Now() } } @@ -146,7 +149,9 @@ func (t *RPCSentTracker) WasIHaveRPCSent(topicID, messageID string) bool { // LastHighestIHaveRPCSize returns the last highest size of iHaves sent in an rpc. func (t *RPCSentTracker) LastHighestIHaveRPCSize() int64 { - return t.lastHighestIHaveRPCSize.Load() + t.RLock() + defer t.RUnlock() + return t.lastSize } // nonce returns random string that is used to store unique items in herocache. 
From 65bc05bd5f54ec9bcc6a3c404857bc3bd8f1c9c9 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Thu, 20 Jul 2023 18:11:36 -0400 Subject: [PATCH 24/29] remove topic ID from entity ID --- network/p2p/tracer/internal/cache.go | 17 ++++++------- network/p2p/tracer/internal/cache_test.go | 24 ++++++++----------- .../p2p/tracer/internal/rpc_sent_tracker.go | 10 ++++---- .../tracer/internal/rpc_sent_tracker_test.go | 15 +++++------- 4 files changed, 27 insertions(+), 39 deletions(-) diff --git a/network/p2p/tracer/internal/cache.go b/network/p2p/tracer/internal/cache.go index b916133b270..655ddf2179f 100644 --- a/network/p2p/tracer/internal/cache.go +++ b/network/p2p/tracer/internal/cache.go @@ -47,26 +47,24 @@ func newRPCSentCache(config *rpcCtrlMsgSentCacheConfig) *rpcSentCache { // add initializes the record cached for the given messageEntityID if it does not exist. // Returns true if the record is initialized, false otherwise (i.e.: the record already exists). // Args: -// - topic: the topic ID. // - messageId: the message ID. // - controlMsgType: the rpc control message type. // Returns: // - bool: true if the record is initialized, false otherwise (i.e.: the record already exists). // Note that if add is called multiple times for the same messageEntityID, the record is initialized only once, and the // subsequent calls return false and do not change the record (i.e.: the record is not re-initialized). -func (r *rpcSentCache) add(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) bool { - return r.c.Add(newRPCSentEntity(r.rpcSentEntityID(topic, messageId, controlMsgType), controlMsgType)) +func (r *rpcSentCache) add(messageId string, controlMsgType p2pmsg.ControlMessageType) bool { + return r.c.Add(newRPCSentEntity(r.rpcSentEntityID(messageId, controlMsgType), controlMsgType)) } // has checks if the RPC message has been cached indicating it has been sent. // Args: -// - topic: the topic ID. // - messageId: the message ID. 
// - controlMsgType: the rpc control message type. // Returns: // - bool: true if the RPC has been cache indicating it was sent from the local node. -func (r *rpcSentCache) has(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) bool { - return r.c.Has(r.rpcSentEntityID(topic, messageId, controlMsgType)) +func (r *rpcSentCache) has(messageId string, controlMsgType p2pmsg.ControlMessageType) bool { + return r.c.Has(r.rpcSentEntityID(messageId, controlMsgType)) } // size returns the number of records in the cache. @@ -74,13 +72,12 @@ func (r *rpcSentCache) size() uint { return r.c.Size() } -// rpcSentEntityID creates an entity ID from the topic, messageID and control message type. +// rpcSentEntityID creates an entity ID from the messageID and control message type. // Args: -// - topic: the topic ID. // - messageId: the message ID. // - controlMsgType: the rpc control message type. // Returns: // - flow.Identifier: the entity ID. -func (r *rpcSentCache) rpcSentEntityID(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) flow.Identifier { - return flow.MakeIDFromFingerPrint([]byte(fmt.Sprintf("%s%s%s", topic, messageId, controlMsgType))) +func (r *rpcSentCache) rpcSentEntityID(messageId string, controlMsgType p2pmsg.ControlMessageType) flow.Identifier { + return flow.MakeIDFromFingerPrint([]byte(fmt.Sprintf("%s%s", messageId, controlMsgType))) } diff --git a/network/p2p/tracer/internal/cache_test.go b/network/p2p/tracer/internal/cache_test.go index c92b42b5e02..10872b7b7ef 100644 --- a/network/p2p/tracer/internal/cache_test.go +++ b/network/p2p/tracer/internal/cache_test.go @@ -12,7 +12,6 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/metrics" - "github.com/onflow/flow-go/network/channels" p2pmsg "github.com/onflow/flow-go/network/p2p/message" "github.com/onflow/flow-go/utils/unittest" ) @@ -23,24 +22,23 @@ import ( func TestCache_Add(t *testing.T) { 
cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) controlMsgType := p2pmsg.CtrlMsgIHave - topic := channels.PushBlocks.String() messageID1 := unittest.IdentifierFixture().String() messageID2 := unittest.IdentifierFixture().String() // test initializing a record for an ID that doesn't exist in the cache - initialized := cache.add(topic, messageID1, controlMsgType) + initialized := cache.add(messageID1, controlMsgType) require.True(t, initialized, "expected record to be initialized") - require.True(t, cache.has(topic, messageID1, controlMsgType), "expected record to exist") + require.True(t, cache.has(messageID1, controlMsgType), "expected record to exist") // test initializing a record for an ID that already exists in the cache - initialized = cache.add(topic, messageID1, controlMsgType) + initialized = cache.add(messageID1, controlMsgType) require.False(t, initialized, "expected record not to be initialized") - require.True(t, cache.has(topic, messageID1, controlMsgType), "expected record to exist") + require.True(t, cache.has(messageID1, controlMsgType), "expected record to exist") // test initializing a record for another ID - initialized = cache.add(topic, messageID2, controlMsgType) + initialized = cache.add(messageID2, controlMsgType) require.True(t, initialized, "expected record to be initialized") - require.True(t, cache.has(topic, messageID2, controlMsgType), "expected record to exist") + require.True(t, cache.has(messageID2, controlMsgType), "expected record to exist") } // TestCache_ConcurrentInit tests the concurrent initialization of records. 
@@ -50,7 +48,6 @@ func TestCache_Add(t *testing.T) { func TestCache_ConcurrentAdd(t *testing.T) { cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) controlMsgType := p2pmsg.CtrlMsgIHave - topic := channels.PushBlocks.String() messageIds := unittest.IdentifierListFixture(10) var wg sync.WaitGroup @@ -59,7 +56,7 @@ func TestCache_ConcurrentAdd(t *testing.T) { for _, id := range messageIds { go func(id flow.Identifier) { defer wg.Done() - cache.add(topic, id.String(), controlMsgType) + cache.add(id.String(), controlMsgType) }(id) } @@ -67,7 +64,7 @@ func TestCache_ConcurrentAdd(t *testing.T) { // ensure that all records are correctly initialized for _, id := range messageIds { - require.True(t, cache.has(topic, id.String(), controlMsgType)) + require.True(t, cache.has(id.String(), controlMsgType)) } } @@ -79,7 +76,6 @@ func TestCache_ConcurrentAdd(t *testing.T) { func TestCache_ConcurrentSameRecordAdd(t *testing.T) { cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) controlMsgType := p2pmsg.CtrlMsgIHave - topic := channels.PushBlocks.String() messageID := unittest.IdentifierFixture().String() const concurrentAttempts = 10 @@ -91,7 +87,7 @@ func TestCache_ConcurrentSameRecordAdd(t *testing.T) { for i := 0; i < concurrentAttempts; i++ { go func() { defer wg.Done() - initSuccess := cache.add(topic, messageID, controlMsgType) + initSuccess := cache.add(messageID, controlMsgType) if initSuccess { successGauge.Inc() } @@ -104,7 +100,7 @@ func TestCache_ConcurrentSameRecordAdd(t *testing.T) { require.Equal(t, int32(1), successGauge.Load()) // ensure that the record is correctly initialized in the cache - require.True(t, cache.has(topic, messageID, controlMsgType)) + require.True(t, cache.has(messageID, controlMsgType)) } // cacheFixture returns a new *RecordCache. 
diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 09b8cf8e48a..85346125c15 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -130,21 +130,19 @@ func (t *RPCSentTracker) updateLastHighestIHaveRPCSize(size int64) { func (t *RPCSentTracker) iHaveRPCSent(iHaves []*pb.ControlIHave) { controlMsgType := p2pmsg.CtrlMsgIHave for _, iHave := range iHaves { - topicID := iHave.GetTopicID() for _, messageID := range iHave.GetMessageIDs() { - t.cache.add(topicID, messageID, controlMsgType) + t.cache.add(messageID, controlMsgType) } } } // WasIHaveRPCSent checks if an iHave control message with the provided message ID was sent. // Args: -// - string: the topic ID of the iHave RPC. -// - string: the message ID of the iHave RPC. +// - messageID: the message ID of the iHave RPC. // Returns: // - bool: true if the iHave rpc with the provided message ID was sent. -func (t *RPCSentTracker) WasIHaveRPCSent(topicID, messageID string) bool { - return t.cache.has(topicID, messageID, p2pmsg.CtrlMsgIHave) +func (t *RPCSentTracker) WasIHaveRPCSent(messageID string) bool { + return t.cache.has(messageID, p2pmsg.CtrlMsgIHave) } // LastHighestIHaveRPCSize returns the last highest size of iHaves sent in an rpc. 
diff --git a/network/p2p/tracer/internal/rpc_sent_tracker_test.go b/network/p2p/tracer/internal/rpc_sent_tracker_test.go index 05fce5f8552..5ee97e9a80e 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker_test.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker_test.go @@ -13,7 +13,6 @@ import ( "github.com/onflow/flow-go/config" "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" - "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/utils/unittest" ) @@ -38,25 +37,23 @@ func TestRPCSentTracker_IHave(t *testing.T) { }() t.Run("WasIHaveRPCSent should return false for iHave message Id that has not been tracked", func(t *testing.T) { - require.False(t, tracker.WasIHaveRPCSent("topic_id", "message_id")) + require.False(t, tracker.WasIHaveRPCSent("message_id")) }) t.Run("WasIHaveRPCSent should return true for iHave message after it is tracked with iHaveRPCSent", func(t *testing.T) { numOfMsgIds := 100 testCases := []struct { - topic string messageIDS []string }{ - {channels.PushBlocks.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()}, - {channels.ReceiveApprovals.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()}, - {channels.SyncCommittee.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()}, - {channels.RequestChunks.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()}, + {unittest.IdentifierListFixture(numOfMsgIds).Strings()}, + {unittest.IdentifierListFixture(numOfMsgIds).Strings()}, + {unittest.IdentifierListFixture(numOfMsgIds).Strings()}, + {unittest.IdentifierListFixture(numOfMsgIds).Strings()}, } iHaves := make([]*pb.ControlIHave, len(testCases)) for i, testCase := range testCases { testCase := testCase iHaves[i] = &pb.ControlIHave{ - TopicID: &testCase.topic, MessageIDs: testCase.messageIDS, } } @@ -70,7 +67,7 @@ func TestRPCSentTracker_IHave(t *testing.T) { for _, testCase := range testCases { for _, messageID := range testCase.messageIDS { - 
require.True(t, tracker.WasIHaveRPCSent(testCase.topic, messageID)) + require.True(t, tracker.WasIHaveRPCSent(messageID)) } } }) From 7c5daef4ab8ea91ab7cb03e51a1820198a9583c5 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Thu, 20 Jul 2023 19:17:16 -0400 Subject: [PATCH 25/29] add test cases for concurrent tracking and duplicate RPC tracking --- .../p2p/tracer/internal/rpc_sent_tracker.go | 7 ++ .../tracer/internal/rpc_sent_tracker_test.go | 80 +++++++++++++++++++ 2 files changed, 87 insertions(+) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 85346125c15..12ba32c4915 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -16,6 +16,10 @@ import ( "github.com/rs/zerolog" ) +const ( + iHaveRPCTrackedLog = "ihave rpc tracked successfully" +) + // trackableRPC is an internal data structure for "temporarily" storing *pubsub.RPC sent in the queue before they are processed // by the *RPCSentTracker. 
type trackableRPC struct { @@ -35,6 +39,7 @@ type lastHighestIHaveRPCSize struct { type RPCSentTracker struct { component.Component *lastHighestIHaveRPCSize + logger zerolog.Logger cache *rpcSentCache workerPool *worker.Pool[trackableRPC] lastHighestIHaveRPCSizeResetInterval time.Duration @@ -71,6 +76,7 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { config.WorkerQueueCacheCollector) tracker := &RPCSentTracker{ + logger: config.Logger.With().Str("component", "rpc_sent_tracker").Logger(), lastHighestIHaveRPCSize: &lastHighestIHaveRPCSize{sync.RWMutex{}, 0, time.Now()}, cache: newRPCSentCache(cacheConfig), lastHighestIHaveRPCSizeResetInterval: config.LastHighestIhavesSentResetInterval, @@ -110,6 +116,7 @@ func (t *RPCSentTracker) rpcSentWorkerLogic(work trackableRPC) error { iHave := work.rpc.GetControl().GetIhave() t.iHaveRPCSent(iHave) t.updateLastHighestIHaveRPCSize(int64(len(iHave))) + t.logger.Info().Int("size", len(iHave)).Msg(iHaveRPCTrackedLog) } return nil } diff --git a/network/p2p/tracer/internal/rpc_sent_tracker_test.go b/network/p2p/tracer/internal/rpc_sent_tracker_test.go index 5ee97e9a80e..9b571e3d5a6 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker_test.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker_test.go @@ -2,6 +2,7 @@ package internal import ( "context" + "os" "testing" "time" @@ -9,6 +10,7 @@ import ( pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/rs/zerolog" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "github.com/onflow/flow-go/config" "github.com/onflow/flow-go/module/irrecoverable" @@ -71,6 +73,84 @@ func TestRPCSentTracker_IHave(t *testing.T) { } } }) + +} + +// TestRPCSentTracker_DuplicateMessageID ensures the worker pool of the RPC tracker processes req with the same message ID but different nonce. 
+func TestRPCSentTracker_DuplicateMessageID(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + + processedWorkLogs := atomic.NewInt64(0) + hook := zerolog.HookFunc(func(e *zerolog.Event, level zerolog.Level, message string) { + if level == zerolog.InfoLevel { + if message == iHaveRPCTrackedLog { + processedWorkLogs.Inc() + } + } + }) + logger := zerolog.New(os.Stdout).Level(zerolog.InfoLevel).Hook(hook) + + tracker := mockTracker(t, time.Minute) + require.NotNil(t, tracker) + tracker.logger = logger + tracker.Start(signalerCtx) + defer func() { + cancel() + unittest.RequireComponentsDoneBefore(t, time.Second, tracker) + }() + + messageID := unittest.IdentifierFixture().String() + rpc := rpcFixture(withIhaves([]*pb.ControlIHave{{ + MessageIDs: []string{messageID}, + }})) + // track duplicate RPC's each will be processed by a worker + require.NoError(t, tracker.Track(rpc)) + require.NoError(t, tracker.Track(rpc)) + + // eventually we should have processed both RPCs + require.Eventually(t, func() bool { + return processedWorkLogs.Load() == 2 + }, time.Second, 100*time.Millisecond) +} + +// TestRPCSentTracker_ConcurrentTracking ensures that all message IDs in RPC's are tracked as expected when tracked concurrently. 
+func TestRPCSentTracker_ConcurrentTracking(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) + + tracker := mockTracker(t, time.Minute) + require.NotNil(t, tracker) + + tracker.Start(signalerCtx) + defer func() { + cancel() + unittest.RequireComponentsDoneBefore(t, time.Second, tracker) + }() + + numOfMsgIds := 100 + numOfRPCs := 100 + rpcs := make([]*pubsub.RPC, numOfRPCs) + for i := 0; i < numOfRPCs; i++ { + i := i + go func() { + rpc := rpcFixture(withIhaves([]*pb.ControlIHave{{MessageIDs: unittest.IdentifierListFixture(numOfMsgIds).Strings()}})) + require.NoError(t, tracker.Track(rpc)) + rpcs[i] = rpc + }() + } + + // eventually we should have tracked numOfMsgIds per single topic + require.Eventually(t, func() bool { + return tracker.cache.size() == uint(numOfRPCs*numOfMsgIds) + }, time.Second, 100*time.Millisecond) + + for _, rpc := range rpcs { + ihaves := rpc.GetControl().GetIhave() + for _, messageID := range ihaves[0].GetMessageIDs() { + require.True(t, tracker.WasIHaveRPCSent(messageID)) + } + } } // TestRPCSentTracker_IHave ensures *RPCSentTracker tracks the last largest iHave size as expected. From c1d3b2e0922e62309f67db7615671e24155de1ed Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Thu, 20 Jul 2023 19:17:57 -0400 Subject: [PATCH 26/29] remove add workers func --- module/component/component.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/module/component/component.go b/module/component/component.go index a887d149270..07fc387e077 100644 --- a/module/component/component.go +++ b/module/component/component.go @@ -166,14 +166,6 @@ func (c *componentManagerBuilderImpl) AddWorker(worker ComponentWorker) Componen return c } -// AddWorkers adds n number of workers for the ComponentManager. 
-func (c *componentManagerBuilderImpl) AddWorkers(n int, worker ComponentWorker) ComponentManagerBuilder { - for i := 0; i < n; i++ { - c.workers = append(c.workers, worker) - } - return c -} - // Build returns a new ComponentManager instance with the configured workers // Build may be called multiple times to create multiple individual ComponentManagers. This will // result in the worker routines being called multiple times. If this is unsafe, do not call it From ab2ee74147e47cd50a61e368249355d742f31997 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 25 Jul 2023 14:22:45 -0400 Subject: [PATCH 27/29] Update rpc_sent_tracker.go --- network/p2p/tracer/internal/rpc_sent_tracker.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 12ba32c4915..9551693af13 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -8,12 +8,13 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" pb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/rs/zerolog" + "github.com/onflow/flow-go/engine/common/worker" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/mempool/queue" p2pmsg "github.com/onflow/flow-go/network/p2p/message" - "github.com/rs/zerolog" ) const ( From 41f7d5c348caf4a31a7c6ff14c323ecc3ba36fa4 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 25 Jul 2023 14:30:17 -0400 Subject: [PATCH 28/29] Update rpc_sent_tracker.go --- network/p2p/tracer/internal/rpc_sent_tracker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 9551693af13..5c2c842a48c 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -9,7 +9,7 @@ import ( pubsub 
"github.com/libp2p/go-libp2p-pubsub" pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/rs/zerolog" - + "github.com/onflow/flow-go/engine/common/worker" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" From f1b7e5cc746047763e50d0e750cb483466dfa533 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 25 Jul 2023 14:54:34 -0400 Subject: [PATCH 29/29] Update gossipSubMeshTracer_test.go --- network/p2p/tracer/gossipSubMeshTracer_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/network/p2p/tracer/gossipSubMeshTracer_test.go b/network/p2p/tracer/gossipSubMeshTracer_test.go index 2f0d9581a2b..c8a680df53e 100644 --- a/network/p2p/tracer/gossipSubMeshTracer_test.go +++ b/network/p2p/tracer/gossipSubMeshTracer_test.go @@ -14,6 +14,7 @@ import ( "github.com/onflow/flow-go/config" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/module/metrics" mockmodule "github.com/onflow/flow-go/module/mock" "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/network/p2p" @@ -69,6 +70,7 @@ func TestGossipSubMeshTracer(t *testing.T) { Metrics: collector, IDProvider: idProvider, LoggerInterval: time.Second, + HeroCacheMetricsFactory: metrics.NewNoopHeroCacheMetricsFactory(), RpcSentTrackerCacheSize: defaultConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, RpcSentTrackerWorkerQueueCacheSize: defaultConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, RpcSentTrackerNumOfWorkers: defaultConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers,