feat(cdc): share a changelog stream for multiple cdc tables #12535

Merged · 69 commits · Oct 31, 2023

Commits (the diff below shows changes from 8 of the 69 commits)
5c1574b
modify create_source to create a source with debezium json schema
StrikeW Sep 5, 2023
9e2fa37
try create a source stream job
StrikeW Sep 7, 2023
7b2fe4f
minor
StrikeW Sep 7, 2023
2b2ff29
Merge remote-tracking branch 'origin/main' into siyuan/cdc-backfill-f…
StrikeW Sep 7, 2023
bebd823
we can create a source stream job
StrikeW Sep 7, 2023
f49c57b
fix source state table lost in frontend
StrikeW Sep 8, 2023
bbe3615
table w/o Source has a dumb `SourceNode` in the fragment, need to che…
StrikeW Sep 8, 2023
bde58ca
Merge remote-tracking branch 'origin/main' into siyuan/cdc-backfill-f…
StrikeW Sep 8, 2023
91b407b
finish create_source work
StrikeW Sep 8, 2023
5d36ee9
finish drop source
StrikeW Sep 9, 2023
fef1687
frontend: CREATE TABLE ... FROM cdc_source
StrikeW Sep 17, 2023
98431d1
Merge remote-tracking branch 'origin/main' into siyuan/cdc-backfill-f…
StrikeW Sep 17, 2023
dc037a2
save work: transform debezium chunk to table chunk in the merge executor
StrikeW Sep 19, 2023
83769ca
save work: transform upstream chunk to table chunk in merger
StrikeW Sep 20, 2023
9d7e16b
save work: finished convert upstream chunk into cdc table chunk in me…
StrikeW Sep 20, 2023
7d684b9
move upstream-chunk transform from merge exec to cdc backfill exec
StrikeW Sep 21, 2023
8ac91c1
save work: may pass connector properties via the first barrier
StrikeW Sep 22, 2023
f42d3db
WIP: test e2e process without persist state of backfill
StrikeW Sep 24, 2023
f400b52
Revert "refactor(connector): migrate cdc source metric from connector…
StrikeW Sep 24, 2023
4a26072
Revert "feat(connector): init embedded connector node (#12122)"
StrikeW Sep 24, 2023
fb7b1d1
Revert "refactor(connector): replace validate source rpc with jni (#1…
StrikeW Sep 24, 2023
7405cdd
fix chain & fragment graph
StrikeW Sep 25, 2023
6d3cc13
save work: set is_cdc_table will cause error on with_upstream
StrikeW Sep 25, 2023
6f6395e
save work: cannot receive the first barrier in cdc backfill
StrikeW Sep 26, 2023
7a27653
WIP: cdc backfill can get the first barrier now
StrikeW Sep 26, 2023
6130828
WIP: mview exec of the table job can get the chunk
StrikeW Sep 26, 2023
8a2e937
add TableJobSubType to distinguish different table job
StrikeW Sep 26, 2023
ea99562
derive cdc connect properties and save to TableDesc
StrikeW Sep 28, 2023
025498c
refactor cdc backfill state management
StrikeW Sep 28, 2023
556f13a
fix chain progress update
StrikeW Sep 30, 2023
af9ccb8
extend fragment mgr to support upstream source fragment
StrikeW Sep 30, 2023
5ca8af7
clean code
StrikeW Sep 30, 2023
3c7fd1b
Revert "Revert "feat(connector): init embedded connector node (#12122)""
StrikeW Oct 6, 2023
da87860
Revert "Revert "refactor(connector): migrate cdc source metric from c…
StrikeW Oct 6, 2023
12d9389
Merge remote-tracking branch 'origin/main' into siyuan/cdc-backfill-f…
StrikeW Oct 6, 2023
2108362
Revert "Revert "refactor(connector): replace validate source rpc with…
StrikeW Oct 6, 2023
d787a3d
fix merge
StrikeW Oct 6, 2023
ed65ddc
fix fmt
StrikeW Oct 6, 2023
fa45b93
fix unit test
StrikeW Oct 6, 2023
1ef5079
fix prepare stream job
StrikeW Oct 7, 2023
1ba1a80
fix create source for cdc job
StrikeW Oct 7, 2023
f998dba
minor
StrikeW Oct 7, 2023
263eac8
minor
StrikeW Oct 7, 2023
43c7603
add e2e test
StrikeW Oct 9, 2023
bc77a2b
Merge remote-tracking branch 'origin/main' into siyuan/cdc-backfill-f…
StrikeW Oct 9, 2023
fe2c526
fix e2e test
StrikeW Oct 10, 2023
efa436c
Merge remote-tracking branch 'origin/main' into siyuan/cdc-backfill-f…
StrikeW Oct 17, 2023
e5fbe05
fix source job recovery
StrikeW Oct 19, 2023
2d3b140
introduce CdcTablename dispatcher type
StrikeW Oct 20, 2023
3381fa7
fix check
StrikeW Oct 20, 2023
b06b2d4
introduce CdcTableDesc
StrikeW Oct 22, 2023
5dd4a19
Merge remote-tracking branch 'origin/main' into siyuan/cdc-backfill-f…
StrikeW Oct 23, 2023
230aa8f
fix dispatcher
StrikeW Oct 23, 2023
c278abe
minor refine cdc source validate
StrikeW Oct 23, 2023
be00749
Merge remote-tracking branch 'origin/main' into siyuan/cdc-backfill-f…
StrikeW Oct 24, 2023
62db882
address comments part1
StrikeW Oct 25, 2023
31cd2f9
fix cdc backfill executor schema
StrikeW Oct 26, 2023
e91da88
address comment part2
StrikeW Oct 26, 2023
5938748
minor
StrikeW Oct 26, 2023
e4b65a8
Merge remote-tracking branch 'origin/main' into siyuan/cdc-backfill-f…
StrikeW Oct 26, 2023
de01e67
address PR comments part2
StrikeW Oct 31, 2023
90eb775
fix single table cdc backfill
StrikeW Oct 31, 2023
b8d2840
Merge remote-tracking branch 'origin/main' into siyuan/cdc-backfill-f…
StrikeW Oct 31, 2023
9edbb35
Add schedule rule for DispatcherType::CdcTablename
StrikeW Oct 31, 2023
f8a973a
update test
StrikeW Oct 31, 2023
f144e42
use btreemap in proto
StrikeW Oct 31, 2023
f540c36
Merge remote-tracking branch 'origin/main' into siyuan/cdc-backfill-f…
StrikeW Oct 31, 2023
15b6f67
minor
StrikeW Oct 31, 2023
45eaf03
add comments
StrikeW Oct 31, 2023
6 changes: 4 additions & 2 deletions proto/ddl_service.proto
@@ -141,8 +141,10 @@ message DropViewResponse {

enum TableJobType {
UNSPECIFIED = 0;
// table streaming job w/ a shared upstream CDC source job
SHARED_CDC_SOURCE = 1;
// table streaming jobs except the `SHARED_CDC_SOURCE` type
GENERAL = 1;
// table streaming job sharing a CDC source job
SHARED_CDC_SOURCE = 2;
}

message CreateTableRequest {
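For orientation, prost generates a Rust enum from this proto definition; a consumer of the new variant might branch on it roughly as below (a minimal sketch against the generated `risingwave_pb::ddl_service` types, not the actual meta-node logic):

```rust
// Sketch only: prost turns `TableJobType` into a plain Rust enum.
use risingwave_pb::ddl_service::TableJobType;

fn describe(job_type: TableJobType) -> &'static str {
    match job_type {
        // table backfills from a shared upstream CDC source job
        TableJobType::SharedCdcSource => "shared-cdc-source table job",
        // every other table streaming job
        TableJobType::General => "general table job",
        TableJobType::Unspecified => "unspecified (treated as general)",
    }
}
```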
3 changes: 1 addition & 2 deletions proto/plan_common.proto
@@ -83,8 +83,7 @@ message ExternalTableDesc {
repeated common.ColumnOrder pk = 3;
string table_name = 4;
repeated uint32 stream_key = 5;
// use string as a work around until https://github.com/madsim-rs/madsim/issues/169 finished
string connect_properties = 6;
map<string, string> connect_properties = 6;
}

enum JoinType {
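Replacing the JSON-string workaround with a proto `map` pairs with the later commit "use btreemap in proto": prost can be told to generate `BTreeMap` for map fields so iteration order is deterministic. A hypothetical `build.rs` sketch (paths and setup assumed; the repo's real prost configuration differs):

```rust
// build.rs sketch: make prost generate BTreeMap<String, String> for every
// proto `map` field, so fields like `connect_properties` iterate in a
// deterministic order.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    prost_build::Config::new()
        .btree_map(["."]) // "." applies the setting to all map fields
        .compile_protos(&["proto/plan_common.proto"], &["proto/"])?;
    Ok(())
}
```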
5 changes: 2 additions & 3 deletions src/common/src/catalog/external_table.rs
@@ -23,7 +23,7 @@ use crate::util::sort_util::ColumnOrder;
/// Compute node will use this information to connect to the external database and scan the table.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct CdcTableDesc {
/// Id of the table, to find in storage.
/// Id of the upstream source in CDC sharing mode
pub table_id: TableId,

/// The full name of the table in the external database, e.g. `database_name.table_name` in MySQL
@@ -62,8 +62,7 @@ impl CdcTableDesc {
pk: self.pk.iter().map(|v| v.to_protobuf()).collect(),
table_name: self.external_table_name.clone(),
stream_key: self.stream_key.iter().map(|k| *k as _).collect(),
connect_properties: serde_json::to_string(&self.connect_properties)
.expect("failed to serialize connect_properties"),
connect_properties: self.connect_properties.clone(),
}
}

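For a concrete picture of what `connect_properties` now carries, a MySQL CDC table's map looks roughly like the following; only the key names (`database.name`, `table.name`, matching the constants referenced elsewhere in this PR) are taken from the source, and the values are invented:

```rust
use std::collections::BTreeMap;

fn main() {
    // Hypothetical values; only the key names mirror the PR's constants.
    let connect_properties: BTreeMap<String, String> = BTreeMap::from([
        ("connector".to_string(), "mysql-cdc".to_string()),
        ("hostname".to_string(), "127.0.0.1".to_string()),
        ("port".to_string(), "3306".to_string()),
        ("database.name".to_string(), "mydb".to_string()),
        ("table.name".to_string(), "orders".to_string()),
    ]);
    assert_eq!(
        connect_properties.get("connector").map(String::as_str),
        Some("mysql-cdc")
    );
}
```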
2 changes: 1 addition & 1 deletion src/common/src/util/column_index_mapping.rs
@@ -335,7 +335,7 @@ impl ColIndexMapping {
r#type: strategy.r#type,
dist_key_indices: map(&strategy.dist_key_indices)?,
output_indices: map(&strategy.output_indices)?,
downstream_table_name: None,
downstream_table_name: strategy.downstream_table_name.clone(),
})
}
}
23 changes: 15 additions & 8 deletions src/connector/src/parser/mod.rs
@@ -168,15 +168,22 @@ impl MessageMeta<'_> {
// Extract the offset from the metadata.
SourceColumnType::Offset => Datum::Some(self.offset.into()).into(),
// Extract custom metadata per connector.
SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = &self.meta => {
assert_eq!(desc.name.as_str(), KAFKA_TIMESTAMP_COLUMN_NAME, "unexpected kafka meta column name");
kafka_meta.timestamp.map(|ts| {
risingwave_common::cast::i64_to_timestamptz(ts)
.unwrap()
.to_scalar_value()
}).into()
SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.meta => {
assert_eq!(
desc.name.as_str(),
KAFKA_TIMESTAMP_COLUMN_NAME,
"unexpected kafka meta column name"
);
kafka_meta
.timestamp
.map(|ts| {
risingwave_common::cast::i64_to_timestamptz(ts)
.unwrap()
.to_scalar_value()
})
.into()
},
SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = &self.meta => {
SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.meta => {
assert_eq!(desc.name.as_str(), TABLE_NAME_COLUMN_NAME, "unexpected cdc meta column name");
Datum::Some(cdc_meta.full_table_name.as_str().into()).into()
}
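The table-name meta column extracted here is what makes the shared changelog usable by multiple downstream tables: each CDC backfill executor keeps only the rows whose upstream table matches its own. A standalone sketch of that routing idea (illustrative types, not the executor implementation):

```rust
use std::collections::HashMap;

// Sketch only: route events from one shared changelog stream into
// per-table buckets using the table-name meta column.
#[derive(Debug)]
struct CdcEvent {
    full_table_name: String, // e.g. "mydb.orders" (illustrative)
    payload: String,         // Debezium JSON in the real system
}

fn route_by_table(events: Vec<CdcEvent>) -> HashMap<String, Vec<CdcEvent>> {
    let mut by_table: HashMap<String, Vec<CdcEvent>> = HashMap::new();
    for ev in events {
        by_table.entry(ev.full_table_name.clone()).or_default().push(ev);
    }
    by_table
}

fn main() {
    let events = vec![
        CdcEvent { full_table_name: "mydb.orders".into(), payload: "{...}".into() },
        CdcEvent { full_table_name: "mydb.users".into(), payload: "{...}".into() },
    ];
    let routed = route_by_table(events);
    assert_eq!(routed["mydb.orders"].len(), 1);
}
```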
61 changes: 23 additions & 38 deletions src/frontend/src/handler/create_source.rs
@@ -291,7 +291,7 @@ pub(crate) async fn bind_columns_from_source(
source_schema: &ConnectorSchema,
with_properties: &HashMap<String, String>,
create_cdc_source_job: bool,
) -> Result<(Option<Vec<ColumnCatalog>>, Vec<String>, StreamSourceInfo)> {
) -> Result<(Option<Vec<ColumnCatalog>>, StreamSourceInfo)> {
const MESSAGE_NAME_KEY: &str = "message";
const KEY_MESSAGE_NAME_KEY: &str = "key.message";
const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy";
@@ -358,32 +358,19 @@
)
}
(Format::Plain, Encode::Json) => {
let (resolved_columns, use_schema_registry) = if !create_cdc_source_job {
let schema_config = get_json_schema_location(&mut options)?;
if schema_config.is_some() && sql_defined_schema {
return Err(RwError::from(ProtocolError(
"User-defined schema is not allowed with schema registry.".to_string(),
)));
}
if schema_config.is_none() && sql_defined_columns.is_empty() {
return Err(RwError::from(InvalidInputSyntax(
"schema definition is required for ENCODE JSON".to_owned(),
)));
}
(
extract_json_table_schema(&schema_config, with_properties).await?,
json_schema_infer_use_schema_registry(&schema_config),
)
let schema_config = get_json_schema_location(&mut options)?;
let columns = if create_cdc_source_job {
Some(debezium_cdc_source_schema())
} else {
(Some(debezium_cdc_source_schema()), false)
extract_json_table_schema(&schema_config, with_properties).await?
};

(
resolved_columns,
sql_defined_pk_names,
columns,
StreamSourceInfo {
format: FormatType::Plain as i32,
row_encode: EncodeType::Json as i32,
use_schema_registry,
use_schema_registry: json_schema_infer_use_schema_registry(&schema_config),
cdc_source_job: create_cdc_source_job,
..Default::default()
},
@@ -1132,22 +1119,9 @@ pub async fn handle_create_source(
// gate the feature behind a session variable
let create_cdc_source_job =
is_cdc_connector(&with_properties) && session.config().get_cdc_backfill();
let (columns_from_resolve_source, pk_names, source_info) = try_bind_columns_from_source(
&source_schema,
pk_names,
&stmt.columns,
&with_properties,
create_cdc_source_job,
)
.await?;

if create_cdc_source_job {
// set connector to backfill mode
with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into());
// enable cdc sharing mode, which will capture all tables in the given `database.name`
with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
}

let (columns_from_resolve_source, source_info) =
bind_columns_from_source(&source_schema, &with_properties, create_cdc_source_job).await?;
let columns_from_sql = bind_sql_columns(&stmt.columns)?;

let mut columns = bind_all_columns(
@@ -1165,6 +1139,13 @@
)
.await?;

if create_cdc_source_job {
// set connector to backfill mode
with_properties.insert(CDC_SNAPSHOT_MODE_KEY.into(), CDC_SNAPSHOT_BACKFILL.into());
// enable cdc sharing mode, which will capture all tables in the given `database.name`
with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
}

check_and_add_timestamp_column(&with_properties, &mut columns);

let mut col_id_gen = ColumnIdGenerator::new_initial();
@@ -1238,7 +1219,7 @@ pub async fn handle_create_source(
Some(Rc::new(SourceCatalog::from(&source))),
columns.clone(),
row_id_index,
true,
false,
false,
context.into(),
)?;
@@ -1247,7 +1228,11 @@
let stream_plan = source_node.to_stream(&mut ToStreamContext::new(false))?;
let mut graph = build_graph(stream_plan);
// set parallelism to 1
graph.parallelism = Some(Parallelism { parallelism: 1 });
graph.parallelism = session
.config()
.get_streaming_parallelism()
.map(|parallelism| Parallelism { parallelism });
graph
};
catalog_writer
70 changes: 40 additions & 30 deletions src/frontend/src/handler/create_table.rs
@@ -20,12 +20,13 @@ use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{
CdcTableDesc, ColumnCatalog, ColumnDesc, TableId, TableVersionId, INITIAL_SOURCE_VERSION_ID,
INITIAL_TABLE_VERSION_ID, USER_COLUMN_ID_OFFSET,
CdcTableDesc, ColumnCatalog, ColumnDesc, TableId, TableVersionId, DEFAULT_SCHEMA_NAME,
INITIAL_SOURCE_VERSION_ID, INITIAL_TABLE_VERSION_ID, USER_COLUMN_ID_OFFSET,
};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_connector::source;
use risingwave_connector::source::cdc::{CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY};
use risingwave_connector::source::external::{
CdcTableType, DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
@@ -43,6 +44,7 @@ use risingwave_sqlparser::ast::{

use super::RwPgResponse;
use crate::binder::{bind_data_type, bind_struct_field, Clause};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::table_catalog::TableVersion;
use crate::catalog::{check_valid_column_name, CatalogError, ColumnId};
@@ -463,9 +465,10 @@ pub(crate) async fn gen_create_table_plan_with_source(

ensure_table_constraints_supported(&constraints)?;

let (columns_from_resolve_source, pk_names, mut source_info) =
try_bind_columns_from_source(&source_schema, pk_names, &column_defs, &properties, false)
.await?;
let sql_pk_names = bind_sql_pk_names(&column_defs, &constraints)?;

let (columns_from_resolve_source, mut source_info) =
bind_columns_from_source(&source_schema, &properties, false).await?;
let columns_from_sql = bind_sql_columns(&column_defs)?;

let mut columns = bind_all_columns(
@@ -545,7 +548,7 @@
pk: table_pk,
columns: columns.iter().map(|c| c.column_desc.clone()).collect(),
stream_key: pk_column_indices,
value_indices: (0..columns.len()).collect_vec(), /* FIXME: maybe we can remove `_rw_offset` from TableDesc */
value_indices: (0..columns.len()).collect_vec(),
connect_properties: Default::default(),
};

@@ -771,7 +774,7 @@ fn gen_create_table_plan_for_cdc_source(
column_defs: Vec<ColumnDef>,
constraints: Vec<TableConstraint>,
mut col_id_gen: ColumnIdGenerator,
) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
) -> Result<(PlanRef, PbTable)> {
let session = context.session_ctx().clone();
let db_name = session.database();
let (schema_name, name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
@@ -781,15 +784,27 @@
// cdc table cannot be append-only
let append_only = false;
let source_name = source_name.real_value();
let source = session.get_source_by_name(schema_name, &source_name)?;

let source = {
let catalog_reader = session.env().catalog_reader().read_guard();
let schema_name = schema_name
.clone()
.unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
let (source, _) = catalog_reader.get_source_by_name(
db_name,
SchemaPath::Name(schema_name.as_str()),
source_name.as_str(),
)?;
source.clone()
};

let mut columns = bind_sql_columns(&column_defs)?;

for c in &mut columns {
c.column_desc.column_id = col_id_gen.generate(c.name())
}

let pk_names = bind_pk_names(&column_defs, &constraints)?;
let pk_names = bind_sql_pk_names(&column_defs, &constraints)?;
let (columns, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names)?;

let definition = context.normalized_sql().to_owned();
@@ -855,18 +870,19 @@

let mut table = materialize.table().to_prost(schema_id, database_id);
table.owner = session.user_id();
Ok((materialize.into(), None, table))
Ok((materialize.into(), table))
}

fn derive_connect_properties(
source: &SourceCatalog,
external_table_name: String,
) -> Result<BTreeMap<String, String>> {
use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR};
// we should remove the prefix from `full_table_name`
let mut connect_properties = source.properties.clone();
if let Some(connector) = source.properties.get(UPSTREAM_SOURCE_KEY) {
let table_name = match connector.as_str() {
"mysql-cdc" => {
MYSQL_CDC_CONNECTOR => {
let db_name = connect_properties.get(DATABASE_NAME_KEY).ok_or_else(|| {
anyhow!("{} not found in source properties", DATABASE_NAME_KEY)
})?;
@@ -876,7 +892,7 @@
.strip_prefix(prefix.as_str())
.ok_or_else(|| anyhow!("external table name must contain database prefix"))?
}
"postgres-cdc" => {
POSTGRES_CDC_CONNECTOR => {
let schema_name = connect_properties
.get(SCHEMA_NAME_KEY)
.ok_or_else(|| anyhow!("{} not found in source properties", SCHEMA_NAME_KEY))?;
@@ -935,7 +951,6 @@
let context = OptimizerContext::from_handler_args(handler_args);
let source_schema = check_create_table_with_source(context.with_options(), source_schema)?;
let col_id_gen = ColumnIdGenerator::new_initial();
let properties = context.with_options().inner().clone().into_iter().collect();

let ((plan, source, table), job_type) = match (source_schema, cdc_table_info.as_ref()) {
(Some(source_schema), None) => (
@@ -950,7 +965,7 @@
append_only,
)
.await?,
TableJobType::Unspecified,
TableJobType::General,
),
(None, None) => (
gen_create_table_plan(
@@ -962,21 +977,22 @@
source_watermarks,
append_only,
)?,
TableJobType::Unspecified,
TableJobType::General,
),

(None, Some(cdc_table)) => (
gen_create_table_plan_for_cdc_source(
(None, Some(cdc_table)) => {
let (plan, table) = gen_create_table_plan_for_cdc_source(
context.into(),
cdc_table.source_name.clone(),
table_name.clone(),
cdc_table.external_table_name.clone(),
column_defs,
constraints,
col_id_gen,
)?,
TableJobType::SharedCdcSource,
),
)?;

((plan, None, table), TableJobType::SharedCdcSource)
}
(Some(_), Some(_)) => return Err(ErrorCode::NotSupported(
"Data format and encoding format doesn't apply to table created from a CDC source"
.into(),
Expand All @@ -985,16 +1001,10 @@ pub async fn handle_create_table(
.into()),
};
let mut graph = build_graph(plan);
graph.parallelism = if cdc_table_info.is_some()
|| CdcTableType::from_properties(&properties).can_backfill()
{
Some(Parallelism { parallelism: 1 })
} else {
session
.config()
.get_streaming_parallelism()
.map(|parallelism| Parallelism { parallelism })
};
graph.parallelism = session
.config()
.get_streaming_parallelism()
.map(|parallelism| Parallelism { parallelism });
(graph, source, table, job_type)
};

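The `derive_connect_properties` change above dispatches on the connector constants instead of string literals; its MySQL branch strips the database prefix from the user-supplied external table name to fill `table.name`. In isolation that boils down to something like this (a simplified sketch assuming a `db.table` naming scheme):

```rust
use anyhow::anyhow;

// Simplified sketch of the MySQL-CDC branch:
// "mydb.orders" with db_name "mydb" yields "orders".
fn strip_db_prefix<'a>(external_table_name: &'a str, db_name: &str) -> anyhow::Result<&'a str> {
    let prefix = format!("{}.", db_name);
    external_table_name
        .strip_prefix(prefix.as_str())
        .ok_or_else(|| anyhow!("external table name must contain database prefix"))
}

fn main() -> anyhow::Result<()> {
    assert_eq!(strip_db_prefix("mydb.orders", "mydb")?, "orders");
    Ok(())
}
```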
7 changes: 7 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/scan.rs
@@ -432,6 +432,13 @@ impl Scan {
matches!(self.scan_table_type, ScanTableType::CdcTable)
}

pub fn append_only(&self) -> bool {
if self.is_cdc_table() {
return false;
}
self.table_desc.append_only
}

/// Get the descs of the output columns.
pub fn column_descs(&self) -> Vec<ColumnDesc> {
self.output_col_idx
3 changes: 1 addition & 2 deletions src/frontend/src/optimizer/plan_node/logical_scan.rs
@@ -447,8 +447,7 @@ impl PredicatePushdown for LogicalScan {
.conjunctions
.extract_if(|expr| expr.count_nows() > 0 || HasCorrelated {}.visit_expr(expr))
.collect();

let predicate = predicate.rewrite_expr(&mut ColIndexMapping::with_target_size(
let predicate = predicate.rewrite_expr(&mut ColIndexMapping::new(
self.output_col_idx().iter().map(|i| Some(*i)).collect(),
self.table_desc().columns.len(),
));