From 6bd61d35c10ffffa102665631fecd4a81c97fead Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sat, 12 Oct 2024 14:59:53 +0000 Subject: [PATCH] is this possible? --- flow/connectors/clickhouse/cdc.go | 7 +++++-- flow/connectors/core.go | 3 +-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index d8067f2d58..a10bc49e64 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -18,8 +18,8 @@ import ( ) const ( - checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = %s AND name = %s) AS table_exists;` - dropTableIfExistsSQL = `DROP TABLE IF EXISTS %s;` + checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = %s AND name = %s) AS table_exists` + dropTableIfExistsSQL = `DROP TABLE IF EXISTS %s` ) // getRawTableName returns the raw table name for the given table identifier. @@ -36,6 +36,9 @@ func (c *ClickHouseConnector) checkIfTableExists(ctx context.Context, databaseNa {Name: "table_exists", Data: &existsC}, }, OnResult: func(ctx context.Context, block chproto.Block) error { + if block.Rows == 0 && block.Info.Overflows { + return nil + } if block.Rows != 1 { return fmt.Errorf("[clickhouse] checkIfTableExists: expected 1 row, got %d", block.Rows) } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index e7dc2bf688..9facc671a8 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -426,8 +426,7 @@ func GetByNameAs[T Connector](ctx context.Context, env map[string]string, catalo } func CloseConnector(ctx context.Context, conn Connector) { - err := conn.Close() - if err != nil { + if err := conn.Close(); err != nil { logger.LoggerFromCtx(ctx).Error("error closing connector", slog.Any("error", err)) } }