Skip to content

Commit

Permalink
Configuration of trust stores for Prometheus and Apicurio clients
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Edgar <[email protected]>
  • Loading branch information
MikeEdgar committed Dec 5, 2024
1 parent 5926b79 commit f224aed
Show file tree
Hide file tree
Showing 14 changed files with 675 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.quarkus.tls.TlsConfiguration;
import io.quarkus.tls.TlsConfigurationRegistry;

import static com.github.streamshub.console.support.StringSupport.replaceNonAlphanumeric;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
Expand All @@ -52,7 +53,7 @@ public class MetricsService {
Logger logger;

@Inject
TlsConfigurationRegistry certificates;
TlsConfigurationRegistry tlsRegistry;

@Inject
KubernetesClient k8s;
Expand Down Expand Up @@ -91,15 +92,18 @@ ClientRequestFilter createAuthenticationFilter(PrometheusConfig config) {

public PrometheusAPI createClient(ConsoleConfig consoleConfig, KafkaClusterConfig clusterConfig) {
PrometheusConfig prometheusConfig;
String sourceName = clusterConfig.getMetricsSource();

if (clusterConfig.getMetricsSource() != null) {
if (sourceName != null) {
prometheusConfig = consoleConfig.getMetricsSources()
.stream()
.filter(source -> source.getName().equals(clusterConfig.getMetricsSource()))
.filter(source -> source.getName().equals(sourceName))
.findFirst()
.orElseThrow();

var trustStore = certificates.getDefault().map(TlsConfiguration::getTrustStore).orElse(null);
var trustStore = getTlsConfiguration(sourceName)
.map(TlsConfiguration::getTrustStore)
.orElse(null);

RestClientBuilder builder = RestClientBuilder.newBuilder()
.baseUri(URI.create(prometheusConfig.getUrl()))
Expand All @@ -114,6 +118,12 @@ public PrometheusAPI createClient(ConsoleConfig consoleConfig, KafkaClusterConfi
return null;
}

Optional<TlsConfiguration> getTlsConfiguration(String sourceName) {
String dotSeparatedSource = "metrics.source." + replaceNonAlphanumeric(sourceName, '.');
String dashSeparatedSource = "metrics-source-" + replaceNonAlphanumeric(sourceName, '-');
return tlsRegistry.get(dotSeparatedSource).or(() -> tlsRegistry.get(dashSeparatedSource));
}

CompletionStage<Map<String, List<Metrics.ValueMetric>>> queryValues(String query) {
PrometheusAPI prometheusAPI = kafkaContext.prometheus();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.jboss.logging.Logger;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.streamshub.console.api.support.serdes.ApicurioClient;
import com.github.streamshub.console.api.support.serdes.ForceCloseable;
import com.github.streamshub.console.api.support.serdes.MultiformatDeserializer;
import com.github.streamshub.console.api.support.serdes.MultiformatSerializer;
Expand Down Expand Up @@ -190,7 +191,7 @@ public class SchemaRegistryContext implements Closeable {
this.config = config;

if (config != null) {
registryClient = RegistryClientFactory.create(config.getUrl());
registryClient = RegistryClientFactory.create(new ApicurioClient(config));
} else {
registryClient = null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package com.github.streamshub.console.api.support.serdes;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

import jakarta.enterprise.inject.spi.CDI;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.github.streamshub.console.config.SchemaRegistryConfig;

import io.apicurio.registry.rest.client.impl.ErrorHandler;
import io.apicurio.rest.client.auth.Auth;
import io.apicurio.rest.client.error.RestClientErrorHandler;
import io.apicurio.rest.client.request.Request;
import io.apicurio.rest.client.spi.ApicurioHttpClient;
import io.apicurio.rest.client.util.RegistryDateDeserializer;
import io.apicurio.rest.client.util.UriUtil;
import io.quarkus.tls.TlsConfiguration;
import io.quarkus.tls.TlsConfigurationRegistry;

import static com.github.streamshub.console.support.StringSupport.replaceNonAlphanumeric;
import static java.util.Objects.requireNonNull;

/**
* Apicurio client based on {@code io.apicurio.rest.client.JdkHttpClient}, but
* with awareness of Quarkus TLS registry configuration and removing other
* unused options for mTLS keystores, headers, etc.
*
* @see https://github.com/Apicurio/apicurio-common-rest-client/blob/0868773f61e33d40dcac88608aa111e26ab71bc7/rest-client-jdk/src/main/java/io/apicurio/rest/client/JdkHttpClient.java
*/
public class ApicurioClient implements ApicurioHttpClient {

public static final String INVALID_EMPTY_HTTP_KEY = "";
private final HttpClient client;
private final String endpoint;
private final Auth auth;
private final RestClientErrorHandler errorHandler;

private static final ThreadLocal<Map<String, String>> HEADERS = ThreadLocal.withInitial(Collections::emptyMap);

public ApicurioClient(SchemaRegistryConfig config) {
String url = config.getUrl();

if (!url.endsWith("/")) {
url += "/";
}

final HttpClient.Builder httpClientBuilder = handleConfiguration(config);
this.endpoint = url;
this.auth = null;
this.client = httpClientBuilder.build();
this.errorHandler = new ErrorHandler();
}

private HttpClient.Builder handleConfiguration(SchemaRegistryConfig config) {
HttpClient.Builder clientBuilder = HttpClient.newBuilder();
clientBuilder.version(HttpClient.Version.HTTP_1_1);

var tlsConfig = getTlsConfiguration(config.getName());

if (tlsConfig.isPresent()) {
try {
clientBuilder = clientBuilder.sslContext(tlsConfig.get().createSSLContext());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

return clientBuilder;
}

Optional<TlsConfiguration> getTlsConfiguration(String sourceName) {
String dotSeparatedSource = "schema.registry." + replaceNonAlphanumeric(sourceName, '.');
String dashSeparatedSource = "schema-registry-" + replaceNonAlphanumeric(sourceName, '-');
var tlsRegistry = CDI.current().select(TlsConfigurationRegistry.class).get();
return tlsRegistry.get(dotSeparatedSource).or(() -> tlsRegistry.get(dashSeparatedSource));
}

@Override
public <T> T sendRequest(Request<T> request) {
try {
requireNonNull(request.getOperation(), "Request operation cannot be null");
requireNonNull(request.getResponseType(), "Response type cannot be null");

final HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.uri(UriUtil.buildURI(endpoint + request.getRequestPath(), request.getQueryParams(), request.getPathParams()));

//Add current request headers
HEADERS.get().forEach(requestBuilder::header);
HEADERS.remove();

Map<String, String> headers = request.getHeaders();
if (this.auth != null) {
//make headers mutable...
headers = new HashMap<>(headers);
this.auth.apply(headers);
}
headers.forEach(requestBuilder::header);

switch (request.getOperation()) {
case GET:
requestBuilder.GET();
break;
case PUT:
requestBuilder.PUT(HttpRequest.BodyPublishers.ofByteArray(request.getData().readAllBytes()));
break;
case POST:
if (request.getDataString() != null) {
requestBuilder.POST(HttpRequest.BodyPublishers.ofString(request.getDataString()));
} else {
requestBuilder.POST(HttpRequest.BodyPublishers.ofByteArray(request.getData().readAllBytes()));
}
break;
case DELETE:
requestBuilder.DELETE();
break;
default:
throw new IllegalStateException("Operation not allowed");
}

return client.sendAsync(requestBuilder.build(), new BodyHandler<>(request.getResponseType(), errorHandler))
.join()
.body()
.get();

} catch (IOException e) {
throw errorHandler.parseError(e);
}
}

@Override
public void setNextRequestHeaders(Map<String, String> headers) {
HEADERS.set(headers);
}

@Override
public Map<String, String> getHeaders() {
return HEADERS.get();
}

@Override
public void close() {
// No-op
}

/**
* From {@code io.apicurio.rest.client.handler.BodyHandler}
*
* @see https://github.com/Apicurio/apicurio-common-rest-client/blob/0868773f61e33d40dcac88608aa111e26ab71bc7/rest-client-jdk/src/main/java/io/apicurio/rest/client/handler/BodyHandler.java
*/
private static class BodyHandler<W> implements HttpResponse.BodyHandler<Supplier<W>> {

private final TypeReference<W> wClass;
private final RestClientErrorHandler errorHandler;
private static final ObjectMapper mapper = new ObjectMapper();
static {
SimpleModule module = new SimpleModule("Custom date handler");
module.addDeserializer(Date.class, new RegistryDateDeserializer());
mapper.registerModule(module);
}

public BodyHandler(TypeReference<W> wClass, RestClientErrorHandler errorHandler) {
this.wClass = wClass;
this.errorHandler = errorHandler;
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

@Override
public HttpResponse.BodySubscriber<Supplier<W>> apply(HttpResponse.ResponseInfo responseInfo) {
return asJSON(wClass, responseInfo, errorHandler);
}

public static <W> HttpResponse.BodySubscriber<Supplier<W>> asJSON(TypeReference<W> targetType, HttpResponse.ResponseInfo responseInfo, RestClientErrorHandler errorHandler) {
HttpResponse.BodySubscriber<InputStream> upstream = HttpResponse.BodySubscribers.ofInputStream();
return HttpResponse.BodySubscribers.mapping(
upstream,
inputStream -> toSupplierOfType(inputStream, targetType, responseInfo, errorHandler));
}

@SuppressWarnings("unchecked")
public static <W> Supplier<W> toSupplierOfType(InputStream body, TypeReference<W> targetType, HttpResponse.ResponseInfo responseInfo, RestClientErrorHandler errorHandler) {
return () -> {
try {
if (isFailure(responseInfo)) {
throw errorHandler.handleErrorResponse(body, responseInfo.statusCode());
} else {
final String typeName = targetType.getType().getTypeName();
if (typeName.contains("InputStream")) {
return (W) body;
} else if (typeName.contains("Void")) {
//Intended null return
return null;
} else {
return mapper.readValue(body, targetType);
}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
};
}

private static boolean isFailure(HttpResponse.ResponseInfo responseInfo) {
return responseInfo.statusCode() / 100 != 2;
}
}
}
2 changes: 1 addition & 1 deletion api/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ console.kafka.admin.default.api.timeout.ms=10000
########
#%dev.quarkus.http.auth.proactive=false
#%dev.quarkus.http.auth.permission."oidc".policy=permit
%dev.quarkus.tls.trust-all=true
#%dev.quarkus.tls.trust-all=true
%dev.quarkus.kubernetes-client.trust-certs=true
%dev.quarkus.log.category."io.vertx.core.impl.BlockedThreadChecker".level=OFF
%dev.quarkus.log.category."com.github.streamshub.console".level=DEBUG
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.github.streamshub.console.support;

import java.util.Locale;

public class StringSupport {

public static boolean isAsciiLetterOrDigit(char c) {
return 'a' <= c && c <= 'z' ||
'A' <= c && c <= 'Z' ||
'0' <= c && c <= '9';
}

public static String replaceNonAlphanumeric(final String name, char replacement) {
return replaceNonAlphanumeric(name, replacement, new StringBuilder(name.length()));
}

public static String replaceNonAlphanumeric(final String name, char replacement, final StringBuilder sb) {
int length = name.length();
for (int i = 0; i < length; i++) {
char c = name.charAt(i);
if (isAsciiLetterOrDigit(c)) {
sb.append(c);
} else {
sb.append(replacement);
if (c == '"' && i + 1 == length) {
sb.append(replacement);
}
}
}
return sb.toString();
}

public static String toEnv(String name) {
return replaceNonAlphanumeric(name, '_').toUpperCase(Locale.ROOT);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ public class SchemaRegistry {
@JsonPropertyDescription("URL of the Apicurio Registry server API.")
private String url;

@JsonPropertyDescription("""
Trust store configuration for when the schema registry uses \
TLS certificates signed by an unknown CA.
""")
private TrustStore trustStore;

public String getName() {
return name;
}
Expand All @@ -37,4 +43,12 @@ public String getUrl() {
public void setUrl(String url) {
this.url = url;
}

public TrustStore getTrustStore() {
return trustStore;
}

public void setTrustStore(TrustStore trustStore) {
this.trustStore = trustStore;
}
}
Loading

0 comments on commit f224aed

Please sign in to comment.