Skip to content

Commit

Permalink
[Consensus] add tracing and basic metrics to tonic (#17921)
Browse files Browse the repository at this point in the history
## Description 

Reuse the "callback layer" implementations for anemo and tonic.
Add a trace layer to tonic.

Request and response size monitoring will be added as a follow-up.

## Test plan 

CI
Private testnet.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
mwtian authored and arun-koshy committed Jun 6, 2024
1 parent b50e744 commit 80643cb
Show file tree
Hide file tree
Showing 10 changed files with 318 additions and 159 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 14 additions & 4 deletions consensus/config/src/parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,13 @@ pub struct TonicParameters {
#[serde(default = "TonicParameters::default_connection_buffer_size")]
pub connection_buffer_size: usize,

/// Message size limits for both requests and responses.
/// Hard message size limit for both requests and responses.
/// This value is higher than strictly necessary, to allow overheads.
/// Message size targets and soft limits are computed based on this value.
///
/// If unspecified, this will default to 8MiB.
/// If unspecified, this will default to 32MiB.
#[serde(default = "TonicParameters::default_message_size_limit")]
pub message_size_limit: usize,
message_size_limit: usize,
}

impl TonicParameters {
Expand All @@ -199,7 +201,15 @@ impl TonicParameters {
}

fn default_message_size_limit() -> usize {
8 << 20
32 << 20
}

/// Returns the size above which a message is treated as excessively large:
/// three quarters of `message_size_limit`, rounded down.
pub fn excessive_message_size(&self) -> usize {
    let limit = self.message_size_limit;
    // Same value as `limit * 3 / 4` for every input, but the intermediate
    // product can no longer overflow `usize` when a very large limit is
    // configured (release builds would silently wrap).
    limit / 4 * 3 + limit % 4 * 3 / 4
}

/// Returns the configured `message_size_limit` unchanged, i.e. the hard
/// message size limit for both requests and responses.
pub fn max_message_size(&self) -> usize {
self.message_size_limit
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ tonic:
secs: 5
nanos: 0
connection_buffer_size: 33554432
message_size_limit: 8388608
message_size_limit: 33554432
1 change: 1 addition & 0 deletions consensus/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dashmap.workspace = true
enum_dispatch.workspace = true
fastcrypto.workspace = true
futures.workspace = true
http.workspace = true
hyper.workspace = true
hyper-rustls.workspace = true
itertools.workspace = true
Expand Down
25 changes: 1 addition & 24 deletions consensus/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use prometheus::{
HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
};

use crate::network::metrics::{NetworkRouteMetrics, QuinnConnectionMetrics};
use crate::network::metrics::{NetworkMetrics, QuinnConnectionMetrics};

// starts from 1μs, 50μs, 100μs...
const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[
Expand Down Expand Up @@ -486,26 +486,3 @@ impl ChannelMetrics {
}
}
}

// Fields for network-agnostic metrics can be added here
/// Metrics shared by the network implementations: the network type in use and
/// one set of route metrics per traffic direction.
pub(crate) struct NetworkMetrics {
/// Gauge vector with a single `type` label ("Type of the network used: anemo or tonic").
pub(crate) network_type: IntGaugeVec,
/// Route metrics built via `NetworkRouteMetrics::new("inbound", ..)`.
pub(crate) inbound: NetworkRouteMetrics,
/// Route metrics built via `NetworkRouteMetrics::new("outbound", ..)`.
pub(crate) outbound: NetworkRouteMetrics,
}

impl NetworkMetrics {
    /// Registers the network metrics on `registry` and returns them.
    ///
    /// # Panics
    ///
    /// Panics if the `network_type` gauge vector cannot be registered.
    pub(crate) fn new(registry: &Registry) -> Self {
        let network_type = register_int_gauge_vec_with_registry!(
            "network_type",
            "Type of the network used: anemo or tonic",
            &["type"],
            registry
        )
        .unwrap();
        let inbound = NetworkRouteMetrics::new("inbound", registry);
        let outbound = NetworkRouteMetrics::new("outbound", registry);

        Self {
            network_type,
            inbound,
            outbound,
        }
    }
}
169 changes: 49 additions & 120 deletions consensus/core/src/network/anemo_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use async_trait::async_trait;
use bytes::Bytes;
use cfg_if::cfg_if;
use consensus_config::{AuthorityIndex, NetworkKeyPair};
use prometheus::HistogramTimer;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast::error::RecvError;
use tracing::{debug, error, warn};
Expand All @@ -34,7 +33,7 @@ use super::{
},
connection_monitor::{AnemoConnectionMonitor, ConnectionMonitorHandle},
epoch_filter::{AllowedEpoch, EPOCH_HEADER_KEY},
metrics::NetworkRouteMetrics,
metrics_layer::{MetricsCallbackMaker, MetricsResponseCallback, SizedRequest, SizedResponse},
BlockStream, NetworkClient, NetworkManager, NetworkService,
};
use crate::{
Expand Down Expand Up @@ -403,10 +402,8 @@ impl<S: NetworkService> NetworkManager<S> for AnemoManager {
}
);
let epoch_string: String = self.context.committee.epoch().to_string();
let inbound_network_metrics =
Arc::new(self.context.metrics.network_metrics.inbound.clone());
let outbound_network_metrics =
Arc::new(self.context.metrics.network_metrics.outbound.clone());
let inbound_network_metrics = self.context.metrics.network_metrics.inbound.clone();
let outbound_network_metrics = self.context.metrics.network_metrics.outbound.clone();
let quinn_connection_metrics = self.context.metrics.quinn_connection_metrics.clone();
let all_peer_ids = self
.context
Expand All @@ -430,7 +427,7 @@ impl<S: NetworkService> NetworkManager<S> for AnemoManager {
.make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
.on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
)
.layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
.layer(CallbackLayer::new(MetricsCallbackMaker::new(
inbound_network_metrics,
self.context.parameters.anemo.excessive_message_size,
)))
Expand All @@ -446,7 +443,7 @@ impl<S: NetworkService> NetworkManager<S> for AnemoManager {
.make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
.on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
)
.layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
.layer(CallbackLayer::new(MetricsCallbackMaker::new(
outbound_network_metrics,
self.context.parameters.anemo.excessive_message_size,
)))
Expand Down Expand Up @@ -576,6 +573,50 @@ impl<S: NetworkService> NetworkManager<S> for AnemoManager {
}
}

