Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
fix: increase channel sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
gruberb committed Apr 16, 2024
1 parent 146ed90 commit 1d652f1
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 16 deletions.
2 changes: 1 addition & 1 deletion crates/topos-sequencer-subnet-runtime/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl SubnetRuntimeProxy {
address: {}, ",
&config.http_endpoint, &config.ws_endpoint, &config.subnet_contract_address
);
let (command_sender, mut command_rcv) = mpsc::channel::<SubnetRuntimeProxyCommand>(256);
let (command_sender, mut command_rcv) = mpsc::channel::<SubnetRuntimeProxyCommand>(4096);
let ws_runtime_endpoint = config.ws_endpoint.clone();
let http_runtime_endpoint = config.http_endpoint.clone();
let subnet_contract_address = Arc::new(config.subnet_contract_address.clone());
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-api/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub(crate) mod console;
#[cfg(test)]
mod tests;

const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 100;
const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 2024;

pub(crate) mod builder;
pub(crate) mod messaging;
Expand Down Expand Up @@ -272,7 +272,7 @@ impl ApiService for TceGrpcService {
.map(move |message| Self::parse_stream(message, stream_id))
.boxed();

let (command_sender, command_receiver) = mpsc::channel(2048);
let (command_sender, command_receiver) = mpsc::channel(204_800);

let (outbound_stream, rx) = mpsc::channel::<Result<(Option<Uuid>, OutboundMessage), Status>>(
DEFAULT_CHANNEL_STREAM_CAPACITY,
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ mod tests;

pub(crate) mod constants {
/// Constant size of every channel in the crate
pub(crate) const CHANNEL_SIZE: usize = 2048;
pub(crate) const CHANNEL_SIZE: usize = 10_000;

/// Constant size of every transient stream channel in the crate
pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 1024;
pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 4_096;
}
pub use runtime::{
error::RuntimeError, Runtime, RuntimeClient, RuntimeCommand, RuntimeContext, RuntimeEvent,
Expand Down
8 changes: 4 additions & 4 deletions crates/topos-tce-broadcast/src/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ lazy_static! {
std::env::var("TOPOS_DOUBLE_ECHO_COMMAND_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
.unwrap_or(20_480);
/// Size of the channel between double echo and the task manager
pub static ref BROADCAST_TASK_MANAGER_CHANNEL_SIZE: usize =
std::env::var("TOPOS_BROADCAST_TASK_MANAGER_CHANNEL_SIZE")
Expand All @@ -18,10 +18,10 @@ lazy_static! {
std::env::var("TOPOS_PROTOCOL_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
.unwrap_or(20_480);
/// Capacity alert threshold for the double echo command channel
pub static ref COMMAND_CHANNEL_CAPACITY: usize = COMMAND_CHANNEL_SIZE
.checked_mul(10)
.checked_mul(100)
.map(|v| {
let r: usize = v.checked_div(100).unwrap_or(*COMMAND_CHANNEL_SIZE);
r
Expand All @@ -32,5 +32,5 @@ lazy_static! {
std::env::var("TOPOS_PENDING_LIMIT_PER_REQUEST_TO_STORAGE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(1000);
.unwrap_or(10_000);
}
2 changes: 1 addition & 1 deletion crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct DoubleEcho {
}

impl DoubleEcho {
pub const MAX_BUFFER_SIZE: usize = 2048;
pub const MAX_BUFFER_SIZE: usize = 20_480;

#[allow(clippy::too_many_arguments)]
pub fn new(
Expand Down
6 changes: 3 additions & 3 deletions crates/topos-tce-proxy/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use topos_core::{
use tracing::{debug, error, info, info_span, warn, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 100;
const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 100;
const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 100;
const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 10_000;
const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 10_000;
const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 10_000;

// Maximum backoff retry timeout in seconds (1 hour)
const TCE_SUBMIT_CERTIFICATE_BACKOFF_TIMEOUT: Duration = Duration::from_secs(3600);
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-proxy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ impl TceProxyWorker {
/// Construct a new [`TceProxyWorker`] with a 128 items deep channel to send commands to and receive events from a TCE node on the given subnet.
/// The worker holds a [`crate::client::TceClient`]
pub async fn new(config: TceProxyConfig) -> Result<(Self, Option<(Certificate, u64)>), Error> {
let (command_sender, mut command_rcv) = mpsc::channel::<TceProxyCommand>(128);
let (evt_sender, evt_rcv) = mpsc::channel::<TceProxyEvent>(128);
let (command_sender, mut command_rcv) = mpsc::channel::<TceProxyCommand>(128_000);
let (evt_sender, evt_rcv) = mpsc::channel::<TceProxyEvent>(128_000);
let (tce_client_shutdown_channel, shutdown_receiver) =
mpsc::channel::<oneshot::Sender<()>>(1);

Expand Down
2 changes: 1 addition & 1 deletion crates/topos-tce/src/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl AppContext {
validator_store: Arc<ValidatorStore>,
api_context: RuntimeContext,
) -> (Self, mpsc::Receiver<Events>) {
let (events, receiver) = mpsc::channel(100);
let (events, receiver) = mpsc::channel(100_000);
(
Self {
is_validator,
Expand Down

0 comments on commit 1d652f1

Please sign in to comment.