Skip to content

Commit 259663b

Browse files
committed
implement to Arrow's builder
Signed-off-by: jayzhan211 <[email protected]>
1 parent b721abb commit 259663b

File tree

6 files changed

+263
-46
lines changed

6 files changed

+263
-46
lines changed

benchmarks/queries/clickbench/queries.sql

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,9 @@ SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10;
3535
SELECT 1, "URL", COUNT(*) AS c FROM hits GROUP BY 1, "URL" ORDER BY c DESC LIMIT 10;
3636
SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10;
3737
SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "URL" <> '' GROUP BY "URL" ORDER BY PageViews DESC LIMIT 10;
38-
SELECT "Title", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "Title" <> '' GROUP BY "Title" ORDER BY PageViews DESC LIMIT 10;
38+
SELECT "Title", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "Title" <> '' GROUP BY "Title" ORDER BY PageViews DESC LIMIT 10;
39+
SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "IsLink" <> 0 AND "IsDownload" = 0 GROUP BY "URL" ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
40+
SELECT "TraficSourceID", "SearchEngineID", "AdvEngineID", CASE WHEN ("SearchEngineID" = 0 AND "AdvEngineID" = 0) THEN "Referer" ELSE '' END AS Src, "URL" AS Dst, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 GROUP BY "TraficSourceID", "SearchEngineID", "AdvEngineID", Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
41+
SELECT "URLHash", "EventDate"::INT::DATE, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "TraficSourceID" IN (-1, 6) AND "RefererHash" = 3594120000172545465 GROUP BY "URLHash", "EventDate"::INT::DATE ORDER BY PageViews DESC LIMIT 10 OFFSET 100;
42+
SELECT "WindowClientWidth", "WindowClientHeight", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "DontCountHits" = 0 AND "URLHash" = 2868770270353813622 GROUP BY "WindowClientWidth", "WindowClientHeight" ORDER BY PageViews DESC LIMIT 10 OFFSET 10000;
43+
SELECT DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) AS M, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-14' AND "EventDate"::INT::DATE <= '2013-07-15' AND "IsRefresh" = 0 AND "DontCountHits" = 0 GROUP BY DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) ORDER BY DATE_TRUNC('minute', M) LIMIT 10 OFFSET 1000;

datafusion/physical-expr-common/src/group_value_row.rs

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
use arrow::array::BooleanBufferBuilder;
1919
use arrow::array::BufferBuilder;
2020
use arrow::array::GenericBinaryArray;
21+
use arrow::array::GenericBinaryBuilder;
2122
use arrow::array::GenericStringArray;
23+
use arrow::array::GenericStringBuilder;
2224
use arrow::array::OffsetSizeTrait;
25+
use arrow::array::PrimitiveBuilder;
2326
use arrow::buffer::NullBuffer;
2427
use arrow::buffer::OffsetBuffer;
2528
use arrow::buffer::ScalarBuffer;
@@ -37,13 +40,61 @@ use crate::binary_map::INITIAL_BUFFER_CAPACITY;
3740

3841
use std::sync::Arc;
3942

43+
pub trait ArrayEqV2: Send + Sync {
44+
fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool;
45+
fn append_val(&mut self, array: &ArrayRef, row: usize);
46+
fn len(&self) -> usize;
47+
fn build(&mut self) -> ArrayRef;
48+
}
49+
4050
pub trait ArrayEq: Send + Sync {
4151
fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool;
4252
fn append_val(&mut self, array: &ArrayRef, row: usize);
4353
fn len(&self) -> usize;
4454
fn build(self: Box<Self>) -> ArrayRef;
4555
}
4656

57+
impl<T> ArrayEqV2 for PrimitiveBuilder<T>
58+
where
59+
T: ArrowPrimitiveType,
60+
{
61+
fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
62+
let arr = array.as_primitive::<T>();
63+
64+
if let Some(nulls) = self.validity_slice() {
65+
let null_slice_index = lhs_row / 8;
66+
let null_bit_map_index = lhs_row % 8;
67+
let is_elem_null = ((nulls[null_slice_index] >> null_bit_map_index) & 1) == 1;
68+
if is_elem_null {
69+
return arr.is_null(rhs_row);
70+
} else if arr.is_null(rhs_row) {
71+
return false;
72+
}
73+
}
74+
75+
let elem = self.values_slice()[lhs_row];
76+
elem == arr.value(rhs_row)
77+
}
78+
79+
fn append_val(&mut self, array: &ArrayRef, row: usize) {
80+
let arr = array.as_primitive::<T>();
81+
if arr.is_null(row) {
82+
self.append_null();
83+
} else {
84+
let elem = arr.value(row);
85+
self.append_value(elem);
86+
}
87+
}
88+
89+
fn len(&self) -> usize {
90+
self.values_slice().len()
91+
}
92+
93+
fn build(&mut self) -> ArrayRef {
94+
Arc::new(self.finish())
95+
}
96+
}
97+
4798
pub struct PrimitiveGroupValueBuilder<T: ArrowPrimitiveType>(Vec<Option<T::Native>>);
4899

