Skip to content

Commit

Permalink
KREST-2746 passing unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ehumber committed Jul 19, 2022
1 parent d3ef908 commit 9a95641
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 145 deletions.
6 changes: 0 additions & 6 deletions kafka-rest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,6 @@
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.2</version>
<scope>test</scope>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,11 @@ public class KafkaRestConfig extends RestConfig {
"streaming.connection.max.duration.grace.period.ms";
private static final String STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC =
"How long after a streaming connection reaches its maximum duration outstanding "
+ "requests will be processed for before the connection is closed.";
+ "requests will be processed for before the connection is closed. "
+ "Maximum value is 10 seconds.";
private static final String STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT = "500";
public static final ConfigDef.Range STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_VALIDATOR =
ConfigDef.Range.between(1, 10000);

public static final String STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH =
"streaming.response.queue.throttle.depth";
Expand Down Expand Up @@ -864,6 +867,7 @@ protected static ConfigDef baseKafkaRestConfigDef() {
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS,
Type.LONG,
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT,
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_VALIDATOR,
Importance.LOW,
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC)
.define(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,8 @@ public T nextValue() throws IOException {

@Override
public void close() throws IOException {
System.out.println("CLOSE in json stream which calls mapping iterator close");
if (delegate.get() != null) {
System.out.println("delegate not null" + delegate.get().getClass());
delegate.get().close();
System.out.println("delegate now closed");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public abstract class StreamingResponse<T> {

private static final Logger log = LoggerFactory.getLogger(StreamingResponse.class);
private static final int ONE_SECOND_MS = 1000;
private static final int MAX_CLOSE_RETRIES = 2;

private static final CompositeErrorMapper EXCEPTION_MAPPER =
new CompositeErrorMapper.Builder()
Expand Down Expand Up @@ -188,8 +189,8 @@ public final <O> StreamingResponse<O> compose(
* <p>This method will block until all requests are read in. The responses are computed and
* written to {@code asyncResponse} asynchronously.
*/
public final void resume(AsyncResponse asyncResponse) throws InterruptedException {
log.error("Resuming StreamingResponse - about to call new async response queue");
public final void resume(AsyncResponse asyncResponse) {
log.debug("Resuming StreamingResponse");
AsyncResponseQueue responseQueue = new AsyncResponseQueue(chunkedOutputFactory);
responseQueue.asyncResume(asyncResponse);
ScheduledExecutorService executorService = null;
Expand All @@ -210,32 +211,29 @@ public final void resume(AsyncResponse asyncResponse) throws InterruptedExceptio
}
}
} catch (Exception e) {
log.error("Exception thrown when processing stream ", e);
log.debug("Exception thrown when processing stream ", e);
responseQueue.push(
CompletableFuture.completedFuture(
ResultOrError.error(EXCEPTION_MAPPER.toErrorResponse(e))));
} finally {
System.out.println(
"why isfinally triggered closing started"
+ closingStarted); // can't call hasnext because of the mocking
// if there are still outstanding response to send back, for example hasNext has returned
// false, but the executorService hasn't triggered yet, give everything a chance to
// complete within the grace period if there are still outstanding responses.
System.out.println("FINALLY before tail length wait");
if (responseQueue.getTailLength().get() > 0) {
System.out.println("in tail length wait");
Thread.sleep(gracePeriod.toMillis()); // TODO make this smarter, grace period could be HUGE
// complete within the grace period plus a little bit.
int retries = 0;
while (!closingFinished && retries < MAX_CLOSE_RETRIES) {
retries++;
try {
Thread.sleep(gracePeriod.toMillis() / retries);
} catch (InterruptedException e) {
// do nothing
}
}
System.out.println("FINALL calling a close " + this.getClass());
//if (!closingFinished) {
close();
System.out.println("FINALLY calling responseQueue close" + this.getClass());
responseQueue.close();
//}
close();
responseQueue.close();
if (executorService != null) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(ONE_SECOND_MS, TimeUnit.MILLISECONDS)) {
if (!executorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
Expand All @@ -245,34 +243,34 @@ public final void resume(AsyncResponse asyncResponse) throws InterruptedExceptio
}
}

private void handleResponseQueueDepthTooLarge(
private void handleOverMaxStreamDuration(
ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
if (responseQueue.getTailLength().get() >= disconnectDepth) {
triggerDelayedClose(executorService, responseQueue);
}
triggerDelayedClose(executorService, responseQueue);
next();
responseQueue.push(
CompletableFuture.completedFuture(
ResultOrError.error(
EXCEPTION_MAPPER.toErrorResponse(
new StatusCodeException(
Status.TOO_MANY_REQUESTS,
"Backlog of messages waiting to be sent to Kafka is too large",
"Not sending to Kafka.")))));
Status.REQUEST_TIMEOUT,
"Streaming connection open for longer than allowed",
"Connection will be closed.")))));
}

private void handleOverMaxStreamDuration(
private void handleResponseQueueDepthTooLarge(
ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
triggerDelayedClose(executorService, responseQueue);
if (responseQueue.getTailLength().get() >= disconnectDepth) {
triggerDelayedClose(executorService, responseQueue);
}
next();
responseQueue.push(
CompletableFuture.completedFuture(
ResultOrError.error(
EXCEPTION_MAPPER.toErrorResponse(
new StatusCodeException(
Status.REQUEST_TIMEOUT,
"Streaming connection open for longer than allowed",
"Connection will be closed.")))));
Status.TOO_MANY_REQUESTS,
"Backlog of messages waiting to be sent to Kafka is too large",
"Not sending to Kafka.")))));
}

