Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vectorized aggregation with grouping by one fixed-size column #7341

Open
wants to merge 66 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
b92e622
Vectorized hash grouping on one column
akuzm Oct 2, 2024
4ce0e99
Merge remote-tracking branch 'origin/main' into HEAD
akuzm Oct 2, 2024
74d4419
benchmark vectorized grouping (2024-10-02 no. 6)
akuzm Oct 2, 2024
baedf7f
fixes
akuzm Oct 2, 2024
35dbd36
benchmark vectorized grouping (2024-10-02 no. 7)
akuzm Oct 2, 2024
74fffd3
some ugly stuff
akuzm Oct 2, 2024
f8db454
benchmark vectorized grouping (2024-10-02 no. 9)
akuzm Oct 2, 2024
00a9d11
someething
akuzm Oct 4, 2024
339f91a
reduce indirections
akuzm Oct 4, 2024
f075589
skip null bitmap words
akuzm Oct 8, 2024
88f325d
cleanup
akuzm Oct 9, 2024
15ab443
crc32
akuzm Oct 9, 2024
ff16ec8
license
akuzm Oct 9, 2024
4291b17
benchmark vectorized hash grouping (2024-10-09 no. 10)
akuzm Oct 9, 2024
795ef6b
test deltadelta changes
akuzm Oct 11, 2024
1fabb22
some speedups and simplehash simplifications
akuzm Oct 11, 2024
717abc4
Revert "test deltadelta changes"
akuzm Oct 11, 2024
b03bd6b
test deltadelta changes
akuzm Oct 11, 2024
166d0e8
work with signed types
akuzm Oct 14, 2024
7f578b4
Revert "work with signed types"
akuzm Oct 14, 2024
e70cb0b
bulk stuff specialized to element type
akuzm Oct 14, 2024
0040844
roll back the delta delta stuff
akuzm Oct 14, 2024
694faf6
use simplehash
akuzm Oct 14, 2024
3d05674
cleanup
akuzm Oct 14, 2024
d90a90f
benchmark vectorized hash grouping (simple) (2024-10-14 no. 11)
akuzm Oct 14, 2024
4a93549
add more tests
akuzm Oct 15, 2024
3e06b92
remove modified simplehash
akuzm Oct 15, 2024
a7942ed
offsets
akuzm Oct 15, 2024
6fb517f
cleanup
akuzm Oct 15, 2024
ffb28cf
changelog
akuzm Oct 15, 2024
778ca97
cleanup
akuzm Oct 15, 2024
ef3847a
benchmark vectorized hash grouping (simple) (2024-10-15 no. 12)
akuzm Oct 15, 2024
1409c74
32-bit
akuzm Oct 15, 2024
514ae96
some renames
akuzm Oct 15, 2024
22d23b3
cleanup
akuzm Oct 15, 2024
cd7a1dc
spelling
akuzm Oct 15, 2024
9ebd61f
Merge remote-tracking branch 'origin/main' into HEAD
akuzm Oct 24, 2024
9e51c19
Vectorize aggregate FILTER clause
akuzm Nov 19, 2024
480d0fe
Merge remote-tracking branch 'origin/main' into HEAD
akuzm Nov 26, 2024
9b0ee38
cleanups after merge
akuzm Nov 28, 2024
effa7eb
cleanup
akuzm Dec 2, 2024
533be01
Merge remote-tracking branch 'origin/main' into HEAD
akuzm Dec 2, 2024
8e6c6d2
changelog
akuzm Dec 2, 2024
b717f74
constify stable expressions
akuzm Dec 2, 2024
4df06d9
Merge commit '155ca6f7ef2925735c7063cd9178edd185c17009' into HEAD
akuzm Dec 3, 2024
47bcaa9
updates
akuzm Dec 3, 2024
b6cee02
remove extras
akuzm Dec 3, 2024
ecb1aec
ref
akuzm Dec 3, 2024
f64676f
fixes
akuzm Dec 3, 2024
fab11fb
benchmark single fixed-column hash grouping (2024-12-03 no. 11)
akuzm Dec 3, 2024
dff6dff
cleanup
akuzm Dec 3, 2024
831cadd
planning fixes for pg 17
akuzm Dec 4, 2024
66403f2
benchmark fixed-size hash grouping (2024-12-04 no. 152)
akuzm Dec 4, 2024
99e5b04
remove some (yet) unused code
akuzm Dec 4, 2024
de22a22
Merge remote-tracking branch 'origin/main' into HEAD
akuzm Dec 16, 2024
9fccab9
ref
akuzm Dec 16, 2024
8e97c2f
Merge remote-tracking branch 'akuzm/vector-filter' into HEAD
akuzm Dec 16, 2024
f5b648a
add test
akuzm Dec 16, 2024
ecd9cb2
Merge remote-tracking branch 'origin/main' into HEAD
akuzm Dec 16, 2024
dc6001d
typo
akuzm Dec 16, 2024
0ea397a
disable parallel
akuzm Dec 16, 2024
ea4dab1
add order
akuzm Dec 16, 2024
4b98e46
Update tsl/src/nodes/vector_agg/grouping_policy_hash.h
akuzm Dec 18, 2024
b615dbe
Update tsl/src/nodes/vector_agg/grouping_policy_hash.h
akuzm Dec 18, 2024
045f59a
determine the grouping type at plan time
akuzm Dec 18, 2024
10e66ad
Merge remote-tracking branch 'origin/main' into HEAD
akuzm Dec 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/linux-32bit-build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ jobs:
CC: clang-14
CXX: clang++-14
DEBIAN_FRONTEND: noninteractive
IGNORES: "append-* transparent_decompression-* transparent_decompress_chunk-* pg_dump telemetry bgw_db_scheduler* hypercore_vacuum"
# vectorized_aggregation has different output on i386 because int8 is by
# reference and currently it cannot be used for vectorized hash grouping.
IGNORES: "append-* transparent_decompression-* transparent_decompress_chunk-* pg_dump telemetry bgw_db_scheduler* hypercore_vacuum vectorized_aggregation"
SKIPS: chunk_adaptive histogram_test-*
EXTENSIONS: "postgres_fdw test_decoding pageinspect pgstattuple"
strategy:
Expand Down
1 change: 1 addition & 0 deletions .unreleased/vectorized-grouping-one-fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7341 Vectorized aggregation with grouping by one fixed-size by-value compressed column (such as arithmetic types).
2 changes: 2 additions & 0 deletions tsl/src/nodes/vector_agg/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
add_subdirectory(function)
add_subdirectory(hashing)
set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/exec.c
${CMAKE_CURRENT_SOURCE_DIR}/grouping_policy_batch.c
${CMAKE_CURRENT_SOURCE_DIR}/grouping_policy_hash.c
${CMAKE_CURRENT_SOURCE_DIR}/plan.c)
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
37 changes: 31 additions & 6 deletions tsl/src/nodes/vector_agg/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "nodes/decompress_chunk/exec.h"
#include "nodes/decompress_chunk/vector_quals.h"
#include "nodes/vector_agg.h"
#include "nodes/vector_agg/plan.h"

