diff --git a/api/src/main/java/com/github/streamshub/console/api/KafkaRebalancesResource.java b/api/src/main/java/com/github/streamshub/console/api/KafkaRebalancesResource.java index 306e4fef8..a70026149 100644 --- a/api/src/main/java/com/github/streamshub/console/api/KafkaRebalancesResource.java +++ b/api/src/main/java/com/github/streamshub/console/api/KafkaRebalancesResource.java @@ -87,6 +87,7 @@ public Response listRebalances( KafkaRebalance.Fields.CONCURRENT_LEADER_MOVEMENTS, KafkaRebalance.Fields.REPLICATION_THROTTLE, KafkaRebalance.Fields.REPLICA_MOVEMENT_STRATEGIES, + KafkaRebalance.Fields.SESSION_ID, KafkaRebalance.Fields.OPTIMIZATION_RESULT, KafkaRebalance.Fields.CONDITIONS, }, @@ -113,6 +114,7 @@ public Response listRebalances( KafkaRebalance.Fields.CONCURRENT_LEADER_MOVEMENTS, KafkaRebalance.Fields.REPLICATION_THROTTLE, KafkaRebalance.Fields.REPLICA_MOVEMENT_STRATEGIES, + KafkaRebalance.Fields.SESSION_ID, KafkaRebalance.Fields.OPTIMIZATION_RESULT, KafkaRebalance.Fields.CONDITIONS, })) @@ -184,6 +186,7 @@ public Response patchRebalance( KafkaRebalance.Fields.CONCURRENT_LEADER_MOVEMENTS, KafkaRebalance.Fields.REPLICATION_THROTTLE, KafkaRebalance.Fields.REPLICA_MOVEMENT_STRATEGIES, + KafkaRebalance.Fields.SESSION_ID, KafkaRebalance.Fields.OPTIMIZATION_RESULT)); var result = rebalanceService.patchRebalance(rebalanceId, rebalance.getData()); diff --git a/api/src/main/java/com/github/streamshub/console/api/model/KafkaRebalance.java b/api/src/main/java/com/github/streamshub/console/api/model/KafkaRebalance.java index 80ef38296..8b61cddd8 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/KafkaRebalance.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/KafkaRebalance.java @@ -1,5 +1,6 @@ package com.github.streamshub.console.api.model; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -64,6 +65,7 @@ public static class Fields { public static final String CONCURRENT_LEADER_MOVEMENTS = "concurrentLeaderMovements"; public static final String REPLICATION_THROTTLE = "replicationThrottle"; public static final String REPLICA_MOVEMENT_STRATEGIES = "replicaMovementStrategies"; + public static final String SESSION_ID = "sessionId"; public static final String OPTIMIZATION_RESULT = "optimizationResult"; public static final String CONDITIONS = "conditions"; @@ -135,6 +137,25 @@ public RebalanceData(@JsonProperty("data") KafkaRebalance data) { @Schema(name = "KafkaRebalanceMeta", additionalProperties = Object.class) @JsonInclude(value = Include.NON_NULL) static final class Meta extends JsonApiMeta { + + @JsonProperty + @Schema( + description = """ + Valid values allowed for the meta.action property for this resource, \ + given its current status. + """, + readOnly = true) + List allowedActions = new ArrayList<>(3); + + @JsonProperty + @Schema( + description = """ + Optimization proposal will be auto-approved without the \ + need for manual approval. + """, + readOnly = true) + boolean autoApproval; + @JsonProperty @Schema( description = """ @@ -153,14 +174,6 @@ static final class Meta extends JsonApiMeta { * @see io.strimzi.api.kafka.model.rebalance.KafkaRebalanceAnnotation */ String action; - - public String action() { - return action; - } - - public void action(String action) { - this.action = action; - } } @JsonFilter("fieldFilter") @@ -226,11 +239,16 @@ static class Attributes { @Schema(readOnly = true, nullable = true) List replicaMovementStrategies; + @JsonProperty + @Schema(readOnly = true, nullable = true) + String sessionId; + @JsonProperty @Schema(readOnly = true) Map optimizationResult = new HashMap<>(0); @JsonProperty + @Schema(readOnly = true) List conditions; } @@ -251,8 +269,21 @@ public static KafkaRebalance fromCursor(JsonObject cursor) { return PaginatedKubeResource.fromCursor(cursor, KafkaRebalance::new); } + @Override + public Meta getMeta() { + return ((Meta) super.getMeta()); + } + + public List allowedActions() { + return getMeta().allowedActions; + } + + public void autoApproval(boolean autoApproval) { + getMeta().autoApproval = autoApproval; + } + public String action() { - return ((Meta) super.getMeta()).action(); + return getMeta().action; } @Override @@ -345,6 +376,10 @@ public void replicaMovementStrategies(List replicaMovementStrategies) { attributes.replicaMovementStrategies = replicaMovementStrategies; } + public void sessionId(String sessionId) { + attributes.sessionId = sessionId; + } + public Map optimizationResult() { return attributes.optimizationResult; } diff --git a/api/src/main/java/com/github/streamshub/console/api/service/KafkaRebalanceService.java b/api/src/main/java/com/github/streamshub/console/api/service/KafkaRebalanceService.java index 4267d51bd..3c85da6e6 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/KafkaRebalanceService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/KafkaRebalanceService.java @@ -15,7 +15,6 @@ import jakarta.inject.Inject; import jakarta.ws.rs.NotFoundException; -import org.eclipse.microprofile.context.ThreadContext; import org.jboss.logging.Logger; import com.github.streamshub.console.api.model.Condition; @@ -26,6 +25,7 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.informers.cache.Cache; +import io.strimzi.api.ResourceAnnotations; import io.strimzi.api.ResourceLabels; import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceMode; @@ -36,18 +36,9 @@ @ApplicationScoped public class KafkaRebalanceService { - private static final String REBALANCE_ANNOTATION = "strimzi.io/rebalance"; - @Inject Logger logger; - /** - * ThreadContext of the request thread. This is used to execute asynchronous - * tasks to allow access to request-scoped beans. - */ - @Inject - ThreadContext threadContext; - @Inject KubernetesClient client; @@ -80,9 +71,9 @@ public KafkaRebalance patchRebalance(String id, KafkaRebalance rebalance) { String action = rebalance.action(); if (action != null) { - annotations.put(REBALANCE_ANNOTATION, action); + annotations.put(ResourceAnnotations.ANNO_STRIMZI_IO_REBALANCE, action); } else { - annotations.remove(REBALANCE_ANNOTATION); + annotations.remove(ResourceAnnotations.ANNO_STRIMZI_IO_REBALANCE); } return client.resource(resource).patch(); @@ -120,16 +111,22 @@ KafkaRebalance toKafkaRebalance(io.strimzi.api.kafka.model.rebalance.KafkaRebala rebalance.concurrentLeaderMovements(rebalanceSpec.getConcurrentLeaderMovements()); rebalance.replicationThrottle(rebalanceSpec.getReplicationThrottle()); rebalance.replicaMovementStrategies(rebalanceSpec.getReplicaMovementStrategies()); + + rebalanceStatus.map(KafkaRebalanceStatus::getSessionId) + .ifPresent(rebalance::sessionId); rebalanceStatus.map(KafkaRebalanceStatus::getOptimizationResult) .ifPresent(rebalance.optimizationResult()::putAll); + rebalanceStatus.map(KafkaRebalanceStatus::getConditions) + .map(conditions -> conditions.stream().map(Condition::new).toList()) + .ifPresent(rebalance::conditions); - rebalanceStatus.map(KafkaRebalanceStatus::getConditions).ifPresent(conditions -> - rebalance.conditions(conditions.stream().map(Condition::new).toList())); + var annotations = resource.getMetadata().getAnnotations(); + var autoApproval = Boolean.parseBoolean(annotations.get(ResourceAnnotations.ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL)); + rebalance.autoApproval(autoApproval); - rebalance.addMeta("allowedActions", state - .map(KafkaRebalanceState::getValidAnnotations) + state.map(KafkaRebalanceState::getValidAnnotations) .map(allowed -> allowed.stream().map(Enum::name).toList()) - .orElseGet(Collections::emptyList)); + .ifPresent(rebalance.allowedActions()::addAll); return rebalance; } diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceIT.java index 913ac5002..6370eabff 100644 --- a/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceIT.java @@ -40,6 +40,7 @@ import static com.github.streamshub.console.test.TestHelper.whenRequesting; import static java.util.Comparator.nullsLast; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; @@ -170,7 +171,9 @@ void testListRebalancesIncludesAllowedActions() { .assertThat() .statusCode(is(Status.OK.getStatusCode())) .body("data", not(emptyArray())) - .body("data.findAll { it }.collect { it.meta }", everyItem(hasKey("allowedActions"))); + .body("data.findAll { it }.collect { it.meta }", everyItem(allOf( + hasKey("allowedActions"), + hasKey("autoApproval")))); } @Test