Skip to content

Commit

Permalink
split impl
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Jan 18, 2024
1 parent ca55a8b commit 33cb8cb
Show file tree
Hide file tree
Showing 6 changed files with 428 additions and 228 deletions.
5 changes: 3 additions & 2 deletions examples/stream_blocks_mainnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ use solana_sdk::signature::Signature;
use solana_sdk::transaction::TransactionError;
use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock;

use geyser_grpc_connector::grpc_subscription_autoreconnect::{
create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig,
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{
create_geyser_reconnecting_stream, GeyserFilter,
};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{

Check warning on line 27 in examples/stream_blocks_mainnet.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/stream_blocks_mainnet.rs
create_multiplexed_stream, FromYellowstoneExtractor,
};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig};

Check warning on line 33 in examples/stream_blocks_mainnet.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/stream_blocks_mainnet.rs

fn start_example_block_consumer(
multiplex_stream: impl Stream<Item = ProducedBlock> + Send + 'static,
Expand Down
28 changes: 19 additions & 9 deletions examples/stream_blocks_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@ use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use std::pin::pin;

use geyser_grpc_connector::grpc_subscription_autoreconnect::{create_geyser_reconnecting_stream, create_geyser_reconnecting_task, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig};
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{

Check warning on line 8 in examples/stream_blocks_single.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/stream_blocks_single.rs
create_geyser_reconnecting_stream, GeyserFilter,
};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use tokio::time::{sleep, Duration};

Check warning on line 14 in examples/stream_blocks_single.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/stream_blocks_single.rs
use tracing::warn;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;

Check warning on line 17 in examples/stream_blocks_single.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/stream_blocks_single.rs
use yellowstone_grpc_proto::prost::Message;
use yellowstone_grpc_proto::prost::Message as _;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{create_geyser_reconnecting_task, Message};
use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig};

fn start_example_blockmini_consumer(
multiplex_stream: impl Stream<Item = BlockMini> + Send + 'static,
Expand Down Expand Up @@ -93,19 +98,24 @@ pub async fn main() {

let (jh_geyser_task, mut green_stream) = create_geyser_reconnecting_task(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
);

tokio::spawn(async move {
while let Some(mini) = green_stream.recv().await {
info!(
"emitted block mini #{}@{} with {} bytes from multiplexer",
mini.slot, mini.commitment_config.commitment, mini.blocksize
);
while let Ok(message) = green_stream.recv().await {
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
// info!("got update: {:?}", subscriber_update.update_oneof.);
info!("got update!!!");
}
Message::Connecting(attempt) => {
warn!("Connection attempt: {}", attempt);
}
}
}
warn!("Stream aborted");
});


// let green_stream = create_geyser_reconnecting_stream(
// green_config.clone(),
// GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
Expand Down
270 changes: 270 additions & 0 deletions src/grpc_subscription_autoreconnect_streams.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
use async_stream::stream;

Check warning on line 1 in src/grpc_subscription_autoreconnect_streams.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/grpc_subscription_autoreconnect_streams.rs
use futures::channel::mpsc;
use futures::{Stream, StreamExt};
use log::{debug, error, info, log, trace, warn, Level};
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::pin::Pin;
use std::time::Duration;
use tokio::sync::broadcast::error::SendError;
use tokio::sync::broadcast::Receiver;
use tokio::task::JoinHandle;
use tokio::time::error::Elapsed;
use tokio::time::{sleep, timeout, Timeout};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserGrpcClientResult};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeUpdate,
};
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
use yellowstone_grpc_proto::tonic;
use yellowstone_grpc_proto::tonic::codegen::http::uri::InvalidUri;
use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue;
use yellowstone_grpc_proto::tonic::service::Interceptor;

Check warning on line 24 in src/grpc_subscription_autoreconnect_streams.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/grpc_subscription_autoreconnect_streams.rs
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
use yellowstone_grpc_proto::tonic::{Code, Status};
use crate::GrpcSourceConfig;

type Attempt = u32;

// wraps payload and status messages
// clone is required by broacast channel
#[derive(Clone)]
pub enum Message {
GeyserSubscribeUpdate(Box<SubscribeUpdate>),
// connect (attempt=1) or reconnect(attempt=2..)
Connecting(Attempt),
}

enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
NotConnected(Attempt),
Connecting(Attempt, JoinHandle<GeyserGrpcClientResult<S>>),
Ready(Attempt, S),
WaitReconnect(Attempt),
}

#[derive(Clone)]
pub struct GeyserFilter(pub CommitmentConfig);

impl GeyserFilter {
pub fn blocks_and_txs(&self) -> SubscribeRequest {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);

SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: blocks_subs,
blocks_meta: HashMap::new(),
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
}
}