static int
get_input_offset(DecompressChunkState *decompress_state, Var *var)
Expand Down Expand Up @@ -179,17 +180,41 @@ vector_agg_begin(CustomScanState *node, EState *estate, int eflags)

Var *var = castNode(Var, tlentry->expr);
col->input_offset = get_input_offset(decompress_state, var);
DecompressContext *dcontext = &decompress_state->decompress_context;
CompressionColumnDescription *desc =
&dcontext->compressed_chunk_columns[col->input_offset];
col->value_bytes = desc->value_bytes;
}
}

/*
* Currently the only grouping policy we use is per-batch grouping.
* Create the grouping policy chosen at plan time.
*/
vector_agg_state->grouping =
create_grouping_policy_batch(vector_agg_state->num_agg_defs,
vector_agg_state->agg_defs,
vector_agg_state->num_grouping_columns,
vector_agg_state->grouping_columns);
const VectorAggGroupingType grouping_type =
intVal(list_nth(cscan->custom_private, VASI_GroupingType));
if (grouping_type == VAGT_Batch)
{
/*
* Per-batch grouping.
*/
vector_agg_state->grouping =
create_grouping_policy_batch(vector_agg_state->num_agg_defs,
vector_agg_state->agg_defs,
vector_agg_state->num_grouping_columns,
vector_agg_state->grouping_columns);
}
else
{
/*
* Hash grouping.
*/
vector_agg_state->grouping =
create_grouping_policy_hash(vector_agg_state->num_agg_defs,
vector_agg_state->agg_defs,
vector_agg_state->num_grouping_columns,
vector_agg_state->grouping_columns,
grouping_type);
}
}

