Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adaptive cost model functionality working #174

Merged
merged 31 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3c7aba5
added actual set of working job light queries
wangpatrick57 Apr 27, 2024
c2c99d8
fmt and clip
wangpatrick57 Apr 27, 2024
ec81ca6
changed precision of mg and hll, getting us from 35 -> 40 queries ahead
wangpatrick57 Apr 27, 2024
af00dd6
test_inner_redundant_predicate -> test_add_edge_to_multi_equal_graph_…
wangpatrick57 Apr 27, 2024
28a3d26
wrote test_three_table_join_for_join1_on_cond test. not passing yet
wangpatrick57 Apr 27, 2024
9104a81
turned get_join_selectivity_from_most_selective_predicates into get_j…
wangpatrick57 Apr 27, 2024
91791f7
fixed bug of adding new predicate that touches one col of existing pred
wangpatrick57 Apr 27, 2024
d9a8f29
generalized three table join test to allow for arbitrary initial join…
wangpatrick57 Apr 27, 2024
42a9162
added test_join_which_connects_two_components_together
wangpatrick57 Apr 27, 2024
0474fcd
fmt and clip
wangpatrick57 Apr 28, 2024
4fc54c1
changed comment and name of what is now get_join_selectivity_adjustme…
wangpatrick57 Apr 28, 2024
e5b2adf
inclusion principle comment
wangpatrick57 Apr 28, 2024
f9dda6b
poc working group card caching
wangpatrick57 Apr 29, 2024
90105b2
skeleton for load data or create table
wangpatrick57 Apr 29, 2024
9974637
wrote create tables for job
wangpatrick57 Apr 29, 2024
2258af5
load job data no stats
Gun9niR Apr 29, 2024
2d0b848
added execute query
wangpatrick57 Apr 29, 2024
f96224b
Merge remote-tracking branch 'refs/remotes/origin/phw0/df-execute' in…
wangpatrick57 Apr 29, 2024
ef458de
moved execute query
wangpatrick57 Apr 29, 2024
ececf10
Merge branch 'main' into phw2/df-execute
wangpatrick57 Apr 29, 2024
ab20bc9
integrated adaptive option with code
wangpatrick57 Apr 29, 2024
0eed5f2
added adaptive to which_queries_work.sh
wangpatrick57 Apr 29, 2024
349b6b5
removed debug from log_explain
wangpatrick57 Apr 29, 2024
7750438
specify schema when creating external table
Gun9niR Apr 29, 2024
11e00fb
Merge branch 'phw2/df-execute' of https://github.com/cmu-db/optd into…
Gun9niR Apr 29, 2024
2fbfe9b
use register_csv
Gun9niR Apr 29, 2024
4ce15c9
misc
Gun9niR Apr 29, 2024
3b82853
now creating table in temporary schema
wangpatrick57 Apr 29, 2024
ca9ddc1
fixed job
wangpatrick57 Apr 30, 2024
37197af
moved execute to after estcard
wangpatrick57 Apr 30, 2024
b7dd7e6
fmt and clippy
wangpatrick57 Apr 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions dev_scripts/which_queries_work.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ fi

