From 9b642975d503ee37734d8b8fdb6083730c17fd1d Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 11 Sep 2024 16:30:57 +0530 Subject: [PATCH 1/7] added commit times queue --- .../PostgresStreamingChangeEventSource.java | 41 +++++++++++++++++-- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index 8d2fcaa1b62..111931d2e60 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -9,6 +9,8 @@ import java.util.Map; import java.util.Objects; import java.util.OptionalLong; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.connect.errors.ConnectException; @@ -82,6 +84,8 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS private Lsn lastCompletelyProcessedLsn; private PostgresOffsetContext effectiveOffset; + protected ConcurrentLinkedQueue commitTimes; + /** * For DEBUGGING */ @@ -101,7 +105,7 @@ public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfi this.snapshotter = snapshotter; this.replicationConnection = (PostgresReplicationConnection) replicationConnection; this.connectionProbeTimer = ElapsedTimeStrategy.constant(Clock.system(), connectorConfig.statusUpdateInterval()); - + this.commitTimes = new ConcurrentLinkedQueue<>(); } @Override @@ -415,6 +419,7 @@ private void probeConnectionIfNeeded() throws SQLException { private void commitMessage(PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn) throws SQLException, InterruptedException { lastCompletelyProcessedLsn = lsn; offsetContext.updateCommitPosition(lsn, lastCompletelyProcessedLsn); + commitTimes.add(lsn); maybeWarnAboutGrowingWalBacklog(false); dispatcher.dispatchHeartbeatEvent(partition, offsetContext); } @@ -463,7 +468,7 @@ public void commitOffset(Map partition, Map offset) { final Lsn changeLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)); final Lsn lsn = (commitLsn != null) ? commitLsn : changeLsn; - LOGGER.debug("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn); + LOGGER.info("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn); if (replicationStream != null && lsn != null) { if (!lsnFlushingAllowed) { LOGGER.info("Received offset commit request on '{}', but ignoring it. LSN flushing is not allowed yet", lsn); @@ -473,8 +478,14 @@ public void commitOffset(Map partition, Map offset) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Flushing LSN to server: {}", lsn); } + + Lsn finalLsn = getLsnToBeFlushed(lsn); + LOGGER.info("Flushing lsn '{}'", finalLsn); + // tell the server the point up to which we've processed data, so it can be free to recycle WAL segments - replicationStream.flushLsn(lsn); + replicationStream.flushLsn(finalLsn); + + cleanCommitTimeQueue(finalLsn); } else { LOGGER.debug("Streaming has already stopped, ignoring commit callback..."); @@ -485,6 +496,30 @@ public void commitOffset(Map partition, Map offset) { } } + protected Lsn getLsnToBeFlushed(Lsn lsn) { + if (commitTimes == null || commitTimes.isEmpty()) { + // This means that the queue has not been initialised and the task is still starting. + return lsn; + } + + Lsn result = null; + + for (Lsn commitLsn : commitTimes) { + if (commitLsn.compareTo(lsn) < 1) { + result = commitLsn; + } else { + // This will be the loop exit when we encounter any bigger element. + break; + } + } + + return result; + } + + protected void cleanCommitTimeQueue(Lsn lsn) { + commitTimes.removeIf(ele -> ele.compareTo(lsn) < 1); + } + @Override public PostgresOffsetContext getOffsetContext() { return effectiveOffset; From 00d70e368870c48cf670f47c33d9e4f4b953d1f6 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 11 Sep 2024 19:04:18 +0530 Subject: [PATCH 2/7] changes --- .../PostgresStreamingChangeEventSource.java | 32 +++++++++++++------ .../connector/postgresql/connection/Lsn.java | 2 +- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index 111931d2e60..fefcaf1a5c8 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -82,6 +82,7 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS */ private long numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0; private Lsn lastCompletelyProcessedLsn; + private Lsn lastSentFeedback = Lsn.valueOf(1L); private PostgresOffsetContext effectiveOffset; protected ConcurrentLinkedQueue commitTimes; @@ -312,7 +313,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff // Don't skip on BEGIN message as it would flush LSN for the whole transaction // too early if (message.getOperation() == Operation.COMMIT) { - commitMessage(partition, offsetContext, lsn); + commitMessage(partition, offsetContext, lsn, message); } return; } @@ -325,7 +326,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff dispatcher.dispatchTransactionStartedEvent(partition, toString(message.getTransactionId()), offsetContext, message.getCommitTime()); } else if (message.getOperation() == Operation.COMMIT) { - commitMessage(partition, offsetContext, lsn); + commitMessage(partition, offsetContext, lsn, message); dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, message.getCommitTime()); } maybeWarnAboutGrowingWalBacklog(true); @@ -337,7 +338,7 @@ else if (message.getOperation() == Operation.MESSAGE) { // non-transactional message that will not be followed by a COMMIT message if (message.isLastEventForLsn()) { - commitMessage(partition, offsetContext, lsn); + commitMessage(partition, offsetContext, lsn, message); } dispatcher.dispatchLogicalDecodingMessage( @@ -350,6 +351,8 @@ else if (message.getOperation() == Operation.MESSAGE) { } // DML event else { + LOGGER.trace("Processing DML event with lsn {} and lastCompletelyProcessedLsn {}", lsn, lastCompletelyProcessedLsn); + TableId tableId = null; if (message.getOperation() != Operation.NOOP) { tableId = PostgresSchema.parse(message.getTable()); @@ -416,10 +419,15 @@ private void probeConnectionIfNeeded() throws SQLException { } } - private void commitMessage(PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn) throws SQLException, InterruptedException { + private void commitMessage(PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn, ReplicationMessage message) throws SQLException, InterruptedException { lastCompletelyProcessedLsn = lsn; offsetContext.updateCommitPosition(lsn, lastCompletelyProcessedLsn); - commitTimes.add(lsn); + + if (message.getOperation() == Operation.COMMIT) { + LOGGER.info("Adding '{}' as lsn to the commit times queue", Lsn.valueOf(lsn.asLong() - 1)); + commitTimes.add(Lsn.valueOf(lsn.asLong() - 1)); + } + maybeWarnAboutGrowingWalBacklog(false); dispatcher.dispatchHeartbeatEvent(partition, offsetContext); } @@ -484,6 +492,7 @@ public void commitOffset(Map partition, Map offset) { // tell the server the point up to which we've processed data, so it can be free to recycle WAL segments replicationStream.flushLsn(finalLsn); + lastSentFeedback = finalLsn; cleanCommitTimeQueue(finalLsn); } @@ -499,13 +508,16 @@ public void commitOffset(Map partition, Map offset) { protected Lsn getLsnToBeFlushed(Lsn lsn) { if (commitTimes == null || commitTimes.isEmpty()) { // This means that the queue has not been initialised and the task is still starting. - return lsn; + return lastSentFeedback; } - Lsn result = null; + Lsn result = lastSentFeedback; + + LOGGER.info("Queue at this time: {}", commitTimes); for (Lsn commitLsn : commitTimes) { - if (commitLsn.compareTo(lsn) < 1) { + if (commitLsn.compareTo(lsn) < 0) { + LOGGER.debug("Assigning result as {}", commitLsn); result = commitLsn; } else { // This will be the loop exit when we encounter any bigger element. @@ -517,7 +529,9 @@ protected Lsn getLsnToBeFlushed(Lsn lsn) { } protected void cleanCommitTimeQueue(Lsn lsn) { - commitTimes.removeIf(ele -> ele.compareTo(lsn) < 1); + if (commitTimes != null) { + commitTimes.removeIf(ele -> ele.compareTo(lsn) < 1); + } } @Override diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java index b9f4b7dc8fe..d2cedf2a033 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java @@ -144,7 +144,7 @@ public boolean isValid() { @Override public String toString() { - return "LSN{" + asString() + '}'; + return "LSN{" + asLong() + '}'; } @Override From c379bf456724d7a5cb8159091dfa16397ab785a3 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 12 Sep 2024 16:59:15 +0530 Subject: [PATCH 3/7] added logs --- .../postgresql/PostgresStreamingChangeEventSource.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index fefcaf1a5c8..aa631cbcc1e 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -91,6 +91,7 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS * For DEBUGGING */ private OptionalLong lastTxnidForWhichCommitSeen = OptionalLong.empty(); + private long recordCount = 0; public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfig, Snapshotter snapshotter, PostgresConnection connection, PostgresEventDispatcher dispatcher, ErrorHandler errorHandler, Clock clock, @@ -297,6 +298,8 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff LOGGER.debug("Processing BEGIN with end LSN {} and txnid {}", lsn, message.getTransactionId()); } else { LOGGER.debug("Processing COMMIT with end LSN {} and txnid {}", lsn, message.getTransactionId()); + LOGGER.debug("Record count in the txn {} is {}", message.getTransactionId(), recordCount); + recordCount = 0; } OptionalLong currentTxnid = message.getTransactionId(); @@ -352,6 +355,7 @@ else if (message.getOperation() == Operation.MESSAGE) { // DML event else { LOGGER.trace("Processing DML event with lsn {} and lastCompletelyProcessedLsn {}", lsn, lastCompletelyProcessedLsn); + ++recordCount; TableId tableId = null; if (message.getOperation() != Operation.NOOP) { From f399f83c487eb252910632892eb174e0871ce301 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 12 Sep 2024 23:02:22 +0530 Subject: [PATCH 4/7] printing commit time too --- .../postgresql/PostgresStreamingChangeEventSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index aa631cbcc1e..56b72ab4d0a 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -298,7 +298,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff LOGGER.debug("Processing BEGIN with end LSN {} and txnid {}", lsn, message.getTransactionId()); } else { LOGGER.debug("Processing COMMIT with end LSN {} and txnid {}", lsn, message.getTransactionId()); - LOGGER.debug("Record count in the txn {} is {}", message.getTransactionId(), recordCount); + LOGGER.debug("Record count in the txn {} is {} with commit time {}", message.getTransactionId(), recordCount, lsn.asLong() - 1); recordCount = 0; } From a0b507eb8fb82eaeabd5bd700398cfe3d56339e8 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Tue, 17 Sep 2024 20:06:52 +0530 Subject: [PATCH 5/7] random commit to revert back to --- .../postgresql/PostgresStreamingChangeEventSource.java | 6 ++++++ .../connection/PostgresReplicationConnection.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index 56b72ab4d0a..1e4b138093d 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -363,6 +363,10 @@ else if (message.getOperation() == Operation.MESSAGE) { Objects.requireNonNull(tableId); } + /* + tx1: BEGIN2 - DML1 - COMMIT2 (updates lastCommitLsn) + tx2: BEGIN2 - DML2 (update lastCompletelyProcessedLsn) - COMMIT2 (updates lastCommitLsn) + */ offsetContext.updateWalPosition(lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()), taskContext.getSlotXmin(connection), tableId, @@ -425,6 +429,8 @@ private void probeConnectionIfNeeded() throws SQLException { private void commitMessage(PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn, ReplicationMessage message) throws SQLException, InterruptedException { lastCompletelyProcessedLsn = lsn; + + // todo offsetContext.updateCommitPosition(lsn, lastCompletelyProcessedLsn); if (message.getOperation() == Operation.COMMIT) { diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index a95d46c239d..2e6e6eb4599 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -666,7 +666,7 @@ public boolean readPending(ReplicationMessageProcessor processor) throws SQLExce private void deserializeMessages(ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN()); - LOGGER.trace("Received message at LSN {}", lastReceivedLsn); + LOGGER.info("Received message at LSN {}", lastReceivedLsn); messageDecoder.processMessage(buffer, processor, typeRegistry); } From 32596dbf1b17752c08fc923a71731085eeaaece4 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Tue, 17 Sep 2024 21:37:15 +0530 Subject: [PATCH 6/7] hehehehe --- .../PostgresStreamingChangeEventSource.java | 35 +++++++++++++++---- .../PostgresReplicationConnection.java | 2 +- .../connection/WalPositionLocator.java | 10 +++++- 3 files changed, 38 insertions(+), 9 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index 1e4b138093d..c27a10674f3 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -82,7 +82,7 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS */ private long numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0; private Lsn lastCompletelyProcessedLsn; - private Lsn lastSentFeedback = Lsn.valueOf(1L); + private Lsn lastSentFeedback = Lsn.valueOf(2L); private PostgresOffsetContext effectiveOffset; protected ConcurrentLinkedQueue commitTimes; @@ -153,14 +153,25 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio replicationConnection.getConnectedNodeIp()); } + // B, 1, 2 - lsn, 3, 4, C, B, 5, 6, 7, 8 + if (hasStartLsnStoredInContext) { // start streaming from the last recorded position in the offset - final Lsn lsn = this.effectiveOffset.lastCompletelyProcessedLsn() != null ? this.effectiveOffset.lastCompletelyProcessedLsn() - : this.effectiveOffset.lsn(); +// final Lsn lsn = this.effectiveOffset.lastCompletelyProcessedLsn() != null ? this.effectiveOffset.lastCompletelyProcessedLsn() +// : this.effectiveOffset.lsn(); + // we will be streaming from the last commit lsn since we are sure that we have + // received that transaction completely. + // if lastCommitLsn is null, that means we are only in the beginning of streaming. + final Lsn lsn = this.effectiveOffset.lastCommitLsn() == null ? + Lsn.valueOf(2L) : this.effectiveOffset.lastCommitLsn(); + + LOGGER.info("Retrieved last committed LSN from stored offset '{}'", lsn); + final Operation lastProcessedMessageType = this.effectiveOffset.lastProcessedMessageType(); - LOGGER.info("Retrieved latest position from stored offset '{}'", lsn); +// LOGGER.info("Retrieved latest position from stored offset '{}'", lsn); walPosition = new WalPositionLocator(this.effectiveOffset.lastCommitLsn(), lsn, lastProcessedMessageType); replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition)); + lastSentFeedback = lsn; } else { LOGGER.info("No previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN..."); @@ -204,7 +215,7 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio replicationConnection.getConnectedNodeIp()); } - replicationStream.set(replicationConnection.startStreaming(walPosition.getLastEventStoredLsn(), walPosition)); + replicationStream.set(replicationConnection.startStreaming(walPosition.getLastCommitStoredLsn(), walPosition)); stream = this.replicationStream.get(); stream.startKeepAlive(Threads.newSingleThreadExecutor(YugabyteDBConnector.class, connectorConfig.getLogicalName(), KEEP_ALIVE_THREAD_NAME)); } @@ -399,8 +410,18 @@ private void searchWalPosition(ChangeEventSourceContext context, PostgresPartiti while (context.isRunning() && resumeLsn.get() == null) { boolean receivedMessage = stream.readPending(message -> { - final Lsn lsn = stream.lastReceivedLsn(); + // YB Note: We do not need this, we need to start from the last commit lsn from the + // walPosition +// final Lsn lsn = stream.lastReceivedLsn(); + final Lsn lsn = walPosition.getLastCommitStoredLsn() != null ? walPosition.getLastCommitStoredLsn() : stream.startLsn(); + if (lsn == null) { + + } resumeLsn.set(walPosition.resumeFromLsn(lsn, message).orElse(null)); + + if (resumeLsn.get() == null) { + LOGGER.info("Resume LSN is null"); + } }); if (receivedMessage) { @@ -498,7 +519,7 @@ public void commitOffset(Map partition, Map offset) { } Lsn finalLsn = getLsnToBeFlushed(lsn); - LOGGER.info("Flushing lsn '{}'", finalLsn); + LOGGER.info("Flushing lsn '{}' for table", finalLsn); // tell the server the point up to which we've processed data, so it can be free to recycle WAL segments replicationStream.flushLsn(finalLsn); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index 2e6e6eb4599..a95d46c239d 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -666,7 +666,7 @@ public boolean readPending(ReplicationMessageProcessor processor) throws SQLExce private void deserializeMessages(ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN()); - LOGGER.info("Received message at LSN {}", lastReceivedLsn); + LOGGER.trace("Received message at LSN {}", lastReceivedLsn); messageDecoder.processMessage(buffer, processor, typeRegistry); } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java index 9d9cb4fa6f7..ed3d24cfebc 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java @@ -42,6 +42,7 @@ public class WalPositionLocator { public WalPositionLocator(Lsn lastCommitStoredLsn, Lsn lastEventStoredLsn, Operation lastProcessedMessageType) { this.lastCommitStoredLsn = lastCommitStoredLsn; + // YB Note: lastEventStoredLsn and lastCommitStoredLsn will be the same. this.lastEventStoredLsn = lastEventStoredLsn; this.lastProcessedMessageType = lastProcessedMessageType; @@ -85,6 +86,8 @@ public Optional resumeFromLsn(Lsn currentLsn, ReplicationMessage message) { return Optional.empty(); } lsnAfterLastEventStoredLsn = currentLsn; + + // YB Note: we do not want this to be turned true ever. storeLsnAfterLastEventStoredLsn = false; LOGGER.info("LSN after last stored change LSN '{}' received", lsnAfterLastEventStoredLsn); startStreamingLsn = lsnAfterLastEventStoredLsn; @@ -99,6 +102,11 @@ public Optional resumeFromLsn(Lsn currentLsn, ReplicationMessage message) { return Optional.of(startStreamingLsn); } + if (currentLsn.equals(lastCommitStoredLsn)) { + LOGGER.info("Returning lastCommitStoredLsn {} for resuming", lastCommitStoredLsn); + return Optional.of(lastCommitStoredLsn); + } + switch (message.getOperation()) { case BEGIN: txStartLsn = currentLsn; @@ -160,7 +168,7 @@ public boolean skipMessage(Lsn lsn) { lsn, lsnSeen)); } - LOGGER.debug("Message with LSN '{}' filtered", lsn); + LOGGER.info("Message with LSN '{}' filtered", lsn); return true; } From 5bfc2b70c446ce12c420180726da75cb52f47358 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 18 Sep 2024 14:27:41 +0530 Subject: [PATCH 7/7] skipping all messages --- .../PostgresStreamingChangeEventSource.java | 14 ++++++++------ .../connection/PostgresReplicationConnection.java | 4 ++-- .../postgresql/connection/WalPositionLocator.java | 9 +++++++++ 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index c27a10674f3..fedd1f82bc7 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -162,14 +162,19 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio // we will be streaming from the last commit lsn since we are sure that we have // received that transaction completely. // if lastCommitLsn is null, that means we are only in the beginning of streaming. + LOGGER.info("LSN is stored in context"); final Lsn lsn = this.effectiveOffset.lastCommitLsn() == null ? - Lsn.valueOf(2L) : this.effectiveOffset.lastCommitLsn(); + lastSentFeedback : this.effectiveOffset.lastCommitLsn(); + + if (this.effectiveOffset.lastCommitLsn() == null) { + LOGGER.info("Last commit stored in offset is null"); + } LOGGER.info("Retrieved last committed LSN from stored offset '{}'", lsn); final Operation lastProcessedMessageType = this.effectiveOffset.lastProcessedMessageType(); // LOGGER.info("Retrieved latest position from stored offset '{}'", lsn); - walPosition = new WalPositionLocator(this.effectiveOffset.lastCommitLsn(), lsn, lastProcessedMessageType); + walPosition = new WalPositionLocator(lsn, lsn, lastProcessedMessageType); replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition)); lastSentFeedback = lsn; } @@ -413,10 +418,7 @@ private void searchWalPosition(ChangeEventSourceContext context, PostgresPartiti // YB Note: We do not need this, we need to start from the last commit lsn from the // walPosition // final Lsn lsn = stream.lastReceivedLsn(); - final Lsn lsn = walPosition.getLastCommitStoredLsn() != null ? walPosition.getLastCommitStoredLsn() : stream.startLsn(); - if (lsn == null) { - - } + final Lsn lsn = walPosition.getLastCommitStoredLsn() != null ? walPosition.getLastCommitStoredLsn() : lastSentFeedback; resumeLsn.set(walPosition.resumeFromLsn(lsn, message).orElse(null)); if (resumeLsn.get() == null) { diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index a95d46c239d..391e1a6b8e3 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -395,8 +395,8 @@ public ReplicationStream startStreaming(Lsn offset, WalPositionLocator walPositi offset = defaultStartingPos; } Lsn lsn = offset; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("starting streaming from LSN '{}'", lsn); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("starting streaming from LSN '{}'", lsn); } final int maxRetries = connectorConfig.maxRetries(); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java index ed3d24cfebc..d60599e9af4 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java @@ -9,6 +9,7 @@ import java.util.Optional; import java.util.Set; +import io.debezium.connector.postgresql.YugabyteDBServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,6 +84,7 @@ public Optional resumeFromLsn(Lsn currentLsn, ReplicationMessage message) { startStreamingLsn = txStartLsn; return Optional.of(startStreamingLsn); } + LOGGER.info("Returning optional empty as resume LSN"); return Optional.empty(); } lsnAfterLastEventStoredLsn = currentLsn; @@ -94,11 +96,13 @@ public Optional resumeFromLsn(Lsn currentLsn, ReplicationMessage message) { return Optional.of(startStreamingLsn); } if (currentLsn.equals(lastEventStoredLsn)) { + LOGGER.info("Current LSN is equal to the last event stored LSN {}", lastEventStoredLsn); storeLsnAfterLastEventStoredLsn = true; } if (lastCommitStoredLsn == null) { startStreamingLsn = firstLsnReceived; + LOGGER.info("Last commit stored LSN is null, returning firstLsnReceived {}", startStreamingLsn); return Optional.of(startStreamingLsn); } @@ -153,6 +157,11 @@ else if (txStartLsn != null) { * @return true if the message should be skipped, false otherwise */ public boolean skipMessage(Lsn lsn) { + if (YugabyteDBServer.isEnabled()) { + // YB Note: We will not be skipping any message. + return false; + } + if (passMessages) { return false; }