名称:kafka
参数说明:
-
url
- 描述:kafka 使用zk 地址
- 必选:否
- 默认值:无
-
brokerUrls
- 描述:kafka broker地址
- 必选:否
- 默认值:无
-
kerberosConfig
- 描述:kerberos 配置信息
- 必选:否
- 默认值:无
构造sourceDTO
KafkaSourceDTO source = KafkaSourceDTO.builder()
.url("172.16.101.236:2181,172.16.101.17:2181,172.16.100.109:2181/kafka")
.build();
入参类型:
- KafkaSourceDTO:数据源连接信息
出参类型:
- Boolean:连接信息
使用:
IKafka client = ClientCache.getKafka(DataSourceType.KAFKA.getVal());
Boolean isConnected = client.testCon(sourceDTO);
构造sourceDTO
KafkaSourceDTO source = KafkaSourceDTO.builder()
.url("172.16.101.236:2181,172.16.101.17:2181,172.16.100.109:2181/kafka")
.build();
入参类型:
- KafkaSourceDTO:数据源连接信息
出参类型:
- Boolean:连接信息
使用:
IKafka client = ClientCache.getKafka(DataSourceType.KAFKA.getVal());
Boolean isConnected = client.testCon(sourceDTO);
入参类型:
- KafkaSourceDTO:数据源连接信息
出参类型:
- String:broker地址
使用:
IKafka client = ClientCache.getKafka(DataSourceType.KAFKA.getVal());
String brokersAddress = client.getAllBrokersAddress(source);
入参类型:
- KafkaSourceDTO:数据源连接信息
出参类型:
- List:topic 集合
使用:
IKafka client = ClientCache.getKafka(DataSourceType.KAFKA.getVal());
List<String> topicList = client.getTopicList(source);
入参类型:
- KafkaSourceDTO:数据源连接信息
- KafkaTopicDTO:topic 信息
出参类型:
- Boolean:创建结果
使用:
IKafka client = ClientCache.getKafka(DataSourceType.KAFKA.getVal());
KafkaTopicDTO topicDTO = KafkaTopicDTO.builder()
.partitions(3)
.replicationFactor((short) 1)
.topicName("loader_test")
.build();
Boolean clientTopic = client.createTopic(source, topicDTO);
入参类型:
- KafkaSourceDTO:数据源连接信息
- SqlQueryDTO:查询信息
出参类型:
- List:偏移量信息
使用:
IKafka client = ClientCache.getKafka(DataSourceType.KAFKA.getVal());
List<KafkaOffsetDTO> offset = client.getOffset(source, "topic_test");
入参类型:
- KafkaSourceDTO:数据源连接信息
- SqlQueryDTO:查询信息
出参类型:
- List<List>:预览数据信息
使用:
IKafka client = ClientCache.getKafka(DataSourceType.KAFKA.getVal()); SqlQueryDTO sqlQueryDTO = SqlQueryDTO.builder().tableName("topic_test").build(); List<List<Object>> results = client.getPreview(source, sqlQueryDTO, "latest");
入参类型:
- KafkaSourceDTO:数据源连接信息
- String:topic partition信息
出参类型:
- List<List>:预览数据信息
使用:
IKafka client = ClientCache.getKafka(DataSourceType.KAFKA.getVal()); List<KafkaPartitionDTO> partitionDTOS = client.getTopicPartitions(source, "topic_test");
入参类型:
- KafkaSourceDTO:数据源连接信息
- String:topic 信息
出参类型:
- List:分区信息
使用:
IKafka client = ClientCache.getKafka(DataSourceType.KAFKA.getVal()); List partitionDTOS = client.getAllPartitions(source, "topic_test");
入参类型:
- KafkaSourceDTO:数据源连接信息
- String:topic 信息
- Integer:最大条数
- String:从哪里开始消费
- Long:消费启始位置
- Integer:最大等待时间,单位秒
出参类型:
- List:kafka数据
使用:
IKafka client = ClientCache.getKafka(DataSourceType.KAFKA.getVal()); List<String> data = client.consumeData(source, topic, collectNum, offsetReset, timestampOffset, maxTimeWait);
入参类型:
- KafkaSourceDTO:数据源连接信息
出参类型:
- List:消费者组列表
使用:
IKafka client = ClientCache.getKafka(DataSourceType.KAFKA.getVal()); List<String> data = client.listConsumerGroup(source);
入参类型:
- KafkaSourceDTO:数据源连接信息
- String:topic
出参类型:
- List:消费者组列表
使用:
IKafka client = ClientCache.getKafka(DataSourceType.KAFKA.getVal()); List<String> data = client.listConsumerGroupByTopic(source);
入参类型:
- KafkaSourceDTO:数据源连接信息
- String:groupId 消费者组
出参类型:
- List:消费者组详细信息
使用:
IKafka client = ClientCache.getKafka(DataSourceType.KAFKA.getVal()); List<KafkaConsumerDTO> data = client.getGroupInfoByGroupId(source, groupId);
入参类型:
- KafkaSourceDTO:数据源连接信息
- String:topic kafka主题
出参类型:
- List:消费者组详细信息
使用:
IKafka client = ClientCache.getKafka(DataSourceType.KAFKA.getVal()); List<KafkaConsumerDTO> data = client.getGroupInfoByTopic(source, topic);
入参类型:
- KafkaSourceDTO:数据源连接信息
- String: groupId 消费者组
- String:topic kafka主题
出参类型:
- List:消费者组详细信息
使用:
IKafka client = ClientCache.getKafka(DataSourceType.KAFKA.getVal()); List<KafkaConsumerDTO> data = client.getGroupInfoByGroupIdAndTopic(source, groupId, topic);