From f4c69aa5f2d181d38d7caee4aaf7d6dec7198ddb Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Mon, 30 Dec 2024 23:03:31 +0700 Subject: [PATCH] fix: query on split + construct query with new line to avoid comments --- mc2mc/internal/query/builder.go | 14 ++-- mc2mc/internal/query/builder_test.go | 102 ++++++++++++++++++++++++--- 2 files changed, 101 insertions(+), 15 deletions(-) diff --git a/mc2mc/internal/query/builder.go b/mc2mc/internal/query/builder.go index b0e40e8..27b10ef 100644 --- a/mc2mc/internal/query/builder.go +++ b/mc2mc/internal/query/builder.go @@ -111,16 +111,16 @@ func (b *Builder) Build() (string, error) { if len(partitionNames) == 0 || b.enableAutoPartition { switch b.method { case APPEND: - query = fmt.Sprintf("INSERT INTO TABLE %s %s;", b.destinationTableID, query) + query = fmt.Sprintf("INSERT INTO TABLE %s \n%s\n;", b.destinationTableID, query) case REPLACE: - query = fmt.Sprintf("INSERT OVERWRITE TABLE %s %s;", b.destinationTableID, query) + query = fmt.Sprintf("INSERT OVERWRITE TABLE %s \n%s\n;", b.destinationTableID, query) } } else { switch b.method { case APPEND: - query = fmt.Sprintf("INSERT INTO TABLE %s PARTITION (%s) %s;", b.destinationTableID, strings.Join(partitionNames, ", "), query) + query = fmt.Sprintf("INSERT INTO TABLE %s PARTITION (%s) \n%s\n;", b.destinationTableID, strings.Join(partitionNames, ", "), query) case REPLACE: - query = fmt.Sprintf("INSERT OVERWRITE TABLE %s PARTITION (%s) %s;", b.destinationTableID, strings.Join(partitionNames, ", "), query) + query = fmt.Sprintf("INSERT OVERWRITE TABLE %s PARTITION (%s) \n%s\n;", b.destinationTableID, strings.Join(partitionNames, ", "), query) } } @@ -141,14 +141,14 @@ func (b *Builder) constructColumnOrder(query string) (string, error) { } b.orderedColumns = columns } - return fmt.Sprintf("SELECT %s FROM (%s)", strings.Join(b.orderedColumns, ", "), query), nil + return fmt.Sprintf("SELECT %s FROM (\n%s\n)", strings.Join(b.orderedColumns, ", "), query), nil } // constructPartitionValue constructs partition value for the given query // by adding a pseudo column __partitionvalue with the current date // this is for temporary solution to support partition value func (b *Builder) constructPartitionValue(query string) (string, error) { - return fmt.Sprintf("SELECT *, STRING(CURRENT_DATE()) as __partitionvalue FROM (%s)", query), nil + return fmt.Sprintf("SELECT *, STRING(CURRENT_DATE()) as __partitionvalue FROM (\n%s\n)", query), nil } // constructOverridedValues constructs query with overrided values @@ -168,5 +168,5 @@ func (b *Builder) constructOverridedValues(query string) (string, error) { columns[i] = fmt.Sprintf("%s as %s", val, col) } } - return fmt.Sprintf("SELECT %s FROM (%s)", strings.Join(columns, ", "), query), nil + return fmt.Sprintf("SELECT %s FROM (\n%s\n)", strings.Join(columns, ", "), query), nil } diff --git a/mc2mc/internal/query/builder_test.go b/mc2mc/internal/query/builder_test.go index eba7392..fdd032f 100644 --- a/mc2mc/internal/query/builder_test.go +++ b/mc2mc/internal/query/builder_test.go @@ -91,7 +91,13 @@ func TestBuilder_Build(t *testing.T) { ).Build() assert.NoError(t, err) - assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination SELECT col1, col2, _partitiontime FROM (SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (select * from project.playground.table));`, queryToExecute) + assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination +SELECT col1, col2, _partitiontime FROM ( +SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM ( +select * from project.playground.table +) +) +;`, queryToExecute) }) t.Run("returns query for append load method when contains overrided values but no column order", func(t *testing.T) { queryToExecute := `select * from project.playground.table;` @@ -115,7 +121,13 @@ func TestBuilder_Build(t *testing.T) { ).Build() assert.NoError(t, err) - assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination SELECT col1, col2, _partitiontime FROM (SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (select * from project.playground.table));`, queryToExecute) + assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination +SELECT col1, col2, _partitiontime FROM ( +SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM ( +select * from project.playground.table +) +) +;`, queryToExecute) }) t.Run("returns query for append load method when temporary partition_value enable", func(t *testing.T) { queryToExecute := `select * from project.playground.table;` @@ -141,7 +153,15 @@ func TestBuilder_Build(t *testing.T) { ).Build() assert.NoError(t, err) - assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination SELECT *, STRING(CURRENT_DATE()) as __partitionvalue FROM (SELECT col1, col2, _partitiontime FROM (SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (select * from project.playground.table)));`, queryToExecute) + assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination +SELECT *, STRING(CURRENT_DATE()) as __partitionvalue FROM ( +SELECT col1, col2, _partitiontime FROM ( +SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM ( +select * from project.playground.table +) +) +) +;`, queryToExecute) }) t.Run("returns query for append load method when auto partition enable", func(t *testing.T) { queryToExecute := `select * from project.playground.table;` @@ -168,7 +188,13 @@ func TestBuilder_Build(t *testing.T) { ).Build() assert.NoError(t, err) - assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination SELECT col1, col2, _partitiontime FROM (SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (select * from project.playground.table));`, queryToExecute) + assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination +SELECT col1, col2, _partitiontime FROM ( +SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM ( +select * from project.playground.table +) +) +;`, queryToExecute) }) t.Run("returns query for append load method for partition table", func(t *testing.T) { queryToExecute := `select * from project.playground.table;` @@ -193,7 +219,13 @@ func TestBuilder_Build(t *testing.T) { ).Build() assert.NoError(t, err) - assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination PARTITION (col3) SELECT col1, col2, _partitiontime FROM (SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (select * from project.playground.table));`, queryToExecute) + assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination PARTITION (col3) +SELECT col1, col2, _partitiontime FROM ( +SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM ( +select * from project.playground.table +) +) +;`, queryToExecute) }) t.Run("returns query for append load method for partition table but autopartition enable", func(t *testing.T) { queryToExecute := `select * from project.playground.table;` @@ -219,7 +251,13 @@ func TestBuilder_Build(t *testing.T) { ).Build() assert.NoError(t, err) - assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination SELECT col1, col2, _partitiontime FROM (SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (select * from project.playground.table));`, queryToExecute) + assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination +SELECT col1, col2, _partitiontime FROM ( +SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM ( +select * from project.playground.table +) +) +;`, queryToExecute) }) t.Run("returns query for append load method", func(t *testing.T) { queryToExecute := `select * from project.playground.table;` @@ -247,7 +285,13 @@ func TestBuilder_Build(t *testing.T) { ).Build() assert.NoError(t, err) - assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination SELECT col1, col2, _partitiontime FROM (SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (select * from project.playground.table));`, queryToExecute) + assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination +SELECT col1, col2, _partitiontime FROM ( +SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM ( +select * from project.playground.table +) +) +;`, queryToExecute) }) t.Run("returns query for replace load method", func(t *testing.T) { queryToExecute := `select * from project.playground.table;` @@ -275,7 +319,49 @@ func TestBuilder_Build(t *testing.T) { ).Build() assert.NoError(t, err) - assert.Equal(t, `INSERT OVERWRITE TABLE project.playground.table_destination SELECT col1, col2, _partitiontime FROM (SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (select * from project.playground.table));`, queryToExecute) + assert.Equal(t, `INSERT OVERWRITE TABLE project.playground.table_destination +SELECT col1, col2, _partitiontime FROM ( +SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM ( +select * from project.playground.table +) +) +;`, queryToExecute) + }) + t.Run("returns query for replace load method with comment in the end", func(t *testing.T) { + queryToExecute := `select * from project.playground.table +-- this is comment` + odspClient := &mockOdpsClient{ + orderedColumns: func() ([]string, error) { + return []string{"col1", "col2", "_partitiontime"}, nil + }, + partitionResult: func() ([]string, error) { + return []string{"col3"}, nil + }, + } + destinationTableID := "project.playground.table_destination" + + queryToExecute, err := query.NewBuilder( + logger.NewDefaultLogger(), + odspClient, + query.WithQuery(queryToExecute), + query.WithMethod(query.REPLACE), + query.WithDestination(destinationTableID), + query.WithOverridedValue("_partitiontime", "TIMESTAMP('2021-01-01')"), + query.WithOverridedValue("_partitiondate", "DATE(TIMESTAMP('2021-01-01'))"), + query.WithAutoPartition(true), + query.WithPartitionValue(true), + query.WithColumnOrder(), + ).Build() + + assert.NoError(t, err) + assert.Equal(t, `INSERT OVERWRITE TABLE project.playground.table_destination +SELECT col1, col2, _partitiontime FROM ( +SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM ( +select * from project.playground.table +-- this is comment +) +) +;`, queryToExecute) }) }