Skip to content

Commit

Permalink
feat(mc2mc): support single query generation when disable multi query…
Browse files Browse the repository at this point in the history
… is true (#66)

feat: support single query generation when disable multi query is true
  • Loading branch information
deryrahman authored Jan 9, 2025
1 parent 317cfeb commit 84c2fc5
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 11 deletions.
23 changes: 12 additions & 11 deletions mc2mc/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@ import (

// ConfigEnv is a mc configuration for the component.
type ConfigEnv struct {
LogLevel string `env:"LOG_LEVEL" envDefault:"INFO"`
OtelCollectorGRPCEndpoint string `env:"OTEL_COLLECTOR_GRPC_ENDPOINT"`
OtelAttributes string `env:"OTEL_ATTRIBUTES"`
MCServiceAccount string `env:"MC_SERVICE_ACCOUNT"`
LoadMethod string `env:"LOAD_METHOD" envDefault:"APPEND"`
QueryFilePath string `env:"QUERY_FILE_PATH" envDefault:"/data/in/query.sql"`
DestinationTableID string `env:"DESTINATION_TABLE_ID"`
DStart string `env:"DSTART"`
DEnd string `env:"DEND"`
ExecutionProject string `env:"EXECUTION_PROJECT"`
LogViewRetentionInDays int `env:"LOG_VIEW_RETENTION_IN_DAYS" envDefault:"2"`
LogLevel string `env:"LOG_LEVEL" envDefault:"INFO"`
OtelCollectorGRPCEndpoint string `env:"OTEL_COLLECTOR_GRPC_ENDPOINT"`
OtelAttributes string `env:"OTEL_ATTRIBUTES"`
MCServiceAccount string `env:"MC_SERVICE_ACCOUNT"`
LoadMethod string `env:"LOAD_METHOD" envDefault:"APPEND"`
QueryFilePath string `env:"QUERY_FILE_PATH" envDefault:"/data/in/query.sql"`
DestinationTableID string `env:"DESTINATION_TABLE_ID"`
DStart string `env:"DSTART"`
DEnd string `env:"DEND"`
ExecutionProject string `env:"EXECUTION_PROJECT"`
LogViewRetentionInDays int `env:"LOG_VIEW_RETENTION_IN_DAYS" envDefault:"2"`
DisableMultiQueryGeneration bool `env:"DISABLE_MULTI_QUERY_GENERATION" envDefault:"false"`
// TODO: delete this
DevEnablePartitionValue string `env:"DEV__ENABLE_PARTITION_VALUE" envDefault:"false"`
DevEnableAutoPartition string `env:"DEV__ENABLE_AUTO_PARTITION" envDefault:"false"`
Expand Down
17 changes: 17 additions & 0 deletions mc2mc/mc2mc.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func mc2mc(envs []string) error {
}
queriesToExecute = append(queriesToExecute, queryToExecute)
case "REPLACE":
dstart := start.Format(time.DateTime) // normalize date format as temporary support
queryBuilder := query.NewBuilder(
l,
client.NewODPSClient(l, cfg.GenOdps()),
Expand All @@ -96,6 +97,21 @@ func mc2mc(envs []string) error {
query.WithColumnOrder(),
)

// -- TODO(START): refactor this part --
// if multi query generation is disabled, then execute the query as is
if cfg.DisableMultiQueryGeneration {
queryToExecute, err := queryBuilder.SetOptions(
query.WithQuery(string(raw)),
query.WithOverridedValue("_partitiontime", fmt.Sprintf("TIMESTAMP('%s')", dstart)),
query.WithOverridedValue("_partitiondate", fmt.Sprintf("DATE(TIMESTAMP('%s'))", dstart)),
).Build()
if err != nil {
return errors.WithStack(err)
}
queriesToExecute = append(queriesToExecute, queryToExecute)
break
}

// generate queries for each date
// if it contains break marker, it must uses window range greater than 1 day
// if table destination is partition table, then it will be replaced based on the partition date
Expand Down Expand Up @@ -126,6 +142,7 @@ func mc2mc(envs []string) error {
}
queriesToExecute = append(queriesToExecute, queryToExecute)
}
// -- TODO(END): refactor this part --
case "MERGE":
queryToExecute, err := query.NewBuilder(
l,
Expand Down

0 comments on commit 84c2fc5

Please sign in to comment.