49100
impl<T: ArrowPrimitiveType> PrimitiveGroupValueBuilder<T> {
@@ -127,6 +178,99 @@ impl<T: ArrowPrimitiveType> ArrayEq for PrimitiveGroupValueBuilder<T> {
127178
// }
128179
// }
129180

181+
impl<O> ArrayEqV2 for GenericStringBuilder<O>
182+
where
183+
O: OffsetSizeTrait,
184+
{
185+
fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
186+
let arr = array.as_bytes::<GenericStringType<O>>();
187+
if let Some(nulls) = self.validity_slice() {
188+
let null_slice_index = lhs_row / 8;
189+
let null_bit_map_index = lhs_row % 8;
190+
191+
let is_lhs_null = ((nulls[null_slice_index] >> null_bit_map_index) & 1) == 1;
192+
if is_lhs_null {
193+
return arr.is_null(rhs_row);
194+
} else if arr.is_null(rhs_row) {
195+
return false;
196+
}
197+
}
198+
199+
let rhs_elem: &[u8] = arr.value(rhs_row).as_ref();
200+
let rhs_elem_len = arr.value_length(rhs_row).as_usize();
201+
assert_eq!(rhs_elem_len, rhs_elem.len());
202+
let l = O::as_usize(self.offsets_slice()[lhs_row]);
203+
let r = O::as_usize(self.offsets_slice()[lhs_row + 1]);
204+
let existing_elem = &self.values_slice()[l..r];
205+
existing_elem.len() == rhs_elem.len() && rhs_elem == existing_elem
206+
}
207+
208+
fn append_val(&mut self, array: &ArrayRef, row: usize) {
209+
let arr = array.as_string::<O>();
210+
if arr.is_null(row) {
211+
self.append_null();
212+
return;
213+
}
214+
215+
let value = arr.value(row);
216+
self.append_value(value);
217+
}
218+
219+
fn len(&self) -> usize {
220+
self.offsets_slice().len() - 1
221+
}
222+
223+
fn build(&mut self) -> ArrayRef {
224+
Arc::new(self.finish())
225+
}
226+
}
227+
228+
impl<O> ArrayEqV2 for GenericBinaryBuilder<O>
229+
where
230+
O: OffsetSizeTrait,
231+
{
232+
fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
233+
let arr = array.as_bytes::<GenericBinaryType<O>>();
234+
if let Some(nulls) = self.validity_slice() {
235+
let null_slice_index = lhs_row / 8;
236+
let null_bit_map_index = lhs_row % 8;
237+
let is_lhs_null = ((nulls[null_slice_index] >> null_bit_map_index) & 1) == 1;
238+
if is_lhs_null {
239+
return arr.is_null(rhs_row);
240+
} else if arr.is_null(rhs_row) {
241+
return false;
242+
}
243+
}
244+
245+
let rhs_elem: &[u8] = arr.value(rhs_row).as_ref();
246+
let rhs_elem_len = arr.value_length(rhs_row).as_usize();
247+
assert_eq!(rhs_elem_len, rhs_elem.len());
248+
let l = O::as_usize(self.offsets_slice()[lhs_row]);
249+
let r = O::as_usize(self.offsets_slice()[lhs_row + 1]);
250+
let existing_elem = &self.values_slice()[l..r];
251+
existing_elem.len() == rhs_elem.len() && rhs_elem == existing_elem
252+
}
253+
254+
fn append_val(&mut self, array: &ArrayRef, row: usize) {
255+
let arr = array.as_binary::<O>();
256+
if arr.is_null(row) {
257+
self.append_null();
258+
return;
259+
}
260+
261+
let value: &[u8] = arr.value(row).as_ref();
262+
self.append_value(value);
263+
}
264+
265+
fn len(&self) -> usize {
266+
self.values_slice().len()
267+
}
268+
269+
fn build(&mut self) -> ArrayRef {
270+
Arc::new(self.finish())
271+
}
272+
}
273+
130274
pub struct ByteGroupValueBuilderNaive<O>
131275
where
132276
O: OffsetSizeTrait,
@@ -622,3 +766,26 @@ where
622766
// self.offset_or_inline..self.offset_or_inline + self.len.as_usize()
623767
// }
624768
// }
769+
770+
#[cfg(test)]
mod tests {
    use arrow::{array::GenericByteBuilder, datatypes::GenericStringType};

