From 01a65d89fb02cd1835f5e13d23d8c18cd3a571a4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Erik=20Nordstr=C3=B6m?=
Date: Fri, 24 Nov 2023 15:04:56 +0100
Subject: [PATCH] Break out decompress state from DecompressChunkState

This change continues the refactoring of transparent decompression in
order to make it more modular. A new structure, DecompressContext, holds
the state necessary to do execution-time decompression. The context can
be passed to related code without handing over the full
DecompressChunkState node. This is not yet fully realized, however:
DecompressChunkState is still passed across many modules, which will be
addressed in follow-on changes.
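To illustrate the intended call pattern, here is a minimal, compilable C
sketch (not TimescaleDB code: the fields and the helper_decompress()
function are simplified stand-ins for the real members and helpers in
exec.h). It only shows how per-execution state moves into a context that
related code can take directly:

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdio.h>

    /* Stand-in for the execution-time state broken out by this patch. */
    typedef struct DecompressContext
    {
        bool reverse;
        bool enable_bulk_decompression;
        size_t batch_memory_context_bytes;
    } DecompressContext;

    /* Stand-in for the scan node; planner-related fields are elided. */
    typedef struct DecompressChunkState
    {
        DecompressContext decompress_context; /* embedded by value */
    } DecompressChunkState;

    /* Helpers can now take only the decompression state they need... */
    static void
    helper_decompress(DecompressContext *dcontext)
    {
        printf("bulk=%d reverse=%d bytes=%zu\n",
               (int) dcontext->enable_bulk_decompression,
               (int) dcontext->reverse,
               dcontext->batch_memory_context_bytes);
    }

    int
    main(void)
    {
        DecompressChunkState chunk_state = { 0 };

        /* ...instead of receiving the whole DecompressChunkState node. */
        DecompressContext *dcontext = &chunk_state.decompress_context;
        dcontext->enable_bulk_decompression = true;
        dcontext->batch_memory_context_bytes = 1024 * 1024;
        helper_decompress(dcontext);
        return 0;
    }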
---
 .../nodes/decompress_chunk/batch_queue_fifo.h | 21 ++++---
 .../nodes/decompress_chunk/batch_queue_heap.c | 18 +++---
 .../nodes/decompress_chunk/compressed_batch.c | 29 ++++++----
 tsl/src/nodes/decompress_chunk/exec.c         | 58 ++++++++++---------
 tsl/src/nodes/decompress_chunk/exec.h         | 26 +++++----
 5 files changed, 87 insertions(+), 65 deletions(-)

diff --git a/tsl/src/nodes/decompress_chunk/batch_queue_fifo.h b/tsl/src/nodes/decompress_chunk/batch_queue_fifo.h
index 9f3b245c7af..ada20c824e6 100644
--- a/tsl/src/nodes/decompress_chunk/batch_queue_fifo.h
+++ b/tsl/src/nodes/decompress_chunk/batch_queue_fifo.h
@@ -14,28 +14,32 @@
 inline static void
 batch_queue_fifo_create(DecompressChunkState *chunk_state)
 {
-	batch_array_init(&chunk_state->batch_array,
+	DecompressContext *dcontext = &chunk_state->decompress_context;
+
+	batch_array_init(&dcontext->batch_array,
 					 1,
 					 chunk_state->num_compressed_columns,
-					 chunk_state->batch_memory_context_bytes);
+					 dcontext->batch_memory_context_bytes);
 }
 
 inline static void
 batch_queue_fifo_free(DecompressChunkState *chunk_state)
 {
-	batch_array_destroy(&chunk_state->batch_array);
+	batch_array_destroy(&chunk_state->decompress_context.batch_array);
 }
 
 inline static bool
 batch_queue_fifo_needs_next_batch(DecompressChunkState *chunk_state)
 {
-	return TupIsNull(batch_array_get_at(&chunk_state->batch_array, 0)->decompressed_scan_slot);
+	return TupIsNull(batch_array_get_at(&chunk_state->decompress_context.batch_array, 0)
+						 ->decompressed_scan_slot);
 }
 
 inline static void
 batch_queue_fifo_pop(DecompressChunkState *chunk_state)
 {
-	DecompressBatchState *batch_state = batch_array_get_at(&chunk_state->batch_array, 0);
+	DecompressBatchState *batch_state =
+		batch_array_get_at(&chunk_state->decompress_context.batch_array, 0);
 	if (TupIsNull(batch_state->decompressed_scan_slot))
 	{
 		/* Allow this function to be called on the initial empty queue. */
@@ -48,7 +52,7 @@ batch_queue_fifo_pop(DecompressChunkState *chunk_state)
 inline static void
 batch_queue_fifo_push_batch(DecompressChunkState *chunk_state, TupleTableSlot *compressed_slot)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 	DecompressBatchState *batch_state = batch_array_get_at(batch_array, 0);
 	Assert(TupIsNull(batch_array_get_at(batch_array, 0)->decompressed_scan_slot));
 	compressed_batch_set_compressed_tuple(chunk_state, batch_state, compressed_slot);
@@ -58,11 +62,12 @@ batch_queue_fifo_push_batch(DecompressChunkState *chunk_state, TupleTableSlot *c
 inline static void
 batch_queue_fifo_reset(DecompressChunkState *chunk_state)
 {
-	batch_array_clear_at(&chunk_state->batch_array, 0);
+	batch_array_clear_at(&chunk_state->decompress_context.batch_array, 0);
 }
 
 inline static TupleTableSlot *
 batch_queue_fifo_top_tuple(DecompressChunkState *chunk_state)
 {
-	return batch_array_get_at(&chunk_state->batch_array, 0)->decompressed_scan_slot;
+	return batch_array_get_at(&chunk_state->decompress_context.batch_array, 0)
+			   ->decompressed_scan_slot;
 }
diff --git a/tsl/src/nodes/decompress_chunk/batch_queue_heap.c b/tsl/src/nodes/decompress_chunk/batch_queue_heap.c
index 011bec0a445..f2d9c6bbc63 100644
--- a/tsl/src/nodes/decompress_chunk/batch_queue_heap.c
+++ b/tsl/src/nodes/decompress_chunk/batch_queue_heap.c
@@ -58,7 +58,7 @@ static int32
 decompress_binaryheap_compare_heap_pos(Datum a, Datum b, void *arg)
 {
 	DecompressChunkState *chunk_state = (DecompressChunkState *) arg;
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 	int batchA = DatumGetInt32(a);
 	Assert(batchA <= batch_array->n_batch_states);
 
@@ -95,7 +95,7 @@ binaryheap_add_unordered_autoresize(binaryheap *heap, Datum d)
 void
 batch_queue_heap_pop(DecompressChunkState *chunk_state)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 
 	if (binaryheap_empty(chunk_state->merge_heap))
 	{
@@ -124,7 +124,7 @@ batch_queue_heap_pop(DecompressChunkState *chunk_state)
 bool
 batch_queue_heap_needs_next_batch(DecompressChunkState *chunk_state)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 
 	if (binaryheap_empty(chunk_state->merge_heap))
 	{
@@ -156,7 +156,7 @@ batch_queue_heap_needs_next_batch(DecompressChunkState *chunk_state)
 void
 batch_queue_heap_push_batch(DecompressChunkState *chunk_state, TupleTableSlot *compressed_slot)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 
 	Assert(!TupIsNull(compressed_slot));
 
@@ -182,7 +182,7 @@ batch_queue_heap_push_batch(DecompressChunkState *chunk_state, TupleTableSlot *c
 TupleTableSlot *
 batch_queue_heap_top_tuple(DecompressChunkState *chunk_state)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 
 	if (binaryheap_empty(chunk_state->merge_heap))
 	{
@@ -198,10 +198,12 @@ batch_queue_heap_top_tuple(DecompressChunkState *chunk_state)
 void
 batch_queue_heap_create(DecompressChunkState *chunk_state)
 {
-	batch_array_init(&chunk_state->batch_array,
+	DecompressContext *dcontext = &chunk_state->decompress_context;
+
+	batch_array_init(&dcontext->batch_array,
 					 INITIAL_BATCH_CAPACITY,
 					 chunk_state->num_compressed_columns,
-					 chunk_state->batch_memory_context_bytes);
+					 dcontext->batch_memory_context_bytes);
 
 	chunk_state->merge_heap = binaryheap_allocate(INITIAL_BATCH_CAPACITY,
 												  decompress_binaryheap_compare_heap_pos,
@@ -225,7 +227,7 @@ batch_queue_heap_reset(DecompressChunkState *chunk_state)
 void
 batch_queue_heap_free(DecompressChunkState *chunk_state)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 
 	elog(DEBUG3, "Heap has capacity of %d", chunk_state->merge_heap->bh_space);
 	elog(DEBUG3, "Created batch states %d", batch_array->n_batch_states);
diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.c b/tsl/src/nodes/decompress_chunk/compressed_batch.c
index 99c6868690b..5875d23f9e0 100644
--- a/tsl/src/nodes/decompress_chunk/compressed_batch.c
+++ b/tsl/src/nodes/decompress_chunk/compressed_batch.c
@@ -81,6 +81,7 @@ make_single_value_arrow(Oid pgtype, Datum datum, bool isnull)
 static void
 decompress_column(DecompressChunkState *chunk_state, DecompressBatchState *batch_state, int i)
 {
+	DecompressContext *dcontext = &chunk_state->decompress_context;
 	DecompressChunkColumnDescription *column_description = &chunk_state->template_columns[i];
 	CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
 	column_values->iterator = NULL;
@@ -115,11 +116,11 @@ decompress_column(DecompressChunkState *chunk_state, DecompressBatchState *batch
 	/* Decompress the entire batch if it is supported. */
 	CompressedDataHeader *header = (CompressedDataHeader *) PG_DETOAST_DATUM(value);
 	ArrowArray *arrow = NULL;
-	if (chunk_state->enable_bulk_decompression && column_description->bulk_decompression_supported)
+	if (dcontext->enable_bulk_decompression && column_description->bulk_decompression_supported)
 	{
-		if (chunk_state->bulk_decompression_context == NULL)
+		if (dcontext->bulk_decompression_context == NULL)
 		{
-			chunk_state->bulk_decompression_context = create_bulk_decompression_mctx(
+			dcontext->bulk_decompression_context = create_bulk_decompression_mctx(
 				MemoryContextGetParent(batch_state->per_batch_context));
 		}
 
@@ -128,13 +129,13 @@ decompress_column(DecompressChunkState *chunk_state, DecompressBatchState *batch
 		Assert(decompress_all != NULL);
 
 		MemoryContext context_before_decompression =
-			MemoryContextSwitchTo(chunk_state->bulk_decompression_context);
+			MemoryContextSwitchTo(dcontext->bulk_decompression_context);
 
 		arrow = decompress_all(PointerGetDatum(header),
 							   column_description->typid,
 							   batch_state->per_batch_context);
 
-		MemoryContextReset(chunk_state->bulk_decompression_context);
+		MemoryContextReset(dcontext->bulk_decompression_context);
 
 		MemoryContextSwitchTo(context_before_decompression);
 	}
@@ -158,8 +159,8 @@ decompress_column(DecompressChunkState *chunk_state, DecompressBatchState *batch
 	/* As a fallback, decompress row-by-row. */
 	column_values->iterator =
 		tsl_get_decompression_iterator_init(header->compression_algorithm,
-											chunk_state->reverse)(PointerGetDatum(header),
-																  column_description->typid);
+											dcontext->reverse)(PointerGetDatum(header),
+															   column_description->typid);
 }
 
 /*
@@ -330,6 +331,8 @@ void
 compressed_batch_set_compressed_tuple(DecompressChunkState *chunk_state,
 									  DecompressBatchState *batch_state, TupleTableSlot *subslot)
 {
+	DecompressContext *dcontext = &chunk_state->decompress_context;
+
 	Assert(TupIsNull(batch_state->decompressed_scan_slot));
 
 	/*
@@ -340,7 +343,7 @@ compressed_batch_set_compressed_tuple(DecompressChunkState *chunk_state,
 	{
 		/* Init memory context */
 		batch_state->per_batch_context =
-			create_per_batch_mctx(chunk_state->batch_memory_context_bytes);
+			create_per_batch_mctx(dcontext->batch_memory_context_bytes);
 		Assert(batch_state->per_batch_context != NULL);
 
 		Assert(batch_state->compressed_slot == NULL);
@@ -502,6 +505,7 @@ compressed_batch_set_compressed_tuple(DecompressChunkState *chunk_state,
 static void
 make_next_tuple(DecompressChunkState *chunk_state, DecompressBatchState *batch_state)
 {
+	DecompressContext *dcontext = &chunk_state->decompress_context;
 	TupleTableSlot *decompressed_scan_slot = batch_state->decompressed_scan_slot;
 	Assert(decompressed_scan_slot != NULL);
 
@@ -509,9 +513,8 @@ make_next_tuple(DecompressChunkState *chunk_state, DecompressBatchState *batch_s
 	Assert(batch_state->next_batch_row < batch_state->total_batch_rows);
 
 	const int output_row = batch_state->next_batch_row;
-	const size_t arrow_row = unlikely(chunk_state->reverse) ?
-								 batch_state->total_batch_rows - 1 - output_row :
-								 output_row;
+	const size_t arrow_row =
+		unlikely(dcontext->reverse) ? batch_state->total_batch_rows - 1 - output_row : output_row;
 
 	const int num_compressed_columns = chunk_state->num_compressed_columns;
 	for (int i = 0; i < num_compressed_columns; i++)
@@ -588,12 +591,14 @@ make_next_tuple(DecompressChunkState *chunk_state, DecompressBatchState *batch_s
 static bool
 vector_qual(DecompressChunkState *chunk_state, DecompressBatchState *batch_state)
 {
+	DecompressContext *dcontext = &chunk_state->decompress_context;
+
 	Assert(batch_state->total_batch_rows > 0);
 	Assert(batch_state->next_batch_row < batch_state->total_batch_rows);
 
 	const int output_row = batch_state->next_batch_row;
 	const size_t arrow_row =
-		chunk_state->reverse ? batch_state->total_batch_rows - 1 - output_row : output_row;
+		dcontext->reverse ? batch_state->total_batch_rows - 1 - output_row : output_row;
 
 	if (!batch_state->vector_qual_result)
 	{
diff --git a/tsl/src/nodes/decompress_chunk/exec.c b/tsl/src/nodes/decompress_chunk/exec.c
index 70ae40e7ec5..9edd1fa54b5 100644
--- a/tsl/src/nodes/decompress_chunk/exec.c
+++ b/tsl/src/nodes/decompress_chunk/exec.c
@@ -155,9 +155,9 @@ decompress_chunk_state_create(CustomScan *cscan)
 	Assert(list_length(settings) == 6);
 	chunk_state->hypertable_id = linitial_int(settings);
 	chunk_state->chunk_relid = lsecond_int(settings);
-	chunk_state->reverse = lthird_int(settings);
+	chunk_state->decompress_context.reverse = lthird_int(settings);
 	chunk_state->batch_sorted_merge = lfourth_int(settings);
-	chunk_state->enable_bulk_decompression = lfifth_int(settings);
+	chunk_state->decompress_context.enable_bulk_decompression = lfifth_int(settings);
 	chunk_state->perform_vectorized_aggregation = lsixth_int(settings);
 
 	Assert(IsA(cscan->custom_exprs, List));
@@ -265,6 +265,7 @@ static void
 decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
 {
 	DecompressChunkState *chunk_state = (DecompressChunkState *) node;
+	DecompressContext *dcontext = &chunk_state->decompress_context;
 	CustomScan *cscan = castNode(CustomScan, node->ss.ps.plan);
 	Plan *compressed_scan = linitial(cscan->custom_plans);
 	Assert(list_length(cscan->custom_plans) == 1);
@@ -436,8 +437,8 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
 	 *
 	 * Start with the default size.
 	 */
-	chunk_state->batch_memory_context_bytes = ALLOCSET_DEFAULT_INITSIZE;
-	if (chunk_state->enable_bulk_decompression)
+	dcontext->batch_memory_context_bytes = ALLOCSET_DEFAULT_INITSIZE;
+	if (dcontext->enable_bulk_decompression)
 	{
 		for (int i = 0; i < num_total; i++)
 		{
@@ -445,31 +446,31 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
 			if (column->bulk_decompression_supported)
 			{
 				/* Values array, with 64 element padding (actually we have less). */
-				chunk_state->batch_memory_context_bytes +=
+				dcontext->batch_memory_context_bytes +=
 					(GLOBAL_MAX_ROWS_PER_COMPRESSION + 64) * column->value_bytes;
 
 				/* Also nulls bitmap. */
-				chunk_state->batch_memory_context_bytes +=
+				dcontext->batch_memory_context_bytes +=
 					GLOBAL_MAX_ROWS_PER_COMPRESSION / (64 * sizeof(uint64));
 
 				/* Arrow data structure. */
-				chunk_state->batch_memory_context_bytes +=
+				dcontext->batch_memory_context_bytes +=
 					sizeof(ArrowArray) + sizeof(void *) * 2 /* buffers */;
 
 				/* Memory context header overhead for the above parts. */
-				chunk_state->batch_memory_context_bytes += sizeof(void *) * 3;
+				dcontext->batch_memory_context_bytes += sizeof(void *) * 3;
 			}
 		}
 	}
 
 	/* Round up to even number of 4k pages. */
-	chunk_state->batch_memory_context_bytes =
-		((chunk_state->batch_memory_context_bytes + 4095) / 4096) * 4096;
+	dcontext->batch_memory_context_bytes =
+		((dcontext->batch_memory_context_bytes + 4095) / 4096) * 4096;
 
 	/* As a precaution, limit it to 1MB. */
-	chunk_state->batch_memory_context_bytes =
-		Min(chunk_state->batch_memory_context_bytes, 1 * 1024 * 1024);
+	dcontext->batch_memory_context_bytes =
+		Min(dcontext->batch_memory_context_bytes, 1 * 1024 * 1024);
 
 	elog(DEBUG3,
 		 "Batch memory context has initial capacity of %zu bytes",
-		 chunk_state->batch_memory_context_bytes);
+		 dcontext->batch_memory_context_bytes);
 
 	/*
 	 * Choose which batch queue we are going to use: heap for batch sorted
@@ -545,6 +546,8 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
 static TupleTableSlot *
 perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
 {
+	DecompressContext *dcontext = &chunk_state->decompress_context;
+
 	Assert(chunk_state != NULL);
 	Assert(aggref != NULL);
 
@@ -558,24 +561,23 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
 	Assert(chunk_state->template_columns[1].type == COUNT_COLUMN);
 
 	/* Get a free batch slot */
-	const int new_batch_index = batch_array_get_unused_slot(&chunk_state->batch_array);
+	const int new_batch_index = batch_array_get_unused_slot(&dcontext->batch_array);
 
 	/* Nobody else should use batch states */
 	Assert(new_batch_index == 0);
-	DecompressBatchState *batch_state =
-		batch_array_get_at(&chunk_state->batch_array, new_batch_index);
+	DecompressBatchState *batch_state = batch_array_get_at(&dcontext->batch_array, new_batch_index);
 
 	/* Init per batch memory context */
 	Assert(batch_state != NULL);
 	Assert(batch_state->per_batch_context == NULL);
 	batch_state->per_batch_context =
-		create_per_batch_mctx(chunk_state->batch_array.batch_memory_context_bytes);
+		create_per_batch_mctx(dcontext->batch_array.batch_memory_context_bytes);
 	Assert(batch_state->per_batch_context != NULL);
 
 	/* Init bulk decompression memory context */
-	Assert(chunk_state->bulk_decompression_context == NULL);
-	chunk_state->bulk_decompression_context = create_bulk_decompression_mctx(CurrentMemoryContext);
-	Assert(chunk_state->bulk_decompression_context != NULL);
+	Assert(dcontext->bulk_decompression_context == NULL);
+	dcontext->bulk_decompression_context = create_bulk_decompression_mctx(CurrentMemoryContext);
+	Assert(dcontext->bulk_decompression_context != NULL);
 
 	/* Get a reference the the output TupleTableSlot */
 	TupleTableSlot *decompressed_scan_slot = chunk_state->csstate.ss.ss_ScanTupleSlot;
@@ -649,7 +651,7 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
 	}
 	else if (column_description->type == COMPRESSED_COLUMN)
 	{
-		Assert(chunk_state->enable_bulk_decompression);
+		Assert(dcontext->enable_bulk_decompression);
 		Assert(column_description->bulk_decompression_supported);
 
 		Assert(list_length(aggref->args) == 1);
@@ -681,7 +683,7 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
 		Assert(decompress_all != NULL);
 
 		MemoryContext context_before_decompression =
-			MemoryContextSwitchTo(chunk_state->bulk_decompression_context);
+			MemoryContextSwitchTo(dcontext->bulk_decompression_context);
 
 		arrow = decompress_all(PointerGetDatum(header),
 							   column_description->typid,
@@ -689,7 +691,7 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
 
 		Assert(arrow != NULL);
 
-		MemoryContextReset(chunk_state->bulk_decompression_context);
+		MemoryContextReset(dcontext->bulk_decompression_context);
 		MemoryContextSwitchTo(context_before_decompression);
 
 		/* A compressed batch consists of 1000 tuples (see MAX_ROWS_PER_COMPRESSION). The
@@ -741,6 +743,8 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
 static TupleTableSlot *
 perform_vectorized_aggregation(DecompressChunkState *chunk_state)
 {
+	DecompressContext *dcontext = &chunk_state->decompress_context;
+
 	Assert(list_length(chunk_state->custom_scan_tlist) == 1);
 
 	/* Checked by planner */
@@ -750,7 +754,7 @@ perform_vectorized_aggregation(DecompressChunkState *chunk_state)
 	/* When using vectorized aggregates, only one result tuple is produced. So, if we have
 	 * already initialized a batch state, the aggregation was already performed.
 	 */
-	if (batch_array_has_active_batches(&chunk_state->batch_array))
+	if (batch_array_has_active_batches(&dcontext->batch_array))
 	{
 		ExecClearTuple(chunk_state->csstate.ss.ss_ScanTupleSlot);
 		return chunk_state->csstate.ss.ss_ScanTupleSlot;
 	}
@@ -832,7 +836,7 @@ decompress_chunk_rescan(CustomScanState *node)
 	DecompressChunkState *chunk_state = (DecompressChunkState *) node;
 
 	chunk_state->batch_queue->reset(chunk_state);
-	batch_array_clear_all(&chunk_state->batch_array);
+	batch_array_clear_all(&chunk_state->decompress_context.batch_array);
 
 	if (node->ss.ps.chgParam != NULL)
 		UpdateChangedParamSet(linitial(node->custom_ps), node->ss.ps.chgParam);
@@ -893,7 +897,9 @@ decompress_chunk_explain(CustomScanState *node, List *ancestors, ExplainState *e
 
 	if (es->analyze && (es->verbose || es->format != EXPLAIN_FORMAT_TEXT))
 	{
-		ExplainPropertyBool("Bulk Decompression", chunk_state->enable_bulk_decompression, es);
+		ExplainPropertyBool("Bulk Decompression",
+							chunk_state->decompress_context.enable_bulk_decompression,
+							es);
 	}
 
 	if (chunk_state->perform_vectorized_aggregation)
diff --git a/tsl/src/nodes/decompress_chunk/exec.h b/tsl/src/nodes/decompress_chunk/exec.h
index 20457d8d56c..1610c6b8e86 100644
--- a/tsl/src/nodes/decompress_chunk/exec.h
+++ b/tsl/src/nodes/decompress_chunk/exec.h
@@ -45,6 +45,20 @@ typedef struct DecompressChunkColumnDescription
 	bool bulk_decompression_supported;
 } DecompressChunkColumnDescription;
 
+typedef struct DecompressContext
+{
+	BatchArray batch_array;
+	Size batch_memory_context_bytes;
+	bool reverse;
+	bool enable_bulk_decompression;
+
+	/*
+	 * Scratch space for bulk decompression which might need a lot of temporary
+	 * data.
+	 */
+	MemoryContext bulk_decompression_context;
+} DecompressContext;
+
 typedef struct DecompressChunkState
 {
 	CustomScanState csstate;
@@ -56,14 +70,12 @@ typedef struct DecompressChunkState
 	int num_total_columns;
 	int num_compressed_columns;
 
+	DecompressContext decompress_context;
 	DecompressChunkColumnDescription *template_columns;
-	bool reverse;
 	int hypertable_id;
 	Oid chunk_relid;
 
-	BatchArray batch_array;
-	Size batch_memory_context_bytes;
 	const struct BatchQueueFunctions *batch_queue;
 	CustomExecMethods exec_methods;
 
@@ -74,18 +86,10 @@ typedef struct DecompressChunkState
 	SortSupportData *sortkeys; /* Sort keys for binary heap compare function */
 	TupleTableSlot *last_batch_first_tuple;
 
-	bool enable_bulk_decompression;
-
 	/* Perform calculation of the aggregate directly in the decompress chunk node and emit partials */
 	bool perform_vectorized_aggregation;
 
-	/*
-	 * Scratch space for bulk decompression which might need a lot of temporary
-	 * data.
-	 */
-	MemoryContext bulk_decompression_context;
-
 	/*
 	 * For some predicates, we have more efficient implementation that work on
 	 * the entire compressed batch in one go. They go to this list, and the rest