Skip to content

Commit cc1c50c

Browse files
committed
Use blank ID during Kafka creation, only fetch leader's storage, test
Signed-off-by: Michael Edgar <[email protected]>
1 parent 2b8eb01 commit cc1c50c

File tree

3 files changed

+77
-22
lines changed

3 files changed

+77
-22
lines changed

api/src/main/java/com/github/eyefloaters/console/api/model/KafkaCluster.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public static KafkaCluster fromCursor(JsonObject cursor) {
177177

178178
public String toCursor(List<String> sortFields) {
179179
JsonObjectBuilder cursor = Json.createObjectBuilder()
180-
.add("id", id == null ? JsonValue.NULL : Json.createValue(id));
180+
.add("id", id == null ? Json.createValue("") : Json.createValue(id));
181181

182182
JsonObjectBuilder attrBuilder = Json.createObjectBuilder();
183183
maybeAddAttribute(attrBuilder, sortFields, Fields.NAME, name);

api/src/main/java/com/github/eyefloaters/console/api/service/TopicService.java

+13-17
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import com.github.eyefloaters.console.api.model.OffsetInfo;
5151
import com.github.eyefloaters.console.api.model.PartitionId;
5252
import com.github.eyefloaters.console.api.model.PartitionInfo;
53-
import com.github.eyefloaters.console.api.model.PartitionReplica;
5453
import com.github.eyefloaters.console.api.model.ReplicaLocalStorage;
5554
import com.github.eyefloaters.console.api.model.Topic;
5655
import com.github.eyefloaters.console.api.model.TopicPatch;
@@ -486,7 +485,7 @@ CompletionStage<Void> listOffsets(Admin adminClient, Map<Uuid, Either<Topic, Thr
486485

487486
var pendingOffsets = getRequestOffsetSpecs(offsetSpec)
488487
.stream()
489-
.map(reqOffsetSpec -> topicPartitionReplicas(onlineTopics, topicIds)
488+
.map(reqOffsetSpec -> topicPartitionLeaders(onlineTopics, topicIds)
490489
.keySet()
491490
.stream()
492491
.collect(Collectors.toMap(Function.identity(), ignored -> reqOffsetSpec)))
@@ -513,20 +512,19 @@ List<OffsetSpec> getRequestOffsetSpecs(String offsetSpec) {
513512
}
514513

515514
/**
516-
* Build of map of {@linkplain PartitionId}s to the list of replicas where
517-
* the partitions are placed. Concurrently, a map of topic names to topic
518-
* identifiers is constructed to support cross referencing the
519-
* {@linkplain PartitionId} keys (via {@linkplain PartitionId#topicId()})
520-
* back to the topic's {@linkplain Uuid}. This allows easy access of the topics
521-
* located in the topics map provided to this method and is particularly useful
522-
* for Kafka operations that still require topic name.
515+
* Build a map of {@linkplain PartitionId}s to the partition leader node ID.
516+
* Concurrently, a map of topic names to topic identifiers is constructed to
517+
* support cross referencing the {@linkplain PartitionId} keys (via
518+
* {@linkplain PartitionId#topicId()}) back to the topic's {@linkplain Uuid}.
519+
* This allows easy access of the topics located in the topics map provided to
520+
* this method and is particularly useful for Kafka operations that still
521+
* require topic name.
523522
*
524523
* @param topics map of topics (keyed by Id)
525524
* @param topicIds map of topic names to topic Ids, modified by this method
526-
* @return map of {@linkplain PartitionId}s to the list of replicas where the
527-
* partitions are placed
525+
* @return map of {@linkplain PartitionId}s to the partition leader node ID
528526
*/
529-
Map<PartitionId, List<Integer>> topicPartitionReplicas(Map<Uuid, Either<Topic, Throwable>> topics, Map<String, Uuid> topicIds) {
527+
Map<PartitionId, Integer> topicPartitionLeaders(Map<Uuid, Either<Topic, Throwable>> topics, Map<String, Uuid> topicIds) {
530528
return topics.entrySet()
531529
.stream()
532530
.filter(entry -> entry.getValue().isPrimaryPresent())
@@ -541,8 +539,7 @@ Map<PartitionId, List<Integer>> topicPartitionReplicas(Map<Uuid, Either<Topic, T
541539
.filter(PartitionInfo::online)
542540
.map(partition -> {
543541
var key = new PartitionId(topic.getId(), topic.name(), partition.getPartition());
544-
List<Integer> value = partition.getReplicas().stream().map(PartitionReplica::nodeId).toList();
545-
return Map.entry(key, value);
542+
return Map.entry(key, partition.getLeaderId());
546543
}))
547544
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
548545
}
@@ -609,15 +606,14 @@ Either<OffsetInfo, Throwable> either(ListOffsetsResultInfo result, Throwable err
609606
CompletionStage<Void> describeLogDirs(Admin adminClient, Map<Uuid, Either<Topic, Throwable>> topics) {
610607
Map<String, Uuid> topicIds = new HashMap<>(topics.size());
611608

612-
var topicPartitionReplicas = topicPartitionReplicas(topics, topicIds);
613-
var nodeIds = topicPartitionReplicas.values().stream().flatMap(Collection::stream).distinct().toList();
609+
var topicPartitionReplicas = topicPartitionLeaders(topics, topicIds);
610+
var nodeIds = topicPartitionReplicas.values().stream().distinct().toList();
614611
var logDirs = adminClient.describeLogDirs(nodeIds, new DescribeLogDirsOptions()
615612
.timeoutMs(5000))
616613
.descriptions();
617614

618615
var pendingInfo = topicPartitionReplicas.entrySet()
619616
.stream()
620-
.flatMap(e -> e.getValue().stream().map(node -> Map.entry(e.getKey(), node)))
621617
.map(e -> {
622618
var topicPartition = e.getKey().toKafkaModel();
623619
int nodeId = e.getValue();

api/src/test/java/com/github/eyefloaters/console/api/TopicsResourceIT.java

+63-4
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,13 @@
4141
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
4242
import org.apache.kafka.clients.admin.TopicDescription;
4343
import org.apache.kafka.common.KafkaFuture;
44+
import org.apache.kafka.common.Node;
45+
import org.apache.kafka.common.TopicCollection;
4446
import org.apache.kafka.common.TopicCollection.TopicIdCollection;
4547
import org.apache.kafka.common.TopicPartition;
48+
import org.apache.kafka.common.TopicPartitionInfo;
4649
import org.apache.kafka.common.Uuid;
50+
import org.apache.kafka.common.acl.AclOperation;
4751
import org.apache.kafka.common.config.ConfigResource;
4852
import org.apache.kafka.common.errors.ApiException;
4953
import org.apache.kafka.common.internals.KafkaFutureImpl;
@@ -125,28 +129,29 @@ class TopicsResourceIT {
125129
TopicHelper topicUtils;
126130
ConsumerUtils groupUtils;
127131
String clusterId1;
132+
URI bootstrapServers1;
128133
String clusterId2;
129134
ServerSocket randomSocket;
130135

131136
@BeforeEach
132137
void setup() throws IOException {
133-
URI bootstrapServers = URI.create(deployments.getExternalBootstrapServers());
138+
bootstrapServers1 = URI.create(deployments.getExternalBootstrapServers());
134139
randomSocket = new ServerSocket(0);
135140
URI randomBootstrapServers = URI.create("dummy://localhost:" + randomSocket.getLocalPort());
136141

137-
topicUtils = new TopicHelper(bootstrapServers, config, null);
142+
topicUtils = new TopicHelper(bootstrapServers1, config, null);
138143
topicUtils.deleteAllTopics();
139144

140145
groupUtils = new ConsumerUtils(config, null);
141146

142-
utils = new TestHelper(bootstrapServers, config, null);
147+
utils = new TestHelper(bootstrapServers1, config, null);
143148

144149
clusterId1 = utils.getClusterId();
145150
clusterId2 = UUID.randomUUID().toString();
146151

147152
client.resources(Kafka.class).delete();
148153
client.resources(Kafka.class)
149-
.resource(utils.buildKafkaResource("test-kafka1", clusterId1, bootstrapServers))
154+
.resource(utils.buildKafkaResource("test-kafka1", clusterId1, bootstrapServers1))
150155
.create();
151156
// Second cluster is offline/non-existent
152157
client.resources(Kafka.class)
@@ -955,6 +960,60 @@ void testDescribeTopicWithNoSuchTopic() {
955960
.body("errors.code", contains("4041"));
956961
}
957962

963+
@Test
964+
void testDescribeTopicWithOfflinePartition() {
965+
String topicName = UUID.randomUUID().toString();
966+
Map<String, String> topicIds = topicUtils.createTopics(clusterId1, List.of(topicName), 2);
967+
968+
//int partition, Node leader, List<Node> replicas, List<Node> isr
969+
Node node0 = new Node(0, "node0", bootstrapServers1.getPort());
970+
Node node1 = new Node(1, "node1", bootstrapServers1.getPort());
971+
972+
Answer<DescribeTopicsResult> describeTopicsResult = args -> {
973+
List<TopicPartitionInfo> partitions = List.of(
974+
// Online, 2 replicas, 1 ISR
975+
new TopicPartitionInfo(0, node0, List.of(node0, node1), List.of(node0)),
976+
// Offline, 2 replicas, no ISRs
977+
new TopicPartitionInfo(1, null, List.of(node0, node1), List.of()));
978+
Set<AclOperation> authorizedOperations = Set.of(AclOperation.ALL);
979+
Uuid topicId = Uuid.fromString(topicIds.get(topicName));
980+
981+
var description = KafkaFuture.completedFuture(
982+
new TopicDescription(topicName, false, partitions, authorizedOperations, topicId));
983+
984+
class Result extends DescribeTopicsResult {
985+
Result() {
986+
super(Map.of(topicId, description), null);
987+
}
988+
}
989+
990+
return new Result();
991+
};
992+
993+
AdminClientSpy.install(client -> {
994+
// Mock describeTopics
995+
doAnswer(describeTopicsResult)
996+
.when(client)
997+
.describeTopics(any(TopicCollection.class), any(DescribeTopicsOptions.class));
998+
});
999+
1000+
whenRequesting(req -> req.get("{topicId}", clusterId1, topicIds.get(topicName)))
1001+
.assertThat()
1002+
.statusCode(is(Status.OK.getStatusCode()))
1003+
.body("data.attributes.name", is(topicName))
1004+
.body("data.attributes.status", is("PartiallyOffline"))
1005+
.body("data.attributes.partitions", hasSize(2))
1006+
.body("data.attributes.partitions[0].status", is("UnderReplicated"))
1007+
.body("data.attributes.partitions[0].replicas[0].inSync", is(true))
1008+
.body("data.attributes.partitions[0].replicas[0].localStorage", notNullValue())
1009+
// storage not fetched for followers
1010+
.body("data.attributes.partitions[0].replicas[1].inSync", is(false))
1011+
.body("data.attributes.partitions[0].replicas[1].localStorage", nullValue())
1012+
// Partition 2, offline with no ISRs
1013+
.body("data.attributes.partitions[1].status", is("Offline"))
1014+
.body("data.attributes.partitions[1].replicas.inSync", everyItem(is(false)));
1015+
}
1016+
9581017
@Test
9591018
void testCreateTopicSucceeds() {
9601019
String topicName = UUID.randomUUID().toString();

0 commit comments

Comments
 (0)