Skip to content

Commit

Permalink
add create_geyser_autoconnection_task_with_mpsc
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Feb 15, 2024
1 parent 66c4256 commit a6ab686
Showing 1 changed file with 6 additions and 9 deletions.
15 changes: 6 additions & 9 deletions src/grpc_subscription_autoreconnect_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,17 @@ pub fn create_geyser_autoconnection_task(
(abort_handle, receiver_channel)
}

/// connect to grpc source performing autoconect if required,
/// connect to grpc source performing autoconnect if required,
/// returns mpsc channel; task will abort on fatal error
///
/// implementation hints:
/// * no panic/unwrap
/// * do not use "?"
/// * do not "return" unless you really want to abort the task
/// will shut down when receiver is dropped
pub fn create_geyser_autoconnection_task_with_mpsc(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
mpsc_sender: tokio::sync::mpsc::Sender<Message>,
mpsc_downstream: tokio::sync::mpsc::Sender<Message>,
) -> AbortHandle {
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/

// task will be aborted when downstream receiver gets dropped
let jh_geyser_task = tokio::spawn(async move {
let mut state = ConnectionState::NotConnected(0);
let mut messages_forwarded = 0;
Expand Down Expand Up @@ -231,7 +228,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
Duration::from_millis(500)
};
let started_at = Instant::now();
match mpsc_sender
match mpsc_downstream
.send_timeout(
Message::GeyserSubscribeUpdate(Box::new(update_message)),
warning_threshold,
Expand All @@ -255,7 +252,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
Err(SendTimeoutError::Timeout(the_message)) => {
warn!("downstream receiver did not pick up message for {}ms - keep waiting", warning_threshold.as_millis());

match mpsc_sender.send(the_message).await {
match mpsc_downstream.send(the_message).await {
Ok(()) => {
messages_forwarded += 1;
trace!(
Expand Down

0 comments on commit a6ab686

Please sign in to comment.