    /// Pins the raw buffer layout of a string builder that the `ArrayEqV2`
    /// implementations read directly: validity bits, offsets and value bytes.
    #[test]
    fn string_builder_exposes_validity_offsets_and_values() {
        let mut builder = GenericByteBuilder::<GenericStringType<i32>>::new();
        builder.append_null();
        builder.append_value("a");
        builder.append_null();
        builder.append_value("bc");
        builder.append_value("def");
        builder.append_null();

        // Rows are: null, "a", null, "bc", "def", null.
        // A set bit means "valid"; bits are stored least-significant first.
        let validity = builder
            .validity_slice()
            .expect("nulls were appended, so a validity bitmap must exist");
        assert_eq!(validity[0] & 0b0011_1111, 0b0001_1010);

        // Null rows contribute no bytes and repeat the previous offset.
        assert_eq!(builder.values_slice(), b"abcdef");
        assert_eq!(builder.offsets_slice(), &[0, 0, 1, 1, 3, 6, 6]);
    }
}

datafusion/physical-plan/src/aggregates/group_values/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,9 @@ fn has_row_like_feature(data_type: &DataType) -> bool {
117117
| DataType::UInt32
118118
| DataType::UInt64
119119
| DataType::Utf8
120-
| DataType::LargeUtf8 => true,
120+
| DataType::LargeUtf8
121+
| DataType::Date32
122+
| DataType::Date64 => true,
121123
_ => false,
122124
}
123125
}

datafusion/physical-plan/src/aggregates/group_values/row_like.rs

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::ops::DerefMut;
19+
1820
use crate::aggregates::group_values::GroupValues;
1921
use ahash::RandomState;
22+
use arrow::array::{GenericStringBuilder, PrimitiveBuilder};
2023
use arrow::compute::cast;
2124
use arrow::datatypes::{
22-
Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type,
23-
UInt32Type, UInt64Type, UInt8Type,
25+
Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
26+
Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
2427
};
2528
use arrow::record_batch::RecordBatch;
2629
use arrow::row::{RowConverter, Rows, SortField};
@@ -30,12 +33,11 @@ use datafusion_common::hash_utils::create_hashes;
3033
use datafusion_common::{internal_err, DataFusionError, Result};
3134
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
3235
use datafusion_expr::EmitTo;
33-
use datafusion_physical_expr::binary_map::OutputType;
34-
use datafusion_physical_expr_common::group_value_row::{
35-
ArrayEq, ByteGroupValueBuilderNaive, PrimitiveGroupValueBuilder,
36-
};
36+
use datafusion_physical_expr_common::group_value_row::ArrayEqV2;
3737
use hashbrown::raw::RawTable;
3838

