diff --git a/api/src/main/java/com/github/streamshub/console/api/service/MetricsService.java b/api/src/main/java/com/github/streamshub/console/api/service/MetricsService.java index 8b80ac4e4..89dd2459b 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/MetricsService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/MetricsService.java @@ -38,6 +38,7 @@ import io.quarkus.tls.TlsConfiguration; import io.quarkus.tls.TlsConfigurationRegistry; +import static com.github.streamshub.console.support.StringSupport.replaceNonAlphanumeric; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; @@ -52,7 +53,7 @@ public class MetricsService { Logger logger; @Inject - TlsConfigurationRegistry certificates; + TlsConfigurationRegistry tlsRegistry; @Inject KubernetesClient k8s; @@ -91,15 +92,18 @@ ClientRequestFilter createAuthenticationFilter(PrometheusConfig config) { public PrometheusAPI createClient(ConsoleConfig consoleConfig, KafkaClusterConfig clusterConfig) { PrometheusConfig prometheusConfig; + String sourceName = clusterConfig.getMetricsSource(); - if (clusterConfig.getMetricsSource() != null) { + if (sourceName != null) { prometheusConfig = consoleConfig.getMetricsSources() .stream() - .filter(source -> source.getName().equals(clusterConfig.getMetricsSource())) + .filter(source -> source.getName().equals(sourceName)) .findFirst() .orElseThrow(); - var trustStore = certificates.getDefault().map(TlsConfiguration::getTrustStore).orElse(null); + var trustStore = getTlsConfiguration(sourceName) + .map(TlsConfiguration::getTrustStore) + .orElse(null); RestClientBuilder builder = RestClientBuilder.newBuilder() .baseUri(URI.create(prometheusConfig.getUrl())) @@ -114,6 +118,12 @@ public PrometheusAPI createClient(ConsoleConfig consoleConfig, KafkaClusterConfi return null; } + Optional getTlsConfiguration(String sourceName) { + String dotSeparatedSource = "metrics.source." + replaceNonAlphanumeric(sourceName, '.'); + String dashSeparatedSource = "metrics-source-" + replaceNonAlphanumeric(sourceName, '-'); + return tlsRegistry.get(dotSeparatedSource).or(() -> tlsRegistry.get(dashSeparatedSource)); + } + CompletionStage>> queryValues(String query) { PrometheusAPI prometheusAPI = kafkaContext.prometheus(); diff --git a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java index df3caf59d..abf0a806c 100644 --- a/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java +++ b/api/src/main/java/com/github/streamshub/console/api/support/KafkaContext.java @@ -16,6 +16,7 @@ import org.jboss.logging.Logger; import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.streamshub.console.api.support.serdes.ApicurioClient; import com.github.streamshub.console.api.support.serdes.ForceCloseable; import com.github.streamshub.console.api.support.serdes.MultiformatDeserializer; import com.github.streamshub.console.api.support.serdes.MultiformatSerializer; @@ -190,7 +191,7 @@ public class SchemaRegistryContext implements Closeable { this.config = config; if (config != null) { - registryClient = RegistryClientFactory.create(config.getUrl()); + registryClient = RegistryClientFactory.create(new ApicurioClient(config)); } else { registryClient = null; } diff --git a/api/src/main/java/com/github/streamshub/console/api/support/serdes/ApicurioClient.java b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ApicurioClient.java new file mode 100644 index 000000000..21cde32f2 --- /dev/null +++ b/api/src/main/java/com/github/streamshub/console/api/support/serdes/ApicurioClient.java @@ -0,0 +1,220 @@ +package com.github.streamshub.console.api.support.serdes; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; + +import jakarta.enterprise.inject.spi.CDI; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.github.streamshub.console.config.SchemaRegistryConfig; + +import io.apicurio.registry.rest.client.impl.ErrorHandler; +import io.apicurio.rest.client.auth.Auth; +import io.apicurio.rest.client.error.RestClientErrorHandler; +import io.apicurio.rest.client.request.Request; +import io.apicurio.rest.client.spi.ApicurioHttpClient; +import io.apicurio.rest.client.util.RegistryDateDeserializer; +import io.apicurio.rest.client.util.UriUtil; +import io.quarkus.tls.TlsConfiguration; +import io.quarkus.tls.TlsConfigurationRegistry; + +import static com.github.streamshub.console.support.StringSupport.replaceNonAlphanumeric; +import static java.util.Objects.requireNonNull; + +/** + * Apicurio client based on {@code io.apicurio.rest.client.JdkHttpClient}, but + * with awareness of Quarkus TLS registry configuration and removing other + * unused options for mTLS keystores, headers, etc. + * + * @see https://github.com/Apicurio/apicurio-common-rest-client/blob/0868773f61e33d40dcac88608aa111e26ab71bc7/rest-client-jdk/src/main/java/io/apicurio/rest/client/JdkHttpClient.java + */ +public class ApicurioClient implements ApicurioHttpClient { + + public static final String INVALID_EMPTY_HTTP_KEY = ""; + private final HttpClient client; + private final String endpoint; + private final Auth auth; + private final RestClientErrorHandler errorHandler; + + private static final ThreadLocal> HEADERS = ThreadLocal.withInitial(Collections::emptyMap); + + public ApicurioClient(SchemaRegistryConfig config) { + String url = config.getUrl(); + + if (!url.endsWith("/")) { + url += "/"; + } + + final HttpClient.Builder httpClientBuilder = handleConfiguration(config); + this.endpoint = url; + this.auth = null; + this.client = httpClientBuilder.build(); + this.errorHandler = new ErrorHandler(); + } + + private HttpClient.Builder handleConfiguration(SchemaRegistryConfig config) { + HttpClient.Builder clientBuilder = HttpClient.newBuilder(); + clientBuilder.version(HttpClient.Version.HTTP_1_1); + + var tlsConfig = getTlsConfiguration(config.getName()); + + if (tlsConfig.isPresent()) { + try { + clientBuilder = clientBuilder.sslContext(tlsConfig.get().createSSLContext()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + return clientBuilder; + } + + Optional getTlsConfiguration(String sourceName) { + String dotSeparatedSource = "schema.registry." + replaceNonAlphanumeric(sourceName, '.'); + String dashSeparatedSource = "schema-registry-" + replaceNonAlphanumeric(sourceName, '-'); + var tlsRegistry = CDI.current().select(TlsConfigurationRegistry.class).get(); + return tlsRegistry.get(dotSeparatedSource).or(() -> tlsRegistry.get(dashSeparatedSource)); + } + + @Override + public T sendRequest(Request request) { + try { + requireNonNull(request.getOperation(), "Request operation cannot be null"); + requireNonNull(request.getResponseType(), "Response type cannot be null"); + + final HttpRequest.Builder requestBuilder = HttpRequest.newBuilder() + .uri(UriUtil.buildURI(endpoint + request.getRequestPath(), request.getQueryParams(), request.getPathParams())); + + //Add current request headers + HEADERS.get().forEach(requestBuilder::header); + HEADERS.remove(); + + Map headers = request.getHeaders(); + if (this.auth != null) { + //make headers mutable... + headers = new HashMap<>(headers); + this.auth.apply(headers); + } + headers.forEach(requestBuilder::header); + + switch (request.getOperation()) { + case GET: + requestBuilder.GET(); + break; + case PUT: + requestBuilder.PUT(HttpRequest.BodyPublishers.ofByteArray(request.getData().readAllBytes())); + break; + case POST: + if (request.getDataString() != null) { + requestBuilder.POST(HttpRequest.BodyPublishers.ofString(request.getDataString())); + } else { + requestBuilder.POST(HttpRequest.BodyPublishers.ofByteArray(request.getData().readAllBytes())); + } + break; + case DELETE: + requestBuilder.DELETE(); + break; + default: + throw new IllegalStateException("Operation not allowed"); + } + + return client.sendAsync(requestBuilder.build(), new BodyHandler<>(request.getResponseType(), errorHandler)) + .join() + .body() + .get(); + + } catch (IOException e) { + throw errorHandler.parseError(e); + } + } + + @Override + public void setNextRequestHeaders(Map headers) { + HEADERS.set(headers); + } + + @Override + public Map getHeaders() { + return HEADERS.get(); + } + + @Override + public void close() { + // No-op + } + + /** + * From {@code io.apicurio.rest.client.handler.BodyHandler} + * + * @see https://github.com/Apicurio/apicurio-common-rest-client/blob/0868773f61e33d40dcac88608aa111e26ab71bc7/rest-client-jdk/src/main/java/io/apicurio/rest/client/handler/BodyHandler.java + */ + private static class BodyHandler implements HttpResponse.BodyHandler> { + + private final TypeReference wClass; + private final RestClientErrorHandler errorHandler; + private static final ObjectMapper MAPPER = new ObjectMapper(); + static { + SimpleModule module = new SimpleModule("Custom date handler"); + module.addDeserializer(Date.class, new RegistryDateDeserializer()); + MAPPER.registerModule(module); + } + + public BodyHandler(TypeReference wClass, RestClientErrorHandler errorHandler) { + this.wClass = wClass; + this.errorHandler = errorHandler; + MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + @Override + public HttpResponse.BodySubscriber> apply(HttpResponse.ResponseInfo responseInfo) { + return asJSON(wClass, responseInfo, errorHandler); + } + + public static HttpResponse.BodySubscriber> asJSON(TypeReference targetType, HttpResponse.ResponseInfo responseInfo, RestClientErrorHandler errorHandler) { + HttpResponse.BodySubscriber upstream = HttpResponse.BodySubscribers.ofInputStream(); + return HttpResponse.BodySubscribers.mapping( + upstream, + inputStream -> toSupplierOfType(inputStream, targetType, responseInfo, errorHandler)); + } + + @SuppressWarnings("unchecked") + public static Supplier toSupplierOfType(InputStream body, TypeReference targetType, HttpResponse.ResponseInfo responseInfo, RestClientErrorHandler errorHandler) { + return () -> { + try { + if (isFailure(responseInfo)) { + throw errorHandler.handleErrorResponse(body, responseInfo.statusCode()); + } else { + final String typeName = targetType.getType().getTypeName(); + if (typeName.contains("InputStream")) { + return (W) body; + } else if (typeName.contains("Void")) { + //Intended null return + return null; + } else { + return MAPPER.readValue(body, targetType); + } + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }; + } + + private static boolean isFailure(HttpResponse.ResponseInfo responseInfo) { + return responseInfo.statusCode() / 100 != 2; + } + } +} diff --git a/api/src/main/resources/application.properties b/api/src/main/resources/application.properties index 758def621..16e9fee69 100644 --- a/api/src/main/resources/application.properties +++ b/api/src/main/resources/application.properties @@ -74,7 +74,7 @@ console.kafka.admin.default.api.timeout.ms=10000 ######## #%dev.quarkus.http.auth.proactive=false #%dev.quarkus.http.auth.permission."oidc".policy=permit -%dev.quarkus.tls.trust-all=true +#%dev.quarkus.tls.trust-all=true %dev.quarkus.kubernetes-client.trust-certs=true %dev.quarkus.log.category."io.vertx.core.impl.BlockedThreadChecker".level=OFF %dev.quarkus.log.category."com.github.streamshub.console".level=DEBUG diff --git a/common/src/main/java/com/github/streamshub/console/support/StringSupport.java b/common/src/main/java/com/github/streamshub/console/support/StringSupport.java new file mode 100644 index 000000000..7d717a978 --- /dev/null +++ b/common/src/main/java/com/github/streamshub/console/support/StringSupport.java @@ -0,0 +1,41 @@ +package com.github.streamshub.console.support; + +import java.util.Locale; + +public final class StringSupport { + + private StringSupport() { + // No instances + } + + public static boolean isAsciiLetterOrDigit(char c) { + return 'a' <= c && c <= 'z' || + 'A' <= c && c <= 'Z' || + '0' <= c && c <= '9'; + } + + public static String replaceNonAlphanumeric(final String name, char replacement) { + return replaceNonAlphanumeric(name, replacement, new StringBuilder(name.length())); + } + + public static String replaceNonAlphanumeric(final String name, char replacement, final StringBuilder sb) { + int length = name.length(); + for (int i = 0; i < length; i++) { + char c = name.charAt(i); + if (isAsciiLetterOrDigit(c)) { + sb.append(c); + } else { + sb.append(replacement); + if (c == '"' && i + 1 == length) { + sb.append(replacement); + } + } + } + return sb.toString(); + } + + public static String toEnv(String name) { + return replaceNonAlphanumeric(name, '_').toUpperCase(Locale.ROOT); + } + +} diff --git a/common/src/test/java/com/github/streamshub/console/support/StringSupportTest.java b/common/src/test/java/com/github/streamshub/console/support/StringSupportTest.java new file mode 100644 index 000000000..0fe0aed68 --- /dev/null +++ b/common/src/test/java/com/github/streamshub/console/support/StringSupportTest.java @@ -0,0 +1,28 @@ +package com.github.streamshub.console.support; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class StringSupportTest { + + @Test + void testReplaceNonAlphanumericStringChar() { + String actual = StringSupport.replaceNonAlphanumeric("hyphenated-segment", '.'); + assertEquals("hyphenated.segment", actual); + } + + @ParameterizedTest + @CsvSource({ + "console.config.\"Hyphenated-Segment-1\".value, CONSOLE_CONFIG__HYPHENATED_SEGMENT_1__VALUE", + "console.config.\"Hyphenated-Segment-1\", CONSOLE_CONFIG__HYPHENATED_SEGMENT_1__", + "console.config.{Braced-Segment}, CONSOLE_CONFIG__BRACED_SEGMENT_", + }) + void testToEnv(String input, String expected) { + String actual = StringSupport.toEnv(input); + assertEquals(expected, actual); + } + +} diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/SchemaRegistry.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/SchemaRegistry.java index ee79587a6..026013d10 100644 --- a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/SchemaRegistry.java +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/SchemaRegistry.java @@ -22,6 +22,12 @@ public class SchemaRegistry { @JsonPropertyDescription("URL of the Apicurio Registry server API.") private String url; + @JsonPropertyDescription(""" + Trust store configuration for when the schema registry uses \ + TLS certificates signed by an unknown CA. + """) + private TrustStore trustStore; + public String getName() { return name; } @@ -37,4 +43,12 @@ public String getUrl() { public void setUrl(String url) { this.url = url; } + + public TrustStore getTrustStore() { + return trustStore; + } + + public void setTrustStore(TrustStore trustStore) { + this.trustStore = trustStore; + } } diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/TrustStore.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/TrustStore.java new file mode 100644 index 000000000..193ab6213 --- /dev/null +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/TrustStore.java @@ -0,0 +1,74 @@ +package com.github.streamshub.console.api.v1alpha1.spec; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; + +import io.sundr.builder.annotations.Buildable; + +@Buildable(builderPackage = "io.fabric8.kubernetes.api.builder") +@JsonInclude(JsonInclude.Include.NON_NULL) +public class TrustStore { + + @JsonProperty("type") + private Type type; // NOSONAR + + @JsonProperty("content") + @JsonPropertyDescription("Content of the trust store") + private Value content; + + @JsonProperty("password") + @JsonPropertyDescription("Content used to access the trust store, if necessary") + private Value password; + + @JsonProperty("alias") + @JsonPropertyDescription("Alias to select the appropriate certificate when multiple certificates are included.") + private String alias; + + public Type getType() { + return type; + } + + public void setType(Type type) { + this.type = type; + } + + public Value getContent() { + return content; + } + + public void setContent(Value content) { + this.content = content; + } + + public Value getPassword() { + return password; + } + + public void setPassword(Value password) { + this.password = password; + } + + public String getAlias() { + return alias; + } + + public void setAlias(String alias) { + this.alias = alias; + } + + public enum Type { + PEM("pem"), PKCS12("p12"), JKS("jks"); + + private final String value; + + private Type(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + } +} diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/Value.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/Value.java new file mode 100644 index 000000000..f7f44b323 --- /dev/null +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/Value.java @@ -0,0 +1,50 @@ +package com.github.streamshub.console.api.v1alpha1.spec; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; + +import io.sundr.builder.annotations.Buildable; + +@Buildable(builderPackage = "io.fabric8.kubernetes.api.builder") +@JsonInclude(JsonInclude.Include.NON_NULL) +public class Value { + + @JsonProperty("value") + @JsonPropertyDescription("Literal string to be used for this value") + private String value; // NOSONAR + + @JsonProperty("valueFrom") + @JsonPropertyDescription("Reference to an external source to use for this value") + private ValueReference valueFrom; + + public Value() { + } + + private Value(String value) { + this.value = value; + } + + @JsonIgnore + public static Value of(String value) { + return value != null ? new Value(value) : null; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public ValueReference getValueFrom() { + return valueFrom; + } + + public void setValueFrom(ValueReference valueFrom) { + this.valueFrom = valueFrom; + } + +} diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ValueReference.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ValueReference.java new file mode 100644 index 000000000..cb4245a38 --- /dev/null +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/ValueReference.java @@ -0,0 +1,39 @@ +package com.github.streamshub.console.api.v1alpha1.spec; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; + +import io.fabric8.kubernetes.api.model.ConfigMapKeySelector; +import io.fabric8.kubernetes.api.model.SecretKeySelector; +import io.sundr.builder.annotations.Buildable; + +@Buildable(builderPackage = "io.fabric8.kubernetes.api.builder") +@JsonInclude(JsonInclude.Include.NON_NULL) +public class ValueReference { + + @JsonProperty("configMapKeyRef") + @JsonPropertyDescription("Reference to a ConfigMap entry value") + private ConfigMapKeySelector configMapKeyRef; + + @JsonProperty("secretKeyRef") + @JsonPropertyDescription("Reference to a Secret entry value") + private SecretKeySelector secretKeyRef; + + public ConfigMapKeySelector getConfigMapKeyRef() { + return configMapKeyRef; + } + + public void setConfigMapKeyRef(ConfigMapKeySelector configMapKeyRef) { + this.configMapKeyRef = configMapKeyRef; + } + + public SecretKeySelector getSecretKeyRef() { + return secretKeyRef; + } + + public void setSecretKeyRef(SecretKeySelector secretKeyRef) { + this.secretKeyRef = secretKeyRef; + } + +} diff --git a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/metrics/MetricsSource.java b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/metrics/MetricsSource.java index d9b1546fa..ca3fe7205 100644 --- a/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/metrics/MetricsSource.java +++ b/operator/src/main/java/com/github/streamshub/console/api/v1alpha1/spec/metrics/MetricsSource.java @@ -2,7 +2,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import com.fasterxml.jackson.annotation.JsonValue; +import com.github.streamshub.console.api.v1alpha1.spec.TrustStore; import io.fabric8.generator.annotation.Required; import io.sundr.builder.annotations.Buildable; @@ -16,6 +18,13 @@ public class MetricsSource { @Required private Type type; private String url; + + @JsonPropertyDescription(""" + Trust store configuration for when the metrics source uses \ + TLS certificates signed by an unknown CA. + """) + private TrustStore trustStore; + private MetricsSourceAuthentication authentication; public String getName() { @@ -42,6 +51,14 @@ public void setUrl(String url) { this.url = url; } + public TrustStore getTrustStore() { + return trustStore; + } + + public void setTrustStore(TrustStore trustStore) { + this.trustStore = trustStore; + } + public MetricsSourceAuthentication getAuthentication() { return authentication; } diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleDeployment.java b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleDeployment.java index 28f9cb6da..b05c1a568 100644 --- a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleDeployment.java +++ b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleDeployment.java @@ -1,6 +1,8 @@ package com.github.streamshub.console.dependents; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; @@ -12,6 +14,10 @@ import com.github.streamshub.console.api.v1alpha1.Console; import com.github.streamshub.console.api.v1alpha1.spec.Images; +import io.fabric8.kubernetes.api.model.EnvVar; +import io.fabric8.kubernetes.api.model.KubernetesResource; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeMount; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.CRUDKubernetesDependentResource; @@ -58,6 +64,11 @@ protected Deployment desired(Console primary, Context context) { String imageAPI = imagesSpec.map(Images::getApi).orElse(defaultAPIImage); String imageUI = imagesSpec.map(Images::getUi).orElse(defaultUIImage); + var envVars = new ArrayList<>(coalesce(primary.getSpec().getEnv(), Collections::emptyList)); + + var trustResources = getTrustResources(context); + envVars.addAll(getResourcesByType(trustResources, EnvVar.class)); + return desired.edit() .editMetadata() .withName(name) @@ -82,9 +93,11 @@ protected Deployment desired(Console primary, Context context) { .withSecretName(configSecretName) .endSecret() .endVolume() + .addAllToVolumes(getResourcesByType(trustResources, Volume.class)) .editMatchingContainer(c -> "console-api".equals(c.getName())) .withImage(imageAPI) - .addAllToEnv(coalesce(primary.getSpec().getEnv(), Collections::emptyList)) + .addAllToVolumeMounts(getResourcesByType(trustResources, VolumeMount.class)) + .addAllToEnv(envVars) .endContainer() .editMatchingContainer(c -> "console-ui".equals(c.getName())) .withImage(imageUI) @@ -104,4 +117,16 @@ protected Deployment desired(Console primary, Context context) { .endSpec() .build(); } + + @SuppressWarnings("unchecked") + Map, List> getTrustResources(Context context) { + return context.managedDependentResourceContext().getMandatory("TrustStoreResources", Map.class); + } + + @SuppressWarnings("unchecked") + List getResourcesByType( + Map, List> resources, + Class key) { + return (List) resources.getOrDefault(key, Collections.emptyList()); + } } diff --git a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java index 733ef8315..f8df349f0 100644 --- a/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java +++ b/operator/src/main/java/com/github/streamshub/console/dependents/ConsoleSecret.java @@ -5,10 +5,13 @@ import java.io.OutputStream; import java.io.UncheckedIOException; import java.security.SecureRandom; +import java.util.ArrayList; import java.util.Base64; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -32,6 +35,9 @@ import com.github.streamshub.console.api.v1alpha1.spec.Credentials; import com.github.streamshub.console.api.v1alpha1.spec.KafkaCluster; import com.github.streamshub.console.api.v1alpha1.spec.SchemaRegistry; +import com.github.streamshub.console.api.v1alpha1.spec.TrustStore; +import com.github.streamshub.console.api.v1alpha1.spec.Value; +import com.github.streamshub.console.api.v1alpha1.spec.ValueReference; import com.github.streamshub.console.api.v1alpha1.spec.metrics.MetricsSource; import com.github.streamshub.console.api.v1alpha1.spec.metrics.MetricsSource.Type; import com.github.streamshub.console.config.ConsoleConfig; @@ -40,9 +46,15 @@ import com.github.streamshub.console.config.SchemaRegistryConfig; import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.EnvVar; +import io.fabric8.kubernetes.api.model.EnvVarBuilder; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.Secret; import io.fabric8.kubernetes.api.model.SecretBuilder; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeBuilder; +import io.fabric8.kubernetes.api.model.VolumeMount; +import io.fabric8.kubernetes.api.model.VolumeMountBuilder; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.openshift.api.model.Route; import io.fabric8.openshift.api.model.RouteIngress; @@ -58,12 +70,18 @@ import io.strimzi.api.kafka.model.user.KafkaUser; import io.strimzi.api.kafka.model.user.KafkaUserStatus; +import static com.github.streamshub.console.support.StringSupport.replaceNonAlphanumeric; +import static com.github.streamshub.console.support.StringSupport.toEnv; + @ApplicationScoped @KubernetesDependent(labelSelector = ConsoleResource.MANAGEMENT_SELECTOR) public class ConsoleSecret extends CRUDKubernetesDependentResource implements ConsoleResource { public static final String NAME = "console-secret"; + private static final String EMBEDDED_METRICS_NAME = "streamshub.console.embedded-prometheus"; + private static final String METRICS_TRUST_PREFIX = "metrics-source-truststore."; + private static final String REGISTRY_TRUST_PREFIX = "schema-registry-truststore."; private static final Random RANDOM = new SecureRandom(); @Inject @@ -98,6 +116,8 @@ protected Secret desired(Console primary, Context context) { throw new UncheckedIOException(e); } + buildTrustStores(primary, context, data); + updateDigest(context, "console-digest", data); return new SecretBuilder() @@ -110,6 +130,120 @@ protected Secret desired(Console primary, Context context) { .build(); } + /** + * Generate additional entries in the secret for metric source trust stores. Also, this + * method will add to the context the resources to be added to the console deployment to + * access the secret entries. + */ + private void buildTrustStores(Console primary, Context context, Map data) { + Map, List> deploymentResources = new HashMap<>(); + + for (var metricsSource : Optional.ofNullable(primary.getSpec().getMetricsSources()) + .orElse(Collections.emptyList())) { + var truststore = metricsSource.getTrustStore(); + + if (truststore != null) { + reconcileTrustStore(primary, context, data, metricsSource.getName(), METRICS_TRUST_PREFIX, truststore, "metrics-source", deploymentResources); + } + } + + for (var registry : Optional.ofNullable(primary.getSpec().getSchemaRegistries()) + .orElse(Collections.emptyList())) { + var truststore = registry.getTrustStore(); + + if (truststore != null) { + reconcileTrustStore(primary, context, data, registry.getName(), REGISTRY_TRUST_PREFIX, truststore, "schema-registry", deploymentResources); + } + } + + context.managedDependentResourceContext().put("TrustStoreResources", deploymentResources); + } + + @SuppressWarnings("java:S107") // Ignore Sonar warning for too many args + private void reconcileTrustStore( + Console primary, + Context context, + Map data, + String sourceName, + String sourcePrefix, + TrustStore truststore, + String bucketPrefix, + Map, List> deploymentResources) { + + String namespace = primary.getMetadata().getNamespace(); + String secretName = instanceName(primary); + String typeCode = truststore.getType().toString(); + String volumeName = replaceNonAlphanumeric(sourcePrefix + sourceName, '-'); + String fileName = sourcePrefix + sourceName + "." + typeCode; + + @SuppressWarnings("unchecked") + List volumes = (List) deploymentResources.computeIfAbsent(Volume.class, k -> new ArrayList<>()); + + volumes.add(new VolumeBuilder() + .withName(volumeName) + .withNewSecret() + .withSecretName(secretName) + .addNewItem() + .withKey(sourcePrefix + sourceName + ".content") + .withPath(fileName) + .endItem() + .withDefaultMode(420) + .endSecret() + .build()); + + @SuppressWarnings("unchecked") + List mounts = (List) deploymentResources.computeIfAbsent(VolumeMount.class, k -> new ArrayList<>()); + + mounts.add(new VolumeMountBuilder() + .withName(volumeName) + .withMountPath("/etc/ssl/" + fileName) + .withSubPath(fileName) + .build()); + + String configTemplate = "quarkus.tls.\"" + bucketPrefix + "-%s\".trust-store.%s.%s"; + + @SuppressWarnings("unchecked") + List vars = (List) deploymentResources.computeIfAbsent(EnvVar.class, k -> new ArrayList<>()); + + if (putMetricsTrustStoreValue(data, sourceName, "content", getValue(context, namespace, truststore.getContent()))) { + String pathKey = switch (truststore.getType()) { + case JKS, PKCS12 -> "path"; + case PEM -> "certs"; + }; + + vars.add(new EnvVarBuilder() + .withName(toEnv(configTemplate.formatted(sourceName, typeCode, pathKey))) + .withValue("/etc/ssl/" + fileName) + .build()); + } + + if (putMetricsTrustStoreValue(data, sourceName, "password", getValue(context, namespace, truststore.getPassword()))) { + vars.add(new EnvVarBuilder() + .withName(toEnv(configTemplate.formatted(sourceName, typeCode, "password"))) + .withNewValueFrom() + .withNewSecretKeyRef(sourcePrefix + sourceName + ".password", secretName, false) + .endValueFrom() + .build()); + } + + if (putMetricsTrustStoreValue(data, sourceName, "alias", getValue(context, namespace, Value.of(truststore.getAlias())))) { + vars.add(new EnvVarBuilder() + .withName(toEnv(configTemplate.formatted(sourceName, typeCode, "alias"))) + .withNewValueFrom() + .withNewSecretKeyRef(sourcePrefix + sourceName + ".alias", secretName, false) + .endValueFrom() + .build()); + } + } + + private boolean putMetricsTrustStoreValue(Map data, String sourceName, String key, String value) { + if (value != null) { + data.put(METRICS_TRUST_PREFIX + sourceName + "." + key, value); + return true; + } + return false; + } + private static String base64String(int length) { ByteArrayOutputStream buffer = new ByteArrayOutputStream(); OutputStream out = Base64.getEncoder().wrap(buffer); @@ -395,6 +529,46 @@ void copyData(Map target, Map source, String pre }); } + /** + * Fetch the string value from the given valueSpec. The return string + * will be encoded for use in the Console secret data map. + */ + String getValue(Context context, String namespace, Value valueSpec) { + if (valueSpec == null) { + return null; + } + + return Optional.ofNullable(valueSpec.getValue()) + .map(this::encodeString) + .or(() -> Optional.ofNullable(valueSpec.getValueFrom()) + .map(ValueReference::getConfigMapKeyRef) + .flatMap(ref -> getValue(context, ConfigMap.class, namespace, ref.getName(), ref.getKey(), ref.getOptional(), ConfigMap::getData) + .map(this::encodeString) + .or(() -> getValue(context, ConfigMap.class, namespace, ref.getName(), ref.getKey(), ref.getOptional(), ConfigMap::getBinaryData)))) + .or(() -> Optional.ofNullable(valueSpec.getValueFrom()) + .map(ValueReference::getSecretKeyRef) + .flatMap(ref -> getValue(context, Secret.class, namespace, ref.getName(), ref.getKey(), ref.getOptional(), Secret::getData)) + /* No need to call encodeString, the value is already encoded from Secret */) + .orElse(null); + } + + Optional getValue(Context context, + Class sourceType, + String namespace, + String name, + String key, + Boolean optional, + Function> dataProvider) { + + S source = getResource(context, sourceType, namespace, name, Boolean.TRUE.equals(optional)); + + if (source != null) { + return Optional.ofNullable(dataProvider.apply(source).get(key)); + } + + return Optional.empty(); + } + static T getResource( Context context, Class resourceType, String namespace, String name) { return getResource(context, resourceType, namespace, name, false); diff --git a/operator/src/main/resources/com/github/streamshub/console/dependents/console.deployment.yaml b/operator/src/main/resources/com/github/streamshub/console/dependents/console.deployment.yaml index c16a39d66..6f5759998 100644 --- a/operator/src/main/resources/com/github/streamshub/console/dependents/console.deployment.yaml +++ b/operator/src/main/resources/com/github/streamshub/console/dependents/console.deployment.yaml @@ -13,13 +13,6 @@ spec: spec: serviceAccountName: placeholder volumes: - - name: kubernetes-ca - configMap: - name: kube-root-ca.crt - items: - - key: ca.crt - path: kubernetes-ca.pem - defaultMode: 420 - name: cache emptyDir: {} - name: config @@ -33,15 +26,10 @@ spec: - containerPort: 8080 name: http volumeMounts: - - name: kubernetes-ca - mountPath: /etc/ssl/kubernetes-ca.pem - subPath: kubernetes-ca.pem - name: config mountPath: /deployments/console-config.yaml subPath: console-config.yaml env: - - name: QUARKUS_TLS_TRUST_STORE_PEM_CERTS - value: /etc/ssl/kubernetes-ca.pem - name: CONSOLE_CONFIG_PATH value: /deployments/console-config.yaml startupProbe: diff --git a/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java b/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java index cf9193c3d..8cd0da87a 100644 --- a/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java +++ b/operator/src/test/java/com/github/streamshub/console/ConsoleReconcilerTest.java @@ -6,6 +6,8 @@ import java.util.Optional; import java.util.UUID; import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; import jakarta.inject.Inject; @@ -19,6 +21,8 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.github.streamshub.console.api.v1alpha1.Console; import com.github.streamshub.console.api.v1alpha1.ConsoleBuilder; +import com.github.streamshub.console.api.v1alpha1.spec.TrustStore; +import com.github.streamshub.console.api.v1alpha1.spec.metrics.MetricsSource; import com.github.streamshub.console.api.v1alpha1.spec.metrics.MetricsSource.Type; import com.github.streamshub.console.config.ConsoleConfig; import com.github.streamshub.console.config.PrometheusConfig; @@ -27,13 +31,17 @@ import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import io.fabric8.kubernetes.api.model.EnvVar; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.NamespaceBuilder; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.fabric8.kubernetes.api.model.Secret; import io.fabric8.kubernetes.api.model.SecretBuilder; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeMount; import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionBuilder; import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.networking.v1.Ingress; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.openshift.api.model.Route; @@ -120,6 +128,7 @@ void setUp() throws Exception { var allDeployments = client.resources(Deployment.class).inAnyNamespace().withLabels(ConsoleResource.MANAGEMENT_LABEL); var allConfigMaps = client.resources(ConfigMap.class).inAnyNamespace().withLabels(ConsoleResource.MANAGEMENT_LABEL); var allSecrets = client.resources(Secret.class).inAnyNamespace().withLabels(ConsoleResource.MANAGEMENT_LABEL); + var allIngresses = client.resources(Ingress.class).inAnyNamespace().withLabels(ConsoleResource.MANAGEMENT_LABEL); allConsoles.delete(); allKafkas.delete(); @@ -127,6 +136,7 @@ void setUp() throws Exception { allDeployments.delete(); allConfigMaps.delete(); allSecrets.delete(); + allIngresses.delete(); await().atMost(LIMIT).untilAsserted(() -> { assertTrue(allConsoles.list().getItems().isEmpty()); @@ -135,6 +145,7 @@ void setUp() throws Exception { assertTrue(allDeployments.list().getItems().isEmpty()); assertTrue(allConfigMaps.list().getItems().isEmpty()); assertTrue(allSecrets.list().getItems().isEmpty()); + assertTrue(allIngresses.list().getItems().isEmpty()); }); operator.start(); @@ -225,22 +236,7 @@ void testBasicConsoleReconciliation() { .editStatus(this::setReady); LOGGER.info("Set ready replicas for Prometheus deployment"); - var consoleIngress = client.network().v1().ingresses() - .inNamespace(consoleCR.getMetadata().getNamespace()) - .withName("console-1-console-ingress") - .get(); - - consoleIngress = consoleIngress.edit() - .editOrNewStatus() - .withNewLoadBalancer() - .addNewIngress() - .withHostname("ingress.example.com") - .endIngress() - .endLoadBalancer() - .endStatus() - .build(); - client.resource(consoleIngress).patchStatus(); - LOGGER.info("Set ingress status for Console ingress"); + setConsoleIngressReady(consoleCR); await().ignoreException(NullPointerException.class).atMost(LIMIT).untilAsserted(() -> { var console = client.resources(Console.class) @@ -848,6 +844,153 @@ void testConsoleReconciliationWithPrometheusEmptyAuthN() { assertThrows(KubernetesClientException.class, resourceClient::create); } + @Test + void testConsoleReconciliationWithTrustStores() { + Secret passwordSecret = new SecretBuilder() + .withNewMetadata() + .withName("my-secret") + .withNamespace("ns2") + .addToLabels(ConsoleResource.MANAGEMENT_LABEL) + .endMetadata() + .addToData("pass", Base64.getEncoder().encodeToString("0p3n535@m3".getBytes())) + .build(); + ConfigMap contentConfigMap = new ConfigMapBuilder() + .withNewMetadata() + .withName("my-configmap") + .withNamespace("ns2") + .addToLabels(ConsoleResource.MANAGEMENT_LABEL) + .endMetadata() + .addToData("truststore", "dummy-keystore") + .build(); + + client.resource(passwordSecret).create(); + client.resource(contentConfigMap).create(); + + Console consoleCR = new ConsoleBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName("console-1") + .withNamespace("ns2") + .build()) + .withNewSpec() + .withHostname("example.com") + .addNewMetricsSource() + .withName("example-prometheus") + .withType(MetricsSource.Type.STANDALONE) + .withUrl("https://prometheus.example.com") + .withNewTrustStore() + .withType(TrustStore.Type.JKS) + .withNewPassword() + .withNewValueFrom() + .withNewSecretKeyRef("pass", "my-secret", Boolean.FALSE) + .endValueFrom() + .endPassword() + .withNewContent() + .withNewValueFrom() + .withNewConfigMapKeyRef("truststore", "my-configmap", Boolean.FALSE) + .endValueFrom() + .endContent() + .withAlias("cert-ca") + .endTrustStore() + .endMetricsSource() + .addNewSchemaRegistry() + .withName("example-registry") + .withUrl("https://example.com/apis/registry/v2") + .withNewTrustStore() + .withType(TrustStore.Type.PEM) + .withNewContent() + .withValue("---CERT---") + .endContent() + .endTrustStore() + .endSchemaRegistry() + .addNewKafkaCluster() + .withName(kafkaCR.getMetadata().getName()) + .withNamespace(kafkaCR.getMetadata().getNamespace()) + .withListener(kafkaCR.getSpec().getKafka().getListeners().get(0).getName()) + .withSchemaRegistry("example-registry") + .endKafkaCluster() + .endSpec() + .build(); + + client.resource(consoleCR).create(); + + await().ignoreException(NullPointerException.class).atMost(LIMIT).untilAsserted(() -> { + var console = client.resources(Console.class) + .inNamespace(consoleCR.getMetadata().getNamespace()) + .withName(consoleCR.getMetadata().getName()) + .get(); + assertEquals(1, console.getStatus().getConditions().size()); + var condition = console.getStatus().getConditions().get(0); + assertEquals("Ready", condition.getType()); + assertEquals("False", condition.getStatus()); + assertEquals("DependentsNotReady", condition.getReason()); + assertTrue(condition.getMessage().contains("ConsoleIngress")); + }); + + setConsoleIngressReady(consoleCR); + + await().ignoreException(NullPointerException.class).atMost(LIMIT).untilAsserted(() -> { + var console = client.resources(Console.class) + .inNamespace(consoleCR.getMetadata().getNamespace()) + .withName(consoleCR.getMetadata().getName()) + .get(); + assertEquals(1, console.getStatus().getConditions().size()); + var condition = console.getStatus().getConditions().get(0); + assertEquals("Ready", condition.getType()); + assertEquals("False", condition.getStatus()); + assertEquals("DependentsNotReady", condition.getReason()); + assertTrue(condition.getMessage().contains("ConsoleDeployment")); + }); + + var consoleDeployment = client.apps().deployments() + .inNamespace(consoleCR.getMetadata().getNamespace()) + .withName("console-1-console-deployment") + .get(); + + var podSpec = consoleDeployment.getSpec().getTemplate().getSpec(); + var containerSpecAPI = podSpec.getContainers().get(0); + + var volumes = podSpec.getVolumes().stream().collect(Collectors.toMap(Volume::getName, Function.identity())); + assertEquals(4, volumes.size()); // cache, config + 2 volumes for truststores + + var metricsVolName = "metrics-source-truststore-example-prometheus"; + var registryVolName = "schema-registry-truststore-example-registry"; + + var metricsVolume = volumes.get(metricsVolName); + assertEquals("metrics-source-truststore.example-prometheus.content", metricsVolume.getSecret().getItems().get(0).getKey()); + assertEquals("metrics-source-truststore.example-prometheus.jks", metricsVolume.getSecret().getItems().get(0).getPath()); + + var registryVolume = volumes.get(registryVolName); + assertEquals("schema-registry-truststore.example-registry.content", registryVolume.getSecret().getItems().get(0).getKey()); + assertEquals("schema-registry-truststore.example-registry.pem", registryVolume.getSecret().getItems().get(0).getPath()); + + var mounts = containerSpecAPI.getVolumeMounts().stream().collect(Collectors.toMap(VolumeMount::getName, Function.identity())); + assertEquals(3, mounts.size()); + + var metricsMount = mounts.get(metricsVolName); + var metricsMountPath = "/etc/ssl/metrics-source-truststore.example-prometheus.jks"; + assertEquals(metricsMountPath, metricsMount.getMountPath()); + assertEquals("metrics-source-truststore.example-prometheus.jks", metricsMount.getSubPath()); + + var registryMount = mounts.get(registryVolName); + var registryMountPath = "/etc/ssl/schema-registry-truststore.example-registry.pem"; + assertEquals(registryMountPath, registryMount.getMountPath()); + assertEquals("schema-registry-truststore.example-registry.pem", registryMount.getSubPath()); + + var envVars = containerSpecAPI.getEnv().stream().collect(Collectors.toMap(EnvVar::getName, Function.identity())); + + var metricsTrustPath = envVars.get("QUARKUS_TLS__METRICS_SOURCE_EXAMPLE_PROMETHEUS__TRUST_STORE_JKS_PATH"); + assertEquals(metricsMountPath, metricsTrustPath.getValue()); + var metricsAliasSource = envVars.get("QUARKUS_TLS__METRICS_SOURCE_EXAMPLE_PROMETHEUS__TRUST_STORE_JKS_ALIAS"); + assertEquals("console-1-console-secret", metricsAliasSource.getValueFrom().getSecretKeyRef().getName()); + assertEquals("metrics-source-truststore.example-prometheus.alias", metricsAliasSource.getValueFrom().getSecretKeyRef().getKey()); + var metricsPasswordSource = envVars.get("QUARKUS_TLS__METRICS_SOURCE_EXAMPLE_PROMETHEUS__TRUST_STORE_JKS_PASSWORD"); + assertEquals("console-1-console-secret", metricsPasswordSource.getValueFrom().getSecretKeyRef().getName()); + assertEquals("metrics-source-truststore.example-prometheus.password", metricsPasswordSource.getValueFrom().getSecretKeyRef().getKey()); + + var registryTrustPath = envVars.get("QUARKUS_TLS__SCHEMA_REGISTRY_EXAMPLE_REGISTRY__TRUST_STORE_PEM_CERTS"); + assertEquals(registryMountPath, registryTrustPath.getValue()); + } + // Utility private void assertConsoleConfig(Consumer assertion) { @@ -862,6 +1005,25 @@ private void assertConsoleConfig(Consumer assertion) { }); } + private void setConsoleIngressReady(Console consoleCR) { + var consoleIngress = client.network().v1().ingresses() + .inNamespace(consoleCR.getMetadata().getNamespace()) + .withName("console-1-console-ingress") + .get(); + + consoleIngress = consoleIngress.edit() + .editOrNewStatus() + .withNewLoadBalancer() + .addNewIngress() + .withHostname("ingress.example.com") + .endIngress() + .endLoadBalancer() + .endStatus() + .build(); + client.resource(consoleIngress).patchStatus(); + LOGGER.info("Set ingress status for Console ingress"); + } + private Deployment setReady(Deployment deployment) { int desiredReplicas = Optional.ofNullable(deployment.getSpec().getReplicas()).orElse(1);