Skip to content

Commit

Permalink
update code fix s3 sink issue
Browse files Browse the repository at this point in the history
  • Loading branch information
Abdul Haris Djafar authored and Abdul Haris Djafar committed Dec 15, 2024
1 parent c52ef74 commit 703254d
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 6 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ The repository is a cargo workspace. Each of the individual sub-folders are crat
- [x] Add BigQuery Sink
- [x] Add DuckDb Sink
- [x] Add MotherDuck Sink
- [x] Add Deltalake Sink (local/s3)
- [ ] Add Snowflake Sink
- [ ] Add ClickHouse Sink
- [ ] Many more to come...
Expand Down
2 changes: 1 addition & 1 deletion pg_replicate/examples/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct DbArgs {
#[derive(Debug, Args)]
struct DeltaArgs {
/// The path to the Delta Lake where data from the database will be saved.
/// Use `file://datalake` for saving data to local file storage.
/// Use `file:///home/data/datalake` for saving data to local file storage.
/// Use `s3://datalake` for saving data to an S3 bucket.
#[arg(long)]
delta_path: String,
Expand Down
26 changes: 21 additions & 5 deletions pg_replicate/src/clients/delta.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use aws_config::BehaviorVersion;
use chrono::Timelike;
use chrono::{NaiveDate, NaiveTime, Utc};
use deltalake::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
use deltalake::aws::constants::{AWS_FORCE_CREDENTIAL_LOAD, AWS_S3_ALLOW_UNSAFE_RENAME};
use deltalake::aws::constants::{
AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_FORCE_CREDENTIAL_LOAD, AWS_S3_ALLOW_UNSAFE_RENAME,
};
use deltalake::datafusion::execution::context::SessionContext;
use deltalake::datafusion::prelude::col;
use deltalake::kernel::{PrimitiveType, StructField, TableFeatures};
Expand Down Expand Up @@ -30,11 +33,20 @@ pub struct DeltaClient {
}

impl DeltaClient {
fn aws_config() -> HashMap<String, String> {
async fn aws_config() -> HashMap<String, String> {
let mut storage_options = HashMap::new();

storage_options.insert(AWS_FORCE_CREDENTIAL_LOAD.to_string(), "true".to_string());
storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_string(), "true".to_string());
let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;

match config.endpoint_url() {
Some(endpoint) => {
storage_options.insert(AWS_ENDPOINT_URL.to_string(), endpoint.to_string());
storage_options.insert(AWS_ALLOW_HTTP.to_string(), "true".to_string());
}
None => (),
}

storage_options
}
Expand All @@ -55,6 +67,9 @@ impl DeltaClient {
async fn strip_file_prefix(uri: &str) -> Result<&str, DeltaTableError> {
if let Some(path) = uri.strip_prefix("file://") {
Ok(path)
} else if let Some(path) = uri.strip_prefix("s3://") {
let _ = path;
Ok(uri)
} else {
panic!("strip_file_prefix called on a non-file uri")
}
Expand All @@ -66,7 +81,8 @@ impl DeltaClient {
open_table(uri).await
} else {
let uri = Self::strip_file_prefix(uri).await?;
open_table_with_storage_options(uri, Self::aws_config()).await

open_table_with_storage_options(uri, Self::aws_config().await).await
}
}

Expand All @@ -75,6 +91,7 @@ impl DeltaClient {
data: Vec<DeltaRecordBatch>,
save_mode: SaveMode,
) -> Result<(), DeltaTableError> {
println!("{}", uri);
let table = Self::open_delta_table(uri).await?;
let table_snapshot = table.snapshot()?.clone();
let table_logstore = table.log_store();
Expand Down Expand Up @@ -147,7 +164,6 @@ impl DeltaClient {
}

pub(crate) async fn delta_table_exists(&self, table_name: &str) -> bool {
deltalake_aws::register_handlers(None);
let uri = self.delta_full_path(table_name);

Self::open_delta_table(&uri).await.is_ok()
Expand Down Expand Up @@ -249,7 +265,7 @@ impl DeltaClient {
DeltaOps::try_from_uri(&uri).await?
} else {
let uri = Self::strip_file_prefix(&uri).await?;
DeltaOps::try_from_uri_with_storage_options(uri, Self::aws_config()).await?
DeltaOps::try_from_uri_with_storage_options(uri, Self::aws_config().await).await?
};

delta_ops
Expand Down
8 changes: 8 additions & 0 deletions pg_replicate/src/pipeline/sinks/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ impl SinkError for DeltaSinkError {}
impl BatchSink for DeltaSink {
type Error = DeltaSinkError;
async fn get_resumption_state(&mut self) -> Result<PipelineResumptionState, Self::Error> {
deltalake::aws::register_handlers(None);

let last_lsn_column_schemas = [
ColumnSchema {
name: "id".to_string(),
Expand Down Expand Up @@ -113,6 +115,8 @@ impl BatchSink for DeltaSink {
&mut self,
table_schemas: HashMap<TableId, TableSchema>,
) -> Result<(), Self::Error> {
deltalake::aws::register_handlers(None);

let mut delta_schema = HashMap::new();

for table_schema in table_schemas.values() {
Expand Down Expand Up @@ -141,6 +145,8 @@ impl BatchSink for DeltaSink {
rows: Vec<TableRow>,
table_id: TableId,
) -> Result<(), Self::Error> {
deltalake::aws::register_handlers(None);

let mut rows_batch: HashMap<TableId, Vec<TableRow>> = HashMap::new();

let updated_rows: Vec<TableRow> = rows
Expand All @@ -158,6 +164,8 @@ impl BatchSink for DeltaSink {
}

async fn write_cdc_events(&mut self, events: Vec<CdcEvent>) -> Result<PgLsn, Self::Error> {
deltalake::aws::register_handlers(None);

let mut rows_batch: HashMap<TableId, Vec<TableRow>> = HashMap::new();
let mut new_last_lsn = PgLsn::from(0);

Expand Down

0 comments on commit 703254d

Please sign in to comment.