Skip to content

Commit

Permalink
[fix][io] Upgrade debezium oracle to 2.5.4.Final (#279)
Browse files Browse the repository at this point in the history
* Revert "Upgrade debezium version to 2.5.4.Final"

This reverts commit 1e4ffcc.

* Revert "Upgrade debezium to 2.3.5.Final"

This reverts commit c7d8ed6.

* Added debezium-core to debezium-oracle

* Move Debezium-core to oracle to use 2.5.4.Final version

* Update Imports

* Remove pulsar-io-debezium-core dependency

* Fix Dependencies

* Fix Checkstyle

* Added Oracle JDBC driver for 12c

* Revert "Added Oracle JDBC driver for 12c"

This reverts commit 8260508.

* Updated configs for upgraded debezium version

---------

Co-authored-by: mukesh-ctds <[email protected]>
  • Loading branch information
nikhil-ctds and mukesh154 authored May 30, 2024
1 parent 0a92854 commit 56424a1
Show file tree
Hide file tree
Showing 10 changed files with 567 additions and 28 deletions.
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ flexible messaging model and an intuitive client API.</description>
<presto.version>334</presto.version>
<scala.binary.version>2.13</scala.binary.version>
<scala-library.version>2.13.10</scala-library.version>
<debezium.version>2.5.4.Final</debezium.version>
<debezium.version>1.9.7.Final</debezium.version>
<debezium.oracle.version>2.5.4.Final</debezium.oracle.version>
<debezium.postgresql.version>42.5.0</debezium.postgresql.version>
<debezium.mysql.version>8.0.30</debezium.mysql.version>
<!-- Override version that brings CVE-2022-3143 with debezium -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
setConfigIfNull(config, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);

// database.history : implementation class for database history.
setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY.name(), DEFAULT_HISTORY);
setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY);

// database.history.pulsar.service.url
String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -52,12 +52,12 @@
import org.apache.pulsar.client.api.Schema;

