From 4c635c5e37b4cfbc9bbbee92da5dbb60779ebf01 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Wed, 5 Jul 2023 12:34:45 -0400 Subject: [PATCH 01/19] add control message rpc sent tracker for use in gossip sub tracer --- network/p2p/inspector/internal/cache/cache.go | 4 +- .../p2p/p2pbuilder/inspector/suite/suite.go | 2 +- network/p2p/tracer/internal/cache.go | 86 +++++++ network/p2p/tracer/internal/cache_test.go | 225 ++++++++++++++++++ .../p2p/tracer/internal/rpc_send_entity.go | 37 +++ .../p2p/tracer/internal/rpc_sent_tracker.go | 64 +++++ .../tracer/internal/rpc_sent_tracker_test.go | 71 ++++++ 7 files changed, 485 insertions(+), 4 deletions(-) create mode 100644 network/p2p/tracer/internal/cache.go create mode 100644 network/p2p/tracer/internal/cache_test.go create mode 100644 network/p2p/tracer/internal/rpc_send_entity.go create mode 100644 network/p2p/tracer/internal/rpc_sent_tracker.go create mode 100644 network/p2p/tracer/internal/rpc_sent_tracker_test.go diff --git a/network/p2p/inspector/internal/cache/cache.go b/network/p2p/inspector/internal/cache/cache.go index 133fd0a9ac7..82d8f781a98 100644 --- a/network/p2p/inspector/internal/cache/cache.go +++ b/network/p2p/inspector/internal/cache/cache.go @@ -40,9 +40,7 @@ type RecordCache struct { // NewRecordCache creates a new *RecordCache. // Args: -// - sizeLimit: the maximum number of records that the cache can hold. -// - logger: the logger used by the cache. -// - collector: the metrics collector used by the cache. +// - config: record cache config. // - recordEntityFactory: a factory function that creates a new spam record. // Returns: // - *RecordCache, the created cache. 
diff --git a/network/p2p/p2pbuilder/inspector/suite/suite.go b/network/p2p/p2pbuilder/inspector/suite/suite.go index 6271d627cf4..2aa7532ad41 100644 --- a/network/p2p/p2pbuilder/inspector/suite/suite.go +++ b/network/p2p/p2pbuilder/inspector/suite/suite.go @@ -62,7 +62,7 @@ func (s *GossipSubInspectorSuite) InspectFunc() func(peer.ID, *pubsub.RPC) error return s.aggregatedInspector.Inspect } -// AddInvalidCtrlMsgNotificationConsumer adds a consumer to the invalid control message notification distributor. +// AddInvCtrlMsgNotifConsumer adds a consumer to the invalid control message notification distributor. // This consumer is notified when a misbehaving peer regarding gossipsub control messages is detected. This follows a pub/sub // pattern where the consumer is notified when a new notification is published. // A consumer is only notified once for each notification, and only receives notifications that were published after it was added. diff --git a/network/p2p/tracer/internal/cache.go b/network/p2p/tracer/internal/cache.go new file mode 100644 index 00000000000..597f25c1534 --- /dev/null +++ b/network/p2p/tracer/internal/cache.go @@ -0,0 +1,86 @@ +package internal + +import ( + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" + herocache "github.com/onflow/flow-go/module/mempool/herocache/backdata" + "github.com/onflow/flow-go/module/mempool/herocache/backdata/heropool" + "github.com/onflow/flow-go/module/mempool/stdmap" + "github.com/onflow/flow-go/network/p2p" +) + +type entityFactory func(id flow.Identifier, controlMsgType p2p.ControlMessageType) rpcSentEntity + +type RPCSentCacheConfig struct { + sizeLimit uint32 + logger zerolog.Logger + collector module.HeroCacheMetrics +} + +// rpcSentCache cache that stores rpcSentEntity. These entity's represent RPC control messages sent from the local node. +type rpcSentCache struct { + // c is the underlying cache. 
+ c *stdmap.Backend +} + +// newRPCSentCache creates a new *rpcSentCache. +// Args: +// - config: record cache config. +// Returns: +// - *rpcSentCache: the created cache. +// Note that this cache is intended to track control messages sent by the local node, +// it stores an rpcSentEntity under an ID that should uniquely identify the message being tracked. +func newRPCSentCache(config *RPCSentCacheConfig) (*rpcSentCache, error) { + backData := herocache.NewCache(config.sizeLimit, + herocache.DefaultOversizeFactor, + heropool.LRUEjection, + config.logger.With().Str("mempool", "gossipsub=rpc-control-messages-sent").Logger(), + config.collector) + return &rpcSentCache{ + c: stdmap.NewBackend(stdmap.WithBackData(backData)), + }, nil +} + +// init initializes the cached record for the given messageID if it does not exist. +// Returns true if the record is initialized, false otherwise (i.e.: the record already exists). +// Args: +// - flow.Identifier: the messageID to store the rpc control message. +// - p2p.ControlMessageType: the rpc control message type. +// Returns: +// - bool: true if the record is initialized, false otherwise (i.e.: the record already exists). +// Note that if init is called multiple times for the same messageID, the record is initialized only once, and the +// subsequent calls return false and do not change the record (i.e.: the record is not re-initialized). +func (r *rpcSentCache) init(messageID flow.Identifier, controlMsgType p2p.ControlMessageType) bool { + return r.c.Add(newRPCSentEntity(messageID, controlMsgType)) +} + +// has checks if the RPC message has been cached indicating it has been sent. +// Args: +// - flow.Identifier: the messageID of the rpc control message to look up. +// Returns: +// - bool: true if the RPC has been cached, indicating it was sent from the local node. +func (r *rpcSentCache) has(messageId flow.Identifier) bool { + return r.c.Has(messageId) +} + +// ids returns the list of ids of each rpcSentEntity stored. 
+func (r *rpcSentCache) ids() []flow.Identifier { + return flow.GetIDs(r.c.All()) +} + +// remove removes the record of the given messageID from the cache. +// Returns true if the record is removed, false otherwise (i.e., the record does not exist). +// Args: +// - flow.Identifier: the messageID of the rpc control message to remove. +// Returns: +// - true if the record is removed, false otherwise (i.e., the record does not exist). +func (r *rpcSentCache) remove(messageID flow.Identifier) bool { + return r.c.Remove(messageID) +} + +// size returns the number of records in the cache. +func (r *rpcSentCache) size() uint { + return r.c.Size() +} diff --git a/network/p2p/tracer/internal/cache_test.go b/network/p2p/tracer/internal/cache_test.go new file mode 100644 index 00000000000..2d4017c54e9 --- /dev/null +++ b/network/p2p/tracer/internal/cache_test.go @@ -0,0 +1,225 @@ +package internal + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/metrics" + "github.com/onflow/flow-go/network/p2p" + "github.com/onflow/flow-go/utils/unittest" +) + +// TestCache_Init tests the init method of the rpcSentCache. +// It ensures that the method returns true when a new record is initialized +// and false when an existing record is initialized. 
+func TestCache_Init(t *testing.T) { + cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) + controlMsgType := p2p.CtrlMsgIHave + id1 := unittest.IdentifierFixture() + id2 := unittest.IdentifierFixture() + + // test initializing a record for an ID that doesn't exist in the cache + initialized := cache.init(id1, controlMsgType) + require.True(t, initialized, "expected record to be initialized") + require.True(t, cache.has(id1), "expected record to exist") + + // test initializing a record for an ID that already exists in the cache + initialized = cache.init(id1, controlMsgType) + require.False(t, initialized, "expected record not to be initialized") + require.True(t, cache.has(id1), "expected record to exist") + + // test initializing a record for another ID + initialized = cache.init(id2, controlMsgType) + require.True(t, initialized, "expected record to be initialized") + require.True(t, cache.has(id2), "expected record to exist") +} + +// TestCache_ConcurrentInit tests the concurrent initialization of records. +// The test covers the following scenarios: +// 1. Multiple goroutines initializing records for different ids. +// 2. Ensuring that all records are correctly initialized. +func TestCache_ConcurrentInit(t *testing.T) { + cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) + controlMsgType := p2p.CtrlMsgIHave + ids := unittest.IdentifierListFixture(10) + + var wg sync.WaitGroup + wg.Add(len(ids)) + + for _, id := range ids { + go func(id flow.Identifier) { + defer wg.Done() + cache.init(id, controlMsgType) + }(id) + } + + unittest.RequireReturnsBefore(t, wg.Wait, 100*time.Millisecond, "timed out waiting for goroutines to finish") + + // ensure that all records are correctly initialized + for _, id := range ids { + require.True(t, cache.has(id)) + } +} + +// TestCache_ConcurrentSameRecordInit tests the concurrent initialization of the same record. +// The test covers the following scenarios: +// 1. 
Multiple goroutines attempting to initialize the same record concurrently. +// 2. Only one goroutine successfully initializes the record, and others receive false on initialization. +// 3. The record is correctly initialized in the cache and can be retrieved using the Get method. +func TestCache_ConcurrentSameRecordInit(t *testing.T) { + cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) + controlMsgType := p2p.CtrlMsgIHave + id := unittest.IdentifierFixture() + const concurrentAttempts = 10 + + var wg sync.WaitGroup + wg.Add(concurrentAttempts) + + successGauge := atomic.Int32{} + + for i := 0; i < concurrentAttempts; i++ { + go func() { + defer wg.Done() + initSuccess := cache.init(id, controlMsgType) + if initSuccess { + successGauge.Inc() + } + }() + } + + unittest.RequireReturnsBefore(t, wg.Wait, 100*time.Millisecond, "timed out waiting for goroutines to finish") + + // ensure that only one goroutine successfully initialized the record + require.Equal(t, int32(1), successGauge.Load()) + + // ensure that the record is correctly initialized in the cache + require.True(t, cache.has(id)) +} + +// TestCache_Remove tests the remove method of the RecordCache. +// The test covers the following scenarios: +// 1. Initializing the cache with multiple records. +// 2. Removing a record and checking if it is removed correctly. +// 3. Ensuring the other records are still in the cache after removal. +// 4. Attempting to remove a non-existent ID. 
+func TestCache_Remove(t *testing.T) { + cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) + controlMsgType := p2p.CtrlMsgIHave + // initialize spam records for a few ids + id1 := unittest.IdentifierFixture() + id2 := unittest.IdentifierFixture() + id3 := unittest.IdentifierFixture() + + require.True(t, cache.init(id1, controlMsgType)) + require.True(t, cache.init(id2, controlMsgType)) + require.True(t, cache.init(id3, controlMsgType)) + + numOfIds := uint(3) + require.Equal(t, numOfIds, cache.size(), fmt.Sprintf("expected size of the cache to be %d", numOfIds)) + // remove id1 and check if the record is removed + require.True(t, cache.remove(id1)) + require.NotContains(t, id1, cache.ids()) + + // check if the other ids are still in the cache + require.True(t, cache.has(id2)) + require.True(t, cache.has(id3)) + + // attempt to remove a non-existent ID + id4 := unittest.IdentifierFixture() + require.False(t, cache.remove(id4)) +} + +// TestCache_ConcurrentRemove tests the concurrent removal of records for different ids. +// The test covers the following scenarios: +// 1. Multiple goroutines removing records for different ids concurrently. +// 2. The records are correctly removed from the cache. 
+func TestCache_ConcurrentRemove(t *testing.T) { + cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) + controlMsgType := p2p.CtrlMsgIHave + ids := unittest.IdentifierListFixture(10) + for _, id := range ids { + cache.init(id, controlMsgType) + } + + var wg sync.WaitGroup + wg.Add(len(ids)) + + for _, id := range ids { + go func(id flow.Identifier) { + defer wg.Done() + require.True(t, cache.remove(id)) + require.NotContains(t, id, cache.ids()) + }(id) + } + + unittest.RequireReturnsBefore(t, wg.Wait, 100*time.Millisecond, "timed out waiting for goroutines to finish") + + require.Equal(t, uint(0), cache.size()) +} + +// TestRecordCache_ConcurrentInitAndRemove tests the concurrent initialization and removal of records for different +// ids. The test covers the following scenarios: +// 1. Multiple goroutines initializing records for different ids concurrently. +// 2. Multiple goroutines removing records for different ids concurrently. +// 3. The initialized records are correctly added to the cache. +// 4. The removed records are correctly removed from the cache. 
+func TestRecordCache_ConcurrentInitAndRemove(t *testing.T) { + cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) + controlMsgType := p2p.CtrlMsgIHave + ids := unittest.IdentifierListFixture(20) + idsToAdd := ids[:10] + idsToRemove := ids[10:] + + for _, id := range idsToRemove { + cache.init(id, controlMsgType) + } + + var wg sync.WaitGroup + wg.Add(len(ids)) + + // initialize spam records concurrently + for _, id := range idsToAdd { + go func(id flow.Identifier) { + defer wg.Done() + cache.init(id, controlMsgType) + }(id) + } + + // remove spam records concurrently + for _, id := range idsToRemove { + go func(id flow.Identifier) { + defer wg.Done() + require.True(t, cache.remove(id)) + require.NotContains(t, id, cache.ids()) + }(id) + } + + unittest.RequireReturnsBefore(t, wg.Wait, 100*time.Millisecond, "timed out waiting for goroutines to finish") + + // ensure that the initialized records are correctly added to the cache + // and removed records are correctly removed from the cache + require.ElementsMatch(t, idsToAdd, cache.ids()) +} + +// cacheFixture returns a new *RecordCache. +func cacheFixture(t *testing.T, sizeLimit uint32, logger zerolog.Logger, collector module.HeroCacheMetrics) *rpcSentCache { + config := &RPCSentCacheConfig{ + sizeLimit: sizeLimit, + logger: logger, + collector: collector, + } + r, err := newRPCSentCache(config) + require.NoError(t, err) + // expect cache to be empty + require.Equalf(t, uint(0), r.size(), "cache size must be 0") + require.NotNil(t, r) + return r +} diff --git a/network/p2p/tracer/internal/rpc_send_entity.go b/network/p2p/tracer/internal/rpc_send_entity.go new file mode 100644 index 00000000000..493e4808440 --- /dev/null +++ b/network/p2p/tracer/internal/rpc_send_entity.go @@ -0,0 +1,37 @@ +package internal + +import ( + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/network/p2p" +) + +// rpcSentEntity struct representing an RPC control message sent from local node. 
+// This struct implements the flow.Entity interface and uses the messageID field for deduplication. +type rpcSentEntity struct { + // messageID the messageID of the rpc control message. + messageID flow.Identifier + // controlMsgType the control message type. + controlMsgType p2p.ControlMessageType +} + +var _ flow.Entity = (*rpcSentEntity)(nil) + +// ID returns the message ID of the rpc control message, which is used as the unique identifier of the entity for maintenance and +// deduplication purposes in the cache. +func (r rpcSentEntity) ID() flow.Identifier { + return r.messageID +} + +// Checksum returns the message ID of the rpc control message; it does not have any purpose in the cache. +// It is implemented to satisfy the flow.Entity interface. +func (r rpcSentEntity) Checksum() flow.Identifier { + return r.messageID +} + +// newRPCSentEntity returns a new rpcSentEntity. +func newRPCSentEntity(id flow.Identifier, controlMessageType p2p.ControlMessageType) rpcSentEntity { + return rpcSentEntity{ + messageID: id, + controlMsgType: controlMessageType, + } +} diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go new file mode 100644 index 00000000000..047976cd424 --- /dev/null +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -0,0 +1,64 @@ +package internal + +import ( + "fmt" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/network/p2p" +) + +// RPCSentTracker tracks RPC messages that are sent. +type RPCSentTracker struct { + cache *rpcSentCache +} + +// NewRPCSentTracker returns a new *RPCSentTracker. 
+func NewRPCSentTracker(logger zerolog.Logger, sizeLimit uint32, collector module.HeroCacheMetrics) (*RPCSentTracker, error) { + config := &RPCSentCacheConfig{ + sizeLimit: sizeLimit, + logger: logger, + collector: collector, + } + cache, err := newRPCSentCache(config) + if err != nil { + return nil, fmt.Errorf("failed to create new rpc sent cahe: %w", err) + } + return &RPCSentTracker{cache: cache}, nil +} + +// OnIHaveRPCSent caches a unique entity message ID for each message ID included in each rpc iHave control message. +// Args: +// - *pubsub.RPC: the rpc sent. +func (t *RPCSentTracker) OnIHaveRPCSent(rpc *pubsub.RPC) { + controlMsgType := p2p.CtrlMsgIHave + for _, iHave := range rpc.GetControl().GetIhave() { + topicID := iHave.GetTopicID() + for _, messageID := range iHave.GetMessageIDs() { + entityMsgID := iHaveRPCSentEntityID(topicID, messageID) + t.cache.init(entityMsgID, controlMsgType) + } + } +} + +// WasIHaveRPCSent checks if an iHave control message with the provided message ID was sent. +// Args: +// - string: the topic ID of the iHave RPC. +// - string: the message ID of the iHave RPC. +// Returns: +// - bool: true if the iHave rpc with the provided message ID was sent. +func (t *RPCSentTracker) WasIHaveRPCSent(topicID, messageID string) bool { + entityMsgID := iHaveRPCSentEntityID(topicID, messageID) + return t.cache.has(entityMsgID) +} + +// iHaveRPCSentEntityID appends the topicId and messageId and returns the flow.Identifier hash. +// Each iHave RPC control message contains a single topicId and multiple messageIds, to ensure we +// produce a unique id for each message we append the messageId to the topicId. 
+func iHaveRPCSentEntityID(topicId, messageId string) flow.Identifier { + b := []byte(fmt.Sprintf("%s%s", topicId, messageId)) + return flow.HashToID(b) +} diff --git a/network/p2p/tracer/internal/rpc_sent_tracker_test.go b/network/p2p/tracer/internal/rpc_sent_tracker_test.go new file mode 100644 index 00000000000..f0a0dda5195 --- /dev/null +++ b/network/p2p/tracer/internal/rpc_sent_tracker_test.go @@ -0,0 +1,71 @@ +package internal + +import ( + "testing" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + pb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/module/metrics" + "github.com/onflow/flow-go/network/channels" + "github.com/onflow/flow-go/utils/unittest" +) + +// TestNewRPCSentTracker ensures *RPCSentTracker is created as expected. +func TestNewRPCSentTracker(t *testing.T) { + tracker := mockTracker(t) + require.NotNil(t, tracker) +} + +// TestRPCSentTracker_IHave ensures *RPCSentTracker tracks sent iHave control messages as expected. 
+func TestRPCSentTracker_IHave(t *testing.T) { + tracker := mockTracker(t) + require.NotNil(t, tracker) + + t.Run("WasIHaveRPCSent should return false for iHave message Id that has not been tracked", func(t *testing.T) { + require.False(t, tracker.WasIHaveRPCSent("topic_id", "message_id")) + }) + + t.Run("WasIHaveRPCSent should return true for iHave message after it is tracked with OnIHaveRPCSent", func(t *testing.T) { + topicID := channels.PushBlocks.String() + messageID := unittest.IdentifierFixture().String() + iHaves := []*pb.ControlIHave{{ + TopicID: &topicID, + MessageIDs: []string{messageID}, + }} + rpc := rpcFixture(withIhaves(iHaves)) + tracker.OnIHaveRPCSent(rpc) + require.True(t, tracker.WasIHaveRPCSent(topicID, messageID)) + }) +} + +func mockTracker(t *testing.T) *RPCSentTracker { + logger := zerolog.Nop() + sizeLimit := uint32(100) + collector := metrics.NewNoopCollector() + tracker, err := NewRPCSentTracker(logger, sizeLimit, collector) + require.NoError(t, err) + return tracker +} + +type rpcFixtureOpt func(*pubsub.RPC) + +func withIhaves(iHave []*pb.ControlIHave) rpcFixtureOpt { + return func(rpc *pubsub.RPC) { + rpc.Control.Ihave = iHave + } +} + +func rpcFixture(opts ...rpcFixtureOpt) *pubsub.RPC { + rpc := &pubsub.RPC{ + RPC: pb.RPC{ + Control: &pb.ControlMessage{}, + }, + } + for _, opt := range opts { + opt(rpc) + } + return rpc +} From 2850cacaf64fbc7c6133d9202565bb14817c323e Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Thu, 6 Jul 2023 01:33:07 -0400 Subject: [PATCH 02/19] add rpc sent tracker to gossipsub mesh tracer - track iHave messages on each RPC sent --- .../node_builder/access_node_builder.go | 17 +++++--- cmd/observer/node_builder/observer_builder.go | 17 +++++--- config/default-config.yml | 2 + follower/follower_builder.go | 17 +++++--- module/metrics/herocache.go | 9 ++++ module/metrics/labels.go | 1 + network/internal/p2pfixtures/fixtures.go | 16 +++++--- network/netconf/flags.go | 10 +++-- 
network/p2p/p2pbuilder/libp2pNodeBuilder.go | 14 ++++++- network/p2p/p2pconf/gossipsub.go | 2 + network/p2p/tracer/gossipSubMeshTracer.go | 41 ++++++++++++++----- .../p2p/tracer/gossipSubMeshTracer_test.go | 12 +++++- network/p2p/tracer/internal/cache.go | 6 +-- .../p2p/tracer/internal/rpc_send_entity.go | 6 +-- .../p2p/tracer/internal/rpc_sent_tracker.go | 10 ++--- 15 files changed, 132 insertions(+), 48 deletions(-) diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index bf7a52047b4..e4c21e47ca6 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -1192,11 +1192,18 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri return nil, fmt.Errorf("could not create connection manager: %w", err) } - meshTracer := tracer.NewGossipSubMeshTracer( - builder.Logger, - networkMetrics, - builder.IdentityProvider, - builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval) + meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ + Logger: builder.Logger, + Metrics: networkMetrics, + IDProvider: builder.IdentityProvider, + LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, + RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), + RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + } + meshTracer, err := tracer.NewGossipSubMeshTracer(meshTracerCfg) + if err != nil { + return nil, fmt.Errorf("could not create gossipsub mesh tracer for staked access node: %w", err) + } libp2pNode, err := p2pbuilder.NewNodeBuilder( builder.Logger, diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index be518249714..cbaa51cdc62 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ 
b/cmd/observer/node_builder/observer_builder.go @@ -703,11 +703,18 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr pis = append(pis, pi) } - meshTracer := tracer.NewGossipSubMeshTracer( - builder.Logger, - builder.Metrics.Network, - builder.IdentityProvider, - builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval) + meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ + Logger: builder.Logger, + Metrics: builder.Metrics.Network, + IDProvider: builder.IdentityProvider, + LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, + RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), + RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + } + meshTracer, err := tracer.NewGossipSubMeshTracer(meshTracerCfg) + if err != nil { + return nil, fmt.Errorf("could not create gossipsub mesh tracer for staked access node: %w", err) + } node, err := p2pbuilder.NewNodeBuilder( builder.Logger, diff --git a/config/default-config.yml b/config/default-config.yml index 371fc4c385c..7788aa6a91d 100644 --- a/config/default-config.yml +++ b/config/default-config.yml @@ -63,6 +63,8 @@ network-config: # The default interval at which the gossipsub score tracer logs the peer scores. This is used for debugging and forensics purposes. # Note that we purposefully choose this logging interval high enough to avoid spamming the logs. gossipsub-score-tracer-interval: 1m + # The default RPC sent tracker cache size. The RPC sent tracker is used to track RPC control messages sent from the local node. 
+ gossipsub-rpc-sent-tracker-cache-size: 10000 # Peer scoring is the default value for enabling peer scoring gossipsub-peer-scoring-enabled: true # Gossipsub rpc inspectors configs diff --git a/follower/follower_builder.go b/follower/follower_builder.go index 36486907d1c..95c7cb60108 100644 --- a/follower/follower_builder.go +++ b/follower/follower_builder.go @@ -605,11 +605,18 @@ func (builder *FollowerServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr pis = append(pis, pi) } - meshTracer := tracer.NewGossipSubMeshTracer( - builder.Logger, - builder.Metrics.Network, - builder.IdentityProvider, - builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval) + meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ + Logger: builder.Logger, + Metrics: builder.Metrics.Network, + IDProvider: builder.IdentityProvider, + LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, + RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), + RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + } + meshTracer, err := tracer.NewGossipSubMeshTracer(meshTracerCfg) + if err != nil { + return nil, fmt.Errorf("could not create gossipsub mesh tracer for staked access node: %w", err) + } node, err := p2pbuilder.NewNodeBuilder( builder.Logger, diff --git a/module/metrics/herocache.go b/module/metrics/herocache.go index 54e287bdb1b..f3a88341c87 100644 --- a/module/metrics/herocache.go +++ b/module/metrics/herocache.go @@ -146,6 +146,15 @@ func GossipSubRPCInspectorQueueMetricFactory(f HeroCacheMetricsFactory, networkT return f(namespaceNetwork, r) } +func GossipSubRPCSentTrackerMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics { + // the public prefix is prepended to the resource name when the tracker is created for the public network. 
+ r := ResourceNetworkingRPCSentTrackerCache + if networkType == network.PublicNetwork { + r = PrependPublicPrefix(r) + } + return f(namespaceNetwork, r) +} + func RpcInspectorNotificationQueueMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics { r := ResourceNetworkingRpcInspectorNotificationQueue if networkType == network.PublicNetwork { diff --git a/module/metrics/labels.go b/module/metrics/labels.go index 353e1b3ca25..d57dd418b56 100644 --- a/module/metrics/labels.go +++ b/module/metrics/labels.go @@ -92,6 +92,7 @@ const ( ResourceNetworkingApplicationLayerSpamReportQueue = "application_layer_spam_report_queue" ResourceNetworkingRpcClusterPrefixReceivedCache = "rpc_cluster_prefixed_received_cache" ResourceNetworkingDisallowListCache = "disallow_list_cache" + ResourceNetworkingRPCSentTrackerCache = "rpc_sent_tracker_cache" ResourceFollowerPendingBlocksCache = "follower_pending_block_cache" // follower engine ResourceFollowerLoopCertifiedBlocksChannel = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel diff --git a/network/internal/p2pfixtures/fixtures.go b/network/internal/p2pfixtures/fixtures.go index 40229337dfa..2d2c18983ab 100644 --- a/network/internal/p2pfixtures/fixtures.go +++ b/network/internal/p2pfixtures/fixtures.go @@ -102,11 +102,17 @@ func CreateNode(t *testing.T, networkKey crypto.PrivateKey, sporkID flow.Identif idProvider := id.NewFixedIdentityProvider(nodeIds) defaultFlowConfig, err := config.DefaultConfig() require.NoError(t, err) - meshTracer := tracer.NewGossipSubMeshTracer( - logger, - metrics.NewNoopCollector(), - idProvider, - defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval) + + meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ + Logger: logger, + Metrics: metrics.NewNoopCollector(), + IDProvider: idProvider, + LoggerInterval: defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval, + RpcSentTrackerCacheCollector: 
metrics.NewNoopCollector(), + RpcSentTrackerCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + } + meshTracer, err := tracer.NewGossipSubMeshTracer(meshTracerCfg) + require.NoError(t, err) builder := p2pbuilder.NewNodeBuilder( logger, diff --git a/network/netconf/flags.go b/network/netconf/flags.go index 2b76042e6c8..504d9ee177a 100644 --- a/network/netconf/flags.go +++ b/network/netconf/flags.go @@ -36,9 +36,10 @@ const ( gracePeriod = "libp2p-grace-period" silencePeriod = "libp2p-silence-period" // gossipsub - peerScoring = "gossipsub-peer-scoring-enabled" - localMeshLogInterval = "gossipsub-local-mesh-logging-interval" - scoreTracerInterval = "gossipsub-score-tracer-interval" + peerScoring = "gossipsub-peer-scoring-enabled" + localMeshLogInterval = "gossipsub-local-mesh-logging-interval" + rpcSentTrackerCacheSize = "gossipsub-rpc-sent-tracker-cache-size" + scoreTracerInterval = "gossipsub-score-tracer-interval" // gossipsub validation inspector gossipSubRPCInspectorNotificationCacheSize = "gossipsub-rpc-inspector-notification-cache-size" validationInspectorNumberOfWorkers = "gossipsub-rpc-validation-inspector-workers" @@ -65,7 +66,7 @@ func AllFlagNames() []string { return []string{ networkingConnectionPruning, preferredUnicastsProtocols, receivedMessageCacheSize, peerUpdateInterval, unicastMessageTimeout, unicastCreateStreamRetryDelay, dnsCacheTTL, disallowListNotificationCacheSize, dryRun, lockoutDuration, messageRateLimit, bandwidthRateLimit, bandwidthBurstLimit, memoryLimitRatio, - fileDescriptorsRatio, peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, scoreTracerInterval, + fileDescriptorsRatio, peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, rpcSentTrackerCacheSize, scoreTracerInterval, gossipSubRPCInspectorNotificationCacheSize, validationInspectorNumberOfWorkers, 
validationInspectorInspectMessageQueueCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheDecay, validationInspectorClusterPrefixHardThreshold, ihaveSyncSampleSizePercentage, ihaveAsyncSampleSizePercentage, ihaveMaxSampleSize, metricsInspectorNumberOfWorkers, metricsInspectorCacheSize, alspDisabled, alspSpamRecordCacheSize, alspSpamRecordQueueSize, alspHearBeatInterval, @@ -106,6 +107,7 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) { flags.Bool(peerScoring, config.GossipSubConfig.PeerScoring, "enabling peer scoring on pubsub network") flags.Duration(localMeshLogInterval, config.GossipSubConfig.LocalMeshLogInterval, "logging interval for local mesh in gossipsub") flags.Duration(scoreTracerInterval, config.GossipSubConfig.ScoreTracerInterval, "logging interval for peer score tracer in gossipsub, set to 0 to disable") + flags.Uint32(rpcSentTrackerCacheSize, config.GossipSubConfig.RPCSentTrackerCacheSize, "cache size of the rpc sent tracker used by the gossipsub mesh tracer.") // gossipsub RPC control message validation limits used for validation configuration and rate limiting flags.Int(validationInspectorNumberOfWorkers, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.NumberOfWorkers, "number of gossupsub RPC control message validation inspector component workers") flags.Uint32(validationInspectorInspectMessageQueueCacheSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.CacheSize, "cache size for gossipsub RPC validation inspector events worker pool queue.") diff --git a/network/p2p/p2pbuilder/libp2pNodeBuilder.go b/network/p2p/p2pbuilder/libp2pNodeBuilder.go index c0a6412297c..da8768402ed 100644 --- a/network/p2p/p2pbuilder/libp2pNodeBuilder.go +++ b/network/p2p/p2pbuilder/libp2pNodeBuilder.go @@ -26,6 +26,7 @@ import ( "github.com/onflow/flow-go/module" 
"github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/module/metrics" flownet "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/netconf" "github.com/onflow/flow-go/network/p2p" @@ -494,7 +495,18 @@ func DefaultNodeBuilder( builder.EnableGossipSubPeerScoring(nil) } - meshTracer := tracer.NewGossipSubMeshTracer(logger, metricsCfg.Metrics, idProvider, gossipCfg.LocalMeshLogInterval) + meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ + Logger: logger, + Metrics: metricsCfg.Metrics, + IDProvider: idProvider, + LoggerInterval: gossipCfg.LocalMeshLogInterval, + RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(metricsCfg.HeroCacheFactory, flownet.PrivateNetwork), + RpcSentTrackerCacheSize: gossipCfg.RPCSentTrackerCacheSize, + } + meshTracer, err := tracer.NewGossipSubMeshTracer(meshTracerCfg) + if err != nil { + return nil, fmt.Errorf("could not create gossipsub mesh tracer: %w", err) + } builder.SetGossipSubTracer(meshTracer) builder.SetGossipSubScoreTracerInterval(gossipCfg.ScoreTracerInterval) diff --git a/network/p2p/p2pconf/gossipsub.go b/network/p2p/p2pconf/gossipsub.go index f9155129efd..d297f5cba8b 100644 --- a/network/p2p/p2pconf/gossipsub.go +++ b/network/p2p/p2pconf/gossipsub.go @@ -21,6 +21,8 @@ type GossipSubConfig struct { LocalMeshLogInterval time.Duration `mapstructure:"gossipsub-local-mesh-logging-interval"` // ScoreTracerInterval is the interval at which the score tracer logs the peer scores. ScoreTracerInterval time.Duration `mapstructure:"gossipsub-score-tracer-interval"` + // RPCSentTrackerCacheSize cache size of the rpc sent tracker used by the gossipsub mesh tracer. + RPCSentTrackerCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-cache-size"` // PeerScoring is whether to enable GossipSub peer scoring. 
PeerScoring bool `mapstructure:"gossipsub-peer-scoring-enabled"` } diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index 7cd4dd2b692..48054427b1d 100644 --- a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -14,6 +14,7 @@ import ( "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/network/p2p" + "github.com/onflow/flow-go/network/p2p/tracer/internal" "github.com/onflow/flow-go/utils/logging" ) @@ -43,23 +44,34 @@ type GossipSubMeshTracer struct { idProvider module.IdentityProvider loggerInterval time.Duration metrics module.GossipSubLocalMeshMetrics + rpcSentTracker *internal.RPCSentTracker } var _ p2p.PubSubTracer = (*GossipSubMeshTracer)(nil) -func NewGossipSubMeshTracer( - logger zerolog.Logger, - metrics module.GossipSubLocalMeshMetrics, - idProvider module.IdentityProvider, - loggerInterval time.Duration) *GossipSubMeshTracer { +type GossipSubMeshTracerConfig struct { + Logger zerolog.Logger + Metrics module.GossipSubLocalMeshMetrics + IDProvider module.IdentityProvider + LoggerInterval time.Duration + RpcSentTrackerCacheCollector module.HeroCacheMetrics + RpcSentTrackerCacheSize uint32 +} + +func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) (*GossipSubMeshTracer, error) { + rpcSentTracker, err := internal.NewRPCSentTracker(config.Logger, config.RpcSentTrackerCacheSize, config.RpcSentTrackerCacheCollector) + if err != nil { + return nil, err + } g := &GossipSubMeshTracer{ RawTracer: NewGossipSubNoopTracer(), topicMeshMap: make(map[string]map[peer.ID]struct{}), - idProvider: idProvider, - metrics: metrics, - logger: logger.With().Str("component", "gossip_sub_topology_tracer").Logger(), - loggerInterval: loggerInterval, + idProvider: config.IDProvider, + metrics: config.Metrics, + logger: config.Logger.With().Str("component", "gossip_sub_topology_tracer").Logger(), + loggerInterval: 
config.LoggerInterval, + rpcSentTracker: rpcSentTracker, } g.Component = component.NewComponentManagerBuilder(). @@ -69,7 +81,7 @@ func NewGossipSubMeshTracer( }). Build() - return g + return g, nil } // GetMeshPeers returns the local mesh peers for the given topic. @@ -139,6 +151,15 @@ func (t *GossipSubMeshTracer) Prune(p peer.ID, topic string) { lg.Info().Hex("flow_id", logging.ID(id.NodeID)).Str("role", id.Role.String()).Msg("pruned peer") } +// SendRPC is called when a RPC is sent. Currently, the GossipSubMeshTracer tracks iHave RPC messages that have been sent. +// This function can be updated to track other control messages in the future as required. +func (t *GossipSubMeshTracer) SendRPC(rpc *pubsub.RPC, _ peer.ID) { + switch { + case len(rpc.GetControl().GetIhave()) > 0: + t.rpcSentTracker.OnIHaveRPCSent(rpc.GetControl().GetIhave()) + } +} + // logLoop logs the mesh peers of the local node for each topic at a regular interval. func (t *GossipSubMeshTracer) logLoop(ctx irrecoverable.SignalerContext) { ticker := time.NewTicker(t.loggerInterval) diff --git a/network/p2p/tracer/gossipSubMeshTracer_test.go b/network/p2p/tracer/gossipSubMeshTracer_test.go index fc14b280282..4b469beb2e7 100644 --- a/network/p2p/tracer/gossipSubMeshTracer_test.go +++ b/network/p2p/tracer/gossipSubMeshTracer_test.go @@ -13,6 +13,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/module/metrics" mockmodule "github.com/onflow/flow-go/module/mock" "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/network/p2p" @@ -61,7 +62,16 @@ func TestGossipSubMeshTracer(t *testing.T) { // we only need one node with a meshTracer to test the meshTracer. // meshTracer logs at 1 second intervals for sake of testing. 
collector := mockmodule.NewGossipSubLocalMeshMetrics(t) - meshTracer := tracer.NewGossipSubMeshTracer(logger, collector, idProvider, 1*time.Second) + meshTracerCfg := &tracer.GossipSubMeshTracerConfig{ + Logger: logger, + Metrics: collector, + IDProvider: idProvider, + LoggerInterval: 1 * time.Second, + RpcSentTrackerCacheCollector: metrics.NewNoopCollector(), + RpcSentTrackerCacheSize: uint32(100), + } + meshTracer, err := tracer.NewGossipSubMeshTracer(meshTracerCfg) + require.NoError(t, err) tracerNode, tracerId := p2ptest.NodeFixture( t, sporkId, diff --git a/network/p2p/tracer/internal/cache.go b/network/p2p/tracer/internal/cache.go index 597f25c1534..34f34c4fa01 100644 --- a/network/p2p/tracer/internal/cache.go +++ b/network/p2p/tracer/internal/cache.go @@ -8,11 +8,9 @@ import ( herocache "github.com/onflow/flow-go/module/mempool/herocache/backdata" "github.com/onflow/flow-go/module/mempool/herocache/backdata/heropool" "github.com/onflow/flow-go/module/mempool/stdmap" - "github.com/onflow/flow-go/network/p2p" + p2pmsg "github.com/onflow/flow-go/network/p2p/message" ) -type entityFactory func(id flow.Identifier, controlMsgType p2p.ControlMessageType) rpcSentEntity - type RPCSentCacheConfig struct { sizeLimit uint32 logger zerolog.Logger @@ -52,7 +50,7 @@ func newRPCSentCache(config *RPCSentCacheConfig) (*rpcSentCache, error) { // - bool: true if the record is initialized, false otherwise (i.e.: the record already exists). // Note that if init is called multiple times for the same messageID, the record is initialized only once, and the // subsequent calls return false and do not change the record (i.e.: the record is not re-initialized). 
-func (r *rpcSentCache) init(messageID flow.Identifier, controlMsgType p2p.ControlMessageType) bool { +func (r *rpcSentCache) init(messageID flow.Identifier, controlMsgType p2pmsg.ControlMessageType) bool { return r.c.Add(newRPCSentEntity(messageID, controlMsgType)) } diff --git a/network/p2p/tracer/internal/rpc_send_entity.go b/network/p2p/tracer/internal/rpc_send_entity.go index 493e4808440..b3f56ce0b55 100644 --- a/network/p2p/tracer/internal/rpc_send_entity.go +++ b/network/p2p/tracer/internal/rpc_send_entity.go @@ -2,7 +2,7 @@ package internal import ( "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/network/p2p" + p2pmsg "github.com/onflow/flow-go/network/p2p/message" ) // rpcSentEntity struct representing an RPC control message sent from local node. @@ -11,7 +11,7 @@ type rpcSentEntity struct { // messageID the messageID of the rpc control message. messageID flow.Identifier // controlMsgType the control message type. - controlMsgType p2p.ControlMessageType + controlMsgType p2pmsg.ControlMessageType } var _ flow.Entity = (*rpcSentEntity)(nil) @@ -29,7 +29,7 @@ func (r rpcSentEntity) Checksum() flow.Identifier { } // newRPCSentEntity returns a new rpcSentEntity. 
-func newRPCSentEntity(id flow.Identifier, controlMessageType p2p.ControlMessageType) rpcSentEntity { +func newRPCSentEntity(id flow.Identifier, controlMessageType p2pmsg.ControlMessageType) rpcSentEntity { return rpcSentEntity{ messageID: id, controlMsgType: controlMessageType, diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 047976cd424..f580c443f41 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -3,12 +3,12 @@ package internal import ( "fmt" - pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/rs/zerolog" + pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" - "github.com/onflow/flow-go/network/p2p" + p2pmsg "github.com/onflow/flow-go/network/p2p/message" ) // RPCSentTracker tracks RPC messages that are sent. @@ -33,9 +33,9 @@ func NewRPCSentTracker(logger zerolog.Logger, sizeLimit uint32, collector module // OnIHaveRPCSent caches a unique entity message ID for each message ID included in each rpc iHave control message. // Args: // - *pubsub.RPC: the rpc sent. 
-func (t *RPCSentTracker) OnIHaveRPCSent(rpc *pubsub.RPC) { - controlMsgType := p2p.CtrlMsgIHave - for _, iHave := range rpc.GetControl().GetIhave() { +func (t *RPCSentTracker) OnIHaveRPCSent(iHaves []*pb.ControlIHave) { + controlMsgType := p2pmsg.CtrlMsgIHave + for _, iHave := range iHaves { topicID := iHave.GetTopicID() for _, messageID := range iHave.GetMessageIDs() { entityMsgID := iHaveRPCSentEntityID(topicID, messageID) From 1e758003cf88215662f02e05502453542405befc Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Fri, 7 Jul 2023 12:32:45 -0400 Subject: [PATCH 03/19] use MakeIDFromFingerPrint --- network/p2p/tracer/internal/cache_test.go | 14 +++++++------- network/p2p/tracer/internal/rpc_sent_tracker.go | 3 +-- .../p2p/tracer/internal/rpc_sent_tracker_test.go | 2 +- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/network/p2p/tracer/internal/cache_test.go b/network/p2p/tracer/internal/cache_test.go index 2d4017c54e9..fc043ee116f 100644 --- a/network/p2p/tracer/internal/cache_test.go +++ b/network/p2p/tracer/internal/cache_test.go @@ -13,7 +13,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/metrics" - "github.com/onflow/flow-go/network/p2p" + p2pmsg "github.com/onflow/flow-go/network/p2p/message" "github.com/onflow/flow-go/utils/unittest" ) @@ -22,7 +22,7 @@ import ( // and false when an existing record is initialized. func TestCache_Init(t *testing.T) { cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) - controlMsgType := p2p.CtrlMsgIHave + controlMsgType := p2pmsg.CtrlMsgIHave id1 := unittest.IdentifierFixture() id2 := unittest.IdentifierFixture() @@ -48,7 +48,7 @@ func TestCache_Init(t *testing.T) { // 2. Ensuring that all records are correctly initialized. 
func TestCache_ConcurrentInit(t *testing.T) { cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) - controlMsgType := p2p.CtrlMsgIHave + controlMsgType := p2pmsg.CtrlMsgIHave ids := unittest.IdentifierListFixture(10) var wg sync.WaitGroup @@ -76,7 +76,7 @@ func TestCache_ConcurrentInit(t *testing.T) { // 3. The record is correctly initialized in the cache and can be retrieved using the Get method. func TestCache_ConcurrentSameRecordInit(t *testing.T) { cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) - controlMsgType := p2p.CtrlMsgIHave + controlMsgType := p2pmsg.CtrlMsgIHave id := unittest.IdentifierFixture() const concurrentAttempts = 10 @@ -112,7 +112,7 @@ func TestCache_ConcurrentSameRecordInit(t *testing.T) { // 4. Attempting to remove a non-existent ID. func TestCache_Remove(t *testing.T) { cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) - controlMsgType := p2p.CtrlMsgIHave + controlMsgType := p2pmsg.CtrlMsgIHave // initialize spam records for a few ids id1 := unittest.IdentifierFixture() id2 := unittest.IdentifierFixture() @@ -143,7 +143,7 @@ func TestCache_Remove(t *testing.T) { // 2. The records are correctly removed from the cache. func TestCache_ConcurrentRemove(t *testing.T) { cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) - controlMsgType := p2p.CtrlMsgIHave + controlMsgType := p2pmsg.CtrlMsgIHave ids := unittest.IdentifierListFixture(10) for _, id := range ids { cache.init(id, controlMsgType) @@ -173,7 +173,7 @@ func TestCache_ConcurrentRemove(t *testing.T) { // 4. The removed records are correctly removed from the cache. 
func TestRecordCache_ConcurrentInitAndRemove(t *testing.T) { cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) - controlMsgType := p2p.CtrlMsgIHave + controlMsgType := p2pmsg.CtrlMsgIHave ids := unittest.IdentifierListFixture(20) idsToAdd := ids[:10] idsToRemove := ids[10:] diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index f580c443f41..e2db526d549 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -59,6 +59,5 @@ func (t *RPCSentTracker) WasIHaveRPCSent(topicID, messageID string) bool { // Each iHave RPC control message contains a single topicId and multiple messageIds, to ensure we // produce a unique id for each message we append the messageId to the topicId. func iHaveRPCSentEntityID(topicId, messageId string) flow.Identifier { - b := []byte(fmt.Sprintf("%s%s", topicId, messageId)) - return flow.HashToID(b) + return flow.MakeIDFromFingerPrint([]byte(fmt.Sprintf("%s%s", topicId, messageId))) } diff --git a/network/p2p/tracer/internal/rpc_sent_tracker_test.go b/network/p2p/tracer/internal/rpc_sent_tracker_test.go index f0a0dda5195..09d54ea6f04 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker_test.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker_test.go @@ -36,7 +36,7 @@ func TestRPCSentTracker_IHave(t *testing.T) { MessageIDs: []string{messageID}, }} rpc := rpcFixture(withIhaves(iHaves)) - tracker.OnIHaveRPCSent(rpc) + tracker.OnIHaveRPCSent(rpc.GetControl().GetIhave()) require.True(t, tracker.WasIHaveRPCSent(topicID, messageID)) }) } From c7119f9d482369d0bd39517ad775e247294ff9c3 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Fri, 7 Jul 2023 12:40:59 -0400 Subject: [PATCH 04/19] update WasIHaveRPCSent test ensure false positive not returned --- network/p2p/tracer/internal/rpc_sent_tracker_test.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git 
a/network/p2p/tracer/internal/rpc_sent_tracker_test.go b/network/p2p/tracer/internal/rpc_sent_tracker_test.go index 09d54ea6f04..5ebfc2da1d1 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker_test.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker_test.go @@ -30,14 +30,19 @@ func TestRPCSentTracker_IHave(t *testing.T) { t.Run("WasIHaveRPCSent should return true for iHave message after it is tracked with OnIHaveRPCSent", func(t *testing.T) { topicID := channels.PushBlocks.String() - messageID := unittest.IdentifierFixture().String() + messageID1 := unittest.IdentifierFixture().String() iHaves := []*pb.ControlIHave{{ TopicID: &topicID, - MessageIDs: []string{messageID}, + MessageIDs: []string{messageID1}, }} rpc := rpcFixture(withIhaves(iHaves)) tracker.OnIHaveRPCSent(rpc.GetControl().GetIhave()) - require.True(t, tracker.WasIHaveRPCSent(topicID, messageID)) + require.True(t, tracker.WasIHaveRPCSent(topicID, messageID1)) + + // manipulate last byte of message ID ensure false positive not returned + messageID2 := []byte(messageID1) + messageID2[len(messageID2)-1] = 'X' + require.False(t, tracker.WasIHaveRPCSent(topicID, string(messageID2))) }) } From 33681ece86ea7f40ba696640283f30c7ab03434c Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Fri, 7 Jul 2023 12:42:08 -0400 Subject: [PATCH 05/19] Update rpc_sent_tracker.go --- network/p2p/tracer/internal/rpc_sent_tracker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index e2db526d549..66791217c57 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -3,9 +3,9 @@ package internal import ( "fmt" + pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/rs/zerolog" - pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" p2pmsg 
"github.com/onflow/flow-go/network/p2p/message" From c7a4d5820fcb1d705c606a2a70f03a17683cd1b2 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 11 Jul 2023 09:30:04 -0400 Subject: [PATCH 06/19] Update config/default-config.yml Co-authored-by: Yahya Hassanzadeh, Ph.D. --- config/default-config.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/config/default-config.yml b/config/default-config.yml index 7788aa6a91d..9834694b0e2 100644 --- a/config/default-config.yml +++ b/config/default-config.yml @@ -64,7 +64,8 @@ network-config: # Note that we purposefully choose this logging interval high enough to avoid spamming the logs. gossipsub-score-tracer-interval: 1m # The default RPC sent tracker cache size. The RPC sent tracker is used to track RPC control messages sent from the local node. - gossipsub-rpc-sent-tracker-cache-size: 10000 + # Note: this cache size must be large enough to keep a history of sent messages in a reasonable time window of past history. + gossipsub-rpc-sent-tracker-cache-size: 1_000_000 # Peer scoring is the default value for enabling peer scoring gossipsub-peer-scoring-enabled: true # Gossipsub rpc inspectors configs From 252dd2d2addb7e98f670f7fca9962b7287d700df Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 11 Jul 2023 09:30:19 -0400 Subject: [PATCH 07/19] Update module/metrics/labels.go Co-authored-by: Yahya Hassanzadeh, Ph.D. 
--- module/metrics/labels.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/module/metrics/labels.go b/module/metrics/labels.go index d57dd418b56..9febc9ab391 100644 --- a/module/metrics/labels.go +++ b/module/metrics/labels.go @@ -92,7 +92,7 @@ const ( ResourceNetworkingApplicationLayerSpamReportQueue = "application_layer_spam_report_queue" ResourceNetworkingRpcClusterPrefixReceivedCache = "rpc_cluster_prefixed_received_cache" ResourceNetworkingDisallowListCache = "disallow_list_cache" - ResourceNetworkingRPCSentTrackerCache = "rpc_sent_tracker_cache" + ResourceNetworkingRPCSentTrackerCache = "gossipsub_rpc_sent_tracker_cache" ResourceFollowerPendingBlocksCache = "follower_pending_block_cache" // follower engine ResourceFollowerLoopCertifiedBlocksChannel = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel From 647065b7f909915cf3c72a22a05becc935130fb5 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 11 Jul 2023 09:30:26 -0400 Subject: [PATCH 08/19] Update network/p2p/tracer/gossipSubMeshTracer.go Co-authored-by: Yahya Hassanzadeh, Ph.D. 
--- network/p2p/tracer/gossipSubMeshTracer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index 48054427b1d..6ea32c1b464 100644 --- a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -69,7 +69,7 @@ func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) (*GossipSubMeshTr topicMeshMap: make(map[string]map[peer.ID]struct{}), idProvider: config.IDProvider, metrics: config.Metrics, - logger: config.Logger.With().Str("component", "gossip_sub_topology_tracer").Logger(), + logger: config.Logger.With().Str("component", "gossipsub_topology_tracer").Logger(), loggerInterval: config.LoggerInterval, rpcSentTracker: rpcSentTracker, } From 72e6b7db62a82452760e26a3fdd988d343b032a9 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 11 Jul 2023 09:38:54 -0400 Subject: [PATCH 09/19] document irrecoverable error in NewGossipSubMeshTracer --- network/p2p/tracer/gossipSubMeshTracer.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index 48054427b1d..21a93cddb22 100644 --- a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -58,6 +58,12 @@ type GossipSubMeshTracerConfig struct { RpcSentTrackerCacheSize uint32 } +// NewGossipSubMeshTracer creates a new *GossipSubMeshTracer. +// Args: +// - *GossipSubMeshTracerConfig: the mesh tracer config. +// Returns: +// - *GossipSubMeshTracer: new mesh tracer. +// - error: if any error is encountered during the creation of the gossipsub mesh tracer, all errors are considered irrecoverable. 
func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) (*GossipSubMeshTracer, error) { rpcSentTracker, err := internal.NewRPCSentTracker(config.Logger, config.RpcSentTrackerCacheSize, config.RpcSentTrackerCacheCollector) if err != nil { From 1dcb3a4408a998cf454a313a4d67beaefe3ed0ba Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 11 Jul 2023 09:39:39 -0400 Subject: [PATCH 10/19] Update network/p2p/tracer/gossipSubMeshTracer.go Co-authored-by: Yahya Hassanzadeh, Ph.D. --- network/p2p/tracer/gossipSubMeshTracer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index 6ea32c1b464..7ea6b0c7b78 100644 --- a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -61,7 +61,7 @@ type GossipSubMeshTracerConfig struct { func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) (*GossipSubMeshTracer, error) { rpcSentTracker, err := internal.NewRPCSentTracker(config.Logger, config.RpcSentTrackerCacheSize, config.RpcSentTrackerCacheCollector) if err != nil { - return nil, err + return nil, fmt.Errof("could not create rpc send tracker: %w", err) } g := &GossipSubMeshTracer{ From 2b2d2fc2651e38e2c20f14f50f297f1f1f39da44 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 11 Jul 2023 09:44:28 -0400 Subject: [PATCH 11/19] use cache size from default config --- network/p2p/tracer/gossipSubMeshTracer_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/network/p2p/tracer/gossipSubMeshTracer_test.go b/network/p2p/tracer/gossipSubMeshTracer_test.go index 4b469beb2e7..f9d409a762d 100644 --- a/network/p2p/tracer/gossipSubMeshTracer_test.go +++ b/network/p2p/tracer/gossipSubMeshTracer_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" + "github.com/onflow/flow-go/config" "github.com/onflow/flow-go/model/flow" 
"github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" @@ -30,6 +31,8 @@ import ( // One of the nodes is running with an unknown peer id, which the identity provider is mocked to return an error and // the mesh tracer should log a warning message. func TestGossipSubMeshTracer(t *testing.T) { + defaultConfig, err := config.DefaultConfig() + require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) sporkId := unittest.IdentifierFixture() @@ -66,9 +69,9 @@ func TestGossipSubMeshTracer(t *testing.T) { Logger: logger, Metrics: collector, IDProvider: idProvider, - LoggerInterval: 1 * time.Second, + LoggerInterval: time.Second, RpcSentTrackerCacheCollector: metrics.NewNoopCollector(), - RpcSentTrackerCacheSize: uint32(100), + RpcSentTrackerCacheSize: defaultConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, } meshTracer, err := tracer.NewGossipSubMeshTracer(meshTracerCfg) require.NoError(t, err) From 6b6a74a0386dacdf74aada3985f6aee949d0b63e Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 11 Jul 2023 09:49:34 -0400 Subject: [PATCH 12/19] Update network/p2p/tracer/internal/cache.go Co-authored-by: Yahya Hassanzadeh, Ph.D. 
--- network/p2p/tracer/internal/cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/tracer/internal/cache.go b/network/p2p/tracer/internal/cache.go index 34f34c4fa01..a16c502f907 100644 --- a/network/p2p/tracer/internal/cache.go +++ b/network/p2p/tracer/internal/cache.go @@ -34,7 +34,7 @@ func newRPCSentCache(config *RPCSentCacheConfig) (*rpcSentCache, error) { backData := herocache.NewCache(config.sizeLimit, herocache.DefaultOversizeFactor, heropool.LRUEjection, - config.logger.With().Str("mempool", "gossipsub=rpc-control-messages-sent").Logger(), + config.logger.With().Str("mempool", "gossipsub-rpc-control-messages-sent").Logger(), config.collector) return &rpcSentCache{ c: stdmap.NewBackend(stdmap.WithBackData(backData)), From 45889037518c7a67efbbd7e685b2a5c9a2a8acab Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 11 Jul 2023 09:51:11 -0400 Subject: [PATCH 13/19] rename RPCSentCacheConfig -> rpcCtrlMsgSentCacheConfig --- network/p2p/tracer/internal/cache.go | 4 ++-- network/p2p/tracer/internal/cache_test.go | 2 +- network/p2p/tracer/internal/rpc_sent_tracker.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/network/p2p/tracer/internal/cache.go b/network/p2p/tracer/internal/cache.go index 34f34c4fa01..668e05e3199 100644 --- a/network/p2p/tracer/internal/cache.go +++ b/network/p2p/tracer/internal/cache.go @@ -11,7 +11,7 @@ import ( p2pmsg "github.com/onflow/flow-go/network/p2p/message" ) -type RPCSentCacheConfig struct { +type rpcCtrlMsgSentCacheConfig struct { sizeLimit uint32 logger zerolog.Logger collector module.HeroCacheMetrics @@ -30,7 +30,7 @@ type rpcSentCache struct { // - *rpcSentCache: the created cache. // Note that this cache is intended to track control messages sent by the local node, // it stores a RPCSendEntity using an Id which should uniquely identifies the message being tracked. 
-func newRPCSentCache(config *RPCSentCacheConfig) (*rpcSentCache, error) { +func newRPCSentCache(config *rpcCtrlMsgSentCacheConfig) (*rpcSentCache, error) { backData := herocache.NewCache(config.sizeLimit, herocache.DefaultOversizeFactor, heropool.LRUEjection, diff --git a/network/p2p/tracer/internal/cache_test.go b/network/p2p/tracer/internal/cache_test.go index fc043ee116f..ae8671c8f68 100644 --- a/network/p2p/tracer/internal/cache_test.go +++ b/network/p2p/tracer/internal/cache_test.go @@ -211,7 +211,7 @@ func TestRecordCache_ConcurrentInitAndRemove(t *testing.T) { // cacheFixture returns a new *RecordCache. func cacheFixture(t *testing.T, sizeLimit uint32, logger zerolog.Logger, collector module.HeroCacheMetrics) *rpcSentCache { - config := &RPCSentCacheConfig{ + config := &rpcCtrlMsgSentCacheConfig{ sizeLimit: sizeLimit, logger: logger, collector: collector, diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 66791217c57..9b08a79efc5 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -18,7 +18,7 @@ type RPCSentTracker struct { // NewRPCSentTracker returns a new *NewRPCSentTracker. 
func NewRPCSentTracker(logger zerolog.Logger, sizeLimit uint32, collector module.HeroCacheMetrics) (*RPCSentTracker, error) { - config := &RPCSentCacheConfig{ + config := &rpcCtrlMsgSentCacheConfig{ sizeLimit: sizeLimit, logger: logger, collector: collector, From 8d95bb4e2373438dbb9316443b93bd3cea3ff857 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 11 Jul 2023 09:52:06 -0400 Subject: [PATCH 14/19] add godoc for rpcCtrlMsgSentCacheConfig --- network/p2p/tracer/internal/cache.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/network/p2p/tracer/internal/cache.go b/network/p2p/tracer/internal/cache.go index bfa35ba6b35..556463256f0 100644 --- a/network/p2p/tracer/internal/cache.go +++ b/network/p2p/tracer/internal/cache.go @@ -11,9 +11,10 @@ import ( p2pmsg "github.com/onflow/flow-go/network/p2p/message" ) +// rpcCtrlMsgSentCacheConfig configuration for the rpc sent cache. type rpcCtrlMsgSentCacheConfig struct { - sizeLimit uint32 logger zerolog.Logger + sizeLimit uint32 collector module.HeroCacheMetrics } From cfc36723646b2a7e5b88ec4322d1b23ec4375d9f Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 11 Jul 2023 10:00:46 -0400 Subject: [PATCH 15/19] remove error return from NewGossipSubMeshTracer func signature --- cmd/access/node_builder/access_node_builder.go | 5 +---- cmd/observer/node_builder/observer_builder.go | 5 +---- follower/follower_builder.go | 5 +---- network/internal/p2pfixtures/fixtures.go | 3 +-- network/p2p/p2pbuilder/libp2pNodeBuilder.go | 6 ++---- network/p2p/tracer/gossipSubMeshTracer.go | 11 +++-------- network/p2p/tracer/gossipSubMeshTracer_test.go | 3 +-- network/p2p/tracer/internal/cache.go | 4 ++-- network/p2p/tracer/internal/cache_test.go | 3 +-- network/p2p/tracer/internal/rpc_sent_tracker.go | 8 ++------ network/p2p/tracer/internal/rpc_sent_tracker_test.go | 9 ++++----- 11 files changed, 19 insertions(+), 43 deletions(-) diff --git a/cmd/access/node_builder/access_node_builder.go 
b/cmd/access/node_builder/access_node_builder.go index cbfad7b6e05..5edd2629ee2 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -1199,10 +1199,7 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, } - meshTracer, err := tracer.NewGossipSubMeshTracer(meshTracerCfg) - if err != nil { - return nil, fmt.Errorf("could not create gossipsub mesh tracer for staked access node: %w", err) - } + meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) libp2pNode, err := p2pbuilder.NewNodeBuilder( builder.Logger, diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 9eca3748a63..78ddc464fb7 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -710,10 +710,7 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, } - meshTracer, err := tracer.NewGossipSubMeshTracer(meshTracerCfg) - if err != nil { - return nil, fmt.Errorf("could not create gossipsub mesh tracer for staked access node: %w", err) - } + meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) node, err := p2pbuilder.NewNodeBuilder( builder.Logger, diff --git a/follower/follower_builder.go b/follower/follower_builder.go index eaa2a4a0c82..e2eb43cb49c 100644 --- a/follower/follower_builder.go +++ b/follower/follower_builder.go @@ -612,10 +612,7 @@ func (builder *FollowerServiceBuilder) 
initPublicLibp2pNode(networkKey crypto.Pr RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork), RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, } - meshTracer, err := tracer.NewGossipSubMeshTracer(meshTracerCfg) - if err != nil { - return nil, fmt.Errorf("could not create gossipsub mesh tracer for staked access node: %w", err) - } + meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) node, err := p2pbuilder.NewNodeBuilder( builder.Logger, diff --git a/network/internal/p2pfixtures/fixtures.go b/network/internal/p2pfixtures/fixtures.go index 2d2c18983ab..29ee0509fbb 100644 --- a/network/internal/p2pfixtures/fixtures.go +++ b/network/internal/p2pfixtures/fixtures.go @@ -111,8 +111,7 @@ func CreateNode(t *testing.T, networkKey crypto.PrivateKey, sporkID flow.Identif RpcSentTrackerCacheCollector: metrics.NewNoopCollector(), RpcSentTrackerCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, } - meshTracer, err := tracer.NewGossipSubMeshTracer(meshTracerCfg) - require.NoError(t, err) + meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) builder := p2pbuilder.NewNodeBuilder( logger, diff --git a/network/p2p/p2pbuilder/libp2pNodeBuilder.go b/network/p2p/p2pbuilder/libp2pNodeBuilder.go index da8768402ed..8e550b4fa94 100644 --- a/network/p2p/p2pbuilder/libp2pNodeBuilder.go +++ b/network/p2p/p2pbuilder/libp2pNodeBuilder.go @@ -503,10 +503,8 @@ func DefaultNodeBuilder( RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(metricsCfg.HeroCacheFactory, flownet.PrivateNetwork), RpcSentTrackerCacheSize: gossipCfg.RPCSentTrackerCacheSize, } - meshTracer, err := tracer.NewGossipSubMeshTracer(meshTracerCfg) - if err != nil { - return nil, fmt.Errorf("could not create gossipsub mesh tracer: %w", err) - } + meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) + 
builder.SetGossipSubTracer(meshTracer) builder.SetGossipSubScoreTracerInterval(gossipCfg.ScoreTracerInterval) diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index 5135c920390..1cc25fd2565 100644 --- a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -63,13 +63,8 @@ type GossipSubMeshTracerConfig struct { // - *GossipSubMeshTracerConfig: the mesh tracer config. // Returns: // - *GossipSubMeshTracer: new mesh tracer. -// - error: if any error is encountered during the creation of the gossipsub mesh tracer, all errors are considered irrecoverable. -func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) (*GossipSubMeshTracer, error) { - rpcSentTracker, err := internal.NewRPCSentTracker(config.Logger, config.RpcSentTrackerCacheSize, config.RpcSentTrackerCacheCollector) - if err != nil { - return nil, fmt.Errof("could not create rpc send tracker: %w", err) - } - +func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) *GossipSubMeshTracer { + rpcSentTracker := internal.NewRPCSentTracker(config.Logger, config.RpcSentTrackerCacheSize, config.RpcSentTrackerCacheCollector) g := &GossipSubMeshTracer{ RawTracer: NewGossipSubNoopTracer(), topicMeshMap: make(map[string]map[peer.ID]struct{}), @@ -87,7 +82,7 @@ func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) (*GossipSubMeshTr }). Build() - return g, nil + return g } // GetMeshPeers returns the local mesh peers for the given topic. 
diff --git a/network/p2p/tracer/gossipSubMeshTracer_test.go b/network/p2p/tracer/gossipSubMeshTracer_test.go index f9d409a762d..a2da0584f94 100644 --- a/network/p2p/tracer/gossipSubMeshTracer_test.go +++ b/network/p2p/tracer/gossipSubMeshTracer_test.go @@ -73,8 +73,7 @@ func TestGossipSubMeshTracer(t *testing.T) { RpcSentTrackerCacheCollector: metrics.NewNoopCollector(), RpcSentTrackerCacheSize: defaultConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, } - meshTracer, err := tracer.NewGossipSubMeshTracer(meshTracerCfg) - require.NoError(t, err) + meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg) tracerNode, tracerId := p2ptest.NodeFixture( t, sporkId, diff --git a/network/p2p/tracer/internal/cache.go b/network/p2p/tracer/internal/cache.go index 556463256f0..1094aeca95c 100644 --- a/network/p2p/tracer/internal/cache.go +++ b/network/p2p/tracer/internal/cache.go @@ -31,7 +31,7 @@ type rpcSentCache struct { // - *rpcSentCache: the created cache. // Note that this cache is intended to track control messages sent by the local node, // it stores a RPCSendEntity using an Id which should uniquely identifies the message being tracked. -func newRPCSentCache(config *rpcCtrlMsgSentCacheConfig) (*rpcSentCache, error) { +func newRPCSentCache(config *rpcCtrlMsgSentCacheConfig) *rpcSentCache { backData := herocache.NewCache(config.sizeLimit, herocache.DefaultOversizeFactor, heropool.LRUEjection, @@ -39,7 +39,7 @@ func newRPCSentCache(config *rpcCtrlMsgSentCacheConfig) (*rpcSentCache, error) { config.collector) return &rpcSentCache{ c: stdmap.NewBackend(stdmap.WithBackData(backData)), - }, nil + } } // init initializes the record cached for the given messageID if it does not exist. 
diff --git a/network/p2p/tracer/internal/cache_test.go b/network/p2p/tracer/internal/cache_test.go index ae8671c8f68..3383382a488 100644 --- a/network/p2p/tracer/internal/cache_test.go +++ b/network/p2p/tracer/internal/cache_test.go @@ -216,8 +216,7 @@ func cacheFixture(t *testing.T, sizeLimit uint32, logger zerolog.Logger, collect logger: logger, collector: collector, } - r, err := newRPCSentCache(config) - require.NoError(t, err) + r := newRPCSentCache(config) // expect cache to be empty require.Equalf(t, uint(0), r.size(), "cache size must be 0") require.NotNil(t, r) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 9b08a79efc5..4ab68cb6e19 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -17,17 +17,13 @@ type RPCSentTracker struct { } // NewRPCSentTracker returns a new *NewRPCSentTracker. -func NewRPCSentTracker(logger zerolog.Logger, sizeLimit uint32, collector module.HeroCacheMetrics) (*RPCSentTracker, error) { +func NewRPCSentTracker(logger zerolog.Logger, sizeLimit uint32, collector module.HeroCacheMetrics) *RPCSentTracker { config := &rpcCtrlMsgSentCacheConfig{ sizeLimit: sizeLimit, logger: logger, collector: collector, } - cache, err := newRPCSentCache(config) - if err != nil { - return nil, fmt.Errorf("failed to create new rpc sent cahe: %w", err) - } - return &RPCSentTracker{cache: cache}, nil + return &RPCSentTracker{cache: newRPCSentCache(config)} } // OnIHaveRPCSent caches a unique entity message ID for each message ID included in each rpc iHave control message. 
diff --git a/network/p2p/tracer/internal/rpc_sent_tracker_test.go b/network/p2p/tracer/internal/rpc_sent_tracker_test.go index 5ebfc2da1d1..f113f862cbf 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker_test.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker_test.go @@ -15,13 +15,13 @@ import ( // TestNewRPCSentTracker ensures *RPCSenTracker is created as expected. func TestNewRPCSentTracker(t *testing.T) { - tracker := mockTracker(t) + tracker := mockTracker() require.NotNil(t, tracker) } // TestRPCSentTracker_IHave ensures *RPCSentTracker tracks sent iHave control messages as expected. func TestRPCSentTracker_IHave(t *testing.T) { - tracker := mockTracker(t) + tracker := mockTracker() require.NotNil(t, tracker) t.Run("WasIHaveRPCSent should return false for iHave message Id that has not been tracked", func(t *testing.T) { @@ -46,12 +46,11 @@ func TestRPCSentTracker_IHave(t *testing.T) { }) } -func mockTracker(t *testing.T) *RPCSentTracker { +func mockTracker() *RPCSentTracker { logger := zerolog.Nop() sizeLimit := uint32(100) collector := metrics.NewNoopCollector() - tracker, err := NewRPCSentTracker(logger, sizeLimit, collector) - require.NoError(t, err) + tracker := NewRPCSentTracker(logger, sizeLimit, collector) return tracker } From d270551bb6f3a2280833ff5e1c051e611c3df30c Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 11 Jul 2023 10:10:25 -0400 Subject: [PATCH 16/19] rename messageId -> messageEntityID --- network/p2p/tracer/internal/cache.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/network/p2p/tracer/internal/cache.go b/network/p2p/tracer/internal/cache.go index 1094aeca95c..e5a8e3da373 100644 --- a/network/p2p/tracer/internal/cache.go +++ b/network/p2p/tracer/internal/cache.go @@ -42,26 +42,26 @@ func newRPCSentCache(config *rpcCtrlMsgSentCacheConfig) *rpcSentCache { } } -// init initializes the record cached for the given messageID if it does not exist. 
+// init initializes the record cached for the given messageEntityID if it does not exist. // Returns true if the record is initialized, false otherwise (i.e.: the record already exists). // Args: -// - flow.Identifier: the messageID to store the rpc control message. +// - flow.Identifier: the messageEntityID to store the rpc control message. // - p2p.ControlMessageType: the rpc control message type. // Returns: // - bool: true if the record is initialized, false otherwise (i.e.: the record already exists). -// Note that if init is called multiple times for the same messageID, the record is initialized only once, and the +// Note that if init is called multiple times for the same messageEntityID, the record is initialized only once, and the // subsequent calls return false and do not change the record (i.e.: the record is not re-initialized). -func (r *rpcSentCache) init(messageID flow.Identifier, controlMsgType p2pmsg.ControlMessageType) bool { - return r.c.Add(newRPCSentEntity(messageID, controlMsgType)) +func (r *rpcSentCache) init(messageEntityID flow.Identifier, controlMsgType p2pmsg.ControlMessageType) bool { + return r.c.Add(newRPCSentEntity(messageEntityID, controlMsgType)) } // has checks if the RPC message has been cached indicating it has been sent. // Args: -// - flow.Identifier: the messageID to store the rpc control message. +// - flow.Identifier: the messageEntityID to store the rpc control message. // Returns: // - bool: true if the RPC has been cache indicating it was sent from the local node. -func (r *rpcSentCache) has(messageId flow.Identifier) bool { - return r.c.Has(messageId) +func (r *rpcSentCache) has(messageEntityID flow.Identifier) bool { + return r.c.Has(messageEntityID) } // ids returns the list of ids of each rpcSentEntity stored. @@ -69,14 +69,14 @@ func (r *rpcSentCache) ids() []flow.Identifier { return flow.GetIDs(r.c.All()) } -// remove the record of the given messageID from the cache. 
+// remove the record of the given messageEntityID from the cache. // Returns true if the record is removed, false otherwise (i.e., the record does not exist). // Args: -// - flow.Identifier: the messageID to store the rpc control message. +// - flow.Identifier: the messageEntityID to store the rpc control message. // Returns: // - true if the record is removed, false otherwise (i.e., the record does not exist). -func (r *rpcSentCache) remove(messageID flow.Identifier) bool { - return r.c.Remove(messageID) +func (r *rpcSentCache) remove(messageEntityID flow.Identifier) bool { + return r.c.Remove(messageEntityID) } // size returns the number of records in the cache. From 8edb7444a56f2ddbf3fbe6338a2668681a7235e1 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 11 Jul 2023 10:10:55 -0400 Subject: [PATCH 17/19] remove unused funcs remove & ids --- network/p2p/tracer/internal/cache.go | 15 --- network/p2p/tracer/internal/cache_test.go | 106 ---------------------- 2 files changed, 121 deletions(-) diff --git a/network/p2p/tracer/internal/cache.go b/network/p2p/tracer/internal/cache.go index e5a8e3da373..b5f0a635c47 100644 --- a/network/p2p/tracer/internal/cache.go +++ b/network/p2p/tracer/internal/cache.go @@ -64,21 +64,6 @@ func (r *rpcSentCache) has(messageEntityID flow.Identifier) bool { return r.c.Has(messageEntityID) } -// ids returns the list of ids of each rpcSentEntity stored. -func (r *rpcSentCache) ids() []flow.Identifier { - return flow.GetIDs(r.c.All()) -} - -// remove the record of the given messageEntityID from the cache. -// Returns true if the record is removed, false otherwise (i.e., the record does not exist). -// Args: -// - flow.Identifier: the messageEntityID to store the rpc control message. -// Returns: -// - true if the record is removed, false otherwise (i.e., the record does not exist). 
-func (r *rpcSentCache) remove(messageEntityID flow.Identifier) bool { - return r.c.Remove(messageEntityID) -} - // size returns the number of records in the cache. func (r *rpcSentCache) size() uint { return r.c.Size() diff --git a/network/p2p/tracer/internal/cache_test.go b/network/p2p/tracer/internal/cache_test.go index 3383382a488..fa746adbc8a 100644 --- a/network/p2p/tracer/internal/cache_test.go +++ b/network/p2p/tracer/internal/cache_test.go @@ -1,7 +1,6 @@ package internal import ( - "fmt" "sync" "testing" "time" @@ -104,111 +103,6 @@ func TestCache_ConcurrentSameRecordInit(t *testing.T) { require.True(t, cache.has(id)) } -// TestCache_Remove tests the remove method of the RecordCache. -// The test covers the following scenarios: -// 1. Initializing the cache with multiple records. -// 2. Removing a record and checking if it is removed correctly. -// 3. Ensuring the other records are still in the cache after removal. -// 4. Attempting to remove a non-existent ID. -func TestCache_Remove(t *testing.T) { - cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) - controlMsgType := p2pmsg.CtrlMsgIHave - // initialize spam records for a few ids - id1 := unittest.IdentifierFixture() - id2 := unittest.IdentifierFixture() - id3 := unittest.IdentifierFixture() - - require.True(t, cache.init(id1, controlMsgType)) - require.True(t, cache.init(id2, controlMsgType)) - require.True(t, cache.init(id3, controlMsgType)) - - numOfIds := uint(3) - require.Equal(t, numOfIds, cache.size(), fmt.Sprintf("expected size of the cache to be %d", numOfIds)) - // remove id1 and check if the record is removed - require.True(t, cache.remove(id1)) - require.NotContains(t, id1, cache.ids()) - - // check if the other ids are still in the cache - require.True(t, cache.has(id2)) - require.True(t, cache.has(id3)) - - // attempt to remove a non-existent ID - id4 := unittest.IdentifierFixture() - require.False(t, cache.remove(id4)) -} - -// TestCache_ConcurrentRemove tests the 
concurrent removal of records for different ids. -// The test covers the following scenarios: -// 1. Multiple goroutines removing records for different ids concurrently. -// 2. The records are correctly removed from the cache. -func TestCache_ConcurrentRemove(t *testing.T) { - cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) - controlMsgType := p2pmsg.CtrlMsgIHave - ids := unittest.IdentifierListFixture(10) - for _, id := range ids { - cache.init(id, controlMsgType) - } - - var wg sync.WaitGroup - wg.Add(len(ids)) - - for _, id := range ids { - go func(id flow.Identifier) { - defer wg.Done() - require.True(t, cache.remove(id)) - require.NotContains(t, id, cache.ids()) - }(id) - } - - unittest.RequireReturnsBefore(t, wg.Wait, 100*time.Millisecond, "timed out waiting for goroutines to finish") - - require.Equal(t, uint(0), cache.size()) -} - -// TestRecordCache_ConcurrentInitAndRemove tests the concurrent initialization and removal of records for different -// ids. The test covers the following scenarios: -// 1. Multiple goroutines initializing records for different ids concurrently. -// 2. Multiple goroutines removing records for different ids concurrently. -// 3. The initialized records are correctly added to the cache. -// 4. The removed records are correctly removed from the cache. 
-func TestRecordCache_ConcurrentInitAndRemove(t *testing.T) { - cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) - controlMsgType := p2pmsg.CtrlMsgIHave - ids := unittest.IdentifierListFixture(20) - idsToAdd := ids[:10] - idsToRemove := ids[10:] - - for _, id := range idsToRemove { - cache.init(id, controlMsgType) - } - - var wg sync.WaitGroup - wg.Add(len(ids)) - - // initialize spam records concurrently - for _, id := range idsToAdd { - go func(id flow.Identifier) { - defer wg.Done() - cache.init(id, controlMsgType) - }(id) - } - - // remove spam records concurrently - for _, id := range idsToRemove { - go func(id flow.Identifier) { - defer wg.Done() - require.True(t, cache.remove(id)) - require.NotContains(t, id, cache.ids()) - }(id) - } - - unittest.RequireReturnsBefore(t, wg.Wait, 100*time.Millisecond, "timed out waiting for goroutines to finish") - - // ensure that the initialized records are correctly added to the cache - // and removed records are correctly removed from the cache - require.ElementsMatch(t, idsToAdd, cache.ids()) -} - // cacheFixture returns a new *RecordCache. 
func cacheFixture(t *testing.T, sizeLimit uint32, logger zerolog.Logger, collector module.HeroCacheMetrics) *rpcSentCache { config := &rpcCtrlMsgSentCacheConfig{ From 60b764dc9ce99628578fb3407689316c3e22c3f9 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 11 Jul 2023 10:40:28 -0400 Subject: [PATCH 18/19] improve cache func signature cohesion --- network/p2p/tracer/internal/cache.go | 34 ++++++++++---- network/p2p/tracer/internal/cache_test.go | 46 ++++++++++--------- .../p2p/tracer/internal/rpc_sent_tracker.go | 16 +------ 3 files changed, 52 insertions(+), 44 deletions(-) diff --git a/network/p2p/tracer/internal/cache.go b/network/p2p/tracer/internal/cache.go index b5f0a635c47..b916133b270 100644 --- a/network/p2p/tracer/internal/cache.go +++ b/network/p2p/tracer/internal/cache.go @@ -1,6 +1,8 @@ package internal import ( + "fmt" + "github.com/rs/zerolog" "github.com/onflow/flow-go/model/flow" @@ -42,29 +44,43 @@ func newRPCSentCache(config *rpcCtrlMsgSentCacheConfig) *rpcSentCache { } } -// init initializes the record cached for the given messageEntityID if it does not exist. +// add initializes the record cached for the given messageEntityID if it does not exist. // Returns true if the record is initialized, false otherwise (i.e.: the record already exists). // Args: -// - flow.Identifier: the messageEntityID to store the rpc control message. -// - p2p.ControlMessageType: the rpc control message type. +// - topic: the topic ID. +// - messageId: the message ID. +// - controlMsgType: the rpc control message type. // Returns: // - bool: true if the record is initialized, false otherwise (i.e.: the record already exists). 
-// Note that if init is called multiple times for the same messageEntityID, the record is initialized only once, and the +// Note that if add is called multiple times for the same messageEntityID, the record is initialized only once, and the // subsequent calls return false and do not change the record (i.e.: the record is not re-initialized). -func (r *rpcSentCache) init(messageEntityID flow.Identifier, controlMsgType p2pmsg.ControlMessageType) bool { - return r.c.Add(newRPCSentEntity(messageEntityID, controlMsgType)) +func (r *rpcSentCache) add(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) bool { + return r.c.Add(newRPCSentEntity(r.rpcSentEntityID(topic, messageId, controlMsgType), controlMsgType)) } // has checks if the RPC message has been cached indicating it has been sent. // Args: -// - flow.Identifier: the messageEntityID to store the rpc control message. +// - topic: the topic ID. +// - messageId: the message ID. +// - controlMsgType: the rpc control message type. // Returns: // - bool: true if the RPC has been cache indicating it was sent from the local node. -func (r *rpcSentCache) has(messageEntityID flow.Identifier) bool { - return r.c.Has(messageEntityID) +func (r *rpcSentCache) has(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) bool { + return r.c.Has(r.rpcSentEntityID(topic, messageId, controlMsgType)) } // size returns the number of records in the cache. func (r *rpcSentCache) size() uint { return r.c.Size() } + +// rpcSentEntityID creates an entity ID from the topic, messageID and control message type. +// Args: +// - topic: the topic ID. +// - messageId: the message ID. +// - controlMsgType: the rpc control message type. +// Returns: +// - flow.Identifier: the entity ID. 
+func (r *rpcSentCache) rpcSentEntityID(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) flow.Identifier { + return flow.MakeIDFromFingerPrint([]byte(fmt.Sprintf("%s%s%s", topic, messageId, controlMsgType))) +} diff --git a/network/p2p/tracer/internal/cache_test.go b/network/p2p/tracer/internal/cache_test.go index fa746adbc8a..c92b42b5e02 100644 --- a/network/p2p/tracer/internal/cache_test.go +++ b/network/p2p/tracer/internal/cache_test.go @@ -12,59 +12,62 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/metrics" + "github.com/onflow/flow-go/network/channels" p2pmsg "github.com/onflow/flow-go/network/p2p/message" "github.com/onflow/flow-go/utils/unittest" ) -// TestCache_Init tests the init method of the rpcSentCache. +// TestCache_Add tests the add method of the rpcSentCache. // It ensures that the method returns true when a new record is initialized // and false when an existing record is initialized. 
-func TestCache_Init(t *testing.T) { +func TestCache_Add(t *testing.T) { cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) controlMsgType := p2pmsg.CtrlMsgIHave - id1 := unittest.IdentifierFixture() - id2 := unittest.IdentifierFixture() + topic := channels.PushBlocks.String() + messageID1 := unittest.IdentifierFixture().String() + messageID2 := unittest.IdentifierFixture().String() // test initializing a record for an ID that doesn't exist in the cache - initialized := cache.init(id1, controlMsgType) + initialized := cache.add(topic, messageID1, controlMsgType) require.True(t, initialized, "expected record to be initialized") - require.True(t, cache.has(id1), "expected record to exist") + require.True(t, cache.has(topic, messageID1, controlMsgType), "expected record to exist") // test initializing a record for an ID that already exists in the cache - initialized = cache.init(id1, controlMsgType) + initialized = cache.add(topic, messageID1, controlMsgType) require.False(t, initialized, "expected record not to be initialized") - require.True(t, cache.has(id1), "expected record to exist") + require.True(t, cache.has(topic, messageID1, controlMsgType), "expected record to exist") // test initializing a record for another ID - initialized = cache.init(id2, controlMsgType) + initialized = cache.add(topic, messageID2, controlMsgType) require.True(t, initialized, "expected record to be initialized") - require.True(t, cache.has(id2), "expected record to exist") + require.True(t, cache.has(topic, messageID2, controlMsgType), "expected record to exist") } // TestCache_ConcurrentInit tests the concurrent initialization of records. // The test covers the following scenarios: // 1. Multiple goroutines initializing records for different ids. // 2. Ensuring that all records are correctly initialized. 
-func TestCache_ConcurrentInit(t *testing.T) { +func TestCache_ConcurrentAdd(t *testing.T) { cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) controlMsgType := p2pmsg.CtrlMsgIHave - ids := unittest.IdentifierListFixture(10) + topic := channels.PushBlocks.String() + messageIds := unittest.IdentifierListFixture(10) var wg sync.WaitGroup - wg.Add(len(ids)) + wg.Add(len(messageIds)) - for _, id := range ids { + for _, id := range messageIds { go func(id flow.Identifier) { defer wg.Done() - cache.init(id, controlMsgType) + cache.add(topic, id.String(), controlMsgType) }(id) } unittest.RequireReturnsBefore(t, wg.Wait, 100*time.Millisecond, "timed out waiting for goroutines to finish") // ensure that all records are correctly initialized - for _, id := range ids { - require.True(t, cache.has(id)) + for _, id := range messageIds { + require.True(t, cache.has(topic, id.String(), controlMsgType)) } } @@ -73,10 +76,11 @@ func TestCache_ConcurrentInit(t *testing.T) { // 1. Multiple goroutines attempting to initialize the same record concurrently. // 2. Only one goroutine successfully initializes the record, and others receive false on initialization. // 3. The record is correctly initialized in the cache and can be retrieved using the Get method. 
-func TestCache_ConcurrentSameRecordInit(t *testing.T) { +func TestCache_ConcurrentSameRecordAdd(t *testing.T) { cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector()) controlMsgType := p2pmsg.CtrlMsgIHave - id := unittest.IdentifierFixture() + topic := channels.PushBlocks.String() + messageID := unittest.IdentifierFixture().String() const concurrentAttempts = 10 var wg sync.WaitGroup @@ -87,7 +91,7 @@ func TestCache_ConcurrentSameRecordInit(t *testing.T) { for i := 0; i < concurrentAttempts; i++ { go func() { defer wg.Done() - initSuccess := cache.init(id, controlMsgType) + initSuccess := cache.add(topic, messageID, controlMsgType) if initSuccess { successGauge.Inc() } @@ -100,7 +104,7 @@ func TestCache_ConcurrentSameRecordInit(t *testing.T) { require.Equal(t, int32(1), successGauge.Load()) // ensure that the record is correctly initialized in the cache - require.True(t, cache.has(id)) + require.True(t, cache.has(topic, messageID, controlMsgType)) } // cacheFixture returns a new *RecordCache. 
diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 4ab68cb6e19..6d44ac984a3 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -1,12 +1,9 @@ package internal import ( - "fmt" - pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/rs/zerolog" - "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" p2pmsg "github.com/onflow/flow-go/network/p2p/message" ) @@ -34,8 +31,7 @@ func (t *RPCSentTracker) OnIHaveRPCSent(iHaves []*pb.ControlIHave) { for _, iHave := range iHaves { topicID := iHave.GetTopicID() for _, messageID := range iHave.GetMessageIDs() { - entityMsgID := iHaveRPCSentEntityID(topicID, messageID) - t.cache.init(entityMsgID, controlMsgType) + t.cache.add(topicID, messageID, controlMsgType) } } } @@ -47,13 +43,5 @@ func (t *RPCSentTracker) OnIHaveRPCSent(iHaves []*pb.ControlIHave) { // Returns: // - bool: true if the iHave rpc with the provided message ID was sent. func (t *RPCSentTracker) WasIHaveRPCSent(topicID, messageID string) bool { - entityMsgID := iHaveRPCSentEntityID(topicID, messageID) - return t.cache.has(entityMsgID) -} - -// iHaveRPCSentEntityID appends the topicId and messageId and returns the flow.Identifier hash. -// Each iHave RPC control message contains a single topicId and multiple messageIds, to ensure we -// produce a unique id for each message we append the messageId to the topicId. 
-func iHaveRPCSentEntityID(topicId, messageId string) flow.Identifier { - return flow.MakeIDFromFingerPrint([]byte(fmt.Sprintf("%s%s", topicId, messageId))) + return t.cache.has(topicID, messageID, p2pmsg.CtrlMsgIHave) } From 64f629d6d65a2db87b2c1e1a3ce83f3647ce7805 Mon Sep 17 00:00:00 2001 From: Khalil Claybon Date: Tue, 11 Jul 2023 12:18:04 -0400 Subject: [PATCH 19/19] update WasIHaveRPCSent to check multiple topic IDs with multiple message ID's --- .../tracer/internal/rpc_sent_tracker_test.go | 46 ++++++++++++------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker_test.go b/network/p2p/tracer/internal/rpc_sent_tracker_test.go index f113f862cbf..7b9c4ec9acb 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker_test.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker_test.go @@ -8,6 +8,7 @@ import ( "github.com/rs/zerolog" "github.com/stretchr/testify/require" + "github.com/onflow/flow-go/config" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/network/channels" "github.com/onflow/flow-go/utils/unittest" @@ -15,13 +16,13 @@ import ( // TestNewRPCSentTracker ensures *RPCSenTracker is created as expected. func TestNewRPCSentTracker(t *testing.T) { - tracker := mockTracker() + tracker := mockTracker(t) require.NotNil(t, tracker) } // TestRPCSentTracker_IHave ensures *RPCSentTracker tracks sent iHave control messages as expected. 
func TestRPCSentTracker_IHave(t *testing.T) { - tracker := mockTracker() + tracker := mockTracker(t) require.NotNil(t, tracker) t.Run("WasIHaveRPCSent should return false for iHave message Id that has not been tracked", func(t *testing.T) { @@ -29,28 +30,41 @@ func TestRPCSentTracker_IHave(t *testing.T) { }) t.Run("WasIHaveRPCSent should return true for iHave message after it is tracked with OnIHaveRPCSent", func(t *testing.T) { - topicID := channels.PushBlocks.String() - messageID1 := unittest.IdentifierFixture().String() - iHaves := []*pb.ControlIHave{{ - TopicID: &topicID, - MessageIDs: []string{messageID1}, - }} + numOfMsgIds := 100 + testCases := []struct { + topic string + messageIDS []string + }{ + {channels.PushBlocks.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()}, + {channels.ReceiveApprovals.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()}, + {channels.SyncCommittee.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()}, + {channels.RequestChunks.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()}, + } + iHaves := make([]*pb.ControlIHave, len(testCases)) + for i, testCase := range testCases { + testCase := testCase + iHaves[i] = &pb.ControlIHave{ + TopicID: &testCase.topic, + MessageIDs: testCase.messageIDS, + } + } rpc := rpcFixture(withIhaves(iHaves)) tracker.OnIHaveRPCSent(rpc.GetControl().GetIhave()) - require.True(t, tracker.WasIHaveRPCSent(topicID, messageID1)) - // manipulate last byte of message ID ensure false positive not returned - messageID2 := []byte(messageID1) - messageID2[len(messageID2)-1] = 'X' - require.False(t, tracker.WasIHaveRPCSent(topicID, string(messageID2))) + for _, testCase := range testCases { + for _, messageID := range testCase.messageIDS { + require.True(t, tracker.WasIHaveRPCSent(testCase.topic, messageID)) + } + } }) } -func mockTracker() *RPCSentTracker { +func mockTracker(t *testing.T) *RPCSentTracker { logger := zerolog.Nop() - sizeLimit := uint32(100) + cfg, err 
:= config.DefaultConfig() + require.NoError(t, err) collector := metrics.NewNoopCollector() - tracker := NewRPCSentTracker(logger, sizeLimit, collector) + tracker := NewRPCSentTracker(logger, cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, collector) return tracker }