Skip to content

Commit

Permalink
Add docs
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 17, 2025
1 parent 16de422 commit b8b23e2
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,19 @@ public ReaderBuilder with(final String key, final Object value) {
* @param <V> type of values
*/
public <K, V> List<ConsumerRecord<K, V>> from(final String topic, final Duration timeout) {
try (final Consumer<K, V> consumer = new KafkaConsumer<>(this.properties)) {
try (final Consumer<K, V> consumer = this.createConsumer()) {
return readAll(consumer, topic, timeout);
}
}

/**
* Create a new {@code Consumer} for a Kafka cluster
* @return {@code Consumer}
* @param <K> type of keys
* @param <V> type of values
*/
public <K, V> Consumer<K, V> createConsumer() {
return new KafkaConsumer<>(this.properties);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,21 @@ public SenderBuilder with(final String key, final Object value) {
* @param <V> type of values
*/
public <K, V> void to(final String topic, final Iterable<SimpleProducerRecord<K, V>> records) {
try (final Producer<K, V> producer = new KafkaProducer<>(this.properties)) {
try (final Producer<K, V> producer = this.createProducer()) {
records.forEach(kv -> producer.send(kv.toProducerRecord(topic)));
}
}

/**
* Create a new {@code Producer} for a Kafka cluster
* @return {@code Producer}
* @param <K> type of keys
* @param <V> type of values
*/
public <K, V> Producer<K, V> createProducer() {
return new KafkaProducer<>(this.properties);
}

/**
* Represents a {@link ProducerRecord} without topic assignment
* @param <K> type of keys
Expand Down

0 comments on commit b8b23e2

Please sign in to comment.