Skip to content

Commit

Permalink
BE: Chore: Cleanup api module (#815)
Browse files Browse the repository at this point in the history
  • Loading branch information
wernerdv authored Feb 11, 2025
1 parent 844fbc9 commit eaeb4a4
Show file tree
Hide file tree
Showing 30 changed files with 89 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import io.kafbat.ui.connect.model.ConnectorTopics;
import io.kafbat.ui.connect.model.NewConnector;
import io.kafbat.ui.connect.model.TaskStatus;
import io.kafbat.ui.exception.KafkaConnectConflictReponseException;
import io.kafbat.ui.exception.KafkaConnectConflictResponseException;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.util.WebClientConfigurator;
import jakarta.validation.constraints.NotNull;
Expand Down Expand Up @@ -48,7 +48,7 @@ private static Retry conflictCodeRetry() {
.fixedDelay(MAX_RETRIES, RETRIES_DELAY)
.filter(e -> e instanceof WebClientResponseException.Conflict)
.onRetryExhaustedThrow((spec, signal) ->
new KafkaConnectConflictReponseException(
new KafkaConnectConflictResponseException(
(WebClientResponseException.Conflict) signal.failure()));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package io.kafbat.ui.config.auth;

import io.kafbat.ui.util.EmptyRedirectStrategy;
import io.kafbat.ui.util.StaticFileWebFilter;
import java.net.URI;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
Expand All @@ -12,8 +10,6 @@
import org.springframework.security.config.web.server.SecurityWebFiltersOrder;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;
import org.springframework.security.web.server.authentication.RedirectServerAuthenticationSuccessHandler;
import org.springframework.security.web.server.authentication.logout.RedirectServerLogoutSuccessHandler;
import org.springframework.security.web.server.util.matcher.ServerWebExchangeMatchers;

@Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public Mono<Void> handle(WebFilterExchange exchange, Authentication authenticati
requestUri.getPath(), requestUri.getQuery());

final UriComponents baseUrl = UriComponentsBuilder
.fromHttpUrl(fullUrl)
.fromUriString(fullUrl)
.replacePath("/")
.replaceQuery(null)
.fragment(null)
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/io/kafbat/ui/emitter/OffsetsInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private Map<TopicPartition, Long> firstOffsetsForPolling(Consumer<?, ?> consumer
Collection<TopicPartition> partitions) {
try {
// we try to use offsetsForTimes() to find earliest offsets, since for
// some topics (like compacted) beginningOffsets() ruturning 0 offsets
// some topics (like compacted) beginningOffsets() returning 0 offsets
// even when effectively first offset can be very high
var offsets = consumer.offsetsForTimes(
partitions.stream().collect(Collectors.toMap(p -> p, p -> 0L))
Expand Down
23 changes: 0 additions & 23 deletions api/src/main/java/io/kafbat/ui/emitter/ResultSizeLimiter.java

This file was deleted.

This file was deleted.

This file was deleted.

6 changes: 0 additions & 6 deletions api/src/main/java/io/kafbat/ui/exception/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,19 @@
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;


public enum ErrorCode {

FORBIDDEN(403, HttpStatus.FORBIDDEN),

UNEXPECTED(5000, HttpStatus.INTERNAL_SERVER_ERROR),
KSQL_API_ERROR(5001, HttpStatus.INTERNAL_SERVER_ERROR),
BINDING_FAIL(4001, HttpStatus.BAD_REQUEST),
NOT_FOUND(404, HttpStatus.NOT_FOUND),
VALIDATION_FAIL(4002, HttpStatus.BAD_REQUEST),
READ_ONLY_MODE_ENABLE(4003, HttpStatus.METHOD_NOT_ALLOWED),
CONNECT_CONFLICT_RESPONSE(4004, HttpStatus.CONFLICT),
DUPLICATED_ENTITY(4005, HttpStatus.CONFLICT),
UNPROCESSABLE_ENTITY(4006, HttpStatus.UNPROCESSABLE_ENTITY),
CLUSTER_NOT_FOUND(4007, HttpStatus.NOT_FOUND),
TOPIC_NOT_FOUND(4008, HttpStatus.NOT_FOUND),
SCHEMA_NOT_FOUND(4009, HttpStatus.NOT_FOUND),
CONNECT_NOT_FOUND(4010, HttpStatus.NOT_FOUND),
KSQLDB_NOT_FOUND(4011, HttpStatus.NOT_FOUND),
DIR_NOT_FOUND(4012, HttpStatus.BAD_REQUEST),
TOPIC_OR_PARTITION_NOT_FOUND(4013, HttpStatus.BAD_REQUEST),
INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ private Mono<ServerResponse> render(CustomBaseException baseException, ServerReq

private Mono<ServerResponse> render(WebExchangeBindException exception, ServerRequest request) {
Map<String, Set<String>> fieldErrorsMap = exception.getFieldErrors().stream()
.collect(Collectors
.toMap(FieldError::getField, f -> Set.of(extractFieldErrorMsg(f)), Sets::union));
.collect(Collectors.toMap(FieldError::getField, f -> Set.of(extractFieldErrorMsg(f)), Sets::union));

var fieldsErrors = fieldErrorsMap.entrySet().stream()
.map(e -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package io.kafbat.ui.exception;


import org.springframework.web.reactive.function.client.WebClientResponseException;

public class KafkaConnectConflictReponseException extends CustomBaseException {
public class KafkaConnectConflictResponseException extends CustomBaseException {

public KafkaConnectConflictReponseException(WebClientResponseException.Conflict e) {
public KafkaConnectConflictResponseException(WebClientResponseException.Conflict e) {
super("Kafka Connect responded with 409 (Conflict) code. Response body: "
+ e.getResponseBodyAsString());
}
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

26 changes: 9 additions & 17 deletions api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,15 @@ private static BrokerDTO mapCoordinator(Node node) {
return new BrokerDTO().host(node.host()).id(node.id()).port(node.port());
}

private static ConsumerGroupStateDTO mapConsumerGroupState(
org.apache.kafka.common.ConsumerGroupState state) {
switch (state) {
case DEAD:
return ConsumerGroupStateDTO.DEAD;
case EMPTY:
return ConsumerGroupStateDTO.EMPTY;
case STABLE:
return ConsumerGroupStateDTO.STABLE;
case PREPARING_REBALANCE:
return ConsumerGroupStateDTO.PREPARING_REBALANCE;
case COMPLETING_REBALANCE:
return ConsumerGroupStateDTO.COMPLETING_REBALANCE;
default:
return ConsumerGroupStateDTO.UNKNOWN;
}
private static ConsumerGroupStateDTO mapConsumerGroupState(org.apache.kafka.common.ConsumerGroupState state) {
return switch (state) {
case DEAD -> ConsumerGroupStateDTO.DEAD;
case EMPTY -> ConsumerGroupStateDTO.EMPTY;
case STABLE -> ConsumerGroupStateDTO.STABLE;
case PREPARING_REBALANCE -> ConsumerGroupStateDTO.PREPARING_REBALANCE;
case COMPLETING_REBALANCE -> ConsumerGroupStateDTO.COMPLETING_REBALANCE;
default -> ConsumerGroupStateDTO.UNKNOWN;
};
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private BrokersLogdirsDTO toBrokerLogDirs(Integer broker, String dirName,

private BrokerTopicLogdirsDTO toTopicLogDirs(Integer broker, String name,
List<Map.Entry<TopicPartition,
DescribeLogDirsResponse.ReplicaInfo>> partitions) {
DescribeLogDirsResponse.ReplicaInfo>> partitions) {
BrokerTopicLogdirsDTO topic = new BrokerTopicLogdirsDTO();
topic.setName(name);
topic.setPartitions(
Expand All @@ -54,8 +54,7 @@ private BrokerTopicLogdirsDTO toTopicLogDirs(Integer broker, String name,
}

private BrokerTopicPartitionLogdirDTO topicPartitionLogDir(Integer broker, Integer partition,
DescribeLogDirsResponse.ReplicaInfo
replicaInfo) {
DescribeLogDirsResponse.ReplicaInfo replicaInfo) {
BrokerTopicPartitionLogdirDTO logDir = new BrokerTopicPartitionLogdirDTO();
logDir.setBroker(broker);
logDir.setPartition(partition);
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void close() {
try {
serde.close();
} catch (Exception e) {
log.error("Error closing serde " + name, e);
log.error("Error closing serde {}", name, e);
}
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ private Map<String, ProtoFile> knownProtoFiles() {
}

private ProtoFile loadKnownProtoFile(String path, Descriptors.FileDescriptor fileDescriptor) {
String protoFileString = null;
String protoFileString;
// know type file contains either message or enum
if (!fileDescriptor.getMessageTypes().isEmpty()) {
protoFileString = new ProtobufSchema(fileDescriptor.getMessageTypes().getFirst()).canonicalString();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.kafbat.ui.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
import io.kafbat.ui.connect.model.ConnectorStatus;
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
Expand Down Expand Up @@ -44,7 +43,6 @@
public class KafkaConnectService {
private final ClusterMapper clusterMapper;
private final KafkaConnectMapper kafkaConnectMapper;
private final ObjectMapper objectMapper;
private final KafkaConfigSanitizer kafkaConfigSanitizer;

public Flux<ConnectDTO> getConnects(KafkaCluster cluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,6 @@ static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> v
);
}

public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
return describeCluster()
.map(d -> d.getNodes().stream().map(Node::id).collect(toList()))
.flatMap(this::describeLogDirs);
}

public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs(
Collection<Integer> brokerIds) {
return toMono(client.describeLogDirs(brokerIds).all())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public Mono<List<String>> getAllSubjectNames(KafkaCluster cluster) {
@SneakyThrows
private List<String> parseSubjectListString(String subjectNamesStr) {
//workaround for https://github.com/spring-projects/spring-framework/issues/24734
return new JsonMapper().readValue(subjectNamesStr, new TypeReference<List<String>>() {
return new JsonMapper().readValue(subjectNamesStr, new TypeReference<>() {
});
}

Expand Down
12 changes: 5 additions & 7 deletions api/src/main/java/io/kafbat/ui/service/TopicsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private Mono<InternalTopic> loadTopic(KafkaCluster c, String topicName) {

/**
* After creation topic can be invisible via API for some time.
* To workaround this, we retyring topic loading until it becomes visible.
* To workaround this, we're retrying topic loading until it becomes visible.
*/
private Mono<InternalTopic> loadTopicAfterCreation(KafkaCluster c, String topicName) {
return loadTopic(c, topicName)
Expand Down Expand Up @@ -137,8 +137,7 @@ private List<InternalTopic> createList(List<String> orderedNames,
.collect(toList());
}

private Mono<InternalPartitionsOffsets> getPartitionOffsets(Map<String, TopicDescription>
descriptionsMap,
private Mono<InternalPartitionsOffsets> getPartitionOffsets(Map<String, TopicDescription> descriptionsMap,
ReactiveAdminClient ac) {
var descriptions = descriptionsMap.values();
return ac.listOffsets(descriptions, OffsetSpec.earliest())
Expand Down Expand Up @@ -225,8 +224,7 @@ private Mono<InternalTopic> updateTopic(KafkaCluster cluster,
.then(loadTopic(cluster, topicName)));
}

public Mono<InternalTopic> updateTopic(KafkaCluster cl, String topicName,
Mono<TopicUpdateDTO> topicUpdate) {
public Mono<InternalTopic> updateTopic(KafkaCluster cl, String topicName, Mono<TopicUpdateDTO> topicUpdate) {
return topicUpdate
.flatMap(t -> updateTopic(cl, topicName, t));
}
Expand Down Expand Up @@ -298,7 +296,7 @@ private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsRea
var brokers = brokersUsage.entrySet().stream()
.sorted(Map.Entry.comparingByValue())
.map(Map.Entry::getKey)
.collect(toList());
.toList();

// Iterate brokers and try to add them in assignment
// while partition replicas count != requested replication factor
Expand Down Expand Up @@ -326,7 +324,7 @@ private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsRea
var brokersUsageList = brokersUsage.entrySet().stream()
.sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
.map(Map.Entry::getKey)
.collect(toList());
.toList();

// Iterate brokers and try to remove them from assignment
// while partition replicas count != requested replication factor
Expand Down
4 changes: 2 additions & 2 deletions api/src/main/java/io/kafbat/ui/service/acl/AclsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ private void logAclSyncPlan(KafkaCluster cluster, Set<AclBinding> toBeAdded, Set
if (!toBeAdded.isEmpty()) {
log.info("ACLs to be added ({}): ", toBeAdded.size());
for (AclBinding aclBinding : toBeAdded) {
log.info(" " + AclCsv.createAclString(aclBinding));
log.info(" {}", AclCsv.createAclString(aclBinding));
}
}
if (!toBeDeleted.isEmpty()) {
log.info("ACLs to be deleted ({}): ", toBeDeleted.size());
for (AclBinding aclBinding : toBeDeleted) {
log.info(" " + AclCsv.createAclString(aclBinding));
log.info(" {}", AclCsv.createAclString(aclBinding));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public Flux<KsqlResponseTable> execute(String ksql, Map<String, String> streamPr
if (statements.size() > 1) {
return errorTableFlux("Only single statement supported now");
}
if (statements.size() == 0) {
if (statements.isEmpty()) {
return errorTableFlux("No valid ksql statement found");
}
if (isUnsupportedStatementType(statements.get(0))) {
Expand Down
Loading

0 comments on commit eaeb4a4

Please sign in to comment.