Skip to content

Commit

Permalink
nexus: ssh
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Nov 11, 2024
1 parent 0fa358b commit 92852ca
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 88 deletions.
213 changes: 142 additions & 71 deletions nexus/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dashmap = "6"
rust_decimal = { version = "1", default-features = false, features = [
"tokio-pg",
] }
ssh2 = "0.9"
sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git", branch = "main" }
tracing = "0.1"
pgwire = { version = "0.26", default-features = false, features = [
Expand Down
4 changes: 2 additions & 2 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ impl<'a> CatalogConfig<'a> {

impl Catalog {
pub async fn new(pt_config: pt::peerdb_peers::PostgresConfig) -> anyhow::Result<Self> {
let client = connect_postgres(&pt_config).await?;
Ok(Self { pg: client })
let (pg, _) = connect_postgres(&pt_config).await?;
Ok(Self { pg })
}

pub async fn run_migrations(&mut self) -> anyhow::Result<()> {
Expand Down
5 changes: 3 additions & 2 deletions nexus/peer-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@ edition = "2021"
[dependencies]
anyhow = "1.0"
async-trait = "0.1"
rust_decimal.workspace = true
bytes = "1.0"
chrono.workspace = true
futures = "0.3"
peer-cursor = { path = "../peer-cursor" }
peer-connections = { path = "../peer-connections" }
pgwire.workspace = true
postgres-connection = { path = "../postgres-connection" }
postgres-inet = "0.19.0"
pt = { path = "../pt" }
rust_decimal.workspace = true
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_bytes = "0.11"
postgres-inet = "0.19.0"
ssh2.workspace = true
sqlparser.workspace = true
tokio = { version = "1.0", features = ["full"] }
tokio-postgres = { version = "0.7.6", features = [
Expand Down
16 changes: 13 additions & 3 deletions nexus/peer-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,29 @@ pub mod stream;
// backing store.
pub struct PostgresQueryExecutor {
peername: String,
client: Box<Client>,
client: Client,
session: Option<ssh2::Session>,
}

impl PostgresQueryExecutor {
pub async fn new(peername: String, config: &PostgresConfig) -> anyhow::Result<Self> {
let client = postgres_connection::connect_postgres(config).await?;
let (client, session) = postgres_connection::connect_postgres(config).await?;
Ok(Self {
peername,
client: Box::new(client),
client,
session,
})
}
}

impl Drop for PostgresQueryExecutor {
fn drop(&mut self) {
if let Some(session) = &mut self.session {
session.disconnect(None, "", None).ok();
}
}
}

async fn schema_from_query(client: &Client, query: &str) -> anyhow::Result<Schema> {
let prepared = client.prepare_typed(query, &[]).await?;

Expand Down
4 changes: 3 additions & 1 deletion nexus/postgres-connection/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ edition = "2021"
anyhow = "1"
pt = { path = "../pt" }
rustls = { version = "0.23", default-features = false, features = ["ring"] }
urlencoding = "2"
ssh2.workspace = true
tokio-postgres = "0.7.2"
tokio-postgres-rustls = "0.13"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
tracing.workspace = true
urlencoding = "2"
134 changes: 126 additions & 8 deletions nexus/postgres-connection/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use pt::peerdb_peers::PostgresConfig;
use pt::peerdb_peers::{PostgresConfig, SshConfig};
use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
use rustls::{ClientConfig, DigitallySignedStruct, RootCertStore, SignatureScheme};
use std::fmt::Write;
use std::sync::Arc;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use tokio_postgres_rustls::MakeRustlsConnect;

#[derive(Copy, Clone, Debug)]
Expand Down Expand Up @@ -77,16 +80,131 @@ pub fn get_pg_connection_string(config: &PostgresConfig) -> String {
connection_string
}

pub async fn connect_postgres(config: &PostgresConfig) -> anyhow::Result<tokio_postgres::Client> {
let connection_string = get_pg_connection_string(config);
// from https://github.com/alexcrichton/ssh2-rs/issues/218#issuecomment-1698814611
pub fn create_tunnel(
tcp: std::net::TcpStream,
ssh_config: &SshConfig,
host_server: String,
host_port: u16,
remote_server: String,
remote_port: u16,
) -> std::io::Result<(ssh2::Session, std::net::SocketAddr)> {
let mut session = ssh2::Session::new()?;
session.set_tcp_stream(tcp);
session.set_compress(true);
session.set_timeout(15000);
session.handshake()?;
if !ssh_config.password.is_empty() {
session.userauth_password(&ssh_config.user, &ssh_config.password)?;
}
if !ssh_config.private_key.is_empty() {
session.userauth_pubkey_memory(&ssh_config.user, None, &ssh_config.private_key, None)?;
}
if !ssh_config.host_key.is_empty() {
let mut known_hosts = session.known_hosts()?;
known_hosts.read_str(&ssh_config.host_key, ssh2::KnownHostFileKind::OpenSSH)?;
}
let listener = std::net::TcpListener::bind((host_server.as_str(), host_port))?;
let local_addr = listener.local_addr()?;
let channel = session.channel_direct_tcpip(remote_server.as_str(), remote_port, None)?;
tracing::info!(
"tunnel on {:}:{:} to {:}:{:} opened",
host_server.as_str(),
host_port,
remote_server.as_str(),
remote_port
);

tokio::task::spawn_blocking(move || {
let mut stream_id = 0;
let closed = Arc::new(AtomicBool::new(false));
loop {
match listener.accept() {
Err(err) => {
tracing::info!("failed to accept connection, reason {:?}", err);
closed.store(false, Ordering::SeqCst);
break;
}
Ok((stream, socket)) => {
tracing::debug!("new TCP stream from socket {:}", socket);

let mut reader_stream = stream;
let mut writer_stream = reader_stream.try_clone().unwrap();

let mut writer_channel = channel.stream(stream_id); //open two streams on the same channel, so we can read and write separately
let mut reader_channel = channel.stream(stream_id);
stream_id = stream_id.wrapping_add(1);

//pipe stream output into channel
let write_closed = closed.clone();
tokio::task::spawn_blocking(move || loop {
match std::io::copy(&mut reader_stream, &mut writer_channel) {
Ok(_) => (),
Err(err) => {
tracing::info!("failed to write to channel, reason: {:?}", err);
}
}
if write_closed.load(Ordering::SeqCst) {
break;
}
});

//pipe channel output into stream
let read_closed = closed.clone();
tokio::task::spawn_blocking(move || loop {
match std::io::copy(&mut reader_channel, &mut writer_stream) {
Ok(_) => (),
Err(err) => {
tracing::info!("failed to read from channel, reason: {:?}", err);
}
}
if read_closed.load(Ordering::SeqCst) {
break;
}
});
}
}
}
tracing::info!(
"tunnel on {:}:{:} to {:}:{:} closed",
host_server.as_str(),
host_port,
remote_server.as_str(),
remote_port
);
});

Ok((session, local_addr))
}

pub async fn connect_postgres(
config: &PostgresConfig,
) -> anyhow::Result<(tokio_postgres::Client, Option<ssh2::Session>)> {
let (connection_string, session) = if let Some(ssh_config) = &config.ssh_config {
let tcp = std::net::TcpStream::connect((ssh_config.host.as_str(), ssh_config.port as u16))?;
let (session, local_addr) = create_tunnel(
tcp,
ssh_config,
String::from("localhost"),
0,
config.host.clone(),
config.port as u16,
)?;
let mut newconfig = config.clone();
newconfig.host = local_addr.ip().to_string();
newconfig.port = local_addr.port() as u32;
(get_pg_connection_string(&newconfig), Some(session))
} else {
(get_pg_connection_string(config), None)
};

let mut config = ClientConfig::builder()
let mut tls_config = ClientConfig::builder()
.with_root_certificates(RootCertStore::empty())
.with_no_client_auth();
config
tls_config
.dangerous()
.set_certificate_verifier(Arc::new(NoCertificateVerification));
let tls_connector = MakeRustlsConnect::new(config);
let tls_connector = MakeRustlsConnect::new(tls_config);
let (client, connection) = tokio_postgres::connect(&connection_string, tls_connector)
.await
.map_err(|e| anyhow::anyhow!("error encountered while connecting to postgres {:?}", e))?;
Expand All @@ -97,5 +215,5 @@ pub async fn connect_postgres(config: &PostgresConfig) -> anyhow::Result<tokio_p
}
});

Ok(client)
Ok((client, session))
}
3 changes: 2 additions & 1 deletion stacks/peerdb-server.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ WORKDIR /root/nexus
RUN cargo chef prepare --recipe-path recipe.json

FROM chef as builder
RUN apk add --no-cache build-base pkgconfig curl unzip
ENV OPENSSL_STATIC=1
RUN apk add --no-cache build-base pkgconfig curl unzip openssl-dev openssl-libs-static
WORKDIR /root/nexus
COPY scripts /root/scripts
RUN /root/scripts/install-protobuf.sh
Expand Down

0 comments on commit 92852ca

Please sign in to comment.