Skip to content

Commit

Permalink
Making many changes at once (not working)
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed May 17, 2024
1 parent f1008f4 commit 9cf7e26
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 45 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
[workspace]
resolver = "2"

members = [
"plugin",
"client",
"common",
"tester",
]

resolver = "2"

[workspace.package]
version = "0.1.0"
authors = ["gmgalactus <[email protected]>"]
Expand Down Expand Up @@ -40,7 +40,12 @@ tracing-subscriber = "0.3.16"
chrono = "0.4.24"
native-tls = "0.2.11"
quinn = "0.10.2"
<<<<<<< HEAD
rustls = "=0.21.7"
=======
quinn-proto = "0.10.5"
rustls = "0.21.7"
>>>>>>> 3611f81 (Making many changes at once (not working))
rcgen = "0.10.0"
pkcs8 = "0.8.0"
lz4 = "1.24.0"
Expand Down
4 changes: 2 additions & 2 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl Client {
let send_stream = connection.open_uni().await?;
send_message(
send_stream,
Message::ConnectionParameters(connection_parameters),
&Message::ConnectionParameters(connection_parameters),
)
.await?;

Expand All @@ -38,7 +38,7 @@ impl Client {

pub async fn subscribe(&self, filters: Vec<Filter>) -> anyhow::Result<()> {
let send_stream = self.connection.open_uni().await?;
send_message(send_stream, Message::Filters(filters)).await?;
send_message(send_stream, &Message::Filters(filters)).await?;
Ok(())
}

Expand Down
5 changes: 5 additions & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ serde = { workspace = true }
bincode = { workspace = true }
lz4 = { workspace = true }
quinn = { workspace = true }
<<<<<<< HEAD
rustls = { workspace = true, default-features = false }
=======
quinn-proto = { workspace = true }
rustls = { workspace = true, features = ["dangerous_configuration", "quic"] }
>>>>>>> 3611f81 (Making many changes at once (not working))
rcgen = { workspace = true }
pkcs8 = { workspace = true }
anyhow = { workspace = true }
Expand Down
71 changes: 35 additions & 36 deletions common/src/quic/connection_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use quinn::{Connection, Endpoint};
use quinn::{Connection, Endpoint, VarInt};
use std::sync::Arc;
use std::{collections::VecDeque, time::Duration};
use tokio::sync::Semaphore;
Expand Down Expand Up @@ -216,50 +216,49 @@ impl ConnectionManager {
let id = connection_data.id;

tokio::spawn(async move {
let permit_result = semaphore.clone().try_acquire_owned();
let permit_result = semaphore.try_acquire_owned();

let _permit = match permit_result {
Ok(permit) => permit,
Err(_) => {
// all permits are taken wait log warning and wait for permit
log::warn!(
"Stream {} seems to be lagging for {} message type",
id,
message_type
);
semaphore
.acquire_owned()
.await
.expect("Should aquire the permit")
}
};

for _ in 0..retry_count {
let send_stream = connection.open_uni().await;
match send_stream {
Ok(send_stream) => {
match send_message(send_stream, message.clone()).await {
Ok(_) => {
log::debug!("Message sucessfully sent");
break;
match permit_result {
Ok(permit) => {
let _permit = permit;
for _ in 0..retry_count {
let send_stream = connection.open_uni().await;
match send_stream {
Ok(send_stream) => {
match send_message(send_stream, &message).await {
Ok(_) => {
log::debug!("Message sucessfully sent");
break;
}
Err(e) => {
log::error!(
"error dispatching message and sending data : {}",
e
)
}
}
}
Err(e) => {
log::error!(
"error dispatching message and sending data : {}",
"error dispatching message while creating stream : {}",
e
)
);
break;
}
}
}
Err(e) => {
log::error!(
"error dispatching message while creating stream : {}",
e
);
break;
}
},
Err(_) => {
// all permits are taken wait log warning and wait for permit
log::error!(
"Stream {} seems to be lagging for {} message type, stopping the laggy client",
id,
message_type
);
connection.close(VarInt::from_u32(0), b"laggy client");

}
}
};
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion common/src/quic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ pub mod connection_manager;
pub mod quic_server;
pub mod quinn_reciever;
pub mod quinn_sender;
pub mod skip_verification;
pub mod skip_verification;
4 changes: 2 additions & 2 deletions common/src/quic/quinn_reciever.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ mod tests {
let connection = connecting.await.unwrap();

let send_stream = connection.open_uni().await.unwrap();
send_message(send_stream, message).await.unwrap();
send_message(send_stream, &message).await.unwrap();
jh.await.unwrap();
}

Expand Down Expand Up @@ -143,7 +143,7 @@ mod tests {
.unwrap();
let connection = connecting.await.unwrap();
let send_stream = connection.open_uni().await.unwrap();
send_message(send_stream, sent_message).await.unwrap();
send_message(send_stream, &sent_message).await.unwrap();
})
};

Expand Down
4 changes: 2 additions & 2 deletions common/src/quic/quinn_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use quinn::SendStream;

use crate::message::Message;

pub fn convert_to_binary(message: Message) -> anyhow::Result<Vec<u8>> {
pub fn convert_to_binary(message: &Message) -> anyhow::Result<Vec<u8>> {
let mut binary = bincode::serialize(&message)?;
let size = binary.len() as u64;
// prepend size to the binary object
Expand All @@ -11,7 +11,7 @@ pub fn convert_to_binary(message: Message) -> anyhow::Result<Vec<u8>> {
Ok(binary)
}

pub async fn send_message(mut send_stream: SendStream, message: Message) -> anyhow::Result<()> {
pub async fn send_message(mut send_stream: SendStream, message: &Message) -> anyhow::Result<()> {
let binary = convert_to_binary(message)?;
send_stream.write_all(&binary).await?;
send_stream.finish().await?;
Expand Down
7 changes: 7 additions & 0 deletions plugin/src/quic_plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,16 @@ impl GeyserPlugin for QuicGeyserPlugin {
GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer))
})?;

<<<<<<< HEAD
let quic_server = QuicServer::new(runtime, config.quic_plugin).map_err(|_| {
GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer))
})?;
=======
let quic_server =
QuicServer::new(runtime, config.quic_plugin).map_err(|_| {
GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer))
})?;
>>>>>>> 3611f81 (Making many changes at once (not working))
self.quic_server = Some(quic_server);

Ok(())
Expand Down
8 changes: 8 additions & 0 deletions tester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,15 @@ use quic_geyser_common::{
types::connections_parameters::ConnectionParameters,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
<<<<<<< HEAD
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
=======
use solana_sdk::{
commitment_config::CommitmentConfig,
pubkey::Pubkey,
signature::Signature,
};
>>>>>>> 3611f81 (Making many changes at once (not working))
use tokio::pin;

pub mod cli;
Expand Down

0 comments on commit 9cf7e26

Please sign in to comment.