Skip to content

Commit

Permalink
KREST-2746 Reflect slow responses from Kafka back to the http client
Browse files Browse the repository at this point in the history
  • Loading branch information
ehumber committed Jul 18, 2022
1 parent af54143 commit d3ef908
Show file tree
Hide file tree
Showing 9 changed files with 430 additions and 66 deletions.
6 changes: 6 additions & 0 deletions kafka-rest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.2</version>
<scope>test</scope>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,24 @@ public class KafkaRestConfig extends RestConfig {
+ "requests will be processed for before the connection is closed.";
private static final String STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT = "500";

public static final String STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH =
"streaming.response.queue.throttle.depth";
private static final String STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH_DOC =
"The maximum depth of the response queue that is used to hold requests as they are being "
+ "processed by Kafka. If this queue grows too long it indicates that Kafka is "
+ "processing messages more slowly than the user is sending them. After this "
+ "queue depth is reached, then all requests will receive a 429 response until the"
+ "queue depth returns under the limit.";
private static final Integer STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH_DEFAULT = 100;

public static final String STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH =
"streaming.response.queue.disconnect.depth";
private static final String STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH_DOC =
"The maximum depth of the response queue that is used to hold requests as they are being"
+ "processed by Kafka. If the queue depth grows beyond this limit then the connection"
+ "is closed.";
private static final Integer STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH_DEFAULT = 200;

private static final ConfigDef config;
private volatile Metrics metrics;

Expand Down Expand Up @@ -847,7 +865,19 @@ protected static ConfigDef baseKafkaRestConfigDef() {
Type.LONG,
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT,
Importance.LOW,
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC);
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC)
.define(
STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH,
Type.INT,
STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH_DEFAULT,
Importance.LOW,
STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH_DOC)
.define(
STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH,
Type.INT,
STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH_DEFAULT,
Importance.LOW,
STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH_DOC);
}

private static Properties getPropsFromFile(String propsFile) throws RestConfigException {
Expand Down Expand Up @@ -1088,6 +1118,14 @@ public final Duration getStreamingConnectionMaxDurationGracePeriod() {
return Duration.ofMillis(getLong(STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS));
}

public final Integer getStreamingConnectionMaxQueueDepthBeforeThrottling() {
return getInt(STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH);
}

public final Integer getStreamingConnectionMaxQueueDepthBeforeDisconnect() {
return getInt(STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH);
}

public final int getRateLimitDefaultCost() {
return getInt(RATE_LIMIT_DEFAULT_COST_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ protected void configure() {
.qualifiedBy(new StreamingConnectionMaxDurationGracePeriodImpl())
.to(Duration.class);

bind(config.getStreamingConnectionMaxQueueDepthBeforeThrottling())
.qualifiedBy(new getStreamingConnectionMaxQueueDepthBeforeThrottlingImpl())
.to(Integer.class);

bind(config.getStreamingConnectionMaxQueueDepthBeforeDisconnect())
.qualifiedBy(new getStreamingConnectionMaxQueueDepthBeforeDisconnectImpl())
.to(Integer.class);

bind(config.getSchemaRegistryConfigs())
.qualifiedBy(new SchemaRegistryConfigsImpl())
.to(new TypeLiteral<Map<String, Object>>() {});
Expand Down Expand Up @@ -441,5 +449,23 @@ private static final class StreamingConnectionMaxDurationConfigImpl
private static final class StreamingConnectionMaxDurationGracePeriodImpl
extends AnnotationLiteral<StreamingMaxConnectionGracePeriod>
implements StreamingMaxConnectionGracePeriod {}

@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
public @interface getStreamingConnectionMaxQueueDepthBeforeThrottling {}

private static final class getStreamingConnectionMaxQueueDepthBeforeThrottlingImpl
extends AnnotationLiteral<getStreamingConnectionMaxQueueDepthBeforeThrottling>
implements getStreamingConnectionMaxQueueDepthBeforeThrottling {}

@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
public @interface getStreamingConnectionMaxQueueDepthBeforeDisconnect {}

private static final class getStreamingConnectionMaxQueueDepthBeforeDisconnectImpl
extends AnnotationLiteral<getStreamingConnectionMaxQueueDepthBeforeDisconnect>
implements getStreamingConnectionMaxQueueDepthBeforeDisconnect {}
}
// CHECKSTYLE:ON:ClassDataAbstractionCoupling
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@ public T nextValue() throws IOException {

@Override
public void close() throws IOException {
System.out.println("CLOSE in json stream which calls mapping iterator close");
if (delegate.get() != null) {
System.out.println("delegate not null" + delegate.get().getClass());
delegate.get().close();
System.out.println("delegate now closed");
}
}
}
Loading

0 comments on commit d3ef908

Please sign in to comment.