// Adapt MetricsCallbackMaker and MetricsResponseCallback to anemo.

/// Exposes the body size and route of an anemo request to the metrics callbacks.
impl SizedRequest for anemo::Request<Bytes> {
    /// Length of the request body.
    fn size(&self) -> usize {
        let payload = self.body();
        payload.len()
    }

    /// Route of the request as an owned `String`.
    fn route(&self) -> String {
        // NOTE(review): `self.route()` is expected to pick the inherent
        // `anemo::Request::route` rather than this trait method — confirm.
        String::from(self.route())
    }
}

/// Exposes the body size and error status of an anemo response to the metrics callbacks.
impl SizedResponse for anemo::Response<Bytes> {
    /// Length of the response body.
    fn size(&self) -> usize {
        let payload = self.body();
        payload.len()
    }

    /// `None` for a successful status, otherwise the status rendered as a string.
    fn error_type(&self) -> Option<String> {
        let failed = !self.status().is_success();
        failed.then(|| self.status().to_string())
    }
}

/// Lets `MetricsCallbackMaker` be used as anemo's `MakeCallbackHandler` by
/// forwarding each request to its `handle_request` method (defined outside this view).
impl MakeCallbackHandler for MetricsCallbackMaker {
type Handler = MetricsResponseCallback;

/// Returns the `MetricsResponseCallback` produced by `handle_request` for `request`.
fn make_handler(&self, request: &anemo::Request<bytes::Bytes>) -> Self::Handler {
self.handle_request(request)
}
}

/// Lets `MetricsResponseCallback` be used as anemo's `ResponseHandler`.
///
/// NOTE(review): both methods call a method of the same name on `self`. This
/// presumably resolves to inherent `on_response` / `on_error` methods defined
/// on `MetricsResponseCallback` elsewhere; if those did not exist the calls
/// would recurse into this impl — confirm.
impl ResponseHandler for MetricsResponseCallback {
/// Forwards the anemo response to `MetricsResponseCallback::on_response`.
fn on_response(self, response: &anemo::Response<bytes::Bytes>) {
self.on_response(response)
}

/// Forwards the error to `MetricsResponseCallback::on_error`.
fn on_error<E>(self, err: &E) {
self.on_error(err)
}
}

