From 0a3c3c821bdae39d3309bdabd2215d90cbbf0253 Mon Sep 17 00:00:00 2001 From: Michael Edgar Date: Wed, 25 Sep 2024 08:35:13 -0400 Subject: [PATCH] Add sessionId and autoApproval to KafkaRebalance Signed-off-by: Michael Edgar --- .../console/api/KafkaRebalancesResource.java | 3 ++ .../console/api/model/KafkaRebalance.java | 52 +++++++++++++++---- .../console/api/model/Resource.java | 13 +++-- .../api/service/KafkaRebalanceService.java | 32 +++++------- .../api/KafkaRebalancesResourceIT.java | 5 +- 5 files changed, 73 insertions(+), 32 deletions(-) diff --git a/api/src/main/java/com/github/streamshub/console/api/KafkaRebalancesResource.java b/api/src/main/java/com/github/streamshub/console/api/KafkaRebalancesResource.java index 306e4fef8..a70026149 100644 --- a/api/src/main/java/com/github/streamshub/console/api/KafkaRebalancesResource.java +++ b/api/src/main/java/com/github/streamshub/console/api/KafkaRebalancesResource.java @@ -87,6 +87,7 @@ public Response listRebalances( KafkaRebalance.Fields.CONCURRENT_LEADER_MOVEMENTS, KafkaRebalance.Fields.REPLICATION_THROTTLE, KafkaRebalance.Fields.REPLICA_MOVEMENT_STRATEGIES, + KafkaRebalance.Fields.SESSION_ID, KafkaRebalance.Fields.OPTIMIZATION_RESULT, KafkaRebalance.Fields.CONDITIONS, }, @@ -113,6 +114,7 @@ public Response listRebalances( KafkaRebalance.Fields.CONCURRENT_LEADER_MOVEMENTS, KafkaRebalance.Fields.REPLICATION_THROTTLE, KafkaRebalance.Fields.REPLICA_MOVEMENT_STRATEGIES, + KafkaRebalance.Fields.SESSION_ID, KafkaRebalance.Fields.OPTIMIZATION_RESULT, KafkaRebalance.Fields.CONDITIONS, })) @@ -184,6 +186,7 @@ public Response patchRebalance( KafkaRebalance.Fields.CONCURRENT_LEADER_MOVEMENTS, KafkaRebalance.Fields.REPLICATION_THROTTLE, KafkaRebalance.Fields.REPLICA_MOVEMENT_STRATEGIES, + KafkaRebalance.Fields.SESSION_ID, KafkaRebalance.Fields.OPTIMIZATION_RESULT)); var result = rebalanceService.patchRebalance(rebalanceId, rebalance.getData()); diff --git a/api/src/main/java/com/github/streamshub/console/api/model/KafkaRebalance.java b/api/src/main/java/com/github/streamshub/console/api/model/KafkaRebalance.java index 80ef38296..8fbb3df3a 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/KafkaRebalance.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/KafkaRebalance.java @@ -1,10 +1,12 @@ package com.github.streamshub.console.api.model; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import jakarta.json.JsonObject; @@ -64,6 +66,7 @@ public static class Fields { public static final String CONCURRENT_LEADER_MOVEMENTS = "concurrentLeaderMovements"; public static final String REPLICATION_THROTTLE = "replicationThrottle"; public static final String REPLICA_MOVEMENT_STRATEGIES = "replicaMovementStrategies"; + public static final String SESSION_ID = "sessionId"; public static final String OPTIMIZATION_RESULT = "optimizationResult"; public static final String CONDITIONS = "conditions"; @@ -135,6 +138,25 @@ public RebalanceData(@JsonProperty("data") KafkaRebalance data) { @Schema(name = "KafkaRebalanceMeta", additionalProperties = Object.class) @JsonInclude(value = Include.NON_NULL) static final class Meta extends JsonApiMeta { + + @JsonProperty + @Schema( + description = """ + Valid values allowed for the meta.action property for this resource, \ + given its current status. + """, + readOnly = true) + List allowedActions = new ArrayList<>(3); + + @JsonProperty + @Schema( + description = """ + Optimization proposal will be auto-approved without the \ + need for manual approval. + """, + readOnly = true) + boolean autoApproval; + @JsonProperty @Schema( description = """ @@ -153,14 +175,6 @@ static final class Meta extends JsonApiMeta { * @see io.strimzi.api.kafka.model.rebalance.KafkaRebalanceAnnotation */ String action; - - public String action() { - return action; - } - - public void action(String action) { - this.action = action; - } } @JsonFilter("fieldFilter") @@ -226,11 +240,16 @@ static class Attributes { @Schema(readOnly = true, nullable = true) List replicaMovementStrategies; + @JsonProperty + @Schema(readOnly = true, nullable = true) + String sessionId; + @JsonProperty @Schema(readOnly = true) Map optimizationResult = new HashMap<>(0); @JsonProperty + @Schema(readOnly = true) List conditions; } @@ -251,8 +270,19 @@ public static KafkaRebalance fromCursor(JsonObject cursor) { return PaginatedKubeResource.fromCursor(cursor, KafkaRebalance::new); } + public List allowedActions() { + return ((Meta) getOrCreateMeta()).allowedActions; + } + + public void autoApproval(boolean autoApproval) { + ((Meta) getOrCreateMeta()).autoApproval = autoApproval; + } + public String action() { - return ((Meta) super.getMeta()).action(); + return Optional.ofNullable(getMeta()) + .map(Meta.class::cast) + .map(meta -> meta.action) + .orElse(null); } @Override @@ -345,6 +375,10 @@ public void replicaMovementStrategies(List replicaMovementStrategies) { attributes.replicaMovementStrategies = replicaMovementStrategies; } + public void sessionId(String sessionId) { + attributes.sessionId = sessionId; + } + public Map optimizationResult() { return attributes.optimizationResult; } diff --git a/api/src/main/java/com/github/streamshub/console/api/model/Resource.java b/api/src/main/java/com/github/streamshub/console/api/model/Resource.java index 4bb4b2316..84beccea5 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/Resource.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/Resource.java @@ -70,15 +70,20 @@ public JsonApiMeta getMeta() { return meta; } + @JsonIgnore + public JsonApiMeta getOrCreateMeta() { + if (meta == null) { + meta = metaFactory.get(); + } + return meta; + } + public Object getMeta(String key) { return meta != null ? meta.get(key) : null; } public Resource addMeta(String key, Object value) { - if (meta == null) { - meta = metaFactory.get(); - } - meta.put(key, value); + getOrCreateMeta().put(key, value); return this; } } diff --git a/api/src/main/java/com/github/streamshub/console/api/service/KafkaRebalanceService.java b/api/src/main/java/com/github/streamshub/console/api/service/KafkaRebalanceService.java index 4267d51bd..28ddaf71c 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/KafkaRebalanceService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/KafkaRebalanceService.java @@ -3,7 +3,6 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Base64; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -15,7 +14,6 @@ import jakarta.inject.Inject; import jakarta.ws.rs.NotFoundException; -import org.eclipse.microprofile.context.ThreadContext; import org.jboss.logging.Logger; import com.github.streamshub.console.api.model.Condition; @@ -26,6 +24,7 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.informers.cache.Cache; +import io.strimzi.api.ResourceAnnotations; import io.strimzi.api.ResourceLabels; import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceMode; @@ -36,18 +35,9 @@ @ApplicationScoped public class KafkaRebalanceService { - private static final String REBALANCE_ANNOTATION = "strimzi.io/rebalance"; - @Inject Logger logger; - /** - * ThreadContext of the request thread. This is used to execute asynchronous - * tasks to allow access to request-scoped beans. - */ - @Inject - ThreadContext threadContext; - @Inject KubernetesClient client; @@ -80,9 +70,9 @@ public KafkaRebalance patchRebalance(String id, KafkaRebalance rebalance) { String action = rebalance.action(); if (action != null) { - annotations.put(REBALANCE_ANNOTATION, action); + annotations.put(ResourceAnnotations.ANNO_STRIMZI_IO_REBALANCE, action); } else { - annotations.remove(REBALANCE_ANNOTATION); + annotations.remove(ResourceAnnotations.ANNO_STRIMZI_IO_REBALANCE); } return client.resource(resource).patch(); @@ -120,16 +110,22 @@ KafkaRebalance toKafkaRebalance(io.strimzi.api.kafka.model.rebalance.KafkaRebala rebalance.concurrentLeaderMovements(rebalanceSpec.getConcurrentLeaderMovements()); rebalance.replicationThrottle(rebalanceSpec.getReplicationThrottle()); rebalance.replicaMovementStrategies(rebalanceSpec.getReplicaMovementStrategies()); + + rebalanceStatus.map(KafkaRebalanceStatus::getSessionId) + .ifPresent(rebalance::sessionId); rebalanceStatus.map(KafkaRebalanceStatus::getOptimizationResult) .ifPresent(rebalance.optimizationResult()::putAll); + rebalanceStatus.map(KafkaRebalanceStatus::getConditions) + .map(conditions -> conditions.stream().map(Condition::new).toList()) + .ifPresent(rebalance::conditions); - rebalanceStatus.map(KafkaRebalanceStatus::getConditions).ifPresent(conditions -> - rebalance.conditions(conditions.stream().map(Condition::new).toList())); + var annotations = resource.getMetadata().getAnnotations(); + var autoApproval = Boolean.parseBoolean(annotations.get(ResourceAnnotations.ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL)); + rebalance.autoApproval(autoApproval); - rebalance.addMeta("allowedActions", state - .map(KafkaRebalanceState::getValidAnnotations) + state.map(KafkaRebalanceState::getValidAnnotations) .map(allowed -> allowed.stream().map(Enum::name).toList()) - .orElseGet(Collections::emptyList)); + .ifPresent(rebalance.allowedActions()::addAll); return rebalance; } diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceIT.java index 913ac5002..6370eabff 100644 --- a/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceIT.java @@ -40,6 +40,7 @@ import static com.github.streamshub.console.test.TestHelper.whenRequesting; import static java.util.Comparator.nullsLast; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; @@ -170,7 +171,9 @@ void testListRebalancesIncludesAllowedActions() { .assertThat() .statusCode(is(Status.OK.getStatusCode())) .body("data", not(emptyArray())) - .body("data.findAll { it }.collect { it.meta }", everyItem(hasKey("allowedActions"))); + .body("data.findAll { it }.collect { it.meta }", everyItem(allOf( + hasKey("allowedActions"), + hasKey("autoApproval")))); } @Test