From 430847b2c124e7b44dbab2a6688f7354e3da1419 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 8 May 2024 15:53:16 +0200 Subject: [PATCH] debounce update log --- examples/bench_geyser_grpc_accounts.rs | 8 +++++- examples/debouncer.rs | 35 ++++++++++++++++++++++++ examples/stream_blocks_mainnet_stream.rs | 6 ++-- 3 files changed, 46 insertions(+), 3 deletions(-) create mode 100644 examples/debouncer.rs diff --git a/examples/bench_geyser_grpc_accounts.rs b/examples/bench_geyser_grpc_accounts.rs index bdda15d..d323539 100644 --- a/examples/bench_geyser_grpc_accounts.rs +++ b/examples/bench_geyser_grpc_accounts.rs @@ -23,6 +23,8 @@ use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, SubscribeUpdate}; use yellowstone_grpc_proto::prost::Message as _; +mod debouncer; + #[tokio::main] pub async fn main() { // RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace @@ -174,6 +176,8 @@ fn start_tracking_account_consumer(mut geyser_messages_rx: Receiver, cu // wall clock time of block completion (i.e. processed) reported by the block meta stream let mut block_completion_notification_time_per_slot = HashMap::::new(); + let debouncer = debouncer::Debouncer::new(Duration::from_millis(5)); + loop { match geyser_messages_rx.recv().await { Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof { @@ -190,7 +194,9 @@ fn start_tracking_account_consumer(mut geyser_messages_rx: Receiver, cu if latest_slot != 0 { // the perfect is value "-1" let delta = (latest_slot as i64) - (slot as i64); - debug!("Account info for upcoming slot {} was {} behind current processed slot", slot, delta); + if debouncer.can_fire() { + debug!("Account info for upcoming slot {} was {} behind current processed slot", slot, delta); + } } // if account_info.data.len() > 1000 { diff --git a/examples/debouncer.rs b/examples/debouncer.rs new file mode 100644 index 0000000..49ef1ad --- /dev/null +++ b/examples/debouncer.rs @@ -0,0 +1,35 @@ +use std::sync::atomic::{AtomicI64, Ordering}; +use std::time::{Duration, Instant}; + +#[derive(Debug)] +pub struct Debouncer { + started_at: Instant, + cooldown_ms: i64, + last: AtomicI64, +} + +impl Debouncer { + pub fn new(cooldown: Duration) -> Self { + Self { + started_at: Instant::now(), + cooldown_ms: cooldown.as_millis() as i64, + last: AtomicI64::new(0), + } + } + pub fn can_fire(&self) -> bool { + let passed_total_ms = self.started_at.elapsed().as_millis() as i64; + + let results = self.last.fetch_update(Ordering::SeqCst, Ordering::SeqCst, + |last| { + + if passed_total_ms - last > self.cooldown_ms { + Some(passed_total_ms) + } else { + None + } + + }); + + results.is_ok() + } +} diff --git a/examples/stream_blocks_mainnet_stream.rs b/examples/stream_blocks_mainnet_stream.rs index e2c2731..302da98 100644 --- a/examples/stream_blocks_mainnet_stream.rs +++ b/examples/stream_blocks_mainnet_stream.rs @@ -14,7 +14,7 @@ use solana_sdk::compute_budget::ComputeBudgetInstruction; use solana_sdk::hash::Hash; use solana_sdk::instruction::CompiledInstruction; use solana_sdk::message::v0::MessageAddressTableLookup; -use solana_sdk::message::{v0, MessageHeader, VersionedMessage}; +use solana_sdk::message::{MessageHeader, v0, VersionedMessage}; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; @@ -26,10 +26,12 @@ use geyser_grpc_connector::grpcmultiplex_fastestwins::{ create_multiplexed_stream, FromYellowstoneExtractor, }; use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig}; -use tokio::time::{sleep, Duration}; +use tokio::time::{Duration, sleep}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; +pub mod debouncer; + fn start_example_block_consumer( multiplex_stream: impl Stream + Send + 'static, ) {