Skip to content

Commit

Permalink
Fix Async R2 Servlet deadlock condition (#882)
Browse files Browse the repository at this point in the history
* Fix async servlet deadlock

* update version and changelog

* update min threads comment
  • Loading branch information
TylerHorth authored Feb 2, 2023
1 parent 5b93e9e commit 0e9941e
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 11 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.41.6] - 2023-01-25
Fix Async R2 Servlet deadlock condition

## [29.41.5] - 2023-01-11
Handle Avro self-referential aliases in Avro to Proto schema translation.

Expand Down Expand Up @@ -5435,7 +5438,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.41.5...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.41.6...master
[29.41.6]: https://github.com/linkedin/rest.li/compare/v29.41.5...v29.41.6
[29.41.5]: https://github.com/linkedin/rest.li/compare/v29.41.4...v29.41.5
[29.41.4]: https://github.com/linkedin/rest.li/compare/v29.41.3...v29.41.4
[29.41.3]: https://github.com/linkedin/rest.li/compare/v29.41.2...v29.41.3
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.41.5
version=29.41.6
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,17 @@ public void onResponse(final TransportResponse<StreamResponse> response)
{
if (startedResponding.compareAndSet(false, true))
{
ioHandler.writeResponseHeaders(() -> {
StreamResponse streamResponse = ServletHelper.writeResponseHeadersToServletResponse(response, resp);
streamResponse.getEntityStream().setReader(ioHandler);
});
ctx.start(new Runnable()
{
@Override
public void run()
{
try
{
StreamResponse streamResponse = ServletHelper.writeResponseHeadersToServletResponse(response, resp);
streamResponse.getEntityStream().setReader(ioHandler);
ioHandler.loop();
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,9 @@ public void exitLoop()
}

@Override
public void onInit(ReadHandle rh)
{
synchronized (this)
{
_responseWriteStarted = true;
}
super.onInit(rh);
public void writeResponseHeaders(Runnable writeResponse) {
_responseWriteStarted = true;
super.writeResponseHeaders(writeResponse);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ public void onError(Throwable e)
}
}

public void writeResponseHeaders(Runnable writeResponse) {
_eventQueue.add(new Event(EventType.WriteResponseHeaders, writeResponse));
}

public void loop() throws ServletException, IOException
{
try
Expand Down Expand Up @@ -252,6 +256,10 @@ private void eventLoop() throws ServletException, IOException, InterruptedExcept
}
break;
}
case WriteResponseHeaders:
Runnable writeResponse = (Runnable) event.getData();
writeResponse.run();
break;
case ForceExit:
{
_forceExit = true;
Expand Down Expand Up @@ -288,6 +296,7 @@ private static enum EventType
WriteRequestPossible,
WriteRequestAborted,
DrainRequest,
WriteResponseHeaders,
FullResponseReceived,
ResponseDataAvailable,
ResponseDataError,
Expand Down
2 changes: 2 additions & 0 deletions r2-jetty/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ dependencies {
compile externalDependency.jettyServer
compile externalDependency.jettyUtil
compile externalDependency.servletApi
testCompile externalDependency.testng
testCompile externalDependency.httpclient
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.thread.QueuedThreadPool;


/**
* @author Steven Ihde
* @version $Revision: $
Expand Down Expand Up @@ -189,4 +190,9 @@ private static HttpServlet createServlet(HttpDispatcher dispatcher, ServletType

return httpServlet;
}

// exposed for testing
Server getInternalServer() {
return _server;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package com.linkedin.r2.transport.http.server;

import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestResponse;
import com.linkedin.r2.message.stream.StreamRequest;
import com.linkedin.r2.message.stream.StreamResponse;
import com.linkedin.r2.message.stream.StreamResponseBuilder;
import com.linkedin.r2.message.stream.entitystream.EntityStreams;
import com.linkedin.r2.transport.common.bridge.common.TransportCallback;
import com.linkedin.r2.transport.common.bridge.common.TransportResponseImpl;
import com.linkedin.r2.transport.common.bridge.server.TransportDispatcher;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.eclipse.jetty.server.AbstractConnector;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.testng.Assert;
import org.testng.annotations.Test;


public class TestAsyncLockup {
private static final int PORT = 9000;
private static final String CONTEXT = "/context";
private static final int THREAD_POOL_SIZE = 20; // must be greater than 8 (minimum supported by QueuedThreadPool)
private static final String URL = "http://localhost:" + PORT + CONTEXT;
private static final int TIMEOUT_MILLIS = 1000;

/*
* Test a deadlock scenario where all Jetty worker threads are blocked in the SyncIOHandler event loop.
*
* 1) Enable Async and Streaming.
* 2) Occupy all jetty worker threads with requests.
* 3) Each request returns a response without consuming the request body.
* 4) All threads are permanently stuck.
*
* Even in Async mode, the SyncIOHandler will block the Jetty worker thread until the request body has been fully read
* by the application. If the application does not read the request body, then the SyncIOHandler will unblock when the
* final byte of the response has been written. However, a Jetty worker thread is needed to write the response. If all
* worker threads are stuck in the same situation, then there will be no worker threads available to write a response,
* and thus no way for any of them to be unblocked.
*
* This bug was fixed by using the SyncIOHandler to write the response, eliminating the need to acquire a new Jetty
* worker thread. This test exists to prevent regression.
*/
@Test()
public void testAsyncLockup() throws Exception {
BarrierDispatcher dispatcher = new BarrierDispatcher();
HttpJettyServer httpJettyServer = new HttpJettyServer(PORT, CONTEXT, THREAD_POOL_SIZE,
HttpDispatcherFactory.create(dispatcher), HttpJettyServer.ServletType.ASYNC_EVENT, Integer.MAX_VALUE, true);

httpJettyServer.start();
int workers = numWorkerThreads(httpJettyServer.getInternalServer());
dispatcher.setBarrier(workers);

List<CompletableFuture<Integer>> responseFutures = new ArrayList<>();
try (CloseableHttpClient client = HttpClients.custom()
.setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(TIMEOUT_MILLIS).build())
.setMaxConnTotal(THREAD_POOL_SIZE)
.setMaxConnPerRoute(THREAD_POOL_SIZE)
.disableAutomaticRetries()
.build()) {

for (int i = 0; i < workers; i++) {
CompletableFuture<Integer> future = new CompletableFuture<>();
responseFutures.add(future);
new Thread(() -> {
try {
CloseableHttpResponse response = client.execute(new HttpGet(URL));
int status = response.getStatusLine().getStatusCode();
future.complete(status);
} catch (Throwable e) {
future.completeExceptionally(e);
}
}).start();
}

for (CompletableFuture<Integer> future : responseFutures) {
Assert.assertEquals(future.get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS).intValue(), 200);
}
}

httpJettyServer.stop();
httpJettyServer.waitForStop();
}

// Calculates the number of worker threads by subtracting acceptor and selector threads.
// Extracted from Server#onStart.
private int numWorkerThreads(Server server) {
int selectors = 0;
int acceptors = 0;

for (Connector connector : server.getConnectors())
{
if (!(connector instanceof AbstractConnector))
continue;

AbstractConnector abstractConnector = (AbstractConnector) connector;
Executor connectorExecutor = connector.getExecutor();

if (connectorExecutor != server.getThreadPool()) {
// Do not count the selectors and acceptors from this connector at server level, because connector uses dedicated executor.
continue;
}

acceptors += abstractConnector.getAcceptors();

if (connector instanceof ServerConnector) {
selectors += ((ServerConnector)connector).getSelectorManager().getSelectorCount();
}
}

return THREAD_POOL_SIZE - selectors - acceptors;
}

static class BarrierDispatcher implements TransportDispatcher {
private CyclicBarrier _barrier;

public void setBarrier(int count) {
_barrier = new CyclicBarrier(count);
}

@Override
public void handleRestRequest(RestRequest req, Map<String, String> wireAttrs, RequestContext requestContext,
TransportCallback<RestResponse> callback) {
throw new UnsupportedOperationException();
}

@Override
public void handleStreamRequest(StreamRequest req, Map<String, String> wireAttrs, RequestContext requestContext,
TransportCallback<StreamResponse> callback) {
try {
_barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
callback.onResponse(TransportResponseImpl.success(new StreamResponseBuilder().build(EntityStreams.emptyStream())));
}
}
}

0 comments on commit 0e9941e

Please sign in to comment.