Skip to content

Proactively detect and report duplicate gossiped/malformed messages #3839

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 46 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
1198773
update
tsachiherman Mar 25, 2025
637cc11
update
tsachiherman Mar 25, 2025
3d3ef61
update
tsachiherman Mar 26, 2025
dd077a4
update
tsachiherman Mar 26, 2025
0383040
update
tsachiherman Mar 26, 2025
ba8eff7
Merge branch 'master' into tsachi/add-duplicate-gossip-metrics
tsachiherman Mar 26, 2025
6db3b0a
update
tsachiherman Apr 14, 2025
6a2bf8e
update
tsachiherman Apr 14, 2025
4fefa30
Merge branch 'master' into tsachi/add-duplicate-gossip-metrics
tsachiherman Apr 14, 2025
abb5585
update
tsachiherman Apr 15, 2025
414cbe1
lint
tsachiherman Apr 15, 2025
6fb19e9
remove unwanted changes.
tsachiherman Apr 15, 2025
8b2e2e7
restore original test files.
tsachiherman Apr 15, 2025
17dd1bc
Merge branch 'master' into tsachi/add-duplicate-gossip-metrics
tsachiherman Apr 15, 2025
96b23c1
update unit tests
tsachiherman Apr 15, 2025
9b30577
update per CR
tsachiherman Apr 15, 2025
8cb17ad
update per CR
tsachiherman Apr 15, 2025
026bcf8
update
tsachiherman Apr 15, 2025
73adf37
update
tsachiherman Apr 15, 2025
8afbc19
update
tsachiherman Apr 21, 2025
b04474c
update
tsachiherman Apr 21, 2025
eca575b
lint
tsachiherman Apr 21, 2025
aa9163e
Merge branch 'master' into tsachi/add-duplicate-gossip-metrics
tsachiherman Apr 21, 2025
b17a494
update
tsachiherman Apr 21, 2025
c021096
rollback
tsachiherman Apr 21, 2025
66ac11f
update
tsachiherman Apr 21, 2025
1a4a092
update
tsachiherman Apr 21, 2025
e2c0953
cleanup
tsachiherman Apr 21, 2025
a3ea3da
update
tsachiherman Apr 21, 2025
08c8466
update
tsachiherman Apr 21, 2025
b25971c
update
tsachiherman Apr 22, 2025
8c57c5b
update
tsachiherman Apr 22, 2025
a7272ea
simplify
tsachiherman Apr 22, 2025
9d02b55
spacing
tsachiherman Apr 22, 2025
2654c5f
update method names
tsachiherman Apr 22, 2025
cb86f14
remove unused log
tsachiherman Apr 22, 2025
392d581
Merge branch 'master' into tsachi/add-duplicate-gossip-metrics
tsachiherman Apr 22, 2025
148531d
Merge branch 'master' into tsachi/add-duplicate-gossip-metrics
tsachiherman Apr 24, 2025
d0a8fe9
Update network/p2p/gossip/mempool.go
tsachiherman Apr 25, 2025
45332dc
Update vms/platformvm/network/gossip.go
tsachiherman Apr 25, 2025
1be67ee
update
tsachiherman Apr 25, 2025
b006d96
update
tsachiherman Apr 25, 2025
e07878f
update
tsachiherman Apr 25, 2025
e031561
update dependency
tsachiherman Apr 25, 2025
967bfd1
remove other
tsachiherman Apr 25, 2025
03f1a3c
Merge branch 'master' into tsachi/add-duplicate-gossip-metrics
tsachiherman Apr 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/NYTimes/gziphandler v1.1.1
github.com/StephenButtolph/canoto v0.15.0
github.com/antithesishq/antithesis-sdk-go v0.3.8
github.com/ava-labs/coreth v0.15.1-rc.0
github.com/ava-labs/coreth v0.15.1-rc.0.0.20250425192338-cfbc3b41caa3
github.com/ava-labs/ledger-avalanche/go v0.0.0-20241009183145-e6f90a8a1a60
github.com/ava-labs/libevm v1.13.14-0.2.0.release
github.com/btcsuite/btcd/btcutil v1.1.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ github.com/antithesishq/antithesis-sdk-go v0.3.8/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/ava-labs/coreth v0.15.1-rc.0 h1:ahuIyp1hb4xiZ3DTA9uKIpVBMpMyXO3VHvwcxQUboSo=
github.com/ava-labs/coreth v0.15.1-rc.0/go.mod h1:BDKMO8sL2wuvWbtMRzJAsJrleQULHA+CHRbkh5hRG5o=
github.com/ava-labs/coreth v0.15.1-rc.0.0.20250425192338-cfbc3b41caa3 h1:Sd8ilJTcX8fFln4MvOFZ7BB2dZX0GmG777U/ga8u0JA=
github.com/ava-labs/coreth v0.15.1-rc.0.0.20250425192338-cfbc3b41caa3/go.mod h1:whFaTNemeCJ5khmbeZ9Wg7vpu9qRDbJQkgHwAsQmBng=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20241009183145-e6f90a8a1a60 h1:EL66gtXOAwR/4KYBjOV03LTWgkEXvLePribLlJNu4g0=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20241009183145-e6f90a8a1a60/go.mod h1:/7qKobTfbzBu7eSTVaXMTr56yTYk4j2Px6/8G+idxHo=
github.com/ava-labs/libevm v1.13.14-0.2.0.release h1:uKGCc5/ceeBbfAPRVtBUxbQt50WzB2pEDb8Uy93ePgQ=
Expand Down
174 changes: 40 additions & 134 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,7 @@ import (
"github.com/ava-labs/avalanchego/utils/set"
)

