Apicurio Registry integration with Avro/Protobuf ser/des support (#1109)
* Apicurio Registry integration with Avro+Protobuf ser/des support
* Minimal UI changes to display encoded key/value types
* Add registry configuration to CRD, map to console YAML secret
* Add error metadata for scenarios where the key/value cannot be decoded
* Refactor registry config to top level, referenced from the Kafka cluster config (see the configuration sketch below)
* Improve config model validation, add tests
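
The top-level registry layout referenced above can be pictured roughly as the following console YAML sketch. The field names (`schemaRegistries`, `name`, `url`, `schemaRegistry`) are assumptions inferred from the accessors introduced in this commit (`ConsoleConfig#getSchemaRegistries`, `SchemaRegistryConfig#getName`, `KafkaClusterConfig#getSchemaRegistry`), not a documented format.

```yaml
# Illustrative sketch only; field names are assumptions inferred from the
# config accessors touched in this commit, not a documented schema.
schemaRegistries:
  # Registries are declared once at the top level of the console config...
  - name: my-apicurio-registry
    url: http://apicurio-registry.example.svc:8080/apis/registry/v2
kafka:
  clusters:
    - name: my-cluster
      namespace: my-namespace
      # ...and referenced by name from each Kafka cluster entry.
      schemaRegistry: my-apicurio-registry
```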

---------

Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar authored Oct 28, 2024
1 parent cf5ba52 commit 87ee6b0
Showing 50 changed files with 2,714 additions and 438 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -31,7 +31,7 @@ kubectl patch deployment -n ingress-nginx ingress-nginx-controller \
```

### Prerequisites
#### Apache Kafka<sup>®</sup>
#### Apache Kafka®
The instructions below assume an existing Apache Kafka<sup>®</sup> cluster is available to use from the console. We recommend using [Strimzi](https://strimzi.io) to create and manage your Apache Kafka<sup>®</sup> clusters - plus the console provides additional features and insights for Strimzi Apache Kafka<sup>®</sup> clusters.

If you already have Strimzi installed but would like to create an Apache Kafka<sup>®</sup> cluster for use with the console, example deployment resources are available to get started. The resources create an Apache Kafka<sup>®</sup> cluster in KRaft mode with SCRAM-SHA-512 authentication, a Strimzi `KafkaNodePool` resource to manage the cluster nodes, and a Strimzi `KafkaUser` resource that may be used to connect to the cluster.
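
As a concrete illustration of the paragraph above, a minimal Strimzi `KafkaUser` requesting SCRAM-SHA-512 authentication might look like the following sketch; the names are placeholders rather than the repository's actual example resources.

```yaml
# Hypothetical sketch; resource and cluster names are placeholders.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: console-kafka-user
  labels:
    # Ties the user to the Strimzi-managed Kafka cluster of the same name.
    strimzi.io/cluster: console-kafka
spec:
  authentication:
    type: scram-sha-512
```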
26 changes: 22 additions & 4 deletions api/pom.xml
@@ -58,15 +58,15 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive</artifactId>
<artifactId>quarkus-rest</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
<artifactId>quarkus-rest-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-reactive</artifactId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
@@ -84,6 +84,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-avro</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.common</groupId>
<artifactId>smallrye-common-annotation</artifactId>
@@ -109,14 +113,27 @@
<artifactId>kafka-oauth-client</artifactId>
</dependency>

<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-protobuf-serde</artifactId>
<version>${apicurio-registry.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>io.xlate</groupId>
<artifactId>validators</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -358,6 +375,7 @@
<systemProperties>
<keycloak.image>${keycloak.image}</keycloak.image>
<strimzi.test-container.kafka.custom.image>${strimzi-kafka.tag}</strimzi.test-container.kafka.custom.image>
<apicurio-registry.version>${apicurio-registry.version}</apicurio-registry.version>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<maven.home>${maven.home}</maven.home>
<quarkus.jacoco.reuse-data-file>true</quarkus.jacoco.reuse-data-file>
153 changes: 47 additions & 106 deletions api/src/main/java/com/github/streamshub/console/api/ClientFactory.java
@@ -1,9 +1,5 @@
package com.github.streamshub.console.api;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
@@ -15,7 +11,6 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -56,19 +51,18 @@
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.github.streamshub.console.api.service.KafkaClusterService;
import com.github.streamshub.console.api.support.Holder;
import com.github.streamshub.console.api.support.KafkaContext;
import com.github.streamshub.console.api.support.TrustAllCertificateManager;
import com.github.streamshub.console.api.support.ValidationProxy;
import com.github.streamshub.console.api.support.serdes.RecordData;
import com.github.streamshub.console.config.ConsoleConfig;
import com.github.streamshub.console.config.KafkaClusterConfig;
import com.github.streamshub.console.config.SchemaRegistryConfig;

import io.apicurio.registry.serde.SerdeConfig;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
@@ -123,21 +117,14 @@ public class ClientFactory {
Config config;

@Inject
ScheduledExecutorService scheduler;
ObjectMapper mapper;

@Inject
@ConfigProperty(name = "console.config-path")
Optional<String> configPath;
ConsoleConfig consoleConfig;

@Inject
Holder<SharedIndexInformer<Kafka>> kafkaInformer;

@Inject
KafkaClusterService kafkaClusterService;

@Inject
ValidationProxy validationService;

@Inject
Instance<TrustAllCertificateManager> trustManager;

@@ -169,54 +156,12 @@

@Produces
@ApplicationScoped
public ConsoleConfig produceConsoleConfig() {
return configPath.map(Path::of)
.map(Path::toUri)
.map(uri -> {
try {
return uri.toURL();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
})
.filter(Objects::nonNull)
.map(url -> {
log.infof("Loading console configuration from %s", url);

ObjectMapper mapper = new ObjectMapper(new YAMLFactory());

try (InputStream stream = url.openStream()) {
return mapper.readValue(stream, ConsoleConfig.class);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
})
.map(consoleConfig -> {
consoleConfig.getKafka().getClusters().stream().forEach(cluster -> {
resolveValues(cluster.getProperties());
resolveValues(cluster.getAdminProperties());
resolveValues(cluster.getProducerProperties());
resolveValues(cluster.getConsumerProperties());
});

return consoleConfig;
})
.map(validationService::validate)
.orElseGet(() -> {
log.warn("Console configuration has not been specified using `console.config-path` property");
return new ConsoleConfig();
});
}

@Produces
@ApplicationScoped
Map<String, KafkaContext> produceKafkaContexts(ConsoleConfig consoleConfig,
Function<Map<String, Object>, Admin> adminBuilder) {
Map<String, KafkaContext> produceKafkaContexts(Function<Map<String, Object>, Admin> adminBuilder) {

final Map<String, KafkaContext> contexts = new ConcurrentHashMap<>();

if (kafkaInformer.isPresent()) {
addKafkaEventHandler(contexts, consoleConfig, adminBuilder);
addKafkaEventHandler(contexts, adminBuilder);
}

// Configure clusters that will not be configured by events
@@ -234,7 +179,6 @@ Map<String, KafkaContext> produceKafkaContexts(ConsoleConfig consoleConfig,
}

void addKafkaEventHandler(Map<String, KafkaContext> contexts,
ConsoleConfig consoleConfig,
Function<Map<String, Object>, Admin> adminBuilder) {

kafkaInformer.get().addEventHandlerWithResyncPeriod(new ResourceEventHandler<Kafka>() {
@@ -278,7 +222,7 @@ public void onDelete(Kafka kafka, boolean deletedFinalStateUnknown) {
findConfig(kafka).ifPresentOrElse(
clusterConfig -> {
String clusterKey = clusterConfig.clusterKey();
String clusterId = clusterId(clusterConfig, Optional.of(kafka));
String clusterId = KafkaContext.clusterId(clusterConfig, Optional.of(kafka));
log.infof("Removing KafkaContext for cluster %s, id=%s", clusterKey, clusterId);
log.debugf("Known KafkaContext identifiers: %s", contexts.keySet());
KafkaContext previous = contexts.remove(clusterId);
@@ -332,7 +276,7 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
clientConfigs.put(Producer.class, Collections.unmodifiableMap(producerConfigs));

String clusterKey = clusterConfig.clusterKey();
String clusterId = clusterId(clusterConfig, kafkaResource);
String clusterId = KafkaContext.clusterId(clusterConfig, kafkaResource);

if (contexts.containsKey(clusterId) && !allowReplacement) {
log.warnf("""
@@ -346,7 +290,19 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
admin = adminBuilder.apply(adminConfigs);
}

SchemaRegistryConfig registryConfig = null;

if (clusterConfig.getSchemaRegistry() != null) {
registryConfig = consoleConfig.getSchemaRegistries()
.stream()
.filter(registry -> registry.getName().equals(clusterConfig.getSchemaRegistry()))
.findFirst()
.orElseThrow();
}

KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin);
ctx.schemaRegistryClient(registryConfig, mapper);

KafkaContext previous = contexts.put(clusterId, ctx);

if (previous == null) {
@@ -362,12 +318,6 @@ boolean defaultedClusterId(KafkaClusterConfig clusterConfig, Optional<Kafka> kaf
return clusterConfig.getId() == null && kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId).isEmpty();
}

String clusterId(KafkaClusterConfig clusterConfig, Optional<Kafka> kafkaResource) {
return Optional.ofNullable(clusterConfig.getId())
.or(() -> kafkaResource.map(Kafka::getStatus).map(KafkaStatus::getClusterId))
.orElseGet(clusterConfig::getName);
}

Optional<Kafka> cachedKafkaResource(KafkaClusterConfig clusterConfig) {
return clusterConfig.hasNamespace() ? kafkaInformer.map(SharedIndexInformer::getStore)
.map(store -> store.getByKey(clusterConfig.clusterKey()))
@@ -413,8 +363,10 @@ boolean validConfigs(Optional<Kafka> kafkaResource, KafkaClusterConfig clusterCo
}

if (createContext) {
clientsMessage.insert(0, "Some configuration may be missing for connection to cluster %s, connection attempts may fail".formatted(clusterConfig.clusterKey()));
log.warn(clientsMessage.toString().trim());
if (clientsMessage.length() > 0) {
clientsMessage.insert(0, "Some configuration may be missing for connection to cluster %s, connection attempts may fail".formatted(clusterConfig.clusterKey()));
log.warn(clientsMessage.toString().trim());
}
} else {
clientsMessage.insert(0, "Missing configuration detected for connection to cluster %s, no connection will be setup".formatted(clusterConfig.clusterKey()));
log.error(clientsMessage.toString().trim());
@@ -438,6 +390,7 @@ Map<String, Object> requiredConsumerConfig() {
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 50_000);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000);
configs.put(SerdeConfig.ENABLE_HEADERS, "true");
return configs;
}

@@ -552,26 +505,31 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, Kafk

@Produces
@RequestScoped
public Supplier<Consumer<byte[], byte[]>> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
public Consumer<RecordData, RecordData> consumerSupplier(KafkaContext context) {
var configs = maybeAuthenticate(context, Consumer.class);
Consumer<byte[], byte[]> client = new KafkaConsumer<>(configs); // NOSONAR / closed in consumerDisposer
return () -> client;

return new KafkaConsumer<>(
configs,
context.schemaRegistryContext().keyDeserializer(),
context.schemaRegistryContext().valueDeserializer());
}

public void consumerDisposer(@Disposes Supplier<Consumer<byte[], byte[]>> consumer) {
consumer.get().close();
public void disposeConsumer(@Disposes Consumer<RecordData, RecordData> consumer) {
consumer.close();
}

@Produces
@RequestScoped
public Supplier<Producer<String, String>> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
public Producer<RecordData, RecordData> producerSupplier(KafkaContext context) {
var configs = maybeAuthenticate(context, Producer.class);
Producer<String, String> client = new KafkaProducer<>(configs); // NOSONAR / closed in producerDisposer
return () -> client;
return new KafkaProducer<>(
configs,
context.schemaRegistryContext().keySerializer(),
context.schemaRegistryContext().valueSerializer());
}

public void producerDisposer(@Disposes Supplier<Producer<String, String>> producer) {
producer.get().close();
public void disposeProducer(@Disposes Producer<RecordData, RecordData> producer) {
producer.close();
}

Map<String, Object> maybeAuthenticate(KafkaContext context, Class<?> clientType) {
Expand Down Expand Up @@ -603,6 +561,12 @@ Map<String, Object> buildConfig(Set<String> configNames,
.map(Optional::get)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (k1, k2) -> k1, TreeMap::new));

// Ensure no given properties are skipped. The previous stream processing allows
// for the standard config names to be obtained from the given maps, but also from
// config overrides via MicroProfile Config.
clientProperties.get().forEach(cfg::putIfAbsent);
config.getProperties().forEach(cfg::putIfAbsent);

var listenerSpec = cluster.map(Kafka::getSpec)
.map(KafkaSpec::getKafka)
.map(KafkaClusterSpec::getListeners)
@@ -689,29 +653,6 @@ private void applyListenerConfiguration(Map<String, Object> cfg, GenericKafkaLis
cfg.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString());
}

private void resolveValues(Map<String, String> properties) {
properties.entrySet().forEach(entry ->
entry.setValue(resolveValue(entry.getValue())));
}

/**
* If the given value is an expression referencing a configuration value,
* replace it with the target property value.
*
* @param value configuration value that may be a reference to another
* configuration property
* @return replacement property or the same value if the given string is not a
* reference.
*/
private String resolveValue(String value) {
if (value.startsWith("${") && value.endsWith("}")) {
String replacement = value.substring(2, value.length() - 1);
return config.getOptionalValue(replacement, String.class).orElse(value);
}

return value;
}

Optional<String> getDefaultConfig(String clientType, String configName) {
String clientSpecificKey = "console.kafka.%s.%s".formatted(clientType, configName);
String generalKey = "console.kafka.%s".formatted(configName);