Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for auto partition table #47

Merged
merged 1 commit into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions mc2mc/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Client struct {

// TODO: remove this temporary capability after 15 nov
enablePartitionValue bool
enableAutoPartition bool
}

func NewClient(ctx context.Context, setupFns ...SetupFn) (*Client, error) {
Expand Down Expand Up @@ -64,7 +65,7 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err
if err != nil {
return errors.WithStack(err)
}
if c.enablePartitionValue {
if c.enablePartitionValue && !c.enableAutoPartition {
queryRaw = addPartitionValueColumn(queryRaw)
}

Expand All @@ -76,7 +77,8 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err

// prepare query
queryToExec := c.Loader.GetQuery(tableID, string(queryRaw))
if len(partitionNames) > 0 {
if len(partitionNames) > 0 && !c.enableAutoPartition {
// when table is partitioned and auto partition is disabled, then we need to specify partition columns explicitly
c.logger.Info(fmt.Sprintf("table %s is partitioned by %s", tableID, strings.Join(partitionNames, ", ")))
queryToExec = c.Loader.GetPartitionedQuery(tableID, string(queryRaw), partitionNames)
}
Expand Down
51 changes: 50 additions & 1 deletion mc2mc/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestExecute(t *testing.T) {
})
t.Run("should return nil when everything is successful", func(t *testing.T) {
// arrange
client, err := client.NewClient(context.TODO(), client.SetupLogger("error"), client.SetupLoader("APPEND"))
client, err := client.NewClient(context.TODO(), client.SetupLogger("error"), client.SetupLoader("REPLACE"))
require.NoError(t, err)
client.OdpsClient = &mockOdpsClient{
partitionResult: func() ([]string, error) {
Expand All @@ -70,6 +70,42 @@ func TestExecute(t *testing.T) {
return nil
},
}
client.Loader = &mockLoader{
getQueryFunc: func(tableID, query string) string {
return "INSERT OVERWRITE TABLE project_test.table_test SELECT * FROM table;"
},
getPartitionedQueryFunc: func(tableID, query string, partitionNames []string) string {
assert.True(t, true, "should be called")
return "INSERT OVERWRITE TABLE project_test.table_test PARTITION(event_date) SELECT * FROM table;"
},
}
require.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644))
// act
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql")
// assert
assert.NoError(t, err)
})
t.Run("should return nil when everything is successful with enable auto partition", func(t *testing.T) {
// arrange
client, err := client.NewClient(context.TODO(), client.SetupLogger("error"), client.SetupLoader("REPLACE"), client.EnableAutoPartition(true))
require.NoError(t, err)
client.OdpsClient = &mockOdpsClient{
partitionResult: func() ([]string, error) {
return []string{"_partition_value"}, nil
},
execSQLResult: func() error {
return nil
},
}
client.Loader = &mockLoader{
getQueryFunc: func(tableID, query string) string {
return "INSERT OVERWRITE TABLE project_test.table_test SELECT * FROM table;"
},
getPartitionedQueryFunc: func(tableID, query string, partitionNames []string) string {
assert.False(t, true, "should not be called")
return "INSERT OVERWRITE TABLE project_test.table_test PARTITION(event_date) SELECT * FROM table;"
},
}
require.NoError(t, os.WriteFile("/tmp/query.sql", []byte("SELECT * FROM table;"), 0644))
// act
err = client.Execute(context.TODO(), "project_test.table_test", "/tmp/query.sql")
Expand All @@ -90,3 +126,16 @@ func (m *mockOdpsClient) GetPartitionNames(ctx context.Context, tableID string)
// ExecSQL is the mock's SQL execution hook. It ignores both ctx and query and
// returns whatever the configured execSQLResult callback returns.
func (m *mockOdpsClient) ExecSQL(ctx context.Context, query string) error {
	result := m.execSQLResult()
	return result
}

// mockLoader is a test double whose methods forward every call to the
// matching function field, so each test can script the query it wants back.
// Both fields must be set before the corresponding method is invoked; calling
// a method whose field is nil panics.
type mockLoader struct {
	// getQueryFunc backs GetQuery.
	getQueryFunc func(tableID, query string) string
	// getPartitionedQueryFunc backs GetPartitionedQuery.
	getPartitionedQueryFunc func(tableID, query string, partitionNames []string) string
}

// GetQuery hands tableID and query to getQueryFunc and returns its result
// unchanged.
func (m *mockLoader) GetQuery(tableID, query string) string {
	stub := m.getQueryFunc
	return stub(tableID, query)
}

// GetPartitionedQuery hands tableID, query and partitionNames to
// getPartitionedQueryFunc and returns its result unchanged.
func (m *mockLoader) GetPartitionedQuery(tableID, query string, partitionNames []string) string {
	stub := m.getPartitionedQueryFunc
	return stub(tableID, query, partitionNames)
}
7 changes: 7 additions & 0 deletions mc2mc/internal/client/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,10 @@ func EnablePartitionValue(enabled bool) SetupFn {
return nil
}
}

// EnableAutoPartition returns a SetupFn that stores enabled in the client's
// enableAutoPartition field. The returned function always reports a nil error.
func EnableAutoPartition(enabled bool) SetupFn {
	apply := func(client *Client) error {
		client.enableAutoPartition = enabled
		return nil
	}
	return apply
}
2 changes: 2 additions & 0 deletions mc2mc/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Config struct {
ScheduledTime string
// TODO: remove this temporary support after 15 nov 2024
DevEnablePartitionValue bool
DevEnableAutoPartition bool
}

type maxComputeCredentials struct {
Expand All @@ -41,6 +42,7 @@ func NewConfig() (*Config, error) {
ScheduledTime: getEnv("SCHEDULED_TIME", ""),
// TODO: delete this after 15 nov
DevEnablePartitionValue: getEnv("DEV__ENABLE_PARTITION_VALUE", "false") == "true",
DevEnableAutoPartition: getEnv("DEV__ENABLE_AUTO_PARTITION", "false") == "true",
}
// ali-odps-go-sdk related config
scvAcc := getEnv("MC_SERVICE_ACCOUNT", "")
Expand Down
1 change: 1 addition & 0 deletions mc2mc/mc2mc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func mc2mc() error {
client.SetupODPSClient(cfg.GenOdps()),
client.SetupLoader(cfg.LoadMethod),
client.EnablePartitionValue(cfg.DevEnablePartitionValue),
client.EnableAutoPartition(cfg.DevEnableAutoPartition),
)
if err != nil {
return errors.WithStack(err)
Expand Down
Loading