From 5fd56d1c20ee83c4b19702c9437b166d80f43921 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Thu, 8 Aug 2024 12:16:00 +0530 Subject: [PATCH] add --motherduck-db-name option to allow the user to pass db name --- pg_replicate/examples/duckdb.rs | 28 +++++++++++++------ pg_replicate/src/clients/duckdb.rs | 7 +++-- .../src/pipeline/sinks/duckdb/sink.rs | 7 +++-- 3 files changed, 30 insertions(+), 12 deletions(-) diff --git a/pg_replicate/examples/duckdb.rs b/pg_replicate/examples/duckdb.rs index ea539bb..9ecdf27 100644 --- a/pg_replicate/examples/duckdb.rs +++ b/pg_replicate/examples/duckdb.rs @@ -49,15 +49,24 @@ struct DbArgs { } #[derive(Debug, clap::Args)] -#[group(required = true, multiple = false)] pub struct DuckDbOptions { /// DuckDb file name #[clap(long)] duckdb_file: Option, - /// motherduck access token + #[clap(flatten)] + motherduck: Option, +} + +#[derive(Debug, Args)] +struct MotherDuckOptions { + /// MotherDuck access token #[clap(long)] - motherduck_access_token: Option, + motherduck_access_token: String, + + /// MotherDuck database name + #[clap(long)] + motherduck_db_name: String, } #[derive(Debug, Subcommand)] @@ -122,12 +131,15 @@ async fn main_impl() -> Result<(), Box> { } }; - let duckdb_sink = match ( - db_args.duckdb.duckdb_file, - db_args.duckdb.motherduck_access_token, - ) { + let duckdb_sink = match (db_args.duckdb.duckdb_file, db_args.duckdb.motherduck) { (Some(duckdb_file), None) => DuckDbSink::file(duckdb_file).await?, - (None, Some(access_token)) => DuckDbSink::mother_duck(&access_token).await?, + (None, Some(motherduck)) => { + DuckDbSink::mother_duck( + &motherduck.motherduck_access_token, + &motherduck.motherduck_db_name, + ) + .await? + } _ => { unreachable!() } diff --git a/pg_replicate/src/clients/duckdb.rs b/pg_replicate/src/clients/duckdb.rs index 62ec3e7..68cee32 100644 --- a/pg_replicate/src/clients/duckdb.rs +++ b/pg_replicate/src/clients/duckdb.rs @@ -28,12 +28,15 @@ impl DuckDbClient { Ok(DuckDbClient { conn }) } - pub fn open_mother_duck(access_token: &str) -> Result { + pub fn open_mother_duck( + access_token: &str, + db_name: &str, + ) -> Result { let conf = Config::default() .with("motherduck_token", access_token)? .with("custom_user_agent", "pg_replicate")?; - let conn = Connection::open_with_flags("md:my_db", conf)?; + let conn = Connection::open_with_flags(format!("md:{db_name}"), conf)?; Ok(DuckDbClient { conn }) } diff --git a/pg_replicate/src/pipeline/sinks/duckdb/sink.rs b/pg_replicate/src/pipeline/sinks/duckdb/sink.rs index adf807c..86b7ad2 100644 --- a/pg_replicate/src/pipeline/sinks/duckdb/sink.rs +++ b/pg_replicate/src/pipeline/sinks/duckdb/sink.rs @@ -43,10 +43,13 @@ impl DuckDbSink { }) } - pub async fn mother_duck(access_token: &str) -> Result { + pub async fn mother_duck( + access_token: &str, + db_name: &str, + ) -> Result { let (req_sender, req_receiver) = channel(CHANNEL_SIZE); let (res_sender, res_receiver) = channel(CHANNEL_SIZE); - let client = DuckDbClient::open_mother_duck(access_token)?; + let client = DuckDbClient::open_mother_duck(access_token, db_name)?; let executor = DuckDbExecutor { client, req_receiver,