diff --git a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java index e7f5d379..e8c75fbe 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java @@ -322,7 +322,7 @@ private void handleFieldMessage(Binlogdata.VEvent vEvent) { List columns = new ArrayList<>(columnCount); for (short i = 0; i < columnCount; ++i) { Field field = fieldEvent.getFields(i); - String columnName = field.getName(); + String columnName = validateColumnName(field.getName(), schemaName, tableName); String columnType = field.getType().name(); VitessType vitessType = VitessType.resolve(columnType); if (vitessType.getJdbcId() == Types.OTHER) { @@ -388,4 +388,20 @@ else if (uniqueKeyColumnName != null) { return tableEditor.create(); } + + private static String validateColumnName(String columnName, String schemaName, String tableName) { + int length = columnName.length(); + if (length == 0) { + throw new IllegalArgumentException( + String.format("Empty column name from schema: %s, table: %s", schemaName, tableName)); + } + char first = columnName.charAt(0); + // Vitess VStreamer schema reloading transient bug could cause column names to be anonymized to @1, @2, etc + // We want to fail in this case instead of sending the corrupted row events with @1, @2 as column names. + if (first == '@') { + throw new IllegalArgumentException( + String.format("Illegal prefix '@' for column: %s, from schema: %s, table: %s", columnName, schemaName, tableName)); + } + return columnName; + } } diff --git a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java index 2db6359c..9f3dc883 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java @@ -120,8 +120,8 @@ private Vgtid getVgtid(Vtgate.VStreamResponse response) { // The VStreamResponse that contains an VERSION vEvent does not have VGTID. // We do not update lastReceivedVgtid in this case. // It can also be null if the 1st grpc response does not have vgtid upon restart - LOGGER.warn("No vgtid found in response {}...", response.toString().substring(0, Math.min(100, response.toString().length()))); - LOGGER.debug("Response is {}", response); + LOGGER.trace("No vgtid found in response {}...", response.toString().substring(0, Math.min(100, response.toString().length()))); + LOGGER.debug("Full response is {}", response); return null; } if (vgtids.size() > 1) { diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index 17f3857b..8078630a 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -9,10 +9,12 @@ import static org.fest.assertions.Assertions.assertThat; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.IntStream; @@ -33,6 +35,7 @@ import io.debezium.data.Envelope; import io.debezium.data.VerifyRecord; import io.debezium.doc.FixFor; +import io.debezium.embedded.EmbeddedEngine; import io.debezium.relational.TableId; import io.debezium.util.Testing; @@ -408,6 +411,34 @@ public void shouldPrioritizePrimaryKeyAsRecordKey() throws Exception { assertConnectorNotRunning(); } + @Test + @FixFor("DBZ-2836") + public void shouldTaskFailIfColumnNameInvalid() throws Exception { + TestHelper.executeDDL("vitess_create_tables.ddl"); + TestHelper.execute("ALTER TABLE numeric_table ADD `@1` INT;"); + + CountDownLatch latch = new CountDownLatch(1); + EmbeddedEngine.CompletionCallback completionCallback = (success, message, error) -> { + if (error != null) { + latch.countDown(); + } + else { + fail("A controlled exception was expected...."); + } + }; + start(VitessConnector.class, TestHelper.defaultConfig().build(), completionCallback); + assertConnectorIsRunning(); + waitForStreamingRunning(); + + // Connector receives a row whose column name is not valid, task should fail + TestHelper.execute(INSERT_NUMERIC_TYPES_STMT); + if (!latch.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS)) { + fail("did not reach stop condition in time"); + } + + assertConnectorNotRunning(); + } + private void startConnector() throws InterruptedException { startConnector(false); }