Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): support default aws credentials for deltalake #19557

Merged
merged 4 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions src/connector/src/connector_common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,27 @@ use risingwave_common::util::env_var::env_var_is_true;
/// A flatten config map for aws auth.
#[derive(Deserialize, Debug, Clone, WithOptions, PartialEq)]
pub struct AwsAuthProps {
#[serde(rename = "aws.region", alias = "region")]
#[serde(rename = "aws.region", alias = "region", alias = "s3.region")]
pub region: Option<String>,

#[serde(
rename = "aws.endpoint_url",
alias = "endpoint_url",
alias = "endpoint"
alias = "endpoint",
alias = "s3.endpoint"
)]
pub endpoint: Option<String>,
#[serde(rename = "aws.credentials.access_key_id", alias = "access_key")]
#[serde(
rename = "aws.credentials.access_key_id",
alias = "access_key",
alias = "s3.access.key"
)]
pub access_key: Option<String>,
#[serde(rename = "aws.credentials.secret_access_key", alias = "secret_key")]
#[serde(
rename = "aws.credentials.secret_access_key",
alias = "secret_key",
alias = "s3.secret.key"
)]
pub secret_key: Option<String>,
#[serde(rename = "aws.credentials.session_token", alias = "session_token")]
pub session_token: Option<String>,
Expand Down
90 changes: 51 additions & 39 deletions src/connector/src/sink/deltalake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,20 @@ use super::{
Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam,
SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use crate::connector_common::AwsAuthProps;

pub const DELTALAKE_SINK: &str = "deltalake";
pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

#[serde_as]
#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
#[serde(rename = "s3.access.key")]
pub s3_access_key: Option<String>,
#[serde(rename = "s3.secret.key")]
pub s3_secret_key: Option<String>,
#[serde(rename = "location")]
pub location: String,
#[serde(rename = "s3.region")]
pub s3_region: Option<String>,
#[serde(rename = "s3.endpoint")]
pub s3_endpoint: Option<String>,
#[serde(flatten)]
pub aws_auth_props: AwsAuthProps,

#[serde(rename = "gcs.service.account")]
pub gcs_service_account: Option<String>,
/// Commit every n(>0) checkpoints, default is 10.
Expand All @@ -80,35 +76,7 @@ impl DeltaLakeCommon {
pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
let table = match Self::get_table_url(&self.location)? {
DeltaTableUrl::S3(s3_path) => {
let mut storage_options = HashMap::new();
storage_options.insert(
AWS_ACCESS_KEY_ID.to_owned(),
self.s3_access_key.clone().ok_or_else(|| {
SinkError::Config(anyhow!("s3.access.key is required with aws s3"))
})?,
);
storage_options.insert(
AWS_SECRET_ACCESS_KEY.to_owned(),
self.s3_secret_key.clone().ok_or_else(|| {
SinkError::Config(anyhow!("s3.secret.key is required with aws s3"))
})?,
);
if self.s3_endpoint.is_none() && self.s3_region.is_none() {
return Err(SinkError::Config(anyhow!(
"s3.endpoint and s3.region need to be filled with at least one"
)));
}
storage_options.insert(
AWS_REGION.to_owned(),
self.s3_region
.clone()
.unwrap_or_else(|| DEFAULT_REGION.to_owned()),
);
if let Some(s3_endpoint) = &self.s3_endpoint {
storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.clone());
}
storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
let storage_options = self.build_delta_lake_config_for_aws().await?;
deltalake::aws::register_handlers(None);
deltalake::open_table_with_storage_options(s3_path.clone(), storage_options).await?
}
Expand Down Expand Up @@ -144,6 +112,50 @@ impl DeltaLakeCommon {
)))
}
}

/// Builds the storage options handed to `deltalake::open_table_with_storage_options`
/// for an S3-backed table.
///
/// The AWS SDK config is built from `self.aws_auth_props`, and the credentials it
/// resolves are copied into the returned map together with the region and the
/// optional endpoint. HTTP access and unsafe rename are always enabled.
///
/// # Errors
///
/// Returns `SinkError::Config` when the SDK config has no credentials provider, when
/// resolving credentials fails, or when neither an endpoint nor a region is available.
/// Errors from `build_config` are propagated unchanged.
async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
    let mut storage_options = HashMap::new();
    storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
    storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());

    let sdk_config = self.aws_auth_props.build_config().await?;
    let credentials = sdk_config
        .credentials_provider()
        .ok_or_else(|| {
            SinkError::Config(anyhow!(
                "s3.access.key and s3.secret.key is required with aws s3"
            ))
        })?
        .as_ref()
        .provide_credentials()
        .await
        .map_err(|e| SinkError::Config(e.into()))?;
    let region = sdk_config.region();
    let endpoint = sdk_config.endpoint_url();

    storage_options.insert(
        AWS_ACCESS_KEY_ID.to_owned(),
        credentials.access_key_id().to_owned(),
    );
    storage_options.insert(
        AWS_SECRET_ACCESS_KEY.to_owned(),
        credentials.secret_access_key().to_owned(),
    );
    // Temporary credentials (explicit session token, assumed role, or anything the
    // default provider chain hands out) are only valid together with their session
    // token, so forward it whenever the resolved credentials carry one.
    // NOTE(review): the key is spelled as a literal because the matching deltalake
    // constant is not known to be imported in this file — confirm and switch to the
    // constant if it is available.
    if let Some(session_token) = credentials.session_token() {
        storage_options.insert("AWS_SESSION_TOKEN".to_owned(), session_token.to_owned());
    }

    if endpoint.is_none() && region.is_none() {
        return Err(SinkError::Config(anyhow!(
            "s3.endpoint and s3.region need to be filled with at least one"
        )));
    }
    // Only an endpoint may be configured; fall back to the default region in that case.
    storage_options.insert(
        AWS_REGION.to_owned(),
        region.map_or_else(|| DEFAULT_REGION.to_owned(), |r| r.as_ref().to_owned()),
    );
    if let Some(s3_endpoint) = endpoint {
        storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
    }
    Ok(storage_options)
}
}

