-
Notifications
You must be signed in to change notification settings - Fork 894
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Vectorized aggregation with grouping by one fixed-size column #7341
base: main
Are you sure you want to change the base?
Changes from 62 commits
b92e622
4ce0e99
74d4419
baedf7f
35dbd36
74fffd3
f8db454
00a9d11
339f91a
f075589
88f325d
15ab443
ff16ec8
4291b17
795ef6b
1fabb22
717abc4
b03bd6b
166d0e8
7f578b4
e70cb0b
0040844
694faf6
3d05674
d90a90f
4a93549
3e06b92
a7942ed
6fb517f
ffb28cf
778ca97
ef3847a
1409c74
514ae96
22d23b3
cd7a1dc
9ebd61f
9e51c19
480d0fe
9b0ee38
effa7eb
533be01
8e6c6d2
b717f74
4df06d9
47bcaa9
b6cee02
ecb1aec
f64676f
fab11fb
dff6dff
831cadd
66403f2
99e5b04
de22a22
9fccab9
8e97c2f
f5b648a
ecd9cb2
dc6001d
0ea397a
ea4dab1
4b98e46
b615dbe
045f59a
10e66ad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Implements: #7341 Vectorized aggregation with grouping by one fixed-size by-value compressed column (such as arithmetic types). |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,8 @@ | ||
add_subdirectory(function) | ||
add_subdirectory(hashing) | ||
set(SOURCES | ||
${CMAKE_CURRENT_SOURCE_DIR}/exec.c | ||
${CMAKE_CURRENT_SOURCE_DIR}/grouping_policy_batch.c | ||
${CMAKE_CURRENT_SOURCE_DIR}/grouping_policy_hash.c | ||
${CMAKE_CURRENT_SOURCE_DIR}/plan.c) | ||
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES}) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
/* | ||
* This file and its contents are licensed under the Timescale License. | ||
* Please see the included NOTICE for copyright information and | ||
* LICENSE-TIMESCALE for a copy of the license. | ||
*/ | ||
|
||
/* | ||
* A generic implementation of adding the given batch to many aggregate function | ||
* states with given offsets. Used for hash aggregation, and builds on the | ||
* FUNCTION_NAME(one) function, which adds one passing non-null row to the given | ||
* aggregate function state. | ||
*/ | ||
static pg_attribute_always_inline void | ||
FUNCTION_NAME(many_vector_impl)(void *restrict agg_states, const uint32 *offsets, | ||
const uint64 *filter, int start_row, int end_row, | ||
const ArrowArray *vector, MemoryContext agg_extra_mctx) | ||
{ | ||
FUNCTION_NAME(state) *restrict states = (FUNCTION_NAME(state) *) agg_states; | ||
const CTYPE *values = vector->buffers[1]; | ||
MemoryContext old = MemoryContextSwitchTo(agg_extra_mctx); | ||
for (int row = start_row; row < end_row; row++) | ||
{ | ||
const CTYPE value = values[row]; | ||
FUNCTION_NAME(state) *restrict state = &states[offsets[row]]; | ||
if (arrow_row_is_valid(filter, row)) | ||
{ | ||
Assert(offsets[row] != 0); | ||
FUNCTION_NAME(one)(state, value); | ||
} | ||
} | ||
MemoryContextSwitchTo(old); | ||
} | ||
|
||
static pg_noinline void | ||
FUNCTION_NAME(many_vector_all_valid)(void *restrict agg_states, const uint32 *offsets, | ||
int start_row, int end_row, const ArrowArray *vector, | ||
MemoryContext agg_extra_mctx) | ||
{ | ||
FUNCTION_NAME(many_vector_impl) | ||
(agg_states, offsets, NULL, start_row, end_row, vector, agg_extra_mctx); | ||
} | ||
|
||
static void | ||
FUNCTION_NAME(many_vector)(void *restrict agg_states, const uint32 *offsets, const uint64 *filter, | ||
int start_row, int end_row, const ArrowArray *vector, | ||
MemoryContext agg_extra_mctx) | ||
{ | ||
if (filter == NULL) | ||
{ | ||
FUNCTION_NAME(many_vector_all_valid) | ||
(agg_states, offsets, start_row, end_row, vector, agg_extra_mctx); | ||
} | ||
else | ||
{ | ||
FUNCTION_NAME(many_vector_impl) | ||
(agg_states, offsets, filter, start_row, end_row, vector, agg_extra_mctx); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,11 +52,70 @@ count_star_scalar(void *agg_state, Datum constvalue, bool constisnull, int n, | |
state->count += n; | ||
} | ||
|
||
static pg_attribute_always_inline void | ||
count_star_many_scalar_impl(void *restrict agg_states, const uint32 *offsets, const uint64 *filter, | ||
int start_row, int end_row, Datum constvalue, bool constisnull, | ||
MemoryContext agg_extra_mctx) | ||
{ | ||
CountState *states = (CountState *) agg_states; | ||
for (int row = start_row; row < end_row; row++) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just double-checking that it is really correct to be non-inclusive with the
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess that's the C++ habit of mine where There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, so what I suggested is Not super-important, but the semantics are a bit more clear. Feel free to decide yourself what works best.... |
||
{ | ||
if (arrow_row_is_valid(filter, row)) | ||
{ | ||
states[offsets[row]].count++; | ||
} | ||
} | ||
} | ||
|
||
static pg_noinline void | ||
count_star_many_scalar_nofilter(void *restrict agg_states, const uint32 *offsets, int start_row, | ||
int end_row, Datum constvalue, bool constisnull, | ||
MemoryContext agg_extra_mctx) | ||
{ | ||
count_star_many_scalar_impl(agg_states, | ||
offsets, | ||
NULL, | ||
start_row, | ||
end_row, | ||
constvalue, | ||
constisnull, | ||
agg_extra_mctx); | ||
} | ||
|
||
static void | ||
count_star_many_scalar(void *restrict agg_states, const uint32 *offsets, const uint64 *filter, | ||
int start_row, int end_row, Datum constvalue, bool constisnull, | ||
MemoryContext agg_extra_mctx) | ||
{ | ||
if (filter == NULL) | ||
{ | ||
count_star_many_scalar_nofilter(agg_states, | ||
offsets, | ||
start_row, | ||
end_row, | ||
constvalue, | ||
constisnull, | ||
agg_extra_mctx); | ||
} | ||
else | ||
{ | ||
count_star_many_scalar_impl(agg_states, | ||
offsets, | ||
filter, | ||
start_row, | ||
end_row, | ||
constvalue, | ||
constisnull, | ||
agg_extra_mctx); | ||
} | ||
} | ||
|
||
VectorAggFunctions count_star_agg = { | ||
.state_bytes = sizeof(CountState), | ||
.agg_init = count_init, | ||
.agg_scalar = count_star_scalar, | ||
.agg_emit = count_emit, | ||
.agg_many_scalar = count_star_many_scalar, | ||
}; | ||
|
||
/* | ||
|
@@ -110,12 +169,28 @@ count_any_vector(void *agg_state, const ArrowArray *vector, const uint64 *filter | |
} | ||
} | ||
|
||
static void | ||
count_any_many_vector(void *restrict agg_states, const uint32 *offsets, const uint64 *filter, | ||
int start_row, int end_row, const ArrowArray *vector, | ||
MemoryContext agg_extra_mctx) | ||
{ | ||
for (int row = start_row; row < end_row; row++) | ||
erimatnor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
CountState *state = (offsets[row] + (CountState *) agg_states); | ||
if (arrow_row_is_valid(filter, row)) | ||
{ | ||
state->count++; | ||
} | ||
} | ||
} | ||
|
||
VectorAggFunctions count_any_agg = { | ||
.state_bytes = sizeof(CountState), | ||
.agg_init = count_init, | ||
.agg_emit = count_emit, | ||
.agg_scalar = count_any_scalar, | ||
.agg_vector = count_any_vector, | ||
.agg_many_vector = count_any_many_vector, | ||
}; | ||
|
||
/* | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity: Why is the grouping policy decided at execution time and not plan time? Should it not affect the plan and cost calc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved it all to plan time, although as we discussed today on call, it doesn't affect the costs yet.