Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Implemented Sort Operator; Basic Sort query works Note: top SortMerge…
Browse files Browse the repository at this point in the history
… Execplan must be removed from physical plans for basic_sort.sql
  • Loading branch information
SarveshOO7 committed Feb 25, 2024
1 parent da7eb9e commit 6c29bc6
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 7 deletions.
1 change: 1 addition & 0 deletions eggstrain/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tokio::sync::broadcast::{Receiver, Sender};

pub mod filter;
pub mod project;
pub mod sort;

/// Defines shared behavior for all operators
///
Expand Down
114 changes: 114 additions & 0 deletions eggstrain/src/execution/operators/sort.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use crate::BATCH_SIZE;

use super::{Operator, UnaryOperator};
use arrow::compute::{concat_batches, lexsort_to_indices, take};
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use async_trait::async_trait;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::{sorts::sort::SortExec, ExecutionPlan};
use datafusion_common::Result;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;

/// TODO docs
pub(crate) struct Sort {
sort_expr: Vec<PhysicalSortExpr>,
input_schema: SchemaRef, // TODO
children: Vec<Arc<dyn ExecutionPlan>>,
limit_size: Option<usize>,
}

/// TODO docs
impl Sort {
pub(crate) fn new(sort_plan: &SortExec) -> Self {
Self {
sort_expr: Vec::from(sort_plan.expr()),
input_schema: sort_plan.children()[0].schema(),
children: sort_plan.children(),
limit_size: sort_plan.fetch(),
}
}

fn sort_in_mem(&self, rb: RecordBatch) -> Result<RecordBatch> {
assert_eq!(rb.schema(), self.input_schema);

let expressions = self.sort_expr.clone();

let sort_columns = expressions
.iter()
.map(|expr| expr.evaluate_to_sort_column(&rb))
.collect::<Result<Vec<_>>>()?;

let indices = lexsort_to_indices(&sort_columns, self.limit_size)?;

let columns = rb
.columns()
.iter()
.map(|c| take(c.as_ref(), &indices, None))
.collect::<Result<_, _>>()?;

Ok(RecordBatch::try_new(rb.schema(), columns)?)
//TODO: do we need to drop rb here or will that happen on its own?
//drop(rb);
}
}

/// TODO docs
impl Operator for Sort {
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
self.children.clone()
}
}

/// TODO docs
#[async_trait]
impl UnaryOperator for Sort {
type In = RecordBatch;
type Out = RecordBatch;

fn into_unary(self) -> Arc<dyn UnaryOperator<In = Self::In, Out = Self::Out>> {
Arc::new(self)
}

async fn execute(
&self,
mut rx: broadcast::Receiver<Self::In>,
tx: broadcast::Sender<Self::Out>,
) {
let mut batches = vec![];
loop {
match rx.recv().await {
Ok(batch) => {
batches.push(batch);
}
Err(e) => match e {
RecvError::Closed => break,
RecvError::Lagged(_) => todo!(),
},
}
}

let merged_batch = concat_batches(&self.input_schema, &batches);
match merged_batch {
Ok(merged_batch) => {
let sorted_batch = self.sort_in_mem(merged_batch).unwrap();
let mut current = 0;
let total_rows = sorted_batch.num_rows();
while current + BATCH_SIZE < total_rows {
let batch_to_send = sorted_batch.slice(current, BATCH_SIZE);
tx.send(batch_to_send)
.expect("Unable to send the sorted batch");
current += BATCH_SIZE;
}
let batch_to_send = sorted_batch.slice(current, total_rows - current);
tx.send(batch_to_send)
.expect("Unable to send the last sorted batch");

// TODO: do I have to call drop here manually or will rust take care of it?
// drop(sorted_batch);
}
Err(_) => todo!("Could not concat the batches for sorting"),
}
}
}
22 changes: 16 additions & 6 deletions eggstrain/src/execution/query_dag.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::operators::filter::Filter;
use super::operators::project::Project;
use super::operators::sort::Sort;
use super::operators::{BinaryOperator, UnaryOperator};
use crate::BATCH_SIZE;
use arrow::record_batch::RecordBatch;
Expand All @@ -23,11 +24,10 @@ use tokio::sync::broadcast;
enum EggstrainOperator {
Project(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),
Filter(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),
Sort(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),

// TODO remove `dead_code` once implemented
#[allow(dead_code)]
Sort(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),
#[allow(dead_code)]
Aggregate(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),
#[allow(dead_code)]
TableScan(Arc<dyn UnaryOperator<In = RecordBatch, Out = RecordBatch>>),
Expand All @@ -43,6 +43,7 @@ impl EggstrainOperator {
match self {
Self::Project(x) => x.children(),
Self::Filter(x) => x.children(),
Self::Sort(x) => x.children(),
_ => unimplemented!(),
}
}
Expand Down Expand Up @@ -85,7 +86,15 @@ fn parse_execution_plan_root(plan: &Arc<dyn ExecutionPlan>) -> Result<EggstrainO
} else if id == TypeId::of::<HashJoinExec>() {
unimplemented!("HashJoin not implemented");
} else if id == TypeId::of::<SortExec>() {
unimplemented!("Sort not implemented");
let Some(sort_plan) = root.downcast_ref::<SortExec>() else {
return Err(DataFusionError::NotImplemented(
"Unable to downcast DataFusion ExecutionPlan to ProjectionExec".to_string(),
));
};

let node = Sort::new(sort_plan);

Ok(EggstrainOperator::Sort(node.into_unary()))
} else if id == TypeId::of::<AggregateExec>() {
unimplemented!("Aggregate not implemented");
} else {
Expand Down Expand Up @@ -121,8 +130,7 @@ fn datafusion_execute(plan: Arc<dyn ExecutionPlan>, tx: broadcast::Sender<Record
continue;
}

tx.send(batch)
.expect("Unable to send rb to project node");
tx.send(batch).expect("Unable to send rb to project node");
}
}
});
Expand All @@ -141,7 +149,9 @@ fn setup_unary_operator(

// Create the operator's tokio task
match node.clone() {
EggstrainOperator::Project(eggnode) | EggstrainOperator::Filter(eggnode) => {
EggstrainOperator::Project(eggnode)
| EggstrainOperator::Filter(eggnode)
| EggstrainOperator::Sort(eggnode) => {
let tx = tx.clone();
tokio::spawn(async move {
eggnode.execute(child_rx, tx).await;
Expand Down
2 changes: 1 addition & 1 deletion queries/basic_filter.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ SELECT
FROM
orders
WHERE
orders.o_totalprice < 850.00
orders.o_totalprice < 900.00
;
9 changes: 9 additions & 0 deletions queries/basic_sort.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
SELECT
orders.o_totalprice
FROM
orders
WHERE
orders.o_totalprice < 900.00
ORDER BY
orders.o_totalprice
;

0 comments on commit 6c29bc6

Please sign in to comment.