Skip to content

Commit

Permalink
Merge #4553
Browse files Browse the repository at this point in the history
4553: Khalil/1899 async rpc sent tracker r=kc1116 a=kc1116

This PR adds a worker pool to the gossipsub mesh tracer's RPC sent tracker. RPC control messages are processed synchronously, so it is imperative that any sub-process involved in processing RPCs be non-blocking. This PR also updates the tracker to keep track of the `last highest iHaves size`; this value will be used during iWant flooding detection to dynamically update the expected iWant sample size based on the most recent largest iHaves sent.

Further reading: https://github.com/dapperlabs/flow-go/issues/6472

Co-authored-by: Khalil Claybon <[email protected]>
  • Loading branch information
bors[bot] and kc1116 authored Jul 25, 2023
2 parents b53dd43 + 2f95232 commit 6bb73d8
Show file tree
Hide file tree
Showing 17 changed files with 480 additions and 118 deletions.
15 changes: 9 additions & 6 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1291,12 +1291,15 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
}

meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: builder.Logger,
Metrics: networkMetrics,
IDProvider: builder.IdentityProvider,
LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
Logger: builder.Logger,
Metrics: networkMetrics,
IDProvider: builder.IdentityProvider,
LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize,
RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers,
HeroCacheMetricsFactory: builder.HeroCacheMetricsFactory(),
NetworkingType: network.PublicNetwork,
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)

Expand Down
15 changes: 9 additions & 6 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,12 +716,15 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
}

meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: builder.Logger,
Metrics: builder.Metrics.Network,
IDProvider: builder.IdentityProvider,
LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
Logger: builder.Logger,
Metrics: builder.Metrics.Network,
IDProvider: builder.IdentityProvider,
LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize,
RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers,
HeroCacheMetricsFactory: builder.HeroCacheMetricsFactory(),
NetworkingType: network.PublicNetwork,
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)

Expand Down
4 changes: 4 additions & 0 deletions config/default-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ network-config:
# The default RPC sent tracker cache size. The RPC sent tracker is used to track RPC control messages sent from the local node.
# Note: this cache size must be large enough to keep a history of sent messages in a reasonable time window of past history.
gossipsub-rpc-sent-tracker-cache-size: 1_000_000
# Cache size of the rpc sent tracker queue used for async tracking.
gossipsub-rpc-sent-tracker-queue-cache-size: 100_000
# Number of workers for rpc sent tracker worker pool.
gossipsub-rpc-sent-tracker-workers: 5
# Whether GossipSub peer scoring is enabled (enabled by default)
gossipsub-peer-scoring-enabled: true
# Gossipsub rpc inspectors configs
Expand Down
15 changes: 9 additions & 6 deletions follower/follower_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,12 +605,15 @@ func (builder *FollowerServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
}

meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: builder.Logger,
Metrics: builder.Metrics.Network,
IDProvider: builder.IdentityProvider,
LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
Logger: builder.Logger,
Metrics: builder.Metrics.Network,
IDProvider: builder.IdentityProvider,
LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize,
RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers,
HeroCacheMetricsFactory: builder.HeroCacheMetricsFactory(),
NetworkingType: network.PublicNetwork,
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)

Expand Down
4 changes: 2 additions & 2 deletions module/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ func NewComponentManagerBuilder() ComponentManagerBuilder {
return &componentManagerBuilderImpl{}
}

