Skip to content

Commit

Permalink
fix: implement with_new_children for FTS (#3441)
Browse files Browse the repository at this point in the history
This method was not implemented before this change (its body was `todo!()`),
so calling it caused a panic. We don't actually need it ourselves, but some
optimizer passes may call this method when rewriting plans.

Signed-off-by: BubbleCal <[email protected]>
  • Loading branch information
BubbleCal authored Feb 11, 2025
1 parent 8a61b69 commit 2e2bf1a
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 13 deletions.
6 changes: 3 additions & 3 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1523,7 +1523,7 @@ impl Scanner {
.limit(self.limit);

// load indices
let mut column_inputs = HashMap::with_capacity(columns.len());
let mut column_inputs = Vec::with_capacity(columns.len());
for column in columns {
let index = self
.dataset
Expand Down Expand Up @@ -1571,12 +1571,12 @@ impl Scanner {
scan_node
};

column_inputs.insert(column.clone(), (index_uuids, unindexed_scan_node));
column_inputs.push((column.clone(), index_uuids, unindexed_scan_node));
}

let indices = column_inputs
.iter()
.map(|(col, (idx, _))| (col.clone(), idx.clone()))
.map(|(col, idx, _)| (col.clone(), idx.clone()))
.collect();
let prefilter_source = self.prefilter_source(filter_plan).await?;
let fts_plan = Arc::new(FtsExec::new(
Expand Down
74 changes: 64 additions & 10 deletions rust/lance/src/io/exec/fts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,46 @@ impl ExecutionPlan for FtsExec {

fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
todo!()
let plan = match children.len() {
0 => Self {
dataset: self.dataset.clone(),
indices: self.indices.clone(),
query: self.query.clone(),
prefilter_source: PreFilterSource::None,
properties: self.properties.clone(),
},
1 => {
let src = children.pop().unwrap();
let prefilter_source = match &self.prefilter_source {
PreFilterSource::FilteredRowIds(_) => {
PreFilterSource::FilteredRowIds(src.clone())
}
PreFilterSource::ScalarIndexQuery(_) => {
PreFilterSource::ScalarIndexQuery(src.clone())
}
PreFilterSource::None => {
return Err(DataFusionError::Internal(
"Unexpected prefilter source".to_string(),
));
}
};
Self {
dataset: self.dataset.clone(),
indices: self.indices.clone(),
query: self.query.clone(),
prefilter_source,
properties: self.properties.clone(),
}
}
_ => {
return Err(DataFusionError::Internal(
"Unexpected number of children".to_string(),
));
}
};
Ok(Arc::new(plan))
}

#[instrument(name = "fts_exec", level = "debug", skip_all)]
Expand Down Expand Up @@ -194,8 +231,8 @@ impl ExecutionPlan for FtsExec {
#[derive(Debug)]
pub struct FlatFtsExec {
dataset: Arc<Dataset>,
// column -> (indices, unindexed input stream)
column_inputs: HashMap<String, (Vec<Index>, Arc<dyn ExecutionPlan>)>,
// (column, indices, unindexed input stream)
column_inputs: Vec<(String, Vec<Index>, Arc<dyn ExecutionPlan>)>,
query: FullTextSearchQuery,
properties: PlanProperties,
}
Expand All @@ -213,7 +250,7 @@ impl DisplayAs for FlatFtsExec {
impl FlatFtsExec {
pub fn new(
dataset: Arc<Dataset>,
column_inputs: HashMap<String, (Vec<Index>, Arc<dyn ExecutionPlan>)>,
column_inputs: Vec<(String, Vec<Index>, Arc<dyn ExecutionPlan>)>,
query: FullTextSearchQuery,
) -> Self {
let properties = PlanProperties::new(
Expand Down Expand Up @@ -246,16 +283,33 @@ impl ExecutionPlan for FlatFtsExec {

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
self.column_inputs
.values()
.map(|(_, input)| input)
.iter()
.map(|(_, _, input)| input)
.collect()
}

fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
todo!()
if self.column_inputs.len() != children.len() {
return Err(DataFusionError::Internal(
"Unexpected number of children".to_string(),
));
}

let column_inputs = self
.column_inputs
.iter()
.zip(children)
.map(|((column, indices, _), input)| (column.clone(), indices.clone(), input))
.collect();
Ok(Arc::new(Self {
dataset: self.dataset.clone(),
column_inputs,
query: self.query.clone(),
properties: self.properties.clone(),
}))
}

#[instrument(name = "flat_fts_exec", level = "debug", skip_all)]
Expand All @@ -269,7 +323,7 @@ impl ExecutionPlan for FlatFtsExec {
let column_inputs = self.column_inputs.clone();

let stream = stream::iter(column_inputs)
.map(move |(column, (indices, input))| {
.map(move |(column, indices, input)| {
let index_meta = indices[0].clone();
let uuid = index_meta.uuid.to_string();
let query = query.clone();
Expand Down

0 comments on commit 2e2bf1a

Please sign in to comment.