Commit 16de422: Add docs
philipp94831 committed Jan 17, 2025
1 parent 5637380
Showing 3 changed files with 113 additions and 4 deletions.
KafkaTestClient.java
@@ -24,17 +24,24 @@

package com.bakdata.kafka;

import static java.util.Collections.emptyMap;

import com.bakdata.kafka.util.ImprovedAdminClient;
import com.bakdata.kafka.util.TopicClient;
import com.bakdata.kafka.util.TopicSettings;
import com.bakdata.kafka.util.TopicSettings.TopicSettingsBuilder;
import java.util.Map;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

/**
* Client that supports communication with Kafka clusters in test setups, including topic management, reading from
* and sending to topics.
*/
@RequiredArgsConstructor
public class KafkaTestClient {

@@ -43,37 +50,84 @@ public class KafkaTestClient {
.replicationFactor((short) 1);
private final @NonNull KafkaEndpointConfig endpointConfig;

/**
* Create a new {@code TopicSettingsBuilder} which uses a single partition and a replication factor of 1
* @return default topic settings
*/
public static TopicSettingsBuilder defaultTopicSettings() {
return DEFAULT_TOPIC_SETTINGS;
}

/**
* Prepare to send new data to the cluster. {@link StringSerializer} is configured by default.
* @return configured {@code SenderBuilder}
*/
public SenderBuilder send() {
return new SenderBuilder(this.endpointConfig.createKafkaProperties())
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
}

/**
* Prepare to read data from the cluster. {@link StringDeserializer} is configured by default.
* @return configured {@code ReaderBuilder}
*/
public ReaderBuilder read() {
return new ReaderBuilder(this.endpointConfig.createKafkaProperties())
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
}

/**
* Create a new {@code ImprovedAdminClient} for the cluster
* @return configured admin client
*/
public ImprovedAdminClient admin() {
return ImprovedAdminClient.create(this.endpointConfig.createKafkaProperties());
}

-public void createTopic(final String topicName, final TopicSettings settings) {
/**
* Creates a new Kafka topic with the specified settings.
*
* @param topicName the topic name
* @param settings settings for the number of partitions and the replication factor
* @param config topic configuration
*/
public void createTopic(final String topicName, final TopicSettings settings, final Map<String, String> config) {
try (final ImprovedAdminClient admin = this.admin();
final TopicClient topicClient = admin.getTopicClient()) {
-topicClient.createTopic(topicName, settings);
topicClient.createTopic(topicName, settings, config);
}
}

/**
* Creates a new Kafka topic with the specified settings and an empty topic configuration.
*
* @param topicName the topic name
* @param settings settings for the number of partitions and the replication factor
* @see #createTopic(String, TopicSettings, Map)
*/
public void createTopic(final String topicName, final TopicSettings settings) {
this.createTopic(topicName, settings, emptyMap());
}

/**
* Creates a new Kafka topic with default settings.
*
* @param topicName the topic name
* @see #createTopic(String, TopicSettings)
* @see #defaultTopicSettings()
*/
public void createTopic(final String topicName) {
this.createTopic(topicName, defaultTopicSettings().build());
}

/**
* Checks whether a Kafka topic exists.
*
* @param topicName the topic name
* @return whether a Kafka topic with the specified name exists
*/
public boolean existsTopic(final String topicName) {
try (final ImprovedAdminClient admin = this.admin();
final TopicClient topicClient = admin.getTopicClient()) {
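
The three createTopic overloads delegate to one another: the single-argument variant uses defaultTopicSettings(), and the two-argument variant passes an empty config map. A minimal usage sketch, assuming a KafkaEndpointConfig that points at a running test cluster (its construction is not part of this diff; the topic names and the cleanup.policy entry are illustrative):

import java.util.Map;

import com.bakdata.kafka.KafkaEndpointConfig;
import com.bakdata.kafka.KafkaTestClient;

class TopicSetupExample {

    // endpointConfig is assumed; this diff does not show how to build one.
    static void setUpTopics(final KafkaEndpointConfig endpointConfig) {
        final KafkaTestClient testClient = new KafkaTestClient(endpointConfig);

        // Full overload: explicit settings plus a topic configuration
        testClient.createTopic("my-topic",
                KafkaTestClient.defaultTopicSettings().build(),
                Map.of("cleanup.policy", "compact"));

        // Shorter overload: default settings, empty config
        testClient.createTopic("other-topic");

        if (!testClient.existsTopic("my-topic")) {
            throw new IllegalStateException("topic was not created");
        }
    }
}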
ReaderBuilder.java
@@ -39,6 +39,9 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

/**
* Read data from a Kafka cluster
*/
@RequiredArgsConstructor
public class ReaderBuilder {

@@ -65,15 +68,29 @@ private static <K, V> List<ConsumerRecord<K, V>> readAll(final Consumer<K, V> co
return pollAll(consumer, timeout);
}

/**
* Add a consumer configuration
* @param key configuration key
* @param value configuration value
* @return {@code ReaderBuilder} with added configuration
*/
public ReaderBuilder with(final String key, final Object value) {
final Map<String, Object> newProperties = new HashMap<>(this.properties);
newProperties.put(key, value);
return new ReaderBuilder(Map.copyOf(newProperties));
}

-public <K, V> List<ConsumerRecord<K, V>> from(final String output, final Duration timeout) {
/**
* Read data from a topic
* @param topic topic to read from
* @param timeout consumer poll timeout
* @return consumed records
* @param <K> type of keys
* @param <V> type of values
*/
public <K, V> List<ConsumerRecord<K, V>> from(final String topic, final Duration timeout) {
try (final Consumer<K, V> consumer = new KafkaConsumer<>(this.properties)) {
-return readAll(consumer, output, timeout);
return readAll(consumer, topic, timeout);
}
}

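
Because with(...) copies the property map into a new ReaderBuilder, calls chain on top of the StringDeserializer defaults configured by KafkaTestClient#read(), and builders stay immutable and safe to share between tests. A sketch of reading typed records (the LongDeserializer override, topic name, and timeout are illustrative):

import java.time.Duration;
import java.util.List;

import com.bakdata.kafka.KafkaTestClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;

class ReadTopicExample {

    // testClient as constructed in the previous sketch
    static List<ConsumerRecord<String, Long>> readLongs(final KafkaTestClient testClient) {
        // Keys keep the StringDeserializer default from read();
        // the value deserializer is overridden to read Long values.
        return testClient.read()
                .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class)
                .from("my-topic", Duration.ofSeconds(10));
    }
}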
SenderBuilder.java
@@ -35,23 +35,44 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

/**
* Send data to a Kafka cluster
*/
@RequiredArgsConstructor
public class SenderBuilder {

private final @NonNull Map<String, Object> properties;

/**
* Add a producer configuration
* @param key configuration key
* @param value configuration value
* @return {@code SenderBuilder} with added configuration
*/
public SenderBuilder with(final String key, final Object value) {
final Map<String, Object> newProperties = new HashMap<>(this.properties);
newProperties.put(key, value);
return new SenderBuilder(Map.copyOf(newProperties));
}

/**
* Send data to a topic
* @param topic topic to send to
* @param records records to send
* @param <K> type of keys
* @param <V> type of values
*/
public <K, V> void to(final String topic, final Iterable<SimpleProducerRecord<K, V>> records) {
try (final Producer<K, V> producer = new KafkaProducer<>(this.properties)) {
records.forEach(kv -> producer.send(kv.toProducerRecord(topic)));
}
}

/**
* Represents a {@link ProducerRecord} without topic assignment
* @param <K> type of keys
* @param <V> type of values
*/
@Value
@RequiredArgsConstructor
public static class SimpleProducerRecord<K, V> {
@@ -60,14 +81,31 @@ public static class SimpleProducerRecord<K, V> {
Instant timestamp;
Iterable<Header> headers;

/**
* Create a new {@code SimpleProducerRecord} without timestamp and headers
* @param key key
* @param value value
*/
public SimpleProducerRecord(final K key, final V value) {
this(key, value, (Instant) null);
}

/**
* Create a new {@code SimpleProducerRecord} without headers
* @param key key
* @param value value
* @param timestamp timestamp
*/
public SimpleProducerRecord(final K key, final V value, final Instant timestamp) {
this(key, value, timestamp, null);
}

/**
* Create a new {@code SimpleProducerRecord} without timestamp
* @param key key
* @param value value
* @param headers headers
*/
public SimpleProducerRecord(final K key, final V value, final Iterable<Header> headers) {
this(key, value, null, headers);
}
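
The convenience constructors of SimpleProducerRecord cover the common combinations of timestamp and headers. A sketch of sending records with the StringSerializer defaults from KafkaTestClient#send() (topic name, keys, and values are illustrative):

import java.time.Instant;
import java.util.List;

import com.bakdata.kafka.KafkaTestClient;
import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;

class SendTopicExample {

    // testClient as constructed in the first sketch; the default
    // StringSerializer from send() handles both keys and values.
    static void sendRecords(final KafkaTestClient testClient) {
        testClient.send()
                .to("my-topic", List.of(
                        // key and value only
                        new SimpleProducerRecord<>("k1", "v1"),
                        // with an explicit timestamp
                        new SimpleProducerRecord<>("k2", "v2", Instant.now())));
    }
}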
