diff --git a/pom.xml b/pom.xml
index 62bf59c..c5bf209 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
0.0.2-SNAPSHOT
3.11.0
- 17
+ 21
UTF-8
UTF-8
quarkus-bom
@@ -44,7 +44,7 @@
io.quarkus
- quarkus-smallrye-reactive-messaging-kafka
+ quarkus-kafka-client
io.strimzi
diff --git a/src/main/java/com/github/eyefloaters/AdminFactory.java b/src/main/java/com/github/eyefloaters/AdminFactory.java
deleted file mode 100644
index fdb72e4..0000000
--- a/src/main/java/com/github/eyefloaters/AdminFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package com.github.eyefloaters;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import jakarta.enterprise.context.ApplicationScoped;
-import jakarta.enterprise.inject.Produces;
-import jakarta.inject.Inject;
-
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-
-import io.smallrye.common.annotation.Identifier;
-
-@ApplicationScoped
-public class AdminFactory {
-
- @Inject
- @Identifier("default-kafka-broker")
- Map config;
-
- @Produces
- Admin getAdmin() {
- Map copy = new HashMap<>();
- for (Map.Entry entry : config.entrySet()) {
- if (AdminClientConfig.configNames().contains(entry.getKey())) {
- copy.put(entry.getKey(), entry.getValue());
- }
- }
- return Admin.create(copy);
- }
-
-}
diff --git a/src/main/java/com/github/eyefloaters/ClientConfigFactory.java b/src/main/java/com/github/eyefloaters/ClientConfigFactory.java
new file mode 100644
index 0000000..42ae5d6
--- /dev/null
+++ b/src/main/java/com/github/eyefloaters/ClientConfigFactory.java
@@ -0,0 +1,200 @@
+package com.github.eyefloaters;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.security.cert.Certificate;
+import java.security.cert.X509Certificate;
+import java.util.Base64;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Produces;
+import jakarta.inject.Inject;
+import jakarta.inject.Named;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.jboss.logging.Logger;
+
+import io.smallrye.common.annotation.Identifier;
+
+@ApplicationScoped
+public class ClientConfigFactory {
+
+ @Inject
+ Logger log;
+
+ @Inject
+ Config config;
+
+ @Inject
+ @ConfigProperty(name = "datagen.security.trust-certificates", defaultValue = "false")
+ boolean trustCertificates;
+
+ @Inject
+ @ConfigProperty(name = "datagen.kafka")
+ Map clusterNames;
+
+ @Inject
+ @Identifier("default-kafka-broker")
+ Map defaultClusterConfigs;
+
+ @Produces
+ @Named("adminConfigs")
+ Map> getAdminConfigs() {
+ return clusterNames.entrySet()
+ .stream()
+ .map(cluster -> {
+ var configs = buildConfig(AdminClientConfig.configNames(), cluster.getKey());
+ logConfig("Admin[" + cluster.getKey() + ']', configs);
+ return Map.entry(unquote(cluster.getValue()), configs);
+ })
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ @Produces
+ @Named("producerConfigs")
+ Map> getProducerConfigs() {
+ return clusterNames.entrySet()
+ .stream()
+ .map(cluster -> {
+ var configs = buildConfig(ProducerConfig.configNames(), cluster.getKey());
+ logConfig("Producer[" + cluster.getKey() + ']', configs);
+ return Map.entry(unquote(cluster.getValue()), configs);
+ })
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ @Produces
+ @Named("consumerConfigs")
+ Map> getConsumerConfigs() {
+ return clusterNames.entrySet()
+ .stream()
+ .map(cluster -> {
+ Set configNames = ConsumerConfig.configNames().stream()
+ // Do not allow a group Id to be set for this application
+ .filter(Predicate.not(ConsumerConfig.GROUP_ID_CONFIG::equals))
+ .collect(Collectors.toSet());
+ var configs = buildConfig(configNames, cluster.getKey());
+ logConfig("Consumer[" + cluster.getKey() + ']', configs);
+ return Map.entry(unquote(cluster.getValue()), configs);
+ })
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ Map buildConfig(Set configNames, String clusterKey) {
+ Map cfg = configNames
+ .stream()
+ .map(configName -> getClusterConfig(clusterKey, configName)
+ .or(() -> getDefaultConfig(clusterKey, configName))
+ .map(configValue -> Map.entry(configName, configValue)))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ if (truststoreRequired(cfg)) {
+ trustClusterCertificate(cfg);
+ }
+
+ return cfg;
+ }
+
+ Optional getClusterConfig(String clusterKey, String configName) {
+ return config.getOptionalValue("datagen.kafka." + clusterKey + '.' + configName, String.class)
+ .map(cfg -> {
+ log.tracef("OVERRIDE config %s for cluster %s", configName, clusterKey);
+ return unquote(cfg);
+ });
+ }
+
+ Optional getDefaultConfig(String clusterKey, String configName) {
+ if (defaultClusterConfigs.containsKey(configName)) {
+ log.tracef("DEFAULT config %s for cluster %s", configName, clusterKey);
+ String cfg = defaultClusterConfigs.get(configName).toString();
+ return Optional.of(unquote(cfg));
+ }
+
+ return Optional.empty();
+ }
+
+ String unquote(String cfg) {
+ return BOUNDARY_QUOTES.matcher(cfg).replaceAll("");
+ }
+
+ boolean truststoreRequired(Map cfg) {
+ var securityProtocol = cfg.getOrDefault(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "");
+ var trustStoreMissing = !cfg.containsKey(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG);
+
+ return trustCertificates && trustStoreMissing && securityProtocol.toString().contains("SSL");
+ }
+
+ void trustClusterCertificate(Map cfg) {
+ TrustManager trustAllCerts = new X509TrustManager() {
+ public X509Certificate[] getAcceptedIssuers() {
+ return null; // NOSONAR
+ }
+
+ public void checkClientTrusted(X509Certificate[] certs, String authType) { // NOSONAR
+ // all trusted
+ }
+
+ public void checkServerTrusted(X509Certificate[] certs, String authType) { // NOSONAR
+ // all trusted
+ }
+ };
+
+ try {
+ SSLContext sc = SSLContext.getInstance("TLS");
+ sc.init(null, new TrustManager[] { trustAllCerts }, new SecureRandom());
+ SSLSocketFactory factory = sc.getSocketFactory();
+ String bootstrap = (String) cfg.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+ String[] hostport = bootstrap.split(",")[0].split(":");
+ ByteArrayOutputStream certificateOut = new ByteArrayOutputStream();
+
+ try (SSLSocket socket = (SSLSocket) factory.createSocket(hostport[0], Integer.parseInt(hostport[1]))) {
+ Certificate[] chain = socket.getSession().getPeerCertificates();
+ for (Certificate certificate : chain) {
+ certificateOut.write("-----BEGIN CERTIFICATE-----\n".getBytes(StandardCharsets.UTF_8));
+ certificateOut.write(Base64.getMimeEncoder(80, new byte[] {'\n'}).encode(certificate.getEncoded()));
+ certificateOut.write("\n-----END CERTIFICATE-----\n".getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ cfg.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG,
+ new String(certificateOut.toByteArray(), StandardCharsets.UTF_8).trim());
+ cfg.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
+ log.warnf("Certificate hosted at %s:%s is automatically trusted", hostport[0], hostport[1]);
+ } catch (Exception e) {
+ log.infof("Exception setting up trusted certificate: %s", e.getMessage());
+ }
+ }
+
+ void logConfig(String clientType, Map config) {
+ if (log.isDebugEnabled()) {
+ String msg = config.entrySet()
+ .stream()
+ .map(entry -> "\t%s = %s".formatted(entry.getKey(), entry.getValue()))
+ .collect(Collectors.joining("\n", "%s configuration:\n", ""));
+ log.debugf(msg, clientType);
+ }
+ }
+
+ private static final Pattern BOUNDARY_QUOTES = Pattern.compile("(^[\"'])|([\"']$)");
+}
diff --git a/src/main/java/com/github/eyefloaters/DataGenerator.java b/src/main/java/com/github/eyefloaters/DataGenerator.java
index ed0b46a..f5ea60e 100644
--- a/src/main/java/com/github/eyefloaters/DataGenerator.java
+++ b/src/main/java/com/github/eyefloaters/DataGenerator.java
@@ -1,133 +1,346 @@
package com.github.eyefloaters;
+import java.time.Duration;
import java.time.Instant;
import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.IntStream;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.event.Shutdown;
import jakarta.enterprise.event.Startup;
import jakarta.inject.Inject;
+import jakarta.inject.Named;
import jakarta.json.Json;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
-import org.eclipse.microprofile.context.ManagedExecutor;
-import org.eclipse.microprofile.reactive.messaging.Channel;
-import org.eclipse.microprofile.reactive.messaging.Emitter;
-import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;
+import org.jboss.logging.MDC;
@ApplicationScoped
public class DataGenerator {
+ static final String TOPIC_NAME_TEMPLATE = "console_datagen_%03d-%s";
+ static final String CLUSTER_NAME_KEY = "cluster.name";
+
@Inject
Logger log;
@Inject
- ManagedExecutor exec;
+ @ConfigProperty(name = "datagen.enabled", defaultValue = "true")
+ boolean datagenEnabled;
+
+ @Inject
+ @ConfigProperty(name = "datagen.consumer-groups", defaultValue = "1")
+ int consumerCount;
+
+ @Inject
+ @ConfigProperty(name = "datagen.topics-per-consumer", defaultValue = "1")
+ int topicsPerConsumer;
+
+ @Inject
+ @ConfigProperty(name = "datagen.partitions-per-topic", defaultValue = "1")
+ int partitionsPerTopic;
+
+ @Inject
+ @ConfigProperty(name = "datagen.topic-name-template", defaultValue = TOPIC_NAME_TEMPLATE)
+ String topicNameTemplate;
@Inject
- @Channel("timestamps-out")
- Emitter timestampEmitter;
+ @Named("adminConfigs")
+ Map> adminConfigs;
@Inject
- Admin adminClient;
+ @Named("producerConfigs")
+ Map> producerConfigs;
+
+ @Inject
+ @Named("consumerConfigs")
+ Map> consumerConfigs;
+
+ static ExecutorService virtualExec = Executors.newVirtualThreadPerTaskExecutor();
AtomicBoolean running = new AtomicBoolean(true);
+ Random generator = new Random();
- long recordsProduced = 0;
- long recordsConsumed = 0;
+ Map adminClients = new HashMap<>();
+ Map> recordsConsumed = new ConcurrentHashMap<>();
+ Map> recordsProduced = new ConcurrentHashMap<>();
void start(@Observes Startup startupEvent /* NOSONAR */) {
- Random generator = new Random();
- byte[] buffer = new byte[1000];
+ if (!datagenEnabled) {
+ log.info("datagen.enabled=false ; producers and consumers will not be started");
+ return;
+ }
- exec.submit(() -> {
- while (running.get()) {
- long start = System.currentTimeMillis();
- long rate = 100 * ((start / 10000) % 5) + 10;
+ adminConfigs.forEach((clusterKey, configProperties) -> {
+ virtualExec.submit(() -> {
+ MDC.put(CLUSTER_NAME_KEY, clusterKey);
- for (int i = 0; i < rate; i++) {
- generator.nextBytes(buffer);
- String value = Json.createObjectBuilder()
- .add("timestamp", Instant.now().toString())
- .add("payload", Base64.getEncoder().encodeToString(buffer))
- .build()
- .toString();
+ Admin adminClient = Admin.create(configProperties);
+ adminClients.put(clusterKey, adminClient);
- timestampEmitter.send(value);
+ IntStream.range(0, consumerCount).forEach(groupNumber -> {
+ var topics = IntStream.range(0, topicsPerConsumer)
+ .mapToObj(t -> topicNameTemplate.formatted(groupNumber, (char) ('a' + t)))
+ .toList();
- if (++recordsProduced % 10_000 == 0) {
- log.infof("Produced %d records (since startup)", recordsProduced);
- }
- }
+ initialize(adminClient, topics, partitionsPerTopic);
- long end = System.currentTimeMillis();
- long sleepTime = Math.max(0, 1000 - (end - start));
+ virtualExec.submit(() -> {
+ var configs = new HashMap<>(producerConfigs.get(clusterKey));
+ String clientId = "console-datagen-producer-" + groupNumber;
+ configs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
- log.debugf("Produced %d messages in %dms, sleeping %dms", rate, end - start, sleepTime);
+ MDC.put(CLUSTER_NAME_KEY, clusterKey);
+ MDC.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- log.warn("Producer thread was interrupted, breaking from loop");
- Thread.currentThread().interrupt();
- break;
- }
- }
+ try (Producer producer = new KafkaProducer<>(producerConfigs.get(clusterKey))) {
+ while (running.get()) {
+ produce(clusterKey, producer, topics);
+ }
+ } catch (Exception e) {
+ log.warnf(e, "Error producing: %s", e.getMessage());
+ }
+
+ log.infof("Run loop complete for producer %d on cluster %s", groupNumber, clusterKey);
+ });
+
+ virtualExec.submit(() -> {
+ var configs = new HashMap<>(consumerConfigs.get(clusterKey));
+ String groupId = "console-datagen-group-" + groupNumber;
+ String clientId = "console-datagen-consumer-" + groupNumber;
+
+ configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+
+ MDC.put(CLUSTER_NAME_KEY, clusterKey);
+ MDC.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+
+ try (Consumer consumer = new KafkaConsumer<>(configs)) {
+ consumer.subscribe(topics);
+
+ while (running.get()) {
+ consumer.poll(Duration.ofSeconds(2)).forEach(rec -> consume(clusterKey, rec));
+ }
+ } catch (Exception e) {
+ log.warnf(e, "Error consuming: %s", e.getMessage());
+ }
+
+ log.infof("Run loop complete for consumer group %s", groupId);
+ });
+ });
+ });
});
}
- void stop(@Observes Shutdown shutdownEvent /* NOSONAR */) {
+ void stop(@Observes Shutdown shutdownEvent /* NOSONAR */) throws Exception {
running.set(false);
+
+ adminClients.forEach((clusterKey, client) -> {
+ log.infof("Stopping Admin client for cluster %s...", clusterKey);
+ client.close();
+ log.infof("Admin client for cluster %s closed", clusterKey);
+ });
+
+ virtualExec.shutdown();
+ virtualExec.awaitTermination(10, TimeUnit.SECONDS);
}
- @Incoming("timestamps")
- public void consume(ConsumerRecord rec) {
- if (++recordsConsumed % 10_000 == 0) {
- log.infof("Consumed %d records, latest is partition %d, offset %d",
- recordsConsumed, rec.partition(), rec.offset());
-
- var partition = new TopicPartition(rec.topic(), rec.partition());
- var earliest = getOffset(partition, OffsetSpec.earliest());
- var latest = getOffset(partition, OffsetSpec.latest());
-
- CompletableFuture.allOf(earliest, latest)
- .thenCompose(nothing -> {
- long diff = latest.join() - earliest.join();
-
- if (diff >= 1_000_000) {
- log.infof("Offset diff is %d, truncating topic %s, partition %d to offset %d",
- diff, rec.topic(), rec.partition(), rec.offset());
- // Truncate the topic to the up to the previous offset
- return adminClient.deleteRecords(Map.of(new TopicPartition(rec.topic(), rec.partition()),
- RecordsToDelete.beforeOffset(rec.offset())))
- .all()
- .toCompletionStage();
- } else {
- log.infof("Offset diff is %d for topic %s, partition %d at offset %d",
- diff, rec.topic(), rec.partition(), rec.offset());
- return CompletableFuture.completedStage(null);
+ void initialize(Admin adminClient, List topics, int partitionsPerTopic) {
+ List dataGenGroups = adminClient.listConsumerGroups()
+ .all()
+ .toCompletionStage()
+ .toCompletableFuture()
+ .join()
+ .stream()
+ .map(ConsumerGroupListing::groupId)
+ .filter(name -> name.startsWith("console-datagen-group-"))
+ .toList();
+
+ adminClient.deleteConsumerGroups(dataGenGroups)
+ .all()
+ .toCompletionStage()
+ .exceptionally(error -> {
+ log.warnf(error, "Error deleting consumer groups: %s", error.getMessage());
+ return null;
+ })
+ .toCompletableFuture()
+ .join();
+
+ adminClient.deleteTopics(topics)
+ .all()
+ .toCompletionStage()
+ .exceptionally(error -> {
+ log.warnf(error, "Error deleting topics: %s", error.getMessage());
+ return null;
+ })
+ .toCompletableFuture()
+ .join();
+
+ var newTopics = topics.stream()
+ .map(t -> new NewTopic(t, partitionsPerTopic, (short) 3)
+ .configs(Map.of(
+ // 10 MiB
+ "segment.bytes", Integer.toString(10 * 1024 * 1024),
+ // 10 minutes
+ "segment.ms", Long.toString(TimeUnit.MINUTES.toMillis(10)))))
+ .toList();
+
+ log.debugf("Creating topics: %s", topics);
+
+ var pending = adminClient.createTopics(newTopics)
+ .values()
+ .values()
+ .stream()
+ .map(KafkaFuture::toCompletionStage)
+ .map(CompletionStage::toCompletableFuture)
+ .toArray(CompletableFuture[]::new);
+
+ CompletableFuture.allOf(pending)
+ .thenRun(() -> LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5)))
+ .thenRun(() -> log.infof("Topics created: %s", topics))
+ .join();
+ }
+
+ void produce(String clusterKey, Producer producer, List topics) {
+ byte[] buffer = new byte[1000];
+ int t = 0;
+ long start = System.currentTimeMillis();
+ long rate = 100 * ((start / 10000) % 5) + 10;
+
+
+ for (int i = 0; i < rate; i++) {
+ if (!running.get()) {
+ return;
+ }
+
+ generator.nextBytes(buffer);
+
+ byte[] value = Json.createObjectBuilder()
+ .add("timestamp", Instant.now().toString())
+ .add("payload", Base64.getEncoder().encodeToString(buffer))
+ .build()
+ .toString()
+ .getBytes();
+
+ String topic = topics.get(t++ % topics.size());
+
+ complete(producer.send(new ProducerRecord<>(topic, value)))
+ .thenAccept(meta -> {
+ TopicPartition topicPartition = new TopicPartition(meta.topic(), meta.partition());
+ var currentCount = incrementAndGet(recordsProduced, clusterKey, topicPartition);
+
+ if (currentCount % 5_000 == 0) {
+ log.infof("Produced %d records to %s/%s (since startup)", currentCount, clusterKey, topicPartition);
}
})
- .join();
+ .exceptionally(error -> {
+ log.warnf(error, "Error producing record: %s", error.getMessage());
+ return null;
+ });
+ }
+
+ long end = System.currentTimeMillis();
+ long sleepTime = Math.max(0, 1000 - (end - start));
+
+ log.debugf("Produced %d messages in %dms, sleeping %dms", rate, end - start, sleepTime);
+ if (running.get()) {
+ LockSupport.parkUntil(System.currentTimeMillis() + sleepTime);
+ }
+ }
+
+ public void consume(String clusterKey, ConsumerRecord rec) {
+ TopicPartition topicPartition = new TopicPartition(rec.topic(), rec.partition());
+ var currentCount = incrementAndGet(recordsConsumed, clusterKey, topicPartition);
+
+ if (currentCount % 5_000 == 0) {
+ log.infof("Consumed %d records from partition %s, latest is offset %d",
+ currentCount, topicPartition, rec.offset());
+ maybeDeleteRecords(adminClients.get(clusterKey), topicPartition, rec.offset());
}
}
- CompletableFuture getOffset(TopicPartition partition, OffsetSpec spec) {
+ long incrementAndGet(Map> counters, String clusterKey, TopicPartition topicPartition) {
+ return counters
+ .computeIfAbsent(clusterKey, k -> new ConcurrentHashMap<>())
+ .compute(topicPartition, (k, v) -> v == null ? 1 : v + 1);
+ }
+
+ void maybeDeleteRecords(Admin adminClient, TopicPartition topicPartition, long offset) {
+ var earliest = getOffset(adminClient, topicPartition, OffsetSpec.earliest());
+ var latest = getOffset(adminClient, topicPartition, OffsetSpec.latest());
+
+ CompletableFuture.allOf(earliest, latest)
+ .thenComposeAsync(nothing -> {
+ long diff = latest.join() - earliest.join();
+
+ if (diff >= 5_000) {
+ log.infof("Offset diff is %d, truncating topic %s, partition %d to offset %d",
+ diff, topicPartition.topic(), topicPartition.partition(), offset);
+ // Truncate the topic to the up to the previous offset
+ var recordsToDelete = Map.of(topicPartition, RecordsToDelete.beforeOffset(offset));
+ return adminClient.deleteRecords(recordsToDelete)
+ .all()
+ .toCompletionStage();
+ } else {
+ log.debugf("Offset diff is %d for topic %s, partition %d at offset %d",
+ diff, topicPartition.topic(), topicPartition.partition(), offset);
+ return CompletableFuture.completedStage(null);
+ }
+ }, virtualExec)
+ .join();
+ }
+
+ CompletableFuture getOffset(Admin adminClient, TopicPartition partition, OffsetSpec spec) {
return adminClient.listOffsets(Map.of(partition, spec))
.partitionResult(partition)
.toCompletionStage()
.thenApply(ListOffsetsResultInfo::offset)
.toCompletableFuture();
}
+
+ static CompletableFuture complete(Future future) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new CompletionException(e);
+ } catch (ExecutionException e) {
+ throw new CompletionException(e.getCause());
+ }
+ }, virtualExec);
+ }
}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 750c76a..a2445e0 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -1,14 +1,21 @@
-mp.messaging.outgoing.timestamps-out.connector=smallrye-kafka
-mp.messaging.outgoing.timestamps-out.topic=__console_datagen_timestamps
+quarkus.devservices.enabled=false
+quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %X [%c{3.}] (%t) %s%e%n
+# Noisy logger in Kafka client
+quarkus.log.category."org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin".level=WARN
-mp.messaging.incoming.timestamps.connector=smallrye-kafka
-mp.messaging.incoming.timestamps.topic=__console_datagen_timestamps
+datagen.enabled=true
+datagen.security.trust-certificates=true
+datagen.consumer-groups=1
+datagen.topics-per-consumer=1
+datagen.partitions-per-topic=1
+datagen.topic-pattern=console_datagen_%03d-%s
-kafka.bootstrap.servers=REQUIRED
-kafka.sasl.jaas.config=REQUIRED
-kafka.ssl.truststore.certificates=REQUIRED
+kafka.sasl.login.refresh.min.period.seconds=60
+kafka.sasl.login.refresh.buffer.seconds=60
-kafka.ssl.truststore.type=PEM
-kafka.security.protocol=SASL_SSL
-kafka.sasl.mechanism=OAUTHBEARER
-kafka.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
+kafka.allow.auto.create.topics=false
+kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
+kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
+kafka.auto.offset.reset=earliest
+kafka.key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
+kafka.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer