From 4c48480762da7aede1b4d55509c2f0dd3fffdd0d Mon Sep 17 00:00:00 2001 From: Carsten Lohmann Date: Sun, 19 May 2024 11:30:34 +0200 Subject: [PATCH] Fix AsyncHandlingAutoCommitKafkaConsumerTest tests. Tests were failing with Kafka 3.6.1 because of changes to the MockConsumer.rebalance() method. The rebalance-related tests have been changed to now work with the MockConsumer.rebalance() method (removing the Hono KafkaMockConsumer override of that method). --- .../AsyncHandlingAutoCommitKafkaConsumer.java | 13 +- .../kafka/consumer/HonoKafkaConsumer.java | 8 +- ...ncHandlingAutoCommitKafkaConsumerTest.java | 178 ++++++++---------- .../hono/kafka/test/KafkaMockConsumer.java | 29 +-- 4 files changed, 88 insertions(+), 140 deletions(-) diff --git a/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumer.java b/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumer.java index 9893e4d3bf..5565e4bc7f 100644 --- a/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumer.java +++ b/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumer.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2021, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2021 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -63,9 +63,10 @@ * completed gets committed. This prevents incompletely handled records and thereby enables at-least-once semantics. *

* In terms of when offsets are committed, the behaviour is similar to the one used for a consumer with - * enable.auto.commit. Commits are done periodically (using commitAsync) and when a rebalance - * happens or the consumer is stopped (using commitSync). The periodic commit interval is defined via - * the standard auto.commit.interval.ms configuration property. + * enable.auto.commit. Commits are done periodically (using commitAsync), and they are also done + * (using commitSync) when the consumer is stopped or a rebalance happens (with eager rebalancing + * or a non-empty revoked partitions list). + * The periodic commit interval is defined via the standard auto.commit.interval.ms configuration property. *