static void
Expand Down
1 change: 1 addition & 0 deletions tsl/src/nodes/vector_agg/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ typedef struct GroupingColumn
{
int input_offset;
int output_offset;
int value_bytes;
} GroupingColumn;

typedef struct
Expand Down
58 changes: 58 additions & 0 deletions tsl/src/nodes/vector_agg/function/agg_many_vector_helper.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/

/*
* A generic implementation of adding the given batch to many aggregate function
* states with given offsets. Used for hash aggregation, and builds on the
* FUNCTION_NAME(one) function, which adds one passing non-null row to the given
* aggregate function state.
*/
static pg_attribute_always_inline void
FUNCTION_NAME(many_vector_impl)(void *restrict agg_states, const uint32 *offsets,
const uint64 *filter, int start_row, int end_row,
const ArrowArray *vector, MemoryContext agg_extra_mctx)
{
FUNCTION_NAME(state) *restrict states = (FUNCTION_NAME(state) *) agg_states;
const CTYPE *values = vector->buffers[1];
MemoryContext old = MemoryContextSwitchTo(agg_extra_mctx);
for (int row = start_row; row < end_row; row++)
{
const CTYPE value = values[row];
FUNCTION_NAME(state) *restrict state = &states[offsets[row]];
if (arrow_row_is_valid(filter, row))
{
Assert(offsets[row] != 0);
FUNCTION_NAME(one)(state, value);
}
}
MemoryContextSwitchTo(old);
}

static pg_noinline void
FUNCTION_NAME(many_vector_all_valid)(void *restrict agg_states, const uint32 *offsets,
int start_row, int end_row, const ArrowArray *vector,
MemoryContext agg_extra_mctx)
{
FUNCTION_NAME(many_vector_impl)
(agg_states, offsets, NULL, start_row, end_row, vector, agg_extra_mctx);
}

static void
FUNCTION_NAME(many_vector)(void *restrict agg_states, const uint32 *offsets, const uint64 *filter,
int start_row, int end_row, const ArrowArray *vector,
MemoryContext agg_extra_mctx)
{
if (filter == NULL)
{
FUNCTION_NAME(many_vector_all_valid)
(agg_states, offsets, start_row, end_row, vector, agg_extra_mctx);
}
else
{
FUNCTION_NAME(many_vector_impl)
(agg_states, offsets, filter, start_row, end_row, vector, agg_extra_mctx);
}
}
2 changes: 2 additions & 0 deletions tsl/src/nodes/vector_agg/function/float48_accum_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ FUNCTION_NAME(one)(void *restrict agg_state, const CTYPE value)
state->Sx = newSx;
}

#include "agg_many_vector_helper.c"
#include "agg_scalar_helper.c"
#include "agg_vector_validity_helper.c"

Expand All @@ -325,6 +326,7 @@ VectorAggFunctions FUNCTION_NAME(argdef) = {
.agg_emit = FUNCTION_NAME(emit),
.agg_scalar = FUNCTION_NAME(scalar),
.agg_vector = FUNCTION_NAME(vector),
.agg_many_vector = FUNCTION_NAME(many_vector),
};
#undef UPDATE
#undef COMBINE
Expand Down
75 changes: 75 additions & 0 deletions tsl/src/nodes/vector_agg/function/functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,70 @@ count_star_scalar(void *agg_state, Datum constvalue, bool constisnull, int n,
state->count += n;
}

static pg_attribute_always_inline void
count_star_many_scalar_impl(void *restrict agg_states, const uint32 *offsets, const uint64 *filter,
int start_row, int end_row, Datum constvalue, bool constisnull,
MemoryContext agg_extra_mctx)
{
CountState *states = (CountState *) agg_states;
for (int row = start_row; row < end_row; row++)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just double-checking that it is really correct to be non-inclusive with the end_row here...

end_row sounds like it is a "valid" row index as opposed to using something like num_rows in a zero-indexed series. If end_row is not a valid index it should probably be called something else.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that's the C++ habit of mine where end is idiomatically a past-the-end invalid iterator. In general I think the ranges with exclusive right end are very common, like [begin, end). Do you have a better name for this? Sometimes I write past_the_end_row to make it absolutely clear, but this feels a little too long for common usage...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, so what I suggested is num_rows, which has clear semantics. Then you can have a "standard" for-loop on i (starting at 0) and just get the row from start_row + i.

Not super-important, but the semantics are a bit more clear. Feel free to decide yourself what works best....

{
if (arrow_row_is_valid(filter, row))
{
states[offsets[row]].count++;
}
}
}

