diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index c932aef292c70..9eb4893f124be 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -64,18 +64,27 @@ use risingwave_common::util::env_var::env_var_is_true; /// A flatten config map for aws auth. #[derive(Deserialize, Debug, Clone, WithOptions, PartialEq)] pub struct AwsAuthProps { - #[serde(rename = "aws.region", alias = "region")] + #[serde(rename = "aws.region", alias = "region", alias = "s3.region")] pub region: Option, #[serde( rename = "aws.endpoint_url", alias = "endpoint_url", - alias = "endpoint" + alias = "endpoint", + alias = "s3.endpoint" )] pub endpoint: Option, - #[serde(rename = "aws.credentials.access_key_id", alias = "access_key")] + #[serde( + rename = "aws.credentials.access_key_id", + alias = "access_key", + alias = "s3.access.key" + )] pub access_key: Option, - #[serde(rename = "aws.credentials.secret_access_key", alias = "secret_key")] + #[serde( + rename = "aws.credentials.secret_access_key", + alias = "secret_key", + alias = "s3.secret.key" + )] pub secret_key: Option, #[serde(rename = "aws.credentials.session_token", alias = "session_token")] pub session_token: Option, diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs index b297cc0741af8..3cd406f5f55bc 100644 --- a/src/connector/src/sink/deltalake.rs +++ b/src/connector/src/sink/deltalake.rs @@ -50,24 +50,20 @@ use super::{ Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; +use crate::connector_common::AwsAuthProps; pub const DELTALAKE_SINK: &str = "deltalake"; pub const DEFAULT_REGION: &str = "us-east-1"; pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key"; #[serde_as] -#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)] +#[derive(Deserialize, Debug, Clone, WithOptions)] pub struct DeltaLakeCommon { - #[serde(rename = "s3.access.key")] - pub s3_access_key: Option, - #[serde(rename = "s3.secret.key")] - pub s3_secret_key: Option, #[serde(rename = "location")] pub location: String, - #[serde(rename = "s3.region")] - pub s3_region: Option, - #[serde(rename = "s3.endpoint")] - pub s3_endpoint: Option, + #[serde(flatten)] + pub aws_auth_props: AwsAuthProps, + #[serde(rename = "gcs.service.account")] pub gcs_service_account: Option, /// Commit every n(>0) checkpoints, default is 10. @@ -80,35 +76,7 @@ impl DeltaLakeCommon { pub async fn create_deltalake_client(&self) -> Result { let table = match Self::get_table_url(&self.location)? { DeltaTableUrl::S3(s3_path) => { - let mut storage_options = HashMap::new(); - storage_options.insert( - AWS_ACCESS_KEY_ID.to_owned(), - self.s3_access_key.clone().ok_or_else(|| { - SinkError::Config(anyhow!("s3.access.key is required with aws s3")) - })?, - ); - storage_options.insert( - AWS_SECRET_ACCESS_KEY.to_owned(), - self.s3_secret_key.clone().ok_or_else(|| { - SinkError::Config(anyhow!("s3.secret.key is required with aws s3")) - })?, - ); - if self.s3_endpoint.is_none() && self.s3_region.is_none() { - return Err(SinkError::Config(anyhow!( - "s3.endpoint and s3.region need to be filled with at least one" - ))); - } - storage_options.insert( - AWS_REGION.to_owned(), - self.s3_region - .clone() - .unwrap_or_else(|| DEFAULT_REGION.to_owned()), - ); - if let Some(s3_endpoint) = &self.s3_endpoint { - storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.clone()); - } - storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned()); - storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned()); + let storage_options = self.build_delta_lake_config_for_aws().await?; deltalake::aws::register_handlers(None); deltalake::open_table_with_storage_options(s3_path.clone(), storage_options).await? } @@ -144,6 +112,50 @@ impl DeltaLakeCommon { ))) } } + + async fn build_delta_lake_config_for_aws(&self) -> Result> { + let mut storage_options = HashMap::new(); + storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned()); + storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned()); + let sdk_config = self.aws_auth_props.build_config().await?; + let credentials = sdk_config + .credentials_provider() + .ok_or_else(|| { + SinkError::Config(anyhow!( + "s3.access.key and s3.secret.key is required with aws s3" + )) + })? + .as_ref() + .provide_credentials() + .await + .map_err(|e| SinkError::Config(e.into()))?; + let region = sdk_config.region(); + let endpoint = sdk_config.endpoint_url(); + storage_options.insert( + AWS_ACCESS_KEY_ID.to_owned(), + credentials.access_key_id().to_owned(), + ); + storage_options.insert( + AWS_SECRET_ACCESS_KEY.to_owned(), + credentials.secret_access_key().to_owned(), + ); + if endpoint.is_none() && region.is_none() { + return Err(SinkError::Config(anyhow!( + "s3.endpoint and s3.region need to be filled with at least one" + ))); + } + storage_options.insert( + AWS_REGION.to_owned(), + region + .map(|r| r.as_ref().to_owned()) + .clone() + .unwrap_or_else(|| DEFAULT_REGION.to_owned()), + ); + if let Some(s3_endpoint) = endpoint { + storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned()); + } + Ok(storage_options) + } } enum DeltaTableUrl { @@ -272,7 +284,7 @@ fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) - _ => { return Err(SinkError::DeltaLake(anyhow!( "deltalake cannot support type {:?}", - rw_data_type.to_string() + rw_data_type.to_owned() ))) } }; diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 285d42522e581..6e9e7fcbed5c9 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -62,22 +62,26 @@ BigQueryConfig: required: false alias: - region + - s3.region - name: aws.endpoint_url field_type: String required: false alias: - endpoint_url - endpoint + - s3.endpoint - name: aws.credentials.access_key_id field_type: String required: false alias: - access_key + - s3.access.key - name: aws.credentials.secret_access_key field_type: String required: false alias: - secret_key + - s3.secret.key - name: aws.credentials.session_token field_type: String required: false @@ -133,21 +137,56 @@ ClickHouseConfig: required: true DeltaLakeConfig: fields: - - name: s3.access.key + - name: location + field_type: String + required: true + - name: aws.region field_type: String required: false - - name: s3.secret.key + alias: + - region + - s3.region + - name: aws.endpoint_url field_type: String required: false - - name: location + alias: + - endpoint_url + - endpoint + - s3.endpoint + - name: aws.credentials.access_key_id field_type: String - required: true - - name: s3.region + required: false + alias: + - access_key + - s3.access.key + - name: aws.credentials.secret_access_key field_type: String required: false - - name: s3.endpoint + alias: + - secret_key + - s3.secret.key + - name: aws.credentials.session_token field_type: String required: false + alias: + - session_token + - name: aws.credentials.role.arn + field_type: String + comments: IAM role + required: false + alias: + - arn + - name: aws.credentials.role.external_id + field_type: String + comments: external ID in IAM role trust policy + required: false + alias: + - external_id + - name: aws.profile + field_type: String + required: false + alias: + - profile - name: gcs.service.account field_type: String required: false @@ -198,22 +237,26 @@ DynamoDbConfig: required: false alias: - region + - s3.region - name: aws.endpoint_url field_type: String required: false alias: - endpoint_url - endpoint + - s3.endpoint - name: aws.credentials.access_key_id field_type: String required: false alias: - access_key + - s3.access.key - name: aws.credentials.secret_access_key field_type: String required: false alias: - secret_key + - s3.secret.key - name: aws.credentials.session_token field_type: String required: false @@ -673,22 +716,26 @@ KafkaConfig: required: false alias: - region + - s3.region - name: aws.endpoint_url field_type: String required: false alias: - endpoint_url - endpoint + - s3.endpoint - name: aws.credentials.access_key_id field_type: String required: false alias: - access_key + - s3.access.key - name: aws.credentials.secret_access_key field_type: String required: false alias: - secret_key + - s3.secret.key - name: aws.credentials.session_token field_type: String required: false @@ -950,22 +997,26 @@ PulsarConfig: required: false alias: - region + - s3.region - name: aws.endpoint_url field_type: String required: false alias: - endpoint_url - endpoint + - s3.endpoint - name: aws.credentials.access_key_id field_type: String required: false alias: - access_key + - s3.access.key - name: aws.credentials.secret_access_key field_type: String required: false alias: - secret_key + - s3.secret.key - name: aws.credentials.session_token field_type: String required: false diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index c9251f1430c1a..f2547998a5c9c 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -378,22 +378,26 @@ KafkaProperties: required: false alias: - region + - s3.region - name: aws.endpoint_url field_type: String required: false alias: - endpoint_url - endpoint + - s3.endpoint - name: aws.credentials.access_key_id field_type: String required: false alias: - access_key + - s3.access.key - name: aws.credentials.secret_access_key field_type: String required: false alias: - secret_key + - s3.secret.key - name: aws.credentials.session_token field_type: String required: false @@ -998,22 +1002,26 @@ PulsarProperties: required: false alias: - region + - s3.region - name: aws.endpoint_url field_type: String required: false alias: - endpoint_url - endpoint + - s3.endpoint - name: aws.credentials.access_key_id field_type: String required: false alias: - access_key + - s3.access.key - name: aws.credentials.secret_access_key field_type: String required: false alias: - secret_key + - s3.secret.key - name: aws.credentials.session_token field_type: String required: false