diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 97b51a4903..3e002f5028 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -138,7 +138,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJ return fmt.Errorf("failed to convert column type %s to ClickHouse type: %w", addedColumn.Type, err) } err = c.execWithLogging(ctx, - fmt.Sprintf("ALTER TABLE %s ADD COLUMN IF NOT EXISTS \"%s\" %s", + fmt.Sprintf("ALTER TABLE `%s` ADD COLUMN IF NOT EXISTS `%s` %s", schemaDelta.DstTableName, addedColumn.Name, clickHouseColType)) if err != nil { return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.Name, schemaDelta.DstTableName, err) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 1c7d110e37..8bb5504495 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -92,21 +92,21 @@ func (c *ClickHouseConnector) ValidateCheck(ctx context.Context) error { // add a column if err := c.exec(ctx, - fmt.Sprintf("ALTER TABLE %s ADD COLUMN updated_at DateTime64(9) DEFAULT now64()", validateDummyTableName), + fmt.Sprintf("ALTER TABLE `%s` ADD COLUMN updated_at DateTime64(9) DEFAULT now64()", validateDummyTableName), ); err != nil { return fmt.Errorf("failed to add column to validation table %s: %w", validateDummyTableName, err) } // rename the table if err := c.exec(ctx, - fmt.Sprintf("RENAME TABLE %s TO %s", validateDummyTableName, validateDummyTableName+"_renamed"), + fmt.Sprintf("RENAME TABLE `%s` TO `%s`", validateDummyTableName, validateDummyTableName+"_renamed"), ); err != nil { return fmt.Errorf("failed to rename validation table %s: %w", validateDummyTableName, err) } validateDummyTableName += "_renamed" // insert a row - if err := c.exec(ctx, fmt.Sprintf("INSERT INTO %s VALUES (1, now64())", validateDummyTableName)); err != nil { + if err := c.exec(ctx, fmt.Sprintf("INSERT INTO `%s` VALUES (1, now64())", validateDummyTableName)); err != nil { return fmt.Errorf("failed to insert into validation table %s: %w", validateDummyTableName, err) }