From 700f95647fbde323ef2f4551f1de4532fd62efbe Mon Sep 17 00:00:00 2001 From: Emma Humber Date: Mon, 18 Jul 2022 15:09:53 +0100 Subject: [PATCH] KREST-2746 passing unit tests --- kafka-rest/pom.xml | 6 - .../confluent/kafkarest/KafkaRestConfig.java | 6 +- .../kafkarest/response/JsonStream.java | 3 - .../kafkarest/response/StreamingResponse.java | 69 +++--- .../response/StreamingResponseTest.java | 199 +++++++++++------- pom.xml | 6 - 6 files changed, 156 insertions(+), 133 deletions(-) diff --git a/kafka-rest/pom.xml b/kafka-rest/pom.xml index 3281975c7f..b3f193aa79 100644 --- a/kafka-rest/pom.xml +++ b/kafka-rest/pom.xml @@ -156,12 +156,6 @@ hamcrest-all test - - org.awaitility - awaitility - 4.0.2 - test - diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java b/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java index d174bb9197..554cba2664 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java @@ -466,8 +466,11 @@ public class KafkaRestConfig extends RestConfig { "streaming.connection.max.duration.grace.period.ms"; private static final String STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC = "How long after a streaming connection reaches its maximum duration outstanding " - + "requests will be processed for before the connection is closed."; + + "requests will be processed for before the connection is closed. " + + "Maximum value is 10 seconds."; private static final String STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT = "500"; + public static final ConfigDef.Range STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_VALIDATOR = + ConfigDef.Range.between(1, 10000); public static final String STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH = "streaming.response.queue.throttle.depth"; @@ -864,6 +867,7 @@ protected static ConfigDef baseKafkaRestConfigDef() { STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS, Type.LONG, STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT, + STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_VALIDATOR, Importance.LOW, STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC) .define( diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/response/JsonStream.java b/kafka-rest/src/main/java/io/confluent/kafkarest/response/JsonStream.java index 072f7ba740..3aee9c60ae 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/response/JsonStream.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/response/JsonStream.java @@ -49,11 +49,8 @@ public T nextValue() throws IOException { @Override public void close() throws IOException { - System.out.println("CLOSE in json stream which calls mapping iterator close"); if (delegate.get() != null) { - System.out.println("delegate not null" + delegate.get().getClass()); delegate.get().close(); - System.out.println("delegate now closed"); } } } diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java b/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java index d780574198..64d5a5bda1 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java @@ -73,6 +73,7 @@ public abstract class StreamingResponse { private static final Logger log = LoggerFactory.getLogger(StreamingResponse.class); private static final int ONE_SECOND_MS = 1000; + private static final int MAX_CLOSE_RETRIES = 2; private static final CompositeErrorMapper EXCEPTION_MAPPER = new CompositeErrorMapper.Builder() @@ -189,7 +190,7 @@ public final StreamingResponse compose( * written to {@code asyncResponse} asynchronously. */ public final void resume(AsyncResponse asyncResponse) throws InterruptedException { - log.error("Resuming StreamingResponse - about to call new async response queue"); + log.debug("Resuming StreamingResponse"); AsyncResponseQueue responseQueue = new AsyncResponseQueue(chunkedOutputFactory); responseQueue.asyncResume(asyncResponse); ScheduledExecutorService executorService = null; @@ -210,32 +211,25 @@ public final void resume(AsyncResponse asyncResponse) throws InterruptedExceptio } } } catch (Exception e) { - log.error("Exception thrown when processing stream ", e); + log.debug("Exception thrown when processing stream ", e); responseQueue.push( CompletableFuture.completedFuture( ResultOrError.error(EXCEPTION_MAPPER.toErrorResponse(e)))); } finally { - System.out.println( - "why isfinally triggered closing started" - + closingStarted); // can't call hasnext because of the mocking // if there are still outstanding response to send back, for example hasNext has returned // false, but the executorService hasn't triggered yet, give everything a chance to - // complete within the grace period if there are still outstanding responses. - System.out.println("FINALLY before tail length wait"); - if (responseQueue.getTailLength().get() > 0) { - System.out.println("in tail length wait"); - Thread.sleep(gracePeriod.toMillis()); // TODO make this smarter, grace period could be HUGE + // complete within the grace period plus a little bit. + int retries = 0; + while (!closingFinished && retries < MAX_CLOSE_RETRIES) { + retries++; + Thread.sleep(gracePeriod.toMillis() / retries); } - System.out.println("FINALL calling a close " + this.getClass()); - //if (!closingFinished) { - close(); - System.out.println("FINALLY calling responseQueue close" + this.getClass()); - responseQueue.close(); - //} + close(); + responseQueue.close(); if (executorService != null) { executorService.shutdown(); try { - if (!executorService.awaitTermination(ONE_SECOND_MS, TimeUnit.MILLISECONDS)) { + if (!executorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) { executorService.shutdownNow(); } } catch (InterruptedException e) { @@ -245,34 +239,34 @@ public final void resume(AsyncResponse asyncResponse) throws InterruptedExceptio } } - private void handleResponseQueueDepthTooLarge( + private void handleOverMaxStreamDuration( ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) { - if (responseQueue.getTailLength().get() >= disconnectDepth) { - triggerDelayedClose(executorService, responseQueue); - } + triggerDelayedClose(executorService, responseQueue); next(); responseQueue.push( CompletableFuture.completedFuture( ResultOrError.error( EXCEPTION_MAPPER.toErrorResponse( new StatusCodeException( - Status.TOO_MANY_REQUESTS, - "Backlog of messages waiting to be sent to Kafka is too large", - "Not sending to Kafka."))))); + Status.REQUEST_TIMEOUT, + "Streaming connection open for longer than allowed", + "Connection will be closed."))))); } - private void handleOverMaxStreamDuration( + private void handleResponseQueueDepthTooLarge( ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) { - triggerDelayedClose(executorService, responseQueue); + if (responseQueue.getTailLength().get() >= disconnectDepth) { + triggerDelayedClose(executorService, responseQueue); + } next(); responseQueue.push( CompletableFuture.completedFuture( ResultOrError.error( EXCEPTION_MAPPER.toErrorResponse( new StatusCodeException( - Status.REQUEST_TIMEOUT, - "Streaming connection open for longer than allowed", - "Connection will be closed."))))); + Status.TOO_MANY_REQUESTS, + "Backlog of messages waiting to be sent to Kafka is too large", + "Not sending to Kafka."))))); } private void triggerDelayedClose( @@ -318,7 +312,7 @@ private InputStreamingResponse( } public void closeAll(AsyncResponseQueue responseQueue) { - System.out.println("!!! do nothing"); + // Never called } public void close() { @@ -382,17 +376,10 @@ private ComposingStreamingResponse( @Override protected void closeAll(AsyncResponseQueue responseQueue) { - System.out.println("THREAD EXECUTOR TRIGGERS" + this.getClass()); streamingResponseInput.closingStarted = true; closingStarted = true; - System.out.println( - "THREAD EXECUTOR closeAll from thread executor call through to a sub close " - + this.getClass()); close(); - System.out.println( - "THREAD EXECUTOR closeAll from thread executor calling responsequeue.close"); responseQueue.close(); - System.out.println("THREAD EXECUTOR closeAll from thread executor after responsequeue.close"); streamingResponseInput.closingFinished = true; closingFinished = true; } @@ -420,9 +407,6 @@ public CompletableFuture next() { } public void close() { - System.out.println( - "CLOSE could be from either THREAD or FINALLY -> calls through to inputStreaming close" - + this.getClass()); streamingResponseInput.close(); } } @@ -469,7 +453,6 @@ private void push(CompletableFuture result) { unused -> { try { if (sinkClosed || sink.isClosed()) { - System.out.println("closing sink " + this.getClass()); sinkClosed = true; return null; } @@ -485,14 +468,10 @@ private void push(CompletableFuture result) { } private void close() { - System.out.println("CLOSE response queue from FINALLY or THREAD " + this.getClass()); tail.whenComplete( (unused, throwable) -> { try { sinkClosed = true; - System.out.println( - "*** actually closing tail from FINALLY or TRHEAD (Is an Async call)" - + this.getClass()); sink.close(); } catch (IOException e) { log.error("Error when closing response channel.", e); diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java index e0d414104d..8c24318434 100644 --- a/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java +++ b/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java @@ -33,17 +33,15 @@ import java.time.Duration; import java.time.Instant; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; import org.easymock.EasyMock; import org.eclipse.jetty.http.HttpStatus; import org.glassfish.jersey.server.ChunkedOutput; import org.junit.jupiter.api.Test; -// import org.awaitility.Awaitility; - public class StreamingResponseTest { private static final Duration DURATION = Duration.ofMillis(5000); + private static final Duration GRACE_DURATION = Duration.ofMillis(1); private static final int DEPTH = 100; @Test @@ -96,7 +94,8 @@ public void testGracePeriodExceededExceptionThrown() throws IOException, Interru replay(mockedChunkedOutput); StreamingResponseFactory streamingResponseFactory = - new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH); + new StreamingResponseFactory( + mockedChunkedOutputFactory, DURATION, GRACE_DURATION, DEPTH, DEPTH); StreamingResponse streamingResponse = streamingResponseFactory.from(new JsonStream<>(() -> requests)); @@ -160,7 +159,8 @@ public void testWriteToChunkedOutput() throws IOException, InterruptedException replay(mockedChunkedOutput, mockedChunkedOutputFactory); StreamingResponseFactory streamingResponseFactory = - new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH); + new StreamingResponseFactory( + mockedChunkedOutputFactory, DURATION, GRACE_DURATION, DEPTH, DEPTH); StreamingResponse streamingResponse = streamingResponseFactory.from(new JsonStream<>(() -> requestsMappingIterator)); @@ -205,7 +205,8 @@ public void testHasNextMappingException() throws IOException, InterruptedExcepti replay(mockedChunkedOutput); StreamingResponseFactory streamingResponseFactory = - new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH); + new StreamingResponseFactory( + mockedChunkedOutputFactory, DURATION, GRACE_DURATION, DEPTH, DEPTH); StreamingResponse streamingResponse = streamingResponseFactory.from(new JsonStream<>(() -> requests)); @@ -245,7 +246,8 @@ public void testHasNextRuntimeException() throws IOException, InterruptedExcepti replay(mockedChunkedOutput); StreamingResponseFactory streamingResponseFactory = - new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH); + new StreamingResponseFactory( + mockedChunkedOutputFactory, DURATION, GRACE_DURATION, DEPTH, DEPTH); StreamingResponse streamingResponse = streamingResponseFactory.from(new JsonStream<>(() -> requests)); @@ -299,7 +301,8 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt ResultOrError.error( ErrorResponse.create( 408, - "Streaming connection open for longer than allowed: Connection will be closed.")); + "Streaming connection open for longer than allowed: " + + "Connection will be closed.")); ChunkedOutputFactory mockedChunkedOutputFactory = mock(ChunkedOutputFactory.class); ChunkedOutput mockedChunkedOutput = mock(ChunkedOutput.class); @@ -307,9 +310,9 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt expect(clock.instant()) .andReturn(Instant.ofEpochMilli(0)); // stream start - input stream response - //first message - expect(clock.instant()).andReturn(Instant.ofEpochMilli(0)); // stream start - composing response - expect(requestsMappingIterator.hasNext()).andReturn(true); // first message - OK + // first message + expect(clock.instant()).andReturn(Instant.ofEpochMilli(0)); + expect(requestsMappingIterator.hasNext()).andReturn(true); expect(requestsMappingIterator.nextValue()).andReturn(request); expect(clock.instant()) .andReturn(Instant.ofEpochMilli(1)); // first comparison duration. within timeout @@ -317,7 +320,7 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt expect(mockedChunkedOutput.isClosed()).andReturn(false); mockedChunkedOutput.write(sucessResult); - //second message + // second message expect(requestsMappingIterator.hasNext()).andReturn(true); expect(requestsMappingIterator.nextValue()).andReturn(request); expect(clock.instant()) @@ -325,31 +328,14 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt expect(mockedChunkedOutput.isClosed()).andReturn(false); mockedChunkedOutput.write(error); - //no third message - the thread executor should kick in after 50 ms to close the mapping iterator - //and the chunkedOutput at this point: + // no third message + expect(requestsMappingIterator.hasNext()).andReturn(false); requestsMappingIterator.close(); // call from thread executor - mockedChunkedOutput.close(); //call from thread executor - - - final CompletableFuture finished = new CompletableFuture<>(); - //we don't let the hasNext fire until the "finished" future has completed - //This future is defined below, after the streaming response, because we need information from the streaming response to create it - //It completes when the two close calls from the "closeAll" called by the Thread executor have both fired - - expect(requestsMappingIterator.hasNext()) - .andAnswer( - () -> { // return hasnext true AFTER the thread executor has timed out and closed the - // connections. Then expect no other calls except the connection closing. - System.out.println("** has next delayed response"); - while (!finished.isDone()) { - Thread.sleep(10); - } - return false; - }); + mockedChunkedOutput.close(); // call from thread executor requestsMappingIterator.close(); // call from finally - mockedChunkedOutput.close(); //call from finally + mockedChunkedOutput.close(); // call from finally replay(mockedChunkedOutput, mockedChunkedOutputFactory, requestsMappingIterator, clock); @@ -363,23 +349,6 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt DEPTH, clock); - //This finished future fires when the streamingResponse.closingFinished volatile boolean fires. This happens when the thread executor has finished doing its thing and doing the two closes - Executors.newCachedThreadPool() - .submit( - () -> { - try { - System.out.println( - "&&& TEST thread streaming response object" + streamingResponse.getClass()); - while (!streamingResponse.closingFinished) { - Thread.sleep(100); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - System.out.println("&& test thread completing future"); - finished.complete(true); - }); - CompletableFuture produceResponseFuture = new CompletableFuture<>(); produceResponseFuture.complete(produceResponse); @@ -397,7 +366,7 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt } @Test - public void testSlowWritesToKafka429ThenDisconnect() throws IOException, InterruptedException { + public void testSlowWritesToKafka429() throws IOException, InterruptedException { String key = "foo"; String value = "bar"; @@ -445,57 +414,143 @@ public void testSlowWritesToKafka429ThenDisconnect() throws IOException, Interru ChunkedOutputFactory mockedChunkedOutputFactory = mock(ChunkedOutputFactory.class); ChunkedOutput mockedChunkedOutput = mock(ChunkedOutput.class); - expect(clock.instant()) - .andReturn(Instant.ofEpochMilli(0)); // stream start - input stream response - expect(clock.instant()).andReturn(Instant.ofEpochMilli(0)); // stream start - composing response + expect(clock.instant()).andReturn(Instant.ofEpochMilli(0)); - expect(requestsMappingIterator.hasNext()).andReturn(true); // first message - expect(clock.instant()) - .andReturn(Instant.ofEpochMilli(1)); // first comparison duration. before timeout + // first message + expect(requestsMappingIterator.hasNext()).andReturn(true); + expect(clock.instant()).andReturn(Instant.ofEpochMilli(0)); expect(requestsMappingIterator.nextValue()).andReturn(request); + expect(clock.instant()).andReturn(Instant.ofEpochMilli(1)); expect(mockedChunkedOutputFactory.getChunkedOutput()).andReturn(mockedChunkedOutput); - expect(mockedChunkedOutputFactory.getChunkedOutput()) - .andReturn(mockedChunkedOutput); // todo see if we can remove this extra cration of an async - // response quueue due to the inheritance expect(mockedChunkedOutput.isClosed()).andReturn(false); mockedChunkedOutput.write(sucessResult); // second message - expect(requestsMappingIterator.hasNext()).andReturn(true); // second message + expect(requestsMappingIterator.hasNext()).andReturn(true); expect(clock.instant()).andReturn(Instant.ofEpochMilli(2)); expect(requestsMappingIterator.nextValue()) .andAnswer( () -> { - // produceResponseFuture.complete(produceResponse); + produceResponseFuture.complete(produceResponse); return request; }); expect(mockedChunkedOutput.isClosed()).andReturn(false); mockedChunkedOutput.write(error); - // third message + expect(requestsMappingIterator.hasNext()).andReturn(false); + + requestsMappingIterator.close(); // closes from the finally + mockedChunkedOutput.close(); + + replay(mockedChunkedOutput, mockedChunkedOutputFactory, requestsMappingIterator, clock); + + StreamingResponse streamingResponse = + StreamingResponse.fromWithClock( + new JsonStream<>(() -> requestsMappingIterator), + mockedChunkedOutputFactory, + Duration.ofMillis(timeout), + Duration.ofMillis(50), + 1, + 2, + clock); + + FakeAsyncResponse response = new FakeAsyncResponse(); + streamingResponse.compose(result -> produceResponseFuture).resume(response); + + EasyMock.verify(mockedChunkedOutput); + EasyMock.verify(mockedChunkedOutputFactory); + EasyMock.verify(requestsMappingIterator); + EasyMock.verify(clock); + } + + @Test + public void testSlowWritesToKafkaDisconnects() throws IOException, InterruptedException { + + String key = "foo"; + String value = "bar"; + ProduceRequest request = + ProduceRequest.builder() + .setKey( + ProduceRequestData.builder() + .setFormat(EmbeddedFormat.AVRO) + .setRawSchema("{\"type\": \"string\"}") + .setData(TextNode.valueOf(key)) + .build()) + .setValue( + ProduceRequestData.builder() + .setFormat(EmbeddedFormat.AVRO) + .setRawSchema("{\"type\": \"string\"}") + .setData(TextNode.valueOf(value)) + .build()) + .setOriginalSize(0L) + .build(); + + MappingIterator requestsMappingIterator = mock(MappingIterator.class); + + long timeout = 10; + Clock clock = mock(Clock.class); + + CompletableFuture produceResponseFuture = new CompletableFuture<>(); + + ProduceResponse produceResponse = + ProduceResponse.builder() + .setClusterId("clusterId") + .setTopicName("topicName") + .setPartitionId(1) + .setOffset(1L) + .setErrorCode(HttpStatus.OK_200) + .build(); + ResultOrError sucessResult = ResultOrError.result(produceResponse); + + ResultOrError error = + ResultOrError.error( + ErrorResponse.create( + 429, + "Backlog of messages waiting to be sent to Kafka is too large: Not " + + "sending to Kafka.")); + + ChunkedOutputFactory mockedChunkedOutputFactory = mock(ChunkedOutputFactory.class); + ChunkedOutput mockedChunkedOutput = mock(ChunkedOutput.class); + + expect(clock.instant()).andReturn(Instant.ofEpochMilli(0)); + + // first message + expect(requestsMappingIterator.hasNext()).andReturn(true); + expect(clock.instant()).andReturn(Instant.ofEpochMilli(0)); + expect(requestsMappingIterator.nextValue()).andReturn(request); + expect(clock.instant()).andReturn(Instant.ofEpochMilli(1)); + expect(mockedChunkedOutputFactory.getChunkedOutput()).andReturn(mockedChunkedOutput); + expect(mockedChunkedOutput.isClosed()).andReturn(false); + mockedChunkedOutput.write(sucessResult); + + // second message expect(requestsMappingIterator.hasNext()).andReturn(true); expect(clock.instant()).andReturn(Instant.ofEpochMilli(2)); expect(requestsMappingIterator.nextValue()) .andAnswer( () -> { - produceResponseFuture.complete(produceResponse); return request; }); - expect(mockedChunkedOutput.isClosed()).andReturn(false); mockedChunkedOutput.write(error); - expect(requestsMappingIterator.hasNext()) + // third message + expect(requestsMappingIterator.hasNext()).andReturn(true); + expect(clock.instant()).andReturn(Instant.ofEpochMilli(2)); + expect(requestsMappingIterator.nextValue()) .andAnswer( - () -> { // return hasnext true AFTER the thread executor has timed out and closed the - // connections. Then expect no other calls except the connection closing. - Thread.sleep(500); - return true; + () -> { + produceResponseFuture.complete(produceResponse); + return request; }); + expect(mockedChunkedOutput.isClosed()).andReturn(false); + mockedChunkedOutput.write(error); + + expect(requestsMappingIterator.hasNext()).andReturn(false); - requestsMappingIterator.close(); // this ensures the closes have been called + requestsMappingIterator.close(); // closes from thread executor mockedChunkedOutput.close(); - requestsMappingIterator.close(); // expect twice - one from the thread and one from the finally + requestsMappingIterator.close(); // closes from finally mockedChunkedOutput.close(); replay(mockedChunkedOutput, mockedChunkedOutputFactory, requestsMappingIterator, clock); diff --git a/pom.xml b/pom.xml index 897eb5a7ce..51a6b81355 100644 --- a/pom.xml +++ b/pom.xml @@ -122,12 +122,6 @@ $(hamcrest.version) test - - org.awaitility - awaitility - 4.0.2 - test -