diff --git a/crates/topos-sequencer-subnet-runtime/src/proxy.rs b/crates/topos-sequencer-subnet-runtime/src/proxy.rs index 5039698bb..79fd798af 100644 --- a/crates/topos-sequencer-subnet-runtime/src/proxy.rs +++ b/crates/topos-sequencer-subnet-runtime/src/proxy.rs @@ -68,7 +68,7 @@ impl SubnetRuntimeProxy { address: {}, ", &config.http_endpoint, &config.ws_endpoint, &config.subnet_contract_address ); - let (command_sender, mut command_rcv) = mpsc::channel::(256); + let (command_sender, mut command_rcv) = mpsc::channel::(4096); let ws_runtime_endpoint = config.ws_endpoint.clone(); let http_runtime_endpoint = config.http_endpoint.clone(); let subnet_contract_address = Arc::new(config.subnet_contract_address.clone()); diff --git a/crates/topos-tce-api/src/grpc/mod.rs b/crates/topos-tce-api/src/grpc/mod.rs index ae6f132e4..dc0a59dbb 100644 --- a/crates/topos-tce-api/src/grpc/mod.rs +++ b/crates/topos-tce-api/src/grpc/mod.rs @@ -29,7 +29,7 @@ pub(crate) mod console; #[cfg(test)] mod tests; -const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 100; +const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 2024; pub(crate) mod builder; pub(crate) mod messaging; @@ -272,7 +272,7 @@ impl ApiService for TceGrpcService { .map(move |message| Self::parse_stream(message, stream_id)) .boxed(); - let (command_sender, command_receiver) = mpsc::channel(2048); + let (command_sender, command_receiver) = mpsc::channel(204_800); let (outbound_stream, rx) = mpsc::channel::, OutboundMessage), Status>>( DEFAULT_CHANNEL_STREAM_CAPACITY, diff --git a/crates/topos-tce-api/src/lib.rs b/crates/topos-tce-api/src/lib.rs index 41808c13a..d26100175 100644 --- a/crates/topos-tce-api/src/lib.rs +++ b/crates/topos-tce-api/src/lib.rs @@ -9,10 +9,10 @@ mod tests; pub(crate) mod constants { /// Constant size of every channel in the crate - pub(crate) const CHANNEL_SIZE: usize = 2048; + pub(crate) const CHANNEL_SIZE: usize = 10_000; /// Constant size of every transient stream channel in the crate - pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 1024; + pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 4_096; } pub use runtime::{ error::RuntimeError, Runtime, RuntimeClient, RuntimeCommand, RuntimeContext, RuntimeEvent, diff --git a/crates/topos-tce-broadcast/src/constant.rs b/crates/topos-tce-broadcast/src/constant.rs index 30e46f24d..dd792e5b2 100644 --- a/crates/topos-tce-broadcast/src/constant.rs +++ b/crates/topos-tce-broadcast/src/constant.rs @@ -6,7 +6,7 @@ lazy_static! { std::env::var("TOPOS_DOUBLE_ECHO_COMMAND_CHANNEL_SIZE") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(2048); + .unwrap_or(20_480); /// Size of the channel between double echo and the task manager pub static ref BROADCAST_TASK_MANAGER_CHANNEL_SIZE: usize = std::env::var("TOPOS_BROADCAST_TASK_MANAGER_CHANNEL_SIZE") @@ -18,10 +18,10 @@ lazy_static! { std::env::var("TOPOS_PROTOCOL_CHANNEL_SIZE") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(2048); + .unwrap_or(20_480); /// Capacity alert threshold for the double echo command channel pub static ref COMMAND_CHANNEL_CAPACITY: usize = COMMAND_CHANNEL_SIZE - .checked_mul(10) + .checked_mul(100) .map(|v| { let r: usize = v.checked_div(100).unwrap_or(*COMMAND_CHANNEL_SIZE); r @@ -32,5 +32,5 @@ lazy_static! { std::env::var("TOPOS_PENDING_LIMIT_PER_REQUEST_TO_STORAGE") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(1000); + .unwrap_or(10_000); } diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index 3b9e0274f..ddcf9af76 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -55,7 +55,7 @@ pub struct DoubleEcho { } impl DoubleEcho { - pub const MAX_BUFFER_SIZE: usize = 2048; + pub const MAX_BUFFER_SIZE: usize = 20_480; #[allow(clippy::too_many_arguments)] pub fn new( diff --git a/crates/topos-tce-proxy/src/client.rs b/crates/topos-tce-proxy/src/client.rs index a6682abb5..efb866b82 100644 --- a/crates/topos-tce-proxy/src/client.rs +++ b/crates/topos-tce-proxy/src/client.rs @@ -22,9 +22,9 @@ use topos_core::{ use tracing::{debug, error, info, info_span, warn, Instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; -const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 100; -const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 100; -const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 100; +const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 10_000; +const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 10_000; +const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 10_000; // Maximum backoff retry timeout in seconds (1 hour) const TCE_SUBMIT_CERTIFICATE_BACKOFF_TIMEOUT: Duration = Duration::from_secs(3600); diff --git a/crates/topos-tce-proxy/src/worker.rs b/crates/topos-tce-proxy/src/worker.rs index 0869893af..537c1c70e 100644 --- a/crates/topos-tce-proxy/src/worker.rs +++ b/crates/topos-tce-proxy/src/worker.rs @@ -22,8 +22,8 @@ impl TceProxyWorker { /// Construct a new [`TceProxyWorker`] with a 128 items deep channel to send commands to and receive events from a TCE node on the given subnet. /// The worker holds a [`crate::client::TceClient`] pub async fn new(config: TceProxyConfig) -> Result<(Self, Option<(Certificate, u64)>), Error> { - let (command_sender, mut command_rcv) = mpsc::channel::(128); - let (evt_sender, evt_rcv) = mpsc::channel::(128); + let (command_sender, mut command_rcv) = mpsc::channel::(128_000); + let (evt_sender, evt_rcv) = mpsc::channel::(128_000); let (tce_client_shutdown_channel, shutdown_receiver) = mpsc::channel::>(1); diff --git a/crates/topos-tce/src/app_context.rs b/crates/topos-tce/src/app_context.rs index 0f2612246..64551b7ee 100644 --- a/crates/topos-tce/src/app_context.rs +++ b/crates/topos-tce/src/app_context.rs @@ -69,7 +69,7 @@ impl AppContext { validator_store: Arc, api_context: RuntimeContext, ) -> (Self, mpsc::Receiver) { - let (events, receiver) = mpsc::channel(100); + let (events, receiver) = mpsc::channel(100_000); ( Self { is_validator,