This repository has been archived by the owner on Jun 7, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 292
Add configurable timeout for publishing #1583
Open
samizzy
wants to merge
3
commits into
master
Choose a base branch
from
add_configurable_timeout
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
91 changes: 91 additions & 0 deletions
91
api-publishing/src/main/java/org/zalando/nakadi/PublishRequest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
package org.zalando.nakadi; | ||
|
||
import org.zalando.nakadi.domain.HeaderTag; | ||
import org.zalando.nakadi.exceptions.runtime.InvalidPublishingParamException; | ||
import org.zalando.nakadi.security.Client; | ||
|
||
import java.util.Collections; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
|
||
public class PublishRequest<T> { | ||
private final String eventTypeName; | ||
private final T eventsRaw; | ||
private final Client client; | ||
private final Map<HeaderTag, String> consumerTags; | ||
private final Optional<Integer> desiredPublishingTimeout; | ||
private final boolean isDeleteRequest; | ||
|
||
public PublishRequest(final String eventTypeName, | ||
final T eventsRaw, | ||
final Client client, | ||
final Map<HeaderTag, String> consumerTags, | ||
final int desiredPublishingTimeout, | ||
final boolean isDeleteRequest) { | ||
this.eventTypeName = eventTypeName; | ||
this.eventsRaw = eventsRaw; | ||
this.client = client; | ||
this.consumerTags = consumerTags; | ||
//TODO: better way to get max timeout instead of hardcoding | ||
if (desiredPublishingTimeout < 0 || desiredPublishingTimeout > 30_000) { | ||
throw new InvalidPublishingParamException("X-TIMEOUT cannot be less than 0 or greater than 30000 ms"); | ||
} | ||
//0 means either nothing was supplied or 0 was supplied, in both cases it means we will leave | ||
//the timeout to be current default | ||
this.desiredPublishingTimeout = Optional.of(desiredPublishingTimeout).filter(v -> v != 0); | ||
Comment on lines
+29
to
+35
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This constructor contains api level validation of this field. |
||
this.isDeleteRequest = isDeleteRequest; | ||
} | ||
|
||
public String getEventTypeName() { | ||
return eventTypeName; | ||
} | ||
|
||
public T getEventsRaw() { | ||
return eventsRaw; | ||
} | ||
|
||
public Client getClient() { | ||
return client; | ||
} | ||
|
||
public Map<HeaderTag, String> getConsumerTags() { | ||
return consumerTags; | ||
} | ||
|
||
public Optional<Integer> getDesiredPublishingTimeout() { | ||
return desiredPublishingTimeout; | ||
} | ||
|
||
public boolean isDeleteRequest() { | ||
return isDeleteRequest; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "PublishRequest{" + | ||
"eventTypeName='" + eventTypeName + '\'' + | ||
", eventsAsString='" + eventsRaw + '\'' + | ||
", client=" + client + | ||
", consumerTags=" + consumerTags + | ||
", desiredPublishingTimeout=" + desiredPublishingTimeout + | ||
", isDeleteRequest=" + isDeleteRequest + | ||
'}'; | ||
} | ||
|
||
public static <T> PublishRequest<T> asPublish(final String eventTypeName, | ||
final T eventsRaw, | ||
final Client client, | ||
final Map<HeaderTag, String> consumerTags, | ||
final int desiredPublishingTimeout) { | ||
return new PublishRequest<>(eventTypeName, eventsRaw, client, | ||
consumerTags, desiredPublishingTimeout, false); | ||
} | ||
|
||
public static <T> PublishRequest<T> asDelete(final String eventTypeName, | ||
final T eventsRaw, | ||
final Client client) { | ||
return new PublishRequest<>(eventTypeName, eventsRaw, client, | ||
Collections.emptyMap(), 0, true); | ||
} | ||
|
||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I wonder if this should be called more specifically, smt. like
X-Client-Wait-Write-Timeout
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets be consistent, X-Nakadi-Cursors, X-Nakadi-Stream-Id hence X-Nakadi-Publishing-Timeout
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
defaultValue = "0"
, maybe it is possible to have default value from config 30 seconds