Skip to content

Commit

Permalink
fix: merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
izveigor committed Aug 4, 2023
2 parents 1d899bf + 2354758 commit 9f20ef5
Show file tree
Hide file tree
Showing 118 changed files with 3,114 additions and 1,499 deletions.
12 changes: 6 additions & 6 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ run_tpch() {
RESULTS_FILE="${RESULTS_DIR}/tpch.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch benchmark..."
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 10 --path "${TPCH_DIR}" --format parquet -o ${RESULTS_FILE}
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --format parquet -o ${RESULTS_FILE}
}

# Runs the tpch in memory
Expand All @@ -319,23 +319,23 @@ run_tpch_mem() {
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch_mem benchmark..."
# -m means in memory
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 10 --path "${TPCH_DIR}" -m --format parquet -o ${RESULTS_FILE}
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" -m --format parquet -o ${RESULTS_FILE}
}

# Runs the parquet filter benchmark
run_parquet() {
RESULTS_FILE="${RESULTS_DIR}/parquet.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running parquet filter benchmark..."
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --scale-factor 1.0 --iterations 10 -o ${RESULTS_FILE}
$CARGO_COMMAND --bin parquet -- filter --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
}

# Runs the sort benchmark
run_sort() {
RESULTS_FILE="${RESULTS_DIR}/sort.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running sort benchmark..."
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --scale-factor 1.0 --iterations 10 -o ${RESULTS_FILE}
$CARGO_COMMAND --bin parquet -- sort --path "${DATA_DIR}" --scale-factor 1.0 --iterations 5 -o ${RESULTS_FILE}
}


Expand Down Expand Up @@ -389,15 +389,15 @@ run_clickbench_1() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 10 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}

# Runs the clickbench benchmark with the partitioned dataset (100 parquet files)
run_clickbench_partitioned() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (partitioned, 100 files) benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 10 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}

compare_benchmarks() {
Expand Down
10 changes: 3 additions & 7 deletions benchmarks/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
use arrow::datatypes::SchemaBuilder;
use datafusion::{
arrow::datatypes::{DataType, Field, Schema},
common::plan_err,
error::{DataFusionError, Result},
};
use std::fs;

mod run;
pub use run::RunOpt;

Expand Down Expand Up @@ -158,13 +158,9 @@ pub fn get_query_sql(query: usize) -> Result<Vec<String>> {
Err(e) => errors.push(format!("{filename}: {e}")),
};
}
Err(DataFusionError::Plan(format!(
"invalid query. Could not find query: {errors:?}"
)))
plan_err!("invalid query. Could not find query: {:?}", errors)
} else {
Err(DataFusionError::Plan(
"invalid query. Expected value between 1 and 22".to_owned(),
))
plan_err!("invalid query. Expected value between 1 and 22")
}
}

Expand Down
39 changes: 25 additions & 14 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
},
print_options::PrintOptions,
};
use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};
use datafusion::{
datasource::listing::ListingTableUrl,
error::{DataFusionError, Result},
Expand Down Expand Up @@ -192,18 +193,29 @@ async fn exec_and_print(
let now = Instant::now();

let sql = unescape_input(&sql)?;
let plan = ctx.state().create_logical_plan(&sql).await?;
let df = match &plan {
LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => {
create_external_table(ctx, cmd).await?;
ctx.execute_logical_plan(plan).await?
}
_ => ctx.execute_logical_plan(plan).await?,
};

let results = df.collect().await?;
print_options.print_batches(&results, now)?;
let task_ctx = ctx.task_ctx();
let dialect = &task_ctx.session_config().options().sql_parser.dialect;
let dialect = dialect_from_str(dialect).ok_or_else(|| {
DataFusionError::Plan(format!(
"Unsupported SQL dialect: {dialect}. Available dialects: \
Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
MsSQL, ClickHouse, BigQuery, Ansi."
))
})?;
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
let plan = ctx.state().statement_to_plan(statement).await?;
let df = match &plan {
LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => {
create_external_table(ctx, cmd).await?;
ctx.execute_logical_plan(plan).await?
}
_ => ctx.execute_logical_plan(plan).await?,
};

let results = df.collect().await?;
print_options.print_batches(&results, now)?;
}
Ok(())
}

