
Closing a context should interrupt pending virtual thread tasks of this context #5339

Merged
vietj merged 3 commits into master from interrupt-suspended-virtual-threads on Oct 7, 2024

Conversation

vietj
Member

@vietj vietj commented Oct 2, 2024

No description provided.

@vietj vietj force-pushed the interrupt-suspended-virtual-threads branch from dc2610b to c27b685 on October 3, 2024 13:17
@vietj vietj changed the title Mockup for interrupting suspended virtual threads Closing a context should interrupt pending virtual thread tasks of this context Oct 3, 2024
@vietj vietj added this to the 5.0.0 milestone Oct 3, 2024
@vietj vietj added the bug label Oct 3, 2024
@vietj vietj self-assigned this Oct 3, 2024
@vietj vietj marked this pull request as ready for review October 3, 2024 13:23
@vietj vietj force-pushed the interrupt-suspended-virtual-threads branch 3 times, most recently from a26cf72 to d2c6248 on October 3, 2024 15:36
@zekronium
Contributor

Why not simplify this by creating a VirtualThreadExecutor per context and closing it there instead? You would just need to reuse the virtualThreadFactory, since a lot of the shutdown/suspension logic can be reused from the VirtualThreadExecutor.
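
Roughly along these lines, just as a hypothetical sketch using plain JDK types (the class and method names here are made up, not the actual Vert.x internals):

// Hypothetical sketch: one executor per context, closed together with the context.
// Reuses a shared virtual thread factory; Vert.x's own VirtualThreadExecutor is not shown.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class PerContextVirtualThreadExecutor implements AutoCloseable {

  private final ExecutorService executor;

  PerContextVirtualThreadExecutor(ThreadFactory virtualThreadFactory) {
    // e.g. virtualThreadFactory = Thread.ofVirtual().factory(), shared across contexts
    this.executor = Executors.newThreadPerTaskExecutor(virtualThreadFactory);
  }

  void execute(Runnable task) {
    executor.execute(task);
  }

  @Override
  public void close() {
    // interrupts the virtual threads that are still running or suspended
    executor.shutdownNow();
  }
}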

@vietj
Member Author

vietj commented Oct 3, 2024

yes, maybe that would be an idea too, I'll have another look at this tomorrow

@vietj vietj force-pushed the interrupt-suspended-virtual-threads branch 2 times, most recently from b321135 to 86b3b8b on October 4, 2024 13:54
@vietj
Member Author

vietj commented Oct 6, 2024

I think one issue I found @zekronium is that closing the virtual thread executor instead would unlatch all the virtual threads simultaneously in the same vertx context, which is not allowed.

The current approach lets us unlatch the threads one by one and join them, so that two tasks are never executed at the same time; this looks cleaner to me.

e.g. here is a test that should pass:

  @Test
  public void testContextCloseContextSerialization() throws Exception {
    int num = 4;
    Assume.assumeTrue(isVirtualThreadAvailable());
    ContextInternal ctx = vertx.createVirtualThreadContext();
    Thread[] threads = new Thread[num];
    List<Promise<Void>> promises = IntStream.range(0, num).mapToObj(idx -> Promise.<Void>promise()).collect(Collectors.toList());
    Deque<CyclicBarrier> latches = new ConcurrentLinkedDeque<>();
    CyclicBarrier[] l = new CyclicBarrier[num];
    AtomicInteger count = new AtomicInteger();
    for (int i = 0;i < num;i++) {
      int idx = i;
      CyclicBarrier latch = new CyclicBarrier(2);
      l[i] = latch;
      latches.add(latch);
      ctx.runOnContext(v -> {
        threads[idx] = Thread.currentThread();
        try {
          promises.get(idx).future().await();
          fail();
        } catch (Exception e) {
          assertTrue(e instanceof InterruptedException);
          CyclicBarrier barrier = latches.removeFirst();
          int val = count.addAndGet(1);
          assertTrue(val == 1);
          try {
            barrier.await();
          } catch (Exception ex) {
            throw new RuntimeException(ex);
          } finally {
            count.decrementAndGet();
          }
        }
      });
    }
    assertWaitUntil(() -> {
      for (Thread thread : threads) {
        if (thread == null || thread.getState() != Thread.State.WAITING) {
          return false;
        }
      }
      return true;
    });
    Future<Void> f = ctx.closeFuture().close();
    for (int i = 0;i < num;i++) {
      try {
        l[i].await();
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      } catch (BrokenBarrierException e) {
        throw new RuntimeException(e);
      }
    }
    f.await();
  }
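
Conceptually the close looks something like this, as a rough sketch (the names below are made up for illustration, not the actual implementation):

// Sketch: the queue close hands back the threads currently suspended on it,
// and the owner interrupts and joins them one at a time, so two tasks of the
// same context never run concurrently.
import java.util.List;

class SuspendedTaskQueueCloser {

  interface CloseableTaskQueue {
    // stops accepting tasks and returns the threads currently suspended on the queue
    List<Thread> close();
  }

  static void closeAndInterrupt(CloseableTaskQueue queue) throws InterruptedException {
    List<Thread> suspended = queue.close();
    for (Thread thread : suspended) {
      thread.interrupt(); // unlatch this task...
      thread.join();      // ...and wait for it to terminate before unlatching the next one
    }
  }
}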

@vietj vietj force-pushed the interrupt-suspended-virtual-threads branch 2 times, most recently from 0f420d1 to 85d10a1 on October 6, 2024 11:46
…tate.

Motivation:

The TaskQueue does not implement closeability. Since such a queue can now hold many suspended virtual thread tasks, we should provide a way to deal with those threads (interrupt them) when the context holding the queue is closed, e.g. when undeploying a verticle or closing a vertx instance.

Changes:

Implement a TaskQueue close method that returns the list of threads currently suspended. The context holding the queue can close the queue when the context close future is destroyed.
@vietj vietj force-pushed the interrupt-suspended-virtual-threads branch from 9e405cd to 50733de on October 6, 2024 13:18
@zekronium
Contributor

zekronium commented Oct 6, 2024

I was under the assumption that TaskQueue won't allow that, but you are totally right. Maybe this was some of the leaking I saw when I was implementing an interrupt-on-close approach too. The solution I had thought of was, upon interrupt in await(), to requeue the task to the executor, gated by the task queue, but that was not very clean and felt hacky.

Is this your final solution? I will test it if it is.

@vietj
Member Author

vietj commented Oct 7, 2024

@zekronium please have a look at #5344, which is the port to 4.x and is the best solution I think (I will port the 4.x improvements to this branch of course).

@vietj vietj merged commit eb20c08 into master Oct 7, 2024
7 checks passed
@vietj vietj deleted the interrupt-suspended-virtual-threads branch October 7, 2024 15:37
@zekronium
Contributor

@vietj I am encountering the same issue I had myself when trying to shut down the taskQueue itself with rejection. Upon shutdown or close of the context, there might be a lot of events and promises fired, and some of them are off context (we are in Virtual Thread land after all); because the TaskQueue/Executor is closed, some of the events cannot fire.

Example with HTTP2:

java.util.concurrent.RejectedExecutionException: Closed
	at io.vertx.core.impl.TaskQueue.execute(TaskQueue.java:134) ~[vertx-core-4.5.11-SNAPSHOT.jar:4.5.11-SNAPSHOT]
	at io.vertx.core.impl.WorkerExecutor.execute(WorkerExecutor.java:75) ~[vertx-core-4.5.11-SNAPSHOT.jar:4.5.11-SNAPSHOT]
	at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:343) ~[vertx-core-4.5.11-SNAPSHOT.jar:4.5.11-SNAPSHOT]
	at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:329) ~[vertx-core-4.5.11-SNAPSHOT.jar:4.5.11-SNAPSHOT]
	at io.vertx.core.http.impl.VertxHttp2Stream.onException(VertxHttp2Stream.java:95) ~[vertx-core-4.5.11-SNAPSHOT.jar:4.5.11-SNAPSHOT]
	at io.vertx.core.http.impl.Http2ConnectionBase.onStreamClosed(Http2ConnectionBase.java:153) ~[vertx-core-4.5.11-SNAPSHOT.jar:4.5.11-SNAPSHOT]
	at io.vertx.core.http.impl.VertxHttp2ConnectionHandler$1.onStreamClosed(VertxHttp2ConnectionHandler.java:95) ~[vertx-core-4.5.11-SNAPSHOT.jar:4.5.11-SNAPSHOT]
	at io.netty.handler.codec.http2.DefaultHttp2Connection.notifyClosed(DefaultHttp2Connection.java:355) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.removeFromActiveStreams(DefaultHttp2Connection.java:1034) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.deactivate(DefaultHttp2Connection.java:990) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:515) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.handler.codec.http2.DefaultHttp2Connection.close(DefaultHttp2Connection.java:151) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.handler.codec.http2.Http2ConnectionHandler$BaseDecoder.channelInactive(Http2ConnectionHandler.java:215) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.handler.codec.http2.Http2ConnectionHandler.channelInactive(Http2ConnectionHandler.java:430) ~[netty-codec-http2-4.1.111.Final.jar:4.1.111.Final]
	at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.channelInactive(VertxHttp2ConnectionHandler.java:196) ~[vertx-core-4.5.11-SNAPSHOT.jar:4.5.11-SNAPSHOT]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:280) ~[netty-handler-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:412) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:377) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1172) ~[netty-handler-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1402) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:900) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:811) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]

@zekronium
Contributor

zekronium commented Oct 9, 2024

Another issue: due to the failing executions, it seems timeouts are not cleaned up correctly:
(screenshots omitted)

None of the verticles are GC'd and this is the GC root. The timeouts get rescheduled.

@vietj
Member Author

vietj commented Oct 9, 2024

if the verticle is undeployed, it makes sense that the related tasks cannot be executed, otherwise undeploying might take forever? perhaps we could add a grace period and let the queue execute tasks for a while until a timeout decides?
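
as a purely illustrative sketch (not what this PR does), the grace period could be the usual two-phase shutdown:

// Illustrative only: let queued tasks run for a grace period, then interrupt.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class GracefulClose {
  static void close(ExecutorService executor, long graceMillis) throws InterruptedException {
    executor.shutdown();                                              // stop accepting new tasks
    if (!executor.awaitTermination(graceMillis, TimeUnit.MILLISECONDS)) {
      executor.shutdownNow();                                         // the timeout decides: interrupt what is left
    }
  }
}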

@vietj
Member Author

vietj commented Oct 9, 2024

can you provide a reproducer for these specific cases?

@vietj
Member Author

vietj commented Oct 9, 2024

I'll have a look at timers

@vietj
Member Author

vietj commented Oct 9, 2024

it is weird to see the timer affected, because timers are executed on the event loop and then transferred to the context

@zekronium
Contributor

I think the issue happens due to queueWrite in the connection. It always gets queued up, then it fails to write because the connection is closed, then it fails the promise, but to fail the promise it has to do context.emit/runOnContext, and that is where the rejection exception is thrown, because it cannot call back into the virtual thread. That is the example I sent above, at least for HTTP/2. My guess is that it does not expect the callback to fail, and if it does, some cleanup steps don't run.

@zekronium
Contributor

can you provide a reproducer for these specific cases?

Just run a virtual thread that does requests in a loop with a sleep.

@vietj
Member Author

vietj commented Oct 9, 2024

do you mean a verticle that performs HTTP client requests and then does a thread sleep without vertx await, and that is undeployed? that being said, if there is no await how can it get a response? it is not clear

@zekronium
Contributor

zekronium commented Oct 9, 2024

if the verticle is undeployed, it makes sense that the related tasks cannot be executed, otherwise undeploying might take forever? perhaps we could add a grace period and let the queue execute tasks for a while until a timeout decides?

I tried this and it did not work, because nothing prevents a user from running some other virtual thread code in the callback which in reality should not run, like some other loop that should have been stopped. The best idea I had was to ALLOW internal events always and only suspend/interrupt the user's virtual threads on promises and callbacks that the user wrote. Meaning any internal vertx callbacks/runOnContext would have to be wrapped in a different Task object type that encodes that it is an internal event and is always allowed to run. Or have two types of task queue, with one that is always allowed to run for internal events. The first solution seems cleaner, but this approach of effectively coloring the callbacks does not sound so nice in general.

EDIT: I think it would make more sense to wrap the user events from virtual threads as suspendable and leave everything on the event loop as is, the reverse of what I said.
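
Roughly what I mean, as a hypothetical sketch rather than an actual API proposal (the ContextTask name and Kind flag are made up):

// Sketch of "colored" tasks: internal events are always allowed to run,
// user tasks can be rejected/interrupted once the context is closing.
final class ContextTask implements Runnable {

  enum Kind { INTERNAL, USER }

  private final Kind kind;
  private final Runnable delegate;

  ContextTask(Kind kind, Runnable delegate) {
    this.kind = kind;
    this.delegate = delegate;
  }

  boolean runnableAfterClose() {
    return kind == Kind.INTERNAL; // internal events always run, user tasks do not
  }

  @Override
  public void run() {
    delegate.run();
  }
}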

@zekronium
Contributor

zekronium commented Oct 9, 2024

do you mean a verticle that performs HTTP client requests and then does a thread sleep without vertx await, and that is undeployed? that being said, if there is no await how can it get a response? it is not clear

No, it's always awaiting. Let's say while(contextNotClosed), a get every 5000ms. Everything awaited, effectively synchronous code.

@vietj
Member Author

vietj commented Oct 9, 2024

if the verticle is undeployed, it makes sense that the related tasks cannot be executed, otherwise undeploying might take forever? perhaps we could add a grace period and let the queue execute tasks for a while until a timeout decides?

I tried this and it did not work, because nothing prevents a user from running some other virtual thread code in the callback which in reality should not run, like some other loop that should have been stopped. The best idea I had was to ALLOW internal events always and only suspend/interrupt the user's virtual threads on promises and callbacks that the user wrote. Meaning any internal vertx callbacks/runOnContext would have to be wrapped in a different Task object type that encodes that it is an internal event and is always allowed to run. Or have two types of task queue, with one that is always allowed to run for internal events. The first solution seems cleaner, but this approach of effectively coloring the callbacks does not sound so nice in general.

have you tried closing the server in undeploy explicitly and completing the undeploy when the server has stopped?

@vietj
Member Author

vietj commented Oct 9, 2024

do you mean a verticle that performs HTTP client requests and then does a thread sleep without vertx await, and that is undeployed? that being said, if there is no await how can it get a response? it is not clear

No, it's always awaiting. Let's say while(contextNotClosed), a get every 5000ms. Everything awaited, effectively synchronous code.

it is still unclear, can you show some code?

@zekronium
Contributor

do you mean a verticle that performs HTTP client requests and then does a thread sleep without vertx await, and that is undeployed? that being said, if there is no await how can it get a response? it is not clear

No, it's always awaiting. Let's say while(contextNotClosed), a get every 5000ms. Everything awaited, effectively synchronous code.

it is still unclear, can you show some code?

A primitive example; I usually don't wait with Thread.sleep but with actual vertx mechanisms.

  private void test() {
    while (this.active()) {
      // awaited HTTP request, then an awaited 5s timer: effectively synchronous code
      var resp = await(client.getAbs("https://google.com").send());
      await(Future.future(p -> this.vertx.setTimer(5000, t -> p.complete())));
    }
  }

have you tried closing the server in undeploy explicitly and completing the undeploy when the server has stopped?

Our application that uses virtual threads mostly uses the client. I am not sure whether this behavior is also visible on the server.

@vietj
Member Author

vietj commented Oct 9, 2024 via email

@vietj
Member Author

vietj commented Oct 9, 2024

can you provide a full reproducer? what you showed will simply interrupt the thread when the verticle is undeployed since there are no pending requests, here is what I did:

  @Test
  public void testRepro() throws Exception {

    vertx.createHttpServer().requestHandler(req -> {
      vertx.setTimer(100, id -> {
        req.response().end();
      });
    }).listen(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST).toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);

    vertx.deployVerticle(new AbstractVerticle() {
      volatile boolean active;
      HttpClient client;
      @Override
      public void stop(Promise<Void> stopPromise) throws Exception {
        active = false;
        super.stop(stopPromise);
      }
      @Override
      public void start() throws Exception {
        active = true;
        client = vertx.createHttpClient();
        vertx.runOnContext(v -> test());
        vertx.setTimer(2000, id -> {

        });
      }
      private void test() {
        while (this.active) {
          Future<Buffer> fut = client.request(HttpMethod.GET, HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST, "/")
            .compose(req -> req
              .send()
              .compose(resp -> resp.body()));
          Future.await(fut);
          System.out.println("got resp");
          Future.await(Future.future(p -> this.vertx.setTimer(150, t -> p.complete())));
        }
      }
    }, new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD))
      .onComplete(onSuccess(id -> {
        vertx.setTimer(4000, timerID -> {
          System.out.println("Undeploying");
          vertx.undeploy(id);
        });
      }));

    await();
  }

@vietj
Member Author

vietj commented Oct 9, 2024

I wrote this simple test that closes the HTTP client.

  @Test
  public void testDeployHTTPClient() throws Exception {
    Assume.assumeTrue(isVirtualThreadAvailable());
    AtomicInteger inflight = new AtomicInteger();
    vertx.createHttpServer().requestHandler(request -> {
      inflight.incrementAndGet();
    }).listen(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST);
    int numReq = 10;
    Set<Thread> threads = Collections.synchronizedSet(new HashSet<>());
    Set<Thread> interruptedThreads = Collections.synchronizedSet(new HashSet<>());
    String deploymentID = vertx.deployVerticle(new VerticleBase() {
        HttpClient client;
        @Override
        public Future<?> start() throws Exception {
          client = vertx.createHttpClient(new PoolOptions().setHttp1MaxSize(numReq));
          for (int i = 0;i < numReq;i++) {
            vertx.runOnContext(v -> {
              threads.add(Thread.currentThread());
              try {
                HttpClientResponse response = client
                  .request(HttpMethod.GET, HttpTestBase.DEFAULT_HTTP_PORT, "localhost", "/")
                  .compose(HttpClientRequest::send)
                  .await();
              } catch (Throwable e) {
                if (e instanceof InterruptedException) {
                  interruptedThreads.add(Thread.currentThread());
                }
              }
            });
          }
          return super.start();
        }
        @Override
        public Future<?> stop() throws Exception {
          return client.close();
        }
      }, new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD))
      .await();
    assertWaitUntil(() -> inflight.get() == numReq);
    vertx.undeploy(deploymentID).await();
    assertEquals(threads, interruptedThreads);
  }

When the stop method is commented out, I can indeed see such exceptions:

An exception 'java.util.concurrent.RejectedExecutionException: Closed' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception: 
java.util.concurrent.RejectedExecutionException: Closed
	at io.vertx.core/io.vertx.core.impl.TaskQueue.execute(TaskQueue.java:129)
	at io.vertx.core/io.vertx.core.impl.WorkerExecutor.execute(WorkerExecutor.java:74)
	at io.vertx.core/io.vertx.core.impl.ContextBase.execute(ContextBase.java:115)
	at io.vertx.core/io.vertx.core.http.impl.Http1xClientConnection.handleClosed(Http1xClientConnection.java:1186)
	at io.vertx.core/io.vertx.core.net.impl.VertxHandler.channelInactive(VertxHandler.java:137)

But it makes sense to me: the client is not closed and tries to resume its work, which cannot be done. It would be the same with a worker verticle using a named worker pool; we would get rejected tasks as well.

When the verticle closes the client, the client is closed and the threads are interrupted; no exception is logged.

As for the lingering timer, there might be a bug there, but that would be a different issue.

@zekronium
Contributor

In my tests I let it loop, then close the verticle, and I get the same exception. If the client is closed, it normally throws an IllegalStateException, since the client close state is checked before sending the request (somewhere in send), so why rejected? Also, how does it try to resume work if the threads are interrupted/suspended?

@zekronium
Contributor

As for the timer, I am myself confused about what happens.
