-
Notifications
You must be signed in to change notification settings - Fork 894
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Vectorized aggregation with grouping by one fixed-size column #7341
base: main
Are you sure you want to change the base?
Changes from 37 commits
b92e622
4ce0e99
74d4419
baedf7f
35dbd36
74fffd3
f8db454
00a9d11
339f91a
f075589
88f325d
15ab443
ff16ec8
4291b17
795ef6b
1fabb22
717abc4
b03bd6b
166d0e8
7f578b4
e70cb0b
0040844
694faf6
3d05674
d90a90f
4a93549
3e06b92
a7942ed
6fb517f
ffb28cf
778ca97
ef3847a
1409c74
514ae96
22d23b3
cd7a1dc
9ebd61f
9e51c19
480d0fe
9b0ee38
effa7eb
533be01
8e6c6d2
b717f74
4df06d9
47bcaa9
b6cee02
ecb1aec
f64676f
fab11fb
dff6dff
831cadd
66403f2
99e5b04
de22a22
9fccab9
8e97c2f
f5b648a
ecd9cb2
dc6001d
0ea397a
ea4dab1
4b98e46
b615dbe
045f59a
10e66ad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Implements: #7341 Vectorized aggregation with grouping by one fixed-size by-value compressed column (such as arithmetic types). |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* This file and its contents are licensed under the Timescale License. | ||
* Please see the included NOTICE for copyright information and | ||
* LICENSE-TIMESCALE for a copy of the license. | ||
*/ | ||
|
||
/* | ||
* A generic implementation of adding the given batch to many aggregate function | ||
* states with given offsets. Used for hash aggregation, and builds on the | ||
* FUNCTION_NAME(one) function, which adds one passing non-null row to the given | ||
* aggregate function state. | ||
*/ | ||
static void | ||
FUNCTION_NAME(many_vector)(void *restrict agg_states, uint32 *restrict offsets, int start_row, | ||
int end_row, const ArrowArray *vector, MemoryContext agg_extra_mctx) | ||
{ | ||
/* Run the whole loop in agg_extra_mctx; the previous context is restored before returning. */
MemoryContext old = MemoryContextSwitchTo(agg_extra_mctx); | ||
/* buffers[1] is read as the CTYPE values, buffers[0] is handed to arrow_row_is_valid() as the validity data. */
const CTYPE *values = vector->buffers[1]; | ||
const uint64 *valid = vector->buffers[0]; | ||
/* Rows in the half-open range [start_row, end_row) are processed. */
for (int row = start_row; row < end_row; row++) | ||
{ | ||
/* offsets[row] is used as the index of this row's state within agg_states. */
/* The state pointer and the value are computed unconditionally; only the update below is guarded. */
FUNCTION_NAME(state) *state = (offsets[row] + (FUNCTION_NAME(state) *) agg_states); | ||
const CTYPE value = values[row]; | ||
/* A zero offset means the row is skipped (presumably it did not pass the filter -- TODO confirm with the caller). */
const bool row_passes = (offsets[row] != 0); | ||
const bool value_notnull = arrow_row_is_valid(valid, row); | ||
|
||
if (row_passes && value_notnull) | ||
{ | ||
FUNCTION_NAME(one)(state, value); | ||
} | ||
} | ||
MemoryContextSwitchTo(old); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,10 +27,13 @@ typedef struct | |
} CountState; | ||
|
||
static void | ||
count_init(void *agg_state) | ||
count_init(void *restrict agg_states, int n) | ||
{ | ||
CountState *state = (CountState *) agg_state; | ||
state->count = 0; | ||
CountState *states = (CountState *) agg_states; | ||
for (int i = 0; i < n; i++) | ||
{ | ||
states[i].count = 0; | ||
} | ||
} | ||
|
||
static void | ||
|
@@ -42,26 +45,44 @@ count_emit(void *agg_state, Datum *out_result, bool *out_isnull) | |
} | ||
|
||
static void | ||
count_star_const(void *agg_state, Datum constvalue, bool constisnull, int n, | ||
MemoryContext agg_extra_mctx) | ||
count_star_scalar(void *agg_state, Datum constvalue, bool constisnull, int n, | ||
MemoryContext agg_extra_mctx) | ||
{ | ||
CountState *state = (CountState *) agg_state; | ||
state->count += n; | ||
} | ||
|
||
/*
 * Adds one to the count of the state selected by offsets[row], for every row
 * in the half-open range [start_row, end_row) whose offset is nonzero.
 * constvalue, constisnull and agg_extra_mctx are not used by this function.
 */
