From b63514ac0b45a9c1044124a6330b021b14497210 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Wed, 11 Jan 2023 13:43:51 -0500 Subject: [PATCH 01/17] push for benchmarking tests --- dask_planner/src/sql/logical/table_scan.rs | 29 +++++++++++++- dask_sql/context.py | 8 ++++ dask_sql/datacontainer.py | 1 + dask_sql/physical/rel/logical/table_scan.py | 43 +++++++++++++++++++-- dask_sql/physical/utils/filter.py | 40 ++++++++++++++++++- 5 files changed, 116 insertions(+), 5 deletions(-) diff --git a/dask_planner/src/sql/logical/table_scan.rs b/dask_planner/src/sql/logical/table_scan.rs index 537f011cc..f0830489f 100644 --- a/dask_planner/src/sql/logical/table_scan.rs +++ b/dask_planner/src/sql/logical/table_scan.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use datafusion_common::DFSchema; -use datafusion_expr::{logical_plan::TableScan, LogicalPlan}; +use datafusion_expr::{Expr, logical_plan::TableScan, LogicalPlan}; use pyo3::prelude::*; use crate::{ @@ -43,6 +43,33 @@ impl PyTableScan { fn scan_filters(&self) -> PyResult> { py_expr_list(&self.input, &self.table_scan.filters) } + + #[pyo3(name = "getDNFFilters")] + fn dnf_io_filters(&self) -> PyResult> { + let mut filters: Vec<(String, String, String)> = Vec::new(); + for filter in &self.table_scan.filters { + match filter { + Expr::BinaryExpr(binary_expr) => { + let left = binary_expr.left.to_string(); + let mut left_split = left.split('.'); + let left = left_split.nth(1); + let right = binary_expr.right.to_string(); + let mut right_split = right.split('.'); + let right = right_split.nth(0); + filters.push((left.unwrap().to_string(), binary_expr.op.to_string(), right.unwrap().to_string())) + }, + // Expr::IsNotNull(expr) => { + // let expr_thing = expr.to_string(); + // let mut expr_split = expr_thing.split('.'); + // let left = expr_split.nth(1); + // filters.push((left.unwrap().to_string(), "!=".to_string(), "np.nan".to_string())) + // }, + Expr::IsNotNull(expr) => print!("Skipping is not null for now"), + _ => panic!("Encountered non BinaryExpr type: {}", filter) + } + } + Ok(filters) + } } impl TryFrom for PyTableScan { diff --git a/dask_sql/context.py b/dask_sql/context.py index 860b4b75e..7034635c9 100644 --- a/dask_sql/context.py +++ b/dask_sql/context.py @@ -244,6 +244,14 @@ def create_table( **kwargs, ) + # Temporary for testing + self.schema[schema_name].tables_meta[table_name.lower()] = { + "input_path": input_table, + "table_name": table_name, + "format": format, + "gpu": gpu, + } + self.schema[schema_name].tables[table_name.lower()] = dc if statistics: self.schema[schema_name].statistics[table_name.lower()] = statistics diff --git a/dask_sql/datacontainer.py b/dask_sql/datacontainer.py index 6e1b7336c..01067c3ac 100644 --- a/dask_sql/datacontainer.py +++ b/dask_sql/datacontainer.py @@ -269,6 +269,7 @@ class SchemaContainer: def __init__(self, name: str): self.__name__ = name self.tables: Dict[str, DataContainer] = {} + self.tables_meta: Dict[str, str] = {} self.statistics: Dict[str, Statistics] = {} self.experiments: Dict[str, pd.DataFrame] = {} self.models: Dict[str, Tuple[Any, List[str]]] = {} diff --git a/dask_sql/physical/rel/logical/table_scan.py b/dask_sql/physical/rel/logical/table_scan.py index 8bd7874f2..25397ddb6 100644 --- a/dask_sql/physical/rel/logical/table_scan.py +++ b/dask_sql/physical/rel/logical/table_scan.py @@ -3,11 +3,13 @@ from functools import reduce from typing import TYPE_CHECKING -from dask_sql.datacontainer import DataContainer +from dask_sql.datacontainer import DataContainer, ColumnContainer from 
dask_sql.physical.rel.base import BaseRelPlugin from dask_sql.physical.rel.logical.filter import filter_or_scalar from dask_sql.physical.rex import RexConverter +import dask_cudf as ddf + if TYPE_CHECKING: import dask_sql from dask_planner.rust import LogicalPlan @@ -43,10 +45,45 @@ def convert( dask_table = rel.getTable() schema_name, table_name = [n.lower() for n in context.fqn(dask_table)] - dc = context.schema[schema_name].tables[table_name] + filters = table_scan.getFilters() + if filters: + tbl_meta = context.schema[schema_name].tables_meta[table_name] + + # print(f"Reading table: {table_name} @ {tbl_meta['input_path']}") + # print(f"# Of filters from Rust: {len(table_scan.getFilters())}") + + # Generate the filters in DNF form for the cudf reader + filters = table_scan.getDNFFilters() + # print(f"Filters: {filters}") + + # Columns that should be projected + cols = table_scan.getTableScanProjects() + # print(f"Columns to read: {cols}") + + if len(filters) > 0: + # Prepare the filters to be in the format expected by Python since they came from Rust + updated_filters = [] + for filter_tup in filters: + if filter_tup[2].startswith("Int"): + num = filter_tup[2].split('(')[1].split(')')[0] + updated_filters.append((filter_tup[0], filter_tup[1], int(num))) + else: + updated_filters.append(filter_tup) + + + # print(f"Updated filters: {updated_filters}") + # df = ddf.read_parquet(tbl_meta["input_path"], columns=cols, filters=filters) + df = ddf.read_parquet(tbl_meta["input_path"], filters=updated_filters, columns=cols) + else: + # df = ddf.read_parquet(tbl_meta["input_path"], columns=cols) + df = ddf.read_parquet(tbl_meta["input_path"], columns=cols) + + dc = DataContainer(df.copy(), ColumnContainer(df.columns)) + else: + dc = context.schema[schema_name].tables[table_name] + # dc = self._apply_filters(table_scan, rel, dc, context) # Apply filter before projections since filter columns may not be in projections - dc = self._apply_filters(table_scan, rel, dc, context) dc = self._apply_projections(table_scan, dask_table, dc) cc = dc.column_container diff --git a/dask_sql/physical/utils/filter.py b/dask_sql/physical/utils/filter.py index 5309289c4..a4a4e3193 100644 --- a/dask_sql/physical/utils/filter.py +++ b/dask_sql/physical/utils/filter.py @@ -75,6 +75,7 @@ def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: name = ddf._name try: filters = dsk.layers[name]._dnf_filter_expression(dsk) + print(f"Newly created filters: {filters}") if not isinstance(filters, frozenset): # No filters encountered return ddf @@ -179,11 +180,16 @@ def to_dnf(expr): operator.or_, operator.getitem, M.fillna, + # M.isna, + # M.astype, + # operator.inv, } # Specify functions that must be generated with # a different API at the dataframe-collection level -_special_op_mappings = {M.fillna: dd._Frame.fillna} +_special_op_mappings = {M.fillna: dd._Frame.fillna, + M.isna: dd._Frame.isna, + M.astype: dd._Frame.astype} class RegenerableLayer: @@ -245,7 +251,12 @@ def _regenerate_collection( regen_args = self.creation_info.get("args", []) regen_kwargs = self.creation_info.get("kwargs", {}).copy() regen_kwargs = {k: v for k, v in self.creation_info.get("kwargs", {}).items()} + print(f"new_kwargs: {new_kwargs}") regen_kwargs.update((new_kwargs or {}).get(self.layer.output, {})) + print(f"Func: {func}") + print(f"Inputs: {inputs}") + print(f"Regen Args: {regen_args}") + print(f"Regen KWArgs: {regen_kwargs}") result = func(*inputs, *regen_args, **regen_kwargs) _regen_cache[self.layer.output] = result return result @@ 
-263,7 +274,14 @@ def _dnf_filter_expression(self, dsk): func = _blockwise_getitem_dnf elif op == dd._Frame.fillna: func = _blockwise_fillna_dnf + elif op == dd._Frame.isna: + func = _blockwise_isna_dnf + elif op == operator.inv: + func = _blockwise_inv_dnf + elif op == dd._Frame.astype: + func = _blockwise_astype_dnf else: + print(f"This is the problem spot!~!!! Op: {op}") raise ValueError(f"No DNF expression for {op}") return func(op, self.layer.indices, dsk) @@ -289,6 +307,7 @@ def from_hlg(cls, hlg: HighLevelGraph): _layers = {} for key, layer in hlg.layers.items(): + # print(f"Graph Key: {key}, Layer: {layer}, Layer Type: {type(layer)}") regenerable_layer = None if isinstance(layer, DataFrameIOLayer): regenerable_layer = RegenerableLayer(layer, layer.creation_info or {}) @@ -310,6 +329,7 @@ def from_hlg(cls, hlg: HighLevelGraph): else: op = tasks[0][0] if op in _regenerable_ops: + print(f"Blockwise OP: {op}") regenerable_layer = RegenerableLayer( layer, { @@ -317,6 +337,8 @@ def from_hlg(cls, hlg: HighLevelGraph): "kwargs": kwargs, }, ) + else: + print(f"Blockwise OP: {op} NOT in list of _regenerable_ops") if regenerable_layer is None: raise ValueError(f"Graph contains non-regenerable layer: {layer}") @@ -378,3 +400,19 @@ def _blockwise_getitem_dnf(op, indices: list, dsk: RegenerableGraph): def _blockwise_fillna_dnf(op, indices: list, dsk: RegenerableGraph): # Return dnf of input collection return _get_blockwise_input(0, indices, dsk) + + +def _blockwise_isna_dnf(op, indices: list, dsk: RegenerableGraph): + # Return dnf of input collection + return _get_blockwise_input(0, indices, dsk) + + +def _blockwise_inv_dnf(op, indices: list, dsk: RegenerableGraph): + # Return dnf of input collection + print(f"Indices: {indices}") + return _get_blockwise_input(0, indices, dsk) + + +def _blockwise_astype_dnf(op, indices: list, dsk: RegenerableGraph): + # Return dnf of input collection + return _get_blockwise_input(0, indices, dsk) From 4b7f3bb5d93d7d2759ab9787d4a7501bcb86d962 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Tue, 17 Jan 2023 17:08:01 -0500 Subject: [PATCH 02/17] Updates to include post IO Dask filtering logic --- dask_planner/src/sql/logical/table_scan.rs | 32 +++++--- dask_sql/physical/rel/logical/filter.py | 10 ++- dask_sql/physical/rel/logical/table_scan.py | 83 ++++++++++----------- dask_sql/physical/utils/filter.py | 1 - 4 files changed, 66 insertions(+), 60 deletions(-) diff --git a/dask_planner/src/sql/logical/table_scan.rs b/dask_planner/src/sql/logical/table_scan.rs index f0830489f..75b13e821 100644 --- a/dask_planner/src/sql/logical/table_scan.rs +++ b/dask_planner/src/sql/logical/table_scan.rs @@ -16,6 +16,16 @@ pub struct PyTableScan { input: Arc, } +#[pyclass(name = "FilteredResult", module = "dask_planner", subclass)] +#[derive(Debug, Clone)] +pub struct PyFilteredResult { + // Exprs that cannot be successfully passed down to the IO layer for filtering and must still be filtered using Dask operations + #[pyo3(get)] + pub io_unfilterable_exprs: Vec, + #[pyo3(get)] + pub filtered_exprs: Vec<(String, String, String)>, +} + #[pymethods] impl PyTableScan { #[pyo3(name = "getTableScanProjects")] @@ -45,8 +55,9 @@ impl PyTableScan { } #[pyo3(name = "getDNFFilters")] - fn dnf_io_filters(&self) -> PyResult> { + fn dnf_io_filters(&self) -> PyResult { let mut filters: Vec<(String, String, String)> = Vec::new(); + let mut unfiltered: Vec = Vec::new(); for filter in &self.table_scan.filters { match filter { Expr::BinaryExpr(binary_expr) => { @@ -58,17 +69,18 @@ impl PyTableScan { 
let right = right_split.nth(0); filters.push((left.unwrap().to_string(), binary_expr.op.to_string(), right.unwrap().to_string())) }, - // Expr::IsNotNull(expr) => { - // let expr_thing = expr.to_string(); - // let mut expr_split = expr_thing.split('.'); - // let left = expr_split.nth(1); - // filters.push((left.unwrap().to_string(), "!=".to_string(), "np.nan".to_string())) - // }, - Expr::IsNotNull(expr) => print!("Skipping is not null for now"), - _ => panic!("Encountered non BinaryExpr type: {}", filter) + _ => { + println!("Unable to apply filter: `{}` to IO reader, using in Dask instead", filter); + let tbl_scan = LogicalPlan::TableScan(self.table_scan.clone()); + unfiltered.push(PyExpr::from(filter.clone(), Some(vec![Arc::new(tbl_scan)]))) + } } } - Ok(filters) + + Ok(PyFilteredResult { + io_unfilterable_exprs: unfiltered, + filtered_exprs: filters + }) } } diff --git a/dask_sql/physical/rel/logical/filter.py b/dask_sql/physical/rel/logical/filter.py index 178121fef..0740c2d82 100644 --- a/dask_sql/physical/rel/logical/filter.py +++ b/dask_sql/physical/rel/logical/filter.py @@ -34,10 +34,12 @@ def filter_or_scalar(df: dd.DataFrame, filter_condition: Union[np.bool_, dd.Seri # In SQL, a NULL in a boolean is False on filtering filter_condition = filter_condition.fillna(False) out = df[filter_condition] - if dask_config.get("sql.predicate_pushdown"): - return attempt_predicate_pushdown(out) - else: - return out + # Commented out for this POC PR + # if dask_config.get("sql.predicate_pushdown"): + # return attempt_predicate_pushdown(out) + # else: + # return out + return out class DaskFilterPlugin(BaseRelPlugin): diff --git a/dask_sql/physical/rel/logical/table_scan.py b/dask_sql/physical/rel/logical/table_scan.py index 25397ddb6..7c3212950 100644 --- a/dask_sql/physical/rel/logical/table_scan.py +++ b/dask_sql/physical/rel/logical/table_scan.py @@ -44,45 +44,8 @@ def convert( # The table(s) we need to return dask_table = rel.getTable() schema_name, table_name = [n.lower() for n in context.fqn(dask_table)] - - filters = table_scan.getFilters() - if filters: - tbl_meta = context.schema[schema_name].tables_meta[table_name] - - # print(f"Reading table: {table_name} @ {tbl_meta['input_path']}") - # print(f"# Of filters from Rust: {len(table_scan.getFilters())}") - - # Generate the filters in DNF form for the cudf reader - filters = table_scan.getDNFFilters() - # print(f"Filters: {filters}") - - # Columns that should be projected - cols = table_scan.getTableScanProjects() - # print(f"Columns to read: {cols}") - - if len(filters) > 0: - # Prepare the filters to be in the format expected by Python since they came from Rust - updated_filters = [] - for filter_tup in filters: - if filter_tup[2].startswith("Int"): - num = filter_tup[2].split('(')[1].split(')')[0] - updated_filters.append((filter_tup[0], filter_tup[1], int(num))) - else: - updated_filters.append(filter_tup) - - - # print(f"Updated filters: {updated_filters}") - # df = ddf.read_parquet(tbl_meta["input_path"], columns=cols, filters=filters) - df = ddf.read_parquet(tbl_meta["input_path"], filters=updated_filters, columns=cols) - else: - # df = ddf.read_parquet(tbl_meta["input_path"], columns=cols) - df = ddf.read_parquet(tbl_meta["input_path"], columns=cols) - - dc = DataContainer(df.copy(), ColumnContainer(df.columns)) - else: - dc = context.schema[schema_name].tables[table_name] - # dc = self._apply_filters(table_scan, rel, dc, context) - + + dc = self._apply_filters(context.schema[schema_name].tables_meta[table_name], table_scan, rel, 
context) # Apply filter before projections since filter columns may not be in projections dc = self._apply_projections(table_scan, dask_table, dc) @@ -110,19 +73,49 @@ def _apply_projections(self, table_scan, dask_table, dc): cc = cc.limit_to(field_specifications) return DataContainer(df, cc) - def _apply_filters(self, table_scan, rel, dc, context): - df = dc.df - cc = dc.column_container + def _apply_filters(self, tbl_meta, table_scan, rel, context): + # df = dc.df + # cc = dc.column_container + # Columns that should be projected + cols = table_scan.getTableScanProjects() + filters = table_scan.getFilters() - # All partial filters here are applied in conjunction (&) if filters: + # Generate the filters in DNF form for the cudf reader + filtered_result = table_scan.getDNFFilters() + print(f"Filtered Result: {filtered_result}") + filtered = filtered_result.filtered_exprs + unfiltered = filtered_result.io_unfilterable_exprs + print(f"Filtered: {filtered}") + print(f"Un-filtered: {unfiltered}") + + if len(filtered) > 0: + # Prepare the filters to be in the format expected by Python since they came from Rust + updated_filters = [] + for filter_tup in filtered: + if filter_tup[2].startswith("Int"): + num = filter_tup[2].split('(')[1].split(')')[0] + updated_filters.append((filter_tup[0], filter_tup[1], int(num))) + else: + updated_filters.append(filter_tup) + + + df = ddf.read_parquet(tbl_meta["input_path"], filters=updated_filters, columns=cols) + else: + df = ddf.read_parquet(tbl_meta["input_path"], columns=cols) + + dc = DataContainer(df.copy(), ColumnContainer(df.columns)) + + # All partial filters here are applied in conjunction (&) df_condition = reduce( operator.and_, [ RexConverter.convert(rel, rex, dc, context=context) - for rex in filters + for rex in unfiltered ], ) df = filter_or_scalar(df, df_condition) + else: + df = ddf.read_parquet(tbl_meta["input_path"], columns=cols) - return DataContainer(df, cc) + return DataContainer(df, ColumnContainer(df.columns)) diff --git a/dask_sql/physical/utils/filter.py b/dask_sql/physical/utils/filter.py index a4a4e3193..dbd712255 100644 --- a/dask_sql/physical/utils/filter.py +++ b/dask_sql/physical/utils/filter.py @@ -75,7 +75,6 @@ def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: name = ddf._name try: filters = dsk.layers[name]._dnf_filter_expression(dsk) - print(f"Newly created filters: {filters}") if not isinstance(filters, frozenset): # No filters encountered return ddf From c560032c354a689a1ab6fa86ed77f116bf209199 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Tue, 24 Jan 2023 11:17:48 -0500 Subject: [PATCH 03/17] Updated with rough-in null IO filtering logic --- dask_planner/src/sql/logical/table_scan.rs | 7 +++++++ dask_sql/physical/rel/logical/table_scan.py | 22 +++++++++++++-------- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/dask_planner/src/sql/logical/table_scan.rs b/dask_planner/src/sql/logical/table_scan.rs index 75b13e821..01e145d5f 100644 --- a/dask_planner/src/sql/logical/table_scan.rs +++ b/dask_planner/src/sql/logical/table_scan.rs @@ -69,6 +69,13 @@ impl PyTableScan { let right = right_split.nth(0); filters.push((left.unwrap().to_string(), binary_expr.op.to_string(), right.unwrap().to_string())) }, + Expr::IsNotNull(inner_expr) => { + println!("IS NOT NULL Expr: {:?}", inner_expr); + let fqtn = inner_expr.to_string(); + let mut col_split = fqtn.split('.'); + let col = col_split.nth(1); + filters.push((col.unwrap().to_string(), "!=".to_string(), "np.nan".to_string())) + }, _ => { 
println!("Unable to apply filter: `{}` to IO reader, using in Dask instead", filter); let tbl_scan = LogicalPlan::TableScan(self.table_scan.clone()); diff --git a/dask_sql/physical/rel/logical/table_scan.py b/dask_sql/physical/rel/logical/table_scan.py index 7c3212950..6eb81e2ba 100644 --- a/dask_sql/physical/rel/logical/table_scan.py +++ b/dask_sql/physical/rel/logical/table_scan.py @@ -8,6 +8,8 @@ from dask_sql.physical.rel.logical.filter import filter_or_scalar from dask_sql.physical.rex import RexConverter +import numpy as np + import dask_cudf as ddf if TYPE_CHECKING: @@ -96,9 +98,12 @@ def _apply_filters(self, tbl_meta, table_scan, rel, context): if filter_tup[2].startswith("Int"): num = filter_tup[2].split('(')[1].split(')')[0] updated_filters.append((filter_tup[0], filter_tup[1], int(num))) + elif filter_tup[2] == "np.nan": + updated_filters.append((filter_tup[0], filter_tup[1], np.nan)) else: updated_filters.append(filter_tup) + print(f"Invoking ddf.read_parquet with filters: {updated_filters}") df = ddf.read_parquet(tbl_meta["input_path"], filters=updated_filters, columns=cols) else: @@ -107,14 +112,15 @@ def _apply_filters(self, tbl_meta, table_scan, rel, context): dc = DataContainer(df.copy(), ColumnContainer(df.columns)) # All partial filters here are applied in conjunction (&) - df_condition = reduce( - operator.and_, - [ - RexConverter.convert(rel, rex, dc, context=context) - for rex in unfiltered - ], - ) - df = filter_or_scalar(df, df_condition) + if len(unfiltered) > 0: + df_condition = reduce( + operator.and_, + [ + RexConverter.convert(rel, rex, dc, context=context) + for rex in unfiltered + ], + ) + df = filter_or_scalar(df, df_condition) else: df = ddf.read_parquet(tbl_meta["input_path"], columns=cols) From 873d73011c6bf11b66e26716c631e3e9e6ebc611 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Thu, 2 Feb 2023 10:22:02 -0500 Subject: [PATCH 04/17] Allow for dataframe input and create dask task graphs for IO operations that are not filterable --- dask_planner/src/sql/logical/table_scan.rs | 31 +++++++----- dask_sql/physical/rel/logical/table_scan.py | 54 ++++++++++++--------- 2 files changed, 50 insertions(+), 35 deletions(-) diff --git a/dask_planner/src/sql/logical/table_scan.rs b/dask_planner/src/sql/logical/table_scan.rs index 01e145d5f..c00615d79 100644 --- a/dask_planner/src/sql/logical/table_scan.rs +++ b/dask_planner/src/sql/logical/table_scan.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use datafusion_common::DFSchema; -use datafusion_expr::{Expr, logical_plan::TableScan, LogicalPlan}; +use datafusion_expr::{logical_plan::TableScan, Expr, LogicalPlan}; use pyo3::prelude::*; use crate::{ @@ -67,17 +67,24 @@ impl PyTableScan { let right = binary_expr.right.to_string(); let mut right_split = right.split('.'); let right = right_split.nth(0); - filters.push((left.unwrap().to_string(), binary_expr.op.to_string(), right.unwrap().to_string())) - }, - Expr::IsNotNull(inner_expr) => { - println!("IS NOT NULL Expr: {:?}", inner_expr); - let fqtn = inner_expr.to_string(); - let mut col_split = fqtn.split('.'); - let col = col_split.nth(1); - filters.push((col.unwrap().to_string(), "!=".to_string(), "np.nan".to_string())) - }, + filters.push(( + left.unwrap().to_string(), + binary_expr.op.to_string(), + right.unwrap().to_string(), + )) + } + // Expr::IsNotNull(inner_expr) => { + // println!("IS NOT NULL Expr: {:?}", inner_expr); + // let fqtn = inner_expr.to_string(); + // let mut col_split = fqtn.split('.'); + // let col = col_split.nth(1); + // 
filters.push((col.unwrap().to_string(), "!=".to_string(), "np.nan".to_string())) + // }, _ => { - println!("Unable to apply filter: `{}` to IO reader, using in Dask instead", filter); + println!( + "Unable to apply filter: `{}` to IO reader, using in Dask instead", + filter + ); let tbl_scan = LogicalPlan::TableScan(self.table_scan.clone()); unfiltered.push(PyExpr::from(filter.clone(), Some(vec![Arc::new(tbl_scan)]))) } @@ -86,7 +93,7 @@ impl PyTableScan { Ok(PyFilteredResult { io_unfilterable_exprs: unfiltered, - filtered_exprs: filters + filtered_exprs: filters, }) } } diff --git a/dask_sql/physical/rel/logical/table_scan.py b/dask_sql/physical/rel/logical/table_scan.py index 6eb81e2ba..913a5068f 100644 --- a/dask_sql/physical/rel/logical/table_scan.py +++ b/dask_sql/physical/rel/logical/table_scan.py @@ -3,15 +3,15 @@ from functools import reduce from typing import TYPE_CHECKING -from dask_sql.datacontainer import DataContainer, ColumnContainer +import dask_cudf as ddf +import numpy as np +from dask.utils_test import hlg_layer + +from dask_sql.datacontainer import ColumnContainer, DataContainer from dask_sql.physical.rel.base import BaseRelPlugin from dask_sql.physical.rel.logical.filter import filter_or_scalar from dask_sql.physical.rex import RexConverter -import numpy as np - -import dask_cudf as ddf - if TYPE_CHECKING: import dask_sql from dask_planner.rust import LogicalPlan @@ -46,9 +46,11 @@ def convert( # The table(s) we need to return dask_table = rel.getTable() schema_name, table_name = [n.lower() for n in context.fqn(dask_table)] - - dc = self._apply_filters(context.schema[schema_name].tables_meta[table_name], table_scan, rel, context) + + dc = context.schema[schema_name].tables[table_name] + # Apply filter before projections since filter columns may not be in projections + dc = self._apply_filters(table_scan, rel, dc, context) dc = self._apply_projections(table_scan, dask_table, dc) cc = dc.column_container @@ -75,41 +77,47 @@ def _apply_projections(self, table_scan, dask_table, dc): cc = cc.limit_to(field_specifications) return DataContainer(df, cc) - def _apply_filters(self, tbl_meta, table_scan, rel, context): - # df = dc.df - # cc = dc.column_container + def _apply_filters(self, table_scan, rel, dc, context): + # Columns that should be projected cols = table_scan.getTableScanProjects() filters = table_scan.getFilters() if filters: + df = dc.df + # Generate the filters in DNF form for the cudf reader filtered_result = table_scan.getDNFFilters() - print(f"Filtered Result: {filtered_result}") filtered = filtered_result.filtered_exprs unfiltered = filtered_result.io_unfilterable_exprs - print(f"Filtered: {filtered}") - print(f"Un-filtered: {unfiltered}") if len(filtered) > 0: # Prepare the filters to be in the format expected by Python since they came from Rust updated_filters = [] for filter_tup in filtered: if filter_tup[2].startswith("Int"): - num = filter_tup[2].split('(')[1].split(')')[0] + num = filter_tup[2].split("(")[1].split(")")[0] updated_filters.append((filter_tup[0], filter_tup[1], int(num))) elif filter_tup[2] == "np.nan": updated_filters.append((filter_tup[0], filter_tup[1], np.nan)) else: updated_filters.append(filter_tup) - print(f"Invoking ddf.read_parquet with filters: {updated_filters}") - - df = ddf.read_parquet(tbl_meta["input_path"], filters=updated_filters, columns=cols) - else: - df = ddf.read_parquet(tbl_meta["input_path"], columns=cols) + # Rebuild the read_* operation from the existing Dask task + # TODO: This will currently only work with 
parquet, need to update + layer = hlg_layer(df.dask, "read-parquet") + creation_path = layer.creation_info["args"][0] + print( + f"Rebuilding Dask Task `read_parquet()` \n \ + Path: {creation_path} \n \ + Filters: {updated_filters} \n \ + Columns: {cols}" + ) - dc = DataContainer(df.copy(), ColumnContainer(df.columns)) + df = ddf.read_parquet( + creation_path, filters=updated_filters, columns=cols + ) + dc = DataContainer(df, ColumnContainer(df.columns)) # All partial filters here are applied in conjunction (&) if len(unfiltered) > 0: @@ -121,7 +129,7 @@ def _apply_filters(self, tbl_meta, table_scan, rel, context): ], ) df = filter_or_scalar(df, df_condition) - else: - df = ddf.read_parquet(tbl_meta["input_path"], columns=cols) - return DataContainer(df, ColumnContainer(df.columns)) + dc = DataContainer(df, ColumnContainer(df.columns)) + + return dc From 9c9feeab9206de44411e1cbcdf19d548583f48e4 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Tue, 14 Feb 2023 14:41:30 -0500 Subject: [PATCH 05/17] Remove print noise --- dask_sql/physical/utils/filter.py | 39 +------------------------------ 1 file changed, 1 insertion(+), 38 deletions(-) diff --git a/dask_sql/physical/utils/filter.py b/dask_sql/physical/utils/filter.py index dbd712255..5309289c4 100644 --- a/dask_sql/physical/utils/filter.py +++ b/dask_sql/physical/utils/filter.py @@ -179,16 +179,11 @@ def to_dnf(expr): operator.or_, operator.getitem, M.fillna, - # M.isna, - # M.astype, - # operator.inv, } # Specify functions that must be generated with # a different API at the dataframe-collection level -_special_op_mappings = {M.fillna: dd._Frame.fillna, - M.isna: dd._Frame.isna, - M.astype: dd._Frame.astype} +_special_op_mappings = {M.fillna: dd._Frame.fillna} class RegenerableLayer: @@ -250,12 +245,7 @@ def _regenerate_collection( regen_args = self.creation_info.get("args", []) regen_kwargs = self.creation_info.get("kwargs", {}).copy() regen_kwargs = {k: v for k, v in self.creation_info.get("kwargs", {}).items()} - print(f"new_kwargs: {new_kwargs}") regen_kwargs.update((new_kwargs or {}).get(self.layer.output, {})) - print(f"Func: {func}") - print(f"Inputs: {inputs}") - print(f"Regen Args: {regen_args}") - print(f"Regen KWArgs: {regen_kwargs}") result = func(*inputs, *regen_args, **regen_kwargs) _regen_cache[self.layer.output] = result return result @@ -273,14 +263,7 @@ def _dnf_filter_expression(self, dsk): func = _blockwise_getitem_dnf elif op == dd._Frame.fillna: func = _blockwise_fillna_dnf - elif op == dd._Frame.isna: - func = _blockwise_isna_dnf - elif op == operator.inv: - func = _blockwise_inv_dnf - elif op == dd._Frame.astype: - func = _blockwise_astype_dnf else: - print(f"This is the problem spot!~!!! 
Op: {op}") raise ValueError(f"No DNF expression for {op}") return func(op, self.layer.indices, dsk) @@ -306,7 +289,6 @@ def from_hlg(cls, hlg: HighLevelGraph): _layers = {} for key, layer in hlg.layers.items(): - # print(f"Graph Key: {key}, Layer: {layer}, Layer Type: {type(layer)}") regenerable_layer = None if isinstance(layer, DataFrameIOLayer): regenerable_layer = RegenerableLayer(layer, layer.creation_info or {}) @@ -328,7 +310,6 @@ def from_hlg(cls, hlg: HighLevelGraph): else: op = tasks[0][0] if op in _regenerable_ops: - print(f"Blockwise OP: {op}") regenerable_layer = RegenerableLayer( layer, { @@ -336,8 +317,6 @@ def from_hlg(cls, hlg: HighLevelGraph): "kwargs": kwargs, }, ) - else: - print(f"Blockwise OP: {op} NOT in list of _regenerable_ops") if regenerable_layer is None: raise ValueError(f"Graph contains non-regenerable layer: {layer}") @@ -399,19 +378,3 @@ def _blockwise_getitem_dnf(op, indices: list, dsk: RegenerableGraph): def _blockwise_fillna_dnf(op, indices: list, dsk: RegenerableGraph): # Return dnf of input collection return _get_blockwise_input(0, indices, dsk) - - -def _blockwise_isna_dnf(op, indices: list, dsk: RegenerableGraph): - # Return dnf of input collection - return _get_blockwise_input(0, indices, dsk) - - -def _blockwise_inv_dnf(op, indices: list, dsk: RegenerableGraph): - # Return dnf of input collection - print(f"Indices: {indices}") - return _get_blockwise_input(0, indices, dsk) - - -def _blockwise_astype_dnf(op, indices: list, dsk: RegenerableGraph): - # Return dnf of input collection - return _get_blockwise_input(0, indices, dsk) From 086da1da2643550eb7284fe8f3c45f0e919f8636 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Wed, 15 Feb 2023 15:07:48 -0500 Subject: [PATCH 06/17] Add methods to capture split_row_groups and aggregate_files into the predicate push down --- dask_sql/physical/rel/logical/table_scan.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/dask_sql/physical/rel/logical/table_scan.py b/dask_sql/physical/rel/logical/table_scan.py index 913a5068f..db16ca4c1 100644 --- a/dask_sql/physical/rel/logical/table_scan.py +++ b/dask_sql/physical/rel/logical/table_scan.py @@ -85,7 +85,7 @@ def _apply_filters(self, table_scan, rel, dc, context): filters = table_scan.getFilters() if filters: df = dc.df - + # Generate the filters in DNF form for the cudf reader filtered_result = table_scan.getDNFFilters() filtered = filtered_result.filtered_exprs @@ -111,11 +111,17 @@ def _apply_filters(self, table_scan, rel, dc, context): f"Rebuilding Dask Task `read_parquet()` \n \ Path: {creation_path} \n \ Filters: {updated_filters} \n \ - Columns: {cols}" + Columns: {cols} \n \ + split_row_groups: {layer.creation_info['kwargs']['split_row_groups']} \n \ + aggregate_files: {layer.creation_info['kwargs']['aggregate_files']}\n" ) df = ddf.read_parquet( - creation_path, filters=updated_filters, columns=cols + creation_path, + filters=updated_filters, + columns=cols, + split_row_groups=layer.creation_info["kwargs"]["split_row_groups"], + aggregate_files=layer.creation_info["kwargs"]["aggregate_files"], ) dc = DataContainer(df, ColumnContainer(df.columns)) From 805085a7529fecb1e9dbc2c0564ebb9dcc1f016d Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Wed, 1 Mar 2023 21:09:22 -0500 Subject: [PATCH 07/17] updates to attempt to expand dnf filters --- dask_planner/src/sql/logical/table_scan.rs | 187 ++++++++++++++++----- 1 file changed, 145 insertions(+), 42 deletions(-) diff --git a/dask_planner/src/sql/logical/table_scan.rs 
b/dask_planner/src/sql/logical/table_scan.rs index c00615d79..31a71cd31 100644 --- a/dask_planner/src/sql/logical/table_scan.rs +++ b/dask_planner/src/sql/logical/table_scan.rs @@ -1,7 +1,7 @@ -use std::sync::Arc; +use std::{f32::consts::E, sync::Arc}; -use datafusion_common::DFSchema; -use datafusion_expr::{logical_plan::TableScan, Expr, LogicalPlan}; +use datafusion_common::{DFSchema, ScalarValue}; +use datafusion_expr::{logical_plan::TableScan, BinaryExpr, Expr, LogicalPlan}; use pyo3::prelude::*; use crate::{ @@ -26,6 +26,109 @@ pub struct PyFilteredResult { pub filtered_exprs: Vec<(String, String, String)>, } +impl PyTableScan { + /// Transform the singular Expr instance into its DNF form serialized in a Vec instance. Possibly recursively expanding + /// it as well if needed. + /// + /// Ex: BinaryExpr("column_name", Operator::Eq, "something") -> vec!["column_name", "=", "something"] + /// Ex: Expr::Column("column_name") -> vec!["column_name"] + /// Ex: Expr::Literal(Utf-8("something")) -> vec!["something"] + pub fn _expand_dnf_filter(filter: &Expr) -> Vec<(String, String, String)> { + let mut filter_tuple: Vec<(String, String, String)> = Vec::new(); + + match filter { + Expr::BinaryExpr(binary_expr) => { + println!( + "!!!BinaryExpr -> Left: {:?}, Operator: {:?}, Right: {:?}", + binary_expr.left, binary_expr.op, binary_expr.right + ); + + // Since Tuples are immutable in Rust we need a datastructure to temporaily hold the Tuples values until all have been parsed + let mut tmp_vals: Vec = Vec::new(); + + // Push left Expr string value or combo of expanded left values + match &*binary_expr.left { + Expr::BinaryExpr(binary_expr) => { + filter_tuple.append(&mut PyTableScan::_expand_dnf_filter( + &Expr::BinaryExpr(binary_expr.clone()), + )); + } + _ => { + let str = binary_expr.left.to_string(); + if str.contains(".") { + tmp_vals.push(str.split('.').nth(1).unwrap().to_string()); + } else { + tmp_vals.push(str); + } + } + }; + + // Handle the operator here. This controls if the format is conjunctive or disjuntive + tmp_vals.push(binary_expr.op.to_string()); + + match &*binary_expr.right { + Expr::BinaryExpr(binary_expr) => { + filter_tuple.append(&mut PyTableScan::_expand_dnf_filter( + &Expr::BinaryExpr(binary_expr.clone()), + )); + } + _ => match &*binary_expr.right { + Expr::Literal(scalar_value) => match &scalar_value { + ScalarValue::Utf8(value) => { + let val = value.as_ref().unwrap(); + if val.contains(".") { + tmp_vals.push(val.split('.').nth(1).unwrap().to_string()); + } else { + tmp_vals.push(val.clone()); + } + } + _ => tmp_vals.push(scalar_value.to_string()), + }, + _ => panic!("hit this!"), + }, + }; + + if tmp_vals.len() == 3 { + filter_tuple.push(( + tmp_vals[0].clone(), + tmp_vals[1].clone(), + tmp_vals[2].clone(), + )); + } else { + println!( + "Wonder why tmp_vals doesn't equal 3?? {:?}, Value: {:?}", + tmp_vals.len(), + tmp_vals[0] + ); + } + } + _ => { + println!( + "Unable to apply filter: `{}` to IO reader, using in Dask instead", + filter + ); + } + } + + filter_tuple + } + + pub fn _expand_dnf_filters(filters: &Vec) -> PyFilteredResult { + // 1. 
Loop through all of the TableScan filters (Expr(s)) + let mut filtered_exprs: Vec<(String, String, String)> = Vec::new(); + let mut unfiltered_exprs: Vec = Vec::new(); + for filter in filters { + let dnf_filter = PyTableScan::_expand_dnf_filter(filter); + println!("DNF Filter: {:?}", dnf_filter); + } + + PyFilteredResult { + io_unfilterable_exprs: unfiltered_exprs, + filtered_exprs: filtered_exprs, + } + } +} + #[pymethods] impl PyTableScan { #[pyo3(name = "getTableScanProjects")] @@ -56,45 +159,8 @@ impl PyTableScan { #[pyo3(name = "getDNFFilters")] fn dnf_io_filters(&self) -> PyResult { - let mut filters: Vec<(String, String, String)> = Vec::new(); - let mut unfiltered: Vec = Vec::new(); - for filter in &self.table_scan.filters { - match filter { - Expr::BinaryExpr(binary_expr) => { - let left = binary_expr.left.to_string(); - let mut left_split = left.split('.'); - let left = left_split.nth(1); - let right = binary_expr.right.to_string(); - let mut right_split = right.split('.'); - let right = right_split.nth(0); - filters.push(( - left.unwrap().to_string(), - binary_expr.op.to_string(), - right.unwrap().to_string(), - )) - } - // Expr::IsNotNull(inner_expr) => { - // println!("IS NOT NULL Expr: {:?}", inner_expr); - // let fqtn = inner_expr.to_string(); - // let mut col_split = fqtn.split('.'); - // let col = col_split.nth(1); - // filters.push((col.unwrap().to_string(), "!=".to_string(), "np.nan".to_string())) - // }, - _ => { - println!( - "Unable to apply filter: `{}` to IO reader, using in Dask instead", - filter - ); - let tbl_scan = LogicalPlan::TableScan(self.table_scan.clone()); - unfiltered.push(PyExpr::from(filter.clone(), Some(vec![Arc::new(tbl_scan)]))) - } - } - } - - Ok(PyFilteredResult { - io_unfilterable_exprs: unfiltered, - filtered_exprs: filters, - }) + let results = PyTableScan::_expand_dnf_filters(&self.table_scan.filters); + Ok(results) } } @@ -121,3 +187,40 @@ impl TryFrom for PyTableScan { } } } + +#[cfg(test)] +mod tests { + use datafusion::logical_expr::expr_fn::{binary_expr, col}; + use datafusion_common::ScalarValue; + use datafusion_expr::{Expr, Operator}; + + use crate::sql::logical::table_scan::PyTableScan; + + #[test] + pub fn expand_binary_exprs_dnf_filters() { + let jewlery = binary_expr( + col("item.i_category"), + Operator::Eq, + Expr::Literal(ScalarValue::new_utf8("Jewelry")), + ); + let women = binary_expr( + col("item.i_category"), + Operator::Eq, + Expr::Literal(ScalarValue::new_utf8("Women")), + ); + let music = binary_expr( + col("item.i_category"), + Operator::Eq, + Expr::Literal(ScalarValue::new_utf8("Music")), + ); + + let full_expr = binary_expr(jewlery, Operator::Or, women); + let full_expr = binary_expr(full_expr, Operator::Or, music); + println!("BinaryExpr: {:?}", full_expr); + + let filters = vec![full_expr]; + + let result = PyTableScan::_expand_dnf_filters(&filters); + println!("Result: {:?}", result); + } +} From c4f15ea41bf67c6e50026df1a39116c80eb3830e Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Wed, 1 Mar 2023 21:40:52 -0500 Subject: [PATCH 08/17] Add filter to list --- dask_planner/src/sql/logical/table_scan.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dask_planner/src/sql/logical/table_scan.rs b/dask_planner/src/sql/logical/table_scan.rs index 31a71cd31..61260f110 100644 --- a/dask_planner/src/sql/logical/table_scan.rs +++ b/dask_planner/src/sql/logical/table_scan.rs @@ -118,8 +118,7 @@ impl PyTableScan { let mut filtered_exprs: Vec<(String, String, String)> = Vec::new(); let mut unfiltered_exprs: 
Vec = Vec::new(); for filter in filters { - let dnf_filter = PyTableScan::_expand_dnf_filter(filter); - println!("DNF Filter: {:?}", dnf_filter); + filtered_exprs.append(&mut PyTableScan::_expand_dnf_filter(filter)); } PyFilteredResult { From dbed42890a3a967d1cc7c8f0f5f210335d30916a Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Mon, 6 Mar 2023 11:50:50 -0500 Subject: [PATCH 09/17] debug statement --- dask_sql/physical/rel/logical/table_scan.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dask_sql/physical/rel/logical/table_scan.py b/dask_sql/physical/rel/logical/table_scan.py index db16ca4c1..7442d26d8 100644 --- a/dask_sql/physical/rel/logical/table_scan.py +++ b/dask_sql/physical/rel/logical/table_scan.py @@ -109,6 +109,7 @@ def _apply_filters(self, table_scan, rel, dc, context): creation_path = layer.creation_info["args"][0] print( f"Rebuilding Dask Task `read_parquet()` \n \ + Original Dask read-parquet: {layer} \n \ Path: {creation_path} \n \ Filters: {updated_filters} \n \ Columns: {cols} \n \ From ea7e1ac3b28fd9acebb1f1debb2c037296c6a22c Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Mon, 6 Mar 2023 12:46:11 -0500 Subject: [PATCH 10/17] bitwise examples --- dask_sql/physical/rel/logical/table_scan.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dask_sql/physical/rel/logical/table_scan.py b/dask_sql/physical/rel/logical/table_scan.py index 7442d26d8..72235c3f6 100644 --- a/dask_sql/physical/rel/logical/table_scan.py +++ b/dask_sql/physical/rel/logical/table_scan.py @@ -110,6 +110,7 @@ def _apply_filters(self, table_scan, rel, dc, context): print( f"Rebuilding Dask Task `read_parquet()` \n \ Original Dask read-parquet: {layer} \n \ + Original creation_info: {layer.creation_info} \n \ Path: {creation_path} \n \ Filters: {updated_filters} \n \ Columns: {cols} \n \ From 478cbe59953ea9bd77bee5740616c7f3d3cf7aae Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Mon, 6 Mar 2023 13:04:51 -0500 Subject: [PATCH 11/17] Encode TimestampNanoseconds as a String so Python can use as a Int64 --- dask_planner/src/sql/logical/table_scan.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/dask_planner/src/sql/logical/table_scan.rs b/dask_planner/src/sql/logical/table_scan.rs index 61260f110..8f2f58ba8 100644 --- a/dask_planner/src/sql/logical/table_scan.rs +++ b/dask_planner/src/sql/logical/table_scan.rs @@ -1,7 +1,7 @@ -use std::{f32::consts::E, sync::Arc}; +use std::sync::Arc; use datafusion_common::{DFSchema, ScalarValue}; -use datafusion_expr::{logical_plan::TableScan, BinaryExpr, Expr, LogicalPlan}; +use datafusion_expr::{logical_plan::TableScan, Expr, LogicalPlan}; use pyo3::prelude::*; use crate::{ @@ -55,7 +55,7 @@ impl PyTableScan { } _ => { let str = binary_expr.left.to_string(); - if str.contains(".") { + if str.contains('.') { tmp_vals.push(str.split('.').nth(1).unwrap().to_string()); } else { tmp_vals.push(str); @@ -76,12 +76,20 @@ impl PyTableScan { Expr::Literal(scalar_value) => match &scalar_value { ScalarValue::Utf8(value) => { let val = value.as_ref().unwrap(); - if val.contains(".") { + if val.contains('.') { tmp_vals.push(val.split('.').nth(1).unwrap().to_string()); } else { tmp_vals.push(val.clone()); } } + ScalarValue::TimestampNanosecond(val, _an_option) => { + // Need to encode the value as a String to return to Python, Python side will then convert + // value back to a integer + let mut val_builder = "Int64(".to_string(); + val_builder.push_str(val.unwrap().to_string().as_str()); + val_builder.push(')'); + 
tmp_vals.push(val_builder); + } _ => tmp_vals.push(scalar_value.to_string()), }, _ => panic!("hit this!"), @@ -116,14 +124,14 @@ impl PyTableScan { pub fn _expand_dnf_filters(filters: &Vec) -> PyFilteredResult { // 1. Loop through all of the TableScan filters (Expr(s)) let mut filtered_exprs: Vec<(String, String, String)> = Vec::new(); - let mut unfiltered_exprs: Vec = Vec::new(); + let unfiltered_exprs: Vec = Vec::new(); for filter in filters { filtered_exprs.append(&mut PyTableScan::_expand_dnf_filter(filter)); } PyFilteredResult { io_unfilterable_exprs: unfiltered_exprs, - filtered_exprs: filtered_exprs, + filtered_exprs, } } } From 70a0f69ca3ca4e228b1267ad2f76b29f0c953cf5 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Mon, 6 Mar 2023 13:11:47 -0500 Subject: [PATCH 12/17] Encode TimestampNanoseconds as string and decode in Python as pd.to_timestamp() --- dask_planner/src/sql/logical/table_scan.rs | 2 +- dask_sql/physical/rel/logical/table_scan.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/dask_planner/src/sql/logical/table_scan.rs b/dask_planner/src/sql/logical/table_scan.rs index 8f2f58ba8..579ae4a21 100644 --- a/dask_planner/src/sql/logical/table_scan.rs +++ b/dask_planner/src/sql/logical/table_scan.rs @@ -85,7 +85,7 @@ impl PyTableScan { ScalarValue::TimestampNanosecond(val, _an_option) => { // Need to encode the value as a String to return to Python, Python side will then convert // value back to a integer - let mut val_builder = "Int64(".to_string(); + let mut val_builder = "TimestampNanosecond(".to_string(); val_builder.push_str(val.unwrap().to_string().as_str()); val_builder.push(')'); tmp_vals.push(val_builder); diff --git a/dask_sql/physical/rel/logical/table_scan.py b/dask_sql/physical/rel/logical/table_scan.py index 72235c3f6..35acdfff5 100644 --- a/dask_sql/physical/rel/logical/table_scan.py +++ b/dask_sql/physical/rel/logical/table_scan.py @@ -5,6 +5,7 @@ import dask_cudf as ddf import numpy as np +import pandas as pd from dask.utils_test import hlg_layer from dask_sql.datacontainer import ColumnContainer, DataContainer @@ -98,6 +99,10 @@ def _apply_filters(self, table_scan, rel, dc, context): if filter_tup[2].startswith("Int"): num = filter_tup[2].split("(")[1].split(")")[0] updated_filters.append((filter_tup[0], filter_tup[1], int(num))) + elif filter_tup[2].startswith("TimestampNanosecond"): + ns_timestamp = filter_tup[2].split("(")[1].split(")")[0] + val = pd.to_datetime(ns_timestamp, unit="ns") + updated_filters.append((filter_tup[0], filter_tup[1], val)) elif filter_tup[2] == "np.nan": updated_filters.append((filter_tup[0], filter_tup[1], np.nan)) else: From c16bf2db5749f7ed64d2a5006e621666ee481552 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Mon, 6 Mar 2023 15:14:41 -0500 Subject: [PATCH 13/17] Add DaskPlannerError for capturing binaryexpr that cannot be converted into a pyarrow compatible filter --- dask_planner/src/error.rs | 2 ++ dask_planner/src/sql/logical/table_scan.rs | 32 ++++++++++++++------- dask_sql/physical/rel/logical/table_scan.py | 1 + 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/dask_planner/src/error.rs b/dask_planner/src/error.rs index d5b0eb39c..d3ac4132b 100644 --- a/dask_planner/src/error.rs +++ b/dask_planner/src/error.rs @@ -12,6 +12,7 @@ pub enum DaskPlannerError { ParserError(ParserError), TokenizerError(TokenizerError), Internal(String), + InvalidIOFilter(String), } impl Display for DaskPlannerError { @@ -21,6 +22,7 @@ impl Display for DaskPlannerError { Self::ParserError(e) => write!(f, 
"SQL Parser Error: {e}"), Self::TokenizerError(e) => write!(f, "SQL Tokenizer Error: {e}"), Self::Internal(e) => write!(f, "Internal Error: {e}"), + Self::InvalidIOFilter(e) => write!(f, "Invalid pyarrow filter: {e} encountered. Defaulting to Dask CPU/GPU bound task operation"), } } } diff --git a/dask_planner/src/sql/logical/table_scan.rs b/dask_planner/src/sql/logical/table_scan.rs index 579ae4a21..e8c16c02e 100644 --- a/dask_planner/src/sql/logical/table_scan.rs +++ b/dask_planner/src/sql/logical/table_scan.rs @@ -5,6 +5,7 @@ use datafusion_expr::{logical_plan::TableScan, Expr, LogicalPlan}; use pyo3::prelude::*; use crate::{ + error::DaskPlannerError, expression::{py_expr_list, PyExpr}, sql::exceptions::py_type_err, }; @@ -33,7 +34,9 @@ impl PyTableScan { /// Ex: BinaryExpr("column_name", Operator::Eq, "something") -> vec!["column_name", "=", "something"] /// Ex: Expr::Column("column_name") -> vec!["column_name"] /// Ex: Expr::Literal(Utf-8("something")) -> vec!["something"] - pub fn _expand_dnf_filter(filter: &Expr) -> Vec<(String, String, String)> { + pub fn _expand_dnf_filter( + filter: &Expr, + ) -> Result, DaskPlannerError> { let mut filter_tuple: Vec<(String, String, String)> = Vec::new(); match filter { @@ -49,9 +52,12 @@ impl PyTableScan { // Push left Expr string value or combo of expanded left values match &*binary_expr.left { Expr::BinaryExpr(binary_expr) => { - filter_tuple.append(&mut PyTableScan::_expand_dnf_filter( - &Expr::BinaryExpr(binary_expr.clone()), - )); + filter_tuple.append( + &mut PyTableScan::_expand_dnf_filter(&Expr::BinaryExpr( + binary_expr.clone(), + )) + .unwrap(), + ); } _ => { let str = binary_expr.left.to_string(); @@ -68,9 +74,12 @@ impl PyTableScan { match &*binary_expr.right { Expr::BinaryExpr(binary_expr) => { - filter_tuple.append(&mut PyTableScan::_expand_dnf_filter( - &Expr::BinaryExpr(binary_expr.clone()), - )); + filter_tuple.append( + &mut PyTableScan::_expand_dnf_filter(&Expr::BinaryExpr( + binary_expr.clone(), + )) + .unwrap(), + ); } _ => match &*binary_expr.right { Expr::Literal(scalar_value) => match &scalar_value { @@ -118,15 +127,18 @@ impl PyTableScan { } } - filter_tuple + Ok(filter_tuple) } pub fn _expand_dnf_filters(filters: &Vec) -> PyFilteredResult { // 1. 
Loop through all of the TableScan filters (Expr(s)) let mut filtered_exprs: Vec<(String, String, String)> = Vec::new(); - let unfiltered_exprs: Vec = Vec::new(); + let mut unfiltered_exprs: Vec = Vec::new(); for filter in filters { - filtered_exprs.append(&mut PyTableScan::_expand_dnf_filter(filter)); + match PyTableScan::_expand_dnf_filter(filter) { + Ok(mut expanded_dnf_filter) => filtered_exprs.append(&mut expanded_dnf_filter), + Err(_e) => unfiltered_exprs.push(PyExpr::from(filter.clone(), None)), + } } PyFilteredResult { diff --git a/dask_sql/physical/rel/logical/table_scan.py b/dask_sql/physical/rel/logical/table_scan.py index 35acdfff5..d8b0a837b 100644 --- a/dask_sql/physical/rel/logical/table_scan.py +++ b/dask_sql/physical/rel/logical/table_scan.py @@ -101,6 +101,7 @@ def _apply_filters(self, table_scan, rel, dc, context): updated_filters.append((filter_tup[0], filter_tup[1], int(num))) elif filter_tup[2].startswith("TimestampNanosecond"): ns_timestamp = filter_tup[2].split("(")[1].split(")")[0] + print(f"Nanosecond Value from Rust: {ns_timestamp}") val = pd.to_datetime(ns_timestamp, unit="ns") updated_filters.append((filter_tup[0], filter_tup[1], val)) elif filter_tup[2] == "np.nan": From 2cb64ae8464756ca0753e6e6c10a849b3055a01e Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Mon, 6 Mar 2023 16:11:31 -0500 Subject: [PATCH 14/17] Add DaskPlannerError for capturing binaryexpr that cannot be converted into a pyarrow compatible filter --- dask_planner/src/sql/logical/table_scan.rs | 68 +++++++++++++++++----- 1 file changed, 53 insertions(+), 15 deletions(-) diff --git a/dask_planner/src/sql/logical/table_scan.rs b/dask_planner/src/sql/logical/table_scan.rs index e8c16c02e..4a3da75df 100644 --- a/dask_planner/src/sql/logical/table_scan.rs +++ b/dask_planner/src/sql/logical/table_scan.rs @@ -5,7 +5,7 @@ use datafusion_expr::{logical_plan::TableScan, Expr, LogicalPlan}; use pyo3::prelude::*; use crate::{ - error::DaskPlannerError, + error::{DaskPlannerError, Result}, expression::{py_expr_list, PyExpr}, sql::exceptions::py_type_err, }; @@ -28,15 +28,35 @@ pub struct PyFilteredResult { } impl PyTableScan { + // pub fn _expand_expr(filter: &Expr) -> Result { + // // Push left Expr string value or combo of expanded left values + // match filter { + // Expr::BinaryExpr(binary_expr) => { + // filter_tuple.append( + // &mut PyTableScan::_expand_dnf_filter(&Expr::BinaryExpr( + // binary_expr.clone(), + // )) + // .unwrap(), + // ); + // } + // _ => { + // let str = filter.to_string(); + // if str.contains('.') { + // tmp_vals.push(str.split('.').nth(1).unwrap().to_string()); + // } else { + // tmp_vals.push(str); + // } + // } + // } + // } + /// Transform the singular Expr instance into its DNF form serialized in a Vec instance. Possibly recursively expanding /// it as well if needed. /// /// Ex: BinaryExpr("column_name", Operator::Eq, "something") -> vec!["column_name", "=", "something"] /// Ex: Expr::Column("column_name") -> vec!["column_name"] /// Ex: Expr::Literal(Utf-8("something")) -> vec!["something"] - pub fn _expand_dnf_filter( - filter: &Expr, - ) -> Result, DaskPlannerError> { + pub fn _expand_dnf_filter(filter: &Expr) -> Result> { let mut filter_tuple: Vec<(String, String, String)> = Vec::new(); match filter { @@ -67,7 +87,7 @@ impl PyTableScan { tmp_vals.push(str); } } - }; + } // Handle the operator here. 
This controls if the format is conjunctive or disjuntive tmp_vals.push(binary_expr.op.to_string()); @@ -91,19 +111,24 @@ impl PyTableScan { tmp_vals.push(val.clone()); } } - ScalarValue::TimestampNanosecond(val, _an_option) => { - // Need to encode the value as a String to return to Python, Python side will then convert - // value back to a integer - let mut val_builder = "TimestampNanosecond(".to_string(); - val_builder.push_str(val.unwrap().to_string().as_str()); - val_builder.push(')'); - tmp_vals.push(val_builder); + ScalarValue::TimestampNanosecond(_val, _an_option) => { + // // Need to encode the value as a String to return to Python, Python side will then convert + // // value back to a integer + // let mut val_builder = "TimestampNanosecond(".to_string(); + // val_builder.push_str(val.unwrap().to_string().as_str()); + // val_builder.push(')'); + // tmp_vals.push(val_builder); + // let er = + // DaskPlannerError::InvalidIOFilter(scalar_value.to_string()); + // Err::, DaskPlannerError>(er) + + // SIMPLY DO NOT PUSH THE VALUES HERE AND IT WILL CAUSE THE ERROR TO BE PROPAGATED } _ => tmp_vals.push(scalar_value.to_string()), }, _ => panic!("hit this!"), }, - }; + } if tmp_vals.len() == 3 { filter_tuple.push(( @@ -111,12 +136,20 @@ impl PyTableScan { tmp_vals[1].clone(), tmp_vals[2].clone(), )); + + Ok(filter_tuple) } else { println!( "Wonder why tmp_vals doesn't equal 3?? {:?}, Value: {:?}", tmp_vals.len(), tmp_vals[0] ); + let er = DaskPlannerError::InvalidIOFilter(format!( + "Wonder why tmp_vals doesn't equal 3?? {:?}, Value: {:?}", + tmp_vals.len(), + tmp_vals[0] + )); + Err::, DaskPlannerError>(er) } } _ => { @@ -124,10 +157,15 @@ impl PyTableScan { "Unable to apply filter: `{}` to IO reader, using in Dask instead", filter ); + let er = DaskPlannerError::InvalidIOFilter(format!( + "Unable to apply filter: `{}` to IO reader, using in Dask instead", + filter + )); + Err::, DaskPlannerError>(er) } } - Ok(filter_tuple) + // Ok(filter_tuple) } pub fn _expand_dnf_filters(filters: &Vec) -> PyFilteredResult { @@ -186,7 +224,7 @@ impl PyTableScan { impl TryFrom for PyTableScan { type Error = PyErr; - fn try_from(logical_plan: LogicalPlan) -> Result { + fn try_from(logical_plan: LogicalPlan) -> std::result::Result { match logical_plan { LogicalPlan::TableScan(table_scan) => { // Create an input logical plan that's identical to the table scan with schema from the table source From ff6372ae60c01e9fe71503f313f161e044e37bb0 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Mon, 6 Mar 2023 16:25:35 -0500 Subject: [PATCH 15/17] Add DaskPlannerError for capturing binaryexpr that cannot be converted into a pyarrow compatible filter --- dask_planner/src/sql/logical/table_scan.rs | 30 ++++------------------ 1 file changed, 5 insertions(+), 25 deletions(-) diff --git a/dask_planner/src/sql/logical/table_scan.rs b/dask_planner/src/sql/logical/table_scan.rs index 4a3da75df..c99c919c5 100644 --- a/dask_planner/src/sql/logical/table_scan.rs +++ b/dask_planner/src/sql/logical/table_scan.rs @@ -28,28 +28,6 @@ pub struct PyFilteredResult { } impl PyTableScan { - // pub fn _expand_expr(filter: &Expr) -> Result { - // // Push left Expr string value or combo of expanded left values - // match filter { - // Expr::BinaryExpr(binary_expr) => { - // filter_tuple.append( - // &mut PyTableScan::_expand_dnf_filter(&Expr::BinaryExpr( - // binary_expr.clone(), - // )) - // .unwrap(), - // ); - // } - // _ => { - // let str = filter.to_string(); - // if str.contains('.') { - // 
tmp_vals.push(str.split('.').nth(1).unwrap().to_string()); - // } else { - // tmp_vals.push(str); - // } - // } - // } - // } - /// Transform the singular Expr instance into its DNF form serialized in a Vec instance. Possibly recursively expanding /// it as well if needed. /// @@ -168,14 +146,16 @@ impl PyTableScan { // Ok(filter_tuple) } - pub fn _expand_dnf_filters(filters: &Vec) -> PyFilteredResult { + pub fn _expand_dnf_filters(input: &Arc, filters: &Vec) -> PyFilteredResult { // 1. Loop through all of the TableScan filters (Expr(s)) let mut filtered_exprs: Vec<(String, String, String)> = Vec::new(); let mut unfiltered_exprs: Vec = Vec::new(); for filter in filters { match PyTableScan::_expand_dnf_filter(filter) { Ok(mut expanded_dnf_filter) => filtered_exprs.append(&mut expanded_dnf_filter), - Err(_e) => unfiltered_exprs.push(PyExpr::from(filter.clone(), None)), + Err(_e) => { + unfiltered_exprs.push(PyExpr::from(filter.clone(), Some(vec![input.clone()]))) + } } } @@ -216,7 +196,7 @@ impl PyTableScan { #[pyo3(name = "getDNFFilters")] fn dnf_io_filters(&self) -> PyResult { - let results = PyTableScan::_expand_dnf_filters(&self.table_scan.filters); + let results = PyTableScan::_expand_dnf_filters(&self.input, &self.table_scan.filters); Ok(results) } } From a635c39d6ac3a2a403875ecfd8353c42bcfa5e02 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Mon, 6 Mar 2023 16:48:27 -0500 Subject: [PATCH 16/17] Add DaskPlannerError for capturing binaryexpr that cannot be converted into a pyarrow compatible filter --- dask_planner/src/sql/logical/table_scan.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/dask_planner/src/sql/logical/table_scan.rs b/dask_planner/src/sql/logical/table_scan.rs index c99c919c5..9cdc09217 100644 --- a/dask_planner/src/sql/logical/table_scan.rs +++ b/dask_planner/src/sql/logical/table_scan.rs @@ -50,12 +50,22 @@ impl PyTableScan { // Push left Expr string value or combo of expanded left values match &*binary_expr.left { Expr::BinaryExpr(binary_expr) => { - filter_tuple.append( - &mut PyTableScan::_expand_dnf_filter(&Expr::BinaryExpr( - binary_expr.clone(), - )) - .unwrap(), - ); + // if let Ok(tup) = filter_tuple.append( + // &mut PyTableScan::_expand_dnf_filter(&Expr::BinaryExpr( + // binary_expr.clone(), + // )) + // .unwrap(), + // ); + + if let Ok(mut tup) = + PyTableScan::_expand_dnf_filter(&Expr::BinaryExpr(binary_expr.clone())) + { + filter_tuple.append(&mut tup); + } else { + // Error so add to list of expressions that cannot be handled by PyArrow + // SIMPLY DO NOT PUSH THE VALUES HERE AND IT WILL CAUSE THE ERROR TO BE PROPAGATED + // REMOVE LATER BUT LEAVE FOR MORE TO MAKE INTENT MORE EXPLICIT. 
COMPILER OPTIMIZES THIS AWAY ANYWAY } _ => { let str = binary_expr.left.to_string(); From b3167eb1e26ba153b88b7d10e4ea16ae381cd1b6 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Mon, 6 Mar 2023 17:05:11 -0500 Subject: [PATCH 17/17] Support for multiple Int types --- dask_planner/src/sql/logical/table_scan.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dask_planner/src/sql/logical/table_scan.rs b/dask_planner/src/sql/logical/table_scan.rs index 9cdc09217..44e97da78 100644 --- a/dask_planner/src/sql/logical/table_scan.rs +++ b/dask_planner/src/sql/logical/table_scan.rs @@ -99,6 +99,10 @@ impl PyTableScan { tmp_vals.push(val.clone()); } } + ScalarValue::Int8(v) => tmp_vals.push(format!("Int({})", v.unwrap())), + ScalarValue::Int16(v) => tmp_vals.push(format!("Int({})", v.unwrap())), + ScalarValue::Int32(v) => tmp_vals.push(format!("Int({})", v.unwrap())), + ScalarValue::Int64(v) => tmp_vals.push(format!("Int({})", v.unwrap())), ScalarValue::TimestampNanosecond(_val, _an_option) => { // // Need to encode the value as a String to return to Python, Python side will then convert // // value back to an integer