Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: use gm tpuconnectionmanager for proxy #218

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

112 changes: 112 additions & 0 deletions core/src/atomic_timing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@

// ported from solana_sdk timing.rs

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use solana_sdk::unchecked_div_by_const;

pub fn duration_as_ms(d: &Duration) -> u64 {
d.as_secs()
.saturating_mul(1000)
.saturating_add(unchecked_div_by_const!(
u64::from(d.subsec_nanos()),
1_000_000
))
}

// milliseconds since unix epoch start
fn epoch_now_ms() -> u64 {
let since_epoch_start = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("create timestamp in timing");
duration_as_ms(&since_epoch_start)
}

#[derive(Debug)]
pub struct AtomicTiming {
// note: 0 is interpreted as "not updated yet"
last_update: AtomicU64,
}

impl Default for AtomicTiming {
/// initialize with "0" (start of unix epoch)
// 0 is magic value
fn default() -> Self {
Self {
last_update: AtomicU64::new(0),
}
}
}

impl AtomicTiming {
// initialize with "fired now"
pub fn new() -> Self {
Self {
last_update: AtomicU64::new(epoch_now_ms()),
}
}
}

impl AtomicTiming {
/// true if 'interval_time_ms' has elapsed since last time we returned true as long as it has been 'interval_time_ms' since this struct was created
pub fn should_update(&self, interval_time_ms: u64) -> bool {
self.should_update_ext(interval_time_ms, true)
}

pub fn update(&self) {
let now = epoch_now_ms();
self.last_update.store(now, Ordering::Relaxed);
}

/// a primary use case is periodic metric reporting, potentially from different threads
/// true if 'interval_time_ms' has elapsed since last time we returned true
/// except, if skip_first=false, false until 'interval_time_ms' has elapsed since this struct was created
pub fn should_update_ext(&self, interval_time_ms: u64, skip_first: bool) -> bool {
let now = epoch_now_ms();
let last = self.last_update.load(Ordering::Relaxed);

if now.saturating_sub(last) <= interval_time_ms {
return false;
}

if skip_first && last == 0 {
return false;
}

if self
.last_update
.compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
!= Ok(last)
{
// concurrent update
return false;
}

return true;
}

/// return ms elapsed since the last time the time was set
pub fn elapsed_ms(&self) -> u64 {
let now = epoch_now_ms();
let last = self.last_update.load(Ordering::Relaxed);
now.saturating_sub(last)
}

/// return ms elapsed since the last time the time was set
pub fn elapsed(&self) -> Duration {
let elapsed_ms = self.elapsed_ms();
Duration::from_millis(elapsed_ms)
}

/// return ms until the interval_time will have elapsed
pub fn remaining_until_next_interval(&self, interval_time_ms: u64) -> u64 {
interval_time_ms.saturating_sub(self.elapsed_ms())
}
}


#[test]
fn default() {
// note: race condition - this calls now() twice
assert_eq!(AtomicTiming::default().elapsed_ms(), epoch_now_ms());
}
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ pub mod stores;
pub mod structures;
pub mod traits;
pub mod types;
pub mod atomic_timing;

pub type AnyhowJoinHandle = tokio::task::JoinHandle<anyhow::Result<()>>;
2 changes: 1 addition & 1 deletion core/src/structures/proxy_request_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Display for TpuForwardingRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"TpuForwardingRequest t9 {} tpu nodes",
"TpuForwardingRequest to {} tpu nodes",
&self.tpu_nodes.len(),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ pub fn with_1000_transactions_direct() {
}

// note: this tests are flakes on CI ond also local (see https://mangolana.atlassian.net/browse/MAN-59)
#[ignore]
#[test]
pub fn bench_proxy() {
configure_logging(true);
Expand Down Expand Up @@ -311,7 +310,7 @@ fn wireup_and_send_txs_via_channel(test_case_params: TestCaseParams) {
count_map.insert_or_increment(*tx.get_signature());
}

if packet_count == warmup_tx_count {
if timer2.is_none() && packet_count >= warmup_tx_count {
timer2 = Some(Instant::now());
}
} // -- while not all packets received - by count
Expand Down
2 changes: 2 additions & 0 deletions quic-forward-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ publish = false

[dependencies]
solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-services = { workspace = true }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
solana-transaction-status = { workspace = true }
Expand Down Expand Up @@ -40,6 +41,7 @@ async-trait = { workspace = true }
futures = { workspace = true }
chrono = { workspace = true }
tokio = { version = "1.28.2", features = ["full", "fs"]}
tokio-util = "0.7.8"
rcgen = "0.9.3"
spl-memo = "3.0.1"

35 changes: 13 additions & 22 deletions quic-forward-proxy/src/inbound/proxy_listener.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::proxy_request_format::TpuForwardingRequest;
use crate::quic_util::connection_stats;
use crate::shared::ForwardPacket;
use crate::tls_config_provider_server::ProxyTlsConfigProvider;
use crate::tls_self_signed_pair_generator::SelfSignedTlsConfigProvider;
use crate::util::FALLBACK_TIMEOUT;
Expand Down Expand Up @@ -33,7 +32,7 @@ impl ProxyListener {
}
}

