Use commit time for lsn #158

Open · wants to merge 7 commits into base: ybdb-debezium-2.5.2
@@ -9,6 +9,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.connect.errors.ConnectException;
@@ -80,12 +82,16 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS
*/
private long numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0;
private Lsn lastCompletelyProcessedLsn;
private Lsn lastSentFeedback = Lsn.valueOf(2L);
private PostgresOffsetContext effectiveOffset;

protected ConcurrentLinkedQueue<Lsn> commitTimes;

/**
* For DEBUGGING
*/
private OptionalLong lastTxnidForWhichCommitSeen = OptionalLong.empty();
private long recordCount = 0;

public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfig, Snapshotter snapshotter,
PostgresConnection connection, PostgresEventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock,
@@ -101,7 +107,7 @@ public PostgresStreamingChangeEventSource(PostgresConnectorConfi
this.snapshotter = snapshotter;
this.replicationConnection = (PostgresReplicationConnection) replicationConnection;
this.connectionProbeTimer = ElapsedTimeStrategy.constant(Clock.system(), connectorConfig.statusUpdateInterval());

this.commitTimes = new ConcurrentLinkedQueue<>();
}

@Override
@@ -147,14 +153,30 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio
replicationConnection.getConnectedNodeIp());
}

// Example sequence: BEGIN, 1, 2 - lsn, 3, 4, COMMIT, BEGIN, 5, 6, 7, 8

if (hasStartLsnStoredInContext) {
// start streaming from the last recorded position in the offset
final Lsn lsn = this.effectiveOffset.lastCompletelyProcessedLsn() != null ? this.effectiveOffset.lastCompletelyProcessedLsn()
: this.effectiveOffset.lsn();
// final Lsn lsn = this.effectiveOffset.lastCompletelyProcessedLsn() != null ? this.effectiveOffset.lastCompletelyProcessedLsn()
// : this.effectiveOffset.lsn();
// We will be streaming from the last commit LSN since we are sure that we have
// received that transaction completely.
// If lastCommitLsn is null, it means we are only at the beginning of streaming.
LOGGER.info("LSN is stored in context");
final Lsn lsn = this.effectiveOffset.lastCommitLsn() == null ?
lastSentFeedback : this.effectiveOffset.lastCommitLsn();

if (this.effectiveOffset.lastCommitLsn() == null) {
LOGGER.info("Last commit stored in offset is null");
}

LOGGER.info("Retrieved last committed LSN from stored offset '{}'", lsn);

final Operation lastProcessedMessageType = this.effectiveOffset.lastProcessedMessageType();
LOGGER.info("Retrieved latest position from stored offset '{}'", lsn);
walPosition = new WalPositionLocator(this.effectiveOffset.lastCommitLsn(), lsn, lastProcessedMessageType);
// LOGGER.info("Retrieved latest position from stored offset '{}'", lsn);
walPosition = new WalPositionLocator(lsn, lsn, lastProcessedMessageType);
replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition));
lastSentFeedback = lsn;
}
else {
LOGGER.info("No previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN...");
@@ -198,7 +220,7 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio
replicationConnection.getConnectedNodeIp());
}

replicationStream.set(replicationConnection.startStreaming(walPosition.getLastEventStoredLsn(), walPosition));
replicationStream.set(replicationConnection.startStreaming(walPosition.getLastCommitStoredLsn(), walPosition));
stream = this.replicationStream.get();
stream.startKeepAlive(Threads.newSingleThreadExecutor(YugabyteDBConnector.class, connectorConfig.getLogicalName(), KEEP_ALIVE_THREAD_NAME));
}
@@ -292,6 +314,8 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff
LOGGER.debug("Processing BEGIN with end LSN {} and txnid {}", lsn, message.getTransactionId());
} else {
LOGGER.debug("Processing COMMIT with end LSN {} and txnid {}", lsn, message.getTransactionId());
LOGGER.debug("Record count in the txn {} is {} with commit time {}", message.getTransactionId(), recordCount, lsn.asLong() - 1);
recordCount = 0;
}

OptionalLong currentTxnid = message.getTransactionId();
@@ -308,7 +332,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff
// Don't skip on BEGIN message as it would flush LSN for the whole transaction
// too early
if (message.getOperation() == Operation.COMMIT) {
commitMessage(partition, offsetContext, lsn);
commitMessage(partition, offsetContext, lsn, message);
}
return;
}
@@ -321,7 +345,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff
dispatcher.dispatchTransactionStartedEvent(partition, toString(message.getTransactionId()), offsetContext, message.getCommitTime());
}
else if (message.getOperation() == Operation.COMMIT) {
commitMessage(partition, offsetContext, lsn);
commitMessage(partition, offsetContext, lsn, message);
dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, message.getCommitTime());
}
maybeWarnAboutGrowingWalBacklog(true);
@@ -333,7 +357,7 @@ else if (message.getOperation() == Operation.MESSAGE) {

// non-transactional message that will not be followed by a COMMIT message
if (message.isLastEventForLsn()) {
commitMessage(partition, offsetContext, lsn);
commitMessage(partition, offsetContext, lsn, message);
}

dispatcher.dispatchLogicalDecodingMessage(
@@ -346,12 +370,19 @@ else if (message.getOperation() == Operation.MESSAGE) {
}
// DML event
else {
LOGGER.trace("Processing DML event with lsn {} and lastCompletelyProcessedLsn {}", lsn, lastCompletelyProcessedLsn);
++recordCount;

TableId tableId = null;
if (message.getOperation() != Operation.NOOP) {
tableId = PostgresSchema.parse(message.getTable());
Objects.requireNonNull(tableId);
}

/*
tx1: BEGIN1 - DML1 - COMMIT1 (updates lastCommitLsn)
tx2: BEGIN2 - DML2 (updates lastCompletelyProcessedLsn) - COMMIT2 (updates lastCommitLsn)
*/
offsetContext.updateWalPosition(lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()),
taskContext.getSlotXmin(connection),
tableId,
@@ -384,8 +415,15 @@ private void searchWalPosition(ChangeEventSourceContext context, PostgresPartiti
while (context.isRunning() && resumeLsn.get() == null) {

boolean receivedMessage = stream.readPending(message -> {
final Lsn lsn = stream.lastReceivedLsn();
// YB Note: We do not need the last received LSN here; we need to start from the
// last commit LSN stored in the walPosition.
// final Lsn lsn = stream.lastReceivedLsn();
final Lsn lsn = walPosition.getLastCommitStoredLsn() != null ? walPosition.getLastCommitStoredLsn() : lastSentFeedback;
resumeLsn.set(walPosition.resumeFromLsn(lsn, message).orElse(null));

if (resumeLsn.get() == null) {
LOGGER.info("Resume LSN is null");
}
});

if (receivedMessage) {
@@ -412,9 +450,17 @@ private void probeConnectionIfNeeded() throws SQLException {
}
}

private void commitMessage(PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn) throws SQLException, InterruptedException {
private void commitMessage(PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn, ReplicationMessage message) throws SQLException, InterruptedException {
lastCompletelyProcessedLsn = lsn;

// todo
offsetContext.updateCommitPosition(lsn, lastCompletelyProcessedLsn);

if (message.getOperation() == Operation.COMMIT) {
LOGGER.info("Adding '{}' as lsn to the commit times queue", Lsn.valueOf(lsn.asLong() - 1));
commitTimes.add(Lsn.valueOf(lsn.asLong() - 1));
}

maybeWarnAboutGrowingWalBacklog(false);
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
@@ -463,7 +509,7 @@ public void commitOffset(Map<String, ?> partition, Map<String, ?> offset) {
final Lsn changeLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY));
final Lsn lsn = (commitLsn != null) ? commitLsn : changeLsn;

LOGGER.debug("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn);
LOGGER.info("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn);
if (replicationStream != null && lsn != null) {
if (!lsnFlushingAllowed) {
LOGGER.info("Received offset commit request on '{}', but ignoring it. LSN flushing is not allowed yet", lsn);
@@ -473,8 +519,15 @@ public void commitOffset(Map<String, ?> partition, Map<String, ?> offset) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Flushing LSN to server: {}", lsn);
}

Lsn finalLsn = getLsnToBeFlushed(lsn);
LOGGER.info("Flushing lsn '{}' for table", finalLsn);

// tell the server the point up to which we've processed data, so it can be free to recycle WAL segments
replicationStream.flushLsn(lsn);
replicationStream.flushLsn(finalLsn);
lastSentFeedback = finalLsn;

cleanCommitTimeQueue(finalLsn);
}
else {
LOGGER.debug("Streaming has already stopped, ignoring commit callback...");
@@ -485,6 +538,35 @@
}
}

protected Lsn getLsnToBeFlushed(Lsn lsn) {
if (commitTimes == null || commitTimes.isEmpty()) {
// This means that the queue has not been initialised and the task is still starting.
return lastSentFeedback;
}

Lsn result = lastSentFeedback;

LOGGER.info("Queue at this time: {}", commitTimes);

for (Lsn commitLsn : commitTimes) {
if (commitLsn.compareTo(lsn) < 0) {
LOGGER.debug("Assigning result as {}", commitLsn);
result = commitLsn;
} else {
// Exit the loop as soon as we encounter a bigger element.
break;
}
}

return result;
}

protected void cleanCommitTimeQueue(Lsn lsn) {
if (commitTimes != null) {
commitTimes.removeIf(ele -> ele.compareTo(lsn) < 1);
}
}

@Override
public PostgresOffsetContext getOffsetContext() {
return effectiveOffset;
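To make the new feedback logic above easier to follow, here is a minimal, self-contained sketch of the bookkeeping that `commitMessage`, `getLsnToBeFlushed`, and `cleanCommitTimeQueue` perform together. It is not the connector code itself: it uses plain `long` values in place of the `Lsn` type, the class and method names are illustrative, and logging and null handling are dropped, so treat it only as an illustration of the intended behaviour.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of the commit-time feedback bookkeeping introduced in this PR,
// with plain long values standing in for the connector's Lsn type.
public class CommitFeedbackSketch {

    private final ConcurrentLinkedQueue<Long> commitTimes = new ConcurrentLinkedQueue<>();
    private long lastSentFeedback = 2L; // mirrors the initial Lsn.valueOf(2L)

    // On every COMMIT message the end LSN minus one is remembered as a
    // safe feedback candidate (the "commit time" used for flushing).
    public void onCommit(long commitEndLsn) {
        commitTimes.add(commitEndLsn - 1);
    }

    // When Kafka Connect asks to commit an offset, pick the largest queued
    // commit LSN that is still below the requested flush LSN, so the server
    // is never asked to discard WAL beyond a fully processed transaction.
    public long lsnToBeFlushed(long requestedLsn) {
        long result = lastSentFeedback;
        for (long commitLsn : commitTimes) {
            if (commitLsn < requestedLsn) {
                result = commitLsn;
            } else {
                break; // entries are queued in commit order, so stop at the first bigger one
            }
        }
        return result;
    }

    // After flushing, drop every queued entry at or below the flushed LSN
    // and remember it as the last feedback sent to the server.
    public void afterFlush(long flushedLsn) {
        commitTimes.removeIf(ele -> ele <= flushedLsn);
        lastSentFeedback = flushedLsn;
    }
}
```

For example, with commits queued at 99 and 199, an offset commit request for LSN 150 would flush 99, leave 199 in the queue, and record 99 as the last feedback sent to the server.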
@@ -144,7 +144,7 @@ public boolean isValid() {

@Override
public String toString() {
return "LSN{" + asString() + '}';
return "LSN{" + asLong() + '}';
}

@Override
@@ -395,8 +395,8 @@ public ReplicationStream startStreaming(Lsn offset, WalPositionLocator walPositi
offset = defaultStartingPos;
}
Lsn lsn = offset;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("starting streaming from LSN '{}'", lsn);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("starting streaming from LSN '{}'", lsn);
}

final int maxRetries = connectorConfig.maxRetries();
@@ -9,6 +9,7 @@
import java.util.Optional;
import java.util.Set;

import io.debezium.connector.postgresql.YugabyteDBServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -42,6 +43,7 @@ public class WalPositionLocator {

public WalPositionLocator(Lsn lastCommitStoredLsn, Lsn lastEventStoredLsn, Operation lastProcessedMessageType) {
this.lastCommitStoredLsn = lastCommitStoredLsn;
// YB Note: lastEventStoredLsn and lastCommitStoredLsn will be the same.
this.lastEventStoredLsn = lastEventStoredLsn;
this.lastProcessedMessageType = lastProcessedMessageType;

@@ -82,23 +84,33 @@ public Optional<Lsn> resumeFromLsn(Lsn currentLsn, ReplicationMessage message) {
startStreamingLsn = txStartLsn;
return Optional.of(startStreamingLsn);
}
LOGGER.info("Returning optional empty as resume LSN");
return Optional.empty();
}
lsnAfterLastEventStoredLsn = currentLsn;

// YB Note: we never want this flag to become true.
storeLsnAfterLastEventStoredLsn = false;
LOGGER.info("LSN after last stored change LSN '{}' received", lsnAfterLastEventStoredLsn);
startStreamingLsn = lsnAfterLastEventStoredLsn;
return Optional.of(startStreamingLsn);
}
if (currentLsn.equals(lastEventStoredLsn)) {
LOGGER.info("Current LSN is equal to the last event stored LSN {}", lastEventStoredLsn);
storeLsnAfterLastEventStoredLsn = true;
}

if (lastCommitStoredLsn == null) {
startStreamingLsn = firstLsnReceived;
LOGGER.info("Last commit stored LSN is null, returning firstLsnReceived {}", startStreamingLsn);
return Optional.of(startStreamingLsn);
}

if (currentLsn.equals(lastCommitStoredLsn)) {
LOGGER.info("Returning lastCommitStoredLsn {} for resuming", lastCommitStoredLsn);
return Optional.of(lastCommitStoredLsn);
}

switch (message.getOperation()) {
case BEGIN:
txStartLsn = currentLsn;
@@ -145,6 +157,11 @@ else if (txStartLsn != null) {
* @return true if the message should be skipped, false otherwise
*/
public boolean skipMessage(Lsn lsn) {
if (YugabyteDBServer.isEnabled()) {
// YB Note: We will not be skipping any message.
return false;
}

if (passMessages) {
return false;
}
@@ -160,7 +177,7 @@ public boolean skipMessage(Lsn lsn) {
lsn,
lsnSeen));
}
LOGGER.debug("Message with LSN '{}' filtered", lsn);
LOGGER.info("Message with LSN '{}' filtered", lsn);
return true;
}

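As a rough illustration of why `skipMessage` can unconditionally return `false` under YugabyteDB: streaming now restarts from the last stored commit LSN (a transaction boundary), so every replayed message belongs to a transaction whose commit was never flushed and must be re-dispatched rather than filtered. Below is a minimal sketch of that resume decision, with hypothetical names and `Long` values standing in for `Lsn`; it is not the actual `WalPositionLocator` implementation.

```java
// Sketch only: illustrates the resume rules implied by the changes above.
public class ResumeDecisionSketch {

    // Mirrors searchWalPosition(): prefer the commit LSN stored in the offset,
    // otherwise fall back to the last feedback sent to the server.
    public static long resumeLsn(Long lastCommitStoredLsn, long lastSentFeedback) {
        return (lastCommitStoredLsn != null) ? lastCommitStoredLsn : lastSentFeedback;
    }

    // Mirrors the YB branch in skipMessage(): when YugabyteDB is enabled no
    // message is filtered, because replay always starts at a commit boundary.
    public static boolean skipMessage(boolean yugabyteDbEnabled, boolean upstreamWouldSkip) {
        if (yugabyteDbEnabled) {
            return false;
        }
        return upstreamWouldSkip;
    }

    public static void main(String[] args) {
        // With a stored commit LSN of 199 the connector resumes from 199;
        // with none stored it falls back to the last sent feedback (here, 2).
        System.out.println(resumeLsn(199L, 2L)); // 199
        System.out.println(resumeLsn(null, 2L)); // 2
    }
}
```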