Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(optimizer): runtime check for scalar subquery in batch queries #13880

Merged
merged 18 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions e2e_test/batch/subquery/scalar_subquery.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
statement ok
set RW_IMPLICIT_FLUSH to true;

statement ok
create table t (x int);

query II
select (select x from t) x, 1 one;
----
NULL 1

statement ok
insert into t values (114514);

query II
select (select x from t) x, 1 one;
----
114514 1

# Cannot create materialized view as the cardinality of the subquery is unknown
statement error Scalar subquery might produce more than one row
create materialized view mv as select (select x from t) x, 1 one;

statement ok
insert into t values (1919810);

# Cannot query as the cardinality of the subquery is now 2
query error Scalar subquery produced more than one row
select (select x from t) x, 1 one;

statement ok
drop table t;
9 changes: 6 additions & 3 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,9 @@
repeated common.ColumnOrder order_by = 3;
}

message MaxOneRowNode {}

message PlanNode {

Check failure on line 285 in proto/batch_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "37" with name "sort_over_window" on message "PlanNode" was deleted without reserving the number "37".
repeated PlanNode children = 1;
reserved 22;
reserved "sort_merge_join";
Expand Down Expand Up @@ -311,10 +313,11 @@
GroupTopNNode group_top_n = 32;
DistributedLookupJoinNode distributed_lookup_join = 33;
SourceNode source = 34;
SortOverWindowNode sort_over_window = 37;
SortOverWindowNode sort_over_window = 35;

Check failure on line 316 in proto/batch_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "35" on message "PlanNode" changed type from "bool" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules.
BugenZhao marked this conversation as resolved.
Show resolved Hide resolved
MaxOneRowNode max_one_row = 36;

Check failure on line 317 in proto/batch_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "36" on message "PlanNode" changed type from "bool" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules.
// The following nodes are used for testing.
bool block_executor = 35;
bool busy_loop_executor = 36;
bool block_executor = 100;
bool busy_loop_executor = 101;
}
string identity = 24;
}
Expand Down
81 changes: 81 additions & 0 deletions src/batch/src/executor/max_one_row.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use futures_async_stream::try_stream;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::{BatchError, Result};
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
use crate::task::BatchTaskContext;

pub struct MaxOneRowExecutor {
child: BoxedExecutor,

/// Identity string of the executor
identity: String,
}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for MaxOneRowExecutor {
async fn new_boxed_executor<C: BatchTaskContext>(
source: &ExecutorBuilder<'_, C>,
inputs: Vec<BoxedExecutor>,
) -> Result<BoxedExecutor> {
let [child]: [_; 1] = inputs.try_into().unwrap();

let _node = try_match_expand!(
source.plan_node().get_node_body().unwrap(),
NodeBody::MaxOneRow
)?;

Ok(Box::new(Self {
child,
identity: source.plan_node().get_identity().clone(),
}))
}
}

