Skip to content

Commit

Permalink
Add sessionId and autoApproval to KafkaRebalance
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Edgar <[email protected]>
  • Loading branch information
MikeEdgar committed Sep 27, 2024
1 parent 3532f35 commit c2c6288
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public Response listRebalances(
KafkaRebalance.Fields.CONCURRENT_LEADER_MOVEMENTS,
KafkaRebalance.Fields.REPLICATION_THROTTLE,
KafkaRebalance.Fields.REPLICA_MOVEMENT_STRATEGIES,
KafkaRebalance.Fields.SESSION_ID,
KafkaRebalance.Fields.OPTIMIZATION_RESULT,
KafkaRebalance.Fields.CONDITIONS,
},
Expand All @@ -113,6 +114,7 @@ public Response listRebalances(
KafkaRebalance.Fields.CONCURRENT_LEADER_MOVEMENTS,
KafkaRebalance.Fields.REPLICATION_THROTTLE,
KafkaRebalance.Fields.REPLICA_MOVEMENT_STRATEGIES,
KafkaRebalance.Fields.SESSION_ID,
KafkaRebalance.Fields.OPTIMIZATION_RESULT,
KafkaRebalance.Fields.CONDITIONS,
}))
Expand Down Expand Up @@ -184,6 +186,7 @@ public Response patchRebalance(
KafkaRebalance.Fields.CONCURRENT_LEADER_MOVEMENTS,
KafkaRebalance.Fields.REPLICATION_THROTTLE,
KafkaRebalance.Fields.REPLICA_MOVEMENT_STRATEGIES,
KafkaRebalance.Fields.SESSION_ID,
KafkaRebalance.Fields.OPTIMIZATION_RESULT));

var result = rebalanceService.patchRebalance(rebalanceId, rebalance.getData());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.github.streamshub.console.api.model;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import jakarta.json.JsonObject;

