Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into chore/rbac_ad
Browse files Browse the repository at this point in the history
  • Loading branch information
Haarolean committed Dec 30, 2024
2 parents 514c053 + d093752 commit daa6cb7
Show file tree
Hide file tree
Showing 68 changed files with 1,701 additions and 450 deletions.
10 changes: 5 additions & 5 deletions .dev/dev_arm64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ services:
KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'

kafka0:
image: confluentinc/cp-kafka:7.6.0.arm64
image: confluentinc/cp-kafka:7.8.0.arm64
user: "0:0"
hostname: kafka0
container_name: kafka0
Expand Down Expand Up @@ -60,7 +60,7 @@ services:
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

schema-registry0:
image: confluentinc/cp-schema-registry:7.6.0.arm64
image: confluentinc/cp-schema-registry:7.8.0.arm64
ports:
- 8085:8085
depends_on:
Expand All @@ -76,7 +76,7 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

kafka-connect0:
image: confluentinc/cp-kafka-connect:7.6.0.arm64
image: confluentinc/cp-kafka-connect:7.8.0.arm64
ports:
- 8083:8083
depends_on:
Expand All @@ -101,7 +101,7 @@ services:
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins,/usr/share/filestream-connectors"

ksqldb0:
image: confluentinc/cp-ksqldb-server:7.6.0.arm64
image: confluentinc/cp-ksqldb-server:7.8.0.arm64
depends_on:
- kafka0
- kafka-connect0
Expand All @@ -119,7 +119,7 @@ services:
KSQL_CACHE_MAX_BYTES_BUFFERING: 0

kafka-init-topics:
image: confluentinc/cp-kafka:7.6.0.arm64
image: confluentinc/cp-kafka:7.8.0.arm64
volumes:
- ../documentation/compose/data/message.json:/data/message.json
depends_on:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/frontend_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:

- uses: pnpm/[email protected]
with:
version: 9.11.0
version: 9.15.0

- name: Install node
uses: actions/[email protected]
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ build/
*.tgz

