From 8f7c383aebfaa42bdf000d9e3ec40bc572a48a50 Mon Sep 17 00:00:00 2001 From: ashwinmk Date: Wed, 14 Sep 2022 20:46:31 +0530 Subject: [PATCH 1/5] DBZ-4812 z/OS implementation on Debezium 1.7 --- .../connector/db2/Db2ChangeTable.java | 15 +-- .../debezium/connector/db2/Db2Connection.java | 115 ++++++++++++++---- .../db2/Db2StreamingChangeEventSource.java | 7 +- .../connector/db2/util/TestHelper.java | 4 + 4 files changed, 104 insertions(+), 37 deletions(-) diff --git a/src/main/java/io/debezium/connector/db2/Db2ChangeTable.java b/src/main/java/io/debezium/connector/db2/Db2ChangeTable.java index 9c54196..a3f8ced 100644 --- a/src/main/java/io/debezium/connector/db2/Db2ChangeTable.java +++ b/src/main/java/io/debezium/connector/db2/Db2ChangeTable.java @@ -18,7 +18,7 @@ */ public class Db2ChangeTable extends ChangeTable { - private static final String CDC_SCHEMA = "ASNCDC"; + private final String CDC_SCHEMA; /** * A LSN from which the data in the change table are relevant @@ -35,15 +35,16 @@ public class Db2ChangeTable extends ChangeTable { */ private final String db2CaptureInstance; - public Db2ChangeTable(TableId sourceTableId, String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn) { - super(captureInstance, sourceTableId, resolveChangeTableId(sourceTableId, captureInstance), changeTableObjectId); + public Db2ChangeTable(TableId sourceTableId, String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn, String tableCdcSchema) { + super(captureInstance, sourceTableId, resolveChangeTableId(sourceTableId, captureInstance, tableCdcSchema), changeTableObjectId); this.startLsn = startLsn; this.stopLsn = stopLsn; this.db2CaptureInstance = Db2ObjectNameQuoter.quoteNameIfNecessary(captureInstance); + this.CDC_SCHEMA = tableCdcSchema; } - public Db2ChangeTable(String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn) { - this(null, captureInstance, changeTableObjectId, startLsn, stopLsn); + public Db2ChangeTable(String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn, String tableCdcSchema) { + this(null, captureInstance, changeTableObjectId, startLsn, stopLsn, tableCdcSchema); } public String getCaptureInstance() { @@ -69,7 +70,7 @@ public String toString() { + getChangeTableObjectId() + ", stopLsn=" + stopLsn + "]"; } - private static TableId resolveChangeTableId(TableId sourceTableId, String captureInstance) { - return sourceTableId != null ? new TableId(sourceTableId.catalog(), CDC_SCHEMA, Db2ObjectNameQuoter.quoteNameIfNecessary(captureInstance)) : null; + private static TableId resolveChangeTableId(TableId sourceTableId, String captureInstance, String cdcSchema) { + return sourceTableId != null ? new TableId(sourceTableId.catalog(), cdcSchema, Db2ObjectNameQuoter.quoteNameIfNecessary(captureInstance)) : null; } } diff --git a/src/main/java/io/debezium/connector/db2/Db2Connection.java b/src/main/java/io/debezium/connector/db2/Db2Connection.java index 3e598c8..a7065c3 100644 --- a/src/main/java/io/debezium/connector/db2/Db2Connection.java +++ b/src/main/java/io/debezium/connector/db2/Db2Connection.java @@ -52,38 +52,20 @@ public class Db2Connection extends JdbcConnection { private static Logger LOGGER = LoggerFactory.getLogger(Db2Connection.class); - private static final String CDC_SCHEMA = "ASNCDC"; + private final String CDC_SCHEMA; + private final String TABLE_CDC_SCHEMA; + private final String DB_TYPE; private static final String STATEMENTS_PLACEHOLDER = "#"; - private static final String GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + CDC_SCHEMA - + ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + CDC_SCHEMA + ".IBMSNAP_REGISTER) t"; + private final String GET_MAX_LSN; private static final String LOCK_TABLE = "SELECT * FROM # WITH CS"; // DB2 private static final String LSN_TO_TIMESTAMP = "SELECT CURRENT TIMEstamp FROM sysibm.sysdummy1 WHERE ? > X'00000000000000000000000000000000'"; - private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT " - + "CASE " - + "WHEN IBMSNAP_OPERATION = 'D' AND (LEAD(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='I' THEN 3 " - + "WHEN IBMSNAP_OPERATION = 'I' AND (LAG(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='D' THEN 4 " - + "WHEN IBMSNAP_OPERATION = 'D' THEN 1 " - + "WHEN IBMSNAP_OPERATION = 'I' THEN 2 " - + "END " - + "OPCODE," - + "cdc.* " - + "FROM ASNCDC.# cdc WHERE IBMSNAP_COMMITSEQ >= ? AND IBMSNAP_COMMITSEQ <= ? " - + "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ"; - - private static final String GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.TBSPACEID, t.TABLEID , CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER )from " - + CDC_SCHEMA + ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME WHERE r.SOURCE_OWNER <> ''"; - - // No new Tabels 1=0 - private static final String GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) AS OBJECTID, " + - " CD_OWNER CONCAT '.' CONCAT CD_TABLE, " + - " CD_NEW_SYNCHPOINT, " + - " CD_OLD_SYNCHPOINT " + - "from ASNCDC.IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME " + - "WHERE r.SOURCE_OWNER <> '' AND CD_NEW_SYNCHPOINT > ? AND (CD_OLD_SYNCHPOINT < ? OR CD_OLD_SYNCHPOINT IS NULL)"; + private final String GET_ALL_CHANGES_FOR_TABLE; + private final String GET_LIST_OF_CDC_ENABLED_TABLES; + private final String GET_LIST_OF_NEW_CDC_ENABLED_TABLES; private static final String GET_LIST_OF_KEY_COLUMNS = "SELECT " + "CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) as objectid, " @@ -119,6 +101,83 @@ public Db2Connection(JdbcConfiguration config) { super(config, FACTORY, QUOTED_CHARACTER, QUOTED_CHARACTER); lsnToInstantCache = new BoundedConcurrentHashMap<>(100); realDatabaseName = retrieveRealDatabaseName(); + this.CDC_SCHEMA = config.getString("custom.cdc.program.schema"); + this.TABLE_CDC_SCHEMA = config.getString("custom.cdc.table.schema"); + + this.DB_TYPE = config.getString("custom.db.type"); + + LOGGER.info("==========================================================================================================================================="); + LOGGER.info("CDC_SCHEMA: {}", this.CDC_SCHEMA); + LOGGER.info("TABLE_CDC_SCHEMA: {}", this.TABLE_CDC_SCHEMA); + LOGGER.info("DB_TYPE: {}", this.DB_TYPE); + LOGGER.info("==========================================================================================================================================="); + + if ("ZOS".equals(this.DB_TYPE)) { + LOGGER.info("ZOS choice"); + + this.GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + this.CDC_SCHEMA + + ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + this.CDC_SCHEMA + ".IBMSNAP_REGISTER) t for read only with ur"; + + this.GET_ALL_CHANGES_FOR_TABLE = "WITH tmp AS (SELECT cdc.IBMSNAP_OPERATION, cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, " + + "ROW_NUMBER() OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ) rn FROM " + + this.TABLE_CDC_SCHEMA + ".# cdc WHERE cdc.IBMSNAP_COMMITSEQ >= ? AND cdc.IBMSNAP_COMMITSEQ <= ? " + + " order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ), " + + " tmp2 AS (SELECT " + + " CASE " + + " WHEN cdc.IBMSNAP_OPERATION = 'D' AND cdc2.IBMSNAP_OPERATION ='I' THEN 3 " + + " WHEN cdc.IBMSNAP_OPERATION = 'I' AND cdc2.IBMSNAP_OPERATION ='D' THEN 4 " + + " WHEN cdc.IBMSNAP_OPERATION = 'D' THEN 1 " + + " WHEN cdc.IBMSNAP_OPERATION = 'I' THEN 2 " + + " END " + + " OPCODE, " + + " cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, cdc.IBMSNAP_OPERATION " + + " FROM tmp cdc left JOIN tmp cdc2 " + + " ON cdc.IBMSNAP_COMMITSEQ = cdc2.IBMSNAP_COMMITSEQ AND " + + " ((cdc.IBMSNAP_OPERATION = 'D' AND cdc.rn = cdc2.rn - 1) " + + " OR (cdc.IBMSNAP_OPERATION = 'I' AND cdc.rn = cdc2.rn + 1))) " + + " select res.OPCODE, cdc.* from " + this.TABLE_CDC_SCHEMA + + ".# cdc inner join tmp2 res on cdc.IBMSNAP_COMMITSEQ=res.IBMSNAP_COMMITSEQ and cdc.IBMSNAP_INTENTSEQ=res.IBMSNAP_INTENTSEQ " + + "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ"; + + this.GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.DBID, t.OBID , CAST((t.DBID * 65536 + t.OBID )AS INTEGER )from " + + this.CDC_SCHEMA + + ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME WHERE r.SOURCE_OWNER <> '' for read only with ur"; + + this.GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.DBID * 65536 + t.OBID )AS INTEGER ) AS OBJECTID, " + + " CD_OWNER CONCAT '.' CONCAT CD_TABLE, " + + " CD_NEW_SYNCHPOINT, " + + " CD_OLD_SYNCHPOINT " + + "from " + this.CDC_SCHEMA + ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME " + + "WHERE r.SOURCE_OWNER <> '' AND 1=0 AND CD_NEW_SYNCHPOINT > ? AND CD_OLD_SYNCHPOINT < ? for read only with ur"; + + } + else { + LOGGER.info("LUW choice"); + this.GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + this.CDC_SCHEMA + + ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + this.CDC_SCHEMA + ".IBMSNAP_REGISTER) t"; + this.GET_ALL_CHANGES_FOR_TABLE = "SELECT " + + "CASE " + + "WHEN IBMSNAP_OPERATION = 'D' AND (LEAD(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='I' THEN 3 " + + "WHEN IBMSNAP_OPERATION = 'I' AND (LAG(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='D' THEN 4 " + + "WHEN IBMSNAP_OPERATION = 'D' THEN 1 " + + "WHEN IBMSNAP_OPERATION = 'I' THEN 2 " + + "END " + + "OPCODE," + + "cdc.* " + + "FROM " + this.TABLE_CDC_SCHEMA + ".# cdc WHERE IBMSNAP_COMMITSEQ >= ? AND IBMSNAP_COMMITSEQ <= ? " + + "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ"; + this.GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.TBSPACEID, t.TABLEID , CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER )from " + + CDC_SCHEMA + + ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME WHERE r.SOURCE_OWNER <> ''"; + + // No new Tables 1=0 + this.GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) AS OBJECTID, " + + " CD_OWNER CONCAT '.' CONCAT CD_TABLE, " + + " CD_NEW_SYNCHPOINT, " + + " CD_OLD_SYNCHPOINT " + + "from " + this.CDC_SCHEMA + ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME " + + "WHERE r.SOURCE_OWNER <> '' AND CD_NEW_SYNCHPOINT > ? AND (CD_OLD_SYNCHPOINT < ? OR CD_OLD_SYNCHPOINT IS NULL)"; + } } /** @@ -293,7 +352,8 @@ public Set listOfChangeTables() throws SQLException { rs.getString(4), rs.getInt(9), Lsn.valueOf(rs.getBytes(5)), - Lsn.valueOf(rs.getBytes(6)) + Lsn.valueOf(rs.getBytes(6)), + this.TABLE_CDC_SCHEMA )); } @@ -316,7 +376,8 @@ public Set listOfNewChangeTables(Lsn fromLsn, Lsn toLsn) throws rs.getString(2), rs.getInt(1), Lsn.valueOf(rs.getBytes(3)), - Lsn.valueOf(rs.getBytes(4)))); + Lsn.valueOf(rs.getBytes(4)), + this.TABLE_CDC_SCHEMA)); } return changeTables; }); diff --git a/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java b/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java index 25465ac..e98e8c2 100644 --- a/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java +++ b/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java @@ -216,10 +216,11 @@ public void execute(ChangeEventSourceContext context, Db2Partition partition, Db } if (tableWithSmallestLsn.getChangeTable().getStopLsn().isAvailable() && tableWithSmallestLsn.getChangeTable().getStopLsn().compareTo(tableWithSmallestLsn.getChangePosition().getCommitLsn()) <= 0) { - LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", tableWithSmallestLsn, + LOGGER.warn( + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!WARNING!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); + LOGGER.warn("PAY ATTENTION: Table change {} , its stop LSN is smaller than the last recorded LSN {}", tableWithSmallestLsn, tableWithSmallestLsn.getChangePosition()); - tableWithSmallestLsn.next(); - continue; + } LOGGER.trace("Processing change {}", tableWithSmallestLsn); if (!schemaChangeCheckpoints.isEmpty()) { diff --git a/src/test/java/io/debezium/connector/db2/util/TestHelper.java b/src/test/java/io/debezium/connector/db2/util/TestHelper.java index 3046968..32f899f 100644 --- a/src/test/java/io/debezium/connector/db2/util/TestHelper.java +++ b/src/test/java/io/debezium/connector/db2/util/TestHelper.java @@ -76,6 +76,8 @@ public static JdbcConfiguration adminJdbcConfig() { .withDefault(JdbcConfiguration.PORT, 50000) .withDefault(JdbcConfiguration.USER, "db2inst1") .withDefault(JdbcConfiguration.PASSWORD, "admin") + .withDefault("custom.cdc.program.schema", "ASNCDC") + .withDefault("custom.cdc.table.schema", "ASNCDC") .build(); } @@ -86,6 +88,8 @@ public static JdbcConfiguration defaultJdbcConfig() { .withDefault(JdbcConfiguration.PORT, 50000) .withDefault(JdbcConfiguration.USER, "db2inst1") .withDefault(JdbcConfiguration.PASSWORD, "admin") + .withDefault("custom.cdc.program.schema", "ASNCDC") + .withDefault("custom.cdc.table.schema", "ASNCDC") .build(); } From 9bef76a5d93f4b0c8da4a4df6d4984373ec57d15 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Wed, 5 Jun 2024 06:35:33 +0200 Subject: [PATCH 2/5] DBZ-4812 Remove unnecessary attribute --- src/main/java/io/debezium/connector/db2/Db2ChangeTable.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/java/io/debezium/connector/db2/Db2ChangeTable.java b/src/main/java/io/debezium/connector/db2/Db2ChangeTable.java index a3f8ced..33429e8 100644 --- a/src/main/java/io/debezium/connector/db2/Db2ChangeTable.java +++ b/src/main/java/io/debezium/connector/db2/Db2ChangeTable.java @@ -18,8 +18,6 @@ */ public class Db2ChangeTable extends ChangeTable { - private final String CDC_SCHEMA; - /** * A LSN from which the data in the change table are relevant */ @@ -40,7 +38,6 @@ public Db2ChangeTable(TableId sourceTableId, String captureInstance, int changeT this.startLsn = startLsn; this.stopLsn = stopLsn; this.db2CaptureInstance = Db2ObjectNameQuoter.quoteNameIfNecessary(captureInstance); - this.CDC_SCHEMA = tableCdcSchema; } public Db2ChangeTable(String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn, String tableCdcSchema) { From 6bb6c489a86fadcf1fed03f640abedb86506750c Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Wed, 5 Jun 2024 06:37:27 +0200 Subject: [PATCH 3/5] DBZ-4812 Reduce log level --- .../connector/db2/Db2StreamingChangeEventSource.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java b/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java index e98e8c2..25465ac 100644 --- a/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java +++ b/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java @@ -216,11 +216,10 @@ public void execute(ChangeEventSourceContext context, Db2Partition partition, Db } if (tableWithSmallestLsn.getChangeTable().getStopLsn().isAvailable() && tableWithSmallestLsn.getChangeTable().getStopLsn().compareTo(tableWithSmallestLsn.getChangePosition().getCommitLsn()) <= 0) { - LOGGER.warn( - "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!WARNING!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); - LOGGER.warn("PAY ATTENTION: Table change {} , its stop LSN is smaller than the last recorded LSN {}", tableWithSmallestLsn, + LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", tableWithSmallestLsn, tableWithSmallestLsn.getChangePosition()); - + tableWithSmallestLsn.next(); + continue; } LOGGER.trace("Processing change {}", tableWithSmallestLsn); if (!schemaChangeCheckpoints.isEmpty()) { From a05714b9b0f050586ebe49239052a9a528c36011 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Wed, 1 Nov 2023 14:41:15 +0100 Subject: [PATCH 4/5] DBZ-4812 Add customization config options --- .../debezium/connector/db2/Db2Connection.java | 165 +++++++++--------- .../debezium/connector/db2/Db2Connector.java | 4 +- .../connector/db2/Db2ConnectorConfig.java | 129 +++++++++++++- .../connector/db2/Db2ConnectorTask.java | 2 +- .../connector/db2/util/TestHelper.java | 8 +- 5 files changed, 214 insertions(+), 94 deletions(-) diff --git a/src/main/java/io/debezium/connector/db2/Db2Connection.java b/src/main/java/io/debezium/connector/db2/Db2Connection.java index a7065c3..2605390 100644 --- a/src/main/java/io/debezium/connector/db2/Db2Connection.java +++ b/src/main/java/io/debezium/connector/db2/Db2Connection.java @@ -52,17 +52,13 @@ public class Db2Connection extends JdbcConnection { private static Logger LOGGER = LoggerFactory.getLogger(Db2Connection.class); - private final String CDC_SCHEMA; - private final String TABLE_CDC_SCHEMA; - private final String DB_TYPE; - private static final String STATEMENTS_PLACEHOLDER = "#"; - private final String GET_MAX_LSN; private static final String LOCK_TABLE = "SELECT * FROM # WITH CS"; // DB2 private static final String LSN_TO_TIMESTAMP = "SELECT CURRENT TIMEstamp FROM sysibm.sysdummy1 WHERE ? > X'00000000000000000000000000000000'"; + private final String GET_MAX_LSN; private final String GET_ALL_CHANGES_FOR_TABLE; private final String GET_LIST_OF_CDC_ENABLED_TABLES; private final String GET_LIST_OF_NEW_CDC_ENABLED_TABLES; @@ -92,91 +88,96 @@ public class Db2Connection extends JdbcConnection { private final BoundedConcurrentHashMap lsnToInstantCache; + private final Db2ConnectorConfig connectorConfig; + /** * Creates a new connection using the supplied configuration. * * @param config {@link Configuration} instance, may not be null. */ - public Db2Connection(JdbcConfiguration config) { - super(config, FACTORY, QUOTED_CHARACTER, QUOTED_CHARACTER); + public Db2Connection(Db2ConnectorConfig config) { + super(config.getJdbcConfig(), FACTORY, QUOTED_CHARACTER, QUOTED_CHARACTER); + connectorConfig = config; lsnToInstantCache = new BoundedConcurrentHashMap<>(100); realDatabaseName = retrieveRealDatabaseName(); - this.CDC_SCHEMA = config.getString("custom.cdc.program.schema"); - this.TABLE_CDC_SCHEMA = config.getString("custom.cdc.table.schema"); - - this.DB_TYPE = config.getString("custom.db.type"); LOGGER.info("==========================================================================================================================================="); - LOGGER.info("CDC_SCHEMA: {}", this.CDC_SCHEMA); - LOGGER.info("TABLE_CDC_SCHEMA: {}", this.TABLE_CDC_SCHEMA); - LOGGER.info("DB_TYPE: {}", this.DB_TYPE); + LOGGER.info("CDC_SCHEMA: {}", connectorConfig.getCdcControlSchema()); + LOGGER.info("TABLE_CDC_SCHEMA: {}", connectorConfig.getCdcChangeTablesSchema()); + LOGGER.info("DB_TYPE: {}", connectorConfig.getDb2Platform()); LOGGER.info("==========================================================================================================================================="); - if ("ZOS".equals(this.DB_TYPE)) { - LOGGER.info("ZOS choice"); - - this.GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + this.CDC_SCHEMA - + ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + this.CDC_SCHEMA + ".IBMSNAP_REGISTER) t for read only with ur"; - - this.GET_ALL_CHANGES_FOR_TABLE = "WITH tmp AS (SELECT cdc.IBMSNAP_OPERATION, cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, " + - "ROW_NUMBER() OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ) rn FROM " - + this.TABLE_CDC_SCHEMA + ".# cdc WHERE cdc.IBMSNAP_COMMITSEQ >= ? AND cdc.IBMSNAP_COMMITSEQ <= ? " + - " order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ), " + - " tmp2 AS (SELECT " + - " CASE " + - " WHEN cdc.IBMSNAP_OPERATION = 'D' AND cdc2.IBMSNAP_OPERATION ='I' THEN 3 " + - " WHEN cdc.IBMSNAP_OPERATION = 'I' AND cdc2.IBMSNAP_OPERATION ='D' THEN 4 " + - " WHEN cdc.IBMSNAP_OPERATION = 'D' THEN 1 " + - " WHEN cdc.IBMSNAP_OPERATION = 'I' THEN 2 " + - " END " + - " OPCODE, " + - " cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, cdc.IBMSNAP_OPERATION " + - " FROM tmp cdc left JOIN tmp cdc2 " + - " ON cdc.IBMSNAP_COMMITSEQ = cdc2.IBMSNAP_COMMITSEQ AND " + - " ((cdc.IBMSNAP_OPERATION = 'D' AND cdc.rn = cdc2.rn - 1) " + - " OR (cdc.IBMSNAP_OPERATION = 'I' AND cdc.rn = cdc2.rn + 1))) " + - " select res.OPCODE, cdc.* from " + this.TABLE_CDC_SCHEMA - + ".# cdc inner join tmp2 res on cdc.IBMSNAP_COMMITSEQ=res.IBMSNAP_COMMITSEQ and cdc.IBMSNAP_INTENTSEQ=res.IBMSNAP_INTENTSEQ " - + "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ"; - - this.GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.DBID, t.OBID , CAST((t.DBID * 65536 + t.OBID )AS INTEGER )from " - + this.CDC_SCHEMA - + ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME WHERE r.SOURCE_OWNER <> '' for read only with ur"; - - this.GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.DBID * 65536 + t.OBID )AS INTEGER ) AS OBJECTID, " + - " CD_OWNER CONCAT '.' CONCAT CD_TABLE, " + - " CD_NEW_SYNCHPOINT, " + - " CD_OLD_SYNCHPOINT " + - "from " + this.CDC_SCHEMA + ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME " + - "WHERE r.SOURCE_OWNER <> '' AND 1=0 AND CD_NEW_SYNCHPOINT > ? AND CD_OLD_SYNCHPOINT < ? for read only with ur"; - - } - else { - LOGGER.info("LUW choice"); - this.GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + this.CDC_SCHEMA - + ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + this.CDC_SCHEMA + ".IBMSNAP_REGISTER) t"; - this.GET_ALL_CHANGES_FOR_TABLE = "SELECT " - + "CASE " - + "WHEN IBMSNAP_OPERATION = 'D' AND (LEAD(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='I' THEN 3 " - + "WHEN IBMSNAP_OPERATION = 'I' AND (LAG(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='D' THEN 4 " - + "WHEN IBMSNAP_OPERATION = 'D' THEN 1 " - + "WHEN IBMSNAP_OPERATION = 'I' THEN 2 " - + "END " - + "OPCODE," - + "cdc.* " - + "FROM " + this.TABLE_CDC_SCHEMA + ".# cdc WHERE IBMSNAP_COMMITSEQ >= ? AND IBMSNAP_COMMITSEQ <= ? " - + "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ"; - this.GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.TBSPACEID, t.TABLEID , CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER )from " - + CDC_SCHEMA - + ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME WHERE r.SOURCE_OWNER <> ''"; - - // No new Tables 1=0 - this.GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) AS OBJECTID, " + - " CD_OWNER CONCAT '.' CONCAT CD_TABLE, " + - " CD_NEW_SYNCHPOINT, " + - " CD_OLD_SYNCHPOINT " + - "from " + this.CDC_SCHEMA + ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME " + - "WHERE r.SOURCE_OWNER <> '' AND CD_NEW_SYNCHPOINT > ? AND (CD_OLD_SYNCHPOINT < ? OR CD_OLD_SYNCHPOINT IS NULL)"; + switch (connectorConfig.getDb2Platform()) { + case Z: + LOGGER.info("ZOS choice"); + + this.GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER) t for read only with ur"; + + this.GET_ALL_CHANGES_FOR_TABLE = "WITH tmp AS (SELECT cdc.IBMSNAP_OPERATION, cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, " + + "ROW_NUMBER() OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ) rn FROM " + + connectorConfig.getCdcChangeTablesSchema() + ".# cdc WHERE cdc.IBMSNAP_COMMITSEQ >= ? AND cdc.IBMSNAP_COMMITSEQ <= ? " + + " order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ), " + + " tmp2 AS (SELECT " + + " CASE " + + " WHEN cdc.IBMSNAP_OPERATION = 'D' AND cdc2.IBMSNAP_OPERATION ='I' THEN 3 " + + " WHEN cdc.IBMSNAP_OPERATION = 'I' AND cdc2.IBMSNAP_OPERATION ='D' THEN 4 " + + " WHEN cdc.IBMSNAP_OPERATION = 'D' THEN 1 " + + " WHEN cdc.IBMSNAP_OPERATION = 'I' THEN 2 " + + " END " + + " OPCODE, " + + " cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, cdc.IBMSNAP_OPERATION " + + " FROM tmp cdc left JOIN tmp cdc2 " + + " ON cdc.IBMSNAP_COMMITSEQ = cdc2.IBMSNAP_COMMITSEQ AND " + + " ((cdc.IBMSNAP_OPERATION = 'D' AND cdc.rn = cdc2.rn - 1) " + + " OR (cdc.IBMSNAP_OPERATION = 'I' AND cdc.rn = cdc2.rn + 1))) " + + " select res.OPCODE, cdc.* from " + connectorConfig.getCdcChangeTablesSchema() + + ".# cdc inner join tmp2 res on cdc.IBMSNAP_COMMITSEQ=res.IBMSNAP_COMMITSEQ and cdc.IBMSNAP_INTENTSEQ=res.IBMSNAP_INTENTSEQ " + + "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ"; + + this.GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.DBID, t.OBID , CAST((t.DBID * 65536 + t.OBID )AS INTEGER )from " + + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME WHERE r.SOURCE_OWNER <> '' for read only with ur"; + + this.GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.DBID * 65536 + t.OBID )AS INTEGER ) AS OBJECTID, " + + " CD_OWNER CONCAT '.' CONCAT CD_TABLE, " + + " CD_NEW_SYNCHPOINT, " + + " CD_OLD_SYNCHPOINT " + + "from " + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME " + + "WHERE r.SOURCE_OWNER <> '' AND 1=0 AND CD_NEW_SYNCHPOINT > ? AND CD_OLD_SYNCHPOINT < ? for read only with ur"; + break; + case LUW: + LOGGER.info("LUW choice"); + this.GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() + ".IBMSNAP_REGISTER) t"; + this.GET_ALL_CHANGES_FOR_TABLE = "SELECT " + + "CASE " + + "WHEN IBMSNAP_OPERATION = 'D' AND (LEAD(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='I' THEN 3 " + + "WHEN IBMSNAP_OPERATION = 'I' AND (LAG(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='D' THEN 4 " + + "WHEN IBMSNAP_OPERATION = 'D' THEN 1 " + + "WHEN IBMSNAP_OPERATION = 'I' THEN 2 " + + "END " + + "OPCODE," + + "cdc.* " + + "FROM " + connectorConfig.getCdcChangeTablesSchema() + ".# cdc WHERE IBMSNAP_COMMITSEQ >= ? AND IBMSNAP_COMMITSEQ <= ? " + + "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ"; + this.GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.TBSPACEID, t.TABLEID , CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER )from " + + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME WHERE r.SOURCE_OWNER <> ''"; + + // No new Tables 1=0 + this.GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) AS OBJECTID, " + + " CD_OWNER CONCAT '.' CONCAT CD_TABLE, " + + " CD_NEW_SYNCHPOINT, " + + " CD_OLD_SYNCHPOINT " + + "from " + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME " + + "WHERE r.SOURCE_OWNER <> '' AND CD_NEW_SYNCHPOINT > ? AND (CD_OLD_SYNCHPOINT < ? OR CD_OLD_SYNCHPOINT IS NULL)"; + break; + default: + throw new DebeziumException("Unsupported platform"); } } @@ -353,7 +354,7 @@ public Set listOfChangeTables() throws SQLException { rs.getInt(9), Lsn.valueOf(rs.getBytes(5)), Lsn.valueOf(rs.getBytes(6)), - this.TABLE_CDC_SCHEMA + connectorConfig.getCdcChangeTablesSchema() )); } @@ -377,7 +378,7 @@ public Set listOfNewChangeTables(Lsn fromLsn, Lsn toLsn) throws rs.getInt(1), Lsn.valueOf(rs.getBytes(3)), Lsn.valueOf(rs.getBytes(4)), - this.TABLE_CDC_SCHEMA)); + connectorConfig.getCdcChangeTablesSchema())); } return changeTables; }); @@ -631,7 +632,7 @@ public boolean validateLogPosition(Partition partition, OffsetContext offset, Co final Lsn storedLsn = ((Db2OffsetContext) offset).getChangePosition().getCommitLsn(); - String oldestFirstChangeQuery = String.format("SELECT min(RESTART_SEQ) FROM %s.IBMSNAP_CAPMON;", CDC_SCHEMA); + String oldestFirstChangeQuery = String.format("SELECT min(RESTART_SEQ) FROM %s.IBMSNAP_CAPMON;", connectorConfig.getCdcControlSchema()); try { final String oldestScn = singleOptionalValue(oldestFirstChangeQuery, rs -> rs.getString(1)); diff --git a/src/main/java/io/debezium/connector/db2/Db2Connector.java b/src/main/java/io/debezium/connector/db2/Db2Connector.java index d237d7d..599b664 100644 --- a/src/main/java/io/debezium/connector/db2/Db2Connector.java +++ b/src/main/java/io/debezium/connector/db2/Db2Connector.java @@ -77,7 +77,7 @@ protected void validateConnection(Map configValues, Configu ConfigValue hostnameValue = configValues.get(RelationalDatabaseConnectorConfig.HOSTNAME.name()); // Try to connect to the database ... Db2ConnectorConfig connectorConfig = new Db2ConnectorConfig(config); - try (Db2Connection connection = new Db2Connection(connectorConfig.getJdbcConfig())) { + try (Db2Connection connection = new Db2Connection(connectorConfig)) { try { connection.connect(); connection.execute("SELECT 1 FROM sysibm.sysdummy1"); @@ -102,7 +102,7 @@ protected Map validateAllFields(Configuration config) { @Override public List getMatchingCollections(Configuration config) { Db2ConnectorConfig connectorConfig = new Db2ConnectorConfig(config); - try (Db2Connection connection = new Db2Connection(connectorConfig.getJdbcConfig())) { + try (Db2Connection connection = new Db2Connection(connectorConfig)) { return new ArrayList<>( connection.readTableNames(null, null, null, new String[]{ "TABLE" })); } diff --git a/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java b/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java index ee981a0..e8a816a 100644 --- a/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java +++ b/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java @@ -12,6 +12,7 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Width; import io.debezium.config.CommonConnectorConfig; @@ -38,6 +39,8 @@ */ public class Db2ConnectorConfig extends HistorizedRelationalDatabaseConnectorConfig { + private static final String DEFAULT_CDC_SCHEMA = "ASNCDC"; + public static final int DEFAULT_QUERY_FETCH_SIZE = 10_000; protected static final int DEFAULT_PORT = 50000; @@ -292,6 +295,71 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) { } } + /** + * The set of supported Db2 platforms + */ + public enum Db2Platform implements EnumeratedValue { + + /** + * Linux, Unix, Windows + */ + LUW("LUW"), + + /** + * z/OS + */ + Z("ZOS"); + + private final String value; + + Db2Platform(String value) { + this.value = value; + } + + @Override + public String getValue() { + return value; + } + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value the configuration property value; may not be null + * @return the matching option, or null if no match is found + */ + public static Db2Platform parse(String value) { + if (value == null) { + return null; + } + value = value.trim(); + + for (Db2Platform option : Db2Platform.values()) { + if (option.getValue().equalsIgnoreCase(value)) { + return option; + } + } + + return null; + } + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value the configuration property value; may not be null + * @param defaultValue the default value; may be null + * @return the matching option, or null if no match is found and the non-null default is invalid + */ + public static Db2Platform parse(String value, String defaultValue) { + Db2Platform mode = parse(value); + + if (mode == null && defaultValue != null) { + mode = parse(defaultValue); + } + + return mode; + } + } + public static final Field PORT = RelationalDatabaseConnectorConfig.PORT .withDefault(DEFAULT_PORT); @@ -336,6 +404,38 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) { + "locks entirely which can be done by specifying 'none'. This mode is only safe to use if no schema changes are happening while the " + "snapshot is taken."); + public static final Field CDC_CONTROL_SCHEMA = Field.create("cdc.control.schema") + .withDisplayName("CDC control schema") + .withType(Type.STRING) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 0)) + .withWidth(Width.MEDIUM) + .withImportance(Importance.LOW) + .withDefault(DEFAULT_CDC_SCHEMA) + .withDescription( + "The name of the schema where CDC control structures are located; defaults to '" + DEFAULT_CDC_SCHEMA + "'"); + + public static final Field CDC_CHANGE_TABLES_SCHEMA = Field.create("cdc.change.tables.schema") + .withDisplayName("CDC change tables schema") + .withType(Type.STRING) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 1)) + .withWidth(Width.MEDIUM) + .withImportance(Importance.LOW) + .withDefault(DEFAULT_CDC_SCHEMA) + .withDescription( + "The name of the schema where CDC change tables are located; defaults to '" + DEFAULT_CDC_SCHEMA + "'"); + + public static final Field DB2_PLATFORM = Field.create("db2.platform") + .withDisplayName("Db2 platform") + .withEnum(Db2Platform.class, Db2Platform.LUW) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 2)) + .withWidth(Width.SHORT) + .withImportance(Importance.LOW) + .withDescription("Informs connector which Db2 implementation platform it is connected to. " + + "The default is '" + Db2Platform.LUW + + "', which means Windows, UNIX, Linux. " + + "Using a value of '" + Db2Platform.Z + + "' ensures that the Db2 for z/OS specific SQL statements are used."); + public static final Field QUERY_FETCH_SIZE = CommonConnectorConfig.QUERY_FETCH_SIZE .withDescription( "The maximum number of records that should be loaded into memory while streaming. A value of '0' uses the default JDBC fetch size. The default value is '10000'.") @@ -358,7 +458,10 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) { SNAPSHOT_MODE, INCREMENTAL_SNAPSHOT_CHUNK_SIZE, SCHEMA_NAME_ADJUSTMENT_MODE, - QUERY_FETCH_SIZE) + QUERY_FETCH_SIZE, + CDC_CONTROL_SCHEMA, + CDC_CHANGE_TABLES_SCHEMA, + DB2_PLATFORM) .events(SOURCE_INFO_STRUCT_MAKER) .excluding( SCHEMA_INCLUDE_LIST, @@ -381,11 +484,15 @@ protected static ConfigDef configDef() { public static Field.Set ALL_FIELDS = Field.setOf(CONFIG_DEFINITION.all()); private final String databaseName; + private final SnapshotMode snapshotMode; private final SnapshotIsolationMode snapshotIsolationMode; - private final SnapshotLockingMode snapshotLockingMode; + private final Db2Platform db2Platform; + private final String cdcChangeTablesSchema; + private final String cdcControlSchema; + public Db2ConnectorConfig(Configuration config) { super( Db2Connector.class, @@ -400,6 +507,10 @@ public Db2ConnectorConfig(Configuration config) { this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString()); this.snapshotIsolationMode = SnapshotIsolationMode.parse(config.getString(SNAPSHOT_ISOLATION_MODE), SNAPSHOT_ISOLATION_MODE.defaultValueAsString()); this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString()); + + this.db2Platform = Db2Platform.parse(config.getString(DB2_PLATFORM), DB2_PLATFORM.defaultValueAsString()); + this.cdcChangeTablesSchema = config.getString(CDC_CHANGE_TABLES_SCHEMA); + this.cdcControlSchema = config.getString(CDC_CONTROL_SCHEMA); } public String getDatabaseName() { @@ -418,6 +529,18 @@ public SnapshotMode getSnapshotMode() { return snapshotMode; } + public Db2Platform getDb2Platform() { + return db2Platform; + } + + public String getCdcChangeTablesSchema() { + return cdcChangeTablesSchema; + } + + public String getCdcControlSchema() { + return cdcControlSchema; + } + @Override protected SourceInfoStructMaker getSourceInfoStructMaker(Version version) { return getSourceInfoStructMaker(SOURCE_INFO_STRUCT_MAKER, Module.name(), Module.version(), this); @@ -429,7 +552,7 @@ private static class SystemTablesPredicate implements TableFilter { public boolean isIncluded(TableId t) { return t.schema() != null && !(t.table().toLowerCase().startsWith("ibmsnap_") || - t.schema().toUpperCase().startsWith("ASNCDC") || + t.schema().toUpperCase().startsWith(DEFAULT_CDC_SCHEMA) || t.schema().toUpperCase().startsWith("SYSTOOLS") || t.table().toLowerCase().startsWith("ibmqrep_")); diff --git a/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java b/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java index 7b20a23..55103f4 100644 --- a/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java +++ b/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java @@ -70,7 +70,7 @@ public ChangeEventSourceCoordinator start(Config final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjuster(); MainConnectionProvidingConnectionFactory connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>( - () -> new Db2Connection(connectorConfig.getJdbcConfig())); + () -> new Db2Connection(connectorConfig)); dataConnection = connectionFactory.mainConnection(); metadataConnection = connectionFactory.newConnection(); try { diff --git a/src/test/java/io/debezium/connector/db2/util/TestHelper.java b/src/test/java/io/debezium/connector/db2/util/TestHelper.java index 32f899f..0870e82 100644 --- a/src/test/java/io/debezium/connector/db2/util/TestHelper.java +++ b/src/test/java/io/debezium/connector/db2/util/TestHelper.java @@ -76,8 +76,6 @@ public static JdbcConfiguration adminJdbcConfig() { .withDefault(JdbcConfiguration.PORT, 50000) .withDefault(JdbcConfiguration.USER, "db2inst1") .withDefault(JdbcConfiguration.PASSWORD, "admin") - .withDefault("custom.cdc.program.schema", "ASNCDC") - .withDefault("custom.cdc.table.schema", "ASNCDC") .build(); } @@ -88,8 +86,6 @@ public static JdbcConfiguration defaultJdbcConfig() { .withDefault(JdbcConfiguration.PORT, 50000) .withDefault(JdbcConfiguration.USER, "db2inst1") .withDefault(JdbcConfiguration.PASSWORD, "admin") - .withDefault("custom.cdc.program.schema", "ASNCDC") - .withDefault("custom.cdc.table.schema", "ASNCDC") .build(); } @@ -111,11 +107,11 @@ public static Configuration.Builder defaultConfig() { } public static Db2Connection adminConnection() { - return new Db2Connection(TestHelper.adminJdbcConfig()); + return new Db2Connection(new Db2ConnectorConfig(defaultConfig().build())); } public static Db2Connection testConnection() { - return new Db2Connection(TestHelper.defaultJdbcConfig()); + return new Db2Connection(new Db2ConnectorConfig(defaultConfig().build())); } /** From c9e74f708ac2c846df91eb865da9c8bc12d372a9 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Wed, 5 Jun 2024 09:42:39 +0200 Subject: [PATCH 5/5] DBZ-4812 Introduce platform adapter --- .../debezium/connector/db2/Db2Connection.java | 100 ++---------------- .../connector/db2/Db2ConnectorConfig.java | 31 +++++- .../connector/db2/Db2ConnectorTask.java | 4 + .../db2/platform/Db2PlatformAdapter.java | 23 ++++ .../connector/db2/platform/LuwPlatform.java | 72 +++++++++++++ .../connector/db2/platform/ZOsPlatform.java | 81 ++++++++++++++ 6 files changed, 218 insertions(+), 93 deletions(-) create mode 100644 src/main/java/io/debezium/connector/db2/platform/Db2PlatformAdapter.java create mode 100644 src/main/java/io/debezium/connector/db2/platform/LuwPlatform.java create mode 100644 src/main/java/io/debezium/connector/db2/platform/ZOsPlatform.java diff --git a/src/main/java/io/debezium/connector/db2/Db2Connection.java b/src/main/java/io/debezium/connector/db2/Db2Connection.java index 2605390..95220ab 100644 --- a/src/main/java/io/debezium/connector/db2/Db2Connection.java +++ b/src/main/java/io/debezium/connector/db2/Db2Connection.java @@ -30,6 +30,7 @@ import io.debezium.DebeziumException; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; +import io.debezium.connector.db2.platform.Db2PlatformAdapter; import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConnection; import io.debezium.pipeline.spi.OffsetContext; @@ -58,11 +59,6 @@ public class Db2Connection extends JdbcConnection { private static final String LSN_TO_TIMESTAMP = "SELECT CURRENT TIMEstamp FROM sysibm.sysdummy1 WHERE ? > X'00000000000000000000000000000000'"; - private final String GET_MAX_LSN; - private final String GET_ALL_CHANGES_FOR_TABLE; - private final String GET_LIST_OF_CDC_ENABLED_TABLES; - private final String GET_LIST_OF_NEW_CDC_ENABLED_TABLES; - private static final String GET_LIST_OF_KEY_COLUMNS = "SELECT " + "CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) as objectid, " + "c.colname,c.colno,c.keyseq " @@ -89,6 +85,7 @@ public class Db2Connection extends JdbcConnection { private final BoundedConcurrentHashMap lsnToInstantCache; private final Db2ConnectorConfig connectorConfig; + private final Db2PlatformAdapter platform; /** * Creates a new connection using the supplied configuration. @@ -97,95 +94,18 @@ public class Db2Connection extends JdbcConnection { */ public Db2Connection(Db2ConnectorConfig config) { super(config.getJdbcConfig(), FACTORY, QUOTED_CHARACTER, QUOTED_CHARACTER); + connectorConfig = config; lsnToInstantCache = new BoundedConcurrentHashMap<>(100); realDatabaseName = retrieveRealDatabaseName(); - - LOGGER.info("==========================================================================================================================================="); - LOGGER.info("CDC_SCHEMA: {}", connectorConfig.getCdcControlSchema()); - LOGGER.info("TABLE_CDC_SCHEMA: {}", connectorConfig.getCdcChangeTablesSchema()); - LOGGER.info("DB_TYPE: {}", connectorConfig.getDb2Platform()); - LOGGER.info("==========================================================================================================================================="); - - switch (connectorConfig.getDb2Platform()) { - case Z: - LOGGER.info("ZOS choice"); - - this.GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() - + ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() - + ".IBMSNAP_REGISTER) t for read only with ur"; - - this.GET_ALL_CHANGES_FOR_TABLE = "WITH tmp AS (SELECT cdc.IBMSNAP_OPERATION, cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, " + - "ROW_NUMBER() OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ) rn FROM " - + connectorConfig.getCdcChangeTablesSchema() + ".# cdc WHERE cdc.IBMSNAP_COMMITSEQ >= ? AND cdc.IBMSNAP_COMMITSEQ <= ? " + - " order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ), " + - " tmp2 AS (SELECT " + - " CASE " + - " WHEN cdc.IBMSNAP_OPERATION = 'D' AND cdc2.IBMSNAP_OPERATION ='I' THEN 3 " + - " WHEN cdc.IBMSNAP_OPERATION = 'I' AND cdc2.IBMSNAP_OPERATION ='D' THEN 4 " + - " WHEN cdc.IBMSNAP_OPERATION = 'D' THEN 1 " + - " WHEN cdc.IBMSNAP_OPERATION = 'I' THEN 2 " + - " END " + - " OPCODE, " + - " cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, cdc.IBMSNAP_OPERATION " + - " FROM tmp cdc left JOIN tmp cdc2 " + - " ON cdc.IBMSNAP_COMMITSEQ = cdc2.IBMSNAP_COMMITSEQ AND " + - " ((cdc.IBMSNAP_OPERATION = 'D' AND cdc.rn = cdc2.rn - 1) " + - " OR (cdc.IBMSNAP_OPERATION = 'I' AND cdc.rn = cdc2.rn + 1))) " + - " select res.OPCODE, cdc.* from " + connectorConfig.getCdcChangeTablesSchema() - + ".# cdc inner join tmp2 res on cdc.IBMSNAP_COMMITSEQ=res.IBMSNAP_COMMITSEQ and cdc.IBMSNAP_INTENTSEQ=res.IBMSNAP_INTENTSEQ " - + "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ"; - - this.GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.DBID, t.OBID , CAST((t.DBID * 65536 + t.OBID )AS INTEGER )from " - + connectorConfig.getCdcControlSchema() - + ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME WHERE r.SOURCE_OWNER <> '' for read only with ur"; - - this.GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.DBID * 65536 + t.OBID )AS INTEGER ) AS OBJECTID, " + - " CD_OWNER CONCAT '.' CONCAT CD_TABLE, " + - " CD_NEW_SYNCHPOINT, " + - " CD_OLD_SYNCHPOINT " + - "from " + connectorConfig.getCdcControlSchema() - + ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME " + - "WHERE r.SOURCE_OWNER <> '' AND 1=0 AND CD_NEW_SYNCHPOINT > ? AND CD_OLD_SYNCHPOINT < ? for read only with ur"; - break; - case LUW: - LOGGER.info("LUW choice"); - this.GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() - + ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() + ".IBMSNAP_REGISTER) t"; - this.GET_ALL_CHANGES_FOR_TABLE = "SELECT " - + "CASE " - + "WHEN IBMSNAP_OPERATION = 'D' AND (LEAD(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='I' THEN 3 " - + "WHEN IBMSNAP_OPERATION = 'I' AND (LAG(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='D' THEN 4 " - + "WHEN IBMSNAP_OPERATION = 'D' THEN 1 " - + "WHEN IBMSNAP_OPERATION = 'I' THEN 2 " - + "END " - + "OPCODE," - + "cdc.* " - + "FROM " + connectorConfig.getCdcChangeTablesSchema() + ".# cdc WHERE IBMSNAP_COMMITSEQ >= ? AND IBMSNAP_COMMITSEQ <= ? " - + "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ"; - this.GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.TBSPACEID, t.TABLEID , CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER )from " - + connectorConfig.getCdcControlSchema() - + ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME WHERE r.SOURCE_OWNER <> ''"; - - // No new Tables 1=0 - this.GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) AS OBJECTID, " + - " CD_OWNER CONCAT '.' CONCAT CD_TABLE, " + - " CD_NEW_SYNCHPOINT, " + - " CD_OLD_SYNCHPOINT " + - "from " + connectorConfig.getCdcControlSchema() - + ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME " + - "WHERE r.SOURCE_OWNER <> '' AND CD_NEW_SYNCHPOINT > ? AND (CD_OLD_SYNCHPOINT < ? OR CD_OLD_SYNCHPOINT IS NULL)"; - break; - default: - throw new DebeziumException("Unsupported platform"); - } + platform = connectorConfig.getDb2Platform().createAdapter(connectorConfig); } /** * @return the current largest log sequence number */ public Lsn getMaxLsn() throws SQLException { - return queryAndMap(GET_MAX_LSN, singleResultMapper(rs -> { + return queryAndMap(platform.getMaxLsnQuery(), singleResultMapper(rs -> { final Lsn ret = Lsn.valueOf(rs.getBytes(1)); LOGGER.trace("Current maximum lsn is {}", ret); return ret; @@ -202,7 +122,7 @@ public Lsn getMaxLsn() throws SQLException { * @throws SQLException */ public void getChangesForTable(TableId tableId, Lsn fromLsn, Lsn toLsn, ResultSetConsumer consumer) throws SQLException { - final String query = GET_ALL_CHANGES_FOR_TABLE.replace(STATEMENTS_PLACEHOLDER, cdcNameForTable(tableId)); + final String query = platform.getAllChangesForTableQuery().replace(STATEMENTS_PLACEHOLDER, cdcNameForTable(tableId)); prepareQuery(query, statement -> { statement.setBytes(1, fromLsn.getBinary()); statement.setBytes(2, toLsn.getBinary()); @@ -226,7 +146,7 @@ public void getChangesForTables(Db2ChangeTable[] changeTables, Lsn intervalFromL int idx = 0; for (Db2ChangeTable changeTable : changeTables) { - final String query = GET_ALL_CHANGES_FOR_TABLE.replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance()); + final String query = platform.getAllChangesForTableQuery().replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance()); queries[idx] = query; // If the table was added in the middle of queried buffer we need // to adjust from to the first LSN available @@ -331,9 +251,8 @@ public Lsn getFromLsn() { } public Set listOfChangeTables() throws SQLException { - final String query = GET_LIST_OF_CDC_ENABLED_TABLES; - return queryAndMap(query, rs -> { + return queryAndMap(platform.getListOfCdcEnabledTablesQuery(), rs -> { final Set changeTables = new HashSet<>(); while (rs.next()) { /** @@ -363,9 +282,8 @@ public Set listOfChangeTables() throws SQLException { } public Set listOfNewChangeTables(Lsn fromLsn, Lsn toLsn) throws SQLException { - final String query = GET_LIST_OF_NEW_CDC_ENABLED_TABLES; - return prepareQueryAndMap(query, + return prepareQueryAndMap(platform.getListOfNewCdcEnabledTablesQuery(), ps -> { ps.setBytes(1, fromLsn.getBinary()); ps.setBytes(2, toLsn.getBinary()); diff --git a/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java b/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java index e8a816a..166d157 100644 --- a/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java +++ b/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java @@ -22,6 +22,9 @@ import io.debezium.config.Field; import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.SourceInfoStructMaker; +import io.debezium.connector.db2.platform.Db2PlatformAdapter; +import io.debezium.connector.db2.platform.LuwPlatform; +import io.debezium.connector.db2.platform.ZOsPlatform; import io.debezium.document.Document; import io.debezium.heartbeat.DatabaseHeartbeatImpl; import io.debezium.relational.ColumnFilterMode; @@ -303,12 +306,32 @@ public enum Db2Platform implements EnumeratedValue { /** * Linux, Unix, Windows */ - LUW("LUW"), + LUW("LUW") { + @Override + public Db2PlatformAdapter createAdapter(Db2ConnectorConfig config) { + return new LuwPlatform(config); + } + + @Override + public String platfromName() { + return "LUW"; + } + }, /** * z/OS */ - Z("ZOS"); + Z("ZOS") { + @Override + public Db2PlatformAdapter createAdapter(Db2ConnectorConfig config) { + return new ZOsPlatform(config); + } + + @Override + public String platfromName() { + return "z/OS"; + } + }; private final String value; @@ -321,6 +344,10 @@ public String getValue() { return value; } + public abstract Db2PlatformAdapter createAdapter(Db2ConnectorConfig config); + + public abstract String platfromName(); + /** * Determine if the supplied value is one of the predefined options. * diff --git a/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java b/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java index 55103f4..05e3ff6 100644 --- a/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java +++ b/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java @@ -69,6 +69,10 @@ public ChangeEventSourceCoordinator start(Config final TopicNamingStrategy topicNamingStrategy = connectorConfig.getTopicNamingStrategy(CommonConnectorConfig.TOPIC_NAMING_STRATEGY); final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjuster(); + LOGGER.info("Using Db2 {} platfrom, CDC control schema is {}, schema with change tables is {}", + connectorConfig.getDb2Platform(), connectorConfig.getCdcControlSchema(), + connectorConfig.getCdcChangeTablesSchema()); + MainConnectionProvidingConnectionFactory connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>( () -> new Db2Connection(connectorConfig)); dataConnection = connectionFactory.mainConnection(); diff --git a/src/main/java/io/debezium/connector/db2/platform/Db2PlatformAdapter.java b/src/main/java/io/debezium/connector/db2/platform/Db2PlatformAdapter.java new file mode 100644 index 0000000..ec3c67a --- /dev/null +++ b/src/main/java/io/debezium/connector/db2/platform/Db2PlatformAdapter.java @@ -0,0 +1,23 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.db2.platform; + +/** + * Implementation details differing between Db2 flavours + * + * @author Jiri Pechanec + */ +public interface Db2PlatformAdapter { + + String getMaxLsnQuery(); + + String getAllChangesForTableQuery(); + + String getListOfCdcEnabledTablesQuery(); + + String getListOfNewCdcEnabledTablesQuery(); + +} diff --git a/src/main/java/io/debezium/connector/db2/platform/LuwPlatform.java b/src/main/java/io/debezium/connector/db2/platform/LuwPlatform.java new file mode 100644 index 0000000..78fa530 --- /dev/null +++ b/src/main/java/io/debezium/connector/db2/platform/LuwPlatform.java @@ -0,0 +1,72 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.db2.platform; + +import io.debezium.connector.db2.Db2ConnectorConfig; + +/** + * Implementation details for LUW Platform (Linux, Unix, Windows) + * + * @author Jiri Pechanec + */ +public class LuwPlatform implements Db2PlatformAdapter { + + private final String getMaxLsn; + private final String getAllChangesForTable; + private final String getListOfCdcEnabledTables; + private final String getListOfNewCdcEnabledTables; + + public LuwPlatform(Db2ConnectorConfig connectorConfig) { + + this.getMaxLsn = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() + ".IBMSNAP_REGISTER) t"; + + this.getAllChangesForTable = "SELECT " + + "CASE " + + "WHEN IBMSNAP_OPERATION = 'D' AND (LEAD(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='I' THEN 3 " + + "WHEN IBMSNAP_OPERATION = 'I' AND (LAG(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='D' THEN 4 " + + "WHEN IBMSNAP_OPERATION = 'D' THEN 1 " + + "WHEN IBMSNAP_OPERATION = 'I' THEN 2 " + + "END " + + "OPCODE," + + "cdc.* " + + "FROM " + connectorConfig.getCdcChangeTablesSchema() + ".# cdc WHERE IBMSNAP_COMMITSEQ >= ? AND IBMSNAP_COMMITSEQ <= ? " + + "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ"; + + this.getListOfCdcEnabledTables = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.TBSPACEID, t.TABLEID , CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER )from " + + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME WHERE r.SOURCE_OWNER <> ''"; + + // No new Tables 1=0 + this.getListOfNewCdcEnabledTables = "select CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) AS OBJECTID, " + + " CD_OWNER CONCAT '.' CONCAT CD_TABLE, " + + " CD_NEW_SYNCHPOINT, " + + " CD_OLD_SYNCHPOINT " + + "from " + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME " + + "WHERE r.SOURCE_OWNER <> '' AND CD_NEW_SYNCHPOINT > ? AND (CD_OLD_SYNCHPOINT < ? OR CD_OLD_SYNCHPOINT IS NULL)"; + } + + @Override + public String getMaxLsnQuery() { + return getMaxLsn; + } + + @Override + public String getAllChangesForTableQuery() { + return getAllChangesForTable; + } + + @Override + public String getListOfCdcEnabledTablesQuery() { + return getListOfCdcEnabledTables; + } + + @Override + public String getListOfNewCdcEnabledTablesQuery() { + return getListOfNewCdcEnabledTables; + } +} diff --git a/src/main/java/io/debezium/connector/db2/platform/ZOsPlatform.java b/src/main/java/io/debezium/connector/db2/platform/ZOsPlatform.java new file mode 100644 index 0000000..e0d1f47 --- /dev/null +++ b/src/main/java/io/debezium/connector/db2/platform/ZOsPlatform.java @@ -0,0 +1,81 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.db2.platform; + +import io.debezium.connector.db2.Db2ConnectorConfig; + +/** + * Implementation details for z/OS + * + * @author Jiri Pechanec + */ +public class ZOsPlatform implements Db2PlatformAdapter { + + private final String getMaxLsn; + private final String getAllChangesForTable; + private final String getListOfCdcEnabledTables; + private final String getListOfNewCdcEnabledTables; + + public ZOsPlatform(Db2ConnectorConfig connectorConfig) { + + this.getMaxLsn = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER) t for read only with ur"; + + this.getAllChangesForTable = "WITH tmp AS (SELECT cdc.IBMSNAP_OPERATION, cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, " + + "ROW_NUMBER() OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ) rn FROM " + + connectorConfig.getCdcChangeTablesSchema() + ".# cdc WHERE cdc.IBMSNAP_COMMITSEQ >= ? AND cdc.IBMSNAP_COMMITSEQ <= ? " + + " order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ), " + + " tmp2 AS (SELECT " + + " CASE " + + " WHEN cdc.IBMSNAP_OPERATION = 'D' AND cdc2.IBMSNAP_OPERATION ='I' THEN 3 " + + " WHEN cdc.IBMSNAP_OPERATION = 'I' AND cdc2.IBMSNAP_OPERATION ='D' THEN 4 " + + " WHEN cdc.IBMSNAP_OPERATION = 'D' THEN 1 " + + " WHEN cdc.IBMSNAP_OPERATION = 'I' THEN 2 " + + " END " + + " OPCODE, " + + " cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, cdc.IBMSNAP_OPERATION " + + " FROM tmp cdc left JOIN tmp cdc2 " + + " ON cdc.IBMSNAP_COMMITSEQ = cdc2.IBMSNAP_COMMITSEQ AND " + + " ((cdc.IBMSNAP_OPERATION = 'D' AND cdc.rn = cdc2.rn - 1) " + + " OR (cdc.IBMSNAP_OPERATION = 'I' AND cdc.rn = cdc2.rn + 1))) " + + " select res.OPCODE, cdc.* from " + connectorConfig.getCdcChangeTablesSchema() + + ".# cdc inner join tmp2 res on cdc.IBMSNAP_COMMITSEQ=res.IBMSNAP_COMMITSEQ and cdc.IBMSNAP_INTENTSEQ=res.IBMSNAP_INTENTSEQ " + + "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ"; + + this.getListOfCdcEnabledTables = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.DBID, t.OBID , CAST((t.DBID * 65536 + t.OBID )AS INTEGER )from " + + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME WHERE r.SOURCE_OWNER <> '' for read only with ur"; + + this.getListOfNewCdcEnabledTables = "select CAST((t.DBID * 65536 + t.OBID )AS INTEGER ) AS OBJECTID, " + + " CD_OWNER CONCAT '.' CONCAT CD_TABLE, " + + " CD_NEW_SYNCHPOINT, " + + " CD_OLD_SYNCHPOINT " + + "from " + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME " + + "WHERE r.SOURCE_OWNER <> '' AND 1=0 AND CD_NEW_SYNCHPOINT > ? AND CD_OLD_SYNCHPOINT < ? for read only with ur"; + } + + @Override + public String getMaxLsnQuery() { + return getMaxLsn; + } + + @Override + public String getAllChangesForTableQuery() { + return getAllChangesForTable; + } + + @Override + public String getListOfCdcEnabledTablesQuery() { + return getListOfCdcEnabledTables; + } + + @Override + public String getListOfNewCdcEnabledTablesQuery() { + return getListOfNewCdcEnabledTables; + } +}