Put vectorized aggregation results in short-lived memory context (#7461)
It is enough to use the per-tuple context for them; currently they end up in the ExecutorState context, which is incorrect.
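For context, the commit applies the standard PostgreSQL executor idiom of doing per-row allocations in the expression context's per-tuple memory, which is freed wholesale by the next ResetExprContext(). Below is a minimal sketch of that idiom; the helper name emit_one_result is hypothetical, and GroupingPolicy/gp_do_emit are the TimescaleDB interfaces used in the diff that follows.

#include "postgres.h"
#include "executor/executor.h"		/* ResetExprContext, ExecStoreVirtualTuple */
#include "nodes/execnodes.h"		/* CustomScanState, ExprContext */

/* GroupingPolicy is TimescaleDB's vector_agg interface; see exec.c below. */

/*
 * Hypothetical helper illustrating the fix: emit one aggregation result while
 * CurrentMemoryContext points at short-lived per-tuple memory, so whatever
 * gp_do_emit() pallocs is freed by the next ResetExprContext() instead of
 * accumulating in the long-lived ExecutorState context.
 */
static TupleTableSlot *
emit_one_result(CustomScanState *node, GroupingPolicy *grouping,
                TupleTableSlot *aggregated_slot)
{
    ExprContext *econtext = node->ss.ps.ps_ExprContext;

    /* Free the memory used by the result returned on the previous call. */
    ResetExprContext(econtext);

    /* Build the next result in per-tuple memory. */
    MemoryContext old_context =
        MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
    bool have_partial = grouping->gp_do_emit(grouping, aggregated_slot);
    MemoryContextSwitchTo(old_context);

    return have_partial ? ExecStoreVirtualTuple(aggregated_slot) : NULL;
}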
akuzm authored Nov 21, 2024
1 parent f88488a commit 399b11a
Showing 5 changed files with 251 additions and 6 deletions.
26 changes: 20 additions & 6 deletions tsl/src/nodes/vector_agg/exec.c
@@ -194,23 +194,31 @@ static TupleTableSlot *
 vector_agg_exec(CustomScanState *node)
 {
     VectorAggState *vector_agg_state = (VectorAggState *) node;
+    ExprContext *econtext = node->ss.ps.ps_ExprContext;
+    ResetExprContext(econtext);

     TupleTableSlot *aggregated_slot = vector_agg_state->custom.ss.ps.ps_ResultTupleSlot;
     ExecClearTuple(aggregated_slot);

+    /*
+     * If we have more partial aggregation results, continue returning them.
+     */
     GroupingPolicy *grouping = vector_agg_state->grouping;
-    if (grouping->gp_do_emit(grouping, aggregated_slot))
+    MemoryContext old_context = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
+    bool have_partial = grouping->gp_do_emit(grouping, aggregated_slot);
+    MemoryContextSwitchTo(old_context);
+    if (have_partial)
     {
         /* The grouping policy produced a partial aggregation result. */
         return ExecStoreVirtualTuple(aggregated_slot);
     }

-    /*
-     * If the partial aggregation results have ended, and the input has ended,
-     * we're done.
-     */
     if (vector_agg_state->input_ended)
     {
+        /*
+         * The partial aggregation results have ended, and the input has ended,
+         * so we're done.
+         */
         return NULL;
     }

@@ -285,7 +293,13 @@ vector_agg_exec(CustomScanState *node)
         grouping->gp_add_batch(grouping, batch_state);
     }

-    if (grouping->gp_do_emit(grouping, aggregated_slot))
+    /*
+     * If we have partial aggregation results, start returning them.
+     */
+    old_context = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
+    have_partial = grouping->gp_do_emit(grouping, aggregated_slot);
+    MemoryContextSwitchTo(old_context);
+    if (have_partial)
     {
         /* Have partial aggregation results. */
         return ExecStoreVirtualTuple(aggregated_slot);
10 changes: 10 additions & 0 deletions tsl/src/nodes/vector_agg/plan.c
@@ -380,6 +380,11 @@ has_vector_agg_node(Plan *plan, bool *has_normal_agg)
             append_plans = custom->custom_plans;
         }
     }
+    else if (IsA(plan, SubqueryScan))
+    {
+        SubqueryScan *subquery = castNode(SubqueryScan, plan);
+        append_plans = list_make1(subquery->subplan);
+    }

     if (append_plans)
     {
@@ -437,6 +442,11 @@ try_insert_vector_agg_node(Plan *plan)
             append_plans = custom->custom_plans;
         }
     }
+    else if (IsA(plan, SubqueryScan))
+    {
+        SubqueryScan *subquery = castNode(SubqueryScan, plan);
+        append_plans = list_make1(subquery->subplan);
+    }

     if (append_plans)
     {
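The tails of both functions are truncated above. As a rough, hedged sketch of how the collected append_plans are presumably consumed (the actual TimescaleDB code may differ), the search simply recurses into every child plan, which is why adding the SubqueryScan case makes VectorAgg reachable under subquery scans:

#include "postgres.h"
#include "nodes/pg_list.h"
#include "nodes/plannodes.h"

/*
 * Hedged sketch, not the truncated code itself: recurse into each collected
 * child plan. has_vector_agg_node() is the function shown in the diff above.
 */
static bool
search_append_plans(List *append_plans, bool *has_normal_agg)
{
    ListCell   *lc;

    foreach(lc, append_plans)
    {
        Plan       *child = (Plan *) lfirst(lc);

        if (has_vector_agg_node(child, has_normal_agg))
            return true;
    }
    return false;
}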
121 changes: 121 additions & 0 deletions tsl/test/expected/vector_agg_memory.out
@@ -0,0 +1,121 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\c :TEST_DBNAME :ROLE_SUPERUSER
-- Helper function that returns the amount of memory currently allocated in a
-- given memory context.
create or replace function ts_debug_allocated_bytes(text = 'PortalContext') returns bigint
as :MODULE_PATHNAME, 'ts_debug_allocated_bytes'
language c strict volatile;
create table mvagg (t int, s0 int, s1 int);
select create_hypertable('mvagg', 't', chunk_time_interval => pow(10, 9)::int);
NOTICE: adding not-null constraint to column "t"
create_hypertable
--------------------
(1,public,mvagg,t)
(1 row)

insert into mvagg select generate_series(1, 2 * pow(10, 6)::int) t, 1 s0, 1 s1;
-- Need two segmentbys to prevent compressed index scans to force hash aggregation.
-- Otherwise we might get GroupAggregate with Sort which uses linear memory in
-- the number of compressed batches.
alter table mvagg set (timescaledb.compress, timescaledb.compress_segmentby='s0, s1');
NOTICE: default order by for hypertable "mvagg" is set to "t DESC"
-- Need to inflate the estimated cardinalities of segmentby columns, again to
-- force the hash aggregation.
insert into mvagg select -1 - x t, -x s0, -x s1 from generate_series(1, 1000) x;
-- Need two chunks for chunkwise aggregation.
insert into mvagg select -1 t, 1 s0, 1 s1;
select count(compress_chunk(x)) from show_chunks('mvagg') x;
count
-------
2
(1 row)

vacuum analyze mvagg;
-- We are going to log memory usage as a function of number of aggregated elements
-- here.
create table log(n int, bytes int, a bigint, b bigint, c bigint, d bigint, e bigint, f bigint);
-- First, ensure that the underlying decompression has constant memory usage.
explain (costs off) select distinct on (s0, s1) ts_debug_allocated_bytes() bytes,
s0, s1, t
from mvagg where t >= -1 and t < 1000000 order by s0, s1, t desc;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------
Result
-> Unique
-> Merge Append
Sort Key: _hyper_1_1_chunk.s0, _hyper_1_1_chunk.s1, _hyper_1_1_chunk.t DESC
-> Custom Scan (DecompressChunk) on _hyper_1_1_chunk
Vectorized Filter: ((t >= '-1'::integer) AND (t < 1000000))
-> Index Scan using compress_hyper_2_3_chunk_s0_s1__ts_meta_min_1__ts_meta_max__idx on compress_hyper_2_3_chunk
Index Cond: ((_ts_meta_min_1 < 1000000) AND (_ts_meta_max_1 >= '-1'::integer))
-> Custom Scan (DecompressChunk) on _hyper_1_2_chunk
Vectorized Filter: ((t >= '-1'::integer) AND (t < 1000000))
-> Index Scan using compress_hyper_2_4_chunk_s0_s1__ts_meta_min_1__ts_meta_max__idx on compress_hyper_2_4_chunk
Index Cond: ((_ts_meta_min_1 < 1000000) AND (_ts_meta_max_1 >= '-1'::integer))
(12 rows)

truncate log;
\set ECHO none
select * from log where (
-- Ideally the memory usage should be constant, but we have to allow for
-- small spurious changes to make this test more robust.
select regr_slope(bytes, n) > 1/65536 from log
);
n | bytes | a | b | c | d | e | f
---+-------+---+---+---+---+---+---
(0 rows)

-- Test the vectorized aggregation with grouping by segmentby with various number
-- of input row. We expect approximately constant memory usage.
truncate log;
set max_parallel_workers_per_gather = 0;
set timescaledb.debug_require_vector_agg = 'require';
-- Despite the tweaks above, we are unable to force the HashAggregation, because
-- the unsorted DecompressChunk paths for aggregation are not created properly
-- (see issue #6836). Limit the memory consumed by tuplesort.
set work_mem = '64kB';
explain (costs off) select ts_debug_allocated_bytes() bytes,
count(*) a, count(t) b, sum(t) c, avg(t) d, min(t) e, max(t) f
from mvagg where t >= -1 and t < 1000000 group by s1;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------
Finalize GroupAggregate
Group Key: _hyper_1_1_chunk.s1
-> Sort
Sort Key: _hyper_1_1_chunk.s1
-> Append
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_1_1_chunk
Vectorized Filter: ((t >= '-1'::integer) AND (t < 1000000))
-> Sort
Sort Key: compress_hyper_2_3_chunk.s1
-> Seq Scan on compress_hyper_2_3_chunk
Filter: ((_ts_meta_max_1 >= '-1'::integer) AND (_ts_meta_min_1 < 1000000))
-> Custom Scan (VectorAgg)
-> Custom Scan (DecompressChunk) on _hyper_1_2_chunk
Vectorized Filter: ((t >= '-1'::integer) AND (t < 1000000))
-> Sort
Sort Key: compress_hyper_2_4_chunk.s1
-> Index Scan using compress_hyper_2_4_chunk_s0_s1__ts_meta_min_1__ts_meta_max__idx on compress_hyper_2_4_chunk
Index Cond: ((_ts_meta_min_1 < 1000000) AND (_ts_meta_max_1 >= '-1'::integer))
(19 rows)

\set ECHO none
reset timescaledb.debug_require_vector_agg;
reset max_parallel_workers_per_gather;
reset work_mem;
select * from log where (
-- For aggregation by segmentby, memory usage should be constant regardless
-- of the number of tuples. Still, we have to allow for small variations
-- that can be caused by other reasons. Currently the major increase is
-- caused by tuplesort, because we are unable to force hash aggregation due
-- to unrelated planning bugs.
select regr_slope(bytes, n) > 0.05 from log
);
n | bytes | a | b | c | d | e | f
---+-------+---+---+---+---+---+---
(0 rows)

reset timescaledb.debug_require_vector_agg;
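The test relies on the module's ts_debug_allocated_bytes() helper declared above; its C implementation is not part of this diff. As a rough idea of what such a helper can look like (an assumption, not TimescaleDB's actual code), one can walk the memory context tree from TopMemoryContext and report the bytes allocated under the named context with MemoryContextMemAllocated(), available since PostgreSQL 13:

#include "postgres.h"
#include "fmgr.h"
#include "utils/builtins.h"
#include "utils/memutils.h"

/*
 * Hypothetical sketch of a helper like ts_debug_allocated_bytes(): find a
 * memory context by name and return how many bytes are allocated in it and
 * its children. The real TimescaleDB implementation may differ.
 */
static MemoryContext
find_context_by_name(MemoryContext context, const char *name)
{
    if (strcmp(context->name, name) == 0)
        return context;

    for (MemoryContext child = context->firstchild; child != NULL; child = child->nextchild)
    {
        MemoryContext found = find_context_by_name(child, name);

        if (found != NULL)
            return found;
    }
    return NULL;
}

PG_FUNCTION_INFO_V1(debug_allocated_bytes);

Datum
debug_allocated_bytes(PG_FUNCTION_ARGS)
{
    char       *name = text_to_cstring(PG_GETARG_TEXT_PP(0));
    MemoryContext ctx = find_context_by_name(TopMemoryContext, name);

    if (ctx == NULL)
        ereport(ERROR, (errmsg("memory context \"%s\" not found", name)));

    /* Count the context and all of its children. */
    PG_RETURN_INT64((int64) MemoryContextMemAllocated(ctx, /* recurse = */ true));
}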
1 change: 1 addition & 0 deletions tsl/test/sql/CMakeLists.txt
@@ -114,6 +114,7 @@ if(CMAKE_BUILD_TYPE MATCHES Debug)
     recompress_chunk_segmentwise.sql
     feature_flags.sql
     vector_agg_default.sql
+    vector_agg_memory.sql
     vector_agg_segmentby.sql)

 list(
99 changes: 99 additions & 0 deletions tsl/test/sql/vector_agg_memory.sql
@@ -0,0 +1,99 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.

\c :TEST_DBNAME :ROLE_SUPERUSER

-- Helper function that returns the amount of memory currently allocated in a
-- given memory context.
create or replace function ts_debug_allocated_bytes(text = 'PortalContext') returns bigint
as :MODULE_PATHNAME, 'ts_debug_allocated_bytes'
language c strict volatile;


create table mvagg (t int, s0 int, s1 int);
select create_hypertable('mvagg', 't', chunk_time_interval => pow(10, 9)::int);
insert into mvagg select generate_series(1, 2 * pow(10, 6)::int) t, 1 s0, 1 s1;

-- Need two segmentbys to prevent compressed index scans to force hash aggregation.
-- Otherwise we might get GroupAggregate with Sort which uses linear memory in
-- the number of compressed batches.
alter table mvagg set (timescaledb.compress, timescaledb.compress_segmentby='s0, s1');
-- Need to inflate the estimated cardinalities of segmentby columns, again to
-- force the hash aggregation.
insert into mvagg select -1 - x t, -x s0, -x s1 from generate_series(1, 1000) x;
-- Need two chunks for chunkwise aggregation.
insert into mvagg select -1 t, 1 s0, 1 s1;

select count(compress_chunk(x)) from show_chunks('mvagg') x;

vacuum analyze mvagg;

-- We are going to log memory usage as a function of number of aggregated elements
-- here.
create table log(n int, bytes int, a bigint, b bigint, c bigint, d bigint, e bigint, f bigint);


-- First, ensure that the underlying decompression has constant memory usage.
explain (costs off) select distinct on (s0, s1) ts_debug_allocated_bytes() bytes,
s0, s1, t
from mvagg where t >= -1 and t < 1000000 order by s0, s1, t desc;

truncate log;
\set ECHO none
select
format('insert into log
select distinct on (s0, s1) %1$s,
ts_debug_allocated_bytes() bytes,
0 a, 0 b, 0 c, 0 d, 0 e, 0 f
from mvagg where t >= -1 and t < %1$s
order by s0, s1, t desc',
pow(10, generate_series(1, 7)))
\gexec
\set ECHO all

select * from log where (
-- Ideally the memory usage should be constant, but we have to allow for
-- small spurious changes to make this test more robust.
select regr_slope(bytes, n) > 1/65536 from log
);

-- Test the vectorized aggregation with grouping by segmentby with various number
-- of input row. We expect approximately constant memory usage.
truncate log;
set max_parallel_workers_per_gather = 0;
set timescaledb.debug_require_vector_agg = 'require';
-- Despite the tweaks above, we are unable to force the HashAggregation, because
-- the unsorted DecompressChunk paths for aggregation are not created properly
-- (see issue #6836). Limit the memory consumed by tuplesort.
set work_mem = '64kB';

explain (costs off) select ts_debug_allocated_bytes() bytes,
count(*) a, count(t) b, sum(t) c, avg(t) d, min(t) e, max(t) f
from mvagg where t >= -1 and t < 1000000 group by s1;

\set ECHO none
select
format('insert into log
select %1$s,
ts_debug_allocated_bytes() bytes,
count(*) a, count(t) b, sum(t) c, avg(t) d, min(t) e, max(t) f
from mvagg where t >= -1 and t < %1$s group by s1',
pow(10, generate_series(1, 7)))
\gexec
\set ECHO all

reset timescaledb.debug_require_vector_agg;
reset max_parallel_workers_per_gather;
reset work_mem;

select * from log where (
-- For aggregation by segmentby, memory usage should be constant regardless
-- of the number of tuples. Still, we have to allow for small variations
-- that can be caused by other reasons. Currently the major increase is
-- caused by tuplesort, because we are unable to force hash aggregation due
-- to unrelated planning bugs.
select regr_slope(bytes, n) > 0.05 from log
);

reset timescaledb.debug_require_vector_agg;
