-
Notifications
You must be signed in to change notification settings - Fork 292
Add configurable timeout for publishing #1583
base: master
Are you sure you want to change the base?
Conversation
- add timeout logic - add json and avro timeout test cases
@@ -93,11 +94,13 @@ public EventPublishingController(final EventPublisher publisher, | |||
public ResponseEntity postJsonEvents(@PathVariable final String eventTypeName, | |||
@RequestBody final String eventsAsString, | |||
final HttpServletRequest request, | |||
final Client client) | |||
final Client client, | |||
final @RequestHeader(value = "X-TIMEOUT", required = false, defaultValue = "0") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final @RequestHeader(value = "X-TIMEOUT", required = false, defaultValue = "0") | |
final @RequestHeader(value = "X-Timeout", required = false, defaultValue = "0") |
Also, I wonder if this should be called more specifically, smt. like X-Client-Wait-Write-Timeout
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets be consistent, X-Nakadi-Cursors, X-Nakadi-Stream-Id hence X-Nakadi-Publishing-Timeout
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
defaultValue = "0"
, maybe it is possible to have default value from config 30 seconds
//TODO: better way to get max timeout instead of hardcoding | ||
if (desiredPublishingTimeout < 0 || desiredPublishingTimeout > 30_000) { | ||
throw new InvalidPublishingParamException("X-TIMEOUT cannot be less than 0 or greater than 30000 ms"); | ||
} | ||
//0 means either nothing was supplied or 0 was supplied, in both cases it means we will leave | ||
//the timeout to be current default | ||
this.desiredPublishingTimeout = Optional.of(desiredPublishingTimeout).filter(v -> v != 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This constructor contains api level validation of this field.
I suggest to move "outside" this logic to the callers of the constructor methods.
(Like it's now for validation of other attributes passed here).
This should make this class POJO.
return customTimeout.filter(t -> t <= timeOut). | ||
map(Integer::longValue).orElse(timeOut); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return customTimeout.filter(t -> t <= timeOut). | |
map(Integer::longValue).orElse(timeOut); | |
return customTimeout.map(t -> Math.min(t, timeOut)).orElse(timeOut); |
there is a problem with this approach: client sets 300ms timeout, connection is close by Nakadi due to timeout (300ms), but Kafka Producer continues to retry events for this request for the 29 700ms. this is a problem by itself for Nakadi, what if there are many more such requests but also it can be that the event will appear in the middle of the other published batch breaking the order. |
One-line summary
Description
Add ability to specify timeout when using publishing apis and is taken from custom header
X-TIMEOUT
.