Expand Down Expand Up @@ -251,6 +263,7 @@ async fn create_external_table(
#[cfg(test)]
mod tests {
use super::*;
use datafusion::common::plan_err;

async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
let ctx = SessionContext::new();
Expand All @@ -259,9 +272,7 @@ mod tests {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
create_external_table(&ctx, cmd).await?;
} else {
return Err(DataFusionError::Plan(
"LogicalPlan is not a CreateExternalTable".to_string(),
));
return plan_err!("LogicalPlan is not a CreateExternalTable");
}

ctx.runtime_env()
Expand Down
3 changes: 0 additions & 3 deletions datafusion-cli/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ impl CliHelper {
Ok(statements) if statements.is_empty() => Ok(ValidationResult::Invalid(
Some(" 🤔 You entered an empty statement".to_string()),
)),
Ok(statements) if statements.len() > 1 => Ok(ValidationResult::Invalid(
Some(" 🤔 You entered more than one statement".to_string()),
)),
Ok(_statements) => Ok(ValidationResult::Valid(None)),
Err(err) => Ok(ValidationResult::Invalid(Some(format!(
" 🤔 Invalid statement: {err}",
Expand Down
16 changes: 5 additions & 11 deletions datafusion-cli/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,15 @@ fn get_bucket_name(url: &Url) -> Result<&str> {

#[cfg(test)]
mod tests {
use super::*;
use datafusion::common::plan_err;
use datafusion::{
datasource::listing::ListingTableUrl,
logical_expr::{DdlStatement, LogicalPlan},
prelude::SessionContext,
};
use object_store::{aws::AmazonS3ConfigKey, gcp::GoogleConfigKey};

use super::*;

#[tokio::test]
async fn s3_object_store_builder() -> Result<()> {
let access_key_id = "fake_access_key_id";
Expand Down Expand Up @@ -195,9 +195,7 @@ mod tests {
assert_eq!(value, builder.get_config_value(&key).unwrap());
}
} else {
return Err(DataFusionError::Plan(
"LogicalPlan is not a CreateExternalTable".to_string(),
));
return plan_err!("LogicalPlan is not a CreateExternalTable");
}

Ok(())
Expand Down Expand Up @@ -228,9 +226,7 @@ mod tests {
assert_eq!(value, builder.get_config_value(&key).unwrap());
}
} else {
return Err(DataFusionError::Plan(
"LogicalPlan is not a CreateExternalTable".to_string(),
));
return plan_err!("LogicalPlan is not a CreateExternalTable");
}

Ok(())
Expand Down Expand Up @@ -265,9 +261,7 @@ mod tests {
assert_eq!(value, builder.get_config_value(&key).unwrap());
}
} else {
return Err(DataFusionError::Plan(
"LogicalPlan is not a CreateExternalTable".to_string(),
));
return plan_err!("LogicalPlan is not a CreateExternalTable");
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/src/print_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct PrintOptions {

fn print_timing_info(row_count: usize, now: Instant) {
println!(
"{} {} in set. Query took {:.3} seconds.",
"{} {} in set. Query took {:.3} seconds.\n",
row_count,
if row_count == 1 { "row" } else { "rows" },
now.elapsed().as_secs_f64()
Expand Down
4 changes: 4 additions & 0 deletions datafusion-cli/tests/cli_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ fn init() {
["--command", "select 1", "--format", "json", "-q"],
"[{\"Int64(1)\":1}]\n"
)]
#[case::exec_multiple_statements(
["--command", "select 1; select 2;", "--format", "json", "-q"],
"[{\"Int64(1)\":1}]\n[{\"Int64(2)\":2}]\n"
)]
#[case::exec_from_files(
["--file", "tests/data/sql.txt", "--format", "json", "-q"],
"[{\"Int64(1)\":1}]\n"
Expand Down
4 changes: 2 additions & 2 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::{
AggregateUDF, Between, Expr, Filter, LogicalPlan, ScalarUDF, TableSource, WindowUDF,
};
Expand Down Expand Up @@ -200,7 +200,7 @@ impl ContextProvider for MyContextProvider {
])),
}))
} else {
Err(DataFusionError::Plan("table not found".to_string()))
plan_err!("table not found")
}
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion-examples/examples/simple_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion::datasource::file_format::options::CsvReadOptions;

use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_common::{plan_err, DataFusionError, ScalarValue};
use datafusion_expr::{
PartitionEvaluator, Signature, Volatility, WindowFrame, WindowUDF,
};
Expand Down Expand Up @@ -140,11 +140,11 @@ fn smooth_it() -> WindowUDF {
/// arguments of `arg_types`.
fn return_type(arg_types: &[DataType]) -> Result<Arc<DataType>> {
if arg_types.len() != 1 {
return Err(DataFusionError::Plan(format!(
return plan_err!(
"my_udwf expects 1 argument, got {}: {:?}",
arg_types.len(),
arg_types
)));
);
}
Ok(Arc::new(arg_types[0].clone()))
}
Expand Down
Loading

0 comments on commit 9f20ef5

Please sign in to comment.