Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Implemented Sort Operator; Basic Sort query works #5

Merged
merged 1 commit into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion eggstrain/src/execution/operators/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ pub(crate) struct Filter {

/// TODO docs
impl Filter {
pub(crate) fn new(predicate: Arc<dyn PhysicalExpr>, children: Vec<Arc<dyn ExecutionPlan>>) -> Self {
pub(crate) fn new(
predicate: Arc<dyn PhysicalExpr>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Self {
Self {
predicate,
children,
Expand Down
1 change: 1 addition & 0 deletions eggstrain/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tokio::sync::broadcast::{Receiver, Sender};

pub mod filter;
pub mod project;
pub mod sort;

/// Defines shared behavior for all operators
///
Expand Down
114 changes: 114 additions & 0 deletions eggstrain/src/execution/operators/sort.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use crate::BATCH_SIZE;

use super::{Operator, UnaryOperator};
use arrow::compute::{concat_batches, lexsort_to_indices, take};
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use async_trait::async_trait;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::{sorts::sort::SortExec, ExecutionPlan};
use datafusion_common::Result;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;

/// TODO docs
pub(crate) struct Sort {
sort_expr: Vec<PhysicalSortExpr>,
input_schema: SchemaRef, // TODO
children: Vec<Arc<dyn ExecutionPlan>>,
limit_size: Option<usize>,
}

/// TODO docs
impl Sort {
pub(crate) fn new(sort_plan: &SortExec) -> Self {
Self {
sort_expr: Vec::from(sort_plan.expr()),
input_schema: sort_plan.children()[0].schema(),
children: sort_plan.children(),
limit_size: sort_plan.fetch(),
}
}

fn sort_in_mem(&self, rb: RecordBatch) -> Result<RecordBatch> {
assert_eq!(rb.schema(), self.input_schema);

let expressions = self.sort_expr.clone();

let sort_columns = expressions
.iter()
.map(|expr| expr.evaluate_to_sort_column(&rb))
.collect::<Result<Vec<_>>>()?;

let indices = lexsort_to_indices(&sort_columns, self.limit_size)?;

let columns = rb
.columns()
.iter()
.map(|c| take(c.as_ref(), &indices, None))
.collect::<Result<_, _>>()?;

Ok(RecordBatch::try_new(rb.schema(), columns)?)
//TODO: do we need to drop rb here or will that happen on its own?
//drop(rb);
}
}

/// TODO docs
impl Operator for Sort {
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
self.children.clone()
}
}

/// TODO docs
#[async_trait]
impl UnaryOperator for Sort {
type In = RecordBatch;
type Out = RecordBatch;

fn into_unary(self) -> Arc<dyn UnaryOperator<In = Self::In, Out = Self::Out>> {
Arc::new(self)
}

async fn execute(
&self,
mut rx: broadcast::Receiver<Self::In>,
tx: broadcast::Sender<Self::Out>,
) {
let mut batches = vec![];
loop {
match rx.recv().await {
Ok(batch) => {
batches.push(batch);
}
Err(e) => match e {
RecvError::Closed => break,
RecvError::Lagged(_) => todo!(),
},
}
}

let merged_batch = concat_batches(&self.input_schema, &batches);
match merged_batch {
Ok(merged_batch) => {
let sorted_batch = self.sort_in_mem(merged_batch).unwrap();
let mut current = 0;
let total_rows = sorted_batch.num_rows();
while current + BATCH_SIZE < total_rows {
let batch_to_send = sorted_batch.slice(current, BATCH_SIZE);
tx.send(batch_to_send)
.expect("Unable to send the sorted batch");
current += BATCH_SIZE;
}
let batch_to_send = sorted_batch.slice(current, total_rows - current);
tx.send(batch_to_send)
.expect("Unable to send the last sorted batch");

// TODO: do I have to call drop here manually or will rust take care of it?
// drop(sorted_batch);
}
Err(_) => todo!("Could not concat the batches for sorting"),
}
}
}
22 changes: 16 additions & 6 deletions eggstrain/src/execution/query_dag.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::operators::filter::Filter;
use super::operators::project::Project;
use super::operators::sort::Sort;
use super::operators::{BinaryOperator, UnaryOperator};
use crate::BATCH_SIZE;
use arrow::record_batch::RecordBatch;
Expand All @@ -23,11 +24,10 @@ use tokio::sync::broadcast;
enum EggstrainOperator {
Project(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),
Filter(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),
Sort(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),

// TODO remove `dead_code` once implemented
#[allow(dead_code)]
Sort(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),
#[allow(dead_code)]
Aggregate(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),
#[allow(dead_code)]
TableScan(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),
Expand All @@ -43,6 +43,7 @@ impl EggstrainOperator {
match self {
Self::Project(x) => x.children(),
Self::Filter(x) => x.children(),
Self::Sort(x) => x.children(),
_ => unimplemented!(),
}
}
Expand Down Expand Up @@ -85,7 +86,15 @@ fn parse_execution_plan_root(plan: &Arc<dyn ExecutionPlan>) -> Result<EggstrainO
} else if id == TypeId::of::<HashJoinExec>() {
unimplemented!("HashJoin not implemented");
} else if id == TypeId::of::<SortExec>() {
unimplemented!("Sort not implemented");
let Some(sort_plan) = root.downcast_ref::<SortExec>() else {
return Err(DataFusionError::NotImplemented(
"Unable to downcast DataFusion ExecutionPlan to ProjectionExec".to_string(),
));
};

let node = Sort::new(sort_plan);

Ok(EggstrainOperator::Sort(node.into_unary()))
} else if id == TypeId::of::<AggregateExec>() {
unimplemented!("Aggregate not implemented");
} else {
Expand Down Expand Up @@ -121,8 +130,7 @@ fn datafusion_execute(plan: Arc<dyn ExecutionPlan>, tx: broadcast::Sender<Record
continue;
}

tx.send(batch)
.expect("Unable to send rb to project node");
tx.send(batch).expect("Unable to send rb to project node");
}
}
});
Expand All @@ -141,7 +149,9 @@ fn setup_unary_operator(

// Create the operator's tokio task
match node.clone() {
EggstrainOperator::Project(eggnode) | EggstrainOperator::Filter(eggnode) => {
EggstrainOperator::Project(eggnode)
| EggstrainOperator::Filter(eggnode)
| EggstrainOperator::Sort(eggnode) => {
let tx = tx.clone();
tokio::spawn(async move {
eggnode.execute(child_rx, tx).await;
Expand Down
2 changes: 1 addition & 1 deletion queries/basic_filter.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ SELECT
FROM
orders
WHERE
orders.o_totalprice < 850.00
orders.o_totalprice < 900.00
;
9 changes: 9 additions & 0 deletions queries/basic_sort.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
SELECT
orders.o_totalprice
FROM
orders
WHERE
orders.o_totalprice < 900.00
ORDER BY
orders.o_totalprice
;
Loading