From 9ac99db122ab16415bcd76045a25b9150c261a72 Mon Sep 17 00:00:00 2001 From: Mingwei Tian Date: Thu, 23 May 2024 16:47:25 -0700 Subject: [PATCH 1/5] Move NetworkMetrics definition --- consensus/core/src/metrics.rs | 25 +-------------------- consensus/core/src/network/anemo_network.rs | 6 ++--- consensus/core/src/network/metrics.rs | 25 +++++++++++++++++++++ 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/consensus/core/src/metrics.rs b/consensus/core/src/metrics.rs index 99ed8641d80e9..422bf91985f35 100644 --- a/consensus/core/src/metrics.rs +++ b/consensus/core/src/metrics.rs @@ -10,7 +10,7 @@ use prometheus::{ HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry, }; -use crate::network::metrics::{NetworkRouteMetrics, QuinnConnectionMetrics}; +use crate::network::metrics::{NetworkMetrics, QuinnConnectionMetrics}; // starts from 1μs, 50μs, 100μs... const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[ @@ -486,26 +486,3 @@ impl ChannelMetrics { } } } - -// Fields for network-agnostic metrics can be added here -pub(crate) struct NetworkMetrics { - pub(crate) network_type: IntGaugeVec, - pub(crate) inbound: NetworkRouteMetrics, - pub(crate) outbound: NetworkRouteMetrics, -} - -impl NetworkMetrics { - pub(crate) fn new(registry: &Registry) -> Self { - Self { - network_type: register_int_gauge_vec_with_registry!( - "network_type", - "Type of the network used: anemo or tonic", - &["type"], - registry - ) - .unwrap(), - inbound: NetworkRouteMetrics::new("inbound", registry), - outbound: NetworkRouteMetrics::new("outbound", registry), - } - } -} diff --git a/consensus/core/src/network/anemo_network.rs b/consensus/core/src/network/anemo_network.rs index d46cc00b80ee6..bbb422bb275d1 100644 --- a/consensus/core/src/network/anemo_network.rs +++ b/consensus/core/src/network/anemo_network.rs @@ -403,10 +403,8 @@ impl NetworkManager for AnemoManager { } ); let epoch_string: String = self.context.committee.epoch().to_string(); - let 
inbound_network_metrics = - Arc::new(self.context.metrics.network_metrics.inbound.clone()); - let outbound_network_metrics = - Arc::new(self.context.metrics.network_metrics.outbound.clone()); + let inbound_network_metrics = self.context.metrics.network_metrics.inbound.clone(); + let outbound_network_metrics = self.context.metrics.network_metrics.outbound.clone(); let quinn_connection_metrics = self.context.metrics.quinn_connection_metrics.clone(); let all_peer_ids = self .context diff --git a/consensus/core/src/network/metrics.rs b/consensus/core/src/network/metrics.rs index f9c9278feccff..ebc972079b1ec 100644 --- a/consensus/core/src/network/metrics.rs +++ b/consensus/core/src/network/metrics.rs @@ -1,12 +1,37 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::sync::Arc; + use prometheus::{ register_histogram_vec_with_registry, register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, register_int_gauge_with_registry, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry, }; +// Fields for network-agnostic metrics can be added here +pub(crate) struct NetworkMetrics { + pub(crate) network_type: IntGaugeVec, + pub(crate) inbound: Arc, + pub(crate) outbound: Arc, +} + +impl NetworkMetrics { + pub(crate) fn new(registry: &Registry) -> Self { + Self { + network_type: register_int_gauge_vec_with_registry!( + "network_type", + "Type of the network used: anemo or tonic", + &["type"], + registry + ) + .unwrap(), + inbound: Arc::new(NetworkRouteMetrics::new("inbound", registry)), + outbound: Arc::new(NetworkRouteMetrics::new("outbound", registry)), + } + } +} + #[derive(Clone)] pub(crate) struct QuinnConnectionMetrics { /// The connection status of known peers. 0 if not connected, 1 if connected. From 42c6fc1dbf27f7d56265bbb011791619b5c5abeb Mon Sep 17 00:00:00 2001 From: Mingwei Tian Date: Tue, 21 May 2024 16:47:41 -0700 Subject: [PATCH 2/5] Abstract metrics layer with anemo support. 
--- consensus/core/src/network/anemo_network.rs | 163 ++++++-------------- consensus/core/src/network/metrics_layer.rs | 122 +++++++++++++++ consensus/core/src/network/mod.rs | 1 + 3 files changed, 170 insertions(+), 116 deletions(-) create mode 100644 consensus/core/src/network/metrics_layer.rs diff --git a/consensus/core/src/network/anemo_network.rs b/consensus/core/src/network/anemo_network.rs index bbb422bb275d1..3ce71492d317b 100644 --- a/consensus/core/src/network/anemo_network.rs +++ b/consensus/core/src/network/anemo_network.rs @@ -22,7 +22,6 @@ use async_trait::async_trait; use bytes::Bytes; use cfg_if::cfg_if; use consensus_config::{AuthorityIndex, NetworkKeyPair}; -use prometheus::HistogramTimer; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast::error::RecvError; use tracing::{debug, error, warn}; @@ -34,7 +33,7 @@ use super::{ }, connection_monitor::{AnemoConnectionMonitor, ConnectionMonitorHandle}, epoch_filter::{AllowedEpoch, EPOCH_HEADER_KEY}, - metrics::NetworkRouteMetrics, + metrics_layer::{MetricsCallbackMaker, MetricsResponseCallback, SizedRequest, SizedResponse}, BlockStream, NetworkClient, NetworkManager, NetworkService, }; use crate::{ @@ -428,7 +427,7 @@ impl NetworkManager for AnemoManager { .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO)) .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)), ) - .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new( + .layer(CallbackLayer::new(MetricsCallbackMaker::new( inbound_network_metrics, self.context.parameters.anemo.excessive_message_size, ))) @@ -444,7 +443,7 @@ impl NetworkManager for AnemoManager { .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO)) .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)), ) - .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new( + .layer(CallbackLayer::new(MetricsCallbackMaker::new( outbound_network_metrics, self.context.parameters.anemo.excessive_message_size, ))) @@ -574,6 +573,50 
@@ impl NetworkManager for AnemoManager { } } +// Adapt MetricsCallbackMaker and MetricsResponseCallback to anemo. + +impl SizedRequest for anemo::Request<Bytes> { + fn size(&self) -> usize { + self.body().len() + } + + fn route(&self) -> String { + self.route().to_string() + } +} + +impl SizedResponse for anemo::Response<Bytes> { + fn size(&self) -> usize { + self.body().len() + } + + fn error_type(&self) -> Option<String> { + if self.status().is_success() { + None + } else { + Some(self.status().to_string()) + } + } +} + +impl MakeCallbackHandler for MetricsCallbackMaker { + type Handler = MetricsResponseCallback; + + fn make_handler(&self, request: &anemo::Request<Bytes>) -> Self::Handler { + self.handle_request(request) + } +} + +impl ResponseHandler for MetricsResponseCallback { + fn on_response(self, response: &anemo::Response<Bytes>) { + self.on_response(response) + } + + fn on_error<E>(self, err: &E) { + self.on_error(err) + } +} + /// Network message types. #[derive(Clone, Serialize, Deserialize)] pub(crate) struct SendBlockRequest { @@ -609,115 +652,3 @@ pub(crate) struct FetchCommitsResponse { // Serialized SignedBlock that certify the last commit from above. 
certifier_blocks: Vec, } - -#[derive(Clone)] -pub(crate) struct MetricsMakeCallbackHandler { - metrics: Arc, - /// Size in bytes above which a request or response message is considered excessively large - excessive_message_size: usize, -} - -impl MetricsMakeCallbackHandler { - pub fn new(metrics: Arc, excessive_message_size: usize) -> Self { - Self { - metrics, - excessive_message_size, - } - } -} - -impl MakeCallbackHandler for MetricsMakeCallbackHandler { - type Handler = MetricsResponseHandler; - - fn make_handler(&self, request: &anemo::Request) -> Self::Handler { - let route = request.route().to_owned(); - - self.metrics.requests.with_label_values(&[&route]).inc(); - self.metrics - .inflight_requests - .with_label_values(&[&route]) - .inc(); - let body_len = request.body().len(); - self.metrics - .request_size - .with_label_values(&[&route]) - .observe(body_len as f64); - if body_len > self.excessive_message_size { - warn!( - "Saw excessively large request with size {body_len} for {route} with peer {:?}", - request.peer_id() - ); - self.metrics - .excessive_size_requests - .with_label_values(&[&route]) - .inc(); - } - - let timer = self - .metrics - .request_latency - .with_label_values(&[&route]) - .start_timer(); - - MetricsResponseHandler { - metrics: self.metrics.clone(), - timer, - route, - excessive_message_size: self.excessive_message_size, - } - } -} - -pub(crate) struct MetricsResponseHandler { - metrics: Arc, - // The timer is held on to and "observed" once dropped - #[allow(unused)] - timer: HistogramTimer, - route: String, - excessive_message_size: usize, -} - -impl ResponseHandler for MetricsResponseHandler { - fn on_response(self, response: &anemo::Response) { - let body_len = response.body().len(); - self.metrics - .response_size - .with_label_values(&[&self.route]) - .observe(body_len as f64); - if body_len > self.excessive_message_size { - warn!( - "Saw excessively large response with size {body_len} for {} with peer {:?}", - self.route, - 
response.peer_id() - ); - self.metrics - .excessive_size_responses - .with_label_values(&[&self.route]) - .inc(); - } - - if !response.status().is_success() { - let status = response.status().to_u16().to_string(); - self.metrics - .errors - .with_label_values(&[&self.route, &status]) - .inc(); - } - } - - fn on_error(self, _error: &E) { - self.metrics - .errors - .with_label_values(&[&self.route, "unknown"]) - .inc(); - } -} - -impl Drop for MetricsResponseHandler { - fn drop(&mut self) { - self.metrics - .inflight_requests - .with_label_values(&[&self.route]) - .dec(); - } -} diff --git a/consensus/core/src/network/metrics_layer.rs b/consensus/core/src/network/metrics_layer.rs new file mode 100644 index 0000000000000..83faa9bbcb8de --- /dev/null +++ b/consensus/core/src/network/metrics_layer.rs @@ -0,0 +1,122 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Arc; + +use prometheus::HistogramTimer; + +use super::metrics::NetworkRouteMetrics; + +/// Tower layer adapters that allow specifying callbacks for request and response handling +/// exist for both anemo and http. So the metrics layer implementation can be reused across +/// networking stacks. + +pub(crate) trait SizedRequest { + fn size(&self) -> usize; + fn route(&self) -> String; +} + +pub(crate) trait SizedResponse { + fn size(&self) -> usize; + fn error_type(&self) -> Option; +} + +#[derive(Clone)] +pub(crate) struct MetricsCallbackMaker { + metrics: Arc, + /// Size in bytes above which a request or response message is considered excessively large + excessive_message_size: usize, +} + +impl MetricsCallbackMaker { + pub(crate) fn new(metrics: Arc, excessive_message_size: usize) -> Self { + Self { + metrics, + excessive_message_size, + } + } + + // Update request metrics. And create a callback that should be called on response. 
+ pub(crate) fn handle_request(&self, request: &dyn SizedRequest) -> MetricsResponseCallback { + let route = request.route(); + + self.metrics.requests.with_label_values(&[&route]).inc(); + self.metrics + .inflight_requests + .with_label_values(&[&route]) + .inc(); + let request_size = request.size(); + self.metrics + .request_size + .with_label_values(&[&route]) + .observe(request_size as f64); + if request_size > self.excessive_message_size { + self.metrics + .excessive_size_requests + .with_label_values(&[&route]) + .inc(); + } + + let timer = self + .metrics + .request_latency + .with_label_values(&[&route]) + .start_timer(); + + MetricsResponseCallback { + metrics: self.metrics.clone(), + timer, + route, + excessive_message_size: self.excessive_message_size, + } + } +} + +pub(crate) struct MetricsResponseCallback { + metrics: Arc<NetworkRouteMetrics>, + // The timer is held on to and "observed" once dropped + #[allow(unused)] + timer: HistogramTimer, + route: String, + excessive_message_size: usize, +} + +impl MetricsResponseCallback { + // Update response metrics. 
+ pub(crate) fn on_response(self, response: &dyn SizedResponse) { + let response_size = response.size(); + self.metrics + .response_size + .with_label_values(&[&self.route]) + .observe(response_size as f64); + if response_size > self.excessive_message_size { + self.metrics + .excessive_size_responses + .with_label_values(&[&self.route]) + .inc(); + } + + if let Some(err) = response.error_type() { + self.metrics + .errors + .with_label_values(&[&self.route, &err]) + .inc(); + } + } + + pub(crate) fn on_error(self, _error: &E) { + self.metrics + .errors + .with_label_values(&[&self.route, "unknown"]) + .inc(); + } +} + +impl Drop for MetricsResponseCallback { + fn drop(&mut self) { + self.metrics + .inflight_requests + .with_label_values(&[&self.route]) + .dec(); + } +} diff --git a/consensus/core/src/network/mod.rs b/consensus/core/src/network/mod.rs index 24539d1ef9813..7ad87a8abf121 100644 --- a/consensus/core/src/network/mod.rs +++ b/consensus/core/src/network/mod.rs @@ -45,6 +45,7 @@ pub(crate) mod anemo_network; pub(crate) mod connection_monitor; pub(crate) mod epoch_filter; pub(crate) mod metrics; +mod metrics_layer; #[cfg(test)] mod network_tests; #[cfg(test)] From bb15ab7aa98426d082f3f9552615e42d90a63c2c Mon Sep 17 00:00:00 2001 From: Mingwei Tian Date: Fri, 24 May 2024 09:09:10 -0700 Subject: [PATCH 3/5] Tonic metrics without size --- Cargo.lock | 1 + consensus/config/src/parameters.rs | 6 +- consensus/core/Cargo.toml | 1 + consensus/core/src/network/metrics_layer.rs | 20 ++-- consensus/core/src/network/tonic_network.rs | 101 ++++++++++++++++++-- 5 files changed, 114 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index df8d287b72a85..c99be0cacfa1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2604,6 +2604,7 @@ dependencies = [ "enum_dispatch", "fastcrypto", "futures", + "http", "hyper", "hyper-rustls 0.24.0", "itertools 0.10.5", diff --git a/consensus/config/src/parameters.rs b/consensus/config/src/parameters.rs index 
c21df9a058611..6e1e6e9fe3362 100644 --- a/consensus/config/src/parameters.rs +++ b/consensus/config/src/parameters.rs @@ -199,7 +199,11 @@ impl TonicParameters { } fn default_message_size_limit() -> usize { - 8 << 20 + 32 << 20 + } + + pub fn excessive_message_size(&self) -> usize { + self.message_size_limit * 3 / 4 } } diff --git a/consensus/core/Cargo.toml b/consensus/core/Cargo.toml index 11314ba9e14a6..ae558c94e7563 100644 --- a/consensus/core/Cargo.toml +++ b/consensus/core/Cargo.toml @@ -21,6 +21,7 @@ dashmap.workspace = true enum_dispatch.workspace = true fastcrypto.workspace = true futures.workspace = true +http.workspace = true hyper.workspace = true hyper-rustls.workspace = true itertools.workspace = true diff --git a/consensus/core/src/network/metrics_layer.rs b/consensus/core/src/network/metrics_layer.rs index 83faa9bbcb8de..5b54eabcd7191 100644 --- a/consensus/core/src/network/metrics_layer.rs +++ b/consensus/core/src/network/metrics_layer.rs @@ -46,10 +46,12 @@ impl MetricsCallbackMaker { .with_label_values(&[&route]) .inc(); let request_size = request.size(); - self.metrics - .request_size - .with_label_values(&[&route]) - .observe(request_size as f64); + if request_size > 0 { + self.metrics + .request_size + .with_label_values(&[&route]) + .observe(request_size as f64); + } if request_size > self.excessive_message_size { self.metrics .excessive_size_requests @@ -85,10 +87,12 @@ impl MetricsResponseCallback { // Update response metrics. 
pub(crate) fn on_response(self, response: &dyn SizedResponse) { let response_size = response.size(); - self.metrics - .response_size - .with_label_values(&[&self.route]) - .observe(response_size as f64); + if response_size > 0 { + self.metrics + .response_size + .with_label_values(&[&self.route]) + .observe(response_size as f64); + } if response_size > self.excessive_message_size { self.metrics .excessive_size_responses diff --git a/consensus/core/src/network/tonic_network.rs b/consensus/core/src/network/tonic_network.rs index 61e11e82115b4..2da19432ef6c6 100644 --- a/consensus/core/src/network/tonic_network.rs +++ b/consensus/core/src/network/tonic_network.rs @@ -16,7 +16,11 @@ use consensus_config::{AuthorityIndex, NetworkKeyPair, NetworkPublicKey}; use futures::{stream, Stream, StreamExt as _}; use hyper::server::conn::Http; use mysten_common::sync::notify_once::NotifyOnce; -use mysten_network::{multiaddr::Protocol, Multiaddr}; +use mysten_network::{ + callback::{CallbackLayer, MakeCallbackHandler, ResponseHandler}, + multiaddr::Protocol, + Multiaddr, +}; use parking_lot::RwLock; use tokio::{ pin, @@ -25,14 +29,15 @@ use tokio::{ }; use tokio_rustls::TlsAcceptor; use tokio_stream::{iter, Iter}; -use tonic::{ - transport::{Channel, Server}, - Request, Response, Streaming, +use tonic::{transport::Server, Request, Response, Streaming}; +use tower_http::{ + trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer}, + ServiceBuilderExt, }; -use tower_http::ServiceBuilderExt; use tracing::{debug, error, info, trace, warn}; use super::{ + metrics_layer::{MetricsCallbackMaker, MetricsResponseCallback, SizedRequest, SizedResponse}, tonic_gen::{ consensus_service_client::ConsensusServiceClient, consensus_service_server::ConsensusService, @@ -240,6 +245,15 @@ impl NetworkClient for TonicClient { } } +// Tonic channel wrapped with layers. 
+type Channel = mysten_network::callback::Callback< + tower_http::trace::Trace< + tonic::transport::Channel, + tower_http::classify::SharedClassifier, + >, + MetricsCallbackMaker, +>; + /// Manages a pool of connections to peers to avoid constantly reconnecting, /// which can be expensive. struct ChannelPool { @@ -276,7 +290,7 @@ impl ChannelPool { let address = format!("https://{address}"); let config = &self.context.parameters.tonic; let buffer_size = config.connection_buffer_size; - let endpoint = Channel::from_shared(address.clone()) + let endpoint = tonic::transport::Channel::from_shared(address.clone()) .unwrap() .connect_timeout(timeout) .initial_connection_window_size(Some(buffer_size as u32)) @@ -316,6 +330,18 @@ impl ChannelPool { }; trace!("Connected to {address}"); + let channel = tower::ServiceBuilder::new() + .layer(CallbackLayer::new(MetricsCallbackMaker::new( + self.context.metrics.network_metrics.outbound.clone(), + self.context.parameters.tonic.excessive_message_size(), + ))) + .layer( + TraceLayer::new_for_grpc() + .make_span_with(DefaultMakeSpan::new().level(tracing::Level::TRACE)) + .on_failure(DefaultOnFailure::new().level(tracing::Level::DEBUG)), + ) + .service(channel); + let mut channels = self.channels.write(); // There should not be many concurrent attempts at connecting to the same peer. 
let channel = channels.entry(peer).or_insert(channel); @@ -531,6 +557,11 @@ impl NetworkManager for TonicManager { let config = &self.context.parameters.tonic; let consensus_service = Server::builder() + .layer( + TraceLayer::new_for_grpc() + .make_span_with(DefaultMakeSpan::new().level(tracing::Level::TRACE)) + .on_failure(DefaultOnFailure::new().level(tracing::Level::DEBUG)), + ) .initial_connection_window_size(64 << 20) .initial_stream_window_size(32 << 20) .http2_keepalive_interval(Some(config.keepalive_interval)) @@ -543,6 +574,9 @@ impl NetworkManager for TonicManager { ) .into_service(); + let inbound_metrics = self.context.metrics.network_metrics.inbound.clone(); + let excessive_message_size = self.context.parameters.tonic.excessive_message_size(); + let mut http = Http::new(); http.http2_only(true); let http = Arc::new(http); @@ -649,6 +683,7 @@ impl NetworkManager for TonicManager { let tls_acceptor = tls_acceptor.clone(); let consensus_service = consensus_service.clone(); + let inbound_metrics = inbound_metrics.clone(); let http = http.clone(); let connections_info = connections_info.clone(); let shutdown_notif = shutdown_notif.clone(); @@ -698,6 +733,10 @@ impl NetworkManager for TonicManager { // NOTE: the PeerInfo extension is copied to every request served. // If PeerInfo starts to contain complex values, it should be wrapped in an Arc<>. .add_extension(PeerInfo { authority_index }) + .layer(CallbackLayer::new(MetricsCallbackMaker::new( + inbound_metrics, + excessive_message_size, + ))) .service(consensus_service.clone()); pin! { @@ -827,6 +866,56 @@ struct PeerInfo { authority_index: AuthorityIndex, } +// Adapt MetricsCallbackMaker and MetricsResponseCallback to http. + +impl SizedRequest for http::request::Parts { + fn size(&self) -> usize { + // TODO: implement this. 
+ 0 + } + + fn route(&self) -> String { + let path = self.uri.path(); + path.rsplit_once('/') + .map(|(_, route)| route) + .unwrap_or("unknown") + .to_string() + } +} + +impl SizedResponse for http::response::Parts { + fn size(&self) -> usize { + // TODO: implement this. + 0 + } + + fn error_type(&self) -> Option { + if self.status.is_success() { + None + } else { + Some(self.status.to_string()) + } + } +} + +impl MakeCallbackHandler for MetricsCallbackMaker { + type Handler = MetricsResponseCallback; + + fn make_handler(&self, request: &http::request::Parts) -> Self::Handler { + self.handle_request(request) + } +} + +impl ResponseHandler for MetricsResponseCallback { + fn on_response(self, response: &http::response::Parts) { + self.on_response(response) + } + + fn on_error(self, err: &E) { + self.on_error(err) + } +} + /// Network message types. #[derive(Clone, prost::Message)] pub(crate) struct SendBlockRequest { From 09d3a15172f23f1dfa3d75089f701947646764e5 Mon Sep 17 00:00:00 2001 From: Mingwei Tian Date: Fri, 24 May 2024 14:40:09 -0700 Subject: [PATCH 4/5] . --- consensus/config/src/parameters.rs | 6 +++++- .../tests/snapshots/parameters_test__parameters.snap | 2 +- consensus/core/src/network/tonic_network.rs | 8 ++++---- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/consensus/config/src/parameters.rs b/consensus/config/src/parameters.rs index 6e1e6e9fe3362..0a6ec01f3e4de 100644 --- a/consensus/config/src/parameters.rs +++ b/consensus/config/src/parameters.rs @@ -186,7 +186,7 @@ pub struct TonicParameters { /// /// If unspecified, this will default to 8MiB. 
#[serde(default = "TonicParameters::default_message_size_limit")] - pub message_size_limit: usize, + message_size_limit: usize, } impl TonicParameters { @@ -205,6 +205,10 @@ impl TonicParameters { pub fn excessive_message_size(&self) -> usize { self.message_size_limit * 3 / 4 } + + pub fn max_message_size(&self) -> usize { + self.message_size_limit + } } impl Default for TonicParameters { diff --git a/consensus/config/tests/snapshots/parameters_test__parameters.snap b/consensus/config/tests/snapshots/parameters_test__parameters.snap index 61d9085c26426..65e1ddbec609b 100644 --- a/consensus/config/tests/snapshots/parameters_test__parameters.snap +++ b/consensus/config/tests/snapshots/parameters_test__parameters.snap @@ -24,4 +24,4 @@ tonic: secs: 5 nanos: 0 connection_buffer_size: 33554432 - message_size_limit: 8388608 + message_size_limit: 33554432 diff --git a/consensus/core/src/network/tonic_network.rs b/consensus/core/src/network/tonic_network.rs index 2da19432ef6c6..cff9cb87e0d07 100644 --- a/consensus/core/src/network/tonic_network.rs +++ b/consensus/core/src/network/tonic_network.rs @@ -94,8 +94,8 @@ impl TonicClient { .get_channel(self.network_keypair.clone(), peer, timeout) .await?; Ok(ConsensusServiceClient::new(channel) - .max_encoding_message_size(config.message_size_limit) - .max_decoding_message_size(config.message_size_limit)) + .max_encoding_message_size(config.max_message_size()) + .max_decoding_message_size(config.max_message_size())) } } @@ -569,8 +569,8 @@ impl NetworkManager for TonicManager { // tcp keepalive is unsupported by msim .add_service( ConsensusServiceServer::new(service) - .max_encoding_message_size(config.message_size_limit) - .max_decoding_message_size(config.message_size_limit), + .max_encoding_message_size(config.max_message_size()) + .max_decoding_message_size(config.max_message_size()), ) .into_service(); From 94d2eeb4ccc539fe54e2d7cd3a557b16226895b4 Mon Sep 17 00:00:00 2001 From: Mingwei Tian Date: Tue, 28 May 2024 10:11:02 
-0700 Subject: [PATCH 5/5] Update message_size_limit docs for the 32MiB default --- consensus/config/src/parameters.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/consensus/config/src/parameters.rs b/consensus/config/src/parameters.rs index 0a6ec01f3e4de..d2c8001e47a31 100644 --- a/consensus/config/src/parameters.rs +++ b/consensus/config/src/parameters.rs @@ -182,9 +182,11 @@ pub struct TonicParameters { #[serde(default = "TonicParameters::default_connection_buffer_size")] pub connection_buffer_size: usize, - /// Message size limits for both requests and responses. + /// Hard message size limit for both requests and responses. + /// This value is higher than strictly necessary, to allow for overhead. + /// Message size targets and soft limits are computed based on this value. /// - /// If unspecified, this will default to 8MiB. + /// If unspecified, this will default to 32MiB. #[serde(default = "TonicParameters::default_message_size_limit")] message_size_limit: usize, }