-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
dd78804
commit 929b20e
Showing
1 changed file
with
141 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
use futures::{Stream, StreamExt}; | ||
use log::info; | ||
use solana_sdk::clock::Slot; | ||
use solana_sdk::commitment_config::CommitmentConfig; | ||
use solana_sdk::pubkey::Pubkey; | ||
use std::env; | ||
use std::pin::pin; | ||
use tokio::sync::mpsc::Receiver; | ||
|
||
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; | ||
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc; | ||
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor; | ||
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message}; | ||
use tokio::time::{sleep, Duration}; | ||
use tracing::warn; | ||
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; | ||
use yellowstone_grpc_proto::geyser::SubscribeUpdate; | ||
use yellowstone_grpc_proto::prost::Message as _; | ||
|
||
fn start_example_account_consumer(mut multiplex_channel: Receiver<Message>) { | ||
tokio::spawn(async move { | ||
loop { | ||
match multiplex_channel.recv().await { | ||
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof { | ||
Some(UpdateOneof::Account(update)) => { | ||
let account_info = update.account.unwrap(); | ||
let account_pk = Pubkey::try_from(account_info.pubkey).unwrap(); | ||
info!( | ||
"got account update (green)!!! {} - {:?} - {} bytes", | ||
update.slot, | ||
account_pk, | ||
account_info.data.len() | ||
); | ||
let bytes: [u8; 32] = account_pk.to_bytes(); | ||
} | ||
None => {} | ||
_ => {} | ||
}, | ||
None => { | ||
log::warn!("multiplexer channel closed - aborting"); | ||
return; | ||
} | ||
Some(Message::Connecting(_)) => {} | ||
} | ||
} | ||
}); | ||
} | ||
|
||
#[allow(dead_code)] | ||
fn start_example_blockmini_consumer( | ||
multiplex_stream: impl Stream<Item = BlockMini> + Send + 'static, | ||
) { | ||
tokio::spawn(async move { | ||
let mut blockmeta_stream = pin!(multiplex_stream); | ||
while let Some(mini) = blockmeta_stream.next().await { | ||
info!( | ||
"emitted block mini #{}@{} with {} bytes from multiplexer", | ||
mini.slot, mini.commitment_config.commitment, mini.blocksize | ||
); | ||
} | ||
}); | ||
} | ||
|
||
pub struct BlockMini { | ||
pub blocksize: usize, | ||
pub slot: Slot, | ||
pub commitment_config: CommitmentConfig, | ||
} | ||
|
||
struct BlockMiniExtractor(CommitmentConfig); | ||
|
||
impl FromYellowstoneExtractor for BlockMiniExtractor { | ||
type Target = BlockMini; | ||
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> { | ||
match update.update_oneof { | ||
Some(UpdateOneof::Block(update_block_message)) => { | ||
let blocksize = update_block_message.encoded_len(); | ||
let slot = update_block_message.slot; | ||
let mini = BlockMini { | ||
blocksize, | ||
slot, | ||
commitment_config: self.0, | ||
}; | ||
Some((slot, mini)) | ||
} | ||
Some(UpdateOneof::BlockMeta(update_blockmeta_message)) => { | ||
let blocksize = update_blockmeta_message.encoded_len(); | ||
let slot = update_blockmeta_message.slot; | ||
let mini = BlockMini { | ||
blocksize, | ||
slot, | ||
commitment_config: self.0, | ||
}; | ||
Some((slot, mini)) | ||
} | ||
_ => None, | ||
} | ||
} | ||
} | ||
|
||
#[tokio::main] | ||
pub async fn main() { | ||
// RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace | ||
tracing_subscriber::fmt::init(); | ||
// console_subscriber::init(); | ||
|
||
let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green"); | ||
let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok(); | ||
|
||
info!( | ||
"Using grpc source on {} ({})", | ||
grpc_addr_green, | ||
grpc_x_token_green.is_some() | ||
); | ||
|
||
let timeouts = GrpcConnectionTimeouts { | ||
connect_timeout: Duration::from_secs(25), | ||
request_timeout: Duration::from_secs(25), | ||
subscribe_timeout: Duration::from_secs(25), | ||
receive_timeout: Duration::from_secs(25), | ||
}; | ||
|
||
let config = GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone()); | ||
|
||
info!("Write Block stream.."); | ||
|
||
let (autoconnect_tx, accounts_rx) = tokio::sync::mpsc::channel(10); | ||
let (_exit, exit_notify) = tokio::sync::broadcast::channel(1); | ||
|
||
let _accounts_task = create_geyser_autoconnection_task_with_mpsc( | ||
config.clone(), | ||
GeyserFilter(CommitmentConfig::processed()).accounts(), | ||
autoconnect_tx.clone(), | ||
exit_notify.resubscribe(), | ||
); | ||
|
||
start_example_account_consumer(accounts_rx); | ||
|
||
// "infinite" sleep | ||
sleep(Duration::from_secs(1800)).await; | ||
} |