if [[ "$benchmark_name" == "job" ]]; then
all_ids="1a,1b,1c,1d,2a,2b,2c,2d,3a,3b,3c,4a,4b,4c,5a,5b,5c,6a,6b,6c,6d,6e,6f,7a,7b,7c,8a,8b,8c,8d,9a,9b,9c,9d,10a,10b,10c,11a,11b,11c,11d,12a,12b,12c,13a,13b,13c,13d,14a,14b,14c,15a,15b,15c,15d,16a,16b,16c,16d,17a,17b,17c,17d,17e,17f,18a,18b,18c,19a,19b,19c,19d,20a,20b,20c,21a,21b,21c,22a,22b,22c,22d,23a,23b,23c,24a,24b,25a,25b,25c,26a,26b,26c,27a,27b,27c,28a,28b,28c,29a,29b,29c,30a,30b,30c,31a,31b,31c,32a,32b,33a,33b,33c"
vec_var_name="WORKING_QUERY_IDS"
vec_var_name="WORKING_JOB_QUERY_IDS"
elif [[ "$benchmark_name" == "joblight" ]]; then
all_ids="1a,1b,1c,1d,2a,3a,3b,3c,4a,4b,4c,5a,5b,5c,6a,6b,6c,6d,6e,7a,7b,7c,8a,8b,8c,9a,9b,10a,10b,10c,11a,11b,11c,12a,12b,12c,13a,14a,14b,14c,15a,15b,15c,16a,17a,17b,17c,18a,18b,18c,19a,19b,20a,20b,20c,21a,21b,22a,22b,22c,23a,23b,24a,24b,25a,26a,26b,27a,27b,28a"
vec_var_name="WORKING_JOBLIGHT_QUERY_IDS"
elif [[ "$benchmark_name" == "tpch" ]]; then
all_ids="1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22"
vec_var_name="WORKING_JOB_QUERY_IDS"
vec_var_name="WORKING_QUERY_IDS"
else
echo >&2 $USAGE
exit 1
Expand All @@ -24,7 +24,8 @@ fi
successful_ids=()
IFS=','
for id in $all_ids; do
cargo run --bin optd-perftest cardtest $benchmark_name --query-ids $id &>/dev/null
# make sure to execute with --adaptive so that we actually run the query in datafusion
cargo run --bin optd-perftest cardtest $benchmark_name --query-ids $id --adaptive &>/dev/null

