Skip to content

Commit

Permalink
geyser: wrap message into Box in snapshot channel (#418)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Sep 2, 2024
1 parent 8048d9c commit 147be78
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 25 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ The minor version will be incremented upon a breaking change and the patch versi

### Breaking

## 2024-09-02

- yellowstone-grpc-geyser-1.16.3+solana.2.0.7

### Features

- geyser: wrap message into `Box` in snapshot channel ([#418](https://github.com/rpcpool/yellowstone-grpc/pull/418))

## 2024-08-26

- yellowstone-grpc-client-1.16.2+solana.2.0.7
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ resolver = "2"
members = [
"examples/rust", # 1.14.1+solana.2.0.7
"yellowstone-grpc-client", # 1.16.2+solana.2.0.7
"yellowstone-grpc-geyser", # 1.16.2+solana.2.0.7
"yellowstone-grpc-geyser", # 1.16.3+solana.2.0.7
"yellowstone-grpc-proto", # 1.15.0+solana.2.0.7
"yellowstone-grpc-tools", # 1.0.0-rc.12+solana.2.0.7
]
Expand Down
2 changes: 1 addition & 1 deletion yellowstone-grpc-geyser/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-geyser"
version = "1.16.2+solana.2.0.7"
version = "1.16.3+solana.2.0.7"
authors = { workspace = true }
edition = { workspace = true }
description = "Yellowstone gRPC Geyser Plugin"
Expand Down
16 changes: 6 additions & 10 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ pub struct GrpcService {
config_filters: Arc<ConfigGrpcFilters>,
blocks_meta: Option<BlockMetaStorage>,
subscribe_id: AtomicUsize,
snapshot_rx: Mutex<Option<crossbeam_channel::Receiver<Option<Message>>>>,
snapshot_rx: Mutex<Option<crossbeam_channel::Receiver<Box<Message>>>>,
broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc<Vec<Arc<Message>>>)>,
debug_clients_tx: Option<mpsc::UnboundedSender<DebugClientMessage>>,
}
Expand All @@ -741,7 +741,7 @@ impl GrpcService {
debug_clients_tx: Option<mpsc::UnboundedSender<DebugClientMessage>>,
is_reload: bool,
) -> anyhow::Result<(
Option<crossbeam_channel::Sender<Option<Message>>>,
Option<crossbeam_channel::Sender<Box<Message>>>,
mpsc::UnboundedSender<Arc<Message>>,
Arc<Notify>,
)> {
Expand Down Expand Up @@ -1125,7 +1125,7 @@ impl GrpcService {
config_filters: Arc<ConfigGrpcFilters>,
stream_tx: mpsc::Sender<TonicResult<SubscribeUpdate>>,
mut client_rx: mpsc::UnboundedReceiver<Option<Filter>>,
mut snapshot_rx: Option<crossbeam_channel::Receiver<Option<Message>>>,
mut snapshot_rx: Option<crossbeam_channel::Receiver<Box<Message>>>,
mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc<Vec<Arc<Message>>>)>,
debug_client_tx: Option<mpsc::UnboundedSender<DebugClientMessage>>,
drop_client: impl FnOnce(),
Expand Down Expand Up @@ -1256,7 +1256,7 @@ impl GrpcService {
endpoint: &str,
stream_tx: &mpsc::Sender<TonicResult<SubscribeUpdate>>,
client_rx: &mut mpsc::UnboundedReceiver<Option<Filter>>,
snapshot_rx: crossbeam_channel::Receiver<Option<Message>>,
snapshot_rx: crossbeam_channel::Receiver<Box<Message>>,
is_alive: &mut bool,
filter: &mut Filter,
) {
Expand Down Expand Up @@ -1292,18 +1292,14 @@ impl GrpcService {
let message = match snapshot_rx.try_recv() {
Ok(message) => {
MESSAGE_QUEUE_SIZE.dec();
match message {
Some(message) => message,
None => break,
}
message
}
Err(crossbeam_channel::TryRecvError::Empty) => {
sleep(Duration::from_millis(1)).await;
continue;
}
Err(crossbeam_channel::TryRecvError::Disconnected) => {
error!("client #{id}: snapshot channel disconnected");
*is_alive = false;
info!("client #{id}: end of startup");
break;
}
};
Expand Down
20 changes: 8 additions & 12 deletions yellowstone-grpc-geyser/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
concat, env,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Arc, Mutex,
},
time::Duration,
},
Expand All @@ -26,7 +26,7 @@ use {
#[derive(Debug)]
pub struct PluginInner {
runtime: Runtime,
snapshot_channel: Option<crossbeam_channel::Sender<Option<Message>>>,
snapshot_channel: Mutex<Option<crossbeam_channel::Sender<Box<Message>>>>,
snapshot_channel_closed: AtomicBool,
grpc_channel: mpsc::UnboundedSender<Arc<Message>>,
grpc_shutdown: Arc<Notify>,
Expand Down Expand Up @@ -101,7 +101,7 @@ impl GeyserPlugin for Plugin {

self.inner = Some(PluginInner {
runtime,
snapshot_channel,
snapshot_channel: Mutex::new(snapshot_channel),
snapshot_channel_closed: AtomicBool::new(false),
grpc_channel,
grpc_shutdown,
Expand Down Expand Up @@ -137,10 +137,10 @@ impl GeyserPlugin for Plugin {
ReplicaAccountInfoVersions::V0_0_3(info) => info,
};

let message = Message::Account((account, slot, is_startup).into());
if is_startup {
if let Some(channel) = &inner.snapshot_channel {
match channel.send(Some(message)) {
if let Some(channel) = inner.snapshot_channel.lock().unwrap().as_ref() {
let message = Message::Account((account, slot, is_startup).into());
match channel.send(Box::new(message)) {
Ok(()) => MESSAGE_QUEUE_SIZE.inc(),
Err(_) => {
if !inner.snapshot_channel_closed.swap(true, Ordering::Relaxed) {
Expand All @@ -152,6 +152,7 @@ impl GeyserPlugin for Plugin {
}
}
} else {
let message = Message::Account((account, slot, is_startup).into());
inner.send_message(message);
}

Expand All @@ -161,12 +162,7 @@ impl GeyserPlugin for Plugin {

fn notify_end_of_startup(&self) -> PluginResult<()> {
self.with_inner(|inner| {
if let Some(channel) = &inner.snapshot_channel {
match channel.send(None) {
Ok(()) => MESSAGE_QUEUE_SIZE.inc(),
Err(_) => panic!("failed to send message to startup queue: channel closed"),
}
}
let _snapshot_channel = inner.snapshot_channel.lock().unwrap().take();
Ok(())
})
}
Expand Down

0 comments on commit 147be78

Please sign in to comment.