From 34e6eb9aa12ea998870b6557e0577eb7701ee35a Mon Sep 17 00:00:00 2001 From: Michael Edgar Date: Mon, 7 Oct 2024 14:12:23 -0400 Subject: [PATCH] [API] Add endpoint to patch Kafka resource pause annotation Signed-off-by: Michael Edgar --- .../console/api/KafkaClustersResource.java | 56 ++++++++++--- .../console/api/model/KafkaCluster.java | 78 ++++++++++++++++--- .../api/service/KafkaClusterService.java | 44 +++++++++-- .../console/api/KafkaClustersResourceIT.java | 62 +++++++++++++++ .../api/KafkaRebalancesResourceIT.java | 5 +- 5 files changed, 219 insertions(+), 26 deletions(-) diff --git a/api/src/main/java/com/github/streamshub/console/api/KafkaClustersResource.java b/api/src/main/java/com/github/streamshub/console/api/KafkaClustersResource.java index d56f7baa0..ee260d9f9 100644 --- a/api/src/main/java/com/github/streamshub/console/api/KafkaClustersResource.java +++ b/api/src/main/java/com/github/streamshub/console/api/KafkaClustersResource.java @@ -1,15 +1,19 @@ package com.github.streamshub.console.api; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletionStage; import java.util.function.Consumer; import jakarta.inject.Inject; import jakarta.inject.Named; +import jakarta.validation.ConstraintTarget; import jakarta.validation.Valid; import jakarta.ws.rs.BeanParam; +import jakarta.ws.rs.Consumes; import jakarta.ws.rs.DefaultValue; import jakarta.ws.rs.GET; +import jakarta.ws.rs.PATCH; import jakarta.ws.rs.Path; import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; @@ -34,12 +38,12 @@ import com.github.streamshub.console.api.support.ListRequestContext; import com.github.streamshub.console.api.support.StringEnumeration; +import io.xlate.validation.constraints.Expression; + @Path("/api/kafkas") @Tag(name = "Kafka Cluster Resources") public class KafkaClustersResource { - static final String FIELDS_PARAM = "fields[kafkas]"; - @Inject UriInfo uriInfo; @@ -56,14 +60,14 @@ public class KafkaClustersResource { @GET @Produces(MediaType.APPLICATION_JSON) - @APIResponseSchema(KafkaCluster.ListResponse.class) + @APIResponseSchema(KafkaCluster.KafkaClusterDataList.class) @APIResponse(responseCode = "500", ref = "ServerError") @APIResponse(responseCode = "504", ref = "ServerTimeout") public Response listClusters( - @QueryParam(FIELDS_PARAM) + @QueryParam(KafkaCluster.FIELDS_PARAM) @DefaultValue(KafkaCluster.Fields.LIST_DEFAULT) @StringEnumeration( - source = FIELDS_PARAM, + source = KafkaCluster.FIELDS_PARAM, allowedValues = { KafkaCluster.Fields.NAME, KafkaCluster.Fields.NAMESPACE, @@ -105,7 +109,7 @@ public Response listClusters( ListRequestContext listSupport = new ListRequestContext<>(KafkaCluster.Fields.COMPARATOR_BUILDER, uriInfo.getRequestUri(), listParams, KafkaCluster::fromCursor); var clusterList = clusterService.listClusters(listSupport); - var responseEntity = new KafkaCluster.ListResponse(clusterList, listSupport); + var responseEntity = new KafkaCluster.KafkaClusterDataList(clusterList, listSupport); return Response.ok(responseEntity).build(); } @@ -113,7 +117,7 @@ public Response listClusters( @GET @Path("{clusterId}") @Produces(MediaType.APPLICATION_JSON) - @APIResponseSchema(KafkaCluster.SingleResponse.class) + @APIResponseSchema(KafkaCluster.KafkaClusterData.class) @APIResponse(responseCode = "404", ref = "NotFound") @APIResponse(responseCode = "500", ref = "ServerError") @APIResponse(responseCode = "504", ref = "ServerTimeout") @@ -122,10 +126,10 @@ public CompletionStage describeCluster( @PathParam("clusterId") String clusterId, - @QueryParam(FIELDS_PARAM) + @QueryParam(KafkaCluster.FIELDS_PARAM) @DefaultValue(KafkaCluster.Fields.DESCRIBE_DEFAULT) @StringEnumeration( - source = FIELDS_PARAM, + source = KafkaCluster.FIELDS_PARAM, allowedValues = { KafkaCluster.Fields.NAME, KafkaCluster.Fields.NAMESPACE, @@ -169,9 +173,41 @@ public CompletionStage describeCluster( requestedFields.accept(fields); return clusterService.describeCluster(fields) - .thenApply(KafkaCluster.SingleResponse::new) + .thenApply(KafkaCluster.KafkaClusterData::new) .thenApply(Response::ok) .thenApply(Response.ResponseBuilder::build); } + + @Path("{clusterId}") + @PATCH + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @APIResponseSchema(responseCode = "200", value = KafkaCluster.KafkaClusterData.class) + @Expression( + targetName = "args", + // Only check when the request body Id is present (separately checked for @NotNull) + when = "args[2].data.id != null", + // Verify the Id in the request body matches the Id in the URL + value = "args[1].equals(args[2].data.id)", + message = "resource ID conflicts with operation URL", + node = { "data", "id" }, + payload = ErrorCategory.InvalidResource.class, + validationAppliesTo = ConstraintTarget.PARAMETERS) + public Response patchCluster( + @Parameter(description = "Cluster identifier") + @PathParam("clusterId") + String clusterId, + + @Valid + KafkaCluster.KafkaClusterData clusterData) { + + // Return all fields + requestedFields.accept(Arrays.asList(KafkaCluster.Fields.DESCRIBE_DEFAULT.split(",\\s*"))); + + var result = clusterService.patchCluster(clusterId, clusterData.getData()); + var responseEntity = new KafkaCluster.KafkaClusterData(result); + + return Response.ok(responseEntity).build(); + } } diff --git a/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java b/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java index 64795ff0d..d24a1a596 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/KafkaCluster.java @@ -4,23 +4,50 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Optional; import jakarta.json.JsonObject; import org.eclipse.microprofile.openapi.annotations.media.Schema; +import org.eclipse.microprofile.openapi.annotations.media.SchemaProperty; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonFilter; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonProperty; import com.github.streamshub.console.api.support.ComparatorBuilder; +import com.github.streamshub.console.api.support.ErrorCategory; import com.github.streamshub.console.api.support.ListRequestContext; +import io.xlate.validation.constraints.Expression; + import static java.util.Comparator.comparing; import static java.util.Comparator.nullsLast; -@Schema(name = "KafkaCluster") +@Schema( + name = "KafkaCluster", + properties = { + @SchemaProperty(name = "type", enumeration = KafkaCluster.API_TYPE), + @SchemaProperty(name = "meta", implementation = KafkaCluster.Meta.class) + }) +@Expression( + value = "self.id != null", + message = "resource ID is required", + node = "id", + payload = ErrorCategory.InvalidResource.class) +@Expression( + when = "self.type != null", + value = "self.type == '" + KafkaCluster.API_TYPE + "'", + message = "resource type conflicts with operation", + node = "type", + payload = ErrorCategory.ResourceConflict.class) public class KafkaCluster extends Resource implements PaginatedKubeResource { + public static final String API_TYPE = "kafkas"; + public static final String FIELDS_PARAM = "fields[" + API_TYPE + "]"; + public static class Fields { public static final String NAME = "name"; public static final String NAMESPACE = "namespace"; @@ -86,9 +113,9 @@ public static Comparator comparator(String fieldName, boolean desc } } - @Schema(name = "KafkaClusterListResponse") - public static final class ListResponse extends DataList { - public ListResponse(List data, ListRequestContext listSupport) { + @Schema(name = "KafkaClusterDataList") + public static final class KafkaClusterDataList extends DataList { + public KafkaClusterDataList(List data, ListRequestContext listSupport) { super(data.stream() .map(entry -> { entry.addMeta("page", listSupport.buildPageMeta(entry::toCursor)); @@ -100,13 +127,26 @@ public ListResponse(List data, ListRequestContext li } } - @Schema(name = "KafkaClusterResponse") - public static final class SingleResponse extends DataSingleton { - public SingleResponse(KafkaCluster data) { + @Schema(name = "KafkaClusterData") + public static final class KafkaClusterData extends DataSingleton { + @JsonCreator + public KafkaClusterData(@JsonProperty("data") KafkaCluster data) { super(data); } } + @Schema(name = "KafkaClusterMeta", additionalProperties = Object.class) + @JsonInclude(value = Include.NON_NULL) + static final class Meta extends JsonApiMeta { + @JsonProperty + @Schema(description = """ + Indicates whether reconciliation is paused for the associated Kafka \ + custom resource. This value is optional and will not be present when \ + the Kafka cluster is not (known) to be managed by Strimzi. + """) + Boolean reconciliationPaused; + } + @JsonFilter("fieldFilter") static class Attributes { @JsonProperty @@ -153,7 +193,16 @@ static class Attributes { } public KafkaCluster(String id, List nodes, Node controller, List authorizedOperations) { - super(id, "kafkas", new Attributes(nodes, controller, authorizedOperations)); + super(id, API_TYPE, Meta::new, new Attributes(nodes, controller, authorizedOperations)); + } + + @JsonCreator + public KafkaCluster(String id, String type, Attributes attributes, Meta meta) { + super(id, type, meta, attributes); + } + + public static KafkaCluster fromId(String id) { + return new KafkaCluster(id, (List) null, (Node) null, (List) null); } /** @@ -161,7 +210,18 @@ public KafkaCluster(String id, List nodes, Node controller, List a * of Topic fields used to compare entities for pagination/sorting. */ public static KafkaCluster fromCursor(JsonObject cursor) { - return PaginatedKubeResource.fromCursor(cursor, id -> new KafkaCluster(id, null, null, null)); + return PaginatedKubeResource.fromCursor(cursor, KafkaCluster::fromId); + } + + public void reconciliationPaused(Boolean reconciliationPaused) { + ((Meta) getOrCreateMeta()).reconciliationPaused = reconciliationPaused; + } + + public Boolean reconciliationPaused() { + return Optional.ofNullable(getMeta()) + .map(Meta.class::cast) + .map(meta -> meta.reconciliationPaused) + .orElse(null); } @Override diff --git a/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java b/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java index 7154add80..2452771a1 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/KafkaClusterService.java @@ -15,6 +15,7 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; +import jakarta.ws.rs.BadRequestException; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.DescribeClusterOptions; @@ -35,8 +36,10 @@ import com.github.streamshub.console.config.ConsoleConfig; import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.cache.Cache; +import io.strimzi.api.ResourceAnnotations; import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.kafka.KafkaStatus; import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener; @@ -56,6 +59,9 @@ public class KafkaClusterService { @Inject Logger logger; + @Inject + KubernetesClient client; + /** * ThreadContext of the request thread. This is used to execute asynchronous * tasks to allow access to request-scoped beans. @@ -104,7 +110,7 @@ public List listClusters(ListRequestContext listSupp .filter(k -> Objects.equals(k.namespace(), config.getNamespace())) .map(k -> addKafkaContextData(k, ctx.getValue())) .findFirst() - .orElseGet(() -> addKafkaContextData(new KafkaCluster(id, null, null, null), ctx.getValue())); + .orElseGet(() -> addKafkaContextData(KafkaCluster.fromId(id), ctx.getValue())); }) .collect(Collectors.toMap(KafkaCluster::getId, Function.identity())); @@ -144,8 +150,28 @@ public CompletionStage describeCluster(List fields) { .thenApply(this::setManaged); } + public KafkaCluster patchCluster(String id, KafkaCluster cluster) { + Kafka resource = kafkaContext.resource(); + + if (resource != null) { + var annotations = resource.getMetadata().getAnnotations(); + var paused = cluster.reconciliationPaused(); + + if (paused != null) { + annotations.put(ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION, paused.toString()); + } else { + annotations.remove(ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION); + } + + resource = client.resource(resource).patch(); + return toKafkaCluster(resource); + } else { + throw new BadRequestException("Kafka cluster is not associated with a Strimzi Kafka resource"); + } + } + KafkaCluster toKafkaCluster(Kafka kafka) { - KafkaCluster cluster = new KafkaCluster(kafka.getStatus().getClusterId(), null, null, null); + KafkaCluster cluster = KafkaCluster.fromId(kafka.getStatus().getClusterId()); setKafkaClusterProperties(cluster, kafka); // Identify that the cluster is configured with connection information @@ -205,9 +231,17 @@ KafkaCluster addKafkaResourceData(KafkaCluster cluster) { } void setKafkaClusterProperties(KafkaCluster cluster, Kafka kafka) { - cluster.name(kafka.getMetadata().getName()); - cluster.namespace(kafka.getMetadata().getNamespace()); - cluster.creationTimestamp(kafka.getMetadata().getCreationTimestamp()); + ObjectMeta kafkaMeta = kafka.getMetadata(); + cluster.name(kafkaMeta.getName()); + cluster.namespace(kafkaMeta.getNamespace()); + cluster.creationTimestamp(kafkaMeta.getCreationTimestamp()); + Optional.ofNullable(kafkaMeta.getAnnotations()).ifPresent(annotations -> { + String paused = annotations.get(ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION); + + if (paused != null) { + cluster.reconciliationPaused(Boolean.parseBoolean(paused)); + } + }); var comparator = Comparator .comparingInt((GenericKafkaListener listener) -> diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java index aec59c4d1..f902108aa 100644 --- a/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/KafkaClustersResourceIT.java @@ -21,6 +21,8 @@ import jakarta.json.Json; import jakarta.json.JsonObject; import jakarta.json.JsonString; +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response.Status; import org.apache.kafka.clients.CommonClientConfigs; @@ -54,6 +56,7 @@ import io.quarkus.test.junit.QuarkusMock; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.TestProfile; +import io.strimzi.api.ResourceAnnotations; import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.kafka.KafkaBuilder; import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationCustomBuilder; @@ -79,6 +82,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertNull; import static org.junit.jupiter.api.Assertions.assertEquals; @QuarkusTest @@ -728,6 +732,64 @@ void testDescribeClusterWithScram(boolean tls, String expectedProtocol) { containsString(ScramLoginModule.class.getName())); } + @ParameterizedTest + @CsvSource({ + "true", + "false" + }) + void testPatchClusterReconciliationPaused(Boolean paused) { + whenRequesting(req -> req.get("{clusterId}", clusterId1)) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .body("data.attributes.name", is("test-kafka1")) + .body("data.meta", not(hasKey("reconciliationPaused"))); + + whenRequesting(req -> req + .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON) + .body(Json.createObjectBuilder() + .add("data", Json.createObjectBuilder() + .add("id", clusterId1) + .add("type", com.github.streamshub.console.api.model.KafkaCluster.API_TYPE) + .add("meta", Json.createObjectBuilder() + .add("reconciliationPaused", paused)) + .add("attributes", Json.createObjectBuilder())) + .build() + .toString()) + .patch("{clusterId1}", clusterId1)) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .body("data.meta.reconciliationPaused", is(paused)); + + Kafka kafkaCR = client.resources(Kafka.class) + .inNamespace("default") + .withName("test-kafka1") + .get(); + + assertEquals(paused.toString(), kafkaCR.getMetadata().getAnnotations().get(ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION)); + + whenRequesting(req -> req + .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON) + .body(Json.createObjectBuilder() + .add("data", Json.createObjectBuilder() + .add("id", clusterId1) + .add("type", com.github.streamshub.console.api.model.KafkaCluster.API_TYPE) + .add("meta", Json.createObjectBuilder()) // reconciliationPaused is omitted + .add("attributes", Json.createObjectBuilder())) + .build() + .toString()) + .patch("{clusterId1}", clusterId1)) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .body("data.meta", not(hasKey("reconciliationPaused"))); + + kafkaCR = client.resources(Kafka.class) + .inNamespace("default") + .withName("test-kafka1") + .get(); + + assertNull(kafkaCR.getMetadata().getAnnotations().get(ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION)); + } + // Helper methods static Map mockAdminClient() { diff --git a/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceIT.java b/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceIT.java index 6370eabff..d2835be2f 100644 --- a/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceIT.java +++ b/api/src/test/java/com/github/streamshub/console/api/KafkaRebalancesResourceIT.java @@ -29,6 +29,7 @@ import io.quarkus.test.common.http.TestHTTPEndpoint; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.TestProfile; +import io.strimzi.api.ResourceAnnotations; import io.strimzi.api.ResourceLabels; import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.kafka.KafkaBuilder; @@ -254,7 +255,7 @@ void testPatchRebalanceWithStatusProposalReady() { .withName(name) .get(); - assertEquals("refresh", rebalanceCR.getMetadata().getAnnotations().get("strimzi.io/rebalance")); + assertEquals("refresh", rebalanceCR.getMetadata().getAnnotations().get(ResourceAnnotations.ANNO_STRIMZI_IO_REBALANCE)); whenRequesting(req -> req .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON) @@ -277,7 +278,7 @@ void testPatchRebalanceWithStatusProposalReady() { .withName(name) .get(); - assertNull(rebalanceCR.getMetadata().getAnnotations().get("strimzi.io/rebalance")); + assertNull(rebalanceCR.getMetadata().getAnnotations().get(ResourceAnnotations.ANNO_STRIMZI_IO_REBALANCE)); } @ParameterizedTest