Skip to content

Commit

Permalink
Default name of TopicName to metadata.name when spec.topicName absent
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Edgar <[email protected]>
  • Loading branch information
MikeEdgar committed Apr 11, 2024
1 parent 40b806d commit 358d27d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.topic.KafkaTopic;
import io.strimzi.api.kafka.model.topic.KafkaTopicSpec;

@ApplicationScoped
public class InformerFactory {
Expand Down Expand Up @@ -58,7 +59,7 @@ void onStartup(@Observes Startup event) {
topicInformer.addEventHandler(new ResourceEventHandler<KafkaTopic>() {
@Override
public void onAdd(KafkaTopic topic) {
topicMap(topic).ifPresent(map -> map.put(topic.getSpec().getTopicName(), topic));
topicMap(topic).ifPresent(map -> map.put(topicName(topic), topic));
}

@Override
Expand All @@ -69,7 +70,13 @@ public void onUpdate(KafkaTopic oldTopic, KafkaTopic topic) {

@Override
public void onDelete(KafkaTopic topic, boolean deletedFinalStateUnknown) {
topicMap(topic).ifPresent(map -> map.remove(topic.getSpec().getTopicName()));
topicMap(topic).ifPresent(map -> map.remove(topicName(topic)));
}

private static String topicName(KafkaTopic topic) {
return Optional.ofNullable(topic.getSpec())
.map(KafkaTopicSpec::getTopicName)
.orElseGet(() -> topic.getMetadata().getName());
}

Optional<Map<String, KafkaTopic>> topicMap(KafkaTopic topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,7 @@ void testListTopicsWithNumPartitions() {
void testListTopicsWithManagedTopic() {
String topic1 = "t1-" + UUID.randomUUID().toString();
String topic2 = "t2-" + UUID.randomUUID().toString();
String topic3 = "t3-" + UUID.randomUUID().toString();
topicUtils.createTopics(clusterId1, List.of(topic1), 1);
topicUtils.createTopics(clusterId1, List.of(topic2), 1);

Expand All @@ -722,6 +723,19 @@ void testListTopicsWithManagedTopic() {
.build())
.create();

client.resource(new KafkaTopicBuilder()
.withNewMetadata()
.withName(topic3)
.withNamespace("default")
.withLabels(Map.of("strimzi.io/cluster", clusterName1))
.endMetadata()
.withNewSpec()
// topicName (optional) is not set
.withPartitions(1)
.endSpec()
.build())
.create();

// Wait for the managed topic list to include the topic
await().atMost(10, TimeUnit.SECONDS)
.until(() -> Optional.ofNullable(managedTopics.get("default"))
Expand All @@ -732,9 +746,10 @@ void testListTopicsWithManagedTopic() {
whenRequesting(req -> req.get("", clusterId1))
.assertThat()
.statusCode(is(Status.OK.getStatusCode()))
.body("data.size()", is(2))
.body("data.size()", is(3))
.body("data.find { it.attributes.name == '%s' }.meta.managed".formatted(topic1), is(true))
.body("data.find { it.attributes.name == '%s' }.meta.managed".formatted(topic2), is(false));
.body("data.find { it.attributes.name == '%s' }.meta.managed".formatted(topic2), is(false))
.body("data.find { it.attributes.name == '%s' }.meta.managed".formatted(topic3), is(true));
}

@Test
Expand Down

0 comments on commit 358d27d

Please sign in to comment.