Skip to content

Commit

Permalink
Add a blocking iterator utility for Vert.x read streams.
Browse files Browse the repository at this point in the history
Motivation:

With virtual threads, it might be a common thing to adapt a Vert.x read stream to a blocking Java iterator. The primary use case of this is the gRPC blocking client generation.

Changes:

An iterator adapter for a Vert.x read stream.

The implementation uses java lock and lock conditions to let the producer thread signal consumer threads.

While the iterator should be consumed by a single thread (because of the hasNext/next racy sequence), the iterator is thread safe and tolerate multiple consumers.

The implementation consumer side is similar to a Java blocking queue and relies on lock conditions to signal state change to consumers.
  • Loading branch information
vietj committed Feb 28, 2025
1 parent 71d1cc8 commit 5a23d05
Show file tree
Hide file tree
Showing 3 changed files with 315 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright (c) 2011-2025 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.internal.streams;

import io.vertx.core.Handler;
import io.vertx.core.VertxException;
import io.vertx.core.impl.Utils;
import io.vertx.core.streams.ReadStream;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Adapts a Vert.x read stream to a blocking iterator, the main use case is dispatching a Vert.x read stream
* to a virtual thread consumer.
*
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public class ReadStreamIterator<E> implements Iterator<E>, Handler<E> {

private static final Throwable END_SENTINEL = new VertxException("", true);

public static <E> Iterator<E> iterator(ReadStream<E> stream) {
ReadStreamIterator<E> iterator = new ReadStreamIterator<>(stream);
iterator.init();
return iterator;
}

private final ReadStream<E> stream;
private final Queue<Object> queue;
private final Lock lock;
private final Condition consumerProgress;
private Throwable ended;

public ReadStreamIterator(ReadStream<E> stream) {
this.stream = stream;
this.queue = new ArrayDeque<>();
this.lock = new ReentrantLock();
this.consumerProgress = lock.newCondition();
}

/**
* Signal the consumer that a resume operation is required to fill the buffer again.
*/
static class Resume {
final Object elt;
public Resume(Object elt) {
this.elt = elt;
}
}

void init() {
stream.handler(this);
stream.exceptionHandler(this::handleEnd);
stream.endHandler(v -> {
handleEnd(END_SENTINEL);
});
}

public void handle(E elt) {
int size;
boolean pause;
lock.lock();
try {
size = queue.size();
pause = size == 15;
if (pause) {
stream.pause();
queue.add(new Resume(elt));
} else {
queue.add(elt);
}
consumerProgress.signal();
} finally {
lock.unlock();
}
}

private void handleEnd(Throwable cause) {
try {
stream.endHandler(null);
stream.exceptionHandler(null);
stream.handler(null);
} catch (Throwable ignore) {
}
lock.lock();
try {
ended = cause;
consumerProgress.signalAll();
} finally {
lock.unlock();
}
}

@Override
public boolean hasNext() {
lock.lock();
try {
while (true) {
if (!queue.isEmpty()) {
return true;
}
if (ended != null) {
return false;
}
try {
consumerProgress.await();
} catch (InterruptedException e) {
Utils.throwAsUnchecked(e);
}
}
} finally {
lock.unlock();
}
}

@Override
public E next() {
Object elt;
lock.lock();
try {
while (true) {
elt = queue.poll();
if (elt != null) {
break;
}
Throwable t = ended;
if (t != null) {
if (t == END_SENTINEL) {
throw new NoSuchElementException();
} else {
Utils.throwAsUnchecked(t);
}
}
try {
consumerProgress.await();
} catch (InterruptedException e) {
Utils.throwAsUnchecked(e);
}
}
} finally {
lock.unlock();
}
if (elt instanceof Resume) {
elt = ((Resume)elt).elt;
stream.resume();
}
return (E) elt;
}
}
1 change: 1 addition & 0 deletions vertx-core/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
exports io.vertx.core.internal.threadchecker;
exports io.vertx.core.internal.concurrent;
exports io.vertx.core.internal.resource;
exports io.vertx.core.internal.streams;

