Skip to content

Commit

Permalink
improve connect_with_timeout_with_buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Mar 25, 2024
1 parent 130845f commit 34a01ba
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 29 deletions.
21 changes: 8 additions & 13 deletions src/grpc_subscription_autoreconnect_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,

Check warning on line 22 in src/grpc_subscription_autoreconnect_streams.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/grpc_subscription_autoreconnect_streams.rs

Check warning on line 22 in src/grpc_subscription_autoreconnect_streams.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/grpc_subscription_autoreconnect_streams.rs
subscribe_filter: SubscribeRequest,
) -> impl Stream<Item = Message> {

let mut state = ConnectionState::NotConnected(1);

// in case of cancellation, we restart from here:
Expand All @@ -45,25 +46,19 @@ pub fn create_geyser_reconnecting_stream(
log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr);
async move {

// let connect_result = GeyserGrpcClient::connect_with_timeout(
// addr, token, config,
// connect_timeout,
// request_timeout,
// false)
// .await;
warn!("Use HACKED version of connect_with_timeout_hacked");
let connect_result = yellowstone_grpc_util::connect_with_timeout_hacked(
let buffer_config = yellowstone_grpc_util::GeyserGrpcClientBufferConfig::optimize_for_subscription(&subscribe_filter);
debug!("Using Grpc Buffer config {:?}", buffer_config);
let connect_result = yellowstone_grpc_util::connect_with_timeout_with_buffers(
addr,
token,
// config,
// connect_timeout,
// request_timeout,
// false,
config,
connect_timeout,
request_timeout,
buffer_config,
)
.await;
let mut client = connect_result?;


debug!("Subscribe with filter {:?}", subscribe_filter);

let subscribe_result = timeout(subscribe_timeout.unwrap_or(Duration::MAX),
Expand Down
14 changes: 8 additions & 6 deletions src/grpc_subscription_autoreconnect_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
) -> AbortHandle {

Check warning on line 52 in src/grpc_subscription_autoreconnect_tasks.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/grpc_subscription_autoreconnect_tasks.rs

Check warning on line 52 in src/grpc_subscription_autoreconnect_tasks.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/grpc_subscription_autoreconnect_tasks.rs
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/


// task will be aborted when downstream receiver gets dropped
let jh_geyser_task = tokio::spawn(async move {
let mut state = ConnectionState::NotConnected(1);
Expand All @@ -76,14 +77,15 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
addr
);

warn!("Use HACKED version of connect_with_timeout_hacked");
let connect_result = yellowstone_grpc_util::connect_with_timeout_hacked(
let buffer_config = yellowstone_grpc_util::GeyserGrpcClientBufferConfig::optimize_for_subscription(&subscribe_filter);
debug!("Using Grpc Buffer config {:?}", buffer_config);
let connect_result = yellowstone_grpc_util::connect_with_timeout_with_buffers(
addr,
token,
// config,
// connect_timeout,
// request_timeout,
// false,
config,
connect_timeout,
request_timeout,
buffer_config,
)
.await;

Expand Down
79 changes: 69 additions & 10 deletions src/yellowstone_grpc_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::time::Duration;
use tonic_health::pb::health_client::HealthClient;
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult, InterceptorXToken};
use yellowstone_grpc_proto::geyser::geyser_client::GeyserClient;
use yellowstone_grpc_proto::geyser::SubscribeRequest;

Check warning on line 5 in src/yellowstone_grpc_util.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/yellowstone_grpc_util.rs

Check warning on line 5 in src/yellowstone_grpc_util.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/yellowstone_grpc_util.rs
use yellowstone_grpc_proto::prost::bytes::Bytes;
use yellowstone_grpc_proto::tonic;
use yellowstone_grpc_proto::tonic::metadata::AsciiMetadataValue;
Expand All @@ -27,19 +28,77 @@ pub async fn connect_with_timeout<E, T>(
}


pub async fn connect_with_timeout_hacked<E, T>(endpoint: E,
x_token: Option<T>,) -> GeyserGrpcClientResult<GeyserGrpcClient<impl Interceptor>>
// see https://github.com/hyperium/tonic/blob/v0.10.2/tonic/src/transport/channel/mod.rs
const DEFAULT_BUFFER_SIZE: usize = 1024;
// see https://github.com/hyperium/hyper/blob/v0.14.28/src/proto/h2/client.rs#L45
const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb

#[derive(Debug, Clone)]
pub struct GeyserGrpcClientBufferConfig {
pub buffer_size: Option<usize>,
pub conn_window: Option<u32>,
pub stream_window: Option<u32>,
}

impl Default for GeyserGrpcClientBufferConfig {
fn default() -> Self {
GeyserGrpcClientBufferConfig {
buffer_size: Some(DEFAULT_BUFFER_SIZE),
conn_window: Some(DEFAULT_CONN_WINDOW),
stream_window: Some(DEFAULT_STREAM_WINDOW),
}
}
}

Check warning on line 52 in src/yellowstone_grpc_util.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/yellowstone_grpc_util.rs

impl GeyserGrpcClientBufferConfig {

pub fn optimize_for_subscription(filter: &SubscribeRequest) -> GeyserGrpcClientBufferConfig {
if !filter.blocks.is_empty() {
GeyserGrpcClientBufferConfig {
buffer_size: Some(65536), // 64kb (default: 1k)

Check warning on line 59 in src/yellowstone_grpc_util.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/yellowstone_grpc_util.rs
conn_window: Some(5242880), // 5mb (=default)
stream_window: Some(4194304), // 4mb (default: 2m)
}
} else {
GeyserGrpcClientBufferConfig::default()
}
}

}


pub async fn connect_with_timeout_with_buffers<E, T>(
endpoint: E,
x_token: Option<T>,
tls_config: Option<ClientTlsConfig>,
connect_timeout: Option<Duration>,
request_timeout: Option<Duration>,
buffer_config: GeyserGrpcClientBufferConfig,
) -> GeyserGrpcClientResult<GeyserGrpcClient<impl Interceptor>>
where
E: Into<Bytes>,
T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>, {
let endpoint = tonic::transport::Endpoint::from_shared(endpoint).unwrap() // FIXME
.buffer_size(Some(65536))
.initial_connection_window_size(4194304)
.initial_stream_window_size(4194304)
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(10))
// .http2_adaptive_window()
.tls_config(ClientTlsConfig::new()).unwrap(); // FIXME
// see https://github.com/blockworks-foundation/geyser-grpc-connector/issues/10
let mut endpoint = tonic::transport::Endpoint::from_shared(endpoint)?
.buffer_size(buffer_config.buffer_size)
.initial_connection_window_size(buffer_config.conn_window)
.initial_stream_window_size(buffer_config.stream_window);
// .buffer_size(Some(65536)) // 64kb (default: 1024)
// .initial_stream_window_size(4194304);// 4mb (default: 2mb)
// // .tls_config(tls_config.unwrap()).unwrap(); // FIXME

if let Some(tls_config) = tls_config {
endpoint = endpoint.tls_config(tls_config)?;
}

if let Some(connect_timeout) = connect_timeout {
endpoint = endpoint.timeout(connect_timeout);
}

if let Some(request_timeout) = request_timeout {
endpoint = endpoint.timeout(request_timeout);
}

let x_token: Option<AsciiMetadataValue> = match x_token {
Some(x_token) => Some(x_token.try_into().unwrap()), // FIXME replace unwrap
Expand Down

0 comments on commit 34a01ba

Please sign in to comment.