Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Add configurable timeout for publishing #1583

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

samizzy
Copy link
Member

@samizzy samizzy commented Jan 23, 2024

One-line summary

Zalando ticket : 1622

Description

Add ability to specify timeout when using publishing apis and is taken from custom header X-TIMEOUT.

  • This ability will allow users to fail fast in case the latency during publishing becomes too high for their use case.
  • Originally if the publishing request timed out from user side (due to setting smaller client timeout) then users would assume all events failed publishing and retry the whole batch. With the addition of this custom publish timeout, they can retry only the truly failed events.

@@ -93,11 +94,13 @@ public EventPublishingController(final EventPublisher publisher,
public ResponseEntity postJsonEvents(@PathVariable final String eventTypeName,
@RequestBody final String eventsAsString,
final HttpServletRequest request,
final Client client)
final Client client,
final @RequestHeader(value = "X-TIMEOUT", required = false, defaultValue = "0")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final @RequestHeader(value = "X-TIMEOUT", required = false, defaultValue = "0")
final @RequestHeader(value = "X-Timeout", required = false, defaultValue = "0")

Also, I wonder if this should be called more specifically, smt. like X-Client-Wait-Write-Timeout.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets be consistent, X-Nakadi-Cursors, X-Nakadi-Stream-Id hence X-Nakadi-Publishing-Timeout

Copy link
Member

@adyach adyach Jan 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defaultValue = "0", maybe it is possible to have default value from config 30 seconds

Comment on lines +29 to +35
//TODO: better way to get max timeout instead of hardcoding
if (desiredPublishingTimeout < 0 || desiredPublishingTimeout > 30_000) {
throw new InvalidPublishingParamException("X-TIMEOUT cannot be less than 0 or greater than 30000 ms");
}
//0 means either nothing was supplied or 0 was supplied, in both cases it means we will leave
//the timeout to be current default
this.desiredPublishingTimeout = Optional.of(desiredPublishingTimeout).filter(v -> v != 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constructor contains api level validation of this field.
I suggest to move "outside" this logic to the callers of the constructor methods.
(Like it's now for validation of other attributes passed here).
This should make this class POJO.

Comment on lines +350 to +351
return customTimeout.filter(t -> t <= timeOut).
map(Integer::longValue).orElse(timeOut);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return customTimeout.filter(t -> t <= timeOut).
map(Integer::longValue).orElse(timeOut);
return customTimeout.map(t -> Math.min(t, timeOut)).orElse(timeOut);

@adyach
Copy link
Member

adyach commented Jan 24, 2024

there is a problem with this approach: client sets 300ms timeout, connection is close by Nakadi due to timeout (300ms), but Kafka Producer continues to retry events for this request for the 29 700ms. this is a problem by itself for Nakadi, what if there are many more such requests but also it can be that the event will appear in the middle of the other published batch breaking the order.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants