Skip to content

Commit

Permalink
improve null checking & add benches
Browse files Browse the repository at this point in the history
  • Loading branch information
demetribu committed Oct 5, 2024
1 parent b4d7d97 commit e2bf27f
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 36 deletions.
11 changes: 10 additions & 1 deletion datafusion/physical-expr-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,17 @@ path = "src/lib.rs"

[dependencies]
ahash = { workspace = true }
arrow = { workspace = true }
arrow = { workspace = true, features = ["test_utils"] }
criterion = "0.5"
datafusion-common = { workspace = true, default-features = true }
datafusion-expr-common = { workspace = true }
hashbrown = { workspace = true }
rand = { workspace = true }

[[bench]]
harness = false
name = "binary_map"

[[bench]]
harness = false
name = "binary_view_map"
74 changes: 74 additions & 0 deletions datafusion/physical-expr-common/benches/binary_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow::array::ArrayRef;
use arrow::util::bench_util::create_string_array_with_len;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_physical_expr_common::binary_map::{ArrowBytesMap, OutputType};

fn benchmark_arrow_bytes_map(c: &mut Criterion) {
let sizes = [100_000, 1_000_000];
let null_densities = [0.1, 0.5];
let string_lengths = [20, 50];

for &num_items in &sizes {
for &null_density in &null_densities {
for &str_len in &string_lengths {
let array: ArrayRef = Arc::new(create_string_array_with_len::<i32>(
num_items,
null_density,
str_len,
));

c.bench_function(
&format!(
"ArrowBytesMap insert_if_new - items: {}, null_density: {:.1}, str_len: {}",
num_items, null_density, str_len
),
|b| {
b.iter(|| {
let mut map = ArrowBytesMap::<i32, ()>::new(OutputType::Utf8);
map.insert_if_new(black_box(&array), |_| {}, |_| {}, |_| {});
black_box(&map);
});
},
);

let mut map = ArrowBytesMap::<i32, u32>::new(OutputType::Utf8);
map.insert_if_new(&array, |_| 1u32, |_| {}, |_| {});

c.bench_function(
&format!(
"ArrowBytesMap get_payloads - items: {}, null_density: {:.1}, str_len: {}",
num_items, null_density, str_len
),
|b| {
b.iter(|| {
let payloads = map.take().get_payloads(black_box(&array));
black_box(payloads);
});
},
);
}
}
}
}

criterion_group!(benches, benchmark_arrow_bytes_map);
criterion_main!(benches);
77 changes: 77 additions & 0 deletions datafusion/physical-expr-common/benches/binary_view_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow::array::ArrayRef;
use arrow::util::bench_util::create_string_view_array_with_len;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_physical_expr_common::{
binary_map::OutputType, binary_view_map::ArrowBytesViewMap,
};

fn benchmark_arrow_bytes_view_map(c: &mut Criterion) {
let sizes = [100_000, 1_000_000];
let null_densities = [0.1, 0.5];
let string_lengths = [20, 50];

for &num_items in &sizes {
for &null_density in &null_densities {
for &str_len in &string_lengths {
let array: ArrayRef = Arc::new(create_string_view_array_with_len(
num_items,
null_density,
str_len,
false,
));

c.bench_function(
&format!(
"ArrowBytesViewMap insert_if_new - items: {}, null_density: {:.1}, str_len: {}",
num_items, null_density, str_len
),
|b| {
b.iter(|| {
let mut map = ArrowBytesViewMap::<()>::new(OutputType::Utf8View);
map.insert_if_new(black_box(&array), |_| {}, |_| {}, |_| {});
black_box(&map);
});
},
);

let mut map = ArrowBytesViewMap::<i32>::new(OutputType::Utf8View);
map.insert_if_new(&array, |_| 1i32, |_| {}, |_| {});

c.bench_function(
&format!(
"ArrowBytesViewMap get_payloads - items: {}, null_density: {:.1}, str_len: {}",
num_items, null_density, str_len
),
|b| {
b.iter(|| {
let payloads = map.take().get_payloads(black_box(&array));
black_box(payloads);
});
},
);
}
}
}
}

criterion_group!(benches, benchmark_arrow_bytes_view_map);
criterion_main!(benches);
48 changes: 28 additions & 20 deletions datafusion/physical-expr-common/src/binary_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,33 +539,21 @@ where
let mut batch_hashes = vec![0u64; values.len()];
batch_hashes.clear();
batch_hashes.resize(values.len(), 0);
create_hashes(&[values.clone()], &self.random_state, &mut batch_hashes).unwrap(); // Compute the hashes for the values
create_hashes(&[values.clone()], &self.random_state, &mut batch_hashes).unwrap();

