From ceb6222dbc32b849578ad10f07cc76409421fd7c Mon Sep 17 00:00:00 2001 From: Michael Edgar Date: Wed, 17 Jan 2024 14:29:42 -0500 Subject: [PATCH] Support multiple Kafka clusters and configurable consumer counts (#15) * Support multiple Kafka clusters and configurable consumer counts Signed-off-by: Michael Edgar * Update build files for Java 21 Signed-off-by: Michael Edgar --------- Signed-off-by: Michael Edgar --- .github/workflows/build.yml | 6 +- .github/workflows/release.yml | 2 +- pom.xml | 4 +- src/main/docker/Dockerfile.jvm | 3 +- .../com/github/eyefloaters/AdminFactory.java | 33 -- .../eyefloaters/ClientConfigFactory.java | 200 ++++++++++ .../com/github/eyefloaters/DataGenerator.java | 349 ++++++++++++++---- src/main/resources/application.properties | 29 +- 8 files changed, 506 insertions(+), 120 deletions(-) delete mode 100644 src/main/java/com/github/eyefloaters/AdminFactory.java create mode 100644 src/main/java/com/github/eyefloaters/ClientConfigFactory.java diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 1c09060..329396f 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -7,16 +7,16 @@ on: types: [ opened, reopened, synchronize ] jobs: - build-api: + build: runs-on: ubuntu-latest steps: - name: Checkout uses: actions/checkout@v4 - - name: Set up JDK 17 + - name: Setup JDK uses: actions/setup-java@v4 with: - java-version: '17' + java-version: '21' distribution: 'temurin' cache: maven diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 52b5e2a..433b4ba 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -30,7 +30,7 @@ jobs: - name: Setup JDK uses: actions/setup-java@v4 with: - java-version: '17' + java-version: '21' distribution: 'temurin' - name: Build and Push API Image diff --git a/pom.xml b/pom.xml index 62bf59c..c5bf209 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ 0.0.2-SNAPSHOT 3.11.0 - 17 + 21 UTF-8 UTF-8 quarkus-bom @@ -44,7 +44,7 @@ io.quarkus - quarkus-smallrye-reactive-messaging-kafka + quarkus-kafka-client io.strimzi diff --git a/src/main/docker/Dockerfile.jvm b/src/main/docker/Dockerfile.jvm index 6bdf67e..6aad483 100644 --- a/src/main/docker/Dockerfile.jvm +++ b/src/main/docker/Dockerfile.jvm @@ -77,11 +77,10 @@ # accessed directly. 
(example: "foo.example.com,bar.example.com") # ### -FROM registry.access.redhat.com/ubi8/openjdk-17:1.18 +FROM registry.access.redhat.com/ubi9/openjdk-21:1.17-2 ENV LANGUAGE='en_US:en' - # We make four distinct layers so if there are application changes the library layers can be re-used COPY --chown=185 target/quarkus-app/lib/ /deployments/lib/ COPY --chown=185 target/quarkus-app/*.jar /deployments/ diff --git a/src/main/java/com/github/eyefloaters/AdminFactory.java b/src/main/java/com/github/eyefloaters/AdminFactory.java deleted file mode 100644 index fdb72e4..0000000 --- a/src/main/java/com/github/eyefloaters/AdminFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.github.eyefloaters; - -import java.util.HashMap; -import java.util.Map; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.inject.Produces; -import jakarta.inject.Inject; - -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.AdminClientConfig; - -import io.smallrye.common.annotation.Identifier; - -@ApplicationScoped -public class AdminFactory { - - @Inject - @Identifier("default-kafka-broker") - Map config; - - @Produces - Admin getAdmin() { - Map copy = new HashMap<>(); - for (Map.Entry entry : config.entrySet()) { - if (AdminClientConfig.configNames().contains(entry.getKey())) { - copy.put(entry.getKey(), entry.getValue()); - } - } - return Admin.create(copy); - } - -} diff --git a/src/main/java/com/github/eyefloaters/ClientConfigFactory.java b/src/main/java/com/github/eyefloaters/ClientConfigFactory.java new file mode 100644 index 0000000..42ae5d6 --- /dev/null +++ b/src/main/java/com/github/eyefloaters/ClientConfigFactory.java @@ -0,0 +1,200 @@ +package com.github.eyefloaters; + +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.security.SecureRandom; +import java.security.cert.Certificate; +import java.security.cert.X509Certificate; +import java.util.Base64; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; +import jakarta.inject.Inject; +import jakarta.inject.Named; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.jboss.logging.Logger; + +import io.smallrye.common.annotation.Identifier; + +@ApplicationScoped +public class ClientConfigFactory { + + @Inject + Logger log; + + @Inject + Config config; + + @Inject + @ConfigProperty(name = "datagen.security.trust-certificates", defaultValue = "false") + boolean trustCertificates; + + @Inject + @ConfigProperty(name = "datagen.kafka") + Map clusterNames; + + @Inject + @Identifier("default-kafka-broker") + Map defaultClusterConfigs; + + @Produces + @Named("adminConfigs") + Map> getAdminConfigs() { + return clusterNames.entrySet() + .stream() + .map(cluster -> { + var configs = 
buildConfig(AdminClientConfig.configNames(), cluster.getKey());
+                    logConfig("Admin[" + cluster.getKey() + ']', configs);
+                    return Map.entry(unquote(cluster.getValue()), configs);
+                })
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    @Produces
+    @Named("producerConfigs")
+    Map<String, Map<String, Object>> getProducerConfigs() {
+        return clusterNames.entrySet()
+                .stream()
+                .map(cluster -> {
+                    var configs = buildConfig(ProducerConfig.configNames(), cluster.getKey());
+                    logConfig("Producer[" + cluster.getKey() + ']', configs);
+                    return Map.entry(unquote(cluster.getValue()), configs);
+                })
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    @Produces
+    @Named("consumerConfigs")
+    Map<String, Map<String, Object>> getConsumerConfigs() {
+        return clusterNames.entrySet()
+                .stream()
+                .map(cluster -> {
+                    Set<String> configNames = ConsumerConfig.configNames().stream()
+                            // Do not allow a group Id to be set for this application
+                            .filter(Predicate.not(ConsumerConfig.GROUP_ID_CONFIG::equals))
+                            .collect(Collectors.toSet());
+                    var configs = buildConfig(configNames, cluster.getKey());
+                    logConfig("Consumer[" + cluster.getKey() + ']', configs);
+                    return Map.entry(unquote(cluster.getValue()), configs);
+                })
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    Map<String, Object> buildConfig(Set<String> configNames, String clusterKey) {
+        Map<String, Object> cfg = configNames
+                .stream()
+                .map(configName -> getClusterConfig(clusterKey, configName)
+                        .or(() -> getDefaultConfig(clusterKey, configName))
+                        .map(configValue -> Map.entry(configName, configValue)))
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        if (truststoreRequired(cfg)) {
+            trustClusterCertificate(cfg);
+        }
+
+        return cfg;
+    }
+
+    Optional<String> getClusterConfig(String clusterKey, String configName) {
+        return config.getOptionalValue("datagen.kafka." + clusterKey + '.'
+ configName, String.class) + .map(cfg -> { + log.tracef("OVERRIDE config %s for cluster %s", configName, clusterKey); + return unquote(cfg); + }); + } + + Optional getDefaultConfig(String clusterKey, String configName) { + if (defaultClusterConfigs.containsKey(configName)) { + log.tracef("DEFAULT config %s for cluster %s", configName, clusterKey); + String cfg = defaultClusterConfigs.get(configName).toString(); + return Optional.of(unquote(cfg)); + } + + return Optional.empty(); + } + + String unquote(String cfg) { + return BOUNDARY_QUOTES.matcher(cfg).replaceAll(""); + } + + boolean truststoreRequired(Map cfg) { + var securityProtocol = cfg.getOrDefault(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, ""); + var trustStoreMissing = !cfg.containsKey(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG); + + return trustCertificates && trustStoreMissing && securityProtocol.toString().contains("SSL"); + } + + void trustClusterCertificate(Map cfg) { + TrustManager trustAllCerts = new X509TrustManager() { + public X509Certificate[] getAcceptedIssuers() { + return null; // NOSONAR + } + + public void checkClientTrusted(X509Certificate[] certs, String authType) { // NOSONAR + // all trusted + } + + public void checkServerTrusted(X509Certificate[] certs, String authType) { // NOSONAR + // all trusted + } + }; + + try { + SSLContext sc = SSLContext.getInstance("TLS"); + sc.init(null, new TrustManager[] { trustAllCerts }, new SecureRandom()); + SSLSocketFactory factory = sc.getSocketFactory(); + String bootstrap = (String) cfg.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + String[] hostport = bootstrap.split(",")[0].split(":"); + ByteArrayOutputStream certificateOut = new ByteArrayOutputStream(); + + try (SSLSocket socket = (SSLSocket) factory.createSocket(hostport[0], Integer.parseInt(hostport[1]))) { + Certificate[] chain = socket.getSession().getPeerCertificates(); + for (Certificate certificate : chain) { + certificateOut.write("-----BEGIN CERTIFICATE-----\n".getBytes(StandardCharsets.UTF_8)); + certificateOut.write(Base64.getMimeEncoder(80, new byte[] {'\n'}).encode(certificate.getEncoded())); + certificateOut.write("\n-----END CERTIFICATE-----\n".getBytes(StandardCharsets.UTF_8)); + } + } + + cfg.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, + new String(certificateOut.toByteArray(), StandardCharsets.UTF_8).trim()); + cfg.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM"); + log.warnf("Certificate hosted at %s:%s is automatically trusted", hostport[0], hostport[1]); + } catch (Exception e) { + log.infof("Exception setting up trusted certificate: %s", e.getMessage()); + } + } + + void logConfig(String clientType, Map config) { + if (log.isDebugEnabled()) { + String msg = config.entrySet() + .stream() + .map(entry -> "\t%s = %s".formatted(entry.getKey(), entry.getValue())) + .collect(Collectors.joining("\n", "%s configuration:\n", "")); + log.debugf(msg, clientType); + } + } + + private static final Pattern BOUNDARY_QUOTES = Pattern.compile("(^[\"'])|([\"']$)"); +} diff --git a/src/main/java/com/github/eyefloaters/DataGenerator.java b/src/main/java/com/github/eyefloaters/DataGenerator.java index ed0b46a..f5ea60e 100644 --- a/src/main/java/com/github/eyefloaters/DataGenerator.java +++ b/src/main/java/com/github/eyefloaters/DataGenerator.java @@ -1,133 +1,346 @@ package com.github.eyefloaters; +import java.time.Duration; import java.time.Instant; import java.util.Base64; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; import 
java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; +import java.util.stream.IntStream; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.enterprise.event.Shutdown; import jakarta.enterprise.event.Startup; import jakarta.inject.Inject; +import jakarta.inject.Named; import jakarta.json.Json; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; -import org.eclipse.microprofile.context.ManagedExecutor; -import org.eclipse.microprofile.reactive.messaging.Channel; -import org.eclipse.microprofile.reactive.messaging.Emitter; -import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; +import org.jboss.logging.MDC; @ApplicationScoped public class DataGenerator { + static final String TOPIC_NAME_TEMPLATE = "console_datagen_%03d-%s"; + static final String CLUSTER_NAME_KEY = "cluster.name"; + @Inject Logger log; @Inject - ManagedExecutor exec; + @ConfigProperty(name = "datagen.enabled", defaultValue = "true") + boolean datagenEnabled; + + @Inject + @ConfigProperty(name = "datagen.consumer-groups", defaultValue = "1") + int consumerCount; + + @Inject + @ConfigProperty(name = "datagen.topics-per-consumer", defaultValue = "1") + int topicsPerConsumer; + + @Inject + @ConfigProperty(name = "datagen.partitions-per-topic", defaultValue = "1") + int partitionsPerTopic; + + @Inject + @ConfigProperty(name = "datagen.topic-name-template", defaultValue = TOPIC_NAME_TEMPLATE) + String topicNameTemplate; @Inject - @Channel("timestamps-out") - Emitter timestampEmitter; + @Named("adminConfigs") + Map> adminConfigs; @Inject - Admin adminClient; + @Named("producerConfigs") + Map> producerConfigs; + + @Inject + @Named("consumerConfigs") + Map> consumerConfigs; + + static ExecutorService virtualExec = Executors.newVirtualThreadPerTaskExecutor(); AtomicBoolean running = new AtomicBoolean(true); + Random generator = new Random(); - long recordsProduced = 0; - long recordsConsumed = 0; + Map adminClients = new HashMap<>(); + Map> recordsConsumed = new ConcurrentHashMap<>(); + Map> recordsProduced = new ConcurrentHashMap<>(); void start(@Observes Startup startupEvent /* NOSONAR */) { - Random generator = new Random(); - 
byte[] buffer = new byte[1000]; + if (!datagenEnabled) { + log.info("datagen.enabled=false ; producers and consumers will not be started"); + return; + } - exec.submit(() -> { - while (running.get()) { - long start = System.currentTimeMillis(); - long rate = 100 * ((start / 10000) % 5) + 10; + adminConfigs.forEach((clusterKey, configProperties) -> { + virtualExec.submit(() -> { + MDC.put(CLUSTER_NAME_KEY, clusterKey); - for (int i = 0; i < rate; i++) { - generator.nextBytes(buffer); - String value = Json.createObjectBuilder() - .add("timestamp", Instant.now().toString()) - .add("payload", Base64.getEncoder().encodeToString(buffer)) - .build() - .toString(); + Admin adminClient = Admin.create(configProperties); + adminClients.put(clusterKey, adminClient); - timestampEmitter.send(value); + IntStream.range(0, consumerCount).forEach(groupNumber -> { + var topics = IntStream.range(0, topicsPerConsumer) + .mapToObj(t -> topicNameTemplate.formatted(groupNumber, (char) ('a' + t))) + .toList(); - if (++recordsProduced % 10_000 == 0) { - log.infof("Produced %d records (since startup)", recordsProduced); - } - } + initialize(adminClient, topics, partitionsPerTopic); - long end = System.currentTimeMillis(); - long sleepTime = Math.max(0, 1000 - (end - start)); + virtualExec.submit(() -> { + var configs = new HashMap<>(producerConfigs.get(clusterKey)); + String clientId = "console-datagen-producer-" + groupNumber; + configs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); - log.debugf("Produced %d messages in %dms, sleeping %dms", rate, end - start, sleepTime); + MDC.put(CLUSTER_NAME_KEY, clusterKey); + MDC.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - log.warn("Producer thread was interrupted, breaking from loop"); - Thread.currentThread().interrupt(); - break; - } - } + try (Producer producer = new KafkaProducer<>(producerConfigs.get(clusterKey))) { + while (running.get()) { + produce(clusterKey, producer, topics); + } + } catch (Exception e) { + log.warnf(e, "Error producing: %s", e.getMessage()); + } + + log.infof("Run loop complete for producer %d on cluster %s", groupNumber, clusterKey); + }); + + virtualExec.submit(() -> { + var configs = new HashMap<>(consumerConfigs.get(clusterKey)); + String groupId = "console-datagen-group-" + groupNumber; + String clientId = "console-datagen-consumer-" + groupNumber; + + configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); + + MDC.put(CLUSTER_NAME_KEY, clusterKey); + MDC.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); + + try (Consumer consumer = new KafkaConsumer<>(configs)) { + consumer.subscribe(topics); + + while (running.get()) { + consumer.poll(Duration.ofSeconds(2)).forEach(rec -> consume(clusterKey, rec)); + } + } catch (Exception e) { + log.warnf(e, "Error consuming: %s", e.getMessage()); + } + + log.infof("Run loop complete for consumer group %s", groupId); + }); + }); + }); }); } - void stop(@Observes Shutdown shutdownEvent /* NOSONAR */) { + void stop(@Observes Shutdown shutdownEvent /* NOSONAR */) throws Exception { running.set(false); + + adminClients.forEach((clusterKey, client) -> { + log.infof("Stopping Admin client for cluster %s...", clusterKey); + client.close(); + log.infof("Admin client for cluster %s closed", clusterKey); + }); + + virtualExec.shutdown(); + virtualExec.awaitTermination(10, TimeUnit.SECONDS); } - @Incoming("timestamps") - public void consume(ConsumerRecord rec) { - if (++recordsConsumed 
% 10_000 == 0) { - log.infof("Consumed %d records, latest is partition %d, offset %d", - recordsConsumed, rec.partition(), rec.offset()); - - var partition = new TopicPartition(rec.topic(), rec.partition()); - var earliest = getOffset(partition, OffsetSpec.earliest()); - var latest = getOffset(partition, OffsetSpec.latest()); - - CompletableFuture.allOf(earliest, latest) - .thenCompose(nothing -> { - long diff = latest.join() - earliest.join(); - - if (diff >= 1_000_000) { - log.infof("Offset diff is %d, truncating topic %s, partition %d to offset %d", - diff, rec.topic(), rec.partition(), rec.offset()); - // Truncate the topic to the up to the previous offset - return adminClient.deleteRecords(Map.of(new TopicPartition(rec.topic(), rec.partition()), - RecordsToDelete.beforeOffset(rec.offset()))) - .all() - .toCompletionStage(); - } else { - log.infof("Offset diff is %d for topic %s, partition %d at offset %d", - diff, rec.topic(), rec.partition(), rec.offset()); - return CompletableFuture.completedStage(null); + void initialize(Admin adminClient, List topics, int partitionsPerTopic) { + List dataGenGroups = adminClient.listConsumerGroups() + .all() + .toCompletionStage() + .toCompletableFuture() + .join() + .stream() + .map(ConsumerGroupListing::groupId) + .filter(name -> name.startsWith("console-datagen-group-")) + .toList(); + + adminClient.deleteConsumerGroups(dataGenGroups) + .all() + .toCompletionStage() + .exceptionally(error -> { + log.warnf(error, "Error deleting consumer groups: %s", error.getMessage()); + return null; + }) + .toCompletableFuture() + .join(); + + adminClient.deleteTopics(topics) + .all() + .toCompletionStage() + .exceptionally(error -> { + log.warnf(error, "Error deleting topics: %s", error.getMessage()); + return null; + }) + .toCompletableFuture() + .join(); + + var newTopics = topics.stream() + .map(t -> new NewTopic(t, partitionsPerTopic, (short) 3) + .configs(Map.of( + // 10 MiB + "segment.bytes", Integer.toString(10 * 1024 * 1024), + // 10 minutes + "segment.ms", Long.toString(TimeUnit.MINUTES.toMillis(10))))) + .toList(); + + log.debugf("Creating topics: %s", topics); + + var pending = adminClient.createTopics(newTopics) + .values() + .values() + .stream() + .map(KafkaFuture::toCompletionStage) + .map(CompletionStage::toCompletableFuture) + .toArray(CompletableFuture[]::new); + + CompletableFuture.allOf(pending) + .thenRun(() -> LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5))) + .thenRun(() -> log.infof("Topics created: %s", topics)) + .join(); + } + + void produce(String clusterKey, Producer producer, List topics) { + byte[] buffer = new byte[1000]; + int t = 0; + long start = System.currentTimeMillis(); + long rate = 100 * ((start / 10000) % 5) + 10; + + + for (int i = 0; i < rate; i++) { + if (!running.get()) { + return; + } + + generator.nextBytes(buffer); + + byte[] value = Json.createObjectBuilder() + .add("timestamp", Instant.now().toString()) + .add("payload", Base64.getEncoder().encodeToString(buffer)) + .build() + .toString() + .getBytes(); + + String topic = topics.get(t++ % topics.size()); + + complete(producer.send(new ProducerRecord<>(topic, value))) + .thenAccept(meta -> { + TopicPartition topicPartition = new TopicPartition(meta.topic(), meta.partition()); + var currentCount = incrementAndGet(recordsProduced, clusterKey, topicPartition); + + if (currentCount % 5_000 == 0) { + log.infof("Produced %d records to %s/%s (since startup)", currentCount, clusterKey, topicPartition); } }) - .join(); + .exceptionally(error -> { + log.warnf(error, 
"Error producing record: %s", error.getMessage()); + return null; + }); + } + + long end = System.currentTimeMillis(); + long sleepTime = Math.max(0, 1000 - (end - start)); + + log.debugf("Produced %d messages in %dms, sleeping %dms", rate, end - start, sleepTime); + if (running.get()) { + LockSupport.parkUntil(System.currentTimeMillis() + sleepTime); + } + } + + public void consume(String clusterKey, ConsumerRecord rec) { + TopicPartition topicPartition = new TopicPartition(rec.topic(), rec.partition()); + var currentCount = incrementAndGet(recordsConsumed, clusterKey, topicPartition); + + if (currentCount % 5_000 == 0) { + log.infof("Consumed %d records from partition %s, latest is offset %d", + currentCount, topicPartition, rec.offset()); + maybeDeleteRecords(adminClients.get(clusterKey), topicPartition, rec.offset()); } } - CompletableFuture getOffset(TopicPartition partition, OffsetSpec spec) { + long incrementAndGet(Map> counters, String clusterKey, TopicPartition topicPartition) { + return counters + .computeIfAbsent(clusterKey, k -> new ConcurrentHashMap<>()) + .compute(topicPartition, (k, v) -> v == null ? 1 : v + 1); + } + + void maybeDeleteRecords(Admin adminClient, TopicPartition topicPartition, long offset) { + var earliest = getOffset(adminClient, topicPartition, OffsetSpec.earliest()); + var latest = getOffset(adminClient, topicPartition, OffsetSpec.latest()); + + CompletableFuture.allOf(earliest, latest) + .thenComposeAsync(nothing -> { + long diff = latest.join() - earliest.join(); + + if (diff >= 5_000) { + log.infof("Offset diff is %d, truncating topic %s, partition %d to offset %d", + diff, topicPartition.topic(), topicPartition.partition(), offset); + // Truncate the topic to the up to the previous offset + var recordsToDelete = Map.of(topicPartition, RecordsToDelete.beforeOffset(offset)); + return adminClient.deleteRecords(recordsToDelete) + .all() + .toCompletionStage(); + } else { + log.debugf("Offset diff is %d for topic %s, partition %d at offset %d", + diff, topicPartition.topic(), topicPartition.partition(), offset); + return CompletableFuture.completedStage(null); + } + }, virtualExec) + .join(); + } + + CompletableFuture getOffset(Admin adminClient, TopicPartition partition, OffsetSpec spec) { return adminClient.listOffsets(Map.of(partition, spec)) .partitionResult(partition) .toCompletionStage() .thenApply(ListOffsetsResultInfo::offset) .toCompletableFuture(); } + + static CompletableFuture complete(Future future) { + return CompletableFuture.supplyAsync(() -> { + try { + return future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new CompletionException(e); + } catch (ExecutionException e) { + throw new CompletionException(e.getCause()); + } + }, virtualExec); + } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 750c76a..a2445e0 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,14 +1,21 @@ -mp.messaging.outgoing.timestamps-out.connector=smallrye-kafka -mp.messaging.outgoing.timestamps-out.topic=__console_datagen_timestamps +quarkus.devservices.enabled=false +quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %X [%c{3.}] (%t) %s%e%n +# Noisy logger in Kafka client +quarkus.log.category."org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin".level=WARN -mp.messaging.incoming.timestamps.connector=smallrye-kafka 
-mp.messaging.incoming.timestamps.topic=__console_datagen_timestamps
+datagen.enabled=true
+datagen.security.trust-certificates=true
+datagen.consumer-groups=1
+datagen.topics-per-consumer=1
+datagen.partitions-per-topic=1
+datagen.topic-name-template=console_datagen_%03d-%s
-kafka.bootstrap.servers=REQUIRED
-kafka.sasl.jaas.config=REQUIRED
-kafka.ssl.truststore.certificates=REQUIRED
+kafka.sasl.login.refresh.min.period.seconds=60
+kafka.sasl.login.refresh.buffer.seconds=60
-kafka.ssl.truststore.type=PEM
-kafka.security.protocol=SASL_SSL
-kafka.sasl.mechanism=OAUTHBEARER
-kafka.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
+kafka.allow.auto.create.topics=false
+kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
+kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
+kafka.auto.offset.reset=earliest
+kafka.key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
+kafka.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
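
Configuration sketch (illustrative, not part of the patch): the cluster keys, display names, and
bootstrap addresses below are placeholders. The sketch assumes the map injection behind
@ConfigProperty(name = "datagen.kafka") registers one cluster per datagen.kafka.<key> entry, and
that per-cluster client overrides follow the lookup in ClientConfigFactory.getClusterConfig
(datagen.kafka.<key>.<kafka property>), with everything else falling back to the shared kafka.*
defaults above; the unquote helper suggests quoted values are also accepted, so treat the exact
syntax as approximate.

    # scale of the generated load, read by DataGenerator
    datagen.consumer-groups=2
    datagen.topics-per-consumer=2
    datagen.partitions-per-topic=3

    # one entry per target cluster: <key> maps to the name used to key the produced client configs
    datagen.kafka.cluster-a=dev-cluster-a
    datagen.kafka.cluster-a.bootstrap.servers=kafka-a.example.com:9092

    datagen.kafka.cluster-b=dev-cluster-b
    datagen.kafka.cluster-b.bootstrap.servers=kafka-b.example.com:9093
    datagen.kafka.cluster-b.security.protocol=SSL

With a layout like this, DataGenerator starts datagen.consumer-groups producer/consumer pairs per
cluster on virtual threads, names topics from datagen.topic-name-template, and, for the SSL
listener, the combination of datagen.security.trust-certificates=true and a missing truststore
causes trustClusterCertificate to fetch and trust the broker certificate chain automatically.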