Skip to content

Commit

Permalink
Improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Oct 6, 2024
1 parent 85d10a1 commit 9e405cd
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 151 deletions.
257 changes: 136 additions & 121 deletions vertx-core/src/main/java/io/vertx/core/impl/TaskQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class TaskQueue {

// @protectedby tasks
private final LinkedList<Task> tasks = new LinkedList<>();
private final Set<ContinuationTask> suspended = new HashSet<>();
private final Set<ContinuationTask> suspendedTasks = new HashSet<>();
private boolean closed;
private Executor currentExecutor;
private Thread currentThread;
Expand Down Expand Up @@ -87,6 +87,139 @@ private void run() {
}
}

/**
* A task of this queue.
*/
private interface Task {
}

/**
* Return a continuation task for the current task execution.
*
* @return the controller
* @throws IllegalStateException if the current thread is not currently being executed by the queue
*/
private ContinuationTask continuationTask() {
Thread thread;
Executor executor;
synchronized (tasks) {
if (Thread.currentThread() != currentThread) {
throw new IllegalStateException();
}
thread = currentThread;
executor = currentExecutor;
}
return new ContinuationTask(thread, executor);
}

/**
* Run a task.
*
* @param task the task to run.
*/
public void execute(Runnable task, Executor executor) throws RejectedExecutionException {
synchronized (tasks) {
if (closed) {
throw new RejectedExecutionException("Closed");
}
if (currentExecutor == null) {
currentExecutor = executor;
try {
executor.execute(runner);
} catch (RejectedExecutionException e) {
currentExecutor = null;
throw e;
}
}
// Add the task after the runner has been accepted to the executor
// to cover the case of a rejected execution exception.
tasks.add(new ExecuteTask(task, executor));
}
}

/**
* Test if the task queue is empty and no current executor is running anymore.
*/
public boolean isEmpty() {
synchronized (tasks) {
return tasks.isEmpty() && currentExecutor == null;
}
}

/**
* Structure holding the queue state at close time.
*/
public final static class CloseResult {

private final Thread activeThread;
private final List<Runnable> pendingTasks;
private final List<Thread> suspendedThreads;

private CloseResult(Thread activeThread, List<Thread> suspendedThreads, List<Runnable> pendingTasks) {
this.activeThread = activeThread;
this.suspendedThreads = suspendedThreads;
this.pendingTasks = pendingTasks;
}

/**
* @return the thread that was active
*/
public Thread activeThread() {
return activeThread;
}

/**
* @return the list of pending tasks
*/
public List<Runnable> pendingTasks() {
return pendingTasks;
}

/**
* @return the list of suspended threads
*/
public List<Thread> suspendedThreads() {
return suspendedThreads;
}
}

/**
* Close the queue.
*
* @return a structure of suspended threads and pending tasks
*/
public CloseResult close() {
List<Runnable> pendingTasks = Collections.emptyList();
List<Thread> suspendedThreads;
Thread currentThread;
synchronized (tasks) {
if (closed) {
throw new IllegalStateException("Already closed");
}
suspendedThreads = new ArrayList<>(suspendedTasks.size() + 1);
for (Task t : tasks) {
if (t instanceof ExecuteTask) {
if (pendingTasks.isEmpty()) {
pendingTasks = new LinkedList<>();
}
pendingTasks.add(((ExecuteTask)t).runnable);
} else if (t instanceof ContinuationTask) {
ContinuationTask rt = (ContinuationTask) t;
suspendedThreads.add(rt.thread);
}
}
tasks.clear();
for (ContinuationTask task : suspendedTasks) {
suspendedThreads.add(task.thread);
}
suspendedTasks.clear();
currentThread = this.currentThread;
currentExecutor = null;
closed = true;
}
return new CloseResult(currentThread, suspendedThreads, pendingTasks);
}

