From 3d4d78989d25a17d71fb14eb46fd8d023dbbc759 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Mon, 23 Dec 2024 17:32:04 +0800 Subject: [PATCH] feat(dashboard): show current epoch and color for fragments & relations (#19829) --- dashboard/components/FragmentGraph.tsx | 118 +++++++++++++--- dashboard/components/RelationGraph.tsx | 132 ++++++++++++++---- dashboard/components/utils/backPressure.tsx | 44 ++++++ dashboard/lib/api/api.ts | 4 +- dashboard/lib/api/metric.ts | 4 +- dashboard/pages/fragment_graph.tsx | 20 ++- dashboard/pages/relation_graph.tsx | 17 ++- proto/monitor_service.proto | 12 ++ .../src/rpc/service/monitor_service.rs | 98 +++++++++++-- src/meta/src/dashboard/mod.rs | 22 +++ .../src/executor/monitor/streaming_stats.rs | 2 +- 11 files changed, 399 insertions(+), 74 deletions(-) diff --git a/dashboard/components/FragmentGraph.tsx b/dashboard/components/FragmentGraph.tsx index 1e7d80c8641c3..2001cbf79c6fd 100644 --- a/dashboard/components/FragmentGraph.tsx +++ b/dashboard/components/FragmentGraph.tsx @@ -24,8 +24,14 @@ import { layoutItem, } from "../lib/layout" import { PlanNodeDatum } from "../pages/fragment_graph" +import { FragmentStats } from "../proto/gen/monitor_service" import { StreamNode } from "../proto/gen/stream_plan" -import { backPressureColor, backPressureWidth } from "./utils/backPressure" +import { + backPressureColor, + backPressureWidth, + epochToUnixMillis, + latencyToColor, +} from "./utils/backPressure" const ReactJson = loadable(() => import("react-json-view")) @@ -95,11 +101,13 @@ export default function FragmentGraph({ fragmentDependency, selectedFragmentId, backPressures, + fragmentStats, }: { planNodeDependencies: Map> fragmentDependency: FragmentBox[] selectedFragmentId?: string backPressures?: Map + fragmentStats?: { [fragmentId: number]: FragmentStats } }) { const svgRef = useRef(null) @@ -188,6 +196,7 @@ export default function FragmentGraph({ useEffect(() => { if (fragmentLayout) { + const now_ms = Date.now() const svgNode = svgRef.current const svgSelection = d3.select(svgNode) @@ -246,13 +255,69 @@ export default function FragmentGraph({ .attr("height", ({ height }) => height - fragmentMarginY * 2) .attr("x", fragmentMarginX) .attr("y", fragmentMarginY) - .attr("fill", "white") + .attr( + "fill", + fragmentStats + ? ({ id }) => { + const fragmentId = parseInt(id) + if (isNaN(fragmentId) || !fragmentStats[fragmentId]) { + return "white" + } + let currentMs = epochToUnixMillis( + fragmentStats[fragmentId].currentEpoch + ) + return latencyToColor(now_ms - currentMs, "white") + } + : "white" + ) .attr("stroke-width", ({ id }) => (isSelected(id) ? 3 : 1)) .attr("rx", 5) .attr("stroke", ({ id }) => isSelected(id) ? theme.colors.blue[500] : theme.colors.gray[500] ) + const getTooltipContent = (id: string) => { + const fragmentId = parseInt(id) + const stats = fragmentStats?.[fragmentId] + const latencySeconds = stats + ? ((now_ms - epochToUnixMillis(stats.currentEpoch)) / 1000).toFixed( + 2 + ) + : "N/A" + const epoch = stats?.currentEpoch ?? "N/A" + + return `Fragment ${fragmentId}
Epoch: ${epoch}
Latency: ${latencySeconds} seconds` + } + + boundingBox + .on("mouseover", (event, { id }) => { + // Remove existing tooltip if any + d3.selectAll(".tooltip").remove() + + // Create new tooltip + d3.select("body") + .append("div") + .attr("class", "tooltip") + .style("position", "absolute") + .style("background", "white") + .style("padding", "10px") + .style("border", "1px solid #ddd") + .style("border-radius", "4px") + .style("pointer-events", "none") + .style("left", event.pageX + 10 + "px") + .style("top", event.pageY + 10 + "px") + .style("font-size", "12px") + .html(getTooltipContent(id)) + }) + .on("mousemove", (event) => { + d3.select(".tooltip") + .style("left", event.pageX + 10 + "px") + .style("top", event.pageY + 10 + "px") + }) + .on("mouseout", () => { + d3.selectAll(".tooltip").remove() + }) + // Stream node edges let edgeSelection = gSel.select(".edges") if (edgeSelection.empty()) { @@ -409,24 +474,39 @@ export default function FragmentGraph({ .attr("stroke-width", width) .attr("stroke", color) - // Tooltip for back pressure rate - let title = gSel.select("title") - if (title.empty()) { - title = gSel.append("title") - } - - const text = (d: Edge) => { - if (backPressures) { - let value = backPressures.get(`${d.target}_${d.source}`) - if (value) { - return `${value.toFixed(2)}%` + path + .on("mouseover", (event, d) => { + // Remove existing tooltip if any + d3.selectAll(".tooltip").remove() + + if (backPressures) { + const value = backPressures.get(`${d.target}_${d.source}`) + if (value) { + // Create new tooltip + d3.select("body") + .append("div") + .attr("class", "tooltip") + .style("position", "absolute") + .style("background", "white") + .style("padding", "10px") + .style("border", "1px solid #ddd") + .style("border-radius", "4px") + .style("pointer-events", "none") + .style("left", event.pageX + 10 + "px") + .style("top", event.pageY + 10 + "px") + .style("font-size", "12px") + .html(`BP: ${value.toFixed(2)}%`) + } } - } - - return "" - } - - title.text(text) + }) + .on("mousemove", (event) => { + d3.select(".tooltip") + .style("left", event.pageX + 10 + "px") + .style("top", event.pageY + 10 + "px") + }) + .on("mouseout", () => { + d3.selectAll(".tooltip").remove() + }) return gSel } diff --git a/dashboard/components/RelationGraph.tsx b/dashboard/components/RelationGraph.tsx index a61a6924a370e..59c87275b43ed 100644 --- a/dashboard/components/RelationGraph.tsx +++ b/dashboard/components/RelationGraph.tsx @@ -33,8 +33,14 @@ import { flipLayoutRelation, generateRelationEdges, } from "../lib/layout" +import { RelationStats } from "../proto/gen/monitor_service" import { CatalogModal, useCatalogModal } from "./CatalogModal" -import { backPressureColor, backPressureWidth } from "./utils/backPressure" +import { + backPressureColor, + backPressureWidth, + epochToUnixMillis, + latencyToColor, +} from "./utils/backPressure" function boundBox( relationPosition: RelationPointPosition[], @@ -62,11 +68,13 @@ export default function RelationGraph({ selectedId, setSelectedId, backPressures, + relationStats, }: { nodes: RelationPoint[] selectedId: string | undefined setSelectedId: (id: string) => void backPressures?: Map // relationId-relationId->back_pressure_rate}) + relationStats: { [relationId: number]: RelationStats } | undefined }) { const [modalData, setModalId] = useCatalogModal(nodes.map((n) => n.relation)) @@ -99,6 +107,7 @@ export default function RelationGraph({ const { layoutMap, width, height, links } = layoutMapCallback() useEffect(() => { + const now_ms = Date.now() const svgNode = svgRef.current const svgSelection = d3.select(svgNode) @@ -150,24 +159,39 @@ export default function RelationGraph({ isSelected(d.source) || isSelected(d.target) ? 1 : 0.5 ) - // Tooltip for back pressure rate - let title = sel.select("title") - if (title.empty()) { - title = sel.append("title") - } - - const text = (d: Edge) => { - if (backPressures) { - let value = backPressures.get(`${d.target}_${d.source}`) - if (value) { - return `${value.toFixed(2)}%` + sel + .on("mouseover", (event, d) => { + // Remove existing tooltip if any + d3.selectAll(".tooltip").remove() + + if (backPressures) { + const value = backPressures.get(`${d.target}_${d.source}`) + if (value) { + // Create new tooltip + d3.select("body") + .append("div") + .attr("class", "tooltip") + .style("position", "absolute") + .style("background", "white") + .style("padding", "10px") + .style("border", "1px solid #ddd") + .style("border-radius", "4px") + .style("pointer-events", "none") + .style("left", event.pageX + 10 + "px") + .style("top", event.pageY + 10 + "px") + .style("font-size", "12px") + .html(`BP: ${value.toFixed(2)}%`) + } } - } - - return "" - } - - title.text(text) + }) + .on("mousemove", (event) => { + d3.select(".tooltip") + .style("left", event.pageX + 10 + "px") + .style("top", event.pageY + 10 + "px") + }) + .on("mouseout", () => { + d3.selectAll(".tooltip").remove() + }) return sel } @@ -189,9 +213,19 @@ export default function RelationGraph({ circle.attr("r", nodeRadius).attr("fill", ({ id, relation }) => { const weight = relationIsStreamingJob(relation) ? "500" : "400" - return isSelected(id) + const baseColor = isSelected(id) ? theme.colors.blue[weight] : theme.colors.gray[weight] + if (relationStats) { + const relationId = parseInt(id) + if (!isNaN(relationId) && relationStats[relationId]) { + const currentMs = epochToUnixMillis( + relationStats[relationId].currentEpoch + ) + return latencyToColor(now_ms - currentMs, baseColor) + } + } + return baseColor }) // Relation name @@ -233,16 +267,50 @@ export default function RelationGraph({ .attr("font-size", 16) .attr("font-weight", "bold") - // Relation type tooltip - let typeTooltip = g.select("title") - if (typeTooltip.empty()) { - typeTooltip = g.append("title") + // Tooltip + const getTooltipContent = (relation: Relation, id: string) => { + const relationId = parseInt(id) + const stats = relationStats?.[relationId] + const latencySeconds = stats + ? ( + (Date.now() - epochToUnixMillis(stats.currentEpoch)) / + 1000 + ).toFixed(2) + : "N/A" + const epoch = stats?.currentEpoch ?? "N/A" + + return `${relation.name} (${relationTypeTitleCase( + relation + )})
Epoch: ${epoch}
Latency: ${latencySeconds} seconds` } - typeTooltip.text( - ({ relation }) => - `${relation.name} (${relationTypeTitleCase(relation)})` - ) + g.on("mouseover", (event, { relation, id }) => { + // Remove existing tooltip if any + d3.selectAll(".tooltip").remove() + + // Create new tooltip + d3.select("body") + .append("div") + .attr("class", "tooltip") + .style("position", "absolute") + .style("background", "white") + .style("padding", "10px") + .style("border", "1px solid #ddd") + .style("border-radius", "4px") + .style("pointer-events", "none") + .style("left", event.pageX + 10 + "px") + .style("top", event.pageY + 10 + "px") + .style("font-size", "12px") + .html(getTooltipContent(relation, id)) + }) + .on("mousemove", (event) => { + d3.select(".tooltip") + .style("left", event.pageX + 10 + "px") + .style("top", event.pageY + 10 + "px") + }) + .on("mouseout", () => { + d3.selectAll(".tooltip").remove() + }) // Relation modal g.style("cursor", "pointer").on("click", (_, { relation, id }) => { @@ -265,7 +333,15 @@ export default function RelationGraph({ nodeSelection.enter().call(createNode) nodeSelection.call(applyNode) nodeSelection.exit().remove() - }, [layoutMap, links, selectedId, setModalId, setSelectedId, backPressures]) + }, [ + layoutMap, + links, + selectedId, + setModalId, + setSelectedId, + backPressures, + relationStats, + ]) return ( <> diff --git a/dashboard/components/utils/backPressure.tsx b/dashboard/components/utils/backPressure.tsx index afb26e0746da4..841e64baafa82 100644 --- a/dashboard/components/utils/backPressure.tsx +++ b/dashboard/components/utils/backPressure.tsx @@ -41,3 +41,47 @@ export function backPressureWidth(value: number, scale: number) { return scale * (value / 100) + 2 } + +export function epochToUnixMillis(epoch: number) { + // UNIX_RISINGWAVE_DATE_SEC + return 1617235200000 + epoch / 65536 +} + +export function latencyToColor(latency_ms: number, baseColor: string) { + const LOWER = 10000 // 10s + const UPPER = 300000 // 5min + + const colorRange = [ + baseColor, + theme.colors.yellow["200"], + theme.colors.orange["300"], + theme.colors.red["400"], + ].map((c) => tinycolor(c)) + + if (latency_ms <= LOWER) { + return baseColor + } + + if (latency_ms >= UPPER) { + return theme.colors.red["400"] + } + + // Map log(latency) to [0,1] range between 10s and 5min + const minLog = Math.log(LOWER) + const maxLog = Math.log(UPPER) + const latencyLog = Math.log(latency_ms) + const normalizedPos = (latencyLog - minLog) / (maxLog - minLog) + + // Map to color range index + const step = colorRange.length - 1 + const pos = normalizedPos * step + const floor = Math.floor(pos) + const ceil = Math.ceil(pos) + + // Interpolate between colors + const color = tinycolor(colorRange[floor]) + .mix(tinycolor(colorRange[ceil]), (pos - floor) * 100) + .toHexString() + + return color +} diff --git a/dashboard/lib/api/api.ts b/dashboard/lib/api/api.ts index af0116e31f3bb..48af21bc4c620 100644 --- a/dashboard/lib/api/api.ts +++ b/dashboard/lib/api/api.ts @@ -26,7 +26,9 @@ export const PREDEFINED_API_ENDPOINTS = [ ] export const DEFAULT_API_ENDPOINT: string = - process.env.NODE_ENV === "production" ? PROD_API_ENDPOINT : MOCK_API_ENDPOINT // EXTERNAL_META_NODE_API_ENDPOINT to debug with RisingWave servers + process.env.NODE_ENV === "production" + ? PROD_API_ENDPOINT + : EXTERNAL_META_NODE_API_ENDPOINT // EXTERNAL_META_NODE_API_ENDPOINT to debug with RisingWave servers export const API_ENDPOINT_KEY = "risingwave.dashboard.api.endpoint" diff --git a/dashboard/lib/api/metric.ts b/dashboard/lib/api/metric.ts index f310e0572170d..23d007bb08360 100644 --- a/dashboard/lib/api/metric.ts +++ b/dashboard/lib/api/metric.ts @@ -156,9 +156,7 @@ export async function fetchEmbeddedBackPressure() { const response: GetBackPressureResponse = await api.get( "/metrics/fragment/embedded_back_pressures" ) - let backPressureInfos: BackPressureInfo[] = - response.backPressureInfos?.map(BackPressureInfo.fromJSON) ?? [] - return backPressureInfos + return response } function calculatePercentile(samples: MetricsSample[], percentile: number) { diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index 2312334e53abe..4b2a39aa5585c 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -56,7 +56,7 @@ import { } from "../lib/api/streaming" import { FragmentBox } from "../lib/layout" import { TableFragments, TableFragments_Fragment } from "../proto/gen/meta" -import { BackPressureInfo } from "../proto/gen/monitor_service" +import { BackPressureInfo, FragmentStats } from "../proto/gen/monitor_service" import { Dispatcher, MergeNode, StreamNode } from "../proto/gen/stream_plan" interface DispatcherNode { @@ -329,12 +329,18 @@ export default function Streaming() { // Didn't call `useFetch()` because the `setState` way is special. const [embeddedBackPressureInfo, setEmbeddedBackPressureInfo] = useState() + const [fragmentStats, setFragmentStats] = useState<{ + [key: number]: FragmentStats + }>() + useEffect(() => { if (backPressureDataSource === "Embedded") { - const interval = setInterval(() => { + function refresh() { fetchEmbeddedBackPressure().then( - (newBP) => { - console.log(newBP) + (response) => { + console.log(response) + let newBP = + response.backPressureInfos?.map(BackPressureInfo.fromJSON) ?? [] setEmbeddedBackPressureInfo((prev) => prev ? { @@ -355,13 +361,16 @@ export default function Streaming() { totalDurationNs: 0, } ) + setFragmentStats(response.fragmentStats) }, (e) => { console.error(e) toast(e, "error") } ) - }, INTERVAL_MS) + } + refresh() + const interval = setInterval(refresh, INTERVAL_MS) return () => { clearInterval(interval) } @@ -550,6 +559,7 @@ export default function Streaming() { fragmentDependency={fragmentDependency} planNodeDependencies={planNodeDependencies} backPressures={backPressures} + fragmentStats={fragmentStats} /> )} diff --git a/dashboard/pages/relation_graph.tsx b/dashboard/pages/relation_graph.tsx index a1061ffa23f0d..d44b29708bdc4 100644 --- a/dashboard/pages/relation_graph.tsx +++ b/dashboard/pages/relation_graph.tsx @@ -47,7 +47,7 @@ import { relationIsStreamingJob, } from "../lib/api/streaming" import { RelationPoint } from "../lib/layout" -import { BackPressureInfo } from "../proto/gen/monitor_service" +import { BackPressureInfo, RelationStats } from "../proto/gen/monitor_service" const SIDEBAR_WIDTH = "200px" const INTERVAL_MS = 5000 @@ -133,15 +133,20 @@ export default function StreamingGraph() { // Didn't call `useFetch()` because the `setState` way is special. const [embeddedBackPressureInfo, setEmbeddedBackPressureInfo] = useState() + const [relationStats, setRelationStats] = useState<{ + [key: number]: RelationStats + }>() + useEffect(() => { if (resetEmbeddedBackPressures) { setEmbeddedBackPressureInfo(undefined) toggleResetEmbeddedBackPressures() } if (backPressureDataSource === "Embedded") { - const interval = setInterval(() => { + function refresh() { fetchEmbeddedBackPressure().then( - (newBP) => { + (response) => { + let newBP = response.backPressureInfos setEmbeddedBackPressureInfo((prev) => prev ? { @@ -162,13 +167,16 @@ export default function StreamingGraph() { totalDurationNs: 0, } ) + setRelationStats(response.relationStats) }, (e) => { console.error(e) toast(e, "error") } ) - }, INTERVAL_MS) + } + refresh() + const interval = setInterval(refresh, INTERVAL_MS) return () => { clearInterval(interval) } @@ -308,6 +316,7 @@ export default function StreamingGraph() { selectedId={selectedId?.toString()} setSelectedId={(id) => setSelectedId(parseInt(id))} backPressures={backPressures} + relationStats={relationStats} /> )} diff --git a/proto/monitor_service.proto b/proto/monitor_service.proto index 772d43e5613c2..08fc97bb73f9b 100644 --- a/proto/monitor_service.proto +++ b/proto/monitor_service.proto @@ -61,8 +61,20 @@ message BackPressureInfo { double value = 4; } +message FragmentStats { + uint32 actor_count = 2; + uint64 current_epoch = 3; +} + +message RelationStats { + uint32 actor_count = 2; + uint64 current_epoch = 3; +} + message GetBackPressureResponse { repeated BackPressureInfo back_pressure_infos = 1; + map fragment_stats = 2; + map relation_stats = 3; } message TieredCacheTracingRequest { diff --git a/src/compute/src/rpc/service/monitor_service.rs b/src/compute/src/rpc/service/monitor_service.rs index 2671fd533262f..cf0b76919fb59 100644 --- a/src/compute/src/rpc/service/monitor_service.rs +++ b/src/compute/src/rpc/service/monitor_service.rs @@ -21,16 +21,18 @@ use std::time::Duration; use foyer::{HybridCache, TracingOptions}; use itertools::Itertools; use prometheus::core::Collector; +use prometheus::proto::Metric; use risingwave_common::config::{MetricLevel, ServerConfig}; use risingwave_common_heap_profiling::{AUTO_DUMP_SUFFIX, COLLAPSED_SUFFIX, MANUALLY_DUMP_SUFFIX}; use risingwave_hummock_sdk::HummockSstableObjectId; use risingwave_jni_core::jvm_runtime::dump_jvm_stack_traces; use risingwave_pb::monitor_service::monitor_service_server::MonitorService; use risingwave_pb::monitor_service::{ - AnalyzeHeapRequest, AnalyzeHeapResponse, BackPressureInfo, GetBackPressureRequest, - GetBackPressureResponse, HeapProfilingRequest, HeapProfilingResponse, ListHeapProfilingRequest, - ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, StackTraceRequest, - StackTraceResponse, TieredCacheTracingRequest, TieredCacheTracingResponse, + AnalyzeHeapRequest, AnalyzeHeapResponse, BackPressureInfo, FragmentStats, + GetBackPressureRequest, GetBackPressureResponse, HeapProfilingRequest, HeapProfilingResponse, + ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, + RelationStats, StackTraceRequest, StackTraceResponse, TieredCacheTracingRequest, + TieredCacheTracingResponse, }; use risingwave_rpc_client::error::ToTonicStatus; use risingwave_storage::hummock::compactor::await_tree_key::Compaction; @@ -309,19 +311,77 @@ impl MonitorService for MonitorServiceImpl { let actor_count: HashMap<_, _> = actor_count .iter() - .filter_map(|m| { - let fragment_id = m - .get_label() - .iter() - .find(|lp| lp.get_name() == "fragment_id")? - .get_value() - .parse::() - .unwrap(); + .map(|m| { + let fragment_id = get_label(m, "fragment_id").unwrap(); let count = m.get_gauge().get_value() as u32; - Some((fragment_id, count)) + (fragment_id, count) }) .collect(); + let mut fragment_stats: HashMap = HashMap::new(); + for (&fragment_id, &actor_count) in &actor_count { + fragment_stats.insert( + fragment_id, + FragmentStats { + actor_count, + current_epoch: 0, + }, + ); + } + + let actor_current_epoch = metrics + .actor_current_epoch + .collect() + .into_iter() + .next() + .unwrap() + .take_metric(); + for m in &actor_current_epoch { + let fragment_id = get_label(m, "fragment_id").unwrap(); + let epoch = m.get_gauge().get_value() as u64; + if let Some(s) = fragment_stats.get_mut(&fragment_id) { + s.current_epoch = if s.current_epoch == 0 { + epoch + } else { + u64::min(s.current_epoch, epoch) + } + } else { + warn!( + fragment_id = fragment_id, + "Miss corresponding actor count metrics" + ); + } + } + + let mut relation_stats: HashMap = HashMap::new(); + let mview_current_epoch = metrics + .materialize_current_epoch + .collect() + .into_iter() + .next() + .unwrap() + .take_metric(); + for m in &mview_current_epoch { + let table_id = get_label(m, "table_id").unwrap(); + let epoch = m.get_gauge().get_value() as u64; + if let Some(s) = relation_stats.get_mut(&table_id) { + s.current_epoch = if s.current_epoch == 0 { + epoch + } else { + u64::min(s.current_epoch, epoch) + }; + s.actor_count += 1; + } else { + relation_stats.insert( + table_id, + RelationStats { + actor_count: 1, + current_epoch: epoch, + }, + ); + } + } + let mut back_pressure_infos: HashMap<_, BackPressureInfo> = HashMap::new(); for label_pairs in actor_output_buffer_blocking_duration_ns { @@ -360,6 +420,8 @@ impl MonitorService for MonitorServiceImpl { Ok(Response::new(GetBackPressureResponse { back_pressure_infos: back_pressure_infos.into_values().collect(), + fragment_stats, + relation_stats, })) } @@ -436,6 +498,16 @@ impl MonitorService for MonitorServiceImpl { } } +fn get_label(metric: &Metric, label: &str) -> Option { + metric + .get_label() + .iter() + .find(|lp| lp.get_name() == label)? + .get_value() + .parse::() + .ok() +} + pub use grpc_middleware::*; pub mod grpc_middleware { diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index ed52378c700fc..22ebf74acb47e 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -49,6 +49,7 @@ pub struct DashboardService { pub type Service = Arc; pub(super) mod handlers { + use std::cmp::min; use std::collections::HashMap; use anyhow::Context; @@ -488,7 +489,28 @@ pub(super) mod handlers { let result = result .map_err(|_| anyhow!("Failed to get back pressure")) .map_err(err)?; + // TODO(eric): aggregate back_pressure_infos here all.back_pressure_infos.extend(result.back_pressure_infos); + + // Aggregate fragment_stats + for (fragment_id, fragment_stats) in result.fragment_stats { + if let Some(s) = all.fragment_stats.get_mut(&fragment_id) { + s.actor_count += fragment_stats.actor_count; + s.current_epoch = min(s.current_epoch, fragment_stats.current_epoch); + } else { + all.fragment_stats.insert(fragment_id, fragment_stats); + } + } + + // Aggregate relation_stats + for (relation_id, relation_stats) in result.relation_stats { + if let Some(s) = all.relation_stats.get_mut(&relation_id) { + s.actor_count += relation_stats.actor_count; + s.current_epoch = min(s.current_epoch, relation_stats.current_epoch); + } else { + all.relation_stats.insert(relation_id, relation_stats); + } + } } Ok(all.into()) diff --git a/src/stream/src/executor/monitor/streaming_stats.rs b/src/stream/src/executor/monitor/streaming_stats.rs index f7da06608a722..ccefd82cfd2e9 100644 --- a/src/stream/src/executor/monitor/streaming_stats.rs +++ b/src/stream/src/executor/monitor/streaming_stats.rs @@ -191,7 +191,7 @@ pub struct StreamingMetrics { materialize_cache_hit_count: RelabeledGuardedIntCounterVec<3>, materialize_cache_total_count: RelabeledGuardedIntCounterVec<3>, materialize_input_row_count: RelabeledGuardedIntCounterVec<3>, - materialize_current_epoch: RelabeledGuardedIntGaugeVec<3>, + pub materialize_current_epoch: RelabeledGuardedIntGaugeVec<3>, } pub static GLOBAL_STREAMING_METRICS: OnceLock = OnceLock::new();