Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Consensus] add tracing and basic metrics to tonic #17921

Merged
merged 5 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 14 additions & 4 deletions consensus/config/src/parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,13 @@ pub struct TonicParameters {
#[serde(default = "TonicParameters::default_connection_buffer_size")]
pub connection_buffer_size: usize,

/// Message size limits for both requests and responses.
/// Hard message size limit for both requests and responses.
/// This value is higher than strictly necessary, to allow overheads.
/// Message size targets and soft limits are computed based on this value.
///
/// If unspecified, this will default to 8MiB.
/// If unspecified, this will default to 32MiB.
#[serde(default = "TonicParameters::default_message_size_limit")]
mwtian marked this conversation as resolved.
Show resolved Hide resolved
pub message_size_limit: usize,
message_size_limit: usize,
}

impl TonicParameters {
Expand All @@ -199,7 +201,15 @@ impl TonicParameters {
}

fn default_message_size_limit() -> usize {
8 << 20
32 << 20
}

pub fn excessive_message_size(&self) -> usize {
self.message_size_limit * 3 / 4
}

pub fn max_message_size(&self) -> usize {
self.message_size_limit
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ tonic:
secs: 5
nanos: 0
connection_buffer_size: 33554432
message_size_limit: 8388608
message_size_limit: 33554432
1 change: 1 addition & 0 deletions consensus/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dashmap.workspace = true
enum_dispatch.workspace = true
fastcrypto.workspace = true
futures.workspace = true
http.workspace = true
hyper.workspace = true
hyper-rustls.workspace = true
itertools.workspace = true
Expand Down
25 changes: 1 addition & 24 deletions consensus/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use prometheus::{
HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
};

use crate::network::metrics::{NetworkRouteMetrics, QuinnConnectionMetrics};
use crate::network::metrics::{NetworkMetrics, QuinnConnectionMetrics};

// starts from 1μs, 50μs, 100μs...
const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[
Expand Down Expand Up @@ -486,26 +486,3 @@ impl ChannelMetrics {
}
}
}

// Fields for network-agnostic metrics can be added here
pub(crate) struct NetworkMetrics {
pub(crate) network_type: IntGaugeVec,
pub(crate) inbound: NetworkRouteMetrics,
pub(crate) outbound: NetworkRouteMetrics,
}

impl NetworkMetrics {
pub(crate) fn new(registry: &Registry) -> Self {
Self {
network_type: register_int_gauge_vec_with_registry!(
"network_type",
"Type of the network used: anemo or tonic",
&["type"],
registry
)
.unwrap(),
inbound: NetworkRouteMetrics::new("inbound", registry),
outbound: NetworkRouteMetrics::new("outbound", registry),
}
}
}
169 changes: 49 additions & 120 deletions consensus/core/src/network/anemo_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use async_trait::async_trait;
use bytes::Bytes;
use cfg_if::cfg_if;
use consensus_config::{AuthorityIndex, NetworkKeyPair};
use prometheus::HistogramTimer;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast::error::RecvError;
use tracing::{debug, error, warn};
Expand All @@ -34,7 +33,7 @@ use super::{
},
connection_monitor::{AnemoConnectionMonitor, ConnectionMonitorHandle},
epoch_filter::{AllowedEpoch, EPOCH_HEADER_KEY},
metrics::NetworkRouteMetrics,
metrics_layer::{MetricsCallbackMaker, MetricsResponseCallback, SizedRequest, SizedResponse},
BlockStream, NetworkClient, NetworkManager, NetworkService,
};
use crate::{
Expand Down Expand Up @@ -403,10 +402,8 @@ impl<S: NetworkService> NetworkManager<S> for AnemoManager {
}
);
let epoch_string: String = self.context.committee.epoch().to_string();
let inbound_network_metrics =
Arc::new(self.context.metrics.network_metrics.inbound.clone());
let outbound_network_metrics =
Arc::new(self.context.metrics.network_metrics.outbound.clone());
let inbound_network_metrics = self.context.metrics.network_metrics.inbound.clone();
let outbound_network_metrics = self.context.metrics.network_metrics.outbound.clone();
let quinn_connection_metrics = self.context.metrics.quinn_connection_metrics.clone();
let all_peer_ids = self
.context
Expand All @@ -430,7 +427,7 @@ impl<S: NetworkService> NetworkManager<S> for AnemoManager {
.make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
.on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
)
.layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
.layer(CallbackLayer::new(MetricsCallbackMaker::new(
inbound_network_metrics,
self.context.parameters.anemo.excessive_message_size,
)))
Expand All @@ -446,7 +443,7 @@ impl<S: NetworkService> NetworkManager<S> for AnemoManager {
.make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
.on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
)
.layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
.layer(CallbackLayer::new(MetricsCallbackMaker::new(
outbound_network_metrics,
self.context.parameters.anemo.excessive_message_size,
)))
Expand Down Expand Up @@ -576,6 +573,50 @@ impl<S: NetworkService> NetworkManager<S> for AnemoManager {
}
}

// Adapt MetricsCallbackMaker and MetricsResponseCallback to anemo.

