Skip to content

Commit

Permalink
Applies PR feedback and some clippy fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
mango-dee committed Nov 26, 2024
1 parent 214886a commit e90ecd1
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 53 deletions.
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "geyser-grpc-connector"
version = "2.0.0"
version = "0.11.0+yellowstone.2.0.0"
edition = "2021"

description = "Multiplexing and Reconnection on Yellowstone gRPC Geyser client streaming"
Expand All @@ -13,7 +13,7 @@ yellowstone-grpc-client = { version = "2.0.0", git = "https://github.com/rpcpool
yellowstone-grpc-proto = { version = "2.0.0", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v2.0.0+solana.2.0.16" }

# required for CommitmentConfig
solana-sdk = "2.0.16"
solana-sdk = "~2.0.16"

url = "2.5.0"
async-stream = "0.3.5"
Expand All @@ -30,9 +30,7 @@ base64 = "0.21.5"
bincode = "1.3.3"

csv = "1.3.0"

dashmap = "5.5.3"

dashmap = "6.1.0"
tonic = { version= "0.12.3", features=["gzip"] }
tonic-health = "0.12.3"
regex = "1.10.4"
Expand Down
7 changes: 2 additions & 5 deletions examples/bench_geyser_grpc_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ fn start_tracking_slots(current_processed_slot: AtomicSlot) {
exit_notify.resubscribe(),
);

let mut tip: Slot = 0;
// let mut tip: Slot = 0;

loop {
match multiplex_rx.recv().await {
Expand Down Expand Up @@ -199,8 +199,6 @@ fn start_tracking_account_consumer(

// seconds since epoch
let mut block_time_per_slot = HashMap::<Slot, UnixTimestamp>::new();
// wall clock time of block completion (i.e. processed) reported by the block meta stream
let mut block_completion_notification_time_per_slot = HashMap::<Slot, SystemTime>::new();

let debouncer = debouncer::Debouncer::new(Duration::from_millis(50));

Expand All @@ -218,11 +216,10 @@ fn start_tracking_account_consumer(
match geyser_messages_rx.recv().await {
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
Some(UpdateOneof::Account(update)) => {
let started_at = Instant::now();
let now = SystemTime::now();
let account_info = update.account.unwrap();
let account_pk = Pubkey::try_from(account_info.pubkey).unwrap();
let account_owner_pk = Pubkey::try_from(account_info.owner).unwrap();
let _account_owner_pk = Pubkey::try_from(account_info.owner).unwrap();
// note: slot is referencing the block that is just built while the slot number reported from BlockMeta/Slot uses the slot after the block is built
let slot = update.slot;
let account_receive_time = get_epoch_sec();
Expand Down
38 changes: 13 additions & 25 deletions examples/stream_blocks_mainnet_stream.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,32 @@
use futures::{Stream, StreamExt};
use log::info;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use std::pin::pin;

use base64::Engine;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use log::info;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::compute_budget::ComputeBudgetInstruction;
use solana_sdk::hash::Hash;
use solana_sdk::instruction::CompiledInstruction;
use solana_sdk::message::v0::MessageAddressTableLookup;
use solana_sdk::message::{v0, MessageHeader, VersionedMessage};
use solana_sdk::pubkey::Pubkey;
/// This file mocks the core model of the RPC server.
use solana_sdk::{borsh1, compute_budget};

use solana_sdk::signature::Signature;
use solana_sdk::transaction::TransactionError;
/// This file mocks the core model of the RPC server.
use solana_sdk::{borsh1, compute_budget};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock;

use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;

pub mod debouncer;

Expand Down Expand Up @@ -218,21 +217,10 @@ pub fn map_produced_block(
.transactions
.into_iter()
.filter_map(|tx| {
let Some(meta) = tx.meta else {
return None;
};

let Some(transaction) = tx.transaction else {
return None;
};

let Some(message) = transaction.message else {
return None;
};

let Some(header) = message.header else {
return None;
};
let meta = tx.meta?;
let transaction = tx.transaction?;
let message = transaction.message?;
let header = message.header?;

let signatures = transaction
.signatures
Expand Down
9 changes: 4 additions & 5 deletions examples/stream_blocks_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub async fn main() {
tracing_subscriber::fmt::init();
// console_subscriber::init();

let COMMITMENT_LEVEL = CommitmentConfig::processed();
let commitment_level = CommitmentConfig::processed();
let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green");
let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();

Expand All @@ -97,12 +97,12 @@ pub async fn main() {

let green_stream = create_geyser_reconnecting_stream(
config.clone(),
GeyserFilter(COMMITMENT_LEVEL).accounts(),
GeyserFilter(commitment_level).accounts(),
);

let blue_stream = create_geyser_reconnecting_stream(
config.clone(),
GeyserFilter(COMMITMENT_LEVEL).blocks_and_txs(),
GeyserFilter(commitment_level).blocks_and_txs(),
);

tokio::spawn(async move {
Expand All @@ -120,7 +120,6 @@ pub async fn main() {
account_pk,
account_info.data.len()
);
let bytes: [u8; 32] = account_pk.to_bytes();
}
_ => {}
}
Expand All @@ -135,7 +134,7 @@ pub async fn main() {

tokio::spawn(async move {
let mut blue_stream = pin!(blue_stream);
let extractor = BlockMiniExtractor(COMMITMENT_LEVEL);
let extractor = BlockMiniExtractor(commitment_level);
while let Some(message) = blue_stream.next().await {
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
Expand Down
12 changes: 6 additions & 6 deletions examples/stream_token_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub async fn main() {

info!("Write Block stream..");

let (exit_signal, exit_notify) = tokio::sync::broadcast::channel(1);
let (exit_signal, _exit_notify) = tokio::sync::broadcast::channel(1);
let (autoconnect_tx, mut accounts_rx) = tokio::sync::mpsc::channel(1000);

let _jh_green = create_geyser_autoconnection_task_with_mpsc(
Expand Down Expand Up @@ -167,7 +167,7 @@ pub async fn main() {
// }
// all different states are covered
// is_native: both true+false are sent
assert_eq!(account.executable, false);
assert!(!account.executable);
assert_eq!(account.rent_epoch, u64::MAX);

let owner = Pubkey::from_str(&account_ui.owner).unwrap();
Expand Down Expand Up @@ -217,7 +217,7 @@ pub async fn main() {
}
changing_slot = slot;
}
Ok(TokenAccountType::Mint(mint)) => {
Ok(TokenAccountType::Mint(_mint)) => {
// not interesting
}
Ok(TokenAccountType::Multisig(_)) => {}
Expand All @@ -229,8 +229,6 @@ pub async fn main() {
);
}
}

let bytes: [u8; 32] = account_pk.to_bytes();
}
_ => {}
}
Expand All @@ -249,7 +247,7 @@ pub async fn main() {
for accounts_by_mint in token_account_by_ownermint_read.iter() {
for token_account_mint in accounts_by_mint.iter() {
total += 1;
let (owner, mint, account) = (
let (_owner, _mint, _account) = (
accounts_by_mint.key(),
token_account_mint.key(),
token_account_mint.value(),
Expand Down Expand Up @@ -297,6 +295,7 @@ pub fn token_accounts() -> SubscribeRequest {
commitment: Some(map_commitment_level(CommitmentConfig::processed()).into()),
accounts_data_slice: Default::default(),
ping: None,
transactions_status: Default::default(),
}
}

Expand Down Expand Up @@ -333,5 +332,6 @@ pub fn token_accounts_finalized() -> SubscribeRequest {
commitment: Some(map_commitment_level(CommitmentConfig::confirmed()).into()),
accounts_data_slice: Default::default(),
ping: None,
transactions_status: Default::default(),
}
}
1 change: 1 addition & 0 deletions examples/stream_vote_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,6 @@ pub fn transaction_filter() -> SubscribeRequest {
commitment: None,
accounts_data_slice: Default::default(),
ping: None,
transactions_status: Default::default(),
}
}
4 changes: 1 addition & 3 deletions examples/subscribe_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,13 @@ pub async fn main() {
// note: this keeps track of lot of data and might blow up memory
fn start_tracking_account_consumer(
mut geyser_messages_rx: Receiver<Message>,
current_processed_slot: Arc<AtomicU64>,
_current_processed_slot: Arc<AtomicU64>,
) {
tokio::spawn(async move {
loop {
match geyser_messages_rx.recv().await {
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
Some(UpdateOneof::Account(update)) => {
let started_at = Instant::now();
let now = SystemTime::now();
let account_info = update.account.unwrap();
let account_pk = Pubkey::try_from(account_info.pubkey).unwrap();
let account_owner_pk = Pubkey::try_from(account_info.owner).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "1.81.0"
channel = "1.78.0"

0 comments on commit e90ecd1

Please sign in to comment.