From e90ecd110a6ac93c3172f739c72159e9906cb882 Mon Sep 17 00:00:00 2001 From: mango-dee Date: Tue, 26 Nov 2024 11:07:08 +0800 Subject: [PATCH] Applies PR feedback and some clippy fixes --- Cargo.lock | 7 +++-- Cargo.toml | 8 ++--- examples/bench_geyser_grpc_accounts.rs | 7 ++--- examples/stream_blocks_mainnet_stream.rs | 38 ++++++++---------------- examples/stream_blocks_single.rs | 9 +++--- examples/stream_token_accounts.rs | 12 ++++---- examples/stream_vote_transactions.rs | 1 + examples/subscribe_accounts.rs | 4 +-- rust-toolchain.toml | 2 +- 9 files changed, 35 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 819c5e1..6ddefe0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -983,11 +983,12 @@ dependencies = [ [[package]] name = "dashmap" -version = "5.5.3" +version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" dependencies = [ "cfg-if", + "crossbeam-utils", "hashbrown 0.14.5", "lock_api", "once_cell", @@ -1341,7 +1342,7 @@ dependencies = [ [[package]] name = "geyser-grpc-connector" -version = "2.0.0" +version = "0.11.0+yellowstone.2.0.0" dependencies = [ "anyhow", "async-stream", diff --git a/Cargo.toml b/Cargo.toml index c7a0ddc..fcf3c6f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "geyser-grpc-connector" -version = "2.0.0" +version = "0.11.0+yellowstone.2.0.0" edition = "2021" description = "Multiplexing and Reconnection on Yellowstone gRPC Geyser client streaming" @@ -13,7 +13,7 @@ yellowstone-grpc-client = { version = "2.0.0", git = "https://github.com/rpcpool yellowstone-grpc-proto = { version = "2.0.0", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v2.0.0+solana.2.0.16" } # required for CommitmentConfig -solana-sdk = "2.0.16" +solana-sdk = "~2.0.16" url = "2.5.0" async-stream = "0.3.5" @@ -30,9 +30,7 @@ base64 = "0.21.5" bincode = "1.3.3" csv = "1.3.0" - -dashmap = "5.5.3" - +dashmap = "6.1.0" tonic = { version= "0.12.3", features=["gzip"] } tonic-health = "0.12.3" regex = "1.10.4" diff --git a/examples/bench_geyser_grpc_accounts.rs b/examples/bench_geyser_grpc_accounts.rs index 0010efd..391eb7c 100644 --- a/examples/bench_geyser_grpc_accounts.rs +++ b/examples/bench_geyser_grpc_accounts.rs @@ -155,7 +155,7 @@ fn start_tracking_slots(current_processed_slot: AtomicSlot) { exit_notify.resubscribe(), ); - let mut tip: Slot = 0; + // let mut tip: Slot = 0; loop { match multiplex_rx.recv().await { @@ -199,8 +199,6 @@ fn start_tracking_account_consumer( // seconds since epoch let mut block_time_per_slot = HashMap::::new(); - // wall clock time of block completion (i.e. processed) reported by the block meta stream - let mut block_completion_notification_time_per_slot = HashMap::::new(); let debouncer = debouncer::Debouncer::new(Duration::from_millis(50)); @@ -218,11 +216,10 @@ fn start_tracking_account_consumer( match geyser_messages_rx.recv().await { Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof { Some(UpdateOneof::Account(update)) => { - let started_at = Instant::now(); let now = SystemTime::now(); let account_info = update.account.unwrap(); let account_pk = Pubkey::try_from(account_info.pubkey).unwrap(); - let account_owner_pk = Pubkey::try_from(account_info.owner).unwrap(); + let _account_owner_pk = Pubkey::try_from(account_info.owner).unwrap(); // note: slot is referencing the block that is just built while the slot number reported from BlockMeta/Slot uses the slot after the block is built let slot = update.slot; let account_receive_time = get_epoch_sec(); diff --git a/examples/stream_blocks_mainnet_stream.rs b/examples/stream_blocks_mainnet_stream.rs index eb351de..b0bbaeb 100644 --- a/examples/stream_blocks_mainnet_stream.rs +++ b/examples/stream_blocks_mainnet_stream.rs @@ -1,23 +1,25 @@ -use futures::{Stream, StreamExt}; -use log::info; -use solana_sdk::clock::Slot; -use solana_sdk::commitment_config::CommitmentConfig; use std::env; use std::pin::pin; use base64::Engine; +use futures::{Stream, StreamExt}; use itertools::Itertools; +use log::info; +use solana_sdk::clock::Slot; +use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::compute_budget::ComputeBudgetInstruction; use solana_sdk::hash::Hash; use solana_sdk::instruction::CompiledInstruction; use solana_sdk::message::v0::MessageAddressTableLookup; use solana_sdk::message::{v0, MessageHeader, VersionedMessage}; use solana_sdk::pubkey::Pubkey; -/// This file mocks the core model of the RPC server. -use solana_sdk::{borsh1, compute_budget}; - use solana_sdk::signature::Signature; use solana_sdk::transaction::TransactionError; +/// This file mocks the core model of the RPC server. +use solana_sdk::{borsh1, compute_budget}; +use tokio::time::{sleep, Duration}; +use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; +use yellowstone_grpc_proto::geyser::SubscribeUpdate; use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock; use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; @@ -25,9 +27,6 @@ use geyser_grpc_connector::grpcmultiplex_fastestwins::{ create_multiplexed_stream, FromYellowstoneExtractor, }; use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig}; -use tokio::time::{sleep, Duration}; -use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; -use yellowstone_grpc_proto::geyser::SubscribeUpdate; pub mod debouncer; @@ -218,21 +217,10 @@ pub fn map_produced_block( .transactions .into_iter() .filter_map(|tx| { - let Some(meta) = tx.meta else { - return None; - }; - - let Some(transaction) = tx.transaction else { - return None; - }; - - let Some(message) = transaction.message else { - return None; - }; - - let Some(header) = message.header else { - return None; - }; + let meta = tx.meta?; + let transaction = tx.transaction?; + let message = transaction.message?; + let header = message.header?; let signatures = transaction .signatures diff --git a/examples/stream_blocks_single.rs b/examples/stream_blocks_single.rs index 5f6734f..32fe7b5 100644 --- a/examples/stream_blocks_single.rs +++ b/examples/stream_blocks_single.rs @@ -74,7 +74,7 @@ pub async fn main() { tracing_subscriber::fmt::init(); // console_subscriber::init(); - let COMMITMENT_LEVEL = CommitmentConfig::processed(); + let commitment_level = CommitmentConfig::processed(); let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green"); let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok(); @@ -97,12 +97,12 @@ pub async fn main() { let green_stream = create_geyser_reconnecting_stream( config.clone(), - GeyserFilter(COMMITMENT_LEVEL).accounts(), + GeyserFilter(commitment_level).accounts(), ); let blue_stream = create_geyser_reconnecting_stream( config.clone(), - GeyserFilter(COMMITMENT_LEVEL).blocks_and_txs(), + GeyserFilter(commitment_level).blocks_and_txs(), ); tokio::spawn(async move { @@ -120,7 +120,6 @@ pub async fn main() { account_pk, account_info.data.len() ); - let bytes: [u8; 32] = account_pk.to_bytes(); } _ => {} } @@ -135,7 +134,7 @@ pub async fn main() { tokio::spawn(async move { let mut blue_stream = pin!(blue_stream); - let extractor = BlockMiniExtractor(COMMITMENT_LEVEL); + let extractor = BlockMiniExtractor(commitment_level); while let Some(message) = blue_stream.next().await { match message { Message::GeyserSubscribeUpdate(subscriber_update) => { diff --git a/examples/stream_token_accounts.rs b/examples/stream_token_accounts.rs index bcfe012..8b040a3 100644 --- a/examples/stream_token_accounts.rs +++ b/examples/stream_token_accounts.rs @@ -60,7 +60,7 @@ pub async fn main() { info!("Write Block stream.."); - let (exit_signal, exit_notify) = tokio::sync::broadcast::channel(1); + let (exit_signal, _exit_notify) = tokio::sync::broadcast::channel(1); let (autoconnect_tx, mut accounts_rx) = tokio::sync::mpsc::channel(1000); let _jh_green = create_geyser_autoconnection_task_with_mpsc( @@ -167,7 +167,7 @@ pub async fn main() { // } // all different states are covered // is_native: both true+false are sent - assert_eq!(account.executable, false); + assert!(!account.executable); assert_eq!(account.rent_epoch, u64::MAX); let owner = Pubkey::from_str(&account_ui.owner).unwrap(); @@ -217,7 +217,7 @@ pub async fn main() { } changing_slot = slot; } - Ok(TokenAccountType::Mint(mint)) => { + Ok(TokenAccountType::Mint(_mint)) => { // not interesting } Ok(TokenAccountType::Multisig(_)) => {} @@ -229,8 +229,6 @@ pub async fn main() { ); } } - - let bytes: [u8; 32] = account_pk.to_bytes(); } _ => {} } @@ -249,7 +247,7 @@ pub async fn main() { for accounts_by_mint in token_account_by_ownermint_read.iter() { for token_account_mint in accounts_by_mint.iter() { total += 1; - let (owner, mint, account) = ( + let (_owner, _mint, _account) = ( accounts_by_mint.key(), token_account_mint.key(), token_account_mint.value(), @@ -297,6 +295,7 @@ pub fn token_accounts() -> SubscribeRequest { commitment: Some(map_commitment_level(CommitmentConfig::processed()).into()), accounts_data_slice: Default::default(), ping: None, + transactions_status: Default::default(), } } @@ -333,5 +332,6 @@ pub fn token_accounts_finalized() -> SubscribeRequest { commitment: Some(map_commitment_level(CommitmentConfig::confirmed()).into()), accounts_data_slice: Default::default(), ping: None, + transactions_status: Default::default(), } } diff --git a/examples/stream_vote_transactions.rs b/examples/stream_vote_transactions.rs index 87c8a20..0cf6b2d 100644 --- a/examples/stream_vote_transactions.rs +++ b/examples/stream_vote_transactions.rs @@ -153,5 +153,6 @@ pub fn transaction_filter() -> SubscribeRequest { commitment: None, accounts_data_slice: Default::default(), ping: None, + transactions_status: Default::default(), } } diff --git a/examples/subscribe_accounts.rs b/examples/subscribe_accounts.rs index b62049b..9da5209 100644 --- a/examples/subscribe_accounts.rs +++ b/examples/subscribe_accounts.rs @@ -83,15 +83,13 @@ pub async fn main() { // note: this keeps track of lot of data and might blow up memory fn start_tracking_account_consumer( mut geyser_messages_rx: Receiver, - current_processed_slot: Arc, + _current_processed_slot: Arc, ) { tokio::spawn(async move { loop { match geyser_messages_rx.recv().await { Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof { Some(UpdateOneof::Account(update)) => { - let started_at = Instant::now(); - let now = SystemTime::now(); let account_info = update.account.unwrap(); let account_pk = Pubkey::try_from(account_info.pubkey).unwrap(); let account_owner_pk = Pubkey::try_from(account_info.owner).unwrap(); diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 1de01fa..5198580 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "1.81.0" +channel = "1.78.0"