* In order to not fall behind with the position of the committed offset vs. the last received offset, users of this * class have to make sure that the record handling function, which provides the completion Future, is completed in time. @@ -90,7 +91,7 @@ * still done) but record fetching from all assigned topic partitions is suspended until the throttling threshold is * reached again. * The overall limit, i.e. the maximum number of incomplete record handler result futures at a given point in time, is - * calculated from the above mentioned throttling threshold plus the maximum number of records per poll operation. + * calculated from the above-mentioned throttling threshold plus the maximum number of records per poll operation. * * @param The type of record payload this consumer can process. */ @@ -653,7 +654,7 @@ public OffsetsQueueEntry addOffset(final long offset) { * the 'skipOffsetRecommitPeriodSeconds'. Note that for the actual commit, {@code 1} has to be added to the * returned value. *

- * Otherwise an empty Optional is returned. + * Otherwise, an empty Optional is returned. * * @return The offset wrapped in an Optional or an empty Optional if no offset commit is needed. */ diff --git a/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/HonoKafkaConsumer.java b/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/HonoKafkaConsumer.java index 8bf59d0747..87ff9e50b7 100644 --- a/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/HonoKafkaConsumer.java +++ b/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/HonoKafkaConsumer.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2021, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2021 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -720,7 +720,8 @@ public void onPartitionsAssigned(final Collection partitionsSet = Helper.from(partitions); if (LOG.isDebugEnabled()) { - LOG.debug("partitions assigned: [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(partitions)); + LOG.debug("partitions assigned: [{}] [client-id: {}]", + HonoKafkaConsumerHelper.getPartitionsDebugString(partitions), getClientId()); } ensurePositionsHaveBeenSetIfNeeded(partitionsSet); updateSubscribedTopicPatternTopicsAndRemoveMetrics(); @@ -744,7 +745,8 @@ public void onPartitionsRevoked(final Collection partitionsSet = Helper.from(partitions); if (LOG.isDebugEnabled()) { - LOG.debug("partitions revoked: [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(partitions)); + LOG.debug("partitions revoked: [{}] [client-id: {}]", + HonoKafkaConsumerHelper.getPartitionsDebugString(partitions), getClientId()); } onPartitionsRevokedBlocking(partitionsSet); context.runOnContext(v -> HonoKafkaConsumer.this.onPartitionsRevoked(partitionsSet)); diff --git a/clients/kafka-common/src/test/java/org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumerTest.java b/clients/kafka-common/src/test/java/org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumerTest.java index cb1d6bcf71..4538c415ac 100644 --- a/clients/kafka-common/src/test/java/org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumerTest.java +++ b/clients/kafka-common/src/test/java/org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumerTest.java @@ -331,15 +331,18 @@ public void testConsumerCommitsOffsetsOnRebalance(final VertxTestContext ctx) th consumer = new AsyncHandlingAutoCommitKafkaConsumer<>(vertx, Set.of(TOPIC), handler, consumerConfig); consumer.setKafkaConsumerSupplier(() -> mockConsumer); consumer.addOnKafkaConsumerReadyHandler(readyTracker); - consumer.start() - .compose(ok -> readyTracker.future()) - .onComplete(ctx.succeeding(v2 -> { - mockConsumer.schedulePollTask(() -> { - IntStream.range(0, numTestRecords).forEach(offset -> { - mockConsumer.addRecord(new ConsumerRecord<>(TOPIC, PARTITION, offset, "key_" + offset, Buffer.buffer())); - }); - }); - })); + final Context consumerVertxContext = vertx.getOrCreateContext(); + consumerVertxContext.runOnContext(v -> { + consumer.start() + .compose(ok -> readyTracker.future()) + .onComplete(ctx.succeeding(v2 -> { + mockConsumer.schedulePollTask(() -> { + IntStream.range(0, numTestRecords).forEach(offset -> { + mockConsumer.addRecord(new ConsumerRecord<>(TOPIC, PARTITION, offset, "key_" + offset, Buffer.buffer())); + }); + }); + })); + }); assertWithMessage("records received in 5s") .that(receivedRecordsCtx.awaitCompletion(5, TimeUnit.SECONDS)) .isTrue(); @@ -356,66 +359,23 @@ public void testConsumerCommitsOffsetsOnRebalance(final VertxTestContext ctx) th final AtomicInteger latestFullyHandledOffset = new AtomicInteger(1); recordsHandlingPromiseMap.get(4L).complete(); - // define VertxTestContexts for 3 checks (3x rebalance/commit) - final var checkIndex = new AtomicInteger(0); - final var commitCheckContexts = IntStream.range(0, 3) - .mapToObj(i -> new VertxTestContext()).toList(); - final var commitCheckpoints = commitCheckContexts.stream() - .map(c -> c.laxCheckpoint(1)).toList(); - final InterruptableSupplier waitForCurrentCommitCheckResult = () -> { - final var checkContext = commitCheckContexts.get(checkIndex.get()); - assertWithMessage("partition assigned in 5s for checking of commits") - .that(checkContext.awaitCompletion(5, TimeUnit.SECONDS)) - .isTrue(); - if (checkContext.failed()) { - ctx.failNow(checkContext.causeOfFailure()); - return false; - } - return true; - }; + final Checkpoint commitCheckDone = ctx.checkpoint(1); consumer.setOnPartitionsAssignedHandler(partitions -> { - LOG.debug("onPartitionsAssignedHandler invoked [check index: {}, newly assigned partitions: {}]", - checkIndex.get(), partitions.stream().map(t -> t.toString()).collect(Collectors.joining(", "))); - final var committedPartitions = mockConsumer.committed(Set.of(TOPIC_PARTITION)); - final var offsetAndMetadata = committedPartitions.get(TOPIC_PARTITION); - LOG.debug("committed partition [name: {}, offset: {}, expected offset: {}]", - TOPIC_PARTITION, offsetAndMetadata.offset(), latestFullyHandledOffset.get() + 1L); - ctx.verify(() -> { - assertThat(offsetAndMetadata).isNotNull(); -// assertThat(offsetAndMetadata.offset()).isEqualTo(latestFullyHandledOffset.get() + 1L); - }); - if (offsetAndMetadata.offset() == latestFullyHandledOffset.get() + 1L) { - commitCheckpoints.get(checkIndex.get()).flag(); + if (!partitions.isEmpty()) { + final var committedPartitions = mockConsumer.committed(Set.of(TOPIC_PARTITION)); + final var offsetAndMetadata = committedPartitions.get(TOPIC_PARTITION); + ctx.verify(() -> { + assertThat(offsetAndMetadata).isNotNull(); + assertThat(offsetAndMetadata.offset()).isEqualTo(latestFullyHandledOffset.get() + 1L); + }); + commitCheckDone.flag(); } }); // now force a rebalance which should trigger the above onPartitionsAssignedHandler - LOG.debug("force rebalance 1"); - mockConsumer.rebalance(List.of(TOPIC_PARTITION)); - if (!waitForCurrentCommitCheckResult.get()) { - return; - } - checkIndex.incrementAndGet(); - - // now another rebalance (ie. commit trigger) - no change in offsets - LOG.debug("force rebalance 2"); - mockConsumer.rebalance(List.of(TOPIC_PARTITION)); - if (!waitForCurrentCommitCheckResult.get()) { - return; - } - checkIndex.incrementAndGet(); - - // now complete some more promises - recordsHandlingPromiseMap.get(2L).complete(); - recordsHandlingPromiseMap.get(3L).complete(); - // offset 4 already complete - latestFullyHandledOffset.set(4); - // again rebalance/commit - LOG.debug("force rebalance 3"); + LOG.debug("force rebalance"); + mockConsumer.rebalance(List.of()); mockConsumer.rebalance(List.of(TOPIC_PARTITION)); - if (waitForCurrentCommitCheckResult.get()) { - ctx.completeNow(); - } } /** @@ -448,6 +408,8 @@ public void testConsumerCommitsOffsetsOnRebalanceAfterWaitingForRecordCompletion mockConsumer.updateBeginningOffsets(Map.of(TOPIC_PARTITION, 0L)); mockConsumer.updateEndOffsets(Map.of(TOPIC_PARTITION, 0L)); + mockConsumer.updateBeginningOffsets(Map.of(TOPIC2_PARTITION, 0L)); + mockConsumer.updateEndOffsets(Map.of(TOPIC2_PARTITION, 0L)); mockConsumer.updatePartitions(TOPIC_PARTITION, KafkaMockConsumer.DEFAULT_NODE); mockConsumer.setRebalancePartitionAssignmentAfterSubscribe(List.of(TOPIC_PARTITION)); final AtomicReference> onNextPartitionsRevokedBlockingHandlerRef = new AtomicReference<>(); @@ -483,38 +445,59 @@ protected void onPartitionsRevokedBlocking( ctx.failNow(receivedRecordsCtx.causeOfFailure()); return; } - // records received, complete the handling of all except the first 2 records + LOG.debug("all records received, complete the handling of all except the first 2 records"); LongStream.range(2, numTestRecords).forEach(offset -> recordsHandlingPromiseMap.get(offset).complete()); - ctx.verify(() -> assertThat(recordsHandlingPromiseMap.get(1L).future().isComplete()).isFalse()); + ctx.verify(() -> { + LongStream.range(0, 2).forEach(offset -> { + assertThat(recordsHandlingPromiseMap.get(offset).future().isComplete()).isFalse(); + }); + }); + final Checkpoint commitCheckDone = ctx.checkpoint(1); - // partitions revoked handler shall get called after the blocking partitions-revoked handling has waited for the records to be marked as completed consumer.setOnPartitionsRevokedHandler(s -> { - ctx.verify(() -> assertThat(recordsHandlingPromiseMap.get(1L).future().isComplete()).isTrue()); + // (3) this partitions revoked handler is called after the blocking partitions-revoked handling (2) + // has waited for the records to be marked as completed and after the offsets were committed + // (we can't check for committed offsets of the just revoked partition here because + // mockConsumer.committed() only returns offsets of assigned partitions) + ctx.verify(() -> { + LongStream.range(0, 2).forEach(offset -> { + assertThat(recordsHandlingPromiseMap.get(offset).future().isComplete()).isTrue(); + }); + }); }); - final Checkpoint commitCheckDone = ctx.laxCheckpoint(1); consumer.setOnPartitionsAssignedHandler(partitions -> { - final var committed = mockConsumer.committed(Set.of(TOPIC_PARTITION)); - final var offsetAndMetadata = committed.get(TOPIC_PARTITION); + if (partitions.isEmpty()) { + // (4) ignore if invoked when all partitions got revoked (1); only the subsequent invocation with assigned partitions is relevant + return; + } + // (5) ensure all offsets were committed + final var committedOffsets = mockConsumer.committed(Set.of(TOPIC_PARTITION)); + LOG.debug("committed partition offsets: {}", committedOffsets); + final var offsetAndMetadata = committedOffsets.get(TOPIC_PARTITION); ctx.verify(() -> { assertThat(offsetAndMetadata).isNotNull(); + assertThat(offsetAndMetadata.offset()).isEqualTo(numTestRecords); }); - if (offsetAndMetadata.offset() == numTestRecords) { - commitCheckDone.flag(); - } + commitCheckDone.flag(); }); - // trigger a rebalance where the currently assigned partition is revoked - // (and then assigned again - otherwise its offset wouldn't be returned by mockConsumer.committed()) - // the remaining 2 records are to be marked as completed with some delay + onNextPartitionsRevokedBlockingHandlerRef.set(v -> { + // (2) handler to complete the remaining record handling promises (on the Kafka polling thread; invoked before the OnPartitionsRevokedHandler) consumerVertxContext.runOnContext(v2 -> { + LOG.debug("complete remaining record handling promises"); recordsHandlingPromiseMap.get(0L).complete(); recordsHandlingPromiseMap.get(1L).complete(); }); + // trigger another rebalance; this time the partition is assigned again; + // this means we can then (see (5)) check the committed offsets (only available from MockConsumer for currently assigned partitions) + mockConsumer.setNextPollRebalancePartitionAssignment(List.of(TOPIC_PARTITION, TOPIC2_PARTITION)); }); - mockConsumer.setRevokeAllOnRebalance(true); - mockConsumer.updateBeginningOffsets(Map.of(TOPIC2_PARTITION, 0L)); - mockConsumer.updateEndOffsets(Map.of(TOPIC2_PARTITION, 0L)); - mockConsumer.setNextPollRebalancePartitionAssignment(List.of(TOPIC_PARTITION, TOPIC2_PARTITION)); + // (1) Trigger a rebalance where the currently assigned partition is revoked (via mockConsumer.setNextPollRebalancePartitionAssignment(List.of())). + // Since the records at offsets 0 and 1 are not yet completely handled here, the revocation logic will + // wait some time (up to 300ms by default) for the handling to be marked as completed, until committing offsets of completed records. + // The wait time will not be long here, since the blocking partitions-revoked handler above (2) will complete + // the remaining 2 record-handling promises shortly after the partition revocation (via consumerVertxContext.runOnContext()). + mockConsumer.setNextPollRebalancePartitionAssignment(List.of()); } /** @@ -674,7 +657,7 @@ public void testConsumerCommitsInitialOffset(final VertxTestContext ctx) throws mockConsumer.setRebalancePartitionAssignmentAfterSubscribe(List.of(TOPIC_PARTITION)); final VertxTestContext consumerStartedCtx = new VertxTestContext(); - final Checkpoint consumerStartedCheckpoint = consumerStartedCtx.laxCheckpoint(2); + final Checkpoint consumerStartedCheckpoint = consumerStartedCtx.checkpoint(2); consumer = new AsyncHandlingAutoCommitKafkaConsumer<>(vertx, Set.of(TOPIC), handler, consumerConfig); consumer.setKafkaConsumerSupplier(() -> mockConsumer); consumer.setOnRebalanceDoneHandler(s -> { @@ -816,7 +799,7 @@ public void testScenarioWithPartitionRevokedWhileHandlingIncomplete(final VertxT recordsHandlingPromiseMap.get(1L).complete(); recordsHandlingPromiseMap.get(2L).complete(); - final Checkpoint commitCheckDone = ctx.laxCheckpoint(1); + final Checkpoint commitCheckDone = ctx.checkpoint(1); consumer.setOnPartitionsAssignedHandler(partitions -> { LOG.info("rebalancing ..."); final Map committed = mockConsumer.committed(Set.of(TOPIC_PARTITION)); @@ -895,13 +878,15 @@ protected void onRecordHandlerSkippedForExpiredRecord(final KafkaConsumerRecord< final Checkpoint commitCheckpoint = commitCheckContext.checkpoint(1); consumer.setOnPartitionsAssignedHandler(partitions -> { - final Map committed = mockConsumer.committed(Set.of(TOPIC_PARTITION)); - ctx.verify(() -> { - final OffsetAndMetadata offsetAndMetadata = committed.get(TOPIC_PARTITION); - assertThat(offsetAndMetadata).isNotNull(); - assertThat(offsetAndMetadata.offset()).isEqualTo(latestFullyHandledOffset + 1L); - }); - commitCheckpoint.flag(); + if (!partitions.isEmpty()) { + final Map committed = mockConsumer.committed(Set.of(TOPIC_PARTITION)); + ctx.verify(() -> { + final OffsetAndMetadata offsetAndMetadata = committed.get(TOPIC_PARTITION); + assertThat(offsetAndMetadata).isNotNull(); + assertThat(offsetAndMetadata.offset()).isEqualTo(latestFullyHandledOffset + 1L); + }); + commitCheckpoint.flag(); + } }); // now force a rebalance which should trigger the above onPartitionsAssignedHandler // (rebalance is done as part of the poll() invocation; the vert.x consumer will schedule that invocation @@ -910,6 +895,7 @@ protected void onRecordHandlerSkippedForExpiredRecord(final KafkaConsumerRecord< final CountDownLatch latch = new CountDownLatch(1); consumerVertxContext.runOnContext(v -> latch.countDown()); latch.await(); + mockConsumer.rebalance(List.of()); mockConsumer.rebalance(List.of(TOPIC_PARTITION)); assertWithMessage("partition assigned in 5s for checking of commits") .that(commitCheckContext.awaitCompletion(5, TimeUnit.SECONDS)) @@ -941,18 +927,4 @@ private ConsumerRecord createRecordWithElapsedTtl() { new RecordHeaders(new Header[] { ttl, creationTime }), Optional.empty()); } - - /** - * Supplier whose get() method might throw an {@link InterruptedException}. - * @param The type of results supplied by this supplier. - */ - @FunctionalInterface - interface InterruptableSupplier { - /** - * Gets a result. - * @return The result. - * @throws InterruptedException If getting the result was interrupted. - */ - T get() throws InterruptedException; - } } diff --git a/test-utils/kafka-test-utils/src/main/java/org/eclipse/hono/kafka/test/KafkaMockConsumer.java b/test-utils/kafka-test-utils/src/main/java/org/eclipse/hono/kafka/test/KafkaMockConsumer.java index be450dc876..db50c32150 100644 --- a/test-utils/kafka-test-utils/src/main/java/org/eclipse/hono/kafka/test/KafkaMockConsumer.java +++ b/test-utils/kafka-test-utils/src/main/java/org/eclipse/hono/kafka/test/KafkaMockConsumer.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2021, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2021 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -21,7 +21,6 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; -import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -51,7 +50,6 @@ public class KafkaMockConsumer extends MockConsumer { private final AtomicBoolean skipSettingClosedFlagOnNextClose = new AtomicBoolean(); private final List>> commitListeners = new ArrayList<>(); - private boolean revokeAllOnRebalance = true; private Collection nextPollRebalancePartitionAssignment; private Collection onSubscribeRebalancePartitionAssignment; private ConsumerRebalanceListener rebalanceListener; @@ -81,18 +79,6 @@ private static PartitionInfo getPartitionInfo(final String topic, final int part return new PartitionInfo(topic, partition, node, replicas, replicas); } - /** - * Sets whether the onPartitionsRevoked method shall be invoked with all currently assigned partitions when - * a rebalance is triggered. - * If set to {@code false}, only the partitions will be revoked that are not in the list of newly assigned - * partitions. - * - * @param revokeAllOnRebalance {@code true} if all assigned partitions shall be revoked on a rebalance. - */ - public void setRevokeAllOnRebalance(final boolean revokeAllOnRebalance) { - this.revokeAllOnRebalance = revokeAllOnRebalance; - } - /** * Marks the following subscribe() invocations to be followed by a rebalance with the given partition * assignment, if the given assignment collection isn't {@code null}. The rebalance will be invoked @@ -145,19 +131,6 @@ public synchronized ConsumerRecords poll(final Duration timeout) { return super.poll(timeout); } - @Override - public synchronized void rebalance(final Collection newAssignment) { - Optional.ofNullable(rebalanceListener) - .ifPresent(listener -> { - listener.onPartitionsRevoked(assignment().stream() - .filter(tp -> revokeAllOnRebalance || !newAssignment.contains(tp)) - .collect(Collectors.toList())); - }); - super.rebalance(newAssignment); - Optional.ofNullable(rebalanceListener) - .ifPresent(listener -> listener.onPartitionsAssigned(newAssignment)); - } - /** * Skips setting the "closed" flag on the next close() invocation. *