DBZ-2662 Handle ChangeEventQueue by the size in bytes

vanhoale authored and jpechane committed Nov 16, 2020
1 parent 4faf13d commit 17a424d
Showing 18 changed files with 700 additions and 3 deletions.
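For orientation, here is a minimal sketch of enabling the new option on a connector configuration: the max.queue.size.in.bytes property and its default of 0 (byte-based limiting disabled) come from this commit, while the connection settings and the concrete values below are placeholder assumptions.

import io.debezium.config.Configuration;

public class QueueByteLimitConfigSketch {
    public static void main(String[] args) {
        // Sketch only: the byte-based bound complements the existing count-based queue settings.
        Configuration config = Configuration.create()
                .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector") // any Debezium connector
                .with("database.hostname", "localhost") // placeholder connection settings
                .with("database.user", "postgres")
                .with("database.dbname", "inventory")
                .with("max.queue.size", 8192) // existing bound on the number of buffered events
                .with("max.batch.size", 2048)
                .with("max.queue.size.in.bytes", 128 * 1024 * 1024L) // new bound; 0 (the default) disables it
                .build();
        System.out.println(config.getLong("max.queue.size.in.bytes"));
    }
}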
@@ -23,6 +23,9 @@
<exclude>com.fasterxml.jackson.datatype:jackson-datatype-jsr310:*</exclude>
<exclude>org.reflections:reflections:*</exclude>

<!-- Exclude guava dependencies -->
<exclude>com.google.guava:listenablefuture:*</exclude>

<!-- Exclude connection drivers -->
<exclude>${assembly.exclude.1}</exclude>
<exclude>${assembly.exclude.2}</exclude>
@@ -22,6 +22,9 @@
<exclude>com.fasterxml.jackson.core:jackson-annotations:*</exclude>
<exclude>com.fasterxml.jackson.datatype:jackson-datatype-jsr310:*</exclude>
<exclude>org.reflections:reflections:*</exclude>

<!-- Exclude guava dependencies -->
<exclude>com.google.guava:listenablefuture:*</exclude>
</excludes>
</dependencySet>
<dependencySet>
@@ -85,6 +85,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
.pollInterval(connectorConfig.getPollInterval())
.maxBatchSize(connectorConfig.getMaxBatchSize())
.maxQueueSize(connectorConfig.getMaxQueueSize())
.maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
.loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
.build();

@@ -84,6 +84,17 @@ public int totalCapacity() {
public int remainingCapacity() {
return records.remainingCapacity();
}

@Override
public long maxQueueSizeInBytes() {
return context.getConnectorConfig().getMaxQueueSizeInBytes();
}

@Override
public long currentQueueSizeInBytes() {
// Return 0 because the MySQL connector does not use the ChangeEventQueue implementation
return 0;
}
};
}

@@ -150,6 +150,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
.pollInterval(connectorConfig.getPollInterval())
.maxBatchSize(connectorConfig.getMaxBatchSize())
.maxQueueSize(connectorConfig.getMaxQueueSize())
.maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
.loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
.build();

@@ -21,6 +21,7 @@
import org.junit.Before;
import org.junit.Test;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;

/**
@@ -190,6 +191,97 @@ private void assertStreamingMetrics() throws Exception {
// Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "MonitoredTables")).isEqualTo(new String[] {"public.simple"});
}

@Test
public void twoRecordsInQueue() throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
TestHelper.execute(INIT_STATEMENTS, INSERT_STATEMENTS);
final int recordCount = 2;

Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.MAX_QUEUE_SIZE, 10)
.with(PostgresConnectorConfig.MAX_BATCH_SIZE, 1)
.with(PostgresConnectorConfig.POLL_INTERVAL_MS, 5000L)
.with(PostgresConnectorConfig.MAX_QUEUE_SIZE_IN_BYTES, 10000L);
start(PostgresConnector.class, configBuilder.build());

waitForStreamingToStart();
for (int i = 0; i < recordCount - 1; i++) {
TestHelper.execute(INSERT_STATEMENTS);
}
Awaitility.await()
.alias("MBean attribute was not an expected value")
.pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS)
.ignoreException(InstanceNotFoundException.class)
.until(() -> {
long value = (long) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CurrentQueueSizeInBytes");
return value > 0;
});
Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CurrentQueueSizeInBytes")).isNotEqualTo(0L);
Awaitility.await()
.alias("MBean attribute was not an expected value")
.pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS)
.ignoreException(InstanceNotFoundException.class)
.until(() -> {
int value = (int) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "QueueRemainingCapacity");
return value == 8;
});
Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "QueueRemainingCapacity")).isEqualTo(8);

SourceRecords records = consumeRecordsByTopic(recordCount);

Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CurrentQueueSizeInBytes")).isEqualTo(0L);
stopConnector();
}

@Test
public void oneRecordInQueue() throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
TestHelper.execute(INIT_STATEMENTS, INSERT_STATEMENTS);
final int recordCount = 2;

Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.MAX_QUEUE_SIZE, 10)
.with(PostgresConnectorConfig.MAX_BATCH_SIZE, 1)
.with(PostgresConnectorConfig.POLL_INTERVAL_MS, 5000L)
.with(PostgresConnectorConfig.MAX_QUEUE_SIZE_IN_BYTES, 10L);
start(PostgresConnector.class, configBuilder.build());

waitForStreamingToStart();
for (int i = 0; i < recordCount - 1; i++) {
TestHelper.execute(INSERT_STATEMENTS);
}
Awaitility.await()
.alias("MBean attribute was not an expected value")
.pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS)
.ignoreException(InstanceNotFoundException.class)
.until(() -> {
long value = (long) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CurrentQueueSizeInBytes");
return value > 0;
});
Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CurrentQueueSizeInBytes")).isNotEqualTo(0L);
Awaitility.await()
.alias("MBean attribute was not an expected value")
.pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS)
.ignoreException(InstanceNotFoundException.class)
.until(() -> {
int value = (int) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "QueueRemainingCapacity");
return value == 9;
});
Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "QueueRemainingCapacity")).isEqualTo(9);

SourceRecords records = consumeRecordsByTopic(recordCount);
Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CurrentQueueSizeInBytes")).isEqualTo(0L);
stopConnector();
}

private ObjectName getSnapshotMetricsObjectName() throws MalformedObjectNameException {
return getSnapshotMetricsObjectName("postgres", TestHelper.TEST_SERVER);
}
@@ -95,6 +95,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
.pollInterval(connectorConfig.getPollInterval())
.maxBatchSize(connectorConfig.getMaxBatchSize())
.maxQueueSize(connectorConfig.getMaxQueueSize())
.maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
.loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
.build();

24 changes: 24 additions & 0 deletions debezium-core/pom.xml
@@ -101,6 +101,30 @@
<artifactId>kafka-connect-avro-converter</artifactId>
<scope>test</scope>
</dependency>
<!-- open libraries -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.j2objc</groupId>
<artifactId>j2objc-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<resources>
@@ -242,6 +242,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final String DATABASE_CONFIG_PREFIX = "database.";
private static final String CONVERTER_TYPE_SUFFIX = ".type";
public static final long DEFAULT_RETRIABLE_RESTART_WAIT = 10000L;
public static final long DEFAULT_MAX_QUEUE_SIZE_IN_BYTES = 0; // 0 means max.queue.size.in.bytes is not set, i.e. the byte-based limit is disabled

public static final Field RETRIABLE_RESTART_WAIT = Field.create("retriable.restart.connector.wait.ms")
.withDisplayName("Retriable restart wait (ms)")
@@ -294,6 +295,16 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
.withDefault(DEFAULT_POLL_INTERVAL_MILLIS)
.withValidation(Field::isPositiveInteger);

public static final Field MAX_QUEUE_SIZE_IN_BYTES = Field.create("max.queue.size.in.bytes")
.withDisplayName("Change event buffer size in bytes")
.withType(Type.LONG)
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDescription("Maximum size of the queue in bytes for change events read from the database log but not yet recorded or forwarded. Defaults to "
+ DEFAULT_MAX_QUEUE_SIZE_IN_BYTES + ". Mean the feature is not enabled")
.withDefault(DEFAULT_MAX_QUEUE_SIZE_IN_BYTES)
.withValidation(Field::isNonNegativeLong);

public static final Field SNAPSHOT_DELAY_MS = Field.create("snapshot.delay.ms")
.withDisplayName("Snapshot Delay (milliseconds)")
.withType(Type.LONG)
@@ -405,6 +416,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
MAX_BATCH_SIZE,
MAX_QUEUE_SIZE,
POLL_INTERVAL_MS,
MAX_QUEUE_SIZE_IN_BYTES,
PROVIDE_TRANSACTION_METADATA,
SKIPPED_OPERATIONS,
SNAPSHOT_DELAY_MS,
@@ -426,6 +438,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
private final boolean emitTombstoneOnDelete;
private final int maxQueueSize;
private final int maxBatchSize;
private final long maxQueueSizeInBytes;
private final Duration pollInterval;
private final String logicalName;
private final String heartbeatTopicsPrefix;
@@ -447,6 +460,7 @@ protected CommonConnectorConfig(Configuration config, String logicalName, int de
this.maxQueueSize = config.getInteger(MAX_QUEUE_SIZE);
this.maxBatchSize = config.getInteger(MAX_BATCH_SIZE);
this.pollInterval = config.getDuration(POLL_INTERVAL_MS, ChronoUnit.MILLIS);
this.maxQueueSizeInBytes = config.getLong(MAX_QUEUE_SIZE_IN_BYTES);
this.logicalName = logicalName;
this.heartbeatTopicsPrefix = config.getString(Heartbeat.HEARTBEAT_TOPICS_PREFIX);
this.snapshotDelayMs = Duration.ofMillis(config.getLong(SNAPSHOT_DELAY_MS));
@@ -484,6 +498,10 @@ public int getMaxBatchSize() {
return maxBatchSize;
}

public long getMaxQueueSizeInBytes() {
return maxQueueSizeInBytes;
}

public Duration getPollInterval() {
return pollInterval;
}
@@ -8,8 +8,11 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import org.apache.kafka.connect.source.SourceRecord;
@@ -22,6 +25,7 @@
import io.debezium.util.LoggingContext;
import io.debezium.util.LoggingContext.PreviousContext;
import io.debezium.util.Metronome;
import io.debezium.util.ObjectSizeCalculator;
import io.debezium.util.Threads;
import io.debezium.util.Threads.Timer;

@@ -57,19 +61,24 @@ public class ChangeEventQueue<T> implements ChangeEventQueueMetrics {
private final Duration pollInterval;
private final int maxBatchSize;
private final int maxQueueSize;
private final long maxQueueSizeInBytes;
private final BlockingQueue<T> queue;
private final Metronome metronome;
private final Supplier<PreviousContext> loggingContextSupplier;
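// Running byte total of the queued records, plus the per-record sizes used to release it when records are polled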
private AtomicLong currentQueueSizeInBytes = new AtomicLong(0);
private Map<T, Long> objectMap = new ConcurrentHashMap<>();

private volatile RuntimeException producerException;

private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSize, Supplier<LoggingContext.PreviousContext> loggingContextSupplier) {
private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSize, Supplier<LoggingContext.PreviousContext> loggingContextSupplier,
long maxQueueSizeInBytes) {
this.pollInterval = pollInterval;
this.maxBatchSize = maxBatchSize;
this.maxQueueSize = maxQueueSize;
this.queue = new LinkedBlockingDeque<>(maxQueueSize);
this.metronome = Metronome.sleeper(pollInterval, Clock.SYSTEM);
this.loggingContextSupplier = loggingContextSupplier;
this.maxQueueSizeInBytes = maxQueueSizeInBytes;
}

public static class Builder<T> {
@@ -78,6 +87,7 @@ public static class Builder<T> {
private int maxQueueSize;
private int maxBatchSize;
private Supplier<LoggingContext.PreviousContext> loggingContextSupplier;
private long maxQueueSizeInBytes;

public Builder<T> pollInterval(Duration pollInterval) {
this.pollInterval = pollInterval;
@@ -99,8 +109,13 @@ public Builder<T> loggingContextSupplier(Supplier<LoggingContext.PreviousContext
return this;
}

public Builder<T> maxQueueSizeInBytes(long maxQueueSizeInBytes) {
this.maxQueueSizeInBytes = maxQueueSizeInBytes;
return this;
}

public ChangeEventQueue<T> build() {
return new ChangeEventQueue<T>(pollInterval, maxQueueSize, maxBatchSize, loggingContextSupplier);
return new ChangeEventQueue<T>(pollInterval, maxQueueSize, maxBatchSize, loggingContextSupplier, maxQueueSizeInBytes);
}
}

@@ -126,9 +141,18 @@ public void enqueue(T record) throws InterruptedException {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Enqueuing source record '{}'", record);
}

// Wait until the queue drops below the byte limit before accepting another record.
while (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes.get() > maxQueueSizeInBytes) {
Thread.sleep(pollInterval.toMillis());
}
// this will also raise an InterruptedException if the thread is interrupted while waiting for space in the queue
queue.put(record);
// A positive max.queue.size.in.bytes enables tracking of the queue size in bytes.
if (maxQueueSizeInBytes > 0) {
long messageSize = ObjectSizeCalculator.getObjectSize(record);
objectMap.put(record, messageSize);
currentQueueSizeInBytes.addAndGet(messageSize);
}
}

/**
@@ -154,6 +178,14 @@ public List<T> poll() throws InterruptedException {
metronome.pause();
LOGGER.debug("checking for more records...");
}
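// When byte-based accounting is enabled, release the size of each drained record and drop its cached entry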
if (maxQueueSizeInBytes > 0 && records.size() > 0) {
records.parallelStream().forEach((record) -> {
if (objectMap.containsKey(record)) {
currentQueueSizeInBytes.addAndGet(-objectMap.get(record));
objectMap.remove(record);
}
});
}
return records;
}
finally {
@@ -180,4 +212,14 @@ public int totalCapacity() {
public int remainingCapacity() {
return queue.remainingCapacity();
}

@Override
public long maxQueueSizeInBytes() {
return maxQueueSizeInBytes;
}

@Override
public long currentQueueSizeInBytes() {
return currentQueueSizeInBytes.get();
}
}
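To make the new builder option and the blocking enqueue behaviour concrete, here is a hedged usage sketch: the queue class, its Builder methods and the metrics accessors come from the diff above, while the record type, the chosen limits and the logging-context arguments are illustrative assumptions.

import java.time.Duration;
import java.util.List;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.util.LoggingContext;

public class ChangeEventQueueByteLimitSketch {
    public static void main(String[] args) throws InterruptedException {
        // Builder usage mirrors the connector tasks touched by this commit; the values are arbitrary.
        ChangeEventQueue<String> queue = new ChangeEventQueue.Builder<String>()
                .pollInterval(Duration.ofMillis(500))
                .maxQueueSize(10)
                .maxBatchSize(5)
                .maxQueueSizeInBytes(10_000L) // > 0 switches the byte-based accounting on
                .loggingContextSupplier(() -> LoggingContext.forConnector("sketch", "demo", "context"))
                .build();

        // enqueue() now also sleeps pollInterval at a time while currentQueueSizeInBytes exceeds the limit.
        queue.enqueue("some change event payload");

        // poll() returns up to maxBatchSize records and releases their byte accounting.
        List<String> batch = queue.poll();
        System.out.println(batch.size() + " record(s) drained, " + queue.currentQueueSizeInBytes() + " bytes still queued");
    }
}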
@@ -10,4 +10,8 @@ public interface ChangeEventQueueMetrics {
int totalCapacity();

int remainingCapacity();

long maxQueueSizeInBytes();

long currentQueueSizeInBytes();
}
@@ -28,5 +28,9 @@ public interface ChangeEventSourceMetricsMXBean {

int getQueueRemainingCapacity();

long getMaxQueueSizeInBytes();

long getCurrentQueueSizeInBytes();

void reset();
}
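The two new attributes surface through JMX alongside the existing queue metrics; the Postgres tests above read CurrentQueueSizeInBytes from the streaming-metrics MBean. Below is a hedged sketch of reading them from the platform MBean server; the ObjectName string is an assumption modelled on Debezium's usual debezium.<connector>:type=connector-metrics,... naming and must be adapted to the actual connector and server name.

import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public class QueueByteMetricsReader {
    public static void main(String[] args) throws Exception {
        MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
        // Assumed ObjectName; adjust the connector type, context and server to the deployment being inspected.
        ObjectName streamingMetrics = new ObjectName("debezium.postgres:type=connector-metrics,context=streaming,server=test_server");

        long maxBytes = (long) mBeanServer.getAttribute(streamingMetrics, "MaxQueueSizeInBytes");
        long currentBytes = (long) mBeanServer.getAttribute(streamingMetrics, "CurrentQueueSizeInBytes");
        System.out.printf("change event queue holds %d of at most %d bytes (0 = unlimited)%n", currentBytes, maxBytes);
    }
}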