Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Khalil/1899 async rpc sent tracker #4553

Merged
merged 36 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
398b49c
add component and worker pool to rpc sent tracker
kc1116 Jul 12, 2023
e373b01
update all usages of NewGossipSubMeshTracer
kc1116 Jul 12, 2023
2a83452
update rpc sent tracker test
kc1116 Jul 12, 2023
14ef77d
Update rpc_sent_tracker.go
kc1116 Jul 12, 2023
e38c31e
track last highest iHaves size
kc1116 Jul 12, 2023
20b917e
reset LastHighestIhavesSent every 1 minute
kc1116 Jul 12, 2023
c1e13c9
Merge branch 'master' into khalil/1899-async-rpc-sent-tracker
kc1116 Jul 12, 2023
f19483f
Update rpc_sent_tracker_test.go
kc1116 Jul 12, 2023
672df92
Merge branch 'khalil/1899-async-rpc-sent-tracker' of github.com:onflo…
kc1116 Jul 12, 2023
bc369b2
Update network/p2p/tracer/gossipSubMeshTracer.go
kc1116 Jul 18, 2023
7b52ab1
encapsulate tracer-related config parameters
kc1116 Jul 18, 2023
b82247d
Update config/default-config.yml
kc1116 Jul 18, 2023
b8c72cc
Update network/p2p/p2pconf/gossipsub.go
kc1116 Jul 18, 2023
4e31202
Merge branch 'khalil/1899-async-rpc-sent-tracker' of github.com:onflo…
kc1116 Jul 18, 2023
bea0fff
Update gossipsub.go
kc1116 Jul 18, 2023
4bacc83
internally construct metrics for the tracker
kc1116 Jul 18, 2023
87c07e0
remove AddWorkers
kc1116 Jul 18, 2023
1338279
change debug -> info
kc1116 Jul 18, 2023
bec095c
rename trackRPC -> trackableRPC
kc1116 Jul 18, 2023
119f946
rename RPCSent -> Track
kc1116 Jul 18, 2023
23c6cfe
document errors as benign
kc1116 Jul 18, 2023
e1c57c6
return error if submit returns false
kc1116 Jul 18, 2023
344b30f
rename rpcSent -> rpcSentWorkerLogic
kc1116 Jul 18, 2023
e6a0a33
add lastUpdate to lastHighestIHaveRPCSize tracking
kc1116 Jul 18, 2023
06575d4
Merge branch 'master' into khalil/1899-async-rpc-sent-tracker
kc1116 Jul 18, 2023
640f9bf
add KeyNetworkingSecurity when err returned during tracking
kc1116 Jul 18, 2023
4e4ca6c
Merge branch 'master' into khalil/1899-async-rpc-sent-tracker
kc1116 Jul 19, 2023
999eb9b
add lock for last size and last size update
kc1116 Jul 20, 2023
65bc05b
remove topic ID from entity ID
kc1116 Jul 20, 2023
7c5daef
add test cases for concurrent tracking and duplicate RPC tracking
kc1116 Jul 20, 2023
c1d3b2e
remove add workers func
kc1116 Jul 20, 2023
d5e7b22
Merge branch 'master' into khalil/1899-async-rpc-sent-tracker
kc1116 Jul 20, 2023
ab2ee74
Update rpc_sent_tracker.go
kc1116 Jul 25, 2023
41f7d5c
Update rpc_sent_tracker.go
kc1116 Jul 25, 2023
f1b7e5c
Update gossipSubMeshTracer_test.go
kc1116 Jul 25, 2023
2f95232
Merge branch 'master' into khalil/1899-async-rpc-sent-tracker
kc1116 Jul 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1192,12 +1192,15 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
}

meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: builder.Logger,
Metrics: networkMetrics,
IDProvider: builder.IdentityProvider,
LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
Logger: builder.Logger,
Metrics: networkMetrics,
IDProvider: builder.IdentityProvider,
LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize,
RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers,
RpcSentTrackerWorkerQueueCacheCollector: metrics.GossipSubRPCSentTrackerQueueMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)

Expand Down
15 changes: 9 additions & 6 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,12 +703,15 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
}

meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: builder.Logger,
Metrics: builder.Metrics.Network,
IDProvider: builder.IdentityProvider,
LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
Logger: builder.Logger,
Metrics: builder.Metrics.Network,
IDProvider: builder.IdentityProvider,
LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize,
RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers,
RpcSentTrackerWorkerQueueCacheCollector: metrics.GossipSubRPCSentTrackerQueueMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)

Expand Down
4 changes: 4 additions & 0 deletions config/default-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ network-config:
# The default RPC sent tracker cache size. The RPC sent tracker is used to track RPC control messages sent from the local node.
# Note: this cache size must be large enough to keep a history of sent messages in a reasonable time window of past history.
gossipsub-rpc-sent-tracker-cache-size: 1_000_000
# Cache size of the rpc sent tracker queue used for async tracking.
gossipsub-rpc-sent-tracker-queue-cache-size: 1000
# Number of workers for rpc sent tracker worker pool.
gossipsub-rpc-sent-tracker-workers: 5
# Peer scoring is the default value for enabling peer scoring
gossipsub-peer-scoring-enabled: true
# Gossipsub rpc inspectors configs
Expand Down
15 changes: 9 additions & 6 deletions follower/follower_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,12 +605,15 @@ func (builder *FollowerServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
}

meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: builder.Logger,
Metrics: builder.Metrics.Network,
IDProvider: builder.IdentityProvider,
LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
Logger: builder.Logger,
Metrics: builder.Metrics.Network,
IDProvider: builder.IdentityProvider,
LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
RpcSentTrackerWorkerQueueCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize,
RpcSentTrackerNumOfWorkers: builder.FlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers,
RpcSentTrackerWorkerQueueCacheCollector: metrics.GossipSubRPCSentTrackerQueueMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)

