Commit 623607b
Load cluster configuration from YAML
Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar committed May 13, 2024
1 parent cd5abc3 commit 623607b
Showing 14 changed files with 337 additions and 85 deletions.
2 changes: 2 additions & 0 deletions api/pom.xml
@@ -322,6 +322,8 @@
</goals>
<configuration>
<systemProperties>
+ <quarkus.jacoco.reuse-data-file>true</quarkus.jacoco.reuse-data-file>
+ <quarkus.jacoco.report>false</quarkus.jacoco.report>
<quarkus.docker.dockerfile-jvm-path>src/main/docker/Dockerfile</quarkus.docker.dockerfile-jvm-path>
</systemProperties>
</configuration>
api/src/main/java/com/github/eyefloaters/console/api/ClientFactory.java
@@ -1,9 +1,13 @@
package com.github.eyefloaters.console.api;

+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.UncheckedIOException;
+ import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -42,6 +46,10 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;

+ import com.fasterxml.jackson.databind.ObjectMapper;
+ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+ import com.github.eyefloaters.console.api.config.ConsoleConfig;
+ import com.github.eyefloaters.console.api.config.KafkaClusterConfig;
import com.github.eyefloaters.console.api.service.KafkaClusterService;
import com.github.eyefloaters.console.api.support.TrustAllCertificateManager;

@@ -68,7 +76,6 @@
@ApplicationScoped
public class ClientFactory {

- static final String KAFKA_CONFIG_PREFIX = "console.kafka";
static final String NO_SUCH_KAFKA_MESSAGE = "Requested Kafka cluster %s does not exist or is not configured";
private final Function<String, NotFoundException> noSuchKafka =
clusterName -> new NotFoundException(NO_SUCH_KAFKA_MESSAGE.formatted(clusterName));
@@ -80,8 +87,8 @@ public class ClientFactory {
Config config;

@Inject
- @ConfigProperty(name = KAFKA_CONFIG_PREFIX)
- Optional<Map<String, String>> clusterNames;
+ @ConfigProperty(name = "console.config-path")
+ Optional<String> configPath;

@Inject
SharedIndexInformer<Kafka> kafkaInformer;
@@ -118,8 +125,29 @@ public class ClientFactory {
@Named("kafkaAdminFilter")
UnaryOperator<Admin> kafkaAdminFilter = UnaryOperator.identity();

- private Map<String, String> clusterNames() {
-     return clusterNames.orElseGet(Collections::emptyMap);
+ @Produces
+ @ApplicationScoped
+ public ConsoleConfig produceConsoleConfig() {
+     return configPath.map(Path::of)
+         .map(Path::toUri)
+         .map(uri -> {
+             try {
+                 return uri.toURL();
+             } catch (IOException e) {
+                 throw new UncheckedIOException(e);
+             }
+         })
+         .filter(Objects::nonNull)
+         .map(url -> {
+             ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+
+             try (InputStream stream = url.openStream()) {
+                 return mapper.readValue(stream, ConsoleConfig.class);
+             } catch (IOException e) {
+                 throw new UncheckedIOException(e);
+             }
+         })
+         .orElseGet(ConsoleConfig::new);
}

/**
@@ -153,7 +181,7 @@ public Supplier<Kafka> kafkaResourceSupplier() {

@Produces
@ApplicationScoped
- Map<String, Admin> getAdmins(Function<Map<String, Object>, Admin> adminBuilder) {
+ Map<String, Admin> getAdmins(ConsoleConfig consoleConfig, Function<Map<String, Object>, Admin> adminBuilder) {
final Map<String, Admin> adminClients = new HashMap<>();

kafkaInformer.addEventHandlerWithResyncPeriod(new ResourceEventHandler<Kafka>() {
@@ -168,12 +196,11 @@ public void onUpdate(Kafka oldKafka, Kafka newKafka) {
private void put(Kafka kafka, String eventType) {
String clusterKey = Cache.metaNamespaceKeyFunc(kafka);

-         clusterNames().entrySet()
-                 .stream()
-                 .filter(e -> clusterKey.equals(e.getValue()))
-                 .findFirst()
+         consoleConfig.getKafka()
+                 .getCluster(clusterKey)
                  .map(e -> {
-                     var configs = buildConfig(AdminClientConfig.configNames(), e.getKey(), "admin", kafka);
+                     var configs = buildConfig(AdminClientConfig.configNames(), e, "admin", e::getAdminProperties, kafka);

if (truststoreRequired(configs)) {
log.warnf("""
%s Admin client for Kafka cluster %s failed. Connection \
@@ -183,7 +210,11 @@ private void put(Kafka kafka, String eventType) {
.formatted(eventType, kafka.getStatus().getClusterId()));
return null;
} else {
logConfig("Admin[key=%s, id=%s]".formatted(e.getKey(), kafka.getStatus().getClusterId()), configs);
logConfig("Admin[name=%s, namespace=%s, id=%s]".formatted(
e.getName(),
e.getNamespace(),
kafka.getStatus().getClusterId()),
configs);
return adminBuilder.apply(configs);
}
})
@@ -236,31 +267,34 @@ public void adminClientDisposer(@Disposes Supplier<Admin> client, Map<String, Ad

@Produces
@RequestScoped
- public Supplier<Consumer<byte[], byte[]>> consumerSupplier(Supplier<Kafka> cluster) {
+ public Supplier<Consumer<byte[], byte[]>> consumerSupplier(ConsoleConfig consoleConfig, Supplier<Kafka> cluster) {
String clusterKey = Cache.metaNamespaceKeyFunc(cluster.get());

-         return clusterNames().entrySet()
-                 .stream()
-                 .filter(e -> clusterKey.equals(e.getValue()))
+         return consoleConfig.getKafka()
+                 .getCluster(clusterKey)
.<Supplier<Consumer<byte[], byte[]>>>map(e -> {

Set<String> configNames = ConsumerConfig.configNames().stream()
// Do not allow a group Id to be set for this application
.filter(Predicate.not(ConsumerConfig.GROUP_ID_CONFIG::equals))
.collect(Collectors.toSet());
-             var configs = buildConfig(configNames, e.getKey(), "consumer", cluster.get());

+             var configs = buildConfig(configNames, e, "consumer", e::getConsumerProperties, cluster.get());
configs.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 50_000);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000);

logConfig("Consumer[" + e.getKey() + ']', configs);
logConfig("Consumer[name=%s, namespace=%s]".formatted(
e.getName(),
e.getNamespace()),
configs);
@SuppressWarnings("resource") // no resource leak - client closed by disposer
Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs);
return () -> client;
})
-             .findFirst()
.orElseThrow(() -> noSuchKafka.apply(cluster.get().getStatus().getClusterId()));
}

@@ -270,39 +304,46 @@ public void consumerDisposer(@Disposes Supplier<Consumer<byte[], byte[]>> consum

@Produces
@RequestScoped
- public Supplier<Producer<String, String>> producerSupplier(Supplier<Kafka> cluster) {
+ public Supplier<Producer<String, String>> producerSupplier(ConsoleConfig consoleConfig, Supplier<Kafka> cluster) {
String clusterKey = Cache.metaNamespaceKeyFunc(cluster.get());

-         return clusterNames().entrySet()
-                 .stream()
-                 .filter(e -> clusterKey.equals(e.getValue()))
+         return consoleConfig.getKafka()
+                 .getCluster(clusterKey)
.<Supplier<Producer<String, String>>>map(e -> {
-             var configs = buildConfig(ProducerConfig.configNames(), e.getKey(), "producer", cluster.get());
+             var configs = buildConfig(ProducerConfig.configNames(), e, "producer", e::getProducerProperties, cluster.get());
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.ACKS_CONFIG, "all");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
configs.put(ProducerConfig.RETRIES_CONFIG, 0);

logConfig("Producer[" + e.getKey() + ']', configs);
logConfig("Producer[name=%s, namespace=%s]".formatted(
e.getName(),
e.getNamespace()),
configs);
@SuppressWarnings("resource") // no resource leak - client closed by disposer
Producer<String, String> client = new KafkaProducer<>(configs);
return () -> client;
})
-             .findFirst()
.orElseThrow(() -> noSuchKafka.apply(cluster.get().getStatus().getClusterId()));
}

public void producerDisposer(@Disposes Supplier<Producer<String, String>> producer) {
producer.get().close();
}

- Map<String, Object> buildConfig(Set<String> configNames, String clusterKey, String clientType, Kafka cluster) {
+ Map<String, Object> buildConfig(Set<String> configNames,
+         KafkaClusterConfig config,
+         String clientType,
+         Supplier<Map<String, String>> clientProperties,
+         Kafka cluster) {

Map<String, Object> cfg = configNames
.stream()
-             .map(configName -> getClusterConfig(clusterKey, clientType, configName)
-                     .or(() -> getDefaultConfig(clusterKey, clientType, configName))
+             .map(configName -> Optional.ofNullable(clientProperties.get().get(configName))
+                     .or(() -> Optional.ofNullable(config.getProperties().get(configName)))
+                     .or(() -> getDefaultConfig(clientType, configName))
.map(configValue -> Map.entry(configName, configValue)))
.filter(Optional::isPresent)
.map(Optional::get)
@@ -316,10 +357,17 @@ Map<String, Object> buildConfig(Set<String> configNames, String clusterKey, Stri
.map(KafkaStatus::getListeners)
.map(Collection::stream)
.orElseGet(Stream::empty)
-             .filter(listener -> cfg.getOrDefault(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "")
-                     .toString()
-                     .contains(listener.getBootstrapServers()))
+             .filter(listener -> {
+                 if (listener.getName().equals(config.getListener())) {
+                     return true;
+                 }
+
+                 return cfg.getOrDefault(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "")
+                         .toString()
+                         .contains(listener.getBootstrapServers());
+             })
.map(ListenerStatus::getCertificates)
+             .filter(Objects::nonNull)
.filter(Predicate.not(Collection::isEmpty))
.findFirst()
.ifPresent(certificates -> {
@@ -332,28 +380,13 @@ Map<String, Object> buildConfig(Set<String> configNames, String clusterKey, Stri
return cfg;
}

- Optional<String> getClusterConfig(String clusterKey, String clientType, String configName) {
-     String clientSpecificKey = "%s.%s.%s.%s".formatted(KAFKA_CONFIG_PREFIX, clusterKey, clientType, configName);
-     String generalKey = "%s.%s.%s".formatted(KAFKA_CONFIG_PREFIX, clusterKey, configName);
-
-     return config.getOptionalValue(clientSpecificKey, String.class)
-             .or(() -> config.getOptionalValue(generalKey, String.class))
-             .map(cfg -> {
-                 log.tracef("OVERRIDE config %s for cluster %s", configName, clusterKey);
-                 return unquote(cfg);
-             });
- }
-
- Optional<String> getDefaultConfig(String clusterKey, String clientType, String configName) {
-     String clientSpecificKey = "kafka.%s.%s".formatted(clientType, configName);
-     String generalKey = "kafka.%s".formatted(configName);
+ Optional<String> getDefaultConfig(String clientType, String configName) {
+     String clientSpecificKey = "console.kafka.%s.%s".formatted(clientType, configName);
+     String generalKey = "console.kafka.%s".formatted(configName);

      return config.getOptionalValue(clientSpecificKey, String.class)
              .or(() -> config.getOptionalValue(generalKey, String.class))
-             .map(cfg -> {
-                 log.tracef("DEFAULT config %s for cluster %s", configName, clusterKey);
-                 return unquote(cfg);
-             });
+             .map(this::unquote);
}

String unquote(String cfg) {
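For orientation: with this change, per-cluster settings come from the YAML file, while getDefaultConfig still resolves cluster-independent fallbacks from MicroProfile Config using the two key patterns above. A minimal application.properties sketch following those patterns (the property values are illustrative, not part of this commit):

    # client-type-specific default, consulted first
    console.kafka.admin.default.api.timeout.ms=10000
    # general default, applies to every client type
    console.kafka.security.protocol=SSL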
api/src/main/java/com/github/eyefloaters/console/api/config/ConsoleConfig.java
@@ -0,0 +1,14 @@
package com.github.eyefloaters.console.api.config;

public class ConsoleConfig {

KafkaConfig kafka = new KafkaConfig();

public KafkaConfig getKafka() {
return kafka;
}

public void setKafka(KafkaConfig kafka) {
this.kafka = kafka;
}
}
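KafkaConfig itself is not among the files loaded on this page. From its usage in ClientFactory (getCluster(clusterKey) returning an Optional) and from KafkaClusterConfig.clusterKey() below, a plausible sketch is the following; this is inferred from the diff for illustration, not code from the commit:

    package com.github.eyefloaters.console.api.config;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Optional;

    public class KafkaConfig {

        List<KafkaClusterConfig> clusters = new ArrayList<>();

        public List<KafkaClusterConfig> getClusters() {
            return clusters;
        }

        public void setClusters(List<KafkaClusterConfig> clusters) {
            this.clusters = clusters;
        }

        // Assumed to match the "namespace/name" key produced by
        // Cache.metaNamespaceKeyFunc in ClientFactory.
        public Optional<KafkaClusterConfig> getCluster(String clusterKey) {
            return clusters.stream()
                    .filter(cluster -> cluster.clusterKey().equals(clusterKey))
                    .findFirst();
        }
    }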
api/src/main/java/com/github/eyefloaters/console/api/config/KafkaClusterConfig.java
@@ -0,0 +1,88 @@
package com.github.eyefloaters.console.api.config;

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonIgnore;

public class KafkaClusterConfig {

private String name;
private String namespace;
private String listener;
private boolean readOnly;
private Map<String, String> properties = new HashMap<>();
private Map<String, String> adminProperties = new HashMap<>();
private Map<String, String> consumerProperties = new HashMap<>();
private Map<String, String> producerProperties = new HashMap<>();

@JsonIgnore
public String clusterKey() {
return "%s/%s".formatted(namespace, name);
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getNamespace() {
return namespace;
}

public void setNamespace(String namespace) {
this.namespace = namespace;
}

public String getListener() {
return listener;
}

public void setListener(String listener) {
this.listener = listener;
}

public boolean isReadOnly() {
return readOnly;
}

public void setReadOnly(boolean readOnly) {
this.readOnly = readOnly;
}

public Map<String, String> getProperties() {
return properties;
}

public void setProperties(Map<String, String> properties) {
this.properties = properties;
}

public Map<String, String> getAdminProperties() {
return adminProperties;
}

public void setAdminProperties(Map<String, String> adminProperties) {
this.adminProperties = adminProperties;
}

public Map<String, String> getConsumerProperties() {
return consumerProperties;
}

public void setConsumerProperties(Map<String, String> consumerProperties) {
this.consumerProperties = consumerProperties;
}

public Map<String, String> getProducerProperties() {
return producerProperties;
}

public void setProducerProperties(Map<String, String> producerProperties) {
this.producerProperties = producerProperties;
}

}
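Putting the pieces together: the file referenced by the console.config-path property is deserialized into ConsoleConfig, so a configuration file for a single cluster would look roughly like the YAML below. This is a sketch assuming KafkaConfig exposes its clusters as a clusters list; cluster names and property values are illustrative:

    kafka:
      clusters:
        - name: my-cluster            # metadata.name of the Kafka CR
          namespace: my-namespace     # metadata.namespace of the Kafka CR
          listener: secure            # listener preferred when selecting trusted certificates
          readOnly: false
          properties:                 # defaults shared by all client types
            security.protocol: SSL
          adminProperties: {}         # overrides for Admin clients
          consumerProperties: {}      # overrides for Consumers
          producerProperties: {}      # overrides for Producers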
(Diffs for the remaining 10 changed files are not shown.)
