From 56424a1ec7108597ae76912efa507f7ccf401a78 Mon Sep 17 00:00:00 2001 From: nikhil-ctds <151718832+nikhil-ctds@users.noreply.github.com> Date: Thu, 30 May 2024 17:21:55 +0530 Subject: [PATCH] [fix][io] Upgrade debezium oracle to 2.5.4.Final (#279) * Revert "Upgrade debezium version to 2.5.4.Final" This reverts commit 1e4ffcc14ea3bd036cc7f32024bd501fca9d74ae. * Revert "Upgrade debezium to 2.3.5.Final" This reverts commit c7d8ed66727c1e1edc41b2c0517c18952b5e0563. * Added debezium-core to debezium-oracle * Move Debezium-core to oracle to use 2.5.4.Final version * Update Imports * Remove pulsar-io-debezium-core dependency * Fix Dependencies * Fix Checkstyle * Added Oracle JDBC driver for 12c * Revert "Added Oracle JDBC driver for 12c" This reverts commit 8260508fbbbd6b895cfcfefc70a9ae9ae0ef1c28. * Updated configs for upgraded debezium version --------- Co-authored-by: mukesh-ctds --- pom.xml | 3 +- .../pulsar/io/debezium/DebeziumSource.java | 2 +- .../io/debezium/PulsarDatabaseHistory.java | 22 +- .../debezium/PulsarDatabaseHistoryTest.java | 18 +- pulsar-io/debezium/oracle/pom.xml | 41 ++- .../debezium/oracle/DebeziumOracleSource.java | 1 - .../io/debezium/oracle/DebeziumSource.java | 117 +++++++ .../oracle/PulsarDatabaseHistory.java | 316 ++++++++++++++++++ .../pulsar/io/debezium/oracle/SerDeUtils.java | 71 ++++ .../DebeziumOracleDbSourceTester.java | 4 +- 10 files changed, 567 insertions(+), 28 deletions(-) create mode 100644 pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumSource.java create mode 100644 pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/PulsarDatabaseHistory.java create mode 100644 pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/SerDeUtils.java diff --git a/pom.xml b/pom.xml index 9778a5c179186..59e291e33ca03 100644 --- a/pom.xml +++ b/pom.xml @@ -164,7 +164,8 @@ flexible messaging model and an intuitive client API. 334 2.13 2.13.10 - 2.5.4.Final + 1.9.7.Final + 2.5.4.Final 42.5.0 8.0.30 diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java index 2a0ca8125b0e4..9e731fe48bbdb 100644 --- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java +++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java @@ -88,7 +88,7 @@ public void open(Map config, SourceContext sourceContext) throws setConfigIfNull(config, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER); // database.history : implementation class for database history. - setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY.name(), DEFAULT_HISTORY); + setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY); // database.history.pulsar.service.url String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name()); diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java index e9f52e69ff625..8bad8885a4c55 100644 --- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java +++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java @@ -26,12 +26,12 @@ import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.document.DocumentReader; -import io.debezium.relational.history.AbstractSchemaHistory; +import io.debezium.relational.history.AbstractDatabaseHistory; +import io.debezium.relational.history.DatabaseHistory; +import io.debezium.relational.history.DatabaseHistoryException; +import io.debezium.relational.history.DatabaseHistoryListener; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.HistoryRecordComparator; -import io.debezium.relational.history.SchemaHistory; -import io.debezium.relational.history.SchemaHistoryException; -import io.debezium.relational.history.SchemaHistoryListener; import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -52,12 +52,12 @@ import org.apache.pulsar.client.api.Schema; /** - * A {@link SchemaHistory} implementation that records schema changes as normal pulsar messages on the specified + * A {@link DatabaseHistory} implementation that records schema changes as normal pulsar messages on the specified * topic, and that recovers the history by establishing a Kafka Consumer re-processing all messages on that topic. */ @Slf4j @ThreadSafe -public final class PulsarDatabaseHistory extends AbstractSchemaHistory { +public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic") .withDisplayName("Database history topic name") @@ -97,7 +97,7 @@ public final class PulsarDatabaseHistory extends AbstractSchemaHistory { TOPIC, SERVICE_URL, CLIENT_BUILDER, - SchemaHistory.NAME, + DatabaseHistory.NAME, READER_CONFIG); private final ObjectMapper mapper = new ObjectMapper(); @@ -113,7 +113,7 @@ public final class PulsarDatabaseHistory extends AbstractSchemaHistory { public void configure( Configuration config, HistoryRecordComparator comparator, - SchemaHistoryListener listener, + DatabaseHistoryListener listener, boolean useCatalogBeforeSchema) { super.configure(config, comparator, listener, useCatalogBeforeSchema); if (!config.validateAndRecord(ALL_FIELDS, logger::error)) { @@ -148,7 +148,7 @@ public void configure( } // Copy the relevant portions of the configuration and add useful defaults ... - this.dbHistoryName = config.getString(SchemaHistory.NAME, UUID.randomUUID().toString()); + this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString()); log.info("Configure to store the debezium database history {} to pulsar topic {}", dbHistoryName, topicName); @@ -201,7 +201,7 @@ public void start() { } @Override - protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { + protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException { if (this.producer == null) { throw new IllegalStateException("No producer is available. Ensure that 'start()'" + " is called before storing database history records."); @@ -212,7 +212,7 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { try { producer.send(record.toString()); } catch (PulsarClientException e) { - throw new SchemaHistoryException(e); + throw new DatabaseHistoryException(e); } } diff --git a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java index 8f6badd5c194b..081cfdcc5435a 100644 --- a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java +++ b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java @@ -27,8 +27,8 @@ import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; import io.debezium.relational.Tables; import io.debezium.relational.ddl.DdlParser; -import io.debezium.relational.history.SchemaHistory; -import io.debezium.relational.history.SchemaHistoryListener; +import io.debezium.relational.history.DatabaseHistory; +import io.debezium.relational.history.DatabaseHistoryListener; import io.debezium.text.ParsingException; import io.debezium.util.Collect; @@ -80,8 +80,8 @@ protected void cleanup() throws Exception { private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder, boolean testWithReaderConfig) throws Exception { Configuration.Builder configBuidler = Configuration.create() .with(PulsarDatabaseHistory.TOPIC, topicName) - .with(SchemaHistory.NAME, "my-db-history") - .with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL); + .with(DatabaseHistory.NAME, "my-db-history") + .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL); if (testWithClientBuilder) { ClientBuilder builder = PulsarClient.builder().serviceUrl(brokerUrl.toString()); @@ -101,7 +101,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWit } // Start up the history ... - history.configure(configBuidler.build(), null, SchemaHistoryListener.NOOP, true); + history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true); history.start(); // Should be able to call start more than once ... @@ -160,7 +160,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWit // Stop the history (which should stop the producer) ... history.stop(); history = new PulsarDatabaseHistory(); - history.configure(configBuidler.build(), null, SchemaHistoryListener.NOOP, true); + history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true); // no need to start // Recover from the very beginning to just past the first change ... @@ -240,11 +240,11 @@ public void testExists() throws Exception { Configuration config = Configuration.create() .with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString()) .with(PulsarDatabaseHistory.TOPIC, "persistent://my-property/my-ns/dummytopic") - .with(SchemaHistory.NAME, "my-db-history") - .with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true) + .with(DatabaseHistory.NAME, "my-db-history") + .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true) .build(); - history.configure(config, null, SchemaHistoryListener.NOOP, true); + history.configure(config, null, DatabaseHistoryListener.NOOP, true); history.start(); // dummytopic should not exist yet diff --git a/pulsar-io/debezium/oracle/pom.xml b/pulsar-io/debezium/oracle/pom.xml index 412385418087f..bfaa998aa3e77 100644 --- a/pulsar-io/debezium/oracle/pom.xml +++ b/pulsar-io/debezium/oracle/pom.xml @@ -39,16 +39,49 @@ provided + + io.debezium + debezium-connector-oracle + ${debezium.oracle.version} + + + + io.debezium + debezium-core + ${debezium.oracle.version} + + ${project.groupId} - pulsar-io-debezium-core + pulsar-common ${project.version} - io.debezium - debezium-connector-oracle - ${debezium.version} + ${project.groupId} + pulsar-io-kafka-connect-adaptor + ${project.version} + + + + org.apache.kafka + connect-runtime + ${kafka-client.version} + + + org.apache.kafka + kafka-log4j-appender + + + jose4j + org.bitbucket.b_c + + + + + + org.apache.commons + commons-lang3 diff --git a/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java index 3670c5c5043ad..d73e2218427a4 100644 --- a/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java +++ b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java @@ -20,7 +20,6 @@ import java.util.Map; import org.apache.kafka.connect.runtime.TaskConfig; -import org.apache.pulsar.io.debezium.DebeziumSource; /** diff --git a/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumSource.java b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumSource.java new file mode 100644 index 0000000000000..892c4dcb5a8b7 --- /dev/null +++ b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumSource.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.debezium.oracle; + +import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.io.core.SourceContext; +import org.apache.pulsar.io.kafka.connect.KafkaConnectSource; +import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig; + +@Slf4j +public abstract class DebeziumSource extends KafkaConnectSource { + private static final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"; + private static final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.oracle.PulsarDatabaseHistory"; + private static final String DEFAULT_OFFSET_TOPIC = "debezium-offset-topic"; + private static final String DEFAULT_HISTORY_TOPIC = "debezium-history-topic"; + + public static void throwExceptionIfConfigNotMatch(Map config, + String key, + String value) throws IllegalArgumentException { + Object orig = config.get(key); + if (orig == null) { + config.put(key, value); + return; + } + + // throw exception if value not match + if (!orig.equals(value)) { + throw new IllegalArgumentException("Expected " + value + " but has " + orig); + } + } + + public static void setConfigIfNull(Map config, String key, String value) { + config.putIfAbsent(key, value); + } + + // namespace for output topics, default value is "tenant/namespace" + public static String topicNamespace(SourceContext sourceContext) { + String tenant = sourceContext.getTenant(); + String namespace = sourceContext.getNamespace(); + + return (StringUtils.isEmpty(tenant) ? TopicName.PUBLIC_TENANT : tenant) + "/" + + (StringUtils.isEmpty(namespace) ? TopicName.DEFAULT_NAMESPACE : namespace); + } + + public static void tryLoadingConfigSecret(String secretName, Map config, SourceContext context) { + try { + String secret = context.getSecret(secretName); + if (secret != null) { + config.put(secretName, secret); + log.info("Config key {} set from secret.", secretName); + } + } catch (Exception e) { + log.warn("Failed to read secret {}.", secretName, e); + } + } + + public abstract void setDbConnectorTask(Map config) throws Exception; + + @Override + public void open(Map config, SourceContext sourceContext) throws Exception { + setDbConnectorTask(config); + tryLoadingConfigSecret("database.user", config, sourceContext); + tryLoadingConfigSecret("database.password", config, sourceContext); + + // key.converter + setConfigIfNull(config, PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER); + // value.converter + setConfigIfNull(config, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER); + + // database.history : implementation class for database history. + setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY.name(), DEFAULT_HISTORY); + + // database.history.pulsar.service.url + String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name()); + + String topicNamespace = topicNamespace(sourceContext); + // topic.namespace + setConfigIfNull(config, PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, topicNamespace); + + String sourceName = sourceContext.getSourceName(); + // database.history.pulsar.topic: history topic name + setConfigIfNull(config, PulsarDatabaseHistory.TOPIC.name(), + topicNamespace + "/" + sourceName + "-" + DEFAULT_HISTORY_TOPIC); + // offset.storage.topic: offset topic name + setConfigIfNull(config, PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, + topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC); + + // pass pulsar.client.builder if database.history.pulsar.service.url is not provided + if (StringUtils.isEmpty(pulsarUrl)) { + String pulsarClientBuilder = SerDeUtils.serialize(sourceContext.getPulsarClientBuilder()); + config.put(PulsarDatabaseHistory.CLIENT_BUILDER.name(), pulsarClientBuilder); + } + + super.open(config, sourceContext); + } + +} diff --git a/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/PulsarDatabaseHistory.java b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/PulsarDatabaseHistory.java new file mode 100644 index 0000000000000..ab84579e2cd92 --- /dev/null +++ b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/PulsarDatabaseHistory.java @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.debezium.oracle; + +import static org.apache.commons.lang3.StringUtils.isBlank; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import io.debezium.annotation.ThreadSafe; +import io.debezium.config.Configuration; +import io.debezium.config.Field; +import io.debezium.document.DocumentReader; +import io.debezium.relational.history.AbstractSchemaHistory; +import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.SchemaHistory; +import io.debezium.relational.history.SchemaHistoryException; +import io.debezium.relational.history.SchemaHistoryListener; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Width; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; + +/** + * A {@link SchemaHistory} implementation that records schema changes as normal pulsar messages on the specified + * topic, and that recovers the history by establishing a Kafka Consumer re-processing all messages on that topic. + */ +@Slf4j +@ThreadSafe +public final class PulsarDatabaseHistory extends AbstractSchemaHistory { + + public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic") + .withDisplayName("Database history topic name") + .withType(Type.STRING) + .withWidth(Width.LONG) + .withImportance(Importance.HIGH) + .withDescription("The name of the topic for the database schema history") + .withValidation(Field::isRequired); + + public static final Field SERVICE_URL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.service.url") + .withDisplayName("Pulsar service url") + .withType(Type.STRING) + .withWidth(Width.LONG) + .withImportance(Importance.HIGH) + .withDescription("Pulsar service url") + .withValidation(Field::isOptional); + + public static final Field CLIENT_BUILDER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.client.builder") + .withDisplayName("Pulsar client builder") + .withType(Type.STRING) + .withWidth(Width.LONG) + .withImportance(Importance.HIGH) + .withDescription("Pulsar client builder") + .withValidation(Field::isOptional); + + public static final Field READER_CONFIG = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.reader.config") + .withDisplayName("Extra configs of the reader") + .withType(Type.STRING) + .withWidth(Width.LONG) + .withImportance(Importance.HIGH) + .withDescription("The configs of the reader for the database schema history topic, " + + "in the form of a JSON string with key-value pairs") + .withDefault((String) null) + .withValidation(Field::isOptional); + + public static final Field.Set ALL_FIELDS = Field.setOf( + TOPIC, + SERVICE_URL, + CLIENT_BUILDER, + SchemaHistory.NAME, + READER_CONFIG); + + private final ObjectMapper mapper = new ObjectMapper(); + private final DocumentReader reader = DocumentReader.defaultReader(); + private String topicName; + private Map readerConfigMap = new HashMap<>(); + private String dbHistoryName; + private ClientBuilder clientBuilder; + private volatile PulsarClient pulsarClient; + private volatile Producer producer; + + @Override + public void configure( + Configuration config, + HistoryRecordComparator comparator, + SchemaHistoryListener listener, + boolean useCatalogBeforeSchema) { + super.configure(config, comparator, listener, useCatalogBeforeSchema); + if (!config.validateAndRecord(ALL_FIELDS, logger::error)) { + throw new IllegalArgumentException("Error configuring an instance of " + + getClass().getSimpleName() + "; check the logs for details"); + } + this.topicName = config.getString(TOPIC); + try { + final String configString = config.getString(READER_CONFIG); + if (configString == null) { + this.readerConfigMap = Collections.emptyMap(); + } else { + this.readerConfigMap = mapper.readValue(configString, Map.class); + } + + } catch (JsonProcessingException exception) { + log.warn("The provided reader configs are invalid, " + + "will not passing any extra config to the reader builder.", exception); + } + + String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER); + if (isBlank(clientBuilderBase64Encoded) && isBlank(config.getString(SERVICE_URL))) { + throw new IllegalArgumentException("Neither Pulsar Service URL nor ClientBuilder provided."); + } + this.clientBuilder = PulsarClient.builder(); + if (!isBlank(clientBuilderBase64Encoded)) { + // deserialize the client builder to the same classloader + this.clientBuilder = (ClientBuilder) SerDeUtils.deserialize(clientBuilderBase64Encoded, + this.clientBuilder.getClass().getClassLoader()); + } else { + this.clientBuilder.serviceUrl(config.getString(SERVICE_URL)); + } + + // Copy the relevant portions of the configuration and add useful defaults ... + this.dbHistoryName = config.getString(SchemaHistory.NAME, UUID.randomUUID().toString()); + + log.info("Configure to store the debezium database history {} to pulsar topic {}", + dbHistoryName, topicName); + } + + @Override + public void initializeStorage() { + super.initializeStorage(); + + // try simple to publish an empty string to create topic + try (Producer p = pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) { + p.send(""); + } catch (PulsarClientException pce) { + log.error("Failed to initialize storage", pce); + throw new RuntimeException("Failed to initialize storage", pce); + } + } + + void setupClientIfNeeded() { + if (null == this.pulsarClient) { + try { + pulsarClient = clientBuilder.build(); + } catch (PulsarClientException e) { + throw new RuntimeException("Failed to create pulsar client to pulsar cluster", e); + } + } + } + + void setupProducerIfNeeded() { + setupClientIfNeeded(); + if (null == this.producer) { + try { + this.producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .producerName(dbHistoryName) + .blockIfQueueFull(true) + .create(); + } catch (PulsarClientException e) { + log.error("Failed to create pulsar producer to topic '{}'", topicName); + throw new RuntimeException("Failed to create pulsar producer to topic '" + + topicName, e); + } + } + } + + @Override + public void start() { + super.start(); + setupProducerIfNeeded(); + } + + @Override + protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { + if (this.producer == null) { + throw new IllegalStateException("No producer is available. Ensure that 'start()'" + + " is called before storing database history records."); + } + if (log.isTraceEnabled()) { + log.trace("Storing record into database history: {}", record); + } + try { + producer.send(record.toString()); + } catch (PulsarClientException e) { + throw new SchemaHistoryException(e); + } + } + + @Override + public void stop() { + try { + if (this.producer != null) { + try { + producer.flush(); + } catch (PulsarClientException pce) { + // ignore the error to ensure the client is eventually closed + } finally { + this.producer.close(); + } + this.producer = null; + } + if (this.pulsarClient != null) { + pulsarClient.close(); + this.pulsarClient = null; + } + } catch (PulsarClientException pe) { + log.warn("Failed to closing pulsar client", pe); + } + } + + @Override + protected void recoverRecords(Consumer records) { + setupClientIfNeeded(); + try (Reader historyReader = createHistoryReader()) { + log.info("Scanning the database history topic '{}'", topicName); + + // Read all messages in the topic ... + MessageId lastProcessedMessageId = null; + + // read the topic until the end + while (historyReader.hasMessageAvailable()) { + Message msg = historyReader.readNext(); + try { + if (null == lastProcessedMessageId || lastProcessedMessageId.compareTo(msg.getMessageId()) < 0) { + if (!isBlank(msg.getValue())) { + HistoryRecord recordObj = new HistoryRecord(reader.read(msg.getValue())); + if (log.isTraceEnabled()) { + log.trace("Recovering database history: {}", recordObj); + } + if (!recordObj.isValid()) { + log.warn("Skipping invalid database history record '{}'. This is often not an issue," + + " but if it happens repeatedly please check the '{}' topic.", + recordObj, topicName); + } else { + records.accept(recordObj); + log.trace("Recovered database history: {}", recordObj); + } + } + lastProcessedMessageId = msg.getMessageId(); + } + } catch (IOException ioe) { + log.error("Error while deserializing history record '{}'", msg.getValue(), ioe); + } catch (final Exception e) { + throw e; + } + } + log.info("Successfully completed scanning the database history topic '{}'", topicName); + } catch (IOException ioe) { + log.error("Encountered issues on recovering history records", ioe); + throw new RuntimeException("Encountered issues on recovering history records", ioe); + } + } + + @Override + public boolean exists() { + setupClientIfNeeded(); + try (Reader historyReader = createHistoryReader()) { + return historyReader.hasMessageAvailable(); + } catch (IOException e) { + log.error("Encountered issues on checking existence of database history", e); + throw new RuntimeException("Encountered issues on checking existence of database history", e); + } + } + + @Override + public boolean storageExists() { + return true; + } + + @Override + public String toString() { + if (topicName != null) { + return "Pulsar topic (" + topicName + ")"; + } + return "Pulsar topic"; + } + + @VisibleForTesting + Reader createHistoryReader() throws PulsarClientException { + return pulsarClient.newReader(Schema.STRING) + .topic(topicName) + .startMessageId(MessageId.earliest) + .loadConf(readerConfigMap) + .create(); + } +} diff --git a/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/SerDeUtils.java b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/SerDeUtils.java new file mode 100644 index 0000000000000..de7529fb8986e --- /dev/null +++ b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/SerDeUtils.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.debezium.oracle; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamClass; +import java.util.Base64; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class SerDeUtils { + public static Object deserialize(String objectBase64Encoded, ClassLoader classLoader) { + byte[] data = Base64.getDecoder().decode(objectBase64Encoded); + try (InputStream bai = new ByteArrayInputStream(data); + PulsarClientBuilderInputStream ois = new PulsarClientBuilderInputStream(bai, classLoader)) { + return ois.readObject(); + } catch (Exception e) { + throw new RuntimeException( + "Failed to initialize the pulsar client to store debezium database history", e); + } + } + + public static String serialize(Object obj) throws Exception { + try (ByteArrayOutputStream bao = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bao)) { + oos.writeObject(obj); + oos.flush(); + byte[] data = bao.toByteArray(); + return Base64.getEncoder().encodeToString(data); + } + } + + static class PulsarClientBuilderInputStream extends ObjectInputStream { + private final ClassLoader classLoader; + public PulsarClientBuilderInputStream(InputStream in, ClassLoader ldr) throws IOException { + super(in); + this.classLoader = ldr; + } + + protected Class resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { + try { + return Class.forName(desc.getName(), true, classLoader); + } catch (Exception ex) { + log.warn("PulsarClientBuilderInputStream resolveClass failed {} {}", desc.getName(), ex); + } + return super.resolveClass(desc); + } + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java index 40078d67365ca..6dd38eabc8fc7 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java @@ -62,10 +62,12 @@ public DebeziumOracleDbSourceTester(PulsarCluster cluster) { sourceConfig.put("database.password", "dbz"); sourceConfig.put("database.server.name", "XE"); sourceConfig.put("database.dbname", "XE"); + sourceConfig.put("topic.prefix", "XE"); sourceConfig.put("snapshot.mode", "schema_only"); sourceConfig.put("schema.include.list", "inv"); - sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl); + sourceConfig.put("schema.history.internal", "org.apache.pulsar.io.debezium.oracle.PulsarDatabaseHistory"); + sourceConfig.put("schema.history.internal.pulsar.service.url", pulsarServiceUrl); sourceConfig.put("topic.namespace", "debezium/oracle"); }