private class ContinuationTask extends CountDownLatch implements WorkerExecutor.Continuation, Task {

private static final int ST_CREATED = 0, ST_SUSPENDED = 1, ST_RESUMED = 2;
Expand All @@ -111,7 +244,7 @@ public void resume(Runnable callback) {
}
switch (status) {
case ST_SUSPENDED:
boolean removed = suspended.remove(this);
boolean removed = suspendedTasks.remove(this);
assert removed;
latch = () -> {
callback.run();
Expand Down Expand Up @@ -157,7 +290,7 @@ public boolean suspend() {
throw new IllegalStateException();
}
status = ST_SUSPENDED;
boolean added = suspended.add(this);
boolean added = suspendedTasks.add(this);
assert added;
currentThread = null;
}
Expand All @@ -180,124 +313,6 @@ public CountDownLatch suspend(Consumer<WorkerExecutor.Continuation> abc) {
return null;
}
}
/**
* Return a continuation task for the current task execution.
*
* @return the controller
* @throws IllegalStateException if the current thread is not currently being executed by the queue
*/
private ContinuationTask continuationTask() {
Thread thread;
Executor executor;
synchronized (tasks) {
if (Thread.currentThread() != currentThread) {
throw new IllegalStateException();
}
thread = currentThread;
executor = currentExecutor;
}
return new ContinuationTask(thread, executor);
}

/**
* Run a task.
*
* @param task the task to run.
*/
public void execute(Runnable task, Executor executor) throws RejectedExecutionException {
synchronized (tasks) {
if (closed) {
throw new RejectedExecutionException("Closed");
}
if (currentExecutor == null) {
currentExecutor = executor;
try {
executor.execute(runner);
} catch (RejectedExecutionException e) {
currentExecutor = null;
throw e;
}
}
// Add the task after the runner has been accepted to the executor
// to cover the case of a rejected execution exception.
tasks.add(new ExecuteTask(task, executor));
}
}

/**
* Test if the task queue is empty and no current executor is running anymore.
*/
public boolean isEmpty() {
synchronized (tasks) {
return tasks.isEmpty() && currentExecutor == null;
}
}

public static class CloseResult {
public final List<Runnable> pendingTasks;
public final List<Thread> suspendedThreads;
public CloseResult(List<Thread> suspendedThreads, List<Runnable> pendingTasks) {
this.suspendedThreads = suspendedThreads;
this.pendingTasks = pendingTasks;
}
}

public CloseResult close() {
List<Runnable> drops = Collections.emptyList();
List<Thread> toInterrupt;
synchronized (tasks) {
if (closed) {
throw new IllegalStateException("Already closed");
}
toInterrupt = new ArrayList<>(suspended.size() + 1);
for (Task t : tasks) {
if (t instanceof ExecuteTask) {
if (drops.isEmpty()) {
drops = new LinkedList<>();
}
drops.add(((ExecuteTask)t).runnable);
} else if (t instanceof ContinuationTask) {
ContinuationTask rt = (ContinuationTask) t;
toInterrupt.add(rt.thread);
}
}
tasks.clear();
for (ContinuationTask b : suspended) {
toInterrupt.add(b.thread);
}
suspended.clear();
if (currentThread != null) {
toInterrupt.add(currentThread);
}
currentExecutor = null;
closed = true;
}
/*
for (Thread t : toInterrupt) {
t.interrupt();
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
if (curr != null) {
curr.interrupt();
try {
curr.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
*/
return new CloseResult(toInterrupt, drops);
}

/**
* A task of this queue.
*/
interface Task {
}

/**
* Execute another task
Expand Down
19 changes: 13 additions & 6 deletions vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -614,25 +614,32 @@ public ContextImpl createContext(ThreadingModel threadingModel,
ClassLoader tccl) {
if (closeFuture != null) {
closeFuture.add(completion -> {
TaskQueue.CloseResult drops = orderedTasks.close();
for (Runnable pendingTask : drops.pendingTasks) {
TaskQueue.CloseResult closeResult = orderedTasks.close();
for (Runnable pendingTask : closeResult.pendingTasks()) {
if (pendingTask instanceof ExecuteBlockingTask) {
ExecuteBlockingTask<?> t = (ExecuteBlockingTask<?>) pendingTask;
t.reject();
}
}
if (!drops.suspendedThreads.isEmpty()) {
if (!closeResult.suspendedThreads().isEmpty() || closeResult.activeThread() != null) {
Executor exec = virtualThreadExecutor != null ? virtualThreadExecutor : internalWorkerPool.executor();
exec
.execute(() -> {
// todo : rewrite this in master to avoid requiring the internal worker pool
// Maintain context invariant: serialize task execution
for (Thread suspendedThread : drops.suspendedThreads) {
for (Thread suspendedThread : closeResult.suspendedThreads()) {
suspendedThread.interrupt();
try {
suspendedThread.join(5_000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (InterruptedException ignore) {
}
}
Thread activeThread = closeResult.activeThread();
if (activeThread != null) {
activeThread.interrupt();
try {
activeThread.join(5_000);
} catch (InterruptedException ignore) {
}
}
completion.complete();
Expand Down
28 changes: 5 additions & 23 deletions vertx-core/src/test/java/io/vertx/tests/context/TaskQueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,25 +202,6 @@ public void shouldNotHaveTaskInQueueWhenTaskHasBeenRejected() {
Assertions.assertThat(taskQueue.isEmpty()).isTrue();
}

@Test
public void testClose() {
TaskQueue taskQueue = new TaskQueue();
Deque<Runnable> pending = new ConcurrentLinkedDeque<>();
Executor executor = pending::add;
taskQueue.execute(() -> {
CountDownLatch latch = taskQueue.suspend(cont -> cont.resume());
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, executor);
assertEquals(1, pending.size());
pending.poll().run();
// assertEquals(1, pending.size());
taskQueue.close();
}

@Test
public void testClosePendingTasks() {
TaskQueue taskQueue = new TaskQueue();
Expand All @@ -231,8 +212,8 @@ public void testClosePendingTasks() {
taskQueue.execute(task, executor);
assertEquals(1, pending.size());
TaskQueue.CloseResult result = taskQueue.close();
assertEquals(1, result.pendingTasks.size());
assertSame(task, result.pendingTasks.get(0));
assertEquals(1, result.pendingTasks().size());
assertSame(task, result.pendingTasks().get(0));
}

@Test
Expand All @@ -256,7 +237,8 @@ public void testCloseBeforeSuspend() {
}, exec);
Runnable t = pending.pop();
t.run();
assertTrue(result.get().suspendedThreads.size() == 1);
assertTrue(result.get().suspendedThreads().isEmpty());
assertNotNull(result.get().activeThread());
}

@Test
Expand Down Expand Up @@ -284,7 +266,7 @@ public void testCloseBetweenSuspendAndAwait() {
AtomicBoolean closed = new AtomicBoolean();
Thread th = new Thread(() -> {
TaskQueue.CloseResult res = taskQueue.close();
res.suspendedThreads.get(0).interrupt();
res.suspendedThreads().get(0).interrupt();
closed.set(true);
});
th.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.impl.WorkerExecutor;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.test.core.Repeat;
import io.vertx.test.core.VertxTestBase;
import io.vertx.tests.deployment.VirtualThreadDeploymentTest;
import org.junit.Assume;
Expand Down

0 comments on commit 9e405cd

Please sign in to comment.