Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Federated Clients: implement commitSync() and committed() #129

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
 See License in the project root for license information.
*/

package com.linkedin.kafka.clients.common;

import java.util.Collections;
import java.util.Set;


// This contains the result of the location lookup for a set of topics/partitions or a map of values keyed by
// topic/partitition.
public abstract class LocationLookupResult {
public enum ValueType {
PARTITIONS, TOPICS, PARTITION_KEYED_MAP
}

private Set<String> _nonexistentTopics;

public LocationLookupResult() {
_nonexistentTopics = Collections.emptySet();
}

public LocationLookupResult(/*Map<ClusterDescriptor, T> valuesByCluster, */Set<String> nonexistentTopics) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should you just delete the commented out part ?

_nonexistentTopics = nonexistentTopics;
}

public abstract ValueType getValueType();

public Set<String> getNonexistentTopics() {
return _nonexistentTopics;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
 See License in the project root for license information.
*/

package com.linkedin.kafka.clients.common;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;


// This contains the result of the location lookup for a map keyed by partition across multiple clusters in a cluster
// group.
public class PartitionKeyedMapLookupResult<T> extends LocationLookupResult {
private Map<ClusterDescriptor, Map<TopicPartition, T>> _partitionKeyedMapsByCluster;

public PartitionKeyedMapLookupResult() {
this(Collections.emptyMap(), Collections.emptySet());
}

public PartitionKeyedMapLookupResult(Map<ClusterDescriptor, Map<TopicPartition, T>> partitionKeyedMapsByCluster,
Set<String> nonexistentTopics) {
super(nonexistentTopics);
_partitionKeyedMapsByCluster = partitionKeyedMapsByCluster;
}

@Override
public LocationLookupResult.ValueType getValueType() {
return LocationLookupResult.ValueType.PARTITION_KEYED_MAP;
}

public Map<ClusterDescriptor, Map<TopicPartition, T>> getPartitionKeyedMapsByCluster() {
return _partitionKeyedMapsByCluster;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,25 @@


// This contains the result of the location lookup for a set of partitions across multiple clusters in a cluster group.
public class PartitionLookupResult {
public class PartitionLookupResult extends LocationLookupResult {
private Map<ClusterDescriptor, Set<TopicPartition>> _partitionsByCluster;
private Set<String> _nonexistentTopics;

public PartitionLookupResult() {
_partitionsByCluster = Collections.emptyMap();
_nonexistentTopics = Collections.emptySet();
this(Collections.emptyMap(), Collections.emptySet());
}

public PartitionLookupResult(Map<ClusterDescriptor, Set<TopicPartition>> partitionsByCluster,
Set<String> nonexistentTopics) {
super(nonexistentTopics);
_partitionsByCluster = partitionsByCluster;
_nonexistentTopics = nonexistentTopics;
}

public Map<ClusterDescriptor, Set<TopicPartition>> getPartitionsByCluster() {
return _partitionsByCluster;
@Override
public LocationLookupResult.ValueType getValueType() {
return LocationLookupResult.ValueType.PARTITIONS;
}

public Set<String> getNonexistentTopics() {
return _nonexistentTopics;
public Map<ClusterDescriptor, Set<TopicPartition>> getPartitionsByCluster() {
return _partitionsByCluster;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,24 @@


// This contains the result of the location lookup for a set of topics across multiple clusters in a cluster group.
public class TopicLookupResult {
public class TopicLookupResult extends LocationLookupResult {
private Map<ClusterDescriptor, Set<String>> _topicsByCluster;
private Set<String> _nonexistentTopics;

public TopicLookupResult() {
_topicsByCluster = Collections.emptyMap();
_nonexistentTopics = Collections.emptySet();
this(Collections.emptyMap(), Collections.emptySet());
}

public TopicLookupResult(Map<ClusterDescriptor, Set<String>> topicsByCluster, Set<String> nonexistentTopics) {
super(nonexistentTopics);
_topicsByCluster = topicsByCluster;
_nonexistentTopics = nonexistentTopics;
}

public Map<ClusterDescriptor, Set<String>> getTopicsByCluster() {
return _topicsByCluster;
@Override
public LocationLookupResult.ValueType getValueType() {
return LocationLookupResult.ValueType.TOPICS;
}

public Set<String> getNonexistentTopics() {
return _nonexistentTopics;
public Map<ClusterDescriptor, Set<String>> getTopicsByCluster() {
return _topicsByCluster;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,13 @@ public class LiKafkaConsumerConfig extends AbstractConfig {
60 * 1000,
atLeast(0),
Importance.MEDIUM,
ConsumerConfig.DEFAULT_API_TIMEOUT_MS_DOC);
ConsumerConfig.DEFAULT_API_TIMEOUT_MS_DOC)
.define(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG,
Type.LONG,
100L,
atLeast(0L),
Importance.LOW,
CommonClientConfigs.RETRY_BACKOFF_MS_DOC);
}

public LiKafkaConsumerConfig(Map<?, ?> props) {
Expand Down
Loading