pub async fn listen(&self, forwarder_channel: &Sender<ForwardPacket>) -> anyhow::Result<()> {
pub async fn listen(&self, forwarder_channel: &Sender<TpuForwardingRequest>) -> anyhow::Result<()> {
info!(
"TPU Quic Proxy server listening on {}",
self.proxy_listener_addr
Expand Down Expand Up @@ -88,7 +87,7 @@ impl ProxyListener {
#[tracing::instrument(skip_all, level = "debug")]
async fn handle_client_connection(
client_conn_handshake: Connecting,
forwarder_channel: Sender<ForwardPacket>,
forwarder_channel: Sender<TpuForwardingRequest>,
) -> anyhow::Result<()> {
let client_connection = client_conn_handshake.await.context("handshake")?;

Expand Down Expand Up @@ -117,35 +116,27 @@ impl ProxyListener {
trace!("proxy request details: {}", proxy_request);
let txs = proxy_request.get_transaction_bytes();


debug!(
"enqueue transaction batch of size {} to {} tpu nodes",
txs.len(),
proxy_request.get_tpu_nodes().len(),
);
if forwarder_channel_copy.capacity() < forwarder_channel_copy.max_capacity()
{
if forwarder_channel_copy.capacity() < forwarder_channel_copy.max_capacity() {
debug!(
"forward channel buffered: capacity {} of {}",
forwarder_channel_copy.capacity(),
"forward channel buffered: {} packets",
forwarder_channel_copy.max_capacity()
- forwarder_channel_copy.capacity(),
);
}

for tpu_node in proxy_request.get_tpu_nodes() {
let tpu_address = tpu_node.tpu_socket_addr;
forwarder_channel_copy
.send_timeout(
ForwardPacket::new(
txs.clone(),
tpu_address,
proxy_request.get_hash(),
),
FALLBACK_TIMEOUT,
)
.await
.context("sending internal packet from proxy to forwarder")
.unwrap();
}
forwarder_channel_copy
.send_timeout(proxy_request,
FALLBACK_TIMEOUT,
)
.await
.context("sending internal packet from proxy to forwarder")
.unwrap();
});

debug!(
Expand Down
2 changes: 0 additions & 2 deletions quic-forward-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ mod outbound;
pub mod proxy;
pub mod proxy_request_format;
mod quic_util;
mod quinn_auto_reconnect;
mod shared;
pub mod tls_config_provider_client;
pub mod tls_config_provider_server;
pub mod tls_self_signed_pair_generator;
Expand Down
2 changes: 0 additions & 2 deletions quic-forward-proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ mod outbound;
pub mod proxy;
pub mod proxy_request_format;
pub mod quic_util;
mod quinn_auto_reconnect;
mod shared;
pub mod tls_config_provider_client;
pub mod tls_config_provider_server;
pub mod tls_self_signed_pair_generator;
Expand Down
2 changes: 2 additions & 0 deletions quic-forward-proxy/src/outbound/debouncer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(dead_code)]

use log::trace;
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
Expand Down
4 changes: 2 additions & 2 deletions quic-forward-proxy/src/outbound/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
mod debouncer;
mod sharder;
pub mod tx_forward;
pub mod ng_forward;
mod tpu_connection_manager;
96 changes: 96 additions & 0 deletions quic-forward-proxy/src/outbound/ng_forward.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use crate::validator_identity::ValidatorIdentity;
use anyhow::{bail};
use solana_streamer::nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType};
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration};
use log::info;
use solana_sdk::pubkey::Pubkey;
use tokio::sync::mpsc::Receiver;
use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters;
use solana_lite_rpc_core::structures::identity_stakes::IdentityStakesData;
use crate::outbound::tpu_connection_manager::{TpuConnectionManager};
use crate::proxy_request_format::TpuForwardingRequest;



const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters {
connection_timeout: Duration::from_secs(2),
connection_retry_count: 10,
finalize_timeout: Duration::from_secs(2),
max_number_of_connections: 8,
unistream_timeout: Duration::from_secs(2),
write_timeout: Duration::from_secs(2),
number_of_transactions_per_unistream: 10,
};



pub async fn ng_forwarder(
validator_identity: ValidatorIdentity,
mut transaction_channel: Receiver<TpuForwardingRequest>,
exit_signal: Arc<AtomicBool>,
) -> anyhow::Result<()> {

// TODO
let fanout_slots = 4;

let (certificate, key) = new_self_signed_tls_certificate(
&validator_identity.get_keypair_for_tls(),
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
)
.expect("Failed to initialize QUIC connection certificates");

// TODO make copy of TpuConnectionManager in proxy crate an strip unused features
let tpu_connection_manager =
TpuConnectionManager::new(certificate, key, fanout_slots as usize).await;

// TODO remove
let identity_stakes = IdentityStakesData {
peer_type: ConnectionPeerType::Staked,
stakes: 30,
min_stakes: 0,
max_stakes: 40,
total_stakes: 100,
};

let max_uni_stream_connections = compute_max_allowed_uni_streams(
identity_stakes.peer_type,
identity_stakes.stakes,
identity_stakes.total_stakes,
);

loop {
if exit_signal.load(Ordering::Relaxed) {
bail!("exit signal received");
}

let forward_packet =
transaction_channel
.recv()
.await
.expect("channel closed unexpectedly");

let mut requested_connections: HashMap<Pubkey, SocketAddr> = HashMap::new();
for tpu_node in forward_packet.get_tpu_nodes() {
requested_connections.insert(tpu_node.identity_tpunode, tpu_node.tpu_socket_addr);
}

tpu_connection_manager
.update_connections(
&requested_connections,
max_uni_stream_connections,
QUIC_CONNECTION_PARAMS, // TODO improve
)
.await;

for raw_tx in forward_packet.get_transaction_bytes() {
tpu_connection_manager.send_transaction(raw_tx);
}

} // all txs in packet

}
Loading