From d3ef908e5f090f8bf2dd8426160f66e38beaf048 Mon Sep 17 00:00:00 2001
From: Emma Humber
Date: Mon, 11 Jul 2022 14:56:47 +0100
Subject: [PATCH] KREST-2746 Reflect slow responses from Kafka back to the HTTP client

If Kafka processes produce requests more slowly than the client sends
them, responses back up in a queue. Track the depth of that queue and,
once it reaches a configurable throttle depth, answer each request with
a 429 instead of sending it to Kafka; if it grows to a configurable
disconnect depth, schedule the connection to be closed after the
existing grace period.
---
 kafka-rest/pom.xml                                 |   6 +
 .../confluent/kafkarest/KafkaRestConfig.java       |  40 +++-
 .../kafkarest/config/ConfigModule.java             |  26 +++
 .../kafkarest/response/StreamingResponse.java      | 182 ++++++++++++---
 .../response/StreamingResponseFactory.java         |  18 +-
 .../resources/v3/ProduceActionTest.java            |   4 +-
 .../response/StreamingResponseTest.java            | 211 +++++++++++++++---
 pom.xml                                            |   6 +
 8 files changed, 427 insertions(+), 66 deletions(-)

diff --git a/kafka-rest/pom.xml b/kafka-rest/pom.xml
index b3f193aa79..3281975c7f 100644
--- a/kafka-rest/pom.xml
+++ b/kafka-rest/pom.xml
@@ -156,6 +156,12 @@
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <version>4.0.2</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java b/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java
index 094d9fbdef..d174bb9197 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java
@@ -469,6 +469,24 @@ public class KafkaRestConfig extends RestConfig {
           + "requests will be processed for before the connection is closed.";
   private static final String STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT = "500";
 
+  public static final String STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH =
+      "streaming.response.queue.throttle.depth";
+  private static final String STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH_DOC =
+      "The maximum depth of the response queue that is used to hold requests as they are being "
+          + "processed by Kafka. If this queue grows too long, it indicates that Kafka is "
+          + "processing messages more slowly than the user is sending them. After this "
+          + "queue depth is reached, all requests will receive a 429 response until the "
+          + "queue depth returns under the limit.";
+  private static final Integer STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH_DEFAULT = 100;
+
+  public static final String STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH =
+      "streaming.response.queue.disconnect.depth";
+  private static final String STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH_DOC =
+      "The maximum depth of the response queue that is used to hold requests as they are being "
+          + "processed by Kafka. If the queue depth grows beyond this limit, the connection "
+          + "is closed.";
+  private static final Integer STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH_DEFAULT = 200;
+
   private static final ConfigDef config;
 
   private volatile Metrics metrics;
 
@@ -847,7 +865,19 @@ protected static ConfigDef baseKafkaRestConfigDef() {
             Type.LONG,
             STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT,
             Importance.LOW,
-            STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC);
+            STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC)
+        .define(
+            STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH,
+            Type.INT,
+            STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH_DEFAULT,
+            Importance.LOW,
+            STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH_DOC)
+        .define(
+            STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH,
+            Type.INT,
+            STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH_DEFAULT,
+            Importance.LOW,
+            STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH_DOC);
   }
 
   private static Properties getPropsFromFile(String propsFile) throws RestConfigException {
@@ -1088,6 +1118,14 @@ public final Duration getStreamingConnectionMaxDurationGracePeriod() {
     return Duration.ofMillis(getLong(STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS));
   }
 
+  public final Integer getStreamingConnectionMaxQueueDepthBeforeThrottling() {
+    return getInt(STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH);
+  }
+
+  public final Integer getStreamingConnectionMaxQueueDepthBeforeDisconnect() {
+    return getInt(STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH);
+  }
+
   public final int getRateLimitDefaultCost() {
     return getInt(RATE_LIMIT_DEFAULT_COST_CONFIG);
   }
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/config/ConfigModule.java b/kafka-rest/src/main/java/io/confluent/kafkarest/config/ConfigModule.java
index 0718caa89d..2c45a6089d 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/config/ConfigModule.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/config/ConfigModule.java
@@ -169,6 +169,14 @@ protected void configure() {
         .qualifiedBy(new StreamingConnectionMaxDurationGracePeriodImpl())
         .to(Duration.class);
 
+    bind(config.getStreamingConnectionMaxQueueDepthBeforeThrottling())
+        .qualifiedBy(new StreamingConnectionMaxQueueDepthBeforeThrottlingImpl())
+        .to(Integer.class);
+
+    bind(config.getStreamingConnectionMaxQueueDepthBeforeDisconnect())
+        .qualifiedBy(new StreamingConnectionMaxQueueDepthBeforeDisconnectImpl())
+        .to(Integer.class);
+
     bind(config.getSchemaRegistryConfigs())
         .qualifiedBy(new SchemaRegistryConfigsImpl())
         .to(new TypeLiteral<Map<String, Object>>() {});
@@ -441,5 +449,23 @@ private static final class StreamingConnectionMaxDurationConfigImpl
   private static final class StreamingConnectionMaxDurationGracePeriodImpl
       extends AnnotationLiteral<StreamingMaxConnectionGracePeriod>
       implements StreamingMaxConnectionGracePeriod {}
+
+  @Qualifier
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
+  public @interface StreamingConnectionMaxQueueDepthBeforeThrottling {}
+
+  private static final class StreamingConnectionMaxQueueDepthBeforeThrottlingImpl
+      extends AnnotationLiteral<StreamingConnectionMaxQueueDepthBeforeThrottling>
+      implements StreamingConnectionMaxQueueDepthBeforeThrottling {}
+
+  @Qualifier
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
+  public @interface StreamingConnectionMaxQueueDepthBeforeDisconnect {}
+
+  private static final class StreamingConnectionMaxQueueDepthBeforeDisconnectImpl
+      extends AnnotationLiteral<StreamingConnectionMaxQueueDepthBeforeDisconnect>
+      implements StreamingConnectionMaxQueueDepthBeforeDisconnect {}
 }
 // CHECKSTYLE:ON:ClassDataAbstractionCoupling
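For reference, a minimal configuration sketch using the two new settings. The property names and defaults are the ones defined in KafkaRestConfig above; the file name is just the conventional kafka-rest properties file.

    # kafka-rest.properties (values shown are the defaults)
    streaming.response.queue.throttle.depth=100
    streaming.response.queue.disconnect.depth=200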
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java b/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java
index d0eb455b87..d780574198 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java
@@ -45,6 +45,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import javax.annotation.Nullable;
 import javax.ws.rs.WebApplicationException;
@@ -109,30 +110,45 @@ public abstract class StreamingResponse<T> {
   private final ChunkedOutputFactory chunkedOutputFactory;
   private final Duration maxDuration;
   private final Duration gracePeriod;
+  private final Integer throttleDepth;
+  private final Integer disconnectDepth;
   private final Instant streamStartTime;
   private final Clock clock;
 
   volatile boolean closingStarted = false;
+  volatile boolean closingFinished = false;
 
   StreamingResponse(
       ChunkedOutputFactory chunkedOutputFactory,
       Duration maxDuration,
       Duration gracePeriod,
+      Integer throttleDepth,
+      Integer disconnectDepth,
       Clock clock) {
     this.clock = clock;
     this.streamStartTime = clock.instant();
     this.chunkedOutputFactory = requireNonNull(chunkedOutputFactory);
     this.maxDuration = maxDuration;
     this.gracePeriod = gracePeriod;
+    this.throttleDepth = throttleDepth;
+    this.disconnectDepth = disconnectDepth;
   }
 
   public static <T> StreamingResponse<T> from(
       JsonStream<T> inputStream,
       ChunkedOutputFactory chunkedOutputFactory,
       Duration maxDuration,
-      Duration gracePeriod) {
+      Duration gracePeriod,
+      Integer throttleDepth,
+      Integer disconnectDepth) {
     return new InputStreamingResponse<>(
-        inputStream, chunkedOutputFactory, maxDuration, gracePeriod, Clock.systemUTC());
+        inputStream,
+        chunkedOutputFactory,
+        maxDuration,
+        gracePeriod,
+        throttleDepth,
+        disconnectDepth,
+        Clock.systemUTC());
   }
 
   @VisibleForTesting
@@ -141,15 +157,29 @@ static <T> StreamingResponse<T> fromWithClock(
       JsonStream<T> inputStream,
       ChunkedOutputFactory chunkedOutputFactory,
       Duration maxDuration,
       Duration gracePeriod,
+      Integer throttleDepth,
+      Integer disconnectDepth,
       Clock clock) {
     return new InputStreamingResponse<>(
-        inputStream, chunkedOutputFactory, maxDuration, gracePeriod, clock);
+        inputStream,
+        chunkedOutputFactory,
+        maxDuration,
+        gracePeriod,
+        throttleDepth,
+        disconnectDepth,
+        clock);
   }
 
   public final <O> StreamingResponse<O> compose(
       Function<? super T, ? extends CompletableFuture<O>> transform) {
     return new ComposingStreamingResponse<>(
-        this, transform, chunkedOutputFactory, maxDuration, gracePeriod);
+        this,
+        transform,
+        chunkedOutputFactory,
+        maxDuration,
+        gracePeriod,
+        throttleDepth,
+        disconnectDepth);
   }
 
   /**
@@ -158,8 +188,8 @@ public final <O> StreamingResponse<O> compose(
    *
    * <p>This method will block until all requests are read in. The responses are computed and
    * written to {@code asyncResponse} asynchronously.
    */
-  public final void resume(AsyncResponse asyncResponse) {
+  public final void resume(AsyncResponse asyncResponse) throws InterruptedException {
     log.debug("Resuming StreamingResponse");
     AsyncResponseQueue responseQueue = new AsyncResponseQueue(chunkedOutputFactory);
     responseQueue.asyncResume(asyncResponse);
     ScheduledExecutorService executorService = null;
@@ -170,20 +200,9 @@ public final void resume(AsyncResponse asyncResponse) {
         // need to recheck closingStarted because hasNext can take time to respond
         if (!closingStarted
             && Duration.between(streamStartTime, clock.instant()).compareTo(maxDuration) > 0) {
-          if (executorService == null) {
-            executorService = Executors.newSingleThreadScheduledExecutor();
-            executorService.schedule(
-                () -> closeAll(responseQueue), gracePeriod.toMillis(), TimeUnit.MILLISECONDS);
-          }
-          next();
-          responseQueue.push(
-              CompletableFuture.completedFuture(
-                  ResultOrError.error(
-                      EXCEPTION_MAPPER.toErrorResponse(
-                          new StatusCodeException(
-                              Status.REQUEST_TIMEOUT,
-                              "Streaming connection open for longer than allowed",
-                              "Connection will be closed.")))));
+          executorService = handleOverMaxStreamDuration(executorService, responseQueue);
+        } else if (!closingStarted && responseQueue.getQueueDepth().get() >= throttleDepth) {
+          executorService = handleResponseQueueDepthTooLarge(executorService, responseQueue);
         } else if (!closingStarted) {
           responseQueue.push(next().handle(this::handleNext));
         } else {
           break;
         }
       }
     } catch (Exception e) {
       log.debug("Exception thrown when processing stream ", e);
       responseQueue.push(
           CompletableFuture.completedFuture(
               ResultOrError.error(EXCEPTION_MAPPER.toErrorResponse(e))));
     } finally {
-      close();
-      responseQueue.close();
+      // If there are still outstanding responses to send back (for example, hasNext has
+      // returned false but the scheduled close has not fired yet), give them a chance to
+      // complete within the grace period.
+      // TODO: make this smarter; the grace period could be very long.
+      if (responseQueue.getQueueDepth().get() > 0) {
+        Thread.sleep(gracePeriod.toMillis());
+      }
+      close();
+      responseQueue.close();
       if (executorService != null) {
         executorService.shutdown();
         try {
@@ -211,12 +245,47 @@ public final void resume(AsyncResponse asyncResponse) {
     }
   }
 
-  private void closeAll(AsyncResponseQueue responseQueue) {
-    closingStarted = true;
-    close();
-    responseQueue.close();
+  private ScheduledExecutorService handleResponseQueueDepthTooLarge(
+      ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
+    if (responseQueue.getQueueDepth().get() >= disconnectDepth) {
+      executorService = triggerDelayedClose(executorService, responseQueue);
+    }
+    next(); // consume the request without sending it to Kafka
+    responseQueue.push(
+        CompletableFuture.completedFuture(
+            ResultOrError.error(
+                EXCEPTION_MAPPER.toErrorResponse(
+                    new StatusCodeException(
+                        Status.TOO_MANY_REQUESTS,
+                        "Backlog of messages waiting to be sent to Kafka is too large",
+                        "Not sending to Kafka.")))));
+    return executorService;
+  }
+
+  private ScheduledExecutorService handleOverMaxStreamDuration(
+      ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
+    executorService = triggerDelayedClose(executorService, responseQueue);
+    next(); // consume the request without sending it to Kafka
+    responseQueue.push(
+        CompletableFuture.completedFuture(
+            ResultOrError.error(
+                EXCEPTION_MAPPER.toErrorResponse(
+                    new StatusCodeException(
+                        Status.REQUEST_TIMEOUT,
+                        "Streaming connection open for longer than allowed",
+                        "Connection will be closed.")))));
+    return executorService;
+  }
+
+  // Returns the executor (creating it on first use) so the caller can keep the reference;
+  // only one delayed close is ever scheduled.
+  private ScheduledExecutorService triggerDelayedClose(
+      ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
+    if (executorService == null) {
+      executorService = Executors.newSingleThreadScheduledExecutor();
+      executorService.schedule(
+          () -> closeAll(responseQueue), gracePeriod.toMillis(), TimeUnit.MILLISECONDS);
+    }
+    return executorService;
   }
 
+  abstract void closeAll(AsyncResponseQueue responseQueue);
+
   private ResultOrError handleNext(T result, @Nullable Throwable error) {
     if (error == null) {
       return ResultOrError.result(result);
@@ -241,12 +310,21 @@ private InputStreamingResponse(
         ChunkedOutputFactory chunkedOutputFactory,
         Duration maxDuration,
         Duration gracePeriod,
+        Integer throttleDepth,
+        Integer disconnectDepth,
         Clock clock) {
-      super(chunkedOutputFactory, maxDuration, gracePeriod, clock);
+      super(chunkedOutputFactory, maxDuration, gracePeriod, throttleDepth, disconnectDepth, clock);
       this.inputStream = requireNonNull(inputStream);
     }
 
+    @Override
+    void closeAll(AsyncResponseQueue responseQueue) {
do nothing"); + } + public void close() { + System.out.println( + "CLOSE could be either from THREAD or FINALLY -> calls through to inputStreaming close" + + this.getClass()); try { inputStream.close(); } catch (IOException e) { @@ -288,12 +366,37 @@ private ComposingStreamingResponse( Function> transform, ChunkedOutputFactory chunkedOutputFactory, Duration maxDuration, - Duration gracePeriod) { - super(chunkedOutputFactory, maxDuration, gracePeriod, streamingResponseInput.clock); + Duration gracePeriod, + Integer throttleDepth, + Integer disconnectDepth) { + super( + chunkedOutputFactory, + maxDuration, + gracePeriod, + throttleDepth, + disconnectDepth, + streamingResponseInput.clock); this.streamingResponseInput = requireNonNull(streamingResponseInput); this.transform = requireNonNull(transform); } + @Override + protected void closeAll(AsyncResponseQueue responseQueue) { + System.out.println("THREAD EXECUTOR TRIGGERS" + this.getClass()); + streamingResponseInput.closingStarted = true; + closingStarted = true; + System.out.println( + "THREAD EXECUTOR closeAll from thread executor call through to a sub close " + + this.getClass()); + close(); + System.out.println( + "THREAD EXECUTOR closeAll from thread executor calling responsequeue.close"); + responseQueue.close(); + System.out.println("THREAD EXECUTOR closeAll from thread executor after responsequeue.close"); + streamingResponseInput.closingFinished = true; + closingFinished = true; + } + @Override public boolean hasNext() { try { @@ -317,6 +420,9 @@ public CompletableFuture next() { } public void close() { + System.out.println( + "CLOSE could be from either THREAD or FINALLY -> calls through to inputStreaming close" + + this.getClass()); streamingResponseInput.close(); } } @@ -347,26 +453,30 @@ private void asyncResume(AsyncResponse asyncResponse) { asyncResponse.resume(Response.ok(sink).build()); } - private volatile boolean sinkClosed = false; + volatile boolean sinkClosed = false; + + private volatile AtomicInteger queueDepth = new AtomicInteger(0); - private boolean isClosed() { - return sinkClosed; + private AtomicInteger getTailLength() { + return queueDepth; } private void push(CompletableFuture result) { - log.debug("Pushing to response queue"); + queueDepth.incrementAndGet(); tail = CompletableFuture.allOf(tail, result) .thenApply( unused -> { try { if (sinkClosed || sink.isClosed()) { + System.out.println("closing sink " + this.getClass()); sinkClosed = true; return null; } ResultOrError res = result.join(); log.debug("Writing to sink"); sink.write(res); + queueDepth.decrementAndGet(); } catch (IOException e) { log.error("Error when writing streaming result to response channel.", e); } @@ -375,10 +485,14 @@ private void push(CompletableFuture result) { } private void close() { + System.out.println("CLOSE response queue from FINALLY or THREAD " + this.getClass()); tail.whenComplete( (unused, throwable) -> { try { sinkClosed = true; + System.out.println( + "*** actually closing tail from FINALLY or TRHEAD (Is an Async call)" + + this.getClass()); sink.close(); } catch (IOException e) { log.error("Error when closing response channel.", e); diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponseFactory.java b/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponseFactory.java index 7b5af17d73..a92e18e6e2 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponseFactory.java +++ 
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponseFactory.java b/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponseFactory.java
index 7b5af17d73..a92e18e6e2 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponseFactory.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponseFactory.java
@@ -19,6 +19,8 @@
 
 import io.confluent.kafkarest.config.ConfigModule.StreamingMaxConnectionDurationConfig;
 import io.confluent.kafkarest.config.ConfigModule.StreamingMaxConnectionGracePeriod;
+import io.confluent.kafkarest.config.ConfigModule.StreamingConnectionMaxQueueDepthBeforeDisconnect;
+import io.confluent.kafkarest.config.ConfigModule.StreamingConnectionMaxQueueDepthBeforeThrottling;
 import java.time.Duration;
 import javax.inject.Inject;
 
@@ -27,18 +29,30 @@ public final class StreamingResponseFactory {
   private final ChunkedOutputFactory chunkedOutputFactory;
   private final Duration maxDuration;
   private final Duration gracePeriod;
+  private final Integer throttleDepth;
+  private final Integer disconnectDepth;
 
   @Inject
   public StreamingResponseFactory(
       ChunkedOutputFactory chunkedOutputFactory,
       @StreamingMaxConnectionDurationConfig Duration maxDuration,
-      @StreamingMaxConnectionGracePeriod Duration gracePeriod) {
+      @StreamingMaxConnectionGracePeriod Duration gracePeriod,
+      @StreamingConnectionMaxQueueDepthBeforeThrottling Integer throttleDepth,
+      @StreamingConnectionMaxQueueDepthBeforeDisconnect Integer disconnectDepth) {
     this.chunkedOutputFactory = requireNonNull(chunkedOutputFactory);
     this.maxDuration = maxDuration;
     this.gracePeriod = gracePeriod;
+    this.throttleDepth = throttleDepth;
+    this.disconnectDepth = disconnectDepth;
   }
 
   public <T> StreamingResponse<T> from(JsonStream<T> inputStream) {
-    return StreamingResponse.from(inputStream, chunkedOutputFactory, maxDuration, gracePeriod);
+    return StreamingResponse.from(
+        inputStream,
+        chunkedOutputFactory,
+        maxDuration,
+        gracePeriod,
+        throttleDepth,
+        disconnectDepth);
   }
 }
diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v3/ProduceActionTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v3/ProduceActionTest.java
index 992d9b184f..802db21f23 100644
--- a/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v3/ProduceActionTest.java
+++ b/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v3/ProduceActionTest.java
@@ -77,6 +77,7 @@ public class ProduceActionTest {
 
   private static final Duration FIVE_SECONDS_MS = Duration.ofMillis(5000);
+  private static final int DEPTH = 100;
 
   @AfterAll
   public static void cleanUp() {
@@ -839,7 +840,8 @@ private static ProduceAction getProduceAction(
     replay(produceControllerProvider, produceController);
 
     StreamingResponseFactory streamingResponseFactory =
-        new StreamingResponseFactory(chunkedOutputFactory, FIVE_SECONDS_MS, FIVE_SECONDS_MS);
+        new StreamingResponseFactory(
+            chunkedOutputFactory, FIVE_SECONDS_MS, FIVE_SECONDS_MS, DEPTH, DEPTH);
 
     // get the current thread so that the call counts can be seen by easy mock
     ExecutorService executorService = MoreExecutors.newDirectExecutorService();
diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java
index 37ceef3814..e0d414104d 100644
--- a/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java
+++ b/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java
@@ -33,17 +33,21 @@
 import java.time.Duration;
 import java.time.Instant;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import org.awaitility.Awaitility;
 import org.easymock.EasyMock;
 import org.eclipse.jetty.http.HttpStatus;
 import org.glassfish.jersey.server.ChunkedOutput;
 import org.junit.jupiter.api.Test;
 
 public class StreamingResponseTest {
 
   private static final Duration DURATION = Duration.ofMillis(5000);
+  private static final int DEPTH = 100;
 
   @Test
-  public void testGracePeriodExceededExceptionThrown() throws IOException {
+  public void testGracePeriodExceededExceptionThrown() throws IOException, InterruptedException {
     String key = "foo";
     String value = "bar";
     ProduceRequest request =
@@ -92,7 +96,7 @@
     replay(mockedChunkedOutput);
 
     StreamingResponseFactory streamingResponseFactory =
-        new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION);
+        new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH);
 
     StreamingResponse<ProduceRequest> streamingResponse =
         streamingResponseFactory.from(new JsonStream<>(() -> requests));
@@ -109,7 +113,7 @@
 
   @Test
-  public void testWriteToChunkedOutput() throws IOException {
+  public void testWriteToChunkedOutput() throws IOException, InterruptedException {
     String key = "foo";
     String value = "bar";
     ProduceRequest request =
@@ -156,7 +160,7 @@
     replay(mockedChunkedOutput, mockedChunkedOutputFactory);
 
     StreamingResponseFactory streamingResponseFactory =
-        new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION);
+        new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH);
 
     StreamingResponse<ProduceRequest> streamingResponse =
         streamingResponseFactory.from(new JsonStream<>(() -> requestsMappingIterator));
@@ -173,7 +177,7 @@
 
   @Test
-  public void testHasNextMappingException() throws IOException {
+  public void testHasNextMappingException() throws IOException, InterruptedException {
     MappingIterator<ProduceRequest> requests = mock(MappingIterator.class);
 
     expect(requests.hasNext())
@@ -201,7 +205,7 @@
     replay(mockedChunkedOutput);
 
     StreamingResponseFactory streamingResponseFactory =
-        new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION);
+        new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH);
 
     StreamingResponse<ProduceRequest> streamingResponse =
         streamingResponseFactory.from(new JsonStream<>(() -> requests));
@@ -215,7 +219,7 @@
 
   @Test
-  public void testHasNextRuntimeException() throws IOException {
+  public void testHasNextRuntimeException() throws IOException, InterruptedException {
     MappingIterator<ProduceRequest> requests = mock(MappingIterator.class);
     expect(requests.hasNext())
         .andThrow(
@@ -241,7 +245,7 @@
     replay(mockedChunkedOutput);
 
     StreamingResponseFactory streamingResponseFactory =
-        new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION);
+        new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH);
 
     StreamingResponse<ProduceRequest> streamingResponse =
         streamingResponseFactory.from(new JsonStream<>(() -> requests));
@@ -300,25 +305,185 @@ public
void testWriteToChunkedOutputAfterTimeout() throws IOException, InterruptedException {
     ChunkedOutput<ResultOrError> mockedChunkedOutput = mock(ChunkedOutput.class);
 
     expect(clock.instant())
-        .andReturn(Instant.ofEpochMilli(0)); // stream start - input stream response (check)
-    expect(clock.instant())
-        .andReturn(Instant.ofEpochMilli(0)); // stream start - composing response (check)
+        .andReturn(Instant.ofEpochMilli(0)); // stream start - input stream response
+
+    // first message
+    expect(clock.instant()).andReturn(Instant.ofEpochMilli(0)); // stream start - composing response
 
     expect(requestsMappingIterator.hasNext()).andReturn(true); // first message - OK
     expect(requestsMappingIterator.nextValue()).andReturn(request);
     expect(clock.instant())
         .andReturn(Instant.ofEpochMilli(1)); // first comparison duration, within timeout
     expect(mockedChunkedOutputFactory.getChunkedOutput()).andReturn(mockedChunkedOutput);
-    mockedChunkedOutput.write(sucessResult);
     expect(mockedChunkedOutput.isClosed()).andReturn(false);
+    mockedChunkedOutput.write(sucessResult);
 
-    expect(requestsMappingIterator.hasNext()).andReturn(true); // is another message
-    expect(requestsMappingIterator.nextValue()).andReturn(request); // second message - bad gateway
+    // second message
+    expect(requestsMappingIterator.hasNext()).andReturn(true);
+    expect(requestsMappingIterator.nextValue()).andReturn(request);
     expect(clock.instant())
         .andReturn(Instant.ofEpochMilli(timeout + 5)); // second message beyond timeout
+    expect(mockedChunkedOutput.isClosed()).andReturn(false);
     mockedChunkedOutput.write(error);
 
+    // No third message: the scheduled executor should kick in after 50 ms and close the
+    // mapping iterator and the chunked output.
+    requestsMappingIterator.close(); // call from the scheduled executor
+    mockedChunkedOutput.close(); // call from the scheduled executor
+
+    // Hold hasNext until the "finished" future completes. The future is completed by a
+    // watcher thread defined below (it needs the streaming response object), once the two
+    // close calls from closeAll have both fired.
+    final CompletableFuture<Boolean> finished = new CompletableFuture<>();
+    expect(requestsMappingIterator.hasNext())
+        .andAnswer(
+            () -> {
+              Awaitility.await().atMost(Duration.ofSeconds(10)).until(finished::isDone);
+              return false;
+            });
+
+    requestsMappingIterator.close(); // call from the finally block
+    mockedChunkedOutput.close(); // call from the finally block
+
+    replay(mockedChunkedOutput, mockedChunkedOutputFactory, requestsMappingIterator, clock);
+
+    final StreamingResponse<ProduceRequest> streamingResponse =
+        StreamingResponse.fromWithClock(
+            new JsonStream<>(() -> requestsMappingIterator),
+            mockedChunkedOutputFactory,
+            Duration.ofMillis(timeout),
+            Duration.ofMillis(50),
+            DEPTH,
+            DEPTH,
+            clock);
+
+    // Complete "finished" once streamingResponse.closingFinished flips, i.e. once the
+    // scheduled executor has finished both closes.
+    Executors.newCachedThreadPool()
+        .submit(
+            () -> {
+              Awaitility.await()
+                  .atMost(Duration.ofSeconds(10))
+                  .until(() -> streamingResponse.closingFinished);
+              finished.complete(true);
+            });
+
+    CompletableFuture<ProduceResponse> produceResponseFuture = new CompletableFuture<>();
+    produceResponseFuture.complete(produceResponse);
+
+    FakeAsyncResponse response = new FakeAsyncResponse();
+    streamingResponse.compose(result -> produceResponseFuture).resume(response);
+
+    EasyMock.verify(mockedChunkedOutput);
+    EasyMock.verify(mockedChunkedOutputFactory);
+    EasyMock.verify(requestsMappingIterator);
+    EasyMock.verify(clock);
+  }
+
+  @Test
+  public void testSlowWritesToKafka429ThenDisconnect() throws IOException, InterruptedException {
+    String key = "foo";
+    String value = "bar";
+    ProduceRequest request =
+        ProduceRequest.builder()
+            .setKey(
+                ProduceRequestData.builder()
+                    .setFormat(EmbeddedFormat.AVRO)
+                    .setRawSchema("{\"type\": \"string\"}")
+                    .setData(TextNode.valueOf(key))
+                    .build())
+            .setValue(
+                ProduceRequestData.builder()
+                    .setFormat(EmbeddedFormat.AVRO)
+                    .setRawSchema("{\"type\": \"string\"}")
+                    .setData(TextNode.valueOf(value))
+                    .build())
+            .setOriginalSize(0L)
+            .build();
+
+    MappingIterator<ProduceRequest> requestsMappingIterator = mock(MappingIterator.class);
+
+    long timeout = 10;
+    Clock clock = mock(Clock.class);
+
+    CompletableFuture<ProduceResponse> produceResponseFuture = new CompletableFuture<>();
+
+    ProduceResponse produceResponse =
+        ProduceResponse.builder()
+            .setClusterId("clusterId")
+            .setTopicName("topicName")
+            .setPartitionId(1)
+            .setOffset(1L)
+            .setErrorCode(HttpStatus.OK_200)
+            .build();
+    ResultOrError successResult = ResultOrError.result(produceResponse);
+
+    ResultOrError error =
+        ResultOrError.error(
+            ErrorResponse.create(
+                429,
+                "Backlog of messages waiting to be sent to Kafka is too large: Not "
+                    + "sending to Kafka."));
+
+    ChunkedOutputFactory mockedChunkedOutputFactory = mock(ChunkedOutputFactory.class);
+    ChunkedOutput<ResultOrError> mockedChunkedOutput = mock(ChunkedOutput.class);
+
+    expect(clock.instant())
+        .andReturn(Instant.ofEpochMilli(0)); // stream start - input stream response
+    expect(clock.instant()).andReturn(Instant.ofEpochMilli(0)); // stream start - composing response
+
+    // first message
+    expect(requestsMappingIterator.hasNext()).andReturn(true);
+    expect(clock.instant())
+        .andReturn(Instant.ofEpochMilli(1)); // first comparison duration,
before timeout
+    expect(requestsMappingIterator.nextValue()).andReturn(request);
+    expect(mockedChunkedOutputFactory.getChunkedOutput()).andReturn(mockedChunkedOutput);
+    // TODO: see if we can avoid this extra creation of an AsyncResponseQueue, caused by
+    // the inheritance.
+    expect(mockedChunkedOutputFactory.getChunkedOutput()).andReturn(mockedChunkedOutput);
+    expect(mockedChunkedOutput.isClosed()).andReturn(false);
+    mockedChunkedOutput.write(successResult);
+
+    // second message: the queue depth is now at the throttle limit, so expect a 429
+    expect(requestsMappingIterator.hasNext()).andReturn(true);
+    expect(clock.instant()).andReturn(Instant.ofEpochMilli(2));
+    expect(requestsMappingIterator.nextValue()).andReturn(request);
+    expect(mockedChunkedOutput.isClosed()).andReturn(false);
+    mockedChunkedOutput.write(error);
+
+    // third message: the queue depth is now at the disconnect limit, so expect another
+    // 429 and a delayed close
+    expect(requestsMappingIterator.hasNext()).andReturn(true);
+    expect(clock.instant()).andReturn(Instant.ofEpochMilli(2));
+    expect(requestsMappingIterator.nextValue())
+        .andAnswer(
+            () -> {
+              produceResponseFuture.complete(produceResponse);
+              return request;
+            });
+    expect(mockedChunkedOutput.isClosed()).andReturn(false);
+    mockedChunkedOutput.write(error);
 
     expect(requestsMappingIterator.hasNext())
         .andAnswer(
             () -> {
@@ -335,28 +500,18 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, InterruptedException {
     replay(mockedChunkedOutput, mockedChunkedOutputFactory, requestsMappingIterator, clock);
 
-    StreamingResponseFactory streamingResponseFactory =
-        new StreamingResponseFactory(
-            mockedChunkedOutputFactory, Duration.ofMillis(timeout), Duration.ofMillis(50));
-
     StreamingResponse<ProduceRequest> streamingResponse =
         StreamingResponse.fromWithClock(
             new JsonStream<>(() -> requestsMappingIterator),
             mockedChunkedOutputFactory,
             Duration.ofMillis(timeout),
             Duration.ofMillis(50),
+            1,
+            2,
             clock);
 
-    CompletableFuture<ProduceResponse> produceResponseFuture = new CompletableFuture<>();
-    produceResponseFuture.complete(produceResponse);
-
     FakeAsyncResponse response = new FakeAsyncResponse();
-    streamingResponse
-        .compose(
-            result -> {
-              return produceResponseFuture;
-            })
-        .resume(response);
+    streamingResponse.compose(result -> produceResponseFuture).resume(response);
 
     EasyMock.verify(mockedChunkedOutput);
     EasyMock.verify(mockedChunkedOutputFactory);
     EasyMock.verify(requestsMappingIterator);
     EasyMock.verify(clock);
diff --git a/pom.xml b/pom.xml
index 51a6b81355..897eb5a7ce 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,6 +122,12 @@
       <version>${hamcrest.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <version>4.0.2</version>
+      <scope>test</scope>
+    </dependency>
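From the client's point of view, a throttled stream looks roughly like this (illustrative only: the endpoint is the v3 streaming produce API, the cluster and topic names are placeholders, the 200 body is abbreviated, and the 429 body matches the ErrorResponse built in handleResponseQueueDepthTooLarge):

    POST /v3/clusters/<cluster-id>/topics/<topic>/records
    ...
    {"error_code":200,"cluster_id":"...","topic_name":"...","partition_id":1,"offset":1}
    {"error_code":429,"message":"Backlog of messages waiting to be sent to Kafka is too large: Not sending to Kafka."}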