From 519c637c781e3f38d5c1d2f76362d252deef28f4 Mon Sep 17 00:00:00 2001 From: hemahg Date: Wed, 11 Sep 2024 16:38:48 +0530 Subject: [PATCH] Consumer group reset offset Signed-off-by: hemahg --- ui/api/consumerGroups/actions.ts | 92 +- ui/api/consumerGroups/schema.ts | 55 +- .../ConnectedConsumerGroupTable.tsx | 192 ++ .../ConsumerGroupsTable.stories.tsx | 1695 ++++++++++++++++- .../consumer-groups/ConsumerGroupsTable.tsx | 137 +- .../[groupId]/ResetOffsetModal.stories.tsx | 15 + .../[groupId]/ResetOffsetModal.tsx | 72 + .../[groupId]/reset-offset/Dryrun.stories.tsx | 29 + .../[groupId]/reset-offset/Dryrun.tsx | 173 ++ .../reset-offset/OffsetSelect.stories.tsx | 13 + .../[groupId]/reset-offset/OffsetSelect.tsx | 71 + .../ResetConsumerOffset.stories.tsx | 16 + .../reset-offset/ResetConsumerOffset.tsx | 348 ++++ .../reset-offset/SelectComponent.tsx | 258 +++ .../reset-offset/TypeaheadSelect.stories.tsx | 14 + .../reset-offset/TypeaheadSelect.tsx | 142 ++ .../[groupId]/reset-offset/page.tsx | 55 + .../consumer-groups/[groupId]/types.ts | 7 + .../kafka/[kafkaId]/consumer-groups/page.tsx | 127 +- .../consumer-groups/ConsumerGroupsTable.tsx | 163 ++ .../topics/[topicId]/consumer-groups/page.tsx | 2 +- ui/messages/en.json | 45 +- 22 files changed, 3656 insertions(+), 65 deletions(-) create mode 100644 ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/ConnectedConsumerGroupTable.tsx create mode 100644 ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/ResetOffsetModal.stories.tsx create mode 100644 ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/ResetOffsetModal.tsx create mode 100644 ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/Dryrun.stories.tsx create mode 100644 ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/Dryrun.tsx create mode 100644 ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/OffsetSelect.stories.tsx create mode 100644 ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/OffsetSelect.tsx create mode 100644 ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/ResetConsumerOffset.stories.tsx create mode 100644 ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/ResetConsumerOffset.tsx create mode 100644 ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/SelectComponent.tsx create mode 100644 ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/TypeaheadSelect.stories.tsx create mode 100644 ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/TypeaheadSelect.tsx create mode 100644 ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/page.tsx create mode 100644 ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/types.ts create mode 100644 ui/app/[locale]/(authorized)/kafka/[kafkaId]/topics/[topicId]/consumer-groups/ConsumerGroupsTable.tsx diff --git a/ui/api/consumerGroups/actions.ts b/ui/api/consumerGroups/actions.ts index 423733577..b1cc24743 100644 --- a/ui/api/consumerGroups/actions.ts +++ b/ui/api/consumerGroups/actions.ts @@ -2,14 +2,18 @@ import { getHeaders } from "@/api/api"; import { ConsumerGroup, + ConsumerGroupDryrunResponseSchema, ConsumerGroupResponseSchema, ConsumerGroupsResponse, ConsumerGroupsResponseSchema, + ConsumerGroupState, + DryrunResponse, + UpdateConsumerGroupErrorSchema, } from "@/api/consumerGroups/schema"; import { filterUndefinedFromObj } from "@/utils/filterUndefinedFromObj"; import { logger } from "@/utils/logger"; -const log = logger.child({ module: "topics-api" }); +const log = logger.child({ module: "consumergroup-api" }); export async function getConsumerGroup( kafkaId: string, @@ -32,6 +36,8 @@ export async function getConsumerGroups( kafkaId: string, params: { fields?: string; + id?: string; + state?: ConsumerGroupState[]; pageSize?: number; pageCursor?: string; sort?: string; @@ -43,8 +49,12 @@ export async function getConsumerGroups( filterUndefinedFromObj({ "fields[consumerGroups]": params.fields ?? "state,simpleConsumerGroup,members,offsets", + "filter[id]": params.id ? `eq,${params.id}` : undefined, // TODO: pass filter from UI - "filter[state]": "in,STABLE,PREPARING_REBALANCE,COMPLETING_REBALANCE", + "filter[state]": + params.state && params.state.length > 0 + ? `in,${params.state.join(",")}` + : undefined, "page[size]": params.pageSize, "page[after]": params.pageCursor, sort: params.sort @@ -107,3 +117,81 @@ export async function getTopicConsumerGroups( log.debug({ url, rawData }, "getTopicConsumerGroups response"); return ConsumerGroupsResponseSchema.parse(rawData); } + +export async function updateConsumerGroup( + kafkaId: string, + consumerGroupId: string, + offsets: Array<{ + topicId: string; + partition?: number; + offset: string | number; + metadata?: string; + }>, +): Promise { + const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/consumerGroups/${consumerGroupId}`; + const body = { + data: { + type: "consumerGroups", + id: consumerGroupId, + attributes: { + offsets, + }, + }, + }; + + log.debug({ url, body }, "calling updateConsumerGroup"); + + try { + const res = await fetch(url, { + headers: await getHeaders(), + method: "PATCH", + body: JSON.stringify(body), + }); + + log.debug({ status: res.status }, "updateConsumerGroup response"); + + if (res.status === 204) { + return true; + } else { + const rawData = await res.json(); + return UpdateConsumerGroupErrorSchema.parse(rawData); + } + } catch (e) { + log.error(e, "updateConsumerGroup unknown error"); + console.error("Unknown error occurred:", e); + return false; + } +} + +export async function getDryrunResult( + kafkaId: string, + consumerGroupId: string, + offsets: Array<{ + topicId: string; + partition?: number; + offset: string | number; + metadata?: string; + }>, +): Promise { + const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/consumerGroups/${consumerGroupId}`; + const body = { + meta: { + dryRun: true, + }, + data: { + type: "consumerGroups", + id: consumerGroupId, + attributes: { + offsets, + }, + }, + }; + const res = await fetch(url, { + headers: await getHeaders(), + method: "PATCH", + body: JSON.stringify(body), + }); + const rawData = await res.json(); + log.debug({ url, rawData }, "getConsumerGroup response"); + return ConsumerGroupDryrunResponseSchema.parse(rawData).data; +} diff --git a/ui/api/consumerGroups/schema.ts b/ui/api/consumerGroups/schema.ts index f7d26d2c7..f06e51687 100644 --- a/ui/api/consumerGroups/schema.ts +++ b/ui/api/consumerGroups/schema.ts @@ -2,6 +2,11 @@ import { ApiError } from "@/api/api"; import { NodeSchema } from "@/api/kafka/schema"; import { z } from "zod"; +const ConsumerGroupStateSchema = z.union([ + z.literal("STABLE"), + z.literal("EMPTY"), +]); + const OffsetAndMetadataSchema = z.object({ topicId: z.string(), topicName: z.string(), @@ -29,7 +34,7 @@ export const ConsumerGroupSchema = z.object({ type: z.literal("consumerGroups"), attributes: z.object({ simpleConsumerGroup: z.boolean().optional(), - state: z.string().optional(), + state: ConsumerGroupStateSchema, members: z.array(MemberDescriptionSchema).optional(), partitionAssignor: z.string().nullable().optional(), coordinator: NodeSchema.nullable().optional(), @@ -38,7 +43,17 @@ export const ConsumerGroupSchema = z.object({ errors: z.array(ApiError).optional(), }), }); + +const DryrunOffsetSchema = z.object({ + topicId: z.string(), + topicName: z.string(), + partition: z.number(), + offset: z.number(), + metadata: z.string(), +}); + export type ConsumerGroup = z.infer; +export type ConsumerGroupState = z.infer; export const ConsumerGroupsResponseSchema = z.object({ meta: z.object({ @@ -55,6 +70,34 @@ export const ConsumerGroupsResponseSchema = z.object({ }), data: z.array(ConsumerGroupSchema), }); + +export const DryrunSchema = z.object({ + id: z.string(), + type: z.literal("consumerGroups"), + attributes: z.object({ + state: ConsumerGroupStateSchema, + members: z.array(MemberDescriptionSchema).optional(), + offsets: z.array(DryrunOffsetSchema).optional(), + }), +}); + +export const UpdateConsumerGroupErrorSchema = z.object({ + errors: z.array( + z.object({ + id: z.string(), + status: z.string(), + code: z.string(), + title: z.string(), + detail: z.string(), + source: z + .object({ + pointer: z.string().optional(), + }) + .optional(), + }), + ), +}); + export type ConsumerGroupsResponse = z.infer< typeof ConsumerGroupsResponseSchema >; @@ -65,3 +108,13 @@ export const ConsumerGroupResponseSchema = z.object({ export type ConsumerGroupResponse = z.infer< typeof ConsumerGroupsResponseSchema >; + +export type UpdateConsumerGroupErrorSchema = z.infer< + typeof UpdateConsumerGroupErrorSchema +>; + +export const ConsumerGroupDryrunResponseSchema = z.object({ + data: DryrunSchema, +}); + +export type DryrunResponse = z.infer; diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/ConnectedConsumerGroupTable.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/ConnectedConsumerGroupTable.tsx new file mode 100644 index 000000000..6b3b9d2c5 --- /dev/null +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/ConnectedConsumerGroupTable.tsx @@ -0,0 +1,192 @@ +"use client"; + +import { ConsumerGroup, ConsumerGroupState } from "@/api/consumerGroups/schema"; +import { useRouter } from "@/navigation"; +import { useFilterParams } from "@/utils/useFilterParams"; +import { useOptimistic, useState, useTransition } from "react"; +import { ConsumerGroupColumn, ConsumerGroupColumns, ConsumerGroupsTable, SortableColumns } from "./ConsumerGroupsTable"; +import { ResetOffsetModal } from "./[groupId]/ResetOffsetModal"; + +export type ConnectedConsumerGroupTableProps = { + kafkaId: string; + consumerGroup: ConsumerGroup[] | undefined; + consumerGroupCount: number; + page: number; + perPage: number; + id: string | undefined; + sort: ConsumerGroupColumn; + sortDir: "asc" | "desc"; + consumerGroupState: ConsumerGroupState[] | undefined; + baseurl: string; + nextPageCursor: string | null | undefined; + prevPageCursor: string | null | undefined; + refresh: (() => Promise) | undefined; +}; + +type State = { + id: string | undefined; + consumerGroup: ConsumerGroup[] | undefined; + perPage: number; + sort: ConsumerGroupColumn; + sortDir: "asc" | "desc"; + consumerGroupState: ConsumerGroupState[] | undefined; +}; + +export function ConnectedConsumerGroupTable({ + consumerGroup, + consumerGroupCount, + page, + perPage, + id, + sort, + sortDir, + consumerGroupState, + baseurl, + nextPageCursor, + prevPageCursor, + kafkaId, + refresh +}: ConnectedConsumerGroupTableProps) { + const router = useRouter(); + const _updateUrl = useFilterParams({ perPage, sort, sortDir }); + const [_, startTransition] = useTransition(); + const [state, addOptimistic] = useOptimistic< + State, + Partial> + >( + { + consumerGroup, + id, + perPage, + sort, + sortDir, + consumerGroupState, + }, + (state, options) => ({ ...state, ...options, consumerGroup: undefined }), + ); + + const updateUrl: typeof _updateUrl = (newParams) => { + const { consumerGroup, ...s } = state; + _updateUrl({ + ...s, + ...newParams, + }); + }; + + function clearFilters() { + startTransition(() => { + _updateUrl({}); + addOptimistic({ + id: undefined, + consumerGroupState: undefined, + }); + }); + } + + const [isResetOffsetModalOpen, setResetOffsetModalOpen] = useState(false); + const [consumerGroupMembers, setConsumerGroupMembers] = useState([]); + const [consumerGroupName, setConsumerGroupName] = useState(""); + + const closeResetOffsetModal = () => { + setResetOffsetModalOpen(false); + setConsumerGroupMembers([]); + router.push(`${baseurl}`); + } + + return ( + <> + { + startTransition(() => { + const pageDiff = newPage - page; + switch (pageDiff) { + case -1: + updateUrl({ perPage, page: prevPageCursor }); + break; + case 1: + updateUrl({ perPage, page: nextPageCursor }); + break; + default: + updateUrl({ perPage }); + break; + } + addOptimistic({ perPage }); + }); + }} + consumerGroups={state.consumerGroup} + isColumnSortable={(col) => { + if (!SortableColumns.includes(col)) { + return undefined; + } + const activeIndex = ConsumerGroupColumns.indexOf(state.sort); + const columnIndex = ConsumerGroupColumns.indexOf(col); + return { + label: col as string, + columnIndex, + onSort: () => { + startTransition(() => { + const newSortDir = activeIndex === columnIndex + ? state.sortDir === "asc" + ? "desc" + : "asc" + : "asc"; + updateUrl({ + sort: col, + sortDir: newSortDir, + }); + addOptimistic({ sort: col, sortDir: newSortDir }); + }); + }, + sortBy: { + index: activeIndex, + direction: state.sortDir, + defaultDirection: "asc", + }, + isFavorites: undefined, + }; + }} + filterName={state.id} + onFilterNameChange={(id) => { + startTransition(() => { + updateUrl({ id }); + addOptimistic({ id }); + }); + }} + filterState={state.consumerGroupState} + onFilterStateChange={(consumerGroupState) => { + startTransition(() => { + updateUrl({ consumerGroupState }); + addOptimistic({ consumerGroupState }); + }); + }} + onClearAllFilters={clearFilters} + kafkaId={kafkaId} + refresh={refresh} + onResetOffset={(row) => { + startTransition(() => { + if (row.attributes.state === "STABLE") { + setResetOffsetModalOpen(true) + setConsumerGroupMembers(row.attributes.members?.map((member) => member.memberId) || []); + setConsumerGroupName(row.id) + } else if (row.attributes.state === "EMPTY") { + router.push(`${baseurl}/${row.id}/reset-offset`); + } + }); + }} /> + {isResetOffsetModalOpen && ( + + )} + + + ); +} diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/ConsumerGroupsTable.stories.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/ConsumerGroupsTable.stories.tsx index 43db5f34b..cfd88c4cf 100644 --- a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/ConsumerGroupsTable.stories.tsx +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/ConsumerGroupsTable.stories.tsx @@ -19,11 +19,14 @@ export const ConsumerGroups: Story = { id: "console-datagen-group-0", attributes: { state: "STABLE", - offsets: [{ - lag: 200 - }, { - lag: 1000 - }], + offsets: [ + { + lag: 200, + }, + { + lag: 1000, + }, + ], members: [ { host: "localhost", @@ -47,12 +50,15 @@ export const ConsumerGroups: Story = { { id: "console-datagen-group-1", attributes: { - state: "STABLE", - offsets: [{ - lag: 400 - }, { - lag: 240 - }], + state: "EMPTY", + offsets: [ + { + lag: 400, + }, + { + lag: 240, + }, + ], members: [ { host: "localhost", @@ -73,17 +79,266 @@ export const ConsumerGroups: Story = { ], }, }, + { + id: "console-datagen-group-2", + attributes: { + state: "EMPTY", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, { id: "console-datagen-group-2", attributes: { state: "STABLE", - offsets: [{ - lag: 200 - }, { - lag: 400 - }, { - lag: 400 - }], + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], members: [ { host: "localhost", @@ -104,6 +359,1406 @@ export const ConsumerGroups: Story = { ], }, }, - ] - } + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "STABLE", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + { + id: "console-datagen-group-2", + attributes: { + state: "EMPTY", + offsets: [ + { + lag: 200, + }, + { + lag: 400, + }, + { + lag: 400, + }, + ], + members: [ + { + host: "localhost", + memberId: "member-1", + clientId: "client-1", + groupInstanceId: "instance-1", + assignments: [ + { + topicName: "console_datagen_002-a", + topicId: "1", + }, + { + topicName: "console_datagen_002-b", + topicId: "2", + }, + ], + }, + ], + }, + }, + ], + }, }; diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/ConsumerGroupsTable.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/ConsumerGroupsTable.tsx index f276118e2..8da733979 100644 --- a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/ConsumerGroupsTable.tsx +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/ConsumerGroupsTable.tsx @@ -1,28 +1,83 @@ -"use client"; - -import { ConsumerGroup } from "@/api/consumerGroups/schema"; +import { ConsumerGroup, ConsumerGroupState } from "@/api/consumerGroups/schema"; import { Number } from "@/components/Format/Number"; import { LabelLink } from "@/components/Navigation/LabelLink"; -import { TableView } from "@/components/Table"; -import { LabelGroup, Tooltip } from "@/libs/patternfly/react-core"; -import { HelpIcon } from "@/libs/patternfly/react-icons"; +import { TableView, TableViewProps } from "@/components/Table"; +import { Icon, LabelGroup, Tooltip } from "@/libs/patternfly/react-core"; +import { + CheckCircleIcon, + HelpIcon, + InfoCircleIcon, +} from "@/libs/patternfly/react-icons"; import { Link } from "@/navigation"; import { useTranslations } from "next-intl"; -import { useEffect, useState } from "react"; +import { ReactNode, useEffect, useState } from "react"; + +export const ConsumerGroupColumns = [ + "name", + "state", + "lag", + "members", + "topics", +] as const; + +export type ConsumerGroupColumn = (typeof ConsumerGroupColumns)[number]; + +export type SortableConsumerGroupTableColumns = Exclude< + ConsumerGroupColumn, + "lag" | "members" | "topics" +>; + +export const SortableColumns = ["name", "state"]; + +const StateLabel: Record = { + STABLE: ( + <> + + + +  STABLE + + ), + EMPTY: ( + <> + + + +  EMPTY + + ), +}; export function ConsumerGroupsTable({ kafkaId, page, + perPage, total, consumerGroups: initialData, refresh, + isColumnSortable, + filterName, + filterState, + onFilterNameChange, + onFilterStateChange, + onPageChange, + onResetOffset, }: { kafkaId: string; page: number; + perPage: number; total: number; + filterName: string | undefined; + filterState: ConsumerGroupState[] | undefined; consumerGroups: ConsumerGroup[] | undefined; - refresh: (() => Promise) | undefined; -}) { + refresh: (() => Promise) | undefined; + onFilterNameChange: (name: string | undefined) => void; + onFilterStateChange: (status: ConsumerGroupState[] | undefined) => void; + onResetOffset: (consumerGroup: ConsumerGroup) => void; +} & Pick< + TableViewProps, + "isColumnSortable" | "onPageChange" | "onClearAllFilters" +>) { const t = useTranslations(); const [consumerGroups, setConsumerGroups] = useState(initialData); useEffect(() => { @@ -41,7 +96,8 @@ export function ConsumerGroupsTable({ {}} + perPage={perPage} + onPageChange={onPageChange} data={consumerGroups} emptyStateNoData={
{t("ConsumerGroupsTable.no_consumer_groups")}
@@ -50,7 +106,9 @@ export function ConsumerGroupsTable({
{t("ConsumerGroupsTable.no_consumer_groups")}
} ariaLabel={t("ConsumerGroupsTable.title")} - columns={["name", "state", "lag", "members", "topics"] as const} + isFiltered={filterName !== undefined || filterState?.length !== 0} + columns={ConsumerGroupColumns} + isColumnSortable={isColumnSortable} renderHeader={({ column, key, Th }) => { switch (column) { case "name": @@ -117,7 +175,7 @@ export function ConsumerGroupsTable({ case "state": return ( - {row.attributes.state} + {StateLabel[row.attributes.state]} ); case "lag": @@ -132,8 +190,15 @@ export function ConsumerGroupsTable({ ); case "topics": - const allTopics = - row.attributes.members?.flatMap((m) => m.assignments ?? []) ?? []; + const allTopics: { topicId: string; topicName: string }[] = []; + row.attributes.members + ?.flatMap((m) => m.assignments ?? []) + .forEach((a) => + allTopics.push({ topicId: a.topicId, topicName: a.topicName }), + ); + row.attributes.offsets?.forEach((a) => + allTopics.push({ topicId: a.topicId, topicName: a.topicName }), + ); return ( @@ -161,6 +226,50 @@ export function ConsumerGroupsTable({ ); } }} + renderActions={({ row, ActionsColumn }) => ( + onResetOffset(row), + }, + ]} + /> + )} + filters={{ + Name: { + type: "search", + chips: filterName ? [filterName] : [], + onSearch: onFilterNameChange, + onRemoveChip: () => { + onFilterNameChange(undefined); + }, + onRemoveGroup: () => { + onFilterNameChange(undefined); + }, + validate: () => true, + errorMessage: "", + }, + State: { + type: "checkbox", + chips: filterState || [], + onToggle: (state) => { + const newState = filterState?.includes(state) + ? filterState.filter((s) => s !== state) + : [...filterState!, state]; + onFilterStateChange(newState); + }, + onRemoveChip: (state) => { + const newStatus = (filterState || []).filter((s) => s !== state); + onFilterStateChange(newStatus); + }, + onRemoveGroup: () => { + onFilterStateChange(undefined); + }, + options: StateLabel, + }, + }} /> ); } diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/ResetOffsetModal.stories.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/ResetOffsetModal.stories.tsx new file mode 100644 index 000000000..60aa663fb --- /dev/null +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/ResetOffsetModal.stories.tsx @@ -0,0 +1,15 @@ +import { Meta, StoryObj } from "@storybook/react"; +import { ResetOffsetModal } from "./ResetOffsetModal"; + +export default { + component: ResetOffsetModal, +} as Meta; + +type Story = StoryObj; + +export const Default: Story = { + args: { + isResetOffsetModalOpen: true, + members: ["console-datagen-consumer-0", "console-datagen-consumer-1"] + }, +}; diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/ResetOffsetModal.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/ResetOffsetModal.tsx new file mode 100644 index 000000000..6831bfe4d --- /dev/null +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/ResetOffsetModal.tsx @@ -0,0 +1,72 @@ +import { ExternalLink } from "@/components/Navigation/ExternalLink"; +import { Button, List, ListItem, Modal, ModalVariant, Stack, StackItem, Text } from "@/libs/patternfly/react-core"; +import { useTranslations } from "next-intl"; +import { useRouter } from "next/navigation"; + +export function ResetOffsetModal({ + members, + isResetOffsetModalOpen, + onClickClose, + kafkaId, + consumerGroupName +}: { + members: string[]; + isResetOffsetModalOpen: boolean; + onClickClose: () => void; + kafkaId: string; + consumerGroupName: string; +}) { + + const t = useTranslations("ConsumerGroupsTable"); + const router = useRouter(); + + const refresh = () => { + if (members.length === 0) { + router.push(`/kafka/${kafkaId}/consumer-groups/${consumerGroupName}/reset-offset`) + } + } + + return ( + + {t("close")} + , + , + + ]}> + + + {t("member_shutdown_helper_text")} + + + + {members.map((member, index) => ( + {member} + ))} + + + + {t("shutdown_active_members")} + + + + {t("learn_to_shutdown_members")} + + + + + ) +} diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/Dryrun.stories.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/Dryrun.stories.tsx new file mode 100644 index 000000000..64fd659e4 --- /dev/null +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/Dryrun.stories.tsx @@ -0,0 +1,29 @@ +import { Meta, StoryObj } from "@storybook/react"; +import { Dryrun } from "./Dryrun"; + +export default { + component: Dryrun, + args: {} +} as Meta; + +type Story = StoryObj; + +const NewOffsetValues = [ + { topicName: "Topic A", partition: 2, offset: 765, topicId: "cswerqts" }, + { topicName: "Topic C", partition: 2, offset: 121314, topicId: "xcsdwer" }, + { topicName: "Topic A", partition: 0, offset: 1234, topicId: "cswerqts" }, + { topicName: "Topic A", partition: 1, offset: 5678, topicId: "cswerqts" }, + { topicName: "Topic B", partition: 0, offset: 91011, topicId: "weqrests" }, + { topicName: "Topic C", partition: 1, offset: 221222, topicId: "xcsdwer" }, + { topicName: "Topic B", partition: 1, offset: 2341233, topicId: "weqrests" }, + { topicName: "Topic B", partition: 1, offset: 675, topicId: "weqrests" }, + +]; + +export const Default: Story = { + args: { + consumerGroupName: "console_datagen_002-a", + newOffset: NewOffsetValues + + }, +}; diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/Dryrun.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/Dryrun.tsx new file mode 100644 index 000000000..1ef291480 --- /dev/null +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/Dryrun.tsx @@ -0,0 +1,173 @@ +"use client"; + +import { + Alert, + Button, + Card, + DescriptionList, + DescriptionListDescription, + DescriptionListGroup, + DescriptionListTerm, + Divider, + Flex, + FlexItem, + JumpLinks, + JumpLinksItem, + List, + ListItem, + Panel, + PanelHeader, + PanelMain, + PanelMainBody, + Sidebar, + SidebarContent, + SidebarPanel, + Stack, + StackItem +} from "@/libs/patternfly/react-core"; +import { TextContent, Text } from "@/libs/patternfly/react-core"; +import { useTranslations } from "next-intl"; +import { DownloadIcon } from "@/libs/patternfly/react-icons"; + +export type NewOffset = { + topicName: string; + partition: number; + offset: number | string; + topicId: string; + metadata?: string +}; + +export function Dryrun({ + consumerGroupName, + newOffset, + onClickCloseDryrun, + cliCommand +}: { + consumerGroupName: string; + newOffset: NewOffset[]; + onClickCloseDryrun: () => void; + cliCommand: string +}) { + const t = useTranslations("ConsumerGroupsTable"); + + const onClickDownload = () => { + const data = { + consumerGroupName, + newOffset, + }; + const jsonString = JSON.stringify(data, null, 2); + const blob = new Blob([jsonString], { type: "application/json" }); + const url = URL.createObjectURL(blob); + const a = document.createElement("a"); + a.href = url; + a.download = "dryrun-result.json"; + document.body.appendChild(a); + a.click(); + document.body.removeChild(a); + URL.revokeObjectURL(url); + }; + + // Group offsets by topic + const groupedTopics = newOffset.reduce>((acc, offset) => { + if (!acc[offset.topicName]) { + acc[offset.topicName] = []; + } + acc[offset.topicName].push(offset); + return acc; + }, {}); + + return ( + + + + + + {t.rich("dry_run_result")} + + + + + + + + {t.rich("consumer_name", { consumerGroupName })} + + + + + + + + + {cliCommand} + + + + + + + {Object.keys(groupedTopics).map((topicName) => ( + + {topicName} + + ))} + + + + + {Object.entries(groupedTopics).map(([topicName, offsets]) => ( + + + + + {t("topic")} + {topicName} + + + + + {t("partition")} + + + {offsets.sort((a, b) => a.partition - b.partition).map(({ partition }) => ( + {partition} + ))} + + + + + + + {t("new_offset")} + + + {offsets.map(({ partition, offset }) => ( + {offset} + ))} + + + + + + + + + ))} + + + + + + + + + + + + + + + ); +} diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/OffsetSelect.stories.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/OffsetSelect.stories.tsx new file mode 100644 index 000000000..585eba5a9 --- /dev/null +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/OffsetSelect.stories.tsx @@ -0,0 +1,13 @@ +import { Meta, StoryObj } from "@storybook/react"; +import { OffsetSelect } from "./OffsetSelect"; + +export default { + component: OffsetSelect, + args: {} +} as Meta; + +type Story = StoryObj; + +export const Default: Story = { + args: {}, +}; diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/OffsetSelect.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/OffsetSelect.tsx new file mode 100644 index 000000000..700594c72 --- /dev/null +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/OffsetSelect.tsx @@ -0,0 +1,71 @@ +import { MenuToggle, MenuToggleElement, Select, SelectList, SelectOption, SelectProps } from "@/libs/patternfly/react-core"; +import { useTranslations } from "next-intl"; +import { useState } from "react"; +import { OffsetValue } from "../types"; + +export function OffsetSelect({ + value, + onChange +}: { + value: OffsetValue; + onChange: (value: OffsetValue) => void; +}) { + + const t = useTranslations("ConsumerGroupsTable"); + + const [isOpen, setIsOpen] = useState(false); + + const onToggle = () => { + setIsOpen(!isOpen); + }; + + const offsetValueOption: { [key in OffsetValue]: string } = { + custom: t("offset.custom"), + latest: t("offset.latest"), + earliest: t("offset.earliest"), + specificDateTime: t("offset.specific_date_time") + }; + + const onSelect: SelectProps["onSelect"] = (_, selection) => { + onChange(selection as OffsetValue); + setIsOpen(false); + }; + + const toggle = (toggleRef: React.Ref) => ( + + {offsetValueOption[value]} + + ); + const makeOptions = () => { + return Object.entries(offsetValueOption).map(([value, label]) => ( + + {label} + + )); + }; + + return ( + + ) +} diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/ResetConsumerOffset.stories.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/ResetConsumerOffset.stories.tsx new file mode 100644 index 000000000..8984186b5 --- /dev/null +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/ResetConsumerOffset.stories.tsx @@ -0,0 +1,16 @@ +import { Meta, StoryObj } from "@storybook/react"; +import { ResetConsumerOffset } from "./ResetConsumerOffset"; + +export default { + component: ResetConsumerOffset, +} as Meta; + +type Story = StoryObj; + +export const Default: Story = { + args: { + consumerGroupName: "console-consumer-01", + topics: [{ topicId: "123", topicName: "console_datagen_002-a" }, { topicId: "456", topicName: "console_datagen_002-b" }, { topicId: "234", topicName: "console_datagen_002-c" }, { topicId: "431", topicName: "console_datagen_002-d" }], + partitions: [1, 2, 3] + }, +}; diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/ResetConsumerOffset.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/ResetConsumerOffset.tsx new file mode 100644 index 000000000..e833e7f34 --- /dev/null +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/ResetConsumerOffset.tsx @@ -0,0 +1,348 @@ +"use client"; + +import { Divider, Panel, PanelHeader, PanelMain, PanelMainBody, TextContent, Text, TextVariants, Radio, Form, FormGroup, FormSection, Select, SelectList, SelectOption, MenuToggle, MenuToggleElement, TextInput, ActionGroup, Button, SelectProps, PageSection, Page, Bullseye, Spinner, Flex, FlexItem, Grid, GridItem, Alert } from "@/libs/patternfly/react-core"; +import { useTranslations } from "next-intl"; +import { useState } from "react"; +import { DateTimeFormatSelection, OffsetValue, TopicSelection, partitionSelection } from "../types"; +import { TypeaheadSelect } from "./TypeaheadSelect"; +import { OffsetSelect } from "./OffsetSelect"; +import { useRouter } from "@/navigation"; +import { getDryrunResult, updateConsumerGroup } from "@/api/consumerGroups/actions"; +import { UpdateConsumerGroupErrorSchema } from "@/api/consumerGroups/schema"; +import { Dryrun } from "./Dryrun"; + +export type Offset = { + topicId: string + topicName: string; + partition: number; + offset: string | number; + metadeta?: string +}; + +export function ResetConsumerOffset({ + kafkaId, + consumerGroupName, + topics, + partitions, + baseurl + +}: { + kafkaId: string; + consumerGroupName: string; + topics: { topicId: string, topicName: string }[]; + partitions: number[]; + baseurl: string; +}) { + const t = useTranslations("ConsumerGroupsTable"); + + const router = useRouter(); + + const [selectedConsumerTopic, setSelectedConsumerTopic] = useState(); + + const [selectedPartition, setSelectedPartition] = useState(); + + const [selectedOffset, setSelectedOffset] = useState("custom"); + + const [offset, setOffset] = useState({ + topicId: "", + partition: 0, + offset: "", + topicName: "" + }); + + const [selectDateTimeFormat, setSelectDateTimeFormat] = useState("ISO"); + + const [isLoading, setIsLoading] = useState(false); + + const [error, setError] = useState(); + + const [newoffsetData, setNewOffsetData] = useState([]); + + const [showDryRun, setShowDryRun] = useState(false); + + const onTopicSelect = (value: TopicSelection) => { + setSelectedConsumerTopic(value); + }; + + const onPartitionSelect = (value: partitionSelection) => { + setSelectedPartition(value); + }; + + const onDateTimeSelect = (value: DateTimeFormatSelection) => { + setSelectDateTimeFormat(value) + } + + const handleTopicChange = (topicName: string | number) => { + if (typeof topicName === 'string') { + const selectedTopic = topics.find(topic => topic.topicName === topicName); + if (selectedTopic) { + setOffset((prev) => ({ + ...prev, + topicName: selectedTopic.topicName, + topicId: selectedTopic.topicId, + })); + } else { + console.warn('Selected topic name not found in topics array:', topicName); + } + } else { + console.warn('Expected a string, but got a number:', topicName); + } + }; + + const handlePartitionChange = (partition: string | number) => { + if (typeof partition === 'number') { + setOffset((prev) => ({ ...prev, partition })); + } else { + console.warn('Expected a number, but got a string:', partition + ); + } + }; + + const handleOffsetChange = (value: string) => { + const numericValue = Number(value); + setOffset((prev) => ({ ...prev, offset: isNaN(numericValue) ? value : numericValue })); + }; + + const generateCliCommand = (): string => { + let baseCommand = `$ kafka-consumer-groups --bootstrap-server localhost:9092 --group ${consumerGroupName} --reset-offsets`; + const topic = selectedConsumerTopic === "allTopics" ? "--all-topics" : `--topic ${offset.topicName}`; + baseCommand += ` ${topic}`; + if (selectedConsumerTopic === "selectedTopic") { + // Only include partition if a specific topic is selected + const partition = selectedPartition === "allPartitions" ? "" : `:${offset.partition}`; + baseCommand += `${partition}`; + } + if (selectedOffset === "custom") { + baseCommand += ` --to-offset ${offset.offset}`; + } else if (selectedOffset === "specificDateTime") { + baseCommand += ` --to-datetime ${offset.offset}`; + } else { + baseCommand += ` --to-${selectedOffset}`; + } + baseCommand += ` --dry-run`; + return baseCommand; + }; + + const generateOffsets = (): Array<{ topicId: string; partition?: number; offset: string | number }> => { + const offsets: Array<{ + topicId: string; + partition?: number; + offset: string | number; + }> = []; + + if (selectedConsumerTopic === "allTopics") { + topics.forEach((topic) => { + partitions.forEach((partition) => { + offsets.push({ + topicId: topic.topicId, + partition: partition, + offset: selectedOffset === "custom" || selectedOffset === "specificDateTime" + ? selectDateTimeFormat === "Epoch" + ? convertEpochToISO(String(offset.offset)) + : offset.offset + : selectedOffset, + }); + }); + }); + } else if (selectedConsumerTopic === "selectedTopic") { + const uniquePartitions = new Set( + partitions.map(partition => selectedPartition === "allPartitions" ? partition : offset.partition) + ); + + Array.from(uniquePartitions).forEach((partition) => { + offsets.push({ + topicId: offset.topicId, + partition, + offset: selectedOffset === "custom" || selectedOffset === "specificDateTime" + ? selectDateTimeFormat === "Epoch" + ? convertEpochToISO(String(offset.offset)) + : offset.offset + : selectedOffset, + }); + }); + } + + // Remove duplicate entries + return offsets.filter((value, index, self) => + index === self.findIndex((t) => ( + t.topicId === value.topicId && t.partition === value.partition + )) + ); + }; + + const isDryRunDisable = + !selectedConsumerTopic || + !selectedOffset + + const openDryrun = async () => { + const uniqueOffsets = generateOffsets(); + const res = await getDryrunResult(kafkaId, consumerGroupName, uniqueOffsets); + setNewOffsetData(res?.attributes?.offsets ?? []); + setShowDryRun(true) + } + + const closeDryrun = () => { + setShowDryRun(false) + } + + const closeResetOffset = () => { + router.push(`${baseurl}`) + } + + const handleDateTimeChange = (value: string) => { + setOffset((prev) => ({ ...prev, offset: value })); + }; + + const convertEpochToISO = (epoch: string): string => { + const date = new Date(parseInt(epoch, 10)); + return date.toISOString(); + }; + + + const handleSave = async () => { + setError(undefined); + setIsLoading(true); + + try { + const uniqueOffsets = generateOffsets(); + const success = await updateConsumerGroup(kafkaId, consumerGroupName, uniqueOffsets); + if (success === true) { + closeResetOffset(); + } else { + const errorMessages = (success as UpdateConsumerGroupErrorSchema)?.errors.map((err) => err.detail) || []; + const errorMessage = errorMessages.length > 0 + ? errorMessages[0] + : "Failed to update consumer group"; + setError(errorMessage); + } + } catch (e: unknown) { + setError("Unknown error occurred"); + } finally { + setIsLoading(false); + } + }; + + return ( + + {isLoading ? ( + + + + + + + + + {t("reseting_consumer_group_offsets_text")} + + + ) : showDryRun ? ( + + ) : + ( + + + + {t("reset_consumer_offset")} + + + {t.rich("consumer_name", { consumerGroupName })} + + + + + + {error && ( + + )} +
+ + + onTopicSelect("allTopics")} /> + onTopicSelect("selectedTopic")} /> + + {selectedConsumerTopic === "selectedTopic" && ( + topic.topicName)} + onChange={handleTopicChange} placeholder={"Select topic"} /> + )} + {selectedConsumerTopic === "selectedTopic" && + onPartitionSelect("allPartitions")} /> + onPartitionSelect("selectedPartition")} /> + + } + {selectedConsumerTopic === "selectedTopic" && selectedPartition === "selectedPartition" && ( + + )} + + + + + + {selectedOffset === "custom" && + + handleOffsetChange(value)} + type="number" + /> + } + {selectedOffset === "specificDateTime" && + <> + + onDateTimeSelect("ISO")} /> + onDateTimeSelect("Epoch")} /> + + + handleDateTimeChange(value)} /> + + } + + + + + + +
+
+
+
) + } +
+ ) +} diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/SelectComponent.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/SelectComponent.tsx new file mode 100644 index 000000000..370b4af76 --- /dev/null +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/SelectComponent.tsx @@ -0,0 +1,258 @@ +import React from 'react'; +import { + Select, + SelectOption, + SelectList, + SelectOptionProps, + MenuToggle, + MenuToggleElement, + TextInputGroup, + TextInputGroupMain, + TextInputGroupUtilities, + Button +} from '@patternfly/react-core'; +import TimesIcon from '@patternfly/react-icons/dist/esm/icons/times-icon'; + + +export type TypeaheadSelectProps = { + value: string | number; + selectItems: (string | number)[]; + onChange: (item: string | number) => void; + placeholder: string; +} + +export function SelectComponent({ + value, + selectItems, + onChange, + placeholder, +}: TypeaheadSelectProps) { + const [isOpen, setIsOpen] = React.useState(false); + const [selected, setSelected] = React.useState(''); + const [inputValue, setInputValue] = React.useState(''); + const [filterValue, setFilterValue] = React.useState(''); + const [selectOptions, setSelectOptions] = React.useState([]); + const [focusedItemIndex, setFocusedItemIndex] = React.useState(null); + const [activeItemId, setActiveItemId] = React.useState(null); + const textInputRef = React.useRef(null); + + const NO_RESULTS = 'no results'; + + // Update select options based on filter value + React.useEffect(() => { + const initialSelectOptions = selectItems.map(item => ({ + value: item, + children: String(item), + })); + + let newSelectOptions: SelectOptionProps[] = initialSelectOptions; + + if (filterValue) { + newSelectOptions = initialSelectOptions.filter((option) => + String(option.children).toLowerCase().includes(filterValue.toLowerCase()) + ); + + if (!newSelectOptions.length) { + newSelectOptions = [ + { isAriaDisabled: true, children: `No results found for "${filterValue}"`, value: NO_RESULTS } + ]; + } + + if (!isOpen) { + setIsOpen(true); + } + } + + setSelectOptions(newSelectOptions); + }, [filterValue, isOpen, selectItems]); + + const createItemId = (value: any) => `select-typeahead-${value.toString().replace(' ', '-')}`; + + const setActiveAndFocusedItem = (itemIndex: number) => { + setFocusedItemIndex(itemIndex); + const focusedItem = selectOptions[itemIndex]; + setActiveItemId(createItemId(focusedItem.value)); + }; + + const resetActiveAndFocusedItem = () => { + setFocusedItemIndex(null); + setActiveItemId(null); + }; + + const closeMenu = () => { + setIsOpen(false); + resetActiveAndFocusedItem(); + }; + + const onInputClick = () => { + if (!isOpen) { + setIsOpen(true); + } else if (!inputValue) { + closeMenu(); + } + }; + + const selectOption = (value: string | number, content: string) => { + setInputValue(content); + setFilterValue(''); + setSelected(value.toString()); + onChange(value); + closeMenu(); + }; + + const onSelect = (_event: React.MouseEvent | undefined, value: string | number | undefined) => { + if (value && value !== NO_RESULTS) { + const optionText = selectOptions.find((option) => option.value === value)?.children; + selectOption(value, optionText as string); + } + }; + + const onTextInputChange = (_event: React.FormEvent, value: string) => { + setInputValue(value); + setFilterValue(value); + + resetActiveAndFocusedItem(); + + if (value !== selected) { + setSelected(''); + } + }; + + const handleMenuArrowKeys = (key: string) => { + let indexToFocus = 0; + + if (!isOpen) { + setIsOpen(true); + } + + if (selectOptions.every((option) => option.isDisabled)) { + return; + } + + if (key === 'ArrowUp') { + if (focusedItemIndex === null || focusedItemIndex === 0) { + indexToFocus = selectOptions.length - 1; + } else { + indexToFocus = focusedItemIndex - 1; + } + + while (selectOptions[indexToFocus].isDisabled) { + indexToFocus--; + if (indexToFocus === -1) { + indexToFocus = selectOptions.length - 1; + } + } + } + + if (key === 'ArrowDown') { + if (focusedItemIndex === null || focusedItemIndex === selectOptions.length - 1) { + indexToFocus = 0; + } else { + indexToFocus = focusedItemIndex + 1; + } + + while (selectOptions[indexToFocus].isDisabled) { + indexToFocus++; + if (indexToFocus === selectOptions.length) { + indexToFocus = 0; + } + } + } + + setActiveAndFocusedItem(indexToFocus); + }; + + const onInputKeyDown = (event: React.KeyboardEvent) => { + const focusedItem = focusedItemIndex !== null ? selectOptions[focusedItemIndex] : null; + + switch (event.key) { + case 'Enter': + if (isOpen && focusedItem && focusedItem.value !== NO_RESULTS && !focusedItem.isAriaDisabled) { + selectOption(focusedItem.value, focusedItem.children as string); + } + + if (!isOpen) { + setIsOpen(true); + } + + break; + case 'ArrowUp': + case 'ArrowDown': + event.preventDefault(); + handleMenuArrowKeys(event.key); + break; + } + }; + + const onToggleClick = () => { + setIsOpen(!isOpen); + textInputRef.current?.focus(); + }; + + const onClearButtonClick = () => { + setSelected(''); + setInputValue(''); + setFilterValue(''); + resetActiveAndFocusedItem(); + textInputRef.current?.focus(); + }; + + const toggle = (toggleRef: React.Ref) => ( + + + + + + + + + + ); + + return ( + + ); +} diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/TypeaheadSelect.stories.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/TypeaheadSelect.stories.tsx new file mode 100644 index 000000000..e0e245b40 --- /dev/null +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/TypeaheadSelect.stories.tsx @@ -0,0 +1,14 @@ +import { Meta, StoryObj } from "@storybook/react"; +import { TypeaheadSelect } from "./TypeaheadSelect"; + +export default { + component: TypeaheadSelect, +} as Meta; + +type Story = StoryObj; + +export const Default: Story = { + args: { + selectItems: ["console_datagen_002-a", "console_datagen_002-b", "console_datagen_002-c", "console_datagen_002-d", "console_datagen_002-a", "console_datagen_002-a", "console_datagen_002-b", "console_datagen_002-c", "console_datagen_002-d", "console_datagen_002-c"], + }, +}; diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/TypeaheadSelect.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/TypeaheadSelect.tsx new file mode 100644 index 000000000..40f888336 --- /dev/null +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/TypeaheadSelect.tsx @@ -0,0 +1,142 @@ +import React, { useState } from 'react'; +import { + Select, + SelectOption, + SelectProps, + SelectList, + MenuToggle, + MenuToggleElement, + TextInputGroup, + TextInputGroupMain, + TextInputGroupUtilities, + Button +} from '@patternfly/react-core'; +import { TimesIcon } from '@patternfly/react-icons'; + +export function TypeaheadSelect({ + value, + selectItems, + onChange, + placeholder, +}: { + value: string | number; + selectItems: (string | number)[]; + onChange: (item: string | number) => void; + placeholder: string; +}) { + const [isOpen, setIsOpen] = useState(false); + const [filterValue, setFilterValue] = useState(''); + const [focusedItemIndex, setFocusedItemIndex] = useState(null); + const [activeItemId, setActiveItemId] = useState(null); + const textInputRef = React.useRef(null); + + const NO_RESULTS = 'no results'; + + const uniqueItems = Array.from(new Set(selectItems)); + + const onToggleClick = () => { + setIsOpen(!isOpen); + if (!isOpen) { + textInputRef.current?.focus(); + } + }; + + const onSelect: SelectProps['onSelect'] = (_event, selection) => { + if (selection !== NO_RESULTS) { + onChange(selection as string | number); + setFilterValue(''); + } + setIsOpen(false); + }; + + const filteredItems = filterValue + ? uniqueItems.filter((item) => + item.toString().toLowerCase().includes(filterValue.toLowerCase()) + ) + : uniqueItems; + + const options = filteredItems.length + ? filteredItems.map((item) => ( + + {item} + + )) + : [ + + {`No results found for "${filterValue}"`} + , + ]; + + const onInputChange = (_event: React.FormEvent, value: string) => { + setFilterValue(value); + setFocusedItemIndex(null); + if (value === '') { + onChange(''); + } + }; + + const onInputKeyDown = (event: React.KeyboardEvent) => { + if (event.key === 'Enter' && focusedItemIndex !== null) { + const selectedItem = filteredItems[focusedItemIndex]; + onChange(selectedItem); + setIsOpen(false); + } + }; + + const onClearButtonClick = () => { + setFilterValue(''); + setFocusedItemIndex(null); + setActiveItemId(null); + onChange(''); + textInputRef.current?.focus(); + }; + + const toggle = (toggleRef: React.Ref) => ( + + + setIsOpen(true)} + onChange={onInputChange} + onKeyDown={onInputKeyDown} + id="typeahead-select-input" + autoComplete="off" + ref={textInputRef} + placeholder={placeholder} + {...(activeItemId && { 'aria-activedescendant': activeItemId })} + role="combobox" + isExpanded={isOpen} + aria-controls="typeahead-select-listbox" + /> + + {(filterValue || value) && ( + + )} + + + + ); + + return ( + + ); +} diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/page.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/page.tsx new file mode 100644 index 000000000..a7bbb770e --- /dev/null +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/reset-offset/page.tsx @@ -0,0 +1,55 @@ +import { getConsumerGroup } from "@/api/consumerGroups/actions"; +import { KafkaConsumerGroupMembersParams } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/KafkaConsumerGroupMembers.params"; +import { KafkaParams } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/kafka.params"; +import { PageSection } from "@/libs/patternfly/react-core"; +import { notFound } from "next/navigation"; +import { Suspense } from "react"; +import { ResetConsumerOffset } from "./ResetConsumerOffset"; + +export default function ResetOffsetPage({ + params: { kafkaId, groupId }, +}: { + params: KafkaConsumerGroupMembersParams; +}) { + return ( + + } + > + + + + ); +} + +async function ConnectedResetOffset({ + params: { kafkaId, groupId }, +}: { + params: KafkaParams & { groupId: string }; +}) { + const consumerGroup = await getConsumerGroup(kafkaId, groupId); + if (!consumerGroup) { + notFound(); + } + + + const topics = consumerGroup.attributes.offsets?.map((o) => ({ + topicId: o.topicId, + topicName: o.topicName, + partition: o.partition, + })) || [] + + const topicDetails = topics.map((topic) => ({ + topicId: topic.topicId, + topicName: topic.topicName + })); + const partitions = topics.map(t => t.partition); + + + return ; +} diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/types.ts b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/types.ts new file mode 100644 index 000000000..7d478df20 --- /dev/null +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/[groupId]/types.ts @@ -0,0 +1,7 @@ +export type TopicSelection = "allTopics" | "selectedTopic"; + +export type partitionSelection = "allPartitions" | "selectedPartition"; + +export type OffsetValue = "custom" | "latest" | "earliest" | "specificDateTime"; + +export type DateTimeFormatSelection = "ISO" | "Epoch"; diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/page.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/page.tsx index 9998b6820..f08e71f1d 100644 --- a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/page.tsx +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/page.tsx @@ -1,9 +1,20 @@ import { getConsumerGroups } from "@/api/consumerGroups/actions"; import { KafkaParams } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/kafka.params"; import { PageSection } from "@/libs/patternfly/react-core"; -import { notFound } from "next/navigation"; import { Suspense } from "react"; -import { ConsumerGroupsTable } from "./ConsumerGroupsTable"; +import { + SortableColumns, + SortableConsumerGroupTableColumns, +} from "./ConsumerGroupsTable"; +import { ConsumerGroupState } from "@/api/consumerGroups/schema"; +import { ConnectedConsumerGroupTable } from "./ConnectedConsumerGroupTable"; +import { stringToInt } from "@/utils/stringToInt"; +import { notFound } from "next/navigation"; + +const sortMap: Record<(typeof SortableColumns)[number], string> = { + name: "name", + state: "state", +}; export default function ConsumerGroupsPage({ params: { kafkaId }, @@ -11,59 +22,123 @@ export default function ConsumerGroupsPage({ }: { params: KafkaParams; searchParams: { + id: string | undefined; + state: string | undefined; perPage: string | undefined; sort: string | undefined; sortDir: string | undefined; page: string | undefined; }; }) { + const id = searchParams["id"]; + const pageSize = stringToInt(searchParams.perPage) || 20; + const sort = (searchParams["sort"] || + "name") as SortableConsumerGroupTableColumns; + const sortDir = (searchParams["sortDir"] || "asc") as "asc" | "desc"; + const pageCursor = searchParams["page"]; + const state = (searchParams["state"] || "").split(",").filter((v) => !!v) as + | ConsumerGroupState[] + | undefined; return ( } > - ); } -async function ConnectedConsumerGroupsTable({ - params: { kafkaId }, - searchParams, +async function AsyncConsumerGroupTable({ + kafkaId, + id, + sortDir, + sort, + pageCursor, + pageSize, + state, }: { - params: KafkaParams; - searchParams: { - perPage: string | undefined; - sort: string | undefined; - sortDir: string | undefined; - page: string | undefined; - }; -}) { + sort: SortableConsumerGroupTableColumns; + id: string | undefined; + sortDir: "asc" | "desc"; + pageSize: number; + pageCursor: string | undefined; + state: ConsumerGroupState[] | undefined; +} & KafkaParams) { async function refresh() { "use server"; - const res = await getConsumerGroups(kafkaId, searchParams); - return res?.data; + const consumerGroup = await getConsumerGroups(kafkaId, { + id, + sort: sortMap[sort], + sortDir, + pageSize, + pageCursor, + state, + }); + return consumerGroup?.data ?? []; } - const consumerGroups = await getConsumerGroups(kafkaId, searchParams); + const consumerGroups = await getConsumerGroups(kafkaId, { + id, + sort: sortMap[sort], + sortDir, + pageSize, + pageCursor, + state, + }); + + if (!consumerGroups) { + notFound(); + } + + const nextPageQuery = consumerGroups.links.next + ? new URLSearchParams(consumerGroups.links.next) + : undefined; + const nextPageCursor = nextPageQuery?.get("page[after]"); + const prevPageQuery = consumerGroups.links.prev + ? new URLSearchParams(consumerGroups.links.prev) + : undefined; + const prevPageCursor = prevPageQuery?.get("page[after]"); + return ( - ); diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/topics/[topicId]/consumer-groups/ConsumerGroupsTable.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/topics/[topicId]/consumer-groups/ConsumerGroupsTable.tsx new file mode 100644 index 000000000..1592f6a52 --- /dev/null +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/topics/[topicId]/consumer-groups/ConsumerGroupsTable.tsx @@ -0,0 +1,163 @@ +"use client"; + +import { ConsumerGroup } from "@/api/consumerGroups/schema"; +import { Number } from "@/components/Format/Number"; +import { LabelLink } from "@/components/Navigation/LabelLink"; +import { TableView } from "@/components/Table"; +import { LabelGroup, Tooltip } from "@/libs/patternfly/react-core"; +import { HelpIcon } from "@/libs/patternfly/react-icons"; +import { Link } from "@/navigation"; +import { useTranslations } from "next-intl"; +import { useEffect, useState } from "react"; + +export function ConsumerGroupsTable({ + kafkaId, + page, + total, + consumerGroups: initialData, + refresh, +}: { + kafkaId: string; + page: number; + total: number; + consumerGroups: ConsumerGroup[] | undefined; + refresh: (() => Promise) | undefined; +}) { + const t = useTranslations(); + const [consumerGroups, setConsumerGroups] = useState(initialData); + useEffect(() => { + let interval: ReturnType; + if (refresh) { + interval = setInterval(async () => { + const consumerGroups = await refresh(); + setConsumerGroups(consumerGroups); + }, 5000); + } + return () => clearInterval(interval); + }, [refresh]); + return ( + { }} + data={consumerGroups} + emptyStateNoData={ +
{t("ConsumerGroupsTable.no_consumer_groups")}
+ } + emptyStateNoResults={ +
{t("ConsumerGroupsTable.no_consumer_groups")}
+ } + ariaLabel={t("ConsumerGroupsTable.title")} + columns={["name", "state", "lag", "members", "topics"] as const} + renderHeader={({ column, key, Th }) => { + switch (column) { + case "name": + return ( + + {t("ConsumerGroupsTable.consumer_group_name")} + + ); + case "state": + return ( + + {t("ConsumerGroupsTable.state")}{" "} + + + + + ); + case "lag": + return ( + + {t("ConsumerGroupsTable.overall_lag")}{" "} + + + + + ); + case "members": + return ( + + {t("ConsumerGroupsTable.members")}{" "} + + + + + ); + case "topics": + return {t("ConsumerGroupsTable.topics")}; + } + }} + renderCell={({ row, column, key, Td }) => { + switch (column) { + case "name": + return ( + + + {row.id === "" ? ( + {t("ConsumerGroupsTable.empty_name")} + ) : ( + row.id + )} + + + ); + case "state": + return ( + + {row.attributes.state} + + ); + case "lag": + return ( + + o.lag) + // lag values may not be available from API, e.g. when there is an error listing the topic offsets + .reduce((acc, v) => (acc ?? NaN) + (v ?? NaN), 0)} + /> + + ); + case "topics": + const allTopics = + row.attributes.members?.flatMap((m) => m.assignments ?? []) ?? []; + return ( + + + {Array.from(new Set(allTopics.map((a) => a.topicName))).map( + (topic, idx) => ( + t.topicName === topic)!.topicId + }`} + > + {topic} + + ), + )} + + + ); + case "members": + return ( + + + + ); + } + }} + /> + ); +} diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/topics/[topicId]/consumer-groups/page.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/topics/[topicId]/consumer-groups/page.tsx index 7c7b60d05..2e03ad89b 100644 --- a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/topics/[topicId]/consumer-groups/page.tsx +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/topics/[topicId]/consumer-groups/page.tsx @@ -1,5 +1,5 @@ import { getTopicConsumerGroups } from "@/api/consumerGroups/actions"; -import { ConsumerGroupsTable } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/consumer-groups/ConsumerGroupsTable"; +import { ConsumerGroupsTable } from "./ConsumerGroupsTable"; import { KafkaTopicParams } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/topics/kafkaTopic.params"; import { PageSection } from "@/libs/patternfly/react-core"; import { notFound } from "next/navigation"; diff --git a/ui/messages/en.json b/ui/messages/en.json index 5cb068467..e389275e8 100644 --- a/ui/messages/en.json +++ b/ui/messages/en.json @@ -380,7 +380,50 @@ "members": "Members", "members_tooltip": "Represents an individual member consumer within the consumer group. Monitor the lag of each member for insights into the health of the consumer group.", "topics": "Topics", - "empty_name": "Empty Name" + "empty_name": "Empty Name", + "consumer_group_must_be_empty": "Consumer group must be empty", + "consumer_group_must_be_empty_description": "To reset consumer group offsets, this consumer group must be empty", + "close": "Close", + "refresh": "Refresh and retry", + "cancel": "Cancel", + "reset_offset": "Reset consumer offset", + "reset_offset_description": "Consumer group must be empty to reset offsets.", + "member_shutdown_helper_text": "To reset the consumer group offsets, all the active members must be shut down.", + "shutdown_active_members": "Shut down active members before proceeding", + "learn_to_shutdown_members": "Learn how to shut down members", + "reset_consumer_offset": "Reset consumer offset", + "consumer_name": "Consumer group: {consumerGroupName}", + "target": "Target", + "apply_action_on": "Apply action on", + "all_consumer_topics": "All consumer topics", + "selected_topic": "A selected topic", + "offset_details": "Offset details", + "new_offset": "New offset", + "offset": { + "custom": "Custom offset", + "earliest": "Earliest offset", + "latest": "Latest offset", + "specific_date_time": "Specific Date time" + }, + "custom_offset": "Custom offset", + "save": "Save", + "dry_run": "Dry run", + "partitions": "Partitions", + "all_partitions": "All partitions", + "selected_partition": "A selected partition", + "select_date_time": "Select Date time", + "iso_date_format": "ISO 8601 Date-Time Format", + "unix_date_format": "Unix Epoch Milliseconds", + "dry_run_result": "Dry Run results", + "download_dryrun_result": "Download Dry Run results", + "jump_to_topic": "Jump to topic", + "dry_run_execution_alert": "Dry Run results are accurate as of the execution time, but they may differ when performing the live reset of offset changes", + "back_to_edit_offset": "Back to edit offset", + "topic": "Topic", + "partition": "Partition", + "cli_command": "$kafka-consumer-groups --bootstrap-server localhost:9092 --group my-consumer-group --reset-offsets --topic mytopic --to-earliest --dry-run", + "resetting_spinner": "Resetting consumer group offsets.", + "reseting_consumer_group_offsets_text": "Resetting consumer group offsets. This might take a few moments" }, "CreateTopic": { "title": "Topic creation wizard",