static void | ||
count_star_many_scalar(void *restrict agg_states, uint32 *restrict offsets, int start_row, | ||
int end_row, Datum constvalue, bool constisnull, | ||
MemoryContext agg_extra_mctx) | ||
{ | ||
CountState *states = (CountState *) agg_states; | ||
/* Rows with a zero offset are skipped; all others bump states[offsets[row]].count. */
for (int row = start_row; row < end_row; row++) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just double-checking that it is really correct to be non-inclusive with the
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess that's the C++ habit of mine where There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, so what I suggested is Not super-important, but the semantics are a bit more clear. Feel free to decide yourself what works best.... |
||
{ | ||
if (offsets[row] == 0) | ||
{ | ||
continue; | ||
} | ||
|
||
states[offsets[row]].count++; | ||
} | ||
} | ||
|
||
VectorAggFunctions count_star_agg = { | ||
.state_bytes = sizeof(CountState), | ||
.agg_init = count_init, | ||
.agg_const = count_star_const, | ||
.agg_scalar = count_star_scalar, | ||
.agg_emit = count_emit, | ||
.agg_many_scalar = count_star_many_scalar, | ||
}; | ||
|
||
/* | ||
* Aggregate function count(x). | ||
*/ | ||
static void | ||
count_any_const(void *agg_state, Datum constvalue, bool constisnull, int n, | ||
MemoryContext agg_extra_mctx) | ||
count_any_scalar(void *agg_state, Datum constvalue, bool constisnull, int n, | ||
MemoryContext agg_extra_mctx) | ||
{ | ||
if (constisnull) | ||
{ | ||
|
@@ -73,8 +94,8 @@ count_any_const(void *agg_state, Datum constvalue, bool constisnull, int n, | |
} | ||
|
||
static void | ||
count_any_vector(void *agg_state, const ArrowArray *vector, const uint64 *filter, | ||
MemoryContext agg_extra_mctx) | ||
count_any_many_vector(void *agg_state, const ArrowArray *vector, const uint64 *filter, | ||
MemoryContext agg_extra_mctx) | ||
{ | ||
CountState *state = (CountState *) agg_state; | ||
const int n = vector->length; | ||
|
@@ -110,12 +131,30 @@ count_any_vector(void *agg_state, const ArrowArray *vector, const uint64 *filter | |
} | ||
} | ||
|
||
/*
 * Increments the count of the state selected by offsets[row], for every row
 * in the half-open range [start_row, end_row) that has a nonzero offset and
 * is reported valid by arrow_row_is_valid(). agg_extra_mctx is not used.
 */
static void | ||
count_any_many(void *restrict agg_states, uint32 *restrict offsets, int start_row, int end_row, | ||
const ArrowArray *vector, MemoryContext agg_extra_mctx) | ||
{ | ||
/* buffers[0] is handed to arrow_row_is_valid() as the validity data; the values buffer is never read. */
const uint64 *valid = vector->buffers[0]; | ||
for (int row = start_row; row < end_row; row++) | ||
erimatnor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
/* The state pointer is computed unconditionally; it is only dereferenced when both checks pass. */
CountState *state = (offsets[row] + (CountState *) agg_states); | ||
const bool row_passes = (offsets[row] != 0); | ||
const bool value_notnull = arrow_row_is_valid(valid, row); | ||
if (row_passes && value_notnull) | ||
{ | ||
state->count++; | ||
} | ||
} | ||
} | ||
|
||
VectorAggFunctions count_any_agg = { | ||
.state_bytes = sizeof(CountState), | ||
.agg_init = count_init, | ||
.agg_emit = count_emit, | ||
.agg_const = count_any_const, | ||
.agg_vector = count_any_vector, | ||
.agg_scalar = count_any_scalar, | ||
.agg_vector = count_any_many_vector, | ||
.agg_many_vector = count_any_many, | ||
}; | ||
|
||
/* | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to have a comment before this check letting the reader know that we want to try optimizing the 1-column case in a special way, and that we later fall back to the "regular" per-batch grouping if the optimization wasn't possible.