diff --git a/web/vtadmin/src/components/routes/workflow/Workflow.tsx b/web/vtadmin/src/components/routes/workflow/Workflow.tsx index a81901786d8..4cbd827ee30 100644 --- a/web/vtadmin/src/components/routes/workflow/Workflow.tsx +++ b/web/vtadmin/src/components/routes/workflow/Workflow.tsx @@ -24,6 +24,7 @@ import { WorkspaceTitle } from '../../layout/WorkspaceTitle'; import { useDocumentTitle } from '../../../hooks/useDocumentTitle'; import { KeyspaceLink } from '../../links/KeyspaceLink'; import { WorkflowStreams } from './WorkflowStreams'; +import { WorkflowDetails } from './WorkflowDetails'; import { ContentContainer } from '../../layout/ContentContainer'; import { TabContainer } from '../../tabs/TabContainer'; import { Tab } from '../../tabs/Tab'; @@ -74,6 +75,7 @@ export const Workflow = () => { + @@ -82,6 +84,10 @@ export const Workflow = () => { + + + + diff --git a/web/vtadmin/src/components/routes/workflow/WorkflowDetails.tsx b/web/vtadmin/src/components/routes/workflow/WorkflowDetails.tsx new file mode 100644 index 00000000000..93023274b27 --- /dev/null +++ b/web/vtadmin/src/components/routes/workflow/WorkflowDetails.tsx @@ -0,0 +1,260 @@ +/** + * Copyright 2024 The Vitess Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { orderBy } from 'lodash-es'; +import React, { useMemo } from 'react'; +import { Link } from 'react-router-dom'; + +import { useWorkflow, useWorkflowStatus, useWorkflows } from '../../../hooks/api'; +import { formatDateTime } from '../../../util/time'; +import { + TableCopyState, + formatStreamKey, + getReverseWorkflow, + getStreams, + getTableCopyStates, +} from '../../../util/workflows'; +import { DataTable } from '../../dataTable/DataTable'; +import { vtctldata } from '../../../proto/vtadmin'; +import { DataCell } from '../../dataTable/DataCell'; +import { StreamStatePip } from '../../pips/StreamStatePip'; +import { ThrottleThresholdSeconds } from '../Workflows'; + +interface Props { + clusterID: string; + keyspace: string; + name: string; +} + +const SUMMARY_COLUMNS = ['Stream Status', 'Traffic Status', 'Max VReplication Lag', 'Reverse Workflow']; + +const LOG_COLUMNS = ['Type', 'State', 'Updated At', 'Message', 'Count']; + +const TABLE_COPY_STATE_COLUMNS = ['Table Name', 'Total Bytes', 'Bytes Copied', 'Total Rows', 'Rows Copied']; + +const STREAM_COLUMNS = ['Stream', 'State', 'Message', 'Transaction Timestamp', 'Database Name']; + +export const WorkflowDetails = ({ clusterID, keyspace, name }: Props) => { + const { data: workflowData } = useWorkflow({ clusterID, keyspace, name }); + + const { data: workflowsData = [] } = useWorkflows(); + + const { data: workflowStatus } = useWorkflowStatus({ + clusterID, + keyspace, + name, + }); + + const reverseWorkflow = getReverseWorkflow(workflowsData, workflowData); + + const tableCopyStates = getTableCopyStates(workflowStatus); + + const streams = useMemo(() => { + const rows = getStreams(workflowData).map((stream) => ({ + key: formatStreamKey(stream), + ...stream, + })); + + return orderBy(rows, 'streamKey'); + }, [workflowData]); + + const getStreamsSummary = (streamList: typeof streams): string => { + const numStreamsByState: { [key: string]: number } = { + Copying: 0, + Throttled: 0, + Stopped: 0, + Running: 0, + Error: 0, + }; + streamList.forEach((stream) => { + var isThrottled = + Number(stream.throttler_status?.time_throttled?.seconds) > Date.now() / 1000 - ThrottleThresholdSeconds; + const streamState = isThrottled ? 'Throttled' : stream.state; + if (streamState) { + numStreamsByState[streamState]++; + } + }); + const states = Object.keys(numStreamsByState); + let message = ''; + states.forEach((state) => { + if (numStreamsByState[state]) { + let desc = state; + if (state === 'Error') { + desc = 'Failed'; + } + desc += numStreamsByState[state] > 1 ? ' Streams' : ' Stream'; + message += `${numStreamsByState[state]} ${desc}. `; + } + }); + return message; + }; + + const workflowSummary = { + streamSummary: getStreamsSummary(streams), + workflowStatus, + workflowData, + reverseWorkflow, + }; + + const renderSummaryRows = (rows: (typeof workflowSummary)[]) => { + return rows.map((row) => { + const reverseWorkflow = row.reverseWorkflow; + return ( + + {row.streamSummary ? row.streamSummary : '-'} + {row.workflowStatus ? row.workflowStatus.traffic_state : '-'} + + {row.workflowData && row.workflowData.workflow?.max_v_replication_lag + ? `${row.workflowData.workflow?.max_v_replication_lag}` + : '-'} + + + {reverseWorkflow ? ( + + {reverseWorkflow.workflow?.name} + + ) : ( + '-' + )} + + + ); + }); + }; + + const renderStreamRows = (rows: typeof streams) => { + return rows.map((row) => { + const href = + row.tablet && row.id + ? `/workflow/${clusterID}/${keyspace}/${name}/stream/${row.tablet.cell}/${row.tablet.uid}/${row.id}` + : null; + + var isThrottled = + Number(row?.throttler_status?.time_throttled?.seconds) > Date.now() / 1000 - ThrottleThresholdSeconds; + const rowState = isThrottled ? 'Throttled' : row.state; + return ( + + + {' '} + + {row.key} + +
+ Updated {formatDateTime(row.time_updated?.seconds)} +
+ {isThrottled ? ( +
+ Throttled: + in {row.throttler_status?.component_throttled} +
+ ) : null} +
+ {rowState} + {row.message ? row.message : '-'} + + {row.transaction_timestamp && row.transaction_timestamp.seconds + ? formatDateTime(row.transaction_timestamp.seconds) + : '-'} + + {row.db_name} + + ); + }); + }; + + const renderLogRows = (rows: vtctldata.Workflow.Stream.ILog[]) => { + return rows.map((row) => { + let message: string = row.message ? `${row.message}` : '-'; + // TODO: Investigate if message needs to be JSON parsed in case of "Stream Created" + if (row.type === 'Stream Created') { + message = '-'; + } + return ( + + {`${row.type}`} + {`${row.state}`} + {`${formatDateTime(parseInt(`${row.updated_at?.seconds}`, 10))}`} + {message} + {`${row.count}`} + + ); + }); + }; + + const renderTableCopyStateRows = (tableCopyStates: TableCopyState[]) => { + return tableCopyStates.map((copyState, index) => { + const tableKey = `${copyState.tableName}/${index}`; + return ( + + {`${copyState.tableName}`} + {copyState.bytes_total ? `${copyState.bytes_total}` : `N/A`} + + {copyState.bytes_copied ? `${copyState.bytes_copied}` : `N/A`}{' '} + {copyState.bytes_percentage ? `(${copyState.bytes_percentage}%)` : ``} + + {copyState.rows_total ? `${copyState.rows_total}` : `N/A`} + + {copyState.rows_copied ? `${copyState.rows_copied}` : `N/A`}{' '} + {copyState.rows_percentage ? `(${copyState.rows_percentage}%)` : ``} + + + ); + }); + }; + + return ( +
+ + + {tableCopyStates && ( + + )} +

Recent Logs

+ {streams.map((stream) => ( +
+ +
+ ))} +
+ ); +}; diff --git a/web/vtadmin/src/util/workflows.ts b/web/vtadmin/src/util/workflows.ts index 66e8d765961..3a0a03dc51b 100644 --- a/web/vtadmin/src/util/workflows.ts +++ b/web/vtadmin/src/util/workflows.ts @@ -94,3 +94,49 @@ export const getStreamTablets = (workflow: W | null | un return [...aliases]; }; + +/** + * getReverseWorkflow returns the reverse workflow of `originalWorkflow` by looking for the '_reverse' + * suffix and the source and target keyspace from all `workflows` list. + */ +export const getReverseWorkflow = ( + workflows: W[], + originalWorkflow: W | undefined | null +): W | undefined => { + if (!originalWorkflow) return; + const originalWorkflowName = originalWorkflow.workflow?.name!; + let reverseWorkflowName = originalWorkflowName.concat('_reverse'); + if (originalWorkflowName.endsWith('_reverse')) { + reverseWorkflowName = originalWorkflowName.split('_reverse')[0]; + } + return workflows.find( + (workflow) => + workflow.workflow?.name === reverseWorkflowName && + workflow.workflow?.source?.keyspace === originalWorkflow.workflow?.target?.keyspace && + workflow.workflow?.target?.keyspace === originalWorkflow.workflow?.source?.keyspace + ); +}; + +export interface TableCopyState extends vtctldata.WorkflowStatusResponse.ITableCopyState { + tableName: string; +} + +/** + * getTableCopyStates returns a list of table copy states with `tableName` included + * in the `TableCopyState` object, from the `workflowStatus` output. + */ +export const getTableCopyStates = ( + workflowStatus: vtctldata.WorkflowStatusResponse | undefined +): TableCopyState[] | undefined => { + if (!workflowStatus) return; + const tableNames = Object.keys(workflowStatus.table_copy_state); + if (!tableNames.length) return; + const tableCopyState: TableCopyState[] = tableNames.map((tableName) => { + const tableState = workflowStatus.table_copy_state[tableName]; + return { + tableName, + ...tableState, + }; + }); + return tableCopyState; +};