const (
ioLabel = "io"
sentIO = "sent"
receivedIO = "received"

typeLabel = "type"
pushType = "push"
pullType = "pull"
unsentType = "unsent"
sentType = "sent"

defaultGossipableCount = 64
)
const defaultGossipableCount = 64

var (
_ Gossiper = (*ValidatorGossiper)(nil)
Expand All @@ -44,31 +32,6 @@ var (

_ Set[*testTx] = (*FullSet[*testTx])(nil)

ioTypeLabels = []string{ioLabel, typeLabel}
sentPushLabels = prometheus.Labels{
ioLabel: sentIO,
typeLabel: pushType,
}
receivedPushLabels = prometheus.Labels{
ioLabel: receivedIO,
typeLabel: pushType,
}
sentPullLabels = prometheus.Labels{
ioLabel: sentIO,
typeLabel: pullType,
}
receivedPullLabels = prometheus.Labels{
ioLabel: receivedIO,
typeLabel: pullType,
}
typeLabels = []string{typeLabel}
unsentLabels = prometheus.Labels{
typeLabel: unsentType,
}
sentLabels = prometheus.Labels{
typeLabel: sentType,
}

ErrInvalidNumValidators = errors.New("num validators cannot be negative")
ErrInvalidNumNonValidators = errors.New("num non-validators cannot be negative")
ErrInvalidNumPeers = errors.New("num peers cannot be negative")
Expand All @@ -92,86 +55,6 @@ type ValidatorGossiper struct {
Validators p2p.ValidatorSet
}

// Metrics that are tracked across a gossip protocol. A given protocol should
// only use a single instance of Metrics.
type Metrics struct {
count *prometheus.CounterVec
bytes *prometheus.CounterVec
tracking *prometheus.GaugeVec
trackingLifetimeAverage prometheus.Gauge
topValidators *prometheus.GaugeVec
}

// NewMetrics returns a common set of metrics
func NewMetrics(
metrics prometheus.Registerer,
namespace string,
) (Metrics, error) {
m := Metrics{
count: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "gossip_count",
Help: "amount of gossip (n)",
},
ioTypeLabels,
),
bytes: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "gossip_bytes",
Help: "amount of gossip (bytes)",
},
ioTypeLabels,
),
tracking: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "gossip_tracking",
Help: "number of gossipables being tracked",
},
typeLabels,
),
trackingLifetimeAverage: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "gossip_tracking_lifetime_average",
Help: "average duration a gossipable has been tracked (ns)",
}),
topValidators: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "top_validators",
Help: "number of validators gossipables are sent to due to stake",
},
typeLabels,
),
}
err := errors.Join(
metrics.Register(m.count),
metrics.Register(m.bytes),
metrics.Register(m.tracking),
metrics.Register(m.trackingLifetimeAverage),
metrics.Register(m.topValidators),
)
return m, err
}

func (m *Metrics) observeMessage(labels prometheus.Labels, count int, bytes int) error {
countMetric, err := m.count.GetMetricWith(labels)
if err != nil {
return fmt.Errorf("failed to get count metric: %w", err)
}

bytesMetric, err := m.bytes.GetMetricWith(labels)
if err != nil {
return fmt.Errorf("failed to get bytes metric: %w", err)
}

countMetric.Add(float64(count))
bytesMetric.Add(float64(bytes))
return nil
}

