Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Federated Clients: implement commitSync() and committed() #129

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

jonlee2
Copy link
Contributor

@jonlee2 jonlee2 commented Jul 17, 2019

No description provided.

}

try {
Thread.sleep(Math.min(deadlineTimeMs - System.currentTimeMillis(), _retryBackoffMs));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can deadlineTimeMs - System.currentTimeMillis() be negative and cause infinite sleep?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sleep would throw for a negative

}

try {
if (!countDownLatch.await(deadlineTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here, not sure if we need to consider negative values for await here.

_nonexistentTopics = Collections.emptySet();
}

public LocationLookupResult(/*Map<ClusterDescriptor, T> valuesByCluster, */Set<String> nonexistentTopics) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should you just delete the commented out part ?

}

// ATTN: UnknownTopicOrPartitionException may be received - this is a retriable exception..
// if not resolved by the time, timeout exception
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"by the time" - "within the timeout" ?

long now = System.currentTimeMillis();
long deadlineTimeMs = now + timeout.toMillis();
while (now < deadlineTimeMs) {
PartitionKeyedMapLookupResult offsetsByClusterResult = getPartitionKeyedMapsByCluster(offsets);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the while() loop executes multiple times (because some topics done exist?) it will commit the same offsets for existing topicPartitions N times. i think on successful commit the TPs committed should be taken out and not committed over again - its just wasteful

}

@Override
synchronized public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) {
throw new UnsupportedOperationException("Not implemented yet");
if (partition == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably want to null check timeout as well

@@ -552,17 +589,48 @@ synchronized public long position(TopicPartition partition, Duration timeout) {

@Override
synchronized public OffsetAndMetadata committed(TopicPartition partition) {
throw new UnsupportedOperationException("Not implemented yet");

return committed(partition, _defaultApiTimeout);
}

@Override
synchronized public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does vanilla block here for non-existent TPs? if so, we should probably document this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants