Skip to content

Commit ad4e101

Browse files
authored
F/delayed handler (StubbornJava#100)
* Add a non blocking delayed handler
1 parent 434704f commit ad4e101

File tree

15 files changed

+588
-4
lines changed

15 files changed

+588
-4
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.stubbornjava.common;
2+
3+
import java.util.concurrent.ExecutorService;
4+
import java.util.concurrent.Executors;
5+
import java.util.concurrent.TimeUnit;
6+
7+
import org.jooq.lambda.Unchecked;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import com.google.common.util.concurrent.MoreExecutors;
12+
13+
import okhttp3.OkHttpClient;
14+
import okhttp3.Request;
15+
import okhttp3.Response;
16+
17+
public class Http {
18+
private static final Logger log = LoggerFactory.getLogger(Http.class);
19+
20+
// {{start:get}}
21+
public static Response get(OkHttpClient client, String url) {
22+
Request request = new Request.Builder()
23+
.url(url)
24+
.get()
25+
.build();
26+
return Unchecked.supplier(() -> {
27+
Response response = client.newCall(request).execute();
28+
return response;
29+
}).get();
30+
}
31+
// {{end:get}}
32+
33+
// {{start:getInParallel}}
34+
public static void getInParallel(OkHttpClient client, String url, int count) {
35+
ExecutorService exec = Executors.newFixedThreadPool(count);
36+
for (int i = 0; i < count; i++) {
37+
exec.submit(() -> Http.get(client, url));
38+
}
39+
MoreExecutors.shutdownAndAwaitTermination(exec, 30, TimeUnit.SECONDS);
40+
}
41+
// {{end:getInParallel}}
42+
}

stubbornjava-common/src/main/java/com/stubbornjava/common/HttpClient.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,11 @@ private HttpClient() {
4040
log.debug(msg);
4141
});
4242
static {
43-
loggingInterceptor.setLevel(Level.BODY);
43+
if (log.isDebugEnabled()) {
44+
loggingInterceptor.setLevel(Level.BASIC);
45+
} else if (log.isTraceEnabled()) {
46+
loggingInterceptor.setLevel(Level.BODY);
47+
}
4448
}
4549

4650
public static HttpLoggingInterceptor getLoggingInterceptor() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.stubbornjava.common;
2+
3+
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import com.google.common.base.Stopwatch;
10+
11+
public class Timers {
12+
private static final Logger logger = LoggerFactory.getLogger(Timers.class);
13+
14+
private Timers() {}
15+
16+
public static void time(String message, Runnable runnable) {
17+
Stopwatch sw = Stopwatch.createStarted();
18+
try {
19+
logger.info("{}", message);
20+
runnable.run();
21+
} catch (Exception ex) {
22+
logger.warn("Exception in runnable", ex);
23+
throw ex;
24+
} finally {
25+
logger.info("{} took {}ms", message, sw.elapsed(TimeUnit.MILLISECONDS));
26+
}
27+
}
28+
29+
}

stubbornjava-common/src/main/java/com/stubbornjava/common/undertow/SimpleServer.java

