diff --git a/dashboard/components/Relations.tsx b/dashboard/components/Relations.tsx index e8bbc17a172aa..67feb2ac81cad 100644 --- a/dashboard/components/Relations.tsx +++ b/dashboard/components/Relations.tsx @@ -35,7 +35,7 @@ import Title from "../components/Title" import useFetch from "../lib/api/fetch" import { Relation, - StreamingJob, + StreamingRelation, getDatabases, getSchemas, getUsers, @@ -73,7 +73,7 @@ export const dependentsColumn: Column = { ), } -export const fragmentsColumn: Column = { +export const fragmentsColumn: Column = { name: "Fragments", width: 1, content: (r) => ( diff --git a/dashboard/lib/api/streaming.ts b/dashboard/lib/api/streaming.ts index 211bd1b6bbc4c..ac7ba30d1258e 100644 --- a/dashboard/lib/api/streaming.ts +++ b/dashboard/lib/api/streaming.ts @@ -15,6 +15,7 @@ * */ +import { Expose, plainToInstance } from "class-transformer" import _ from "lodash" import sortBy from "lodash/sortBy" import { @@ -53,14 +54,6 @@ export async function getRelationIdInfos(): Promise { return fragmentIds } -export async function getFragments(): Promise { - let fragmentList: TableFragments[] = (await api.get("/fragments2")).map( - TableFragments.fromJSON - ) - fragmentList = sortBy(fragmentList, (x) => x.tableId) - return fragmentList -} - export interface Relation { id: number name: string @@ -75,7 +68,43 @@ export interface Relation { databaseName?: string } -export interface StreamingJob extends Relation { +export class StreamingJob { + @Expose({ name: "jobId" }) + id!: number + @Expose({ name: "objType" }) + _objType!: string + name!: string + jobStatus!: string + @Expose({ name: "parallelism" }) + _parallelism!: any + maxParallelism!: number + + get parallelism() { + const parallelism = this._parallelism + if (typeof parallelism === "string") { + // `Adaptive` + return parallelism + } else if (typeof parallelism === "object") { + // `Fixed (64)` + let key = Object.keys(parallelism)[0] + let value = parallelism[key] + return `${key} (${value})` + } else { + // fallback + return JSON.stringify(parallelism) + } + } + + get type() { + if (this._objType == "Table") { + return "Table / MV" + } else { + return this._objType + } + } +} + +export interface StreamingRelation extends Relation { dependentRelations: number[] } @@ -98,17 +127,15 @@ export function relationTypeTitleCase(x: Relation) { return _.startCase(_.toLower(relationType(x))) } -export function relationIsStreamingJob(x: Relation): x is StreamingJob { +export function relationIsStreamingJob(x: Relation): x is StreamingRelation { const type = relationType(x) return type !== "UNKNOWN" && type !== "SOURCE" && type !== "INTERNAL" } export async function getStreamingJobs() { - let jobs = _.concat( - await getMaterializedViews(), - await getTables(), - await getIndexes(), - await getSinks() + let jobs = plainToInstance( + StreamingJob, + (await api.get("/streaming_jobs")) as any[] ) jobs = sortBy(jobs, (x) => x.id) return jobs diff --git a/dashboard/package-lock.json b/dashboard/package-lock.json index 78bc775791ee2..ce861243e27cf 100644 --- a/dashboard/package-lock.json +++ b/dashboard/package-lock.json @@ -17,6 +17,7 @@ "@uidotdev/usehooks": "^2.4.1", "base64url": "^3.0.1", "bootstrap-icons": "^1.9.1", + "class-transformer": "^0.5.1", "d3": "^7.6.1", "d3-axis": "^3.0.0", "d3-dag": "^0.11.4", @@ -35,6 +36,7 @@ "react-json-view": "^1.21.3", "react-syntax-highlighter": "^15.5.0", "recharts": "^2.3.2", + "reflect-metadata": "^0.2.2", "styled-components": "5.3.0", "ts-proto": "^1.169.1" }, @@ -4135,6 +4137,11 @@ "integrity": "sha512-xmDt/QIAdeZ9+nfdPsaBCpMvHNLFiLdjj59qjqn+6iPe6YmHGQ35sBnQ8uslRBXFmXkiZQOJRjvQeoGppoTjjg==", "dev": true }, + "node_modules/class-transformer": { + "version": "0.5.1", + "resolved": "https://registry.npmjs.org/class-transformer/-/class-transformer-0.5.1.tgz", + "integrity": "sha512-SQa1Ws6hUbfC98vKGxZH3KFY0Y1lm5Zm0SY8XX9zbK7FJCyVEac3ATW0RIpwzW+oOfmHE5PMPufDG9hCfoEOMw==" + }, "node_modules/classcat": { "version": "5.0.4", "resolved": "https://registry.npmjs.org/classcat/-/classcat-5.0.4.tgz", @@ -9801,6 +9808,11 @@ "resolved": "https://registry.npmjs.org/postcss-value-parser/-/postcss-value-parser-3.3.1.tgz", "integrity": "sha512-pISE66AbVkp4fDQ7VHBwRNXzAAKJjw4Vw7nWI/+Q3vuly7SNfgYXvm6i5IgFylHGK5sP/xHAbB7N49OS4gWNyQ==" }, + "node_modules/reflect-metadata": { + "version": "0.2.2", + "resolved": "https://registry.npmjs.org/reflect-metadata/-/reflect-metadata-0.2.2.tgz", + "integrity": "sha512-urBwgfrvVP/eAyXx4hluJivBKzuEbSQs9rKWCrCkbSxNv8mxPcUZKeuoF3Uy4mJl3Lwprp6yy5/39VWigZ4K6Q==" + }, "node_modules/reflect.getprototypeof": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/reflect.getprototypeof/-/reflect.getprototypeof-1.0.4.tgz", @@ -14811,6 +14823,11 @@ "integrity": "sha512-xmDt/QIAdeZ9+nfdPsaBCpMvHNLFiLdjj59qjqn+6iPe6YmHGQ35sBnQ8uslRBXFmXkiZQOJRjvQeoGppoTjjg==", "dev": true }, + "class-transformer": { + "version": "0.5.1", + "resolved": "https://registry.npmjs.org/class-transformer/-/class-transformer-0.5.1.tgz", + "integrity": "sha512-SQa1Ws6hUbfC98vKGxZH3KFY0Y1lm5Zm0SY8XX9zbK7FJCyVEac3ATW0RIpwzW+oOfmHE5PMPufDG9hCfoEOMw==" + }, "classcat": { "version": "5.0.4", "resolved": "https://registry.npmjs.org/classcat/-/classcat-5.0.4.tgz", @@ -18962,6 +18979,11 @@ } } }, + "reflect-metadata": { + "version": "0.2.2", + "resolved": "https://registry.npmjs.org/reflect-metadata/-/reflect-metadata-0.2.2.tgz", + "integrity": "sha512-urBwgfrvVP/eAyXx4hluJivBKzuEbSQs9rKWCrCkbSxNv8mxPcUZKeuoF3Uy4mJl3Lwprp6yy5/39VWigZ4K6Q==" + }, "reflect.getprototypeof": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/reflect.getprototypeof/-/reflect.getprototypeof-1.0.4.tgz", diff --git a/dashboard/package.json b/dashboard/package.json index 5e01f71c3686b..79897d63b19c8 100644 --- a/dashboard/package.json +++ b/dashboard/package.json @@ -24,6 +24,7 @@ "@uidotdev/usehooks": "^2.4.1", "base64url": "^3.0.1", "bootstrap-icons": "^1.9.1", + "class-transformer": "^0.5.1", "d3": "^7.6.1", "d3-axis": "^3.0.0", "d3-dag": "^0.11.4", @@ -42,6 +43,7 @@ "react-json-view": "^1.21.3", "react-syntax-highlighter": "^15.5.0", "recharts": "^2.3.2", + "reflect-metadata": "^0.2.2", "styled-components": "5.3.0", "ts-proto": "^1.169.1" }, diff --git a/dashboard/pages/_app.tsx b/dashboard/pages/_app.tsx index 25746318c1d24..8dd4d2cc1079d 100644 --- a/dashboard/pages/_app.tsx +++ b/dashboard/pages/_app.tsx @@ -15,6 +15,7 @@ * */ import "bootstrap-icons/font/bootstrap-icons.css" +import "reflect-metadata" import "../styles/global.css" import { ChakraProvider } from "@chakra-ui/react" diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index 3ffbda24b232f..2312334e53abe 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -24,7 +24,12 @@ import { HStack, Input, Select, + Table, + TableContainer, + Tbody, + Td, Text, + Tr, VStack, } from "@chakra-ui/react" import * as d3 from "d3" @@ -184,7 +189,7 @@ function buildFragmentDependencyAsEdges( return nodes } -const SIDEBAR_WIDTH = 200 +const SIDEBAR_WIDTH = 225 type BackPressureDataSource = "Embedded" | "Prometheus" const backPressureDataSources: BackPressureDataSource[] = [ @@ -202,23 +207,28 @@ interface EmbeddedBackPressureInfo { } export default function Streaming() { - const { response: relationList } = useFetch(getStreamingJobs) + const { response: streamingJobList } = useFetch(getStreamingJobs) const { response: relationIdInfos } = useFetch(getRelationIdInfos) - const [relationId, setRelationId] = useQueryState("id", parseAsInteger) + const [jobId, setJobId] = useQueryState("id", parseAsInteger) const [selectedFragmentId, setSelectedFragmentId] = useState() const [tableFragments, setTableFragments] = useState() + const job = useMemo( + () => streamingJobList?.find((j) => j.id === jobId), + [streamingJobList, jobId] + ) + const toast = useErrorToast() useEffect(() => { - if (relationId) { + if (jobId) { setTableFragments(undefined) - getFragmentsByJobId(relationId).then((tf) => { + getFragmentsByJobId(jobId).then((tf) => { setTableFragments(tf) }) } - }, [relationId]) + }, [jobId]) const fragmentDependencyCallback = useCallback(() => { if (tableFragments) { @@ -232,14 +242,14 @@ export default function Streaming() { }, [tableFragments]) useEffect(() => { - if (relationList) { - if (!relationId) { - if (relationList.length > 0) { - setRelationId(relationList[0].id) + if (streamingJobList) { + if (!jobId) { + if (streamingJobList.length > 0) { + setJobId(streamingJobList[0].id) } } } - }, [relationId, relationList, setRelationId]) + }, [jobId, streamingJobList, setJobId]) // The table fragments of the selected fragment id const fragmentDependency = fragmentDependencyCallback()?.fragmentDep @@ -276,7 +286,7 @@ export default function Streaming() { const fragmentIdToRelationId = map[relationId].map for (const fragmentId in fragmentIdToRelationId) { if (parseInt(fragmentId) == searchFragIdInt) { - setRelationId(parseInt(relationId)) + setJobId(parseInt(relationId)) setSelectedFragmentId(searchFragIdInt) return } @@ -295,7 +305,7 @@ export default function Streaming() { for (const fragmentId in fragmentIdToRelationId) { let actorIds = fragmentIdToRelationId[fragmentId].ids if (actorIds.includes(searchActorIdInt)) { - setRelationId(parseInt(relationId)) + setJobId(parseInt(relationId)) setSelectedFragmentId(parseInt(fragmentId)) return } @@ -406,41 +416,70 @@ export default function Streaming() { height="full" > - Relations + Streaming Jobs { - const id = relationList?.find( + const id = streamingJobList?.find( (x) => x.name == event.target.value )?.id if (id) { - setRelationId(id) + setJobId(id) } }} placeholder="Search..." mb={2} > - {relationList && - relationList.map((r) => ( + {streamingJobList && + streamingJobList.map((r) => ( ))} + {job && ( + + Information + + + + + + + + + + + + + + + + + + + + +
Type{job.type}
Status{job.jobStatus}
Parallelism{job.parallelism}
+ Max Parallelism + {job.maxParallelism}
+
+
+ )} Goto diff --git a/dashboard/tsconfig.json b/dashboard/tsconfig.json index d88f23ca2cf51..42256872b36d1 100644 --- a/dashboard/tsconfig.json +++ b/dashboard/tsconfig.json @@ -18,6 +18,8 @@ "isolatedModules": true, "jsx": "preserve", "incremental": true, + "experimentalDecorators": true, + "emitDecoratorMetadata": true, }, "include": [ "next-env.d.ts", diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 4adde9de62a26..0e4f061e08dcc 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -17,6 +17,7 @@ use std::collections::{HashMap, HashSet}; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_connector::source::SplitMetaData; +use risingwave_meta::controller::fragment::StreamingJobInfo; use risingwave_meta::manager::{LocalNotification, MetadataManager}; use risingwave_meta::model; use risingwave_meta::model::ActorId; @@ -222,27 +223,35 @@ impl StreamManagerService for StreamServiceImpl { &self, _request: Request, ) -> Result, Status> { - let job_states = self + let job_infos = self .metadata_manager .catalog_controller - .list_streaming_job_states() + .list_streaming_job_infos() .await?; - let states = job_states + let states = job_infos .into_iter() - .map(|(table_id, state, parallelism, max_parallelism)| { - let parallelism = match parallelism { - StreamingParallelism::Adaptive => model::TableParallelism::Adaptive, - StreamingParallelism::Custom => model::TableParallelism::Custom, - StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _), - }; - - list_table_fragment_states_response::TableFragmentState { - table_id: table_id as _, - state: PbState::from(state) as _, - parallelism: Some(parallelism.into()), - max_parallelism: max_parallelism as _, - } - }) + .map( + |StreamingJobInfo { + job_id, + job_status, + parallelism, + max_parallelism, + .. + }| { + let parallelism = match parallelism { + StreamingParallelism::Adaptive => model::TableParallelism::Adaptive, + StreamingParallelism::Custom => model::TableParallelism::Custom, + StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _), + }; + + list_table_fragment_states_response::TableFragmentState { + table_id: job_id as _, + state: PbState::from(job_status) as _, + parallelism: Some(parallelism.into()), + max_parallelism: max_parallelism as _, + } + }, + ) .collect_vec(); Ok(Response::new(ListTableFragmentStatesResponse { states })) diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 38b1fd365c041..40fdbac5eb21c 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -24,11 +24,12 @@ use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_common::util::worker_util::WorkerNodeId; use risingwave_meta_model::actor::ActorStatus; use risingwave_meta_model::fragment::DistributionType; +use risingwave_meta_model::object::ObjectType; use risingwave_meta_model::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob}; use risingwave_meta_model::{ - actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ActorUpstreamActors, - ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array, JobStatus, ObjectId, SinkId, - SourceId, StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId, + actor, actor_dispatcher, fragment, object, sink, source, streaming_job, table, ActorId, + ActorUpstreamActors, ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array, JobStatus, + ObjectId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId, }; use risingwave_meta_model_migration::{Alias, SelectStatement}; use risingwave_pb::common::PbActorLocation; @@ -51,9 +52,10 @@ use risingwave_pb::stream_plan::{ use sea_orm::sea_query::Expr; use sea_orm::ActiveValue::Set; use sea_orm::{ - ColumnTrait, DbErr, EntityTrait, JoinType, ModelTrait, PaginatorTrait, QueryFilter, - QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value, + ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, ModelTrait, PaginatorTrait, + QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value, }; +use serde::{Deserialize, Serialize}; use tracing::debug; use crate::controller::catalog::{CatalogController, CatalogControllerInner}; @@ -79,6 +81,17 @@ pub struct FragmentParallelismInfo { pub vnode_count: usize, } +#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] // for dashboard +pub struct StreamingJobInfo { + pub job_id: ObjectId, + pub obj_type: ObjectType, + pub name: String, + pub job_status: JobStatus, + pub parallelism: StreamingParallelism, + pub max_parallelism: i32, +} + impl CatalogControllerInner { /// List all fragment vnode mapping info for all CREATED streaming jobs. pub async fn all_running_fragment_mappings( @@ -690,19 +703,35 @@ impl CatalogController { ) } - pub async fn list_streaming_job_states( - &self, - ) -> MetaResult> { + pub async fn list_streaming_job_infos(&self) -> MetaResult> { let inner = self.inner.read().await; let job_states = StreamingJob::find() .select_only() + .column(streaming_job::Column::JobId) + .join(JoinType::InnerJoin, streaming_job::Relation::Object.def()) + .column(object::Column::ObjType) + .join(JoinType::LeftJoin, table::Relation::Object1.def().rev()) + .join(JoinType::LeftJoin, source::Relation::Object.def().rev()) + .join(JoinType::LeftJoin, sink::Relation::Object.def().rev()) + .column_as( + Expr::if_null( + Expr::col((table::Entity, table::Column::Name)), + Expr::if_null( + Expr::col((source::Entity, source::Column::Name)), + Expr::if_null( + Expr::col((sink::Entity, sink::Column::Name)), + Expr::val(""), + ), + ), + ), + "name", + ) .columns([ - streaming_job::Column::JobId, streaming_job::Column::JobStatus, streaming_job::Column::Parallelism, streaming_job::Column::MaxParallelism, ]) - .into_tuple() + .into_model() .all(&inner.db) .await?; Ok(job_states) diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index 2bef0505ce443..32a3388cbcc90 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -76,6 +76,7 @@ pub(super) mod handlers { use thiserror_ext::AsReport; use super::*; + use crate::controller::fragment::StreamingJobInfo; pub struct DashboardError(anyhow::Error); pub type Result = std::result::Result; @@ -191,20 +192,17 @@ pub(super) mod handlers { Ok(Json(views)) } - pub async fn list_fragments( + pub async fn list_streaming_jobs( Extension(srv): Extension, - ) -> Result>> { - let table_fragments = srv + ) -> Result>> { + let streaming_jobs = srv .metadata_manager .catalog_controller - .table_fragments() + .list_streaming_job_infos() .await - .map_err(err)? - .values() - .cloned() - .collect_vec(); + .map_err(err)?; - Ok(Json(table_fragments)) + Ok(Json(streaming_jobs)) } /// In the ddl backpressure graph, we want to compute the backpressure between relations. @@ -508,7 +506,7 @@ impl DashboardService { let api_router = Router::new() .route("/clusters/:ty", get(list_clusters)) - .route("/fragments2", get(list_fragments)) + .route("/streaming_jobs", get(list_streaming_jobs)) .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id)) .route("/relation_id_infos", get(get_relation_id_infos)) .route( diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index fd94167d8dc2e..5403b87ae6228 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -581,7 +581,7 @@ impl MetadataManager { pub async fn count_streaming_job(&self) -> MetaResult { self.catalog_controller - .list_streaming_job_states() + .list_streaming_job_infos() .await .map(|x| x.len()) }