diff --git a/debezium-assembly-descriptors/src/main/resources/assemblies/connector-distribution-no-drivers.xml b/debezium-assembly-descriptors/src/main/resources/assemblies/connector-distribution-no-drivers.xml index 6167a81316e..78eb4291811 100644 --- a/debezium-assembly-descriptors/src/main/resources/assemblies/connector-distribution-no-drivers.xml +++ b/debezium-assembly-descriptors/src/main/resources/assemblies/connector-distribution-no-drivers.xml @@ -23,6 +23,9 @@ com.fasterxml.jackson.datatype:jackson-datatype-jsr310:* org.reflections:reflections:* + + com.google.guava:listenablefuture:* + ${assembly.exclude.1} ${assembly.exclude.2} diff --git a/debezium-assembly-descriptors/src/main/resources/assemblies/connector-distribution.xml b/debezium-assembly-descriptors/src/main/resources/assemblies/connector-distribution.xml index 4edf31a9f40..6c2068a4462 100644 --- a/debezium-assembly-descriptors/src/main/resources/assemblies/connector-distribution.xml +++ b/debezium-assembly-descriptors/src/main/resources/assemblies/connector-distribution.xml @@ -22,6 +22,9 @@ com.fasterxml.jackson.core:jackson-annotations:* com.fasterxml.jackson.datatype:jackson-datatype-jsr310:* org.reflections:reflections:* + + + com.google.guava:listenablefuture:* diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorTask.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorTask.java index 80e172d9a45..102be1051b0 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorTask.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorTask.java @@ -85,6 +85,7 @@ public ChangeEventSourceCoordinator start(Configuration config) { .pollInterval(connectorConfig.getPollInterval()) .maxBatchSize(connectorConfig.getMaxBatchSize()) .maxQueueSize(connectorConfig.getMaxQueueSize()) + .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes()) .loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME)) .build(); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java index dd3f583ecc7..1a7fceb8c19 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java @@ -84,6 +84,17 @@ public int totalCapacity() { public int remainingCapacity() { return records.remainingCapacity(); } + + @Override + public long maxQueueSizeInBytes() { + return context.getConnectorConfig().getMaxQueueSizeInBytes(); + } + + @Override + public long currentQueueSizeInBytes() { + // return 0 since MySQL connector doesn't use ChangeEventQueue implementation + return 0; + } }; } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java index 1b965b8e53f..07deba58bf6 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java @@ -150,6 +150,7 @@ public ChangeEventSourceCoordinator start(Configuration config) { .pollInterval(connectorConfig.getPollInterval()) .maxBatchSize(connectorConfig.getMaxBatchSize()) .maxQueueSize(connectorConfig.getMaxQueueSize()) + .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes()) .loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME)) .build(); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java index b37e6234729..b9ea8a26758 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresMetricsIT.java @@ -21,6 +21,7 @@ import org.junit.Before; import org.junit.Test; +import io.debezium.config.Configuration; import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode; /** @@ -190,6 +191,97 @@ private void assertStreamingMetrics() throws Exception { // Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "MonitoredTables")).isEqualTo(new String[] {"public.simple"}); } + @Test + public void twoRecordsInQueue() throws Exception { + final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + TestHelper.execute(INIT_STATEMENTS, INSERT_STATEMENTS); + final int recordCount = 2; + + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) + .with(PostgresConnectorConfig.MAX_QUEUE_SIZE, 10) + .with(PostgresConnectorConfig.MAX_BATCH_SIZE, 1) + .with(PostgresConnectorConfig.POLL_INTERVAL_MS, 5000L) + .with(PostgresConnectorConfig.MAX_QUEUE_SIZE_IN_BYTES, 10000L); + start(PostgresConnector.class, configBuilder.build()); + + waitForStreamingToStart(); + for (int i = 0; i < recordCount - 1; i++) { + TestHelper.execute(INSERT_STATEMENTS); + } + Awaitility.await() + .alias("MBean attribute was not an expected value") + .pollInterval(100, TimeUnit.MILLISECONDS) + .atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS) + .ignoreException(InstanceNotFoundException.class) + .until(() -> { + long value = (long) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CurrentQueueSizeInBytes"); + return value > 0; + }); + Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CurrentQueueSizeInBytes")).isNotEqualTo(0L); + Awaitility.await() + .alias("MBean attribute was not an expected value") + .pollInterval(100, TimeUnit.MILLISECONDS) + .atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS) + .ignoreException(InstanceNotFoundException.class) + .until(() -> { + int value = (int) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "QueueRemainingCapacity"); + return value == 8; + }); + Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "QueueRemainingCapacity")).isEqualTo(8); + + SourceRecords records = consumeRecordsByTopic(recordCount); + + Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CurrentQueueSizeInBytes")).isEqualTo(0L); + stopConnector(); + } + + @Test + public void oneRecordInQueue() throws Exception { + final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + TestHelper.execute(INIT_STATEMENTS, INSERT_STATEMENTS); + final int recordCount = 2; + + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER) + .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) + .with(PostgresConnectorConfig.MAX_QUEUE_SIZE, 10) + .with(PostgresConnectorConfig.MAX_BATCH_SIZE, 1) + .with(PostgresConnectorConfig.POLL_INTERVAL_MS, 5000L) + .with(PostgresConnectorConfig.MAX_QUEUE_SIZE_IN_BYTES, 10L); + start(PostgresConnector.class, configBuilder.build()); + + waitForStreamingToStart(); + for (int i = 0; i < recordCount - 1; i++) { + TestHelper.execute(INSERT_STATEMENTS); + } + Awaitility.await() + .alias("MBean attribute was not an expected value") + .pollInterval(100, TimeUnit.MILLISECONDS) + .atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS) + .ignoreException(InstanceNotFoundException.class) + .until(() -> { + long value = (long) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CurrentQueueSizeInBytes"); + return value > 0; + }); + Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CurrentQueueSizeInBytes")).isNotEqualTo(0L); + Awaitility.await() + .alias("MBean attribute was not an expected value") + .pollInterval(100, TimeUnit.MILLISECONDS) + .atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS) + .ignoreException(InstanceNotFoundException.class) + .until(() -> { + int value = (int) mBeanServer.getAttribute(getStreamingMetricsObjectName(), "QueueRemainingCapacity"); + return value == 9; + }); + Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "QueueRemainingCapacity")).isEqualTo(9); + + SourceRecords records = consumeRecordsByTopic(recordCount); + Assertions.assertThat(mBeanServer.getAttribute(getStreamingMetricsObjectName(), "CurrentQueueSizeInBytes")).isEqualTo(0L); + stopConnector(); + } + private ObjectName getSnapshotMetricsObjectName() throws MalformedObjectNameException { return getSnapshotMetricsObjectName("postgres", TestHelper.TEST_SERVER); } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java index ec5bd91a65b..8a1745feab0 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java @@ -95,6 +95,7 @@ public ChangeEventSourceCoordinator start(Configuration config) { .pollInterval(connectorConfig.getPollInterval()) .maxBatchSize(connectorConfig.getMaxBatchSize()) .maxQueueSize(connectorConfig.getMaxQueueSize()) + .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes()) .loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME)) .build(); diff --git a/debezium-core/pom.xml b/debezium-core/pom.xml index 9e7caf952ab..f6ff6cda86b 100644 --- a/debezium-core/pom.xml +++ b/debezium-core/pom.xml @@ -101,6 +101,30 @@ kafka-connect-avro-converter test + + + com.google.guava + guava + 29.0-jre + + + com.google.code.findbugs + jsr305 + + + com.google.errorprone + error_prone_annotations + + + com.google.j2objc + j2objc-annotations + + + org.checkerframework + checker-qual + + + diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index 1e1f062c9f0..b145ff569c8 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -242,6 +242,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) { public static final String DATABASE_CONFIG_PREFIX = "database."; private static final String CONVERTER_TYPE_SUFFIX = ".type"; public static final long DEFAULT_RETRIABLE_RESTART_WAIT = 10000L; + public static final long DEFAULT_MAX_QUEUE_SIZE_IN_BYTES = 0; // In case we don't want to pass max.queue.size.in.bytes; public static final Field RETRIABLE_RESTART_WAIT = Field.create("retriable.restart.connector.wait.ms") .withDisplayName("Retriable restart wait (ms)") @@ -294,6 +295,16 @@ public static BinaryHandlingMode parse(String value, String defaultValue) { .withDefault(DEFAULT_POLL_INTERVAL_MILLIS) .withValidation(Field::isPositiveInteger); + public static final Field MAX_QUEUE_SIZE_IN_BYTES = Field.create("max.queue.size.in.bytes") + .withDisplayName("Change event buffer size in bytes") + .withType(Type.LONG) + .withWidth(Width.LONG) + .withImportance(Importance.MEDIUM) + .withDescription("Maximum size of the queue in bytes for change events read from the database log but not yet recorded or forwarded. Defaults to " + + DEFAULT_MAX_QUEUE_SIZE_IN_BYTES + ". Mean the feature is not enabled") + .withDefault(DEFAULT_MAX_QUEUE_SIZE_IN_BYTES) + .withValidation(Field::isNonNegativeLong); + public static final Field SNAPSHOT_DELAY_MS = Field.create("snapshot.delay.ms") .withDisplayName("Snapshot Delay (milliseconds)") .withType(Type.LONG) @@ -405,6 +416,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) { MAX_BATCH_SIZE, MAX_QUEUE_SIZE, POLL_INTERVAL_MS, + MAX_QUEUE_SIZE_IN_BYTES, PROVIDE_TRANSACTION_METADATA, SKIPPED_OPERATIONS, SNAPSHOT_DELAY_MS, @@ -426,6 +438,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) { private final boolean emitTombstoneOnDelete; private final int maxQueueSize; private final int maxBatchSize; + private final long maxQueueSizeInBytes; private final Duration pollInterval; private final String logicalName; private final String heartbeatTopicsPrefix; @@ -447,6 +460,7 @@ protected CommonConnectorConfig(Configuration config, String logicalName, int de this.maxQueueSize = config.getInteger(MAX_QUEUE_SIZE); this.maxBatchSize = config.getInteger(MAX_BATCH_SIZE); this.pollInterval = config.getDuration(POLL_INTERVAL_MS, ChronoUnit.MILLIS); + this.maxQueueSizeInBytes = config.getLong(MAX_QUEUE_SIZE_IN_BYTES); this.logicalName = logicalName; this.heartbeatTopicsPrefix = config.getString(Heartbeat.HEARTBEAT_TOPICS_PREFIX); this.snapshotDelayMs = Duration.ofMillis(config.getLong(SNAPSHOT_DELAY_MS)); @@ -484,6 +498,10 @@ public int getMaxBatchSize() { return maxBatchSize; } + public long getMaxQueueSizeInBytes() { + return maxQueueSizeInBytes; + } + public Duration getPollInterval() { return pollInterval; } diff --git a/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java b/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java index 34bf4143de4..2f7e3344dcd 100644 --- a/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java +++ b/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java @@ -8,8 +8,11 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import org.apache.kafka.connect.source.SourceRecord; @@ -22,6 +25,7 @@ import io.debezium.util.LoggingContext; import io.debezium.util.LoggingContext.PreviousContext; import io.debezium.util.Metronome; +import io.debezium.util.ObjectSizeCalculator; import io.debezium.util.Threads; import io.debezium.util.Threads.Timer; @@ -57,19 +61,24 @@ public class ChangeEventQueue implements ChangeEventQueueMetrics { private final Duration pollInterval; private final int maxBatchSize; private final int maxQueueSize; + private final long maxQueueSizeInBytes; private final BlockingQueue queue; private final Metronome metronome; private final Supplier loggingContextSupplier; + private AtomicLong currentQueueSizeInBytes = new AtomicLong(0); + private Map objectMap = new ConcurrentHashMap<>(); private volatile RuntimeException producerException; - private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSize, Supplier loggingContextSupplier) { + private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSize, Supplier loggingContextSupplier, + long maxQueueSizeInBytes) { this.pollInterval = pollInterval; this.maxBatchSize = maxBatchSize; this.maxQueueSize = maxQueueSize; this.queue = new LinkedBlockingDeque<>(maxQueueSize); this.metronome = Metronome.sleeper(pollInterval, Clock.SYSTEM); this.loggingContextSupplier = loggingContextSupplier; + this.maxQueueSizeInBytes = maxQueueSizeInBytes; } public static class Builder { @@ -78,6 +87,7 @@ public static class Builder { private int maxQueueSize; private int maxBatchSize; private Supplier loggingContextSupplier; + private long maxQueueSizeInBytes; public Builder pollInterval(Duration pollInterval) { this.pollInterval = pollInterval; @@ -99,8 +109,13 @@ public Builder loggingContextSupplier(Supplier maxQueueSizeInBytes(long maxQueueSizeInBytes) { + this.maxQueueSizeInBytes = maxQueueSizeInBytes; + return this; + } + public ChangeEventQueue build() { - return new ChangeEventQueue(pollInterval, maxQueueSize, maxBatchSize, loggingContextSupplier); + return new ChangeEventQueue(pollInterval, maxQueueSize, maxBatchSize, loggingContextSupplier, maxQueueSizeInBytes); } } @@ -126,9 +141,18 @@ public void enqueue(T record) throws InterruptedException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Enqueuing source record '{}'", record); } - + // Waiting for queue to add more record. + while (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes.get() > maxQueueSizeInBytes) { + Thread.sleep(pollInterval.toMillis()); + } // this will also raise an InterruptedException if the thread is interrupted while waiting for space in the queue queue.put(record); + // If we pass a positiveLong max.queue.size.in.bytes to enable handling queue size in bytes feature + if (maxQueueSizeInBytes > 0) { + long messageSize = ObjectSizeCalculator.getObjectSize(record); + objectMap.put(record, messageSize); + currentQueueSizeInBytes.addAndGet(messageSize); + } } /** @@ -154,6 +178,14 @@ public List poll() throws InterruptedException { metronome.pause(); LOGGER.debug("checking for more records..."); } + if (maxQueueSizeInBytes > 0 && records.size() > 0) { + records.parallelStream().forEach((record) -> { + if (objectMap.containsKey(record)) { + currentQueueSizeInBytes.addAndGet(-objectMap.get(record)); + objectMap.remove(record); + } + }); + } return records; } finally { @@ -180,4 +212,14 @@ public int totalCapacity() { public int remainingCapacity() { return queue.remainingCapacity(); } + + @Override + public long maxQueueSizeInBytes() { + return maxQueueSizeInBytes; + } + + @Override + public long currentQueueSizeInBytes() { + return currentQueueSizeInBytes.get(); + } } diff --git a/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueueMetrics.java b/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueueMetrics.java index 9644f68f44e..f4703a6f6df 100644 --- a/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueueMetrics.java +++ b/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueueMetrics.java @@ -10,4 +10,8 @@ public interface ChangeEventQueueMetrics { int totalCapacity(); int remainingCapacity(); + + long maxQueueSizeInBytes(); + + long currentQueueSizeInBytes(); } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/ChangeEventSourceMetricsMXBean.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/ChangeEventSourceMetricsMXBean.java index cad6c7cbb8b..cb7f67eda1b 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/ChangeEventSourceMetricsMXBean.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/ChangeEventSourceMetricsMXBean.java @@ -28,5 +28,9 @@ public interface ChangeEventSourceMetricsMXBean { int getQueueRemainingCapacity(); + long getMaxQueueSizeInBytes(); + + long getCurrentQueueSizeInBytes(); + void reset(); } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/metrics/PipelineMetrics.java b/debezium-core/src/main/java/io/debezium/pipeline/metrics/PipelineMetrics.java index 6ef06fafe4b..35d7828265d 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/metrics/PipelineMetrics.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/metrics/PipelineMetrics.java @@ -118,4 +118,15 @@ public int getQueueTotalCapacity() { public int getQueueRemainingCapacity() { return changeEventQueueMetrics.remainingCapacity(); } + + @Override + public long getMaxQueueSizeInBytes() { + return changeEventQueueMetrics.maxQueueSizeInBytes(); + } + + @Override + public long getCurrentQueueSizeInBytes() { + return changeEventQueueMetrics.currentQueueSizeInBytes(); + } + } diff --git a/debezium-core/src/main/java/io/debezium/util/ObjectSizeCalculator.java b/debezium-core/src/main/java/io/debezium/util/ObjectSizeCalculator.java new file mode 100644 index 00000000000..ccce10ab5ff --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/util/ObjectSizeCalculator.java @@ -0,0 +1,453 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +/** + * copied from https://github.com/twitter-archive/commons/blob/master/src/java/com/twitter/common/objectsize/ObjectSizeCalculator.java + */ +package io.debezium.util; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryPoolMXBean; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.Sets; + +/** + * Contains utility methods for calculating the memory usage of objects. It + * only works on the HotSpot JVM, and infers the actual memory layout (32 bit + * vs. 64 bit word size, compressed object pointers vs. uncompressed) from + * best available indicators. It can reliably detect a 32 bit vs. 64 bit JVM. + * It can only make an educated guess at whether compressed OOPs are used, + * though; specifically, it knows what the JVM's default choice of OOP + * compression would be based on HotSpot version and maximum heap sizes, but if + * the choice is explicitly overridden with the -XX:{+|-}UseCompressedOops command line + * switch, it can not detect + * this fact and will report incorrect sizes, as it will presume the default JVM + * behavior. + * + * @author Attila Szegedi + */ +public class ObjectSizeCalculator { + + /** + * Describes constant memory overheads for various constructs in a JVM implementation. + */ + public interface MemoryLayoutSpecification { + + /** + * Returns the fixed overhead of an array of any type or length in this JVM. + * + * @return the fixed overhead of an array. + */ + int getArrayHeaderSize(); + + /** + * Returns the fixed overhead of for any {@link Object} subclass in this JVM. + * + * @return the fixed overhead of any object. + */ + int getObjectHeaderSize(); + + /** + * Returns the quantum field size for a field owned by an object in this JVM. + * + * @return the quantum field size for an object. + */ + int getObjectPadding(); + + /** + * Returns the fixed size of an object reference in this JVM. + * + * @return the size of all object references. + */ + int getReferenceSize(); + + /** + * Returns the quantum field size for a field owned by one of an object's ancestor superclasses + * in this JVM. + * + * @return the quantum field size for a superclass field. + */ + int getSuperclassFieldPadding(); + } + + private static class CurrentLayout { + private static final MemoryLayoutSpecification SPEC = getEffectiveMemoryLayoutSpecification(); + } + + /** + * Given an object, returns the total allocated size, in bytes, of the object + * and all other objects reachable from it. Attempts to to detect the current JVM memory layout, + * but may fail with {@link UnsupportedOperationException}; + * + * @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do + * anything special, it measures the size of all objects + * reachable through it (which will include its class loader, and by + * extension, all other Class objects loaded by + * the same loader, and all the parent class loaders). It doesn't provide the + * size of the static fields in the JVM class that the Class object + * represents. + * @return the total allocated size of the object and all other objects it + * retains. + * @throws UnsupportedOperationException if the current vm memory layout cannot be detected. + */ + public static long getObjectSize(Object obj) throws UnsupportedOperationException { + return obj == null ? 0 : new ObjectSizeCalculator(CurrentLayout.SPEC).calculateObjectSize(obj); + } + + // Fixed object header size for arrays. + private final int arrayHeaderSize; + // Fixed object header size for non-array objects. + private final int objectHeaderSize; + // Padding for the object size - if the object size is not an exact multiple + // of this, it is padded to the next multiple. + private final int objectPadding; + // Size of reference (pointer) fields. + private final int referenceSize; + // Padding for the fields of superclass before fields of subclasses are + // added. + private final int superclassFieldPadding; + + private final LoadingCache, ClassSizeInfo> classSizeInfos = CacheBuilder.newBuilder().build(new CacheLoader, ClassSizeInfo>() { + public ClassSizeInfo load(Class clazz) { + return new ClassSizeInfo(clazz); + } + }); + + private final Set alreadyVisited = Sets.newIdentityHashSet(); + private final Deque pending = new ArrayDeque(16 * 1024); + private long size; + + /** + * Creates an object size calculator that can calculate object sizes for a given + * {@code memoryLayoutSpecification}. + * + * @param memoryLayoutSpecification a description of the JVM memory layout. + */ + public ObjectSizeCalculator(MemoryLayoutSpecification memoryLayoutSpecification) { + Preconditions.checkNotNull(memoryLayoutSpecification); + arrayHeaderSize = memoryLayoutSpecification.getArrayHeaderSize(); + objectHeaderSize = memoryLayoutSpecification.getObjectHeaderSize(); + objectPadding = memoryLayoutSpecification.getObjectPadding(); + referenceSize = memoryLayoutSpecification.getReferenceSize(); + superclassFieldPadding = memoryLayoutSpecification.getSuperclassFieldPadding(); + } + + /** + * Given an object, returns the total allocated size, in bytes, of the object + * and all other objects reachable from it. + * + * @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do + * anything special, it measures the size of all objects + * reachable through it (which will include its class loader, and by + * extension, all other Class objects loaded by + * the same loader, and all the parent class loaders). It doesn't provide the + * size of the static fields in the JVM class that the Class object + * represents. + * @return the total allocated size of the object and all other objects it + * retains. + */ + public synchronized long calculateObjectSize(Object obj) { + // Breadth-first traversal instead of naive depth-first with recursive + // implementation, so we don't blow the stack traversing long linked lists. + try { + for (;;) { + visit(obj); + if (pending.isEmpty()) { + return size; + } + obj = pending.removeFirst(); + } + } + finally { + alreadyVisited.clear(); + pending.clear(); + size = 0; + } + } + + private void visit(Object obj) { + if (alreadyVisited.contains(obj)) { + return; + } + final Class clazz = obj.getClass(); + if (clazz == ArrayElementsVisitor.class) { + ((ArrayElementsVisitor) obj).visit(this); + } + else { + alreadyVisited.add(obj); + if (clazz.isArray()) { + visitArray(obj); + } + else { + classSizeInfos.getUnchecked(clazz).visit(obj, this); + } + } + } + + private void visitArray(Object array) { + final Class componentType = array.getClass().getComponentType(); + final int length = Array.getLength(array); + if (componentType.isPrimitive()) { + increaseByArraySize(length, getPrimitiveFieldSize(componentType)); + } + else { + increaseByArraySize(length, referenceSize); + // If we didn't use an ArrayElementsVisitor, we would be enqueueing every + // element of the array here instead. For large arrays, it would + // tremendously enlarge the queue. In essence, we're compressing it into + // a small command object instead. This is different than immediately + // visiting the elements, as their visiting is scheduled for the end of + // the current queue. + switch (length) { + case 0: { + break; + } + case 1: { + enqueue(Array.get(array, 0)); + break; + } + default: { + enqueue(new ArrayElementsVisitor((Object[]) array)); + } + } + } + } + + private void increaseByArraySize(int length, long elementSize) { + increaseSize(roundTo(arrayHeaderSize + length * elementSize, objectPadding)); + } + + private static class ArrayElementsVisitor { + private final Object[] array; + + ArrayElementsVisitor(Object[] array) { + this.array = array; + } + + public void visit(ObjectSizeCalculator calc) { + for (Object elem : array) { + if (elem != null) { + calc.visit(elem); + } + } + } + } + + void enqueue(Object obj) { + if (obj != null) { + pending.addLast(obj); + } + } + + void increaseSize(long objectSize) { + size += objectSize; + } + + @VisibleForTesting + static long roundTo(long x, int multiple) { + return ((x + multiple - 1) / multiple) * multiple; + } + + private class ClassSizeInfo { + // Padded fields + header size + private final long objectSize; + // Only the fields size - used to calculate the subclasses' memory + // footprint. + private final long fieldsSize; + private final Field[] referenceFields; + + public ClassSizeInfo(Class clazz) { + long fieldsSize = 0; + final List referenceFields = new LinkedList(); + for (Field f : clazz.getDeclaredFields()) { + if (Modifier.isStatic(f.getModifiers())) { + continue; + } + final Class type = f.getType(); + if (type.isPrimitive()) { + fieldsSize += getPrimitiveFieldSize(type); + } + else { + f.setAccessible(true); + referenceFields.add(f); + fieldsSize += referenceSize; + } + } + final Class superClass = clazz.getSuperclass(); + if (superClass != null) { + final ClassSizeInfo superClassInfo = classSizeInfos.getUnchecked(superClass); + fieldsSize += roundTo(superClassInfo.fieldsSize, superclassFieldPadding); + referenceFields.addAll(Arrays.asList(superClassInfo.referenceFields)); + } + this.fieldsSize = fieldsSize; + this.objectSize = roundTo(objectHeaderSize + fieldsSize, objectPadding); + this.referenceFields = referenceFields.toArray( + new Field[referenceFields.size()]); + } + + void visit(Object obj, ObjectSizeCalculator calc) { + calc.increaseSize(objectSize); + enqueueReferencedObjects(obj, calc); + } + + public void enqueueReferencedObjects(Object obj, ObjectSizeCalculator calc) { + for (Field f : referenceFields) { + try { + calc.enqueue(f.get(obj)); + } + catch (IllegalAccessException e) { + final AssertionError ae = new AssertionError( + "Unexpected denial of access to " + f); + ae.initCause(e); + throw ae; + } + } + } + } + + private static long getPrimitiveFieldSize(Class type) { + if (type == boolean.class || type == byte.class) { + return 1; + } + if (type == char.class || type == short.class) { + return 2; + } + if (type == int.class || type == float.class) { + return 4; + } + if (type == long.class || type == double.class) { + return 8; + } + throw new AssertionError("Encountered unexpected primitive type " + + type.getName()); + } + + @VisibleForTesting + static MemoryLayoutSpecification getEffectiveMemoryLayoutSpecification() { + final String vmName = System.getProperty("java.vm.name"); + if (vmName == null || !(vmName.startsWith("Java HotSpot(TM) ") + || vmName.startsWith("OpenJDK") || vmName.startsWith("TwitterJDK"))) { + throw new UnsupportedOperationException( + "ObjectSizeCalculator only supported on HotSpot VM"); + } + + final String dataModel = System.getProperty("sun.arch.data.model"); + if ("32".equals(dataModel)) { + // Running with 32-bit data model + return new MemoryLayoutSpecification() { + @Override + public int getArrayHeaderSize() { + return 12; + } + + @Override + public int getObjectHeaderSize() { + return 8; + } + + @Override + public int getObjectPadding() { + return 8; + } + + @Override + public int getReferenceSize() { + return 4; + } + + @Override + public int getSuperclassFieldPadding() { + return 4; + } + }; + } + else if (!"64".equals(dataModel)) { + throw new UnsupportedOperationException("Unrecognized value '" + + dataModel + "' of sun.arch.data.model system property"); + } + + final String strVmVersion = System.getProperty("java.vm.version"); + final int vmVersion = Integer.parseInt(strVmVersion.substring(0, + strVmVersion.indexOf('.'))); + if (vmVersion >= 17) { + long maxMemory = 0; + for (MemoryPoolMXBean mp : ManagementFactory.getMemoryPoolMXBeans()) { + maxMemory += mp.getUsage().getMax(); + } + if (maxMemory < 30L * 1024 * 1024 * 1024) { + // HotSpot 17.0 and above use compressed OOPs below 30GB of RAM total + // for all memory pools (yes, including code cache). + return new MemoryLayoutSpecification() { + @Override + public int getArrayHeaderSize() { + return 16; + } + + @Override + public int getObjectHeaderSize() { + return 12; + } + + @Override + public int getObjectPadding() { + return 8; + } + + @Override + public int getReferenceSize() { + return 4; + } + + @Override + public int getSuperclassFieldPadding() { + return 4; + } + }; + } + } + + // In other cases, it's a 64-bit uncompressed OOPs object model + return new MemoryLayoutSpecification() { + @Override + public int getArrayHeaderSize() { + return 24; + } + + @Override + public int getObjectHeaderSize() { + return 16; + } + + @Override + public int getObjectPadding() { + return 8; + } + + @Override + public int getReferenceSize() { + return 8; + } + + @Override + public int getSuperclassFieldPadding() { + return 8; + } + }; + } +} diff --git a/documentation/modules/ROOT/pages/connectors/db2.adoc b/documentation/modules/ROOT/pages/connectors/db2.adoc index 22d59d47781..05c169e8478 100644 --- a/documentation/modules/ROOT/pages/connectors/db2.adoc +++ b/documentation/modules/ROOT/pages/connectors/db2.adoc @@ -1914,6 +1914,10 @@ The following _advanced_ configuration properties have defaults that work in mos |`2048` |Positive integer value that specifies the maximum size of each batch of events that the connector processes. +|[[db2-property-max-queue-size-in-bytes]]<> +|`0` +|Long value for the maximum size in bytes of the blocking queue. The feature is disabled by default, it will be active if it's set with a positive long value. + |[[db2-property-heartbeat-interval-ms]]<> |`0` |Controls how frequently the connector sends heartbeat messages to a Kafka topic. The default behavior is that the connector does not send heartbeat messages. + diff --git a/documentation/modules/ROOT/pages/connectors/vitess.adoc b/documentation/modules/ROOT/pages/connectors/vitess.adoc index 4bd22413761..15f965335f6 100644 --- a/documentation/modules/ROOT/pages/connectors/vitess.adoc +++ b/documentation/modules/ROOT/pages/connectors/vitess.adoc @@ -982,6 +982,15 @@ The values will incoporate any differences between the clocks on the machines wh |`long` |The number of processed transactions that were committed. +|[[connectors-strm-metric-maxqueuesizeinbytes]]<> +|`long` +|The maximum buffer of the queue in bytes used to pass events between the streamer and the main Kafka Connect loop. + +|[[connectors-strm-metric-currentqueuesizeinbytes]]<> +|`long` +|The current buffer of the queue in bytes used to pass events between the streamer and the main Kafka Connect loop. + + |=== // Type: reference diff --git a/documentation/modules/ROOT/partials/modules/all-connectors/ref-connector-monitoring-snapshot-metrics.adoc b/documentation/modules/ROOT/partials/modules/all-connectors/ref-connector-monitoring-snapshot-metrics.adoc index ef45ca59b06..c0165c24581 100644 --- a/documentation/modules/ROOT/partials/modules/all-connectors/ref-connector-monitoring-snapshot-metrics.adoc +++ b/documentation/modules/ROOT/partials/modules/all-connectors/ref-connector-monitoring-snapshot-metrics.adoc @@ -60,4 +60,12 @@ Tables are incrementally added to the Map during processing. Updates every 10,000 rows scanned and upon completing a table. +|[[connectors-strm-metric-maxqueuesizeinbytes_{context}]]<> +|`long` +|The maximum buffer of the queue in bytes. + +|[[connectors-strm-metric-currentqueuesizeinbytes_{context}]]<> +|`long` +|The current data of records in the queue in bytes. + |=== diff --git a/documentation/modules/ROOT/partials/modules/all-connectors/ref-connector-monitoring-streaming-metrics.adoc b/documentation/modules/ROOT/partials/modules/all-connectors/ref-connector-monitoring-streaming-metrics.adoc index cbc3a016a74..2f9bcfe5cd8 100644 --- a/documentation/modules/ROOT/partials/modules/all-connectors/ref-connector-monitoring-streaming-metrics.adoc +++ b/documentation/modules/ROOT/partials/modules/all-connectors/ref-connector-monitoring-streaming-metrics.adoc @@ -51,4 +51,12 @@ The values will incoporate any differences between the clocks on the machines wh |`string` |Transaction identifier of the last processed transaction. +|[[connectors-strm-metric-maxqueuesizeinbytes_{context}]]<> +|`long` +|The maximum buffer of the queue in bytes. + +|[[connectors-strm-metric-currentqueuesizeinbytes_{context}]]<> +|`long` +|The current data of records in the queue in bytes. + |===