diff --git a/api/src/main/java/com/github/eyefloaters/console/api/RecordsResource.java b/api/src/main/java/com/github/eyefloaters/console/api/RecordsResource.java index ff65b694e..905dd1824 100644 --- a/api/src/main/java/com/github/eyefloaters/console/api/RecordsResource.java +++ b/api/src/main/java/com/github/eyefloaters/console/api/RecordsResource.java @@ -16,10 +16,12 @@ import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.CacheControl; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.UriBuilder; import jakarta.ws.rs.core.UriInfo; +import jakarta.ws.rs.ext.RuntimeDelegate; import jakarta.ws.rs.core.Response.Status; import org.eclipse.microprofile.openapi.annotations.Operation; @@ -95,7 +97,8 @@ public Response consumeRecords( KafkaRecord.Fields.TIMESTAMP_TYPE, KafkaRecord.Fields.HEADERS, KafkaRecord.Fields.KEY, - KafkaRecord.Fields.VALUE + KafkaRecord.Fields.VALUE, + KafkaRecord.Fields.SIZE }, payload = ErrorCategory.InvalidQueryParameter.class) @Parameter( @@ -112,15 +115,16 @@ public Response consumeRecords( KafkaRecord.Fields.TIMESTAMP_TYPE, KafkaRecord.Fields.HEADERS, KafkaRecord.Fields.KEY, - KafkaRecord.Fields.VALUE + KafkaRecord.Fields.VALUE, + KafkaRecord.Fields.SIZE })) List fields) { requestedFields.accept(fields); var result = recordService.consumeRecords(topicId, params.getPartition(), params.getOffset(), params.getTimestamp(), params.getLimit(), fields, params.getMaxValueLength()); - return Response.ok(new KafkaRecord.ListResponse(result)).build(); - + CacheControl noStore = RuntimeDelegate.getInstance().createHeaderDelegate(CacheControl.class).fromString("no-store"); + return Response.ok(new KafkaRecord.ListResponse(result)).cacheControl(noStore).build(); } @POST diff --git a/api/src/main/java/com/github/eyefloaters/console/api/model/KafkaRecord.java b/api/src/main/java/com/github/eyefloaters/console/api/model/KafkaRecord.java index 80fd7ecee..9f275fa14 100644 --- a/api/src/main/java/com/github/eyefloaters/console/api/model/KafkaRecord.java +++ b/api/src/main/java/com/github/eyefloaters/console/api/model/KafkaRecord.java @@ -31,6 +31,7 @@ public static final class Fields { public static final String HEADERS = "headers"; public static final String KEY = "key"; public static final String VALUE = "value"; + public static final String SIZE = "size"; public static final String DEFAULT = PARTITION + @@ -39,7 +40,8 @@ public static final class Fields { ", " + TIMESTAMP_TYPE + ", " + HEADERS + ", " + KEY + - ", " + VALUE; + ", " + VALUE + + ", " + SIZE; public static final List ALL = List.of( PARTITION, @@ -48,7 +50,8 @@ public static final class Fields { TIMESTAMP_TYPE, HEADERS, KEY, - VALUE); + VALUE, + SIZE); private Fields() { // Prevent instances @@ -118,6 +121,9 @@ public RecordResource(KafkaRecord attributes) { @Schema(description = "Record value") String value; + @Schema(readOnly = true, description = "Size of the uncompressed record, not including the overhead of the record in the log segment.") + Long size; + public KafkaRecord() { super(); } @@ -126,13 +132,14 @@ public KafkaRecord(String topic) { this.topic = topic; } - public KafkaRecord(String topic, Integer partition, Instant timestamp, Map headers, String key, String value) { + public KafkaRecord(String topic, Integer partition, Instant timestamp, Map headers, String key, String value, Long size) { this(topic); this.partition = partition; this.timestamp = timestamp; this.headers = headers; this.key = key; this.value = value; + this.size = size; } @JsonIgnore @@ -211,4 +218,13 @@ public String getValue() { public void setValue(String value) { this.value = value; } + + public Long getSize() { + return size; + } + + public void setSize(Long size) { + this.size = size; + } + } diff --git a/api/src/main/java/com/github/eyefloaters/console/api/service/RecordService.java b/api/src/main/java/com/github/eyefloaters/console/api/service/RecordService.java index e224b5c6f..d5a25b6ac 100644 --- a/api/src/main/java/com/github/eyefloaters/console/api/service/RecordService.java +++ b/api/src/main/java/com/github/eyefloaters/console/api/service/RecordService.java @@ -8,6 +8,7 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -30,6 +31,7 @@ import jakarta.inject.Inject; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.TopicListing; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -121,7 +123,7 @@ public ConsumerRecords next() { if (!hasNext()) { throw new NoSuchElementException(); } - var records = consumer.poll(Duration.ofMillis(100)); + var records = consumer.poll(Duration.between(Instant.now(), timeout)); int pollSize = records.count(); emptyPoll = pollSize == 0; recordsConsumed.addAndGet(pollSize); @@ -227,7 +229,7 @@ CompletionStage topicNameForId(String topicId) { Uuid kafkaTopicId = Uuid.fromString(topicId); return clientSupplier.get() - .listTopics() + .listTopics(new ListTopicsOptions().listInternal(true)) .listings() .toCompletionStage() .thenApply(Collection::stream) @@ -316,6 +318,7 @@ KafkaRecord getItems(ConsumerRecord rec, String topicId, List item.setKey(bytesToString(k, maxValueLength))); setProperty(KafkaRecord.Fields.VALUE, include, rec::value, v -> item.setValue(bytesToString(v, maxValueLength))); setProperty(KafkaRecord.Fields.HEADERS, include, () -> headersToMap(rec.headers(), maxValueLength), item::setHeaders); + setProperty(KafkaRecord.Fields.SIZE, include, () -> sizeOf(rec), item::setSize); return item; } @@ -365,6 +368,14 @@ Map headersToMap(Headers headers, Integer maxValueLength) { return headerMap; } + long sizeOf(ConsumerRecord rec) { + return rec.serializedKeySize() + + rec.serializedValueSize() + + Arrays.stream(rec.headers().toArray()) + .mapToLong(h -> h.key().length() + h.value().length) + .sum(); + } + static UnknownTopicIdException noSuchTopic(String topicId) { return new UnknownTopicIdException("No such topic: " + topicId); } diff --git a/ui/api/messages/actions.ts b/ui/api/messages/actions.ts index 2f2c02499..5aa4672ab 100644 --- a/ui/api/messages/actions.ts +++ b/ui/api/messages/actions.ts @@ -28,7 +28,7 @@ export async function getTopicMessages( const sp = new URLSearchParams( filterUndefinedFromObj({ "fields[records]": - "partition,offset,timestamp,timestampType,headers,key,value", + "partition,offset,timestamp,timestampType,headers,key,value,size", "filter[partition]": params.partition, "filter[offset]": params.filter?.type === "offset" diff --git a/ui/api/messages/schema.ts b/ui/api/messages/schema.ts index b4529aa7e..120459baf 100644 --- a/ui/api/messages/schema.ts +++ b/ui/api/messages/schema.ts @@ -10,6 +10,7 @@ export const MessageSchema = z.object({ headers: z.record(z.any()), key: z.string().nullable(), value: z.string().nullable(), + size: z.number().optional(), }), }); export const MessageApiResponse = z.object({ diff --git a/ui/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/ColumnsModal.tsx b/ui/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/ColumnsModal.tsx index 89a77965c..cf47cf6bd 100644 --- a/ui/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/ColumnsModal.tsx +++ b/ui/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/ColumnsModal.tsx @@ -14,6 +14,7 @@ import { useState } from "react"; export const columns = [ "offset-partition", + "size", "timestamp", "timestampUTC", "key", @@ -29,6 +30,7 @@ export function useColumnLabels() { headers: "Headers", "offset-partition": "Offset", value: "Value", + size: "Size", timestamp: `Timestamp (${timeZone})`, timestampUTC: "Timestamp (UTC)", }; diff --git a/ui/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/MessageDetails.tsx b/ui/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/MessageDetails.tsx index 8c8f80b62..47be6bb27 100644 --- a/ui/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/MessageDetails.tsx +++ b/ui/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/MessageDetails.tsx @@ -1,5 +1,6 @@ import { Message } from "@/api/messages/schema"; import { DateTime } from "@/components/DateTime"; +import { Bytes } from "@/components/Bytes"; import { Number } from "@/components/Number"; import { ClipboardCopy, @@ -20,7 +21,9 @@ import { Text, TextContent, TextVariants, + Tooltip, } from "@/libs/patternfly/react-core"; +import { HelpIcon } from "@/libs/patternfly/react-icons"; import { useTranslations } from "next-intl"; import { NoDataCell } from "./NoDataCell"; import { beautifyUnknownValue } from "./utils"; @@ -88,6 +91,17 @@ export function MessageDetailsBody({ + + + {t("field.size")}{" "} + + + + + + + + {t("field.timestamp")} diff --git a/ui/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/MessagesTable.tsx b/ui/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/MessagesTable.tsx index 77be1004b..d54e42e2a 100644 --- a/ui/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/MessagesTable.tsx +++ b/ui/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/MessagesTable.tsx @@ -9,6 +9,7 @@ import { FilterGroup } from "@/app/[locale]/kafka/[kafkaId]/topics/[topicId]/mes import { NoResultsEmptyState } from "@/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/NoResultsEmptyState"; import { PartitionSelector } from "@/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/PartitionSelector"; import { DateTime } from "@/components/DateTime"; +import { Bytes } from "@/components/Bytes"; import { Number } from "@/components/Number"; import { ResponsiveTable } from "@/components/table"; import { @@ -23,15 +24,16 @@ import { ToolbarGroup, ToolbarItem, ToolbarToggleGroup, + Tooltip, } from "@/libs/patternfly/react-core"; -import { FilterIcon } from "@/libs/patternfly/react-icons"; +import { FilterIcon, HelpIcon } from "@/libs/patternfly/react-icons"; import { BaseCellProps, InnerScrollContainer, OuterScrollContainer, TableVariant, } from "@/libs/patternfly/react-table"; -import { useTranslations } from "next-intl"; +import { MessageKeys, useTranslations } from "next-intl"; import { PropsWithChildren, useState } from "react"; import { LimitSelector } from "./LimitSelector"; import { MessageDetails, MessageDetailsProps } from "./MessageDetails"; @@ -41,6 +43,7 @@ import { beautifyUnknownValue, isSameMessage } from "./utils"; const columnWidths: Record = { "offset-partition": 10, + size: 10, key: 15, timestamp: 15, timestampUTC: 15, @@ -108,6 +111,21 @@ export function MessagesTable({ return defaultColumns; })(); + const columnTooltips: Record = { + "offset-partition": undefined, + size: <> + {" "} + + + + , + key: undefined, + timestamp: undefined, + timestampUTC: undefined, + headers: undefined, + value: undefined, + }; + const [selectedColumns, setSelectedColumns] = useState( previouslySelectedColumns, ); @@ -161,6 +179,7 @@ export function MessagesTable({ modifier={"nowrap"} > {columnLabels[column]} + {columnTooltips[column] ?? ""} )} renderCell={({ column, row, colIndex, Td, key }) => { @@ -195,6 +214,12 @@ export function MessagesTable({ ); + case "size": + return ( + + + + ); case "timestamp": return ( diff --git a/ui/messages/en.json b/ui/messages/en.json index 54975721f..dad221047 100644 --- a/ui/messages/en.json +++ b/ui/messages/en.json @@ -109,6 +109,9 @@ "timestamp_toggle_calendar_menu": "Toggle calendar menu", "timestamp_toggle_time_menu": "Toggle time menu" }, + "tooltip": { + "size": "The serialized size of the key and value. Calculated from the key/value/headers and does not include any overhead associated with the record in the physical log segment within Kafka." + }, "limit_label": "Limit", "message": "Message", "no_data_body": "Data will appear shortly after we receive produced messages.",