Skip to content

Commit

Permalink
Implement table rename with atomic exchange (#2229)
Browse files Browse the repository at this point in the history
Currently, peerdb creates a _resync table and then tries to run 

```sql
DROP TABLE IF EXISTS target_table;
RENAME TABLE _resync_table TO target_table;
```

however, the problem is that this procedure breaks, once a depending
dictionary is defined on these tables.

In order to enable such behavior, this PR modifies the
ClickhouseConnector of Flow to

- check whether the database engine supports the `EXCHANGE TABLES`
command
(https://clickhouse.com/docs/en/sql-reference/statements/exchange)
- If it does and the table already existed, runs `EXCHANGE TABLES`
followed by a `DROP` which atomically swaps and therefore keeps the
dictionary references
- If it doesnt, there's a fallback to the old method.

Currently, this will work with the Atomic() database engine.

---------

Co-authored-by: Philip Dubé <[email protected]>
  • Loading branch information
martin31821 and serprex authored Nov 8, 2024
1 parent ab638d5 commit b7ca715
Showing 1 changed file with 39 additions and 30 deletions.
69 changes: 39 additions & 30 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"log/slog"
"strings"

_ "github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2"
_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"

"github.com/PeerDB-io/peer-flow/connectors/utils"
Expand Down Expand Up @@ -93,8 +93,7 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
return nil, err
}

err = c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
if err != nil {
if err := c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas); err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

Expand All @@ -113,8 +112,7 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
return nil, err
}

err = c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpointID)
if err != nil {
if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpointID); err != nil {
c.logger.Error("failed to increment id", slog.Any("error", err))
return nil, err
}
Expand All @@ -137,15 +135,13 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJ
for _, addedColumn := range schemaDelta.AddedColumns {
clickHouseColType, err := qvalue.QValueKind(addedColumn.Type).ToDWHColumnType(protos.DBType_CLICKHOUSE)
if err != nil {
return fmt.Errorf("failed to convert column type %s to ClickHouse type: %w",
addedColumn.Type, err)
return fmt.Errorf("failed to convert column type %s to ClickHouse type: %w", addedColumn.Type, err)
}
err = c.execWithLogging(ctx,
fmt.Sprintf("ALTER TABLE %s ADD COLUMN IF NOT EXISTS \"%s\" %s",
schemaDelta.DstTableName, addedColumn.Name, clickHouseColType))
if err != nil {
return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.Name,
schemaDelta.DstTableName, err)
return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.Name, schemaDelta.DstTableName, err)
}
c.logger.Info(fmt.Sprintf("[schema delta replay] added column %s with data type %s", addedColumn.Name,
addedColumn.Type),
Expand Down Expand Up @@ -186,34 +182,47 @@ func (c *ClickHouseConnector) RenameTables(
}

allCols := strings.Join(columnNames, ",")
c.logger.Info(fmt.Sprintf("handling soft-deletes for table '%s'...", renameRequest.NewName))
err = c.execWithLogging(ctx,
fmt.Sprintf("INSERT INTO %s(%s,%s) SELECT %s,true FROM %s WHERE %s = 1",
renameRequest.CurrentName, allCols, signColName, allCols, renameRequest.NewName, signColName))
if err != nil {
c.logger.Info("handling soft-deletes for table before rename", slog.String("NewName", renameRequest.NewName))
if err := c.execWithLogging(ctx,
fmt.Sprintf("INSERT INTO `%s`(%s,%s) SELECT %s,true FROM `%s` WHERE %s = 1",
renameRequest.CurrentName, allCols, signColName, allCols, renameRequest.NewName, signColName),
); err != nil {
return nil, fmt.Errorf("unable to handle soft-deletes for table %s: %w", renameRequest.NewName, err)
}
} else {
c.logger.Info(fmt.Sprintf("table '%s' does not exist, skipping soft-deletes transfer for it", renameRequest.NewName))
}

// drop the dst table if exists
err = c.execWithLogging(ctx, "DROP TABLE IF EXISTS "+renameRequest.NewName)
if err != nil {
return nil, fmt.Errorf("unable to drop table %s: %w", renameRequest.NewName, err)
// target table exists, so we can attempt to swap. In most cases, we will have Atomic engine,
// which supports a special query to exchange two tables, allowing dependent (materialized) views and dictionaries on these tables
c.logger.Info("attempting atomic exchange",
slog.String("OldName", renameRequest.CurrentName), slog.String("NewName", renameRequest.NewName))
if err = c.execWithLogging(ctx,
fmt.Sprintf("EXCHANGE TABLES %s and %s", renameRequest.NewName, renameRequest.CurrentName),
); err == nil {
if err := c.execWithLogging(ctx, fmt.Sprintf(dropTableIfExistsSQL, renameRequest.CurrentName)); err != nil {
return nil, fmt.Errorf("unable to drop exchanged table %s: %w", renameRequest.CurrentName, err)
}
} else if ex, ok := err.(*clickhouse.Exception); !ok || ex.Code != 48 {
// code 48 == not implemented -> move on to the fallback code, in all other error codes / types
// return, since we know/assume that the exchange would be the sensible action
return nil, fmt.Errorf("unable to exchange tables %s and %s: %w", renameRequest.NewName, renameRequest.CurrentName, err)
}
}

// rename the src table to dst
err = c.execWithLogging(ctx, fmt.Sprintf("RENAME TABLE %s TO %s",
renameRequest.CurrentName,
renameRequest.NewName))
if err != nil {
return nil, fmt.Errorf("unable to rename table %s to %s: %w",
renameRequest.CurrentName, renameRequest.NewName, err)
// either original table doesn't exist, in which case it is safe to just run rename,
// or err is set (in which case err comes from EXCHANGE TABLES)
if !originalTableExists || err != nil {
if err := c.execWithLogging(ctx, fmt.Sprintf(dropTableIfExistsSQL, renameRequest.NewName)); err != nil {
return nil, fmt.Errorf("unable to drop table %s: %w", renameRequest.NewName, err)
}

if err := c.execWithLogging(ctx,
fmt.Sprintf("RENAME TABLE %s TO %s", renameRequest.CurrentName, renameRequest.NewName),
); err != nil {
return nil, fmt.Errorf("unable to rename table %s to %s: %w", renameRequest.CurrentName, renameRequest.NewName, err)
}
}

c.logger.Info(fmt.Sprintf("successfully renamed table '%s' to '%s'",
renameRequest.CurrentName, renameRequest.NewName))
c.logger.Info("successfully renamed table",
slog.String("OldName", renameRequest.CurrentName), slog.String("NewName", renameRequest.NewName))
}

return &protos.RenameTablesOutput{
Expand Down

0 comments on commit b7ca715

Please sign in to comment.