Skip to content

Commit

Permalink
Async copy, poll every 0.1 seconds, fixes massive latency issue
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Nov 11, 2024
1 parent 4af6155 commit 67a4e66
Show file tree
Hide file tree
Showing 13 changed files with 41 additions and 61 deletions.
3 changes: 3 additions & 0 deletions nexus/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ rust_decimal = { version = "1", default-features = false, features = [
] }
ssh2 = "0.9"
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git", branch = "main" }
tokio.workspace = true
tracing = "0.1"
pgwire = { version = "0.26", default-features = false, features = [
"scram",
Expand Down
2 changes: 1 addition & 1 deletion nexus/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pt = { path = "../pt" }
refinery = { version = "0.8", default-features = false, features = ["tokio-postgres"] }
serde_json = "1.0"
sqlparser.workspace = true
tokio = { version = "1.13.0", features = ["full"] }
tokio.workspace = true
tokio-postgres = { version = "0.7.6", features = [
"with-chrono-0_4",
"with-serde_json-1",
Expand Down
2 changes: 1 addition & 1 deletion nexus/parser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ pgwire.workspace = true
pt = { path = "../pt" }
rand = "0.8"
sqlparser.workspace = true
tokio = { version = "1", features = ["full"] }
tokio.workspace = true
tracing.workspace = true
2 changes: 1 addition & 1 deletion nexus/peer-bigquery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ serde_json = "1.0"
serde_bytes = "0.11"
sqlparser.workspace = true
tracing.workspace = true
tokio = { version = "1.0", features = ["full"] }
tokio.workspace = true
gcp-bigquery-client = "0.24"
uuid = { version = "1.0", features = ["serde", "v4"] }
value = { path = "../value" }
2 changes: 1 addition & 1 deletion nexus/peer-connections/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ edition = "2021"
anyhow = "1.0"
chrono.workspace = true
deadpool-postgres = { version = "0.14", features = ["rt_tokio_1"] }
tokio = { version = "1", features = ["full"] }
tokio.workspace = true
tokio-postgres = { version = "0.7.6", features = [
"with-chrono-0_4",
"with-serde_json-1",
Expand Down
2 changes: 1 addition & 1 deletion nexus/peer-cursor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ dashmap.workspace = true
futures = "0.3"
pgwire.workspace = true
sqlparser.workspace = true
tokio = { version = "1.0", features = ["full"] }
tokio.workspace = true
tracing.workspace = true
value = { path = "../value" }
2 changes: 1 addition & 1 deletion nexus/peer-mysql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ serde_json = "1.0"
serde_bytes = "0.11"
sqlparser.workspace = true
tracing.workspace = true
tokio = { version = "1.0", features = ["full"] }
tokio.workspace = true
tokio-stream = "0.1"
value = { path = "../value" }
2 changes: 1 addition & 1 deletion nexus/peer-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ serde_json = "1.0"
serde_bytes = "0.11"
ssh2.workspace = true
sqlparser.workspace = true
tokio = { version = "1.0", features = ["full"] }
tokio.workspace = true
tokio-postgres = { version = "0.7.6", features = [
"with-chrono-0_4",
"with-serde_json-1",
Expand Down
2 changes: 1 addition & 1 deletion nexus/peer-snowflake/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10"
sqlparser.workspace = true
tokio = { version = "1.21", features = ["full"] }
tokio.workspace = true
tracing.workspace = true
ureq = { version = "2", features = ["json", "charset"] }
value = { path = "../value" }
4 changes: 3 additions & 1 deletion nexus/postgres-connection/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ edition = "2021"

[dependencies]
anyhow = "1"
futures-util = { version = "0.3", default-features = false, features = ["io"] }
pt = { path = "../pt" }
rustls = { version = "0.23", default-features = false, features = ["ring"] }
ssh2.workspace = true
tokio.workspace = true
tokio-postgres = "0.7.2"
tokio-postgres-rustls = "0.13"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["compat"] }
tokio-stream = "0.1"
tracing.workspace = true
urlencoding = "2"
76 changes: 25 additions & 51 deletions nexus/postgres-connection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ use pt::peerdb_peers::{PostgresConfig, SshConfig};
use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
use rustls::{ClientConfig, DigitallySignedStruct, RootCertStore, SignatureScheme};
use std::fmt::Write;
use std::os::unix::net::UnixStream;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::io;
use std::sync::Arc;
use tokio::net::UnixStream;
use tokio_postgres_rustls::MakeRustlsConnect;
use tokio_util::compat::FuturesAsyncReadCompatExt;

#[derive(Copy, Clone, Debug)]
struct NoCertificateVerification;
Expand Down Expand Up @@ -81,17 +80,15 @@ pub fn get_pg_connection_string(config: &PostgresConfig) -> String {
connection_string
}

// from https://github.com/alexcrichton/ssh2-rs/issues/218#issuecomment-1698814611
pub fn create_tunnel(
pub async fn create_tunnel(
tcp: std::net::TcpStream,
ssh_config: &SshConfig,
remote_server: String,
remote_port: u16,
) -> std::io::Result<(ssh2::Session, UnixStream)> {
) -> io::Result<(ssh2::Session, UnixStream)> {
let mut session = ssh2::Session::new()?;
session.set_tcp_stream(tcp);
session.set_compress(true);
session.set_timeout(15000);
session.handshake()?;
if !ssh_config.password.is_empty() {
session.userauth_password(&ssh_config.user, &ssh_config.password)?;
Expand All @@ -103,54 +100,33 @@ pub fn create_tunnel(
let mut known_hosts = session.known_hosts()?;
known_hosts.read_str(&ssh_config.host_key, ssh2::KnownHostFileKind::OpenSSH)?;
}
let (stream1, stream2) = UnixStream::pair()?;
let (mut stream1, stream2) = tokio::net::UnixStream::pair()?;
let channel = session.channel_direct_tcpip(remote_server.as_str(), remote_port, None)?;
tracing::info!(
"tunnel to {:}:{:} opened",
remote_server.as_str(),
remote_port
);

tokio::task::spawn_blocking(move || {
let closed = Arc::new(AtomicBool::new(false));
let mut reader_stream = stream1;
let mut writer_stream = reader_stream.try_clone().unwrap();

let mut writer_channel = channel.stream(0); //open two streams on the same channel, so we can read and write separately
let mut reader_channel = channel.stream(0);

//pipe stream output into channel
let write_closed = closed.clone();
tokio::task::spawn_blocking(move || loop {
match std::io::copy(&mut reader_stream, &mut writer_channel) {
Ok(_) => (),
Err(err) => {
tracing::info!("failed to write to channel, reason: {:?}", err);
session.set_blocking(false);
tokio::spawn(async move {
let mut channel_stream = futures_util::io::AllowStdIo::new(channel.stream(0)).compat();
loop {
if let Err(err) = tokio::io::copy_bidirectional(&mut stream1, &mut channel_stream).await
{
if err.kind() == io::ErrorKind::WouldBlock {
tokio::time::sleep(std::time::Duration::new(0, 123456789)).await;
continue;
}
tracing::error!(
"tunnel to {:}:{:} failed: {:}",
remote_server.as_str(),
remote_port,
err
);
}
if write_closed.load(Ordering::SeqCst) {
break;
}
});

//pipe channel output into stream
let read_closed = closed.clone();
tokio::task::spawn_blocking(move || loop {
match std::io::copy(&mut reader_channel, &mut writer_stream) {
Ok(_) => (),
Err(err) => {
tracing::info!("failed to read from channel, reason: {:?}", err);
}
}
if read_closed.load(Ordering::SeqCst) {
break;
}
});
tracing::info!(
"tunnel to {:}:{:} closed",
remote_server.as_str(),
remote_port
);
break;
}
});

Ok((session, stream2))
Expand All @@ -163,9 +139,7 @@ pub async fn connect_postgres(
let tcp = std::net::TcpStream::connect((ssh_config.host.as_str(), ssh_config.port as u16))?;
tcp.set_nodelay(true)?;
let (session, stream) =
create_tunnel(tcp, ssh_config, config.host.clone(), config.port as u16)?;
stream.set_nonblocking(true)?;
let stream = tokio::net::UnixStream::from_std(stream)?;
create_tunnel(tcp, ssh_config, config.host.clone(), config.port as u16).await?;
let (client, connection) = tokio_postgres::Config::default()
.user(&config.user)
.password(&config.password)
Expand Down
2 changes: 1 addition & 1 deletion nexus/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ sqlparser = { workspace = true, features = ["visitor"] }
serde_json = "1.0"
rand = "0.8"
time = "0.3"
tokio = { version = "1", features = ["full"] }
tokio.workspace = true
tracing.workspace = true
tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand Down

0 comments on commit 67a4e66

Please sign in to comment.