From 23b39a776f8abbd20627cbaa35e3535dd5fd9b01 Mon Sep 17 00:00:00 2001 From: Sandeep Bhardwaj Date: Thu, 5 Dec 2024 16:37:42 +0530 Subject: [PATCH 1/2] feat: add support for macros and headers --- mc2mc/internal/client/client_test.go | 3 +- mc2mc/internal/loader/append.go | 6 ++- mc2mc/internal/loader/helper.go | 25 ++++++++++++ mc2mc/internal/loader/helper_test.go | 58 ++++++++++++++++++++++++++++ mc2mc/internal/loader/replace.go | 6 ++- 5 files changed, 93 insertions(+), 5 deletions(-) create mode 100644 mc2mc/internal/loader/helper.go create mode 100644 mc2mc/internal/loader/helper_test.go diff --git a/mc2mc/internal/client/client_test.go b/mc2mc/internal/client/client_test.go index d84083b..33ff72a 100644 --- a/mc2mc/internal/client/client_test.go +++ b/mc2mc/internal/client/client_test.go @@ -6,9 +6,10 @@ import ( "os" "testing" - "github.com/goto/transformers/mc2mc/internal/client" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/goto/transformers/mc2mc/internal/client" ) func TestExecute(t *testing.T) { diff --git a/mc2mc/internal/loader/append.go b/mc2mc/internal/loader/append.go index e46ac68..7466145 100644 --- a/mc2mc/internal/loader/append.go +++ b/mc2mc/internal/loader/append.go @@ -17,9 +17,11 @@ func NewAppendLoader(logger *slog.Logger) (*appendLoader, error) { } func (l *appendLoader) GetQuery(tableID, query string) string { - return fmt.Sprintf("INSERT INTO TABLE %s %s;", tableID, query) + headers, qr := SeparateHeadersAndQuery(query) + return fmt.Sprintf("%s INSERT INTO TABLE %s %s;", headers, tableID, qr) } func (l *appendLoader) GetPartitionedQuery(tableID, query string, partitionNames []string) string { - return fmt.Sprintf("INSERT INTO TABLE %s PARTITION (%s) %s;", tableID, strings.Join(partitionNames, ", "), query) + headers, qr := SeparateHeadersAndQuery(query) + return fmt.Sprintf("%s INSERT INTO TABLE %s PARTITION (%s) %s;", headers, tableID, strings.Join(partitionNames, ", "), qr) } diff --git a/mc2mc/internal/loader/helper.go b/mc2mc/internal/loader/helper.go new file mode 100644 index 0000000..7d574e2 --- /dev/null +++ b/mc2mc/internal/loader/helper.go @@ -0,0 +1,25 @@ +package loader + +import ( + "strings" +) + +func SeparateHeadersAndQuery(query string) (string, string) { + parts := strings.Split(query, ";") + + last := "" + idx := len(parts) - 1 + for idx >= 0 { + last = parts[idx] + if strings.TrimSpace(last) != "" { + break + } + idx = idx - 1 + } + + headers := strings.Join(parts[:idx], ";") + if headers != "" { + headers += ";" + } + return headers, last +} diff --git a/mc2mc/internal/loader/helper_test.go b/mc2mc/internal/loader/helper_test.go new file mode 100644 index 0000000..a54aaf0 --- /dev/null +++ b/mc2mc/internal/loader/helper_test.go @@ -0,0 +1,58 @@ +package loader_test + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/goto/transformers/mc2mc/internal/loader" +) + +func TestMacroSeparator(t *testing.T) { + t.Run("returns query without macros", func(t *testing.T) { + q1 := `select * from playground` + macros, query := loader.SeparateHeadersAndQuery(q1) + assert.Empty(t, macros) + assert.Equal(t, q1, query) + }) + t.Run("returns query removing whitespace", func(t *testing.T) { + q1 := ` +select * from playground` + + header, query := loader.SeparateHeadersAndQuery(q1) + assert.Empty(t, header) + assert.Contains(t, query, q1) + }) + t.Run("splits headers and query", func(t *testing.T) { + q1 := `set odps.sql.allow.fullscan=true; +select * from playground` + headers, query := loader.SeparateHeadersAndQuery(q1) + assert.Equal(t, "set odps.sql.allow.fullscan=true;", headers) + assert.Equal(t, "select * from playground", strings.TrimSpace(query)) + }) + t.Run("works with query of multiple headers", func(t *testing.T) { + q1 := `set odps.sql.allow.fullscan=true; +set odps.sql.python.version=cp37; + +select distinct event_timestamp, + client_id, + country_code, +from presentation.main.important_date +where CAST(event_timestamp as DATE) = '{{ .DSTART | Date }}' + and client_id in ('123') +` + headers, query := loader.SeparateHeadersAndQuery(q1) + expectedHeader := `set odps.sql.allow.fullscan=true; +set odps.sql.python.version=cp37;` + assert.Equal(t, expectedHeader, headers) + + expectedQuery := `select distinct event_timestamp, + client_id, + country_code, +from presentation.main.important_date +where CAST(event_timestamp as DATE) = '{{ .DSTART | Date }}' + and client_id in ('123')` + assert.Contains(t, query, expectedQuery) + }) +} diff --git a/mc2mc/internal/loader/replace.go b/mc2mc/internal/loader/replace.go index 7afcba1..c217d28 100644 --- a/mc2mc/internal/loader/replace.go +++ b/mc2mc/internal/loader/replace.go @@ -17,9 +17,11 @@ func NewReplaceLoader(logger *slog.Logger) *replaceLoader { } func (l *replaceLoader) GetQuery(tableID, query string) string { - return fmt.Sprintf("INSERT OVERWRITE TABLE %s %s;", tableID, query) + headers, qr := SeparateHeadersAndQuery(query) + return fmt.Sprintf("%s INSERT OVERWRITE TABLE %s %s;", headers, tableID, qr) } func (l *replaceLoader) GetPartitionedQuery(tableID, query string, partitionNames []string) string { - return fmt.Sprintf("INSERT OVERWRITE TABLE %s PARTITION (%s) %s;", tableID, strings.Join(partitionNames, ", "), query) + headers, qr := SeparateHeadersAndQuery(query) + return fmt.Sprintf("%s INSERT OVERWRITE TABLE %s PARTITION (%s) %s;", headers, tableID, strings.Join(partitionNames, ", "), qr) } From 3ac15cab64553b330d03ccb48b510ec481e8ef5f Mon Sep 17 00:00:00 2001 From: Sandeep Bhardwaj Date: Thu, 5 Dec 2024 17:17:04 +0530 Subject: [PATCH 2/2] feat: add header separation to clien --- mc2mc/internal/client/client.go | 6 ++++-- mc2mc/internal/client/setup.go | 3 ++- mc2mc/main.go | 1 + mc2mc/mc2mc.go | 3 ++- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/mc2mc/internal/client/client.go b/mc2mc/internal/client/client.go index a2f0ad8..f62f703 100644 --- a/mc2mc/internal/client/client.go +++ b/mc2mc/internal/client/client.go @@ -9,6 +9,8 @@ import ( "strings" "github.com/pkg/errors" + + "github.com/goto/transformers/mc2mc/internal/loader" ) type Loader interface { @@ -91,6 +93,6 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err // TODO: remove this temporary support after 15 nov func addPartitionValueColumn(rawQuery []byte) []byte { - sanitizeQuery := strings.TrimSuffix(string(rawQuery), ";") - return []byte(fmt.Sprintf("SELECT *, STRING(CURRENT_DATE()) as __partitionvalue FROM (%s)", sanitizeQuery)) + header, qr := loader.SeparateHeadersAndQuery(string(rawQuery)) + return []byte(fmt.Sprintf("%s SELECT *, STRING(CURRENT_DATE()) as __partitionvalue FROM (%s)", header, qr)) } diff --git a/mc2mc/internal/client/setup.go b/mc2mc/internal/client/setup.go index 358a76c..411a1b0 100644 --- a/mc2mc/internal/client/setup.go +++ b/mc2mc/internal/client/setup.go @@ -2,9 +2,10 @@ package client import ( "github.com/aliyun/aliyun-odps-go-sdk/odps" + "github.com/pkg/errors" + "github.com/goto/transformers/mc2mc/internal/loader" "github.com/goto/transformers/mc2mc/internal/logger" - "github.com/pkg/errors" ) type SetupFn func(c *Client) error diff --git a/mc2mc/main.go b/mc2mc/main.go index cf8cb69..8cb71c1 100644 --- a/mc2mc/main.go +++ b/mc2mc/main.go @@ -5,6 +5,7 @@ import ( "os" _ "github.com/aliyun/aliyun-odps-go-sdk/sqldriver" + "github.com/goto/transformers/mc2mc/internal/logger" ) diff --git a/mc2mc/mc2mc.go b/mc2mc/mc2mc.go index 1d3bb6c..d85d3d7 100644 --- a/mc2mc/mc2mc.go +++ b/mc2mc/mc2mc.go @@ -6,9 +6,10 @@ import ( "os/signal" "syscall" + "github.com/pkg/errors" + "github.com/goto/transformers/mc2mc/internal/client" "github.com/goto/transformers/mc2mc/internal/config" - "github.com/pkg/errors" ) func mc2mc() error {