// AddWorker adds a ComponentWorker closure to the ComponentManagerBuilder
// All worker functions will be run in parallel when the ComponentManager is started.
// Note: AddWorker is not concurrency-safe, and should only be called on an individual builder
// within a single goroutine.
func (c *componentManagerBuilderImpl) AddWorker(worker ComponentWorker) ComponentManagerBuilder {
c.workers = append(c.workers, worker)
return c
Expand Down
9 changes: 9 additions & 0 deletions module/metrics/herocache.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,15 @@ func GossipSubRPCSentTrackerMetricFactory(f HeroCacheMetricsFactory, networkType
return f(namespaceNetwork, r)
}

// GossipSubRPCSentTrackerQueueMetricFactory returns the HeroCache metrics collector for the
// gossipsub RPC sent tracker queue. It invokes the factory f with namespaceNetwork and the
// ResourceNetworkingRPCSentTrackerQueue resource label.
// When networkType is network.PublicNetwork, the label is first passed through PrependPublicPrefix;
// for any other networking type the label is used unchanged.
func GossipSubRPCSentTrackerQueueMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics {
// Start from the bare resource label; it is only rewritten for the public network below.
r := ResourceNetworkingRPCSentTrackerQueue
if networkType == network.PublicNetwork {
// presumably this yields a distinct resource name for public-network metrics — confirm against PrependPublicPrefix.
r = PrependPublicPrefix(r)
}
return f(namespaceNetwork, r)
}

func RpcInspectorNotificationQueueMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics {
r := ResourceNetworkingRpcInspectorNotificationQueue
if networkType == network.PublicNetwork {
Expand Down
1 change: 1 addition & 0 deletions module/metrics/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ const (
ResourceNetworkingRpcClusterPrefixReceivedCache = "rpc_cluster_prefixed_received_cache"
ResourceNetworkingDisallowListCache = "disallow_list_cache"
ResourceNetworkingRPCSentTrackerCache = "gossipsub_rpc_sent_tracker_cache"
ResourceNetworkingRPCSentTrackerQueue = "gossipsub_rpc_sent_tracker_queue"

ResourceFollowerPendingBlocksCache = "follower_pending_block_cache" // follower engine
ResourceFollowerLoopCertifiedBlocksChannel = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel
Expand Down
15 changes: 9 additions & 6 deletions network/internal/p2pfixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,15 @@ func CreateNode(t *testing.T, networkKey crypto.PrivateKey, sporkID flow.Identif
require.NoError(t, err)

meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: logger,
Metrics: metrics.NewNoopCollector(),
IDProvider: idProvider,
LoggerInterval: defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.NewNoopCollector(),
RpcSentTrackerCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
Logger: logger,
Metrics: metrics.NewNoopCollector(),
IDProvider: idProvider,
LoggerInterval: defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
RpcSentTrackerWorkerQueueCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize,
RpcSentTrackerNumOfWorkers: defaultFlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers,
HeroCacheMetricsFactory: metrics.NewNoopHeroCacheMetricsFactory(),
NetworkingType: flownet.PublicNetwork,
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)

Expand Down
16 changes: 10 additions & 6 deletions network/netconf/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ const (
gracePeriod = "libp2p-grace-period"
silencePeriod = "libp2p-silence-period"
// gossipsub
peerScoring = "gossipsub-peer-scoring-enabled"
localMeshLogInterval = "gossipsub-local-mesh-logging-interval"
rpcSentTrackerCacheSize = "gossipsub-rpc-sent-tracker-cache-size"
scoreTracerInterval = "gossipsub-score-tracer-interval"
peerScoring = "gossipsub-peer-scoring-enabled"
localMeshLogInterval = "gossipsub-local-mesh-logging-interval"
rpcSentTrackerCacheSize = "gossipsub-rpc-sent-tracker-cache-size"
rpcSentTrackerQueueCacheSize = "gossipsub-rpc-sent-tracker-queue-cache-size"
rpcSentTrackerNumOfWorkers = "gossipsub-rpc-sent-tracker-workers"
scoreTracerInterval = "gossipsub-score-tracer-interval"
// gossipsub validation inspector
gossipSubRPCInspectorNotificationCacheSize = "gossipsub-rpc-inspector-notification-cache-size"
validationInspectorNumberOfWorkers = "gossipsub-rpc-validation-inspector-workers"
Expand All @@ -67,8 +69,8 @@ func AllFlagNames() []string {
return []string{
networkingConnectionPruning, preferredUnicastsProtocols, receivedMessageCacheSize, peerUpdateInterval, unicastMessageTimeout, unicastCreateStreamRetryDelay,
dnsCacheTTL, disallowListNotificationCacheSize, dryRun, lockoutDuration, messageRateLimit, bandwidthRateLimit, bandwidthBurstLimit, memoryLimitRatio,
fileDescriptorsRatio, peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, rpcSentTrackerCacheSize, scoreTracerInterval,
gossipSubRPCInspectorNotificationCacheSize, validationInspectorNumberOfWorkers, validationInspectorInspectMessageQueueCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheSize,
fileDescriptorsRatio, peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, rpcSentTrackerCacheSize, rpcSentTrackerQueueCacheSize, rpcSentTrackerNumOfWorkers,
scoreTracerInterval, gossipSubRPCInspectorNotificationCacheSize, validationInspectorNumberOfWorkers, validationInspectorInspectMessageQueueCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheSize,
validationInspectorClusterPrefixedTopicsReceivedCacheDecay, validationInspectorClusterPrefixHardThreshold, ihaveSyncSampleSizePercentage, ihaveAsyncSampleSizePercentage,
ihaveMaxSampleSize, metricsInspectorNumberOfWorkers, metricsInspectorCacheSize, alspDisabled, alspSpamRecordCacheSize, alspSpamRecordQueueSize, alspHearBeatInterval,
}
Expand Down Expand Up @@ -109,6 +111,8 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) {
flags.Duration(localMeshLogInterval, config.GossipSubConfig.LocalMeshLogInterval, "logging interval for local mesh in gossipsub")
flags.Duration(scoreTracerInterval, config.GossipSubConfig.ScoreTracerInterval, "logging interval for peer score tracer in gossipsub, set to 0 to disable")
flags.Uint32(rpcSentTrackerCacheSize, config.GossipSubConfig.RPCSentTrackerCacheSize, "cache size of the rpc sent tracker used by the gossipsub mesh tracer.")
flags.Uint32(rpcSentTrackerQueueCacheSize, config.GossipSubConfig.RPCSentTrackerQueueCacheSize, "cache size of the rpc sent tracker worker queue.")
flags.Int(rpcSentTrackerNumOfWorkers, config.GossipSubConfig.RpcSentTrackerNumOfWorkers, "number of workers for the rpc sent tracker worker pool.")
// gossipsub RPC control message validation limits used for validation configuration and rate limiting
flags.Int(validationInspectorNumberOfWorkers, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.NumberOfWorkers, "number of gossupsub RPC control message validation inspector component workers")
flags.Uint32(validationInspectorInspectMessageQueueCacheSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.CacheSize, "cache size for gossipsub RPC validation inspector events worker pool queue.")
Expand Down
16 changes: 9 additions & 7 deletions network/p2p/p2pbuilder/libp2pNodeBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
flownet "github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/netconf"
"github.com/onflow/flow-go/network/p2p"
Expand Down Expand Up @@ -491,12 +490,15 @@ func DefaultNodeBuilder(
}

meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: logger,
Metrics: metricsCfg.Metrics,
IDProvider: idProvider,
LoggerInterval: gossipCfg.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(metricsCfg.HeroCacheFactory, flownet.PrivateNetwork),
RpcSentTrackerCacheSize: gossipCfg.RPCSentTrackerCacheSize,
Logger: logger,
Metrics: metricsCfg.Metrics,
IDProvider: idProvider,
LoggerInterval: gossipCfg.LocalMeshLogInterval,
RpcSentTrackerCacheSize: gossipCfg.RPCSentTrackerCacheSize,
RpcSentTrackerWorkerQueueCacheSize: gossipCfg.RPCSentTrackerQueueCacheSize,
RpcSentTrackerNumOfWorkers: gossipCfg.RpcSentTrackerNumOfWorkers,
HeroCacheMetricsFactory: metricsCfg.HeroCacheFactory,
NetworkingType: flownet.PrivateNetwork,
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)

Expand Down
22 changes: 17 additions & 5 deletions network/p2p/p2pconf/gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,24 @@ type ResourceManagerConfig struct {
type GossipSubConfig struct {
// GossipSubRPCInspectorsConfig configuration for all gossipsub RPC control message inspectors.
GossipSubRPCInspectorsConfig `mapstructure:",squash"`

// GossipSubTracerConfig is the configuration for the gossipsub tracer. GossipSub tracer is used to trace the local mesh events and peer scores.
GossipSubTracerConfig `mapstructure:",squash"`

// PeerScoring is whether to enable GossipSub peer scoring.
PeerScoring bool `mapstructure:"gossipsub-peer-scoring-enabled"`
}

// GossipSubTracerConfig is the config for the gossipsub tracer. GossipSub tracer is used to trace the local mesh events and peer scores.
type GossipSubTracerConfig struct {
// LocalMeshLogInterval is the interval at which the local mesh is logged.
LocalMeshLogInterval time.Duration `mapstructure:"gossipsub-local-mesh-logging-interval"`
LocalMeshLogInterval time.Duration `validate:"gt=0s" mapstructure:"gossipsub-local-mesh-logging-interval"`
// ScoreTracerInterval is the interval at which the score tracer logs the peer scores.
ScoreTracerInterval time.Duration `mapstructure:"gossipsub-score-tracer-interval"`
ScoreTracerInterval time.Duration `validate:"gt=0s" mapstructure:"gossipsub-score-tracer-interval"`
// RPCSentTrackerCacheSize cache size of the rpc sent tracker used by the gossipsub mesh tracer.
RPCSentTrackerCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-cache-size"`
// PeerScoring is whether to enable GossipSub peer scoring.
PeerScoring bool `mapstructure:"gossipsub-peer-scoring-enabled"`
RPCSentTrackerCacheSize uint32 `validate:"gt=0" mapstructure:"gossipsub-rpc-sent-tracker-cache-size"`
// RPCSentTrackerQueueCacheSize cache size of the rpc sent tracker queue used for async tracking.
RPCSentTrackerQueueCacheSize uint32 `validate:"gt=0" mapstructure:"gossipsub-rpc-sent-tracker-queue-cache-size"`
// RpcSentTrackerNumOfWorkers number of workers for rpc sent tracker worker pool.
RpcSentTrackerNumOfWorkers int `validate:"gt=0" mapstructure:"gossipsub-rpc-sent-tracker-workers"`
}
Loading

0 comments on commit 6bb73d8

Please sign in to comment.