Skip to content

Commit

Permalink
Fix reentrant close in InboundMessageHandler
Browse files Browse the repository at this point in the history
Motivation:

The InboundMessageHandler clears message channel upon consumer side close, this operation should be delated when called from a consumer drain as it corrupts the message channel state. This happens in the new event-bus message consumer when consumer is unregistered when handling an event-bus message.

Changes:

When InboundMessageHandler consumer is closed when handling a message, the release message is delayed until the drain operation is finished to avoid corrupting the structure.
  • Loading branch information
vietj committed Feb 10, 2025
1 parent 825eacd commit bbcbba6
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,20 @@ public InboundMessageChannel(EventExecutor producer, EventExecutor consumer, int

@Override
public final boolean test(M msg) {
while (true) {
long d = DEMAND_UPDATER.get(this);
if (d == 0L) {
return false;
} else if (d == Long.MAX_VALUE || DEMAND_UPDATER.compareAndSet(this, d, d - 1)) {
break;
if (consumerClosed) {
return false;
} else {
while (true) {
long d = DEMAND_UPDATER.get(this);
if (d == 0L) {
return false;
} else if (d == Long.MAX_VALUE || DEMAND_UPDATER.compareAndSet(this, d, d - 1)) {
break;
}
}
handleMessage(msg);
return true;
}
handleMessage(msg);
return true;
}

/**
Expand Down Expand Up @@ -162,12 +166,16 @@ private void drainInternal() {
draining = true;
try {
int res = messageChannel.drain();
needsDrain = (res & MessageChannel.DRAIN_REQUIRED_MASK) != 0;
if ((res & MessageChannel.WRITABLE_MASK) != 0) {
if (producer.inThread()) {
handleResume();
} else {
producer.execute(this::handleResume);
if (consumerClosed) {
releaseMessages();
} else {
needsDrain = (res & MessageChannel.DRAIN_REQUIRED_MASK) != 0;
if ((res & MessageChannel.WRITABLE_MASK) != 0) {
if (producer.inThread()) {
handleResume();
} else {
producer.execute(this::handleResume);
}
}
}
} finally {
Expand Down Expand Up @@ -242,7 +250,9 @@ public final void closeConsumer() {
return;
}
consumerClosed = true;
releaseMessages();
if (!draining) {
releaseMessages();
}
}

private void releaseMessages() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public void testPauseInHandlerSignalsFullImmediately() {
}

@Test
public void testReentrantClose() {
public void testWriteWhenClosing() {
AtomicInteger emitted = new AtomicInteger();
List<Integer> dropped = Collections.synchronizedList(new ArrayList<>());
queue = new TestChannel(elt -> emitted.incrementAndGet(), 4, 4) {
Expand All @@ -316,4 +316,29 @@ protected void handleDispose(Integer msg) {
testComplete();
});
}

@Test
public void testCloseWhenDraining() {
List<Integer> emitted = Collections.synchronizedList(new ArrayList<>());;
List<Integer> dropped = Collections.synchronizedList(new ArrayList<>());
queue = new TestChannel(elt -> {
emitted.add(elt);
if (elt == 0) {
queue.close();
assertEquals(Collections.emptyList(), dropped);
}
}, 4, 4) {
@Override
protected void handleDispose(Integer msg) {
dropped.add(msg);
}
};
producerTask(() -> {
queue.fill();
assertEquals(List.of(1, 2, 3), dropped);
assertEquals(List.of(0), emitted);
testComplete();
});
await();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1459,4 +1459,27 @@ public void testMessageConsumptionStayOnWorkerThreadAfterResume() {
});
await();
}

@Test
public void testUnregisterInHandler() {
waitFor(2);
MessageConsumerImpl<Object> consumer = (MessageConsumerImpl<Object>) vertx.eventBus().consumer(ADDRESS1);
consumer.discardHandler(msg -> {
assertEquals("msg-2", msg.body());
complete();
});
consumer.handler(msg -> {
consumer.unregister();
vertx.runOnContext(v -> {
complete();
});
});
consumer.pause();
vertx.eventBus().send(ADDRESS1, "msg-1");
vertx.eventBus().send(ADDRESS1, "msg-2");
vertx.runOnContext(v -> {
consumer.resume();
});
await();
}
}

0 comments on commit bbcbba6

Please sign in to comment.