Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
setup module system derived from exec_tree
Browse files Browse the repository at this point in the history
  • Loading branch information
connortsui20 committed Feb 12, 2024
1 parent d40a1da commit 6326e69
Show file tree
Hide file tree
Showing 12 changed files with 498 additions and 52 deletions.
7 changes: 6 additions & 1 deletion eggstrain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@
name = "eggstrain"
version = "0.1.0"
edition = "2021"
authors = ["Connor Tsui (cjtsui)", "Sarvesh Tandon (sarvesht)", "Kyle Booker (kbooker)"]
authors = [
"Connor Tsui (cjtsui)",
"Sarvesh Tandon (sarvesht)",
"Kyle Booker (kbooker)",
]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1"
arrow = "50"
async-trait = "0.1"
datafusion = "35"
substrait = "0.24"
tokio = { version = "1", features = ["full"] }
Expand Down
31 changes: 3 additions & 28 deletions eggstrain/src/execution/mod.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,4 @@
pub mod operators;





pub trait Operator {
async fn execute(data: RecordBatch)
}



pub async fn execute()

















#[cfg(test)]
mod tests;
76 changes: 76 additions & 0 deletions eggstrain/src/execution/operators/forward_toy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use super::*;
use async_trait::async_trait;

/// Toy operator state: a single number that the operator implementations in this file add to
/// every value they forward.
pub struct Forward {
    /// Amount added to each incoming value before it is sent on.
    pub prime: usize,
}

impl Forward {
pub fn new(prime: usize) -> Self {
Self { prime }
}

// Simply multiples the number by what the prime is and then forwards to the sender
async fn add_and_forward(prime: usize, rx: &mut Receiver<usize>, tx: &Sender<usize>) {
loop {
match rx.recv().await {
Ok(x) => {
// Returns the number of receiving handles this value is getting sent to
let _ = tx.send(x + prime).expect("Receiver was somehow dropped");
}
Err(e) => match e {
tokio::sync::broadcast::error::RecvError::Closed => return,
tokio::sync::broadcast::error::RecvError::Lagged(_) => todo!(),
},
}
}
}
}

#[async_trait]
impl UnaryOperator for Forward {
    type In = usize;
    type Out = usize;

    /// Boxes this operator as a `UnaryOperator` trait object.
    fn into_unary(self) -> Box<dyn UnaryOperator<In = Self::In, Out = Self::Out>> {
        Box::new(self)
    }

    /// Runs the forwarding loop on `rx`/`tx` with this operator's `prime` and waits for it to
    /// finish.
    async fn execute(&self, mut rx: Receiver<Self::In>, tx: Sender<Self::Out>) {
        let prime = self.prime;
        Forward::add_and_forward(prime, &mut rx, &tx).await;
    }
}

#[async_trait]
impl BinaryOperator for Forward {
    type InLeft = usize;
    type InRight = usize;
    type Out = usize;

    /// Boxes this operator as a `BinaryOperator` trait object.
    fn into_binary(
        self,
    ) -> Box<dyn BinaryOperator<InLeft = Self::InLeft, InRight = Self::InRight, Out = Self::Out>>
    {
        Box::new(self)
    }

    /// Forwards both inputs into the same output, in whatever order values arrive.
    ///
    /// One task is spawned per input. The join handles are dropped without being awaited, so
    /// this method returns right after spawning rather than when forwarding has finished.
    async fn execute(
        &self,
        mut rx_left: Receiver<Self::InLeft>,
        mut rx_right: Receiver<Self::InRight>,
        tx: Sender<Self::Out>,
    ) {
        let prime = self.prime;

        // Each task owns its own handle to the shared output channel.
        let left_out = tx.clone();
        let right_out = tx;

        tokio::spawn(async move {
            Forward::add_and_forward(prime, &mut rx_left, &left_out).await;
        });

        tokio::spawn(async move {
            Forward::add_and_forward(prime, &mut rx_right, &right_out).await;
        });
    }
}
34 changes: 34 additions & 0 deletions eggstrain/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use async_trait::async_trait;
use tokio::sync::broadcast::{Receiver, Sender};

pub mod forward_toy;
pub mod order_by;
pub mod project;

/// An operator with one input and one output, both carried over tokio broadcast channels.
#[async_trait]
pub(crate) trait UnaryOperator: Send {
    /// Type of the values received on the input channel.
    type In;
    /// Type of the values sent on the output channel.
    type Out;

    /// Consumes the operator and returns it boxed as a trait object with the same associated
    /// types.
    fn into_unary(self) -> Box<dyn UnaryOperator<In = Self::In, Out = Self::Out>>;