impl SizedRequest for anemo::Request<Bytes> {
fn size(&self) -> usize {
self.body().len()
}

fn route(&self) -> String {
self.route().to_string()
}
}

impl SizedResponse for anemo::Response<Bytes> {
fn size(&self) -> usize {
self.body().len()
}

fn error_type(&self) -> Option<String> {
if self.status().is_success() {
None
} else {
Some(self.status().to_string())
}
}
}

impl MakeCallbackHandler for MetricsCallbackMaker {
type Handler = MetricsResponseCallback;

fn make_handler(&self, request: &anemo::Request<bytes::Bytes>) -> Self::Handler {
self.handle_request(request)
}
}

impl ResponseHandler for MetricsResponseCallback {
fn on_response(self, response: &anemo::Response<bytes::Bytes>) {
self.on_response(response)
}

fn on_error<E>(self, err: &E) {
self.on_error(err)
}
}

/// Network message types.
#[derive(Clone, Serialize, Deserialize)]
pub(crate) struct SendBlockRequest {
Expand Down Expand Up @@ -611,115 +652,3 @@ pub(crate) struct FetchCommitsResponse {
// Serialized SignedBlock that certify the last commit from above.
certifier_blocks: Vec<Bytes>,
}

#[derive(Clone)]
pub(crate) struct MetricsMakeCallbackHandler {
metrics: Arc<NetworkRouteMetrics>,
/// Size in bytes above which a request or response message is considered excessively large
excessive_message_size: usize,
}

impl MetricsMakeCallbackHandler {
pub fn new(metrics: Arc<NetworkRouteMetrics>, excessive_message_size: usize) -> Self {
Self {
metrics,
excessive_message_size,
}
}
}

impl MakeCallbackHandler for MetricsMakeCallbackHandler {
type Handler = MetricsResponseHandler;

fn make_handler(&self, request: &anemo::Request<bytes::Bytes>) -> Self::Handler {
let route = request.route().to_owned();

self.metrics.requests.with_label_values(&[&route]).inc();
self.metrics
.inflight_requests
.with_label_values(&[&route])
.inc();
let body_len = request.body().len();
self.metrics
.request_size
.with_label_values(&[&route])
.observe(body_len as f64);
if body_len > self.excessive_message_size {
warn!(
"Saw excessively large request with size {body_len} for {route} with peer {:?}",
request.peer_id()
);
self.metrics
.excessive_size_requests
.with_label_values(&[&route])
.inc();
}

let timer = self
.metrics
.request_latency
.with_label_values(&[&route])
.start_timer();

MetricsResponseHandler {
metrics: self.metrics.clone(),
timer,
route,
excessive_message_size: self.excessive_message_size,
}
}
}

pub(crate) struct MetricsResponseHandler {
metrics: Arc<NetworkRouteMetrics>,
// The timer is held on to and "observed" once dropped
#[allow(unused)]
timer: HistogramTimer,
route: String,
excessive_message_size: usize,
}

impl ResponseHandler for MetricsResponseHandler {
fn on_response(self, response: &anemo::Response<bytes::Bytes>) {
let body_len = response.body().len();
self.metrics
.response_size
.with_label_values(&[&self.route])
.observe(body_len as f64);
if body_len > self.excessive_message_size {
warn!(
"Saw excessively large response with size {body_len} for {} with peer {:?}",
self.route,
response.peer_id()
);
self.metrics
.excessive_size_responses
.with_label_values(&[&self.route])
.inc();
}

if !response.status().is_success() {
let status = response.status().to_u16().to_string();
self.metrics
.errors
.with_label_values(&[&self.route, &status])
.inc();
}
}

fn on_error<E>(self, _error: &E) {
self.metrics
.errors
.with_label_values(&[&self.route, "unknown"])
.inc();
}
}

impl Drop for MetricsResponseHandler {
fn drop(&mut self) {
self.metrics
.inflight_requests
.with_label_values(&[&self.route])
.dec();
}
}
25 changes: 25 additions & 0 deletions consensus/core/src/network/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,37 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use prometheus::{
register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
register_int_gauge_vec_with_registry, register_int_gauge_with_registry, HistogramVec,
IntCounterVec, IntGauge, IntGaugeVec, Registry,
};

// Fields for network-agnostic metrics can be added here
pub(crate) struct NetworkMetrics {
pub(crate) network_type: IntGaugeVec,
pub(crate) inbound: Arc<NetworkRouteMetrics>,
pub(crate) outbound: Arc<NetworkRouteMetrics>,
}

impl NetworkMetrics {
pub(crate) fn new(registry: &Registry) -> Self {
Self {
network_type: register_int_gauge_vec_with_registry!(
"network_type",
"Type of the network used: anemo or tonic",
&["type"],
registry
)
.unwrap(),
inbound: Arc::new(NetworkRouteMetrics::new("inbound", registry)),
outbound: Arc::new(NetworkRouteMetrics::new("outbound", registry)),
}
}
}

#[derive(Clone)]
pub(crate) struct QuinnConnectionMetrics {
/// The connection status of known peers. 0 if not connected, 1 if connected.
Expand Down
Loading
Loading