From 71a9eaecfecfc10ad9ea3a754fb8b777ad6a0436 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Tue, 9 Apr 2024 17:05:30 +0200 Subject: [PATCH] WIP integrate window patch --- Cargo.lock | 71 ++++++++++--------- Cargo.toml | 6 +- ...grpc_subscription_autoreconnect_streams.rs | 67 ++++++++++++----- src/lib.rs | 12 ++++ 4 files changed, 102 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ec59b1f..0f6d172 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,9 +76,9 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72832d73be48bac96a5d7944568f305d829ed55b0ce3b483647089dfaf6cf704" +checksum = "cd7d5a2cecb58716e47d67d5703a249964b14c7be1ec3cad3affc295b2d1c35d" dependencies = [ "cfg-if", "getrandom 0.2.11", @@ -1345,7 +1345,7 @@ version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" dependencies = [ - "ahash 0.8.4", + "ahash 0.8.5", ] [[package]] @@ -2822,9 +2822,9 @@ dependencies = [ [[package]] name = "solana-account-decoder" -version = "1.17.15" +version = "1.17.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ea4bedfcc8686ae6d01a3d8288f5b9746cd00ec63f0ce9a6415849d35add50" +checksum = "d145d4e1e33bfecd209059a0c4c75d623dbcaeb565b4c197f1815257be45726a" dependencies = [ "Inflector", "base64 0.21.5", @@ -2847,9 +2847,9 @@ dependencies = [ [[package]] name = "solana-config-program" -version = "1.17.15" +version = "1.17.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8de23cd0dd8673f4590e90bfa47ff19eb629f4b7dc15a3fb173a62d932801d07" +checksum = "4000f4717f86c5f9e1105378e3a6521db770d0ad68417f59960ca4b51103fcd0" dependencies = [ "bincode", "chrono", @@ -2861,11 +2861,11 @@ dependencies = [ [[package]] name = "solana-frozen-abi" -version = "1.17.15" +version = "1.17.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4090f2ac64149ce1fbabd5277f41e278edc1f38121927fe8f6355e67ead3e199" +checksum = "1790013c7969353000c22907fc21610adb3389a7c9a27a386ebe7fb32b2ad307" dependencies = [ - "ahash 0.8.4", + "ahash 0.8.5", "blake3", "block-buffer 0.10.4", "bs58", @@ -2891,9 +2891,9 @@ dependencies = [ [[package]] name = "solana-frozen-abi-macro" -version = "1.17.15" +version = "1.17.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "765bcdc1ecc31ea5d3d7ddb680ffa6645809c122b4ffdc223b161850e6ba352b" +checksum = "a3ed2b49a3dd03ddd5107d6e629e8e5895724227a057b3511bf0c107c6d48308" dependencies = [ "proc-macro2", "quote", @@ -2903,9 +2903,9 @@ dependencies = [ [[package]] name = "solana-logger" -version = "1.17.15" +version = "1.17.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c7f3cad088bc5f00569cb5b4c3aaba8d935f8f7cc25c91cc0c55a8a7de2b137" +checksum = "bfc0d5b4f046d07e845b69178989a6b3bf168a82eeee006adb77391b339bce64" dependencies = [ "env_logger", "lazy_static", @@ -2914,9 +2914,9 @@ dependencies = [ [[package]] name = "solana-measure" -version = "1.17.15" +version = "1.17.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2de5041d16120852c0deea047c024e1fad8819e49041491f0cca6c91c243fd5d" +checksum = "857178177c6b378bcfc35df6867a6eef211059f5e4ab01ee87355d6b7493b556" dependencies = [ "log", "solana-sdk", @@ -2924,9 +2924,9 @@ dependencies = [ [[package]] name = "solana-metrics" -version = "1.17.15" +version = "1.17.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fd6f25f0076b6eb873f7e2a85e53191ac2affe6782131be1a2867d057307e20" +checksum = "1c68f5cbfbafd002b4d94728748f632a3bd27772ca5c7139710d65940c95477c" dependencies = [ "crossbeam-channel", "gethostname", @@ -2939,9 +2939,9 @@ dependencies = [ [[package]] name = "solana-program" -version = "1.17.15" +version = "1.17.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1141d1dffbe68852128f7bbcc3c43a5d2cb715ecffeeb64eb81bb93cbaf80bb" +checksum = "4b2ae4ec9dd6fc76202c94d23942da3cf624325a178e1b0125e70db90b8d7f15" dependencies = [ "ark-bn254", "ark-ec", @@ -2993,9 +2993,9 @@ dependencies = [ [[package]] name = "solana-program-runtime" -version = "1.17.15" +version = "1.17.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "942de577a2865cec28fc174575c9bd6cf7af815832af67fe40ca856075550998" +checksum = "b50a6da7b501117f68ef51fc113d771b52af646dc42c43af23a85e32461d59c9" dependencies = [ "base64 0.21.5", "bincode", @@ -3021,9 +3021,9 @@ dependencies = [ [[package]] name = "solana-sdk" -version = "1.17.15" +version = "1.17.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "278a95acb99377dd4585599fdbec23d0a6fcb94ec78285283723fdd365fe885e" +checksum = "368430d6c9f033e86f8f590d19232d10986d1188c3ad3a6836628d2acc09c21a" dependencies = [ "assert_matches", "base64 0.21.5", @@ -3075,9 +3075,9 @@ dependencies = [ [[package]] name = "solana-sdk-macro" -version = "1.17.15" +version = "1.17.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92dbaf563210f61828800f2a3d8c188fa2afede91920d364982e280318db2eb5" +checksum = "f554d2a144bb0138cfdeced9961cc8a09aaa09f0c3c9a63bd10da41c4a06d420" dependencies = [ "bs58", "proc-macro2", @@ -3094,9 +3094,9 @@ checksum = "468aa43b7edb1f9b7b7b686d5c3aeb6630dc1708e86e31343499dd5c4d775183" [[package]] name = "solana-transaction-status" -version = "1.17.15" +version = "1.17.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e2031070cba17802f7108b53f6db01b82cdfb0360b0a8b9d51c584f2e9dd9e4" +checksum = "ba7131d11c8d5a068bfc26a9dc8c9ee0d77eaf60856dd0c8be880542fc5fbbd6" dependencies = [ "Inflector", "base64 0.21.5", @@ -3119,9 +3119,9 @@ dependencies = [ [[package]] name = "solana-zk-token-sdk" -version = "1.17.15" +version = "1.17.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef26fb44734aa940e6648bbbeead677edc68c7e1ec09128e5f16a8924c389a38" +checksum = "112944743b08f7e1101368ff6d84745e7b4abb075fabaccc02e01bd3ce4b6d6c" dependencies = [ "aes-gcm-siv", "base64 0.21.5", @@ -4186,12 +4186,12 @@ dependencies = [ [[package]] name = "yellowstone-grpc-client" -version = "1.13.0+solana.1.17.15" -source = "git+https://github.com/rpcpool/yellowstone-grpc.git?tag=v1.12.0+solana.1.17.15#c7b72cc8781c2dc48e4a7c94e411f95df495cf2f" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978d1379db10aa5e430e305f5a01598439d3414ef808443f9f92d1201dacda65" dependencies = [ "bytes", "futures", - "http", "thiserror", "tonic", "tonic-health", @@ -4200,8 +4200,9 @@ dependencies = [ [[package]] name = "yellowstone-grpc-proto" -version = "1.12.0+solana.1.17.15" -source = "git+https://github.com/rpcpool/yellowstone-grpc.git?tag=v1.12.0+solana.1.17.15#c7b72cc8781c2dc48e4a7c94e411f95df495cf2f" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "742e431794099868a1ac668d440d724eb25b6f5a3739986cdd8bfc15fa5661c3" dependencies = [ "anyhow", "bincode", diff --git a/Cargo.toml b/Cargo.toml index 1d4ece6..18cc4f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,12 +9,12 @@ authors = ["GroovieGermanikus "] repository = "https://github.com/blockworks-foundation/geyser-grpc-connector" [dependencies] -yellowstone-grpc-client = { version = "1.13.0+solana.1.17.15", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" } -yellowstone-grpc-proto = { version = "1.12.0+solana.1.17.15", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" } +yellowstone-grpc-client = "1.15.0" +yellowstone-grpc-proto = "1.14.0" # required for CommitmentConfig -solana-sdk = "~1.17.15" +solana-sdk = "~1.17.28" url = "2.5.0" async-stream = "0.3.5" diff --git a/src/grpc_subscription_autoreconnect_streams.rs b/src/grpc_subscription_autoreconnect_streams.rs index 114a75a..98e5eec 100644 --- a/src/grpc_subscription_autoreconnect_streams.rs +++ b/src/grpc_subscription_autoreconnect_streams.rs @@ -5,8 +5,9 @@ use log::{debug, info, log, trace, warn, Level}; use std::time::Duration; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout}; -use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult}; +use yellowstone_grpc_client::{GeyserGrpcBuilder, GeyserGrpcBuilderResult, GeyserGrpcClient, GeyserGrpcClientResult, InterceptorXToken}; use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate}; +use yellowstone_grpc_proto::tonic::metadata::AsciiMetadataValue; use yellowstone_grpc_proto::tonic::Status; enum ConnectionState>> { @@ -24,41 +25,48 @@ pub fn create_geyser_reconnecting_stream( ) -> impl Stream { let mut state = ConnectionState::NotConnected(1); + + // in case of cancellation, we restart from here: // thus we want to keep the progression in a state object outside the stream! makro let the_stream = stream! { + + let addr = grpc_source.grpc_addr.clone(); + let grpc_x_token = grpc_source.grpc_x_token.clone(); + let tls_config = grpc_source.tls_config.clone(); + // let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout); + let receive_timeout = Some(Duration::from_secs(10)); // FIXME + loop { let yield_value; + (state, yield_value) = match state { ConnectionState::NotConnected(attempt) => { + // let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout); + // let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout); + // let subscribe_timeout = grpc_source.timeouts.map(|t| t.subscribe_timeout); + let subscribe_timeout = Some(Duration::from_secs(10)); // FIXME + + + + let connection_task = tokio::spawn({ - let addr = grpc_source.grpc_addr.clone(); - let token = grpc_source.grpc_x_token.clone(); - let config = grpc_source.tls_config.clone(); - let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout); - let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout); - let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout); let subscribe_filter = subscribe_filter.clone(); + let builder = build_client(grpc_source.clone()).unwrap(); // TODO instead of unwrap, move to Fatal state log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr); async move { - let connect_result = GeyserGrpcClient::connect_with_timeout( - addr, token, config, - connect_timeout, - request_timeout, - false) - .await; - let mut client = connect_result?; + let mut client = builder.connect().await.unwrap(); // FIXME debug!("Subscribe with filter {:?}", subscribe_filter); let subscribe_result = timeout(subscribe_timeout.unwrap_or(Duration::MAX), client - .subscribe_once2(subscribe_filter)) + .subscribe_once(subscribe_filter)) .await; // maybe not optimal @@ -88,7 +96,6 @@ pub fn create_geyser_reconnecting_stream( } ConnectionState::Ready(mut geyser_stream) => { - let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout); match timeout(receive_timeout.unwrap_or(Duration::MAX), geyser_stream.next()).await { Ok(Some(Ok(update_message))) => { trace!("> recv update message from {}", grpc_source); @@ -130,6 +137,34 @@ pub fn create_geyser_reconnecting_stream( the_stream } +fn build_client(grpc_source_config: GrpcSourceConfig) -> GeyserGrpcBuilderResult { + let mut builder = GeyserGrpcClient::build_from_shared(grpc_source_config.grpc_addr)?; + + if let Some(tls_config) = grpc_source_config.tls_config { + builder = builder.tls_config(tls_config)?; + } + + if let Some(timeouts) = grpc_source_config.timeouts { + + builder = builder.timeout(timeouts.connect_timeout); + + builder = builder.timeout(timeouts.request_timeout); + + // subscribe + receive timeout are handled somewhere else + + } + + let x_token: Option = match grpc_source_config.grpc_x_token { + Some(x_token) => Some(x_token.try_into()?), + None => None, + }; + let interceptor = InterceptorXToken { x_token }; + + builder = builder.interceptor(interceptor); + + Ok(builder) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/lib.rs b/src/lib.rs index 64edd90..926bcca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,8 @@ use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::time::Duration; use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, SubscribeUpdate}; +use yellowstone_grpc_proto::prost::bytes::Bytes; +use yellowstone_grpc_proto::tonic; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; pub mod channel_plugger; @@ -31,6 +33,7 @@ pub struct GrpcConnectionTimeouts { pub receive_timeout: Duration, } + #[derive(Clone)] pub struct GrpcSourceConfig { pub grpc_addr: String, @@ -78,6 +81,15 @@ impl GrpcSourceConfig { timeouts: Some(timeouts), } } + + pub fn build_tonic_endpoint(&self) -> tonic::transport::Endpoint { + let mut endpoint = tonic::transport::Endpoint::from_shared(self.grpc_addr.clone()) + .expect("grpc_addr must be a valid url"); + if let Some(tls_config) = &self.tls_config { + endpoint = endpoint.tls_config(tls_config.clone()).expect("tls_config must be valid"); + } + endpoint + } } #[derive(Clone)]