Skip to content

Commit

Permalink
refactor: Rearrange module imports and remove unused code
Browse files Browse the repository at this point in the history
  • Loading branch information
holicc committed Oct 17, 2024
1 parent e597102 commit a47c1a5
Show file tree
Hide file tree
Showing 20 changed files with 160 additions and 98 deletions.
2 changes: 1 addition & 1 deletion qurious/src/datasource/connectorx/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl TableProvider for PostgresTableProvider {

fn to_arrow_type(col_type: &str) -> DataType {
match col_type {
"bigint"|"integer" => DataType::Int64,
"bigint" | "integer" => DataType::Int64,
"smallint" => DataType::Int16,
"character varying" => DataType::Utf8,
"character" => DataType::Utf8,
Expand Down
6 changes: 2 additions & 4 deletions qurious/src/datasource/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
#[cfg(feature = "connectorx")]
pub mod connectorx;
pub mod file;
pub mod memory;
#[cfg(feature="connectorx")]
pub mod connectorx;


4 changes: 2 additions & 2 deletions qurious/src/execution/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ pub struct SessionConfig {
pub default_schema: String,
}

impl Default for SessionConfig{
impl Default for SessionConfig {
fn default() -> Self {
Self {
default_catalog: "qurious".to_string(),
default_schema: "public".to_string(),
}
}
}
}
2 changes: 1 addition & 1 deletion qurious/src/execution/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub mod session;

mod config;
mod providers;
mod config;
12 changes: 6 additions & 6 deletions qurious/src/execution/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,19 +252,19 @@ mod tests {
#[test]
fn test_create_table() -> Result<()> {
let session = ExecuteSession::new()?;
session.sql("create table a(v1 int, v2 int);")?;
session.sql("create table b(v1 int, v2 float);")?;
session.sql("create table t (v1 int, v2 int)")?;
// session.sql("create table b(v1 int, v2 float);")?;
// session.sql("create table t(v1 int not null, v2 int not null, v3 double not null)")?;

// session.sql("create table x(a int, b int);")?;
// session.sql("create table y(c int, d int);")?;

session.sql("insert into a values (1,10), (2,20), (3,30), (4,40)")?;
session.sql("insert into b select v1, v2 from a;")?;
// session.sql("insert into b values (1, 100), (3, 300), (4, 400);")?;
session.sql("insert into t values (1,1), (2,1), (3,2), (4,2), (5,3)")?;
// session.sql("insert into b select v1, v2 from a;")?;
// session.sql("INSERT INTO test VALUES (1, 1), (2, 2), (3, 3), (3, 5), (NULL, NULL);")?;
// session.sql("select a, b, c, d from x join y on a = c")?;
println!("++++++++++++++");
let batch = session.sql("select * from b")?;
let batch = session.sql("select v1 + 1 + count(*) from t group by v1 + 1")?;

print_batches(&batch)?;

Expand Down
2 changes: 1 addition & 1 deletion qurious/src/functions/datetime/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pub mod extract;
pub mod extract;
4 changes: 2 additions & 2 deletions qurious/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ pub mod datasource;
pub mod datatypes;
pub mod error;
pub mod execution;
pub mod functions;
pub mod logical;
pub mod optimizer;
pub mod physical;
pub mod planner;
pub mod utils;
pub mod provider;
pub mod functions;
pub mod utils;

#[cfg(test)]
pub mod test_utils;
2 changes: 1 addition & 1 deletion qurious/src/logical/expr/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl BinaryExpr {

pub fn field(&self, plan: &LogicalPlan) -> Result<FieldRef> {
Ok(Arc::new(Field::new(
format!("({} {} {})", self.left, self.op, self.right),
self.to_string(),
self.get_result_type(&plan.schema())?,
true,
)))
Expand Down
2 changes: 1 addition & 1 deletion qurious/src/logical/expr/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fmt::Display;

use crate::logical::expr::LogicalExpr;

#[derive(Debug, Clone, PartialEq, Eq,Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SortExpr {
pub expr: Box<LogicalExpr>,
pub asc: bool,
Expand Down
9 changes: 7 additions & 2 deletions qurious/src/logical/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ impl LogicalPlan {
pub fn children(&self) -> Option<Vec<&LogicalPlan>> {
match self {
LogicalPlan::EmptyRelation(_) | LogicalPlan::Values(_) => None,

LogicalPlan::Projection(p) => p.children(),
LogicalPlan::Filter(f) => f.children(),
LogicalPlan::Aggregate(a) => a.children(),
Expand Down Expand Up @@ -137,6 +136,7 @@ impl LogicalPlan {
.into_iter()
.map(|expr| f(expr).data())
.collect::<Result<Vec<_>>>()?;

Ok(Transformed::yes(LogicalPlan::Aggregate(Aggregate {
schema,
input,
Expand Down Expand Up @@ -172,7 +172,12 @@ impl TransformNode for LogicalPlan {
aggr_expr,
})
}),

LogicalPlan::Sort(Sort { exprs, input }) => f(*input)?.update(|input| {
LogicalPlan::Sort(Sort {
exprs,
input: Box::new(input),
})
}),
_ => Transformed::no(self),
})
}
Expand Down
2 changes: 1 addition & 1 deletion qurious/src/optimizer/count_wildcard_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn count_wildcard_rule(expr: LogicalExpr) -> Result<Transformed<LogicalExpr>> {
if AggregateOperator::Count == agg.op && LogicalExpr::Wildcard == *agg.expr {
return Ok(Transformed::yes(LogicalExpr::AggregateExpr(AggregateExpr {
op: AggregateOperator::Count,
expr: Box::new(LogicalExpr::Literal(ScalarValue::Int64(Some(1)))),
expr: Box::new(LogicalExpr::Literal(ScalarValue::from(1))),
})));
}
}
Expand Down
5 changes: 1 addition & 4 deletions qurious/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ pub struct Optimizer {
impl Optimizer {
pub fn new() -> Self {
Self {
rules: vec![
Box::new(CountWildcardRule),
Box::new(TypeCoercion::default())
],
rules: vec![Box::new(CountWildcardRule), Box::new(TypeCoercion::default())],
}
}

Expand Down
21 changes: 13 additions & 8 deletions qurious/src/optimizer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@ impl OptimizerRule for TypeCoercion {
}

fn optimize(&self, base_plan: LogicalPlan) -> Result<LogicalPlan> {
let mut merged_schema = Arc::new(Schema::empty());
let schema = base_plan.schema();

for input in base_plan.children().into_iter().flat_map(|x| x) {
merged_schema = merge_schema(&schema, &input.schema()).map(Arc::new)?;
}

base_plan
.transform(|plan| plan.map_exprs(|expr| type_coercion(&merged_schema, expr)))
.transform(|plan| {
if matches!(plan, LogicalPlan::TableScan(_)) {
return Ok(Transformed::no(plan));
}
let mut merged_schema = Arc::new(Schema::empty());
let schema = plan.schema();

for input in plan.children().into_iter().flat_map(|x| x) {
merged_schema = merge_schema(&schema, &input.schema()).map(Arc::new)?;
}

plan.map_exprs(|expr| type_coercion(&merged_schema, expr))
})
.data()
}
}
Expand Down
6 changes: 5 additions & 1 deletion qurious/src/physical/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ impl Function {

impl PhysicalExpr for Function {
fn evaluate(&self, input: &RecordBatch) -> Result<ArrayRef> {
let inputs = self.args.iter().map(|arg| arg.evaluate(input)).collect::<Result<Vec<_>>>()?;
let inputs = self
.args
.iter()
.map(|arg| arg.evaluate(input))
.collect::<Result<Vec<_>>>()?;
self.func.eval(inputs)
}
}
Expand Down
5 changes: 1 addition & 4 deletions qurious/src/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use crate::{
error::{Error, Result},
internal_err,
logical::{
expr::{
alias::Alias, AggregateOperator, BinaryExpr, CastExpr, Column, Function, LogicalExpr,
},
expr::{alias::Alias, AggregateOperator, BinaryExpr, CastExpr, Column, Function, LogicalExpr},
plan::{
Aggregate, CrossJoin, EmptyRelation, Filter, Join, LogicalPlan, Projection, Sort, SubqueryAlias, TableScan,
Values,
Expand Down Expand Up @@ -282,4 +280,3 @@ impl DefaultQueryPlanner {
Ok(Arc::new(physical::expr::Function::new(function.func.clone(), args)))
}
}

86 changes: 61 additions & 25 deletions qurious/src/planner/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct Context {
relations: HashMap<TableRelation, SchemaRef>,
/// table alias -> original table name
table_aliase: HashMap<String, TableRelation>,
columns_alias: HashMap<String, LogicalExpr>,
}

pub struct SqlQueryPlanner<'a> {
Expand Down Expand Up @@ -245,6 +246,22 @@ impl<'a> SqlQueryPlanner<'a> {
.or(ctx.table_aliase.get(&table.to_string()).cloned())
})
}

fn add_column_alias(&mut self, name: String, expr: LogicalExpr) -> Result<()> {
let context = self.current_context();
if context.columns_alias.contains_key(&name) {
return internal_err!("Column alias {} already exists", name);
}
context.columns_alias.insert(name, expr);
Ok(())
}

fn get_column_alias(&self, name: &str) -> Option<LogicalExpr> {
self.contexts
.iter()
.rev()
.find_map(|ctx| ctx.columns_alias.get(name).cloned())
}
}

// functions for Binder
Expand Down Expand Up @@ -488,6 +505,8 @@ impl<'a> SqlQueryPlanner<'a> {
let mut plan = self.filter_expr(plan, select.r#where)?;
// process the SELECT expressions
let column_exprs = self.column_exprs(&plan, empty_from, select.columns)?;
// sort exprs
let sort_exprs = self.order_by_exprs(select.order_by.unwrap_or_default())?;
// get aggregate expressions
let aggr_exprs = find_aggregate_exprs(&column_exprs);
// process the HAVING clause
Expand All @@ -498,20 +517,38 @@ impl<'a> SqlQueryPlanner<'a> {
};
// process the GROUP BY clause or process aggregation in SELECT
if select.group_by.is_some() || !aggr_exprs.is_empty() {
plan = self.aggregate_plan(
plan,
column_exprs.clone(),
aggr_exprs,
select.group_by.unwrap_or_default(),
having,
)?;
let group_by_exprs = select
.group_by
.unwrap_or_default()
.into_iter()
.map(|expr| {
let col = self.sql_to_expr(expr)?;

col.transform(|expr| match expr {
LogicalExpr::Column(col) => {
if col.relation.is_none() {
if let Some(data) = self.get_column_alias(&col.name) {
return Ok(Transformed::yes(data));
}
}
Ok(Transformed::no(LogicalExpr::Column(col)))
}
_ => Ok(Transformed::no(expr)),
})
.data()
})
.collect::<Result<_>>()?;

plan = self.aggregate_plan(plan, column_exprs.clone(), aggr_exprs, group_by_exprs, having)?;
} else {
plan = LogicalPlanBuilder::project(plan, column_exprs)?;
}

// process the ORDER BY clause
let plan = if let Some(order_by) = select.order_by {
self.order_by_expr(plan, order_by)?
let plan = if !sort_exprs.is_empty() {
LogicalPlanBuilder::from(plan)
.sort(sort_exprs)
.map(|builder| builder.build())?
} else {
plan
};
Expand Down Expand Up @@ -654,13 +691,9 @@ impl<'a> SqlQueryPlanner<'a> {
input: LogicalPlan,
select_exprs: Vec<LogicalExpr>,
aggr_exprs: Vec<LogicalExpr>,
group_by: Vec<Expression>,
group_exprs: Vec<LogicalExpr>,
_having_expr: Option<LogicalExpr>,
) -> Result<LogicalPlan> {
let group_exprs = group_by
.into_iter()
.map(|expr| self.sql_to_expr(expr))
.collect::<Result<Vec<_>>>()?;
let agg_and_group_by_column_exprs = aggr_exprs.iter().chain(group_exprs.iter()).collect::<Vec<_>>();
let select_exprs_post_aggr = select_exprs
.into_iter()
Expand All @@ -683,10 +716,13 @@ impl<'a> SqlQueryPlanner<'a> {

for col_expr in select_exprs_post_aggr.iter().flat_map(find_columns_exprs) {
if !agg_and_group_columns.contains(&col_expr) {
return Err(Error::InternalError(format!(
"column [{}] must appear in the GROUP BY clause or be used in an aggregate function",
col_expr
)));
return internal_err!("column [{}] must appear in the GROUP BY clause or be used in an aggregate function, validate columns: [{}]",
col_expr,
agg_and_group_columns
.iter()
.map(|c| c.to_string())
.collect::<Vec<_>>()
.join(", "));
}
}

Expand All @@ -696,7 +732,7 @@ impl<'a> SqlQueryPlanner<'a> {
.map(|plan| plan.build())
}

fn order_by_expr(&self, plan: LogicalPlan, order_by: Vec<(Expression, Order)>) -> Result<LogicalPlan> {
fn order_by_exprs(&self, order_by: Vec<(Expression, Order)>) -> Result<Vec<SortExpr>> {
order_by
.into_iter()
.map(|(sort_expr, order)| {
Expand All @@ -706,8 +742,6 @@ impl<'a> SqlQueryPlanner<'a> {
})
})
.collect::<Result<Vec<_>>>()
.and_then(|sort_exprs| LogicalPlanBuilder::from(plan).sort(sort_exprs))
.map(|builder| builder.build())
}

fn cte_tables(&mut self, ctes: Vec<Cte>) -> Result<()> {
Expand Down Expand Up @@ -858,9 +892,11 @@ impl<'a> SqlQueryPlanner<'a> {
) -> Result<Vec<LogicalExpr>> {
match item {
SelectItem::UnNamedExpr(expr) => self.sql_to_expr(expr).map(|v| vec![v]),
SelectItem::ExprWithAlias(expr, alias) => self
.sql_to_expr(expr)
.map(|col| vec![LogicalExpr::Alias(Alias::new(alias, col))]),
SelectItem::ExprWithAlias(expr, alias) => {
let col = self.sql_to_expr(expr)?;
self.add_column_alias(alias.clone(), col.clone())?;
Ok(vec![LogicalExpr::Alias(Alias::new(alias, col))])
}
SelectItem::Wildcard => {
if empty_relation {
return Err(Error::InternalError(
Expand Down Expand Up @@ -1366,7 +1402,7 @@ mod tests {
quick_test(sql, expected);

let sql = "SELECT * FROM person GROUP BY name";
let expected = "Internal Error: column [person.age] must appear in the GROUP BY clause or be used in an aggregate function";
let expected = "Internal Error: column [person.age] must appear in the GROUP BY clause or be used in an aggregate function, validate columns: [person.name]";
quick_test(sql, expected);
}

Expand Down
8 changes: 6 additions & 2 deletions qurious/src/provider/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ use std::sync::Arc;
use crate::error::Result;
use crate::provider::schema::SchemaProvider;

pub trait CatalogProvider : Send + Sync {
pub trait CatalogProvider: Send + Sync {
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;

fn register_schema(&self, _name: &str, _schema: Arc<dyn SchemaProvider>) -> Result<Option<Arc<dyn SchemaProvider>>> {
fn register_schema(
&self,
_name: &str,
_schema: Arc<dyn SchemaProvider>,
) -> Result<Option<Arc<dyn SchemaProvider>>> {
unimplemented!("Registering new schemas is not supported")
}

Expand Down
Loading

0 comments on commit a47c1a5

Please sign in to comment.