From 399b11a40fae728595d0fd5da5ac3a8f2f06a72b Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Thu, 21 Nov 2024 11:31:19 +0100 Subject: [PATCH] Put vectorized aggregation results in short-lived memory context (#7461) It is enough to use the per-tuple context for them, now they end up in the ExecutorState context which is incorrect. --- tsl/src/nodes/vector_agg/exec.c | 26 +++-- tsl/src/nodes/vector_agg/plan.c | 10 ++ tsl/test/expected/vector_agg_memory.out | 121 ++++++++++++++++++++++++ tsl/test/sql/CMakeLists.txt | 1 + tsl/test/sql/vector_agg_memory.sql | 99 +++++++++++++++++++ 5 files changed, 251 insertions(+), 6 deletions(-) create mode 100644 tsl/test/expected/vector_agg_memory.out create mode 100644 tsl/test/sql/vector_agg_memory.sql diff --git a/tsl/src/nodes/vector_agg/exec.c b/tsl/src/nodes/vector_agg/exec.c index 1f198d799a9..2da6e21c063 100644 --- a/tsl/src/nodes/vector_agg/exec.c +++ b/tsl/src/nodes/vector_agg/exec.c @@ -194,23 +194,31 @@ static TupleTableSlot * vector_agg_exec(CustomScanState *node) { VectorAggState *vector_agg_state = (VectorAggState *) node; + ExprContext *econtext = node->ss.ps.ps_ExprContext; + ResetExprContext(econtext); TupleTableSlot *aggregated_slot = vector_agg_state->custom.ss.ps.ps_ResultTupleSlot; ExecClearTuple(aggregated_slot); + /* + * If we have more partial aggregation results, continue returning them. + */ GroupingPolicy *grouping = vector_agg_state->grouping; - if (grouping->gp_do_emit(grouping, aggregated_slot)) + MemoryContext old_context = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); + bool have_partial = grouping->gp_do_emit(grouping, aggregated_slot); + MemoryContextSwitchTo(old_context); + if (have_partial) { /* The grouping policy produced a partial aggregation result. */ return ExecStoreVirtualTuple(aggregated_slot); } + /* + * If the partial aggregation results have ended, and the input has ended, + * we're done. + */ if (vector_agg_state->input_ended) { - /* - * The partial aggregation results have ended, and the input has ended, - * so we're done. - */ return NULL; } @@ -285,7 +293,13 @@ vector_agg_exec(CustomScanState *node) grouping->gp_add_batch(grouping, batch_state); } - if (grouping->gp_do_emit(grouping, aggregated_slot)) + /* + * If we have partial aggregation results, start returning them. + */ + old_context = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); + have_partial = grouping->gp_do_emit(grouping, aggregated_slot); + MemoryContextSwitchTo(old_context); + if (have_partial) { /* Have partial aggregation results. */ return ExecStoreVirtualTuple(aggregated_slot); diff --git a/tsl/src/nodes/vector_agg/plan.c b/tsl/src/nodes/vector_agg/plan.c index bd0a236de58..2d24cad5a7f 100644 --- a/tsl/src/nodes/vector_agg/plan.c +++ b/tsl/src/nodes/vector_agg/plan.c @@ -380,6 +380,11 @@ has_vector_agg_node(Plan *plan, bool *has_normal_agg) append_plans = custom->custom_plans; } } + else if (IsA(plan, SubqueryScan)) + { + SubqueryScan *subquery = castNode(SubqueryScan, plan); + append_plans = list_make1(subquery->subplan); + } if (append_plans) { @@ -437,6 +442,11 @@ try_insert_vector_agg_node(Plan *plan) append_plans = custom->custom_plans; } } + else if (IsA(plan, SubqueryScan)) + { + SubqueryScan *subquery = castNode(SubqueryScan, plan); + append_plans = list_make1(subquery->subplan); + } if (append_plans) { diff --git a/tsl/test/expected/vector_agg_memory.out b/tsl/test/expected/vector_agg_memory.out new file mode 100644 index 00000000000..868075cc890 --- /dev/null +++ b/tsl/test/expected/vector_agg_memory.out @@ -0,0 +1,121 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\c :TEST_DBNAME :ROLE_SUPERUSER +-- Helper function that returns the amount of memory currently allocated in a +-- given memory context. +create or replace function ts_debug_allocated_bytes(text = 'PortalContext') returns bigint + as :MODULE_PATHNAME, 'ts_debug_allocated_bytes' + language c strict volatile; +create table mvagg (t int, s0 int, s1 int); +select create_hypertable('mvagg', 't', chunk_time_interval => pow(10, 9)::int); +NOTICE: adding not-null constraint to column "t" + create_hypertable +-------------------- + (1,public,mvagg,t) +(1 row) + +insert into mvagg select generate_series(1, 2 * pow(10, 6)::int) t, 1 s0, 1 s1; +-- Need two segmentbys to prevent compressed index scans to force hash aggregation. +-- Otherwise we might get GroupAggregate with Sort which uses linear memory in +-- the number of compressed batches. +alter table mvagg set (timescaledb.compress, timescaledb.compress_segmentby='s0, s1'); +NOTICE: default order by for hypertable "mvagg" is set to "t DESC" +-- Need to inflate the estimated cardinalities of segmentby columns, again to +-- force the hash aggregation. +insert into mvagg select -1 - x t, -x s0, -x s1 from generate_series(1, 1000) x; +-- Need two chunks for chunkwise aggregation. +insert into mvagg select -1 t, 1 s0, 1 s1; +select count(compress_chunk(x)) from show_chunks('mvagg') x; + count +------- + 2 +(1 row) + +vacuum analyze mvagg; +-- We are going to log memory usage as a function of number of aggregated elements +-- here. +create table log(n int, bytes int, a bigint, b bigint, c bigint, d bigint, e bigint, f bigint); +-- First, ensure that the underlying decompression has constant memory usage. +explain (costs off) select distinct on (s0, s1) ts_debug_allocated_bytes() bytes, + s0, s1, t + from mvagg where t >= -1 and t < 1000000 order by s0, s1, t desc; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------- + Result + -> Unique + -> Merge Append + Sort Key: _hyper_1_1_chunk.s0, _hyper_1_1_chunk.s1, _hyper_1_1_chunk.t DESC + -> Custom Scan (DecompressChunk) on _hyper_1_1_chunk + Vectorized Filter: ((t >= '-1'::integer) AND (t < 1000000)) + -> Index Scan using compress_hyper_2_3_chunk_s0_s1__ts_meta_min_1__ts_meta_max__idx on compress_hyper_2_3_chunk + Index Cond: ((_ts_meta_min_1 < 1000000) AND (_ts_meta_max_1 >= '-1'::integer)) + -> Custom Scan (DecompressChunk) on _hyper_1_2_chunk + Vectorized Filter: ((t >= '-1'::integer) AND (t < 1000000)) + -> Index Scan using compress_hyper_2_4_chunk_s0_s1__ts_meta_min_1__ts_meta_max__idx on compress_hyper_2_4_chunk + Index Cond: ((_ts_meta_min_1 < 1000000) AND (_ts_meta_max_1 >= '-1'::integer)) +(12 rows) + +truncate log; +\set ECHO none +select * from log where ( + -- Ideally the memory usage should be constant, but we have to allow for + -- small spurious changes to make this test more robust. + select regr_slope(bytes, n) > 1/65536 from log +); + n | bytes | a | b | c | d | e | f +---+-------+---+---+---+---+---+--- +(0 rows) + +-- Test the vectorized aggregation with grouping by segmentby with various number +-- of input row. We expect approximately constant memory usage. +truncate log; +set max_parallel_workers_per_gather = 0; +set timescaledb.debug_require_vector_agg = 'require'; +-- Despite the tweaks above, we are unable to force the HashAggregation, because +-- the unsorted DecompressChunk paths for aggregation are not created properly +-- (see issue #6836). Limit the memory consumed by tuplesort. +set work_mem = '64kB'; +explain (costs off) select ts_debug_allocated_bytes() bytes, + count(*) a, count(t) b, sum(t) c, avg(t) d, min(t) e, max(t) f + from mvagg where t >= -1 and t < 1000000 group by s1; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------- + Finalize GroupAggregate + Group Key: _hyper_1_1_chunk.s1 + -> Sort + Sort Key: _hyper_1_1_chunk.s1 + -> Append + -> Custom Scan (VectorAgg) + -> Custom Scan (DecompressChunk) on _hyper_1_1_chunk + Vectorized Filter: ((t >= '-1'::integer) AND (t < 1000000)) + -> Sort + Sort Key: compress_hyper_2_3_chunk.s1 + -> Seq Scan on compress_hyper_2_3_chunk + Filter: ((_ts_meta_max_1 >= '-1'::integer) AND (_ts_meta_min_1 < 1000000)) + -> Custom Scan (VectorAgg) + -> Custom Scan (DecompressChunk) on _hyper_1_2_chunk + Vectorized Filter: ((t >= '-1'::integer) AND (t < 1000000)) + -> Sort + Sort Key: compress_hyper_2_4_chunk.s1 + -> Index Scan using compress_hyper_2_4_chunk_s0_s1__ts_meta_min_1__ts_meta_max__idx on compress_hyper_2_4_chunk + Index Cond: ((_ts_meta_min_1 < 1000000) AND (_ts_meta_max_1 >= '-1'::integer)) +(19 rows) + +\set ECHO none +reset timescaledb.debug_require_vector_agg; +reset max_parallel_workers_per_gather; +reset work_mem; +select * from log where ( + -- For aggregation by segmentby, memory usage should be constant regardless + -- of the number of tuples. Still, we have to allow for small variations + -- that can be caused by other reasons. Currently the major increase is + -- caused by tuplesort, because we are unable to force hash aggregation due + -- to unrelated planning bugs. + select regr_slope(bytes, n) > 0.05 from log +); + n | bytes | a | b | c | d | e | f +---+-------+---+---+---+---+---+--- +(0 rows) + +reset timescaledb.debug_require_vector_agg; diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt index 9cc7e62b84a..50ab7107326 100644 --- a/tsl/test/sql/CMakeLists.txt +++ b/tsl/test/sql/CMakeLists.txt @@ -114,6 +114,7 @@ if(CMAKE_BUILD_TYPE MATCHES Debug) recompress_chunk_segmentwise.sql feature_flags.sql vector_agg_default.sql + vector_agg_memory.sql vector_agg_segmentby.sql) list( diff --git a/tsl/test/sql/vector_agg_memory.sql b/tsl/test/sql/vector_agg_memory.sql new file mode 100644 index 00000000000..84a32071319 --- /dev/null +++ b/tsl/test/sql/vector_agg_memory.sql @@ -0,0 +1,99 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. + +\c :TEST_DBNAME :ROLE_SUPERUSER + +-- Helper function that returns the amount of memory currently allocated in a +-- given memory context. +create or replace function ts_debug_allocated_bytes(text = 'PortalContext') returns bigint + as :MODULE_PATHNAME, 'ts_debug_allocated_bytes' + language c strict volatile; + + +create table mvagg (t int, s0 int, s1 int); +select create_hypertable('mvagg', 't', chunk_time_interval => pow(10, 9)::int); +insert into mvagg select generate_series(1, 2 * pow(10, 6)::int) t, 1 s0, 1 s1; + +-- Need two segmentbys to prevent compressed index scans to force hash aggregation. +-- Otherwise we might get GroupAggregate with Sort which uses linear memory in +-- the number of compressed batches. +alter table mvagg set (timescaledb.compress, timescaledb.compress_segmentby='s0, s1'); +-- Need to inflate the estimated cardinalities of segmentby columns, again to +-- force the hash aggregation. +insert into mvagg select -1 - x t, -x s0, -x s1 from generate_series(1, 1000) x; +-- Need two chunks for chunkwise aggregation. +insert into mvagg select -1 t, 1 s0, 1 s1; + +select count(compress_chunk(x)) from show_chunks('mvagg') x; + +vacuum analyze mvagg; + +-- We are going to log memory usage as a function of number of aggregated elements +-- here. +create table log(n int, bytes int, a bigint, b bigint, c bigint, d bigint, e bigint, f bigint); + + +-- First, ensure that the underlying decompression has constant memory usage. +explain (costs off) select distinct on (s0, s1) ts_debug_allocated_bytes() bytes, + s0, s1, t + from mvagg where t >= -1 and t < 1000000 order by s0, s1, t desc; + +truncate log; +\set ECHO none +select +format('insert into log + select distinct on (s0, s1) %1$s, + ts_debug_allocated_bytes() bytes, + 0 a, 0 b, 0 c, 0 d, 0 e, 0 f + from mvagg where t >= -1 and t < %1$s + order by s0, s1, t desc', + pow(10, generate_series(1, 7))) +\gexec +\set ECHO all + +select * from log where ( + -- Ideally the memory usage should be constant, but we have to allow for + -- small spurious changes to make this test more robust. + select regr_slope(bytes, n) > 1/65536 from log +); + +-- Test the vectorized aggregation with grouping by segmentby with various number +-- of input row. We expect approximately constant memory usage. +truncate log; +set max_parallel_workers_per_gather = 0; +set timescaledb.debug_require_vector_agg = 'require'; +-- Despite the tweaks above, we are unable to force the HashAggregation, because +-- the unsorted DecompressChunk paths for aggregation are not created properly +-- (see issue #6836). Limit the memory consumed by tuplesort. +set work_mem = '64kB'; + +explain (costs off) select ts_debug_allocated_bytes() bytes, + count(*) a, count(t) b, sum(t) c, avg(t) d, min(t) e, max(t) f + from mvagg where t >= -1 and t < 1000000 group by s1; + +\set ECHO none +select +format('insert into log + select %1$s, + ts_debug_allocated_bytes() bytes, + count(*) a, count(t) b, sum(t) c, avg(t) d, min(t) e, max(t) f + from mvagg where t >= -1 and t < %1$s group by s1', + pow(10, generate_series(1, 7))) +\gexec +\set ECHO all + +reset timescaledb.debug_require_vector_agg; +reset max_parallel_workers_per_gather; +reset work_mem; + +select * from log where ( + -- For aggregation by segmentby, memory usage should be constant regardless + -- of the number of tuples. Still, we have to allow for small variations + -- that can be caused by other reasons. Currently the major increase is + -- caused by tuplesort, because we are unable to force hash aggregation due + -- to unrelated planning bugs. + select regr_slope(bytes, n) > 0.05 from log +); + +reset timescaledb.debug_require_vector_agg;