39+
pub(super) const INITIAL_CAPACITY: usize = 8 * 1024;
40+
3941
/// A [`GroupValues`] making use of [`Rows`]
4042
pub struct GroupValuesRowLike {
4143
/// The output schema
@@ -75,7 +77,7 @@ pub struct GroupValuesRowLike {
7577

7678
/// Random state for creating hashes
7779
random_state: RandomState,
78-
group_values_v2: Option<Vec<Box<dyn ArrayEq>>>,
80+
group_values_v2: Option<Vec<Box<dyn ArrayEqV2>>>,
7981
}
8082

8183
impl GroupValuesRowLike {
@@ -131,53 +133,65 @@ impl GroupValues for GroupValuesRowLike {
131133
for (i, f) in self.schema.fields().iter().enumerate() {
132134
match f.data_type() {
133135
&DataType::Int8 => {
134-
let b = PrimitiveGroupValueBuilder::<Int8Type>::new();
136+
let b = PrimitiveBuilder::<Int8Type>::new();
135137
v.push(Box::new(b) as _)
136138
}
137139
&DataType::Int16 => {
138-
let b = PrimitiveGroupValueBuilder::<Int16Type>::new();
140+
let b = PrimitiveBuilder::<Int16Type>::new();
139141
v.push(Box::new(b) as _)
140142
}
141143
&DataType::Int32 => {
142-
let b = PrimitiveGroupValueBuilder::<Int32Type>::new();
144+
let b = PrimitiveBuilder::<Int32Type>::new();
143145
v.push(Box::new(b) as _)
144146
}
145147
&DataType::Int64 => {
146-
let b = PrimitiveGroupValueBuilder::<Int64Type>::new();
148+
let b = PrimitiveBuilder::<Int64Type>::new();
147149
v.push(Box::new(b) as _)
148150
}
149151
&DataType::UInt8 => {
150-
let b = PrimitiveGroupValueBuilder::<UInt8Type>::new();
152+
let b = PrimitiveBuilder::<UInt8Type>::new();
151153
v.push(Box::new(b) as _)
152154
}
153155
&DataType::UInt16 => {
154-
let b = PrimitiveGroupValueBuilder::<UInt16Type>::new();
156+
let b = PrimitiveBuilder::<UInt16Type>::new();
155157
v.push(Box::new(b) as _)
156158
}
157159
&DataType::UInt32 => {
158-
let b = PrimitiveGroupValueBuilder::<UInt32Type>::new();
160+
let b = PrimitiveBuilder::<UInt32Type>::new();
159161
v.push(Box::new(b) as _)
160162
}
161163
&DataType::UInt64 => {
162-
let b = PrimitiveGroupValueBuilder::<UInt64Type>::new();
164+
let b = PrimitiveBuilder::<UInt64Type>::new();
163165
v.push(Box::new(b) as _)
164166
}
165167
&DataType::Float32 => {
166-
let b = PrimitiveGroupValueBuilder::<Float32Type>::new();
168+
let b = PrimitiveBuilder::<Float32Type>::new();
167169
v.push(Box::new(b) as _)
168170
}
169171
&DataType::Float64 => {
170-
let b = PrimitiveGroupValueBuilder::<Float64Type>::new();
172+
let b = PrimitiveBuilder::<Float64Type>::new();
173+
v.push(Box::new(b) as _)
174+
}
175+
&DataType::Date32 => {
176+
let b = PrimitiveBuilder::<Date32Type>::new();
177+
v.push(Box::new(b) as _)
178+
}
179+
&DataType::Date64 => {
180+
let b = PrimitiveBuilder::<Date64Type>::new();
171181
v.push(Box::new(b) as _)
172182
}
173183
&DataType::Utf8 => {
174-
let b =
175-
ByteGroupValueBuilderNaive::<i32>::new(OutputType::Utf8);
184+
let b = GenericStringBuilder::<i32>::with_capacity(
185+
INITIAL_CAPACITY,
186+
INITIAL_CAPACITY,
187+
);
176188
v.push(Box::new(b) as _)
177189
}
178190
&DataType::LargeUtf8 => {
179-
let b =
180-
ByteGroupValueBuilderNaive::<i64>::new(OutputType::Utf8);
191+
let b = GenericStringBuilder::<i64>::with_capacity(
192+
INITIAL_CAPACITY,
193+
INITIAL_CAPACITY,
194+
);
181195
v.push(Box::new(b) as _)
182196
}
183197
dt => todo!("{dt} not impl"),
@@ -211,7 +225,7 @@ impl GroupValues for GroupValuesRowLike {
211225
// && group_rows.row(row) == group_values.row(*group_idx)
212226

213227
fn compare_equal(
214-
arry_eq: &dyn ArrayEq,
228+
arry_eq: &dyn ArrayEqV2,
215229
lhs_row: usize,
216230
array: &ArrayRef,
217231
rhs_row: usize,
@@ -311,7 +325,10 @@ impl GroupValues for GroupValuesRowLike {
311325
EmitTo::All => {
312326
let output = group_values_v2
313327
.into_iter()
314-
.map(|v| v.build())
328+
.map(|mut v| {
329+
let p = v.deref_mut().build();
330+
p
331+
})
315332
.collect::<Vec<_>>();
316333
// let output = self.row_converter.convert_rows(&group_values)?;
317334
// group_values.clear();
@@ -324,8 +341,15 @@ impl GroupValues for GroupValuesRowLike {
324341

325342
// println!("to first n");
326343
let len = group_values_v2.len();
327-
let first_n: Vec<Box<dyn ArrayEq>> = group_values_v2.drain(..n).collect();
328-
let output = first_n.into_iter().map(|v| v.build()).collect::<Vec<_>>();
344+
let first_n: Vec<Box<dyn ArrayEqV2>> =
345+
group_values_v2.drain(..n).collect();
346+
let output = first_n
347+
.into_iter()
348+
.map(|mut v| {
349+
let p = v.deref_mut().build();
350+
p
351+
})
352+
.collect::<Vec<_>>();
329353
assert_eq!(len, group_values_v2.len() + n);
330354
self.group_values_v2 = Some(group_values_v2);
331355

0 commit comments

Comments
 (0)