Skip to content

Commit

Permalink
Making many changes at once (not working)
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed May 17, 2024
1 parent f39b2f3 commit 3611f81
Show file tree
Hide file tree
Showing 24 changed files with 275 additions and 633 deletions.
595 changes: 136 additions & 459 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 5 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
[workspace]
resolver = "2"

members = [
"plugin",
"client",
"common",
"tester",
]

resolver = "2"

[workspace.package]
version = "0.1.0"
authors = ["gmgalactus <[email protected]>"]
Expand All @@ -19,8 +19,6 @@ edition = "2021"
tokio = "1.37.0"
solana-sdk = "=1.17.31"
agave-geyser-plugin-interface = "=1.17.31"
solana-net-utils = "=1.17.31"
solana-streamer = "=1.17.31"
solana-transaction-status = "=1.17.31"
solana-logger = "=1.17.31"

Expand All @@ -42,11 +40,11 @@ tracing = "0.1.37"
tracing-subscriber = "0.3.16"
chrono = "0.4.24"
native-tls = "0.2.11"
quinn = "0.11.0"
rustls = "=0.20.8"
quinn = "0.10.2"
quinn-proto = "0.10.5"
rustls = "0.21.7"
rcgen = "0.10.0"
pkcs8 = "0.8.0"
pem = "1.1.1"
lz4 = "1.24.0"
async-stream = "0.3.5"

Expand Down
14 changes: 7 additions & 7 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
quinn = { workspace = "true" }
solana-sdk = { workspace = "true" }
anyhow = { workspace = "true" }
futures = { workspace = "true" }
async-stream = { workspace = "true" }
tokio = { workspace = "true" }
log = { workspace = "true" }
quinn = { workspace = true }
solana-sdk = { workspace = true }
anyhow = { workspace = true }
futures = { workspace = true }
async-stream = { workspace = true }
tokio = { workspace = true }
log = { workspace = true }

quic-geyser-common = { path = "../common" }
23 changes: 9 additions & 14 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use quic_geyser_common::quic::quinn_reciever::recv_message;
use quic_geyser_common::quic::quinn_sender::send_message;
use quic_geyser_common::{filters::Filter, types::connections_parameters::ConnectionParameters};
use quinn::{Connection, ConnectionError};
use solana_sdk::signature::Keypair;

