From 67a4e666199c3b667bbedd73bd52435ba728d777 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 11 Nov 2024 15:59:25 +0000 Subject: [PATCH] Async copy, poll every 0.1 seconds, fixes massive latency issue --- nexus/Cargo.lock | 3 ++ nexus/Cargo.toml | 1 + nexus/catalog/Cargo.toml | 2 +- nexus/parser/Cargo.toml | 2 +- nexus/peer-bigquery/Cargo.toml | 2 +- nexus/peer-connections/Cargo.toml | 2 +- nexus/peer-cursor/Cargo.toml | 2 +- nexus/peer-mysql/Cargo.toml | 2 +- nexus/peer-postgres/Cargo.toml | 2 +- nexus/peer-snowflake/Cargo.toml | 2 +- nexus/postgres-connection/Cargo.toml | 4 +- nexus/postgres-connection/src/lib.rs | 76 +++++++++------------------- nexus/server/Cargo.toml | 2 +- 13 files changed, 41 insertions(+), 61 deletions(-) diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index 5579d89b6..f6462cbdf 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -3042,6 +3042,7 @@ name = "postgres-connection" version = "0.1.0" dependencies = [ "anyhow", + "futures-util", "pt", "rustls 0.23.16", "ssh2", @@ -3049,6 +3050,7 @@ dependencies = [ "tokio-postgres", "tokio-postgres-rustls", "tokio-stream", + "tokio-util", "tracing", "urlencoding", ] @@ -4467,6 +4469,7 @@ checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index ac33cbe67..72dc94a26 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -30,6 +30,7 @@ rust_decimal = { version = "1", default-features = false, features = [ ] } ssh2 = "0.9" sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git", branch = "main" } +tokio.workspace = true tracing = "0.1" pgwire = { version = "0.26", default-features = false, features = [ "scram", diff --git a/nexus/catalog/Cargo.toml b/nexus/catalog/Cargo.toml index 162df1da2..af7b79963 100644 --- a/nexus/catalog/Cargo.toml +++ b/nexus/catalog/Cargo.toml @@ -18,7 +18,7 @@ pt = { path = "../pt" } refinery = { version = "0.8", default-features = false, features = ["tokio-postgres"] } serde_json = "1.0" sqlparser.workspace = true -tokio = { version = "1.13.0", features = ["full"] } +tokio.workspace = true tokio-postgres = { version = "0.7.6", features = [ "with-chrono-0_4", "with-serde_json-1", diff --git a/nexus/parser/Cargo.toml b/nexus/parser/Cargo.toml index b6aac7d88..45bdc558c 100644 --- a/nexus/parser/Cargo.toml +++ b/nexus/parser/Cargo.toml @@ -14,5 +14,5 @@ pgwire.workspace = true pt = { path = "../pt" } rand = "0.8" sqlparser.workspace = true -tokio = { version = "1", features = ["full"] } +tokio.workspace = true tracing.workspace = true diff --git a/nexus/peer-bigquery/Cargo.toml b/nexus/peer-bigquery/Cargo.toml index c10964568..c3cf3c18b 100644 --- a/nexus/peer-bigquery/Cargo.toml +++ b/nexus/peer-bigquery/Cargo.toml @@ -21,7 +21,7 @@ serde_json = "1.0" serde_bytes = "0.11" sqlparser.workspace = true tracing.workspace = true -tokio = { version = "1.0", features = ["full"] } +tokio.workspace = true gcp-bigquery-client = "0.24" uuid = { version = "1.0", features = ["serde", "v4"] } value = { path = "../value" } diff --git a/nexus/peer-connections/Cargo.toml b/nexus/peer-connections/Cargo.toml index 8aa69c7f0..01b172c74 100644 --- a/nexus/peer-connections/Cargo.toml +++ b/nexus/peer-connections/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" anyhow = "1.0" chrono.workspace = true deadpool-postgres = { version = "0.14", features = ["rt_tokio_1"] } -tokio = { version = "1", features = ["full"] } +tokio.workspace = true tokio-postgres = { version = "0.7.6", features = [ "with-chrono-0_4", "with-serde_json-1", diff --git a/nexus/peer-cursor/Cargo.toml b/nexus/peer-cursor/Cargo.toml index 74a2fe9de..3db4eecba 100644 --- a/nexus/peer-cursor/Cargo.toml +++ b/nexus/peer-cursor/Cargo.toml @@ -12,6 +12,6 @@ dashmap.workspace = true futures = "0.3" pgwire.workspace = true sqlparser.workspace = true -tokio = { version = "1.0", features = ["full"] } +tokio.workspace = true tracing.workspace = true value = { path = "../value" } diff --git a/nexus/peer-mysql/Cargo.toml b/nexus/peer-mysql/Cargo.toml index 2fe32d845..a6fed50b8 100644 --- a/nexus/peer-mysql/Cargo.toml +++ b/nexus/peer-mysql/Cargo.toml @@ -22,6 +22,6 @@ serde_json = "1.0" serde_bytes = "0.11" sqlparser.workspace = true tracing.workspace = true -tokio = { version = "1.0", features = ["full"] } +tokio.workspace = true tokio-stream = "0.1" value = { path = "../value" } diff --git a/nexus/peer-postgres/Cargo.toml b/nexus/peer-postgres/Cargo.toml index b3fa882b1..78b055500 100644 --- a/nexus/peer-postgres/Cargo.toml +++ b/nexus/peer-postgres/Cargo.toml @@ -23,7 +23,7 @@ serde_json = "1.0" serde_bytes = "0.11" ssh2.workspace = true sqlparser.workspace = true -tokio = { version = "1.0", features = ["full"] } +tokio.workspace = true tokio-postgres = { version = "0.7.6", features = [ "with-chrono-0_4", "with-serde_json-1", diff --git a/nexus/peer-snowflake/Cargo.toml b/nexus/peer-snowflake/Cargo.toml index e74c54318..bb33eed01 100644 --- a/nexus/peer-snowflake/Cargo.toml +++ b/nexus/peer-snowflake/Cargo.toml @@ -25,7 +25,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha2 = "0.10" sqlparser.workspace = true -tokio = { version = "1.21", features = ["full"] } +tokio.workspace = true tracing.workspace = true ureq = { version = "2", features = ["json", "charset"] } value = { path = "../value" } diff --git a/nexus/postgres-connection/Cargo.toml b/nexus/postgres-connection/Cargo.toml index 72c180cb5..4d981c7c6 100644 --- a/nexus/postgres-connection/Cargo.toml +++ b/nexus/postgres-connection/Cargo.toml @@ -7,12 +7,14 @@ edition = "2021" [dependencies] anyhow = "1" +futures-util = { version = "0.3", default-features = false, features = ["io"] } pt = { path = "../pt" } rustls = { version = "0.23", default-features = false, features = ["ring"] } ssh2.workspace = true +tokio.workspace = true tokio-postgres = "0.7.2" tokio-postgres-rustls = "0.13" -tokio = { version = "1", features = ["full"] } +tokio-util = { version = "0.7", features = ["compat"] } tokio-stream = "0.1" tracing.workspace = true urlencoding = "2" diff --git a/nexus/postgres-connection/src/lib.rs b/nexus/postgres-connection/src/lib.rs index b9982d847..69dafbde6 100644 --- a/nexus/postgres-connection/src/lib.rs +++ b/nexus/postgres-connection/src/lib.rs @@ -2,12 +2,11 @@ use pt::peerdb_peers::{PostgresConfig, SshConfig}; use rustls::pki_types::{CertificateDer, ServerName, UnixTime}; use rustls::{ClientConfig, DigitallySignedStruct, RootCertStore, SignatureScheme}; use std::fmt::Write; -use std::os::unix::net::UnixStream; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, -}; +use std::io; +use std::sync::Arc; +use tokio::net::UnixStream; use tokio_postgres_rustls::MakeRustlsConnect; +use tokio_util::compat::FuturesAsyncReadCompatExt; #[derive(Copy, Clone, Debug)] struct NoCertificateVerification; @@ -81,17 +80,15 @@ pub fn get_pg_connection_string(config: &PostgresConfig) -> String { connection_string } -// from https://github.com/alexcrichton/ssh2-rs/issues/218#issuecomment-1698814611 -pub fn create_tunnel( +pub async fn create_tunnel( tcp: std::net::TcpStream, ssh_config: &SshConfig, remote_server: String, remote_port: u16, -) -> std::io::Result<(ssh2::Session, UnixStream)> { +) -> io::Result<(ssh2::Session, UnixStream)> { let mut session = ssh2::Session::new()?; session.set_tcp_stream(tcp); session.set_compress(true); - session.set_timeout(15000); session.handshake()?; if !ssh_config.password.is_empty() { session.userauth_password(&ssh_config.user, &ssh_config.password)?; @@ -103,7 +100,7 @@ pub fn create_tunnel( let mut known_hosts = session.known_hosts()?; known_hosts.read_str(&ssh_config.host_key, ssh2::KnownHostFileKind::OpenSSH)?; } - let (stream1, stream2) = UnixStream::pair()?; + let (mut stream1, stream2) = tokio::net::UnixStream::pair()?; let channel = session.channel_direct_tcpip(remote_server.as_str(), remote_port, None)?; tracing::info!( "tunnel to {:}:{:} opened", @@ -111,46 +108,25 @@ pub fn create_tunnel( remote_port ); - tokio::task::spawn_blocking(move || { - let closed = Arc::new(AtomicBool::new(false)); - let mut reader_stream = stream1; - let mut writer_stream = reader_stream.try_clone().unwrap(); - - let mut writer_channel = channel.stream(0); //open two streams on the same channel, so we can read and write separately - let mut reader_channel = channel.stream(0); - - //pipe stream output into channel - let write_closed = closed.clone(); - tokio::task::spawn_blocking(move || loop { - match std::io::copy(&mut reader_stream, &mut writer_channel) { - Ok(_) => (), - Err(err) => { - tracing::info!("failed to write to channel, reason: {:?}", err); + session.set_blocking(false); + tokio::spawn(async move { + let mut channel_stream = futures_util::io::AllowStdIo::new(channel.stream(0)).compat(); + loop { + if let Err(err) = tokio::io::copy_bidirectional(&mut stream1, &mut channel_stream).await + { + if err.kind() == io::ErrorKind::WouldBlock { + tokio::time::sleep(std::time::Duration::new(0, 123456789)).await; + continue; } + tracing::error!( + "tunnel to {:}:{:} failed: {:}", + remote_server.as_str(), + remote_port, + err + ); } - if write_closed.load(Ordering::SeqCst) { - break; - } - }); - - //pipe channel output into stream - let read_closed = closed.clone(); - tokio::task::spawn_blocking(move || loop { - match std::io::copy(&mut reader_channel, &mut writer_stream) { - Ok(_) => (), - Err(err) => { - tracing::info!("failed to read from channel, reason: {:?}", err); - } - } - if read_closed.load(Ordering::SeqCst) { - break; - } - }); - tracing::info!( - "tunnel to {:}:{:} closed", - remote_server.as_str(), - remote_port - ); + break; + } }); Ok((session, stream2)) @@ -163,9 +139,7 @@ pub async fn connect_postgres( let tcp = std::net::TcpStream::connect((ssh_config.host.as_str(), ssh_config.port as u16))?; tcp.set_nodelay(true)?; let (session, stream) = - create_tunnel(tcp, ssh_config, config.host.clone(), config.port as u16)?; - stream.set_nonblocking(true)?; - let stream = tokio::net::UnixStream::from_std(stream)?; + create_tunnel(tcp, ssh_config, config.host.clone(), config.port as u16).await?; let (client, connection) = tokio_postgres::Config::default() .user(&config.user) .password(&config.password) diff --git a/nexus/server/Cargo.toml b/nexus/server/Cargo.toml index bb0e181c4..b7f05da27 100644 --- a/nexus/server/Cargo.toml +++ b/nexus/server/Cargo.toml @@ -53,7 +53,7 @@ sqlparser = { workspace = true, features = ["visitor"] } serde_json = "1.0" rand = "0.8" time = "0.3" -tokio = { version = "1", features = ["full"] } +tokio.workspace = true tracing.workspace = true tracing-appender = "0.2" tracing-subscriber = { version = "0.3", features = ["env-filter"] }