Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configuration of trust stores for Prometheus and Apicurio clients #1280

Merged
merged 3 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.quarkus.tls.TlsConfiguration;
import io.quarkus.tls.TlsConfigurationRegistry;

import static com.github.streamshub.console.support.StringSupport.replaceNonAlphanumeric;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
Expand All @@ -52,7 +53,7 @@ public class MetricsService {
Logger logger;

@Inject
TlsConfigurationRegistry certificates;
TlsConfigurationRegistry tlsRegistry;

@Inject
KubernetesClient k8s;
Expand Down Expand Up @@ -91,15 +92,18 @@ ClientRequestFilter createAuthenticationFilter(PrometheusConfig config) {

public PrometheusAPI createClient(ConsoleConfig consoleConfig, KafkaClusterConfig clusterConfig) {
PrometheusConfig prometheusConfig;
String sourceName = clusterConfig.getMetricsSource();

if (clusterConfig.getMetricsSource() != null) {
if (sourceName != null) {
prometheusConfig = consoleConfig.getMetricsSources()
.stream()
.filter(source -> source.getName().equals(clusterConfig.getMetricsSource()))
.filter(source -> source.getName().equals(sourceName))
.findFirst()
.orElseThrow();

var trustStore = certificates.getDefault().map(TlsConfiguration::getTrustStore).orElse(null);
var trustStore = getTlsConfiguration(sourceName)
.map(TlsConfiguration::getTrustStore)
.orElse(null);

RestClientBuilder builder = RestClientBuilder.newBuilder()
.baseUri(URI.create(prometheusConfig.getUrl()))
Expand All @@ -114,6 +118,12 @@ public PrometheusAPI createClient(ConsoleConfig consoleConfig, KafkaClusterConfi
return null;
}

Optional<TlsConfiguration> getTlsConfiguration(String sourceName) {
String dotSeparatedSource = "metrics.source." + replaceNonAlphanumeric(sourceName, '.');
String dashSeparatedSource = "metrics-source-" + replaceNonAlphanumeric(sourceName, '-');
MikeEdgar marked this conversation as resolved.
Show resolved Hide resolved
return tlsRegistry.get(dotSeparatedSource).or(() -> tlsRegistry.get(dashSeparatedSource));
}

CompletionStage<Map<String, List<Metrics.ValueMetric>>> queryValues(String query) {
PrometheusAPI prometheusAPI = kafkaContext.prometheus();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.jboss.logging.Logger;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.streamshub.console.api.support.serdes.ApicurioClient;
import com.github.streamshub.console.api.support.serdes.ForceCloseable;
import com.github.streamshub.console.api.support.serdes.MultiformatDeserializer;
import com.github.streamshub.console.api.support.serdes.MultiformatSerializer;
Expand Down Expand Up @@ -190,7 +191,7 @@ public class SchemaRegistryContext implements Closeable {
this.config = config;

if (config != null) {
registryClient = RegistryClientFactory.create(config.getUrl());
registryClient = RegistryClientFactory.create(new ApicurioClient(config));
} else {
registryClient = null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package com.github.streamshub.console.api.support.serdes;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

import jakarta.enterprise.inject.spi.CDI;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.github.streamshub.console.config.SchemaRegistryConfig;

import io.apicurio.registry.rest.client.impl.ErrorHandler;
import io.apicurio.rest.client.auth.Auth;
import io.apicurio.rest.client.error.RestClientErrorHandler;
import io.apicurio.rest.client.request.Request;
import io.apicurio.rest.client.spi.ApicurioHttpClient;
import io.apicurio.rest.client.util.RegistryDateDeserializer;
import io.apicurio.rest.client.util.UriUtil;
import io.quarkus.tls.TlsConfiguration;
import io.quarkus.tls.TlsConfigurationRegistry;

import static com.github.streamshub.console.support.StringSupport.replaceNonAlphanumeric;
import static java.util.Objects.requireNonNull;

/**
MikeEdgar marked this conversation as resolved.
Show resolved Hide resolved
* Apicurio client based on {@code io.apicurio.rest.client.JdkHttpClient}, but
* with awareness of Quarkus TLS registry configuration and removing other
* unused options for mTLS keystores, headers, etc.
*
* @see https://github.com/Apicurio/apicurio-common-rest-client/blob/0868773f61e33d40dcac88608aa111e26ab71bc7/rest-client-jdk/src/main/java/io/apicurio/rest/client/JdkHttpClient.java
*/
public class ApicurioClient implements ApicurioHttpClient {

public static final String INVALID_EMPTY_HTTP_KEY = "";
private final HttpClient client;
private final String endpoint;
private final Auth auth;
private final RestClientErrorHandler errorHandler;

private static final ThreadLocal<Map<String, String>> HEADERS = ThreadLocal.withInitial(Collections::emptyMap);

public ApicurioClient(SchemaRegistryConfig config) {
String url = config.getUrl();

if (!url.endsWith("/")) {
url += "/";
}

final HttpClient.Builder httpClientBuilder = handleConfiguration(config);
this.endpoint = url;
this.auth = null;
this.client = httpClientBuilder.build();
this.errorHandler = new ErrorHandler();
}

private HttpClient.Builder handleConfiguration(SchemaRegistryConfig config) {
HttpClient.Builder clientBuilder = HttpClient.newBuilder();
clientBuilder.version(HttpClient.Version.HTTP_1_1);

var tlsConfig = getTlsConfiguration(config.getName());

if (tlsConfig.isPresent()) {
try {
clientBuilder = clientBuilder.sslContext(tlsConfig.get().createSSLContext());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

return clientBuilder;
}

Optional<TlsConfiguration> getTlsConfiguration(String sourceName) {
String dotSeparatedSource = "schema.registry." + replaceNonAlphanumeric(sourceName, '.');
String dashSeparatedSource = "schema-registry-" + replaceNonAlphanumeric(sourceName, '-');
var tlsRegistry = CDI.current().select(TlsConfigurationRegistry.class).get();
return tlsRegistry.get(dotSeparatedSource).or(() -> tlsRegistry.get(dashSeparatedSource));
}

@Override
public <T> T sendRequest(Request<T> request) {
try {
requireNonNull(request.getOperation(), "Request operation cannot be null");
requireNonNull(request.getResponseType(), "Response type cannot be null");

final HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.uri(UriUtil.buildURI(endpoint + request.getRequestPath(), request.getQueryParams(), request.getPathParams()));

//Add current request headers
MikeEdgar marked this conversation as resolved.
Show resolved Hide resolved
HEADERS.get().forEach(requestBuilder::header);
HEADERS.remove();

Map<String, String> headers = request.getHeaders();
if (this.auth != null) {
//make headers mutable...
headers = new HashMap<>(headers);
this.auth.apply(headers);
}
headers.forEach(requestBuilder::header);

switch (request.getOperation()) {
case GET:
requestBuilder.GET();
break;
case PUT:
requestBuilder.PUT(HttpRequest.BodyPublishers.ofByteArray(request.getData().readAllBytes()));
break;
case POST:
if (request.getDataString() != null) {
requestBuilder.POST(HttpRequest.BodyPublishers.ofString(request.getDataString()));
} else {
requestBuilder.POST(HttpRequest.BodyPublishers.ofByteArray(request.getData().readAllBytes()));
}
break;
case DELETE:
requestBuilder.DELETE();
break;
default:
throw new IllegalStateException("Operation not allowed");
}

return client.sendAsync(requestBuilder.build(), new BodyHandler<>(request.getResponseType(), errorHandler))
.join()
.body()
.get();

} catch (IOException e) {
throw errorHandler.parseError(e);
}
}

@Override
public void setNextRequestHeaders(Map<String, String> headers) {
HEADERS.set(headers);
}

@Override
public Map<String, String> getHeaders() {
return HEADERS.get();
}

@Override
public void close() {
// No-op
}

/**
* From {@code io.apicurio.rest.client.handler.BodyHandler}
*
* @see https://github.com/Apicurio/apicurio-common-rest-client/blob/0868773f61e33d40dcac88608aa111e26ab71bc7/rest-client-jdk/src/main/java/io/apicurio/rest/client/handler/BodyHandler.java
*/
private static class BodyHandler<W> implements HttpResponse.BodyHandler<Supplier<W>> {

private final TypeReference<W> wClass;
private final RestClientErrorHandler errorHandler;
private static final ObjectMapper MAPPER = new ObjectMapper();
static {
SimpleModule module = new SimpleModule("Custom date handler");
module.addDeserializer(Date.class, new RegistryDateDeserializer());
MAPPER.registerModule(module);
}

public BodyHandler(TypeReference<W> wClass, RestClientErrorHandler errorHandler) {
this.wClass = wClass;
this.errorHandler = errorHandler;
MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

@Override
public HttpResponse.BodySubscriber<Supplier<W>> apply(HttpResponse.ResponseInfo responseInfo) {
return asJSON(wClass, responseInfo, errorHandler);
}

public static <W> HttpResponse.BodySubscriber<Supplier<W>> asJSON(TypeReference<W> targetType, HttpResponse.ResponseInfo responseInfo, RestClientErrorHandler errorHandler) {
HttpResponse.BodySubscriber<InputStream> upstream = HttpResponse.BodySubscribers.ofInputStream();
return HttpResponse.BodySubscribers.mapping(
upstream,
inputStream -> toSupplierOfType(inputStream, targetType, responseInfo, errorHandler));
}

@SuppressWarnings("unchecked")
public static <W> Supplier<W> toSupplierOfType(InputStream body, TypeReference<W> targetType, HttpResponse.ResponseInfo responseInfo, RestClientErrorHandler errorHandler) {
return () -> {
try {
if (isFailure(responseInfo)) {
throw errorHandler.handleErrorResponse(body, responseInfo.statusCode());
} else {
final String typeName = targetType.getType().getTypeName();
if (typeName.contains("InputStream")) {
return (W) body;
} else if (typeName.contains("Void")) {
//Intended null return
return null;
} else {
return MAPPER.readValue(body, targetType);
}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
};
}

private static boolean isFailure(HttpResponse.ResponseInfo responseInfo) {
return responseInfo.statusCode() / 100 != 2;
MikeEdgar marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
2 changes: 1 addition & 1 deletion api/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ console.kafka.admin.default.api.timeout.ms=10000
########
#%dev.quarkus.http.auth.proactive=false
#%dev.quarkus.http.auth.permission."oidc".policy=permit
%dev.quarkus.tls.trust-all=true
#%dev.quarkus.tls.trust-all=true
%dev.quarkus.kubernetes-client.trust-certs=true
%dev.quarkus.log.category."io.vertx.core.impl.BlockedThreadChecker".level=OFF
%dev.quarkus.log.category."com.github.streamshub.console".level=DEBUG
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.github.streamshub.console.support;

import java.util.Locale;

public final class StringSupport {

private StringSupport() {
// No instances
}

public static boolean isAsciiLetterOrDigit(char c) {
return 'a' <= c && c <= 'z' ||
'A' <= c && c <= 'Z' ||
'0' <= c && c <= '9';
}

public static String replaceNonAlphanumeric(final String name, char replacement) {
return replaceNonAlphanumeric(name, replacement, new StringBuilder(name.length()));
}

public static String replaceNonAlphanumeric(final String name, char replacement, final StringBuilder sb) {
int length = name.length();
for (int i = 0; i < length; i++) {
char c = name.charAt(i);
if (isAsciiLetterOrDigit(c)) {
sb.append(c);
} else {
sb.append(replacement);
if (c == '"' && i + 1 == length) {
sb.append(replacement);
}
}
}
return sb.toString();
}

public static String toEnv(String name) {
return replaceNonAlphanumeric(name, '_').toUpperCase(Locale.ROOT);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.github.streamshub.console.support;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import static org.junit.jupiter.api.Assertions.assertEquals;

class StringSupportTest {

@Test
void testReplaceNonAlphanumericStringChar() {
String actual = StringSupport.replaceNonAlphanumeric("hyphenated-segment", '.');
assertEquals("hyphenated.segment", actual);
}

@ParameterizedTest
@CsvSource({
"console.config.\"Hyphenated-Segment-1\".value, CONSOLE_CONFIG__HYPHENATED_SEGMENT_1__VALUE",
"console.config.\"Hyphenated-Segment-1\", CONSOLE_CONFIG__HYPHENATED_SEGMENT_1__",
"console.config.{Braced-Segment}, CONSOLE_CONFIG__BRACED_SEGMENT_",
})
void testToEnv(String input, String expected) {
String actual = StringSupport.toEnv(input);
assertEquals(expected, actual);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ public class SchemaRegistry {
@JsonPropertyDescription("URL of the Apicurio Registry server API.")
private String url;

@JsonPropertyDescription("""
Trust store configuration for when the schema registry uses \
TLS certificates signed by an unknown CA.
""")
private TrustStore trustStore;

public String getName() {
return name;
}
Expand All @@ -37,4 +43,12 @@ public String getUrl() {
public void setUrl(String url) {
this.url = url;
}

public TrustStore getTrustStore() {
return trustStore;
}

public void setTrustStore(TrustStore trustStore) {
this.trustStore = trustStore;
}
}
Loading