From 9cf7e268893511d356f6f2bb45927dffe2a73941 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Fri, 17 May 2024 15:18:04 +0200 Subject: [PATCH] Making many changes at once (not working) --- Cargo.lock | 4 ++ Cargo.toml | 9 +++- client/src/client.rs | 4 +- common/Cargo.toml | 5 ++ common/src/quic/connection_manager.rs | 71 +++++++++++++-------------- common/src/quic/mod.rs | 2 +- common/src/quic/quinn_reciever.rs | 4 +- common/src/quic/quinn_sender.rs | 4 +- plugin/src/quic_plugin.rs | 7 +++ tester/src/main.rs | 8 +++ 10 files changed, 73 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bf922ed..c8f605b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2294,6 +2294,10 @@ dependencies = [ "lz4", "pkcs8", "quinn", +<<<<<<< HEAD +======= + "quinn-proto", +>>>>>>> 3611f81 (Making many changes at once (not working)) "rcgen", "rustls", "serde", diff --git a/Cargo.toml b/Cargo.toml index 7f60249..7e66ade 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,4 @@ [workspace] -resolver = "2" - members = [ "plugin", "client", @@ -8,6 +6,8 @@ members = [ "tester", ] +resolver = "2" + [workspace.package] version = "0.1.0" authors = ["gmgalactus "] @@ -40,7 +40,12 @@ tracing-subscriber = "0.3.16" chrono = "0.4.24" native-tls = "0.2.11" quinn = "0.10.2" +<<<<<<< HEAD rustls = "=0.21.7" +======= +quinn-proto = "0.10.5" +rustls = "0.21.7" +>>>>>>> 3611f81 (Making many changes at once (not working)) rcgen = "0.10.0" pkcs8 = "0.8.0" lz4 = "1.24.0" diff --git a/client/src/client.rs b/client/src/client.rs index 872b78e..de2b0ef 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -26,7 +26,7 @@ impl Client { let send_stream = connection.open_uni().await?; send_message( send_stream, - Message::ConnectionParameters(connection_parameters), + &Message::ConnectionParameters(connection_parameters), ) .await?; @@ -38,7 +38,7 @@ impl Client { pub async fn subscribe(&self, filters: Vec) -> anyhow::Result<()> { let send_stream = self.connection.open_uni().await?; - send_message(send_stream, Message::Filters(filters)).await?; + send_message(send_stream, &Message::Filters(filters)).await?; Ok(()) } diff --git a/common/Cargo.toml b/common/Cargo.toml index 87b6751..4c21048 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -11,7 +11,12 @@ serde = { workspace = true } bincode = { workspace = true } lz4 = { workspace = true } quinn = { workspace = true } +<<<<<<< HEAD rustls = { workspace = true, default-features = false } +======= +quinn-proto = { workspace = true } +rustls = { workspace = true, features = ["dangerous_configuration", "quic"] } +>>>>>>> 3611f81 (Making many changes at once (not working)) rcgen = { workspace = true } pkcs8 = { workspace = true } anyhow = { workspace = true } diff --git a/common/src/quic/connection_manager.rs b/common/src/quic/connection_manager.rs index b99c2ab..6b74ae9 100644 --- a/common/src/quic/connection_manager.rs +++ b/common/src/quic/connection_manager.rs @@ -1,4 +1,4 @@ -use quinn::{Connection, Endpoint}; +use quinn::{Connection, Endpoint, VarInt}; use std::sync::Arc; use std::{collections::VecDeque, time::Duration}; use tokio::sync::Semaphore; @@ -216,50 +216,49 @@ impl ConnectionManager { let id = connection_data.id; tokio::spawn(async move { - let permit_result = semaphore.clone().try_acquire_owned(); + let permit_result = semaphore.try_acquire_owned(); - let _permit = match permit_result { - Ok(permit) => permit, - Err(_) => { - // all permits are taken wait log warning and wait for permit - log::warn!( - "Stream {} seems to be lagging for {} message type", - id, - message_type - ); - semaphore - .acquire_owned() - .await - .expect("Should aquire the permit") - } - }; - - for _ in 0..retry_count { - let send_stream = connection.open_uni().await; - match send_stream { - Ok(send_stream) => { - match send_message(send_stream, message.clone()).await { - Ok(_) => { - log::debug!("Message sucessfully sent"); - break; + match permit_result { + Ok(permit) => { + let _permit = permit; + for _ in 0..retry_count { + let send_stream = connection.open_uni().await; + match send_stream { + Ok(send_stream) => { + match send_message(send_stream, &message).await { + Ok(_) => { + log::debug!("Message sucessfully sent"); + break; + } + Err(e) => { + log::error!( + "error dispatching message and sending data : {}", + e + ) + } + } } Err(e) => { log::error!( - "error dispatching message and sending data : {}", + "error dispatching message while creating stream : {}", e - ) + ); + break; } } } - Err(e) => { - log::error!( - "error dispatching message while creating stream : {}", - e - ); - break; - } + }, + Err(_) => { + // all permits are taken wait log warning and wait for permit + log::error!( + "Stream {} seems to be lagging for {} message type, stopping the laggy client", + id, + message_type + ); + connection.close(VarInt::from_u32(0), b"laggy client"); + } - } + }; }); } } diff --git a/common/src/quic/mod.rs b/common/src/quic/mod.rs index eea63ec..179f8e9 100644 --- a/common/src/quic/mod.rs +++ b/common/src/quic/mod.rs @@ -4,4 +4,4 @@ pub mod connection_manager; pub mod quic_server; pub mod quinn_reciever; pub mod quinn_sender; -pub mod skip_verification; +pub mod skip_verification; \ No newline at end of file diff --git a/common/src/quic/quinn_reciever.rs b/common/src/quic/quinn_reciever.rs index 0530d21..3663d1e 100644 --- a/common/src/quic/quinn_reciever.rs +++ b/common/src/quic/quinn_reciever.rs @@ -107,7 +107,7 @@ mod tests { let connection = connecting.await.unwrap(); let send_stream = connection.open_uni().await.unwrap(); - send_message(send_stream, message).await.unwrap(); + send_message(send_stream, &message).await.unwrap(); jh.await.unwrap(); } @@ -143,7 +143,7 @@ mod tests { .unwrap(); let connection = connecting.await.unwrap(); let send_stream = connection.open_uni().await.unwrap(); - send_message(send_stream, sent_message).await.unwrap(); + send_message(send_stream, &sent_message).await.unwrap(); }) }; diff --git a/common/src/quic/quinn_sender.rs b/common/src/quic/quinn_sender.rs index acc336b..9d6fcce 100644 --- a/common/src/quic/quinn_sender.rs +++ b/common/src/quic/quinn_sender.rs @@ -2,7 +2,7 @@ use quinn::SendStream; use crate::message::Message; -pub fn convert_to_binary(message: Message) -> anyhow::Result> { +pub fn convert_to_binary(message: &Message) -> anyhow::Result> { let mut binary = bincode::serialize(&message)?; let size = binary.len() as u64; // prepend size to the binary object @@ -11,7 +11,7 @@ pub fn convert_to_binary(message: Message) -> anyhow::Result> { Ok(binary) } -pub async fn send_message(mut send_stream: SendStream, message: Message) -> anyhow::Result<()> { +pub async fn send_message(mut send_stream: SendStream, message: &Message) -> anyhow::Result<()> { let binary = convert_to_binary(message)?; send_stream.write_all(&binary).await?; send_stream.finish().await?; diff --git a/plugin/src/quic_plugin.rs b/plugin/src/quic_plugin.rs index 103f153..1337604 100644 --- a/plugin/src/quic_plugin.rs +++ b/plugin/src/quic_plugin.rs @@ -50,9 +50,16 @@ impl GeyserPlugin for QuicGeyserPlugin { GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer)) })?; +<<<<<<< HEAD let quic_server = QuicServer::new(runtime, config.quic_plugin).map_err(|_| { GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer)) })?; +======= + let quic_server = + QuicServer::new(runtime, config.quic_plugin).map_err(|_| { + GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer)) + })?; +>>>>>>> 3611f81 (Making many changes at once (not working)) self.quic_server = Some(quic_server); Ok(()) diff --git a/tester/src/main.rs b/tester/src/main.rs index 7560187..123b320 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -12,7 +12,15 @@ use quic_geyser_common::{ types::connections_parameters::ConnectionParameters, }; use solana_rpc_client::nonblocking::rpc_client::RpcClient; +<<<<<<< HEAD use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature}; +======= +use solana_sdk::{ + commitment_config::CommitmentConfig, + pubkey::Pubkey, + signature::Signature, +}; +>>>>>>> 3611f81 (Making many changes at once (not working)) use tokio::pin; pub mod cli;