pub struct Client {
pub address: String,
Expand All @@ -18,18 +17,17 @@ pub struct Client {
impl Client {
pub async fn new(
server_address: String,
identity: &Keypair,
connection_parameters: ConnectionParameters,
) -> anyhow::Result<Client> {
let endpoint =
configure_client(identity, connection_parameters.max_number_of_streams).await?;
configure_client(connection_parameters.max_number_of_streams).await?;
let socket_addr = SocketAddr::from_str(&server_address)?;
let connecting = endpoint.connect(socket_addr, "quic_geyser_client")?;
let connection = connecting.await?;
let send_stream = connection.open_uni().await?;
send_message(
send_stream,
Message::ConnectionParameters(connection_parameters),
&Message::ConnectionParameters(connection_parameters),
)
.await?;

Expand All @@ -41,11 +39,11 @@ impl Client {

pub async fn subscribe(&self, filters: Vec<Filter>) -> anyhow::Result<()> {
let send_stream = self.connection.open_uni().await?;
send_message(send_stream, Message::Filters(filters)).await?;
send_message(send_stream, &Message::Filters(filters)).await?;
Ok(())
}

pub fn get_stream(&self) -> impl Stream<Item = Message> {
pub fn create_stream(&self) -> impl Stream<Item = Message> {
let connection = self.connection.clone();
let (sender, mut reciever) = tokio::sync::mpsc::unbounded_channel::<Message>();
tokio::spawn(async move {
Expand Down Expand Up @@ -90,7 +88,7 @@ impl Client {
#[cfg(test)]
mod tests {
use std::{
net::{IpAddr, Ipv4Addr, UdpSocket},
net::UdpSocket,
sync::Arc,
};

Expand All @@ -102,16 +100,14 @@ mod tests {
types::{account::Account, connections_parameters::ConnectionParameters},
};
use quinn::{Endpoint, EndpointConfig, TokioRuntime};
use solana_sdk::{pubkey::Pubkey, signature::Keypair};
use solana_sdk::pubkey::Pubkey;
use tokio::{pin, sync::Notify};

use crate::client::Client;

#[tokio::test]
pub async fn test_client() {
let (config, _) = configure_server(
&Keypair::new(),
IpAddr::V4(Ipv4Addr::LOCALHOST),
let config = configure_server(
1,
100000,
1,
Expand Down Expand Up @@ -148,7 +144,7 @@ mod tests {
notify_server_start.notify_one();
notify_subscription.notified().await;
for msg in msgs {
connection_manager.dispach(msg, 10).await;
connection_manager.dispatch(msg, 10).await;
}
});
}
Expand All @@ -158,7 +154,6 @@ mod tests {

let client = Client::new(
url,
&Keypair::new(),
ConnectionParameters {
max_number_of_streams: 3,
streams_for_slot_data: 1,
Expand All @@ -177,7 +172,7 @@ mod tests {

notify_subscription.notify_one();

let stream = client.get_stream();
let stream = client.create_stream();
pin!(stream);
for _ in 0..5 {
let msg = stream.next().await.unwrap();
Expand Down
28 changes: 14 additions & 14 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ version = "0.1.0"
edition = "2021"

[dependencies]
solana-sdk = { workspace = "true" }
solana-streamer = { workspace = "true" }
solana-net-utils = { workspace = "true" }
solana-transaction-status = { workspace = "true" }
solana-sdk = { workspace = true }
solana-transaction-status = { workspace = true }

serde = { workspace = "true" }
bincode = { workspace = "true" }
lz4 = { workspace = "true" }
quinn = { workspace = "true" }
rustls = { workspace = "true", default-features = false }
pem = { workspace = "true" }
anyhow = { workspace = "true" }
tokio = { workspace = "true" }
log = { workspace = "true" }
thiserror = {workspace = "true"}
serde = { workspace = true }
bincode = { workspace = true }
lz4 = { workspace = true }
quinn = { workspace = true }
quinn-proto = { workspace = true }
rustls = { workspace = true, features = ["dangerous_configuration", "quic"] }
rcgen = { workspace = true }
pkcs8 = { workspace = true }
anyhow = { workspace = true }
tokio = { workspace = true }
log = { workspace = true }
thiserror = {workspace = true}

[dev-dependencies]
1 change: 1 addition & 0 deletions common/src/compression.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[repr(C)]
pub enum CompressionType {
None,
Lz4Fast(u32),
Expand Down
1 change: 1 addition & 0 deletions common/src/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use solana_sdk::{pubkey::Pubkey, signature::Signature};
use crate::message::Message;

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[repr(C)]
pub enum Filter {
Account(AccountFilter),
Slot,
Expand Down
1 change: 1 addition & 0 deletions common/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
};

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[repr(C)]
pub enum Message {
AccountMsg(Account),
SlotMsg(SlotMeta),
Expand Down
30 changes: 11 additions & 19 deletions common/src/quic/configure_client.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
use std::{
net::{IpAddr, Ipv4Addr},
net::UdpSocket,
sync::Arc,
time::Duration,
};

use quinn::{
ClientConfig, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt,
};
use solana_sdk::signature::Keypair;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;

use crate::quic::{
configure_server::ALPN_GEYSER_PROTOCOL_ID, skip_verification::ClientSkipServerVerification,
};

pub const DEFAULT_MAX_STREAMS: u32 = 16384;
pub const DEFAULT_MAX_SLOT_BLOCKMETA_STREAMS: u32 = 24;
pub const DEFAULT_MAX_TRANSACTION_STREAMS: u32 = 1000;
pub const DEFAULT_MAX_STREAMS: u32 = 32768;
pub const DEFAULT_MAX_SLOT_BLOCKMETA_STREAMS: u32 = 128;
pub const DEFAULT_MAX_TRANSACTION_STREAMS: u32 = 8192;
pub const DEFAULT_MAX_ACCOUNT_STREAMS: u32 =
DEFAULT_MAX_STREAMS - DEFAULT_MAX_SLOT_BLOCKMETA_STREAMS - DEFAULT_MAX_TRANSACTION_STREAMS;

pub fn create_client_endpoint(
certificate: rustls::Certificate,
key: rustls::PrivateKey,
maximum_streams: u32,
) -> Endpoint {
const DATAGRAM_RECEIVE_BUFFER_SIZE: usize = 64 * 1024 * 1024;
Expand All @@ -31,10 +27,7 @@ pub fn create_client_endpoint(
const MINIMUM_MAXIMUM_TRANSMISSION_UNIT: u16 = 2000;

let mut endpoint = {
let client_socket =
solana_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::UNSPECIFIED), (8000, 10000))
.expect("create_endpoint bind_in_range")
.1;
let client_socket = UdpSocket::bind("0.0.0.0:0").expect("Client socket should be binded");
let mut config = EndpointConfig::default();
config
.max_udp_payload_size(MINIMUM_MAXIMUM_TRANSMISSION_UNIT)
Expand All @@ -43,11 +36,15 @@ pub fn create_client_endpoint(
.expect("create_endpoint quinn::Endpoint::new")
};

let cert = rcgen::generate_simple_self_signed(vec!["quic_geyser_client".into()]).unwrap();
let key = rustls::PrivateKey(cert.serialize_private_key_der());
let cert = rustls::Certificate(cert.serialize_der().unwrap());

let mut crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_custom_certificate_verifier(Arc::new(ClientSkipServerVerification {}))
.with_client_auth_cert(vec![certificate], key)
.unwrap();
.with_client_auth_cert(vec![cert], key).expect("Should create client config");

crypto.enable_early_data = true;
crypto.alpn_protocols = vec![ALPN_GEYSER_PROTOCOL_ID.to_vec()];

Expand All @@ -73,14 +70,9 @@ pub fn create_client_endpoint(
}

pub async fn configure_client(
identity: &Keypair,
maximum_concurrent_streams: u32,
) -> anyhow::Result<Endpoint> {
let (certificate, key) =
new_self_signed_tls_certificate(identity, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))?;
Ok(create_client_endpoint(
certificate,
key,
maximum_concurrent_streams,
))
}
33 changes: 7 additions & 26 deletions common/src/quic/configure_server.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,24 @@
use std::{net::IpAddr, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};

use pem::Pem;
use quinn::{IdleTimeout, ServerConfig};
use solana_sdk::{pubkey::Pubkey, signature::Keypair};
use solana_streamer::{
quic::QuicServerError,
tls_certificates::{get_pubkey_from_tls_certificate, new_self_signed_tls_certificate},
};

use super::skip_verification::ServerSkipClientVerification;

pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"quic_geyser_plugin";

pub fn configure_server(
identity_keypair: &Keypair,
host: IpAddr,
max_concurrent_streams: u32,
recieve_window_size: u32,
connection_timeout: u64,
) -> Result<(ServerConfig, String), QuicServerError> {
let (cert, priv_key) = new_self_signed_tls_certificate(identity_keypair, host)?;
let cert_chain_pem_parts = vec![Pem::new("CERTIFICATE", cert.0.clone())];

let cert_chain_pem = pem::encode_many(&cert_chain_pem_parts);
) -> anyhow::Result<ServerConfig> {
let cert = rcgen::generate_simple_self_signed(vec!["quic_geyser_server".into()]).unwrap();
let key = rustls::PrivateKey(cert.serialize_private_key_der());
let cert = rustls::Certificate(cert.serialize_der().unwrap());

let mut server_tls_config = rustls::ServerConfig::builder()
.with_safe_defaults()
.with_client_cert_verifier(ServerSkipClientVerification::new())
.with_single_cert(vec![cert], priv_key)?;
.with_single_cert(vec![cert], key)?;
server_tls_config.alpn_protocols = vec![ALPN_GEYSER_PROTOCOL_ID.to_vec()];

let mut server_config = ServerConfig::with_crypto(Arc::new(server_tls_config));
Expand All @@ -47,16 +38,6 @@ pub fn configure_server(
config.max_concurrent_bidi_streams(0u32.into());
config.datagram_receive_buffer_size(None);

Ok((server_config, cert_chain_pem))
Ok(server_config)
}

pub fn get_remote_pubkey(connection: &quinn::Connection) -> Option<Pubkey> {
// Use the client cert only if it is self signed and the chain length is 1.
connection
.peer_identity()?
.downcast::<Vec<rustls::Certificate>>()
.ok()
.filter(|certs| certs.len() == 1)?
.first()
.and_then(get_pubkey_from_tls_certificate)
}
Loading

0 comments on commit 3611f81

Please sign in to comment.