// Step 2: Get payloads for each value
let values = values.as_bytes::<B>();
assert_eq!(values.len(), batch_hashes.len()); // Ensure hash count matches value count
assert_eq!(values.len(), batch_hashes.len());

let mut payloads = Vec::with_capacity(values.len());

for (value, &hash) in values.iter().zip(batch_hashes.iter()) {
// Handle null value
let Some(value) = value else {
if let Some(&(payload, _)) = self.null.as_ref() {
payloads.push(Some(payload));
} else {
payloads.push(None);
}
continue;
};

let process_value = |value: &B::Native, hash: u64| -> Option<V> {
let value: &[u8] = value.as_ref();
let value_len = O::usize_as(value.len());

// Small value optimization
let payload = if value.len() <= SHORT_VALUE_LEN {
let inline = value.iter().fold(0usize, |acc, &x| acc << 8 | x as usize);
if value.len() <= SHORT_VALUE_LEN {
let inline = value.iter().fold(0usize, |acc, &x| (acc << 8) | x as usize);

// Check if the value is already present in the set
let entry = self.map.get(hash, |header| {
if header.len != value_len {
return false;
Expand All @@ -575,7 +563,6 @@ where

entry.map(|entry| entry.payload)
} else {
// Handle larger values
let entry = self.map.get(hash, |header| {
if header.len != value_len {
return false;
Expand All @@ -586,9 +573,30 @@ where
});

entry.map(|entry| entry.payload)
};
}
};

payloads.push(payload);
if let Some(validity_bitmap) = values.nulls() {
let null_payload = self.null.as_ref().map(|&(payload, _)| payload);
let validity_iter = validity_bitmap.iter();

for ((value_opt, &hash), is_valid) in
values.iter().zip(batch_hashes.iter()).zip(validity_iter)
{
if is_valid {
let value = value_opt.unwrap(); // Safe to unwrap since is_valid is true
let payload = process_value(value, hash);
payloads.push(payload);
} else {
payloads.push(null_payload);
}
}
} else {
for (value_opt, &hash) in values.iter().zip(batch_hashes.iter()) {
let value = value_opt.unwrap(); // Safe to unwrap because there are no nulls
let payload = process_value(value, hash);
payloads.push(payload);
}
}

payloads
Expand Down
42 changes: 27 additions & 15 deletions datafusion/physical-expr-common/src/binary_view_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,34 +360,46 @@ where
{
// Step 1: Compute hashes
let mut batch_hashes = vec![0u64; values.len()];
create_hashes(&[values.clone()], &self.random_state, &mut batch_hashes).unwrap(); // Compute the hashes for the values
create_hashes(&[values.clone()], &self.random_state, &mut batch_hashes).unwrap();

// Step 2: Get payloads for each value
let values = values.as_byte_view::<B>();
assert_eq!(values.len(), batch_hashes.len()); // Ensure hash count matches value count
assert_eq!(values.len(), batch_hashes.len());

let mut payloads = Vec::with_capacity(values.len());

for (value, &hash) in values.iter().zip(batch_hashes.iter()) {
// Handle null value
let Some(value) = value else {
if let Some(&(payload, _)) = self.null.as_ref() {
payloads.push(Some(payload));
} else {
payloads.push(None);
}
continue;
};

let process_value = |value: &B::Native, hash: u64| -> Option<V> {
let value: &[u8] = value.as_ref();

let entry = self.map.get(hash, |header| {
let v = self.builder.get_value(header.view_idx);
v.len() == value.len() && v == value
});

let payload = entry.map(|e| e.payload);
payloads.push(payload);
entry.map(|e| e.payload)
};

if let Some(validity_bitmap) = values.nulls() {
let null_payload = self.null.as_ref().map(|&(payload, _)| payload);
let validity_iter = validity_bitmap.iter();

for ((value_opt, &hash), is_valid) in
values.iter().zip(batch_hashes.iter()).zip(validity_iter)
{
if is_valid {
let value = value_opt.unwrap(); // Safe to unwrap since is_valid is true
let payload = process_value(value, hash);
payloads.push(payload);
} else {
payloads.push(null_payload);
}
}
} else {
for (value_opt, &hash) in values.iter().zip(batch_hashes.iter()) {
let value = value_opt.unwrap(); // Safe to unwrap because there are no nulls
let payload = process_value(value, hash);
payloads.push(payload);
}
}

payloads
Expand Down

0 comments on commit e2bf27f

Please sign in to comment.