From 58a753f71db7551ada0db516423e79957f543a77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 13 Nov 2024 19:13:05 +0000 Subject: [PATCH] nexus: ssh (#2231) --- nexus/Cargo.lock | 187 +++++++++++++++++++-------- nexus/Cargo.toml | 2 + nexus/catalog/Cargo.toml | 2 +- nexus/catalog/src/lib.rs | 4 +- nexus/parser/Cargo.toml | 2 +- nexus/peer-bigquery/Cargo.toml | 2 +- nexus/peer-connections/Cargo.toml | 2 +- nexus/peer-cursor/Cargo.toml | 2 +- nexus/peer-mysql/Cargo.toml | 2 +- nexus/peer-postgres/Cargo.toml | 7 +- nexus/peer-postgres/src/lib.rs | 16 ++- nexus/peer-snowflake/Cargo.toml | 2 +- nexus/postgres-connection/Cargo.toml | 8 +- nexus/postgres-connection/src/lib.rs | 113 +++++++++++++--- nexus/server/Cargo.toml | 2 +- stacks/peerdb-server.Dockerfile | 3 +- 16 files changed, 263 insertions(+), 93 deletions(-) diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index d48d10d36..cc3650b6f 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -663,7 +663,7 @@ version = "0.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f" dependencies = [ - "bitflags", + "bitflags 2.6.0", "cexpr", "clang-sys", "itertools", @@ -675,6 +675,12 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.6.0" @@ -713,9 +719,9 @@ dependencies = [ [[package]] name = "borsh" -version = "1.5.1" +version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6362ed55def622cddc70a4746a68554d7b687713770de539e59a739b249f8ed" +checksum = "f5327f6c99920069d1fe374aa743be1af0031dea9f250852cdf1ae6a0861ee24" dependencies = [ "borsh-derive", "cfg_aliases", @@ -723,16 +729,15 @@ dependencies = [ [[package]] name = "borsh-derive" -version = "1.5.1" +version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3ef8005764f53cd4dca619f5bf64cafd4664dada50ece25e4d81de54c80cc0b" +checksum = "10aedd8f1a81a8aafbfde924b0e3061cd6fedd6f6bbcfc6a76e6fd426d7bfe26" dependencies = [ "once_cell", "proc-macro-crate", "proc-macro2", "quote", "syn 2.0.87", - "syn_derive", ] [[package]] @@ -1127,7 +1132,7 @@ dependencies = [ "hashbrown 0.14.5", "lock_api", "once_cell", - "parking_lot_core", + "parking_lot_core 0.9.10", ] [[package]] @@ -2018,6 +2023,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "ipnet" version = "2.10.1" @@ -2121,9 +2135,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.161" +version = "0.2.162" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" +checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398" [[package]] name = "libloading" @@ -2147,9 +2161,35 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ - "bitflags", + "bitflags 2.6.0", + "libc", + "redox_syscall 0.5.7", +] + +[[package]] +name = "libssh2-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dc8a030b787e2119a731f1951d6a773e2280c660f8ec4b0f5e1505a386e71ee" +dependencies = [ + "cc", + "libc", + "libz-sys", + "openssl-sys", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "libz-sys" +version = "1.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2d16453e800a8cf6dd2fc3eb4bc99b786a9b90c663b8559a5b1a041bf89e472" +dependencies = [ + "cc", "libc", - "redox_syscall", + "pkg-config", + "vcpkg", ] [[package]] @@ -2339,7 +2379,7 @@ checksum = "478b0ff3f7d67b79da2b96f56f334431aef65e15ba4b29dd74a4236e29582bdc" dependencies = [ "base64 0.21.7", "bindgen", - "bitflags", + "bitflags 2.6.0", "btoi", "byteorder", "bytes", @@ -2495,6 +2535,18 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.104" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45abf306cbf99debc8195b66b7346498d7b10c210de50418b5ccd7ceba08c741" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "outref" version = "0.5.1" @@ -2507,6 +2559,17 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.3" @@ -2514,7 +2577,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.10", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall 0.2.16", + "smallvec", + "winapi", ] [[package]] @@ -2525,7 +2602,7 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.7", "smallvec", "windows-targets 0.52.6", ] @@ -2685,6 +2762,7 @@ dependencies = [ "serde_bytes", "serde_json", "sqlparser", + "ssh2", "tokio", "tokio-postgres", "tracing", @@ -2964,11 +3042,15 @@ name = "postgres-connection" version = "0.1.0" dependencies = [ "anyhow", + "futures-util", "pt", "rustls 0.23.16", + "ssh2", "tokio", "tokio-postgres", "tokio-postgres-rustls", + "tokio-stream", + "tokio-util", "tracing", "urlencoding", ] @@ -3051,29 +3133,6 @@ dependencies = [ "toml_edit", ] -[[package]] -name = "proc-macro-error" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" -dependencies = [ - "proc-macro-error-attr", - "proc-macro2", - "quote", - "version_check", -] - -[[package]] -name = "proc-macro-error-attr" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" -dependencies = [ - "proc-macro2", - "quote", - "version_check", -] - [[package]] name = "proc-macro2" version = "1.0.89" @@ -3180,7 +3239,7 @@ version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f86ba2052aebccc42cbbb3ed234b8b13ce76f75c3551a303cb2bcffcff12bb14" dependencies = [ - "bitflags", + "bitflags 2.6.0", "memchr", "unicase", ] @@ -3314,13 +3373,22 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" dependencies = [ - "bitflags", + "bitflags 2.6.0", ] [[package]] @@ -3583,7 +3651,7 @@ version = "0.38.39" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "375116bee2be9ed569afe2154ea6a99dfdffd257f533f187498c2a8f5feaf4ee" dependencies = [ - "bitflags", + "bitflags 2.6.0", "errno", "libc", "linux-raw-sys", @@ -3794,7 +3862,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags", + "bitflags 2.6.0", "core-foundation", "core-foundation-sys", "libc", @@ -4037,6 +4105,18 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "ssh2" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7fe461910559f6d5604c3731d00d2aafc4a83d1665922e280f42f9a168d5455" +dependencies = [ + "bitflags 1.3.2", + "libc", + "libssh2-sys", + "parking_lot 0.11.2", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -4104,18 +4184,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "syn_derive" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1329189c02ff984e9736652b1631330da25eaa6bc639089ed4915d25446cbe7b" -dependencies = [ - "proc-macro-error", - "proc-macro2", - "quote", - "syn 2.0.87", -] - [[package]] name = "sync_wrapper" version = "0.1.2" @@ -4290,7 +4358,7 @@ dependencies = [ "bytes", "libc", "mio 1.0.2", - "parking_lot", + "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", "socket2", @@ -4322,7 +4390,7 @@ dependencies = [ "futures-channel", "futures-util", "log", - "parking_lot", + "parking_lot 0.12.3", "percent-encoding", "phf", "pin-project-lite", @@ -4401,6 +4469,7 @@ checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", @@ -4798,6 +4867,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" @@ -4943,7 +5018,7 @@ version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d" dependencies = [ - "redox_syscall", + "redox_syscall 0.5.7", "wasite", "web-sys", ] diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index 1131c8fd5..6efea5f4b 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -28,7 +28,9 @@ dashmap = "6" rust_decimal = { version = "1", default-features = false, features = [ "tokio-pg", ] } +ssh2 = "0.9" sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git", branch = "main" } +tokio = { version = "1", features = ["full"] } tracing = "0.1" pgwire = { version = "0.26", default-features = false, features = [ "scram", diff --git a/nexus/catalog/Cargo.toml b/nexus/catalog/Cargo.toml index 162df1da2..af7b79963 100644 --- a/nexus/catalog/Cargo.toml +++ b/nexus/catalog/Cargo.toml @@ -18,7 +18,7 @@ pt = { path = "../pt" } refinery = { version = "0.8", default-features = false, features = ["tokio-postgres"] } serde_json = "1.0" sqlparser.workspace = true -tokio = { version = "1.13.0", features = ["full"] } +tokio.workspace = true tokio-postgres = { version = "0.7.6", features = [ "with-chrono-0_4", "with-serde_json-1", diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index 015c66b29..d5d023e57 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -72,8 +72,8 @@ impl<'a> CatalogConfig<'a> { impl Catalog { pub async fn new(pt_config: pt::peerdb_peers::PostgresConfig) -> anyhow::Result { - let client = connect_postgres(&pt_config).await?; - Ok(Self { pg: client }) + let (pg, _) = connect_postgres(&pt_config).await?; + Ok(Self { pg }) } pub async fn run_migrations(&mut self) -> anyhow::Result<()> { diff --git a/nexus/parser/Cargo.toml b/nexus/parser/Cargo.toml index b6aac7d88..45bdc558c 100644 --- a/nexus/parser/Cargo.toml +++ b/nexus/parser/Cargo.toml @@ -14,5 +14,5 @@ pgwire.workspace = true pt = { path = "../pt" } rand = "0.8" sqlparser.workspace = true -tokio = { version = "1", features = ["full"] } +tokio.workspace = true tracing.workspace = true diff --git a/nexus/peer-bigquery/Cargo.toml b/nexus/peer-bigquery/Cargo.toml index c10964568..c3cf3c18b 100644 --- a/nexus/peer-bigquery/Cargo.toml +++ b/nexus/peer-bigquery/Cargo.toml @@ -21,7 +21,7 @@ serde_json = "1.0" serde_bytes = "0.11" sqlparser.workspace = true tracing.workspace = true -tokio = { version = "1.0", features = ["full"] } +tokio.workspace = true gcp-bigquery-client = "0.24" uuid = { version = "1.0", features = ["serde", "v4"] } value = { path = "../value" } diff --git a/nexus/peer-connections/Cargo.toml b/nexus/peer-connections/Cargo.toml index 8aa69c7f0..01b172c74 100644 --- a/nexus/peer-connections/Cargo.toml +++ b/nexus/peer-connections/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" anyhow = "1.0" chrono.workspace = true deadpool-postgres = { version = "0.14", features = ["rt_tokio_1"] } -tokio = { version = "1", features = ["full"] } +tokio.workspace = true tokio-postgres = { version = "0.7.6", features = [ "with-chrono-0_4", "with-serde_json-1", diff --git a/nexus/peer-cursor/Cargo.toml b/nexus/peer-cursor/Cargo.toml index 74a2fe9de..3db4eecba 100644 --- a/nexus/peer-cursor/Cargo.toml +++ b/nexus/peer-cursor/Cargo.toml @@ -12,6 +12,6 @@ dashmap.workspace = true futures = "0.3" pgwire.workspace = true sqlparser.workspace = true -tokio = { version = "1.0", features = ["full"] } +tokio.workspace = true tracing.workspace = true value = { path = "../value" } diff --git a/nexus/peer-mysql/Cargo.toml b/nexus/peer-mysql/Cargo.toml index 2fe32d845..a6fed50b8 100644 --- a/nexus/peer-mysql/Cargo.toml +++ b/nexus/peer-mysql/Cargo.toml @@ -22,6 +22,6 @@ serde_json = "1.0" serde_bytes = "0.11" sqlparser.workspace = true tracing.workspace = true -tokio = { version = "1.0", features = ["full"] } +tokio.workspace = true tokio-stream = "0.1" value = { path = "../value" } diff --git a/nexus/peer-postgres/Cargo.toml b/nexus/peer-postgres/Cargo.toml index 873baa267..78b055500 100644 --- a/nexus/peer-postgres/Cargo.toml +++ b/nexus/peer-postgres/Cargo.toml @@ -8,7 +8,6 @@ edition = "2021" [dependencies] anyhow = "1.0" async-trait = "0.1" -rust_decimal.workspace = true bytes = "1.0" chrono.workspace = true futures = "0.3" @@ -16,13 +15,15 @@ peer-cursor = { path = "../peer-cursor" } peer-connections = { path = "../peer-connections" } pgwire.workspace = true postgres-connection = { path = "../postgres-connection" } +postgres-inet = "0.19.0" pt = { path = "../pt" } +rust_decimal.workspace = true serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_bytes = "0.11" -postgres-inet = "0.19.0" +ssh2.workspace = true sqlparser.workspace = true -tokio = { version = "1.0", features = ["full"] } +tokio.workspace = true tokio-postgres = { version = "0.7.6", features = [ "with-chrono-0_4", "with-serde_json-1", diff --git a/nexus/peer-postgres/src/lib.rs b/nexus/peer-postgres/src/lib.rs index 4e9c317d2..36676618a 100644 --- a/nexus/peer-postgres/src/lib.rs +++ b/nexus/peer-postgres/src/lib.rs @@ -16,19 +16,29 @@ pub mod stream; // backing store. pub struct PostgresQueryExecutor { peername: String, - client: Box, + client: Client, + session: Option, } impl PostgresQueryExecutor { pub async fn new(peername: String, config: &PostgresConfig) -> anyhow::Result { - let client = postgres_connection::connect_postgres(config).await?; + let (client, session) = postgres_connection::connect_postgres(config).await?; Ok(Self { peername, - client: Box::new(client), + client, + session, }) } } +impl Drop for PostgresQueryExecutor { + fn drop(&mut self) { + if let Some(session) = &mut self.session { + session.disconnect(None, "", None).ok(); + } + } +} + async fn schema_from_query(client: &Client, query: &str) -> anyhow::Result { let prepared = client.prepare_typed(query, &[]).await?; diff --git a/nexus/peer-snowflake/Cargo.toml b/nexus/peer-snowflake/Cargo.toml index e74c54318..bb33eed01 100644 --- a/nexus/peer-snowflake/Cargo.toml +++ b/nexus/peer-snowflake/Cargo.toml @@ -25,7 +25,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha2 = "0.10" sqlparser.workspace = true -tokio = { version = "1.21", features = ["full"] } +tokio.workspace = true tracing.workspace = true ureq = { version = "2", features = ["json", "charset"] } value = { path = "../value" } diff --git a/nexus/postgres-connection/Cargo.toml b/nexus/postgres-connection/Cargo.toml index 93da58109..4d981c7c6 100644 --- a/nexus/postgres-connection/Cargo.toml +++ b/nexus/postgres-connection/Cargo.toml @@ -7,10 +7,14 @@ edition = "2021" [dependencies] anyhow = "1" +futures-util = { version = "0.3", default-features = false, features = ["io"] } pt = { path = "../pt" } rustls = { version = "0.23", default-features = false, features = ["ring"] } -urlencoding = "2" +ssh2.workspace = true +tokio.workspace = true tokio-postgres = "0.7.2" tokio-postgres-rustls = "0.13" -tokio = { version = "1", features = ["full"] } +tokio-util = { version = "0.7", features = ["compat"] } +tokio-stream = "0.1" tracing.workspace = true +urlencoding = "2" diff --git a/nexus/postgres-connection/src/lib.rs b/nexus/postgres-connection/src/lib.rs index 7b2591687..69dafbde6 100644 --- a/nexus/postgres-connection/src/lib.rs +++ b/nexus/postgres-connection/src/lib.rs @@ -1,9 +1,12 @@ -use pt::peerdb_peers::PostgresConfig; +use pt::peerdb_peers::{PostgresConfig, SshConfig}; use rustls::pki_types::{CertificateDer, ServerName, UnixTime}; use rustls::{ClientConfig, DigitallySignedStruct, RootCertStore, SignatureScheme}; use std::fmt::Write; +use std::io; use std::sync::Arc; +use tokio::net::UnixStream; use tokio_postgres_rustls::MakeRustlsConnect; +use tokio_util::compat::FuturesAsyncReadCompatExt; #[derive(Copy, Clone, Debug)] struct NoCertificateVerification; @@ -77,25 +80,99 @@ pub fn get_pg_connection_string(config: &PostgresConfig) -> String { connection_string } -pub async fn connect_postgres(config: &PostgresConfig) -> anyhow::Result { - let connection_string = get_pg_connection_string(config); - - let mut config = ClientConfig::builder() - .with_root_certificates(RootCertStore::empty()) - .with_no_client_auth(); - config - .dangerous() - .set_certificate_verifier(Arc::new(NoCertificateVerification)); - let tls_connector = MakeRustlsConnect::new(config); - let (client, connection) = tokio_postgres::connect(&connection_string, tls_connector) - .await - .map_err(|e| anyhow::anyhow!("error encountered while connecting to postgres {:?}", e))?; +pub async fn create_tunnel( + tcp: std::net::TcpStream, + ssh_config: &SshConfig, + remote_server: String, + remote_port: u16, +) -> io::Result<(ssh2::Session, UnixStream)> { + let mut session = ssh2::Session::new()?; + session.set_tcp_stream(tcp); + session.set_compress(true); + session.handshake()?; + if !ssh_config.password.is_empty() { + session.userauth_password(&ssh_config.user, &ssh_config.password)?; + } + if !ssh_config.private_key.is_empty() { + session.userauth_pubkey_memory(&ssh_config.user, None, &ssh_config.private_key, None)?; + } + if !ssh_config.host_key.is_empty() { + let mut known_hosts = session.known_hosts()?; + known_hosts.read_str(&ssh_config.host_key, ssh2::KnownHostFileKind::OpenSSH)?; + } + let (mut stream1, stream2) = tokio::net::UnixStream::pair()?; + let channel = session.channel_direct_tcpip(remote_server.as_str(), remote_port, None)?; + tracing::info!( + "tunnel to {:}:{:} opened", + remote_server.as_str(), + remote_port + ); - tokio::task::spawn(async move { - if let Err(e) = connection.await { - tracing::info!("connection error: {}", e) + session.set_blocking(false); + tokio::spawn(async move { + let mut channel_stream = futures_util::io::AllowStdIo::new(channel.stream(0)).compat(); + loop { + if let Err(err) = tokio::io::copy_bidirectional(&mut stream1, &mut channel_stream).await + { + if err.kind() == io::ErrorKind::WouldBlock { + tokio::time::sleep(std::time::Duration::new(0, 123456789)).await; + continue; + } + tracing::error!( + "tunnel to {:}:{:} failed: {:}", + remote_server.as_str(), + remote_port, + err + ); + } + break; } }); - Ok(client) + Ok((session, stream2)) +} + +pub async fn connect_postgres( + config: &PostgresConfig, +) -> anyhow::Result<(tokio_postgres::Client, Option)> { + if let Some(ssh_config) = &config.ssh_config { + let tcp = std::net::TcpStream::connect((ssh_config.host.as_str(), ssh_config.port as u16))?; + tcp.set_nodelay(true)?; + let (session, stream) = + create_tunnel(tcp, ssh_config, config.host.clone(), config.port as u16).await?; + let (client, connection) = tokio_postgres::Config::default() + .user(&config.user) + .password(&config.password) + .dbname(&config.database) + .application_name("peerdb_nexus") + .connect_raw(stream, tokio_postgres::NoTls) + .await?; + tokio::task::spawn(async move { + if let Err(e) = connection.await { + tracing::info!("connection error: {}", e) + } + }); + Ok((client, Some(session))) + } else { + let connection_string = get_pg_connection_string(config); + + let mut tls_config = ClientConfig::builder() + .with_root_certificates(RootCertStore::empty()) + .with_no_client_auth(); + tls_config + .dangerous() + .set_certificate_verifier(Arc::new(NoCertificateVerification)); + let tls_connector = MakeRustlsConnect::new(tls_config); + let (client, connection) = tokio_postgres::connect(&connection_string, tls_connector) + .await + .map_err(|e| { + anyhow::anyhow!("error encountered while connecting to postgres {:?}", e) + })?; + tokio::task::spawn(async move { + if let Err(e) = connection.await { + tracing::info!("connection error: {}", e) + } + }); + Ok((client, None)) + } } diff --git a/nexus/server/Cargo.toml b/nexus/server/Cargo.toml index bb0e181c4..b7f05da27 100644 --- a/nexus/server/Cargo.toml +++ b/nexus/server/Cargo.toml @@ -53,7 +53,7 @@ sqlparser = { workspace = true, features = ["visitor"] } serde_json = "1.0" rand = "0.8" time = "0.3" -tokio = { version = "1", features = ["full"] } +tokio.workspace = true tracing.workspace = true tracing-appender = "0.2" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/stacks/peerdb-server.Dockerfile b/stacks/peerdb-server.Dockerfile index c4c5a2b04..689e3cf5b 100644 --- a/stacks/peerdb-server.Dockerfile +++ b/stacks/peerdb-server.Dockerfile @@ -9,7 +9,8 @@ WORKDIR /root/nexus RUN cargo chef prepare --recipe-path recipe.json FROM chef as builder -RUN apk add --no-cache build-base pkgconfig curl unzip +ENV OPENSSL_STATIC=1 +RUN apk add --no-cache build-base pkgconfig curl unzip openssl-dev openssl-libs-static WORKDIR /root/nexus COPY scripts /root/scripts RUN /root/scripts/install-protobuf.sh