From 3c7aba53a563d3e3262cffff9241d567bd617a11 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Sat, 27 Apr 2024 14:53:05 -0400 Subject: [PATCH 01/28] added actual set of working job light queries --- dev_scripts/which_queries_work.sh | 2 +- optd-perftest/src/job.rs | 11 +++-------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/dev_scripts/which_queries_work.sh b/dev_scripts/which_queries_work.sh index b5bc553c..bf9b4db8 100755 --- a/dev_scripts/which_queries_work.sh +++ b/dev_scripts/which_queries_work.sh @@ -1,6 +1,6 @@ #!/bin/bash benchmark_name=$1 -USAGE="Usage: $0 [job|tpch]" +USAGE="Usage: $0 [job|joblight|tpch]" if [ $# -ne 1 ]; then echo >&2 $USAGE diff --git a/optd-perftest/src/job.rs b/optd-perftest/src/job.rs index 8fc941fd..2ec0f932 100644 --- a/optd-perftest/src/job.rs +++ b/optd-perftest/src/job.rs @@ -20,12 +20,7 @@ pub const WORKING_JOB_QUERY_IDS: &[&str] = &[ "28c", "29a", "29b", "29c", "30a", "30b", "30c", "31a", "31b", "31c", "32a", "32b", "33a", "33b", "33c", ]; -pub const WORKING_JOBLIGHT_QUERY_IDS: &[&str] = &[ - "1a", "1b", "1d", "2a", "3a", "3b", "3c", "4a", "4b", "4c", "5c", "6a", "6b", "6c", "6d", "6e", - "7b", "8a", "8b", "8c", "9b", "10a", "10c", "12a", "12b", "12c", "13a", "14a", "14b", "14c", - "15a", "15b", "15c", "16a", "17a", "17b", "17c", "18a", "18c", "19b", "20a", "20b", "20c", - "22a", "22b", "22c", "23a", "23b", "24a", "24b", "25a", "26a", "26b", "28a", -]; +pub const WORKING_JOBLIGHT_QUERY_IDS: &[&str] = &["1a", "1b", "1c", "1d", "2a", "3a", "3b", "4a", "4b", "4c", "5a", "5b", "5c", "6a", "6b", "6c", "6d", "7a", "7b", "7c", "8a", "8b", "8c", "9a", "9b", "10a", "10b", "10c", "11a", "11b", "11c", "12a", "12b", "12c", "13a", "14a", "14b", "14c", "16a", "17a", "17b", "17c", "18a", "19b", "20a", "20b", "20c", "21a", "21b", "22b", "23b", "24a", "24b", "25a", "26a", "26b", "27a", "27b"]; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct JobKitConfig { @@ -154,9 +149,9 @@ impl JobKit { job_kit_config: &JobKitConfig, ) -> io::Result> { let queries_dpath = (if job_kit_config.is_light { - &self.job_queries_dpath - } else { &self.joblight_queries_dpath + } else { + &self.job_queries_dpath }) .clone(); let sql_fpath_ordered_iter = From c2c99d8ca2e24eb7888cb4c3013c1eb7d9b9e844 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Sat, 27 Apr 2024 14:53:31 -0400 Subject: [PATCH 02/28] fmt and clip --- optd-perftest/src/job.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/optd-perftest/src/job.rs b/optd-perftest/src/job.rs index 2ec0f932..c14ad2a2 100644 --- a/optd-perftest/src/job.rs +++ b/optd-perftest/src/job.rs @@ -20,7 +20,13 @@ pub const WORKING_JOB_QUERY_IDS: &[&str] = &[ "28c", "29a", "29b", "29c", "30a", "30b", "30c", "31a", "31b", "31c", "32a", "32b", "33a", "33b", "33c", ]; -pub const WORKING_JOBLIGHT_QUERY_IDS: &[&str] = &["1a", "1b", "1c", "1d", "2a", "3a", "3b", "4a", "4b", "4c", "5a", "5b", "5c", "6a", "6b", "6c", "6d", "7a", "7b", "7c", "8a", "8b", "8c", "9a", "9b", "10a", "10b", "10c", "11a", "11b", "11c", "12a", "12b", "12c", "13a", "14a", "14b", "14c", "16a", "17a", "17b", "17c", "18a", "19b", "20a", "20b", "20c", "21a", "21b", "22b", "23b", "24a", "24b", "25a", "26a", "26b", "27a", "27b"]; +pub const WORKING_JOBLIGHT_QUERY_IDS: &[&str] = &[ + "1a", "1b", "1c", "1d", "2a", "3a", "3b", "4a", "4b", "4c", "5a", "5b", "5c", "6a", "6b", "6c", + "6d", "7a", "7b", "7c", "8a", "8b", "8c", "9a", "9b", "10a", "10b", "10c", "11a", "11b", "11c", + "12a", "12b", "12c", "13a", "14a", "14b", "14c", "16a", "17a", 
"17b", "17c", "18a", "19b", + "20a", "20b", "20c", "21a", "21b", "22b", "23b", "24a", "24b", "25a", "26a", "26b", "27a", + "27b", +]; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct JobKitConfig { From ec81ca6669c3cddcaf85affc9a3e509007042e09 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Sat, 27 Apr 2024 15:59:07 -0400 Subject: [PATCH 03/28] changed precision of mg and hll, getting us from 35 -> 40 queries ahead --- optd-datafusion-repr/src/cost/base_cost/join.rs | 1 + optd-gungnir/src/stats/hyperloglog.rs | 2 +- optd-gungnir/src/stats/misragries.rs | 2 +- optd-perftest/src/datafusion_dbms.rs | 4 ++-- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/optd-datafusion-repr/src/cost/base_cost/join.rs b/optd-datafusion-repr/src/cost/base_cost/join.rs index 1c2fc13e..8aa731aa 100644 --- a/optd-datafusion-repr/src/cost/base_cost/join.rs +++ b/optd-datafusion-repr/src/cost/base_cost/join.rs @@ -335,6 +335,7 @@ impl< None => DEFAULT_NUM_DISTINCT, } }); + // println!("ndistincts={:?}", ndistincts.clone().map(|ndistinct| format!("{}", ndistinct)).collect::>().join(" ")); // using reduce(f64::min) is the idiomatic workaround to min() because // f64 does not implement Ord due to NaN let selectivity = ndistincts.map(|ndistinct| 1.0 / ndistinct as f64).reduce(f64::min).expect("reduce() only returns None if the iterator is empty, which is impossible since col_ref_exprs.len() == 2"); diff --git a/optd-gungnir/src/stats/hyperloglog.rs b/optd-gungnir/src/stats/hyperloglog.rs index 840afde5..d49f6f38 100644 --- a/optd-gungnir/src/stats/hyperloglog.rs +++ b/optd-gungnir/src/stats/hyperloglog.rs @@ -10,7 +10,7 @@ use optd_core::rel_node::Value; use crate::stats::murmur2::murmur_hash; use std::{cmp::max, marker::PhantomData}; -pub const DEFAULT_PRECISION: u8 = 12; +pub const DEFAULT_PRECISION: u8 = 16; /// Trait to transform any object into a stream of bytes. pub trait ByteSerializable { diff --git a/optd-gungnir/src/stats/misragries.rs b/optd-gungnir/src/stats/misragries.rs index c4b27636..6c31c102 100644 --- a/optd-gungnir/src/stats/misragries.rs +++ b/optd-gungnir/src/stats/misragries.rs @@ -9,7 +9,7 @@ use std::{cmp::min, collections::HashMap, hash::Hash}; use itertools::Itertools; -pub const DEFAULT_K_TO_TRACK: u16 = 100; +pub const DEFAULT_K_TO_TRACK: u16 = 1000; /// The Misra-Gries structure to approximate the k most frequent elements in /// a stream of N elements. It will always identify elements with frequency diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index 8c3ce67a..3e1a29b2 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -155,7 +155,7 @@ impl DatafusionDBMS { let mut estcards = vec![]; for (query_id, sql_fpath) in tpch_kit.get_sql_fpath_ordered_iter(tpch_kit_config)? 
{ - println!( + log::debug!( "about to evaluate datafusion's estcard for TPC-H Q{}", query_id ); @@ -177,7 +177,7 @@ impl DatafusionDBMS { } else { "JOB-light" }; - println!( + log::debug!( "about to evaluate datafusion's estcard for {} Q{}", benchmark_name, query_id ); From af00dd6321f314af8ee6ad888980c489115ad151 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Sat, 27 Apr 2024 17:29:19 -0400 Subject: [PATCH 04/28] test_inner_redundant_predicate -> test_add_edge_to_multi_equal_graph_which_maintains_mst --- .../src/cost/base_cost/join.rs | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/optd-datafusion-repr/src/cost/base_cost/join.rs b/optd-datafusion-repr/src/cost/base_cost/join.rs index 8aa731aa..b3fe1c4f 100644 --- a/optd-datafusion-repr/src/cost/base_cost/join.rs +++ b/optd-datafusion-repr/src/cost/base_cost/join.rs @@ -1132,13 +1132,8 @@ mod tests { ); } - // Ensure that in `select t1, t2, t3 where t1.a = t2.a and t2.a = t3.a and t1.a = t3.a`, - // even if the first join picks the most selective predicate (which should have been discarded, - // since we need to ensure the most selective N - 1 predicates are picked), the selectivity is - // adjusted in the second join so that the final selectivity is the product of the - // selectivities of the 2 most selective redicates. #[test] - fn test_inner_redundant_predicate() { + fn test_add_edge_to_multi_equal_graph_which_maintains_mst() { let cost_model = create_three_table_cost_model( TestPerColumnStats::new( TestMostCommonValues::empty(), @@ -1148,20 +1143,17 @@ mod tests { ), TestPerColumnStats::new( TestMostCommonValues::empty(), - 4, + 3, 0.0, Some(TestDistribution::empty()), ), TestPerColumnStats::new( TestMostCommonValues::empty(), - 5, + 4, 0.0, Some(TestDistribution::empty()), ), ); - let col01_sel = 0.25; - let col02_sel = 0.2; - let col12_sel = 0.2; let col0_base_ref = BaseTableColumnRef { table: String::from(TABLE1_NAME), col_idx: 0, @@ -1186,12 +1178,14 @@ mod tests { Some(semantic_correlation), ); + // These are the two possible ways of adding a new edge such that we maintain that the multi-equal graph + // is an MST. let eq0and2 = bin_op(BinOpType::Eq, col_ref(0), col_ref(2)); let eq1and2 = bin_op(BinOpType::Eq, col_ref(1), col_ref(2)); - let expr_tree = log_op(LogOpType::And, vec![eq0and2, eq1and2]); assert_approx_eq::assert_approx_eq!( - test_get_join_selectivity(&cost_model, false, JoinType::Inner, expr_tree, &column_refs), - col02_sel * (col02_sel * col12_sel) / (col01_sel * col12_sel) + test_get_join_selectivity(&cost_model, false, JoinType::Inner, eq0and2, &column_refs), + test_get_join_selectivity(&cost_model, false, JoinType::Inner, eq1and2, &column_refs), + 1.0 / 12.0 ); } } From 28a3d26b06b61a0da1544eee474f39a684fd022e Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Sat, 27 Apr 2024 18:05:15 -0400 Subject: [PATCH 05/28] wrote test_three_table_join_for_join1_on_cond test. 
not passing yet --- Cargo.lock | 1 + optd-datafusion-repr/Cargo.toml | 1 + .../src/cost/base_cost/join.rs | 91 +++++++++++++------ 3 files changed, 67 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f58d0c17..716c79b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2850,6 +2850,7 @@ dependencies = [ "pretty-xmlish", "serde", "serde_with", + "test-case", "tracing", "tracing-subscriber", "union-find", diff --git a/optd-datafusion-repr/Cargo.toml b/optd-datafusion-repr/Cargo.toml index c17b9d18..efe7cf83 100644 --- a/optd-datafusion-repr/Cargo.toml +++ b/optd-datafusion-repr/Cargo.toml @@ -26,3 +26,4 @@ serde = { version = "1.0", features = ["derive"] } serde_with = {version = "3.7.0", features = ["json"]} bincode = "1.3.3" union-find = { git = "https://github.com/Gun9niR/union-find-rs.git", rev = "794821514f7daefcbb8d5f38ef04e62fc18b5665" } +test-case = "3.3" diff --git a/optd-datafusion-repr/src/cost/base_cost/join.rs b/optd-datafusion-repr/src/cost/base_cost/join.rs index b3fe1c4f..cb86230b 100644 --- a/optd-datafusion-repr/src/cost/base_cost/join.rs +++ b/optd-datafusion-repr/src/cost/base_cost/join.rs @@ -1132,8 +1132,14 @@ mod tests { ); } - #[test] - fn test_add_edge_to_multi_equal_graph_which_maintains_mst() { + /// Test all possible permutations of three-table joins that only involve two join operators. + /// A three-table join consists of at least two joins. `join1_on_cond` is the condition of the first + /// join. There can only be one condition because only two tables are involved at the time of the + /// first join. + #[test_case::test_case((0, 1))] + #[test_case::test_case((0, 2))] + #[test_case::test_case((1, 2))] + fn test_three_table_join_for_join1_on_cond(join1_on_cond: (usize, usize)) { let cost_model = create_three_table_cost_model( TestPerColumnStats::new( TestMostCommonValues::empty(), @@ -1154,38 +1160,71 @@ mod tests { Some(TestDistribution::empty()), ), ); - let col0_base_ref = BaseTableColumnRef { - table: String::from(TABLE1_NAME), - col_idx: 0, - }; - let col1_base_ref = BaseTableColumnRef { - table: String::from(TABLE2_NAME), - col_idx: 0, - }; - let col2_base_ref = BaseTableColumnRef { - table: String::from(TABLE3_NAME), - col_idx: 0, - }; - let col0_ref: ColumnRef = col0_base_ref.clone().into(); - let col1_ref: ColumnRef = col1_base_ref.clone().into(); - let col2_ref: ColumnRef = col2_base_ref.clone().into(); + let col_base_refs = vec![ + BaseTableColumnRef { + table: String::from(TABLE1_NAME), + col_idx: 0, + }, + BaseTableColumnRef { + table: String::from(TABLE2_NAME), + col_idx: 0, + }, + BaseTableColumnRef { + table: String::from(TABLE3_NAME), + col_idx: 0, + }, + ]; + let col_refs: Vec = col_base_refs.clone().into_iter().map(|col_base_ref| col_base_ref.into()).collect(); let mut eq_columns = EqBaseTableColumnSets::new(); - eq_columns.add_predicate(EqPredicate::new(col0_base_ref, col1_base_ref)); + eq_columns.add_predicate(EqPredicate::new(col_base_refs[join1_on_cond.0].clone(), col_base_refs[join1_on_cond.1].clone())); + let join1_selectivity = { + if join1_on_cond == (0, 1) { + 1.0 / 3.0 + } else if join1_on_cond == (0, 2) { + 1.0 / 4.0 + } else if join1_on_cond == (1, 2) { + 1.0 / 4.0 + } else { + panic!(); + } + }; let semantic_correlation = SemanticCorrelation::new(eq_columns); let column_refs = GroupColumnRefs::new_test( - vec![col0_ref.clone(), col1_ref.clone(), col2_ref.clone()], + col_refs, Some(semantic_correlation), ); - // These are the two possible ways of adding a new edge such that we maintain that the multi-equal graph 
- // is an MST. + // Try all join conditions of the second join which would lead to a fully joined third + // table (i.e. any set of pair of columns except the set that contains a single pair + // which is the exact on condition of the first join). + let eq0and1 = bin_op(BinOpType::Eq, col_ref(0), col_ref(1)); let eq0and2 = bin_op(BinOpType::Eq, col_ref(0), col_ref(2)); let eq1and2 = bin_op(BinOpType::Eq, col_ref(1), col_ref(2)); - assert_approx_eq::assert_approx_eq!( - test_get_join_selectivity(&cost_model, false, JoinType::Inner, eq0and2, &column_refs), - test_get_join_selectivity(&cost_model, false, JoinType::Inner, eq1and2, &column_refs), - 1.0 / 12.0 - ); + let and_01_02 = log_op(LogOpType::And, vec![eq0and1.clone(), eq0and2.clone()]); + let and_01_12 = log_op(LogOpType::And, vec![eq0and1.clone(), eq1and2.clone()]); + let and_02_12 = log_op(LogOpType::And, vec![eq0and2.clone(), eq1and2.clone()]); + let mut join2_expr_trees = vec![and_01_02, and_01_12, and_02_12]; + if join1_on_cond == (0, 1) { + join2_expr_trees.push(eq0and2); + join2_expr_trees.push(eq1and2); + } else if join1_on_cond == (0, 2) { + join2_expr_trees.push(eq0and1); + join2_expr_trees.push(eq1and2); + } else if join1_on_cond == (1, 2) { + join2_expr_trees.push(eq0and1); + join2_expr_trees.push(eq0and2); + } else { + panic!(); + } + for expr_tree in join2_expr_trees { + println!("expr_tree={:?}", expr_tree); + assert_approx_eq::assert_approx_eq!( + join1_selectivity * test_get_join_selectivity(&cost_model, false, JoinType::Inner, expr_tree, &column_refs), + 1.0 / 12.0 + ); + } } + + // TODO(phw2): three table join that involves three join operations } From 9104a814766b76f61047c05f044c060065dc38a9 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Sat, 27 Apr 2024 19:02:27 -0400 Subject: [PATCH 06/28] turned get_join_selectivity_from_most_selective_predicates into get_join_selectivity_from_most_selective_columns --- .../src/cost/base_cost/join.rs | 101 +++++------------- .../src/properties/column_ref.rs | 9 ++ 2 files changed, 36 insertions(+), 74 deletions(-) diff --git a/optd-datafusion-repr/src/cost/base_cost/join.rs b/optd-datafusion-repr/src/cost/base_cost/join.rs index cb86230b..73897c4f 100644 --- a/optd-datafusion-repr/src/cost/base_cost/join.rs +++ b/optd-datafusion-repr/src/cost/base_cost/join.rs @@ -1,4 +1,4 @@ -use std::ops::ControlFlow; +use std::{collections::HashSet, ops::ControlFlow}; use itertools::Itertools; use optd_core::{ @@ -346,68 +346,21 @@ impl< selectivity } - /// Given a set of equality predicates P that define N equal columns, find the selectivity of - /// the most selective N - 1 predicates that "touches" all the columns. - /// - /// We solve the problem using MST (Minimum Spanning Tree), where the columns are nodes and the - /// predicates are undirected edges. Since all the columns are equal, the graph is connected. - fn get_join_selecitivity_from_most_selective_predicates( + /// Given a set of N columns involved in a multi-equality, find the total selectivity + /// of the multi-equality. + /// + /// This is a generalization of get_join_selectivity_from_on_col_ref_pair(). + fn get_join_selectivity_from_most_selective_columns( &self, - predicates: Vec, - num_cols: usize, + base_col_refs: HashSet, ) -> f64 { - let mut acc_sel = 1.0; - let mut num_picked_predicates = 0; - let mut disjoint_sets = DisjointSets::new(); - - // Use Kruskal to compute MST. - // Step 1: sort predicates by selectivity in ascending order. 
- let mut sorted_predicates = predicates - .into_iter() - .map(|p| { - let sel: f64 = self.get_join_selectivity_from_on_col_ref_pair( - &p.left.clone().into(), - &p.right.clone().into(), - ); - (p, sel) - }) - .sorted_by(|(_, sel1), (_, sel2)| sel1.partial_cmp(sel2).unwrap()); - - // Step 2: pick predicates until all columns are "connected" by the predicates. - sorted_predicates.try_for_each(|(p, sel)| { - if !disjoint_sets.contains(&p.left) { - disjoint_sets.make_set(p.left.clone()).unwrap(); - } - if !disjoint_sets.contains(&p.right) { - disjoint_sets.make_set(p.right.clone()).unwrap(); - } - if !disjoint_sets.same_set(&p.left, &p.right).unwrap() { - acc_sel *= sel; - num_picked_predicates += 1; - disjoint_sets.union(&p.left, &p.right).unwrap(); - } - if num_picked_predicates == num_cols - 1 { - ControlFlow::Break(()) - } else { - ControlFlow::Continue(()) + let num_base_col_refs = base_col_refs.len(); + base_col_refs.into_iter().map(|base_col_ref| { + match self.get_column_comb_stats(&base_col_ref.table, &[base_col_ref.col_idx]) { + Some(per_col_stats) => per_col_stats.ndistinct, + None => DEFAULT_NUM_DISTINCT } - }); - debug_assert_eq!( - num_picked_predicates, - num_cols - 1, - "we should have picked N - 1 predicates" - ); - debug_assert_eq!( - disjoint_sets.num_sets(), - 1, - "all columns should be connected by the predicates" - ); - debug_assert_eq!( - disjoint_sets.num_items(), - num_cols, - "all columns should be connected by the predicates" - ); - acc_sel + }).map(|ndistinct| 1.0 / ndistinct as f64).sorted_by(|a, b| a.partial_cmp(b).expect("No floats should be NaN since n-distinct is never 0")).take(num_base_col_refs - 1).product() } /// A predicate set contains "redundant" predicates if some of them can be expressed with the rest. @@ -418,7 +371,7 @@ impl< /// If we have N columns that are equal, and the set of equality predicates P that defines the /// equalities (|P| >= N - 1), we pick the N - 1 most selective predicates (denoted P') that /// define the equalities by computing the MST of the graph where the columns are nodes and the - /// predicates are edges (see `get_join_selecitivity_from_most_selective_predicates` for + /// predicates are edges (see `get_join_selectivity_from_most_selective_predicates` for /// implementation). /// /// But since child has already picked some predicates which might not be the most selective @@ -434,27 +387,26 @@ impl< predicate: EqPredicate, past_eq_columns: &mut EqBaseTableColumnSets, ) -> f64 { - let left = predicate.left.clone(); // Compute the selectivity of the most selective N - 1 predicates. + let left = predicate.left.clone(); let children_pred_sel = { - let predicates = past_eq_columns.find_predicates_for_eq_column_set(&left); - self.get_join_selecitivity_from_most_selective_predicates( - predicates, - past_eq_columns.num_eq_columns(&left), + let cols = past_eq_columns.find_cols_for_eq_column_set(&left); + self.get_join_selectivity_from_most_selective_columns( + cols ) }; - // Add predicate to past_eq_columns. + + // Add predicate to past_eq_columns and repeat the process. past_eq_columns.add_predicate(predicate); - // Repeat the same process with the new predicate. 
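        // An illustrative sketch of the arithmetic here, assuming three base columns with
        // ndistinct values 2, 3 and 4: if the already-joined component is {t1.0, t2.0},
        // children_pred_sel keeps the N - 1 smallest 1/ndistinct factors, i.e. 1/3. If the
        // incoming predicate pulls in t3.0, the merged component gives
        // new_pred_sel = (1/3) * (1/4) = 1/12, so the quotient returned below is 1/4. If the
        // predicate is instead redundant (all three columns were already connected), the
        // component is unchanged and the quotient is 1.0.
        //
        // The same per-component arithmetic as a minimal standalone sketch (a hypothetical
        // helper over plain ndistinct counts, not a function of this codebase):
        //
        // fn component_selectivity(ndistincts: &[u64]) -> f64 {
        //     // Convert to 1/ndistinct factors, keep the N - 1 smallest ones, multiply.
        //     let mut factors: Vec<f64> = ndistincts.iter().map(|&n| 1.0 / n as f64).collect();
        //     factors.sort_by(|a, b| a.partial_cmp(b).unwrap());
        //     factors.into_iter().take(ndistincts.len() - 1).product()
        // }
        //
        // component_selectivity(&[2, 3, 4]) == (1/3) * (1/4) == 1/12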
let new_pred_sel = { - let predicates = past_eq_columns.find_predicates_for_eq_column_set(&left); - self.get_join_selecitivity_from_most_selective_predicates( - predicates, - past_eq_columns.num_eq_columns(&left), + let cols = past_eq_columns.find_cols_for_eq_column_set(&left); + self.get_join_selectivity_from_most_selective_columns( + cols ) }; // Compute division of MSTs as the selectivity. + println!("new_pred_sel={new_pred_sel}, children_pred_sel={children_pred_sel}"); new_pred_sel / children_pred_sel } @@ -1218,9 +1170,10 @@ mod tests { panic!(); } for expr_tree in join2_expr_trees { - println!("expr_tree={:?}", expr_tree); + let both_joins_selectivity = join1_selectivity * test_get_join_selectivity(&cost_model, false, JoinType::Inner, expr_tree.clone(), &column_refs); + println!("expr_tree={:?}, both_joins_selectivity={both_joins_selectivity}", expr_tree); assert_approx_eq::assert_approx_eq!( - join1_selectivity * test_get_join_selectivity(&cost_model, false, JoinType::Inner, expr_tree, &column_refs), + both_joins_selectivity, 1.0 / 12.0 ); } diff --git a/optd-datafusion-repr/src/properties/column_ref.rs b/optd-datafusion-repr/src/properties/column_ref.rs index 4893a9ea..26fed38e 100644 --- a/optd-datafusion-repr/src/properties/column_ref.rs +++ b/optd-datafusion-repr/src/properties/column_ref.rs @@ -165,6 +165,15 @@ impl EqBaseTableColumnSets { predicates } + /// Find the set of columns that define the equality of the set of columns `col` belongs to. + pub fn find_cols_for_eq_column_set( + &mut self, + col: &BaseTableColumnRef, + ) -> HashSet { + let predicates = self.find_predicates_for_eq_column_set(col); + predicates.into_iter().flat_map(|predicate| vec![predicate.left, predicate.right]).collect() + } + /// Union two `EqBaseTableColumnSets` to produce a new disjoint sets. pub fn union(x: &EqBaseTableColumnSets, y: &EqBaseTableColumnSets) -> EqBaseTableColumnSets { let mut eq_col_sets = Self::new(); From 91791f7c6bd5294bb617c50028d4f2b895c4ced5 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Sat, 27 Apr 2024 19:23:29 -0400 Subject: [PATCH 07/28] fixed bug of adding new predicate that touches one col of existing pred --- .../src/cost/base_cost/join.rs | 54 ++++++++++++------- .../src/properties/column_ref.rs | 6 ++- 2 files changed, 40 insertions(+), 20 deletions(-) diff --git a/optd-datafusion-repr/src/cost/base_cost/join.rs b/optd-datafusion-repr/src/cost/base_cost/join.rs index 73897c4f..13fc7484 100644 --- a/optd-datafusion-repr/src/cost/base_cost/join.rs +++ b/optd-datafusion-repr/src/cost/base_cost/join.rs @@ -354,6 +354,7 @@ impl< &self, base_col_refs: HashSet, ) -> f64 { + assert!(base_col_refs.len() > 1); let num_base_col_refs = base_col_refs.len(); base_col_refs.into_iter().map(|base_col_ref| { match self.get_column_comb_stats(&base_col_ref.table, &[base_col_ref.col_idx]) { @@ -384,29 +385,48 @@ impl< /// NOTE: This function modifies `past_eq_columns` by adding `predicate` to it. fn get_join_selectivity_adjustment_from_redundant_predicates( &self, - predicate: EqPredicate, + predicate: &EqPredicate, past_eq_columns: &mut EqBaseTableColumnSets, ) -> f64 { - // Compute the selectivity of the most selective N - 1 predicates. - let left = predicate.left.clone(); + // To find the adjustment, we need to know the selectivity of the graph before `predicate` is added. + // + // There are two cases: (1) adding `predicate` does not change the # of connected components, and + // (2) adding `predicate` reduces the # of connected by 1. 
Note that columns not involved in any + // predicates are considered a part of the graph and are a connected component on their own. let children_pred_sel = { - let cols = past_eq_columns.find_cols_for_eq_column_set(&left); - self.get_join_selectivity_from_most_selective_columns( - cols - ) + if past_eq_columns.is_eq(&predicate.left, &predicate.right) { + self.get_join_selectivity_from_most_selective_columns( + past_eq_columns.find_cols_for_eq_column_set(&predicate.left) + ) + } else { + let left_sel = if past_eq_columns.contains(&predicate.left) { + self.get_join_selectivity_from_most_selective_columns( + past_eq_columns.find_cols_for_eq_column_set(&predicate.left) + ) + } else { + 1.0 + }; + let right_sel = if past_eq_columns.contains(&predicate.right) { + self.get_join_selectivity_from_most_selective_columns( + past_eq_columns.find_cols_for_eq_column_set(&predicate.right) + ) + } else { + 1.0 + }; + left_sel * right_sel + } }; - // Add predicate to past_eq_columns and repeat the process. - past_eq_columns.add_predicate(predicate); + // Add predicate to past_eq_columns and compute the selectivity of the connected component it creates. + past_eq_columns.add_predicate(predicate.clone()); let new_pred_sel = { - let cols = past_eq_columns.find_cols_for_eq_column_set(&left); + let cols = past_eq_columns.find_cols_for_eq_column_set(&predicate.left); self.get_join_selectivity_from_most_selective_columns( cols ) }; // Compute division of MSTs as the selectivity. - println!("new_pred_sel={new_pred_sel}, children_pred_sel={children_pred_sel}"); new_pred_sel / children_pred_sel } @@ -445,14 +465,10 @@ impl< (left_col_ref, right_col_ref) { let predicate = EqPredicate::new(left.clone(), right.clone()); - if past_eq_columns.is_eq(left, right) { - return self.get_join_selectivity_adjustment_from_redundant_predicates( - predicate, - &mut past_eq_columns, - ); - } else { - past_eq_columns.add_predicate(predicate); - } + return self.get_join_selectivity_adjustment_from_redundant_predicates( + &predicate, + &mut past_eq_columns, + ); } self.get_join_selectivity_from_on_col_ref_pair(left_col_ref, right_col_ref) diff --git a/optd-datafusion-repr/src/properties/column_ref.rs b/optd-datafusion-repr/src/properties/column_ref.rs index 26fed38e..102c15f3 100644 --- a/optd-datafusion-repr/src/properties/column_ref.rs +++ b/optd-datafusion-repr/src/properties/column_ref.rs @@ -135,13 +135,17 @@ impl EqBaseTableColumnSets { self.eq_predicates.insert(predicate); } - /// Determine if two columns are equal. + /// Determine if two columns are in the same set. pub fn is_eq(&mut self, left: &BaseTableColumnRef, right: &BaseTableColumnRef) -> bool { self.disjoint_eq_col_sets .same_set(left, right) .unwrap_or(false) } + pub fn contains(&self, base_col_ref: &BaseTableColumnRef) -> bool { + self.disjoint_eq_col_sets.contains(base_col_ref) + } + /// Get the number of columns that are equal to `col`, including `col` itself. 
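    // For illustration: after add_predicate(A = B) and add_predicate(B = C), the columns
    // A, B and C form one equal set, so the function below returns 3 for any of the three.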
pub fn num_eq_columns(&mut self, col: &BaseTableColumnRef) -> usize { self.disjoint_eq_col_sets.set_size(col).unwrap() From d9a8f293cf775059801850cee59a86d43b342835 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Sat, 27 Apr 2024 19:43:10 -0400 Subject: [PATCH 08/28] generalized three table join test to allow for arbitrary initial join conds --- .../src/cost/base_cost/join.rs | 75 +++++++++++-------- 1 file changed, 45 insertions(+), 30 deletions(-) diff --git a/optd-datafusion-repr/src/cost/base_cost/join.rs b/optd-datafusion-repr/src/cost/base_cost/join.rs index 13fc7484..8d1c2034 100644 --- a/optd-datafusion-repr/src/cost/base_cost/join.rs +++ b/optd-datafusion-repr/src/cost/base_cost/join.rs @@ -479,6 +479,8 @@ impl< #[cfg(test)] mod tests { + use std::collections::HashSet; + use optd_core::rel_node::Value; use crate::{ @@ -1100,14 +1102,20 @@ mod tests { ); } - /// Test all possible permutations of three-table joins that only involve two join operators. + /// Test all possible permutations of three-table joins. /// A three-table join consists of at least two joins. `join1_on_cond` is the condition of the first /// join. There can only be one condition because only two tables are involved at the time of the /// first join. - #[test_case::test_case((0, 1))] - #[test_case::test_case((0, 2))] - #[test_case::test_case((1, 2))] - fn test_three_table_join_for_join1_on_cond(join1_on_cond: (usize, usize)) { + #[test_case::test_case(&[(0, 1)])] + #[test_case::test_case(&[(0, 2)])] + #[test_case::test_case(&[(1, 2)])] + #[test_case::test_case(&[(0, 1), (0, 2)])] + #[test_case::test_case(&[(0, 1), (1, 2)])] + #[test_case::test_case(&[(0, 2), (1, 2)])] + #[test_case::test_case(&[(0, 1), (0, 2), (1, 2)])] + fn test_three_table_join_for_initial_join_on_conds(initial_join_on_conds: &[(usize, usize)]) { + assert!(!initial_join_on_conds.is_empty(), "initial_join_on_conds should be non-empty"); + assert_eq!(initial_join_on_conds.len(), initial_join_on_conds.iter().collect::>().len(), "initial_join_on_conds shouldn't contain duplicates"); let cost_model = create_three_table_cost_model( TestPerColumnStats::new( TestMostCommonValues::empty(), @@ -1145,16 +1153,23 @@ mod tests { let col_refs: Vec = col_base_refs.clone().into_iter().map(|col_base_ref| col_base_ref.into()).collect(); let mut eq_columns = EqBaseTableColumnSets::new(); - eq_columns.add_predicate(EqPredicate::new(col_base_refs[join1_on_cond.0].clone(), col_base_refs[join1_on_cond.1].clone())); + for initial_join_on_cond in initial_join_on_conds { + eq_columns.add_predicate(EqPredicate::new(col_base_refs[initial_join_on_cond.0].clone(), col_base_refs[initial_join_on_cond.1].clone())); + } let join1_selectivity = { - if join1_on_cond == (0, 1) { - 1.0 / 3.0 - } else if join1_on_cond == (0, 2) { - 1.0 / 4.0 - } else if join1_on_cond == (1, 2) { - 1.0 / 4.0 + if initial_join_on_conds.len() == 1 { + let initial_join_on_cond = initial_join_on_conds.first().unwrap(); + if initial_join_on_cond == &(0, 1) { + 1.0 / 3.0 + } else if initial_join_on_cond == &(0, 2) { + 1.0 / 4.0 + } else if initial_join_on_cond == &(1, 2) { + 1.0 / 4.0 + } else { + panic!(); + } } else { - panic!(); + 1.0 / 12.0 } }; let semantic_correlation = SemanticCorrelation::new(eq_columns); @@ -1163,27 +1178,29 @@ mod tests { Some(semantic_correlation), ); - // Try all join conditions of the second join which would lead to a fully joined third - // table (i.e. 
any set of pair of columns except the set that contains a single pair - // which is the exact on condition of the first join). + // Try all join conditions of the final join which would lead to all three tables being joined. let eq0and1 = bin_op(BinOpType::Eq, col_ref(0), col_ref(1)); let eq0and2 = bin_op(BinOpType::Eq, col_ref(0), col_ref(2)); let eq1and2 = bin_op(BinOpType::Eq, col_ref(1), col_ref(2)); let and_01_02 = log_op(LogOpType::And, vec![eq0and1.clone(), eq0and2.clone()]); let and_01_12 = log_op(LogOpType::And, vec![eq0and1.clone(), eq1and2.clone()]); let and_02_12 = log_op(LogOpType::And, vec![eq0and2.clone(), eq1and2.clone()]); - let mut join2_expr_trees = vec![and_01_02, and_01_12, and_02_12]; - if join1_on_cond == (0, 1) { - join2_expr_trees.push(eq0and2); - join2_expr_trees.push(eq1and2); - } else if join1_on_cond == (0, 2) { - join2_expr_trees.push(eq0and1); - join2_expr_trees.push(eq1and2); - } else if join1_on_cond == (1, 2) { - join2_expr_trees.push(eq0and1); - join2_expr_trees.push(eq0and2); - } else { - panic!(); + let and_01_02_12 = log_op(LogOpType::And, vec![eq0and1.clone(), eq0and2.clone(), eq1and2.clone()]); + let mut join2_expr_trees = vec![and_01_02, and_01_12, and_02_12, and_01_02_12]; + if initial_join_on_conds.len() == 1 { + let initial_join_on_cond = initial_join_on_conds.first().unwrap(); + if initial_join_on_cond == &(0, 1) { + join2_expr_trees.push(eq0and2); + join2_expr_trees.push(eq1and2); + } else if initial_join_on_cond == &(0, 2) { + join2_expr_trees.push(eq0and1); + join2_expr_trees.push(eq1and2); + } else if initial_join_on_cond == &(1, 2) { + join2_expr_trees.push(eq0and1); + join2_expr_trees.push(eq0and2); + } else { + panic!(); + } } for expr_tree in join2_expr_trees { let both_joins_selectivity = join1_selectivity * test_get_join_selectivity(&cost_model, false, JoinType::Inner, expr_tree.clone(), &column_refs); @@ -1194,6 +1211,4 @@ mod tests { ); } } - - // TODO(phw2): three table join that involves three join operations } From 42a91620787c02942cdbed32456e2a00d3cf8d38 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Sat, 27 Apr 2024 19:50:24 -0400 Subject: [PATCH 09/28] added test_join_which_connects_two_components_together --- optd-datafusion-repr/src/cost/base_cost.rs | 44 ++++++++++++ .../src/cost/base_cost/join.rs | 70 +++++++++++++++++-- 2 files changed, 110 insertions(+), 4 deletions(-) diff --git a/optd-datafusion-repr/src/cost/base_cost.rs b/optd-datafusion-repr/src/cost/base_cost.rs index 922fb825..1ec44f70 100644 --- a/optd-datafusion-repr/src/cost/base_cost.rs +++ b/optd-datafusion-repr/src/cost/base_cost.rs @@ -318,6 +318,7 @@ mod tests { pub const TABLE1_NAME: &str = "table1"; pub const TABLE2_NAME: &str = "table2"; pub const TABLE3_NAME: &str = "table3"; + pub const TABLE4_NAME: &str = "table4"; // one column is sufficient for all filter selectivity tests pub fn create_one_column_cost_model(per_column_stats: TestPerColumnStats) -> TestOptCostModel { @@ -379,6 +380,49 @@ mod tests { ) } + /// Create a cost model with three columns, one for each table. Each column has 100 values. 
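    // For reference: the helper below mirrors create_three_table_cost_model, building four
    // single-column tables (TABLE1_NAME through TABLE4_NAME), each with 100 rows and the
    // supplied per-column stats.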
+ pub fn create_four_table_cost_model( + tbl1_per_column_stats: TestPerColumnStats, + tbl2_per_column_stats: TestPerColumnStats, + tbl3_per_column_stats: TestPerColumnStats, + tbl4_per_column_stats: TestPerColumnStats, + ) -> TestOptCostModel { + OptCostModel::new( + vec![ + ( + String::from(TABLE1_NAME), + TableStats::new( + 100, + vec![(vec![0], tbl1_per_column_stats)].into_iter().collect(), + ), + ), + ( + String::from(TABLE2_NAME), + TableStats::new( + 100, + vec![(vec![0], tbl2_per_column_stats)].into_iter().collect(), + ), + ), + ( + String::from(TABLE3_NAME), + TableStats::new( + 100, + vec![(vec![0], tbl3_per_column_stats)].into_iter().collect(), + ), + ), + ( + String::from(TABLE4_NAME), + TableStats::new( + 100, + vec![(vec![0], tbl4_per_column_stats)].into_iter().collect(), + ), + ), + ] + .into_iter() + .collect(), + ) + } + /// We need custom row counts because some join algorithms rely on the row cnt pub fn create_two_table_cost_model_custom_row_cnts( tbl1_per_column_stats: TestPerColumnStats, diff --git a/optd-datafusion-repr/src/cost/base_cost/join.rs b/optd-datafusion-repr/src/cost/base_cost/join.rs index 8d1c2034..7485aa82 100644 --- a/optd-datafusion-repr/src/cost/base_cost/join.rs +++ b/optd-datafusion-repr/src/cost/base_cost/join.rs @@ -1156,7 +1156,7 @@ mod tests { for initial_join_on_cond in initial_join_on_conds { eq_columns.add_predicate(EqPredicate::new(col_base_refs[initial_join_on_cond.0].clone(), col_base_refs[initial_join_on_cond.1].clone())); } - let join1_selectivity = { + let initial_selectivity = { if initial_join_on_conds.len() == 1 { let initial_join_on_cond = initial_join_on_conds.first().unwrap(); if initial_join_on_cond == &(0, 1) { @@ -1203,12 +1203,74 @@ mod tests { } } for expr_tree in join2_expr_trees { - let both_joins_selectivity = join1_selectivity * test_get_join_selectivity(&cost_model, false, JoinType::Inner, expr_tree.clone(), &column_refs); - println!("expr_tree={:?}, both_joins_selectivity={both_joins_selectivity}", expr_tree); + let overall_selectivity = initial_selectivity * test_get_join_selectivity(&cost_model, false, JoinType::Inner, expr_tree.clone(), &column_refs); assert_approx_eq::assert_approx_eq!( - both_joins_selectivity, + overall_selectivity, 1.0 / 12.0 ); } } + + #[test] + fn test_join_which_connects_two_components_together() { + let cost_model = create_four_table_cost_model( + TestPerColumnStats::new( + TestMostCommonValues::empty(), + 2, + 0.0, + Some(TestDistribution::empty()), + ), + TestPerColumnStats::new( + TestMostCommonValues::empty(), + 3, + 0.0, + Some(TestDistribution::empty()), + ), + TestPerColumnStats::new( + TestMostCommonValues::empty(), + 4, + 0.0, + Some(TestDistribution::empty()), + ), + TestPerColumnStats::new( + TestMostCommonValues::empty(), + 5, + 0.0, + Some(TestDistribution::empty()), + ), + ); + let col_base_refs = vec![ + BaseTableColumnRef { + table: String::from(TABLE1_NAME), + col_idx: 0, + }, + BaseTableColumnRef { + table: String::from(TABLE2_NAME), + col_idx: 0, + }, + BaseTableColumnRef { + table: String::from(TABLE3_NAME), + col_idx: 0, + }, + BaseTableColumnRef { + table: String::from(TABLE4_NAME), + col_idx: 0, + }, + ]; + let col_refs: Vec = col_base_refs.clone().into_iter().map(|col_base_ref| col_base_ref.into()).collect(); + + let mut eq_columns = EqBaseTableColumnSets::new(); + eq_columns.add_predicate(EqPredicate::new(col_base_refs[0].clone(), col_base_refs[1].clone())); + eq_columns.add_predicate(EqPredicate::new(col_base_refs[2].clone(), col_base_refs[3].clone())); + let 
initial_selectivity = 1.0 / (3.0 * 5.0); + let semantic_correlation = SemanticCorrelation::new(eq_columns); + let column_refs = GroupColumnRefs::new_test( + col_refs, + Some(semantic_correlation), + ); + + let eq1and2 = bin_op(BinOpType::Eq, col_ref(1), col_ref(2)); + let overall_selectivity = initial_selectivity * test_get_join_selectivity(&cost_model, false, JoinType::Inner, eq1and2.clone(), &column_refs); + assert_approx_eq::assert_approx_eq!(overall_selectivity, 1.0 / (3.0 * 4.0 * 5.0)); + } } From 0474fcd21fe3212234b9e7a2a043ad9989d114c4 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Sat, 27 Apr 2024 20:03:29 -0400 Subject: [PATCH 10/28] fmt and clip --- .../src/cost/base_cost/join.rs | 118 ++++++++++++------ .../src/properties/column_ref.rs | 5 +- optd-perftest/src/datafusion_dbms.rs | 3 +- 3 files changed, 83 insertions(+), 43 deletions(-) diff --git a/optd-datafusion-repr/src/cost/base_cost/join.rs b/optd-datafusion-repr/src/cost/base_cost/join.rs index 7485aa82..9c30d99a 100644 --- a/optd-datafusion-repr/src/cost/base_cost/join.rs +++ b/optd-datafusion-repr/src/cost/base_cost/join.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, ops::ControlFlow}; +use std::collections::HashSet; use itertools::Itertools; use optd_core::{ @@ -6,7 +6,6 @@ use optd_core::{ cost::Cost, }; use serde::{de::DeserializeOwned, Serialize}; -use union_find::{disjoint_sets::DisjointSets, union_find::UnionFind}; use crate::{ cost::base_cost::{ @@ -348,7 +347,7 @@ impl< /// Given a set of N columns involved in a multi-equality, find the total selectivity /// of the multi-equality. - /// + /// /// This is a generalization of get_join_selectivity_from_on_col_ref_pair(). fn get_join_selectivity_from_most_selective_columns( &self, @@ -356,12 +355,21 @@ impl< ) -> f64 { assert!(base_col_refs.len() > 1); let num_base_col_refs = base_col_refs.len(); - base_col_refs.into_iter().map(|base_col_ref| { - match self.get_column_comb_stats(&base_col_ref.table, &[base_col_ref.col_idx]) { - Some(per_col_stats) => per_col_stats.ndistinct, - None => DEFAULT_NUM_DISTINCT - } - }).map(|ndistinct| 1.0 / ndistinct as f64).sorted_by(|a, b| a.partial_cmp(b).expect("No floats should be NaN since n-distinct is never 0")).take(num_base_col_refs - 1).product() + base_col_refs + .into_iter() + .map(|base_col_ref| { + match self.get_column_comb_stats(&base_col_ref.table, &[base_col_ref.col_idx]) { + Some(per_col_stats) => per_col_stats.ndistinct, + None => DEFAULT_NUM_DISTINCT, + } + }) + .map(|ndistinct| 1.0 / ndistinct as f64) + .sorted_by(|a, b| { + a.partial_cmp(b) + .expect("No floats should be NaN since n-distinct is never 0") + }) + .take(num_base_col_refs - 1) + .product() } /// A predicate set contains "redundant" predicates if some of them can be expressed with the rest. 
@@ -396,19 +404,19 @@ impl< let children_pred_sel = { if past_eq_columns.is_eq(&predicate.left, &predicate.right) { self.get_join_selectivity_from_most_selective_columns( - past_eq_columns.find_cols_for_eq_column_set(&predicate.left) + past_eq_columns.find_cols_for_eq_column_set(&predicate.left), ) } else { let left_sel = if past_eq_columns.contains(&predicate.left) { self.get_join_selectivity_from_most_selective_columns( - past_eq_columns.find_cols_for_eq_column_set(&predicate.left) + past_eq_columns.find_cols_for_eq_column_set(&predicate.left), ) } else { 1.0 }; let right_sel = if past_eq_columns.contains(&predicate.right) { self.get_join_selectivity_from_most_selective_columns( - past_eq_columns.find_cols_for_eq_column_set(&predicate.right) + past_eq_columns.find_cols_for_eq_column_set(&predicate.right), ) } else { 1.0 @@ -421,9 +429,7 @@ impl< past_eq_columns.add_predicate(predicate.clone()); let new_pred_sel = { let cols = past_eq_columns.find_cols_for_eq_column_set(&predicate.left); - self.get_join_selectivity_from_most_selective_columns( - cols - ) + self.get_join_selectivity_from_most_selective_columns(cols) }; // Compute division of MSTs as the selectivity. @@ -1114,8 +1120,15 @@ mod tests { #[test_case::test_case(&[(0, 2), (1, 2)])] #[test_case::test_case(&[(0, 1), (0, 2), (1, 2)])] fn test_three_table_join_for_initial_join_on_conds(initial_join_on_conds: &[(usize, usize)]) { - assert!(!initial_join_on_conds.is_empty(), "initial_join_on_conds should be non-empty"); - assert_eq!(initial_join_on_conds.len(), initial_join_on_conds.iter().collect::>().len(), "initial_join_on_conds shouldn't contain duplicates"); + assert!( + !initial_join_on_conds.is_empty(), + "initial_join_on_conds should be non-empty" + ); + assert_eq!( + initial_join_on_conds.len(), + initial_join_on_conds.iter().collect::>().len(), + "initial_join_on_conds shouldn't contain duplicates" + ); let cost_model = create_three_table_cost_model( TestPerColumnStats::new( TestMostCommonValues::empty(), @@ -1150,20 +1163,25 @@ mod tests { col_idx: 0, }, ]; - let col_refs: Vec = col_base_refs.clone().into_iter().map(|col_base_ref| col_base_ref.into()).collect(); + let col_refs: Vec = col_base_refs + .clone() + .into_iter() + .map(|col_base_ref| col_base_ref.into()) + .collect(); let mut eq_columns = EqBaseTableColumnSets::new(); for initial_join_on_cond in initial_join_on_conds { - eq_columns.add_predicate(EqPredicate::new(col_base_refs[initial_join_on_cond.0].clone(), col_base_refs[initial_join_on_cond.1].clone())); + eq_columns.add_predicate(EqPredicate::new( + col_base_refs[initial_join_on_cond.0].clone(), + col_base_refs[initial_join_on_cond.1].clone(), + )); } let initial_selectivity = { if initial_join_on_conds.len() == 1 { let initial_join_on_cond = initial_join_on_conds.first().unwrap(); if initial_join_on_cond == &(0, 1) { 1.0 / 3.0 - } else if initial_join_on_cond == &(0, 2) { - 1.0 / 4.0 - } else if initial_join_on_cond == &(1, 2) { + } else if initial_join_on_cond == &(0, 2) || initial_join_on_cond == &(1, 2) { 1.0 / 4.0 } else { panic!(); @@ -1173,10 +1191,7 @@ mod tests { } }; let semantic_correlation = SemanticCorrelation::new(eq_columns); - let column_refs = GroupColumnRefs::new_test( - col_refs, - Some(semantic_correlation), - ); + let column_refs = GroupColumnRefs::new_test(col_refs, Some(semantic_correlation)); // Try all join conditions of the final join which would lead to all three tables being joined. 
let eq0and1 = bin_op(BinOpType::Eq, col_ref(0), col_ref(1)); @@ -1185,7 +1200,10 @@ mod tests { let and_01_02 = log_op(LogOpType::And, vec![eq0and1.clone(), eq0and2.clone()]); let and_01_12 = log_op(LogOpType::And, vec![eq0and1.clone(), eq1and2.clone()]); let and_02_12 = log_op(LogOpType::And, vec![eq0and2.clone(), eq1and2.clone()]); - let and_01_02_12 = log_op(LogOpType::And, vec![eq0and1.clone(), eq0and2.clone(), eq1and2.clone()]); + let and_01_02_12 = log_op( + LogOpType::And, + vec![eq0and1.clone(), eq0and2.clone(), eq1and2.clone()], + ); let mut join2_expr_trees = vec![and_01_02, and_01_12, and_02_12, and_01_02_12]; if initial_join_on_conds.len() == 1 { let initial_join_on_cond = initial_join_on_conds.first().unwrap(); @@ -1203,11 +1221,15 @@ mod tests { } } for expr_tree in join2_expr_trees { - let overall_selectivity = initial_selectivity * test_get_join_selectivity(&cost_model, false, JoinType::Inner, expr_tree.clone(), &column_refs); - assert_approx_eq::assert_approx_eq!( - overall_selectivity, - 1.0 / 12.0 - ); + let overall_selectivity = initial_selectivity + * test_get_join_selectivity( + &cost_model, + false, + JoinType::Inner, + expr_tree.clone(), + &column_refs, + ); + assert_approx_eq::assert_approx_eq!(overall_selectivity, 1.0 / 12.0); } } @@ -1257,20 +1279,34 @@ mod tests { col_idx: 0, }, ]; - let col_refs: Vec = col_base_refs.clone().into_iter().map(|col_base_ref| col_base_ref.into()).collect(); - + let col_refs: Vec = col_base_refs + .clone() + .into_iter() + .map(|col_base_ref| col_base_ref.into()) + .collect(); + let mut eq_columns = EqBaseTableColumnSets::new(); - eq_columns.add_predicate(EqPredicate::new(col_base_refs[0].clone(), col_base_refs[1].clone())); - eq_columns.add_predicate(EqPredicate::new(col_base_refs[2].clone(), col_base_refs[3].clone())); + eq_columns.add_predicate(EqPredicate::new( + col_base_refs[0].clone(), + col_base_refs[1].clone(), + )); + eq_columns.add_predicate(EqPredicate::new( + col_base_refs[2].clone(), + col_base_refs[3].clone(), + )); let initial_selectivity = 1.0 / (3.0 * 5.0); let semantic_correlation = SemanticCorrelation::new(eq_columns); - let column_refs = GroupColumnRefs::new_test( - col_refs, - Some(semantic_correlation), - ); + let column_refs = GroupColumnRefs::new_test(col_refs, Some(semantic_correlation)); let eq1and2 = bin_op(BinOpType::Eq, col_ref(1), col_ref(2)); - let overall_selectivity = initial_selectivity * test_get_join_selectivity(&cost_model, false, JoinType::Inner, eq1and2.clone(), &column_refs); + let overall_selectivity = initial_selectivity + * test_get_join_selectivity( + &cost_model, + false, + JoinType::Inner, + eq1and2.clone(), + &column_refs, + ); assert_approx_eq::assert_approx_eq!(overall_selectivity, 1.0 / (3.0 * 4.0 * 5.0)); } } diff --git a/optd-datafusion-repr/src/properties/column_ref.rs b/optd-datafusion-repr/src/properties/column_ref.rs index 102c15f3..08151931 100644 --- a/optd-datafusion-repr/src/properties/column_ref.rs +++ b/optd-datafusion-repr/src/properties/column_ref.rs @@ -175,7 +175,10 @@ impl EqBaseTableColumnSets { col: &BaseTableColumnRef, ) -> HashSet { let predicates = self.find_predicates_for_eq_column_set(col); - predicates.into_iter().flat_map(|predicate| vec![predicate.left, predicate.right]).collect() + predicates + .into_iter() + .flat_map(|predicate| vec![predicate.left, predicate.right]) + .collect() } /// Union two `EqBaseTableColumnSets` to produce a new disjoint sets. 
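    // For illustration (assuming `union` simply re-adds every predicate from both inputs):
    // the union of a set built from {A = B} with one built from {C = D} yields the two equal
    // sets {A, B} and {C, D}; if the second had instead been built from {B = C}, the union
    // would collapse to the single set {A, B, C}.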
diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index 3e1a29b2..94371166 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -179,7 +179,8 @@ impl DatafusionDBMS { }; log::debug!( "about to evaluate datafusion's estcard for {} Q{}", - benchmark_name, query_id + benchmark_name, + query_id ); let sql = fs::read_to_string(sql_fpath)?; let estcard = self.eval_query_estcard(&sql).await?; From 4fc54c129ea1a5ab81d68048af044532de164078 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Sun, 28 Apr 2024 11:01:18 -0400 Subject: [PATCH 11/28] changed comment and name of what is now get_join_selectivity_adjustment_when_adding_to_multi_equality_graph() --- .../src/cost/base_cost/join.rs | 43 +++++++++---------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/optd-datafusion-repr/src/cost/base_cost/join.rs b/optd-datafusion-repr/src/cost/base_cost/join.rs index 9c30d99a..0269334e 100644 --- a/optd-datafusion-repr/src/cost/base_cost/join.rs +++ b/optd-datafusion-repr/src/cost/base_cost/join.rs @@ -334,7 +334,6 @@ impl< None => DEFAULT_NUM_DISTINCT, } }); - // println!("ndistincts={:?}", ndistincts.clone().map(|ndistinct| format!("{}", ndistinct)).collect::>().join(" ")); // using reduce(f64::min) is the idiomatic workaround to min() because // f64 does not implement Ord due to NaN let selectivity = ndistincts.map(|ndistinct| 1.0 / ndistinct as f64).reduce(f64::min).expect("reduce() only returns None if the iterator is empty, which is impossible since col_ref_exprs.len() == 2"); @@ -372,26 +371,23 @@ impl< .product() } - /// A predicate set contains "redundant" predicates if some of them can be expressed with the rest. - /// E.g. In { A = B, B = C, A = C }, one of the predicates is redundant. - /// In this case, we want to pick the most selective predicates that touch all the columns - /// that this set of predicates touches. + /// A predicate set defines a "multi-equality graph", which is an unweighted undirected graph. The + /// nodes are columns while edges are predicates. The old graph is defined by `past_eq_columns` + /// while the `predicate` is the new addition to this graph. This unweighted undirected graph + /// consists of a number of connected components, where each connected component represents columns + /// that are set to be equal to each other. Single nodes not connected to anything are considered + /// standalone connected components. /// - /// If we have N columns that are equal, and the set of equality predicates P that defines the - /// equalities (|P| >= N - 1), we pick the N - 1 most selective predicates (denoted P') that - /// define the equalities by computing the MST of the graph where the columns are nodes and the - /// predicates are edges (see `get_join_selectivity_from_most_selective_predicates` for - /// implementation). - /// - /// But since child has already picked some predicates which might not be the most selective - /// (because it has not seen the most selective ones), when we encounter a potentially more - /// selective `predicate` (in the parameter) and a set of previously seen predicates - /// `past_eq_columns`, `predicate` produces a selectivity adjustment factor, which is the - /// multiplied selectivity of the most selective N - 1 predicate among `past_eq_columns` union - /// `predicate` divided by the selectivity of the `past_eq_columns`. 
+ /// The selectivity of each connected component of N nodes is equal to the product of 1/ndistinct of + /// the N-1 nodes with the highest ndistinct values. However, we cannot simply add `predicate` to the + /// multi-equality graph and compute the selectivity of the entire connected component, because this + /// would be "double counting" a lot of nodes. The join(s) before this join would already have a selectivity + /// value. Thus, we compute the selectivity of the join(s) before this join (the first block of the + /// function) and then the selectivity of the connected component after this join. The quotient is the + /// "adjustment" factor. /// /// NOTE: This function modifies `past_eq_columns` by adding `predicate` to it. - fn get_join_selectivity_adjustment_from_redundant_predicates( + fn get_join_selectivity_adjustment_when_adding_to_multi_equality_graph( &self, predicate: &EqPredicate, past_eq_columns: &mut EqBaseTableColumnSets, @@ -432,7 +428,7 @@ impl< self.get_join_selectivity_from_most_selective_columns(cols) }; - // Compute division of MSTs as the selectivity. + // Compute the adjustment factor. new_pred_sel / children_pred_sel } @@ -471,10 +467,11 @@ impl< (left_col_ref, right_col_ref) { let predicate = EqPredicate::new(left.clone(), right.clone()); - return self.get_join_selectivity_adjustment_from_redundant_predicates( - &predicate, - &mut past_eq_columns, - ); + return self + .get_join_selectivity_adjustment_when_adding_to_multi_equality_graph( + &predicate, + &mut past_eq_columns, + ); } self.get_join_selectivity_from_on_col_ref_pair(left_col_ref, right_col_ref) From e5b2adf3978d9ae13d9868f63b6ee95299109ca6 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Sun, 28 Apr 2024 13:41:09 -0400 Subject: [PATCH 12/28] inclusion principle comment --- optd-datafusion-repr/src/cost/base_cost/join.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/optd-datafusion-repr/src/cost/base_cost/join.rs b/optd-datafusion-repr/src/cost/base_cost/join.rs index 0269334e..238313ea 100644 --- a/optd-datafusion-repr/src/cost/base_cost/join.rs +++ b/optd-datafusion-repr/src/cost/base_cost/join.rs @@ -379,12 +379,16 @@ impl< /// standalone connected components. /// /// The selectivity of each connected component of N nodes is equal to the product of 1/ndistinct of - /// the N-1 nodes with the highest ndistinct values. However, we cannot simply add `predicate` to the - /// multi-equality graph and compute the selectivity of the entire connected component, because this - /// would be "double counting" a lot of nodes. The join(s) before this join would already have a selectivity - /// value. Thus, we compute the selectivity of the join(s) before this join (the first block of the - /// function) and then the selectivity of the connected component after this join. The quotient is the - /// "adjustment" factor. + /// the N-1 nodes with the highest ndistinct values. You can see this if you imagine that all columns + /// being joined are unique columns and that they follow the inclusion principle (every element of the + /// smaller tables is present in the larger tables). When these assumptions are not true, the selectivity + /// may not be completely accurate. However, it is still fairly accurate. + /// + /// However, we cannot simply add `predicate` to the multi-equality graph and compute the selectivity of + /// the entire connected component, because this would be "double counting" a lot of nodes. 
The join(s) + /// before this join would already have a selectivity value. Thus, we compute the selectivity of the + /// join(s) before this join (the first block of the function) and then the selectivity of the connected + /// component after this join. The quotient is the "adjustment" factor. /// /// NOTE: This function modifies `past_eq_columns` by adding `predicate` to it. fn get_join_selectivity_adjustment_when_adding_to_multi_equality_graph( From f9dda6bd70859ce4a47adec21e9762c998a74113 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Mon, 29 Apr 2024 16:46:42 -0400 Subject: [PATCH 13/28] poc working group card caching --- dev_scripts/which_queries_work.sh | 4 ++-- optd-perftest/src/datafusion_dbms.rs | 29 ++++++++++++++++++++++++++-- optd-perftest/src/tpch.rs | 4 +--- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/dev_scripts/which_queries_work.sh b/dev_scripts/which_queries_work.sh index bf9b4db8..3f5221ee 100755 --- a/dev_scripts/which_queries_work.sh +++ b/dev_scripts/which_queries_work.sh @@ -9,13 +9,13 @@ fi if [[ "$benchmark_name" == "job" ]]; then all_ids="1a,1b,1c,1d,2a,2b,2c,2d,3a,3b,3c,4a,4b,4c,5a,5b,5c,6a,6b,6c,6d,6e,6f,7a,7b,7c,8a,8b,8c,8d,9a,9b,9c,9d,10a,10b,10c,11a,11b,11c,11d,12a,12b,12c,13a,13b,13c,13d,14a,14b,14c,15a,15b,15c,15d,16a,16b,16c,16d,17a,17b,17c,17d,17e,17f,18a,18b,18c,19a,19b,19c,19d,20a,20b,20c,21a,21b,21c,22a,22b,22c,22d,23a,23b,23c,24a,24b,25a,25b,25c,26a,26b,26c,27a,27b,27c,28a,28b,28c,29a,29b,29c,30a,30b,30c,31a,31b,31c,32a,32b,33a,33b,33c" - vec_var_name="WORKING_QUERY_IDS" + vec_var_name="WORKING_JOB_QUERY_IDS" elif [[ "$benchmark_name" == "joblight" ]]; then all_ids="1a,1b,1c,1d,2a,3a,3b,3c,4a,4b,4c,5a,5b,5c,6a,6b,6c,6d,6e,7a,7b,7c,8a,8b,8c,9a,9b,10a,10b,10c,11a,11b,11c,12a,12b,12c,13a,14a,14b,14c,15a,15b,15c,16a,17a,17b,17c,18a,18b,18c,19a,19b,20a,20b,20c,21a,21b,22a,22b,22c,23a,23b,24a,24b,25a,26a,26b,27a,27b,28a" vec_var_name="WORKING_JOBLIGHT_QUERY_IDS" elif [[ "$benchmark_name" == "tpch" ]]; then all_ids="1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22" - vec_var_name="WORKING_JOB_QUERY_IDS" + vec_var_name="WORKING_QUERY_IDS" else echo >&2 $USAGE exit 1 diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index 94371166..8b15c8ca 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -50,12 +50,13 @@ impl CardtestRunnerDBMSHelper for DatafusionDBMS { ) -> anyhow::Result> { let base_table_stats = self.get_benchmark_stats(benchmark).await?; self.clear_state(Some(base_table_stats)).await?; + self.load_benchmark_data_no_stats(benchmark).await?; match benchmark { Benchmark::Tpch(tpch_kit_config) => { // Create the tables. 
This must be done after clear_state because that clears everything let tpch_kit = TpchKit::build(&self.workspace_dpath)?; - self.create_tpch_tables(&tpch_kit).await?; + // self.create_tpch_tables(&tpch_kit).await?; self.eval_tpch_estcards(tpch_kit_config).await } Benchmark::Job(job_kit_config) | Benchmark::Joblight(job_kit_config) => { @@ -160,6 +161,8 @@ impl DatafusionDBMS { query_id ); let sql = fs::read_to_string(sql_fpath)?; + // DEBUG(phw2) + self.execute_query(&sql).await?; let estcard = self.eval_query_estcard(&sql).await?; estcards.push(estcard); } @@ -198,6 +201,14 @@ impl DatafusionDBMS { .unwrap(); let explain_str = physical_plan_after_optd_lines.join("\n"); log::info!("{} {}", self.get_name(), explain_str); + + // DEBUG(phw2) + let physical_plan_after_optd_lines = explains + .iter() + .find(|explain| explain.first().unwrap() == "physical_plan") + .unwrap(); + let explain_str = physical_plan_after_optd_lines.join("\n"); + log::info!("{} {}", self.get_name(), explain_str); } async fn eval_query_estcard(&self, sql: &str) -> anyhow::Result { @@ -224,11 +235,17 @@ impl DatafusionDBMS { Ok(row_cnt) } + /// This is used to execute the query in order to load the true cardinalities back into optd + /// in order to use the adaptive cost model. + async fn execute_query(&self, sql: &str) -> anyhow::Result<()> { + Self::execute(&self.ctx, sql).await?; + Ok(()) + } + /// Load the data into DataFusion without building the stats used by optd. /// Unlike Postgres, where both data and stats are used by the same program, for this class the /// data is used by DataFusion while the stats are used by optd. That is why there are two /// separate functions to load them. - #[allow(dead_code)] async fn load_benchmark_data_no_stats(&mut self, benchmark: &Benchmark) -> anyhow::Result<()> { match benchmark { Benchmark::Tpch(tpch_kit_config) => self.load_tpch_data_no_stats(tpch_kit_config).await, @@ -236,6 +253,14 @@ impl DatafusionDBMS { } } + /// This function creates the tables for the benchmark without loading the data. + async fn create_benchmark_tables_no_data(&mut self, benchmark: &Benchmark) -> anyhow::Result<()> { + match benchmark { + Benchmark::Tpch(tpch_kit_config) => self.load_tpch_data_no_stats(tpch_kit_config).await, + _ => unimplemented!(), + } + } + /// Build the stats that optd's cost model uses. 
async fn get_benchmark_stats( &mut self, diff --git a/optd-perftest/src/tpch.rs b/optd-perftest/src/tpch.rs index 720fab3f..a0c7db4f 100644 --- a/optd-perftest/src/tpch.rs +++ b/optd-perftest/src/tpch.rs @@ -13,9 +13,7 @@ use std::path::{Path, PathBuf}; const TPCH_KIT_REPO_URL: &str = "https://github.com/wangpatrick57/tpch-kit.git"; pub const TPCH_KIT_POSTGRES: &str = "POSTGRESQL"; const NUM_TPCH_QUERIES: usize = 22; -pub const WORKING_QUERY_IDS: &[&str] = &[ - "2", "3", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "17", "19", -]; +pub const WORKING_QUERY_IDS: &[&str] = &["2", "3", "5", "7", "8", "9", "10", "12", "13", "14", "17"]; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct TpchKitConfig { From 90105b28ea29aac25e7677adf4400c33099cc7d9 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Mon, 29 Apr 2024 16:52:23 -0400 Subject: [PATCH 14/28] skeleton for load data or create table --- optd-perftest/src/datafusion_dbms.rs | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index 8b15c8ca..d22ef4be 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -50,13 +50,19 @@ impl CardtestRunnerDBMSHelper for DatafusionDBMS { ) -> anyhow::Result> { let base_table_stats = self.get_benchmark_stats(benchmark).await?; self.clear_state(Some(base_table_stats)).await?; - self.load_benchmark_data_no_stats(benchmark).await?; + + if true { + // We need to load the stats if we're doing adaptivity because that involves executing the queries in datafusion. + // This function also calls create_tables(). + self.load_benchmark_data_no_stats(benchmark).await?; + } else { + // We only create the tables so that the optimizer doesn't work. However, we can save on the time of loading + // the data if we're not doing adaptivity because we won't be executing queries. + self.create_benchmark_tables(benchmark).await?; + } match benchmark { Benchmark::Tpch(tpch_kit_config) => { - // Create the tables. 
This must be done after clear_state because that clears everything - let tpch_kit = TpchKit::build(&self.workspace_dpath)?; - // self.create_tpch_tables(&tpch_kit).await?; self.eval_tpch_estcards(tpch_kit_config).await } Benchmark::Job(job_kit_config) | Benchmark::Joblight(job_kit_config) => { @@ -291,6 +297,17 @@ impl DatafusionDBMS { } } + async fn create_benchmark_tables(&mut self, benchmark: &Benchmark) -> anyhow::Result<()> { + match benchmark { + Benchmark::Tpch(_) => { + let tpch_kit = TpchKit::build(&self.workspace_dpath)?; + self.create_tpch_tables(&tpch_kit).await?; + }, + _ => unimplemented!() + }; + Ok(()) + } + async fn create_tpch_tables(&mut self, tpch_kit: &TpchKit) -> anyhow::Result<()> { let ddls = fs::read_to_string(&tpch_kit.schema_fpath)?; let ddls = ddls From 9974637aba50b722cd2229d2108639f5f4a7a25d Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Mon, 29 Apr 2024 16:55:03 -0400 Subject: [PATCH 15/28] wrote create tables for job --- optd-perftest/src/datafusion_dbms.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index d22ef4be..90ba4bf8 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -66,8 +66,6 @@ impl CardtestRunnerDBMSHelper for DatafusionDBMS { self.eval_tpch_estcards(tpch_kit_config).await } Benchmark::Job(job_kit_config) | Benchmark::Joblight(job_kit_config) => { - let job_kit = JobKit::build(&self.workspace_dpath)?; - self.create_job_tables(&job_kit).await?; self.eval_job_estcards(job_kit_config).await } } @@ -303,7 +301,10 @@ impl DatafusionDBMS { let tpch_kit = TpchKit::build(&self.workspace_dpath)?; self.create_tpch_tables(&tpch_kit).await?; }, - _ => unimplemented!() + Benchmark::Job(_) | Benchmark::Joblight(_) => { + let job_kit = JobKit::build(&self.workspace_dpath)?; + self.create_job_tables(&job_kit).await?; + } }; Ok(()) } From 2258af594d2be804b8a9222f5fbfbbae8501f200 Mon Sep 17 00:00:00 2001 From: Gun9niR Date: Mon, 29 Apr 2024 17:10:18 -0400 Subject: [PATCH 16/28] load job data no stats --- optd-perftest/src/datafusion_dbms.rs | 73 ++++++++++++++++++++++------ 1 file changed, 59 insertions(+), 14 deletions(-) diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index 90ba4bf8..60de7f02 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -62,9 +62,7 @@ impl CardtestRunnerDBMSHelper for DatafusionDBMS { } match benchmark { - Benchmark::Tpch(tpch_kit_config) => { - self.eval_tpch_estcards(tpch_kit_config).await - } + Benchmark::Tpch(tpch_kit_config) => self.eval_tpch_estcards(tpch_kit_config).await, Benchmark::Job(job_kit_config) | Benchmark::Joblight(job_kit_config) => { self.eval_job_estcards(job_kit_config).await } @@ -253,15 +251,9 @@ impl DatafusionDBMS { async fn load_benchmark_data_no_stats(&mut self, benchmark: &Benchmark) -> anyhow::Result<()> { match benchmark { Benchmark::Tpch(tpch_kit_config) => self.load_tpch_data_no_stats(tpch_kit_config).await, - _ => unimplemented!(), - } - } - - /// This function creates the tables for the benchmark without loading the data. 
- async fn create_benchmark_tables_no_data(&mut self, benchmark: &Benchmark) -> anyhow::Result<()> { - match benchmark { - Benchmark::Tpch(tpch_kit_config) => self.load_tpch_data_no_stats(tpch_kit_config).await, - _ => unimplemented!(), + Benchmark::Job(job_kit_config) | Benchmark::Joblight(job_kit_config) => { + self.load_job_data_no_stats(job_kit_config).await + } } } @@ -295,12 +287,13 @@ impl DatafusionDBMS { } } + /// This function creates the tables for the benchmark without loading the data. async fn create_benchmark_tables(&mut self, benchmark: &Benchmark) -> anyhow::Result<()> { match benchmark { Benchmark::Tpch(_) => { let tpch_kit = TpchKit::build(&self.workspace_dpath)?; self.create_tpch_tables(&tpch_kit).await?; - }, + } Benchmark::Job(_) | Benchmark::Joblight(_) => { let job_kit = JobKit::build(&self.workspace_dpath)?; self.create_job_tables(&job_kit).await?; @@ -335,7 +328,6 @@ impl DatafusionDBMS { Ok(()) } - #[allow(dead_code)] async fn load_tpch_data_no_stats( &mut self, tpch_kit_config: &TpchKitConfig, @@ -389,6 +381,59 @@ impl DatafusionDBMS { Ok(()) } + // Load job data by creating an external table first and copying the data to real tables. + async fn load_job_data_no_stats( + &mut self, + job_kit_config: &JobKitConfig, + ) -> anyhow::Result<()> { + // Download the tables. + let job_kit = JobKit::build(&self.workspace_dpath)?; + job_kit.download_tables(&job_kit_config)?; + + // Create the tables. + self.create_job_tables(&job_kit).await?; + + // Load the data by creating an external table first and copying the data to real tables. + let tbl_fpath_iter = job_kit.get_tbl_fpath_iter().unwrap(); + for tbl_fpath in tbl_fpath_iter { + let tbl_name = tbl_fpath.file_stem().unwrap().to_str().unwrap(); + Self::execute( + &self.ctx, + &format!( + "create external table {}_tbl stored as csv delimiter ',' location '{}';", + tbl_name, + tbl_fpath.to_str().unwrap() + ), + ) + .await?; + + // Get the number of columns of this table. + let schema = self + .ctx + .catalog("datafusion") + .unwrap() + .schema("public") + .unwrap() + .table(tbl_name) + .await + .unwrap() + .schema(); + let projection_list = (1..=schema.fields().len()) + .map(|i| format!("column_{}", i)) + .collect::>() + .join(", "); + Self::execute( + &self.ctx, + &format!( + "insert into {} select {} from {}_tbl;", + tbl_name, projection_list, tbl_name, + ), + ) + .await?; + } + Ok(()) + } + async fn get_tpch_stats( &mut self, tpch_kit_config: &TpchKitConfig, From 2d0b848aaaf7d8e71c28cc7dd67e4799184db878 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Mon, 29 Apr 2024 17:14:23 -0400 Subject: [PATCH 17/28] added execute query --- optd-perftest/src/datafusion_dbms.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index 90ba4bf8..0dde4482 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -165,9 +165,9 @@ impl DatafusionDBMS { query_id ); let sql = fs::read_to_string(sql_fpath)?; - // DEBUG(phw2) - self.execute_query(&sql).await?; let estcard = self.eval_query_estcard(&sql).await?; + // Execute the query to fill the true cardinality cache. + self.execute_query(&sql).await?; estcards.push(estcard); } @@ -191,6 +191,8 @@ impl DatafusionDBMS { ); let sql = fs::read_to_string(sql_fpath)?; let estcard = self.eval_query_estcard(&sql).await?; + // Execute the query to fill the true cardinality cache. 
+ self.execute_query(&sql).await?; estcards.push(estcard); } From ef458defe6c9fc7afffca19ccfeeb1577e32eb2d Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Mon, 29 Apr 2024 17:16:15 -0400 Subject: [PATCH 18/28] moved execute query --- optd-perftest/src/datafusion_dbms.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index fe4ee1cb..0e5c55a7 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -163,9 +163,9 @@ impl DatafusionDBMS { query_id ); let sql = fs::read_to_string(sql_fpath)?; - let estcard = self.eval_query_estcard(&sql).await?; // Execute the query to fill the true cardinality cache. self.execute_query(&sql).await?; + let estcard = self.eval_query_estcard(&sql).await?; estcards.push(estcard); } @@ -188,9 +188,9 @@ impl DatafusionDBMS { query_id ); let sql = fs::read_to_string(sql_fpath)?; - let estcard = self.eval_query_estcard(&sql).await?; // Execute the query to fill the true cardinality cache. self.execute_query(&sql).await?; + let estcard = self.eval_query_estcard(&sql).await?; estcards.push(estcard); } From ab20bc95d08e9133e8edb978ac96332e04e8945b Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Mon, 29 Apr 2024 17:18:42 -0400 Subject: [PATCH 19/28] integrated adaptive option with code --- optd-perftest/src/datafusion_dbms.rs | 18 +++++++++++++----- optd-perftest/src/main.rs | 2 +- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index 122d70ea..d5da0487 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -52,7 +52,7 @@ impl CardtestRunnerDBMSHelper for DatafusionDBMS { let base_table_stats = self.get_benchmark_stats(benchmark).await?; self.clear_state(Some(base_table_stats)).await?; - if true { + if self.adaptive { // We need to load the stats if we're doing adaptivity because that involves executing the queries in datafusion. // This function also calls create_tables(). self.load_benchmark_data_no_stats(benchmark).await?; @@ -167,8 +167,12 @@ impl DatafusionDBMS { query_id ); let sql = fs::read_to_string(sql_fpath)?; - // Execute the query to fill the true cardinality cache. - self.execute_query(&sql).await?; + + if self.adaptive { + // If we're in adaptive mode, execute the query to fill the true cardinality cache. + self.execute_query(&sql).await?; + } + let estcard = self.eval_query_estcard(&sql).await?; estcards.push(estcard); } @@ -192,8 +196,12 @@ impl DatafusionDBMS { query_id ); let sql = fs::read_to_string(sql_fpath)?; - // Execute the query to fill the true cardinality cache. - self.execute_query(&sql).await?; + + if self.adaptive { + // Execute the query to fill the true cardinality cache. 
+ self.execute_query(&sql).await?; + } + let estcard = self.eval_query_estcard(&sql).await?; estcards.push(estcard); } diff --git a/optd-perftest/src/main.rs b/optd-perftest/src/main.rs index aa04f834..8337fbd2 100644 --- a/optd-perftest/src/main.rs +++ b/optd-perftest/src/main.rs @@ -60,8 +60,8 @@ enum Commands { rebuild_cached_optd_stats: bool, #[clap(long)] + #[clap(action)] #[clap(help = "Whether to enable adaptivity for optd")] - #[clap(default_value = "true")] adaptive: bool, #[clap(long)] From 0eed5f2b88d89ddca4f6bd50a8506192e399db34 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Mon, 29 Apr 2024 17:22:56 -0400 Subject: [PATCH 20/28] added adaptive to which_queries_work.sh --- dev_scripts/which_queries_work.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dev_scripts/which_queries_work.sh b/dev_scripts/which_queries_work.sh index 3f5221ee..5a6796d9 100755 --- a/dev_scripts/which_queries_work.sh +++ b/dev_scripts/which_queries_work.sh @@ -24,7 +24,8 @@ fi successful_ids=() IFS=',' for id in $all_ids; do - cargo run --bin optd-perftest cardtest $benchmark_name --query-ids $id &>/dev/null + # make sure to execute with --adaptive so that we actually run the query in datafusion + cargo run --bin optd-perftest cardtest $benchmark_name --query-ids $id --adaptive &>/dev/null if [ $? -eq 0 ]; then echo >&2 $id succeeded From 349b6b5dc8b8611540b253e311a46bc164eee3ad Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Mon, 29 Apr 2024 17:26:02 -0400 Subject: [PATCH 21/28] removed debug from log_explain --- optd-perftest/src/datafusion_dbms.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index d5da0487..fc975eb4 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -201,7 +201,7 @@ impl DatafusionDBMS { // Execute the query to fill the true cardinality cache. self.execute_query(&sql).await?; } - + let estcard = self.eval_query_estcard(&sql).await?; estcards.push(estcard); } @@ -217,14 +217,6 @@ impl DatafusionDBMS { .unwrap(); let explain_str = physical_plan_after_optd_lines.join("\n"); log::info!("{} {}", self.get_name(), explain_str); - - // DEBUG(phw2) - let physical_plan_after_optd_lines = explains - .iter() - .find(|explain| explain.first().unwrap() == "physical_plan") - .unwrap(); - let explain_str = physical_plan_after_optd_lines.join("\n"); - log::info!("{} {}", self.get_name(), explain_str); } async fn eval_query_estcard(&self, sql: &str) -> anyhow::Result { From 77504382f66d800e3ebb368525305715fb952c34 Mon Sep 17 00:00:00 2001 From: Gun9niR Date: Mon, 29 Apr 2024 17:49:36 -0400 Subject: [PATCH 22/28] specify schema when creating external table --- optd-perftest/src/datafusion_dbms.rs | 45 +++++++++++++++------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index 60de7f02..6a2f0555 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, fs::{self, File}, path::{Path, PathBuf}, sync::Arc, @@ -20,6 +21,7 @@ use datafusion::{ execution::{ config::SessionConfig, context::{SessionContext, SessionState}, + options::CsvReadOptions, runtime_env::{RuntimeConfig, RuntimeEnv}, }, sql::{parser::DFParser, sqlparser::dialect::GenericDialect}, @@ -393,6 +395,26 @@ impl DatafusionDBMS { // Create the tables. 
self.create_job_tables(&job_kit).await?; + // Extract the schema string from the DDLs. + let tbl_to_schema_str = { + let mut tbl_to_schema_str = HashMap::new(); + let ddls = fs::read_to_string(&job_kit.schema_fpath)?; + let ddls = ddls + .split(';') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .collect::>(); + for ddl in ddls { + let paren_start_idx = ddl.find('(').unwrap(); + let table_name = ddl["CREATE TABLE".len()..paren_start_idx].trim(); + let schema_string = &ddl[paren_start_idx..]; + + // Insert into the hash map + tbl_to_schema_str.insert(table_name.to_string(), schema_string.to_string()); + } + tbl_to_schema_str + }; + // Load the data by creating an external table first and copying the data to real tables. let tbl_fpath_iter = job_kit.get_tbl_fpath_iter().unwrap(); for tbl_fpath in tbl_fpath_iter { @@ -400,34 +422,17 @@ impl DatafusionDBMS { Self::execute( &self.ctx, &format!( - "create external table {}_tbl stored as csv delimiter ',' location '{}';", + "create external table {}_tbl {} stored as csv delimiter ',' location '{}';", tbl_name, + tbl_to_schema_str.get(tbl_name).unwrap(), tbl_fpath.to_str().unwrap() ), ) .await?; - // Get the number of columns of this table. - let schema = self - .ctx - .catalog("datafusion") - .unwrap() - .schema("public") - .unwrap() - .table(tbl_name) - .await - .unwrap() - .schema(); - let projection_list = (1..=schema.fields().len()) - .map(|i| format!("column_{}", i)) - .collect::>() - .join(", "); Self::execute( &self.ctx, - &format!( - "insert into {} select {} from {}_tbl;", - tbl_name, projection_list, tbl_name, - ), + &format!("insert into {} select * from {}_tbl;", tbl_name, tbl_name), ) .await?; } From 2fbfe9bbdbd5e7d750e9d029c29b5c24ec9af6fa Mon Sep 17 00:00:00 2001 From: Gun9niR Date: Mon, 29 Apr 2024 17:57:59 -0400 Subject: [PATCH 23/28] use register_csv --- optd-perftest/src/datafusion_dbms.rs | 58 ++++++++++------------------ 1 file changed, 21 insertions(+), 37 deletions(-) diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index c438a2da..60edfd71 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -174,7 +174,7 @@ impl DatafusionDBMS { // If we're in adaptive mode, execute the query to fill the true cardinality cache. self.execute_query(&sql).await?; } - + let estcard = self.eval_query_estcard(&sql).await?; estcards.push(estcard); } @@ -401,46 +401,30 @@ impl DatafusionDBMS { // Create the tables. self.create_job_tables(&job_kit).await?; - // Extract the schema string from the DDLs. - let tbl_to_schema_str = { - let mut tbl_to_schema_str = HashMap::new(); - let ddls = fs::read_to_string(&job_kit.schema_fpath)?; - let ddls = ddls - .split(';') - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .collect::>(); - for ddl in ddls { - let paren_start_idx = ddl.find('(').unwrap(); - let table_name = ddl["CREATE TABLE".len()..paren_start_idx].trim(); - let schema_string = &ddl[paren_start_idx..]; - - // Insert into the hash map - tbl_to_schema_str.insert(table_name.to_string(), schema_string.to_string()); - } - tbl_to_schema_str - }; - // Load the data by creating an external table first and copying the data to real tables. 
let tbl_fpath_iter = job_kit.get_tbl_fpath_iter().unwrap(); for tbl_fpath in tbl_fpath_iter { let tbl_name = tbl_fpath.file_stem().unwrap().to_str().unwrap(); - Self::execute( - &self.ctx, - &format!( - "create external table {}_tbl {} stored as csv delimiter ',' location '{}';", - tbl_name, - tbl_to_schema_str.get(tbl_name).unwrap(), - tbl_fpath.to_str().unwrap() - ), - ) - .await?; - - Self::execute( - &self.ctx, - &format!("insert into {} select * from {}_tbl;", tbl_name, tbl_name), - ) - .await?; + let schema = self + .ctx + .catalog("datafusion") + .unwrap() + .schema("public") + .unwrap() + .table(tbl_name) + .await + .unwrap() + .schema(); + self.ctx + .register_csv( + &tbl_name, + tbl_fpath.to_str().unwrap(), + CsvReadOptions::new() + .schema(&schema) + .delimiter(b',') + .escape(b'\\'), + ) + .await?; } Ok(()) } From 4ce15c93675beda50fd442a2d109fa1b21c6b0bb Mon Sep 17 00:00:00 2001 From: Gun9niR Date: Mon, 29 Apr 2024 17:58:14 -0400 Subject: [PATCH 24/28] misc --- optd-perftest/src/datafusion_dbms.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index 60edfd71..2862cfd1 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashMap, fs::{self, File}, path::{Path, PathBuf}, sync::Arc, From 3b8285338d2a22e8388b411e7e8fdc81a55f996b Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Mon, 29 Apr 2024 18:16:53 -0400 Subject: [PATCH 25/28] now creating table in temporary schema --- optd-perftest/src/datafusion_dbms.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index 2862cfd1..9be4d9a6 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -303,7 +303,7 @@ impl DatafusionDBMS { } Benchmark::Job(_) | Benchmark::Joblight(_) => { let job_kit = JobKit::build(&self.workspace_dpath)?; - self.create_job_tables(&job_kit).await?; + Self::create_job_tables(&self.ctx, &job_kit).await?; } }; Ok(()) @@ -322,7 +322,7 @@ impl DatafusionDBMS { Ok(()) } - async fn create_job_tables(&mut self, job_kit: &JobKit) -> anyhow::Result<()> { + async fn create_job_tables(ctx: &SessionContext, job_kit: &JobKit) -> anyhow::Result<()> { let ddls = fs::read_to_string(&job_kit.schema_fpath)?; let ddls = ddls .split(';') @@ -330,7 +330,7 @@ impl DatafusionDBMS { .filter(|s| !s.is_empty()) .collect::>(); for ddl in ddls { - Self::execute(&self.ctx, ddl).await?; + Self::execute(ctx, ddl).await?; } Ok(()) } @@ -388,24 +388,26 @@ impl DatafusionDBMS { Ok(()) } - // Load job data by creating an external table first and copying the data to real tables. + // Load job data from a .csv file. async fn load_job_data_no_stats( &mut self, job_kit_config: &JobKitConfig, ) -> anyhow::Result<()> { + let ctx = Self::new_session_ctx(None, self.adaptive).await?; + // Download the tables. let job_kit = JobKit::build(&self.workspace_dpath)?; job_kit.download_tables(&job_kit_config)?; // Create the tables. - self.create_job_tables(&job_kit).await?; + Self::create_job_tables(&ctx, &job_kit).await?; - // Load the data by creating an external table first and copying the data to real tables. 
+ // Load each table using register_csv() let tbl_fpath_iter = job_kit.get_tbl_fpath_iter().unwrap(); for tbl_fpath in tbl_fpath_iter { let tbl_name = tbl_fpath.file_stem().unwrap().to_str().unwrap(); - let schema = self - .ctx + let schema = + ctx .catalog("datafusion") .unwrap() .schema("public") From ca9ddc1e9d4ccbb12f09cc717f76544fcd95542a Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Mon, 29 Apr 2024 22:34:09 -0400 Subject: [PATCH 26/28] fixed job --- optd-perftest/src/datafusion_dbms.rs | 56 ++++++++++++++++++---------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index 9be4d9a6..7dc838e7 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -5,7 +5,7 @@ use std::{ }; use crate::{ - benchmark::Benchmark, + benchmark::{self, Benchmark}, cardtest::CardtestRunnerDBMSHelper, job::{JobKit, JobKitConfig}, tpch::{TpchKit, TpchKitConfig}, @@ -37,9 +37,12 @@ pub struct DatafusionDBMS { workspace_dpath: PathBuf, rebuild_cached_stats: bool, adaptive: bool, - ctx: SessionContext, + ctx: Option, } +const WITH_LOGICAL_FOR_TPCH: bool = true; +const WITH_LOGICAL_FOR_JOB: bool = false; + #[async_trait] impl CardtestRunnerDBMSHelper for DatafusionDBMS { fn get_name(&self) -> &str { @@ -51,7 +54,8 @@ impl CardtestRunnerDBMSHelper for DatafusionDBMS { benchmark: &Benchmark, ) -> anyhow::Result> { let base_table_stats = self.get_benchmark_stats(benchmark).await?; - self.clear_state(Some(base_table_stats)).await?; + // clear_state() is how we "load" the stats into datafusion + self.clear_state(Some(base_table_stats), &benchmark).await?; if self.adaptive { // We need to load the stats if we're doing adaptivity because that involves executing the queries in datafusion. @@ -82,7 +86,7 @@ impl DatafusionDBMS { workspace_dpath: workspace_dpath.as_ref().to_path_buf(), rebuild_cached_stats, adaptive, - ctx: Self::new_session_ctx(None, adaptive).await?, + ctx: None, }) } @@ -91,16 +95,26 @@ impl DatafusionDBMS { /// /// A more ideal way to generate statistics would be to use the `ANALYZE` /// command in SQL, but DataFusion does not support that yet. - async fn clear_state(&mut self, stats: Option) -> anyhow::Result<()> { - self.ctx = Self::new_session_ctx(stats, self.adaptive).await?; + async fn clear_state(&mut self, stats: Option, benchmark: &Benchmark) -> anyhow::Result<()> { + let with_logical = match benchmark { + Benchmark::Tpch(_) => WITH_LOGICAL_FOR_TPCH, + Benchmark::Job(_) | Benchmark::Joblight(_) => WITH_LOGICAL_FOR_JOB, + }; + self.ctx = Some(Self::new_session_ctx(stats, self.adaptive, with_logical).await?); Ok(()) } async fn new_session_ctx( stats: Option, adaptive: bool, + with_logical: bool, ) -> anyhow::Result { - let session_config = SessionConfig::from_env()?.with_information_schema(true); + let mut session_config = SessionConfig::from_env()?.with_information_schema(true); + + if !with_logical { + session_config.options_mut().optimizer.max_passes = 0; + } + let rn_config = RuntimeConfig::new(); let runtime_env = RuntimeEnv::new(rn_config.clone())?; let ctx = { @@ -220,11 +234,15 @@ impl DatafusionDBMS { log::info!("{} {}", self.get_name(), explain_str); } + fn get_ctx(&self) -> &SessionContext { + self.ctx.as_ref().unwrap() + } + async fn eval_query_estcard(&self, sql: &str) -> anyhow::Result { lazy_static! 
{ static ref ROW_CNT_RE: Regex = Regex::new(r"row_cnt=(\d+\.\d+)").unwrap(); } - let explains = Self::execute(&self.ctx, &format!("explain verbose {}", sql)).await?; + let explains = Self::execute(self.get_ctx(), &format!("explain verbose {}", sql)).await?; self.log_explain(&explains); // Find first occurrence of row_cnt=... in the output. let row_cnt = explains @@ -247,7 +265,7 @@ impl DatafusionDBMS { /// This is used to execute the query in order to load the true cardinalities back into optd /// in order to use the adaptive cost model. async fn execute_query(&self, sql: &str) -> anyhow::Result<()> { - Self::execute(&self.ctx, sql).await?; + Self::execute(&self.get_ctx(), sql).await?; Ok(()) } @@ -264,7 +282,7 @@ impl DatafusionDBMS { } } - /// Build the stats that optd's cost model uses. + /// Build the stats that optd's cost model uses, or get the stats from the cache. async fn get_benchmark_stats( &mut self, benchmark: &Benchmark, @@ -303,7 +321,7 @@ impl DatafusionDBMS { } Benchmark::Job(_) | Benchmark::Joblight(_) => { let job_kit = JobKit::build(&self.workspace_dpath)?; - Self::create_job_tables(&self.ctx, &job_kit).await?; + Self::create_job_tables(&self.get_ctx(), &job_kit).await?; } }; Ok(()) @@ -317,7 +335,7 @@ impl DatafusionDBMS { .filter(|s| !s.is_empty()) .collect::>(); for ddl in ddls { - Self::execute(&self.ctx, ddl).await?; + Self::execute(&self.get_ctx(), ddl).await?; } Ok(()) } @@ -351,7 +369,7 @@ impl DatafusionDBMS { for tbl_fpath in tbl_fpath_iter { let tbl_name = tbl_fpath.file_stem().unwrap().to_str().unwrap(); Self::execute( - &self.ctx, + &self.get_ctx(), &format!( "create external table {}_tbl stored as csv delimiter '|' location '{}';", tbl_name, @@ -362,7 +380,7 @@ impl DatafusionDBMS { // Get the number of columns of this table. let schema = self - .ctx + .get_ctx() .catalog("datafusion") .unwrap() .schema("public") @@ -376,7 +394,7 @@ impl DatafusionDBMS { .collect::>() .join(", "); Self::execute( - &self.ctx, + &self.get_ctx(), &format!( "insert into {} select {} from {}_tbl;", tbl_name, projection_list, tbl_name, @@ -393,7 +411,7 @@ impl DatafusionDBMS { &mut self, job_kit_config: &JobKitConfig, ) -> anyhow::Result<()> { - let ctx = Self::new_session_ctx(None, self.adaptive).await?; + let ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_JOB).await?; // Download the tables. let job_kit = JobKit::build(&self.workspace_dpath)?; @@ -416,7 +434,7 @@ impl DatafusionDBMS { .await .unwrap() .schema(); - self.ctx + self.get_ctx() .register_csv( &tbl_name, tbl_fpath.to_str().unwrap(), @@ -439,7 +457,7 @@ impl DatafusionDBMS { tpch_kit.gen_tables(tpch_kit_config)?; // To get the schema of each table. - let ctx = Self::new_session_ctx(None, self.adaptive).await?; + let ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_TPCH).await?; let ddls = fs::read_to_string(&tpch_kit.schema_fpath)?; let ddls = ddls .split(';') @@ -499,7 +517,7 @@ impl DatafusionDBMS { job_kit.download_tables(job_kit_config)?; // To get the schema of each table. 
- let ctx = Self::new_session_ctx(None, self.adaptive).await?; + let ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_JOB).await?; let ddls = fs::read_to_string(&job_kit.schema_fpath)?; let ddls = ddls .split(';') From 37197afa5eac76cdcb2bd67afb583436dd7c0ad6 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Tue, 30 Apr 2024 09:20:46 -0400 Subject: [PATCH 27/28] moved execute to after estcard --- optd-perftest/src/datafusion_dbms.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index 7dc838e7..c72d3ef8 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -5,7 +5,7 @@ use std::{ }; use crate::{ - benchmark::{self, Benchmark}, + benchmark::Benchmark, cardtest::CardtestRunnerDBMSHelper, job::{JobKit, JobKitConfig}, tpch::{TpchKit, TpchKitConfig}, @@ -182,14 +182,13 @@ impl DatafusionDBMS { query_id ); let sql = fs::read_to_string(sql_fpath)?; + let estcard = self.eval_query_estcard(&sql).await?; + estcards.push(estcard); if self.adaptive { // If we're in adaptive mode, execute the query to fill the true cardinality cache. self.execute_query(&sql).await?; } - - let estcard = self.eval_query_estcard(&sql).await?; - estcards.push(estcard); } Ok(estcards) @@ -211,14 +210,13 @@ impl DatafusionDBMS { query_id ); let sql = fs::read_to_string(sql_fpath)?; + let estcard = self.eval_query_estcard(&sql).await?; + estcards.push(estcard); if self.adaptive { // Execute the query to fill the true cardinality cache. self.execute_query(&sql).await?; } - - let estcard = self.eval_query_estcard(&sql).await?; - estcards.push(estcard); } Ok(estcards) From b7dd7e6aad2d504597a22b75c4da967b19bd6acf Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Tue, 30 Apr 2024 10:11:24 -0400 Subject: [PATCH 28/28] fmt and clippy --- optd-perftest/src/datafusion_dbms.rs | 25 ++++++++++++++----------- optd-perftest/src/tpch.rs | 3 ++- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs index c72d3ef8..88853b22 100644 --- a/optd-perftest/src/datafusion_dbms.rs +++ b/optd-perftest/src/datafusion_dbms.rs @@ -55,7 +55,7 @@ impl CardtestRunnerDBMSHelper for DatafusionDBMS { ) -> anyhow::Result> { let base_table_stats = self.get_benchmark_stats(benchmark).await?; // clear_state() is how we "load" the stats into datafusion - self.clear_state(Some(base_table_stats), &benchmark).await?; + self.clear_state(Some(base_table_stats), benchmark).await?; if self.adaptive { // We need to load the stats if we're doing adaptivity because that involves executing the queries in datafusion. @@ -95,7 +95,11 @@ impl DatafusionDBMS { /// /// A more ideal way to generate statistics would be to use the `ANALYZE` /// command in SQL, but DataFusion does not support that yet. - async fn clear_state(&mut self, stats: Option, benchmark: &Benchmark) -> anyhow::Result<()> { + async fn clear_state( + &mut self, + stats: Option, + benchmark: &Benchmark, + ) -> anyhow::Result<()> { let with_logical = match benchmark { Benchmark::Tpch(_) => WITH_LOGICAL_FOR_TPCH, Benchmark::Job(_) | Benchmark::Joblight(_) => WITH_LOGICAL_FOR_JOB, @@ -263,7 +267,7 @@ impl DatafusionDBMS { /// This is used to execute the query in order to load the true cardinalities back into optd /// in order to use the adaptive cost model. 
async fn execute_query(&self, sql: &str) -> anyhow::Result<()> { - Self::execute(&self.get_ctx(), sql).await?; + Self::execute(self.get_ctx(), sql).await?; Ok(()) } @@ -319,7 +323,7 @@ impl DatafusionDBMS { } Benchmark::Job(_) | Benchmark::Joblight(_) => { let job_kit = JobKit::build(&self.workspace_dpath)?; - Self::create_job_tables(&self.get_ctx(), &job_kit).await?; + Self::create_job_tables(self.get_ctx(), &job_kit).await?; } }; Ok(()) @@ -333,7 +337,7 @@ impl DatafusionDBMS { .filter(|s| !s.is_empty()) .collect::>(); for ddl in ddls { - Self::execute(&self.get_ctx(), ddl).await?; + Self::execute(self.get_ctx(), ddl).await?; } Ok(()) } @@ -367,7 +371,7 @@ impl DatafusionDBMS { for tbl_fpath in tbl_fpath_iter { let tbl_name = tbl_fpath.file_stem().unwrap().to_str().unwrap(); Self::execute( - &self.get_ctx(), + self.get_ctx(), &format!( "create external table {}_tbl stored as csv delimiter '|' location '{}';", tbl_name, @@ -392,7 +396,7 @@ impl DatafusionDBMS { .collect::>() .join(", "); Self::execute( - &self.get_ctx(), + self.get_ctx(), &format!( "insert into {} select {} from {}_tbl;", tbl_name, projection_list, tbl_name, @@ -413,7 +417,7 @@ impl DatafusionDBMS { // Download the tables. let job_kit = JobKit::build(&self.workspace_dpath)?; - job_kit.download_tables(&job_kit_config)?; + job_kit.download_tables(job_kit_config)?; // Create the tables. Self::create_job_tables(&ctx, &job_kit).await?; @@ -422,8 +426,7 @@ impl DatafusionDBMS { let tbl_fpath_iter = job_kit.get_tbl_fpath_iter().unwrap(); for tbl_fpath in tbl_fpath_iter { let tbl_name = tbl_fpath.file_stem().unwrap().to_str().unwrap(); - let schema = - ctx + let schema = ctx .catalog("datafusion") .unwrap() .schema("public") @@ -434,7 +437,7 @@ impl DatafusionDBMS { .schema(); self.get_ctx() .register_csv( - &tbl_name, + tbl_name, tbl_fpath.to_str().unwrap(), CsvReadOptions::new() .schema(&schema) diff --git a/optd-perftest/src/tpch.rs b/optd-perftest/src/tpch.rs index a0c7db4f..9260a194 100644 --- a/optd-perftest/src/tpch.rs +++ b/optd-perftest/src/tpch.rs @@ -13,7 +13,8 @@ use std::path::{Path, PathBuf}; const TPCH_KIT_REPO_URL: &str = "https://github.com/wangpatrick57/tpch-kit.git"; pub const TPCH_KIT_POSTGRES: &str = "POSTGRESQL"; const NUM_TPCH_QUERIES: usize = 22; -pub const WORKING_QUERY_IDS: &[&str] = &["2", "3", "5", "7", "8", "9", "10", "12", "13", "14", "17"]; +pub const WORKING_QUERY_IDS: &[&str] = + &["2", "3", "5", "7", "8", "9", "10", "12", "13", "14", "17"]; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct TpchKitConfig {
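
Taken together, patches 22 through 26 land on a single loading strategy for the JOB tables: run the benchmark DDL in a throwaway session context purely to derive each table's Arrow schema, then register the raw .csv file under the table name in the live context via register_csv(). The sketch below is an illustrative, self-contained distillation of that flow rather than code from any patch: the table name, column list, and file path are placeholders, a plain SessionContext::new() stands in for the repo's new_session_ctx(), and a tokio runtime plus the DataFusion version used by the repo (plain CREATE TABLE DDL and CsvReadOptions::escape available) are assumed.

use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Scratch context: execute the DDL only to learn the table's Arrow schema.
    let scratch = SessionContext::new();
    scratch
        .sql("CREATE TABLE aka_name (id INT, person_id INT, name TEXT)") // placeholder columns
        .await?;
    let schema = scratch
        .catalog("datafusion")
        .unwrap()
        .schema("public")
        .unwrap()
        .table("aka_name")
        .await
        .unwrap()
        .schema();

    // Live context: expose the raw CSV file directly under the table name, with the
    // DDL-derived schema so column types are not inferred from the file.
    let ctx = SessionContext::new();
    ctx.register_csv(
        "aka_name",
        "aka_name.csv", // placeholder path to the downloaded table file
        CsvReadOptions::new()
            .schema(&schema)
            .delimiter(b',')
            .escape(b'\\'),
    )
    .await?;

    // The table is now queryable; in the harness, running the query is what feeds
    // true cardinalities back to optd when --adaptive is set.
    ctx.sql("SELECT count(*) FROM aka_name").await?.show().await?;
    Ok(())
}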
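
On usage: with the --adaptive flag introduced in patch 19 and threaded into the sweep script in patch 20, each iteration of dev_scripts/which_queries_work.sh is effectively `cargo run --bin optd-perftest cardtest <benchmark> --query-ids <id> --adaptive`, for example `cargo run --bin optd-perftest cardtest joblight --query-ids 1a --adaptive` (the query id is illustrative). Query ids whose run exits with status 0 are the ones the script reports under the constant it names in vec_var_name (WORKING_JOB_QUERY_IDS, WORKING_JOBLIGHT_QUERY_IDS, or WORKING_QUERY_IDS).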