diff --git a/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java b/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java index 2a370ffa..1f92e21d 100644 --- a/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java +++ b/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java @@ -35,6 +35,7 @@ import io.debezium.engine.DebeziumEngine; import io.debezium.engine.DebeziumEngine.RecordCommitter; import io.debezium.server.BaseChangeConsumer; +import io.debezium.util.Strings; /** * Implementation of the consumer that delivers the messages into a Pulsar destination. @@ -77,8 +78,12 @@ public interface ProducerBuilder { void connect() { final Config config = ConfigProvider.getConfig(); try { + Map pulsarClientConfig = getConfigSubset(config, PROP_CLIENT_PREFIX); + Map camelCaseConfig = new HashMap<>(); + pulsarClientConfig.forEach((key, value) -> camelCaseConfig.put(Strings.convertDotAndUnderscoreStringToCamelCase(key), value)); + pulsarClient = PulsarClient.builder() - .loadConf(getConfigSubset(config, PROP_CLIENT_PREFIX)) + .loadConf(camelCaseConfig) .build(); } catch (PulsarClientException e) {