Skip to content

Commit

Permalink
feat: integrate stats into optd (#117)
Browse files Browse the repository at this point in the history
Generate the statistics in perftest and put them into `BaseCostModel` in
`DatafusionOptimizer`. Below is the comparison before & after stats are
added. You can check `PhysicalScan`, where the cost has changed.

The final cardinality remains the same because when stats on a column are
missing, we use a very small magic number `INVALID_SELECTIVITY` (0.001)
that just sets the cardinality to 1.

### Todos in Future PRs

- Support generating stats on `Utf8`.
- Set a better magic number.
- Generate MCV.

### Before

```
plan space size budget used, not applying logical rules any more. current plan space: 1094
explain: PhysicalSort
├── exprs:SortOrder { order: Desc }
│   └── #1
├── cost: weighted=185.17,row_cnt=1.00,compute=179.17,io=6.00
└── PhysicalProjection { exprs: [ #0, #1 ], cost: weighted=182.12,row_cnt=1.00,compute=176.12,io=6.00 }
    └── PhysicalAgg
        ├── aggrs:Agg(Sum)
        │   └── Mul
        │       ├── #0
        │       └── Sub
        │           ├── 1
        │           └── #1
        ├── groups: [ #2 ]
        ├── cost: weighted=182.02,row_cnt=1.00,compute=176.02,io=6.00
        └── PhysicalProjection { exprs: [ #0, #1, #2 ], cost: weighted=64.90,row_cnt=1.00,compute=58.90,io=6.00 }
            └── PhysicalProjection { exprs: [ #0, #1, #4, #5, #6 ], cost: weighted=64.76,row_cnt=1.00,compute=58.76,io=6.00 }
                └── PhysicalProjection { exprs: [ #2, #3, #5, #6, #7, #8, #9 ], cost: weighted=64.54,row_cnt=1.00,compute=58.54,io=6.00 }
                    └── PhysicalProjection { exprs: [ #0, #3, #4, #5, #6, #7, #8, #9, #10, #11 ], cost: weighted=64.24,row_cnt=1.00,compute=58.24,io=6.00 }
                        └── PhysicalProjection { exprs: [ #1, #2, #4, #5, #6, #7, #8, #9, #10, #11, #12, #13 ], cost: weighted=63.82,row_cnt=1.00,compute=57.82,io=6.00 }
                            └── PhysicalProjection { exprs: [ #0, #3, #8, #9, #10, #11, #12, #13, #14, #15, #16, #17, #18, #19 ], cost: weighted=63.32,row_cnt=1.00,compute=57.32,io=6.00 }
                                └── PhysicalNestedLoopJoin
                                    ├── join_type: Inner
                                    ├── cond:And
                                    │   ├── Eq
                                    │   │   ├── #11
                                    │   │   └── #14
                                    │   └── Eq
                                    │       ├── #3
                                    │       └── #15
                                    ├── cost: weighted=62.74,row_cnt=1.00,compute=56.74,io=6.00
                                    ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #1 ], cost: weighted=35.70,row_cnt=1.00,compute=32.70,io=3.00 }
                                    │   ├── PhysicalScan { table: customer, cost: weighted=1.00,row_cnt=1.00,compute=0.00,io=1.00 }
                                    │   └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: weighted=31.64,row_cnt=1.00,compute=29.64,io=2.00 }
                                    │       ├── PhysicalProjection { exprs: [ #0, #1 ], cost: weighted=27.40,row_cnt=1.00,compute=26.40,io=1.00 }
                                    │       │   └── PhysicalFilter
                                    │       │       ├── cond:And
                                    │       │       │   ├── Geq
                                    │       │       │   │   ├── #2
                                    │       │       │   │   └── 9131
                                    │       │       │   └── Lt
                                    │       │       │       ├── #2
                                    │       │       │       └── 9496
                                    │       │       ├── cost: weighted=27.30,row_cnt=1.00,compute=26.30,io=1.00
                                    │       │       └── PhysicalProjection { exprs: [ #0, #1, #4 ], cost: weighted=1.14,row_cnt=1.00,compute=0.14,io=1.00 }
                                    │       │           └── PhysicalScan { table: orders, cost: weighted=1.00,row_cnt=1.00,compute=0.00,io=1.00 }
                                    │       └── PhysicalProjection { exprs: [ #0, #2, #5, #6 ], cost: weighted=1.18,row_cnt=1.00,compute=0.18,io=1.00 }
                                    │           └── PhysicalScan { table: lineitem, cost: weighted=1.00,row_cnt=1.00,compute=0.00,io=1.00 }
                                    └── PhysicalProjection { exprs: [ #0, #3, #7, #8, #9, #10 ], cost: weighted=15.72,row_cnt=1.00,compute=12.72,io=3.00 }
                                        └── PhysicalHashJoin { join_type: Inner, left_keys: [ #3 ], right_keys: [ #0 ], cost: weighted=15.46,row_cnt=1.00,compute=12.46,io=3.00 }
                                            ├── PhysicalScan { table: supplier, cost: weighted=1.00,row_cnt=1.00,compute=0.00,io=1.00 }
                                            └── PhysicalHashJoin { join_type: Inner, left_keys: [ #2 ], right_keys: [ #0 ], cost: weighted=11.40,row_cnt=1.00,compute=9.40,io=2.00 }
                                                ├── PhysicalProjection { exprs: [ #0, #1, #2 ], cost: weighted=1.14,row_cnt=1.00,compute=0.14,io=1.00 }
                                                │   └── PhysicalScan { table: nation, cost: weighted=1.00,row_cnt=1.00,compute=0.00,io=1.00 }
                                                └── PhysicalProjection { exprs: [ #0 ], cost: weighted=7.20,row_cnt=1.00,compute=6.20,io=1.00 }
                                                    └── PhysicalFilter
                                                        ├── cond:Eq
                                                        │   ├── #1
                                                        │   └── "AMERICA"
                                                        ├── cost: weighted=7.14,row_cnt=1.00,compute=6.14,io=1.00
                                                        └── PhysicalProjection { exprs: [ #0, #1 ], cost: weighted=1.10,row_cnt=1.00,compute=0.10,io=1.00 }
                                                            └── PhysicalScan { table: region, cost: weighted=1.00,row_cnt=1.00,compute=0.00,io=1.00 }
plan space size budget used, not applying logical rules any more. current plan space: 1094
qerrors: {"DataFusion": [5.0]}
```

### After

```
plan space size budget used, not applying logical rules any more. current plan space: 1094
explain: PhysicalSort
├── exprs:SortOrder { order: Desc }
│   └── #1
├── cost: weighted=336032.32,row_cnt=1.00,compute=259227.32,io=76805.00
└── PhysicalProjection { exprs: [ #0, #1 ], cost: weighted=336029.27,row_cnt=1.00,compute=259224.27,io=76805.00 }
    └── PhysicalAgg
        ├── aggrs:Agg(Sum)
        │   └── Mul
        │       ├── #0
        │       └── Sub
        │           ├── 1
        │           └── #1
        ├── groups: [ #2 ]
        ├── cost: weighted=336029.17,row_cnt=1.00,compute=259224.17,io=76805.00
        └── PhysicalProjection { exprs: [ #0, #1, #2 ], cost: weighted=335912.05,row_cnt=1.00,compute=259107.05,io=76805.00 }
            └── PhysicalProjection { exprs: [ #0, #1, #4, #5, #6 ], cost: weighted=335911.91,row_cnt=1.00,compute=259106.91,io=76805.00 }
                └── PhysicalProjection { exprs: [ #2, #3, #5, #6, #7, #8, #9 ], cost: weighted=335911.69,row_cnt=1.00,compute=259106.69,io=76805.00 }
                    └── PhysicalProjection { exprs: [ #0, #3, #4, #5, #6, #7, #8, #9, #10, #11 ], cost: weighted=335911.39,row_cnt=1.00,compute=259106.39,io=76805.00 }
                        └── PhysicalProjection { exprs: [ #1, #2, #4, #5, #6, #7, #8, #9, #10, #11, #12, #13 ], cost: weighted=335910.97,row_cnt=1.00,compute=259105.97,io=76805.00 }
                            └── PhysicalProjection { exprs: [ #0, #3, #8, #9, #10, #11, #12, #13, #14, #15, #16, #17, #18, #19 ], cost: weighted=335910.47,row_cnt=1.00,compute=259105.47,io=76805.00 }
                                └── PhysicalNestedLoopJoin
                                    ├── join_type: Inner
                                    ├── cond:And
                                    │   ├── Eq
                                    │   │   ├── #11
                                    │   │   └── #14
                                    │   └── Eq
                                    │       ├── #3
                                    │       └── #15
                                    ├── cost: weighted=335909.89,row_cnt=1.00,compute=259104.89,io=76805.00
                                    ├── PhysicalProjection { exprs: [ #6, #7, #8, #9, #10, #11, #12, #13, #0, #1, #2, #3, #4, #5 ], cost: weighted=335619.21,row_cnt=1.00,compute=258944.21,io=76675.00 }
                                    │   └── PhysicalHashJoin { join_type: Inner, left_keys: [ #1 ], right_keys: [ #0 ], cost: weighted=335618.63,row_cnt=1.00,compute=258943.63,io=76675.00 }
                                    │       ├── PhysicalProjection { exprs: [ #4, #5, #0, #1, #2, #3 ], cost: weighted=332616.57,row_cnt=1.00,compute=257441.57,io=75175.00 }
                                    │       │   └── PhysicalProjection { exprs: [ #0, #2, #5, #6, #16, #17 ], cost: weighted=332616.31,row_cnt=1.00,compute=257441.31,io=75175.00 }
                                    │       │       └── PhysicalProjection { exprs: [ #2, #3, #4, #5, #6, #7, #8, #9, #10, #11, #12, #13, #14, #15, #16, #17, #0, #1 ], cost: weighted=332616.05,row_cnt=1.00,compute=257441.05,io=75175.00 }
                                    │       │           └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: weighted=332615.31,row_cnt=1.00,compute=257440.31,io=75175.00 }
                                    │       │               ├── PhysicalProjection { exprs: [ #0, #1 ], cost: weighted=212263.25,row_cnt=1.00,compute=197263.25,io=15000.00 }
                                    │       │               │   └── PhysicalFilter
                                    │       │               │       ├── cond:And
                                    │       │               │       │   ├── Geq
                                    │       │               │       │   │   ├── #2
                                    │       │               │       │   │   └── 9131
                                    │       │               │       │   └── Lt
                                    │       │               │       │       ├── #2
                                    │       │               │       │       └── 9496
                                    │       │               │       ├── cost: weighted=212263.15,row_cnt=1.00,compute=197263.15,io=15000.00
                                    │       │               │       └── PhysicalProjection { exprs: [ #0, #1, #4 ], cost: weighted=16050.07,row_cnt=15000.00,compute=1050.07,io=15000.00 }
                                    │       │               │           └── PhysicalScan { table: orders, cost: weighted=15000.00,row_cnt=15000.00,compute=0.00,io=15000.00 }
                                    │       │               └── PhysicalScan { table: lineitem, cost: weighted=60175.00,row_cnt=60175.00,compute=0.00,io=60175.00 }
                                    │       └── PhysicalScan { table: customer, cost: weighted=1500.00,row_cnt=1500.00,compute=0.00,io=1500.00 }
                                    └── PhysicalProjection { exprs: [ #0, #3, #7, #8, #9, #10 ], cost: weighted=279.36,row_cnt=1.00,compute=149.36,io=130.00 }
                                        └── PhysicalProjection { exprs: [ #4, #5, #6, #7, #8, #9, #10, #0, #1, #2, #3 ], cost: weighted=279.10,row_cnt=1.00,compute=149.10,io=130.00 }
                                            └── PhysicalProjection { exprs: [ #1, #2, #3, #0, #4, #5, #6, #7, #8, #9, #10 ], cost: weighted=278.64,row_cnt=1.00,compute=148.64,io=130.00 }
                                                └── PhysicalHashJoin { join_type: Inner, left_keys: [ #1 ], right_keys: [ #3 ], cost: weighted=278.18,row_cnt=1.00,compute=148.18,io=130.00 }
                                                    ├── PhysicalProjection { exprs: [ #3, #0, #1, #2 ], cost: weighted=76.12,row_cnt=1.00,compute=46.12,io=30.00 }
                                                    │   └── PhysicalProjection { exprs: [ #0, #1, #2, #4 ], cost: weighted=75.94,row_cnt=1.00,compute=45.94,io=30.00 }
                                                    │       └── PhysicalProjection { exprs: [ #1, #2, #3, #4, #0 ], cost: weighted=75.76,row_cnt=1.00,compute=45.76,io=30.00 }
                                                    │           └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #2 ], cost: weighted=75.54,row_cnt=1.00,compute=45.54,io=30.00 }
                                                    │               ├── PhysicalProjection { exprs: [ #0 ], cost: weighted=23.48,row_cnt=1.00,compute=18.48,io=5.00 }
                                                    │               │   └── PhysicalFilter
                                                    │               │       ├── cond:Eq
                                                    │               │       │   ├── #1
                                                    │               │       │   └── "AMERICA"
                                                    │               │       ├── cost: weighted=23.42,row_cnt=1.00,compute=18.42,io=5.00
                                                    │               │       └── PhysicalProjection { exprs: [ #0, #1 ], cost: weighted=5.30,row_cnt=5.00,compute=0.30,io=5.00 }
                                                    │               │           └── PhysicalScan { table: region, cost: weighted=5.00,row_cnt=5.00,compute=0.00,io=5.00 }
                                                    │               └── PhysicalScan { table: nation, cost: weighted=25.00,row_cnt=25.00,compute=0.00,io=25.00 }
                                                    └── PhysicalScan { table: supplier, cost: weighted=100.00,row_cnt=100.00,compute=0.00,io=100.00 }
plan space size budget used, not applying logical rules any more. current plan space: 1094
qerrors: {"DataFusion": [5.0]}
```
  • Loading branch information
Gun9niR authored Mar 20, 2024
1 parent 7915fb9 commit 1c557a4
Show file tree
Hide file tree
Showing 9 changed files with 324 additions and 40 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions optd-datafusion-repr/src/cost/adaptive_cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,13 @@ impl CostModel<OptRelNodeTyp> for AdaptiveCostModel {
) -> Cost {
if let OptRelNodeTyp::PhysicalScan = node {
let guard = self.runtime_row_cnt.lock().unwrap();
if let Some((runtime_row_cnt, iter)) = guard.history.get(&context.unwrap().group_id) {
if let Some((runtime_row_cnt, iter)) =
guard.history.get(&context.as_ref().unwrap().group_id)
{
if *iter + self.decay >= guard.iter_cnt {
let runtime_row_cnt = (*runtime_row_cnt).max(1) as f64;
return OptCostModel::cost(runtime_row_cnt, 0.0, runtime_row_cnt);
} else {
return OptCostModel::cost(1.0, 0.0, 1.0);
}
} else {
return OptCostModel::cost(1.0, 0.0, 1.0);
}
}
let (mut row_cnt, compute_cost, io_cost) = OptCostModel::cost_tuple(
Expand Down
224 changes: 216 additions & 8 deletions optd-datafusion-repr/src/cost/base_cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,20 @@ use crate::{
plan_nodes::{OptRelNodeRef, OptRelNodeTyp},
properties::column_ref::ColumnRef,
};
use arrow_schema::{ArrowError, DataType};
use datafusion::arrow::array::{
Array, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, Int16Array,
Int32Array, Int8Array, RecordBatch, RecordBatchIterator, RecordBatchReader, UInt16Array,
UInt32Array, UInt8Array,
};
use itertools::Itertools;
use optd_core::{
cascades::{CascadesOptimizer, RelNodeContext},
cost::{Cost, CostModel},
rel_node::{RelNode, RelNodeTyp, Value},
};
use optd_gungnir::stats::hyperloglog::{self, HyperLogLog};
use optd_gungnir::stats::tdigest::{self, TDigest};

fn compute_plan_node_cost<T: RelNodeTyp, C: CostModel<T>>(
model: &C,
Expand All @@ -34,9 +42,207 @@ pub struct OptCostModel {
per_table_stats_map: BaseTableStats,
}

/// Mock most-common-values sketch: a plain value -> frequency map.
/// Real MCV generation is listed as a follow-up in this commit ("Generate MCV"),
/// so in practice this is currently built empty by `from_record_batches`.
struct MockMostCommonValues {
    // Frequency per tracked value — presumably a fraction of rows in [0, 1]; confirm against consumers.
    mcvs: HashMap<Value, f64>,
}

impl MockMostCommonValues {
pub fn empty() -> Self {
MockMostCommonValues {
mcvs: HashMap::new(),
}
}
}

/// MCV queries backed directly by the in-memory frequency map.
impl MostCommonValues for MockMostCommonValues {
    /// Frequency of one exact value, if it is tracked as an MCV.
    fn freq(&self, value: &Value) -> Option<f64> {
        self.mcvs.get(value).cloned()
    }

    /// Combined frequency of all tracked MCVs.
    fn total_freq(&self) -> f64 {
        self.mcvs.values().fold(0.0, |acc, f| acc + f)
    }

    /// Combined frequency of every tracked value that satisfies `pred`.
    fn freq_over_pred(&self, pred: Box<dyn Fn(&Value) -> bool>) -> f64 {
        let mut total = 0.0;
        for (val, freq) in &self.mcvs {
            if pred(val) {
                total += freq;
            }
        }
        total
    }

    /// Number of distinct values tracked by the sketch.
    fn cnt(&self) -> usize {
        self.mcvs.len()
    }
}

pub struct PerTableStats {
row_cnt: usize,
per_column_stats_vec: Vec<PerColumnStats>,
per_column_stats_vec: Vec<Option<PerColumnStats>>,
}

impl PerTableStats {
    /// Build per-table statistics by streaming over Arrow record batches.
    ///
    /// For every column of a supported type this accumulates:
    /// - a most-common-values sketch (currently `MockMostCommonValues`, always empty),
    /// - a TDigest for distribution/CDF estimates,
    /// - a HyperLogLog for the ndistinct estimate,
    /// - a null count (converted to a null fraction at the end).
    ///
    /// Columns whose type is not supported get `None` in the result so the
    /// cost model can fall back to its magic-number selectivities.
    pub fn from_record_batches<I: IntoIterator<Item = Result<RecordBatch, ArrowError>>>(
        batch_iter: RecordBatchIterator<I>,
    ) -> anyhow::Result<Self> {
        let schema = batch_iter.schema();
        let col_types = schema
            .fields()
            .iter()
            .map(|f| f.data_type().clone())
            .collect_vec();
        let col_cnt = col_types.len();

        let mut row_cnt = 0;
        // One accumulator slot per column; `None` marks unsupported columns
        // so the indices below stay aligned with the schema.
        let mut mcvs = col_types
            .iter()
            .map(|col_type| {
                if Self::is_type_supported(col_type) {
                    Some(MockMostCommonValues::empty())
                } else {
                    None
                }
            })
            .collect_vec();
        let mut distr = col_types
            .iter()
            .map(|col_type| {
                if Self::is_type_supported(col_type) {
                    Some(TDigest::new(tdigest::DEFAULT_COMPRESSION))
                } else {
                    None
                }
            })
            .collect_vec();
        // HLLs are allocated for every column (even unsupported ones) but only
        // updated for supported columns below.
        let mut hlls = vec![HyperLogLog::new(hyperloglog::DEFAULT_PRECISION); col_cnt];
        let mut null_cnt = vec![0; col_cnt];

        // Single pass over the data: each batch contributes to row count,
        // null counts, the TDigest, and the HLL of every supported column.
        for batch in batch_iter {
            let batch = batch?;
            row_cnt += batch.num_rows();

            // Enumerate the columns.
            for (i, col) in batch.columns().iter().enumerate() {
                let col_type = &col_types[i];
                if Self::is_type_supported(col_type) {
                    // Update null cnt.
                    null_cnt[i] += col.null_count();

                    Self::generate_stats_for_column(col, col_type, &mut distr[i], &mut hlls[i]);
                }
            }
        }

        // Assemble the per-column stats.
        let mut per_column_stats_vec = Vec::with_capacity(col_cnt);
        for i in 0..col_cnt {
            per_column_stats_vec.push(if Self::is_type_supported(&col_types[i]) {
                Some(PerColumnStats {
                    // `take().unwrap()` is safe: supported columns were
                    // initialized to `Some(...)` above and taken exactly once.
                    mcvs: Box::new(mcvs[i].take().unwrap()) as Box<dyn MostCommonValues>,
                    ndistinct: hlls[i].n_distinct(),
                    // NOTE(review): if no batches contain rows (row_cnt == 0)
                    // this division yields NaN — confirm callers never build
                    // stats from an empty table.
                    null_frac: null_cnt[i] as f64 / row_cnt as f64,
                    distr: Box::new(distr[i].take().unwrap()) as Box<dyn Distribution>,
                })
            } else {
                None
            });
        }
        Ok(Self {
            row_cnt,
            per_column_stats_vec,
        })
    }

    /// Whether statistics can be generated for this Arrow data type.
    ///
    /// NOTE(review): `Date32` and `Decimal128` have match arms in
    /// `generate_stats_for_column` but are not listed here, so those arms are
    /// currently unreachable — presumably to be enabled alongside the `Utf8`
    /// support mentioned in the commit's follow-up list; confirm intent.
    fn is_type_supported(data_type: &DataType) -> bool {
        matches!(
            data_type,
            DataType::Boolean
                | DataType::Int8
                | DataType::Int16
                | DataType::Int32
                | DataType::UInt8
                | DataType::UInt16
                | DataType::UInt32
                | DataType::Float32
                | DataType::Float64
        )
    }

    /// Generate statistics for a column.
    ///
    /// Dispatches on the Arrow type, downcasts to the concrete array, drops
    /// nulls, and feeds the remaining values into the TDigest (as f64) and
    /// the HyperLogLog (as the native element type).
    fn generate_stats_for_column(
        col: &Arc<dyn Array>,
        col_type: &DataType,
        distr: &mut Option<TDigest>,
        hll: &mut HyperLogLog,
    ) {
        // Shared body for every concrete array type; `$to_f64` adapts the
        // element type for the TDigest while the HLL hashes native values.
        macro_rules! generate_stats_for_col {
            ({ $col:expr, $distr:expr, $hll:expr, $array_type:path, $to_f64:ident }) => {{
                let array = $col.as_any().downcast_ref::<$array_type>().unwrap();
                // Filter out `None` values.
                let values = array.iter().filter_map(|x| x).collect::<Vec<_>>();

                // Update distribution.
                *$distr = {
                    let mut f64_values = values.iter().map(|x| $to_f64(*x)).collect::<Vec<_>>();
                    Some($distr.take().unwrap().merge_values(&mut f64_values))
                };

                // Update hll.
                $hll.aggregate(&values);
            }};
        }

        /// Convert a value to f64 with no out of range or precision loss.
        fn to_f64_safe<T: Into<f64>>(val: T) -> f64 {
            val.into()
        }

        /// Convert i128 to f64 with possible precision loss.
        ///
        /// Note: optd represents decimal with the significand as f64 (see `ConstantExpr::decimal`).
        /// For instance 0.04 of type `Decimal128(15, 2)` is just 4.0, the type information
        /// is discarded. Therefore we must use the significand to generate the statistics.
        fn i128_to_f64(val: i128) -> f64 {
            val as f64
        }

        match col_type {
            DataType::Boolean => {
                generate_stats_for_col!({ col, distr, hll, BooleanArray, to_f64_safe })
            }
            DataType::Int8 => {
                generate_stats_for_col!({ col, distr, hll, Int8Array, to_f64_safe })
            }
            DataType::Int16 => {
                generate_stats_for_col!({ col, distr, hll, Int16Array, to_f64_safe })
            }
            DataType::Int32 => {
                generate_stats_for_col!({ col, distr, hll, Int32Array, to_f64_safe })
            }
            DataType::UInt8 => {
                generate_stats_for_col!({ col, distr, hll, UInt8Array, to_f64_safe })
            }
            DataType::UInt16 => {
                generate_stats_for_col!({ col, distr, hll, UInt16Array, to_f64_safe })
            }
            DataType::UInt32 => {
                generate_stats_for_col!({ col, distr, hll, UInt32Array, to_f64_safe })
            }
            DataType::Float32 => {
                generate_stats_for_col!({ col, distr, hll, Float32Array, to_f64_safe })
            }
            DataType::Float64 => {
                generate_stats_for_col!({ col, distr, hll, Float64Array, to_f64_safe })
            }
            // Currently gated off by `is_type_supported` (see NOTE above).
            DataType::Date32 => {
                generate_stats_for_col!({ col, distr, hll, Date32Array, to_f64_safe })
            }
            // Currently gated off by `is_type_supported` (see NOTE above).
            DataType::Decimal128(_, _) => {
                generate_stats_for_col!({ col, distr, hll, Decimal128Array, i128_to_f64 })
            }
            // Callers guard on `is_type_supported`, so any other type is a bug.
            _ => unreachable!(),
        }
    }
}

pub struct PerColumnStats {
Expand All @@ -45,7 +251,7 @@ pub struct PerColumnStats {

// ndistinct _does_ include the values in mcvs
// ndistinct _does not_ include nulls
ndistinct: i32,
ndistinct: u64,

// postgres uses null_frac instead of something like "num_nulls" so we'll follow suit
// my guess for why they use null_frac is because we only ever use the fraction of nulls, not the #
Expand Down Expand Up @@ -445,7 +651,8 @@ impl OptCostModel {
is_eq: bool,
) -> Option<f64> {
if let Some(per_table_stats) = self.per_table_stats_map.get(table) {
if let Some(per_column_stats) = per_table_stats.per_column_stats_vec.get(col_idx) {
if let Some(Some(per_column_stats)) = per_table_stats.per_column_stats_vec.get(col_idx)
{
let eq_freq = if let Some(freq) = per_column_stats.mcvs.freq(value) {
freq
} else {
Expand Down Expand Up @@ -484,7 +691,8 @@ impl OptCostModel {
is_col_eq_val: bool,
) -> Option<f64> {
if let Some(per_table_stats) = self.per_table_stats_map.get(table) {
if let Some(per_column_stats) = per_table_stats.per_column_stats_vec.get(col_idx) {
if let Some(Some(per_column_stats)) = per_table_stats.per_column_stats_vec.get(col_idx)
{
// because distr does not include the values in MCVs, we need to compute the CDFs there as well
// because nulls return false in any comparison, they are never included when computing range selectivity
let distr_leq_freq = per_column_stats.distr.cdf(value);
Expand Down Expand Up @@ -555,7 +763,7 @@ impl OptCostModel {
}

impl PerTableStats {
pub fn new(row_cnt: usize, per_column_stats_vec: Vec<PerColumnStats>) -> Self {
pub fn new(row_cnt: usize, per_column_stats_vec: Vec<Option<PerColumnStats>>) -> Self {
Self {
row_cnt,
per_column_stats_vec,
Expand All @@ -566,7 +774,7 @@ impl PerTableStats {
impl PerColumnStats {
pub fn new(
mcvs: Box<dyn MostCommonValues>,
ndistinct: i32,
ndistinct: u64,
null_frac: f64,
distr: Box<dyn Distribution>,
) -> Self {
Expand Down Expand Up @@ -612,7 +820,7 @@ mod tests {
}
}

fn empty() -> Self {
pub fn empty() -> Self {
MockMostCommonValues::new(vec![])
}
}
Expand Down Expand Up @@ -664,7 +872,7 @@ mod tests {
OptCostModel::new(
vec![(
String::from(TABLE1_NAME),
PerTableStats::new(100, vec![per_column_stats]),
PerTableStats::new(100, vec![Some(per_column_stats)]),
)]
.into_iter()
.collect(),
Expand Down
10 changes: 10 additions & 0 deletions optd-gungnir/src/stats/hyperloglog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
use crate::stats::murmur2::murmur_hash;
use std::cmp::max;

pub const DEFAULT_PRECISION: u8 = 12;

/// Trait to transform any object into a stream of bytes.
/// The bytes feed the hash used for cardinality estimation, so values that
/// compare equal must serialize to identical byte streams — presumably via
/// `murmur_hash` (imported above); confirm in the aggregation path.
pub trait ByteSerializable {
    fn to_bytes(&self) -> Vec<u8>;
}

/// The HyperLogLog (HLL) structure to provide a statistical estimate of NDistinct.
/// For safety reasons, HLLs can only count elements of the same ByteSerializable type.
#[derive(Clone)]
pub struct HyperLogLog {
registers: Vec<u8>, // The buckets to estimate HLL on (i.e. upper p bits).
precision: u8, // The precision (p) of our HLL; 4 <= p <= 16.
Expand All @@ -29,6 +32,13 @@ impl ByteSerializable for String {
}
}

// Serialize common data types for hashing (bool).
impl ByteSerializable for bool {
    fn to_bytes(&self) -> Vec<u8> {
        // Delegate to the numeric impl: false -> 0u8, true -> 1u8.
        u8::from(*self).to_bytes()
    }
}

// Serialize common data types for hashing (numeric).
macro_rules! impl_byte_serializable_for_numeric {
($($type:ty),*) => {
Expand Down
3 changes: 3 additions & 0 deletions optd-gungnir/src/stats/tdigest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
use itertools::Itertools;
use std::f64::consts::PI;

pub const DEFAULT_COMPRESSION: f64 = 200.0;

/// The TDigest structure for the statistical aggregator to query quantiles.
#[derive(Clone)]
pub struct TDigest {
/// A sorted array of Centroids, according to their mean.
centroids: Vec<Centroid>,
Expand Down
1 change: 1 addition & 0 deletions optd-perftest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ datafusion = { version = "32.0.0", features = [
] }
optd-datafusion-repr = { path = "../optd-datafusion-repr" }
optd-datafusion-bridge = { path = "../optd-datafusion-bridge" }
optd-gungnir = { path = "../optd-gungnir" }
datafusion-optd-cli = { path = "../datafusion-optd-cli" }
futures = "0.3"
anyhow = { version = "1", features = ["backtrace"] }
Expand Down
2 changes: 1 addition & 1 deletion optd-perftest/src/cardtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::path::Path;

use crate::postgres_db::PostgresDb;
use crate::{benchmark::Benchmark, datafusion_db_cardtest::DatafusionDb, tpch::TpchConfig};
use crate::{benchmark::Benchmark, datafusion_db::DatafusionDb, tpch::TpchConfig};

use anyhow::{self};
use async_trait::async_trait;
Expand Down
Loading

0 comments on commit 1c557a4

Please sign in to comment.