private void triggerDelayedClose(
Expand Down Expand Up @@ -318,13 +316,10 @@ private InputStreamingResponse(
}

public void closeAll(AsyncResponseQueue responseQueue) {
System.out.println("!!! do nothing");
// Never called
}

public void close() {
System.out.println(
"CLOSE could be either from THREAD or FINALLY -> calls through to inputStreaming close"
+ this.getClass());
try {
inputStream.close();
} catch (IOException e) {
Expand Down Expand Up @@ -382,17 +377,10 @@ private ComposingStreamingResponse(

@Override
protected void closeAll(AsyncResponseQueue responseQueue) {
System.out.println("THREAD EXECUTOR TRIGGERS" + this.getClass());
streamingResponseInput.closingStarted = true;
closingStarted = true;
System.out.println(
"THREAD EXECUTOR closeAll from thread executor call through to a sub close "
+ this.getClass());
close();
System.out.println(
"THREAD EXECUTOR closeAll from thread executor calling responsequeue.close");
responseQueue.close();
System.out.println("THREAD EXECUTOR closeAll from thread executor after responsequeue.close");
streamingResponseInput.closingFinished = true;
closingFinished = true;
}
Expand Down Expand Up @@ -420,9 +408,6 @@ public CompletableFuture<O> next() {
}

public void close() {
System.out.println(
"CLOSE could be from either THREAD or FINALLY -> calls through to inputStreaming close"
+ this.getClass());
streamingResponseInput.close();
}
}
Expand Down Expand Up @@ -469,7 +454,6 @@ private void push(CompletableFuture<ResultOrError> result) {
unused -> {
try {
if (sinkClosed || sink.isClosed()) {
System.out.println("closing sink " + this.getClass());
sinkClosed = true;
return null;
}
Expand All @@ -485,14 +469,10 @@ private void push(CompletableFuture<ResultOrError> result) {
}

private void close() {
System.out.println("CLOSE response queue from FINALLY or THREAD " + this.getClass());
tail.whenComplete(
(unused, throwable) -> {
try {
sinkClosed = true;
System.out.println(
"*** actually closing tail from FINALLY or TRHEAD (Is an Async call)"
+ this.getClass());
sink.close();
} catch (IOException e) {
log.error("Error when closing response channel.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
public class ProduceActionTest {

private static final Duration FIVE_SECONDS_MS = Duration.ofMillis(5000);
private static final Duration FIVE_MS = Duration.ofMillis(5);
private static final int DEPTH = 100;

@AfterAll
Expand Down Expand Up @@ -840,8 +841,7 @@ private static ProduceAction getProduceAction(
replay(produceControllerProvider, produceController);

StreamingResponseFactory streamingResponseFactory =
new StreamingResponseFactory(
chunkedOutputFactory, FIVE_SECONDS_MS, FIVE_SECONDS_MS, DEPTH, DEPTH);
new StreamingResponseFactory(chunkedOutputFactory, FIVE_SECONDS_MS, FIVE_MS, DEPTH, DEPTH);

// get the current thread so that the call counts can be seen by easy mock
ExecutorService executorService = MoreExecutors.newDirectExecutorService();
Expand Down
Loading

0 comments on commit 9a95641

Please sign in to comment.