// Testing

Expand Down
151 changes: 151 additions & 0 deletions vertx-core/src/test/java/io/vertx/tests/streams/IteratorTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright (c) 2011-2025 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.tests.streams;

import io.vertx.core.internal.streams.ReadStreamIterator;
import io.vertx.test.core.AsyncTestBase;
import io.vertx.test.fakestream.FakeStream;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CyclicBarrier;

public class IteratorTest extends AsyncTestBase {

@Test
public void testIteratorResuming() {
FakeStream<Integer> stream = new FakeStream<>();
stream.setWriteQueueMaxSize(0);
Iterator<Integer> iterator = ReadStreamIterator.iterator(stream);
for (int i = 0;i < 16;i++) {
assertFalse(stream.writeQueueFull());
stream.write(i);
}
stream.write(17);
assertTrue(stream.writeQueueFull());
for (int i = 0;i < 16;i++) {
iterator.next();
}
assertFalse(stream.writeQueueFull());
}

@Test
public void testEnd() {
FakeStream<Integer> stream = new FakeStream<>();
Iterator<Integer> iterator = ReadStreamIterator.iterator(stream);
for (int i = 0;i < 15;i++) {
stream.write(i);
}
stream.end();
for (int i = 0;i < 15;i++) {
assertTrue(iterator.hasNext());
iterator.next();
}
assertFalse(iterator.hasNext());
try {
iterator.next();
fail();
} catch (NoSuchElementException expected) {
}
}

@Test
public void testFail() {
FakeStream<Integer> stream = new FakeStream<>();
Iterator<Integer> iterator = ReadStreamIterator.iterator(stream);
for (int i = 0;i < 15;i++) {
stream.write(i);
}
Throwable cause = new Throwable();
stream.fail(cause);
for (int i = 0;i < 15;i++) {
assertTrue(iterator.hasNext());
iterator.next();
}
assertFalse(iterator.hasNext());
try {
iterator.next();
fail();
} catch (Throwable failure) {
assertSame(cause, failure);
}
}

@Test
public void testHasNextSignal() throws Exception {
FakeStream<Integer> stream = new FakeStream<>();
Iterator<Integer> iterator = ReadStreamIterator.iterator(stream);
int numThreads = 4;
Thread[] consumers = new Thread[numThreads];
for (int i = 0;i < numThreads;i++) {
Thread consumer = new Thread(iterator::hasNext);
consumers[i] = consumer;
consumer.start();
assertWaitUntil(() -> consumer.getState() == Thread.State.WAITING);
}
stream.end();
for (Thread consumer : consumers) {
consumer.join();
}
}

@Test
public void testConcurrentReads() throws Exception {
// While the iterator should not be used concurrently because of hasNext()/next() races
// calling next() from multiple thread is possible
FakeStream<Integer> stream = new FakeStream<>();
Iterator<Integer> iterator = ReadStreamIterator.iterator(stream);
int numThreads = 8;
int numElements = 16384;
CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
class Consumer extends Thread {
final List<Integer> consumed = new ArrayList<>();
@Override
public void run() {
try {
barrier.await();
} catch (Exception e) {
return;
}
while (true) {
try {
Integer elt = iterator.next();
consumed.add(elt);
} catch (NoSuchElementException e) {
// Done
break;
}
}
}
}
Consumer[] consumers = new Consumer[numElements];
for (int i = 0;i < numThreads;i++) {
Consumer consumer = new Consumer();
consumer.start();
consumers[i] = consumer;
}
barrier.await();
for (int i = 0;i < numElements;i++) {
stream.write(i);
}
stream.end();
ArrayList<Integer> list = new ArrayList<>();
for (int i = 0;i < numThreads;i++) {
Consumer consumer = consumers[i];
consumer.join();
list.addAll(consumer.consumed);
}
assertEquals(list.size(), numElements);
}
}

0 comments on commit 5a23d05

Please sign in to comment.