Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add message size with tooltip, use variable polling duration #362

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.CacheControl;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriBuilder;
import jakarta.ws.rs.core.UriInfo;
import jakarta.ws.rs.ext.RuntimeDelegate;
import jakarta.ws.rs.core.Response.Status;

import org.eclipse.microprofile.openapi.annotations.Operation;
Expand Down Expand Up @@ -95,7 +97,8 @@ public Response consumeRecords(
KafkaRecord.Fields.TIMESTAMP_TYPE,
KafkaRecord.Fields.HEADERS,
KafkaRecord.Fields.KEY,
KafkaRecord.Fields.VALUE
KafkaRecord.Fields.VALUE,
KafkaRecord.Fields.SIZE
},
payload = ErrorCategory.InvalidQueryParameter.class)
@Parameter(
Expand All @@ -112,15 +115,16 @@ public Response consumeRecords(
KafkaRecord.Fields.TIMESTAMP_TYPE,
KafkaRecord.Fields.HEADERS,
KafkaRecord.Fields.KEY,
KafkaRecord.Fields.VALUE
KafkaRecord.Fields.VALUE,
KafkaRecord.Fields.SIZE
}))
List<String> fields) {

requestedFields.accept(fields);
var result = recordService.consumeRecords(topicId, params.getPartition(), params.getOffset(), params.getTimestamp(), params.getLimit(), fields, params.getMaxValueLength());

return Response.ok(new KafkaRecord.ListResponse(result)).build();

CacheControl noStore = RuntimeDelegate.getInstance().createHeaderDelegate(CacheControl.class).fromString("no-store");
return Response.ok(new KafkaRecord.ListResponse(result)).cacheControl(noStore).build();
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public static final class Fields {
public static final String HEADERS = "headers";
public static final String KEY = "key";
public static final String VALUE = "value";
public static final String SIZE = "size";

public static final String DEFAULT =
PARTITION +
Expand All @@ -39,7 +40,8 @@ public static final class Fields {
", " + TIMESTAMP_TYPE +
", " + HEADERS +
", " + KEY +
", " + VALUE;
", " + VALUE +
", " + SIZE;

public static final List<String> ALL = List.of(
PARTITION,
Expand All @@ -48,7 +50,8 @@ public static final class Fields {
TIMESTAMP_TYPE,
HEADERS,
KEY,
VALUE);
VALUE,
SIZE);

private Fields() {
// Prevent instances
Expand Down Expand Up @@ -118,6 +121,9 @@ public RecordResource(KafkaRecord attributes) {
@Schema(description = "Record value")
String value;

@Schema(readOnly = true, description = "Size of the uncompressed record, not including the overhead of the record in the log segment.")
Long size;

public KafkaRecord() {
super();
}
Expand All @@ -126,13 +132,14 @@ public KafkaRecord(String topic) {
this.topic = topic;
}

public KafkaRecord(String topic, Integer partition, Instant timestamp, Map<String, String> headers, String key, String value) {
public KafkaRecord(String topic, Integer partition, Instant timestamp, Map<String, String> headers, String key, String value, Long size) {
this(topic);
this.partition = partition;
this.timestamp = timestamp;
this.headers = headers;
this.key = key;
this.value = value;
this.size = size;
}

@JsonIgnore
Expand Down Expand Up @@ -211,4 +218,13 @@ public String getValue() {
public void setValue(String value) {
this.value = value;
}

public Long getSize() {
return size;
}

public void setSize(Long size) {
this.size = size;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -30,6 +31,7 @@
import jakarta.inject.Inject;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -121,7 +123,7 @@ public ConsumerRecords<byte[], byte[]> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
var records = consumer.poll(Duration.ofMillis(100));
var records = consumer.poll(Duration.between(Instant.now(), timeout));
int pollSize = records.count();
emptyPoll = pollSize == 0;
recordsConsumed.addAndGet(pollSize);
Expand Down Expand Up @@ -227,7 +229,7 @@ CompletionStage<String> topicNameForId(String topicId) {
Uuid kafkaTopicId = Uuid.fromString(topicId);

return clientSupplier.get()
.listTopics()
.listTopics(new ListTopicsOptions().listInternal(true))
.listings()
.toCompletionStage()
.thenApply(Collection::stream)
Expand Down Expand Up @@ -316,6 +318,7 @@ KafkaRecord getItems(ConsumerRecord<byte[], byte[]> rec, String topicId, List<St
setProperty(KafkaRecord.Fields.KEY, include, rec::key, k -> item.setKey(bytesToString(k, maxValueLength)));
setProperty(KafkaRecord.Fields.VALUE, include, rec::value, v -> item.setValue(bytesToString(v, maxValueLength)));
setProperty(KafkaRecord.Fields.HEADERS, include, () -> headersToMap(rec.headers(), maxValueLength), item::setHeaders);
setProperty(KafkaRecord.Fields.SIZE, include, () -> sizeOf(rec), item::setSize);

return item;
}
Expand Down Expand Up @@ -365,6 +368,14 @@ Map<String, String> headersToMap(Headers headers, Integer maxValueLength) {
return headerMap;
}

long sizeOf(ConsumerRecord<?, ?> rec) {
return rec.serializedKeySize() +
rec.serializedValueSize() +
Arrays.stream(rec.headers().toArray())
.mapToLong(h -> h.key().length() + h.value().length)
.sum();
}

static UnknownTopicIdException noSuchTopic(String topicId) {
return new UnknownTopicIdException("No such topic: " + topicId);
}
Expand Down
2 changes: 1 addition & 1 deletion ui/api/messages/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export async function getTopicMessages(
const sp = new URLSearchParams(
filterUndefinedFromObj({
"fields[records]":
"partition,offset,timestamp,timestampType,headers,key,value",
"partition,offset,timestamp,timestampType,headers,key,value,size",
"filter[partition]": params.partition,
"filter[offset]":
params.filter?.type === "offset"
Expand Down
1 change: 1 addition & 0 deletions ui/api/messages/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export const MessageSchema = z.object({
headers: z.record(z.any()),
key: z.string().nullable(),
value: z.string().nullable(),
size: z.number().optional(),
}),
});
export const MessageApiResponse = z.object({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { useState } from "react";

export const columns = [
"offset-partition",
"size",
"timestamp",
"timestampUTC",
"key",
Expand All @@ -29,6 +30,7 @@ export function useColumnLabels() {
headers: "Headers",
"offset-partition": "Offset",
value: "Value",
size: "Size",
timestamp: `Timestamp (${timeZone})`,
timestampUTC: "Timestamp (UTC)",
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Message } from "@/api/messages/schema";
import { DateTime } from "@/components/DateTime";
import { Bytes } from "@/components/Bytes";
import { Number } from "@/components/Number";
import {
ClipboardCopy,
Expand All @@ -20,7 +21,9 @@ import {
Text,
TextContent,
TextVariants,
Tooltip,
} from "@/libs/patternfly/react-core";
import { HelpIcon } from "@/libs/patternfly/react-icons";
import { useTranslations } from "next-intl";
import { NoDataCell } from "./NoDataCell";
import { beautifyUnknownValue } from "./utils";
Expand Down Expand Up @@ -88,6 +91,17 @@ export function MessageDetailsBody({
<Number value={message.attributes.offset} />
</DescriptionListDescription>
</DescriptionListGroup>
<DescriptionListGroup>
<DescriptionListTerm>
{t("field.size")}{" "}
<Tooltip content={t("tooltip.size")}>
<HelpIcon />
</Tooltip>
</DescriptionListTerm>
<DescriptionListDescription>
<Bytes value={message.attributes.size} />
</DescriptionListDescription>
</DescriptionListGroup>
<DescriptionListGroup>
<DescriptionListTerm>{t("field.timestamp")}</DescriptionListTerm>
<DescriptionListDescription>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { FilterGroup } from "@/app/[locale]/kafka/[kafkaId]/topics/[topicId]/mes
import { NoResultsEmptyState } from "@/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/NoResultsEmptyState";
import { PartitionSelector } from "@/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/PartitionSelector";
import { DateTime } from "@/components/DateTime";
import { Bytes } from "@/components/Bytes";
import { Number } from "@/components/Number";
import { ResponsiveTable } from "@/components/table";
import {
Expand All @@ -23,15 +24,16 @@ import {
ToolbarGroup,
ToolbarItem,
ToolbarToggleGroup,
Tooltip,
} from "@/libs/patternfly/react-core";
import { FilterIcon } from "@/libs/patternfly/react-icons";
import { FilterIcon, HelpIcon } from "@/libs/patternfly/react-icons";
import {
BaseCellProps,
InnerScrollContainer,
OuterScrollContainer,
TableVariant,
} from "@/libs/patternfly/react-table";
import { useTranslations } from "next-intl";
import { MessageKeys, useTranslations } from "next-intl";
import { PropsWithChildren, useState } from "react";
import { LimitSelector } from "./LimitSelector";
import { MessageDetails, MessageDetailsProps } from "./MessageDetails";
Expand All @@ -41,6 +43,7 @@ import { beautifyUnknownValue, isSameMessage } from "./utils";

const columnWidths: Record<Column, BaseCellProps["width"]> = {
"offset-partition": 10,
size: 10,
key: 15,
timestamp: 15,
timestampUTC: 15,
Expand Down Expand Up @@ -108,6 +111,21 @@ export function MessagesTable({
return defaultColumns;
})();

const columnTooltips: Record<Column, any> = {
"offset-partition": undefined,
size: <>
{" "}
<Tooltip content={t("tooltip.size")}>
<HelpIcon />
</Tooltip>
</>,
key: undefined,
timestamp: undefined,
timestampUTC: undefined,
headers: undefined,
value: undefined,
};

const [selectedColumns, setSelectedColumns] = useState<Column[]>(
previouslySelectedColumns,
);
Expand Down Expand Up @@ -161,6 +179,7 @@ export function MessagesTable({
modifier={"nowrap"}
>
{columnLabels[column]}
{columnTooltips[column] ?? ""}
</Th>
)}
renderCell={({ column, row, colIndex, Td, key }) => {
Expand Down Expand Up @@ -195,6 +214,12 @@ export function MessagesTable({
</TextContent>
</Cell>
);
case "size":
return (
<Cell>
<Bytes value={row.attributes.size} />
</Cell>
);
case "timestamp":
return (
<Cell>
Expand Down
3 changes: 3 additions & 0 deletions ui/messages/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@
"timestamp_toggle_calendar_menu": "Toggle calendar menu",
"timestamp_toggle_time_menu": "Toggle time menu"
},
"tooltip": {
"size": "The serialized size of the key and value. Calculated from the key/value/headers and does not include any overhead associated with the record in the physical log segment within Kafka."
},
"limit_label": "Limit",
"message": "Message",
"no_data_body": "Data will appear shortly after we receive produced messages.",
Expand Down