diff --git a/README.md b/README.md
index 56c5f708b..d5f965815 100644
--- a/README.md
+++ b/README.md
@@ -31,7 +31,7 @@ kubectl patch deployment -n ingress-nginx ingress-nginx-controller \
```
### Prerequisites
-#### Apache Kafka®
+#### Apache Kafka®
The instructions below assume an existing Apache Kafka® cluster is available to use from the console. We recommend using [Strimzi](https://strimzi.io) to create and manage your Apache Kafka® clusters - plus the console provides additional features and insights for Strimzi Apache Kafka® clusters.
If you already have Strimzi installed but would like to create an Apache Kafka® cluster for use with the console, example deployment resources are available to get started. The resources create an Apache Kafka® cluster in KRaft mode with SCRAM-SHA-512 authentication, a Strimzi `KafkaNodePool` resource to manage the cluster nodes, and a Strimzi `KafkaUser` resource that may be used to connect to the cluster.
diff --git a/api/pom.xml b/api/pom.xml
index e084f8a58..2e7b7114c 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -134,7 +134,6 @@
io.xlate
validators
- 1.4.2
org.slf4j
diff --git a/common/pom.xml b/common/pom.xml
index b1d89e037..f89185fa5 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -33,5 +33,56 @@
jakarta.validation-api
provided
+
+ io.xlate
+ validators
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ test
+
+
+ org.hibernate.validator
+ hibernate-validator
+ test
+
+
+ org.glassfish.expressly
+ expressly
+ test
+ true
+
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+
+
+ default-prepare-agent
+
+ prepare-agent
+
+
+
+ default-report
+
+ report
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+ true
+
+
+
+
diff --git a/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java b/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java
index 5c3c0f794..19d7ec94b 100644
--- a/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java
+++ b/common/src/main/java/com/github/streamshub/console/config/ConsoleConfig.java
@@ -3,14 +3,32 @@
import java.util.ArrayList;
import java.util.List;
+import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import io.xlate.validation.constraints.Expression;
+
+@Expression(
+ message = "Kafka cluster references an unknown schema registry",
+ value = """
+ registryNames = self.schemaRegistries.stream()
+ .map(registry -> registry.getName())
+ .toList();
+ self.kafka.clusters.stream()
+ .map(cluster -> cluster.getSchemaRegistry())
+ .filter(registry -> registry != null)
+ .allMatch(registry -> registryNames.contains(registry))
+ """)
public class ConsoleConfig {
KubernetesConfig kubernetes = new KubernetesConfig();
+
+ @Valid
List schemaRegistries = new ArrayList<>();
+
+ @Valid
KafkaConfig kafka = new KafkaConfig();
@JsonIgnore
diff --git a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java
index 0d27c0ad8..a676d80c6 100644
--- a/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java
+++ b/common/src/main/java/com/github/streamshub/console/config/KafkaClusterConfig.java
@@ -10,7 +10,7 @@
public class KafkaClusterConfig {
private String id;
- @NotBlank
+ @NotBlank(message = "Kafka cluster `name` is required")
private String name;
private String namespace;
private String listener;
diff --git a/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java b/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java
index 9c68d1622..429817137 100644
--- a/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java
+++ b/common/src/main/java/com/github/streamshub/console/config/KafkaConfig.java
@@ -4,12 +4,14 @@
import java.util.List;
import java.util.Optional;
+import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import com.fasterxml.jackson.annotation.JsonIgnore;
public class KafkaConfig {
+ @Valid
List clusters = new ArrayList<>();
@JsonIgnore
diff --git a/common/src/test/java/com/github/streamshub/console/config/ConsoleConfigTest.java b/common/src/test/java/com/github/streamshub/console/config/ConsoleConfigTest.java
new file mode 100644
index 000000000..b8debb754
--- /dev/null
+++ b/common/src/test/java/com/github/streamshub/console/config/ConsoleConfigTest.java
@@ -0,0 +1,124 @@
+package com.github.streamshub.console.config;
+
+import java.util.Comparator;
+import java.util.List;
+
+import jakarta.validation.ConstraintViolation;
+import jakarta.validation.Validation;
+import jakarta.validation.Validator;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ConsoleConfigTest {
+
+ ConsoleConfig config;
+ Validator validator;
+
+ @BeforeEach
+ void setup() {
+ config = new ConsoleConfig();
+ validator = Validation.buildDefaultValidatorFactory().getValidator();
+ }
+
+ @Test
+ void testRegistryNamesNotUniqueFailsValidation() {
+ for (String name : List.of("name1", "name2", "name1")) {
+ SchemaRegistryConfig registry = new SchemaRegistryConfig();
+ registry.setName(name);
+ registry.setUrl("http://example.com");
+ config.getSchemaRegistries().add(registry);
+ }
+
+ var violations = validator.validate(config);
+
+ assertEquals(1, violations.size());
+ assertEquals("Schema registry names must be unique", violations.iterator().next().getMessage());
+ }
+
+ @Test
+ void testRegistryNamesUniquePassesValidation() {
+ for (String name : List.of("name1", "name2", "name3")) {
+ SchemaRegistryConfig registry = new SchemaRegistryConfig();
+ registry.setName(name);
+ registry.setUrl("http://example.com");
+ config.getSchemaRegistries().add(registry);
+ }
+
+ var violations = validator.validate(config);
+
+ assertTrue(violations.isEmpty());
+ }
+
+ @Test
+ void testRegistryMissingPropertiesFailsValidation() {
+ SchemaRegistryConfig registry = new SchemaRegistryConfig();
+ // name and url are null
+ config.getSchemaRegistries().add(registry);
+
+ var violations = validator.validate(config).stream()
+ .sorted(Comparator.comparing(ConstraintViolation::getMessage))
+ .toList();
+
+ assertEquals(2, violations.size());
+ assertEquals("Schema registry `name` is required", violations.get(0).getMessage());
+ assertEquals("Schema registry `url` is required", violations.get(1).getMessage());
+ }
+
+ @Test
+ void testKafkaNamesNotUniqueFailsValidation() {
+ for (String name : List.of("name1", "name2", "name1")) {
+ KafkaClusterConfig cluster = new KafkaClusterConfig();
+ cluster.setName(name);
+ config.getKafka().getClusters().add(cluster);
+ }
+
+ var violations = validator.validate(config);
+
+ assertEquals(1, violations.size());
+ assertEquals("Kafka cluster names must be unique", violations.iterator().next().getMessage());
+ }
+
+ @Test
+ void testKafkaNameMissingFailsValidation() {
+ config.getKafka().getClusters().add(new KafkaClusterConfig());
+
+ var violations = validator.validate(config);
+
+ assertEquals(1, violations.size());
+ assertEquals("Kafka cluster `name` is required", violations.iterator().next().getMessage());
+ }
+
+ @Test
+ void testRegistryNamePassesValidation() {
+ SchemaRegistryConfig registry = new SchemaRegistryConfig();
+ registry.setName("known-registry");
+ registry.setUrl("http://example.com");
+ config.getSchemaRegistries().add(registry);
+
+ KafkaClusterConfig cluster = new KafkaClusterConfig();
+ cluster.setName("name1");
+ cluster.setSchemaRegistry("known-registry");
+ config.getKafka().getClusters().add(cluster);
+
+ var violations = validator.validate(config);
+
+ assertTrue(violations.isEmpty());
+ }
+
+ @Test
+ void testUnknownRegistryNameFailsValidation() {
+ KafkaClusterConfig cluster = new KafkaClusterConfig();
+ cluster.setName("name1");
+ cluster.setSchemaRegistry("unknown-registry");
+ config.getKafka().getClusters().add(cluster);
+
+ var violations = validator.validate(config);
+
+ assertEquals(1, violations.size());
+ assertEquals("Kafka cluster references an unknown schema registry", violations.iterator().next().getMessage());
+ }
+}
diff --git a/pom.xml b/pom.xml
index 60eca8bea..6c75b2f0b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,6 +122,11 @@
pom
import
+
+ io.xlate
+ validators
+ 1.4.2
+