From 3eb13b099ba6eeea893bf00aa373778173107b64 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 16 Aug 2024 17:11:19 -0400 Subject: [PATCH 01/16] p2p: Telemetry and metrics improvements --- cmd/algod/main.go | 2 ++ network/p2p/pubsubTracer.go | 13 ++++++++++++- test/heapwatch/requirements.txt | 4 ++-- util/metrics/metrics.go | 5 +++++ 4 files changed, 21 insertions(+), 3 deletions(-) diff --git a/cmd/algod/main.go b/cmd/algod/main.go index 311b1507e6..47008256ac 100644 --- a/cmd/algod/main.go +++ b/cmd/algod/main.go @@ -445,6 +445,8 @@ var startupConfigCheckFields = []string{ "TxPoolExponentialIncreaseFactor", "TxPoolSize", "VerifiedTranscationsCacheSize", + "EnableP2P", + "EnableP2PHybridMode", } func resolveDataDir() string { diff --git a/network/p2p/pubsubTracer.go b/network/p2p/pubsubTracer.go index ca57bc69ce..919e50acca 100644 --- a/network/p2p/pubsubTracer.go +++ b/network/p2p/pubsubTracer.go @@ -31,6 +31,8 @@ var transactionMessagesP2PDuplicateMessage = metrics.MakeCounter(metrics.Transac var transactionMessagesP2PDeliverMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDeliverMessage) var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PUndeliverableMessage) var transactionMessagesP2PValidateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PValidateMessage) +var transactionMessagesP2PSentMessages = metrics.MakeCounter(metrics.TransactionMessagesP2PSentMessage) +var transactionMessagesP2PSentBytes = metrics.MakeCounter(metrics.TransactionMessagesP2PSentBytes) // pubsubTracer is a tracer for pubsub events used to track metrics. type pubsubTracer struct{} @@ -86,7 +88,16 @@ func (t pubsubTracer) ThrottlePeer(p peer.ID) {} func (t pubsubTracer) RecvRPC(rpc *pubsub.RPC) {} // SendRPC is invoked when a RPC is sent. 
-func (t pubsubTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {} +func (t pubsubTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) { + if rpc != nil && len(rpc.Publish) > 0 { + for i := range rpc.Publish { + if rpc.Publish[i] != nil && rpc.Publish[i].Topic != nil && *rpc.Publish[i].Topic == TXTopicName { + transactionMessagesP2PSentMessages.Inc(nil) + transactionMessagesP2PSentBytes.AddUint64(uint64(len(rpc.Publish[0].Data)), nil) + } + } + } +} // DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full. func (t pubsubTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {} diff --git a/test/heapwatch/requirements.txt b/test/heapwatch/requirements.txt index db92372c6d..cf443a24e4 100644 --- a/test/heapwatch/requirements.txt +++ b/test/heapwatch/requirements.txt @@ -6,5 +6,5 @@ plotly==5.16.0 py-algorand-sdk==2.3.0 kaleido==0.2.1 networkx==3.3 -gravis=0.1.0 -termcolor=2.4.0 +gravis==0.1.0 +termcolor==2.4.0 diff --git a/util/metrics/metrics.go b/util/metrics/metrics.go index fcc566312f..416d6a9dfb 100644 --- a/util/metrics/metrics.go +++ b/util/metrics/metrics.go @@ -139,6 +139,11 @@ var ( // TransactionMessagesP2PValidateMessage "Number of p2p pubsub transaction messages received for validation" TransactionMessagesP2PValidateMessage = MetricName{Name: "algod_transaction_messages_p2p_validate", Description: "Number of p2p pubsub transaction messages received for validation"} + // TransactionMessagesP2PSentMessage "Number of p2p pubsub transaction messages sent" + TransactionMessagesP2PSentMessage = MetricName{Name: "algod_transaction_messages_p2p_sent", Description: "Number of p2p pubsub transaction messages sent"} + // TransactionMessagesP2PSentBytes "Number p2p pubsub transaction bytes sent" + TransactionMessagesP2PSentBytes = MetricName{Name: "algod_transaction_messages_p2p_bytes", Description: "Number p2p pubsub transaction bytes sent"} + // TransactionGroupTxSyncHandled "Number of transaction groups handled via txsync" 
TransactionGroupTxSyncHandled = MetricName{Name: "algod_transaction_group_txsync_handled", Description: "Number of transaction groups handled via txsync"} // TransactionGroupTxSyncRemember "Number of transaction groups remembered via txsync" From 0fe5308c3d905f920f05690988fd1bcbf2e71d4a Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 19 Aug 2024 17:25:31 -0400 Subject: [PATCH 02/16] metrics: add algod_network_p2p_sent_bytes_{TAG} --- network/metrics.go | 109 ++++++++++++++++++++++++++++++++++++ network/p2p/pubsubTracer.go | 22 ++++++-- network/p2pNetwork.go | 8 ++- network/wsNetwork.go | 30 ---------- network/wsPeer.go | 84 +++++++++++++-------------- util/metrics/metrics.go | 8 +++ 6 files changed, 178 insertions(+), 83 deletions(-) create mode 100644 network/metrics.go diff --git a/network/metrics.go b/network/metrics.go new file mode 100644 index 0000000000..e64cac51fe --- /dev/null +++ b/network/metrics.go @@ -0,0 +1,109 @@ +// Copyright (C) 2019-2024 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see <https://www.gnu.org/licenses/>. 
+ +package network + +import ( + "github.com/algorand/go-algorand/protocol" + "github.com/algorand/go-algorand/util/metrics" +) + +func init() { + // all tags are tracked by ws net + tagStringListWs := make([]string, len(protocol.TagList)) + for i, t := range protocol.TagList { + tagStringListWs[i] = string(t) + } + networkSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringListWs, "UNK") + networkReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringListWs, "UNK") + networkMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringListWs, "UNK") + networkMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringListWs, "UNK") + networkHandleCountByTag = metrics.NewTagCounterFiltered("algod_network_rx_handle_countbytag_{TAG}", "count of handler calls in the receive thread for {TAG} messages", tagStringListWs, "UNK") + networkHandleMicrosByTag = metrics.NewTagCounterFiltered("algod_network_rx_handle_microsbytag_{TAG}", "microseconds spent by protocol handlers in the receive thread for {TAG} messages", tagStringListWs, "UNK") + + // all but gossipSub tags are tracked by p2p net + // the remaining tags are tracked by gossipSub tracer p2p sub-package + tagStringListP2P := make([]string, 0, len(protocol.TagList)) + for _, t := range protocol.TagList { + if _, ok := gossipSubTags[t]; !ok { + tagStringListP2P = append(tagStringListP2P, string(t)) + } + } + networkP2PSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringListP2P, 
"UNK") + networkP2PReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringListP2P, "UNK") + networkP2PMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringListP2P, "UNK") + networkP2PMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringListP2P, "UNK") +} + +var networkSentBytesTotal = metrics.MakeCounter(metrics.NetworkSentBytesTotal) +var networkP2PSentBytesTotal = metrics.MakeCounter(metrics.NetworkP2PSentBytesTotal) +var networkSentBytesByTag *metrics.TagCounter +var networkP2PSentBytesByTag *metrics.TagCounter +var networkReceivedBytesTotal = metrics.MakeCounter(metrics.NetworkReceivedBytesTotal) +var networkP2PReceivedBytesTotal = metrics.MakeCounter(metrics.NetworkP2PReceivedBytesTotal) +var networkReceivedBytesByTag *metrics.TagCounter +var networkP2PReceivedBytesByTag *metrics.TagCounter + +var networkMessageReceivedTotal = metrics.MakeCounter(metrics.NetworkMessageReceivedTotal) +var networkP2PMessageReceivedTotal = metrics.MakeCounter(metrics.NetworkP2PMessageReceivedTotal) +var networkMessageReceivedByTag *metrics.TagCounter +var networkP2PMessageReceivedByTag *metrics.TagCounter +var networkMessageSentTotal = metrics.MakeCounter(metrics.NetworkMessageSentTotal) +var networkP2PMessageSentTotal = metrics.MakeCounter(metrics.NetworkP2PMessageSentTotal) +var networkMessageSentByTag *metrics.TagCounter +var networkP2PMessageSentByTag *metrics.TagCounter + +var networkHandleMicrosByTag *metrics.TagCounter +var networkHandleCountByTag *metrics.TagCounter + +var networkConnectionsDroppedTotal = metrics.MakeCounter(metrics.NetworkConnectionsDroppedTotal) +var 
networkMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_message_sent_queue_micros_total", Description: "Total microseconds message spent waiting in queue to be sent"}) + +var duplicateNetworkMessageReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedTotal) +var duplicateNetworkMessageReceivedBytesTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedBytesTotal) +var duplicateNetworkFilterReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkFilterReceivedTotal) +var outgoingNetworkMessageFilteredOutTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutTotal) +var outgoingNetworkMessageFilteredOutBytesTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutBytesTotal) +var unknownProtocolTagMessagesTotal = metrics.MakeCounter(metrics.UnknownProtocolTagMessagesTotal) + +var networkIncomingConnections = metrics.MakeGauge(metrics.NetworkIncomingConnections) +var networkOutgoingConnections = metrics.MakeGauge(metrics.NetworkOutgoingConnections) + +var networkIncomingBufferMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_buffer_micros_total", Description: "microseconds spent by incoming messages on the receive buffer"}) +var networkHandleMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_handle_micros_total", Description: "microseconds spent by protocol handlers in the receive thread"}) + +var networkBroadcasts = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_total", Description: "number of broadcast operations"}) +var networkBroadcastQueueFull = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_full_total", Description: "number of messages that were drops due to full broadcast queue"}) +var networkBroadcastQueueMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_micros_total", Description: "microseconds broadcast requests sit on queue"}) 
+var networkBroadcastSendMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_send_micros_total", Description: "microseconds spent broadcasting"}) +var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"}) +var networkPeerBroadcastDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_peer_broadcast_dropped_total", Description: "number of broadcast messages not sent to some peer"}) + +var networkPeerIdentityDisconnect = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_duplicate", Description: "number of times identity challenge cause us to disconnect a peer"}) +var networkPeerIdentityError = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_error", Description: "number of times an error occurs (besides expected) when processing identity challenges"}) +var networkPeerAlreadyClosed = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_peer_already_closed", Description: "number of times a peer would be added but the peer connection is already closed"}) + +var networkSlowPeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_slow_drops_total", Description: "number of peers dropped for being slow to send to"}) +var networkIdlePeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_idle_drops_total", Description: "number of peers dropped due to idle connection"}) + +var minPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_min_ping_seconds", Description: "Network round trip time to fastest peer in seconds."}) +var meanPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_mean_ping_seconds", Description: "Network round trip time to average peer in seconds."}) +var medianPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_median_ping_seconds", Description: "Network round trip time to median 
peer in seconds."}) +var maxPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_max_ping_seconds", Description: "Network round trip time to slowest peer in seconds."}) + +var peers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peers", Description: "Number of active peers."}) +var incomingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_incoming_peers", Description: "Number of active incoming peers."}) +var outgoingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_outgoing_peers", Description: "Number of active outgoing peers."}) diff --git a/network/p2p/pubsubTracer.go b/network/p2p/pubsubTracer.go index 919e50acca..f451ed8be6 100644 --- a/network/p2p/pubsubTracer.go +++ b/network/p2p/pubsubTracer.go @@ -21,6 +21,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + ap "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util/metrics" ) @@ -30,9 +31,15 @@ var transactionMessagesP2PRejectMessage = metrics.NewTagCounter(metrics.Transact var transactionMessagesP2PDuplicateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDuplicateMessage) var transactionMessagesP2PDeliverMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDeliverMessage) var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PUndeliverableMessage) -var transactionMessagesP2PValidateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PValidateMessage) -var transactionMessagesP2PSentMessages = metrics.MakeCounter(metrics.TransactionMessagesP2PSentMessage) -var transactionMessagesP2PSentBytes = metrics.MakeCounter(metrics.TransactionMessagesP2PSentBytes) + +// This list must be exclusive of the gossipSubTags list in ../metrics.go +// TODO: as adding more messages into gossipSub need to maintain a mapping of topic to tag. 
+// There is a benefit to using a const string in the comparison `*rpc.Publish[i].Topic == TXTopicName` below, since it is most likely optimized to a single instruction. +var tagTXList = []string{string(ap.Transaction)} +var networkP2PSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagTXList, "") +var networkP2PReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagTXList, "") +var networkP2PMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagTXList, "") +var networkP2PMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagTXList, "") // pubsubTracer is a tracer for pubsub events used to track metrics. type pubsubTracer struct{} @@ -57,7 +64,10 @@ func (t pubsubTracer) Prune(p peer.ID, topic string) {} // ValidateMessage is invoked when a message first enters the validation pipeline. 
func (t pubsubTracer) ValidateMessage(msg *pubsub.Message) { - transactionMessagesP2PValidateMessage.Inc(nil) + if msg != nil && msg.Topic != nil && *msg.Topic == TXTopicName { + networkP2PReceivedBytesByTag.Add(string(ap.Transaction), uint64(len(msg.Data))) + networkP2PMessageReceivedByTag.Add(string(ap.Transaction), 1) + } } // DeliverMessage is invoked when a message is delivered @@ -92,8 +102,8 @@ func (t pubsubTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) { if rpc != nil && len(rpc.Publish) > 0 { for i := range rpc.Publish { if rpc.Publish[i] != nil && rpc.Publish[i].Topic != nil && *rpc.Publish[i].Topic == TXTopicName { - transactionMessagesP2PSentMessages.Inc(nil) - transactionMessagesP2PSentBytes.AddUint64(uint64(len(rpc.Publish[0].Data)), nil) + networkP2PSentBytesByTag.Add(string(ap.Transaction), uint64(len(rpc.Publish[i].Data))) + networkP2PMessageSentByTag.Add(string(ap.Transaction), 1) } } } diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index f9dc04b785..2b2ac861fb 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -194,6 +194,11 @@ type p2pPeerStats struct { txReceived atomic.Uint64 } +// gossipSubTags defines protocol messages that are relayed using GossipSub +var gossipSubTags = map[protocol.Tag]string{ + protocol.TxnTag: p2p.TXTopicName, +} + // NewP2PNetwork returns an instance of GossipNode that uses the p2p.Service func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, node NodeInfo, identityOpts *identityOpts) (*P2PNetwork, error) { const readBufferLen = 2048 @@ -214,7 +219,7 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo config: cfg, genesisID: genesisID, networkID: networkID, - topicTags: map[protocol.Tag]string{protocol.TxnTag: p2p.TXTopicName}, + topicTags: gossipSubTags, wsPeers: make(map[peer.ID]*wsPeer), wsPeersToIDs: make(map[*wsPeer]peer.ID), peerStats: 
make(map[peer.ID]*p2pPeerStats), @@ -791,6 +796,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea conn: &wsPeerConnP2P{stream: stream}, outgoing: !incoming, identity: netIdentPeerID, + peerType: peerTypeP2P, } protos, err := n.pstore.GetProtocols(p2pPeer) if err != nil { diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 2af3a9b6bf..ecb636c8e2 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -53,7 +53,6 @@ import ( tools_network "github.com/algorand/go-algorand/tools/network" "github.com/algorand/go-algorand/tools/network/dnssec" "github.com/algorand/go-algorand/util" - "github.com/algorand/go-algorand/util/metrics" ) const incomingThreads = 20 @@ -117,35 +116,6 @@ const wsMaxHeaderBytes = 4096 // used from the ReservedFDs pool, as this pool is meant for short-lived usage (dns queries, disk i/o, etc.) const ReservedHealthServiceConnections = 10 -var networkIncomingConnections = metrics.MakeGauge(metrics.NetworkIncomingConnections) -var networkOutgoingConnections = metrics.MakeGauge(metrics.NetworkOutgoingConnections) - -var networkIncomingBufferMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_buffer_micros_total", Description: "microseconds spent by incoming messages on the receive buffer"}) -var networkHandleMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_handle_micros_total", Description: "microseconds spent by protocol handlers in the receive thread"}) - -var networkBroadcasts = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_total", Description: "number of broadcast operations"}) -var networkBroadcastQueueFull = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_full_total", Description: "number of messages that were drops due to full broadcast queue"}) -var networkBroadcastQueueMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_micros_total", Description: "microseconds 
broadcast requests sit on queue"}) -var networkBroadcastSendMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_send_micros_total", Description: "microseconds spent broadcasting"}) -var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"}) -var networkPeerBroadcastDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_peer_broadcast_dropped_total", Description: "number of broadcast messages not sent to some peer"}) - -var networkPeerIdentityDisconnect = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_duplicate", Description: "number of times identity challenge cause us to disconnect a peer"}) -var networkPeerIdentityError = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_error", Description: "number of times an error occurs (besides expected) when processing identity challenges"}) -var networkPeerAlreadyClosed = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_peer_already_closed", Description: "number of times a peer would be added but the peer connection is already closed"}) - -var networkSlowPeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_slow_drops_total", Description: "number of peers dropped for being slow to send to"}) -var networkIdlePeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_idle_drops_total", Description: "number of peers dropped due to idle connection"}) - -var minPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_min_ping_seconds", Description: "Network round trip time to fastest peer in seconds."}) -var meanPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_mean_ping_seconds", Description: "Network round trip time to average peer in seconds."}) -var medianPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_median_ping_seconds", Description: 
"Network round trip time to median peer in seconds."}) -var maxPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_max_ping_seconds", Description: "Network round trip time to slowest peer in seconds."}) - -var peers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peers", Description: "Number of active peers."}) -var incomingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_incoming_peers", Description: "Number of active incoming peers."}) -var outgoingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_outgoing_peers", Description: "Number of active outgoing peers."}) - // peerDisconnectionAckDuration defines the time we would wait for the peer disconnection to complete. const peerDisconnectionAckDuration = 5 * time.Second diff --git a/network/wsPeer.go b/network/wsPeer.go index 88a0c615f9..e8785c65fb 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -39,7 +39,6 @@ import ( "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util" - "github.com/algorand/go-algorand/util/metrics" ) // MaxMessageLength is the maximum length of a message that can be sent or received, exported to be used in the node.TestMaxSizesCorrect test @@ -52,20 +51,7 @@ const averageMessageLength = 2 * 1024 // Most of the messages are smaller tha // buffer and starve messages from other peers. 
const msgsInReadBufferPerPeer = 10 -var tagStringList []string - func init() { - tagStringList = make([]string, len(protocol.TagList)) - for i, t := range protocol.TagList { - tagStringList[i] = string(t) - } - networkSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringList, "UNK") - networkReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringList, "UNK") - networkMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringList, "UNK") - networkMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringList, "UNK") - networkHandleCountByTag = metrics.NewTagCounterFiltered("algod_network_rx_handle_countbytag_{TAG}", "count of handler calls in the receive thread for {TAG} messages", tagStringList, "UNK") - networkHandleMicrosByTag = metrics.NewTagCounterFiltered("algod_network_rx_handle_microsbytag_{TAG}", "microseconds spent by protocol handlers in the receive thread for {TAG} messages", tagStringList, "UNK") - matched := false for _, version := range SupportedProtocolVersions { if version == versionPeerFeatures { @@ -83,29 +69,6 @@ func init() { } } -var networkSentBytesTotal = metrics.MakeCounter(metrics.NetworkSentBytesTotal) -var networkSentBytesByTag *metrics.TagCounter -var networkReceivedBytesTotal = metrics.MakeCounter(metrics.NetworkReceivedBytesTotal) -var networkReceivedBytesByTag *metrics.TagCounter - -var networkMessageReceivedTotal = metrics.MakeCounter(metrics.NetworkMessageReceivedTotal) -var networkMessageReceivedByTag *metrics.TagCounter -var networkMessageSentTotal = 
metrics.MakeCounter(metrics.NetworkMessageSentTotal) -var networkMessageSentByTag *metrics.TagCounter - -var networkHandleMicrosByTag *metrics.TagCounter -var networkHandleCountByTag *metrics.TagCounter - -var networkConnectionsDroppedTotal = metrics.MakeCounter(metrics.NetworkConnectionsDroppedTotal) -var networkMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_message_sent_queue_micros_total", Description: "Total microseconds message spent waiting in queue to be sent"}) - -var duplicateNetworkMessageReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedTotal) -var duplicateNetworkMessageReceivedBytesTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedBytesTotal) -var duplicateNetworkFilterReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkFilterReceivedTotal) -var outgoingNetworkMessageFilteredOutTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutTotal) -var outgoingNetworkMessageFilteredOutBytesTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutBytesTotal) -var unknownProtocolTagMessagesTotal = metrics.MakeCounter(metrics.UnknownProtocolTagMessagesTotal) - // defaultSendMessageTags is the default list of messages which a peer would // allow to be sent without receiving any explicit request. var defaultSendMessageTags = map[protocol.Tag]bool{ @@ -204,6 +167,16 @@ type sendMessages struct { onRelease func() } +//msgp:ignore peerType +type peerType int + +const ( + // peerTypeWs is a peer that is connected over a websocket connection + peerTypeWs peerType = iota + // peerTypeP2P is a peer that is connected over an P2P connection + peerTypeP2P +) + type wsPeer struct { // lastPacketTime contains the UnixNano at the last time a successful communication was made with the peer. 
// "successful communication" above refers to either reading from or writing to a connection without receiving any @@ -318,6 +291,10 @@ type wsPeer struct { // closers is a slice of functions to run when the peer is closed closers []func() + + // peerType defines the peer's underlying connection type + // used for separate p2p vs ws metrics + peerType peerType } // HTTPPeer is what the opaque Peer might be. @@ -639,10 +616,17 @@ func (wp *wsPeer) readLoop() { } msg.Net = wp.net wp.lastPacketTime.Store(msg.Received) - networkReceivedBytesTotal.AddUint64(uint64(len(msg.Data)+2), nil) - networkMessageReceivedTotal.AddUint64(1, nil) - networkReceivedBytesByTag.Add(string(tag[:]), uint64(len(msg.Data)+2)) - networkMessageReceivedByTag.Add(string(tag[:]), 1) + if wp.peerType == peerTypeWs { + networkReceivedBytesTotal.AddUint64(uint64(len(msg.Data)+2), nil) + networkMessageReceivedTotal.AddUint64(1, nil) + networkReceivedBytesByTag.Add(string(tag[:]), uint64(len(msg.Data)+2)) + networkMessageReceivedByTag.Add(string(tag[:]), 1) + } else { + networkP2PReceivedBytesTotal.AddUint64(uint64(len(msg.Data)+2), nil) + networkP2PMessageReceivedTotal.AddUint64(1, nil) + networkP2PReceivedBytesByTag.Add(string(tag[:]), uint64(len(msg.Data)+2)) + networkP2PMessageReceivedByTag.Add(string(tag[:]), 1) + } msg.Sender = wp // for outgoing connections, we want to notify the connection monitor that we've received @@ -863,11 +847,19 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason { return disconnectWriteError } wp.lastPacketTime.Store(time.Now().UnixNano()) - networkSentBytesTotal.AddUint64(uint64(len(msg.data)), nil) - networkSentBytesByTag.Add(string(tag), uint64(len(msg.data))) - networkMessageSentTotal.AddUint64(1, nil) - networkMessageSentByTag.Add(string(tag), 1) - networkMessageQueueMicrosTotal.AddUint64(uint64(time.Now().Sub(msg.peerEnqueued).Nanoseconds()/1000), nil) + if wp.peerType == peerTypeWs { + networkSentBytesTotal.AddUint64(uint64(len(msg.data)), 
nil) + networkSentBytesByTag.Add(string(tag), uint64(len(msg.data))) + networkMessageSentTotal.AddUint64(1, nil) + networkMessageSentByTag.Add(string(tag), 1) + } else { + networkP2PSentBytesTotal.AddUint64(uint64(len(msg.data)), nil) + networkP2PSentBytesByTag.Add(string(tag), uint64(len(msg.data))) + networkP2PMessageSentTotal.AddUint64(1, nil) + networkP2PMessageSentByTag.Add(string(tag), 1) + + } + networkMessageQueueMicrosTotal.AddUint64(uint64(time.Since(msg.peerEnqueued).Nanoseconds()/1000), nil) return disconnectReasonNone } diff --git a/util/metrics/metrics.go b/util/metrics/metrics.go index 416d6a9dfb..a1ebb75b8f 100644 --- a/util/metrics/metrics.go +++ b/util/metrics/metrics.go @@ -39,6 +39,14 @@ var ( NetworkMessageReceivedTotal = MetricName{Name: "algod_network_message_received_total", Description: "Total number of complete messages that were received from the network"} // NetworkMessageSentTotal Total number of complete messages that were sent to the network NetworkMessageSentTotal = MetricName{Name: "algod_network_message_sent_total", Description: "Total number of complete messages that were sent to the network"} + // NetworkP2PSentBytesTotal Total number of bytes that were sent over the p2p network + NetworkP2PSentBytesTotal = MetricName{Name: "algod_network_p2p_sent_bytes_total", Description: "Total number of bytes that were sent over the p2p network"} + // NetworkP2PReceivedBytesTotal Total number of bytes that were received from the p2p network + NetworkP2PReceivedBytesTotal = MetricName{Name: "algod_network_p2p_received_bytes_total", Description: "Total number of bytes that were received from the p2p network"} + // NetworkP2PMessageReceivedTotal Total number of complete messages that were received from the p2p network + NetworkP2PMessageReceivedTotal = MetricName{Name: "algod_network_p2p_message_received_total", Description: "Total number of complete messages that were received from the p2p network"} + // NetworkP2PMessageSentTotal Total number 
of complete messages that were sent to the p2p network + NetworkP2PMessageSentTotal = MetricName{Name: "algod_network_p2p_message_sent_total", Description: "Total number of complete messages that were sent to the p2p network"} // NetworkConnectionsDroppedTotal Total number of connections that were dropped before a message NetworkConnectionsDroppedTotal = MetricName{Name: "algod_network_connections_dropped_total", Description: "Total number of connections that were dropped before a message"} // NetworkSentDecompressedBytesTotal Total number of bytes that were sent over the network prior of being compressed From 61a491686f3e46c17d8983bdce05340d29cf271e Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 20 Aug 2024 12:50:54 -0400 Subject: [PATCH 03/16] make topic name comparison faster --- network/p2p/pubsub.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/network/p2p/pubsub.go b/network/p2p/pubsub.go index 657baecdde..b12ea0edf5 100644 --- a/network/p2p/pubsub.go +++ b/network/p2p/pubsub.go @@ -51,7 +51,9 @@ const ( ) // TXTopicName defines a pubsub topic for TX messages -const TXTopicName = "/algo/tx/0.1.0" +// There is a micro optimization for const string comparison: +// 8 bytes const string require a single x86-64 CMPQ instruction +const TXTopicName = "algo-0.1" const incomingThreads = 20 // matches to number wsNetwork workers From df5914c0f87b47ac4be3e44418f3e519d19495a9 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 20 Aug 2024 13:13:28 -0400 Subject: [PATCH 04/16] support p2p traffic metrics in heapwatch scripts --- test/heapwatch/metrics_delta.py | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/test/heapwatch/metrics_delta.py b/test/heapwatch/metrics_delta.py index 2d64ee097a..f6e99023d9 100644 --- a/test/heapwatch/metrics_delta.py +++ b/test/heapwatch/metrics_delta.py @@ -486,17 +486,27 @@ def __init__(self): self.deltas = [] self.txBpsList = [] self.rxBpsList = [] + 
self.txP2PBpsList = [] + self.rxP2PBpsList = [] self.tpsList = [] self.txBSum = 0 self.rxBSum = 0 + self.txP2PBSum = 0 + self.rxP2PBSum = 0 self.txnSum = 0 self.secondsSum = 0 # algod_network_received_bytes_* self.rxPLists = {} self.rxPSums = {} + # algod_network_p2p_received_bytes_* + self.rxP2PPLists = {} + self.rxP2PPSums = {} # algod_network_sent_bytes_* self.txPLists = {} self.txPSums = {} + # algod_network_p2p_sent_bytes_* + self.txP2PPLists = {} + self.txP2PPSums = {} self.times = [] # algod_tx_pool_count self.txPool = [] @@ -533,7 +543,7 @@ def process_files(self, args, nick=None, metrics_files=None, bisource=None): reportpath = args.report[:-4] + nick + '.csv' reportf = open(reportpath, 'wt') writer = csv.writer(reportf) - writer.writerow(('when', 'tx bytes/s', 'rx bytes/s','TPS', 's/block')) + writer.writerow(('when', 'tx bytes/s', 'rx bytes/s', 'tx p2p bytes/s', 'rx p2p bytes/s', 'TPS', 's/block')) prev = None prevtime = None prevPath = None @@ -587,6 +597,11 @@ def process_files(self, args, nick=None, metrics_files=None, bisource=None): rxBytes = d.get('algod_network_received_bytes_total',0) txBytesPerSec = txBytes / dt rxBytesPerSec = rxBytes / dt + txP2PBytes = d.get('algod_network_p2p_sent_bytes_total',0) + rxP2PBytes = d.get('algod_network_p2p_received_bytes_total',0) + txP2PBytesPerSec = txBytes / dt + rxP2PBytesPerSec = rxBytes / dt + # TODO: gather algod_network_sent_bytes_* and algod_network_received_bytes_* if (tps is None) or ((args.mintps is not None) and (tps < args.mintps)): # do not sum up this row @@ -594,18 +609,26 @@ def process_files(self, args, nick=None, metrics_files=None, bisource=None): else: self.txBpsList.append(txBytesPerSec) self.rxBpsList.append(rxBytesPerSec) + self.txP2PBpsList.append(txP2PBytesPerSec) + self.rxP2PBpsList.append(rxP2PBytesPerSec) self.tpsList.append(tps) self.txBSum += txBytes self.rxBSum += rxBytes + self.txP2PBSum += txP2PBytes + self.rxP2PBSum += rxP2PBytes self.txnSum += txnCount self.secondsSum += dt 
perProtocol('algod_network_sent_bytes_', self.txPLists, self.txPSums, d, dt) perProtocol('algod_network_received_bytes_', self.rxPLists, self.rxPSums, d, dt) + perProtocol('algod_network_p2p_sent_bytes_', self.txP2PPLists, self.txP2PPSums, d, dt) + perProtocol('algod_network_p2p_received_bytes_', self.rxP2PPLists, self.rxP2PPSums, d, dt) if writer: writer.writerow(( time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(curtime)), txBytesPerSec, rxBytesPerSec, + txP2PBytesPerSec, + rxP2PBytesPerSec, tps, blocktime, )) @@ -634,10 +657,10 @@ def process_files(self, args, nick=None, metrics_files=None, bisource=None): for bsum, msg in sorted([(bsum,msg) for msg,bsum in self.txPSums.items()]): pass writer.writerow([]) - writer.writerow(['min', min(self.txBpsList), min(self.rxBpsList), min(self.tpsList)]) - writer.writerow(['avg', self.txBSum/self.secondsSum, self.rxBSum/self.secondsSum, self.txnSum/self.secondsSum]) - writer.writerow(['max', max(self.txBpsList), max(self.rxBpsList), max(self.tpsList)]) - writer.writerow(['std', statistics.pstdev(self.txBpsList), statistics.pstdev(self.rxBpsList), statistics.pstdev(self.tpsList)]) + writer.writerow(['min', min(self.txBpsList), min(self.rxBpsList), min(self.txP2PBpsList), min(self.rxP2PBpsList), min(self.tpsList)]) + writer.writerow(['avg', self.txBSum/self.secondsSum, self.rxBSum/self.secondsSum, self.txP2PBSum/self.secondsSum, self.rxP2PBSum/self.secondsSum, self.txnSum/self.secondsSum]) + writer.writerow(['max', max(self.txBpsList), max(self.rxBpsList), max(self.txP2PBpsList), max(self.rxP2PBpsList), max(self.tpsList)]) + writer.writerow(['std', statistics.pstdev(self.txBpsList), statistics.pstdev(self.rxBpsList), statistics.pstdev(self.txP2PBpsList), statistics.pstdev(self.rxP2PBpsList), statistics.pstdev(self.tpsList)]) if reportf: reportf.close() if self.deltas and args.deltas: From cfd2e6008178f4a09a6627bbeb8afc541fd1689d Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 20 Aug 2024 16:00:07 -0400 Subject: [PATCH 
05/16] CR fixes --- network/p2p/pubsubTracer.go | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/network/p2p/pubsubTracer.go b/network/p2p/pubsubTracer.go index f451ed8be6..d14a79c5d0 100644 --- a/network/p2p/pubsubTracer.go +++ b/network/p2p/pubsubTracer.go @@ -33,13 +33,14 @@ var transactionMessagesP2PDeliverMessage = metrics.MakeCounter(metrics.Transacti var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PUndeliverableMessage) // This list must be exclusive of the gossipSubTags list in ../metrics.go -// TODO: as adding more messages into gossipSub need to maintain a mapping of topic to tag. -// There is a benefic of using const string in a comparison `*rpc.Publish[i].Topic == TXTopicName` below since it most likely optimized to a single instruction. -var tagTXList = []string{string(ap.Transaction)} -var networkP2PSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagTXList, "") -var networkP2PReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagTXList, "") -var networkP2PMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagTXList, "") -var networkP2PMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagTXList, "") +// TODO: add a unit test checking that the two lists are disjoint +// There is a benefic of using const string in a comparison `*rpc.Publish[i].Topic == TXTopicName` below +// since it most to a single comparison (or two switch/case constructs) on x86-64. 
+var tracedNetworkMessageTags = []string{string(ap.TxnTag)} +var networkP2PSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tracedNetworkMessageTags, "") +var networkP2PReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tracedNetworkMessageTags, "") +var networkP2PMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tracedNetworkMessageTags, "") +var networkP2PMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tracedNetworkMessageTags, "") // pubsubTracer is a tracer for pubsub events used to track metrics. type pubsubTracer struct{} @@ -64,9 +65,12 @@ func (t pubsubTracer) Prune(p peer.ID, topic string) {} // ValidateMessage is invoked when a message first enters the validation pipeline. 
func (t pubsubTracer) ValidateMessage(msg *pubsub.Message) { - if msg != nil && msg.Topic != nil && *msg.Topic == TXTopicName { - networkP2PReceivedBytesByTag.Add(string(ap.Transaction), uint64(len(msg.Data))) - networkP2PMessageReceivedByTag.Add(string(ap.Transaction), 1) + if msg != nil && msg.Topic != nil { + switch *msg.Topic { + case TXTopicName: + networkP2PReceivedBytesByTag.Add(string(ap.TxnTag), uint64(len(msg.Data))) + networkP2PMessageReceivedByTag.Add(string(ap.TxnTag), 1) + } } } @@ -101,9 +105,12 @@ func (t pubsubTracer) RecvRPC(rpc *pubsub.RPC) {} func (t pubsubTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) { if rpc != nil && len(rpc.Publish) > 0 { for i := range rpc.Publish { - if rpc.Publish[i] != nil && rpc.Publish[i].Topic != nil && *rpc.Publish[i].Topic == TXTopicName { - networkP2PSentBytesByTag.Add(string(ap.Transaction), uint64(len(rpc.Publish[0].Data))) - networkP2PMessageSentByTag.Add(string(ap.Transaction), 1) + if rpc.Publish[i] != nil && rpc.Publish[i].Topic != nil { + switch *rpc.Publish[i].Topic { + case TXTopicName: + networkP2PSentBytesByTag.Add(string(ap.TxnTag), uint64(len(rpc.Publish[i].Data))) + networkP2PMessageSentByTag.Add(string(ap.TxnTag), 1) + } } } } From 34c6fa0e128ad7fdc65a084bf202a705346528ee Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 20 Aug 2024 16:41:35 -0400 Subject: [PATCH 06/16] metric tags tests --- network/metrics.go | 14 +++--- network/metrics_test.go | 49 +++++++++++++++++++++ network/p2p/pubsubTracer.go | 14 +++--- network/p2p/pubsubTracer_test.go | 75 ++++++++++++++++++++++++++++++++ 4 files changed, 139 insertions(+), 13 deletions(-) create mode 100644 network/metrics_test.go create mode 100644 network/p2p/pubsubTracer_test.go diff --git a/network/metrics.go b/network/metrics.go index e64cac51fe..d09e1e1dc9 100644 --- a/network/metrics.go +++ b/network/metrics.go @@ -21,11 +21,13 @@ import ( "github.com/algorand/go-algorand/util/metrics" ) +var tagStringListP2P []string + func init() { // all tags 
are tracked by ws net - tagStringListWs := make([]string, len(protocol.TagList)) - for i, t := range protocol.TagList { - tagStringListWs[i] = string(t) + tagStringListWs := make([]string, 0, len(protocol.TagList)) + for _, t := range protocol.TagList { + tagStringListWs = append(tagStringListWs, string(t)) } networkSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringListWs, "UNK") networkReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringListWs, "UNK") @@ -36,10 +38,10 @@ func init() { // all but gossipSub tags are tracked by p2p net // the remaining tags are tracked by gossipSub tracer p2p sub-package - tagStringListP2P := make([]string, len(protocol.TagList)) - for i, t := range protocol.TagList { + tagStringListP2P = make([]string, 0, len(protocol.TagList)-len(gossipSubTags)) + for _, t := range protocol.TagList { if _, ok := gossipSubTags[t]; !ok { - tagStringListWs[i] = string(t) + tagStringListP2P = append(tagStringListP2P, string(t)) } } networkP2PSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringListP2P, "UNK") diff --git a/network/metrics_test.go b/network/metrics_test.go new file mode 100644 index 0000000000..f1900e5a98 --- /dev/null +++ b/network/metrics_test.go @@ -0,0 +1,49 @@ +// Copyright (C) 2019-2024 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. 
+// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package network + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/algorand/go-algorand/network/p2p" + "github.com/algorand/go-algorand/protocol" + "github.com/algorand/go-algorand/test/partitiontest" +) + +// TestMetrics_TagList ensures p2p.TracedNetworkMessageTags and tagStringListP2P are disjoint +func TestMetrics_TagList(t *testing.T) { + t.Parallel() + partitiontest.PartitionTest(t) + + p2pTags := make(map[string]bool, len(p2p.TracedNetworkMessageTags)) + metricTags := make(map[string]bool, len(tagStringListP2P)) + + for _, tag := range p2p.TracedNetworkMessageTags { + p2pTags[string(tag)] = true + } + + for _, tag := range tagStringListP2P { + metricTags[string(tag)] = true + } + + require.Equal(t, len(protocol.TagMap), len(p2pTags)+len(metricTags)) + for tag := range protocol.TagMap { + require.True(t, p2pTags[string(tag)] || metricTags[string(tag)]) + } +} diff --git a/network/p2p/pubsubTracer.go b/network/p2p/pubsubTracer.go index d14a79c5d0..53247092ab 100644 --- a/network/p2p/pubsubTracer.go +++ b/network/p2p/pubsubTracer.go @@ -32,15 +32,15 @@ var transactionMessagesP2PDuplicateMessage = metrics.MakeCounter(metrics.Transac var transactionMessagesP2PDeliverMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDeliverMessage) var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PUndeliverableMessage) -// This list must be exclusive of the gossipSubTags list in ../metrics.go -// TODO: add a unit test checking that the two lists are disjoint +// TracedNetworkMessageTags is a list of tags for 
network messages that handled by pubsubTracer. +// This list must be exclusive of the tagStringListP2P list in ../metrics.go. It is exported to ensure these lists are disjoint. // There is a benefic of using const string in a comparison `*rpc.Publish[i].Topic == TXTopicName` below // since it most to a single comparison (or two switch/case constructs) on x86-64. -var tracedNetworkMessageTags = []string{string(ap.TxnTag)} -var networkP2PSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tracedNetworkMessageTags, "") -var networkP2PReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tracedNetworkMessageTags, "") -var networkP2PMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tracedNetworkMessageTags, "") -var networkP2PMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tracedNetworkMessageTags, "") +var TracedNetworkMessageTags = []string{string(ap.TxnTag)} +var networkP2PSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", TracedNetworkMessageTags, "") +var networkP2PReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", TracedNetworkMessageTags, "") +var networkP2PMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", TracedNetworkMessageTags, "") +var 
networkP2PMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", TracedNetworkMessageTags, "") // pubsubTracer is a tracer for pubsub events used to track metrics. type pubsubTracer struct{} diff --git a/network/p2p/pubsubTracer_test.go b/network/p2p/pubsubTracer_test.go new file mode 100644 index 0000000000..396d62924b --- /dev/null +++ b/network/p2p/pubsubTracer_test.go @@ -0,0 +1,75 @@ +// Copyright (C) 2019-2024 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . 
+ +package p2p + +import ( + "go/ast" + "go/parser" + "go/token" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/algorand/go-algorand/test/partitiontest" +) + +// TestPubsubTracer_TagList makes sure all entries +func TestPubsubTracer_TagList(t *testing.T) { + t.Parallel() + partitiontest.PartitionTest(t) + + fset := token.NewFileSet() + f, err := parser.ParseFile(fset, "pubsubTracer.go", nil, 0) + require.NoError(t, err) + + // Find the SendRPC/ValidateMessage functions and count the switch cases + var sendCaseCount int + var recvCaseCount int + ast.Inspect(f, func(n ast.Node) bool { + switch stmt := n.(type) { + case *ast.FuncDecl: + if stmt.Name.Name == "SendRPC" { + ast.Inspect(stmt.Body, func(n ast.Node) bool { + if switchStmt, ok := n.(*ast.SwitchStmt); ok { + for _, stmt := range switchStmt.Body.List { + if _, ok := stmt.(*ast.CaseClause); ok { + sendCaseCount++ + } + } + } + return true + }) + } + if stmt.Name.Name == "ValidateMessage" { + ast.Inspect(stmt.Body, func(n ast.Node) bool { + if switchStmt, ok := n.(*ast.SwitchStmt); ok { + for _, stmt := range switchStmt.Body.List { + if _, ok := stmt.(*ast.CaseClause); ok { + recvCaseCount++ + } + } + } + return true + }) + } + } + return true + }) + + require.Equal(t, len(TracedNetworkMessageTags), sendCaseCount) + require.Equal(t, len(TracedNetworkMessageTags), recvCaseCount) +} From 5db009c83e5361be6c18eb7a443ec0006cb7edb0 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 20 Aug 2024 17:16:54 -0400 Subject: [PATCH 07/16] fix transactionMessagesP2PRejectMessage TagCounter --- network/p2p/pubsubTracer.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/network/p2p/pubsubTracer.go b/network/p2p/pubsubTracer.go index 53247092ab..31d5799b14 100644 --- a/network/p2p/pubsubTracer.go +++ b/network/p2p/pubsubTracer.go @@ -82,9 +82,17 @@ func (t pubsubTracer) DeliverMessage(msg *pubsub.Message) { // RejectMessage is invoked when a message is Rejected or 
Ignored. // The reason argument can be one of the named strings Reject*. func (t pubsubTracer) RejectMessage(msg *pubsub.Message, reason string) { + // TagCounter cannot handle tags with spaces so pubsub.Reject* cannot be used directly. + // Since Go's strings are immutable, char replacement is a new allocation so that stick to string literals. switch reason { - case pubsub.RejectValidationThrottled, pubsub.RejectValidationQueueFull, pubsub.RejectValidationFailed, pubsub.RejectValidationIgnored: - transactionMessagesP2PRejectMessage.Add(reason, 1) + case pubsub.RejectValidationThrottled: + transactionMessagesP2PRejectMessage.Add("throttled", 1) + case pubsub.RejectValidationQueueFull: + transactionMessagesP2PRejectMessage.Add("full", 1) + case pubsub.RejectValidationFailed: + transactionMessagesP2PRejectMessage.Add("failed", 1) + case pubsub.RejectValidationIgnored: + transactionMessagesP2PRejectMessage.Add("ignored", 1) default: transactionMessagesP2PRejectMessage.Add("other", 1) } From 1f9574cb35b155c5e964ba715e7afb7748719378 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 20 Aug 2024 17:31:09 -0400 Subject: [PATCH 08/16] fix test asserts --- network/metrics_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/network/metrics_test.go b/network/metrics_test.go index f1900e5a98..a7697b2e5a 100644 --- a/network/metrics_test.go +++ b/network/metrics_test.go @@ -45,5 +45,6 @@ func TestMetrics_TagList(t *testing.T) { require.Equal(t, len(protocol.TagMap), len(p2pTags)+len(metricTags)) for tag := range protocol.TagMap { require.True(t, p2pTags[string(tag)] || metricTags[string(tag)]) + require.False(t, p2pTags[string(tag)] && metricTags[string(tag)]) } } From 5a94ea5abdc1766f7cafd10832cc8481cff104bb Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 21 Aug 2024 13:15:35 -0400 Subject: [PATCH 09/16] move pubsubTracer to parent package --- network/metrics.go | 146 +++++++++++++++++++++++++------ network/metrics_test.go | 62 +++++++++---- 
network/p2p/p2p.go | 4 +- network/p2p/pubsub.go | 7 +- network/p2p/pubsubTracer.go | 134 ---------------------------- network/p2p/pubsubTracer_test.go | 75 ---------------- network/p2pNetwork.go | 2 +- 7 files changed, 170 insertions(+), 260 deletions(-) delete mode 100644 network/p2p/pubsubTracer.go delete mode 100644 network/p2p/pubsubTracer_test.go diff --git a/network/metrics.go b/network/metrics.go index d09e1e1dc9..cdd9487f96 100644 --- a/network/metrics.go +++ b/network/metrics.go @@ -17,37 +17,32 @@ package network import ( + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/peer" + p2proto "github.com/libp2p/go-libp2p/core/protocol" + + "github.com/algorand/go-algorand/network/p2p" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util/metrics" ) -var tagStringListP2P []string - func init() { // all tags are tracked by ws net - tagStringListWs := make([]string, 0, len(protocol.TagList)) + tagStringList := make([]string, 0, len(protocol.TagList)) for _, t := range protocol.TagList { - tagStringListWs = append(tagStringListWs, string(t)) - } - networkSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringListWs, "UNK") - networkReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringListWs, "UNK") - networkMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringListWs, "UNK") - networkMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringListWs, "UNK") - networkHandleCountByTag = 
metrics.NewTagCounterFiltered("algod_network_rx_handle_countbytag_{TAG}", "count of handler calls in the receive thread for {TAG} messages", tagStringListWs, "UNK") - networkHandleMicrosByTag = metrics.NewTagCounterFiltered("algod_network_rx_handle_microsbytag_{TAG}", "microseconds spent by protocol handlers in the receive thread for {TAG} messages", tagStringListWs, "UNK") - - // all but gossipSub tags are tracked by p2p net - // the remaining tags are tracked by gossipSub tracer p2p sub-package - tagStringListP2P = make([]string, 0, len(protocol.TagList)-len(gossipSubTags)) - for _, t := range protocol.TagList { - if _, ok := gossipSubTags[t]; !ok { - tagStringListP2P = append(tagStringListP2P, string(t)) - } + tagStringList = append(tagStringList, string(t)) } - networkP2PSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringListP2P, "UNK") - networkP2PReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringListP2P, "UNK") - networkP2PMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringListP2P, "UNK") - networkP2PMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringListP2P, "UNK") + networkSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringList, "UNK") + networkReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringList, "UNK") + 
networkMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringList, "UNK") + networkMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringList, "UNK") + networkHandleCountByTag = metrics.NewTagCounterFiltered("algod_network_rx_handle_countbytag_{TAG}", "count of handler calls in the receive thread for {TAG} messages", tagStringList, "UNK") + networkHandleMicrosByTag = metrics.NewTagCounterFiltered("algod_network_rx_handle_microsbytag_{TAG}", "microseconds spent by protocol handlers in the receive thread for {TAG} messages", tagStringList, "UNK") + + networkP2PSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringList, "UNK") + networkP2PReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringList, "UNK") + networkP2PMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringList, "UNK") + networkP2PMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringList, "UNK") } var networkSentBytesTotal = metrics.MakeCounter(metrics.NetworkSentBytesTotal) @@ -101,11 +96,106 @@ var networkPeerAlreadyClosed = metrics.MakeCounter(metrics.MetricName{Name: "alg var networkSlowPeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_slow_drops_total", Description: "number of peers dropped for being slow to send to"}) var networkIdlePeerDrops 
= metrics.MakeCounter(metrics.MetricName{Name: "algod_network_idle_drops_total", Description: "number of peers dropped due to idle connection"}) -var minPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_min_ping_seconds", Description: "Network round trip time to fastest peer in seconds."}) -var meanPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_mean_ping_seconds", Description: "Network round trip time to average peer in seconds."}) -var medianPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_median_ping_seconds", Description: "Network round trip time to median peer in seconds."}) -var maxPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_max_ping_seconds", Description: "Network round trip time to slowest peer in seconds."}) - var peers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peers", Description: "Number of active peers."}) var incomingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_incoming_peers", Description: "Number of active incoming peers."}) var outgoingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_outgoing_peers", Description: "Number of active outgoing peers."}) + +var transactionMessagesP2PRejectMessage = metrics.NewTagCounter(metrics.TransactionMessagesP2PRejectMessage.Name, metrics.TransactionMessagesP2PRejectMessage.Description) +var transactionMessagesP2PDuplicateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDuplicateMessage) +var transactionMessagesP2PDeliverMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDeliverMessage) +var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PUndeliverableMessage) + +var _ = pubsub.RawTracer(pubsubMetricsTracer{}) + +// pubsubMetricsTracer is a tracer for pubsub events used to track metrics. +type pubsubMetricsTracer struct{} + +// AddPeer is invoked when a new peer is added. 
+func (t pubsubMetricsTracer) AddPeer(p peer.ID, proto p2proto.ID) {} + +// RemovePeer is invoked when a peer is removed. +func (t pubsubMetricsTracer) RemovePeer(p peer.ID) {} + +// Join is invoked when a new topic is joined +func (t pubsubMetricsTracer) Join(topic string) {} + +// Leave is invoked when a topic is abandoned +func (t pubsubMetricsTracer) Leave(topic string) {} + +// Graft is invoked when a new peer is grafted on the mesh (gossipsub) +func (t pubsubMetricsTracer) Graft(p peer.ID, topic string) {} + +// Prune is invoked when a peer is pruned from the message (gossipsub) +func (t pubsubMetricsTracer) Prune(p peer.ID, topic string) {} + +// ValidateMessage is invoked when a message first enters the validation pipeline. +func (t pubsubMetricsTracer) ValidateMessage(msg *pubsub.Message) { + if msg != nil && msg.Topic != nil { + switch *msg.Topic { + case p2p.TXTopicName: + networkP2PReceivedBytesTotal.AddUint64(uint64(len(msg.Data)), nil) + networkP2PReceivedBytesByTag.Add(string(protocol.TxnTag), uint64(len(msg.Data))) + networkP2PMessageReceivedByTag.Add(string(protocol.TxnTag), 1) + } + } +} + +// DeliverMessage is invoked when a message is delivered +func (t pubsubMetricsTracer) DeliverMessage(msg *pubsub.Message) { + transactionMessagesP2PDeliverMessage.Inc(nil) +} + +// RejectMessage is invoked when a message is Rejected or Ignored. +// The reason argument can be one of the named strings Reject*. +func (t pubsubMetricsTracer) RejectMessage(msg *pubsub.Message, reason string) { + // TagCounter cannot handle tags with spaces so pubsub.Reject* cannot be used directly. + // Since Go's strings are immutable, char replacement is a new allocation so that stick to string literals. 
+ switch reason { + case pubsub.RejectValidationThrottled: + transactionMessagesP2PRejectMessage.Add("throttled", 1) + case pubsub.RejectValidationQueueFull: + transactionMessagesP2PRejectMessage.Add("full", 1) + case pubsub.RejectValidationFailed: + transactionMessagesP2PRejectMessage.Add("failed", 1) + case pubsub.RejectValidationIgnored: + transactionMessagesP2PRejectMessage.Add("ignored", 1) + default: + transactionMessagesP2PRejectMessage.Add("other", 1) + } +} + +// DuplicateMessage is invoked when a duplicate message is dropped. +func (t pubsubMetricsTracer) DuplicateMessage(msg *pubsub.Message) { + transactionMessagesP2PDuplicateMessage.Inc(nil) +} + +// ThrottlePeer is invoked when a peer is throttled by the peer gater. +func (t pubsubMetricsTracer) ThrottlePeer(p peer.ID) {} + +// RecvRPC is invoked when an incoming RPC is received. +func (t pubsubMetricsTracer) RecvRPC(rpc *pubsub.RPC) {} + +// SendRPC is invoked when a RPC is sent. +func (t pubsubMetricsTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) { + if rpc != nil && len(rpc.Publish) > 0 { + for i := range rpc.Publish { + if rpc.Publish[i] != nil && rpc.Publish[i].Topic != nil { + switch *rpc.Publish[i].Topic { + case p2p.TXTopicName: + networkP2PSentBytesByTag.Add(string(protocol.TxnTag), uint64(len(rpc.Publish[i].Data))) + networkP2PSentBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil) + networkP2PMessageSentByTag.Add(string(protocol.TxnTag), 1) + } + } + } + } +} + +// DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full. +func (t pubsubMetricsTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {} + +// UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and +// the pressure release mechanism trigger, dropping messages. 
+func (t pubsubMetricsTracer) UndeliverableMessage(msg *pubsub.Message) { + transactionMessagesP2PUnderdeliverableMessage.Inc(nil) +} diff --git a/network/metrics_test.go b/network/metrics_test.go index a7697b2e5a..857ab57051 100644 --- a/network/metrics_test.go +++ b/network/metrics_test.go @@ -17,34 +17,60 @@ package network import ( + "go/ast" + "go/parser" + "go/token" "testing" "github.com/stretchr/testify/require" - "github.com/algorand/go-algorand/network/p2p" - "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" ) -// TestMetrics_TagList ensures p2p.TracedNetworkMessageTags and tagStringListP2P are disjoint -func TestMetrics_TagList(t *testing.T) { +// TestPubsubTracer_TagList makes sure pubsubMetricsTracer traces pubsub messages +// by counting switch cases in SendRPC and ValidateMessage +func TestMetrics_PubsubTracer_TagList(t *testing.T) { t.Parallel() partitiontest.PartitionTest(t) - p2pTags := make(map[string]bool, len(p2p.TracedNetworkMessageTags)) - metricTags := make(map[string]bool, len(tagStringListP2P)) + fset := token.NewFileSet() + f, err := parser.ParseFile(fset, "metrics.go", nil, 0) + require.NoError(t, err) - for _, tag := range p2p.TracedNetworkMessageTags { - p2pTags[string(tag)] = true - } + // Find the SendRPC/ValidateMessage functions and count the switch cases + var sendCaseCount int + var recvCaseCount int + ast.Inspect(f, func(n ast.Node) bool { + switch stmt := n.(type) { + case *ast.FuncDecl: + if stmt.Name.Name == "SendRPC" { + ast.Inspect(stmt.Body, func(n ast.Node) bool { + if switchStmt, ok := n.(*ast.SwitchStmt); ok { + for _, stmt := range switchStmt.Body.List { + if _, ok := stmt.(*ast.CaseClause); ok { + sendCaseCount++ + } + } + } + return true + }) + } + if stmt.Name.Name == "ValidateMessage" { + ast.Inspect(stmt.Body, func(n ast.Node) bool { + if switchStmt, ok := n.(*ast.SwitchStmt); ok { + for _, stmt := range switchStmt.Body.List { + if _, ok := stmt.(*ast.CaseClause); ok { 
+ recvCaseCount++ + } + } + } + return true + }) + } + } + return true + }) - for _, tag := range tagStringListP2P { - metricTags[string(tag)] = true - } - - require.Equal(t, len(protocol.TagMap), len(p2pTags)+len(metricTags)) - for tag := range protocol.TagMap { - require.True(t, p2pTags[string(tag)] || metricTags[string(tag)]) - require.False(t, p2pTags[string(tag)] && metricTags[string(tag)]) - } + require.Equal(t, len(gossipSubTags), sendCaseCount) + require.Equal(t, len(gossipSubTags), recvCaseCount) } diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 3b467b0b27..70fe09027a 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -176,7 +176,7 @@ func configureResourceManager(cfg config.Local) (network.ResourceManager, error) } // MakeService creates a P2P service instance -func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler) (*serviceImpl, error) { +func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler, metricsTracer pubsub.RawTracer) (*serviceImpl, error) { sm := makeStreamManager(ctx, log, h, wsStreamHandler, cfg.EnableGossipService) h.Network().Notify(sm) @@ -188,7 +188,7 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h ho telemetryProtoInfo := formatPeerTelemetryInfoProtocolName(telemetryID, telemetryInstance) h.SetStreamHandler(protocol.ID(telemetryProtoInfo), func(s network.Stream) { s.Close() }) - ps, err := makePubSub(ctx, cfg, h) + ps, err := makePubSub(ctx, cfg, h, metricsTracer) if err != nil { return nil, err } diff --git a/network/p2p/pubsub.go b/network/p2p/pubsub.go index b12ea0edf5..cb94ed4130 100644 --- a/network/p2p/pubsub.go +++ b/network/p2p/pubsub.go @@ -57,7 +57,7 @@ const TXTopicName = "algo-0.1" const incomingThreads = 20 // matches to number wsNetwork workers -func makePubSub(ctx context.Context, cfg config.Local, host 
host.Host) (*pubsub.PubSub, error) { +func makePubSub(ctx context.Context, cfg config.Local, host host.Host, metricsTracer pubsub.RawTracer) (*pubsub.PubSub, error) { //defaultParams := pubsub.DefaultGossipSubParams() options := []pubsub.Option{ @@ -100,7 +100,10 @@ func makePubSub(ctx context.Context, cfg config.Local, host host.Host) (*pubsub. pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), // pubsub.WithValidateThrottle(cfg.TxBacklogSize), pubsub.WithValidateWorkers(incomingThreads), - pubsub.WithRawTracer(pubsubTracer{}), + } + + if metricsTracer != nil { + options = append(options, pubsub.WithRawTracer(metricsTracer)) } return pubsub.NewGossipSub(ctx, host, options...) diff --git a/network/p2p/pubsubTracer.go b/network/p2p/pubsubTracer.go deleted file mode 100644 index 31d5799b14..0000000000 --- a/network/p2p/pubsubTracer.go +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright (C) 2019-2024 Algorand, Inc. -// This file is part of go-algorand -// -// go-algorand is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// go-algorand is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with go-algorand. If not, see . 
- -package p2p - -import ( - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/protocol" - - ap "github.com/algorand/go-algorand/protocol" - "github.com/algorand/go-algorand/util/metrics" -) - -var _ = pubsub.RawTracer(pubsubTracer{}) - -var transactionMessagesP2PRejectMessage = metrics.NewTagCounter(metrics.TransactionMessagesP2PRejectMessage.Name, metrics.TransactionMessagesP2PRejectMessage.Description) -var transactionMessagesP2PDuplicateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDuplicateMessage) -var transactionMessagesP2PDeliverMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDeliverMessage) -var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PUndeliverableMessage) - -// TracedNetworkMessageTags is a list of tags for network messages that handled by pubsubTracer. -// This list must be exclusive of the tagStringListP2P list in ../metrics.go. It is exported to ensure these lists are disjoint. -// There is a benefic of using const string in a comparison `*rpc.Publish[i].Topic == TXTopicName` below -// since it most to a single comparison (or two switch/case constructs) on x86-64. 
-var TracedNetworkMessageTags = []string{string(ap.TxnTag)} -var networkP2PSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", TracedNetworkMessageTags, "") -var networkP2PReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", TracedNetworkMessageTags, "") -var networkP2PMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", TracedNetworkMessageTags, "") -var networkP2PMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", TracedNetworkMessageTags, "") - -// pubsubTracer is a tracer for pubsub events used to track metrics. -type pubsubTracer struct{} - -// AddPeer is invoked when a new peer is added. -func (t pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) {} - -// RemovePeer is invoked when a peer is removed. -func (t pubsubTracer) RemovePeer(p peer.ID) {} - -// Join is invoked when a new topic is joined -func (t pubsubTracer) Join(topic string) {} - -// Leave is invoked when a topic is abandoned -func (t pubsubTracer) Leave(topic string) {} - -// Graft is invoked when a new peer is grafted on the mesh (gossipsub) -func (t pubsubTracer) Graft(p peer.ID, topic string) {} - -// Prune is invoked when a peer is pruned from the message (gossipsub) -func (t pubsubTracer) Prune(p peer.ID, topic string) {} - -// ValidateMessage is invoked when a message first enters the validation pipeline. 
-func (t pubsubTracer) ValidateMessage(msg *pubsub.Message) { - if msg != nil && msg.Topic != nil { - switch *msg.Topic { - case TXTopicName: - networkP2PReceivedBytesByTag.Add(string(ap.TxnTag), uint64(len(msg.Data))) - networkP2PMessageReceivedByTag.Add(string(ap.TxnTag), 1) - } - } -} - -// DeliverMessage is invoked when a message is delivered -func (t pubsubTracer) DeliverMessage(msg *pubsub.Message) { - transactionMessagesP2PDeliverMessage.Inc(nil) -} - -// RejectMessage is invoked when a message is Rejected or Ignored. -// The reason argument can be one of the named strings Reject*. -func (t pubsubTracer) RejectMessage(msg *pubsub.Message, reason string) { - // TagCounter cannot handle tags with spaces so pubsub.Reject* cannot be used directly. - // Since Go's strings are immutable, char replacement is a new allocation so that stick to string literals. - switch reason { - case pubsub.RejectValidationThrottled: - transactionMessagesP2PRejectMessage.Add("throttled", 1) - case pubsub.RejectValidationQueueFull: - transactionMessagesP2PRejectMessage.Add("full", 1) - case pubsub.RejectValidationFailed: - transactionMessagesP2PRejectMessage.Add("failed", 1) - case pubsub.RejectValidationIgnored: - transactionMessagesP2PRejectMessage.Add("ignored", 1) - default: - transactionMessagesP2PRejectMessage.Add("other", 1) - } -} - -// DuplicateMessage is invoked when a duplicate message is dropped. -func (t pubsubTracer) DuplicateMessage(msg *pubsub.Message) { - transactionMessagesP2PDuplicateMessage.Inc(nil) -} - -// ThrottlePeer is invoked when a peer is throttled by the peer gater. -func (t pubsubTracer) ThrottlePeer(p peer.ID) {} - -// RecvRPC is invoked when an incoming RPC is received. -func (t pubsubTracer) RecvRPC(rpc *pubsub.RPC) {} - -// SendRPC is invoked when a RPC is sent. 
-func (t pubsubTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) { - if rpc != nil && len(rpc.Publish) > 0 { - for i := range rpc.Publish { - if rpc.Publish[i] != nil && rpc.Publish[i].Topic != nil { - switch *rpc.Publish[i].Topic { - case TXTopicName: - networkP2PSentBytesByTag.Add(string(ap.TxnTag), uint64(len(rpc.Publish[i].Data))) - networkP2PMessageSentByTag.Add(string(ap.TxnTag), 1) - } - } - } - } -} - -// DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full. -func (t pubsubTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {} - -// UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and -// the pressure release mechanism trigger, dropping messages. -func (t pubsubTracer) UndeliverableMessage(msg *pubsub.Message) { - transactionMessagesP2PUnderdeliverableMessage.Inc(nil) -} diff --git a/network/p2p/pubsubTracer_test.go b/network/p2p/pubsubTracer_test.go deleted file mode 100644 index 396d62924b..0000000000 --- a/network/p2p/pubsubTracer_test.go +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright (C) 2019-2024 Algorand, Inc. -// This file is part of go-algorand -// -// go-algorand is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// go-algorand is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with go-algorand. If not, see . 
- -package p2p - -import ( - "go/ast" - "go/parser" - "go/token" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/algorand/go-algorand/test/partitiontest" -) - -// TestPubsubTracer_TagList makes sure all entries -func TestPubsubTracer_TagList(t *testing.T) { - t.Parallel() - partitiontest.PartitionTest(t) - - fset := token.NewFileSet() - f, err := parser.ParseFile(fset, "pubsubTracer.go", nil, 0) - require.NoError(t, err) - - // Find the SendRPC/ValidateMessage functions and count the switch cases - var sendCaseCount int - var recvCaseCount int - ast.Inspect(f, func(n ast.Node) bool { - switch stmt := n.(type) { - case *ast.FuncDecl: - if stmt.Name.Name == "SendRPC" { - ast.Inspect(stmt.Body, func(n ast.Node) bool { - if switchStmt, ok := n.(*ast.SwitchStmt); ok { - for _, stmt := range switchStmt.Body.List { - if _, ok := stmt.(*ast.CaseClause); ok { - sendCaseCount++ - } - } - } - return true - }) - } - if stmt.Name.Name == "ValidateMessage" { - ast.Inspect(stmt.Body, func(n ast.Node) bool { - if switchStmt, ok := n.(*ast.SwitchStmt); ok { - for _, stmt := range switchStmt.Body.List { - if _, ok := stmt.(*ast.CaseClause); ok { - recvCaseCount++ - } - } - } - return true - }) - } - } - return true - }) - - require.Equal(t, len(TracedNetworkMessageTags), sendCaseCount) - require.Equal(t, len(TracedNetworkMessageTags), recvCaseCount) -} diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 2b2ac861fb..d3af60a223 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -266,7 +266,7 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo } log.Infof("P2P host created: peer ID %s addrs %s", h.ID(), h.Addrs()) - net.service, err = p2p.MakeService(net.ctx, log, cfg, h, la, net.wsStreamHandler) + net.service, err = p2p.MakeService(net.ctx, log, cfg, h, la, net.wsStreamHandler, pubsubMetricsTracer{}) if err != nil { return nil, err } From bf20dce531f6570e6c7ee596097caeb64ab6d676 Mon Sep 17 00:00:00 2001 
From: Pavel Zbitskiy Date: Wed, 21 Aug 2024 13:51:34 -0400 Subject: [PATCH 10/16] heapwatch: support p2p total bytes counters --- test/heapwatch/metrics_delta.py | 72 ++++++++++++++++++++++----------- test/heapwatch/metrics_lib.py | 2 +- 2 files changed, 49 insertions(+), 25 deletions(-) diff --git a/test/heapwatch/metrics_delta.py b/test/heapwatch/metrics_delta.py index f6e99023d9..3ff1afca65 100644 --- a/test/heapwatch/metrics_delta.py +++ b/test/heapwatch/metrics_delta.py @@ -136,6 +136,8 @@ def __init__(self, label=None): self.tpsMeanSum = 0 self.txBpsMeanSum = 0 self.rxBpsMeanSum = 0 + self.txP2PBpsMeanSum = 0 + self.rxP2PBpsMeanSum = 0 self.tpsSum = 0 self.blockTimeSum = 0 self.sumsCount = 0 @@ -152,6 +154,8 @@ def __call__(self, ttr, nick): self.tpsMeanSum += meanOrZero(ttr.tpsList) self.txBpsMeanSum += meanOrZero(ttr.txBpsList) self.rxBpsMeanSum += meanOrZero(ttr.rxBpsList) + self.txP2PBpsMeanSum += meanOrZero(ttr.txP2PBpsList) + self.rxP2PBpsMeanSum += meanOrZero(ttr.rxP2PBpsList) self.tpsSum += ttr.tps self.blockTimeSum += ttr.blockTime self.sumsCount += 1 @@ -164,8 +168,10 @@ def blockinfo(self, curtime): return self.biByTime.get(curtime) def byMsg(self, html=False): - txPSums = {} - rxPSums = {} + txWsPSums = {} + rxWsPSums = {} + txP2PPSums = {} + rxP2PPSums = {} secondsSum = 0 txMax = {} txMin = {} @@ -175,8 +181,10 @@ def byMsg(self, html=False): for nick, ns in self.nodes.items(): nicks.append(nick) secondsSum += ns.secondsSum - dictSum(txPSums, ns.txPSums) - dictSum(rxPSums, ns.rxPSums) + dictSum(txWsPSums, ns.txPSums) + dictSum(rxWsPSums, ns.rxPSums) + dictSum(txP2PPSums, ns.txP2PPSums) + dictSum(rxP2PPSums, ns.rxP2PPSums) dictMax(txMax, ns.txPLists) dictMax(rxMax, ns.rxPLists) dictMin(txMin, ns.txPLists) @@ -185,23 +193,36 @@ def byMsg(self, html=False): lines = [] if html: lines.append('
{}
'.format(nodesummary)) - lines.append('') + lines.append('
tx B/srx B/s
') # traffic per tag two columns: ws and p2p else: lines.append(nodesummary) - lines.append('\ttx B/s\trx B/s') - for msg, txB in txPSums.items(): - if msg not in rxPSums: - rxPSums[msg] = 0 - for rxBps, msg in sorted([(rxB/secondsSum, msg) for msg, rxB in rxPSums.items()], reverse=True): - txBps = txPSums.get(msg,0)/secondsSum - if (txBps < 0.5) and (rxBps < 0.5): - continue + + for title, txPSums, rxPSums in [ + ('ws', txWsPSums, rxWsPSums), + ('p2p', txP2PPSums, rxP2PPSums), + ]: if html: - lines.append(''.format(msg, txBps, rxBps)) + lines.append('') if html: - lines.append('
{}{:.0f}{:.0f}
') + lines.append(f'') else: - lines.append('{}\t{:.0f}\t{:.0f}'.format(msg, txBps, rxBps)) + lines.append(f'{title} traffic per tag') + lines.append('\ttx B/s\trx B/s') + for msg, txB in txPSums.items(): + if msg not in rxPSums: + rxPSums[msg] = 0 + for rxBps, msg in sorted([(rxB/secondsSum, msg) for msg, rxB in rxPSums.items()], reverse=True): + txBps = txPSums.get(msg,0)/secondsSum + if (txBps < 0.5) and (rxBps < 0.5): + continue + if html: + lines.append(''.format(msg, txBps, rxBps)) + else: + lines.append('{}\t{:.0f}\t{:.0f}'.format(msg, txBps, rxBps)) + if html: + lines.append('
{title} traffic per tag
tx B/srx B/s
{}{:.0f}{:.0f}
') + lines.append('
') + lines.append('') # traffic per tag two columns: ws and p2p return '\n'.join(lines) def txPool(self): @@ -230,7 +251,7 @@ def html(self): def str(self, html=False): if not self.sumsCount: - tps, txbps, rxbps = math.nan, math.nan, math.nan + tps, txbps, rxbps, txP2Pbps, rxP2Pbps = math.nan, math.nan, math.nan, math.nan, math.nan blockTimes = math.nan else: #tps = self.tpsMeanSum/self.sumsCount @@ -238,6 +259,8 @@ def str(self, html=False): blockTimes = self.blockTimeSum/self.sumsCount txbps = self.txBpsMeanSum/self.sumsCount rxbps = self.rxBpsMeanSum/self.sumsCount + txP2Pbps = self.txP2PBpsMeanSum/self.sumsCount + rxP2Pbps = self.rxP2PBpsMeanSum/self.sumsCount labelspace = "" if self.label: labelspace = self.label + " " @@ -248,12 +271,12 @@ def str(self, html=False): else: verifyMillis = '' if html: - fmt = '{byMsg}\n{verifyMillis}
{labelspace}{txPool}
\n
{labelspace}summary: {TPS:0.2f} TPS, {bt:1.2f}s/block, tx {txBps}B/s, rx {rxBps}B/s
' + fmt = '{byMsg}\n{verifyMillis}
{labelspace}{txPool}
\n
{labelspace}summary: {TPS:0.2f} TPS, {bt:1.2f}s/block, tx {txBps}B/s, rx {rxBps}B/s, p2p tx {txP2PBps}B/s, p2p rx {rxP2PBps}B/s
' if self.label: fmt = '
' + self.label + '
' + fmt else: - fmt = '{byMsg}\n{verifyMillis}{labelspace}{txPool}\n{labelspace}summary: {TPS:0.2f} TPS, {bt:1.2f}s/block, tx {txBps}B/s, rx {rxBps}B/s' - return fmt.format(labelspace=labelspace, byMsg=self.byMsg(html), txPool=self.txPool(), TPS=tps, txBps=hunum(txbps), rxBps=hunum(rxbps), bt=blockTimes, verifyMillis=verifyMillis) + fmt = '{byMsg}\n{verifyMillis}{labelspace}{txPool}\n{labelspace}summary: {TPS:0.2f} TPS, {bt:1.2f}s/block, tx {txBps}B/s, rx {rxBps}B/s, p2p tx {txP2PBps}B/s, p2p rx {rxP2PBps}B/s' + return fmt.format(labelspace=labelspace, byMsg=self.byMsg(html), txPool=self.txPool(), TPS=tps, txBps=hunum(txbps), rxBps=hunum(rxbps), txP2PBps=hunum(txP2Pbps), rxP2PBps=hunum(rxP2Pbps), bt=blockTimes, verifyMillis=verifyMillis) def plot_pool(self, outpath): from matplotlib import pyplot as plt @@ -599,8 +622,8 @@ def process_files(self, args, nick=None, metrics_files=None, bisource=None): rxBytesPerSec = rxBytes / dt txP2PBytes = d.get('algod_network_p2p_sent_bytes_total',0) rxP2PBytes = d.get('algod_network_p2p_received_bytes_total',0) - txP2PBytesPerSec = txBytes / dt - rxP2PBytesPerSec = rxBytes / dt + txP2PBytesPerSec = txP2PBytes / dt + rxP2PBytesPerSec = rxP2PBytes / dt # TODO: gather algod_network_sent_bytes_* and algod_network_received_bytes_* if (tps is None) or ((args.mintps is not None) and (tps < args.mintps)): @@ -654,8 +677,9 @@ def process_files(self, args, nick=None, metrics_files=None, bisource=None): self.blockTime = totalDt / rounds if writer and self.txBpsList: writer.writerow([]) - for bsum, msg in sorted([(bsum,msg) for msg,bsum in self.txPSums.items()]): - pass + # TODO: summarize + # for bsum, msg in sorted([(bsum,msg) for msg,bsum in self.txPSums.items()]): + # pass writer.writerow([]) writer.writerow(['min', min(self.txBpsList), min(self.rxBpsList), min(self.txP2PBpsList), min(self.rxP2PBpsList), min(self.tpsList)]) writer.writerow(['avg', self.txBSum/self.secondsSum, self.rxBSum/self.secondsSum, self.txP2PBSum/self.secondsSum, 
self.rxP2PBSum/self.secondsSum, self.txnSum/self.secondsSum]) diff --git a/test/heapwatch/metrics_lib.py b/test/heapwatch/metrics_lib.py index fbda555b90..5fc7b36075 100644 --- a/test/heapwatch/metrics_lib.py +++ b/test/heapwatch/metrics_lib.py @@ -54,7 +54,7 @@ def hunum(x): return '{:.1f}k'.format(x / 1000.0) if x >= 1000: return '{:.2f}k'.format(x / 1000.0) - return '{:.2f}x'.format(x) + return '{:.2f}'.format(x) def test_metric_line_re(): From e83e5f5c58d1b6a05d2997a04ae075fe7d09dfbb Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 21 Aug 2024 16:48:09 -0400 Subject: [PATCH 11/16] adjust TXTopicName value --- network/p2p/pubsub.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/network/p2p/pubsub.go b/network/p2p/pubsub.go index cb94ed4130..a592657010 100644 --- a/network/p2p/pubsub.go +++ b/network/p2p/pubsub.go @@ -52,8 +52,9 @@ const ( // TXTopicName defines a pubsub topic for TX messages // There is a micro optimization for const string comparison: -// 8 bytes const string require a single x86-64 CMPQ instruction -const TXTopicName = "algo-0.1" +// 8 bytes const string require a single x86-64 CMPQ instruction. 
+// Naming convention: "algo" + 2 bytes protocol tag + 2 bytes version +const TXTopicName = "algotx01" const incomingThreads = 20 // matches to number wsNetwork workers From c2ceb8bbd63859efddf6ada6aaa7a7205021f224 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 21 Aug 2024 16:51:31 -0400 Subject: [PATCH 12/16] remove unused metric names --- util/metrics/metrics.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/util/metrics/metrics.go b/util/metrics/metrics.go index a1ebb75b8f..d7afe6439c 100644 --- a/util/metrics/metrics.go +++ b/util/metrics/metrics.go @@ -147,11 +147,6 @@ var ( // TransactionMessagesP2PValidateMessage "Number of p2p pubsub transaction messages received for validation" TransactionMessagesP2PValidateMessage = MetricName{Name: "algod_transaction_messages_p2p_validate", Description: "Number of p2p pubsub transaction messages received for validation"} - // TransactionMessagesP2PSentMessage "Number of p2p pubsub transaction messages received for validation" - TransactionMessagesP2PSentMessage = MetricName{Name: "algod_transaction_messages_p2p_sent", Description: "Number of p2p pubsub transaction messages sent"} - // TransactionMessagesP2PSentBytes "Number p2p pubsub transaction bytes sent" - TransactionMessagesP2PSentBytes = MetricName{Name: "algod_transaction_messages_p2p_bytes", Description: "Number p2p pubsub transaction bytes sent"} - // TransactionGroupTxSyncHandled "Number of transaction groups handled via txsync" TransactionGroupTxSyncHandled = MetricName{Name: "algod_transaction_group_txsync_handled", Description: "Number of transaction groups handled via txsync"} // TransactionGroupTxSyncRemember "Number of transaction groups remembered via txsync" From 1f25c594c924df1b0bdf3f9803193c08e09dfcd7 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 22 Aug 2024 12:55:07 -0400 Subject: [PATCH 13/16] Add total gossibsub bytes metrics --- network/metrics.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 
deletions(-) diff --git a/network/metrics.go b/network/metrics.go index cdd9487f96..a64850dd32 100644 --- a/network/metrics.go +++ b/network/metrics.go @@ -105,6 +105,9 @@ var transactionMessagesP2PDuplicateMessage = metrics.MakeCounter(metrics.Transac var transactionMessagesP2PDeliverMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDeliverMessage) var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PUndeliverableMessage) +var networkP2PGossipSubSentBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_sent_bytes_total", Description: "Total number of bytes sent through gossipsub"}) +var networkP2PGossipSubReceivedBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_received_bytes_total", Description: "Total number of bytes received through gossipsub"}) + var _ = pubsub.RawTracer(pubsubMetricsTracer{}) // pubsubMetricsTracer is a tracer for pubsub events used to track metrics. @@ -173,19 +176,20 @@ func (t pubsubMetricsTracer) DuplicateMessage(msg *pubsub.Message) { func (t pubsubMetricsTracer) ThrottlePeer(p peer.ID) {} // RecvRPC is invoked when an incoming RPC is received. -func (t pubsubMetricsTracer) RecvRPC(rpc *pubsub.RPC) {} +func (t pubsubMetricsTracer) RecvRPC(rpc *pubsub.RPC) { + networkP2PGossipSubReceivedBytesTotal.AddUint64(uint64(rpc.Size()), nil) +} // SendRPC is invoked when a RPC is sent. 
func (t pubsubMetricsTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) { - if rpc != nil && len(rpc.Publish) > 0 { - for i := range rpc.Publish { - if rpc.Publish[i] != nil && rpc.Publish[i].Topic != nil { - switch *rpc.Publish[i].Topic { - case p2p.TXTopicName: - networkP2PSentBytesByTag.Add(string(protocol.TxnTag), uint64(len(rpc.Publish[i].Data))) - networkP2PSentBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil) - networkP2PMessageSentByTag.Add(string(protocol.TxnTag), 1) - } + networkP2PGossipSubSentBytesTotal.AddUint64(uint64(rpc.Size()), nil) + for i := range rpc.GetPublish() { + if rpc.Publish[i] != nil && rpc.Publish[i].Topic != nil { + switch *rpc.Publish[i].Topic { + case p2p.TXTopicName: + networkP2PSentBytesByTag.Add(string(protocol.TxnTag), uint64(len(rpc.Publish[i].Data))) + networkP2PSentBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil) + networkP2PMessageSentByTag.Add(string(protocol.TxnTag), 1) } } } From c95cb91965bd0794af23e389a104fccba820a5d4 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 22 Aug 2024 16:00:33 -0400 Subject: [PATCH 14/16] block history to report total number of transactions --- test/heapwatch/block_history_plot.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/test/heapwatch/block_history_plot.py b/test/heapwatch/block_history_plot.py index d8c86b454f..0bf8a2c8a2 100644 --- a/test/heapwatch/block_history_plot.py +++ b/test/heapwatch/block_history_plot.py @@ -23,6 +23,7 @@ # Graph over time of TPS or 10-round-moving-average-TPS import base64 +import json import os import statistics import sys @@ -106,12 +107,25 @@ def process(path, args): prevtc = tc prevts = ts prevtime = _time - print('{} blocks, block txns [{}-{}], block seconds [{}-{}], tps [{}-{}]'.format( + print('{} blocks, block txns [{}-{}], block seconds [{}-{}], tps [{}-{}], total txns {}'.format( count, mintxn,maxtxn, mindt,maxdt, mintps,maxtps, + tc, )) + if tc > 0: + with open(path + '.stats', 'w') as fout: 
+ fout.write(json.dumps({ + 'blocks': count, + 'tc': tc, + 'mintxn': mintxn, + 'maxtxn': maxtxn, + 'mindt': mindt, + 'maxdt': maxdt, + 'mintps': mintps, + 'maxtps': maxtps, + })) start = 0 end = len(txnv)-1 From 72f89c87cff7b6975e467a7f1706c483712e91ec Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 22 Aug 2024 21:10:15 -0400 Subject: [PATCH 15/16] metrics_aggs: print max avg stats --- test/heapwatch/metrics_aggs.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/test/heapwatch/metrics_aggs.py b/test/heapwatch/metrics_aggs.py index 0189634be5..d20593c097 100644 --- a/test/heapwatch/metrics_aggs.py +++ b/test/heapwatch/metrics_aggs.py @@ -33,7 +33,7 @@ from plotly.subplots import make_subplots -from metrics_lib import MetricType, parse_metrics, gather_metrics_files_by_nick +from metrics_lib import Metric, MetricType, parse_metrics, gather_metrics_files_by_nick logger = logging.getLogger(__name__) @@ -53,6 +53,8 @@ def main(): ap.add_argument('--nick-lre', action='append', default=[], help='label:regexp to filter node names, may be repeated') ap.add_argument('-s', '--save', type=str, choices=['png', 'html'], help=f'save plot to \'{default_img_filename}\' or \'{default_html_filename}\' file instead of showing it') ap.add_argument('--verbose', default=False, action='store_true') + ap.add_argument('--avg-max', default=False, action='store_true', help='print avg of max values across nodes for each metric') + ap.add_argument('--avg-max-min', default=False, action='store_true', help='print avg of max-min values across nodes for each metric') args = ap.parse_args() if args.verbose: @@ -99,6 +101,7 @@ def main(): } fig['layout']['height'] = 500 * nrows + nick_series = {} for nick, files_by_date in filesByNick.items(): active_metrics = {} @@ -146,7 +149,10 @@ def main(): active_metric_names.sort() active_metrics[full_name] = active_metric_names idx += 1 - + + if args.avg_max or args.avg_max_min: + nick_series[nick] = 
raw_series + for i, metric_pair in enumerate(sorted(active_metrics.items())): metric_name, metric_fullnames = metric_pair for metric_fullname in metric_fullnames: @@ -158,6 +164,23 @@ def main(): line=dict(width=1), ), i+1, 1) + if args.avg_max or args.avg_max_min: + metric_names_nick_max_avg = {} + for nick, raw_series in nick_series.items(): + for metric_name, rw in raw_series.items(): + mmax = max(rw) + mmin = min(rw) + print(f'{nick}: {metric_name}: count {len(rw)}, max {mmax}, min {mmin}, min-max {mmax - mmin}') + metric = Metric(metric_name, 0, MetricType.COUNTER) + if metric.short_name() not in metric_names_nick_max_avg: + metric_names_nick_max_avg[metric.short_name()] = [] + if args.avg_max_min: + metric_names_nick_max_avg[metric.short_name()].append(mmax - mmin) + if args.avg_max: + metric_names_nick_max_avg[metric.short_name()].append(mmax) + for metric_name, val in metric_names_nick_max_avg.items(): + print(f'{metric_name}: avg {sum(val)/len(val)}') + if args.save: if args.save == 'html': target_path = os.path.join(args.dir, default_html_filename) From edc2d0c43b84e0f8c7e7a4cb3a059375a04912d3 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 28 Aug 2024 16:04:30 -0400 Subject: [PATCH 16/16] Add networkP2PMessageQueueMicrosTotal --- network/metrics.go | 1 + network/wsPeer.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/network/metrics.go b/network/metrics.go index a64850dd32..a1e92b2424 100644 --- a/network/metrics.go +++ b/network/metrics.go @@ -68,6 +68,7 @@ var networkHandleCountByTag *metrics.TagCounter var networkConnectionsDroppedTotal = metrics.MakeCounter(metrics.NetworkConnectionsDroppedTotal) var networkMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_message_sent_queue_micros_total", Description: "Total microseconds message spent waiting in queue to be sent"}) +var networkP2PMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name: 
"algod_network_p2p_message_sent_queue_micros_total", Description: "Total microseconds p2p message spent waiting in queue to be sent"}) var duplicateNetworkMessageReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedTotal) var duplicateNetworkMessageReceivedBytesTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedBytesTotal) diff --git a/network/wsPeer.go b/network/wsPeer.go index e8785c65fb..a6a982af59 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -852,14 +852,14 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason { networkSentBytesByTag.Add(string(tag), uint64(len(msg.data))) networkMessageSentTotal.AddUint64(1, nil) networkMessageSentByTag.Add(string(tag), 1) + networkMessageQueueMicrosTotal.AddUint64(uint64(time.Since(msg.peerEnqueued).Nanoseconds()/1000), nil) } else { networkP2PSentBytesTotal.AddUint64(uint64(len(msg.data)), nil) networkP2PSentBytesByTag.Add(string(tag), uint64(len(msg.data))) networkP2PMessageSentTotal.AddUint64(1, nil) networkP2PMessageSentByTag.Add(string(tag), 1) - + networkP2PMessageQueueMicrosTotal.AddUint64(uint64(time.Since(msg.peerEnqueued).Nanoseconds()/1000), nil) } - networkMessageQueueMicrosTotal.AddUint64(uint64(time.Since(msg.peerEnqueued).Nanoseconds()/1000), nil) return disconnectReasonNone }