+2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public static SimpleServer simpleServer(HttpHandler handler) {
4747
* If you base64 encode any cookie values you probably want it on.
4848
*/
4949
.setServerOption(UndertowOptions.ALLOW_EQUALS_IN_COOKIE_VALUE, true)
50+
// Needed to set request time in access logs
51+
.setServerOption(UndertowOptions.RECORD_REQUEST_START_TIME, true)
5052
.addHttpListener(DEFAULT_PORT, DEFAULT_HOST, handler)
5153
;
5254
return new SimpleServer(undertow);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.stubbornjava.common.undertow;
2+
3+
import java.net.InetSocketAddress;
4+
import java.util.function.Consumer;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import io.undertow.Undertow;
10+
import io.undertow.Undertow.ListenerInfo;
11+
import io.undertow.server.HttpHandler;
12+
13+
public class UndertowUtil {
14+
private static final Logger logger = LoggerFactory.getLogger(UndertowUtil.class);
15+
16+
/**
17+
* This is currently intended to be used in unit tests but may
18+
* be appropriate in other situations as well. It's not worth building
19+
* out a test module at this time so it lives here.
20+
*
21+
* This helper will spin up the http handler on a random available port.
22+
* The full host and port will be passed to the hostConsumer and the server
23+
* will be shut down after the consumer completes.
24+
*
25+
* @param builder
26+
* @param handler
27+
* @param hostConusmer
28+
*/
29+
public static void useLocalServer(Undertow.Builder builder,
30+
HttpHandler handler,
31+
Consumer<String> hostConusmer) {
32+
Undertow undertow = null;
33+
try {
34+
// Starts server on a random open port
35+
undertow = builder.addHttpListener(0, "127.0.0.1", handler).build();
36+
undertow.start();
37+
ListenerInfo listenerInfo = undertow.getListenerInfo().get(0);
38+
InetSocketAddress addr = (InetSocketAddress) listenerInfo.getAddress();
39+
String host = "http://localhost:" + addr.getPort();
40+
hostConusmer.accept(host);
41+
} finally {
42+
if (undertow != null) {
43+
undertow.stop();
44+
}
45+
}
46+
}
47+
}

stubbornjava-common/src/main/java/com/stubbornjava/common/undertow/handlers/CustomHandlers.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ public class CustomHandlers {
4646
private static final Logger log = LoggerFactory.getLogger(CustomHandlers.class);
4747

4848
public static AccessLogHandler accessLog(HttpHandler next, Logger logger) {
49-
return new AccessLogHandler(next, new Slf4jAccessLogReceiver(logger), "combined", CustomHandlers.class.getClassLoader());
49+
// see http://undertow.io/javadoc/2.0.x/io/undertow/server/handlers/accesslog/AccessLogHandler.html
50+
String format = "%H %h %u \"%r\" %s %Dms %b bytes \"%{i,Referer}\" \"%{i,User-Agent}\"";
51+
return new AccessLogHandler(next, new Slf4jAccessLogReceiver(logger), format, CustomHandlers.class.getClassLoader());
5052
}
5153

5254
public static AccessLogHandler accessLog(HttpHandler next) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.stubbornjava.common.undertow.handlers.diagnostic;
2+
3+
import java.time.Duration;
4+
import java.util.concurrent.TimeUnit;
5+
import java.util.function.Function;
6+
7+
import io.undertow.server.Connectors;
8+
import io.undertow.server.HttpHandler;
9+
import io.undertow.server.HttpServerExchange;
10+
import io.undertow.server.handlers.BlockingHandler;
11+
12+
// {{start:delayedHandler}}
13+
/**
14+
* A non blocking handler to add a time delay before the next handler
15+
* is executed. If the exchange has already been dispatched this will
16+
* un-dispatch the exchange and re-dispatch it before next is called.
17+
*/
18+
public class DelayedExecutionHandler implements HttpHandler {
19+
20+
private final HttpHandler next;
21+
private final Function<HttpServerExchange, Duration> durationFunc;
22+
23+
DelayedExecutionHandler(HttpHandler next,
24+
Function<HttpServerExchange, Duration> durationFunc) {
25+
this.next = next;
26+
this.durationFunc = durationFunc;
27+
}
28+
29+
@Override
30+
public void handleRequest(HttpServerExchange exchange) throws Exception {
31+
Duration duration = durationFunc.apply(exchange);
32+
33+
final HttpHandler delegate;
34+
if (exchange.isBlocking()) {
35+
// We want to undispatch here so that we are not blocking
36+
// a worker thread. We will spin on the IO thread using the
37+
// built in executeAfter.
38+
exchange.unDispatch();
39+
delegate = new BlockingHandler(next);
40+
} else {
41+
delegate = next;
42+
}
43+
44+
exchange.dispatch(exchange.getIoThread(), () -> {
45+
exchange.getIoThread().executeAfter(() ->
46+
Connectors.executeRootHandler(delegate, exchange),
47+
duration.toMillis(),
48+
TimeUnit.MILLISECONDS);
49+
});
50+
}
51+
}
52+
// {{end:delayedHandler}}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.stubbornjava.common.undertow.handlers.diagnostic;
2+
3+
import java.time.Duration;
4+
import java.util.concurrent.ThreadLocalRandom;
5+
import java.util.concurrent.TimeUnit;
6+
7+
import io.undertow.server.HttpHandler;
8+
9+
public class DiagnosticHandlers {
10+
11+
// {{start:delayedHandler}}
12+
/**
13+
* Add a fixed delay before execution of the next handler
14+
* @param next
15+
* @param duration
16+
* @param unit
17+
* @return
18+
*/
19+
public static DelayedExecutionHandler fixedDelay(HttpHandler next,
20+
long duration,
21+
TimeUnit unit) {
22+
return new DelayedExecutionHandler(
23+
next, (exchange) -> Duration.ofMillis(unit.toMillis(duration)));
24+
}
25+
26+
/**
27+
* Add a random delay between minDuration (inclusive) and
28+
* maxDuration (exclusive) before execution of the next handler.
29+
* This can be used to add artificial latency for requests.
30+
*
31+
* @param next
32+
* @param minDuration inclusive
33+
* @param maxDuration exclusive
34+
* @param unit
35+
* @return
36+
*/
37+
public static DelayedExecutionHandler randomDelay(HttpHandler next,
38+
long minDuration,
39+
long maxDuration,
40+
TimeUnit unit) {
41+
return new DelayedExecutionHandler(
42+
next, (exchange) -> {
43+
long duration = ThreadLocalRandom.current()
44+
.nextLong(minDuration, maxDuration);
45+
return Duration.ofMillis(unit.toMillis(duration));
46+
});
47+
}
48+
// {{end:delayedHandler}}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package com.stubbornjava.common.undertow.handlers.diagnostic;
2+
3+
import static org.junit.Assert.assertTrue;
4+
5+
import java.util.List;
6+
import java.util.concurrent.Callable;
7+
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.Executors;
9+
import java.util.concurrent.Future;
10+
import java.util.concurrent.TimeUnit;
11+
import java.util.stream.Collectors;
12+
import java.util.stream.IntStream;
13+
14+
import org.jooq.lambda.Seq;
15+
import org.jooq.lambda.Unchecked;
16+
import org.junit.Assert;
17+
import org.junit.Test;
18+
19+
import com.google.common.base.Stopwatch;
20+
import com.google.common.util.concurrent.MoreExecutors;
21+
import com.stubbornjava.common.Http;
22+
import com.stubbornjava.common.HttpClient;
23+
import com.stubbornjava.common.undertow.Exchange;
24+
import com.stubbornjava.common.undertow.UndertowUtil;
25+
import com.stubbornjava.common.undertow.handlers.CustomHandlers;
26+
import com.stubbornjava.undertow.handlers.MiddlewareBuilder;
27+
28+
import io.undertow.Undertow;
29+
import io.undertow.server.HttpHandler;
30+
import io.undertow.server.handlers.BlockingHandler;
31+
import okhttp3.OkHttpClient;
32+
import okhttp3.Response;
33+
34+
public class DelayedExecutionHandlerTest {
35+
36+
// Delay for 500ms then return "ok"
37+
private static final DelayedExecutionHandler delayedHandler =
38+
DiagnosticHandlers.fixedDelay((exchange) -> {
39+
Exchange.body().sendText(exchange, "ok");
40+
},
41+
500, TimeUnit.MILLISECONDS);
42+
43+
@Test
44+
public void testOnXIoThread() throws InterruptedException {
45+
int numThreads = 10;
46+
run(delayedHandler, numThreads);
47+
}
48+
49+
@Test
50+
public void testOnWorkerThread() throws InterruptedException {
51+
int numThreads = 10;
52+
run(new BlockingHandler(delayedHandler), numThreads);
53+
}
54+
55+
/**
56+
* Spin up a new server with a single IO thread and worker thread.
57+
* Run N GET requests against it concurrently and make sure they
58+
* do not take N * 500ms total. This is not the best test but it
59+
* should show that we are delaying N requests at once using a single
60+
* thread.
61+
*
62+
* @param handler
63+
* @param numThreads
64+
* @throws InterruptedException
65+
*/
66+
private void run(HttpHandler handler, int numThreads) throws InterruptedException {
67+
HttpHandler route = MiddlewareBuilder.begin(CustomHandlers::accessLog)
68+
.complete(handler);
69+
Undertow.Builder builder = Undertow.builder()
70+
.setWorkerThreads(1)
71+
.setIoThreads(1);
72+
UndertowUtil.useLocalServer(builder, route, host -> {
73+
ExecutorService exec = Executors.newFixedThreadPool(numThreads);
74+
OkHttpClient client = new OkHttpClient().newBuilder()
75+
.addInterceptor(HttpClient.getLoggingInterceptor())
76+
.build();
77+
78+
// Using time in tests isn't the best approach but this one seems
79+
// A little difficult to test another way.
80+
Stopwatch sw = Stopwatch.createStarted();
81+
List<Callable<Response>> callables = IntStream.range(0, numThreads)
82+
.mapToObj(i -> (Callable<Response>) () -> Http.get(client, host))
83+
.collect(Collectors.toList());
84+
sw.stop();
85+
Seq.seq(Unchecked.supplier(() -> exec.invokeAll(callables)).get())
86+
.map(Unchecked.function(Future::get))
87+
.forEach(DelayedExecutionHandlerTest::assertSuccess);
88+
assertTrue("Responses took too long", sw.elapsed().toMillis() < 1_000);
89+
MoreExecutors.shutdownAndAwaitTermination(exec, 10, TimeUnit.SECONDS);
90+
});
91+
}
92+
93+
private static void assertSuccess(Response response) {
94+
Assert.assertTrue("Response should be a 200", response.isSuccessful());
95+
}
96+
97+
}

0 commit comments

Comments
 (0)