Skip to content

Commit

Permalink
fix: query on split + construct query with new line to avoid comments
Browse files Browse the repository at this point in the history
  • Loading branch information
deryrahman committed Jan 6, 2025
1 parent 54e753e commit f4c69aa
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 15 deletions.
14 changes: 7 additions & 7 deletions mc2mc/internal/query/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,16 @@ func (b *Builder) Build() (string, error) {
if len(partitionNames) == 0 || b.enableAutoPartition {
switch b.method {
case APPEND:
query = fmt.Sprintf("INSERT INTO TABLE %s %s;", b.destinationTableID, query)
query = fmt.Sprintf("INSERT INTO TABLE %s \n%s\n;", b.destinationTableID, query)
case REPLACE:
query = fmt.Sprintf("INSERT OVERWRITE TABLE %s %s;", b.destinationTableID, query)
query = fmt.Sprintf("INSERT OVERWRITE TABLE %s \n%s\n;", b.destinationTableID, query)
}
} else {
switch b.method {
case APPEND:
query = fmt.Sprintf("INSERT INTO TABLE %s PARTITION (%s) %s;", b.destinationTableID, strings.Join(partitionNames, ", "), query)
query = fmt.Sprintf("INSERT INTO TABLE %s PARTITION (%s) \n%s\n;", b.destinationTableID, strings.Join(partitionNames, ", "), query)
case REPLACE:
query = fmt.Sprintf("INSERT OVERWRITE TABLE %s PARTITION (%s) %s;", b.destinationTableID, strings.Join(partitionNames, ", "), query)
query = fmt.Sprintf("INSERT OVERWRITE TABLE %s PARTITION (%s) \n%s\n;", b.destinationTableID, strings.Join(partitionNames, ", "), query)
}
}

Expand All @@ -141,14 +141,14 @@ func (b *Builder) constructColumnOrder(query string) (string, error) {
}
b.orderedColumns = columns
}
return fmt.Sprintf("SELECT %s FROM (%s)", strings.Join(b.orderedColumns, ", "), query), nil
return fmt.Sprintf("SELECT %s FROM (\n%s\n)", strings.Join(b.orderedColumns, ", "), query), nil
}

// constructPartitionValue constructs partition value for the given query
// by adding a pseudo column __partitionvalue with the current date
// this is for temporary solution to support partition value
func (b *Builder) constructPartitionValue(query string) (string, error) {
return fmt.Sprintf("SELECT *, STRING(CURRENT_DATE()) as __partitionvalue FROM (%s)", query), nil
return fmt.Sprintf("SELECT *, STRING(CURRENT_DATE()) as __partitionvalue FROM (\n%s\n)", query), nil
}

// constructOverridedValues constructs query with overrided values
Expand All @@ -168,5 +168,5 @@ func (b *Builder) constructOverridedValues(query string) (string, error) {
columns[i] = fmt.Sprintf("%s as %s", val, col)
}
}
return fmt.Sprintf("SELECT %s FROM (%s)", strings.Join(columns, ", "), query), nil
return fmt.Sprintf("SELECT %s FROM (\n%s\n)", strings.Join(columns, ", "), query), nil
}
102 changes: 94 additions & 8 deletions mc2mc/internal/query/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,13 @@ func TestBuilder_Build(t *testing.T) {
).Build()

assert.NoError(t, err)
assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination SELECT col1, col2, _partitiontime FROM (SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (select * from project.playground.table));`, queryToExecute)
assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination
SELECT col1, col2, _partitiontime FROM (
SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (
select * from project.playground.table
)
)
;`, queryToExecute)
})
t.Run("returns query for append load method when contains overrided values but no column order", func(t *testing.T) {
queryToExecute := `select * from project.playground.table;`
Expand All @@ -115,7 +121,13 @@ func TestBuilder_Build(t *testing.T) {
).Build()

assert.NoError(t, err)
assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination SELECT col1, col2, _partitiontime FROM (SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (select * from project.playground.table));`, queryToExecute)
assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination
SELECT col1, col2, _partitiontime FROM (
SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (
select * from project.playground.table
)
)
;`, queryToExecute)
})
t.Run("returns query for append load method when temporary partition_value enable", func(t *testing.T) {
queryToExecute := `select * from project.playground.table;`
Expand All @@ -141,7 +153,15 @@ func TestBuilder_Build(t *testing.T) {
).Build()

assert.NoError(t, err)
assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination SELECT *, STRING(CURRENT_DATE()) as __partitionvalue FROM (SELECT col1, col2, _partitiontime FROM (SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (select * from project.playground.table)));`, queryToExecute)
assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination
SELECT *, STRING(CURRENT_DATE()) as __partitionvalue FROM (
SELECT col1, col2, _partitiontime FROM (
SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (
select * from project.playground.table
)
)
)
;`, queryToExecute)
})
t.Run("returns query for append load method when auto partition enable", func(t *testing.T) {
queryToExecute := `select * from project.playground.table;`
Expand All @@ -168,7 +188,13 @@ func TestBuilder_Build(t *testing.T) {
).Build()

assert.NoError(t, err)
assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination SELECT col1, col2, _partitiontime FROM (SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (select * from project.playground.table));`, queryToExecute)
assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination
SELECT col1, col2, _partitiontime FROM (
SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (
select * from project.playground.table
)
)
;`, queryToExecute)
})
t.Run("returns query for append load method for partition table", func(t *testing.T) {
queryToExecute := `select * from project.playground.table;`
Expand All @@ -193,7 +219,13 @@ func TestBuilder_Build(t *testing.T) {
).Build()

assert.NoError(t, err)
assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination PARTITION (col3) SELECT col1, col2, _partitiontime FROM (SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (select * from project.playground.table));`, queryToExecute)
assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination PARTITION (col3)
SELECT col1, col2, _partitiontime FROM (
SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (
select * from project.playground.table
)
)
;`, queryToExecute)
})
t.Run("returns query for append load method for partition table but autopartition enable", func(t *testing.T) {
queryToExecute := `select * from project.playground.table;`
Expand All @@ -219,7 +251,13 @@ func TestBuilder_Build(t *testing.T) {
).Build()

assert.NoError(t, err)
assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination SELECT col1, col2, _partitiontime FROM (SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (select * from project.playground.table));`, queryToExecute)
assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination
SELECT col1, col2, _partitiontime FROM (
SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (
select * from project.playground.table
)
)
;`, queryToExecute)
})
t.Run("returns query for append load method", func(t *testing.T) {
queryToExecute := `select * from project.playground.table;`
Expand Down Expand Up @@ -247,7 +285,13 @@ func TestBuilder_Build(t *testing.T) {
).Build()

assert.NoError(t, err)
assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination SELECT col1, col2, _partitiontime FROM (SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (select * from project.playground.table));`, queryToExecute)
assert.Equal(t, `INSERT INTO TABLE project.playground.table_destination
SELECT col1, col2, _partitiontime FROM (
SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (
select * from project.playground.table
)
)
;`, queryToExecute)
})
t.Run("returns query for replace load method", func(t *testing.T) {
queryToExecute := `select * from project.playground.table;`
Expand Down Expand Up @@ -275,7 +319,49 @@ func TestBuilder_Build(t *testing.T) {
).Build()

assert.NoError(t, err)
assert.Equal(t, `INSERT OVERWRITE TABLE project.playground.table_destination SELECT col1, col2, _partitiontime FROM (SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (select * from project.playground.table));`, queryToExecute)
assert.Equal(t, `INSERT OVERWRITE TABLE project.playground.table_destination
SELECT col1, col2, _partitiontime FROM (
SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (
select * from project.playground.table
)
)
;`, queryToExecute)
})
t.Run("returns query for replace load method with comment in the end", func(t *testing.T) {
queryToExecute := `select * from project.playground.table
-- this is comment`
odspClient := &mockOdpsClient{
orderedColumns: func() ([]string, error) {
return []string{"col1", "col2", "_partitiontime"}, nil
},
partitionResult: func() ([]string, error) {
return []string{"col3"}, nil
},
}
destinationTableID := "project.playground.table_destination"

queryToExecute, err := query.NewBuilder(
logger.NewDefaultLogger(),
odspClient,
query.WithQuery(queryToExecute),
query.WithMethod(query.REPLACE),
query.WithDestination(destinationTableID),
query.WithOverridedValue("_partitiontime", "TIMESTAMP('2021-01-01')"),
query.WithOverridedValue("_partitiondate", "DATE(TIMESTAMP('2021-01-01'))"),
query.WithAutoPartition(true),
query.WithPartitionValue(true),
query.WithColumnOrder(),
).Build()

assert.NoError(t, err)
assert.Equal(t, `INSERT OVERWRITE TABLE project.playground.table_destination
SELECT col1, col2, _partitiontime FROM (
SELECT col1, col2, TIMESTAMP('2021-01-01') as _partitiontime FROM (
select * from project.playground.table
-- this is comment
)
)
;`, queryToExecute)
})
}

Expand Down

0 comments on commit f4c69aa

Please sign in to comment.