Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force client disconnects when node is unhealthy #13

Open
wants to merge 14 commits into
base: master-2.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,116 changes: 1,657 additions & 459 deletions Cargo.lock

Large diffs are not rendered by default.

21 changes: 9 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[workspace]
resolver = "2"
members = [
# "examples/rust", # 2.0.0
"examples/rust", # 2.0.0
"yellowstone-grpc-client", # 2.0.0
"yellowstone-grpc-geyser", # 2.0.0
"yellowstone-grpc-proto", # 2.0.0
Expand All @@ -17,7 +17,7 @@ keywords = ["solana"]
publish = false

[workspace.dependencies]
agave-geyser-plugin-interface = { git = "https://github.com/helius-labs/agave", branch = "master" }
agave-geyser-plugin-interface = "~2.1.2"
anyhow = "1.0.62"
backoff = "0.4.0"
base64 = "0.22.1"
Expand All @@ -30,6 +30,7 @@ clap = "4.3.0"
crossbeam-channel = "0.5.8"
env_logger = "0.11.3"
futures = "0.3.24"
once_cell = "1.19.0"
git-version = "0.3.5"
hex = "0.4.3"
hostname = "0.4.0"
Expand All @@ -46,13 +47,14 @@ maplit = "1.0.2"
prometheus = "0.13.2"
prost = "0.13.1"
protobuf-src = "1.1.0"
scylla = "0.13.0"
serde = "1.0.145"
serde_json = "1.0.86"
solana-account-decoder = { git = "https://github.com/helius-labs/agave", branch = "master" }
solana-logger = { git = "https://github.com/helius-labs/agave", branch = "master" }
solana-sdk = { git = "https://github.com/helius-labs/agave", branch = "master" }
solana-transaction-status = { git = "https://github.com/helius-labs/agave", branch = "master" }
solana-account-decoder = "~2.1.2"
solana-logger = "~2.1.2"
solana-sdk = "~2.1.2"
solana-transaction-status = "~2.1.2"
solana-client = "~2.1.2"
solana-rpc-client-api = "~2.1.2"
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the master git versions was giving me package conflict errors. So I will use the latest version of the release instead. I tested the code using the Solana test validator, and it worked. So we should be good.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is risky; we should keep using the master versions, since geyser usually has breaking changes between minor versions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree in retrospect, but since this version has been battle-tested with 2.2 and it's fixed, I now prefer this over using the master version.

spl-token-2022 = "4.0.0"
thiserror = "1.0.69"
tokio = "1.21.2"
Expand All @@ -77,8 +79,3 @@ codegen-units = 1
[patch.crates-io.curve25519-dalek]
git = "https://github.com/anza-xyz/curve25519-dalek.git"
rev = "b500cdc2a920cd5bff9e2dd974d7b97349d61464"


[patch.crates-io]
solana-program = { git = "https://github.com/helius-labs/agave", branch = "master" }
solana-zk-token-sdk = { git = "https://github.com/helius-labs/agave", branch = "master" }
3 changes: 3 additions & 0 deletions yellowstone-grpc-geyser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ tokio-stream = { workspace = true }
tonic = { workspace = true, features = ["gzip", "zstd", "tls", "tls-roots"] }
tonic-health = { workspace = true }
yellowstone-grpc-proto = { workspace = true, features = ["convert", "plugin"] }
solana-client = { workspace = true }
once_cell = { workspace = true }
solana-rpc-client-api = { workspace = true }

[build-dependencies]
anyhow = { workspace = true }
Expand Down
22 changes: 13 additions & 9 deletions yellowstone-grpc-geyser/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
},
"grpc": {
"address": "0.0.0.0:10000",
"tls_config": {
pmantica11 marked this conversation as resolved.
Show resolved Hide resolved
"cert_path": "",
"key_path": ""
},
"compression": {
"accept": ["gzip"],
"send": ["gzip"]
"accept": [
"gzip"
],
"send": [
"gzip"
]
},
"max_decoding_message_size": "4_194_304",
"snapshot_plugin_channel_capacity": null,
Expand All @@ -28,9 +28,13 @@
"max": 1,
"any": false,
"account_max": 10,
"account_reject": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
"account_reject": [
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
],
"owner_max": 10,
"owner_reject": ["11111111111111111111111111111111"],
"owner_reject": [
"11111111111111111111111111111111"
],
"data_slice_max": 2
},
"slots": {
Expand Down Expand Up @@ -79,4 +83,4 @@
"address": "0.0.0.0:8999"
},
"block_fail_action": "log"
}
}
13 changes: 13 additions & 0 deletions yellowstone-grpc-geyser/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,19 @@ pub struct ConfigGrpc {
with = "humantime_serde"
)]
pub filter_names_cleanup_interval: Duration,
/// Disconnect if node is lagging behind
#[serde(default)]
pub force_disconnect_if_node_is_unhealthy: bool,
/// RPC port to use for health monitoring
pub rpc_port: Option<u16>,
}