static pg_noinline void
count_star_many_scalar_nofilter(void *restrict agg_states, const uint32 *offsets, int start_row,
int end_row, Datum constvalue, bool constisnull,
MemoryContext agg_extra_mctx)
{
count_star_many_scalar_impl(agg_states,
offsets,
NULL,
start_row,
end_row,
constvalue,
constisnull,
agg_extra_mctx);
}

static void
count_star_many_scalar(void *restrict agg_states, const uint32 *offsets, const uint64 *filter,
int start_row, int end_row, Datum constvalue, bool constisnull,
MemoryContext agg_extra_mctx)
{
if (filter == NULL)
{
count_star_many_scalar_nofilter(agg_states,
offsets,
start_row,
end_row,
constvalue,
constisnull,
agg_extra_mctx);
}
else
{
count_star_many_scalar_impl(agg_states,
offsets,
filter,
start_row,
end_row,
constvalue,
constisnull,
agg_extra_mctx);
}
}

VectorAggFunctions count_star_agg = {
.state_bytes = sizeof(CountState),
.agg_init = count_init,
.agg_scalar = count_star_scalar,
.agg_emit = count_emit,
.agg_many_scalar = count_star_many_scalar,
};

/*
Expand Down Expand Up @@ -110,12 +169,28 @@ count_any_vector(void *agg_state, const ArrowArray *vector, const uint64 *filter
}
}

static void
count_any_many_vector(void *restrict agg_states, const uint32 *offsets, const uint64 *filter,
int start_row, int end_row, const ArrowArray *vector,
MemoryContext agg_extra_mctx)
{
for (int row = start_row; row < end_row; row++)
erimatnor marked this conversation as resolved.
Show resolved Hide resolved
{
CountState *state = (offsets[row] + (CountState *) agg_states);
if (arrow_row_is_valid(filter, row))
{
state->count++;
}
}
}

VectorAggFunctions count_any_agg = {
.state_bytes = sizeof(CountState),
.agg_init = count_init,
.agg_emit = count_emit,
.agg_scalar = count_any_scalar,
.agg_vector = count_any_vector,
.agg_many_vector = count_any_many_vector,
};

/*
Expand Down
16 changes: 16 additions & 0 deletions tsl/src/nodes/vector_agg/function/functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,22 @@ typedef struct
void (*agg_scalar)(void *restrict agg_state, Datum constvalue, bool constisnull, int n,
MemoryContext agg_extra_mctx);

/*
* Add the rows of the given arrow array to aggregate function states given
* by the respective offsets.
*/
void (*agg_many_vector)(void *restrict agg_states, const uint32 *offsets, const uint64 *filter,
int start_row, int end_row, const ArrowArray *vector,
MemoryContext agg_extra_mctx);

/*
* Same as above, but for a scalar argument. This is mostly important for
* count(*) and can be NULL.
*/
void (*agg_many_scalar)(void *restrict agg_states, const uint32 *offsets, const uint64 *filter,
int start_row, int end_row, Datum constvalue, bool constisnull,
MemoryContext agg_extra_mctx);

/* Emit a partial aggregation result. */
void (*agg_emit)(void *restrict agg_state, Datum *out_result, bool *out_isnull);
} VectorAggFunctions;
Expand Down
2 changes: 2 additions & 0 deletions tsl/src/nodes/vector_agg/function/int128_accum_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ FUNCTION_NAME(one)(void *restrict agg_state, const CTYPE value)
#endif
}

#include "agg_many_vector_helper.c"
#include "agg_scalar_helper.c"
#include "agg_vector_validity_helper.c"

Expand All @@ -119,6 +120,7 @@ VectorAggFunctions FUNCTION_NAME(argdef) = {
.agg_emit = FUNCTION_NAME(emit),
.agg_scalar = FUNCTION_NAME(scalar),
.agg_vector = FUNCTION_NAME(vector),
.agg_many_vector = FUNCTION_NAME(many_vector),
};