enum DeltaTableUrl {
Expand Down Expand Up @@ -272,7 +284,7 @@ fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -
_ => {
return Err(SinkError::DeltaLake(anyhow!(
"deltalake cannot support type {:?}",
rw_data_type.to_string()
rw_data_type.to_owned()
)))
}
};
Expand Down
63 changes: 57 additions & 6 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,26 @@ BigQueryConfig:
required: false
alias:
- region
- s3.region
- name: aws.endpoint_url
field_type: String
required: false
alias:
- endpoint_url
- endpoint
- s3.endpoint
- name: aws.credentials.access_key_id
field_type: String
required: false
alias:
- access_key
- s3.access.key
- name: aws.credentials.secret_access_key
field_type: String
required: false
alias:
- secret_key
- s3.secret.key
- name: aws.credentials.session_token
field_type: String
required: false
Expand Down Expand Up @@ -133,21 +137,56 @@ ClickHouseConfig:
required: true
DeltaLakeConfig:
fields:
- name: s3.access.key
- name: location
field_type: String
required: true
- name: aws.region
field_type: String
required: false
- name: s3.secret.key
alias:
- region
- s3.region
- name: aws.endpoint_url
field_type: String
required: false
- name: location
alias:
- endpoint_url
- endpoint
- s3.endpoint
- name: aws.credentials.access_key_id
field_type: String
required: true
- name: s3.region
required: false
alias:
- access_key
- s3.access.key
- name: aws.credentials.secret_access_key
field_type: String
required: false
- name: s3.endpoint
alias:
- secret_key
- s3.secret.key
- name: aws.credentials.session_token
field_type: String
required: false
alias:
- session_token
- name: aws.credentials.role.arn
field_type: String
comments: IAM role
required: false
alias:
- arn
- name: aws.credentials.role.external_id
field_type: String
comments: external ID in IAM role trust policy
required: false
alias:
- external_id
- name: aws.profile
field_type: String
required: false
alias:
- profile
- name: gcs.service.account
field_type: String
required: false
Expand Down Expand Up @@ -198,22 +237,26 @@ DynamoDbConfig:
required: false
alias:
- region
- s3.region
- name: aws.endpoint_url
field_type: String
required: false
alias:
- endpoint_url
- endpoint
- s3.endpoint
- name: aws.credentials.access_key_id
field_type: String
required: false
alias:
- access_key
- s3.access.key
- name: aws.credentials.secret_access_key
field_type: String
required: false
alias:
- secret_key
- s3.secret.key
- name: aws.credentials.session_token
field_type: String
required: false
Expand Down Expand Up @@ -673,22 +716,26 @@ KafkaConfig:
required: false
alias:
- region
- s3.region
- name: aws.endpoint_url
field_type: String
required: false
alias:
- endpoint_url
- endpoint
- s3.endpoint
- name: aws.credentials.access_key_id
field_type: String
required: false
alias:
- access_key
- s3.access.key
- name: aws.credentials.secret_access_key
field_type: String
required: false
alias:
- secret_key
- s3.secret.key
- name: aws.credentials.session_token
field_type: String
required: false
Expand Down Expand Up @@ -950,22 +997,26 @@ PulsarConfig:
required: false
alias:
- region
- s3.region
- name: aws.endpoint_url
field_type: String
required: false
alias:
- endpoint_url
- endpoint
- s3.endpoint
- name: aws.credentials.access_key_id
field_type: String
required: false
alias:
- access_key
- s3.access.key
- name: aws.credentials.secret_access_key
field_type: String
required: false
alias:
- secret_key
- s3.secret.key
- name: aws.credentials.session_token
field_type: String
required: false
Expand Down
8 changes: 8 additions & 0 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -378,22 +378,26 @@ KafkaProperties:
required: false
alias:
- region
- s3.region
- name: aws.endpoint_url
field_type: String
required: false
alias:
- endpoint_url
- endpoint
- s3.endpoint
- name: aws.credentials.access_key_id
field_type: String
required: false
alias:
- access_key
- s3.access.key
- name: aws.credentials.secret_access_key
field_type: String
required: false
alias:
- secret_key
- s3.secret.key
- name: aws.credentials.session_token
field_type: String
required: false
Expand Down Expand Up @@ -998,22 +1002,26 @@ PulsarProperties:
required: false
alias:
- region
- s3.region
- name: aws.endpoint_url
field_type: String
required: false
alias:
- endpoint_url
- endpoint
- s3.endpoint
- name: aws.credentials.access_key_id
field_type: String
required: false
alias:
- access_key
- s3.access.key
- name: aws.credentials.secret_access_key
field_type: String
required: false
alias:
- secret_key
- s3.secret.key
- name: aws.credentials.session_token
field_type: String
required: false
Expand Down
Loading