diff --git a/cmd/algod/main.go b/cmd/algod/main.go
index 311b1507e6..47008256ac 100644
--- a/cmd/algod/main.go
+++ b/cmd/algod/main.go
@@ -445,6 +445,8 @@ var startupConfigCheckFields = []string{
"TxPoolExponentialIncreaseFactor",
"TxPoolSize",
"VerifiedTranscationsCacheSize",
+ "EnableP2P",
+ "EnableP2PHybridMode",
}
func resolveDataDir() string {
diff --git a/network/metrics.go b/network/metrics.go
new file mode 100644
index 0000000000..a1e92b2424
--- /dev/null
+++ b/network/metrics.go
@@ -0,0 +1,206 @@
+// Copyright (C) 2019-2024 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see .
+
+package network
+
+import (
+ pubsub "github.com/libp2p/go-libp2p-pubsub"
+ "github.com/libp2p/go-libp2p/core/peer"
+ p2proto "github.com/libp2p/go-libp2p/core/protocol"
+
+ "github.com/algorand/go-algorand/network/p2p"
+ "github.com/algorand/go-algorand/protocol"
+ "github.com/algorand/go-algorand/util/metrics"
+)
+
+func init() {
+ // all tags are tracked by ws net
+ tagStringList := make([]string, 0, len(protocol.TagList))
+ for _, t := range protocol.TagList {
+ tagStringList = append(tagStringList, string(t))
+ }
+ networkSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringList, "UNK")
+ networkReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringList, "UNK")
+ networkMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringList, "UNK")
+ networkMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringList, "UNK")
+ networkHandleCountByTag = metrics.NewTagCounterFiltered("algod_network_rx_handle_countbytag_{TAG}", "count of handler calls in the receive thread for {TAG} messages", tagStringList, "UNK")
+ networkHandleMicrosByTag = metrics.NewTagCounterFiltered("algod_network_rx_handle_microsbytag_{TAG}", "microseconds spent by protocol handlers in the receive thread for {TAG} messages", tagStringList, "UNK")
+
+ networkP2PSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringList, "UNK")
+ networkP2PReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_p2p_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringList, "UNK")
+ networkP2PMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringList, "UNK")
+ networkP2PMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_p2p_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringList, "UNK")
+}
+
+var networkSentBytesTotal = metrics.MakeCounter(metrics.NetworkSentBytesTotal)
+var networkP2PSentBytesTotal = metrics.MakeCounter(metrics.NetworkP2PSentBytesTotal)
+var networkSentBytesByTag *metrics.TagCounter
+var networkP2PSentBytesByTag *metrics.TagCounter
+var networkReceivedBytesTotal = metrics.MakeCounter(metrics.NetworkReceivedBytesTotal)
+var networkP2PReceivedBytesTotal = metrics.MakeCounter(metrics.NetworkP2PReceivedBytesTotal)
+var networkReceivedBytesByTag *metrics.TagCounter
+var networkP2PReceivedBytesByTag *metrics.TagCounter
+
+var networkMessageReceivedTotal = metrics.MakeCounter(metrics.NetworkMessageReceivedTotal)
+var networkP2PMessageReceivedTotal = metrics.MakeCounter(metrics.NetworkP2PMessageReceivedTotal)
+var networkMessageReceivedByTag *metrics.TagCounter
+var networkP2PMessageReceivedByTag *metrics.TagCounter
+var networkMessageSentTotal = metrics.MakeCounter(metrics.NetworkMessageSentTotal)
+var networkP2PMessageSentTotal = metrics.MakeCounter(metrics.NetworkP2PMessageSentTotal)
+var networkMessageSentByTag *metrics.TagCounter
+var networkP2PMessageSentByTag *metrics.TagCounter
+
+var networkHandleMicrosByTag *metrics.TagCounter
+var networkHandleCountByTag *metrics.TagCounter
+
+var networkConnectionsDroppedTotal = metrics.MakeCounter(metrics.NetworkConnectionsDroppedTotal)
+var networkMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_message_sent_queue_micros_total", Description: "Total microseconds message spent waiting in queue to be sent"})
+var networkP2PMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_message_sent_queue_micros_total", Description: "Total microseconds p2p message spent waiting in queue to be sent"})
+
+var duplicateNetworkMessageReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedTotal)
+var duplicateNetworkMessageReceivedBytesTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedBytesTotal)
+var duplicateNetworkFilterReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkFilterReceivedTotal)
+var outgoingNetworkMessageFilteredOutTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutTotal)
+var outgoingNetworkMessageFilteredOutBytesTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutBytesTotal)
+var unknownProtocolTagMessagesTotal = metrics.MakeCounter(metrics.UnknownProtocolTagMessagesTotal)
+
+var networkIncomingConnections = metrics.MakeGauge(metrics.NetworkIncomingConnections)
+var networkOutgoingConnections = metrics.MakeGauge(metrics.NetworkOutgoingConnections)
+
+var networkIncomingBufferMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_buffer_micros_total", Description: "microseconds spent by incoming messages on the receive buffer"})
+var networkHandleMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_handle_micros_total", Description: "microseconds spent by protocol handlers in the receive thread"})
+
+var networkBroadcasts = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_total", Description: "number of broadcast operations"})
+var networkBroadcastQueueFull = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_full_total", Description: "number of messages that were drops due to full broadcast queue"})
+var networkBroadcastQueueMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_micros_total", Description: "microseconds broadcast requests sit on queue"})
+var networkBroadcastSendMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_send_micros_total", Description: "microseconds spent broadcasting"})
+var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"})
+var networkPeerBroadcastDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_peer_broadcast_dropped_total", Description: "number of broadcast messages not sent to some peer"})
+
+var networkPeerIdentityDisconnect = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_duplicate", Description: "number of times identity challenge cause us to disconnect a peer"})
+var networkPeerIdentityError = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_error", Description: "number of times an error occurs (besides expected) when processing identity challenges"})
+var networkPeerAlreadyClosed = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_peer_already_closed", Description: "number of times a peer would be added but the peer connection is already closed"})
+
+var networkSlowPeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_slow_drops_total", Description: "number of peers dropped for being slow to send to"})
+var networkIdlePeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_idle_drops_total", Description: "number of peers dropped due to idle connection"})
+
+var peers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peers", Description: "Number of active peers."})
+var incomingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_incoming_peers", Description: "Number of active incoming peers."})
+var outgoingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_outgoing_peers", Description: "Number of active outgoing peers."})
+
+var transactionMessagesP2PRejectMessage = metrics.NewTagCounter(metrics.TransactionMessagesP2PRejectMessage.Name, metrics.TransactionMessagesP2PRejectMessage.Description)
+var transactionMessagesP2PDuplicateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDuplicateMessage)
+var transactionMessagesP2PDeliverMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDeliverMessage)
+var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PUndeliverableMessage)
+
+var networkP2PGossipSubSentBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_sent_bytes_total", Description: "Total number of bytes sent through gossipsub"})
+var networkP2PGossipSubReceivedBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_received_bytes_total", Description: "Total number of bytes received through gossipsub"})
+
+var _ = pubsub.RawTracer(pubsubMetricsTracer{})
+
+// pubsubMetricsTracer is a tracer for pubsub events used to track metrics.
+type pubsubMetricsTracer struct{}
+
+// AddPeer is invoked when a new peer is added.
+func (t pubsubMetricsTracer) AddPeer(p peer.ID, proto p2proto.ID) {}
+
+// RemovePeer is invoked when a peer is removed.
+func (t pubsubMetricsTracer) RemovePeer(p peer.ID) {}
+
+// Join is invoked when a new topic is joined
+func (t pubsubMetricsTracer) Join(topic string) {}
+
+// Leave is invoked when a topic is abandoned
+func (t pubsubMetricsTracer) Leave(topic string) {}
+
+// Graft is invoked when a new peer is grafted on the mesh (gossipsub)
+func (t pubsubMetricsTracer) Graft(p peer.ID, topic string) {}
+
+// Prune is invoked when a peer is pruned from the message (gossipsub)
+func (t pubsubMetricsTracer) Prune(p peer.ID, topic string) {}
+
+// ValidateMessage is invoked when a message first enters the validation pipeline.
+func (t pubsubMetricsTracer) ValidateMessage(msg *pubsub.Message) {
+ if msg != nil && msg.Topic != nil {
+ switch *msg.Topic {
+ case p2p.TXTopicName:
+ networkP2PReceivedBytesTotal.AddUint64(uint64(len(msg.Data)), nil)
+ networkP2PReceivedBytesByTag.Add(string(protocol.TxnTag), uint64(len(msg.Data)))
+ networkP2PMessageReceivedByTag.Add(string(protocol.TxnTag), 1)
+ }
+ }
+}
+
+// DeliverMessage is invoked when a message is delivered
+func (t pubsubMetricsTracer) DeliverMessage(msg *pubsub.Message) {
+ transactionMessagesP2PDeliverMessage.Inc(nil)
+}
+
+// RejectMessage is invoked when a message is Rejected or Ignored.
+// The reason argument can be one of the named strings Reject*.
+func (t pubsubMetricsTracer) RejectMessage(msg *pubsub.Message, reason string) {
+ // TagCounter cannot handle tags with spaces so pubsub.Reject* cannot be used directly.
+ // Since Go's strings are immutable, char replacement is a new allocation so that stick to string literals.
+ switch reason {
+ case pubsub.RejectValidationThrottled:
+ transactionMessagesP2PRejectMessage.Add("throttled", 1)
+ case pubsub.RejectValidationQueueFull:
+ transactionMessagesP2PRejectMessage.Add("full", 1)
+ case pubsub.RejectValidationFailed:
+ transactionMessagesP2PRejectMessage.Add("failed", 1)
+ case pubsub.RejectValidationIgnored:
+ transactionMessagesP2PRejectMessage.Add("ignored", 1)
+ default:
+ transactionMessagesP2PRejectMessage.Add("other", 1)
+ }
+}
+
+// DuplicateMessage is invoked when a duplicate message is dropped.
+func (t pubsubMetricsTracer) DuplicateMessage(msg *pubsub.Message) {
+ transactionMessagesP2PDuplicateMessage.Inc(nil)
+}
+
+// ThrottlePeer is invoked when a peer is throttled by the peer gater.
+func (t pubsubMetricsTracer) ThrottlePeer(p peer.ID) {}
+
+// RecvRPC is invoked when an incoming RPC is received.
+func (t pubsubMetricsTracer) RecvRPC(rpc *pubsub.RPC) {
+ networkP2PGossipSubReceivedBytesTotal.AddUint64(uint64(rpc.Size()), nil)
+}
+
+// SendRPC is invoked when a RPC is sent.
+func (t pubsubMetricsTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {
+ networkP2PGossipSubSentBytesTotal.AddUint64(uint64(rpc.Size()), nil)
+ for i := range rpc.GetPublish() {
+ if rpc.Publish[i] != nil && rpc.Publish[i].Topic != nil {
+ switch *rpc.Publish[i].Topic {
+ case p2p.TXTopicName:
+ networkP2PSentBytesByTag.Add(string(protocol.TxnTag), uint64(len(rpc.Publish[i].Data)))
+ networkP2PSentBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil)
+ networkP2PMessageSentByTag.Add(string(protocol.TxnTag), 1)
+ }
+ }
+ }
+}
+
+// DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full.
+func (t pubsubMetricsTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {}
+
+// UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and
+// the pressure release mechanism trigger, dropping messages.
+func (t pubsubMetricsTracer) UndeliverableMessage(msg *pubsub.Message) {
+ transactionMessagesP2PUnderdeliverableMessage.Inc(nil)
+}
diff --git a/network/metrics_test.go b/network/metrics_test.go
new file mode 100644
index 0000000000..857ab57051
--- /dev/null
+++ b/network/metrics_test.go
@@ -0,0 +1,76 @@
+// Copyright (C) 2019-2024 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see .
+
+package network
+
+import (
+ "go/ast"
+ "go/parser"
+ "go/token"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/algorand/go-algorand/test/partitiontest"
+)
+
+// TestPubsubTracer_TagList makes sure pubsubMetricsTracer traces pubsub messages
+// by counting switch cases in SendRPC and ValidateMessage
+func TestMetrics_PubsubTracer_TagList(t *testing.T) {
+ t.Parallel()
+ partitiontest.PartitionTest(t)
+
+ fset := token.NewFileSet()
+ f, err := parser.ParseFile(fset, "metrics.go", nil, 0)
+ require.NoError(t, err)
+
+ // Find the SendRPC/ValidateMessage functions and count the switch cases
+ var sendCaseCount int
+ var recvCaseCount int
+ ast.Inspect(f, func(n ast.Node) bool {
+ switch stmt := n.(type) {
+ case *ast.FuncDecl:
+ if stmt.Name.Name == "SendRPC" {
+ ast.Inspect(stmt.Body, func(n ast.Node) bool {
+ if switchStmt, ok := n.(*ast.SwitchStmt); ok {
+ for _, stmt := range switchStmt.Body.List {
+ if _, ok := stmt.(*ast.CaseClause); ok {
+ sendCaseCount++
+ }
+ }
+ }
+ return true
+ })
+ }
+ if stmt.Name.Name == "ValidateMessage" {
+ ast.Inspect(stmt.Body, func(n ast.Node) bool {
+ if switchStmt, ok := n.(*ast.SwitchStmt); ok {
+ for _, stmt := range switchStmt.Body.List {
+ if _, ok := stmt.(*ast.CaseClause); ok {
+ recvCaseCount++
+ }
+ }
+ }
+ return true
+ })
+ }
+ }
+ return true
+ })
+
+ require.Equal(t, len(gossipSubTags), sendCaseCount)
+ require.Equal(t, len(gossipSubTags), recvCaseCount)
+}
diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go
index 3b467b0b27..70fe09027a 100644
--- a/network/p2p/p2p.go
+++ b/network/p2p/p2p.go
@@ -176,7 +176,7 @@ func configureResourceManager(cfg config.Local) (network.ResourceManager, error)
}
// MakeService creates a P2P service instance
-func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler) (*serviceImpl, error) {
+func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler, metricsTracer pubsub.RawTracer) (*serviceImpl, error) {
sm := makeStreamManager(ctx, log, h, wsStreamHandler, cfg.EnableGossipService)
h.Network().Notify(sm)
@@ -188,7 +188,7 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h ho
telemetryProtoInfo := formatPeerTelemetryInfoProtocolName(telemetryID, telemetryInstance)
h.SetStreamHandler(protocol.ID(telemetryProtoInfo), func(s network.Stream) { s.Close() })
- ps, err := makePubSub(ctx, cfg, h)
+ ps, err := makePubSub(ctx, cfg, h, metricsTracer)
if err != nil {
return nil, err
}
diff --git a/network/p2p/pubsub.go b/network/p2p/pubsub.go
index 657baecdde..a592657010 100644
--- a/network/p2p/pubsub.go
+++ b/network/p2p/pubsub.go
@@ -51,11 +51,14 @@ const (
)
// TXTopicName defines a pubsub topic for TX messages
-const TXTopicName = "/algo/tx/0.1.0"
+// There is a micro optimization for const string comparison:
+// 8 bytes const string require a single x86-64 CMPQ instruction.
+// Naming convention: "algo" + 2 bytes protocol tag + 2 bytes version
+const TXTopicName = "algotx01"
const incomingThreads = 20 // matches to number wsNetwork workers
-func makePubSub(ctx context.Context, cfg config.Local, host host.Host) (*pubsub.PubSub, error) {
+func makePubSub(ctx context.Context, cfg config.Local, host host.Host, metricsTracer pubsub.RawTracer) (*pubsub.PubSub, error) {
//defaultParams := pubsub.DefaultGossipSubParams()
options := []pubsub.Option{
@@ -98,7 +101,10 @@ func makePubSub(ctx context.Context, cfg config.Local, host host.Host) (*pubsub.
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
// pubsub.WithValidateThrottle(cfg.TxBacklogSize),
pubsub.WithValidateWorkers(incomingThreads),
- pubsub.WithRawTracer(pubsubTracer{}),
+ }
+
+ if metricsTracer != nil {
+ options = append(options, pubsub.WithRawTracer(metricsTracer))
}
return pubsub.NewGossipSub(ctx, host, options...)
diff --git a/network/p2p/pubsubTracer.go b/network/p2p/pubsubTracer.go
deleted file mode 100644
index ca57bc69ce..0000000000
--- a/network/p2p/pubsubTracer.go
+++ /dev/null
@@ -1,98 +0,0 @@
-// Copyright (C) 2019-2024 Algorand, Inc.
-// This file is part of go-algorand
-//
-// go-algorand is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// go-algorand is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with go-algorand. If not, see .
-
-package p2p
-
-import (
- pubsub "github.com/libp2p/go-libp2p-pubsub"
- "github.com/libp2p/go-libp2p/core/peer"
- "github.com/libp2p/go-libp2p/core/protocol"
-
- "github.com/algorand/go-algorand/util/metrics"
-)
-
-var _ = pubsub.RawTracer(pubsubTracer{})
-
-var transactionMessagesP2PRejectMessage = metrics.NewTagCounter(metrics.TransactionMessagesP2PRejectMessage.Name, metrics.TransactionMessagesP2PRejectMessage.Description)
-var transactionMessagesP2PDuplicateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDuplicateMessage)
-var transactionMessagesP2PDeliverMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDeliverMessage)
-var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PUndeliverableMessage)
-var transactionMessagesP2PValidateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PValidateMessage)
-
-// pubsubTracer is a tracer for pubsub events used to track metrics.
-type pubsubTracer struct{}
-
-// AddPeer is invoked when a new peer is added.
-func (t pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) {}
-
-// RemovePeer is invoked when a peer is removed.
-func (t pubsubTracer) RemovePeer(p peer.ID) {}
-
-// Join is invoked when a new topic is joined
-func (t pubsubTracer) Join(topic string) {}
-
-// Leave is invoked when a topic is abandoned
-func (t pubsubTracer) Leave(topic string) {}
-
-// Graft is invoked when a new peer is grafted on the mesh (gossipsub)
-func (t pubsubTracer) Graft(p peer.ID, topic string) {}
-
-// Prune is invoked when a peer is pruned from the message (gossipsub)
-func (t pubsubTracer) Prune(p peer.ID, topic string) {}
-
-// ValidateMessage is invoked when a message first enters the validation pipeline.
-func (t pubsubTracer) ValidateMessage(msg *pubsub.Message) {
- transactionMessagesP2PValidateMessage.Inc(nil)
-}
-
-// DeliverMessage is invoked when a message is delivered
-func (t pubsubTracer) DeliverMessage(msg *pubsub.Message) {
- transactionMessagesP2PDeliverMessage.Inc(nil)
-}
-
-// RejectMessage is invoked when a message is Rejected or Ignored.
-// The reason argument can be one of the named strings Reject*.
-func (t pubsubTracer) RejectMessage(msg *pubsub.Message, reason string) {
- switch reason {
- case pubsub.RejectValidationThrottled, pubsub.RejectValidationQueueFull, pubsub.RejectValidationFailed, pubsub.RejectValidationIgnored:
- transactionMessagesP2PRejectMessage.Add(reason, 1)
- default:
- transactionMessagesP2PRejectMessage.Add("other", 1)
- }
-}
-
-// DuplicateMessage is invoked when a duplicate message is dropped.
-func (t pubsubTracer) DuplicateMessage(msg *pubsub.Message) {
- transactionMessagesP2PDuplicateMessage.Inc(nil)
-}
-
-// ThrottlePeer is invoked when a peer is throttled by the peer gater.
-func (t pubsubTracer) ThrottlePeer(p peer.ID) {}
-
-// RecvRPC is invoked when an incoming RPC is received.
-func (t pubsubTracer) RecvRPC(rpc *pubsub.RPC) {}
-
-// SendRPC is invoked when a RPC is sent.
-func (t pubsubTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {}
-
-// DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full.
-func (t pubsubTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {}
-
-// UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and
-// the pressure release mechanism trigger, dropping messages.
-func (t pubsubTracer) UndeliverableMessage(msg *pubsub.Message) {
- transactionMessagesP2PUnderdeliverableMessage.Inc(nil)
-}
diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go
index f9dc04b785..d3af60a223 100644
--- a/network/p2pNetwork.go
+++ b/network/p2pNetwork.go
@@ -194,6 +194,11 @@ type p2pPeerStats struct {
txReceived atomic.Uint64
}
+// gossipSubTags defines protocol messages that are relayed using GossipSub
+var gossipSubTags = map[protocol.Tag]string{
+ protocol.TxnTag: p2p.TXTopicName,
+}
+
// NewP2PNetwork returns an instance of GossipNode that uses the p2p.Service
func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, node NodeInfo, identityOpts *identityOpts) (*P2PNetwork, error) {
const readBufferLen = 2048
@@ -214,7 +219,7 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo
config: cfg,
genesisID: genesisID,
networkID: networkID,
- topicTags: map[protocol.Tag]string{protocol.TxnTag: p2p.TXTopicName},
+ topicTags: gossipSubTags,
wsPeers: make(map[peer.ID]*wsPeer),
wsPeersToIDs: make(map[*wsPeer]peer.ID),
peerStats: make(map[peer.ID]*p2pPeerStats),
@@ -261,7 +266,7 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo
}
log.Infof("P2P host created: peer ID %s addrs %s", h.ID(), h.Addrs())
- net.service, err = p2p.MakeService(net.ctx, log, cfg, h, la, net.wsStreamHandler)
+ net.service, err = p2p.MakeService(net.ctx, log, cfg, h, la, net.wsStreamHandler, pubsubMetricsTracer{})
if err != nil {
return nil, err
}
@@ -791,6 +796,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, strea
conn: &wsPeerConnP2P{stream: stream},
outgoing: !incoming,
identity: netIdentPeerID,
+ peerType: peerTypeP2P,
}
protos, err := n.pstore.GetProtocols(p2pPeer)
if err != nil {
diff --git a/network/wsNetwork.go b/network/wsNetwork.go
index 2af3a9b6bf..ecb636c8e2 100644
--- a/network/wsNetwork.go
+++ b/network/wsNetwork.go
@@ -53,7 +53,6 @@ import (
tools_network "github.com/algorand/go-algorand/tools/network"
"github.com/algorand/go-algorand/tools/network/dnssec"
"github.com/algorand/go-algorand/util"
- "github.com/algorand/go-algorand/util/metrics"
)
const incomingThreads = 20
@@ -117,35 +116,6 @@ const wsMaxHeaderBytes = 4096
// used from the ReservedFDs pool, as this pool is meant for short-lived usage (dns queries, disk i/o, etc.)
const ReservedHealthServiceConnections = 10
-var networkIncomingConnections = metrics.MakeGauge(metrics.NetworkIncomingConnections)
-var networkOutgoingConnections = metrics.MakeGauge(metrics.NetworkOutgoingConnections)
-
-var networkIncomingBufferMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_buffer_micros_total", Description: "microseconds spent by incoming messages on the receive buffer"})
-var networkHandleMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_handle_micros_total", Description: "microseconds spent by protocol handlers in the receive thread"})
-
-var networkBroadcasts = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_total", Description: "number of broadcast operations"})
-var networkBroadcastQueueFull = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_full_total", Description: "number of messages that were drops due to full broadcast queue"})
-var networkBroadcastQueueMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_micros_total", Description: "microseconds broadcast requests sit on queue"})
-var networkBroadcastSendMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_send_micros_total", Description: "microseconds spent broadcasting"})
-var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"})
-var networkPeerBroadcastDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_peer_broadcast_dropped_total", Description: "number of broadcast messages not sent to some peer"})
-
-var networkPeerIdentityDisconnect = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_duplicate", Description: "number of times identity challenge cause us to disconnect a peer"})
-var networkPeerIdentityError = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_error", Description: "number of times an error occurs (besides expected) when processing identity challenges"})
-var networkPeerAlreadyClosed = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_peer_already_closed", Description: "number of times a peer would be added but the peer connection is already closed"})
-
-var networkSlowPeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_slow_drops_total", Description: "number of peers dropped for being slow to send to"})
-var networkIdlePeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_idle_drops_total", Description: "number of peers dropped due to idle connection"})
-
-var minPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_min_ping_seconds", Description: "Network round trip time to fastest peer in seconds."})
-var meanPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_mean_ping_seconds", Description: "Network round trip time to average peer in seconds."})
-var medianPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_median_ping_seconds", Description: "Network round trip time to median peer in seconds."})
-var maxPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_max_ping_seconds", Description: "Network round trip time to slowest peer in seconds."})
-
-var peers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peers", Description: "Number of active peers."})
-var incomingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_incoming_peers", Description: "Number of active incoming peers."})
-var outgoingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_outgoing_peers", Description: "Number of active outgoing peers."})
-
// peerDisconnectionAckDuration defines the time we would wait for the peer disconnection to complete.
const peerDisconnectionAckDuration = 5 * time.Second
diff --git a/network/wsPeer.go b/network/wsPeer.go
index 88a0c615f9..a6a982af59 100644
--- a/network/wsPeer.go
+++ b/network/wsPeer.go
@@ -39,7 +39,6 @@ import (
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util"
- "github.com/algorand/go-algorand/util/metrics"
)
// MaxMessageLength is the maximum length of a message that can be sent or received, exported to be used in the node.TestMaxSizesCorrect test
@@ -52,20 +51,7 @@ const averageMessageLength = 2 * 1024 // Most of the messages are smaller tha
// buffer and starve messages from other peers.
const msgsInReadBufferPerPeer = 10
-var tagStringList []string
-
func init() {
- tagStringList = make([]string, len(protocol.TagList))
- for i, t := range protocol.TagList {
- tagStringList[i] = string(t)
- }
- networkSentBytesByTag = metrics.NewTagCounterFiltered("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages", tagStringList, "UNK")
- networkReceivedBytesByTag = metrics.NewTagCounterFiltered("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages", tagStringList, "UNK")
- networkMessageReceivedByTag = metrics.NewTagCounterFiltered("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages", tagStringList, "UNK")
- networkMessageSentByTag = metrics.NewTagCounterFiltered("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages", tagStringList, "UNK")
- networkHandleCountByTag = metrics.NewTagCounterFiltered("algod_network_rx_handle_countbytag_{TAG}", "count of handler calls in the receive thread for {TAG} messages", tagStringList, "UNK")
- networkHandleMicrosByTag = metrics.NewTagCounterFiltered("algod_network_rx_handle_microsbytag_{TAG}", "microseconds spent by protocol handlers in the receive thread for {TAG} messages", tagStringList, "UNK")
-
matched := false
for _, version := range SupportedProtocolVersions {
if version == versionPeerFeatures {
@@ -83,29 +69,6 @@ func init() {
}
}
-var networkSentBytesTotal = metrics.MakeCounter(metrics.NetworkSentBytesTotal)
-var networkSentBytesByTag *metrics.TagCounter
-var networkReceivedBytesTotal = metrics.MakeCounter(metrics.NetworkReceivedBytesTotal)
-var networkReceivedBytesByTag *metrics.TagCounter
-
-var networkMessageReceivedTotal = metrics.MakeCounter(metrics.NetworkMessageReceivedTotal)
-var networkMessageReceivedByTag *metrics.TagCounter
-var networkMessageSentTotal = metrics.MakeCounter(metrics.NetworkMessageSentTotal)
-var networkMessageSentByTag *metrics.TagCounter
-
-var networkHandleMicrosByTag *metrics.TagCounter
-var networkHandleCountByTag *metrics.TagCounter
-
-var networkConnectionsDroppedTotal = metrics.MakeCounter(metrics.NetworkConnectionsDroppedTotal)
-var networkMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_message_sent_queue_micros_total", Description: "Total microseconds message spent waiting in queue to be sent"})
-
-var duplicateNetworkMessageReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedTotal)
-var duplicateNetworkMessageReceivedBytesTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedBytesTotal)
-var duplicateNetworkFilterReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkFilterReceivedTotal)
-var outgoingNetworkMessageFilteredOutTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutTotal)
-var outgoingNetworkMessageFilteredOutBytesTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutBytesTotal)
-var unknownProtocolTagMessagesTotal = metrics.MakeCounter(metrics.UnknownProtocolTagMessagesTotal)
-
// defaultSendMessageTags is the default list of messages which a peer would
// allow to be sent without receiving any explicit request.
var defaultSendMessageTags = map[protocol.Tag]bool{
@@ -204,6 +167,16 @@ type sendMessages struct {
onRelease func()
}
+//msgp:ignore peerType
+type peerType int
+
+const (
+ // peerTypeWs is a peer that is connected over a websocket connection
+ peerTypeWs peerType = iota
+ // peerTypeP2P is a peer that is connected over an P2P connection
+ peerTypeP2P
+)
+
type wsPeer struct {
// lastPacketTime contains the UnixNano at the last time a successful communication was made with the peer.
// "successful communication" above refers to either reading from or writing to a connection without receiving any
@@ -318,6 +291,10 @@ type wsPeer struct {
// closers is a slice of functions to run when the peer is closed
closers []func()
+
+ // peerType defines the peer's underlying connection type
+ // used for separate p2p vs ws metrics
+ peerType peerType
}
// HTTPPeer is what the opaque Peer might be.
@@ -639,10 +616,17 @@ func (wp *wsPeer) readLoop() {
}
msg.Net = wp.net
wp.lastPacketTime.Store(msg.Received)
- networkReceivedBytesTotal.AddUint64(uint64(len(msg.Data)+2), nil)
- networkMessageReceivedTotal.AddUint64(1, nil)
- networkReceivedBytesByTag.Add(string(tag[:]), uint64(len(msg.Data)+2))
- networkMessageReceivedByTag.Add(string(tag[:]), 1)
+ if wp.peerType == peerTypeWs {
+ networkReceivedBytesTotal.AddUint64(uint64(len(msg.Data)+2), nil)
+ networkMessageReceivedTotal.AddUint64(1, nil)
+ networkReceivedBytesByTag.Add(string(tag[:]), uint64(len(msg.Data)+2))
+ networkMessageReceivedByTag.Add(string(tag[:]), 1)
+ } else {
+ networkP2PReceivedBytesTotal.AddUint64(uint64(len(msg.Data)+2), nil)
+ networkP2PMessageReceivedTotal.AddUint64(1, nil)
+ networkP2PReceivedBytesByTag.Add(string(tag[:]), uint64(len(msg.Data)+2))
+ networkP2PMessageReceivedByTag.Add(string(tag[:]), 1)
+ }
msg.Sender = wp
// for outgoing connections, we want to notify the connection monitor that we've received
@@ -863,11 +847,19 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason {
return disconnectWriteError
}
wp.lastPacketTime.Store(time.Now().UnixNano())
- networkSentBytesTotal.AddUint64(uint64(len(msg.data)), nil)
- networkSentBytesByTag.Add(string(tag), uint64(len(msg.data)))
- networkMessageSentTotal.AddUint64(1, nil)
- networkMessageSentByTag.Add(string(tag), 1)
- networkMessageQueueMicrosTotal.AddUint64(uint64(time.Now().Sub(msg.peerEnqueued).Nanoseconds()/1000), nil)
+ if wp.peerType == peerTypeWs {
+ networkSentBytesTotal.AddUint64(uint64(len(msg.data)), nil)
+ networkSentBytesByTag.Add(string(tag), uint64(len(msg.data)))
+ networkMessageSentTotal.AddUint64(1, nil)
+ networkMessageSentByTag.Add(string(tag), 1)
+ networkMessageQueueMicrosTotal.AddUint64(uint64(time.Since(msg.peerEnqueued).Nanoseconds()/1000), nil)
+ } else {
+ networkP2PSentBytesTotal.AddUint64(uint64(len(msg.data)), nil)
+ networkP2PSentBytesByTag.Add(string(tag), uint64(len(msg.data)))
+ networkP2PMessageSentTotal.AddUint64(1, nil)
+ networkP2PMessageSentByTag.Add(string(tag), 1)
+ networkP2PMessageQueueMicrosTotal.AddUint64(uint64(time.Since(msg.peerEnqueued).Nanoseconds()/1000), nil)
+ }
return disconnectReasonNone
}
diff --git a/test/heapwatch/block_history_plot.py b/test/heapwatch/block_history_plot.py
index d8c86b454f..0bf8a2c8a2 100644
--- a/test/heapwatch/block_history_plot.py
+++ b/test/heapwatch/block_history_plot.py
@@ -23,6 +23,7 @@
# Graph over time of TPS or 10-round-moving-average-TPS
import base64
+import json
import os
import statistics
import sys
@@ -106,12 +107,25 @@ def process(path, args):
prevtc = tc
prevts = ts
prevtime = _time
- print('{} blocks, block txns [{}-{}], block seconds [{}-{}], tps [{}-{}]'.format(
+ print('{} blocks, block txns [{}-{}], block seconds [{}-{}], tps [{}-{}], total txns {}'.format(
count,
mintxn,maxtxn,
mindt,maxdt,
mintps,maxtps,
+ tc,
))
+ if tc > 0:
+ with open(path + '.stats', 'w') as fout:
+ fout.write(json.dumps({
+ 'blocks': count,
+ 'tc': tc,
+ 'mintxn': mintxn,
+ 'maxtxn': maxtxn,
+ 'mindt': mindt,
+ 'maxdt': maxdt,
+ 'mintps': mintps,
+ 'maxtps': maxtps,
+ }))
start = 0
end = len(txnv)-1
diff --git a/test/heapwatch/metrics_aggs.py b/test/heapwatch/metrics_aggs.py
index 0189634be5..d20593c097 100644
--- a/test/heapwatch/metrics_aggs.py
+++ b/test/heapwatch/metrics_aggs.py
@@ -33,7 +33,7 @@
from plotly.subplots import make_subplots
-from metrics_lib import MetricType, parse_metrics, gather_metrics_files_by_nick
+from metrics_lib import Metric, MetricType, parse_metrics, gather_metrics_files_by_nick
logger = logging.getLogger(__name__)
@@ -53,6 +53,8 @@ def main():
ap.add_argument('--nick-lre', action='append', default=[], help='label:regexp to filter node names, may be repeated')
ap.add_argument('-s', '--save', type=str, choices=['png', 'html'], help=f'save plot to \'{default_img_filename}\' or \'{default_html_filename}\' file instead of showing it')
ap.add_argument('--verbose', default=False, action='store_true')
+ ap.add_argument('--avg-max', default=False, action='store_true', help='print avg of max values across nodes for each metric')
+ ap.add_argument('--avg-max-min', default=False, action='store_true', help='print avg of max-min values across nodes for each metric')
args = ap.parse_args()
if args.verbose:
@@ -99,6 +101,7 @@ def main():
}
fig['layout']['height'] = 500 * nrows
+ nick_series = {}
for nick, files_by_date in filesByNick.items():
active_metrics = {}
@@ -146,7 +149,10 @@ def main():
active_metric_names.sort()
active_metrics[full_name] = active_metric_names
idx += 1
-
+
+ if args.avg_max or args.avg_max_min:
+ nick_series[nick] = raw_series
+
for i, metric_pair in enumerate(sorted(active_metrics.items())):
metric_name, metric_fullnames = metric_pair
for metric_fullname in metric_fullnames:
@@ -158,6 +164,23 @@ def main():
line=dict(width=1),
), i+1, 1)
+ if args.avg_max or args.avg_max_min:
+ metric_names_nick_max_avg = {}
+ for nick, raw_series in nick_series.items():
+ for metric_name, rw in raw_series.items():
+ mmax = max(rw)
+ mmin = min(rw)
+ print(f'{nick}: {metric_name}: count {len(rw)}, max {mmax}, min {mmin}, min-max {mmax - mmin}')
+ metric = Metric(metric_name, 0, MetricType.COUNTER)
+ if metric.short_name() not in metric_names_nick_max_avg:
+ metric_names_nick_max_avg[metric.short_name()] = []
+ if args.avg_max_min:
+ metric_names_nick_max_avg[metric.short_name()].append(mmax - mmin)
+ if args.avg_max:
+ metric_names_nick_max_avg[metric.short_name()].append(mmax)
+ for metric_name, val in metric_names_nick_max_avg.items():
+ print(f'{metric_name}: avg {sum(val)/len(val)}')
+
if args.save:
if args.save == 'html':
target_path = os.path.join(args.dir, default_html_filename)
diff --git a/test/heapwatch/metrics_delta.py b/test/heapwatch/metrics_delta.py
index 2d64ee097a..3ff1afca65 100644
--- a/test/heapwatch/metrics_delta.py
+++ b/test/heapwatch/metrics_delta.py
@@ -136,6 +136,8 @@ def __init__(self, label=None):
self.tpsMeanSum = 0
self.txBpsMeanSum = 0
self.rxBpsMeanSum = 0
+ self.txP2PBpsMeanSum = 0
+ self.rxP2PBpsMeanSum = 0
self.tpsSum = 0
self.blockTimeSum = 0
self.sumsCount = 0
@@ -152,6 +154,8 @@ def __call__(self, ttr, nick):
self.tpsMeanSum += meanOrZero(ttr.tpsList)
self.txBpsMeanSum += meanOrZero(ttr.txBpsList)
self.rxBpsMeanSum += meanOrZero(ttr.rxBpsList)
+ self.txP2PBpsMeanSum += meanOrZero(ttr.txP2PBpsList)
+ self.rxP2PBpsMeanSum += meanOrZero(ttr.rxP2PBpsList)
self.tpsSum += ttr.tps
self.blockTimeSum += ttr.blockTime
self.sumsCount += 1
@@ -164,8 +168,10 @@ def blockinfo(self, curtime):
return self.biByTime.get(curtime)
def byMsg(self, html=False):
- txPSums = {}
- rxPSums = {}
+ txWsPSums = {}
+ rxWsPSums = {}
+ txP2PPSums = {}
+ rxP2PPSums = {}
secondsSum = 0
txMax = {}
txMin = {}
@@ -175,8 +181,10 @@ def byMsg(self, html=False):
for nick, ns in self.nodes.items():
nicks.append(nick)
secondsSum += ns.secondsSum
- dictSum(txPSums, ns.txPSums)
- dictSum(rxPSums, ns.rxPSums)
+ dictSum(txWsPSums, ns.txPSums)
+ dictSum(rxWsPSums, ns.rxPSums)
+ dictSum(txP2PPSums, ns.txP2PPSums)
+ dictSum(rxP2PPSums, ns.rxP2PPSums)
dictMax(txMax, ns.txPLists)
dictMax(rxMax, ns.rxPLists)
dictMin(txMin, ns.txPLists)
@@ -185,23 +193,36 @@ def byMsg(self, html=False):
lines = []
if html:
lines.append('
{}
'.format(nodesummary))
- lines.append(' | tx B/s | rx B/s |
')
+ lines.append('') # traffic per tag two columns: ws and p2p
else:
lines.append(nodesummary)
- lines.append('\ttx B/s\trx B/s')
- for msg, txB in txPSums.items():
- if msg not in rxPSums:
- rxPSums[msg] = 0
- for rxBps, msg in sorted([(rxB/secondsSum, msg) for msg, rxB in rxPSums.items()], reverse=True):
- txBps = txPSums.get(msg,0)/secondsSum
- if (txBps < 0.5) and (rxBps < 0.5):
- continue
+
+ for title, txPSums, rxPSums in [
+ ('ws', txWsPSums, rxWsPSums),
+ ('p2p', txP2PPSums, rxP2PPSums),
+ ]:
if html:
- lines.append('
{} | {:.0f} | {:.0f} |
'.format(msg, txBps, rxBps))
+ lines.append('')
+ lines.append(f'{title} traffic per tag | tx B/s | rx B/s | ')
else:
- lines.append('{}\t{:.0f}\t{:.0f}'.format(msg, txBps, rxBps))
+ lines.append(f'{title} traffic per tag')
+ lines.append('\ttx B/s\trx B/s')
+ for msg, txB in txPSums.items():
+ if msg not in rxPSums:
+ rxPSums[msg] = 0
+ for rxBps, msg in sorted([(rxB/secondsSum, msg) for msg, rxB in rxPSums.items()], reverse=True):
+ txBps = txPSums.get(msg,0)/secondsSum
+ if (txBps < 0.5) and (rxBps < 0.5):
+ continue
+ if html:
+ lines.append('{} | {:.0f} | {:.0f} | '.format(msg, txBps, rxBps))
+ else:
+ lines.append('{}\t{:.0f}\t{:.0f}'.format(msg, txBps, rxBps))
+ if html:
+ lines.append(' ')
+ lines.append(' | ')
if html:
- lines.append('
')
+ lines.append('
') # traffic per tag two columns: ws and p2p
return '\n'.join(lines)
def txPool(self):
@@ -230,7 +251,7 @@ def html(self):
def str(self, html=False):
if not self.sumsCount:
- tps, txbps, rxbps = math.nan, math.nan, math.nan
+ tps, txbps, rxbps, txP2Pbps, rxP2Pbps = math.nan, math.nan, math.nan, math.nan, math.nan
blockTimes = math.nan
else:
#tps = self.tpsMeanSum/self.sumsCount
@@ -238,6 +259,8 @@ def str(self, html=False):
blockTimes = self.blockTimeSum/self.sumsCount
txbps = self.txBpsMeanSum/self.sumsCount
rxbps = self.rxBpsMeanSum/self.sumsCount
+ txP2Pbps = self.txP2PBpsMeanSum/self.sumsCount
+ rxP2Pbps = self.rxP2PBpsMeanSum/self.sumsCount
labelspace = ""
if self.label:
labelspace = self.label + " "
@@ -248,12 +271,12 @@ def str(self, html=False):
else:
verifyMillis = ''
if html:
- fmt = '{byMsg}\n{verifyMillis}{labelspace}{txPool}
\n{labelspace}summary: {TPS:0.2f} TPS, {bt:1.2f}s/block, tx {txBps}B/s, rx {rxBps}B/s
'
+ fmt = '{byMsg}\n{verifyMillis}{labelspace}{txPool}
\n{labelspace}summary: {TPS:0.2f} TPS, {bt:1.2f}s/block, tx {txBps}B/s, rx {rxBps}B/s, p2p tx {txP2PBps}B/s, p2p rx {rxP2PBps}B/s
'
if self.label:
fmt = '' + self.label + '
' + fmt
else:
- fmt = '{byMsg}\n{verifyMillis}{labelspace}{txPool}\n{labelspace}summary: {TPS:0.2f} TPS, {bt:1.2f}s/block, tx {txBps}B/s, rx {rxBps}B/s'
- return fmt.format(labelspace=labelspace, byMsg=self.byMsg(html), txPool=self.txPool(), TPS=tps, txBps=hunum(txbps), rxBps=hunum(rxbps), bt=blockTimes, verifyMillis=verifyMillis)
+ fmt = '{byMsg}\n{verifyMillis}{labelspace}{txPool}\n{labelspace}summary: {TPS:0.2f} TPS, {bt:1.2f}s/block, tx {txBps}B/s, rx {rxBps}B/s, p2p tx {txP2PBps}B/s, p2p rx {rxP2PBps}B/s'
+ return fmt.format(labelspace=labelspace, byMsg=self.byMsg(html), txPool=self.txPool(), TPS=tps, txBps=hunum(txbps), rxBps=hunum(rxbps), txP2PBps=hunum(txP2Pbps), rxP2PBps=hunum(rxP2Pbps), bt=blockTimes, verifyMillis=verifyMillis)
def plot_pool(self, outpath):
from matplotlib import pyplot as plt
@@ -486,17 +509,27 @@ def __init__(self):
self.deltas = []
self.txBpsList = []
self.rxBpsList = []
+ self.txP2PBpsList = []
+ self.rxP2PBpsList = []
self.tpsList = []
self.txBSum = 0
self.rxBSum = 0
+ self.txP2PBSum = 0
+ self.rxP2PBSum = 0
self.txnSum = 0
self.secondsSum = 0
# algod_network_received_bytes_*
self.rxPLists = {}
self.rxPSums = {}
+ # algod_network_p2p_received_bytes_*
+ self.rxP2PPLists = {}
+ self.rxP2PPSums = {}
# algod_network_sent_bytes_*
self.txPLists = {}
self.txPSums = {}
+ # algod_network_p2p_sent_bytes_*
+ self.txP2PPLists = {}
+ self.txP2PPSums = {}
self.times = []
# algod_tx_pool_count
self.txPool = []
@@ -533,7 +566,7 @@ def process_files(self, args, nick=None, metrics_files=None, bisource=None):
reportpath = args.report[:-4] + nick + '.csv'
reportf = open(reportpath, 'wt')
writer = csv.writer(reportf)
- writer.writerow(('when', 'tx bytes/s', 'rx bytes/s','TPS', 's/block'))
+ writer.writerow(('when', 'tx bytes/s', 'rx bytes/s', 'tx p2p bytes/s', 'rx p2p bytes/s', 'TPS', 's/block'))
prev = None
prevtime = None
prevPath = None
@@ -587,6 +620,11 @@ def process_files(self, args, nick=None, metrics_files=None, bisource=None):
rxBytes = d.get('algod_network_received_bytes_total',0)
txBytesPerSec = txBytes / dt
rxBytesPerSec = rxBytes / dt
+ txP2PBytes = d.get('algod_network_p2p_sent_bytes_total',0)
+ rxP2PBytes = d.get('algod_network_p2p_received_bytes_total',0)
+ txP2PBytesPerSec = txP2PBytes / dt
+ rxP2PBytesPerSec = rxP2PBytes / dt
+
# TODO: gather algod_network_sent_bytes_* and algod_network_received_bytes_*
if (tps is None) or ((args.mintps is not None) and (tps < args.mintps)):
# do not sum up this row
@@ -594,18 +632,26 @@ def process_files(self, args, nick=None, metrics_files=None, bisource=None):
else:
self.txBpsList.append(txBytesPerSec)
self.rxBpsList.append(rxBytesPerSec)
+ self.txP2PBpsList.append(txP2PBytesPerSec)
+ self.rxP2PBpsList.append(rxP2PBytesPerSec)
self.tpsList.append(tps)
self.txBSum += txBytes
self.rxBSum += rxBytes
+ self.txP2PBSum += txP2PBytes
+ self.rxP2PBSum += rxP2PBytes
self.txnSum += txnCount
self.secondsSum += dt
perProtocol('algod_network_sent_bytes_', self.txPLists, self.txPSums, d, dt)
perProtocol('algod_network_received_bytes_', self.rxPLists, self.rxPSums, d, dt)
+ perProtocol('algod_network_p2p_sent_bytes_', self.txP2PPLists, self.txP2PPSums, d, dt)
+ perProtocol('algod_network_p2p_received_bytes_', self.rxP2PPLists, self.rxP2PPSums, d, dt)
if writer:
writer.writerow((
time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(curtime)),
txBytesPerSec,
rxBytesPerSec,
+ txP2PBytesPerSec,
+ rxP2PBytesPerSec,
tps,
blocktime,
))
@@ -631,13 +677,14 @@ def process_files(self, args, nick=None, metrics_files=None, bisource=None):
self.blockTime = totalDt / rounds
if writer and self.txBpsList:
writer.writerow([])
- for bsum, msg in sorted([(bsum,msg) for msg,bsum in self.txPSums.items()]):
- pass
+ # TODO: summarize
+ # for bsum, msg in sorted([(bsum,msg) for msg,bsum in self.txPSums.items()]):
+ # pass
writer.writerow([])
- writer.writerow(['min', min(self.txBpsList), min(self.rxBpsList), min(self.tpsList)])
- writer.writerow(['avg', self.txBSum/self.secondsSum, self.rxBSum/self.secondsSum, self.txnSum/self.secondsSum])
- writer.writerow(['max', max(self.txBpsList), max(self.rxBpsList), max(self.tpsList)])
- writer.writerow(['std', statistics.pstdev(self.txBpsList), statistics.pstdev(self.rxBpsList), statistics.pstdev(self.tpsList)])
+ writer.writerow(['min', min(self.txBpsList), min(self.rxBpsList), min(self.txP2PBpsList), min(self.rxP2PBpsList), min(self.tpsList)])
+ writer.writerow(['avg', self.txBSum/self.secondsSum, self.rxBSum/self.secondsSum, self.txP2PBSum/self.secondsSum, self.rxP2PBSum/self.secondsSum, self.txnSum/self.secondsSum])
+ writer.writerow(['max', max(self.txBpsList), max(self.rxBpsList), max(self.txP2PBpsList), max(self.rxP2PBpsList), max(self.tpsList)])
+ writer.writerow(['std', statistics.pstdev(self.txBpsList), statistics.pstdev(self.rxBpsList), statistics.pstdev(self.txP2PBpsList), statistics.pstdev(self.rxP2PBpsList), statistics.pstdev(self.tpsList)])
if reportf:
reportf.close()
if self.deltas and args.deltas:
diff --git a/test/heapwatch/metrics_lib.py b/test/heapwatch/metrics_lib.py
index fbda555b90..5fc7b36075 100644
--- a/test/heapwatch/metrics_lib.py
+++ b/test/heapwatch/metrics_lib.py
@@ -54,7 +54,7 @@ def hunum(x):
return '{:.1f}k'.format(x / 1000.0)
if x >= 1000:
return '{:.2f}k'.format(x / 1000.0)
- return '{:.2f}x'.format(x)
+ return '{:.2f}'.format(x)
def test_metric_line_re():
diff --git a/test/heapwatch/requirements.txt b/test/heapwatch/requirements.txt
index db92372c6d..cf443a24e4 100644
--- a/test/heapwatch/requirements.txt
+++ b/test/heapwatch/requirements.txt
@@ -6,5 +6,5 @@ plotly==5.16.0
py-algorand-sdk==2.3.0
kaleido==0.2.1
networkx==3.3
-gravis=0.1.0
-termcolor=2.4.0
+gravis==0.1.0
+termcolor==2.4.0
diff --git a/util/metrics/metrics.go b/util/metrics/metrics.go
index fcc566312f..d7afe6439c 100644
--- a/util/metrics/metrics.go
+++ b/util/metrics/metrics.go
@@ -39,6 +39,14 @@ var (
NetworkMessageReceivedTotal = MetricName{Name: "algod_network_message_received_total", Description: "Total number of complete messages that were received from the network"}
// NetworkMessageSentTotal Total number of complete messages that were sent to the network
NetworkMessageSentTotal = MetricName{Name: "algod_network_message_sent_total", Description: "Total number of complete messages that were sent to the network"}
+ // NetworkP2PSentBytesTotal Total number of bytes that were sent over the p2p network
+ NetworkP2PSentBytesTotal = MetricName{Name: "algod_network_p2p_sent_bytes_total", Description: "Total number of bytes that were sent over the p2p network"}
+ // NetworkP2PReceivedBytesTotal Total number of bytes that were received from the p2p network
+ NetworkP2PReceivedBytesTotal = MetricName{Name: "algod_network_p2p_received_bytes_total", Description: "Total number of bytes that were received from the p2p network"}
+ // NetworkP2PMessageReceivedTotal Total number of complete messages that were received from the p2p network
+ NetworkP2PMessageReceivedTotal = MetricName{Name: "algod_network_p2p_message_received_total", Description: "Total number of complete messages that were received from the p2p network"}
+ // NetworkP2PMessageSentTotal Total number of complete messages that were sent to the p2p network
+ NetworkP2PMessageSentTotal = MetricName{Name: "algod_network_p2p_message_sent_total", Description: "Total number of complete messages that were sent to the p2p network"}
// NetworkConnectionsDroppedTotal Total number of connections that were dropped before a message
NetworkConnectionsDroppedTotal = MetricName{Name: "algod_network_connections_dropped_total", Description: "Total number of connections that were dropped before a message"}
// NetworkSentDecompressedBytesTotal Total number of bytes that were sent over the network prior of being compressed