Skip to content

Commit

Permalink
[API] Add endpoint to patch Kafka resource pause annotation
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Edgar <[email protected]>
  • Loading branch information
MikeEdgar committed Oct 9, 2024
1 parent 45f2f5a commit 4f20c6d
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package com.github.streamshub.console.api;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.validation.ConstraintTarget;
import jakarta.validation.Valid;
import jakarta.ws.rs.BeanParam;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.PATCH;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
Expand All @@ -34,12 +38,12 @@
import com.github.streamshub.console.api.support.ListRequestContext;
import com.github.streamshub.console.api.support.StringEnumeration;

import io.xlate.validation.constraints.Expression;

@Path("/api/kafkas")
@Tag(name = "Kafka Cluster Resources")
public class KafkaClustersResource {

static final String FIELDS_PARAM = "fields[kafkas]";

@Inject
UriInfo uriInfo;

Expand All @@ -56,14 +60,14 @@ public class KafkaClustersResource {

@GET
@Produces(MediaType.APPLICATION_JSON)
@APIResponseSchema(KafkaCluster.ListResponse.class)
@APIResponseSchema(KafkaCluster.KafkaClusterDataList.class)
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
public Response listClusters(
@QueryParam(FIELDS_PARAM)
@QueryParam(KafkaCluster.FIELDS_PARAM)
@DefaultValue(KafkaCluster.Fields.LIST_DEFAULT)
@StringEnumeration(
source = FIELDS_PARAM,
source = KafkaCluster.FIELDS_PARAM,
allowedValues = {
KafkaCluster.Fields.NAME,
KafkaCluster.Fields.NAMESPACE,
Expand Down Expand Up @@ -105,15 +109,15 @@ public Response listClusters(

ListRequestContext<KafkaCluster> listSupport = new ListRequestContext<>(KafkaCluster.Fields.COMPARATOR_BUILDER, uriInfo.getRequestUri(), listParams, KafkaCluster::fromCursor);
var clusterList = clusterService.listClusters(listSupport);
var responseEntity = new KafkaCluster.ListResponse(clusterList, listSupport);
var responseEntity = new KafkaCluster.KafkaClusterDataList(clusterList, listSupport);

return Response.ok(responseEntity).build();
}

@GET
@Path("{clusterId}")
@Produces(MediaType.APPLICATION_JSON)
@APIResponseSchema(KafkaCluster.SingleResponse.class)
@APIResponseSchema(KafkaCluster.KafkaClusterData.class)
@APIResponse(responseCode = "404", ref = "NotFound")
@APIResponse(responseCode = "500", ref = "ServerError")
@APIResponse(responseCode = "504", ref = "ServerTimeout")
Expand All @@ -122,10 +126,10 @@ public CompletionStage<Response> describeCluster(
@PathParam("clusterId")
String clusterId,

@QueryParam(FIELDS_PARAM)
@QueryParam(KafkaCluster.FIELDS_PARAM)
@DefaultValue(KafkaCluster.Fields.DESCRIBE_DEFAULT)
@StringEnumeration(
source = FIELDS_PARAM,
source = KafkaCluster.FIELDS_PARAM,
allowedValues = {
KafkaCluster.Fields.NAME,
KafkaCluster.Fields.NAMESPACE,
Expand Down Expand Up @@ -169,9 +173,41 @@ public CompletionStage<Response> describeCluster(
requestedFields.accept(fields);

return clusterService.describeCluster(fields)
.thenApply(KafkaCluster.SingleResponse::new)
.thenApply(KafkaCluster.KafkaClusterData::new)
.thenApply(Response::ok)
.thenApply(Response.ResponseBuilder::build);
}


@Path("{clusterId}")
@PATCH
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@APIResponseSchema(responseCode = "200", value = KafkaCluster.KafkaClusterData.class)
@Expression(
    targetName = "args",
    // args is the method parameter array (0-based EL indexing):
    // args[0] = clusterId path parameter, args[1] = clusterData request body.
    // Only check when the request body Id is present (separately checked for @NotNull)
    when = "args[1].data.id != null",
    // Verify the Id in the request body matches the Id in the URL
    value = "args[0].equals(args[1].data.id)",
    message = "resource ID conflicts with operation URL",
    node = { "data", "id" },
    payload = ErrorCategory.InvalidResource.class,
    validationAppliesTo = ConstraintTarget.PARAMETERS)
public Response patchCluster(
        @Parameter(description = "Cluster identifier")
        @PathParam("clusterId")
        String clusterId,

        @Valid
        KafkaCluster.KafkaClusterData clusterData) {

    /*
     * Patch mutable properties of the identified Kafka cluster. The service
     * layer applies the request's meta.reconciliationPaused flag to the
     * backing Strimzi Kafka custom resource (see KafkaClusterService#patchCluster).
     */

    // Return all fields of the patched cluster in the response
    requestedFields.accept(Arrays.asList(KafkaCluster.Fields.DESCRIBE_DEFAULT.split(",\\s*")));

    var result = clusterService.patchCluster(clusterId, clusterData.getData());
    var responseEntity = new KafkaCluster.KafkaClusterData(result);

    return Response.ok(responseEntity).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,50 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import jakarta.json.JsonObject;

import org.eclipse.microprofile.openapi.annotations.media.Schema;
import org.eclipse.microprofile.openapi.annotations.media.SchemaProperty;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonFilter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.streamshub.console.api.support.ComparatorBuilder;
import com.github.streamshub.console.api.support.ErrorCategory;
import com.github.streamshub.console.api.support.ListRequestContext;

import io.xlate.validation.constraints.Expression;

import static java.util.Comparator.comparing;
import static java.util.Comparator.nullsLast;

@Schema(name = "KafkaCluster")
@Schema(
name = "KafkaCluster",
properties = {
@SchemaProperty(name = "type", enumeration = KafkaCluster.API_TYPE),
@SchemaProperty(name = "meta", implementation = KafkaCluster.Meta.class)
})
@Expression(
value = "self.id != null",
message = "resource ID is required",
node = "id",
payload = ErrorCategory.InvalidResource.class)
@Expression(
when = "self.type != null",
value = "self.type == '" + KafkaCluster.API_TYPE + "'",
message = "resource type conflicts with operation",
node = "type",
payload = ErrorCategory.ResourceConflict.class)
public class KafkaCluster extends Resource<KafkaCluster.Attributes> implements PaginatedKubeResource {

public static final String API_TYPE = "kafkas";
public static final String FIELDS_PARAM = "fields[" + API_TYPE + "]";

public static class Fields {
public static final String NAME = "name";
public static final String NAMESPACE = "namespace";
Expand Down Expand Up @@ -86,9 +113,9 @@ public static Comparator<KafkaCluster> comparator(String fieldName, boolean desc
}
}

@Schema(name = "KafkaClusterListResponse")
public static final class ListResponse extends DataList<KafkaCluster> {
public ListResponse(List<KafkaCluster> data, ListRequestContext<KafkaCluster> listSupport) {
@Schema(name = "KafkaClusterDataList")
public static final class KafkaClusterDataList extends DataList<KafkaCluster> {
public KafkaClusterDataList(List<KafkaCluster> data, ListRequestContext<KafkaCluster> listSupport) {
super(data.stream()
.map(entry -> {
entry.addMeta("page", listSupport.buildPageMeta(entry::toCursor));
Expand All @@ -100,13 +127,26 @@ public ListResponse(List<KafkaCluster> data, ListRequestContext<KafkaCluster> li
}
}

@Schema(name = "KafkaClusterResponse")
public static final class SingleResponse extends DataSingleton<KafkaCluster> {
public SingleResponse(KafkaCluster data) {
/**
 * Single-resource document wrapping one {@link KafkaCluster} under a
 * {@code data} property. The {@code @JsonCreator} constructor allows this
 * type to be deserialized from request bodies (e.g. the PATCH endpoint)
 * in addition to being serialized in responses.
 */
@Schema(name = "KafkaClusterData")
public static final class KafkaClusterData extends DataSingleton<KafkaCluster> {
    @JsonCreator
    public KafkaClusterData(@JsonProperty("data") KafkaCluster data) {
        super(data);
    }
}

/**
 * Typed meta object for KafkaCluster resources. {@code additionalProperties}
 * keeps the schema open so untyped meta entries (such as per-entry "page"
 * metadata added for pagination) may appear alongside the typed field below.
 */
@Schema(name = "KafkaClusterMeta", additionalProperties = Object.class)
@JsonInclude(value = Include.NON_NULL)
static final class Meta extends JsonApiMeta {
    // Nullable by design: omitted from the serialized meta (NON_NULL) when
    // the pause state is unknown, i.e. no Strimzi-managed resource was found.
    @JsonProperty
    @Schema(description = """
            Indicates whether reconciliation is paused for the associated Kafka \
            custom resource. This value is optional and will not be present when \
            the Kafka cluster is not (known) to be managed by Strimzi.
            """)
    Boolean reconciliationPaused;
}

@JsonFilter("fieldFilter")
static class Attributes {
@JsonProperty
Expand Down Expand Up @@ -153,15 +193,35 @@ static class Attributes {
}

/**
 * Primary constructor used when building a cluster model from live data.
 * Delegates to the superclass with the canonical {@link #API_TYPE} and a
 * {@link Meta} factory so typed meta (e.g. reconciliationPaused) can be
 * attached on demand via getOrCreateMeta().
 */
public KafkaCluster(String id, List<Node> nodes, Node controller, List<String> authorizedOperations) {
    super(id, API_TYPE, Meta::new, new Attributes(nodes, controller, authorizedOperations));
}

/**
 * Jackson deserialization constructor (e.g. for PATCH request bodies).
 * Note the argument order differs from the super call: the superclass
 * expects (id, type, meta, attributes).
 */
@JsonCreator
public KafkaCluster(String id, String type, Attributes attributes, Meta meta) {
    super(id, type, meta, attributes);
}

/**
 * Builds a minimal KafkaCluster carrying only its id; all other state is
 * absent. The explicit null casts unambiguously select the
 * (id, nodes, controller, authorizedOperations) constructor overload.
 */
public static KafkaCluster fromId(String id) {
    return new KafkaCluster(id, (List<Node>) null, (Node) null, (List<String>) null);
}

/**
 * Constructs a "cursor" KafkaCluster from the encoded string representation
 * of the subset of KafkaCluster fields used to compare entities for
 * pagination/sorting.
 */
public static KafkaCluster fromCursor(JsonObject cursor) {
    return PaginatedKubeResource.fromCursor(cursor, KafkaCluster::fromId);
}

// Setter-style accessor: records the pause flag in the typed Meta,
// creating the meta object on first use.
public void reconciliationPaused(Boolean reconciliationPaused) {
    ((Meta) getOrCreateMeta()).reconciliationPaused = reconciliationPaused;
}

// Getter-style accessor: null-safe — returns null when no meta object is
// present or the flag was never set.
public Boolean reconciliationPaused() {
    return Optional.ofNullable(getMeta())
        .map(Meta.class::cast)
        .map(meta -> meta.reconciliationPaused)
        .orElse(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.BadRequestException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
Expand All @@ -35,8 +36,10 @@
import com.github.streamshub.console.config.ConsoleConfig;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.strimzi.api.ResourceAnnotations;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaStatus;
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener;
Expand All @@ -56,6 +59,9 @@ public class KafkaClusterService {
@Inject
Logger logger;

@Inject
KubernetesClient client;

/**
* ThreadContext of the request thread. This is used to execute asynchronous
* tasks to allow access to request-scoped beans.
Expand Down Expand Up @@ -104,7 +110,7 @@ public List<KafkaCluster> listClusters(ListRequestContext<KafkaCluster> listSupp
.filter(k -> Objects.equals(k.namespace(), config.getNamespace()))
.map(k -> addKafkaContextData(k, ctx.getValue()))
.findFirst()
.orElseGet(() -> addKafkaContextData(new KafkaCluster(id, null, null, null), ctx.getValue()));
.orElseGet(() -> addKafkaContextData(KafkaCluster.fromId(id), ctx.getValue()));
})
.collect(Collectors.toMap(KafkaCluster::getId, Function.identity()));

Expand Down Expand Up @@ -144,8 +150,28 @@ public CompletionStage<KafkaCluster> describeCluster(List<String> fields) {
.thenApply(this::setManaged);
}

/**
 * Applies the request's reconciliation-paused flag to the Strimzi Kafka
 * custom resource associated with the current request's kafkaContext: a
 * non-null flag sets the pause-reconciliation annotation; a null flag
 * removes it. The resource is then patched via the Kubernetes client and
 * the refreshed cluster model is returned.
 *
 * @param id cluster identifier from the operation URL.
 *     NOTE(review): currently unused — the target resource comes from the
 *     request-scoped kafkaContext, presumably resolved from the same id
 *     upstream; confirm against the routing/context setup.
 * @param cluster request model carrying the desired reconciliationPaused value
 * @return cluster model rebuilt from the patched Kafka resource
 * @throws BadRequestException when no Strimzi Kafka resource backs this cluster
 */
public KafkaCluster patchCluster(String id, KafkaCluster cluster) {
    Kafka resource = kafkaContext.resource();

    if (resource == null) {
        throw new BadRequestException("Kafka cluster is not associated with a Strimzi Kafka resource");
    }

    var meta = resource.getMetadata();
    var annotations = meta.getAnnotations();
    var paused = cluster.reconciliationPaused();

    if (paused != null) {
        if (annotations == null) {
            // Metadata may carry no annotations map at all (mirrors the
            // null-safe read in setKafkaClusterProperties); create one
            // before writing to avoid an NPE.
            annotations = new java.util.LinkedHashMap<>();
            meta.setAnnotations(annotations);
        }
        annotations.put(ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION, paused.toString());
    } else if (annotations != null) {
        annotations.remove(ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION);
    }

    resource = client.resource(resource).patch();
    return toKafkaCluster(resource);
}

KafkaCluster toKafkaCluster(Kafka kafka) {
KafkaCluster cluster = new KafkaCluster(kafka.getStatus().getClusterId(), null, null, null);
KafkaCluster cluster = KafkaCluster.fromId(kafka.getStatus().getClusterId());
setKafkaClusterProperties(cluster, kafka);

// Identify that the cluster is configured with connection information
Expand Down Expand Up @@ -205,9 +231,17 @@ KafkaCluster addKafkaResourceData(KafkaCluster cluster) {
}

void setKafkaClusterProperties(KafkaCluster cluster, Kafka kafka) {
cluster.name(kafka.getMetadata().getName());
cluster.namespace(kafka.getMetadata().getNamespace());
cluster.creationTimestamp(kafka.getMetadata().getCreationTimestamp());
ObjectMeta kafkaMeta = kafka.getMetadata();
cluster.name(kafkaMeta.getName());
cluster.namespace(kafkaMeta.getNamespace());
cluster.creationTimestamp(kafkaMeta.getCreationTimestamp());
Optional.ofNullable(kafkaMeta.getAnnotations()).ifPresent(annotations -> {
String paused = annotations.get(ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION);

if (paused != null) {
cluster.reconciliationPaused(Boolean.parseBoolean(paused));
}
});

var comparator = Comparator
.comparingInt((GenericKafkaListener listener) ->
Expand Down
Loading

0 comments on commit 4f20c6d

Please sign in to comment.