From 8db56961a1c9fba58986e55f5ed7ef53ecb506eb Mon Sep 17 00:00:00 2001
From: Fritz Larco
Date: Mon, 3 Feb 2025 16:07:56 -0300
Subject: [PATCH 1/3] handle S3 dataflow export

- Fixed a bug where S3 dataflow export was not handled correctly.
- Now only local file system export is handled.
---
 core/dbio/filesys/fs.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/dbio/filesys/fs.go b/core/dbio/filesys/fs.go
index 8b7ad257..6cdc5003 100755
--- a/core/dbio/filesys/fs.go
+++ b/core/dbio/filesys/fs.go
@@ -1300,7 +1300,7 @@ func WriteDataflowViaDuckDB(fs FileSysClient, df *iop.Dataflow, uri string) (bw
 
 	// generate sql for export
 	switch fs.FsType() {
-	case dbio.TypeFileS3, dbio.TypeFileLocal:
+	case dbio.TypeFileLocal:
 		// copy files bytes recursively to target
 		if strings.Contains(duckURI, "*") {
 			duckURI = GetDeepestParent(duckURI) // get target folder, since split by files
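
A minimal standalone sketch of the dispatch this commit narrows, assuming only the switch case changes: the type name, constants, and helper below are placeholders, not sling's actual API.

    // Illustrative only: the real logic lives in WriteDataflowViaDuckDB and
    // switches on dbio.Type constants; these names are placeholders.
    package main

    import "fmt"

    type fsType string

    const (
    	fsLocal fsType = "local"
    	fsS3    fsType = "s3"
    )

    // useDuckDBExport reports whether the DuckDB COPY path should handle the
    // export. After this patch only the local filesystem qualifies; S3 targets
    // fall through to the generic dataflow writer instead.
    func useDuckDBExport(t fsType) bool {
    	switch t {
    	case fsLocal:
    		return true
    	default:
    		return false
    	}
    }

    func main() {
    	fmt.Println(useDuckDBExport(fsLocal)) // true
    	fmt.Println(useDuckDBExport(fsS3))    // false
    }
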
From 8fffdbbbaf396936183ad2b2ddd471a87983626d Mon Sep 17 00:00:00 2001
From: Fritz Larco
Date: Mon, 3 Feb 2025 18:07:45 -0300
Subject: [PATCH 2/3] support AWS session token for s3 access

- Add support for AWS session token in Redshift connection configuration.
- Update `Unload` and `CopyFromS3` functions to use session token if provided.
- Modify Redshift YAML templates to include session token in credentials.
- Improve error handling for missing credentials, requiring either access key/secret or session token.
- Enhance documentation to reflect the new functionality.
---
 core/dbio/database/database_redshift.go | 8 ++++++--
 core/dbio/templates/redshift.yaml       | 4 ++--
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/core/dbio/database/database_redshift.go b/core/dbio/database/database_redshift.go
index 7a10a046..8ffa336c 100755
--- a/core/dbio/database/database_redshift.go
+++ b/core/dbio/database/database_redshift.go
@@ -95,6 +95,7 @@ func (conn *RedshiftConn) Unload(ctx *g.Context, tables ...Table) (s3Path string
 
 	AwsID := conn.GetProp("AWS_ACCESS_KEY_ID")
 	AwsAccessKey := conn.GetProp("AWS_SECRET_ACCESS_KEY")
+	AwsSessionToken := conn.GetProp("AWS_SESSION_TOKEN")
 
 	g.Info("unloading from redshift to s3")
 	queryContext := g.NewContext(ctx.Ctx)
@@ -118,6 +119,7 @@ func (conn *RedshiftConn) Unload(ctx *g.Context, tables ...Table) (s3Path string
 			"s3_path", s3PathPart,
 			"aws_access_key_id", AwsID,
 			"aws_secret_access_key", AwsAccessKey,
+			"aws_session_token", AwsSessionToken,
 			"parallel", conn.GetProp("PARALLEL"),
 		)
 
@@ -347,8 +349,9 @@ func (conn *RedshiftConn) GenerateUpsertSQL(srcTable string, tgtTable string, pk
 func (conn *RedshiftConn) CopyFromS3(tableFName, s3Path string, columns iop.Columns) (count uint64, err error) {
 	AwsID := conn.GetProp("AWS_ACCESS_KEY_ID")
 	AwsAccessKey := conn.GetProp("AWS_SECRET_ACCESS_KEY")
-	if AwsID == "" || AwsAccessKey == "" {
-		err = g.Error("Need to set 'AWS_ACCESS_KEY_ID' and 'AWS_SECRET_ACCESS_KEY' to copy to snowflake from S3")
+	AwsSessionToken := conn.GetProp("AWS_SESSION_TOKEN")
+	if (AwsID == "" || AwsAccessKey == "") && (AwsSessionToken == "") {
+		err = g.Error("Need to set 'AWS_ACCESS_KEY_ID' and 'AWS_SECRET_ACCESS_KEY' or 'AWS_SESSION_TOKEN' to copy to redshift from S3")
 		return
 	}
 
@@ -363,6 +366,7 @@ func (conn *RedshiftConn) CopyFromS3(tableFName, s3Path string, columns iop.Colu
 		"s3_path", s3Path,
 		"aws_access_key_id", AwsID,
 		"aws_secret_access_key", AwsAccessKey,
+		"aws_session_token", AwsSessionToken,
 	)
 
 	sql = conn.setEmptyAsNull(sql)
diff --git a/core/dbio/templates/redshift.yaml b/core/dbio/templates/redshift.yaml
index fa69494f..2d776f8a 100755
--- a/core/dbio/templates/redshift.yaml
+++ b/core/dbio/templates/redshift.yaml
@@ -34,12 +34,12 @@ core:
   copy_from_s3: |
     COPY {tgt_table} ({tgt_columns})
     from '{s3_path}'
-    credentials 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}'
+    credentials 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key};token={aws_session_token}'
     CSV delimiter ',' EMPTYASNULL BLANKSASNULL GZIP IGNOREHEADER 1 DATEFORMAT 'auto' TIMEFORMAT 'auto'
   copy_to_s3: |
     unload ('{sql}')
     to '{s3_path}'
-    credentials 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}'
+    credentials 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key};token={aws_session_token}'
     gzip allowoverwrite CSV PARALLEL {parallel} NULL '\\N' HEADER DELIMITER ','
   alter_columns: |
     alter table {table} {col_ddl}
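
For reference, a small self-contained sketch (placeholder function, not sling code) of what the updated copy_from_s3/copy_to_s3 credentials line expands to once a session token is supplied; with an empty token this version still emits a bare token=, which PATCH 3/3 below makes optional.

    // Illustrative only: mirrors the credentials string rendered by the
    // patched redshift.yaml templates; credentialsClause is a placeholder.
    package main

    import "fmt"

    func credentialsClause(accessKeyID, secretKey, sessionToken string) string {
    	return fmt.Sprintf(
    		"credentials 'aws_access_key_id=%s;aws_secret_access_key=%s;token=%s'",
    		accessKeyID, secretKey, sessionToken,
    	)
    }

    func main() {
    	// With a token set, Redshift receives the temporary-credential triple.
    	fmt.Println(credentialsClause("AKIA_PLACEHOLDER", "SECRET_PLACEHOLDER", "TOKEN_PLACEHOLDER"))

    	// With no token, this version still renders a trailing "token=",
    	// which is what the follow-up patch makes optional.
    	fmt.Println(credentialsClause("AKIA_PLACEHOLDER", "SECRET_PLACEHOLDER", ""))
    }
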
From 25d8b8b22ee10f8c246d5eb4a2c1969a56c43ebe Mon Sep 17 00:00:00 2001
From: Fritz Larco
Date: Mon, 3 Feb 2025 18:15:11 -0300
Subject: [PATCH 3/3] fix Redshift connection with optional session token

---
 core/dbio/database/database_redshift.go | 14 ++++++++++++--
 core/dbio/templates/redshift.yaml       |  4 ++--
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/core/dbio/database/database_redshift.go b/core/dbio/database/database_redshift.go
index 8ffa336c..aa3ab362 100755
--- a/core/dbio/database/database_redshift.go
+++ b/core/dbio/database/database_redshift.go
@@ -97,6 +97,11 @@ func (conn *RedshiftConn) Unload(ctx *g.Context, tables ...Table) (s3Path string
 	AwsAccessKey := conn.GetProp("AWS_SECRET_ACCESS_KEY")
 	AwsSessionToken := conn.GetProp("AWS_SESSION_TOKEN")
 
+	AwsSessionTokenExpr := ""
+	if AwsSessionToken != "" {
+		AwsSessionTokenExpr = g.F(";token=%s", AwsSessionToken)
+	}
+
 	g.Info("unloading from redshift to s3")
 	queryContext := g.NewContext(ctx.Ctx)
 	unload := func(table Table, s3PathPart string) {
@@ -119,7 +124,7 @@ func (conn *RedshiftConn) Unload(ctx *g.Context, tables ...Table) (s3Path string
 			"s3_path", s3PathPart,
 			"aws_access_key_id", AwsID,
 			"aws_secret_access_key", AwsAccessKey,
-			"aws_session_token", AwsSessionToken,
+			"aws_session_token_expr", AwsSessionTokenExpr,
 			"parallel", conn.GetProp("PARALLEL"),
 		)
 
@@ -355,6 +360,11 @@ func (conn *RedshiftConn) CopyFromS3(tableFName, s3Path string, columns iop.Colu
 		return
 	}
 
+	AwsSessionTokenExpr := ""
+	if AwsSessionToken != "" {
+		AwsSessionTokenExpr = g.F(";token=%s", AwsSessionToken)
+	}
+
 	tgtColumns := conn.GetType().QuoteNames(columns.Names()...)
 
 	g.Debug("copying into redshift from s3")
@@ -366,7 +376,7 @@ func (conn *RedshiftConn) CopyFromS3(tableFName, s3Path string, columns iop.Colu
 		"s3_path", s3Path,
 		"aws_access_key_id", AwsID,
 		"aws_secret_access_key", AwsAccessKey,
-		"aws_session_token", AwsSessionToken,
+		"aws_session_token_expr", AwsSessionTokenExpr,
 	)
 
 	sql = conn.setEmptyAsNull(sql)
diff --git a/core/dbio/templates/redshift.yaml b/core/dbio/templates/redshift.yaml
index 2d776f8a..cdf8d2da 100755
--- a/core/dbio/templates/redshift.yaml
+++ b/core/dbio/templates/redshift.yaml
@@ -34,12 +34,12 @@ core:
   copy_from_s3: |
     COPY {tgt_table} ({tgt_columns})
     from '{s3_path}'
-    credentials 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key};token={aws_session_token}'
+    credentials 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}{aws_session_token_expr}'
     CSV delimiter ',' EMPTYASNULL BLANKSASNULL GZIP IGNOREHEADER 1 DATEFORMAT 'auto' TIMEFORMAT 'auto'
   copy_to_s3: |
     unload ('{sql}')
     to '{s3_path}'
-    credentials 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key};token={aws_session_token}'
+    credentials 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}{aws_session_token_expr}'
     gzip allowoverwrite CSV PARALLEL {parallel} NULL '\\N' HEADER DELIMITER ','
   alter_columns: |
     alter table {table} {col_ddl}
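
A short self-contained sketch of the optional-token behavior this commit introduces: fmt.Sprintf stands in for the g.F helper used in the real code, and the surrounding names and values are placeholders.

    // Illustrative only: the ";token=..." suffix is appended solely when a
    // session token is present, so the CREDENTIALS clause stays valid for
    // plain access-key/secret authentication.
    package main

    import "fmt"

    // sessionTokenExpr yields ";token=<value>" when a session token is set,
    // and an empty string otherwise.
    func sessionTokenExpr(token string) string {
    	if token == "" {
    		return ""
    	}
    	return fmt.Sprintf(";token=%s", token)
    }

    func main() {
    	base := "credentials 'aws_access_key_id=%s;aws_secret_access_key=%s%s'"
    	// No token: no token= suffix is emitted.
    	fmt.Printf(base+"\n", "AKIA_PLACEHOLDER", "SECRET_PLACEHOLDER", sessionTokenExpr(""))
    	// Token present: ";token=..." is appended.
    	fmt.Printf(base+"\n", "AKIA_PLACEHOLDER", "SECRET_PLACEHOLDER", sessionTokenExpr("TOKEN_PLACEHOLDER"))
    }
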