/// Network message types.
#[derive(Clone, Serialize, Deserialize)]
pub(crate) struct SendBlockRequest {
Expand Down Expand Up @@ -611,115 +652,3 @@ pub(crate) struct FetchCommitsResponse {
// Serialized SignedBlock that certify the last commit from above.
certifier_blocks: Vec<Bytes>,
}

/// Factory for per-request metrics handlers: holds the route metrics to update
/// and the size threshold used to flag excessively large messages.
#[derive(Clone)]
pub(crate) struct MetricsMakeCallbackHandler {
/// Shared route metrics updated for every request and response.
metrics: Arc<NetworkRouteMetrics>,
/// Size in bytes above which a request or response message is considered excessively large
excessive_message_size: usize,
}

impl MetricsMakeCallbackHandler {
pub fn new(metrics: Arc<NetworkRouteMetrics>, excessive_message_size: usize) -> Self {
Self {
metrics,
excessive_message_size,
}
}
}

impl MakeCallbackHandler for MetricsMakeCallbackHandler {
type Handler = MetricsResponseHandler;

fn make_handler(&self, request: &anemo::Request<bytes::Bytes>) -> Self::Handler {
let route = request.route().to_owned();

self.metrics.requests.with_label_values(&[&route]).inc();
self.metrics
.inflight_requests
.with_label_values(&[&route])
.inc();
let body_len = request.body().len();
self.metrics
.request_size
.with_label_values(&[&route])
.observe(body_len as f64);
if body_len > self.excessive_message_size {
warn!(
"Saw excessively large request with size {body_len} for {route} with peer {:?}",
request.peer_id()
);
self.metrics
.excessive_size_requests
.with_label_values(&[&route])
.inc();
}

let timer = self
.metrics
.request_latency
.with_label_values(&[&route])
.start_timer();

MetricsResponseHandler {
metrics: self.metrics.clone(),
timer,
route,
excessive_message_size: self.excessive_message_size,
}
}
}

/// Per-request state created by `MetricsMakeCallbackHandler::make_handler` and
/// consumed when the response or error for that request is observed.
pub(crate) struct MetricsResponseHandler {
/// Shared route metrics updated when the response or error arrives.
metrics: Arc<NetworkRouteMetrics>,
// The timer is held on to and "observed" once dropped
#[allow(unused)]
timer: HistogramTimer,
/// Route of the originating request, used as the metric label value.
route: String,
/// Body size above which a response is reported as excessively large.
excessive_message_size: usize,
}

/// Records response-side metrics for the request this handler was created for.
impl ResponseHandler for MetricsResponseHandler {
    /// Observes the response body size, warns about and counts an excessively
    /// large body, and counts an error labelled with the numeric status code
    /// when the status is not a success.
    fn on_response(self, response: &anemo::Response<bytes::Bytes>) {
        let route_label = [self.route.as_str()];
        let body_len = response.body().len();

        self.metrics
            .response_size
            .with_label_values(&route_label)
            .observe(body_len as f64);

        let oversized = body_len > self.excessive_message_size;
        if oversized {
            warn!(
                "Saw excessively large response with size {body_len} for {} with peer {:?}",
                self.route,
                response.peer_id()
            );
            self.metrics
                .excessive_size_responses
                .with_label_values(&route_label)
                .inc();
        }

        // Successful responses need no error accounting.
        if response.status().is_success() {
            return;
        }
        let code = response.status().to_u16().to_string();
        self.metrics
            .errors
            .with_label_values(&[self.route.as_str(), code.as_str()])
            .inc();
    }

    /// Counts an error for the route under the "unknown" status label; the
    /// error value itself is not inspected.
    fn on_error<E>(self, _error: &E) {
        let labels = [self.route.as_str(), "unknown"];
        self.metrics.errors.with_label_values(&labels).inc();
    }
}

/// Decrements the in-flight gauge of the handler's route when the handler is dropped.
impl Drop for MetricsResponseHandler {
    fn drop(&mut self) {
        let route_label = [self.route.as_str()];
        let inflight = self.metrics.inflight_requests.with_label_values(&route_label);
        inflight.dec();
    }
}
25 changes: 25 additions & 0 deletions consensus/core/src/network/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,37 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use prometheus::{
register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
register_int_gauge_vec_with_registry, register_int_gauge_with_registry, HistogramVec,
IntCounterVec, IntGauge, IntGaugeVec, Registry,
};

// Fields for network-agnostic metrics can be added here
/// Metrics shared by the network implementations: the network type in use and
/// one reference-counted set of route metrics per traffic direction.
pub(crate) struct NetworkMetrics {
/// Gauge vector with a single `type` label ("Type of the network used: anemo or tonic").
pub(crate) network_type: IntGaugeVec,
/// Route metrics built via `NetworkRouteMetrics::new("inbound", ..)`.
pub(crate) inbound: Arc<NetworkRouteMetrics>,
/// Route metrics built via `NetworkRouteMetrics::new("outbound", ..)`.
pub(crate) outbound: Arc<NetworkRouteMetrics>,
}

impl NetworkMetrics {
    /// Registers the network metrics on `registry` and returns them, with the
    /// per-direction route metrics wrapped in `Arc`.
    ///
    /// # Panics
    ///
    /// Panics if the `network_type` gauge vector cannot be registered.
    pub(crate) fn new(registry: &Registry) -> Self {
        let network_type = register_int_gauge_vec_with_registry!(
            "network_type",
            "Type of the network used: anemo or tonic",
            &["type"],
            registry
        )
        .unwrap();
        let inbound = Arc::new(NetworkRouteMetrics::new("inbound", registry));
        let outbound = Arc::new(NetworkRouteMetrics::new("outbound", registry));

        Self {
            network_type,
            inbound,
            outbound,
        }
    }
}

#[derive(Clone)]
pub(crate) struct QuinnConnectionMetrics {
/// The connection status of known peers. 0 if not connected, 1 if connected.
Expand Down
Loading

0 comments on commit 80643cb

Please sign in to comment.