diff --git a/cmd/algod/main.go b/cmd/algod/main.go index 311b1507e6..47008256ac 100644 --- a/cmd/algod/main.go +++ b/cmd/algod/main.go @@ -445,6 +445,8 @@ var startupConfigCheckFields = []string{ "TxPoolExponentialIncreaseFactor", "TxPoolSize", "VerifiedTranscationsCacheSize", + "EnableP2P", + "EnableP2PHybridMode", } func resolveDataDir() string { diff --git a/network/metrics.go b/network/metrics.go new file mode 100644 index 0000000000..a1e92b2424 --- /dev/null +++ b/network/metrics.go @@ -0,0 +1,206 @@ +// Copyright (C) 2019-2024 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+ +package network + +import ( + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/peer" + p2proto "github.com/libp2p/go-libp2p/core/protocol" + + "github.com/algorand/go-algorand/network/p2p" + "github.com/algorand/go-algorand/protocol" + "github.com/algorand/go-algorand/util/metrics" +) + +func init() { + // all tags are tracked by ws net + tagStringList := make([]string, 0, len(protocol.TagList)) + for _, t := range protocol.TagList { + tagStringList = append(tagStringList, string(t)) + } + networkSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringList, "UNK") + networkReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringList, "UNK") + networkMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringList, "UNK") + networkMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringList, "UNK") + networkHandleCountByTag = metrics.NewTagCounterFiltered("algod_network_rx_handle_countbytag_{TAG}", "count of handler calls in the receive thread for {TAG} messages", tagStringList, "UNK") + networkHandleMicrosByTag = metrics.NewTagCounterFiltered("algod_network_rx_handle_microsbytag_{TAG}", "microseconds spent by protocol handlers in the receive thread for {TAG} messages", tagStringList, "UNK") + + networkP2PSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringList, "UNK") + networkP2PReceivedBytesByTag = 
metrics.NewTagCounterFiltered("algod_network_p2p_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringList, "UNK") + networkP2PMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringList, "UNK") + networkP2PMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringList, "UNK") +} + +var networkSentBytesTotal = metrics.MakeCounter(metrics.NetworkSentBytesTotal) +var networkP2PSentBytesTotal = metrics.MakeCounter(metrics.NetworkP2PSentBytesTotal) +var networkSentBytesByTag *metrics.TagCounter +var networkP2PSentBytesByTag *metrics.TagCounter +var networkReceivedBytesTotal = metrics.MakeCounter(metrics.NetworkReceivedBytesTotal) +var networkP2PReceivedBytesTotal = metrics.MakeCounter(metrics.NetworkP2PReceivedBytesTotal) +var networkReceivedBytesByTag *metrics.TagCounter +var networkP2PReceivedBytesByTag *metrics.TagCounter + +var networkMessageReceivedTotal = metrics.MakeCounter(metrics.NetworkMessageReceivedTotal) +var networkP2PMessageReceivedTotal = metrics.MakeCounter(metrics.NetworkP2PMessageReceivedTotal) +var networkMessageReceivedByTag *metrics.TagCounter +var networkP2PMessageReceivedByTag *metrics.TagCounter +var networkMessageSentTotal = metrics.MakeCounter(metrics.NetworkMessageSentTotal) +var networkP2PMessageSentTotal = metrics.MakeCounter(metrics.NetworkP2PMessageSentTotal) +var networkMessageSentByTag *metrics.TagCounter +var networkP2PMessageSentByTag *metrics.TagCounter + +var networkHandleMicrosByTag *metrics.TagCounter +var networkHandleCountByTag *metrics.TagCounter + +var networkConnectionsDroppedTotal = metrics.MakeCounter(metrics.NetworkConnectionsDroppedTotal) +var networkMessageQueueMicrosTotal = 
metrics.MakeCounter(metrics.MetricName{Name: "algod_network_message_sent_queue_micros_total", Description: "Total microseconds message spent waiting in queue to be sent"}) +var networkP2PMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_message_sent_queue_micros_total", Description: "Total microseconds p2p message spent waiting in queue to be sent"}) + +var duplicateNetworkMessageReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedTotal) +var duplicateNetworkMessageReceivedBytesTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedBytesTotal) +var duplicateNetworkFilterReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkFilterReceivedTotal) +var outgoingNetworkMessageFilteredOutTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutTotal) +var outgoingNetworkMessageFilteredOutBytesTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutBytesTotal) +var unknownProtocolTagMessagesTotal = metrics.MakeCounter(metrics.UnknownProtocolTagMessagesTotal) + +var networkIncomingConnections = metrics.MakeGauge(metrics.NetworkIncomingConnections) +var networkOutgoingConnections = metrics.MakeGauge(metrics.NetworkOutgoingConnections) + +var networkIncomingBufferMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_buffer_micros_total", Description: "microseconds spent by incoming messages on the receive buffer"}) +var networkHandleMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_handle_micros_total", Description: "microseconds spent by protocol handlers in the receive thread"}) + +var networkBroadcasts = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_total", Description: "number of broadcast operations"}) +var networkBroadcastQueueFull = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_full_total", Description: "number of messages that were drops due to full broadcast queue"}) 
+var networkBroadcastQueueMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_micros_total", Description: "microseconds broadcast requests sit on queue"}) +var networkBroadcastSendMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_send_micros_total", Description: "microseconds spent broadcasting"}) +var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"}) +var networkPeerBroadcastDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_peer_broadcast_dropped_total", Description: "number of broadcast messages not sent to some peer"}) + +var networkPeerIdentityDisconnect = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_duplicate", Description: "number of times identity challenge cause us to disconnect a peer"}) +var networkPeerIdentityError = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_error", Description: "number of times an error occurs (besides expected) when processing identity challenges"}) +var networkPeerAlreadyClosed = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_peer_already_closed", Description: "number of times a peer would be added but the peer connection is already closed"}) + +var networkSlowPeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_slow_drops_total", Description: "number of peers dropped for being slow to send to"}) +var networkIdlePeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_idle_drops_total", Description: "number of peers dropped due to idle connection"}) + +var peers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peers", Description: "Number of active peers."}) +var incomingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_incoming_peers", Description: "Number of active incoming peers."}) +var outgoingPeers = 
metrics.MakeGauge(metrics.MetricName{Name: "algod_network_outgoing_peers", Description: "Number of active outgoing peers."}) + +var transactionMessagesP2PRejectMessage = metrics.NewTagCounter(metrics.TransactionMessagesP2PRejectMessage.Name, metrics.TransactionMessagesP2PRejectMessage.Description) +var transactionMessagesP2PDuplicateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDuplicateMessage) +var transactionMessagesP2PDeliverMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDeliverMessage) +var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PUndeliverableMessage) + +var networkP2PGossipSubSentBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_sent_bytes_total", Description: "Total number of bytes sent through gossipsub"}) +var networkP2PGossipSubReceivedBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_received_bytes_total", Description: "Total number of bytes received through gossipsub"}) + +var _ = pubsub.RawTracer(pubsubMetricsTracer{}) + +// pubsubMetricsTracer is a tracer for pubsub events used to track metrics. +type pubsubMetricsTracer struct{} + +// AddPeer is invoked when a new peer is added. +func (t pubsubMetricsTracer) AddPeer(p peer.ID, proto p2proto.ID) {} + +// RemovePeer is invoked when a peer is removed. +func (t pubsubMetricsTracer) RemovePeer(p peer.ID) {} + +// Join is invoked when a new topic is joined +func (t pubsubMetricsTracer) Join(topic string) {} + +// Leave is invoked when a topic is abandoned +func (t pubsubMetricsTracer) Leave(topic string) {} + +// Graft is invoked when a new peer is grafted on the mesh (gossipsub) +func (t pubsubMetricsTracer) Graft(p peer.ID, topic string) {} + +// Prune is invoked when a peer is pruned from the mesh (gossipsub) +func (t pubsubMetricsTracer) Prune(p peer.ID, topic string) {} + +// ValidateMessage is invoked when a message first enters the validation pipeline.
+func (t pubsubMetricsTracer) ValidateMessage(msg *pubsub.Message) { + if msg != nil && msg.Topic != nil { + switch *msg.Topic { + case p2p.TXTopicName: + networkP2PReceivedBytesTotal.AddUint64(uint64(len(msg.Data)), nil) + networkP2PReceivedBytesByTag.Add(string(protocol.TxnTag), uint64(len(msg.Data))) + networkP2PMessageReceivedByTag.Add(string(protocol.TxnTag), 1) + } + } +} + +// DeliverMessage is invoked when a message is delivered +func (t pubsubMetricsTracer) DeliverMessage(msg *pubsub.Message) { + transactionMessagesP2PDeliverMessage.Inc(nil) +} + +// RejectMessage is invoked when a message is Rejected or Ignored. +// The reason argument can be one of the named strings Reject*. +func (t pubsubMetricsTracer) RejectMessage(msg *pubsub.Message, reason string) { + // TagCounter cannot handle tags with spaces so pubsub.Reject* cannot be used directly. + // Go strings are immutable, so replacing characters would allocate a new string; stick to string literals instead. + switch reason { + case pubsub.RejectValidationThrottled: + transactionMessagesP2PRejectMessage.Add("throttled", 1) + case pubsub.RejectValidationQueueFull: + transactionMessagesP2PRejectMessage.Add("full", 1) + case pubsub.RejectValidationFailed: + transactionMessagesP2PRejectMessage.Add("failed", 1) + case pubsub.RejectValidationIgnored: + transactionMessagesP2PRejectMessage.Add("ignored", 1) + default: + transactionMessagesP2PRejectMessage.Add("other", 1) + } +} + +// DuplicateMessage is invoked when a duplicate message is dropped. +func (t pubsubMetricsTracer) DuplicateMessage(msg *pubsub.Message) { + transactionMessagesP2PDuplicateMessage.Inc(nil) +} + +// ThrottlePeer is invoked when a peer is throttled by the peer gater. +func (t pubsubMetricsTracer) ThrottlePeer(p peer.ID) {} + +// RecvRPC is invoked when an incoming RPC is received.
+func (t pubsubMetricsTracer) RecvRPC(rpc *pubsub.RPC) { + networkP2PGossipSubReceivedBytesTotal.AddUint64(uint64(rpc.Size()), nil) +} + +// SendRPC is invoked when an RPC is sent. +func (t pubsubMetricsTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) { + networkP2PGossipSubSentBytesTotal.AddUint64(uint64(rpc.Size()), nil) + for i := range rpc.GetPublish() { + if rpc.Publish[i] != nil && rpc.Publish[i].Topic != nil { + switch *rpc.Publish[i].Topic { + case p2p.TXTopicName: + networkP2PSentBytesByTag.Add(string(protocol.TxnTag), uint64(len(rpc.Publish[i].Data))) + networkP2PSentBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil) + networkP2PMessageSentByTag.Add(string(protocol.TxnTag), 1) + } + } + } +} + +// DropRPC is invoked when an outbound RPC is dropped, typically because of a full queue. +func (t pubsubMetricsTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {} + +// UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and +// the pressure release mechanism triggers, dropping messages. +func (t pubsubMetricsTracer) UndeliverableMessage(msg *pubsub.Message) { + transactionMessagesP2PUnderdeliverableMessage.Inc(nil) +} diff --git a/network/metrics_test.go b/network/metrics_test.go new file mode 100644 index 0000000000..857ab57051 --- /dev/null +++ b/network/metrics_test.go @@ -0,0 +1,76 @@ +// Copyright (C) 2019-2024 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details.
+// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see <https://www.gnu.org/licenses/>. + +package network + +import ( + "go/ast" + "go/parser" + "go/token" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/algorand/go-algorand/test/partitiontest" +) + +// TestMetrics_PubsubTracer_TagList makes sure pubsubMetricsTracer traces pubsub messages +// by counting switch cases in SendRPC and ValidateMessage and comparing the counts to len(gossipSubTags) +func TestMetrics_PubsubTracer_TagList(t *testing.T) { + t.Parallel() + partitiontest.PartitionTest(t) + + fset := token.NewFileSet() + f, err := parser.ParseFile(fset, "metrics.go", nil, 0) + require.NoError(t, err) + + // Find the SendRPC/ValidateMessage functions and count the switch cases + var sendCaseCount int + var recvCaseCount int + ast.Inspect(f, func(n ast.Node) bool { + switch stmt := n.(type) { + case *ast.FuncDecl: + if stmt.Name.Name == "SendRPC" { + ast.Inspect(stmt.Body, func(n ast.Node) bool { + if switchStmt, ok := n.(*ast.SwitchStmt); ok { + for _, stmt := range switchStmt.Body.List { + if _, ok := stmt.(*ast.CaseClause); ok { + sendCaseCount++ + } + } + } + return true + }) + } + if stmt.Name.Name == "ValidateMessage" { + ast.Inspect(stmt.Body, func(n ast.Node) bool { + if switchStmt, ok := n.(*ast.SwitchStmt); ok { + for _, stmt := range switchStmt.Body.List { + if _, ok := stmt.(*ast.CaseClause); ok { + recvCaseCount++ + } + } + } + return true + }) + } + } + return true + }) + + require.Equal(t, len(gossipSubTags), sendCaseCount) + require.Equal(t, len(gossipSubTags), recvCaseCount) +} diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 3b467b0b27..70fe09027a 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -176,7 +176,7 @@ func configureResourceManager(cfg config.Local) (network.ResourceManager, error) } // MakeService creates a P2P service instance -func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler
StreamHandler) (*serviceImpl, error) { +func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler, metricsTracer pubsub.RawTracer) (*serviceImpl, error) { sm := makeStreamManager(ctx, log, h, wsStreamHandler, cfg.EnableGossipService) h.Network().Notify(sm) @@ -188,7 +188,7 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h ho telemetryProtoInfo := formatPeerTelemetryInfoProtocolName(telemetryID, telemetryInstance) h.SetStreamHandler(protocol.ID(telemetryProtoInfo), func(s network.Stream) { s.Close() }) - ps, err := makePubSub(ctx, cfg, h) + ps, err := makePubSub(ctx, cfg, h, metricsTracer) if err != nil { return nil, err } diff --git a/network/p2p/pubsub.go b/network/p2p/pubsub.go index 657baecdde..a592657010 100644 --- a/network/p2p/pubsub.go +++ b/network/p2p/pubsub.go @@ -51,11 +51,14 @@ const ( ) // TXTopicName defines a pubsub topic for TX messages -const TXTopicName = "/algo/tx/0.1.0" +// There is a micro optimization for const string comparison: +// an 8-byte const string requires a single x86-64 CMPQ instruction. +// Naming convention: "algo" + 2-byte protocol tag + 2-byte version +const TXTopicName = "algotx01" const incomingThreads = 20 // matches to number wsNetwork workers -func makePubSub(ctx context.Context, cfg config.Local, host host.Host) (*pubsub.PubSub, error) { +func makePubSub(ctx context.Context, cfg config.Local, host host.Host, metricsTracer pubsub.RawTracer) (*pubsub.PubSub, error) { //defaultParams := pubsub.DefaultGossipSubParams() options := []pubsub.Option{ @@ -98,7 +101,10 @@ func makePubSub(ctx context.Context, cfg config.Local, host host.Host) (*pubsub.
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), // pubsub.WithValidateThrottle(cfg.TxBacklogSize), pubsub.WithValidateWorkers(incomingThreads), - pubsub.WithRawTracer(pubsubTracer{}), + } + + if metricsTracer != nil { + options = append(options, pubsub.WithRawTracer(metricsTracer)) } return pubsub.NewGossipSub(ctx, host, options...) diff --git a/network/p2p/pubsubTracer.go b/network/p2p/pubsubTracer.go deleted file mode 100644 index ca57bc69ce..0000000000 --- a/network/p2p/pubsubTracer.go +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright (C) 2019-2024 Algorand, Inc. -// This file is part of go-algorand -// -// go-algorand is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// go-algorand is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
- -package p2p - -import ( - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/protocol" - - "github.com/algorand/go-algorand/util/metrics" -) - -var _ = pubsub.RawTracer(pubsubTracer{}) - -var transactionMessagesP2PRejectMessage = metrics.NewTagCounter(metrics.TransactionMessagesP2PRejectMessage.Name, metrics.TransactionMessagesP2PRejectMessage.Description) -var transactionMessagesP2PDuplicateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDuplicateMessage) -var transactionMessagesP2PDeliverMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDeliverMessage) -var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PUndeliverableMessage) -var transactionMessagesP2PValidateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PValidateMessage) - -// pubsubTracer is a tracer for pubsub events used to track metrics. -type pubsubTracer struct{} - -// AddPeer is invoked when a new peer is added. -func (t pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) {} - -// RemovePeer is invoked when a peer is removed. -func (t pubsubTracer) RemovePeer(p peer.ID) {} - -// Join is invoked when a new topic is joined -func (t pubsubTracer) Join(topic string) {} - -// Leave is invoked when a topic is abandoned -func (t pubsubTracer) Leave(topic string) {} - -// Graft is invoked when a new peer is grafted on the mesh (gossipsub) -func (t pubsubTracer) Graft(p peer.ID, topic string) {} - -// Prune is invoked when a peer is pruned from the message (gossipsub) -func (t pubsubTracer) Prune(p peer.ID, topic string) {} - -// ValidateMessage is invoked when a message first enters the validation pipeline. 
-func (t pubsubTracer) ValidateMessage(msg *pubsub.Message) { - transactionMessagesP2PValidateMessage.Inc(nil) -} - -// DeliverMessage is invoked when a message is delivered -func (t pubsubTracer) DeliverMessage(msg *pubsub.Message) { - transactionMessagesP2PDeliverMessage.Inc(nil) -} - -// RejectMessage is invoked when a message is Rejected or Ignored. -// The reason argument can be one of the named strings Reject*. -func (t pubsubTracer) RejectMessage(msg *pubsub.Message, reason string) { - switch reason { - case pubsub.RejectValidationThrottled, pubsub.RejectValidationQueueFull, pubsub.RejectValidationFailed, pubsub.RejectValidationIgnored: - transactionMessagesP2PRejectMessage.Add(reason, 1) - default: - transactionMessagesP2PRejectMessage.Add("other", 1) - } -} - -// DuplicateMessage is invoked when a duplicate message is dropped. -func (t pubsubTracer) DuplicateMessage(msg *pubsub.Message) { - transactionMessagesP2PDuplicateMessage.Inc(nil) -} - -// ThrottlePeer is invoked when a peer is throttled by the peer gater. -func (t pubsubTracer) ThrottlePeer(p peer.ID) {} - -// RecvRPC is invoked when an incoming RPC is received. -func (t pubsubTracer) RecvRPC(rpc *pubsub.RPC) {} - -// SendRPC is invoked when a RPC is sent. -func (t pubsubTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {} - -// DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full. -func (t pubsubTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {} - -// UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and -// the pressure release mechanism trigger, dropping messages. 
-func (t pubsubTracer) UndeliverableMessage(msg *pubsub.Message) { - transactionMessagesP2PUnderdeliverableMessage.Inc(nil) -} diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index f9dc04b785..d3af60a223 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -194,6 +194,11 @@ type p2pPeerStats struct { txReceived atomic.Uint64 } +// gossipSubTags defines protocol messages that are relayed using GossipSub +var gossipSubTags = map[protocol.Tag]string{ + protocol.TxnTag: p2p.TXTopicName, +} + // NewP2PNetwork returns an instance of GossipNode that uses the p2p.Service func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, node NodeInfo, identityOpts *identityOpts) (*P2PNetwork, error) { const readBufferLen = 2048 @@ -214,7 +219,7 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo config: cfg, genesisID: genesisID, networkID: networkID, - topicTags: map[protocol.Tag]string{protocol.TxnTag: p2p.TXTopicName}, + topicTags: gossipSubTags, wsPeers: make(map[peer.ID]*wsPeer), wsPeersToIDs: make(map[*wsPeer]peer.ID), peerStats: make(map[peer.ID]*p2pPeerStats), @@ -261,7 +266,7 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo } log.Infof("P2P host created: peer ID %s addrs %s", h.ID(), h.Addrs()) - net.service, err = p2p.MakeService(net.ctx, log, cfg, h, la, net.wsStreamHandler) + net.service, err = p2p.MakeService(net.ctx, log, cfg, h, la, net.wsStreamHandler, pubsubMetricsTracer{}) if err != nil { return nil, err } @@ -791,6 +796,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea conn: &wsPeerConnP2P{stream: stream}, outgoing: !incoming, identity: netIdentPeerID, + peerType: peerTypeP2P, } protos, err := n.pstore.GetProtocols(p2pPeer) if err != nil { diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 2af3a9b6bf..ecb636c8e2 100644 --- 
a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -53,7 +53,6 @@ import ( tools_network "github.com/algorand/go-algorand/tools/network" "github.com/algorand/go-algorand/tools/network/dnssec" "github.com/algorand/go-algorand/util" - "github.com/algorand/go-algorand/util/metrics" ) const incomingThreads = 20 @@ -117,35 +116,6 @@ const wsMaxHeaderBytes = 4096 // used from the ReservedFDs pool, as this pool is meant for short-lived usage (dns queries, disk i/o, etc.) const ReservedHealthServiceConnections = 10 -var networkIncomingConnections = metrics.MakeGauge(metrics.NetworkIncomingConnections) -var networkOutgoingConnections = metrics.MakeGauge(metrics.NetworkOutgoingConnections) - -var networkIncomingBufferMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_buffer_micros_total", Description: "microseconds spent by incoming messages on the receive buffer"}) -var networkHandleMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_handle_micros_total", Description: "microseconds spent by protocol handlers in the receive thread"}) - -var networkBroadcasts = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_total", Description: "number of broadcast operations"}) -var networkBroadcastQueueFull = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_full_total", Description: "number of messages that were drops due to full broadcast queue"}) -var networkBroadcastQueueMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_micros_total", Description: "microseconds broadcast requests sit on queue"}) -var networkBroadcastSendMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_send_micros_total", Description: "microseconds spent broadcasting"}) -var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"}) -var 
networkPeerBroadcastDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_peer_broadcast_dropped_total", Description: "number of broadcast messages not sent to some peer"}) - -var networkPeerIdentityDisconnect = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_duplicate", Description: "number of times identity challenge cause us to disconnect a peer"}) -var networkPeerIdentityError = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_error", Description: "number of times an error occurs (besides expected) when processing identity challenges"}) -var networkPeerAlreadyClosed = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_peer_already_closed", Description: "number of times a peer would be added but the peer connection is already closed"}) - -var networkSlowPeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_slow_drops_total", Description: "number of peers dropped for being slow to send to"}) -var networkIdlePeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_idle_drops_total", Description: "number of peers dropped due to idle connection"}) - -var minPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_min_ping_seconds", Description: "Network round trip time to fastest peer in seconds."}) -var meanPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_mean_ping_seconds", Description: "Network round trip time to average peer in seconds."}) -var medianPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_median_ping_seconds", Description: "Network round trip time to median peer in seconds."}) -var maxPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_max_ping_seconds", Description: "Network round trip time to slowest peer in seconds."}) - -var peers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peers", Description: "Number of active peers."}) -var incomingPeers = 
metrics.MakeGauge(metrics.MetricName{Name: "algod_network_incoming_peers", Description: "Number of active incoming peers."}) -var outgoingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_outgoing_peers", Description: "Number of active outgoing peers."}) - // peerDisconnectionAckDuration defines the time we would wait for the peer disconnection to complete. const peerDisconnectionAckDuration = 5 * time.Second diff --git a/network/wsPeer.go b/network/wsPeer.go index 88a0c615f9..a6a982af59 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -39,7 +39,6 @@ import ( "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util" - "github.com/algorand/go-algorand/util/metrics" ) // MaxMessageLength is the maximum length of a message that can be sent or received, exported to be used in the node.TestMaxSizesCorrect test @@ -52,20 +51,7 @@ const averageMessageLength = 2 * 1024 // Most of the messages are smaller tha // buffer and starve messages from other peers. 
const msgsInReadBufferPerPeer = 10 -var tagStringList []string - func init() { - tagStringList = make([]string, len(protocol.TagList)) - for i, t := range protocol.TagList { - tagStringList[i] = string(t) - } - networkSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringList, "UNK") - networkReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringList, "UNK") - networkMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringList, "UNK") - networkMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringList, "UNK") - networkHandleCountByTag = metrics.NewTagCounterFiltered("algod_network_rx_handle_countbytag_{TAG}", "count of handler calls in the receive thread for {TAG} messages", tagStringList, "UNK") - networkHandleMicrosByTag = metrics.NewTagCounterFiltered("algod_network_rx_handle_microsbytag_{TAG}", "microseconds spent by protocol handlers in the receive thread for {TAG} messages", tagStringList, "UNK") - matched := false for _, version := range SupportedProtocolVersions { if version == versionPeerFeatures { @@ -83,29 +69,6 @@ func init() { } } -var networkSentBytesTotal = metrics.MakeCounter(metrics.NetworkSentBytesTotal) -var networkSentBytesByTag *metrics.TagCounter -var networkReceivedBytesTotal = metrics.MakeCounter(metrics.NetworkReceivedBytesTotal) -var networkReceivedBytesByTag *metrics.TagCounter - -var networkMessageReceivedTotal = metrics.MakeCounter(metrics.NetworkMessageReceivedTotal) -var networkMessageReceivedByTag *metrics.TagCounter -var networkMessageSentTotal = 
metrics.MakeCounter(metrics.NetworkMessageSentTotal) -var networkMessageSentByTag *metrics.TagCounter - -var networkHandleMicrosByTag *metrics.TagCounter -var networkHandleCountByTag *metrics.TagCounter - -var networkConnectionsDroppedTotal = metrics.MakeCounter(metrics.NetworkConnectionsDroppedTotal) -var networkMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_message_sent_queue_micros_total", Description: "Total microseconds message spent waiting in queue to be sent"}) - -var duplicateNetworkMessageReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedTotal) -var duplicateNetworkMessageReceivedBytesTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedBytesTotal) -var duplicateNetworkFilterReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkFilterReceivedTotal) -var outgoingNetworkMessageFilteredOutTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutTotal) -var outgoingNetworkMessageFilteredOutBytesTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutBytesTotal) -var unknownProtocolTagMessagesTotal = metrics.MakeCounter(metrics.UnknownProtocolTagMessagesTotal) - // defaultSendMessageTags is the default list of messages which a peer would // allow to be sent without receiving any explicit request. var defaultSendMessageTags = map[protocol.Tag]bool{ @@ -204,6 +167,16 @@ type sendMessages struct { onRelease func() } +//msgp:ignore peerType +type peerType int + +const ( + // peerTypeWs is a peer that is connected over a websocket connection + peerTypeWs peerType = iota + // peerTypeP2P is a peer that is connected over an P2P connection + peerTypeP2P +) + type wsPeer struct { // lastPacketTime contains the UnixNano at the last time a successful communication was made with the peer. 
// "successful communication" above refers to either reading from or writing to a connection without receiving any @@ -318,6 +291,10 @@ type wsPeer struct { // closers is a slice of functions to run when the peer is closed closers []func() + + // peerType defines the peer's underlying connection type + // used for separate p2p vs ws metrics + peerType peerType } // HTTPPeer is what the opaque Peer might be. @@ -639,10 +616,17 @@ func (wp *wsPeer) readLoop() { } msg.Net = wp.net wp.lastPacketTime.Store(msg.Received) - networkReceivedBytesTotal.AddUint64(uint64(len(msg.Data)+2), nil) - networkMessageReceivedTotal.AddUint64(1, nil) - networkReceivedBytesByTag.Add(string(tag[:]), uint64(len(msg.Data)+2)) - networkMessageReceivedByTag.Add(string(tag[:]), 1) + if wp.peerType == peerTypeWs { + networkReceivedBytesTotal.AddUint64(uint64(len(msg.Data)+2), nil) + networkMessageReceivedTotal.AddUint64(1, nil) + networkReceivedBytesByTag.Add(string(tag[:]), uint64(len(msg.Data)+2)) + networkMessageReceivedByTag.Add(string(tag[:]), 1) + } else { + networkP2PReceivedBytesTotal.AddUint64(uint64(len(msg.Data)+2), nil) + networkP2PMessageReceivedTotal.AddUint64(1, nil) + networkP2PReceivedBytesByTag.Add(string(tag[:]), uint64(len(msg.Data)+2)) + networkP2PMessageReceivedByTag.Add(string(tag[:]), 1) + } msg.Sender = wp // for outgoing connections, we want to notify the connection monitor that we've received @@ -863,11 +847,19 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason { return disconnectWriteError } wp.lastPacketTime.Store(time.Now().UnixNano()) - networkSentBytesTotal.AddUint64(uint64(len(msg.data)), nil) - networkSentBytesByTag.Add(string(tag), uint64(len(msg.data))) - networkMessageSentTotal.AddUint64(1, nil) - networkMessageSentByTag.Add(string(tag), 1) - networkMessageQueueMicrosTotal.AddUint64(uint64(time.Now().Sub(msg.peerEnqueued).Nanoseconds()/1000), nil) + if wp.peerType == peerTypeWs { + networkSentBytesTotal.AddUint64(uint64(len(msg.data)), 
nil) + networkSentBytesByTag.Add(string(tag), uint64(len(msg.data))) + networkMessageSentTotal.AddUint64(1, nil) + networkMessageSentByTag.Add(string(tag), 1) + networkMessageQueueMicrosTotal.AddUint64(uint64(time.Since(msg.peerEnqueued).Nanoseconds()/1000), nil) + } else { + networkP2PSentBytesTotal.AddUint64(uint64(len(msg.data)), nil) + networkP2PSentBytesByTag.Add(string(tag), uint64(len(msg.data))) + networkP2PMessageSentTotal.AddUint64(1, nil) + networkP2PMessageSentByTag.Add(string(tag), 1) + networkP2PMessageQueueMicrosTotal.AddUint64(uint64(time.Since(msg.peerEnqueued).Nanoseconds()/1000), nil) + } return disconnectReasonNone } diff --git a/test/heapwatch/block_history_plot.py b/test/heapwatch/block_history_plot.py index d8c86b454f..0bf8a2c8a2 100644 --- a/test/heapwatch/block_history_plot.py +++ b/test/heapwatch/block_history_plot.py @@ -23,6 +23,7 @@ # Graph over time of TPS or 10-round-moving-average-TPS import base64 +import json import os import statistics import sys @@ -106,12 +107,25 @@ def process(path, args): prevtc = tc prevts = ts prevtime = _time - print('{} blocks, block txns [{}-{}], block seconds [{}-{}], tps [{}-{}]'.format( + print('{} blocks, block txns [{}-{}], block seconds [{}-{}], tps [{}-{}], total txns {}'.format( count, mintxn,maxtxn, mindt,maxdt, mintps,maxtps, + tc, )) + if tc > 0: + with open(path + '.stats', 'w') as fout: + fout.write(json.dumps({ + 'blocks': count, + 'tc': tc, + 'mintxn': mintxn, + 'maxtxn': maxtxn, + 'mindt': mindt, + 'maxdt': maxdt, + 'mintps': mintps, + 'maxtps': maxtps, + })) start = 0 end = len(txnv)-1 diff --git a/test/heapwatch/metrics_aggs.py b/test/heapwatch/metrics_aggs.py index 0189634be5..d20593c097 100644 --- a/test/heapwatch/metrics_aggs.py +++ b/test/heapwatch/metrics_aggs.py @@ -33,7 +33,7 @@ from plotly.subplots import make_subplots -from metrics_lib import MetricType, parse_metrics, gather_metrics_files_by_nick +from metrics_lib import Metric, MetricType, parse_metrics, 
gather_metrics_files_by_nick logger = logging.getLogger(__name__) @@ -53,6 +53,8 @@ def main(): ap.add_argument('--nick-lre', action='append', default=[], help='label:regexp to filter node names, may be repeated') ap.add_argument('-s', '--save', type=str, choices=['png', 'html'], help=f'save plot to \'{default_img_filename}\' or \'{default_html_filename}\' file instead of showing it') ap.add_argument('--verbose', default=False, action='store_true') + ap.add_argument('--avg-max', default=False, action='store_true', help='print avg of max values across nodes for each metric') + ap.add_argument('--avg-max-min', default=False, action='store_true', help='print avg of max-min values across nodes for each metric') args = ap.parse_args() if args.verbose: @@ -99,6 +101,7 @@ def main(): } fig['layout']['height'] = 500 * nrows + nick_series = {} for nick, files_by_date in filesByNick.items(): active_metrics = {} @@ -146,7 +149,10 @@ def main(): active_metric_names.sort() active_metrics[full_name] = active_metric_names idx += 1 - + + if args.avg_max or args.avg_max_min: + nick_series[nick] = raw_series + for i, metric_pair in enumerate(sorted(active_metrics.items())): metric_name, metric_fullnames = metric_pair for metric_fullname in metric_fullnames: @@ -158,6 +164,23 @@ def main(): line=dict(width=1), ), i+1, 1) + if args.avg_max or args.avg_max_min: + metric_names_nick_max_avg = {} + for nick, raw_series in nick_series.items(): + for metric_name, rw in raw_series.items(): + mmax = max(rw) + mmin = min(rw) + print(f'{nick}: {metric_name}: count {len(rw)}, max {mmax}, min {mmin}, min-max {mmax - mmin}') + metric = Metric(metric_name, 0, MetricType.COUNTER) + if metric.short_name() not in metric_names_nick_max_avg: + metric_names_nick_max_avg[metric.short_name()] = [] + if args.avg_max_min: + metric_names_nick_max_avg[metric.short_name()].append(mmax - mmin) + if args.avg_max: + metric_names_nick_max_avg[metric.short_name()].append(mmax) + for metric_name, val in 
metric_names_nick_max_avg.items(): + print(f'{metric_name}: avg {sum(val)/len(val)}') + if args.save: if args.save == 'html': target_path = os.path.join(args.dir, default_html_filename) diff --git a/test/heapwatch/metrics_delta.py b/test/heapwatch/metrics_delta.py index 2d64ee097a..3ff1afca65 100644 --- a/test/heapwatch/metrics_delta.py +++ b/test/heapwatch/metrics_delta.py @@ -136,6 +136,8 @@ def __init__(self, label=None): self.tpsMeanSum = 0 self.txBpsMeanSum = 0 self.rxBpsMeanSum = 0 + self.txP2PBpsMeanSum = 0 + self.rxP2PBpsMeanSum = 0 self.tpsSum = 0 self.blockTimeSum = 0 self.sumsCount = 0 @@ -152,6 +154,8 @@ def __call__(self, ttr, nick): self.tpsMeanSum += meanOrZero(ttr.tpsList) self.txBpsMeanSum += meanOrZero(ttr.txBpsList) self.rxBpsMeanSum += meanOrZero(ttr.rxBpsList) + self.txP2PBpsMeanSum += meanOrZero(ttr.txP2PBpsList) + self.rxP2PBpsMeanSum += meanOrZero(ttr.rxP2PBpsList) self.tpsSum += ttr.tps self.blockTimeSum += ttr.blockTime self.sumsCount += 1 @@ -164,8 +168,10 @@ def blockinfo(self, curtime): return self.biByTime.get(curtime) def byMsg(self, html=False): - txPSums = {} - rxPSums = {} + txWsPSums = {} + rxWsPSums = {} + txP2PPSums = {} + rxP2PPSums = {} secondsSum = 0 txMax = {} txMin = {} @@ -175,8 +181,10 @@ def byMsg(self, html=False): for nick, ns in self.nodes.items(): nicks.append(nick) secondsSum += ns.secondsSum - dictSum(txPSums, ns.txPSums) - dictSum(rxPSums, ns.rxPSums) + dictSum(txWsPSums, ns.txPSums) + dictSum(rxWsPSums, ns.rxPSums) + dictSum(txP2PPSums, ns.txP2PPSums) + dictSum(rxP2PPSums, ns.rxP2PPSums) dictMax(txMax, ns.txPLists) dictMax(rxMax, ns.rxPLists) dictMin(txMin, ns.txPLists) @@ -185,23 +193,36 @@ def byMsg(self, html=False): lines = [] if html: lines.append('
{}
'.format(nodesummary)) - lines.append('') + lines.append('
tx B/srx B/s
') # traffic per tag two columns: ws and p2p else: lines.append(nodesummary) - lines.append('\ttx B/s\trx B/s') - for msg, txB in txPSums.items(): - if msg not in rxPSums: - rxPSums[msg] = 0 - for rxBps, msg in sorted([(rxB/secondsSum, msg) for msg, rxB in rxPSums.items()], reverse=True): - txBps = txPSums.get(msg,0)/secondsSum - if (txBps < 0.5) and (rxBps < 0.5): - continue + + for title, txPSums, rxPSums in [ + ('ws', txWsPSums, rxWsPSums), + ('p2p', txP2PPSums, rxP2PPSums), + ]: if html: - lines.append(''.format(msg, txBps, rxBps)) + lines.append('') if html: - lines.append('
{}{:.0f}{:.0f}
') + lines.append(f'') else: - lines.append('{}\t{:.0f}\t{:.0f}'.format(msg, txBps, rxBps)) + lines.append(f'{title} traffic per tag') + lines.append('\ttx B/s\trx B/s') + for msg, txB in txPSums.items(): + if msg not in rxPSums: + rxPSums[msg] = 0 + for rxBps, msg in sorted([(rxB/secondsSum, msg) for msg, rxB in rxPSums.items()], reverse=True): + txBps = txPSums.get(msg,0)/secondsSum + if (txBps < 0.5) and (rxBps < 0.5): + continue + if html: + lines.append(''.format(msg, txBps, rxBps)) + else: + lines.append('{}\t{:.0f}\t{:.0f}'.format(msg, txBps, rxBps)) + if html: + lines.append('
{title} traffic per tag
tx B/srx B/s
{}{:.0f}{:.0f}
') + lines.append('
') + lines.append('') # traffic per tag two columns: ws and p2p return '\n'.join(lines) def txPool(self): @@ -230,7 +251,7 @@ def html(self): def str(self, html=False): if not self.sumsCount: - tps, txbps, rxbps = math.nan, math.nan, math.nan + tps, txbps, rxbps, txP2Pbps, rxP2Pbps = math.nan, math.nan, math.nan, math.nan, math.nan blockTimes = math.nan else: #tps = self.tpsMeanSum/self.sumsCount @@ -238,6 +259,8 @@ def str(self, html=False): blockTimes = self.blockTimeSum/self.sumsCount txbps = self.txBpsMeanSum/self.sumsCount rxbps = self.rxBpsMeanSum/self.sumsCount + txP2Pbps = self.txP2PBpsMeanSum/self.sumsCount + rxP2Pbps = self.rxP2PBpsMeanSum/self.sumsCount labelspace = "" if self.label: labelspace = self.label + " " @@ -248,12 +271,12 @@ def str(self, html=False): else: verifyMillis = '' if html: - fmt = '{byMsg}\n{verifyMillis}
{labelspace}{txPool}
\n
{labelspace}summary: {TPS:0.2f} TPS, {bt:1.2f}s/block, tx {txBps}B/s, rx {rxBps}B/s
' + fmt = '{byMsg}\n{verifyMillis}
{labelspace}{txPool}
\n
{labelspace}summary: {TPS:0.2f} TPS, {bt:1.2f}s/block, tx {txBps}B/s, rx {rxBps}B/s, p2p tx {txP2PBps}B/s, p2p rx {rxP2PBps}B/s
' if self.label: fmt = '
' + self.label + '
' + fmt else: - fmt = '{byMsg}\n{verifyMillis}{labelspace}{txPool}\n{labelspace}summary: {TPS:0.2f} TPS, {bt:1.2f}s/block, tx {txBps}B/s, rx {rxBps}B/s' - return fmt.format(labelspace=labelspace, byMsg=self.byMsg(html), txPool=self.txPool(), TPS=tps, txBps=hunum(txbps), rxBps=hunum(rxbps), bt=blockTimes, verifyMillis=verifyMillis) + fmt = '{byMsg}\n{verifyMillis}{labelspace}{txPool}\n{labelspace}summary: {TPS:0.2f} TPS, {bt:1.2f}s/block, tx {txBps}B/s, rx {rxBps}B/s, p2p tx {txP2PBps}B/s, p2p rx {rxP2PBps}B/s' + return fmt.format(labelspace=labelspace, byMsg=self.byMsg(html), txPool=self.txPool(), TPS=tps, txBps=hunum(txbps), rxBps=hunum(rxbps), txP2PBps=hunum(txP2Pbps), rxP2PBps=hunum(rxP2Pbps), bt=blockTimes, verifyMillis=verifyMillis) def plot_pool(self, outpath): from matplotlib import pyplot as plt @@ -486,17 +509,27 @@ def __init__(self): self.deltas = [] self.txBpsList = [] self.rxBpsList = [] + self.txP2PBpsList = [] + self.rxP2PBpsList = [] self.tpsList = [] self.txBSum = 0 self.rxBSum = 0 + self.txP2PBSum = 0 + self.rxP2PBSum = 0 self.txnSum = 0 self.secondsSum = 0 # algod_network_received_bytes_* self.rxPLists = {} self.rxPSums = {} + # algod_network_p2p_received_bytes_* + self.rxP2PPLists = {} + self.rxP2PPSums = {} # algod_network_sent_bytes_* self.txPLists = {} self.txPSums = {} + # algod_network_p2p_sent_bytes_* + self.txP2PPLists = {} + self.txP2PPSums = {} self.times = [] # algod_tx_pool_count self.txPool = [] @@ -533,7 +566,7 @@ def process_files(self, args, nick=None, metrics_files=None, bisource=None): reportpath = args.report[:-4] + nick + '.csv' reportf = open(reportpath, 'wt') writer = csv.writer(reportf) - writer.writerow(('when', 'tx bytes/s', 'rx bytes/s','TPS', 's/block')) + writer.writerow(('when', 'tx bytes/s', 'rx bytes/s', 'tx p2p bytes/s', 'rx p2p bytes/s', 'TPS', 's/block')) prev = None prevtime = None prevPath = None @@ -587,6 +620,11 @@ def process_files(self, args, nick=None, metrics_files=None, bisource=None): rxBytes = 
d.get('algod_network_received_bytes_total',0) txBytesPerSec = txBytes / dt rxBytesPerSec = rxBytes / dt + txP2PBytes = d.get('algod_network_p2p_sent_bytes_total',0) + rxP2PBytes = d.get('algod_network_p2p_received_bytes_total',0) + txP2PBytesPerSec = txP2PBytes / dt + rxP2PBytesPerSec = rxP2PBytes / dt + # TODO: gather algod_network_sent_bytes_* and algod_network_received_bytes_* if (tps is None) or ((args.mintps is not None) and (tps < args.mintps)): # do not sum up this row @@ -594,18 +632,26 @@ def process_files(self, args, nick=None, metrics_files=None, bisource=None): else: self.txBpsList.append(txBytesPerSec) self.rxBpsList.append(rxBytesPerSec) + self.txP2PBpsList.append(txP2PBytesPerSec) + self.rxP2PBpsList.append(rxP2PBytesPerSec) self.tpsList.append(tps) self.txBSum += txBytes self.rxBSum += rxBytes + self.txP2PBSum += txP2PBytes + self.rxP2PBSum += rxP2PBytes self.txnSum += txnCount self.secondsSum += dt perProtocol('algod_network_sent_bytes_', self.txPLists, self.txPSums, d, dt) perProtocol('algod_network_received_bytes_', self.rxPLists, self.rxPSums, d, dt) + perProtocol('algod_network_p2p_sent_bytes_', self.txP2PPLists, self.txP2PPSums, d, dt) + perProtocol('algod_network_p2p_received_bytes_', self.rxP2PPLists, self.rxP2PPSums, d, dt) if writer: writer.writerow(( time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(curtime)), txBytesPerSec, rxBytesPerSec, + txP2PBytesPerSec, + rxP2PBytesPerSec, tps, blocktime, )) @@ -631,13 +677,14 @@ def process_files(self, args, nick=None, metrics_files=None, bisource=None): self.blockTime = totalDt / rounds if writer and self.txBpsList: writer.writerow([]) - for bsum, msg in sorted([(bsum,msg) for msg,bsum in self.txPSums.items()]): - pass + # TODO: summarize + # for bsum, msg in sorted([(bsum,msg) for msg,bsum in self.txPSums.items()]): + # pass writer.writerow([]) - writer.writerow(['min', min(self.txBpsList), min(self.rxBpsList), min(self.tpsList)]) - writer.writerow(['avg', self.txBSum/self.secondsSum, 
self.rxBSum/self.secondsSum, self.txnSum/self.secondsSum]) - writer.writerow(['max', max(self.txBpsList), max(self.rxBpsList), max(self.tpsList)]) - writer.writerow(['std', statistics.pstdev(self.txBpsList), statistics.pstdev(self.rxBpsList), statistics.pstdev(self.tpsList)]) + writer.writerow(['min', min(self.txBpsList), min(self.rxBpsList), min(self.txP2PBpsList), min(self.rxP2PBpsList), min(self.tpsList)]) + writer.writerow(['avg', self.txBSum/self.secondsSum, self.rxBSum/self.secondsSum, self.txP2PBSum/self.secondsSum, self.rxP2PBSum/self.secondsSum, self.txnSum/self.secondsSum]) + writer.writerow(['max', max(self.txBpsList), max(self.rxBpsList), max(self.txP2PBpsList), max(self.rxP2PBpsList), max(self.tpsList)]) + writer.writerow(['std', statistics.pstdev(self.txBpsList), statistics.pstdev(self.rxBpsList), statistics.pstdev(self.txP2PBpsList), statistics.pstdev(self.rxP2PBpsList), statistics.pstdev(self.tpsList)]) if reportf: reportf.close() if self.deltas and args.deltas: diff --git a/test/heapwatch/metrics_lib.py b/test/heapwatch/metrics_lib.py index fbda555b90..5fc7b36075 100644 --- a/test/heapwatch/metrics_lib.py +++ b/test/heapwatch/metrics_lib.py @@ -54,7 +54,7 @@ def hunum(x): return '{:.1f}k'.format(x / 1000.0) if x >= 1000: return '{:.2f}k'.format(x / 1000.0) - return '{:.2f}x'.format(x) + return '{:.2f}'.format(x) def test_metric_line_re(): diff --git a/test/heapwatch/requirements.txt b/test/heapwatch/requirements.txt index db92372c6d..cf443a24e4 100644 --- a/test/heapwatch/requirements.txt +++ b/test/heapwatch/requirements.txt @@ -6,5 +6,5 @@ plotly==5.16.0 py-algorand-sdk==2.3.0 kaleido==0.2.1 networkx==3.3 -gravis=0.1.0 -termcolor=2.4.0 +gravis==0.1.0 +termcolor==2.4.0 diff --git a/util/metrics/metrics.go b/util/metrics/metrics.go index fcc566312f..d7afe6439c 100644 --- a/util/metrics/metrics.go +++ b/util/metrics/metrics.go @@ -39,6 +39,14 @@ var ( NetworkMessageReceivedTotal = MetricName{Name: "algod_network_message_received_total", Description: 
"Total number of complete messages that were received from the network"} // NetworkMessageSentTotal Total number of complete messages that were sent to the network NetworkMessageSentTotal = MetricName{Name: "algod_network_message_sent_total", Description: "Total number of complete messages that were sent to the network"} + // NetworkP2PSentBytesTotal Total number of bytes that were sent over the p2p network + NetworkP2PSentBytesTotal = MetricName{Name: "algod_network_p2p_sent_bytes_total", Description: "Total number of bytes that were sent over the p2p network"} + // NetworkP2PReceivedBytesTotal Total number of bytes that were received from the p2p network + NetworkP2PReceivedBytesTotal = MetricName{Name: "algod_network_p2p_received_bytes_total", Description: "Total number of bytes that were received from the p2p network"} + // NetworkP2PMessageReceivedTotal Total number of complete messages that were received from the p2p network + NetworkP2PMessageReceivedTotal = MetricName{Name: "algod_network_p2p_message_received_total", Description: "Total number of complete messages that were received from the p2p network"} + // NetworkP2PMessageSentTotal Total number of complete messages that were sent to the p2p network + NetworkP2PMessageSentTotal = MetricName{Name: "algod_network_p2p_message_sent_total", Description: "Total number of complete messages that were sent to the p2p network"} // NetworkConnectionsDroppedTotal Total number of connections that were dropped before a message NetworkConnectionsDroppedTotal = MetricName{Name: "algod_network_connections_dropped_total", Description: "Total number of connections that were dropped before a message"} // NetworkSentDecompressedBytesTotal Total number of bytes that were sent over the network prior of being compressed