Expand Down Expand Up @@ -64,6 +66,7 @@ public static class Fields {
public static final String CONCURRENT_LEADER_MOVEMENTS = "concurrentLeaderMovements";
public static final String REPLICATION_THROTTLE = "replicationThrottle";
public static final String REPLICA_MOVEMENT_STRATEGIES = "replicaMovementStrategies";
public static final String SESSION_ID = "sessionId";
public static final String OPTIMIZATION_RESULT = "optimizationResult";
public static final String CONDITIONS = "conditions";

Expand Down Expand Up @@ -135,6 +138,25 @@ public RebalanceData(@JsonProperty("data") KafkaRebalance data) {
@Schema(name = "KafkaRebalanceMeta", additionalProperties = Object.class)
@JsonInclude(value = Include.NON_NULL)
static final class Meta extends JsonApiMeta {

@JsonProperty
@Schema(
description = """
Valid values allowed for the meta.action property for this resource, \
given its current status.
""",
readOnly = true)
List<String> allowedActions = new ArrayList<>(3);

@JsonProperty
@Schema(
description = """
Optimization proposal will be auto-approved without the \
need for manual approval.
""",
readOnly = true)
boolean autoApproval;

@JsonProperty
@Schema(
description = """
Expand All @@ -153,14 +175,6 @@ static final class Meta extends JsonApiMeta {
* @see io.strimzi.api.kafka.model.rebalance.KafkaRebalanceAnnotation
*/
String action;

public String action() {
return action;
}

public void action(String action) {
this.action = action;
}
}

@JsonFilter("fieldFilter")
Expand Down Expand Up @@ -226,11 +240,16 @@ static class Attributes {
@Schema(readOnly = true, nullable = true)
List<String> replicaMovementStrategies;

@JsonProperty
@Schema(readOnly = true, nullable = true)
String sessionId;

@JsonProperty
@Schema(readOnly = true)
Map<String, Object> optimizationResult = new HashMap<>(0);

@JsonProperty
@Schema(readOnly = true)
List<Condition> conditions;
}

Expand All @@ -251,8 +270,19 @@ public static KafkaRebalance fromCursor(JsonObject cursor) {
return PaginatedKubeResource.fromCursor(cursor, KafkaRebalance::new);
}

public List<String> allowedActions() {
return ((Meta) getOrCreateMeta()).allowedActions;
}

public void autoApproval(boolean autoApproval) {
((Meta) getOrCreateMeta()).autoApproval = autoApproval;
}

public String action() {
return ((Meta) super.getMeta()).action();
return Optional.ofNullable(getMeta())
.map(Meta.class::cast)
.map(meta -> meta.action)
.orElse(null);
}

@Override
Expand Down Expand Up @@ -345,6 +375,10 @@ public void replicaMovementStrategies(List<String> replicaMovementStrategies) {
attributes.replicaMovementStrategies = replicaMovementStrategies;
}

public void sessionId(String sessionId) {
attributes.sessionId = sessionId;
}

public Map<String, Object> optimizationResult() {
return attributes.optimizationResult;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,20 @@ public JsonApiMeta getMeta() {
return meta;
}

@JsonIgnore
public JsonApiMeta getOrCreateMeta() {
if (meta == null) {
meta = metaFactory.get();
}
return meta;
}

public Object getMeta(String key) {
return meta != null ? meta.get(key) : null;
}

public Resource<T> addMeta(String key, Object value) {
if (meta == null) {
meta = metaFactory.get();
}
meta.put(key, value);
getOrCreateMeta().put(key, value);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -15,7 +14,6 @@
import jakarta.inject.Inject;
import jakarta.ws.rs.NotFoundException;

import org.eclipse.microprofile.context.ThreadContext;
import org.jboss.logging.Logger;

import com.github.streamshub.console.api.model.Condition;
Expand All @@ -26,6 +24,7 @@

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.strimzi.api.ResourceAnnotations;
import io.strimzi.api.ResourceLabels;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceMode;
Expand All @@ -36,18 +35,9 @@
@ApplicationScoped
public class KafkaRebalanceService {

private static final String REBALANCE_ANNOTATION = "strimzi.io/rebalance";

@Inject
Logger logger;

/**
* ThreadContext of the request thread. This is used to execute asynchronous
* tasks to allow access to request-scoped beans.
*/
@Inject
ThreadContext threadContext;

@Inject
KubernetesClient client;

Expand Down Expand Up @@ -80,9 +70,9 @@ public KafkaRebalance patchRebalance(String id, KafkaRebalance rebalance) {
String action = rebalance.action();

if (action != null) {
annotations.put(REBALANCE_ANNOTATION, action);
annotations.put(ResourceAnnotations.ANNO_STRIMZI_IO_REBALANCE, action);
} else {
annotations.remove(REBALANCE_ANNOTATION);
annotations.remove(ResourceAnnotations.ANNO_STRIMZI_IO_REBALANCE);
}

return client.resource(resource).patch();
Expand Down Expand Up @@ -120,16 +110,22 @@ KafkaRebalance toKafkaRebalance(io.strimzi.api.kafka.model.rebalance.KafkaRebala
rebalance.concurrentLeaderMovements(rebalanceSpec.getConcurrentLeaderMovements());
rebalance.replicationThrottle(rebalanceSpec.getReplicationThrottle());
rebalance.replicaMovementStrategies(rebalanceSpec.getReplicaMovementStrategies());

rebalanceStatus.map(KafkaRebalanceStatus::getSessionId)
.ifPresent(rebalance::sessionId);
rebalanceStatus.map(KafkaRebalanceStatus::getOptimizationResult)
.ifPresent(rebalance.optimizationResult()::putAll);
rebalanceStatus.map(KafkaRebalanceStatus::getConditions)
.map(conditions -> conditions.stream().map(Condition::new).toList())
.ifPresent(rebalance::conditions);

rebalanceStatus.map(KafkaRebalanceStatus::getConditions).ifPresent(conditions ->
rebalance.conditions(conditions.stream().map(Condition::new).toList()));
var annotations = resource.getMetadata().getAnnotations();
var autoApproval = Boolean.parseBoolean(annotations.get(ResourceAnnotations.ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL));
rebalance.autoApproval(autoApproval);

rebalance.addMeta("allowedActions", state
.map(KafkaRebalanceState::getValidAnnotations)
state.map(KafkaRebalanceState::getValidAnnotations)
.map(allowed -> allowed.stream().map(Enum::name).toList())
.orElseGet(Collections::emptyList));
.ifPresent(rebalance.allowedActions()::addAll);

return rebalance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import static com.github.streamshub.console.test.TestHelper.whenRequesting;
import static java.util.Comparator.nullsLast;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
Expand Down Expand Up @@ -170,7 +171,9 @@ void testListRebalancesIncludesAllowedActions() {
.assertThat()
.statusCode(is(Status.OK.getStatusCode()))
.body("data", not(emptyArray()))
.body("data.findAll { it }.collect { it.meta }", everyItem(hasKey("allowedActions")));
.body("data.findAll { it }.collect { it.meta }", everyItem(allOf(
hasKey("allowedActions"),
hasKey("autoApproval"))));
}

@Test
Expand Down

0 comments on commit c2c6288

Please sign in to comment.