From bbcbba63f9245baa6d0a0a4235688e72fccd688d Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Mon, 10 Feb 2025 13:59:40 +0100 Subject: [PATCH] Fix reentrant close in InboundMessageHandler Motivation: The InboundMessageHandler clears message channel upon consumer side close, this operation should be delated when called from a consumer drain as it corrupts the message channel state. This happens in the new event-bus message consumer when consumer is unregistered when handling an event-bus message. Changes: When InboundMessageHandler consumer is closed when handling a message, the release message is delayed until the drain operation is finished to avoid corrupting the structure. --- .../concurrent/InboundMessageChannel.java | 40 ++++++++++++------- ...InboundMessageChannelSingleThreadTest.java | 27 ++++++++++++- .../tests/eventbus/LocalEventBusTest.java | 23 +++++++++++ 3 files changed, 74 insertions(+), 16 deletions(-) diff --git a/vertx-core/src/main/java/io/vertx/core/internal/concurrent/InboundMessageChannel.java b/vertx-core/src/main/java/io/vertx/core/internal/concurrent/InboundMessageChannel.java index 1093b3e7d5c..3f6562295ac 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/concurrent/InboundMessageChannel.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/concurrent/InboundMessageChannel.java @@ -72,16 +72,20 @@ public InboundMessageChannel(EventExecutor producer, EventExecutor consumer, int @Override public final boolean test(M msg) { - while (true) { - long d = DEMAND_UPDATER.get(this); - if (d == 0L) { - return false; - } else if (d == Long.MAX_VALUE || DEMAND_UPDATER.compareAndSet(this, d, d - 1)) { - break; + if (consumerClosed) { + return false; + } else { + while (true) { + long d = DEMAND_UPDATER.get(this); + if (d == 0L) { + return false; + } else if (d == Long.MAX_VALUE || DEMAND_UPDATER.compareAndSet(this, d, d - 1)) { + break; + } } + handleMessage(msg); + return true; } - handleMessage(msg); - return true; } /** @@ -162,12 +166,16 @@ private void drainInternal() { draining = true; try { int res = messageChannel.drain(); - needsDrain = (res & MessageChannel.DRAIN_REQUIRED_MASK) != 0; - if ((res & MessageChannel.WRITABLE_MASK) != 0) { - if (producer.inThread()) { - handleResume(); - } else { - producer.execute(this::handleResume); + if (consumerClosed) { + releaseMessages(); + } else { + needsDrain = (res & MessageChannel.DRAIN_REQUIRED_MASK) != 0; + if ((res & MessageChannel.WRITABLE_MASK) != 0) { + if (producer.inThread()) { + handleResume(); + } else { + producer.execute(this::handleResume); + } } } } finally { @@ -242,7 +250,9 @@ public final void closeConsumer() { return; } consumerClosed = true; - releaseMessages(); + if (!draining) { + releaseMessages(); + } } private void releaseMessages() { diff --git a/vertx-core/src/test/java/io/vertx/tests/concurrent/InboundMessageChannelSingleThreadTest.java b/vertx-core/src/test/java/io/vertx/tests/concurrent/InboundMessageChannelSingleThreadTest.java index 8fd1898931a..705d43327e8 100644 --- a/vertx-core/src/test/java/io/vertx/tests/concurrent/InboundMessageChannelSingleThreadTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/concurrent/InboundMessageChannelSingleThreadTest.java @@ -295,7 +295,7 @@ public void testPauseInHandlerSignalsFullImmediately() { } @Test - public void testReentrantClose() { + public void testWriteWhenClosing() { AtomicInteger emitted = new AtomicInteger(); List dropped = Collections.synchronizedList(new ArrayList<>()); queue = new TestChannel(elt -> emitted.incrementAndGet(), 4, 4) { @@ -316,4 +316,29 @@ protected void handleDispose(Integer msg) { testComplete(); }); } + + @Test + public void testCloseWhenDraining() { + List emitted = Collections.synchronizedList(new ArrayList<>());; + List dropped = Collections.synchronizedList(new ArrayList<>()); + queue = new TestChannel(elt -> { + emitted.add(elt); + if (elt == 0) { + queue.close(); + assertEquals(Collections.emptyList(), dropped); + } + }, 4, 4) { + @Override + protected void handleDispose(Integer msg) { + dropped.add(msg); + } + }; + producerTask(() -> { + queue.fill(); + assertEquals(List.of(1, 2, 3), dropped); + assertEquals(List.of(0), emitted); + testComplete(); + }); + await(); + } } diff --git a/vertx-core/src/test/java/io/vertx/tests/eventbus/LocalEventBusTest.java b/vertx-core/src/test/java/io/vertx/tests/eventbus/LocalEventBusTest.java index c422822d0f2..0d8657fb436 100644 --- a/vertx-core/src/test/java/io/vertx/tests/eventbus/LocalEventBusTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/eventbus/LocalEventBusTest.java @@ -1459,4 +1459,27 @@ public void testMessageConsumptionStayOnWorkerThreadAfterResume() { }); await(); } + + @Test + public void testUnregisterInHandler() { + waitFor(2); + MessageConsumerImpl consumer = (MessageConsumerImpl) vertx.eventBus().consumer(ADDRESS1); + consumer.discardHandler(msg -> { + assertEquals("msg-2", msg.body()); + complete(); + }); + consumer.handler(msg -> { + consumer.unregister(); + vertx.runOnContext(v -> { + complete(); + }); + }); + consumer.pause(); + vertx.eventBus().send(ADDRESS1, "msg-1"); + vertx.eventBus().send(ADDRESS1, "msg-2"); + vertx.runOnContext(v -> { + consumer.resume(); + }); + await(); + } }