diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index 5e636adee1..dfd0c8d99a 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -190,21 +190,17 @@ func (s ClickHouseSuite) Test_NullableMirrorSetting() { srcFullName := s.attachSchemaSuffix(srcTableName) dstTableName := "test_nullable_mirror_dst" - _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` + require.NoError(s.t, s.source.Exec(fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, - key TEXT NOT NULL, + ky TEXT NOT NULL, val TEXT, n NUMERIC, t TIMESTAMP ); - `, srcFullName)) - require.NoError(s.t, err) + `, srcFullName))) - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (key) VALUES ('init'); - `, srcFullName)) - require.NoError(s.t, err) + require.NoError(s.t, s.source.Exec(fmt.Sprintf(`INSERT INTO %s (ky) VALUES ('init')`, srcFullName))) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("ch_nullable_mirror"), @@ -219,14 +215,11 @@ func (s ClickHouseSuite) Test_NullableMirrorSetting() { env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) - e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on initial", srcTableName, dstTableName, "id,\"key\",val,n,t") + e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on initial", srcTableName, dstTableName, "id,ky,val,n,t") - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (key) VALUES ('cdc'); - `, srcFullName)) - require.NoError(s.t, err) + require.NoError(s.t, s.source.Exec(fmt.Sprintf(`INSERT INTO %s (ky) VALUES ('cdc')`, srcFullName))) - e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,\"key\",val,n,t") + e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,ky,val,n,t") env.Cancel() e2e.RequireEnvCanceled(s.t, env) @@ -237,21 +230,17 @@ func (s ClickHouseSuite) Test_NullableColumnSetting() { srcFullName := s.attachSchemaSuffix(srcTableName) dstTableName := "test_nullable_column_dst" - _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` + require.NoError(s.t, s.source.Exec(fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, - key TEXT NOT NULL, + ky TEXT NOT NULL, val TEXT, n NUMERIC, t TIMESTAMP ); - `, srcFullName)) - require.NoError(s.t, err) + `, srcFullName))) - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (key) VALUES ('init'); - `, srcFullName)) - require.NoError(s.t, err) + require.NoError(s.t, s.source.Exec(fmt.Sprintf(`INSERT INTO %s (ky) VALUES ('init')`, srcFullName))) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("ch_nullable_column"), @@ -262,7 +251,7 @@ func (s ClickHouseSuite) Test_NullableColumnSetting() { flowConnConfig.DoInitialSnapshot = true for _, tm := range flowConnConfig.TableMappings { tm.Columns = []*protos.ColumnSetting{ - {SourceName: "key", NullableEnabled: true}, + {SourceName: "ky", NullableEnabled: true}, {SourceName: "val", NullableEnabled: true}, {SourceName: "n", NullableEnabled: true}, {SourceName: "t", NullableEnabled: true}, @@ -273,14 +262,11 @@ func (s ClickHouseSuite) Test_NullableColumnSetting() { env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) - e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on initial", srcTableName, dstTableName, "id,\"key\",val,n,t") + e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on initial", srcTableName, dstTableName, "id,ky,val,n,t") - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (key) VALUES ('cdc'); - `, srcFullName)) - require.NoError(s.t, err) + require.NoError(s.t, s.source.Exec(fmt.Sprintf(`INSERT INTO %s (ky) VALUES ('cdc')`, srcFullName))) - e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,\"key\",val,n,t") + e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,ky,val,n,t") env.Cancel() e2e.RequireEnvCanceled(s.t, env)