if [ $? -eq 0 ]; then
echo >&2 $id succeeded
Expand Down
153 changes: 126 additions & 27 deletions optd-perftest/src/datafusion_dbms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use datafusion::{
execution::{
config::SessionConfig,
context::{SessionContext, SessionState},
options::CsvReadOptions,
runtime_env::{RuntimeConfig, RuntimeEnv},
},
sql::{parser::DFParser, sqlparser::dialect::GenericDialect},
Expand All @@ -36,9 +37,12 @@ pub struct DatafusionDBMS {
workspace_dpath: PathBuf,
rebuild_cached_stats: bool,
adaptive: bool,
ctx: SessionContext,
ctx: Option<SessionContext>,
}

const WITH_LOGICAL_FOR_TPCH: bool = true;
const WITH_LOGICAL_FOR_JOB: bool = false;

#[async_trait]
impl CardtestRunnerDBMSHelper for DatafusionDBMS {
fn get_name(&self) -> &str {
Expand All @@ -50,18 +54,22 @@ impl CardtestRunnerDBMSHelper for DatafusionDBMS {
benchmark: &Benchmark,
) -> anyhow::Result<Vec<usize>> {
let base_table_stats = self.get_benchmark_stats(benchmark).await?;
self.clear_state(Some(base_table_stats)).await?;
// clear_state() is how we "load" the stats into datafusion
self.clear_state(Some(base_table_stats), benchmark).await?;

if self.adaptive {
// We need to load the stats if we're doing adaptivity because that involves executing the queries in datafusion.
// This function also calls create_tables().
self.load_benchmark_data_no_stats(benchmark).await?;
} else {
// We only create the tables so that the optimizer doesn't work. However, we can save on the time of loading
// the data if we're not doing adaptivity because we won't be executing queries.
self.create_benchmark_tables(benchmark).await?;
}

match benchmark {
Benchmark::Tpch(tpch_kit_config) => {
// Create the tables. This must be done after clear_state because that clears everything
let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
self.create_tpch_tables(&tpch_kit).await?;
self.eval_tpch_estcards(tpch_kit_config).await
}
Benchmark::Tpch(tpch_kit_config) => self.eval_tpch_estcards(tpch_kit_config).await,
Benchmark::Job(job_kit_config) | Benchmark::Joblight(job_kit_config) => {
let job_kit = JobKit::build(&self.workspace_dpath)?;
self.create_job_tables(&job_kit).await?;
self.eval_job_estcards(job_kit_config).await
}
}
Expand All @@ -78,7 +86,7 @@ impl DatafusionDBMS {
workspace_dpath: workspace_dpath.as_ref().to_path_buf(),
rebuild_cached_stats,
adaptive,
ctx: Self::new_session_ctx(None, adaptive).await?,
ctx: None,
})
}

Expand All @@ -87,16 +95,30 @@ impl DatafusionDBMS {
///
/// A more ideal way to generate statistics would be to use the `ANALYZE`
/// command in SQL, but DataFusion does not support that yet.
async fn clear_state(&mut self, stats: Option<DataFusionBaseTableStats>) -> anyhow::Result<()> {
self.ctx = Self::new_session_ctx(stats, self.adaptive).await?;
async fn clear_state(
    &mut self,
    stats: Option<DataFusionBaseTableStats>,
    benchmark: &Benchmark,
) -> anyhow::Result<()> {
    // Whether DataFusion's logical optimizer runs differs per benchmark suite.
    let with_logical = match benchmark {
        Benchmark::Job(_) | Benchmark::Joblight(_) => WITH_LOGICAL_FOR_JOB,
        Benchmark::Tpch(_) => WITH_LOGICAL_FOR_TPCH,
    };
    // Replacing the context wholesale is how the (optional) stats get "loaded".
    let fresh_ctx = Self::new_session_ctx(stats, self.adaptive, with_logical).await?;
    self.ctx = Some(fresh_ctx);
    Ok(())
}

async fn new_session_ctx(
stats: Option<DataFusionBaseTableStats>,
adaptive: bool,
with_logical: bool,
) -> anyhow::Result<SessionContext> {
let session_config = SessionConfig::from_env()?.with_information_schema(true);
let mut session_config = SessionConfig::from_env()?.with_information_schema(true);

if !with_logical {
session_config.options_mut().optimizer.max_passes = 0;
}

let rn_config = RuntimeConfig::new();
let runtime_env = RuntimeEnv::new(rn_config.clone())?;
let ctx = {
Expand Down Expand Up @@ -166,6 +188,11 @@ impl DatafusionDBMS {
let sql = fs::read_to_string(sql_fpath)?;
let estcard = self.eval_query_estcard(&sql).await?;
estcards.push(estcard);

if self.adaptive {
// If we're in adaptive mode, execute the query to fill the true cardinality cache.
self.execute_query(&sql).await?;
}
}

Ok(estcards)
Expand All @@ -189,6 +216,11 @@ impl DatafusionDBMS {
let sql = fs::read_to_string(sql_fpath)?;
let estcard = self.eval_query_estcard(&sql).await?;
estcards.push(estcard);

if self.adaptive {
// Execute the query to fill the true cardinality cache.
self.execute_query(&sql).await?;
}
}

Ok(estcards)
Expand All @@ -204,11 +236,15 @@ impl DatafusionDBMS {
log::info!("{} {}", self.get_name(), explain_str);
}

/// Returns the active DataFusion session context.
///
/// # Panics
///
/// Panics if the context has not been initialized yet; `clear_state()` must
/// have been called first to create it.
fn get_ctx(&self) -> &SessionContext {
    // expect() over unwrap(): state the invariant so a violation is debuggable.
    self.ctx
        .as_ref()
        .expect("SessionContext not initialized; call clear_state() first")
}

async fn eval_query_estcard(&self, sql: &str) -> anyhow::Result<usize> {
lazy_static! {
static ref ROW_CNT_RE: Regex = Regex::new(r"row_cnt=(\d+\.\d+)").unwrap();
}
let explains = Self::execute(&self.ctx, &format!("explain verbose {}", sql)).await?;
let explains = Self::execute(self.get_ctx(), &format!("explain verbose {}", sql)).await?;
self.log_explain(&explains);
// Find first occurrence of row_cnt=... in the output.
let row_cnt = explains
Expand All @@ -228,19 +264,27 @@ impl DatafusionDBMS {
Ok(row_cnt)
}

/// Executes `sql` to completion, discarding its rows. Running the query is what
/// feeds the true cardinalities back into optd for the adaptive cost model.
async fn execute_query(&self, sql: &str) -> anyhow::Result<()> {
    Self::execute(self.get_ctx(), sql).await.map(|_| ())
}

/// Load the data into DataFusion without building the stats used by optd.
/// Unlike Postgres, where both data and stats are used by the same program, for this class the
/// data is used by DataFusion while the stats are used by optd. That is why there are two
/// separate functions to load them.
#[allow(dead_code)]
async fn load_benchmark_data_no_stats(&mut self, benchmark: &Benchmark) -> anyhow::Result<()> {
match benchmark {
Benchmark::Tpch(tpch_kit_config) => self.load_tpch_data_no_stats(tpch_kit_config).await,
_ => unimplemented!(),
Benchmark::Job(job_kit_config) | Benchmark::Joblight(job_kit_config) => {
self.load_job_data_no_stats(job_kit_config).await
}
}
}

/// Build the stats that optd's cost model uses.
/// Build the stats that optd's cost model uses, or get the stats from the cache.
async fn get_benchmark_stats(
&mut self,
benchmark: &Benchmark,
Expand Down Expand Up @@ -270,6 +314,21 @@ impl DatafusionDBMS {
}
}

/// Creates the benchmark's tables (DDL only) without loading any data.
async fn create_benchmark_tables(&mut self, benchmark: &Benchmark) -> anyhow::Result<()> {
    // Each branch builds its kit from the workspace, then runs that kit's DDL.
    match benchmark {
        Benchmark::Tpch(_) => {
            let kit = TpchKit::build(&self.workspace_dpath)?;
            self.create_tpch_tables(&kit).await
        }
        Benchmark::Job(_) | Benchmark::Joblight(_) => {
            let kit = JobKit::build(&self.workspace_dpath)?;
            Self::create_job_tables(self.get_ctx(), &kit).await
        }
    }
}

async fn create_tpch_tables(&mut self, tpch_kit: &TpchKit) -> anyhow::Result<()> {
let ddls = fs::read_to_string(&tpch_kit.schema_fpath)?;
let ddls = ddls
Expand All @@ -278,25 +337,24 @@ impl DatafusionDBMS {
.filter(|s| !s.is_empty())
.collect::<Vec<_>>();
for ddl in ddls {
Self::execute(&self.ctx, ddl).await?;
Self::execute(self.get_ctx(), ddl).await?;
}
Ok(())
}

async fn create_job_tables(&mut self, job_kit: &JobKit) -> anyhow::Result<()> {
async fn create_job_tables(ctx: &SessionContext, job_kit: &JobKit) -> anyhow::Result<()> {
let ddls = fs::read_to_string(&job_kit.schema_fpath)?;
let ddls = ddls
.split(';')
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.collect::<Vec<_>>();
for ddl in ddls {
Self::execute(&self.ctx, ddl).await?;
Self::execute(ctx, ddl).await?;
}
Ok(())
}

#[allow(dead_code)]
async fn load_tpch_data_no_stats(
&mut self,
tpch_kit_config: &TpchKitConfig,
Expand All @@ -313,7 +371,7 @@ impl DatafusionDBMS {
for tbl_fpath in tbl_fpath_iter {
let tbl_name = tbl_fpath.file_stem().unwrap().to_str().unwrap();
Self::execute(
&self.ctx,
self.get_ctx(),
&format!(
"create external table {}_tbl stored as csv delimiter '|' location '{}';",
tbl_name,
Expand All @@ -324,7 +382,7 @@ impl DatafusionDBMS {

// Get the number of columns of this table.
let schema = self
.ctx
.get_ctx()
.catalog("datafusion")
.unwrap()
.schema("public")
Expand All @@ -338,7 +396,7 @@ impl DatafusionDBMS {
.collect::<Vec<_>>()
.join(", ");
Self::execute(
&self.ctx,
self.get_ctx(),
&format!(
"insert into {} select {} from {}_tbl;",
tbl_name, projection_list, tbl_name,
Expand All @@ -350,6 +408,47 @@ impl DatafusionDBMS {
Ok(())
}

// Loads the JOB tables into DataFusion from .csv files, without building optd stats.
// A throwaway session is created solely to run the DDL and recover each table's
// schema; the CSVs are then registered on the real context with that schema.
async fn load_job_data_no_stats(
    &mut self,
    job_kit_config: &JobKitConfig,
) -> anyhow::Result<()> {
    // Scratch context used only for schema discovery.
    let schema_ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_JOB).await?;

    // Fetch the table files.
    let kit = JobKit::build(&self.workspace_dpath)?;
    kit.download_tables(job_kit_config)?;

    // Run the DDL so the scratch context knows every table's schema.
    Self::create_job_tables(&schema_ctx, &kit).await?;

    // Register each CSV on the real context via register_csv().
    for path in kit.get_tbl_fpath_iter().unwrap() {
        let name = path.file_stem().unwrap().to_str().unwrap();
        let schema = schema_ctx
            .catalog("datafusion")
            .unwrap()
            .schema("public")
            .unwrap()
            .table(name)
            .await
            .unwrap()
            .schema();
        let options = CsvReadOptions::new()
            .schema(&schema)
            .delimiter(b',')
            .escape(b'\\');
        self.get_ctx()
            .register_csv(name, path.to_str().unwrap(), options)
            .await?;
    }
    Ok(())
}

async fn get_tpch_stats(
&mut self,
tpch_kit_config: &TpchKitConfig,
Expand All @@ -359,7 +458,7 @@ impl DatafusionDBMS {
tpch_kit.gen_tables(tpch_kit_config)?;

// To get the schema of each table.
let ctx = Self::new_session_ctx(None, self.adaptive).await?;
let ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_TPCH).await?;
let ddls = fs::read_to_string(&tpch_kit.schema_fpath)?;
let ddls = ddls
.split(';')
Expand Down Expand Up @@ -419,7 +518,7 @@ impl DatafusionDBMS {
job_kit.download_tables(job_kit_config)?;

// To get the schema of each table.
let ctx = Self::new_session_ctx(None, self.adaptive).await?;
let ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_JOB).await?;
let ddls = fs::read_to_string(&job_kit.schema_fpath)?;
let ddls = ddls
.split(';')
Expand Down
2 changes: 1 addition & 1 deletion optd-perftest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ enum Commands {
rebuild_cached_optd_stats: bool,

#[clap(long)]
#[clap(action)]
#[clap(help = "Whether to enable adaptivity for optd")]
#[clap(default_value = "true")]
adaptive: bool,

#[clap(long)]
Expand Down
5 changes: 2 additions & 3 deletions optd-perftest/src/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ use std::path::{Path, PathBuf};
const TPCH_KIT_REPO_URL: &str = "https://github.com/wangpatrick57/tpch-kit.git";
pub const TPCH_KIT_POSTGRES: &str = "POSTGRESQL";
const NUM_TPCH_QUERIES: usize = 22;
pub const WORKING_QUERY_IDS: &[&str] = &[
"2", "3", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "17", "19",
];
pub const WORKING_QUERY_IDS: &[&str] =
&["2", "3", "5", "7", "8", "9", "10", "12", "13", "14", "17"];

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TpchKitConfig {
Expand Down
Loading