Skip to content

Commit

Permalink
feat(metrics): add upstream fragment id in actor_in_record_cnt (#13966
Browse files Browse the repository at this point in the history
)
  • Loading branch information
fuyufjh authored Dec 14, 2023
1 parent cd04aaa commit 8b2b3f7
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 15 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1016,22 +1016,22 @@ def section_streaming_actors(outer_panels):
),
],
),
panels.timeseries_row(
"Actor Input Rows",
panels.timeseries_rowsps(
"Actor Input Throughput (rows/s)",
"",
[
panels.target(
f"sum(rate({metric('stream_actor_in_record_cnt')}[$__rate_interval])) by (fragment_id)",
"fragment {{fragment_id}}",
f"sum(rate({metric('stream_actor_in_record_cnt')}[$__rate_interval])) by (fragment_id, upstream_fragment_id)",
"fragment {{fragment_id}}<-{{upstream_fragment_id}}",
),
panels.target_hidden(
f"rate({metric('stream_actor_in_record_cnt')}[$__rate_interval])",
"actor {{actor_id}}",
),
],
),
panels.timeseries_row(
"Actor Output Rows",
panels.timeseries_rowsps(
"Actor Output Throughput (rows/s)",
"",
[
panels.target(
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,11 @@ impl MergeExecutor {
Message::Chunk(chunk) => {
self.metrics
.actor_in_record_cnt
.with_label_values(&[&actor_id_str, &fragment_id_str])
.with_label_values(&[
&actor_id_str,
&fragment_id_str,
&upstream_fragment_id_str,
])
.inc_by(chunk.cardinality() as _);
}
Message::Barrier(barrier) => {
Expand Down
10 changes: 5 additions & 5 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ pub struct StreamingMetrics {

// Streaming actor
pub actor_memory_usage: GenericGaugeVec<AtomicI64>,
pub actor_in_record_cnt: LabelGuardedIntCounterVec<2>,
pub actor_out_record_cnt: LabelGuardedIntCounterVec<2>,
pub actor_in_record_cnt: GenericCounterVec<AtomicU64>,
pub actor_out_record_cnt: GenericCounterVec<AtomicU64>,

// Source
pub source_output_row_count: GenericCounterVec<AtomicU64>,
Expand Down Expand Up @@ -353,15 +353,15 @@ impl StreamingMetrics {
)
.unwrap();

let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
let actor_in_record_cnt = register_int_counter_vec_with_registry!(
"stream_actor_in_record_cnt",
"Total number of rows actor received",
&["actor_id", "fragment_id"],
&["actor_id", "fragment_id", "upstream_fragment_id"],
registry
)
.unwrap();

let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
let actor_out_record_cnt = register_int_counter_vec_with_registry!(
"stream_actor_out_record_cnt",
"Total number of rows actor sent",
&["actor_id", "fragment_id"],
Expand Down
6 changes: 5 additions & 1 deletion src/stream/src/executor/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@ impl Executor for ReceiverExecutor {
Message::Chunk(chunk) => {
self.metrics
.actor_in_record_cnt
.with_label_values(&[&actor_id_str, &fragment_id_str])
.with_label_values(&[
&actor_id_str,
&fragment_id_str,
&upstream_fragment_id_str,
])
.inc_by(chunk.cardinality() as _);
}
Message::Barrier(barrier) => {
Expand Down

0 comments on commit 8b2b3f7

Please sign in to comment.