Skip to content

Commit

Permalink
Fix pr note
Browse files Browse the repository at this point in the history
  • Loading branch information
Ledostuff committed Jan 9, 2020
1 parent b09a39d commit 9c1f5b7
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ public class KafkaRestConfig extends RestConfig {
* <code>client.init.timeout.ms</code>
*/
public static final String KAFKACLIENT_INIT_TIMEOUT_CONFIG = "client.init.timeout.ms";
/**
* <code>client.request.timeout.ms</code>
*/
public static final String KAFKACLIENT_REQUEST_TIMEOUT_CONFIG = "client.request.timeout.ms";

public static final String ZOOKEEPER_SET_ACL_CONFIG = "zookeeper.set.acl";
public static final String KAFKACLIENT_SECURITY_PROTOCOL_CONFIG =
Expand Down Expand Up @@ -247,6 +251,9 @@ public class KafkaRestConfig extends RestConfig {
protected static final String KAFKACLIENT_INIT_TIMEOUT_DOC =
"The timeout for initialization of the Kafka store, including creation of the Kafka topic "
+ "that stores schema data.";
protected static final String KAFKACLIENT_REQUEST_TIMEOUT_DOC =
"The timeout for sending any admin-client request to Kafka cluster including waiting for"
+ " the response on client side.";
protected static final String KAFKACLIENT_TIMEOUT_DOC =
"The timeout for an operation on the Kafka store";
protected static final String
Expand Down Expand Up @@ -450,6 +457,14 @@ protected static ConfigDef baseKafkaRestConfigDef() {
Importance.MEDIUM,
KAFKACLIENT_INIT_TIMEOUT_DOC
)
.define(
KAFKACLIENT_REQUEST_TIMEOUT_CONFIG,
Type.INT,
60000,
Range.atLeast(0),
Importance.MEDIUM,
KAFKACLIENT_REQUEST_TIMEOUT_DOC
)
.define(
KAFKACLIENT_TIMEOUT_CONFIG,
Type.INT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
Expand All @@ -43,10 +45,12 @@ public class AdminClientWrapper {

private final AdminClient adminClient;
private final int initTimeOut;
private final int requestTimeOut;

public AdminClientWrapper(KafkaRestConfig kafkaRestConfig, AdminClient adminClient) {
this.adminClient = adminClient;
this.initTimeOut = kafkaRestConfig.getInt(KafkaRestConfig.KAFKACLIENT_INIT_TIMEOUT_CONFIG);
this.requestTimeOut = kafkaRestConfig.getInt(KafkaRestConfig.KAFKACLIENT_REQUEST_TIMEOUT_CONFIG);
}

static Properties adminProperties(KafkaRestConfig kafkaRestConfig) {
Expand Down Expand Up @@ -107,13 +111,15 @@ public boolean partitionExists(String topicName, int partition) throws Exception
}

public Collection<ConsumerGroupListing> listConsumerGroups() throws Exception {
return adminClient.listConsumerGroups().all().get(initTimeOut, TimeUnit.MILLISECONDS);
return adminClient.listConsumerGroups(new ListConsumerGroupsOptions()
.timeoutMs(requestTimeOut)).all().get(requestTimeOut, TimeUnit.MILLISECONDS);
}

public Map<String, ConsumerGroupDescription> describeConsumerGroups(
Collection<String> groupIds) throws Exception {
return adminClient.describeConsumerGroups(groupIds)
.all().get(initTimeOut, TimeUnit.MILLISECONDS);
return adminClient.describeConsumerGroups(groupIds,
new DescribeConsumerGroupsOptions().timeoutMs(requestTimeOut))
.all().get(requestTimeOut, TimeUnit.MILLISECONDS);
}

private Topic buildTopic(String topicName, TopicDescription topicDescription) throws Exception {
Expand Down

0 comments on commit 9c1f5b7

Please sign in to comment.