From 84c2fc54b1546371c3480d4ee4755b6bf2fe87b7 Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Thu, 9 Jan 2025 15:21:29 +0700 Subject: [PATCH] feat(mc2mc): support single query generation when disable multi query is true (#66) feat: support single query generation when disable multi query is true --- mc2mc/internal/config/config.go | 23 ++++++++++++----------- mc2mc/mc2mc.go | 17 +++++++++++++++++ 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/mc2mc/internal/config/config.go b/mc2mc/internal/config/config.go index 52d160b..0bee427 100644 --- a/mc2mc/internal/config/config.go +++ b/mc2mc/internal/config/config.go @@ -9,17 +9,18 @@ import ( // ConfigEnv is a mc configuration for the component. type ConfigEnv struct { - LogLevel string `env:"LOG_LEVEL" envDefault:"INFO"` - OtelCollectorGRPCEndpoint string `env:"OTEL_COLLECTOR_GRPC_ENDPOINT"` - OtelAttributes string `env:"OTEL_ATTRIBUTES"` - MCServiceAccount string `env:"MC_SERVICE_ACCOUNT"` - LoadMethod string `env:"LOAD_METHOD" envDefault:"APPEND"` - QueryFilePath string `env:"QUERY_FILE_PATH" envDefault:"/data/in/query.sql"` - DestinationTableID string `env:"DESTINATION_TABLE_ID"` - DStart string `env:"DSTART"` - DEnd string `env:"DEND"` - ExecutionProject string `env:"EXECUTION_PROJECT"` - LogViewRetentionInDays int `env:"LOG_VIEW_RETENTION_IN_DAYS" envDefault:"2"` + LogLevel string `env:"LOG_LEVEL" envDefault:"INFO"` + OtelCollectorGRPCEndpoint string `env:"OTEL_COLLECTOR_GRPC_ENDPOINT"` + OtelAttributes string `env:"OTEL_ATTRIBUTES"` + MCServiceAccount string `env:"MC_SERVICE_ACCOUNT"` + LoadMethod string `env:"LOAD_METHOD" envDefault:"APPEND"` + QueryFilePath string `env:"QUERY_FILE_PATH" envDefault:"/data/in/query.sql"` + DestinationTableID string `env:"DESTINATION_TABLE_ID"` + DStart string `env:"DSTART"` + DEnd string `env:"DEND"` + ExecutionProject string `env:"EXECUTION_PROJECT"` + LogViewRetentionInDays int `env:"LOG_VIEW_RETENTION_IN_DAYS" envDefault:"2"` + DisableMultiQueryGeneration bool `env:"DISABLE_MULTI_QUERY_GENERATION" envDefault:"false"` // TODO: delete this DevEnablePartitionValue string `env:"DEV__ENABLE_PARTITION_VALUE" envDefault:"false"` DevEnableAutoPartition string `env:"DEV__ENABLE_AUTO_PARTITION" envDefault:"false"` diff --git a/mc2mc/mc2mc.go b/mc2mc/mc2mc.go index dbc2e6c..05a23e4 100644 --- a/mc2mc/mc2mc.go +++ b/mc2mc/mc2mc.go @@ -86,6 +86,7 @@ func mc2mc(envs []string) error { } queriesToExecute = append(queriesToExecute, queryToExecute) case "REPLACE": + dstart := start.Format(time.DateTime) // normalize date format as temporary support queryBuilder := query.NewBuilder( l, client.NewODPSClient(l, cfg.GenOdps()), @@ -96,6 +97,21 @@ func mc2mc(envs []string) error { query.WithColumnOrder(), ) + // -- TODO(START): refactor this part -- + // if multi query generation is disabled, then execute the query as is + if cfg.DisableMultiQueryGeneration { + queryToExecute, err := queryBuilder.SetOptions( + query.WithQuery(string(raw)), + query.WithOverridedValue("_partitiontime", fmt.Sprintf("TIMESTAMP('%s')", dstart)), + query.WithOverridedValue("_partitiondate", fmt.Sprintf("DATE(TIMESTAMP('%s'))", dstart)), + ).Build() + if err != nil { + return errors.WithStack(err) + } + queriesToExecute = append(queriesToExecute, queryToExecute) + break + } + // generate queries for each date // if it contains break marker, it must uses window range greater than 1 day // if table destination is partition table, then it will be replaced based on the partition date @@ -126,6 +142,7 @@ func mc2mc(envs []string) error { } queriesToExecute = append(queriesToExecute, queryToExecute) } + // -- TODO(END): refactor this part -- case "MERGE": queryToExecute, err := query.NewBuilder( l,