add component and worker pool to rpc sent tracker
kc1116 committed Jul 12, 2023
1 parent 36c7855 commit 398b49c
Showing 7 changed files with 127 additions and 27 deletions.
4 changes: 4 additions & 0 deletions config/default-config.yml
@@ -66,6 +66,10 @@ network-config:
# The default RPC sent tracker cache size. The RPC sent tracker is used to track RPC control messages sent from the local node.
# Note: this cache size must be large enough to keep a history of sent messages over a reasonable time window.
gossipsub-rpc-sent-tracker-cache-size: 1_000_000
# Cache size of the rpc sent tracker queue used for async tracking.
gossipsub-rpc-sent-tracker-queue-cache-size: 1000
# Number of workers for rpc sent tracker worker pool.
gossipsub-rpc-sent-tracker-workers: 5
# Peer scoring is the default value for enabling peer scoring
gossipsub-peer-scoring-enabled: true
# Gossipsub rpc inspectors configs
9 changes: 9 additions & 0 deletions module/metrics/herocache.go
@@ -155,6 +155,15 @@ func GossipSubRPCSentTrackerMetricFactory(f HeroCacheMetricsFactory, networkType
return f(namespaceNetwork, r)
}

func GossipSubRPCSentTrackerQueueMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics {
// we don't use the public prefix for the metrics here for the sake of backward compatibility of metric names.
r := ResourceNetworkingRPCSentTrackerQueue
if networkType == network.PublicNetwork {
r = PrependPublicPrefix(r)
}
return f(namespaceNetwork, r)
}

func RpcInspectorNotificationQueueMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics {
r := ResourceNetworkingRpcInspectorNotificationQueue
if networkType == network.PublicNetwork {
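The new GossipSubRPCSentTrackerQueueMetricFactory above follows the same pattern as the sibling factories in this file: the caller passes a HeroCacheMetricsFactory and the networking type, and public-network resources get the public prefix prepended to the metric name. A minimal usage sketch (hedged: heroCacheFactory is an assumed, pre-wired metrics.HeroCacheMetricsFactory):

// Hedged sketch: obtain the queue collector for the private network.
// heroCacheFactory is an assumed HeroCacheMetricsFactory already bound to a metrics registry.
queueCollector := metrics.GossipSubRPCSentTrackerQueueMetricFactory(heroCacheFactory, network.PrivateNetwork)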
1 change: 1 addition & 0 deletions module/metrics/labels.go
@@ -93,6 +93,7 @@ const (
ResourceNetworkingRpcClusterPrefixReceivedCache = "rpc_cluster_prefixed_received_cache"
ResourceNetworkingDisallowListCache = "disallow_list_cache"
ResourceNetworkingRPCSentTrackerCache = "gossipsub_rpc_sent_tracker_cache"
ResourceNetworkingRPCSentTrackerQueue = "gossipsub_rpc_sent_tracker_queue"

ResourceFollowerPendingBlocksCache = "follower_pending_block_cache" // follower engine
ResourceFollowerLoopCertifiedBlocksChannel = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel
16 changes: 10 additions & 6 deletions network/netconf/flags.go
@@ -37,10 +37,12 @@ const (
gracePeriod = "libp2p-grace-period"
silencePeriod = "libp2p-silence-period"
// gossipsub
peerScoring = "gossipsub-peer-scoring-enabled"
localMeshLogInterval = "gossipsub-local-mesh-logging-interval"
rpcSentTrackerCacheSize = "gossipsub-rpc-sent-tracker-cache-size"
scoreTracerInterval = "gossipsub-score-tracer-interval"
peerScoring = "gossipsub-peer-scoring-enabled"
localMeshLogInterval = "gossipsub-local-mesh-logging-interval"
rpcSentTrackerCacheSize = "gossipsub-rpc-sent-tracker-cache-size"
rpcSentTrackerQueueCacheSize = "gossipsub-rpc-sent-tracker-queue-cache-size"
rpcSentTrackerNumOfWorkers = "gossipsub-rpc-sent-tracker-workers"
scoreTracerInterval = "gossipsub-score-tracer-interval"
// gossipsub validation inspector
gossipSubRPCInspectorNotificationCacheSize = "gossipsub-rpc-inspector-notification-cache-size"
validationInspectorNumberOfWorkers = "gossipsub-rpc-validation-inspector-workers"
@@ -67,8 +69,8 @@ func AllFlagNames() []string {
return []string{
networkingConnectionPruning, preferredUnicastsProtocols, receivedMessageCacheSize, peerUpdateInterval, unicastMessageTimeout, unicastCreateStreamRetryDelay,
dnsCacheTTL, disallowListNotificationCacheSize, dryRun, lockoutDuration, messageRateLimit, bandwidthRateLimit, bandwidthBurstLimit, memoryLimitRatio,
fileDescriptorsRatio, peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, rpcSentTrackerCacheSize, scoreTracerInterval,
gossipSubRPCInspectorNotificationCacheSize, validationInspectorNumberOfWorkers, validationInspectorInspectMessageQueueCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheSize,
fileDescriptorsRatio, peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, rpcSentTrackerCacheSize, rpcSentTrackerQueueCacheSize, rpcSentTrackerNumOfWorkers,
scoreTracerInterval, gossipSubRPCInspectorNotificationCacheSize, validationInspectorNumberOfWorkers, validationInspectorInspectMessageQueueCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheSize,
validationInspectorClusterPrefixedTopicsReceivedCacheDecay, validationInspectorClusterPrefixHardThreshold, ihaveSyncSampleSizePercentage, ihaveAsyncSampleSizePercentage,
ihaveMaxSampleSize, metricsInspectorNumberOfWorkers, metricsInspectorCacheSize, alspDisabled, alspSpamRecordCacheSize, alspSpamRecordQueueSize, alspHearBeatInterval,
}
@@ -109,6 +111,8 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) {
flags.Duration(localMeshLogInterval, config.GossipSubConfig.LocalMeshLogInterval, "logging interval for local mesh in gossipsub")
flags.Duration(scoreTracerInterval, config.GossipSubConfig.ScoreTracerInterval, "logging interval for peer score tracer in gossipsub, set to 0 to disable")
flags.Uint32(rpcSentTrackerCacheSize, config.GossipSubConfig.RPCSentTrackerCacheSize, "cache size of the rpc sent tracker used by the gossipsub mesh tracer.")
flags.Uint32(rpcSentTrackerQueueCacheSize, config.GossipSubConfig.RPCSentTrackerQueueCacheSize, "cache size of the rpc sent tracker worker queue.")
flags.Int(rpcSentTrackerNumOfWorkers, config.GossipSubConfig.RpcSentTrackerNumOfWorkers, "number of workers for the rpc sent tracker worker pool.")
// gossipsub RPC control message validation limits used for validation configuration and rate limiting
flags.Int(validationInspectorNumberOfWorkers, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.NumberOfWorkers, "number of gossipsub RPC control message validation inspector component workers")
flags.Uint32(validationInspectorInspectMessageQueueCacheSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.CacheSize, "cache size for gossipsub RPC validation inspector events worker pool queue.")
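For illustration, the sketch below shows how flags registered this way can be parsed and read back through a viper binding, which is the kind of mechanism flow-go's config loading builds on. This is hypothetical standalone wiring, not the repo's actual loader:

package main

import (
	"fmt"

	"github.com/spf13/pflag"
	"github.com/spf13/viper"
)

func main() {
	flags := pflag.NewFlagSet("network", pflag.ContinueOnError)
	// Defaults mirror config/default-config.yml above.
	flags.Uint32("gossipsub-rpc-sent-tracker-queue-cache-size", 1000, "cache size of the rpc sent tracker worker queue.")
	flags.Int("gossipsub-rpc-sent-tracker-workers", 5, "number of workers for the rpc sent tracker worker pool.")

	// A CLI override takes precedence over the registered default.
	_ = flags.Parse([]string{"--gossipsub-rpc-sent-tracker-workers=10"})

	v := viper.New()
	_ = v.BindPFlags(flags)
	fmt.Println(v.GetInt("gossipsub-rpc-sent-tracker-workers")) // prints 10
}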
4 changes: 4 additions & 0 deletions network/p2p/p2pconf/gossipsub.go
@@ -23,6 +23,10 @@ type GossipSubConfig struct {
ScoreTracerInterval time.Duration `mapstructure:"gossipsub-score-tracer-interval"`
// RPCSentTrackerCacheSize cache size of the rpc sent tracker used by the gossipsub mesh tracer.
RPCSentTrackerCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-cache-size"`
// RPCSentTrackerQueueCacheSize cache size of the rpc sent tracker queue used for async tracking.
RPCSentTrackerQueueCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-queue-cache-size"`
// RpcSentTrackerNumOfWorkers number of workers for rpc sent tracker worker pool.
RpcSentTrackerNumOfWorkers int `mapstructure:"gossipsub-rpc-sent-tracker-workers"`
// PeerScoring is whether to enable GossipSub peer scoring.
PeerScoring bool `mapstructure:"gossipsub-peer-scoring-enabled"`
}
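The mapstructure tags are what tie these fields back to the YAML keys in config/default-config.yml. A self-contained sketch of that decoding step, using github.com/mitchellh/mapstructure directly on a trimmed stand-in for GossipSubConfig:

package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// gossipSubConfig is a trimmed stand-in for p2pconf.GossipSubConfig.
type gossipSubConfig struct {
	RPCSentTrackerQueueCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-queue-cache-size"`
	RpcSentTrackerNumOfWorkers   int    `mapstructure:"gossipsub-rpc-sent-tracker-workers"`
}

func main() {
	// Values as they would arrive from the parsed YAML/flags layer.
	raw := map[string]interface{}{
		"gossipsub-rpc-sent-tracker-queue-cache-size": 1000,
		"gossipsub-rpc-sent-tracker-workers":          5,
	}
	var cfg gossipSubConfig
	if err := mapstructure.Decode(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {RPCSentTrackerQueueCacheSize:1000 RpcSentTrackerNumOfWorkers:5}
}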
41 changes: 29 additions & 12 deletions network/p2p/tracer/gossipSubMeshTracer.go
@@ -50,12 +50,15 @@ type GossipSubMeshTracer struct {
var _ p2p.PubSubTracer = (*GossipSubMeshTracer)(nil)

type GossipSubMeshTracerConfig struct {
Logger zerolog.Logger
Metrics module.GossipSubLocalMeshMetrics
IDProvider module.IdentityProvider
LoggerInterval time.Duration
RpcSentTrackerCacheCollector module.HeroCacheMetrics
RpcSentTrackerCacheSize uint32
Logger zerolog.Logger
Metrics module.GossipSubLocalMeshMetrics
IDProvider module.IdentityProvider
LoggerInterval time.Duration
RpcSentTrackerCacheCollector module.HeroCacheMetrics
RpcSentTrackerCacheSize uint32
RpcSentTrackerWorkerQueueCacheCollector module.HeroCacheMetrics
RpcSentTrackerWorkerQueueCacheSize uint32
RpcSentTrackerNumOfWorkers int
}

// NewGossipSubMeshTracer creates a new *GossipSubMeshTracer.
@@ -64,13 +67,21 @@ type GossipSubMeshTracerConfig struct {
// Returns:
// - *GossipSubMeshTracer: new mesh tracer.
func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) *GossipSubMeshTracer {
rpcSentTracker := internal.NewRPCSentTracker(config.Logger, config.RpcSentTrackerCacheSize, config.RpcSentTrackerCacheCollector)
lg := config.Logger.With().Str("component", "gossipsub_topology_tracer").Logger()
rpcSentTracker := internal.NewRPCSentTracker(&internal.RPCSentTrackerConfig{
Logger: lg,
RPCSentCacheSize: config.RpcSentTrackerCacheSize,
RPCSentCacheCollector: config.RpcSentTrackerCacheCollector,
WorkerQueueCacheCollector: config.RpcSentTrackerWorkerQueueCacheCollector,
WorkerQueueCacheSize: config.RpcSentTrackerWorkerQueueCacheSize,
NumOfWorkers: config.RpcSentTrackerNumOfWorkers,
})
g := &GossipSubMeshTracer{
RawTracer: NewGossipSubNoopTracer(),
topicMeshMap: make(map[string]map[peer.ID]struct{}),
idProvider: config.IDProvider,
metrics: config.Metrics,
logger: config.Logger.With().Str("component", "gossipsub_topology_tracer").Logger(),
logger: lg,
loggerInterval: config.LoggerInterval,
rpcSentTracker: rpcSentTracker,
}
@@ -80,6 +91,15 @@ func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) *GossipSubMeshTra
ready()
g.logLoop(ctx)
}).
AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
ready()
lg.Debug().Msg("starting rpc sent tracker")
g.rpcSentTracker.Start(ctx)
lg.Debug().Msg("rpc sent tracker started")

<-g.rpcSentTracker.Done()
lg.Debug().Msg("rpc sent tracker stopped")
}).
Build()

return g
@@ -155,10 +175,7 @@ func (t *GossipSubMeshTracer) Prune(p peer.ID, topic string) {
// SendRPC is called when an RPC is sent. Currently, the GossipSubMeshTracer tracks iHave RPC messages that have been sent.
// This function can be updated to track other control messages in the future as required.
func (t *GossipSubMeshTracer) SendRPC(rpc *pubsub.RPC, _ peer.ID) {
switch {
case len(rpc.GetControl().GetIhave()) > 0:
t.rpcSentTracker.OnIHaveRPCSent(rpc.GetControl().GetIhave())
}
t.rpcSentTracker.RPCSent(rpc)
}

// logLoop logs the mesh peers of the local node for each topic at a regular interval.
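Putting the pieces together, constructing the tracer now requires the three new worker-pool fields. A hedged construction sketch follows; logger, netMetrics, idProvider, and signalerCtx are assumed to be in scope, and the no-op collectors stand in for real HeroCache metrics:

meshTracer := tracer.NewGossipSubMeshTracer(&tracer.GossipSubMeshTracerConfig{
	Logger:                                  logger,
	Metrics:                                 netMetrics,
	IDProvider:                              idProvider,
	LoggerInterval:                          time.Second,
	RpcSentTrackerCacheCollector:            metrics.NewNoopCollector(),
	RpcSentTrackerCacheSize:                 1_000_000,
	RpcSentTrackerWorkerQueueCacheCollector: metrics.NewNoopCollector(),
	RpcSentTrackerWorkerQueueCacheSize:      1000,
	RpcSentTrackerNumOfWorkers:              5,
})
// Starting the tracer also starts the embedded rpc sent tracker component,
// whose workers begin draining the queue.
meshTracer.Start(signalerCtx)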
79 changes: 70 additions & 9 deletions network/p2p/tracer/internal/rpc_sent_tracker.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,93 @@
package internal

import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine/common/worker"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/mempool/queue"
p2pmsg "github.com/onflow/flow-go/network/p2p/message"
)

// trackRpcSentWork is an internal data structure that temporarily stores a *pubsub.RPC in the queue
// before it is processed by the *RPCSentTracker.
type trackRpcSentWork struct {
rpc *pubsub.RPC
}

// RPCSentTracker tracks RPC messages that are sent.
type RPCSentTracker struct {
cache *rpcSentCache
component.Component
cache *rpcSentCache
workerPool *worker.Pool[trackRpcSentWork]
}

// RPCSentTrackerConfig configuration for the RPCSentTracker.
type RPCSentTrackerConfig struct {
Logger zerolog.Logger
// RPCSentCacheSize size of the *rpcSentCache cache.
RPCSentCacheSize uint32
// RPCSentCacheCollector metrics collector for the *rpcSentCache cache.
RPCSentCacheCollector module.HeroCacheMetrics
// WorkerQueueCacheCollector metrics collector for the worker pool queue.
WorkerQueueCacheCollector module.HeroCacheMetrics
// WorkerQueueCacheSize size of the worker pool's HeroStore-backed queue.
WorkerQueueCacheSize uint32
// NumOfWorkers number of workers in the worker pool.
NumOfWorkers int
}

// NewRPCSentTracker returns a new *RPCSentTracker.
func NewRPCSentTracker(logger zerolog.Logger, sizeLimit uint32, collector module.HeroCacheMetrics) *RPCSentTracker {
config := &rpcCtrlMsgSentCacheConfig{
sizeLimit: sizeLimit,
logger: logger,
collector: collector,
func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker {
cacheConfig := &rpcCtrlMsgSentCacheConfig{
sizeLimit: config.RPCSentCacheSize,
logger: config.Logger,
collector: config.RPCSentCacheCollector,
}

store := queue.NewHeroStore(
config.WorkerQueueCacheSize,
config.Logger,
config.WorkerQueueCacheCollector)

tracker := &RPCSentTracker{cache: newRPCSentCache(cacheConfig)}
tracker.workerPool = worker.NewWorkerPoolBuilder[trackRpcSentWork](
config.Logger,
store,
tracker.rpcSent).Build()

builder := component.NewComponentManagerBuilder()
for i := 0; i < config.NumOfWorkers; i++ {
builder.AddWorker(tracker.workerPool.WorkerLogic())
}
return &RPCSentTracker{cache: newRPCSentCache(config)}
tracker.Component = builder.Build()

return tracker
}

// OnIHaveRPCSent caches a unique entity message ID for each message ID included in each rpc iHave control message.
// RPCSent submits the control message to the worker queue for async tracking.
// Args:
// - *pubsub.RPC: the rpc sent.
func (t *RPCSentTracker) OnIHaveRPCSent(iHaves []*pb.ControlIHave) {
func (t *RPCSentTracker) RPCSent(rpc *pubsub.RPC) {
t.workerPool.Submit(trackRpcSentWork{rpc})
}

// rpcSent tracks control messages sent in *pubsub.RPC.
func (t *RPCSentTracker) rpcSent(work trackRpcSentWork) error {
switch {
case len(work.rpc.GetControl().GetIhave()) > 0:
t.iHaveRPCSent(work.rpc.GetControl().GetIhave())
}
return nil
}

// iHaveRPCSent caches a unique entity message ID for each message ID included in each rpc iHave control message.
// Args:
// - []*pb.ControlIHave: list of iHave control messages.
func (t *RPCSentTracker) iHaveRPCSent(iHaves []*pb.ControlIHave) {
controlMsgType := p2pmsg.CtrlMsgIHave
for _, iHave := range iHaves {
topicID := iHave.GetTopicID()
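End to end, RPCSent is non-blocking: it drops the RPC into the HeroStore-backed queue and returns, while pool workers invoke rpcSent to cache one entity ID per advertised iHave message ID. A test-style lifecycle sketch (only compilable inside this internal package; the collectors and the sample RPC are illustrative):

ctx, cancel := context.WithCancel(context.Background())
signalerCtx, _ := irrecoverable.WithSignaler(ctx)

tracker := NewRPCSentTracker(&RPCSentTrackerConfig{
	Logger:                    zerolog.Nop(),
	RPCSentCacheSize:          10_000,
	RPCSentCacheCollector:     metrics.NewNoopCollector(),
	WorkerQueueCacheCollector: metrics.NewNoopCollector(),
	WorkerQueueCacheSize:      100,
	NumOfWorkers:              1,
})
tracker.Start(signalerCtx)
<-tracker.Ready()

// An RPC carrying a single iHave advertisement.
topic := "example-topic"
rpc := &pubsub.RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
	Ihave: []*pb.ControlIHave{{TopicID: &topic, MessageIDs: []string{"message-id-1"}}},
}}}
tracker.RPCSent(rpc) // enqueues; a worker caches the iHave ID asynchronously

cancel()
<-tracker.Done()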
