Skip to content

Commit

Permalink
fix(dyn-filter): filter left changes according to watermark before writing state table (#17816)
Browse files Browse the repository at this point in the history

Signed-off-by: Richard Chien <[email protected]>
Co-authored-by: Patrick Huang <[email protected]>
  • Loading branch information
stdrc and hzxa21 authored Aug 1, 2024
1 parent 9815c01 commit 88f1c27
Show file tree
Hide file tree
Showing 17 changed files with 205 additions and 290 deletions.
2 changes: 1 addition & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ message DynamicFilterNode {
// When the condition changes, we will tell downstream to insert the LHS records which now match the condition.
// If this is false, we need to store RHS records which match the condition in the internal table.
// When the condition changes, we will tell downstream to delete the LHS records which now no longer match the condition.
bool condition_always_relax = 5;
bool condition_always_relax = 5 [deprecated = true];
}

// Delta join with two indexes. This is a pseudo plan node generated on frontend. On meta
Expand Down
7 changes: 1 addition & 6 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,7 @@ pub fn visit_stream_node_tables_inner<F>(
optional!(node.memo_table, "TemporalJoinMemo");
}
NodeBody::DynamicFilter(node) => {
if node.condition_always_relax {
always!(node.left_table, "DynamicFilterLeftNotSatisfy");
} else {
always!(node.left_table, "DynamicFilterLeft");
}

always!(node.left_table, "DynamicFilterLeft");
always!(node.right_table, "DynamicFilterRight");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,15 @@
select * from t1 where now() - interval '15 minutes' > ts;
stream_plan: |-
StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck }
└─StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true }
└─StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], cleaned_by_watermark: true }
├─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [SubtractWithTimeZone(now, '00:15:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
└─StreamNow { output: [now] }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] }
└── StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true }
├── tables: [ DynamicFilterLeftNotSatisfy: 0, DynamicFilterRight: 1 ]
└── StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], cleaned_by_watermark: true } { tables: [ DynamicFilterLeft: 0, DynamicFilterRight: 1 ] }
├── StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
│ ├── tables: [ StreamScan: 2 ]
│ ├── Upstream
Expand Down Expand Up @@ -141,16 +140,16 @@
select * from t1 where now() - interval '15 minutes' > ts;
stream_plan: |-
StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck }
└─StreamDynamicFilter [append_only] { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true }
└─StreamDynamicFilter [append_only] { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], cleaned_by_watermark: true }
├─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [SubtractWithTimeZone(now, '00:15:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
└─StreamNow { output: [now] }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] }
└── StreamDynamicFilter [append_only] { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true }
├── tables: [ DynamicFilterLeftNotSatisfy: 0, DynamicFilterRight: 1 ]
└── StreamDynamicFilter [append_only] { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], cleaned_by_watermark: true }
├── tables: [ DynamicFilterLeft: 0, DynamicFilterRight: 1 ]
├── StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
│ ├── tables: [ StreamScan: 2 ]
│ ├── Upstream
Expand Down Expand Up @@ -184,7 +183,7 @@
stream_plan: |-
StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck, watermark_columns: [ts] }
└─StreamDynamicFilter { predicate: (t1.ts >= $expr2), output_watermarks: [t1.ts], output: [t1.ts, t1._row_id], cleaned_by_watermark: true }
├─StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true }
├─StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], cleaned_by_watermark: true }
│ ├─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
│ └─StreamExchange { dist: Broadcast }
│ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
Expand All @@ -198,8 +197,7 @@
├── tables: [ Materialize: 4294967294 ]
└── StreamDynamicFilter { predicate: (t1.ts >= $expr2), output_watermarks: [t1.ts], output: [t1.ts, t1._row_id], cleaned_by_watermark: true }
├── tables: [ DynamicFilterLeft: 0, DynamicFilterRight: 1 ]
├── StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true }
│ ├── tables: [ DynamicFilterLeftNotSatisfy: 2, DynamicFilterRight: 3 ]
├── StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], cleaned_by_watermark: true } { tables: [ DynamicFilterLeft: 2, DynamicFilterRight: 3 ] }
│ ├── StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
│ │ ├── tables: [ StreamScan: 4 ]
│ │ ├── Upstream
Expand Down Expand Up @@ -248,7 +246,7 @@
└─StreamHashJoin { type: Inner, predicate: t1.a = t2.b, output: [t1.a, t1.ta, t2.b, t2.tb, t1._row_id, t2._row_id] }
├─StreamExchange { dist: HashShard(t1.a) }
│ └─StreamDynamicFilter { predicate: (t1.ta >= $expr2), output_watermarks: [t1.ta], output: [t1.a, t1.ta, t1._row_id], cleaned_by_watermark: true }
│ ├─StreamDynamicFilter { predicate: (t1.ta < $expr1), output: [t1.a, t1.ta, t1._row_id], condition_always_relax: true }
│ ├─StreamDynamicFilter { predicate: (t1.ta < $expr1), output: [t1.a, t1.ta, t1._row_id], cleaned_by_watermark: true }
│ │ ├─StreamTableScan { table: t1, columns: [t1.a, t1.ta, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
│ │ └─StreamExchange { dist: Broadcast }
│ │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
Expand Down Expand Up @@ -279,7 +277,7 @@
│ └─StreamTableScan { table: t2, columns: [t2.b, t2.tb, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) }
└─StreamExchange { dist: HashShard(t1.a) }
└─StreamDynamicFilter { predicate: (t1.ta >= $expr2), output_watermarks: [t1.ta], output: [t1.a, t1.ta, t1._row_id], cleaned_by_watermark: true }
├─StreamDynamicFilter { predicate: (t1.ta < $expr1), output: [t1.a, t1.ta, t1._row_id], condition_always_relax: true }
├─StreamDynamicFilter { predicate: (t1.ta < $expr1), output: [t1.a, t1.ta, t1._row_id], cleaned_by_watermark: true }
│ ├─StreamTableScan { table: t1, columns: [t1.a, t1.ta, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
│ └─StreamExchange { dist: Broadcast }
│ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
Expand Down Expand Up @@ -308,7 +306,7 @@
│ └─StreamTableScan { table: t1, columns: [t1.a, t1.ta, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.b) }
└─StreamDynamicFilter { predicate: (t2.tb >= $expr2), output_watermarks: [t2.tb], output: [t2.b, t2.tb, t2._row_id], cleaned_by_watermark: true }
├─StreamDynamicFilter { predicate: (t2.tb < $expr1), output: [t2.b, t2.tb, t2._row_id], condition_always_relax: true }
├─StreamDynamicFilter { predicate: (t2.tb < $expr1), output: [t2.b, t2.tb, t2._row_id], cleaned_by_watermark: true }
│ ├─StreamTableScan { table: t2, columns: [t2.b, t2.tb, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) }
│ └─StreamExchange { dist: Broadcast }
│ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
Expand Down Expand Up @@ -416,7 +414,7 @@
└─StreamUnion { all: true }
├─StreamExchange { dist: HashShard(t._row_id, $src, 0:Int32) }
│ └─StreamProject { exprs: [t.t, t.a, $src, t._row_id, 0:Int32] }
│ └─StreamDynamicFilter { predicate: (t.t < $expr2), output: [t.t, t.a, t._row_id, $src], condition_always_relax: true }
│ └─StreamDynamicFilter { predicate: (t.t < $expr2), output: [t.t, t.a, t._row_id, $src], cleaned_by_watermark: true }
│ ├─StreamFilter { predicate: Not((t.a > 1:Int32)) }
│ │ └─StreamShare { id: 13 }
│ │ └─StreamUnion { all: true }
Expand Down
19 changes: 9 additions & 10 deletions src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ pub struct StreamDynamicFilter {
pub base: PlanBase<Stream>,
core: generic::DynamicFilter<PlanRef>,
cleaned_by_watermark: bool,
condition_always_relax: bool,
}

impl StreamDynamicFilter {
Expand Down Expand Up @@ -67,7 +66,6 @@ impl StreamDynamicFilter {
base,
core,
cleaned_by_watermark,
condition_always_relax,
}
}

Expand Down Expand Up @@ -100,7 +98,13 @@ impl StreamDynamicFilter {
// downstream. See `derive_watermark_columns`.
true
}
_ => false,
ExprType::LessThan | ExprType::LessThanOrEqual => {
// For <= and <, watermark on rhs means all rows older than the watermark should already be emitted,
// and future lhs inputs should be directly passed to downstream without any state table operation.
// So, the state table can be cleaned up.
true
}
_ => unreachable!(),
}
} else {
false
Expand All @@ -124,12 +128,6 @@ impl Distill for StreamDynamicFilter {
Pretty::display(&self.cleaned_by_watermark),
));
}
if self.condition_always_relax {
vec.push((
"condition_always_relax",
Pretty::display(&self.condition_always_relax),
));
}
childless_record(
plan_node_name!(
"StreamDynamicFilter",
Expand Down Expand Up @@ -172,12 +170,13 @@ impl StreamNode for StreamDynamicFilter {
let right = self.right();
let right_table = infer_right_internal_table_catalog(right.plan_base())
.with_id(state.gen_table_id_wrapped());
#[allow(deprecated)]
NodeBody::DynamicFilter(DynamicFilterNode {
left_key: left_index as u32,
condition,
left_table: Some(left_table.to_internal_table_prost()),
right_table: Some(right_table.to_internal_table_prost()),
condition_always_relax: self.condition_always_relax,
condition_always_relax: false, // deprecated
})
}
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ pub trait LocalStateStore: StaticSendSync {
read_options: ReadOptions,
) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_;

/// Gets the last persisted watermark for the given vnode.
fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;

/// Inserts a key-value entry associated with a given `epoch` into the state store.
Expand Down
1 change: 0 additions & 1 deletion src/stream/src/common/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

pub mod state_table;
mod state_table_cache;
mod watermark;

#[cfg(test)]
pub mod test_state_table;
Expand Down
Loading

0 comments on commit 88f1c27

Please sign in to comment.