/**
* A {@link SchemaHistory} implementation that records schema changes as normal pulsar messages on the specified
* A {@link DatabaseHistory} implementation that records schema changes as normal pulsar messages on the specified
* topic, and that recovers the history by establishing a Kafka Consumer re-processing all messages on that topic.
*/
@Slf4j
@ThreadSafe
public final class PulsarDatabaseHistory extends AbstractSchemaHistory {
public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {

public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic")
.withDisplayName("Database history topic name")
Expand Down Expand Up @@ -97,7 +97,7 @@ public final class PulsarDatabaseHistory extends AbstractSchemaHistory {
TOPIC,
SERVICE_URL,
CLIENT_BUILDER,
SchemaHistory.NAME,
DatabaseHistory.NAME,
READER_CONFIG);

private final ObjectMapper mapper = new ObjectMapper();
Expand All @@ -113,7 +113,7 @@ public final class PulsarDatabaseHistory extends AbstractSchemaHistory {
public void configure(
Configuration config,
HistoryRecordComparator comparator,
SchemaHistoryListener listener,
DatabaseHistoryListener listener,
boolean useCatalogBeforeSchema) {
super.configure(config, comparator, listener, useCatalogBeforeSchema);
if (!config.validateAndRecord(ALL_FIELDS, logger::error)) {
Expand Down Expand Up @@ -148,7 +148,7 @@ public void configure(
}

// Copy the relevant portions of the configuration and add useful defaults ...
this.dbHistoryName = config.getString(SchemaHistory.NAME, UUID.randomUUID().toString());
this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());

log.info("Configure to store the debezium database history {} to pulsar topic {}",
dbHistoryName, topicName);
Expand Down Expand Up @@ -201,7 +201,7 @@ public void start() {
}

@Override
protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
if (this.producer == null) {
throw new IllegalStateException("No producer is available. Ensure that 'start()'"
+ " is called before storing database history records.");
Expand All @@ -212,7 +212,7 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
try {
producer.send(record.toString());
} catch (PulsarClientException e) {
throw new SchemaHistoryException(e);
throw new DatabaseHistoryException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.text.ParsingException;
import io.debezium.util.Collect;

Expand Down Expand Up @@ -80,8 +80,8 @@ protected void cleanup() throws Exception {
private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder, boolean testWithReaderConfig) throws Exception {
Configuration.Builder configBuidler = Configuration.create()
.with(PulsarDatabaseHistory.TOPIC, topicName)
.with(SchemaHistory.NAME, "my-db-history")
.with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL);
.with(DatabaseHistory.NAME, "my-db-history")
.with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL);

if (testWithClientBuilder) {
ClientBuilder builder = PulsarClient.builder().serviceUrl(brokerUrl.toString());
Expand All @@ -101,7 +101,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWit
}

// Start up the history ...
history.configure(configBuidler.build(), null, SchemaHistoryListener.NOOP, true);
history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true);
history.start();

// Should be able to call start more than once ...
Expand Down Expand Up @@ -160,7 +160,7 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWit
// Stop the history (which should stop the producer) ...
history.stop();
history = new PulsarDatabaseHistory();
history.configure(configBuidler.build(), null, SchemaHistoryListener.NOOP, true);
history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true);
// no need to start

// Recover from the very beginning to just past the first change ...
Expand Down Expand Up @@ -240,11 +240,11 @@ public void testExists() throws Exception {
Configuration config = Configuration.create()
.with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString())
.with(PulsarDatabaseHistory.TOPIC, "persistent://my-property/my-ns/dummytopic")
.with(SchemaHistory.NAME, "my-db-history")
.with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
.with(DatabaseHistory.NAME, "my-db-history")
.with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
.build();

history.configure(config, null, SchemaHistoryListener.NOOP, true);
history.configure(config, null, DatabaseHistoryListener.NOOP, true);
history.start();

// dummytopic should not exist yet
Expand Down
41 changes: 37 additions & 4 deletions pulsar-io/debezium/oracle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,49 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
<version>${debezium.oracle.version}</version>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${debezium.oracle.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-debezium-core</artifactId>
<artifactId>pulsar-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
<version>${debezium.version}</version>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-kafka-connect-adaptor</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<version>${kafka-client.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
</exclusion>
<exclusion>
<artifactId>jose4j</artifactId>
<groupId>org.bitbucket.b_c</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.Map;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.pulsar.io.debezium.DebeziumSource;


/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.debezium.oracle;

import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.connect.KafkaConnectSource;
import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;

@Slf4j
public abstract class DebeziumSource extends KafkaConnectSource {
private static final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
private static final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.oracle.PulsarDatabaseHistory";
private static final String DEFAULT_OFFSET_TOPIC = "debezium-offset-topic";
private static final String DEFAULT_HISTORY_TOPIC = "debezium-history-topic";

public static void throwExceptionIfConfigNotMatch(Map<String, Object> config,
String key,
String value) throws IllegalArgumentException {
Object orig = config.get(key);
if (orig == null) {
config.put(key, value);
return;
}

// throw exception if value not match
if (!orig.equals(value)) {
throw new IllegalArgumentException("Expected " + value + " but has " + orig);
}
}

public static void setConfigIfNull(Map<String, Object> config, String key, String value) {
config.putIfAbsent(key, value);
}

// namespace for output topics, default value is "tenant/namespace"
public static String topicNamespace(SourceContext sourceContext) {
String tenant = sourceContext.getTenant();
String namespace = sourceContext.getNamespace();

return (StringUtils.isEmpty(tenant) ? TopicName.PUBLIC_TENANT : tenant) + "/"
+ (StringUtils.isEmpty(namespace) ? TopicName.DEFAULT_NAMESPACE : namespace);
}

public static void tryLoadingConfigSecret(String secretName, Map<String, Object> config, SourceContext context) {
try {
String secret = context.getSecret(secretName);
if (secret != null) {
config.put(secretName, secret);
log.info("Config key {} set from secret.", secretName);
}
} catch (Exception e) {
log.warn("Failed to read secret {}.", secretName, e);
}
}

public abstract void setDbConnectorTask(Map<String, Object> config) throws Exception;

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
setDbConnectorTask(config);
tryLoadingConfigSecret("database.user", config, sourceContext);
tryLoadingConfigSecret("database.password", config, sourceContext);

// key.converter
setConfigIfNull(config, PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
// value.converter
setConfigIfNull(config, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);

// database.history : implementation class for database history.
setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY.name(), DEFAULT_HISTORY);

// database.history.pulsar.service.url
String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name());

String topicNamespace = topicNamespace(sourceContext);
// topic.namespace
setConfigIfNull(config, PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, topicNamespace);

String sourceName = sourceContext.getSourceName();
// database.history.pulsar.topic: history topic name
setConfigIfNull(config, PulsarDatabaseHistory.TOPIC.name(),
topicNamespace + "/" + sourceName + "-" + DEFAULT_HISTORY_TOPIC);
// offset.storage.topic: offset topic name
setConfigIfNull(config, PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC);

// pass pulsar.client.builder if database.history.pulsar.service.url is not provided
if (StringUtils.isEmpty(pulsarUrl)) {
String pulsarClientBuilder = SerDeUtils.serialize(sourceContext.getPulsarClientBuilder());
config.put(PulsarDatabaseHistory.CLIENT_BUILDER.name(), pulsarClientBuilder);
}

super.open(config, sourceContext);
}

}
Loading

0 comments on commit 56424a1

Please sign in to comment.