func (v ValidatorGossiper) Gossip(ctx context.Context) error {
if !v.Validators.Has(ctx, v.NodeID) {
return nil
Expand All @@ -185,7 +68,7 @@ func NewPullGossiper[T Gossipable](
marshaller Marshaller[T],
set Set[T],
client *p2p.Client,
metrics Metrics,
metrics *Metrics,
pollSize int,
) *PullGossiper[T] {
return &PullGossiper[T]{
Expand All @@ -203,7 +86,7 @@ type PullGossiper[T Gossipable] struct {
marshaller Marshaller[T]
set Set[T]
client *p2p.Client
metrics Metrics
metrics *Metrics
pollSize int
}

Expand Down Expand Up @@ -244,13 +127,34 @@ func (p *PullGossiper[_]) handleResponse(
return
}

receivedBytes := 0
for _, bytes := range gossip {
receivedBytes += len(bytes)
handleIncomingGossipables(p.log,
p.marshaller,
p.set,
p.metrics,
gossip,
nodeID,
pullType,
)
}

gossipable, err := p.marshaller.UnmarshalGossip(bytes)
func handleIncomingGossipables[T Gossipable](
log logging.Logger,
marshaller Marshaller[T],
set Set[T],
metrics *Metrics,
gossip [][]byte,
nodeID ids.NodeID,
typeLabel string,
) {
malformedBytes := 0
malformedCount := 0
gossipableSizes := make(map[ids.ID]int, len(gossip))
for _, bytes := range gossip {
gossipable, err := marshaller.UnmarshalGossip(bytes)
if err != nil {
p.log.Debug(
malformedBytes += len(bytes)
malformedCount++
log.Debug(
"failed to unmarshal gossip",
zap.Stringer("nodeID", nodeID),
zap.Error(err),
Expand All @@ -259,24 +163,26 @@ func (p *PullGossiper[_]) handleResponse(
}

gossipID := gossipable.GossipID()
p.log.Debug(
log.Debug(
"received gossip",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", gossipID),
)
if err := p.set.Add(gossipable); err != nil {
p.log.Debug(
gossipableSizes[gossipID] = len(bytes)
if err := set.Add(gossipable); err != nil {
log.Debug(
"failed to add gossip to the known set",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", gossipID),
zap.Error(err),
)
continue
}
metrics.ObserveIncomingGossipable(gossipID, droppedNot)
}

if err := p.metrics.observeMessage(receivedPullLabels, len(gossip), receivedBytes); err != nil {
p.log.Error("failed to update metrics",
if err := metrics.observeReceivedMessage(typeLabel, gossipableSizes, malformedBytes, malformedCount); err != nil {
log.Error("failed to update metrics",
zap.Error(err),
)
}
Expand All @@ -285,10 +191,10 @@ func (p *PullGossiper[_]) handleResponse(
// NewPushGossiper returns an instance of PushGossiper
func NewPushGossiper[T Gossipable](
marshaller Marshaller[T],
mempool Set[T],
set Set[T],
validators p2p.ValidatorSubset,
client *p2p.Client,
metrics Metrics,
metrics *Metrics,
gossipParams BranchingFactor,
regossipParams BranchingFactor,
discardedSize int,
Expand All @@ -312,7 +218,7 @@ func NewPushGossiper[T Gossipable](

return &PushGossiper[T]{
marshaller: marshaller,
set: mempool,
set: set,
validators: validators,
client: client,
metrics: metrics,
Expand All @@ -334,7 +240,7 @@ type PushGossiper[T Gossipable] struct {
set Set[T]
validators p2p.ValidatorSubset
client *p2p.Client
metrics Metrics
metrics *Metrics

gossipParams BranchingFactor
regossipParams BranchingFactor
Expand Down
4 changes: 2 additions & 2 deletions network/p2p/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestGossiperShutdown(*testing.T) {
nil,
nil,
nil,
Metrics{},
&Metrics{},
0,
)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -363,7 +363,7 @@ func TestPushGossiperNew(t *testing.T) {
nil,
nil,
nil,
Metrics{},
&Metrics{},
tt.gossipParams,
tt.regossipParams,
tt.discardedSize,
Expand Down
39 changes: 10 additions & 29 deletions network/p2p/gossip/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func NewHandler[T Gossipable](
log logging.Logger,
marshaller Marshaller[T],
set Set[T],
metrics Metrics,
metrics *Metrics,
targetResponseSize int,
) *Handler[T] {
return &Handler[T]{
Expand All @@ -40,7 +40,7 @@ type Handler[T Gossipable] struct {
marshaller Marshaller[T]
log logging.Logger
set Set[T]
metrics Metrics
metrics *Metrics
targetResponseSize int
}

Expand Down Expand Up @@ -96,31 +96,12 @@ func (h Handler[_]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes
return
}

receivedBytes := 0
for _, bytes := range gossip {
receivedBytes += len(bytes)
gossipable, err := h.marshaller.UnmarshalGossip(bytes)
if err != nil {
h.log.Debug("failed to unmarshal gossip",
zap.Stringer("nodeID", nodeID),
zap.Error(err),
)
continue
}

if err := h.set.Add(gossipable); err != nil {
h.log.Debug(
"failed to add gossip to the known set",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", gossipable.GossipID()),
zap.Error(err),
)
}
}

if err := h.metrics.observeMessage(receivedPushLabels, len(gossip), receivedBytes); err != nil {
h.log.Error("failed to update metrics",
zap.Error(err),
)
}
handleIncomingGossipables(h.log,
h.marshaller,
h.set,
h.metrics,
gossip,
nodeID,
pushType,
)
}
Loading