Expand Down
15 changes: 12 additions & 3 deletions module/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ func NoopWorker(ctx irrecoverable.SignalerContext, ready ReadyFunc) {
type ComponentManagerBuilder interface {
// AddWorker adds a single worker routine for the ComponentManager.
// It returns a ComponentManagerBuilder so that calls can be chained.
AddWorker(ComponentWorker) ComponentManagerBuilder

// AddWorkers adds n worker routines for the ComponentManager, all using the given ComponentWorker.
// It returns a ComponentManagerBuilder so that calls can be chained.
AddWorkers(int, ComponentWorker) ComponentManagerBuilder
// Build builds and returns a new ComponentManager instance.
Build() *ComponentManager
}
Expand All @@ -157,15 +158,23 @@ func NewComponentManagerBuilder() ComponentManagerBuilder {
return &componentManagerBuilderImpl{}
}

// AddWorker adds a ComponentWorker closure to the ComponentManagerBuilder
// All worker functions will be run in parallel when the ComponentManager is started.
// Note: AddWorker is not concurrency-safe, and should only be called on an individual builder
// within a single goroutine.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the formatting here is messed up. Can you revert the changes to this file?


func (c *componentManagerBuilderImpl) AddWorker(worker ComponentWorker) ComponentManagerBuilder {
	// Extend the builder's worker list by one entry, then hand the same builder
	// back so that further calls can be chained.
	extended := append(c.workers, worker)
	c.workers = extended
	return c
}

// AddWorkers registers the given worker n times with the builder, so that the
// same ComponentWorker appears n times in the builder's worker list.
// A non-positive n registers nothing. The builder itself is returned.
func (c *componentManagerBuilderImpl) AddWorkers(n int, worker ComponentWorker) ComponentManagerBuilder {
	for remaining := n; remaining > 0; remaining-- {
		c.workers = append(c.workers, worker)
	}
	return c
}

// Build returns a new ComponentManager instance with the configured workers
// Build may be called multiple times to create multiple individual ComponentManagers. This will
// result in the worker routines being called multiple times. If this is unsafe, do not call it
Expand Down
16 changes: 16 additions & 0 deletions module/component/mock/component_manager_builder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions module/metrics/herocache.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,15 @@ func GossipSubRPCSentTrackerMetricFactory(f HeroCacheMetricsFactory, networkType
return f(namespaceNetwork, r)
}

// GossipSubRPCSentTrackerQueueMetricFactory creates the HeroCache metrics collector registered
// under the ResourceNetworkingRPCSentTrackerQueue resource name.
// Args:
//   - f: the factory invoked to create the collector.
//   - networkType: the networking type the collector is created for. When it equals
//     network.PublicNetwork, the resource name is first passed through PrependPublicPrefix.
//
// Returns:
//   - the collector produced by f for namespaceNetwork and the resolved resource name.
func GossipSubRPCSentTrackerQueueMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics {
	resource := ResourceNetworkingRPCSentTrackerQueue
	if networkType == network.PublicNetwork {
		// The public network gets its own resource name via PrependPublicPrefix;
		// any other networking type uses the plain resource name.
		resource = PrependPublicPrefix(resource)
	}
	return f(namespaceNetwork, resource)
}

func RpcInspectorNotificationQueueMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics {
r := ResourceNetworkingRpcInspectorNotificationQueue
if networkType == network.PublicNetwork {
Expand Down
1 change: 1 addition & 0 deletions module/metrics/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ const (
ResourceNetworkingRpcClusterPrefixReceivedCache = "rpc_cluster_prefixed_received_cache"
ResourceNetworkingDisallowListCache = "disallow_list_cache"
ResourceNetworkingRPCSentTrackerCache = "gossipsub_rpc_sent_tracker_cache"
ResourceNetworkingRPCSentTrackerQueue = "gossipsub_rpc_sent_tracker_queue"

ResourceFollowerPendingBlocksCache = "follower_pending_block_cache" // follower engine
ResourceFollowerLoopCertifiedBlocksChannel = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel
Expand Down
15 changes: 9 additions & 6 deletions network/internal/p2pfixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,15 @@ func CreateNode(t *testing.T, networkKey crypto.PrivateKey, sporkID flow.Identif
require.NoError(t, err)

meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: logger,
Metrics: metrics.NewNoopCollector(),
IDProvider: idProvider,
LoggerInterval: defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.NewNoopCollector(),
RpcSentTrackerCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
Logger: logger,
Metrics: metrics.NewNoopCollector(),
IDProvider: idProvider,
LoggerInterval: defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.NewNoopCollector(),
RpcSentTrackerCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
RpcSentTrackerWorkerQueueCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize,
RpcSentTrackerNumOfWorkers: defaultFlowConfig.NetworkConfig.GossipSubConfig.RpcSentTrackerNumOfWorkers,
RpcSentTrackerWorkerQueueCacheCollector: metrics.NewNoopCollector(),
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)

Expand Down
16 changes: 10 additions & 6 deletions network/netconf/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ const (
gracePeriod = "libp2p-grace-period"
silencePeriod = "libp2p-silence-period"
// gossipsub
peerScoring = "gossipsub-peer-scoring-enabled"
localMeshLogInterval = "gossipsub-local-mesh-logging-interval"
rpcSentTrackerCacheSize = "gossipsub-rpc-sent-tracker-cache-size"
scoreTracerInterval = "gossipsub-score-tracer-interval"
peerScoring = "gossipsub-peer-scoring-enabled"
localMeshLogInterval = "gossipsub-local-mesh-logging-interval"
rpcSentTrackerCacheSize = "gossipsub-rpc-sent-tracker-cache-size"
rpcSentTrackerQueueCacheSize = "gossipsub-rpc-sent-tracker-queue-cache-size"
rpcSentTrackerNumOfWorkers = "gossipsub-rpc-sent-tracker-workers"
scoreTracerInterval = "gossipsub-score-tracer-interval"
// gossipsub validation inspector
gossipSubRPCInspectorNotificationCacheSize = "gossipsub-rpc-inspector-notification-cache-size"
validationInspectorNumberOfWorkers = "gossipsub-rpc-validation-inspector-workers"
Expand All @@ -67,8 +69,8 @@ func AllFlagNames() []string {
return []string{
networkingConnectionPruning, preferredUnicastsProtocols, receivedMessageCacheSize, peerUpdateInterval, unicastMessageTimeout, unicastCreateStreamRetryDelay,
dnsCacheTTL, disallowListNotificationCacheSize, dryRun, lockoutDuration, messageRateLimit, bandwidthRateLimit, bandwidthBurstLimit, memoryLimitRatio,
fileDescriptorsRatio, peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, rpcSentTrackerCacheSize, scoreTracerInterval,
gossipSubRPCInspectorNotificationCacheSize, validationInspectorNumberOfWorkers, validationInspectorInspectMessageQueueCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheSize,
fileDescriptorsRatio, peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, rpcSentTrackerCacheSize, rpcSentTrackerQueueCacheSize, rpcSentTrackerNumOfWorkers,
scoreTracerInterval, gossipSubRPCInspectorNotificationCacheSize, validationInspectorNumberOfWorkers, validationInspectorInspectMessageQueueCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheSize,
validationInspectorClusterPrefixedTopicsReceivedCacheDecay, validationInspectorClusterPrefixHardThreshold, ihaveSyncSampleSizePercentage, ihaveAsyncSampleSizePercentage,
ihaveMaxSampleSize, metricsInspectorNumberOfWorkers, metricsInspectorCacheSize, alspDisabled, alspSpamRecordCacheSize, alspSpamRecordQueueSize, alspHearBeatInterval,
}
Expand Down Expand Up @@ -109,6 +111,8 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) {
flags.Duration(localMeshLogInterval, config.GossipSubConfig.LocalMeshLogInterval, "logging interval for local mesh in gossipsub")
flags.Duration(scoreTracerInterval, config.GossipSubConfig.ScoreTracerInterval, "logging interval for peer score tracer in gossipsub, set to 0 to disable")
flags.Uint32(rpcSentTrackerCacheSize, config.GossipSubConfig.RPCSentTrackerCacheSize, "cache size of the rpc sent tracker used by the gossipsub mesh tracer.")
flags.Uint32(rpcSentTrackerQueueCacheSize, config.GossipSubConfig.RPCSentTrackerQueueCacheSize, "cache size of the rpc sent tracker worker queue.")
flags.Int(rpcSentTrackerNumOfWorkers, config.GossipSubConfig.RpcSentTrackerNumOfWorkers, "number of workers for the rpc sent tracker worker pool.")
// gossipsub RPC control message validation limits used for validation configuration and rate limiting
flags.Int(validationInspectorNumberOfWorkers, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.NumberOfWorkers, "number of gossupsub RPC control message validation inspector component workers")
flags.Uint32(validationInspectorInspectMessageQueueCacheSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.CacheSize, "cache size for gossipsub RPC validation inspector events worker pool queue.")
Expand Down
15 changes: 9 additions & 6 deletions network/p2p/p2pbuilder/libp2pNodeBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,12 +496,15 @@ func DefaultNodeBuilder(
}

meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: logger,
Metrics: metricsCfg.Metrics,
IDProvider: idProvider,
LoggerInterval: gossipCfg.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(metricsCfg.HeroCacheFactory, flownet.PrivateNetwork),
RpcSentTrackerCacheSize: gossipCfg.RPCSentTrackerCacheSize,
Logger: logger,
Metrics: metricsCfg.Metrics,
IDProvider: idProvider,
LoggerInterval: gossipCfg.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(metricsCfg.HeroCacheFactory, flownet.PrivateNetwork),
RpcSentTrackerCacheSize: gossipCfg.RPCSentTrackerCacheSize,
RpcSentTrackerWorkerQueueCacheSize: gossipCfg.RPCSentTrackerQueueCacheSize,
RpcSentTrackerNumOfWorkers: gossipCfg.RpcSentTrackerNumOfWorkers,
RpcSentTrackerWorkerQueueCacheCollector: metrics.GossipSubRPCSentTrackerQueueMetricFactory(metricsCfg.HeroCacheFactory, flownet.PrivateNetwork),
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)

Expand Down
4 changes: 4 additions & 0 deletions network/p2p/p2pconf/gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type GossipSubConfig struct {
ScoreTracerInterval time.Duration `mapstructure:"gossipsub-score-tracer-interval"`
// RPCSentTrackerCacheSize cache size of the rpc sent tracker used by the gossipsub mesh tracer.
RPCSentTrackerCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-cache-size"`
// RPCSentTrackerQueueCacheSize cache size of the rpc sent tracker queue used for async tracking.
RPCSentTrackerQueueCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-queue-cache-size"`
// RpcSentTrackerNumOfWorkers number of workers for rpc sent tracker worker pool.
RpcSentTrackerNumOfWorkers int `mapstructure:"gossipsub-rpc-sent-tracker-workers"`
// PeerScoring is whether to enable GossipSub peer scoring.
PeerScoring bool `mapstructure:"gossipsub-peer-scoring-enabled"`
}
Loading