Re-factor registry config to top level, reference from Kafka config
Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar committed Oct 24, 2024
1 parent dd37e5c commit ac5c347
Showing 15 changed files with 284 additions and 152 deletions.
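In configuration terms, the re-factor reshapes the console config roughly as follows. This is a sketch only: the key names are inferred from the accessors in this diff (getSchemaRegistries(), getName(), getUrl(), and the cluster-level schemaRegistry reference), and the registry/cluster names and URL are hypothetical.

    # Before: registry settings nested under each Kafka cluster
    kafka:
      clusters:
        - name: my-cluster
          schemaRegistry:
            url: http://registry.example.com/apis/registry/v2

    # After: registries declared once at the top level, referenced by name
    schemaRegistries:
      - name: my-registry
        url: http://registry.example.com/apis/registry/v2
    kafka:
      clusters:
        - name: my-cluster
          schemaRegistry: my-registry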
api/src/main/java/com/github/streamshub/console/api/ClientFactory.java (13 additions & 91 deletions)
@@ -1,9 +1,5 @@
 package com.github.streamshub.console.api;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.UncheckedIOException;
-import java.nio.file.Path;
 import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
@@ -55,21 +51,17 @@
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.eclipse.microprofile.config.Config;
-import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.jboss.logging.Logger;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import com.github.streamshub.console.api.support.Holder;
 import com.github.streamshub.console.api.support.KafkaContext;
 import com.github.streamshub.console.api.support.TrustAllCertificateManager;
-import com.github.streamshub.console.api.support.ValidationProxy;
 import com.github.streamshub.console.api.support.serdes.RecordData;
 import com.github.streamshub.console.config.ConsoleConfig;
 import com.github.streamshub.console.config.KafkaClusterConfig;
+import com.github.streamshub.console.config.SchemaRegistryConfig;
 
-import io.apicurio.registry.rest.client.RegistryClient;
-import io.apicurio.registry.rest.client.RegistryClientFactory;
 import io.apicurio.registry.serde.SerdeConfig;
 import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
 import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
@@ -128,15 +120,11 @@ public class ClientFactory {
     ObjectMapper mapper;
 
     @Inject
-    @ConfigProperty(name = "console.config-path")
-    Optional<String> configPath;
+    ConsoleConfig consoleConfig;
 
     @Inject
     Holder<SharedIndexInformer<Kafka>> kafkaInformer;
 
-    @Inject
-    ValidationProxy validationService;
-
     @Inject
     Instance<TrustAllCertificateManager> trustManager;
 
@@ -168,57 +156,12 @@ public class ClientFactory {
 
     @Produces
     @ApplicationScoped
-    public ConsoleConfig produceConsoleConfig() {
-        return configPath.map(Path::of)
-                .map(Path::toUri)
-                .map(uri -> {
-                    try {
-                        return uri.toURL();
-                    } catch (IOException e) {
-                        throw new UncheckedIOException(e);
-                    }
-                })
-                .filter(Objects::nonNull)
-                .map(url -> {
-                    log.infof("Loading console configuration from %s", url);
-                    ObjectMapper yamlMapper = mapper.copyWith(new YAMLFactory());
-
-                    try (InputStream stream = url.openStream()) {
-                        return yamlMapper.readValue(stream, ConsoleConfig.class);
-                    } catch (IOException e) {
-                        throw new UncheckedIOException(e);
-                    }
-                })
-                .map(consoleConfig -> {
-                    consoleConfig.getKafka().getClusters().stream().forEach(cluster -> {
-                        if (cluster.getSchemaRegistry() != null) {
-                            var registryConfig = cluster.getSchemaRegistry();
-                            registryConfig.setUrl(resolveValue(registryConfig.getUrl()));
-                        }
-                        resolveValues(cluster.getProperties());
-                        resolveValues(cluster.getAdminProperties());
-                        resolveValues(cluster.getProducerProperties());
-                        resolveValues(cluster.getConsumerProperties());
-                    });
-
-                    return consoleConfig;
-                })
-                .map(validationService::validate)
-                .orElseGet(() -> {
-                    log.warn("Console configuration has not been specified using `console.config-path` property");
-                    return new ConsoleConfig();
-                });
-    }
-
-    @Produces
-    @ApplicationScoped
-    Map<String, KafkaContext> produceKafkaContexts(ConsoleConfig consoleConfig,
-            Function<Map<String, Object>, Admin> adminBuilder) {
+    Map<String, KafkaContext> produceKafkaContexts(Function<Map<String, Object>, Admin> adminBuilder) {
 
         final Map<String, KafkaContext> contexts = new ConcurrentHashMap<>();
 
         if (kafkaInformer.isPresent()) {
-            addKafkaEventHandler(contexts, consoleConfig, adminBuilder);
+            addKafkaEventHandler(contexts, adminBuilder);
         }
 
         // Configure clusters that will not be configured by events
@@ -236,7 +179,6 @@ Map<String, KafkaContext> produceKafkaContexts(ConsoleConfig consoleConfig,
     }
 
     void addKafkaEventHandler(Map<String, KafkaContext> contexts,
-            ConsoleConfig consoleConfig,
             Function<Map<String, Object>, Admin> adminBuilder) {
 
         kafkaInformer.get().addEventHandlerWithResyncPeriod(new ResourceEventHandler<Kafka>() {
@@ -348,15 +290,18 @@ void putKafkaContext(Map<String, KafkaContext> contexts,
             admin = adminBuilder.apply(adminConfigs);
         }
 
-        RegistryClient schemaRegistryClient = null;
+        SchemaRegistryConfig registryConfig = null;
 
         if (clusterConfig.getSchemaRegistry() != null) {
-            String registryUrl = clusterConfig.getSchemaRegistry().getUrl();
-            schemaRegistryClient = RegistryClientFactory.create(registryUrl); // NOSONAR - closed elsewhere
+            registryConfig = consoleConfig.getSchemaRegistries()
+                    .stream()
+                    .filter(registry -> registry.getName().equals(clusterConfig.getSchemaRegistry()))
+                    .findFirst()
+                    .orElseThrow();
         }
 
         KafkaContext ctx = new KafkaContext(clusterConfig, kafkaResource.orElse(null), clientConfigs, admin);
-        ctx.schemaRegistryClient(schemaRegistryClient, mapper);
+        ctx.schemaRegistryClient(registryConfig, mapper);
 
         KafkaContext previous = contexts.put(clusterId, ctx);
 
@@ -560,7 +505,7 @@ public void disposeKafkaContext(@Disposes KafkaContext context, Map<String, KafkaContext> contexts)
 
     @Produces
     @RequestScoped
-    public Consumer<RecordData, RecordData> consumerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
+    public Consumer<RecordData, RecordData> consumerSupplier(KafkaContext context) {
         var configs = maybeAuthenticate(context, Consumer.class);
 
         return new KafkaConsumer<>(
@@ -575,7 +520,7 @@ public void disposeConsumer(@Disposes Consumer<RecordData, RecordData> consumer)
 
     @Produces
     @RequestScoped
-    public Producer<RecordData, RecordData> producerSupplier(ConsoleConfig consoleConfig, KafkaContext context) {
+    public Producer<RecordData, RecordData> producerSupplier(KafkaContext context) {
         var configs = maybeAuthenticate(context, Producer.class);
         return new KafkaProducer<>(
             configs,
@@ -708,29 +653,6 @@ private void applyListenerConfiguration(Map<String, Object> cfg, GenericKafkaListener listener)
         cfg.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString());
     }
 
-    private void resolveValues(Map<String, String> properties) {
-        properties.entrySet().forEach(entry ->
-            entry.setValue(resolveValue(entry.getValue())));
-    }
-
-    /**
-     * If the given value is an expression referencing a configuration value,
-     * replace it with the target property value.
-     *
-     * @param value configuration value that may be a reference to another
-     *              configuration property
-     * @return replacement property or the same value if the given string is not a
-     *         reference.
-     */
-    private String resolveValue(String value) {
-        if (value.startsWith("${") && value.endsWith("}")) {
-            String replacement = value.substring(2, value.length() - 1);
-            return config.getOptionalValue(replacement, String.class).orElse(value);
-        }
-
-        return value;
-    }
-
     Optional<String> getDefaultConfig(String clientType, String configName) {
         String clientSpecificKey = "console.kafka.%s.%s".formatted(clientType, configName);
         String generalKey = "console.kafka.%s".formatted(configName);
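The name-based lookup above (putKafkaContext) also appears in SchemasResource below. A minimal, self-contained sketch of the pattern, using a hypothetical stand-in for SchemaRegistryConfig and sample values:

    import java.util.List;

    public class RegistryLookupSketch {
        // Hypothetical stand-in for com.github.streamshub.console.config.SchemaRegistryConfig
        static class SchemaRegistryConfig {
            private final String name;
            private final String url;

            SchemaRegistryConfig(String name, String url) {
                this.name = name;
                this.url = url;
            }

            String getName() { return name; }
            String getUrl() { return url; }
        }

        // Same shape as the lookup in this commit: resolve a cluster's registry
        // reference (a name) against the top-level registry list.
        static SchemaRegistryConfig resolve(List<SchemaRegistryConfig> registries, String reference) {
            return registries.stream()
                    .filter(registry -> registry.getName().equals(reference))
                    .findFirst()
                    .orElseThrow(); // an unknown reference fails fast
        }

        public static void main(String[] args) {
            var registries = List.of(new SchemaRegistryConfig("my-registry", "http://registry.example.com"));
            System.out.println(resolve(registries, "my-registry").getUrl()); // http://registry.example.com
        }
    }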
api/src/main/java/com/github/streamshub/console/api/SchemasResource.java
@@ -1,8 +1,6 @@
 package com.github.streamshub.console.api;
 
-import java.util.Base64;
 import java.util.Collections;
-import java.util.Map;
 import java.util.Optional;
 
 import jakarta.inject.Inject;
@@ -21,15 +19,17 @@
 import org.jboss.logging.Logger;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.github.streamshub.console.api.support.KafkaContext;
 import com.github.streamshub.console.api.support.serdes.ArtifactReferences;
 import com.github.streamshub.console.api.support.serdes.MultiformatSchemaParser;
+import com.github.streamshub.console.config.ConsoleConfig;
+import com.github.streamshub.console.config.SchemaRegistryConfig;
 
 import io.apicurio.registry.resolver.DefaultSchemaResolver;
 import io.apicurio.registry.resolver.SchemaResolver;
 import io.apicurio.registry.rest.client.RegistryClient;
+import io.apicurio.registry.rest.client.RegistryClientFactory;
 
-@Path("/api/schemas/{schemaId}")
+@Path("/api/registries/{registryId}/schemas/{schemaId}")
 @Tag(name = "Schema Registry Resources")
 public class SchemasResource {
 
@@ -40,44 +40,44 @@ public class SchemasResource {
     ObjectMapper objectMapper;
 
     @Inject
-    Map<String, KafkaContext> kafkaContexts;
+    ConsoleConfig consoleConfig;
 
+    /**
+     * Retrieve the schema content from the identified/named registry.
+     *
+     * <p>
+     * Although opaque to the client, the schemaId is a base-64 encoded, JSON-ified
+     * {@link io.apicurio.registry.resolver.strategy.ArtifactReference ArtifactReference}
+     * which will be parsed with {@link ArtifactReferences#fromSchemaId}.
+     */
     @GET
     @Produces(MediaType.APPLICATION_JSON)
     @APIResponse(responseCode = "200", ref = "Configurations", content = @Content())
     @APIResponse(responseCode = "404", ref = "NotFound")
     @APIResponse(responseCode = "500", ref = "ServerError")
     @APIResponse(responseCode = "504", ref = "ServerTimeout")
     public Response getSchemaContent(
+            @Parameter(description = "Schema registry identifier (name)")
+            @PathParam("registryId")
+            String registryId,
+
             @Parameter(description = "Schema identifier")
             @PathParam("schemaId")
             String schemaId) {
 
-        int clusterTerm = schemaId.indexOf('.');
-
-        if (clusterTerm < 0) {
-            throw new NotFoundException("No such schema");
-        }
-
-        String clusterId = new String(Base64.getUrlDecoder().decode(schemaId.substring(0, clusterTerm)));
-
-        if (!kafkaContexts.containsKey(clusterId)) {
-            throw new NotFoundException("No such schema");
-        }
-
-        RegistryClient registryClient = kafkaContexts.get(clusterId).schemaRegistryContext().registryClient();
-
-        if (registryClient == null) {
-            throw new NotFoundException("Schema not found, no registry is configured");
-        }
+        SchemaRegistryConfig registryConfig = consoleConfig.getSchemaRegistries()
+                .stream()
+                .filter(config -> config.getName().equals(registryId))
+                .findFirst()
+                .orElseThrow(() -> new NotFoundException("Unknown registry"));
+
+        RegistryClient registryClient = RegistryClientFactory.create(registryConfig.getUrl());
 
         @SuppressWarnings("resource")
         SchemaResolver<Object, ?> schemaResolver = new DefaultSchemaResolver<>();
         schemaResolver.setClient(registryClient);
         schemaResolver.configure(Collections.emptyMap(), new MultiformatSchemaParser<>(Collections.emptySet()));
 
-        schemaId = schemaId.substring(clusterTerm + 1);
-
         var reference = ArtifactReferences.fromSchemaId(schemaId, objectMapper);
         var schema = schemaResolver.resolveSchemaByArtifactReference(reference);
 
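For illustration, an opaque schemaId of the kind the new Javadoc describes can be built as follows. The JSON field name is an assumption for this example only (ArtifactReferences.fromSchemaId in this repository remains the authoritative parser), and my-registry is a hypothetical registry name.

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class SchemaIdSketch {
        public static void main(String[] args) {
            String reference = "{\"globalId\":42}"; // hypothetical JSON-ified ArtifactReference
            String schemaId = Base64.getUrlEncoder().withoutPadding()
                    .encodeToString(reference.getBytes(StandardCharsets.UTF_8));
            // Resolved via the new route, e.g.:
            //   GET /api/registries/my-registry/schemas/eyJnbG9iYWxJZCI6NDJ9
            System.out.println("/api/registries/my-registry/schemas/" + schemaId);
        }
    }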
@@ -4,7 +4,6 @@
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -333,15 +332,16 @@ Optional<JsonApiRelationship> schemaRelationship(RecordData data) {
             .filter(recordMeta -> recordMeta.containsKey("schema-id"))
             .map(recordMeta -> {
                 String artifactType = recordMeta.get("schema-type");
-                String clusterIdEnc = Base64.getUrlEncoder().encodeToString(kafkaContext.clusterId().getBytes());
-                String schemaId = clusterIdEnc + '.' + recordMeta.get("schema-id");
+                // schema-id is present, it is null-safe to retrieve the name from configuration
+                String registryId = kafkaContext.schemaRegistryContext().getConfig().getName();
+                String schemaId = recordMeta.get("schema-id");
                 String name = recordMeta.get("schema-name");
 
                 var relationship = new JsonApiRelationship();
                 relationship.addMeta("artifactType", artifactType);
                 relationship.addMeta("name", name);
                 relationship.data(new Identifier("schemas", schemaId));
-                relationship.addLink("content", "/api/schemas/" + schemaId);
+                relationship.addLink("content", "/api/registries/%s/schemas/%s".formatted(registryId, schemaId));
 
                 schemaError(data).ifPresent(error -> relationship.addMeta("errors", List.of(error)));
 
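Assuming standard JSON:API serialization of JsonApiRelationship, the relationship built above would render roughly as follows (all values hypothetical):

    {
      "data": { "type": "schemas", "id": "eyJnbG9iYWxJZCI6NDJ9" },
      "meta": { "artifactType": "AVRO", "name": "com.example.Order" },
      "links": { "content": "/api/registries/my-registry/schemas/eyJnbG9iYWxJZCI6NDJ9" }
    }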
api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java
@@ -20,8 +20,10 @@
 import com.github.streamshub.console.api.support.serdes.MultiformatDeserializer;
 import com.github.streamshub.console.api.support.serdes.MultiformatSerializer;
 import com.github.streamshub.console.config.KafkaClusterConfig;
+import com.github.streamshub.console.config.SchemaRegistryConfig;
 
 import io.apicurio.registry.rest.client.RegistryClient;
+import io.apicurio.registry.rest.client.RegistryClientFactory;
 import io.strimzi.api.kafka.model.kafka.Kafka;
 import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec;
 import io.strimzi.api.kafka.model.kafka.KafkaSpec;
@@ -127,8 +129,8 @@ public boolean applicationScoped() {
         return applicationScoped;
     }
 
-    public void schemaRegistryClient(RegistryClient schemaRegistryClient, ObjectMapper objectMapper) {
-        schemaRegistryContext = new SchemaRegistryContext(schemaRegistryClient, objectMapper);
+    public void schemaRegistryClient(SchemaRegistryConfig config, ObjectMapper objectMapper) {
+        schemaRegistryContext = new SchemaRegistryContext(config, objectMapper);
     }
 
     public SchemaRegistryContext schemaRegistryContext() {
@@ -167,14 +169,21 @@ public Optional<String> tokenUrl() {
      * the parent KafkaContext is disposed of at application shutdown.
      */
     public class SchemaRegistryContext implements Closeable {
+        private final SchemaRegistryConfig config;
         private final RegistryClient registryClient;
         private final MultiformatDeserializer keyDeserializer;
         private final MultiformatDeserializer valueDeserializer;
         private final MultiformatSerializer keySerializer;
         private final MultiformatSerializer valueSerializer;
 
-        SchemaRegistryContext(RegistryClient schemaRegistryClient, ObjectMapper objectMapper) {
-            this.registryClient = schemaRegistryClient;
+        SchemaRegistryContext(SchemaRegistryConfig config, ObjectMapper objectMapper) {
+            this.config = config;
+
+            if (config != null) {
+                registryClient = RegistryClientFactory.create(config.getUrl());
+            } else {
+                registryClient = null;
+            }
 
             keyDeserializer = new MultiformatDeserializer(registryClient, objectMapper);
             keyDeserializer.configure(configs(Consumer.class), true);
@@ -189,6 +198,10 @@ public class SchemaRegistryContext implements Closeable {
             valueSerializer.configure(configs(Producer.class), false);
         }
 
+        public SchemaRegistryConfig getConfig() {
+            return config;
+        }
+
         public RegistryClient registryClient() {
             return registryClient;
         }