Skip to content

Commit

Permalink
Rework the TaskQueue implementation as well as implementing a close s…
Browse files Browse the repository at this point in the history
…tate.

Motivation:

The TaskQueue does not implement closeability, since now such queue can hold many suspended virtual thread tasks, we should provide a way to deal those threads (interrupt) when the context holding the queue is closed, e.g.  undeploying a verticle or closing a vertx instance.

Changes:

Implement a TaskQueue close method that returns the list of current thread being suspended. The context holding the queue can close the queue when the context close future is destroyed.
  • Loading branch information
vietj committed Oct 3, 2024
1 parent a391ed5 commit c27b685
Show file tree
Hide file tree
Showing 11 changed files with 519 additions and 194 deletions.
5 changes: 2 additions & 3 deletions vertx-core/src/main/java/io/vertx/core/Future.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.core.impl.WorkerExecutor;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.impl.Utils;
import io.vertx.core.impl.future.CompositeFutureImpl;
Expand Down Expand Up @@ -715,9 +716,7 @@ default T await(long timeout, TimeUnit unit) throws TimeoutException {
io.vertx.core.impl.WorkerExecutor executor = io.vertx.core.impl.WorkerExecutor.unwrapWorkerExecutor();
CountDownLatch latch;
if (executor != null) {
io.vertx.core.impl.WorkerExecutor.TaskController cont = executor.current();
onComplete(ar -> cont.resume());
latch = cont.suspend();
latch = executor.suspend(cont -> onComplete(ar -> cont.resume()));
} else {
latch = new CountDownLatch(1);
onComplete(ar -> latch.countDown());
Expand Down
204 changes: 147 additions & 57 deletions vertx-core/src/main/java/io/vertx/core/impl/TaskQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;

import java.util.LinkedList;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;

/**
* A task queue that always run all tasks in order. The executor to run the tasks is passed
Expand All @@ -37,6 +38,8 @@ public class TaskQueue {

// @protectedby tasks
private final LinkedList<Task> tasks = new LinkedList<>();
private final Set<ContinuationTask> suspended = new HashSet<>();
private boolean closed;
private Executor currentExecutor;
private Thread currentThread;

Expand All @@ -50,13 +53,16 @@ private void run() {
for (; ; ) {
final ExecuteTask execute;
synchronized (tasks) {
if (closed) {
return;
}
Task task = tasks.poll();
if (task == null) {
currentExecutor = null;
return;
}
if (task instanceof ResumeTask) {
ResumeTask resume = (ResumeTask) task;
if (task instanceof ContinuationTask) {
ContinuationTask resume = (ContinuationTask) task;
currentExecutor = resume.executor;
currentThread = resume.thread;
resume.latch.run();
Expand All @@ -81,13 +87,104 @@ private void run() {
}
}

private class ContinuationTask extends CountDownLatch implements WorkerExecutor.Continuation, Task {

private static final int ST_CREATED = 0, ST_SUSPENDED = 1, ST_RESUMED = 2;

private final Thread thread;
private final Executor executor;
private int status;
private Runnable latch;

public ContinuationTask(Thread thread, Executor executor) {
super(1);
this.thread = thread;
this.executor = executor;
this.status = ST_CREATED;
}

@Override
public void resume(Runnable callback) {
synchronized (tasks) {
if (closed) {
return;
}
switch (status) {
case ST_SUSPENDED:
boolean removed = suspended.remove(this);
assert removed;
latch = () -> {
callback.run();
countDown();
};
break;
case ST_CREATED:
latch = callback;
break;
default:
throw new IllegalStateException();
}
status = ST_RESUMED;
if (currentExecutor != null) {
tasks.addFirst(this);
return;
}
currentExecutor = executor;
currentThread = thread;
}
latch.run();
}

public boolean suspend() {
if (Thread.currentThread() != thread) {
throw new IllegalStateException();
}
synchronized (tasks) {
if (closed) {
assert thread.isInterrupted();
return false;
}
if (currentThread == null || currentThread != thread) {
throw new IllegalStateException();
}
switch (status) {
case ST_RESUMED:
countDown();
return true;
case ST_SUSPENDED:
throw new IllegalStateException();
}
status = ST_SUSPENDED;
boolean added = suspended.add(this);
assert added;
currentThread = null;
}
executor.execute(runner);
return true;
}
}

public CountDownLatch suspend() {
return suspend(cont -> {});
}

public CountDownLatch suspend(Consumer<WorkerExecutor.Continuation> abc) {
ContinuationTask continuationTask = continuationTask();
abc.accept(continuationTask);
if (continuationTask.suspend()) {
return continuationTask;
} else {
// Closed
return new CountDownLatch(0);
}
}
/**
* Return a controller for the current task.
* Return a continuation task for the current task execution.
*
* @return the controller
* @throws IllegalStateException if the current thread is not currently being executed by the queue
*/
public WorkerExecutor.TaskController current() {
private ContinuationTask continuationTask() {
Thread thread;
Executor executor;
synchronized (tasks) {
Expand All @@ -97,51 +194,19 @@ public WorkerExecutor.TaskController current() {
thread = currentThread;
executor = currentExecutor;
}
return new WorkerExecutor.TaskController() {

final CountDownLatch latch = new CountDownLatch(1);

@Override
public void resume(Runnable callback) {
Runnable task = () -> {
callback.run();
latch.countDown();
};
synchronized (tasks) {
if (currentExecutor != null) {
tasks.addFirst(new ResumeTask(task, executor, thread));
return;
}
currentExecutor = executor;
currentThread = thread;
}
task.run();
}

@Override
public CountDownLatch suspend() {
if (Thread.currentThread() != thread) {
throw new IllegalStateException();
}
synchronized (tasks) {
if (currentThread == null || currentThread != Thread.currentThread()) {
throw new IllegalStateException();
}
currentThread = null;
}
executor.execute(runner);
return latch;
}
};
return new ContinuationTask(thread, executor);
}

/**
* Run a task.
*
* @param task the task to run.
*/
public void execute(Runnable task, Executor executor) {
public void execute(Runnable task, Executor executor) throws RejectedExecutionException {
synchronized (tasks) {
if (closed) {
throw new RejectedExecutionException("Closed");
}
if (currentExecutor == null) {
currentExecutor = executor;
try {
Expand All @@ -166,10 +231,49 @@ public boolean isEmpty() {
}
}

public List<Runnable> close() {
List<Runnable> drops = Collections.emptyList();
List<Thread> toInterrupt;
Thread curr;
synchronized (tasks) {
if (closed) {
throw new IllegalStateException("Already closed");
}
toInterrupt = new ArrayList<>(suspended.size());
for (Task t : tasks) {
if (t instanceof ExecuteTask) {
if (drops.isEmpty()) {
drops = new LinkedList<>();
}
drops.add(((ExecuteTask)t).runnable);
} else if (t instanceof ContinuationTask) {
ContinuationTask rt = (ContinuationTask) t;
toInterrupt.add(rt.thread);
}
}
tasks.clear();
for (ContinuationTask b : suspended) {
toInterrupt.add(b.thread);
}
suspended.clear();
curr = currentThread;
currentExecutor = null;
currentThread = null;
closed = true;
}
for (Thread t : toInterrupt) {
t.interrupt();
}
if (curr != null) {
curr.interrupt();
}
return drops;
}

/**
* A task of this queue.
*/
private interface Task {
interface Task {
}

/**
Expand All @@ -183,18 +287,4 @@ public ExecuteTask(Runnable runnable, Executor exec) {
this.exec = exec;
}
}

/**
* Resume an existing task blocked on a thread
*/
private static class ResumeTask implements Task {
private final Runnable latch;
private final Executor executor;
private final Thread thread;
ResumeTask(Runnable latch, Executor executor, Thread thread) {
this.latch = latch;
this.executor = executor;
this.thread = thread;
}
}
}
12 changes: 12 additions & 0 deletions vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,18 @@ public ContextImpl createContext(ThreadingModel threadingModel,
CloseFuture closeFuture,
DeploymentContext deployment,
ClassLoader tccl) {
try {
closeFuture.add(completion -> {
List<Runnable> drops = orderedTasks.close();
for (Runnable drop : drops) {
WorkerTask<?> t = (WorkerTask<?>) drop;
t.reject();
}
completion.complete();
});
} catch (Exception e) {
e.printStackTrace(System.out);
}
return new ContextImpl(this,
createContextLocals(),
eventLoopExecutor,
Expand Down
42 changes: 13 additions & 29 deletions vertx-core/src/main/java/io/vertx/core/impl/WorkerExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,12 @@
package io.vertx.core.impl;

import io.vertx.core.ThreadingModel;
import io.vertx.core.Vertx;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.EventExecutor;
import io.vertx.core.spi.metrics.PoolMetrics;

import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

/**
* Execute events on a worker pool.
Expand Down Expand Up @@ -85,17 +81,24 @@ public void execute(Runnable command) {
}

/**
* See {@link TaskQueue#current()}.
* Suspend the current task execution until the task is resumed, the next task in the queue will be executed
* when there is one.
*
* <p>The {@code resumeAcceptor} argument is passed the continuation to resume the current task, this acceptor
* is guaranteed to be called before the task is actually suspended so the task can be eagerly resumed and avoid
* suspending the current task.
*
* @return the latch to wait for until the current task can resume
*/
public TaskController current() {
return orderedTasks.current();
public CountDownLatch suspend(Consumer<Continuation> resumeAcceptor) {
return orderedTasks.suspend(resumeAcceptor);
}

public interface TaskController {
public interface Continuation {

/**
* Resume the task, the {@code callback} will be executed when the task is resumed, before the task thread
* is unparked.
* is un-parked.
*
* @param callback called when the task is resumed
*/
Expand All @@ -107,24 +110,5 @@ public interface TaskController {
default void resume() {
resume(() -> {});
}

/**
* Suspend the task execution and park the current thread until the task is resumed.
* The next task in the queue will be executed, when there is one.
*
* <p>When the task wants to be resumed, it should call {@link #resume}, this will be executed immediately if there
* is no other tasks being executed, otherwise it will be added first in the queue.
*/
default void suspendAndAwaitResume() throws InterruptedException {
suspend().await();
}

/**
* Like {@link #suspendAndAwaitResume()} but does not await the task to be resumed.
*
* @return the latch to await
*/
CountDownLatch suspend();

}
}
Loading

0 comments on commit c27b685

Please sign in to comment.