diff --git a/README.md b/README.md
index b36285e..9175f22 100644
--- a/README.md
+++ b/README.md
@@ -125,6 +125,7 @@ The repository is a cargo workspace. Each of the individual sub-folders are crat
 - [x] Add BigQuery Sink
 - [x] Add DuckDb Sink
 - [x] Add MotherDuck Sink
+- [x] Add Deltalake Sink (local/s3)
 - [ ] Add Snowflake Sink
 - [ ] Add ClickHouse Sink
 - [ ] Many more to come...
diff --git a/pg_replicate/examples/delta.rs b/pg_replicate/examples/delta.rs
index fbdda71..3c4b992 100644
--- a/pg_replicate/examples/delta.rs
+++ b/pg_replicate/examples/delta.rs
@@ -52,7 +52,7 @@ struct DbArgs {
 #[derive(Debug, Args)]
 struct DeltaArgs {
     /// The path to the Delta Lake where data from the database will be saved.
-    /// Use `file://datalake` for saving data to local file storage.
+    /// Use `file:///home/data/datalake` for saving data to local file storage.
     /// Use `s3://datalake` for saving data to an S3 bucket.
     #[arg(long)]
     delta_path: String,
diff --git a/pg_replicate/src/clients/delta.rs b/pg_replicate/src/clients/delta.rs
index 7294c3d..d899709 100644
--- a/pg_replicate/src/clients/delta.rs
+++ b/pg_replicate/src/clients/delta.rs
@@ -1,7 +1,10 @@
+use aws_config::BehaviorVersion;
 use chrono::Timelike;
 use chrono::{NaiveDate, NaiveTime, Utc};
 use deltalake::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
-use deltalake::aws::constants::{AWS_FORCE_CREDENTIAL_LOAD, AWS_S3_ALLOW_UNSAFE_RENAME};
+use deltalake::aws::constants::{
+    AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_FORCE_CREDENTIAL_LOAD, AWS_S3_ALLOW_UNSAFE_RENAME,
+};
 use deltalake::datafusion::execution::context::SessionContext;
 use deltalake::datafusion::prelude::col;
 use deltalake::kernel::{PrimitiveType, StructField, TableFeatures};
@@ -30,11 +33,20 @@ pub struct DeltaClient {
 }
 
 impl DeltaClient {
-    fn aws_config() -> HashMap<String, String> {
+    async fn aws_config() -> HashMap<String, String> {
         let mut storage_options = HashMap::new();
         storage_options.insert(AWS_FORCE_CREDENTIAL_LOAD.to_string(), "true".to_string());
         storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_string(), "true".to_string());
+        let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
+
+        match config.endpoint_url() {
+            Some(endpoint) => {
+                storage_options.insert(AWS_ENDPOINT_URL.to_string(), endpoint.to_string());
+                storage_options.insert(AWS_ALLOW_HTTP.to_string(), "true".to_string());
+            }
+            None => (),
+        }
 
         storage_options
     }
 
@@ -55,6 +67,9 @@ impl DeltaClient {
     async fn strip_file_prefix(uri: &str) -> Result<&str, DeltaTableError> {
         if let Some(path) = uri.strip_prefix("file://") {
             Ok(path)
+        } else if let Some(path) = uri.strip_prefix("s3://") {
+            let _ = path;
+            Ok(uri)
         } else {
             panic!("strip_file_prefix called on a non-file uri")
         }
@@ -66,7 +81,8 @@ impl DeltaClient {
             open_table(uri).await
         } else {
             let uri = Self::strip_file_prefix(uri).await?;
-            open_table_with_storage_options(uri, Self::aws_config()).await
+
+            open_table_with_storage_options(uri, Self::aws_config().await).await
         }
     }
 
@@ -75,6 +91,7 @@ impl DeltaClient {
         data: Vec<RecordBatch>,
         save_mode: SaveMode,
     ) -> Result<(), DeltaTableError> {
+        println!("{}", uri);
         let table = Self::open_delta_table(uri).await?;
         let table_snapshot = table.snapshot()?.clone();
         let table_logstore = table.log_store();
@@ -147,7 +164,6 @@ impl DeltaClient {
     }
 
     pub(crate) async fn delta_table_exists(&self, table_name: &str) -> bool {
-        deltalake_aws::register_handlers(None);
         let uri = self.delta_full_path(table_name);
 
         Self::open_delta_table(&uri).await.is_ok()
@@ -249,7 +265,7 @@ impl DeltaClient {
             DeltaOps::try_from_uri(&uri).await?
         } else {
             let uri = Self::strip_file_prefix(&uri).await?;
-            DeltaOps::try_from_uri_with_storage_options(uri, Self::aws_config()).await?
+            DeltaOps::try_from_uri_with_storage_options(uri, Self::aws_config().await).await?
         };
 
         delta_ops
diff --git a/pg_replicate/src/pipeline/sinks/delta.rs b/pg_replicate/src/pipeline/sinks/delta.rs
index 3b26daf..106992a 100644
--- a/pg_replicate/src/pipeline/sinks/delta.rs
+++ b/pg_replicate/src/pipeline/sinks/delta.rs
@@ -73,6 +73,8 @@ impl SinkError for DeltaSinkError {}
 impl BatchSink for DeltaSink {
     type Error = DeltaSinkError;
     async fn get_resumption_state(&mut self) -> Result<PipelineResumptionState, Self::Error> {
+        deltalake::aws::register_handlers(None);
+
         let last_lsn_column_schemas = [
             ColumnSchema {
                 name: "id".to_string(),
@@ -113,6 +115,8 @@ impl BatchSink for DeltaSink {
         &mut self,
         table_schemas: HashMap<TableId, TableSchema>,
     ) -> Result<(), Self::Error> {
+        deltalake::aws::register_handlers(None);
+
         let mut delta_schema = HashMap::new();
 
         for table_schema in table_schemas.values() {
@@ -141,6 +145,8 @@ impl BatchSink for DeltaSink {
         rows: Vec<TableRow>,
         table_id: TableId,
     ) -> Result<(), Self::Error> {
+        deltalake::aws::register_handlers(None);
+
         let mut rows_batch: HashMap<TableId, Vec<TableRow>> = HashMap::new();
 
         let updated_rows: Vec<TableRow> = rows
@@ -158,6 +164,8 @@ impl BatchSink for DeltaSink {
     }
 
     async fn write_cdc_events(&mut self, events: Vec<CdcEvent>) -> Result<PgLsn, Self::Error> {
+        deltalake::aws::register_handlers(None);
+
         let mut rows_batch: HashMap<TableId, Vec<TableRow>> = HashMap::new();
         let mut new_last_lsn = PgLsn::from(0);