impl Executor for MaxOneRowExecutor {
fn schema(&self) -> &Schema {
self.child.schema()
}

fn identity(&self) -> &str {
&self.identity
}

#[try_stream(boxed, ok = DataChunk, error = BatchError)]
async fn execute(self: Box<Self>) {
let data_types = self.child.schema().data_types();
let mut any = false;

#[for_await]
for chunk in self.child.execute() {
let chunk = chunk?;
for row in chunk.rows() {
if any {
// `MaxOneRow` is currently only used for the runtime check of
// scalar subqueries, so we raise a precise error here.
bail!("Scalar subquery produced more than one row.");
} else {
any = true;
let one_row_chunk = DataChunk::from_rows(&[row], &data_types);
yield one_row_chunk;
}
}
}
}
BugenZhao marked this conversation as resolved.
Show resolved Hide resolved
}
3 changes: 3 additions & 0 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod insert;
mod join;
mod limit;
mod managed;
mod max_one_row;
mod merge_sort_exchange;
mod order_by;
mod project;
Expand Down Expand Up @@ -55,6 +56,7 @@ pub use insert::*;
pub use join::*;
pub use limit::*;
pub use managed::*;
pub use max_one_row::*;
pub use merge_sort_exchange::*;
pub use order_by::*;
pub use project::*;
Expand Down Expand Up @@ -231,6 +233,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
NodeBody::Union => UnionExecutor,
NodeBody::Source => SourceExecutor,
NodeBody::SortOverWindow => SortOverWindowExecutor,
NodeBody::MaxOneRow => MaxOneRowExecutor,
// Follow NodeBody only used for test
NodeBody::BlockExecutor => BlockExecutorBuidler,
NodeBody::BusyLoopExecutor => BusyLoopExecutorBuidler,
Expand Down
7 changes: 4 additions & 3 deletions src/frontend/planner_test/tests/testdata/input/subquery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,10 @@
expected_outputs:
- stream_error
- sql: |
SELECT 1, (SELECT regexp_matches('barbeque barbeque', '(bar)(beque)', 'g'))
SELECT 1 a, (SELECT regexp_matches('barbeque barbeque', '(bar)(beque)', 'g')) b
expected_outputs:
- batch_error
- optimized_logical_plan_for_batch
- stream_error
- sql: |
create table t1 (a int, b int);
select a, (select count(*) from t1 where t1.a <> t.b) from t1 as t order by 1;
Expand Down Expand Up @@ -305,4 +306,4 @@
sql: |
select Array(select 1 union select 2);
expected_outputs:
- batch_plan
- batch_plan
23 changes: 14 additions & 9 deletions src/frontend/planner_test/tests/testdata/input/subquery_expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
- optimized_logical_plan_for_batch
- sql: |
create table t(x int);
select (select x from t), 1 from t;
select (select x from t) a, 1 b from t;
expected_outputs:
- logical_plan
- optimizer_error
- optimized_logical_plan_for_batch
- stream_error
- sql: |
create table t(x int);
select (select x from t limit 1), 1 from t;
Expand All @@ -23,28 +24,32 @@
- optimized_logical_plan_for_batch
- sql: |
create table t(x int);
select (select x from t order by x fetch next 1 rows with ties), 1 from t;
select (select x from t order by x fetch next 1 rows with ties) a, 1 b from t;
expected_outputs:
- logical_plan
- optimizer_error
- optimized_logical_plan_for_batch
- stream_error
- sql: |
create table t(x int);
select (select x from t) + 1 from t;
select (select x from t) + 1 a from t;
expected_outputs:
- optimizer_error
- logical_plan
- optimized_logical_plan_for_batch
- stream_error
- sql: |
create table t(x int);
select (select x from t), (select 1);
select (select x from t) a, (select 1) b;
expected_outputs:
- logical_plan
- optimizer_error
- optimized_logical_plan_for_batch
- stream_error
- sql: |
create table t(x int);
select x + (select x + (select x as v1 from t) as v2 from t) as v3 from t;
expected_outputs:
- logical_plan
- optimizer_error
- optimized_logical_plan_for_batch
- stream_error
- sql: |
create table t1 (x int, y int);
create table t2 (x int, y int);
Expand Down
12 changes: 10 additions & 2 deletions src/frontend/planner_test/tests/testdata/output/subquery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,16 @@
HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible.
See also: https://github.com/risingwavelabs/rfcs/blob/main/rfcs/0033-dynamic-filter.md
- sql: |
SELECT 1, (SELECT regexp_matches('barbeque barbeque', '(bar)(beque)', 'g'))
batch_error: 'internal error: Scalar subquery might produce more than one row.'
SELECT 1 a, (SELECT regexp_matches('barbeque barbeque', '(bar)(beque)', 'g')) b
optimized_logical_plan_for_batch: |-
LogicalProject { exprs: [1:Int32, RegexpMatches('barbeque barbeque':Varchar, '(bar)(beque)':Varchar, 'g':Varchar)] }
└─LogicalJoin { type: LeftOuter, on: true, output: all }
├─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
└─LogicalMaxOneRow
└─LogicalProject { exprs: [RegexpMatches('barbeque barbeque':Varchar, '(bar)(beque)':Varchar, 'g':Varchar)] }
└─LogicalProjectSet { select_list: [RegexpMatches('barbeque barbeque':Varchar, '(bar)(beque)':Varchar, 'g':Varchar)] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
stream_error: Scalar subquery might produce more than one row.
- sql: |
create table t1 (a int, b int);
select a, (select count(*) from t1 where t1.a <> t.b) from t1 as t order by 1;
Expand Down
54 changes: 45 additions & 9 deletions src/frontend/planner_test/tests/testdata/output/subquery_expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,20 @@
└─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [1:Int32:Int32] } }
- sql: |
create table t(x int);
select (select x from t), 1 from t;
select (select x from t) a, 1 b from t;
logical_plan: |-
LogicalProject { exprs: [t.x, 1:Int32] }
└─LogicalApply { type: LeftOuter, on: true, correlated_id: 1, max_one_row: true }
├─LogicalScan { table: t, columns: [t.x, t._row_id] }
└─LogicalProject { exprs: [t.x] }
└─LogicalScan { table: t, columns: [t.x, t._row_id] }
optimizer_error: 'internal error: Scalar subquery might produce more than one row.'
optimized_logical_plan_for_batch: |-
LogicalProject { exprs: [t.x, 1:Int32] }
└─LogicalJoin { type: LeftOuter, on: true, output: all }
├─LogicalScan { table: t, columns: [] }
└─LogicalMaxOneRow
└─LogicalScan { table: t, columns: [t.x] }
stream_error: Scalar subquery might produce more than one row.
- sql: |
create table t(x int);
select (select x from t limit 1), 1 from t;
Expand Down Expand Up @@ -55,28 +61,41 @@
└─LogicalScan { table: t, columns: [t.x] }
- sql: |
create table t(x int);
select (select x from t order by x fetch next 1 rows with ties), 1 from t;
select (select x from t order by x fetch next 1 rows with ties) a, 1 b from t;
logical_plan: |-
LogicalProject { exprs: [t.x, 1:Int32] }
└─LogicalApply { type: LeftOuter, on: true, correlated_id: 1, max_one_row: true }
├─LogicalScan { table: t, columns: [t.x, t._row_id] }
└─LogicalTopN { order: [t.x ASC], limit: 1, offset: 0, with_ties: true }
└─LogicalProject { exprs: [t.x] }
└─LogicalScan { table: t, columns: [t.x, t._row_id] }
optimizer_error: 'internal error: Scalar subquery might produce more than one row.'
optimized_logical_plan_for_batch: |-
LogicalProject { exprs: [t.x, 1:Int32] }
└─LogicalJoin { type: LeftOuter, on: true, output: all }
├─LogicalScan { table: t, columns: [] }
└─LogicalMaxOneRow
└─LogicalTopN { order: [t.x ASC], limit: 1, offset: 0, with_ties: true }
└─LogicalScan { table: t, columns: [t.x] }
stream_error: Scalar subquery might produce more than one row.
- sql: |
create table t(x int);
select (select x from t) + 1 from t;
select (select x from t) + 1 a from t;
logical_plan: |-
LogicalProject { exprs: [(t.x + 1:Int32) as $expr1] }
└─LogicalApply { type: LeftOuter, on: true, correlated_id: 1, max_one_row: true }
├─LogicalScan { table: t, columns: [t.x, t._row_id] }
└─LogicalProject { exprs: [t.x] }
└─LogicalScan { table: t, columns: [t.x, t._row_id] }
optimizer_error: 'internal error: Scalar subquery might produce more than one row.'
optimized_logical_plan_for_batch: |-
LogicalProject { exprs: [(t.x + 1:Int32) as $expr1] }
└─LogicalJoin { type: LeftOuter, on: true, output: all }
├─LogicalScan { table: t, columns: [] }
└─LogicalMaxOneRow
└─LogicalScan { table: t, columns: [t.x] }
stream_error: Scalar subquery might produce more than one row.
- sql: |
create table t(x int);
select (select x from t), (select 1);
select (select x from t) a, (select 1) b;
logical_plan: |-
LogicalProject { exprs: [t.x, 1:Int32] }
└─LogicalApply { type: LeftOuter, on: true, correlated_id: 2, max_one_row: true }
Expand All @@ -86,7 +105,14 @@
│ └─LogicalScan { table: t, columns: [t.x, t._row_id] }
└─LogicalProject { exprs: [1:Int32] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
optimizer_error: 'internal error: Scalar subquery might produce more than one row.'
optimized_logical_plan_for_batch: |-
LogicalJoin { type: LeftOuter, on: true, output: all }
├─LogicalJoin { type: LeftOuter, on: true, output: all }
│ ├─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
│ └─LogicalMaxOneRow
│ └─LogicalScan { table: t, columns: [t.x] }
└─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [1:Int32:Int32] } }
stream_error: Scalar subquery might produce more than one row.
- sql: |
create table t(x int);
select x + (select x + (select x as v1 from t) as v2 from t) as v3 from t;
Expand All @@ -99,7 +125,17 @@
├─LogicalScan { table: t, columns: [t.x, t._row_id] }
└─LogicalProject { exprs: [t.x] }
└─LogicalScan { table: t, columns: [t.x, t._row_id] }
optimizer_error: 'internal error: Scalar subquery might produce more than one row.'
optimized_logical_plan_for_batch: |-
LogicalProject { exprs: [(t.x + $expr1) as $expr2] }
└─LogicalJoin { type: LeftOuter, on: true, output: all }
├─LogicalScan { table: t, columns: [t.x] }
└─LogicalMaxOneRow
└─LogicalProject { exprs: [(t.x + t.x) as $expr1] }
└─LogicalJoin { type: LeftOuter, on: true, output: all }
├─LogicalScan { table: t, columns: [t.x] }
└─LogicalMaxOneRow
└─LogicalScan { table: t, columns: [t.x] }
stream_error: Scalar subquery might produce more than one row.
- sql: |
create table t1 (x int, y int);
create table t2 (x int, y int);
Expand Down
Loading
Loading