Skip to content

Commit

Permalink
DBZ-2836 Invalid column name should fail connector with meaningful me…
Browse files Browse the repository at this point in the history
…ssage (debezium#11)
  • Loading branch information
keweishang authored Dec 14, 2020
1 parent 2ab62ae commit 96a917b
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ private void handleFieldMessage(Binlogdata.VEvent vEvent) {
List<ColumnMetaData> columns = new ArrayList<>(columnCount);
for (short i = 0; i < columnCount; ++i) {
Field field = fieldEvent.getFields(i);
String columnName = field.getName();
String columnName = validateColumnName(field.getName(), schemaName, tableName);
String columnType = field.getType().name();
VitessType vitessType = VitessType.resolve(columnType);
if (vitessType.getJdbcId() == Types.OTHER) {
Expand Down Expand Up @@ -388,4 +388,20 @@ else if (uniqueKeyColumnName != null) {

return tableEditor.create();
}

private static String validateColumnName(String columnName, String schemaName, String tableName) {
int length = columnName.length();
if (length == 0) {
throw new IllegalArgumentException(
String.format("Empty column name from schema: %s, table: %s", schemaName, tableName));
}
char first = columnName.charAt(0);
// Vitess VStreamer schema reloading transient bug could cause column names to be anonymized to @1, @2, etc
// We want to fail in this case instead of sending the corrupted row events with @1, @2 as column names.
if (first == '@') {
throw new IllegalArgumentException(
String.format("Illegal prefix '@' for column: %s, from schema: %s, table: %s", columnName, schemaName, tableName));
}
return columnName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ private Vgtid getVgtid(Vtgate.VStreamResponse response) {
// The VStreamResponse that contains an VERSION vEvent does not have VGTID.
// We do not update lastReceivedVgtid in this case.
// It can also be null if the 1st grpc response does not have vgtid upon restart
LOGGER.warn("No vgtid found in response {}...", response.toString().substring(0, Math.min(100, response.toString().length())));
LOGGER.debug("Response is {}", response);
LOGGER.trace("No vgtid found in response {}...", response.toString().substring(0, Math.min(100, response.toString().length())));
LOGGER.debug("Full response is {}", response);
return null;
}
if (vgtids.size() > 1) {
Expand Down
31 changes: 31 additions & 0 deletions src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.IntStream;
Expand All @@ -33,6 +35,7 @@
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.relational.TableId;
import io.debezium.util.Testing;

Expand Down Expand Up @@ -408,6 +411,34 @@ public void shouldPrioritizePrimaryKeyAsRecordKey() throws Exception {
assertConnectorNotRunning();
}

@Test
@FixFor("DBZ-2836")
public void shouldTaskFailIfColumnNameInvalid() throws Exception {
TestHelper.executeDDL("vitess_create_tables.ddl");
TestHelper.execute("ALTER TABLE numeric_table ADD `@1` INT;");

CountDownLatch latch = new CountDownLatch(1);
EmbeddedEngine.CompletionCallback completionCallback = (success, message, error) -> {
if (error != null) {
latch.countDown();
}
else {
fail("A controlled exception was expected....");
}
};
start(VitessConnector.class, TestHelper.defaultConfig().build(), completionCallback);
assertConnectorIsRunning();
waitForStreamingRunning();

// Connector receives a row whose column name is not valid, task should fail
TestHelper.execute(INSERT_NUMERIC_TYPES_STMT);
if (!latch.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS)) {
fail("did not reach stop condition in time");
}

assertConnectorNotRunning();
}

private void startConnector() throws InterruptedException {
startConnector(false);
}
Expand Down

0 comments on commit 96a917b

Please sign in to comment.