    /// Runs the operator, reading from `rx` and writing to `tx`.
    async fn execute(&self, rx: Receiver<Self::In>, tx: Sender<Self::Out>);
}

/// An operator with two inputs and one output, all carried over tokio broadcast channels.
#[async_trait]
pub(crate) trait BinaryOperator: Send {
    /// Type of the values received on the left input channel.
    type InLeft;
    /// Type of the values received on the right input channel.
    type InRight;
    /// Type of the values sent on the output channel.
    type Out;

    /// Consumes the operator and returns it boxed as a trait object with the same associated
    /// types.
    fn into_binary(
        self,
    ) -> Box<dyn BinaryOperator<InLeft = Self::InLeft, InRight = Self::InRight, Out = Self::Out>>;

    /// Runs the operator, reading from `rx_left` and `rx_right` and writing to `tx`.
    async fn execute(
        &self,
        rx_left: Receiver<Self::InLeft>,
        rx_right: Receiver<Self::InRight>,
        tx: Sender<Self::Out>,
    );
}
102 changes: 102 additions & 0 deletions eggstrain/src/execution/operators/order_by.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
//! NOTE: This entire implementation is known to be wrong, since it takes in one specific input
//! type (batches of `Vec<T>`).
use super::*;
use async_trait::async_trait;
use std::cmp::{Ordering, PartialOrd};
use std::fmt::Debug;
use std::marker::PhantomData;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::oneshot;

/// Operator state for ordering elements of type `T` with a caller-supplied comparison.
pub struct OrderBy<F, T> {
    /// Comparison used to order the elements.
    pub comparison: F,
    /// Marker that ties the otherwise unused type parameter `T` to the struct.
    _phantom: PhantomData<T>,
}

impl<F, T> OrderBy<F, T> {
    /// Creates an `OrderBy` that stores `comparison` for later use.
    pub fn new(comparison: F) -> Self {
        OrderBy {
            _phantom: PhantomData,
            comparison,
        }
    }
}

/// TODO figure out proper trait bounds and lifetimes
#[async_trait]
impl<F, T> UnaryOperator for OrderBy<F, T>
where
    F: (Fn(&T, &T) -> Ordering) + Send + Sync + Clone + 'static,
    T: PartialOrd + Clone + Debug + Send + Sync + 'static,
{
    type In = Vec<T>;
    type Out = Vec<T>;

    /// Boxes this operator as a `UnaryOperator` trait object.
    fn into_unary(self) -> Box<dyn UnaryOperator<In = Self::In, Out = Self::Out>> {
        Box::new(self)
    }

    /// Collects every batch from `rx` until the channel closes, sorts the combined elements
    /// with `self.comparison` inside `rayon::spawn`, and sends the sorted vector once on `tx`.
    ///
    /// # Panics
    ///
    /// Panics if either send fails, and hits `todo!()` if the receiver lags or if the sort
    /// result never arrives on the oneshot channel.
    async fn execute(&self, mut rx: Receiver<Self::In>, tx: Sender<Self::Out>) {
        // Phase 1: drain the input into a single vector.
        let mut rows = Vec::new();

        loop {
            match rx.recv().await {
                Ok(mut batch) => {
                    println!("Received {:?}", batch);
                    rows.append(&mut batch);
                }
                Err(RecvError::Closed) => break,
                Err(RecvError::Lagged(_)) => todo!(),
            }
        }

        // Phase 2: sort inside `rayon::spawn`; the result comes back over a oneshot channel.
        let comparison = self.comparison.clone();
        let (sorted_tx, sorted_rx) = oneshot::channel();

        println!("Spawning rayon thread now!");

        rayon::spawn(move || {
            println!("Beginning sort!");
            rows.sort_by(comparison);
            sorted_tx.send(rows).expect("Oneshot Send failed");
        });

        // Phase 3: forward the sorted result downstream.
        if let Ok(sorted) = sorted_rx.await {
            tx.send(sorted).expect("Send failed");
            println!("Sent sorted vecs");
        } else {
            todo!()
        }
    }
}

