Skip to content

Commit

Permalink
debounce update log
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed May 8, 2024
1 parent 86412cf commit 430847b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 3 deletions.
8 changes: 7 additions & 1 deletion examples/bench_geyser_grpc_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, SubscribeUpdate};

Check warning on line 23 in examples/bench_geyser_grpc_accounts.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/bench_geyser_grpc_accounts.rs
use yellowstone_grpc_proto::prost::Message as _;

mod debouncer;

#[tokio::main]
pub async fn main() {
// RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
Expand Down Expand Up @@ -174,6 +176,8 @@ fn start_tracking_account_consumer(mut geyser_messages_rx: Receiver<Message>, cu
// wall clock time of block completion (i.e. processed) reported by the block meta stream
let mut block_completion_notification_time_per_slot = HashMap::<Slot, SystemTime>::new();

let debouncer = debouncer::Debouncer::new(Duration::from_millis(5));

loop {
match geyser_messages_rx.recv().await {
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
Expand All @@ -190,7 +194,9 @@ fn start_tracking_account_consumer(mut geyser_messages_rx: Receiver<Message>, cu
if latest_slot != 0 {
// the perfect is value "-1"
let delta = (latest_slot as i64) - (slot as i64);
debug!("Account info for upcoming slot {} was {} behind current processed slot", slot, delta);
if debouncer.can_fire() {
debug!("Account info for upcoming slot {} was {} behind current processed slot", slot, delta);
}
}

// if account_info.data.len() > 1000 {
Expand Down
35 changes: 35 additions & 0 deletions examples/debouncer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::{Duration, Instant};

#[derive(Debug)]
pub struct Debouncer {
started_at: Instant,
cooldown_ms: i64,
last: AtomicI64,
}

impl Debouncer {
pub fn new(cooldown: Duration) -> Self {
Self {
started_at: Instant::now(),
cooldown_ms: cooldown.as_millis() as i64,
last: AtomicI64::new(0),
}
}
pub fn can_fire(&self) -> bool {
let passed_total_ms = self.started_at.elapsed().as_millis() as i64;

let results = self.last.fetch_update(Ordering::SeqCst, Ordering::SeqCst,
|last| {

if passed_total_ms - last > self.cooldown_ms {
Some(passed_total_ms)
} else {
None
}

});

results.is_ok()
}
}
6 changes: 4 additions & 2 deletions examples/stream_blocks_mainnet_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use solana_sdk::compute_budget::ComputeBudgetInstruction;
use solana_sdk::hash::Hash;
use solana_sdk::instruction::CompiledInstruction;
use solana_sdk::message::v0::MessageAddressTableLookup;
use solana_sdk::message::{v0, MessageHeader, VersionedMessage};
use solana_sdk::message::{MessageHeader, v0, VersionedMessage};
use solana_sdk::pubkey::Pubkey;

use solana_sdk::signature::Signature;
Expand All @@ -26,10 +26,12 @@ use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig};
use tokio::time::{sleep, Duration};
use tokio::time::{Duration, sleep};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;

pub mod debouncer;

fn start_example_block_consumer(
multiplex_stream: impl Stream<Item = ProducedBlock> + Send + 'static,
) {
Expand Down

0 comments on commit 430847b

Please sign in to comment.