Skip to content

Commit

Permalink
Adding notify channel to stop tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Mar 27, 2024
1 parent ae56e0f commit ce6ca26
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 65 deletions.
5 changes: 5 additions & 0 deletions examples/stream_blocks_autoconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use log::info;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use std::sync::Arc;
use tokio::sync::Notify;

use geyser_grpc_connector::channel_plugger::spawn_broadcast_channel_plug;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task;
Expand Down Expand Up @@ -87,9 +89,12 @@ pub async fn main() {

info!("Write Block stream..");

let exit_notify = Arc::new(Notify::new());

let (jh_geyser_task, message_channel) = create_geyser_autoconnection_task(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
exit_notify,
);
let mut message_channel =
spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel);
Expand Down
15 changes: 8 additions & 7 deletions examples/stream_blocks_mainnet_task.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use futures::Stream;
use log::{info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use std::sync::Arc;
use tokio::sync::Notify;

use base64::Engine;
use itertools::Itertools;
Expand All @@ -21,12 +22,8 @@ use solana_sdk::transaction::TransactionError;
use tokio::sync::mpsc::Receiver;
use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock;

use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{
create_geyser_autoconnection_task, create_geyser_autoconnection_task_with_mpsc,
};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
Expand Down Expand Up @@ -123,6 +120,7 @@ pub async fn main() {
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};
let exit_notify = Arc::new(Notify::new());

let green_config =
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
Expand All @@ -136,16 +134,19 @@ pub async fn main() {
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.clone(),
);
let _blue_stream_ah = create_geyser_autoconnection_task_with_mpsc(
blue_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.clone(),
);
let _toxiproxy_stream_ah = create_geyser_autoconnection_task_with_mpsc(
toxiproxy_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.clone(),
);
start_example_blockmeta_consumer(blockmeta_rx);

Expand Down
5 changes: 2 additions & 3 deletions src/grpc_subscription_autoreconnect_streams.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::{Attempt, GrpcSourceConfig, Message, yellowstone_grpc_util};
use crate::{yellowstone_grpc_util, Attempt, GrpcSourceConfig, Message};
use async_stream::stream;
use futures::{Stream, StreamExt};
use log::{debug, info, log, trace, warn, Level};
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
use yellowstone_grpc_client::GeyserGrpcClientResult;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::tonic::Status;

Expand All @@ -22,7 +22,6 @@ pub fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
) -> impl Stream<Item = Message> {

let mut state = ConnectionState::NotConnected(1);

// in case of cancellation, we restart from here:
Expand Down
132 changes: 93 additions & 39 deletions src/grpc_subscription_autoreconnect_tasks.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::{Attempt, GrpcSourceConfig, Message, yellowstone_grpc_util};
use crate::{yellowstone_grpc_util, Attempt, GrpcSourceConfig, Message};
use futures::{Stream, StreamExt};
use log::{debug, error, info, log, trace, warn, Level};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::Receiver;
use tokio::task::AbortHandle;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout, Instant};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError};
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
Expand Down Expand Up @@ -33,13 +35,18 @@ enum FatalErrorReason {
pub fn create_geyser_autoconnection_task(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
) -> (AbortHandle, Receiver<Message>) {
exit_notify: Arc<Notify>,
) -> (JoinHandle<()>, Receiver<Message>) {
let (sender, receiver_channel) = tokio::sync::mpsc::channel::<Message>(1);

let abort_handle =
create_geyser_autoconnection_task_with_mpsc(grpc_source, subscribe_filter, sender);
let join_handle = create_geyser_autoconnection_task_with_mpsc(
grpc_source,
subscribe_filter,
sender,
exit_notify,
);

(abort_handle, receiver_channel)
(join_handle, receiver_channel)
}

/// connect to grpc source performing autoconnect if required,
Expand All @@ -49,16 +56,16 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
mpsc_downstream: tokio::sync::mpsc::Sender<Message>,
) -> AbortHandle {
exit_notify: Arc<Notify>,
) -> JoinHandle<()> {
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/


// task will be aborted when downstream receiver gets dropped
let jh_geyser_task = tokio::spawn(async move {
let mut state = ConnectionState::NotConnected(1);
let mut messages_forwarded = 0;

loop {
'main_loop: loop {
state = match state {
ConnectionState::NotConnected(attempt) => {
let addr = grpc_source.grpc_addr.clone();
Expand All @@ -79,15 +86,21 @@ pub fn create_geyser_autoconnection_task_with_mpsc(

let buffer_config = yellowstone_grpc_util::GeyserGrpcClientBufferConfig::optimize_for_subscription(&subscribe_filter);
debug!("Using Grpc Buffer config {:?}", buffer_config);
let connect_result = yellowstone_grpc_util::connect_with_timeout_with_buffers(
addr,
token,
config,
connect_timeout,
request_timeout,
buffer_config,
)
.await;
let connect_result = tokio::select! {
res = yellowstone_grpc_util::connect_with_timeout_with_buffers(
addr,
token,
config,
connect_timeout,
request_timeout,
buffer_config,
) => {
res
},
_ = exit_notify.notified() => {
break 'main_loop;
}
};

match connect_result {
Ok(client) => ConnectionState::Connecting(attempt, client),
Expand Down Expand Up @@ -136,13 +149,17 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
let subscribe_filter = subscribe_filter.clone();
debug!("Subscribe with filter {:?}", subscribe_filter);



let subscribe_result_timeout = timeout(
subscribe_timeout.unwrap_or(Duration::MAX),
client.subscribe_once2(subscribe_filter),
)
.await;
let subscribe_result_timeout = tokio::select! {
res = timeout(
subscribe_timeout.unwrap_or(Duration::MAX),
client.subscribe_once2(subscribe_filter),
) => {
res
},
_ = exit_notify.notified() => {
break 'main_loop;
}
};

match subscribe_result_timeout {
Ok(subscribe_result) => {
Expand Down Expand Up @@ -198,7 +215,14 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
"waiting {} seconds, then reconnect to {}",
backoff_secs, grpc_source
);
sleep(Duration::from_secs_f32(backoff_secs)).await;
tokio::select! {
_ = sleep(Duration::from_secs_f32(backoff_secs)) => {
//slept
},
_ = exit_notify.notified() => {
break 'main_loop;
}
};
ConnectionState::NotConnected(attempt)
}
ConnectionState::FatalError(_attempt, reason) => match reason {
Expand All @@ -225,18 +249,31 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
"waiting {} seconds, then reconnect to {}",
backoff_secs, grpc_source
);
sleep(Duration::from_secs_f32(backoff_secs)).await;
tokio::select! {
_ = sleep(Duration::from_secs_f32(backoff_secs)) => {
//slept
},
_ = exit_notify.notified() => {
break 'main_loop;
}
};
ConnectionState::NotConnected(attempt)
}
ConnectionState::Ready(mut geyser_stream) => {
let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout);
'recv_loop: loop {
match timeout(
receive_timeout.unwrap_or(Duration::MAX),
geyser_stream.next(),
)
.await
{
let geyser_stream_res = tokio::select! {
res = timeout(
receive_timeout.unwrap_or(Duration::MAX),
geyser_stream.next(),
) => {
res
},
_ = exit_notify.notified() => {
break 'main_loop;
}
};
match geyser_stream_res {
Ok(Some(Ok(update_message))) => {
trace!("> recv update message from {}", grpc_source);
// note: first send never blocks as the mpsc channel has capacity 1
Expand All @@ -246,13 +283,21 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
Duration::from_millis(500)
};
let started_at = Instant::now();
match mpsc_downstream

let mpsc_downstream_result = tokio::select! {
res = mpsc_downstream
.send_timeout(
Message::GeyserSubscribeUpdate(Box::new(update_message)),
warning_threshold,
)
.await
{
) => {
res
},
_ = exit_notify.notified() => {
break 'main_loop;
}
};

match mpsc_downstream_result {
Ok(()) => {
messages_forwarded += 1;
if messages_forwarded == 1 {
Expand All @@ -270,7 +315,16 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
Err(SendTimeoutError::Timeout(the_message)) => {
warn!("downstream receiver did not pick up message for {}ms - keep waiting", warning_threshold.as_millis());

match mpsc_downstream.send(the_message).await {
let mpsc_downstream_result = tokio::select! {
res = mpsc_downstream.send(the_message)=> {
res
},
_ = exit_notify.notified() => {
break 'main_loop;
}
};

match mpsc_downstream_result {
Ok(()) => {
messages_forwarded += 1;
trace!(
Expand Down Expand Up @@ -317,7 +371,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
} // -- endless state loop
});

jh_geyser_task.abort_handle()
jh_geyser_task

Check warning on line 374 in src/grpc_subscription_autoreconnect_tasks.rs

View workflow job for this annotation

GitHub Actions / test

returning the result of a `let` binding from a block
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit ce6ca26

Please sign in to comment.