/// Settings for a node health monitor.
///
/// Both fields are mandatory when this section is deserialized (neither has a
/// `#[serde(default)]`), and `deny_unknown_fields` makes any unrecognized key
/// a deserialization error.
///
/// NOTE(review): no use of this struct is visible in this change — the health
/// monitor is wired up through `ConfigGrpc::rpc_port` instead. Confirm whether
/// this type is still needed.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigHealthMonitor {
/// Maximum number of slots the node may be behind.
/// Recommended to set as same threshold as the RPC
pub max_slot_behind_threshold: u64,
/// RPC port — presumably the local validator RPC port queried for health;
/// TODO confirm against the code that consumes this struct.
pub rpc_port: u16,
}

impl ConfigGrpc {
Expand Down
16 changes: 16 additions & 0 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use {
config::{ConfigBlockFailAction, ConfigGrpc, ConfigGrpcFilters},
filters::Filter,
metrics::{self, DebugClientMessage},
monitor::SHOULD_DISCONNECT,
version::GrpcVersionInfo,
},
anyhow::Context,
Expand Down Expand Up @@ -838,6 +839,15 @@ impl GrpcService {
}
}
message = messages_rx.recv() => {
if SHOULD_DISCONNECT.load(Ordering::SeqCst) {
error!("gRPC node is lagging behind. Disconnecting client #{id}");
stream_tx
.send(Err(Status::internal("Disconnecting since node is lagging behind. If you are connected through a load balancer, please try reconnecting. You might be automatically routed to a healthy node.")))
.await
.unwrap();
break 'outer;
}

let (commitment, messages) = match message {
Ok((commitment, messages)) => (commitment, messages),
Err(broadcast::error::RecvError::Closed) => {
Expand Down Expand Up @@ -964,6 +974,12 @@ impl Geyser for GrpcService {
&self,
mut request: Request<Streaming<SubscribeRequest>>,
) -> TonicResult<Response<Self::SubscribeStream>> {
if SHOULD_DISCONNECT.load(Ordering::SeqCst) {
error!("gRPC node is lagging behind. Preventing client from connecting.");
return Err(Status::internal(
"Node is lagging behind. If you are connected through a load balancer, please try reconnecting. You might be automatically routed to a healthy node.",
));
}
let id = self.subscribe_id.fetch_add(1, Ordering::Relaxed);

let x_request_snapshot = request.metadata().contains_key("x-request-snapshot");
Expand Down
1 change: 1 addition & 0 deletions yellowstone-grpc-geyser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod config;
pub mod filters;
pub mod grpc;
pub mod metrics;
pub mod monitor;
pub mod plugin;
pub mod version;

Expand Down
46 changes: 46 additions & 0 deletions yellowstone-grpc-geyser/src/monitor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};

use once_cell::sync::Lazy;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::{client_error, request};
use tokio::time::interval;

/// Process-wide flag telling the gRPC service to drop and refuse clients.
///
/// Starts as `false`. `run_forced_disconnection_monitor` overwrites it on every
/// poll with the negation of the node's health, and the gRPC service reads it
/// both when a new subscription arrives and inside the per-client message loop.
///
/// NOTE(review): `AtomicBool::new` is a `const fn`, so a plain
/// `static SHOULD_DISCONNECT: AtomicBool` would likely suffice; the
/// `Lazy<Arc<..>>` wrapper is kept here to leave the public type unchanged.
pub static SHOULD_DISCONNECT: Lazy<Arc<AtomicBool>> =
Lazy::new(|| Arc::new(AtomicBool::new(false)));

pub async fn is_node_healthy(client: &RpcClient) -> bool {
loop {
match client.get_health().await {
Ok(()) => return true,
Err(err) => {
if let client_error::ErrorKind::RpcError(request::RpcError::RpcResponseError {
code: _,
message: _,
data: request::RpcResponseErrorData::NodeUnhealthy { .. },
}) = &err.kind
{
return false;
} else {
log::error!("Failed to get health: {}", err);
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
}
}
}
}

pub async fn run_forced_disconnection_monitor(rpc_client: RpcClient) {
let mut interval = interval(Duration::from_millis(100));
loop {
interval.tick().await;
let is_healthy = !is_node_healthy(&rpc_client).await;
SHOULD_DISCONNECT.store(is_healthy, Ordering::SeqCst);
}
}
12 changes: 12 additions & 0 deletions yellowstone-grpc-geyser/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ use {
config::Config,
grpc::GrpcService,
metrics::{self, PrometheusService},
monitor::run_forced_disconnection_monitor,
},
agave_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult,
SlotStatus,
},
solana_client::nonblocking::rpc_client::RpcClient,
std::{
concat, env,
sync::{
Expand Down Expand Up @@ -75,6 +77,16 @@ impl GeyserPlugin for Plugin {
.build()
.map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;

if config.grpc.force_disconnect_if_node_is_unhealthy {
let rpc_client = RpcClient::new(format!(
"http://127.0.0.1:{}",
config.grpc.rpc_port.expect(
"RPC port is required when force_disconnect_if_node_is_unhealthy is true"
)
));
runtime.spawn(run_forced_disconnection_monitor(rpc_client));
}

let (snapshot_channel, grpc_channel, grpc_shutdown, prometheus) =
runtime.block_on(async move {
let (debug_client_tx, debug_client_rx) = mpsc::unbounded_channel();
Expand Down
Loading