/docker/*.override.yaml
/e2e-tests/allure-results/
1 change: 1 addition & 0 deletions .java-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
21
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ Versatile, fast and lightweight web UI for managing Apache Kafka® clusters.

<p align="center">
<a href="https://ui.docs.kafbat.io/">Documentation</a> •
<a href="https://ui.docs.kafbat.io/configuration/quick-start">Quick Start</a> •
<a href="https://ui.docs.kafbat.io/quick-start/demo-run">Quick Start</a> •
<a href="https://discord.gg/4DWzD7pGE5">Community</a>
<br/>
<a href="https://aws.amazon.com/marketplace/pp/{replaceMe}">AWS Marketplace</a> •
<a href="https://www.producthunt.com/products/ui-for-apache-kafka/reviews/new">ProductHunt</a>
</p>

Expand All @@ -28,7 +27,7 @@ Versatile, fast and lightweight web UI for managing Apache Kafka® clusters.

#### Kafbat UI is a free, open-source web UI to monitor and manage Apache Kafka clusters.

Kafbat UI is a simple tool that makes your data flows observable, helps find and troubleshoot issues faster and deliver optimal performance. Its lightweight dashboard makes it easy to track key metrics of your Kafka clusters - Brokers, Topics, Partitions, Production, and Consumption.
[Kafbat UI](https://kafbat.io/) is a simple tool that makes your data flows observable, helps find and troubleshoot issues faster and deliver optimal performance. Its lightweight dashboard makes it easy to track key metrics of your Kafka clusters - Brokers, Topics, Partitions, Production, and Consumption.

<i>
Kafbat UI, developed by <b>Kafbat</b>*, proudly carries forward the legacy of the UI Apache Kafka project.
Expand Down
5 changes: 4 additions & 1 deletion api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
<!-- ccs stands for Confluent Community Edition
See https://www.confluent.io/confluent-community-license-faq/
-->
<version>${confluent.version}-ccs</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
Expand Down
108 changes: 67 additions & 41 deletions api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Objects;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.ResponseEntity;
import org.springframework.util.unit.DataSize;
import org.springframework.web.client.RestClientException;
Expand Down Expand Up @@ -51,14 +52,36 @@ private static Retry conflictCodeRetry() {
(WebClientResponseException.Conflict) signal.failure()));
}

private static <T> Mono<T> withRetryOnConflict(Mono<T> publisher) {
return publisher.retryWhen(conflictCodeRetry());
private static @NotNull Retry retryOnRebalance() {
return Retry.fixedDelay(MAX_RETRIES, RETRIES_DELAY).filter(e -> {

if (e instanceof WebClientResponseException.InternalServerError exception) {
final var errorMessage = getMessage(exception);
return StringUtils.equals(errorMessage,
// From https://github.com/apache/kafka/blob/dfc07e0e0c6e737a56a5402644265f634402b864/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2340
"Request cannot be completed because a rebalance is expected");
}
return false;
});
}

private static <T> Mono<T> withRetryOnConflictOrRebalance(Mono<T> publisher) {
return publisher
.retryWhen(retryOnRebalance())
.retryWhen(conflictCodeRetry());
}

private static <T> Flux<T> withRetryOnConflictOrRebalance(Flux<T> publisher) {
return publisher
.retryWhen(retryOnRebalance())
.retryWhen(conflictCodeRetry());
}

private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
return publisher.retryWhen(conflictCodeRetry());
private static <T> Mono<T> withRetryOnRebalance(Mono<T> publisher) {
return publisher.retryWhen(retryOnRebalance());
}


private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
return publisher
.onErrorResume(WebClientResponseException.BadRequest.class,
Expand All @@ -73,197 +96,200 @@ private record ErrorMessage(@NotNull @JsonProperty("message") String message) {
}

private static <T> @NotNull Mono<T> parseConnectErrorMessage(WebClientResponseException parseException) {
return Mono.error(new ValidationException(getMessage(parseException)));
}

private static String getMessage(WebClientResponseException parseException) {
final var errorMessage = parseException.getResponseBodyAs(ErrorMessage.class);
return Mono.error(new ValidationException(
Objects.requireNonNull(errorMessage,
// see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
"This should not happen according to the ConnectExceptionMapper")
.message()));
return Objects.requireNonNull(errorMessage,
// see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
"This should not happen according to the ConnectExceptionMapper")
.message();
}

@Override
public Mono<Connector> createConnector(NewConnector newConnector) throws RestClientException {
return withBadRequestErrorHandling(
super.createConnector(newConnector)
withRetryOnRebalance(super.createConnector(newConnector))
);
}

@Override
public Mono<Connector> setConnectorConfig(String connectorName, Map<String, Object> requestBody)
throws RestClientException {
return withBadRequestErrorHandling(
super.setConnectorConfig(connectorName, requestBody)
withRetryOnRebalance(super.setConnectorConfig(connectorName, requestBody))
);
}

@Override
public Mono<ResponseEntity<Connector>> createConnectorWithHttpInfo(NewConnector newConnector)
throws WebClientResponseException {
return withRetryOnConflict(super.createConnectorWithHttpInfo(newConnector));
return withRetryOnConflictOrRebalance(super.createConnectorWithHttpInfo(newConnector));
}

@Override
public Mono<Void> deleteConnector(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.deleteConnector(connectorName));
return withRetryOnConflictOrRebalance(super.deleteConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> deleteConnectorWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.deleteConnectorWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.deleteConnectorWithHttpInfo(connectorName));
}


@Override
public Mono<Connector> getConnector(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnector(connectorName));
return withRetryOnConflictOrRebalance(super.getConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Connector>> getConnectorWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorWithHttpInfo(connectorName));
}

@Override
public Mono<Map<String, Object>> getConnectorConfig(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorConfig(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorConfig(connectorName));
}

@Override
public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfigWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorConfigWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorConfigWithHttpInfo(connectorName));
}

@Override
public Flux<ConnectorPlugin> getConnectorPlugins() throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorPlugins());
return withRetryOnConflictOrRebalance(super.getConnectorPlugins());
}

@Override
public Mono<ResponseEntity<List<ConnectorPlugin>>> getConnectorPluginsWithHttpInfo()
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorPluginsWithHttpInfo());
return withRetryOnConflictOrRebalance(super.getConnectorPluginsWithHttpInfo());
}

@Override
public Mono<ConnectorStatus> getConnectorStatus(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorStatus(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorStatus(connectorName));
}

@Override
public Mono<ResponseEntity<ConnectorStatus>> getConnectorStatusWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorStatusWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorStatusWithHttpInfo(connectorName));
}

@Override
public Mono<TaskStatus> getConnectorTaskStatus(String connectorName, Integer taskId)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTaskStatus(connectorName, taskId));
return withRetryOnConflictOrRebalance(super.getConnectorTaskStatus(connectorName, taskId));
}

@Override
public Mono<ResponseEntity<TaskStatus>> getConnectorTaskStatusWithHttpInfo(String connectorName, Integer taskId)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTaskStatusWithHttpInfo(connectorName, taskId));
return withRetryOnConflictOrRebalance(super.getConnectorTaskStatusWithHttpInfo(connectorName, taskId));
}

@Override
public Flux<ConnectorTask> getConnectorTasks(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTasks(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorTasks(connectorName));
}

@Override
public Mono<ResponseEntity<List<ConnectorTask>>> getConnectorTasksWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTasksWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorTasksWithHttpInfo(connectorName));
}

@Override
public Mono<Map<String, ConnectorTopics>> getConnectorTopics(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTopics(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorTopics(connectorName));
}

@Override
public Mono<ResponseEntity<Map<String, ConnectorTopics>>> getConnectorTopicsWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTopicsWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorTopicsWithHttpInfo(connectorName));
}

@Override
public Mono<List<String>> getConnectors(String search) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectors(search));
return withRetryOnConflictOrRebalance(super.getConnectors(search));
}

@Override
public Mono<ResponseEntity<List<String>>> getConnectorsWithHttpInfo(String search) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorsWithHttpInfo(search));
return withRetryOnConflictOrRebalance(super.getConnectorsWithHttpInfo(search));
}

@Override
public Mono<Void> pauseConnector(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.pauseConnector(connectorName));
return withRetryOnConflictOrRebalance(super.pauseConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.pauseConnectorWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.pauseConnectorWithHttpInfo(connectorName));
}

@Override
public Mono<Void> restartConnector(String connectorName, Boolean includeTasks, Boolean onlyFailed)
throws WebClientResponseException {
return withRetryOnConflict(super.restartConnector(connectorName, includeTasks, onlyFailed));
return withRetryOnConflictOrRebalance(super.restartConnector(connectorName, includeTasks, onlyFailed));
}

@Override
public Mono<ResponseEntity<Void>> restartConnectorWithHttpInfo(String connectorName, Boolean includeTasks,
Boolean onlyFailed) throws WebClientResponseException {
return withRetryOnConflict(super.restartConnectorWithHttpInfo(connectorName, includeTasks, onlyFailed));
return withRetryOnConflictOrRebalance(super.restartConnectorWithHttpInfo(connectorName, includeTasks, onlyFailed));
}

@Override
public Mono<Void> restartConnectorTask(String connectorName, Integer taskId) throws WebClientResponseException {
return withRetryOnConflict(super.restartConnectorTask(connectorName, taskId));
return withRetryOnConflictOrRebalance(super.restartConnectorTask(connectorName, taskId));
}

@Override
public Mono<ResponseEntity<Void>> restartConnectorTaskWithHttpInfo(String connectorName, Integer taskId)
throws WebClientResponseException {
return withRetryOnConflict(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
return withRetryOnConflictOrRebalance(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
}

@Override
public Mono<Void> resumeConnector(String connectorName) throws WebClientResponseException {
return super.resumeConnector(connectorName);
return withRetryOnRebalance(super.resumeConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> resumeConnectorWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.resumeConnectorWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.resumeConnectorWithHttpInfo(connectorName));
}

@Override
public Mono<ResponseEntity<Connector>> setConnectorConfigWithHttpInfo(String connectorName,
Map<String, Object> requestBody)
throws WebClientResponseException {
return withRetryOnConflict(super.setConnectorConfigWithHttpInfo(connectorName, requestBody));
return withRetryOnConflictOrRebalance(super.setConnectorConfigWithHttpInfo(connectorName, requestBody));
}

@Override
public Mono<ConnectorPluginConfigValidationResponse> validateConnectorPluginConfig(String pluginName,
Map<String, Object> requestBody)
throws WebClientResponseException {
return withRetryOnConflict(super.validateConnectorPluginConfig(pluginName, requestBody));
return withRetryOnConflictOrRebalance(super.validateConnectorPluginConfig(pluginName, requestBody));
}

@Override
public Mono<ResponseEntity<ConnectorPluginConfigValidationResponse>> validateConnectorPluginConfigWithHttpInfo(
String pluginName, Map<String, Object> requestBody) throws WebClientResponseException {
return withRetryOnConflict(super.validateConnectorPluginConfigWithHttpInfo(pluginName, requestBody));
return withRetryOnConflictOrRebalance(super.validateConnectorPluginConfigWithHttpInfo(pluginName, requestBody));
}

private static class RetryingApiClient extends ApiClient {
Expand Down
Loading

0 comments on commit daa6cb7

Please sign in to comment.