pub fn blocks_meta(&self) -> SubscribeRequest {
let mut blocksmeta_subs = HashMap::new();
blocksmeta_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {});

SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: HashMap::new(),
blocks_meta: blocksmeta_subs,
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
}
}
}

fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel {
// solana_sdk -> yellowstone
match commitment_config.commitment {
solana_sdk::commitment_config::CommitmentLevel::Processed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Processed
}
solana_sdk::commitment_config::CommitmentLevel::Confirmed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed
}
solana_sdk::commitment_config::CommitmentLevel::Finalized => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized
}
_ => {
panic!(
"unsupported commitment level {}",
commitment_config.commitment
)
}
}
}

// Take geyser filter, connect to Geyser and return a generic stream of SubscribeUpdate
// note: stream never terminates
pub fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
) -> impl Stream<Item = Message> {
let mut state = ConnectionState::NotConnected(0);

// in case of cancellation, we restart from here:
// thus we want to keep the progression in a state object outside the stream! makro
let the_stream = stream! {
loop {
let yield_value;

(state, yield_value) = match state {

ConnectionState::NotConnected(mut attempt) => {
attempt += 1;

let connection_task = tokio::spawn({
let addr = grpc_source.grpc_addr.clone();
let token = grpc_source.grpc_x_token.clone();
let config = grpc_source.tls_config.clone();
let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout);
let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
let subscribe_filter = subscribe_filter.clone();
log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr);
async move {

let connect_result = GeyserGrpcClient::connect_with_timeout(
addr, token, config,
connect_timeout,
request_timeout,
false)
.await;
let mut client = connect_result?;


debug!("Subscribe with filter {:?}", subscribe_filter);

let subscribe_result = timeout(subscribe_timeout.unwrap_or(Duration::MAX),
client
.subscribe_once2(subscribe_filter))
.await;

// maybe not optimal
subscribe_result.map_err(|_| Status::unknown("unspecific subscribe timeout"))?
}
});

(ConnectionState::Connecting(attempt, connection_task), Message::Connecting(attempt))
}

ConnectionState::Connecting(attempt, connection_task) => {
let subscribe_result = connection_task.await;

match subscribe_result {
Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(attempt, subscribed_stream), Message::Connecting(attempt)),
Ok(Err(geyser_error)) => {
// ATM we consider all errors recoverable
warn!("! subscribe failed on {} - retrying: {:?}", grpc_source, geyser_error);
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
},
Err(geyser_grpc_task_error) => {
panic!("! task aborted - should not happen :{geyser_grpc_task_error}");
}
}

}

ConnectionState::Ready(attempt, mut geyser_stream) => {

match geyser_stream.next().await {
Some(Ok(update_message)) => {
trace!("> recv update message from {}", grpc_source);
(ConnectionState::Ready(attempt, geyser_stream), Message::GeyserSubscribeUpdate(Box::new(update_message)))
}
Some(Err(tonic_status)) => {
// ATM we consider all errors recoverable
warn!("! error on {} - retrying: {:?}", grpc_source, tonic_status);
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
}
None => {
// should not arrive here, Mean the stream close.
warn!("geyser stream closed on {} - retrying", grpc_source);
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
}
}

}

ConnectionState::WaitReconnect(attempt) => {
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
info!("! waiting {} seconds, then reconnect to {}", backoff_secs, grpc_source);
sleep(Duration::from_secs_f32(backoff_secs)).await;
(ConnectionState::NotConnected(attempt), Message::Connecting(attempt))
}

}; // -- match

yield yield_value
}

}; // -- stream!

the_stream
}

Check warning on line 223 in src/grpc_subscription_autoreconnect_streams.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/src/grpc_subscription_autoreconnect_streams.rs
#[cfg(test)]
mod tests {
use crate::GrpcConnectionTimeouts;
use super::*;

#[tokio::test]
async fn test_debug_no_secrets() {
let timeout_config = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(1),
request_timeout: Duration::from_secs(2),
subscribe_timeout: Duration::from_secs(3),
};
assert_eq!(
format!(
"{:?}",
GrpcSourceConfig::new(
"http://localhost:1234".to_string(),
Some("my-secret".to_string()),
None,
timeout_config
)
),
"grpc_addr http://localhost:1234"
);
}

#[tokio::test]
async fn test_display_no_secrets() {
let timeout_config = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(1),
request_timeout: Duration::from_secs(2),
subscribe_timeout: Duration::from_secs(3),
};
assert_eq!(
format!(
"{}",
GrpcSourceConfig::new(
"http://localhost:1234".to_string(),
Some("my-secret".to_string()),
None,
timeout_config
)
),
"grpc_addr http://localhost:1234"
);
}
}
Loading

0 comments on commit 33cb8cb

Please sign in to comment.