diff --git a/.dev/dev_arm64.yaml b/.dev/dev_arm64.yaml
index 220140d3d..dc1a8726e 100644
--- a/.dev/dev_arm64.yaml
+++ b/.dev/dev_arm64.yaml
@@ -32,7 +32,7 @@ services:
KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'
kafka0:
- image: confluentinc/cp-kafka:7.6.0.arm64
+ image: confluentinc/cp-kafka:7.8.0.arm64
user: "0:0"
hostname: kafka0
container_name: kafka0
@@ -60,7 +60,7 @@ services:
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
schema-registry0:
- image: confluentinc/cp-schema-registry:7.6.0.arm64
+ image: confluentinc/cp-schema-registry:7.8.0.arm64
ports:
- 8085:8085
depends_on:
@@ -76,7 +76,7 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
kafka-connect0:
- image: confluentinc/cp-kafka-connect:7.6.0.arm64
+ image: confluentinc/cp-kafka-connect:7.8.0.arm64
ports:
- 8083:8083
depends_on:
@@ -101,7 +101,7 @@ services:
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins,/usr/share/filestream-connectors"
ksqldb0:
- image: confluentinc/cp-ksqldb-server:7.6.0.arm64
+ image: confluentinc/cp-ksqldb-server:7.8.0.arm64
depends_on:
- kafka0
- kafka-connect0
@@ -119,7 +119,7 @@ services:
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
kafka-init-topics:
- image: confluentinc/cp-kafka:7.6.0.arm64
+ image: confluentinc/cp-kafka:7.8.0.arm64
volumes:
- ../documentation/compose/data/message.json:/data/message.json
depends_on:
diff --git a/.gitignore b/.gitignore
index efd6a9749..51efbef39 100644
--- a/.gitignore
+++ b/.gitignore
@@ -42,3 +42,4 @@ build/
*.tgz
/docker/*.override.yaml
+/e2e-tests/allure-results/
diff --git a/.java-version b/.java-version
new file mode 100644
index 000000000..aabe6ec39
--- /dev/null
+++ b/.java-version
@@ -0,0 +1 @@
+21
diff --git a/README.md b/README.md
index 9b6eb8e80..d6206100a 100644
--- a/README.md
+++ b/README.md
@@ -18,7 +18,6 @@ Versatile, fast and lightweight web UI for managing Apache Kafka® clusters.
Quick Start •
Community
- AWS Marketplace •
ProductHunt
@@ -28,7 +27,7 @@ Versatile, fast and lightweight web UI for managing Apache Kafka® clusters.
#### Kafbat UI is a free, open-source web UI to monitor and manage Apache Kafka clusters.
-Kafbat UI is a simple tool that makes your data flows observable, helps find and troubleshoot issues faster and deliver optimal performance. Its lightweight dashboard makes it easy to track key metrics of your Kafka clusters - Brokers, Topics, Partitions, Production, and Consumption.
+[Kafbat UI](https://kafbat.io/) is a simple tool that makes your data flows observable, helps find and troubleshoot issues faster and deliver optimal performance. Its lightweight dashboard makes it easy to track key metrics of your Kafka clusters - Brokers, Topics, Partitions, Production, and Consumption.
Kafbat UI, developed by Kafbat*, proudly carries forward the legacy of the UI Apache Kafka project.
diff --git a/api/pom.xml b/api/pom.xml
index 3f7c044d0..dc774c09a 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -50,7 +50,10 @@
org.apache.kafka
kafka-clients
- ${kafka-clients.version}
+
+ ${confluent.version}-ccs
org.apache.commons
diff --git a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java
index 72ab7386a..474a0c159 100644
--- a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java
+++ b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java
@@ -22,6 +22,7 @@
import java.util.Objects;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.springframework.http.ResponseEntity;
import org.springframework.util.unit.DataSize;
import org.springframework.web.client.RestClientException;
@@ -51,14 +52,36 @@ private static Retry conflictCodeRetry() {
(WebClientResponseException.Conflict) signal.failure()));
}
- private static Mono withRetryOnConflict(Mono publisher) {
- return publisher.retryWhen(conflictCodeRetry());
+ private static @NotNull Retry retryOnRebalance() {
+ return Retry.fixedDelay(MAX_RETRIES, RETRIES_DELAY).filter(e -> {
+
+ if (e instanceof WebClientResponseException.InternalServerError exception) {
+ final var errorMessage = getMessage(exception);
+ return StringUtils.equals(errorMessage,
+ // From https://github.com/apache/kafka/blob/dfc07e0e0c6e737a56a5402644265f634402b864/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2340
+ "Request cannot be completed because a rebalance is expected");
+ }
+ return false;
+ });
+ }
+
+ private static Mono withRetryOnConflictOrRebalance(Mono publisher) {
+ return publisher
+ .retryWhen(retryOnRebalance())
+ .retryWhen(conflictCodeRetry());
+ }
+
+ private static Flux withRetryOnConflictOrRebalance(Flux publisher) {
+ return publisher
+ .retryWhen(retryOnRebalance())
+ .retryWhen(conflictCodeRetry());
}
- private static Flux withRetryOnConflict(Flux publisher) {
- return publisher.retryWhen(conflictCodeRetry());
+ private static Mono withRetryOnRebalance(Mono publisher) {
+ return publisher.retryWhen(retryOnRebalance());
}
+
private static Mono withBadRequestErrorHandling(Mono publisher) {
return publisher
.onErrorResume(WebClientResponseException.BadRequest.class,
@@ -73,18 +96,21 @@ private record ErrorMessage(@NotNull @JsonProperty("message") String message) {
}
private static @NotNull Mono parseConnectErrorMessage(WebClientResponseException parseException) {
+ return Mono.error(new ValidationException(getMessage(parseException)));
+ }
+
+ private static String getMessage(WebClientResponseException parseException) {
final var errorMessage = parseException.getResponseBodyAs(ErrorMessage.class);
- return Mono.error(new ValidationException(
- Objects.requireNonNull(errorMessage,
- // see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
- "This should not happen according to the ConnectExceptionMapper")
- .message()));
+ return Objects.requireNonNull(errorMessage,
+ // see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
+ "This should not happen according to the ConnectExceptionMapper")
+ .message();
}
@Override
public Mono createConnector(NewConnector newConnector) throws RestClientException {
return withBadRequestErrorHandling(
- super.createConnector(newConnector)
+ withRetryOnRebalance(super.createConnector(newConnector))
);
}
@@ -92,178 +118,178 @@ public Mono createConnector(NewConnector newConnector) throws RestCli
public Mono setConnectorConfig(String connectorName, Map requestBody)
throws RestClientException {
return withBadRequestErrorHandling(
- super.setConnectorConfig(connectorName, requestBody)
+ withRetryOnRebalance(super.setConnectorConfig(connectorName, requestBody))
);
}
@Override
public Mono> createConnectorWithHttpInfo(NewConnector newConnector)
throws WebClientResponseException {
- return withRetryOnConflict(super.createConnectorWithHttpInfo(newConnector));
+ return withRetryOnConflictOrRebalance(super.createConnectorWithHttpInfo(newConnector));
}
@Override
public Mono deleteConnector(String connectorName) throws WebClientResponseException {
- return withRetryOnConflict(super.deleteConnector(connectorName));
+ return withRetryOnConflictOrRebalance(super.deleteConnector(connectorName));
}
@Override
public Mono> deleteConnectorWithHttpInfo(String connectorName)
throws WebClientResponseException {
- return withRetryOnConflict(super.deleteConnectorWithHttpInfo(connectorName));
+ return withRetryOnConflictOrRebalance(super.deleteConnectorWithHttpInfo(connectorName));
}
@Override
public Mono getConnector(String connectorName) throws WebClientResponseException {
- return withRetryOnConflict(super.getConnector(connectorName));
+ return withRetryOnConflictOrRebalance(super.getConnector(connectorName));
}
@Override
public Mono> getConnectorWithHttpInfo(String connectorName)
throws WebClientResponseException {
- return withRetryOnConflict(super.getConnectorWithHttpInfo(connectorName));
+ return withRetryOnConflictOrRebalance(super.getConnectorWithHttpInfo(connectorName));
}
@Override
public Mono