#endif
Expand Down
2 changes: 2 additions & 0 deletions tsl/src/nodes/vector_agg/function/int24_avg_accum_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ FUNCTION_NAME(one)(void *restrict agg_state, const CTYPE value)
state->sum += value;
}

#include "agg_many_vector_helper.c"
#include "agg_scalar_helper.c"
#include "agg_vector_validity_helper.c"

Expand All @@ -47,6 +48,7 @@ VectorAggFunctions FUNCTION_NAME(argdef) = {
.agg_emit = int24_avg_accum_emit,
.agg_scalar = FUNCTION_NAME(scalar),
.agg_vector = FUNCTION_NAME(vector),
.agg_many_vector = FUNCTION_NAME(many_vector),
};

#endif
Expand Down
2 changes: 2 additions & 0 deletions tsl/src/nodes/vector_agg/function/int24_sum_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ FUNCTION_NAME(one)(void *restrict agg_state, const CTYPE value)

typedef Int24SumState FUNCTION_NAME(state);

#include "agg_many_vector_helper.c"
#include "agg_scalar_helper.c"
#include "agg_vector_validity_helper.c"

Expand All @@ -70,6 +71,7 @@ VectorAggFunctions FUNCTION_NAME(argdef) = {
.agg_emit = int_sum_emit,
.agg_scalar = FUNCTION_NAME(scalar),
.agg_vector = FUNCTION_NAME(vector),
.agg_many_vector = FUNCTION_NAME(many_vector),
};
#endif

Expand Down
2 changes: 2 additions & 0 deletions tsl/src/nodes/vector_agg/function/minmax_arithmetic_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ FUNCTION_NAME(one)(void *restrict agg_state, const CTYPE value)
}
}

#include "agg_many_vector_helper.c"
#include "agg_scalar_helper.c"
#include "agg_vector_validity_helper.c"

Expand All @@ -67,6 +68,7 @@ VectorAggFunctions FUNCTION_NAME(argdef) = {
.agg_emit = minmax_emit,
.agg_scalar = FUNCTION_NAME(scalar),
.agg_vector = FUNCTION_NAME(vector),
.agg_many_vector = FUNCTION_NAME(many_vector),
};
#endif

Expand Down
2 changes: 2 additions & 0 deletions tsl/src/nodes/vector_agg/function/sum_float_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ FUNCTION_NAME(one)(void *restrict agg_state, const CTYPE value)
state->result += value;
}

#include "agg_many_vector_helper.c"
#include "agg_scalar_helper.c"
#include "agg_vector_validity_helper.c"

Expand All @@ -100,6 +101,7 @@ VectorAggFunctions FUNCTION_NAME(argdef) = {
.agg_emit = FUNCTION_NAME(emit),
.agg_scalar = FUNCTION_NAME(scalar),
.agg_vector = FUNCTION_NAME(vector),
.agg_many_vector = FUNCTION_NAME(many_vector),
};

#endif
Expand Down
21 changes: 21 additions & 0 deletions tsl/src/nodes/vector_agg/grouping_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ typedef struct GroupingPolicy
*/
void (*gp_reset)(GroupingPolicy *gp);

/*
* Aggregate a single compressed batch.
*/
void (*gp_add_batch)(GroupingPolicy *gp, DecompressBatchState *batch_state);

/*
Expand All @@ -51,6 +54,24 @@ typedef struct GroupingPolicy
char *(*gp_explain)(GroupingPolicy *gp);
} GroupingPolicy;

/*
* The various types of grouping we might use, as determined at planning time.
* The hashed subtypes are all implemented by hash grouping policy.
*/
typedef enum
{
VAGT_Invalid,
VAGT_Batch,
VAGT_HashSingleFixed2,
VAGT_HashSingleFixed4,
VAGT_HashSingleFixed8
} VectorAggGroupingType;

extern GroupingPolicy *create_grouping_policy_batch(int num_agg_defs, VectorAggDef *agg_defs,
int num_grouping_columns,
GroupingColumn *grouping_columns);

extern GroupingPolicy *create_grouping_policy_hash(int num_agg_defs, VectorAggDef *agg_defs,
int num_grouping_columns,
GroupingColumn *grouping_columns,
VectorAggGroupingType grouping_type);
Loading
Loading