Improve detection and logging of invalid/missing configuration
Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar committed Oct 16, 2024
1 parent 815173d commit d9028c8
Showing 5 changed files with 205 additions and 146 deletions.
135 changes: 83 additions & 52 deletions api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
@@ -8,6 +8,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -297,7 +298,7 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
KafkaClusterConfig clusterConfig,
Optional<Kafka> kafkaResource,
Function<Map<String, Object>, Admin> adminBuilder,
boolean replace) {
boolean allowReplacement) {

var adminConfigs = buildConfig(AdminClientConfig.configNames(),
clusterConfig,
@@ -330,38 +331,29 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
clientConfigs.put(Consumer.class, Collections.unmodifiableMap(consumerConfigs));
clientConfigs.put(Producer.class, Collections.unmodifiableMap(producerConfigs));

Admin admin = null;

if (establishGlobalConnection(adminConfigs)) {
admin = adminBuilder.apply(adminConfigs);
}

String clusterKey = clusterConfig.clusterKey();
String clusterId = clusterId(clusterConfig, kafkaResource);

if (!replace && contexts.containsKey(clusterId)) {
if (contexts.containsKey(clusterId) && !allowReplacement) {
log.warnf("""
Ignoring duplicate Kafka cluster id: %s for cluster %s. Cluster id values in \
configuration must be unique and may not match id values of \
clusters discovered using Strimzi Kafka Kubernetes API resources.""", clusterId, clusterKey);
} else {
boolean truststoreNowRequired = truststoreRequired(adminConfigs);
} else if (validConfigs(kafkaResource, clusterConfig, clientConfigs)) {
Admin admin = null;

if (resourceStatusDroppedCertificates(contexts.get(clusterId), kafkaResource, truststoreNowRequired)) {
log.warnf("""
Ignoring update to Kafka custom resource %s. Connection requires \
trusted certificate which is no longer available.""", clusterKey);
} else {
if (truststoreNowRequired && kafkaResource.isPresent()) {
log.warnf("""
Connection requires trusted certificate(s) which are not present \
in the Kafka CR status of resource %s.""", clusterKey);
}
if (establishGlobalConnection(adminConfigs)) {
admin = adminBuilder.apply(adminConfigs);
}

KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin);
log.infof("%s KafkaContext for cluster %s, id=%s", replace ? "Replacing" : "Adding", clusterKey, clusterId);
KafkaContext previous = contexts.put(clusterId, ctx);
Optional.ofNullable(previous).ifPresent(KafkaContext::close);
KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin);
KafkaContext previous = contexts.put(clusterId, ctx);

if (previous == null) {
log.infof("Added KafkaContext for cluster %s, id=%s", clusterKey, clusterId);
} else {
log.infof("Replaced KafkaContext for cluster %s, id=%s", clusterKey, clusterId);
previous.close();
}
}
}
@@ -376,19 +368,6 @@ String clusterId(KafkaClusterConfig clusterConfig, Optional<Kafka> kafkaResource
.orElseGet(clusterConfig::getName);
}

/**
* Checks whether the previous KafkaContext contained TLS trusted certificates, but due to them being
* removed from the Strimzi Kafka CR being in a transient state, they are no longer present. We will ignore
* this update and keep the old KafkaContext.
*/
boolean resourceStatusDroppedCertificates(KafkaContext context, Optional<Kafka> kafkaResource, boolean truststoreNowRequired) {
if (!truststoreNowRequired || context == null || kafkaResource.isEmpty()) {
return false;
}

return !truststoreRequired(context.configs(Admin.class));
}

Optional<Kafka> cachedKafkaResource(KafkaClusterConfig clusterConfig) {
return clusterConfig.hasNamespace() ? kafkaInformer.map(SharedIndexInformer::getStore)
.map(store -> store.getByKey(clusterConfig.clusterKey()))
@@ -405,6 +384,45 @@ Optional<Kafka> cachedKafkaResource(KafkaClusterConfig clusterConfig) {
}) : Optional.empty();
}

boolean validConfigs(Optional<Kafka> kafkaResource, KafkaClusterConfig clusterConfig, Map<Class<?>, Map<String, Object>> clientConfigs) {
boolean createContext = true;
boolean missingListenerStatus = kafkaResource.isPresent()
&& getListenerStatus(kafkaResource, clusterConfig.getListener()).isEmpty();
StringBuilder clientsMessage = new StringBuilder();

if (missingListenerStatus) {
clientsMessage.append("; Kafka resource has no status for listener '%s', bootstrap servers and trusted certificates could not be derived"
.formatted(clusterConfig.getListener()));
}

for (Map.Entry<Class<?>, Map<String, Object>> client : clientConfigs.entrySet()) {
Class<?> clientType = client.getKey();
Set<String> missing = findMissingRequiredConfigs(client.getValue());

if (!missing.isEmpty()) {
clientsMessage.append("; %s client is missing required properties: %s"
.formatted(clientType.getSimpleName(), missing));
createContext = false;
}

if (truststoreRequired(client.getValue())) {
clientsMessage.append("""
; %s client is a secure/SSL connection, but no truststore configuration is available. \
The connection may fail if the Kafka cluster is using an untrusted certificate""".formatted(clientType.getSimpleName()));
}
}

if (createContext) {
clientsMessage.insert(0, "Some configuration may be missing for connection to cluster %s, connection attempts may fail".formatted(clusterConfig.clusterKey()));
log.warn(clientsMessage.toString().trim());
} else {
clientsMessage.insert(0, "Missing configuration detected for connection to cluster %s, no connection will be setup".formatted(clusterConfig.clusterKey()));
log.error(clientsMessage.toString().trim());
}

return createContext;
}

Map<String, Object> requiredAdminConfig() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000);
@@ -450,6 +468,20 @@ static boolean establishGlobalConnection(Map<String, Object> configs) {
return false;
}

static Set<String> findMissingRequiredConfigs(Map<String, Object> configs) {
Set<String> missing = new LinkedHashSet<>();

if (!configs.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) {
missing.add(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
}

if (configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG) && !configs.containsKey(SaslConfigs.SASL_MECHANISM)) {
missing.add(SaslConfigs.SASL_MECHANISM);
}

return missing;
}

void disposeKafkaContexts(@Disposes Map<String, KafkaContext> contexts) {
log.infof("Closing all known KafkaContexts");

@@ -579,12 +611,7 @@ Map<String, Object> buildConfig(Set<String> configNames,
.filter(listener -> listener.getName().equals(config.getListener()))
.findFirst();

var listenerStatus = cluster.map(Kafka::getStatus)
.map(KafkaStatus::getListeners)
.map(Collection::stream)
.orElseGet(Stream::empty)
.filter(listener -> listener.getName().equals(config.getListener()))
.findFirst();
var listenerStatus = getListenerStatus(cluster, config.getListener());

listenerSpec.ifPresent(listener -> applyListenerConfiguration(cfg, listener));

@@ -600,15 +627,10 @@ Map<String, Object> buildConfig(Set<String> configNames,
cfg.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, certificates);
});

if (truststoreRequired(cfg)) {
if (cfg.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) && trustManager.isResolvable()) {
trustManager.get().trustClusterCertificate(cfg);
} else {
log.warnf("""
Failed to set configuration for %s client to Kafka cluster %s. Connection \
requires truststore which could not be obtained from the Kafka resource status."""
.formatted(clientType, config.clusterKey()));
}
if (truststoreRequired(cfg) && trustManager.isResolvable()
&& cfg.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) {
// trustManager is only resolvable in 'dev' mode
trustManager.get().trustClusterCertificate(cfg);
}

cfg.putAll(overrideProperties);
Expand All @@ -622,6 +644,15 @@ Map<String, Object> buildConfig(Set<String> configNames,
return cfg;
}

Optional<ListenerStatus> getListenerStatus(Optional<Kafka> cluster, String listenerName) {
return cluster.map(Kafka::getStatus)
.map(KafkaStatus::getListeners)
.map(Collection::stream)
.orElseGet(Stream::empty)
.filter(listener -> listener.getName().equals(listenerName))
.findFirst();
}

private void applyListenerConfiguration(Map<String, Object> cfg, GenericKafkaListener listener) {
var authType = Optional.ofNullable(listener.getAuth())
.map(KafkaListenerAuthentication::getType)
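Note: the following is an illustrative, self-contained sketch (not part of the commit) of the two validation rules the new findMissingRequiredConfigs() helper applies, as visible in the diff above: bootstrap.servers is always required, and sasl.mechanism becomes required once sasl.jaas.config is set. The class name MissingConfigSketch and the sample JAAS value are hypothetical; the config constants come from the kafka-clients library.

import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;

public class MissingConfigSketch {

    // Mirrors the checks added in ClientFactory.findMissingRequiredConfigs(...)
    static Set<String> findMissingRequiredConfigs(Map<String, Object> configs) {
        Set<String> missing = new LinkedHashSet<>();

        // bootstrap.servers is always required
        if (!configs.containsKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) {
            missing.add(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
        }

        // sasl.mechanism is required whenever sasl.jaas.config is present
        if (configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG)
                && !configs.containsKey(SaslConfigs.SASL_MECHANISM)) {
            missing.add(SaslConfigs.SASL_MECHANISM);
        }

        return missing;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // Hypothetical client config: JAAS is set, but bootstrap.servers and sasl.mechanism are not
        configs.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required;");

        // Prints [bootstrap.servers, sasl.mechanism]; in validConfigs() a non-empty result
        // is logged as an error and no KafkaContext is created for that cluster.
        System.out.println(findMissingRequiredConfigs(configs));
    }
}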