Skip to content

Commit

Permalink
Add message size with tooltip, use variable polling duration (#362)
Browse files Browse the repository at this point in the history
* Add message size with tooltip, use variable polling duration

Signed-off-by: Michael Edgar <[email protected]>

* Update ui/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/MessageDetails.tsx

Co-authored-by: Riccardo Forina <[email protected]>

---------

Signed-off-by: Michael Edgar <[email protected]>
Co-authored-by: Riccardo Forina <[email protected]>
  • Loading branch information
MikeEdgar and riccardo-forina authored Jan 10, 2024
1 parent 79b896b commit 468d532
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.CacheControl;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriBuilder;
import jakarta.ws.rs.core.UriInfo;
import jakarta.ws.rs.ext.RuntimeDelegate;
import jakarta.ws.rs.core.Response.Status;

import org.eclipse.microprofile.openapi.annotations.Operation;
Expand Down Expand Up @@ -95,7 +97,8 @@ public Response consumeRecords(
KafkaRecord.Fields.TIMESTAMP_TYPE,
KafkaRecord.Fields.HEADERS,
KafkaRecord.Fields.KEY,
KafkaRecord.Fields.VALUE
KafkaRecord.Fields.VALUE,
KafkaRecord.Fields.SIZE
},
payload = ErrorCategory.InvalidQueryParameter.class)
@Parameter(
Expand All @@ -112,15 +115,16 @@ public Response consumeRecords(
KafkaRecord.Fields.TIMESTAMP_TYPE,
KafkaRecord.Fields.HEADERS,
KafkaRecord.Fields.KEY,
KafkaRecord.Fields.VALUE
KafkaRecord.Fields.VALUE,
KafkaRecord.Fields.SIZE
}))
List<String> fields) {

requestedFields.accept(fields);
var result = recordService.consumeRecords(topicId, params.getPartition(), params.getOffset(), params.getTimestamp(), params.getLimit(), fields, params.getMaxValueLength());

return Response.ok(new KafkaRecord.ListResponse(result)).build();

CacheControl noStore = RuntimeDelegate.getInstance().createHeaderDelegate(CacheControl.class).fromString("no-store");
return Response.ok(new KafkaRecord.ListResponse(result)).cacheControl(noStore).build();
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public static final class Fields {
public static final String HEADERS = "headers";
public static final String KEY = "key";
public static final String VALUE = "value";
public static final String SIZE = "size";

public static final String DEFAULT =
PARTITION +
Expand All @@ -39,7 +40,8 @@ public static final class Fields {
", " + TIMESTAMP_TYPE +
", " + HEADERS +
", " + KEY +
", " + VALUE;
", " + VALUE +
", " + SIZE;

public static final List<String> ALL = List.of(
PARTITION,
Expand All @@ -48,7 +50,8 @@ public static final class Fields {
TIMESTAMP_TYPE,
HEADERS,
KEY,
VALUE);
VALUE,
SIZE);

private Fields() {
// Prevent instances
Expand Down Expand Up @@ -118,6 +121,9 @@ public RecordResource(KafkaRecord attributes) {
@Schema(description = "Record value")
String value;

@Schema(readOnly = true, description = "Size of the uncompressed record, not including the overhead of the record in the log segment.")
Long size;

public KafkaRecord() {
super();
}
Expand All @@ -126,13 +132,14 @@ public KafkaRecord(String topic) {
this.topic = topic;
}

public KafkaRecord(String topic, Integer partition, Instant timestamp, Map<String, String> headers, String key, String value) {
public KafkaRecord(String topic, Integer partition, Instant timestamp, Map<String, String> headers, String key, String value, Long size) {
this(topic);
this.partition = partition;
this.timestamp = timestamp;
this.headers = headers;
this.key = key;
this.value = value;
this.size = size;
}

@JsonIgnore
Expand Down Expand Up @@ -211,4 +218,13 @@ public String getValue() {
public void setValue(String value) {
this.value = value;
}

public Long getSize() {
return size;
}

public void setSize(Long size) {
this.size = size;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -30,6 +31,7 @@
import jakarta.inject.Inject;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -121,7 +123,7 @@ public ConsumerRecords<byte[], byte[]> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
var records = consumer.poll(Duration.ofMillis(100));
var records = consumer.poll(Duration.between(Instant.now(), timeout));
int pollSize = records.count();
emptyPoll = pollSize == 0;
recordsConsumed.addAndGet(pollSize);
Expand Down Expand Up @@ -227,7 +229,7 @@ CompletionStage<String> topicNameForId(String topicId) {
Uuid kafkaTopicId = Uuid.fromString(topicId);

return clientSupplier.get()
.listTopics()
.listTopics(new ListTopicsOptions().listInternal(true))
.listings()
.toCompletionStage()
.thenApply(Collection::stream)
Expand Down Expand Up @@ -316,6 +318,7 @@ KafkaRecord getItems(ConsumerRecord<byte[], byte[]> rec, String topicId, List<St
setProperty(KafkaRecord.Fields.KEY, include, rec::key, k -> item.setKey(bytesToString(k, maxValueLength)));
setProperty(KafkaRecord.Fields.VALUE, include, rec::value, v -> item.setValue(bytesToString(v, maxValueLength)));
setProperty(KafkaRecord.Fields.HEADERS, include, () -> headersToMap(rec.headers(), maxValueLength), item::setHeaders);
setProperty(KafkaRecord.Fields.SIZE, include, () -> sizeOf(rec), item::setSize);

return item;
}
Expand Down Expand Up @@ -365,6 +368,14 @@ Map<String, String> headersToMap(Headers headers, Integer maxValueLength) {
return headerMap;
}

long sizeOf(ConsumerRecord<?, ?> rec) {
return rec.serializedKeySize() +
rec.serializedValueSize() +
Arrays.stream(rec.headers().toArray())
.mapToLong(h -> h.key().length() + h.value().length)
.sum();
}

static UnknownTopicIdException noSuchTopic(String topicId) {
return new UnknownTopicIdException("No such topic: " + topicId);
}
Expand Down
2 changes: 1 addition & 1 deletion ui/api/messages/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export async function getTopicMessages(
const sp = new URLSearchParams(
filterUndefinedFromObj({
"fields[records]":
"partition,offset,timestamp,timestampType,headers,key,value",
"partition,offset,timestamp,timestampType,headers,key,value,size",
"filter[partition]": params.partition,
"filter[offset]":
params.filter?.type === "offset"
Expand Down
1 change: 1 addition & 0 deletions ui/api/messages/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export const MessageSchema = z.object({
headers: z.record(z.any()),
key: z.string().nullable(),
value: z.string().nullable(),
size: z.number().optional(),
}),
});
export const MessageApiResponse = z.object({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { useState } from "react";

export const columns = [
"offset-partition",
"size",
"timestamp",
"timestampUTC",
"key",
Expand All @@ -29,6 +30,7 @@ export function useColumnLabels() {
headers: "Headers",
"offset-partition": "Offset",
value: "Value",
size: "Size",
timestamp: `Timestamp (${timeZone})`,
timestampUTC: "Timestamp (UTC)",
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Message } from "@/api/messages/schema";
import { DateTime } from "@/components/DateTime";
import { Bytes } from "@/components/Bytes";
import { Number } from "@/components/Number";
import {
ClipboardCopy,
Expand All @@ -20,7 +21,9 @@ import {
Text,
TextContent,
TextVariants,
Tooltip,
} from "@/libs/patternfly/react-core";
import { HelpIcon } from "@/libs/patternfly/react-icons";
import { useTranslations } from "next-intl";
import { NoDataCell } from "./NoDataCell";
import { beautifyUnknownValue } from "./utils";
Expand Down Expand Up @@ -88,6 +91,17 @@ export function MessageDetailsBody({
<Number value={message.attributes.offset} />
</DescriptionListDescription>
</DescriptionListGroup>
<DescriptionListGroup>
<DescriptionListTerm>
{t("field.size")}{" "}
<Tooltip content={t("tooltip.size")}>
<HelpIcon />
</Tooltip>
</DescriptionListTerm>
<DescriptionListDescription>
<Bytes value={message.attributes.size} />
</DescriptionListDescription>
</DescriptionListGroup>
<DescriptionListGroup>
<DescriptionListTerm>{t("field.timestamp")}</DescriptionListTerm>
<DescriptionListDescription>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { FilterGroup } from "@/app/[locale]/kafka/[kafkaId]/topics/[topicId]/mes
import { NoResultsEmptyState } from "@/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/NoResultsEmptyState";
import { PartitionSelector } from "@/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/PartitionSelector";
import { DateTime } from "@/components/DateTime";
import { Bytes } from "@/components/Bytes";
import { Number } from "@/components/Number";
import { ResponsiveTable } from "@/components/table";
import {
Expand All @@ -23,15 +24,16 @@ import {
ToolbarGroup,
ToolbarItem,
ToolbarToggleGroup,
Tooltip,
} from "@/libs/patternfly/react-core";
import { FilterIcon } from "@/libs/patternfly/react-icons";
import { FilterIcon, HelpIcon } from "@/libs/patternfly/react-icons";
import {
BaseCellProps,
InnerScrollContainer,
OuterScrollContainer,
TableVariant,
} from "@/libs/patternfly/react-table";
import { useTranslations } from "next-intl";
import { MessageKeys, useTranslations } from "next-intl";
import { PropsWithChildren, useState } from "react";
import { LimitSelector } from "./LimitSelector";
import { MessageDetails, MessageDetailsProps } from "./MessageDetails";
Expand All @@ -41,6 +43,7 @@ import { beautifyUnknownValue, isSameMessage } from "./utils";

const columnWidths: Record<Column, BaseCellProps["width"]> = {
"offset-partition": 10,
size: 10,
key: 15,
timestamp: 15,
timestampUTC: 15,
Expand Down Expand Up @@ -108,6 +111,21 @@ export function MessagesTable({
return defaultColumns;
})();

const columnTooltips: Record<Column, any> = {
"offset-partition": undefined,
size: <>
{" "}
<Tooltip content={t("tooltip.size")}>
<HelpIcon />
</Tooltip>
</>,
key: undefined,
timestamp: undefined,
timestampUTC: undefined,
headers: undefined,
value: undefined,
};

const [selectedColumns, setSelectedColumns] = useState<Column[]>(
previouslySelectedColumns,
);
Expand Down Expand Up @@ -161,6 +179,7 @@ export function MessagesTable({
modifier={"nowrap"}
>
{columnLabels[column]}
{columnTooltips[column] ?? ""}
</Th>
)}
renderCell={({ column, row, colIndex, Td, key }) => {
Expand Down Expand Up @@ -195,6 +214,12 @@ export function MessagesTable({
</TextContent>
</Cell>
);
case "size":
return (
<Cell>
<Bytes value={row.attributes.size} />
</Cell>
);
case "timestamp":
return (
<Cell>
Expand Down
3 changes: 3 additions & 0 deletions ui/messages/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@
"timestamp_toggle_calendar_menu": "Toggle calendar menu",
"timestamp_toggle_time_menu": "Toggle time menu"
},
"tooltip": {
"size": "The serialized size of the key and value. Calculated from the key/value/headers and does not include any overhead associated with the record in the physical log segment within Kafka."
},
"limit_label": "Limit",
"message": "Message",
"no_data_body": "Data will appear shortly after we receive produced messages.",
Expand Down

0 comments on commit 468d532

Please sign in to comment.