/// Sends four identical batches through an `OrderBy` operator and checks that the single
/// output batch holds every input element in sorted order.
#[tokio::test]
async fn test_order_by() {
    use tokio::sync::broadcast;

    let comparison = |a: &i32, b: &i32| a.partial_cmp(b).unwrap();
    let operator = OrderBy::new(comparison);
    let operator = operator.into_unary();

    let (tx_in, rx_in) = broadcast::channel(1000);
    let (tx_out, mut rx_out) = broadcast::channel(1000);

    let nums: Vec<i32> = (0..10).collect();
    let batches = vec![nums.clone(), nums.clone(), nums.clone(), nums];

    // The operator should emit all input elements, globally sorted.
    let mut expected = batches.concat();
    expected.sort();

    tokio::spawn(async move {
        operator.execute(rx_in, tx_out).await;
    });

    for batch in batches {
        tx_in.send(batch).expect("Send vec failed");
    }
    std::mem::drop(tx_in); // notifies that there is nothing left to send

    let sorted_nums = rx_out.recv().await.expect("Receive error from channel");

    println!("Sorted nums: {:?}", sorted_nums);

    // Previously the test only printed the result; actually verify it.
    assert_eq!(sorted_nums, expected);
}
76 changes: 76 additions & 0 deletions eggstrain/src/execution/operators/project.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use super::*;
use arrow::{
array::RecordBatch,
datatypes::{DataType, Field, Fields, Schema},
};
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::broadcast::error::RecvError;

/// Projection operator state: a schema plus the column indices to keep.
pub struct Project {
    /// Schema that incoming record batches are checked against in debug builds.
    pub schema: Arc<Schema>,
    /// Indices of the columns to keep.
    ///
    /// TODO:
    /// https://docs.rs/substrait/latest/substrait/proto/struct.ProjectRel.html
    /// Need to make these expressions rather than a bunch of columns
    pub expressions: Vec<usize>,
}

impl Project {
    /// Creates a projection that keeps the columns at the indices in `expressions`.
    ///
    /// NOTE: `_schema` is currently ignored. A placeholder schema with a single `Int64` field
    /// named `"a"` is stored instead (see the TODO below).
    ///
    /// In debug builds this asserts that every index in `expressions` is within the stored
    /// schema's field count.
    pub fn new(_schema: Arc<Schema>, expressions: Vec<usize>) -> Self {
        // TODO placeholder
        let field_a = Field::new("a", DataType::Int64, false);
        let schema = Arc::new(Schema::new(vec![field_a]));
        let res = Self {
            schema,
            expressions,
        };

        debug_assert!(res.is_valid_projection(res.schema.fields()));
        res
    }

    /// Returns `true` when every projected column index is a valid index into `fields`.
    fn is_valid_projection(&self, fields: &Fields) -> bool {
        self.expressions.iter().all(|&col| col < fields.len())
    }

    /// Returns a new batch holding only the columns listed in `self.expressions`, in that order.
    ///
    /// Uses `RecordBatch::project` instead of rebuilding the batch from `(name, column)` pairs,
    /// so the projected fields keep their original definitions (nullability, metadata) and an
    /// empty projection still yields a batch with the original row count.
    ///
    /// # Panics
    ///
    /// Panics if the projection cannot be applied, e.g. an index is out of bounds for `batch`.
    fn project_record_batch(&self, batch: RecordBatch) -> RecordBatch {
        batch
            .project(&self.expressions)
            .expect("Unable to create the RecordBatch")
    }
}

#[async_trait]
impl UnaryOperator for Project {
    type In = RecordBatch;
    type Out = RecordBatch;

    /// Boxes this operator as a `UnaryOperator` trait object.
    fn into_unary(self) -> Box<dyn UnaryOperator<In = Self::In, Out = Self::Out>> {
        Box::new(self)
    }

    /// Projects every batch received on `rx` and sends the result on `tx` until `rx` closes.
    ///
    /// # Panics
    ///
    /// Panics if sending fails, and hits `todo!()` if the receiver lags. In debug builds it
    /// also panics when a batch's schema differs from `self.schema`.
    async fn execute(&self, mut rx: Receiver<Self::In>, tx: Sender<Self::Out>) {
        // For now assume that each record batch has the same type
        loop {
            let batch = match rx.recv().await {
                Ok(batch) => batch,
                Err(RecvError::Closed) => break,
                Err(RecvError::Lagged(_)) => todo!(),
            };

            debug_assert!(batch.schema() == self.schema, "RecordBatch {:?} does not have the correct schema. Schema is {:?}, supposed to be {:?}", batch, batch.schema(), self.schema);

            let projected = self.project_record_batch(batch);
            tx.send(projected).expect("Sending failed");
        }
    }
}
1 change: 1 addition & 0 deletions eggstrain/src/execution/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod toy_forward_example;
Loading

0 comments on commit 6326e69

Please sign in to comment.