diff --git a/cdc/sink/dmlsink/mq/dispatcher/topic/expression.go b/cdc/sink/dmlsink/mq/dispatcher/topic/expression.go index 61610f6f89d..bd48d9ae991 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/topic/expression.go +++ b/cdc/sink/dmlsink/mq/dispatcher/topic/expression.go @@ -68,13 +68,13 @@ func (e Expression) Validate() error { return nil } - return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs() + return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs(e) } // ValidateForAvro checks whether topic pattern is {schema}_{table}, the only allowed func (e Expression) ValidateForAvro() error { if ok := avroTopicNameRE.MatchString(string(e)); !ok { - return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs( + return errors.ErrKafkaInvalidTopicExpression.GenWithStackByArgs(e, "topic rule for Avro must contain {schema} and {table}", ) } diff --git a/cdc/sink/dmlsink/mq/dispatcher/topic/expression_test.go b/cdc/sink/dmlsink/mq/dispatcher/topic/expression_test.go index 070ae718d49..481c86a2fd0 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/topic/expression_test.go +++ b/cdc/sink/dmlsink/mq/dispatcher/topic/expression_test.go @@ -17,10 +17,13 @@ import ( "fmt" "testing" + "github.com/pingcap/tiflow/pkg/errors" "github.com/stretchr/testify/require" ) func TestSubstituteTopicExpression(t *testing.T) { + t.Parallel() + cases := []struct { name string expression string @@ -228,6 +231,8 @@ func TestSubstituteTopicExpression(t *testing.T) { } func TestSchemaOptional(t *testing.T) { + t.Parallel() + expression := "prefix_{table}" topicExpr := Expression(expression) err := topicExpr.Validate() @@ -240,6 +245,8 @@ func TestSchemaOptional(t *testing.T) { } func TestTableOptional(t *testing.T) { + t.Parallel() + expression := "prefix_{schema}" topicExpr := Expression(expression) err := topicExpr.Validate() @@ -251,6 +258,22 @@ func TestTableOptional(t *testing.T) { require.Equal(t, topicName, "prefix_test") } +func TestInvalidExpression(t *testing.T) { + t.Parallel() + + invalidExpr := "%invalid{schema}" + topicExpr := Expression(invalidExpr) + + err := topicExpr.Validate() + require.ErrorIs(t, err, errors.ErrKafkaInvalidTopicExpression) + require.ErrorContains(t, err, invalidExpr) + + err = topicExpr.ValidateForAvro() + require.ErrorIs(t, err, errors.ErrKafkaInvalidTopicExpression) + require.ErrorContains(t, err, "Avro") + require.ErrorContains(t, err, invalidExpr) +} + // cmd: go test -run='^$' -bench '^(BenchmarkSubstitute)$' github.com/pingcap/tiflow/cdc/sink/dispatcher/topic // goos: linux // goarch: amd64 diff --git a/errors.toml b/errors.toml index 5e705e1d20b..5daaaa9a841 100755 --- a/errors.toml +++ b/errors.toml @@ -488,7 +488,7 @@ kafka send message failed ["CDC:ErrKafkaTopicExprInvalid"] error = ''' -invalid topic expression +invalid topic expression: %s ''' ["CDC:ErrLeaseExpired"] diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index 0c4eaeacf1c..3a055316195 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -201,7 +201,7 @@ var ( errors.RFCCodeText("CDC:ErrKafkaCreateTopic"), ) ErrKafkaInvalidTopicExpression = errors.Normalize( - "invalid topic expression", + "invalid topic expression: %s ", errors.RFCCodeText("CDC:ErrKafkaTopicExprInvalid"), ) ErrKafkaConfigNotFound = errors.Normalize(