diff --git a/kafka-rest/pom.xml b/kafka-rest/pom.xml
index 3281975c7f..b3f193aa79 100644
--- a/kafka-rest/pom.xml
+++ b/kafka-rest/pom.xml
@@ -156,12 +156,6 @@
hamcrest-all
test
-
- org.awaitility
- awaitility
- 4.0.2
- test
-
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java b/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java
index d174bb9197..554cba2664 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java
@@ -466,8 +466,11 @@ public class KafkaRestConfig extends RestConfig {
"streaming.connection.max.duration.grace.period.ms";
private static final String STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC =
"How long after a streaming connection reaches its maximum duration outstanding "
- + "requests will be processed for before the connection is closed.";
+ + "requests will be processed for before the connection is closed. "
+ + "Maximum value is 10 seconds.";
private static final String STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT = "500";
+ public static final ConfigDef.Range STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_VALIDATOR =
+ ConfigDef.Range.between(1, 10000);
public static final String STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH =
"streaming.response.queue.throttle.depth";
@@ -864,6 +867,7 @@ protected static ConfigDef baseKafkaRestConfigDef() {
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS,
Type.LONG,
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT,
+ STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_VALIDATOR,
Importance.LOW,
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC)
.define(
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/response/JsonStream.java b/kafka-rest/src/main/java/io/confluent/kafkarest/response/JsonStream.java
index 072f7ba740..3aee9c60ae 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/response/JsonStream.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/response/JsonStream.java
@@ -49,11 +49,8 @@ public T nextValue() throws IOException {
@Override
public void close() throws IOException {
- System.out.println("CLOSE in json stream which calls mapping iterator close");
if (delegate.get() != null) {
- System.out.println("delegate not null" + delegate.get().getClass());
delegate.get().close();
- System.out.println("delegate now closed");
}
}
}
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java b/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java
index d780574198..64d5a5bda1 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java
@@ -73,6 +73,7 @@ public abstract class StreamingResponse {
private static final Logger log = LoggerFactory.getLogger(StreamingResponse.class);
private static final int ONE_SECOND_MS = 1000;
+ private static final int MAX_CLOSE_RETRIES = 2;
private static final CompositeErrorMapper EXCEPTION_MAPPER =
new CompositeErrorMapper.Builder()
@@ -189,7 +190,7 @@ public final StreamingResponse compose(
* written to {@code asyncResponse} asynchronously.
*/
public final void resume(AsyncResponse asyncResponse) throws InterruptedException {
- log.error("Resuming StreamingResponse - about to call new async response queue");
+ log.debug("Resuming StreamingResponse");
AsyncResponseQueue responseQueue = new AsyncResponseQueue(chunkedOutputFactory);
responseQueue.asyncResume(asyncResponse);
ScheduledExecutorService executorService = null;
@@ -210,32 +211,25 @@ public final void resume(AsyncResponse asyncResponse) throws InterruptedExceptio
}
}
} catch (Exception e) {
- log.error("Exception thrown when processing stream ", e);
+ log.debug("Exception thrown when processing stream ", e);
responseQueue.push(
CompletableFuture.completedFuture(
ResultOrError.error(EXCEPTION_MAPPER.toErrorResponse(e))));
} finally {
- System.out.println(
- "why isfinally triggered closing started"
- + closingStarted); // can't call hasnext because of the mocking
// if there are still outstanding response to send back, for example hasNext has returned
// false, but the executorService hasn't triggered yet, give everything a chance to
- // complete within the grace period if there are still outstanding responses.
- System.out.println("FINALLY before tail length wait");
- if (responseQueue.getTailLength().get() > 0) {
- System.out.println("in tail length wait");
- Thread.sleep(gracePeriod.toMillis()); // TODO make this smarter, grace period could be HUGE
+ // complete within the grace period plus a little bit.
+ int retries = 0;
+ while (!closingFinished && retries < MAX_CLOSE_RETRIES) {
+ retries++;
+ Thread.sleep(gracePeriod.toMillis() / retries);
}
- System.out.println("FINALL calling a close " + this.getClass());
- //if (!closingFinished) {
- close();
- System.out.println("FINALLY calling responseQueue close" + this.getClass());
- responseQueue.close();
- //}
+ close();
+ responseQueue.close();
if (executorService != null) {
executorService.shutdown();
try {
- if (!executorService.awaitTermination(ONE_SECOND_MS, TimeUnit.MILLISECONDS)) {
+ if (!executorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
@@ -245,34 +239,34 @@ public final void resume(AsyncResponse asyncResponse) throws InterruptedExceptio
}
}
- private void handleResponseQueueDepthTooLarge(
+ private void handleOverMaxStreamDuration(
ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
- if (responseQueue.getTailLength().get() >= disconnectDepth) {
- triggerDelayedClose(executorService, responseQueue);
- }
+ triggerDelayedClose(executorService, responseQueue);
next();
responseQueue.push(
CompletableFuture.completedFuture(
ResultOrError.error(
EXCEPTION_MAPPER.toErrorResponse(
new StatusCodeException(
- Status.TOO_MANY_REQUESTS,
- "Backlog of messages waiting to be sent to Kafka is too large",
- "Not sending to Kafka.")))));
+ Status.REQUEST_TIMEOUT,
+ "Streaming connection open for longer than allowed",
+ "Connection will be closed.")))));
}
- private void handleOverMaxStreamDuration(
+ private void handleResponseQueueDepthTooLarge(
ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
- triggerDelayedClose(executorService, responseQueue);
+ if (responseQueue.getTailLength().get() >= disconnectDepth) {
+ triggerDelayedClose(executorService, responseQueue);
+ }
next();
responseQueue.push(
CompletableFuture.completedFuture(
ResultOrError.error(
EXCEPTION_MAPPER.toErrorResponse(
new StatusCodeException(
- Status.REQUEST_TIMEOUT,
- "Streaming connection open for longer than allowed",
- "Connection will be closed.")))));
+ Status.TOO_MANY_REQUESTS,
+ "Backlog of messages waiting to be sent to Kafka is too large",
+ "Not sending to Kafka.")))));
}
private void triggerDelayedClose(
@@ -318,7 +312,7 @@ private InputStreamingResponse(
}
public void closeAll(AsyncResponseQueue responseQueue) {
- System.out.println("!!! do nothing");
+ // Never called
}
public void close() {
@@ -382,17 +376,10 @@ private ComposingStreamingResponse(
@Override
protected void closeAll(AsyncResponseQueue responseQueue) {
- System.out.println("THREAD EXECUTOR TRIGGERS" + this.getClass());
streamingResponseInput.closingStarted = true;
closingStarted = true;
- System.out.println(
- "THREAD EXECUTOR closeAll from thread executor call through to a sub close "
- + this.getClass());
close();
- System.out.println(
- "THREAD EXECUTOR closeAll from thread executor calling responsequeue.close");
responseQueue.close();
- System.out.println("THREAD EXECUTOR closeAll from thread executor after responsequeue.close");
streamingResponseInput.closingFinished = true;
closingFinished = true;
}
@@ -420,9 +407,6 @@ public CompletableFuture next() {
}
public void close() {
- System.out.println(
- "CLOSE could be from either THREAD or FINALLY -> calls through to inputStreaming close"
- + this.getClass());
streamingResponseInput.close();
}
}
@@ -469,7 +453,6 @@ private void push(CompletableFuture result) {
unused -> {
try {
if (sinkClosed || sink.isClosed()) {
- System.out.println("closing sink " + this.getClass());
sinkClosed = true;
return null;
}
@@ -485,14 +468,10 @@ private void push(CompletableFuture result) {
}
private void close() {
- System.out.println("CLOSE response queue from FINALLY or THREAD " + this.getClass());
tail.whenComplete(
(unused, throwable) -> {
try {
sinkClosed = true;
- System.out.println(
- "*** actually closing tail from FINALLY or TRHEAD (Is an Async call)"
- + this.getClass());
sink.close();
} catch (IOException e) {
log.error("Error when closing response channel.", e);
diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java
index e0d414104d..8c24318434 100644
--- a/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java
+++ b/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java
@@ -33,17 +33,15 @@
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
import org.easymock.EasyMock;
import org.eclipse.jetty.http.HttpStatus;
import org.glassfish.jersey.server.ChunkedOutput;
import org.junit.jupiter.api.Test;
-// import org.awaitility.Awaitility;
-
public class StreamingResponseTest {
private static final Duration DURATION = Duration.ofMillis(5000);
+ private static final Duration GRACE_DURATION = Duration.ofMillis(1);
private static final int DEPTH = 100;
@Test
@@ -96,7 +94,8 @@ public void testGracePeriodExceededExceptionThrown() throws IOException, Interru
replay(mockedChunkedOutput);
StreamingResponseFactory streamingResponseFactory =
- new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH);
+ new StreamingResponseFactory(
+ mockedChunkedOutputFactory, DURATION, GRACE_DURATION, DEPTH, DEPTH);
StreamingResponse streamingResponse =
streamingResponseFactory.from(new JsonStream<>(() -> requests));
@@ -160,7 +159,8 @@ public void testWriteToChunkedOutput() throws IOException, InterruptedException
replay(mockedChunkedOutput, mockedChunkedOutputFactory);
StreamingResponseFactory streamingResponseFactory =
- new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH);
+ new StreamingResponseFactory(
+ mockedChunkedOutputFactory, DURATION, GRACE_DURATION, DEPTH, DEPTH);
StreamingResponse streamingResponse =
streamingResponseFactory.from(new JsonStream<>(() -> requestsMappingIterator));
@@ -205,7 +205,8 @@ public void testHasNextMappingException() throws IOException, InterruptedExcepti
replay(mockedChunkedOutput);
StreamingResponseFactory streamingResponseFactory =
- new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH);
+ new StreamingResponseFactory(
+ mockedChunkedOutputFactory, DURATION, GRACE_DURATION, DEPTH, DEPTH);
StreamingResponse streamingResponse =
streamingResponseFactory.from(new JsonStream<>(() -> requests));
@@ -245,7 +246,8 @@ public void testHasNextRuntimeException() throws IOException, InterruptedExcepti
replay(mockedChunkedOutput);
StreamingResponseFactory streamingResponseFactory =
- new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH);
+ new StreamingResponseFactory(
+ mockedChunkedOutputFactory, DURATION, GRACE_DURATION, DEPTH, DEPTH);
StreamingResponse streamingResponse =
streamingResponseFactory.from(new JsonStream<>(() -> requests));
@@ -299,7 +301,8 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt
ResultOrError.error(
ErrorResponse.create(
408,
- "Streaming connection open for longer than allowed: Connection will be closed."));
+ "Streaming connection open for longer than allowed: "
+ + "Connection will be closed."));
ChunkedOutputFactory mockedChunkedOutputFactory = mock(ChunkedOutputFactory.class);
ChunkedOutput mockedChunkedOutput = mock(ChunkedOutput.class);
@@ -307,9 +310,9 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt
expect(clock.instant())
.andReturn(Instant.ofEpochMilli(0)); // stream start - input stream response
- //first message
- expect(clock.instant()).andReturn(Instant.ofEpochMilli(0)); // stream start - composing response
- expect(requestsMappingIterator.hasNext()).andReturn(true); // first message - OK
+ // first message
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(0));
+ expect(requestsMappingIterator.hasNext()).andReturn(true);
expect(requestsMappingIterator.nextValue()).andReturn(request);
expect(clock.instant())
.andReturn(Instant.ofEpochMilli(1)); // first comparison duration. within timeout
@@ -317,7 +320,7 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt
expect(mockedChunkedOutput.isClosed()).andReturn(false);
mockedChunkedOutput.write(sucessResult);
- //second message
+ // second message
expect(requestsMappingIterator.hasNext()).andReturn(true);
expect(requestsMappingIterator.nextValue()).andReturn(request);
expect(clock.instant())
@@ -325,31 +328,14 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt
expect(mockedChunkedOutput.isClosed()).andReturn(false);
mockedChunkedOutput.write(error);
- //no third message - the thread executor should kick in after 50 ms to close the mapping iterator
- //and the chunkedOutput at this point:
+ // no third message
+ expect(requestsMappingIterator.hasNext()).andReturn(false);
requestsMappingIterator.close(); // call from thread executor
- mockedChunkedOutput.close(); //call from thread executor
-
-
- final CompletableFuture finished = new CompletableFuture<>();
- //we don't let the hasNext fire until the "finished" future has completed
- //This future is defined below, after the streaming response, because we need information from the streaming response to create it
- //It completes when the two close calls from the "closeAll" called by the Thread executor have both fired
-
- expect(requestsMappingIterator.hasNext())
- .andAnswer(
- () -> { // return hasnext true AFTER the thread executor has timed out and closed the
- // connections. Then expect no other calls except the connection closing.
- System.out.println("** has next delayed response");
- while (!finished.isDone()) {
- Thread.sleep(10);
- }
- return false;
- });
+ mockedChunkedOutput.close(); // call from thread executor
requestsMappingIterator.close(); // call from finally
- mockedChunkedOutput.close(); //call from finally
+ mockedChunkedOutput.close(); // call from finally
replay(mockedChunkedOutput, mockedChunkedOutputFactory, requestsMappingIterator, clock);
@@ -363,23 +349,6 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt
DEPTH,
clock);
- //This finished future fires when the streamingResponse.closingFinished volatile boolean fires. This happens when the thread executor has finished doing its thing and doing the two closes
- Executors.newCachedThreadPool()
- .submit(
- () -> {
- try {
- System.out.println(
- "&&& TEST thread streaming response object" + streamingResponse.getClass());
- while (!streamingResponse.closingFinished) {
- Thread.sleep(100);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("&& test thread completing future");
- finished.complete(true);
- });
-
CompletableFuture produceResponseFuture = new CompletableFuture<>();
produceResponseFuture.complete(produceResponse);
@@ -397,7 +366,7 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt
}
@Test
- public void testSlowWritesToKafka429ThenDisconnect() throws IOException, InterruptedException {
+ public void testSlowWritesToKafka429() throws IOException, InterruptedException {
String key = "foo";
String value = "bar";
@@ -445,57 +414,143 @@ public void testSlowWritesToKafka429ThenDisconnect() throws IOException, Interru
ChunkedOutputFactory mockedChunkedOutputFactory = mock(ChunkedOutputFactory.class);
ChunkedOutput mockedChunkedOutput = mock(ChunkedOutput.class);
- expect(clock.instant())
- .andReturn(Instant.ofEpochMilli(0)); // stream start - input stream response
- expect(clock.instant()).andReturn(Instant.ofEpochMilli(0)); // stream start - composing response
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(0));
- expect(requestsMappingIterator.hasNext()).andReturn(true); // first message
- expect(clock.instant())
- .andReturn(Instant.ofEpochMilli(1)); // first comparison duration. before timeout
+ // first message
+ expect(requestsMappingIterator.hasNext()).andReturn(true);
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(0));
expect(requestsMappingIterator.nextValue()).andReturn(request);
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(1));
expect(mockedChunkedOutputFactory.getChunkedOutput()).andReturn(mockedChunkedOutput);
- expect(mockedChunkedOutputFactory.getChunkedOutput())
- .andReturn(mockedChunkedOutput); // todo see if we can remove this extra cration of an async
- // response quueue due to the inheritance
expect(mockedChunkedOutput.isClosed()).andReturn(false);
mockedChunkedOutput.write(sucessResult);
// second message
- expect(requestsMappingIterator.hasNext()).andReturn(true); // second message
+ expect(requestsMappingIterator.hasNext()).andReturn(true);
expect(clock.instant()).andReturn(Instant.ofEpochMilli(2));
expect(requestsMappingIterator.nextValue())
.andAnswer(
() -> {
- // produceResponseFuture.complete(produceResponse);
+ produceResponseFuture.complete(produceResponse);
return request;
});
expect(mockedChunkedOutput.isClosed()).andReturn(false);
mockedChunkedOutput.write(error);
- // third message
+ expect(requestsMappingIterator.hasNext()).andReturn(false);
+
+ requestsMappingIterator.close(); // closes from the finally
+ mockedChunkedOutput.close();
+
+ replay(mockedChunkedOutput, mockedChunkedOutputFactory, requestsMappingIterator, clock);
+
+ StreamingResponse streamingResponse =
+ StreamingResponse.fromWithClock(
+ new JsonStream<>(() -> requestsMappingIterator),
+ mockedChunkedOutputFactory,
+ Duration.ofMillis(timeout),
+ Duration.ofMillis(50),
+ 1,
+ 2,
+ clock);
+
+ FakeAsyncResponse response = new FakeAsyncResponse();
+ streamingResponse.compose(result -> produceResponseFuture).resume(response);
+
+ EasyMock.verify(mockedChunkedOutput);
+ EasyMock.verify(mockedChunkedOutputFactory);
+ EasyMock.verify(requestsMappingIterator);
+ EasyMock.verify(clock);
+ }
+
+ @Test
+ public void testSlowWritesToKafkaDisconnects() throws IOException, InterruptedException {
+
+ String key = "foo";
+ String value = "bar";
+ ProduceRequest request =
+ ProduceRequest.builder()
+ .setKey(
+ ProduceRequestData.builder()
+ .setFormat(EmbeddedFormat.AVRO)
+ .setRawSchema("{\"type\": \"string\"}")
+ .setData(TextNode.valueOf(key))
+ .build())
+ .setValue(
+ ProduceRequestData.builder()
+ .setFormat(EmbeddedFormat.AVRO)
+ .setRawSchema("{\"type\": \"string\"}")
+ .setData(TextNode.valueOf(value))
+ .build())
+ .setOriginalSize(0L)
+ .build();
+
+ MappingIterator requestsMappingIterator = mock(MappingIterator.class);
+
+ long timeout = 10;
+ Clock clock = mock(Clock.class);
+
+ CompletableFuture produceResponseFuture = new CompletableFuture<>();
+
+ ProduceResponse produceResponse =
+ ProduceResponse.builder()
+ .setClusterId("clusterId")
+ .setTopicName("topicName")
+ .setPartitionId(1)
+ .setOffset(1L)
+ .setErrorCode(HttpStatus.OK_200)
+ .build();
+ ResultOrError sucessResult = ResultOrError.result(produceResponse);
+
+ ResultOrError error =
+ ResultOrError.error(
+ ErrorResponse.create(
+ 429,
+ "Backlog of messages waiting to be sent to Kafka is too large: Not "
+ + "sending to Kafka."));
+
+ ChunkedOutputFactory mockedChunkedOutputFactory = mock(ChunkedOutputFactory.class);
+ ChunkedOutput mockedChunkedOutput = mock(ChunkedOutput.class);
+
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(0));
+
+ // first message
+ expect(requestsMappingIterator.hasNext()).andReturn(true);
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(0));
+ expect(requestsMappingIterator.nextValue()).andReturn(request);
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(1));
+ expect(mockedChunkedOutputFactory.getChunkedOutput()).andReturn(mockedChunkedOutput);
+ expect(mockedChunkedOutput.isClosed()).andReturn(false);
+ mockedChunkedOutput.write(sucessResult);
+
+ // second message
expect(requestsMappingIterator.hasNext()).andReturn(true);
expect(clock.instant()).andReturn(Instant.ofEpochMilli(2));
expect(requestsMappingIterator.nextValue())
.andAnswer(
() -> {
- produceResponseFuture.complete(produceResponse);
return request;
});
-
expect(mockedChunkedOutput.isClosed()).andReturn(false);
mockedChunkedOutput.write(error);
- expect(requestsMappingIterator.hasNext())
+ // third message
+ expect(requestsMappingIterator.hasNext()).andReturn(true);
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(2));
+ expect(requestsMappingIterator.nextValue())
.andAnswer(
- () -> { // return hasnext true AFTER the thread executor has timed out and closed the
- // connections. Then expect no other calls except the connection closing.
- Thread.sleep(500);
- return true;
+ () -> {
+ produceResponseFuture.complete(produceResponse);
+ return request;
});
+ expect(mockedChunkedOutput.isClosed()).andReturn(false);
+ mockedChunkedOutput.write(error);
+
+ expect(requestsMappingIterator.hasNext()).andReturn(false);
- requestsMappingIterator.close(); // this ensures the closes have been called
+ requestsMappingIterator.close(); // closes from thread executor
mockedChunkedOutput.close();
- requestsMappingIterator.close(); // expect twice - one from the thread and one from the finally
+ requestsMappingIterator.close(); // closes from finally
mockedChunkedOutput.close();
replay(mockedChunkedOutput, mockedChunkedOutputFactory, requestsMappingIterator, clock);
diff --git a/pom.xml b/pom.xml
index 897eb5a7ce..51a6b81355 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,12 +122,6 @@
$(hamcrest.version)
test
-
- org.awaitility
- awaitility
- 4.0.2
- test
-