Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VTAdmin: Initiate workflow details tab #16570

Merged
merged 11 commits into from
Aug 27, 2024
6 changes: 6 additions & 0 deletions web/vtadmin/src/components/routes/workflow/Workflow.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { WorkspaceTitle } from '../../layout/WorkspaceTitle';
import { useDocumentTitle } from '../../../hooks/useDocumentTitle';
import { KeyspaceLink } from '../../links/KeyspaceLink';
import { WorkflowStreams } from './WorkflowStreams';
import { WorkflowDetails } from './WorkflowDetails';
import { ContentContainer } from '../../layout/ContentContainer';
import { TabContainer } from '../../tabs/TabContainer';
import { Tab } from '../../tabs/Tab';
Expand Down Expand Up @@ -74,6 +75,7 @@ export const Workflow = () => {
<ContentContainer>
<TabContainer>
<Tab text="Streams" to={`${url}/streams`} count={streams.length} />
<Tab text="Details" to={`${url}/details`} />
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we load the Details info all the time? If so, we should only load on clicking the tab, since there are a large number of shards that could take a lot of time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are loading the details info when we click on the tab.

<Tab text="JSON" to={`${url}/json`} />
</TabContainer>

Expand All @@ -82,6 +84,10 @@ export const Workflow = () => {
<WorkflowStreams clusterID={clusterID} keyspace={keyspace} name={name} />
</Route>

<Route path={`${path}/details`}>
<WorkflowDetails clusterID={clusterID} keyspace={keyspace} name={name} />
</Route>

<Route path={`${path}/json`}>
<Code code={JSON.stringify(data, null, 2)} />
</Route>
Expand Down
260 changes: 260 additions & 0 deletions web/vtadmin/src/components/routes/workflow/WorkflowDetails.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
/**
* Copyright 2024 The Vitess Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { orderBy } from 'lodash-es';
import React, { useMemo } from 'react';
import { Link } from 'react-router-dom';

import { useWorkflow, useWorkflowStatus, useWorkflows } from '../../../hooks/api';
import { formatDateTime } from '../../../util/time';
import {
TableCopyState,
formatStreamKey,
getReverseWorkflow,
getStreams,
getTableCopyStates,
} from '../../../util/workflows';
import { DataTable } from '../../dataTable/DataTable';
import { vtctldata } from '../../../proto/vtadmin';
import { DataCell } from '../../dataTable/DataCell';
import { StreamStatePip } from '../../pips/StreamStatePip';
import { ThrottleThresholdSeconds } from '../Workflows';

interface Props {
clusterID: string;
keyspace: string;
name: string;
}

const SUMMARY_COLUMNS = ['Stream Status', 'Traffic Status', 'Max VReplication Lag', 'Reverse Workflow'];

const LOG_COLUMNS = ['Type', 'State', 'Updated At', 'Message', 'Count'];

const TABLE_COPY_STATE_COLUMNS = ['Table Name', 'Total Bytes', 'Bytes Copied', 'Total Rows', 'Rows Copied'];

const STREAM_COLUMNS = ['Stream', 'State', 'Message', 'Transaction Timestamp', 'Database Name'];

export const WorkflowDetails = ({ clusterID, keyspace, name }: Props) => {
const { data: workflowData } = useWorkflow({ clusterID, keyspace, name });

const { data: workflowsData = [] } = useWorkflows();

const { data: workflowStatus } = useWorkflowStatus({
clusterID,
keyspace,
name,
});

const reverseWorkflow = getReverseWorkflow(workflowsData, workflowData);

const tableCopyStates = getTableCopyStates(workflowStatus);

const streams = useMemo(() => {
const rows = getStreams(workflowData).map((stream) => ({
key: formatStreamKey(stream),
...stream,
}));

return orderBy(rows, 'streamKey');
}, [workflowData]);

const getStreamsSummary = (streamList: typeof streams): string => {
const numStreamsByState: { [key: string]: number } = {
Copying: 0,
Throttled: 0,
Stopped: 0,
Running: 0,
Error: 0,
};
streamList.forEach((stream) => {
var isThrottled =
Number(stream.throttler_status?.time_throttled?.seconds) > Date.now() / 1000 - ThrottleThresholdSeconds;
const streamState = isThrottled ? 'Throttled' : stream.state;
if (streamState) {
numStreamsByState[streamState]++;
}
});
const states = Object.keys(numStreamsByState);
let message = '';
states.forEach((state) => {
if (numStreamsByState[state]) {
let desc = state;
if (state === 'Error') {
desc = 'Failed';
}
desc += numStreamsByState[state] > 1 ? ' Streams' : ' Stream';
message += `${numStreamsByState[state]} ${desc}. `;
}
});
return message;
};

const workflowSummary = {
streamSummary: getStreamsSummary(streams),
workflowStatus,
workflowData,
reverseWorkflow,
};

const renderSummaryRows = (rows: (typeof workflowSummary)[]) => {
return rows.map((row) => {
const reverseWorkflow = row.reverseWorkflow;
return (
<tr key={reverseWorkflow?.workflow?.name}>
<DataCell>{row.streamSummary ? row.streamSummary : '-'}</DataCell>
<DataCell>{row.workflowStatus ? row.workflowStatus.traffic_state : '-'}</DataCell>
<DataCell>
{row.workflowData && row.workflowData.workflow?.max_v_replication_lag
? `${row.workflowData.workflow?.max_v_replication_lag}`
: '-'}
</DataCell>
<DataCell>
{reverseWorkflow ? (
<Link
to={`/workflow/${reverseWorkflow.cluster?.id}/${reverseWorkflow.keyspace}/${reverseWorkflow.workflow?.name}`}
className="text-base"
>
{reverseWorkflow.workflow?.name}
</Link>
) : (
'-'
)}
</DataCell>
</tr>
);
});
};

const renderStreamRows = (rows: typeof streams) => {
return rows.map((row) => {
const href =
row.tablet && row.id
? `/workflow/${clusterID}/${keyspace}/${name}/stream/${row.tablet.cell}/${row.tablet.uid}/${row.id}`
: null;

var isThrottled =
Number(row?.throttler_status?.time_throttled?.seconds) > Date.now() / 1000 - ThrottleThresholdSeconds;
const rowState = isThrottled ? 'Throttled' : row.state;
return (
<tr key={row.key}>
<DataCell>
<StreamStatePip state={rowState} />{' '}
<Link className="font-bold" to={href}>
{row.key}
</Link>
<div className="text-sm text-secondary">
Updated {formatDateTime(row.time_updated?.seconds)}
</div>
{isThrottled ? (
<div className="text-sm text-secondary">
<span className="font-bold text-danger">Throttled: </span>
in {row.throttler_status?.component_throttled}
</div>
) : null}
</DataCell>
<DataCell>{rowState}</DataCell>
<DataCell>{row.message ? row.message : '-'}</DataCell>
<DataCell>
{row.transaction_timestamp && row.transaction_timestamp.seconds
? formatDateTime(row.transaction_timestamp.seconds)
: '-'}
</DataCell>
<DataCell>{row.db_name}</DataCell>
</tr>
);
});
};

const renderLogRows = (rows: vtctldata.Workflow.Stream.ILog[]) => {
return rows.map((row) => {
let message: string = row.message ? `${row.message}` : '-';
// TODO: Investigate if message needs to be JSON parsed in case of "Stream Created"
if (row.type === 'Stream Created') {
message = '-';
}
return (
<tr key={`${row.id}`}>
<DataCell>{`${row.type}`}</DataCell>
<DataCell>{`${row.state}`}</DataCell>
<DataCell>{`${formatDateTime(parseInt(`${row.updated_at?.seconds}`, 10))}`}</DataCell>
<DataCell>{message}</DataCell>
<DataCell>{`${row.count}`}</DataCell>
</tr>
);
});
};

const renderTableCopyStateRows = (tableCopyStates: TableCopyState[]) => {
return tableCopyStates.map((copyState, index) => {
const tableKey = `${copyState.tableName}/${index}`;
return (
<tr key={tableKey}>
<DataCell>{`${copyState.tableName}`}</DataCell>
<DataCell>{copyState.bytes_total ? `${copyState.bytes_total}` : `N/A`}</DataCell>
<DataCell>
{copyState.bytes_copied ? `${copyState.bytes_copied}` : `N/A`}{' '}
{copyState.bytes_percentage ? `(${copyState.bytes_percentage}%)` : ``}
</DataCell>
<DataCell>{copyState.rows_total ? `${copyState.rows_total}` : `N/A`}</DataCell>
<DataCell>
{copyState.rows_copied ? `${copyState.rows_copied}` : `N/A`}{' '}
{copyState.rows_percentage ? `(${copyState.rows_percentage}%)` : ``}
</DataCell>
</tr>
);
});
};

return (
<div className="mt-12 mb-16">
<DataTable
columns={SUMMARY_COLUMNS}
data={[workflowSummary]}
renderRows={renderSummaryRows}
pageSize={1}
title="Summary"
/>
<DataTable
columns={STREAM_COLUMNS}
data={streams}
renderRows={renderStreamRows}
pageSize={10}
title="Streams"
/>
{tableCopyStates && (
<DataTable
columns={TABLE_COPY_STATE_COLUMNS}
data={tableCopyStates}
renderRows={renderTableCopyStateRows}
pageSize={1000}
title="Table Copy State"
/>
)}
<h3 className="mt-8 mb-4">Recent Logs</h3>
{streams.map((stream) => (
<div className="mt-2" key={stream.key}>
<DataTable
columns={LOG_COLUMNS}
data={orderBy(stream.logs, 'id', 'desc')}
renderRows={renderLogRows}
pageSize={10}
title={stream.key!}
/>
</div>
))}
</div>
);
};
46 changes: 46 additions & 0 deletions web/vtadmin/src/util/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,49 @@ export const getStreamTablets = <W extends pb.IWorkflow>(workflow: W | null | un

return [...aliases];
};

/**
* getReverseWorkflow returns the reverse workflow of `originalWorkflow` by looking for the '_reverse'
* suffix and the source and target keyspace from all `workflows` list.
*/
export const getReverseWorkflow = <W extends pb.Workflow>(
workflows: W[],
originalWorkflow: W | undefined | null
): W | undefined => {
if (!originalWorkflow) return;
const originalWorkflowName = originalWorkflow.workflow?.name!;
let reverseWorkflowName = originalWorkflowName.concat('_reverse');
if (originalWorkflowName.endsWith('_reverse')) {
reverseWorkflowName = originalWorkflowName.split('_reverse')[0];
}
return workflows.find(
(workflow) =>
workflow.workflow?.name === reverseWorkflowName &&
workflow.workflow?.source?.keyspace === originalWorkflow.workflow?.target?.keyspace &&
workflow.workflow?.target?.keyspace === originalWorkflow.workflow?.source?.keyspace
);
};

export interface TableCopyState extends vtctldata.WorkflowStatusResponse.ITableCopyState {
tableName: string;
}

/**
* getTableCopyStates returns a list of table copy states with `tableName` included
* in the `TableCopyState` object, from the `workflowStatus` output.
*/
export const getTableCopyStates = (
workflowStatus: vtctldata.WorkflowStatusResponse | undefined
): TableCopyState[] | undefined => {
if (!workflowStatus) return;
const tableNames = Object.keys(workflowStatus.table_copy_state);
if (!tableNames.length) return;
const tableCopyState: TableCopyState[] = tableNames.map((tableName) => {
const tableState = workflowStatus.table_copy_state[tableName];
return {
tableName,
...tableState,
};
});
return tableCopyState;
};
Loading