diff --git a/tsl/src/nodes/decompress_chunk/batch_queue_fifo.h b/tsl/src/nodes/decompress_chunk/batch_queue_fifo.h
index 9f3b245c7af..ada20c824e6 100644
--- a/tsl/src/nodes/decompress_chunk/batch_queue_fifo.h
+++ b/tsl/src/nodes/decompress_chunk/batch_queue_fifo.h
@@ -14,28 +14,32 @@
 inline static void
 batch_queue_fifo_create(DecompressChunkState *chunk_state)
 {
-	batch_array_init(&chunk_state->batch_array,
+	DecompressContext *dcontext = &chunk_state->decompress_context;
+
+	batch_array_init(&dcontext->batch_array,
 					 1,
 					 chunk_state->num_compressed_columns,
-					 chunk_state->batch_memory_context_bytes);
+					 dcontext->batch_memory_context_bytes);
 }
 
 inline static void
 batch_queue_fifo_free(DecompressChunkState *chunk_state)
 {
-	batch_array_destroy(&chunk_state->batch_array);
+	batch_array_destroy(&chunk_state->decompress_context.batch_array);
 }
 
 inline static bool
 batch_queue_fifo_needs_next_batch(DecompressChunkState *chunk_state)
 {
-	return TupIsNull(batch_array_get_at(&chunk_state->batch_array, 0)->decompressed_scan_slot);
+	return TupIsNull(batch_array_get_at(&chunk_state->decompress_context.batch_array, 0)
+						 ->decompressed_scan_slot);
 }
 
 inline static void
 batch_queue_fifo_pop(DecompressChunkState *chunk_state)
 {
-	DecompressBatchState *batch_state = batch_array_get_at(&chunk_state->batch_array, 0);
+	DecompressBatchState *batch_state =
+		batch_array_get_at(&chunk_state->decompress_context.batch_array, 0);
 	if (TupIsNull(batch_state->decompressed_scan_slot))
 	{
 		/* Allow this function to be called on the initial empty queue. */
@@ -48,7 +52,7 @@ batch_queue_fifo_pop(DecompressChunkState *chunk_state)
 inline static void
 batch_queue_fifo_push_batch(DecompressChunkState *chunk_state, TupleTableSlot *compressed_slot)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 	DecompressBatchState *batch_state = batch_array_get_at(batch_array, 0);
 	Assert(TupIsNull(batch_array_get_at(batch_array, 0)->decompressed_scan_slot));
 	compressed_batch_set_compressed_tuple(chunk_state, batch_state, compressed_slot);
@@ -58,11 +62,12 @@ batch_queue_fifo_push_batch(DecompressChunkState *chunk_state, TupleTableSlot *c
 inline static void
 batch_queue_fifo_reset(DecompressChunkState *chunk_state)
 {
-	batch_array_clear_at(&chunk_state->batch_array, 0);
+	batch_array_clear_at(&chunk_state->decompress_context.batch_array, 0);
 }
 
 inline static TupleTableSlot *
 batch_queue_fifo_top_tuple(DecompressChunkState *chunk_state)
 {
-	return batch_array_get_at(&chunk_state->batch_array, 0)->decompressed_scan_slot;
+	return batch_array_get_at(&chunk_state->decompress_context.batch_array, 0)
+		->decompressed_scan_slot;
 }
diff --git a/tsl/src/nodes/decompress_chunk/batch_queue_heap.c b/tsl/src/nodes/decompress_chunk/batch_queue_heap.c
index 011bec0a445..f2d9c6bbc63 100644
--- a/tsl/src/nodes/decompress_chunk/batch_queue_heap.c
+++ b/tsl/src/nodes/decompress_chunk/batch_queue_heap.c
@@ -58,7 +58,7 @@ static int32
 decompress_binaryheap_compare_heap_pos(Datum a, Datum b, void *arg)
 {
 	DecompressChunkState *chunk_state = (DecompressChunkState *) arg;
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 	int batchA = DatumGetInt32(a);
 	Assert(batchA <= batch_array->n_batch_states);
 
@@ -95,7 +95,7 @@ binaryheap_add_unordered_autoresize(binaryheap *heap, Datum d)
 void
 batch_queue_heap_pop(DecompressChunkState *chunk_state)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 
 	if (binaryheap_empty(chunk_state->merge_heap))
 	{
@@ -124,7 +124,7 @@ batch_queue_heap_pop(DecompressChunkState *chunk_state)
 bool
 batch_queue_heap_needs_next_batch(DecompressChunkState *chunk_state)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 
 	if (binaryheap_empty(chunk_state->merge_heap))
 	{
@@ -156,7 +156,7 @@ batch_queue_heap_needs_next_batch(DecompressChunkState *chunk_state)
 void
 batch_queue_heap_push_batch(DecompressChunkState *chunk_state, TupleTableSlot *compressed_slot)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 
 	Assert(!TupIsNull(compressed_slot));
 
@@ -182,7 +182,7 @@ batch_queue_heap_push_batch(DecompressChunkState *chunk_state, TupleTableSlot *c
 TupleTableSlot *
 batch_queue_heap_top_tuple(DecompressChunkState *chunk_state)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 
 	if (binaryheap_empty(chunk_state->merge_heap))
 	{
@@ -198,10 +198,12 @@ batch_queue_heap_top_tuple(DecompressChunkState *chunk_state)
 void
 batch_queue_heap_create(DecompressChunkState *chunk_state)
 {
-	batch_array_init(&chunk_state->batch_array,
+	DecompressContext *dcontext = &chunk_state->decompress_context;
+
+	batch_array_init(&dcontext->batch_array,
 					 INITIAL_BATCH_CAPACITY,
 					 chunk_state->num_compressed_columns,
-					 chunk_state->batch_memory_context_bytes);
+					 dcontext->batch_memory_context_bytes);
 
 	chunk_state->merge_heap = binaryheap_allocate(INITIAL_BATCH_CAPACITY,
 												  decompress_binaryheap_compare_heap_pos,
@@ -225,7 +227,7 @@ batch_queue_heap_reset(DecompressChunkState *chunk_state)
 void
 batch_queue_heap_free(DecompressChunkState *chunk_state)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 
 	elog(DEBUG3, "Heap has capacity of %d", chunk_state->merge_heap->bh_space);
 	elog(DEBUG3, "Created batch states %d", batch_array->n_batch_states);
diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.c b/tsl/src/nodes/decompress_chunk/compressed_batch.c
index 99c6868690b..5875d23f9e0 100644
--- a/tsl/src/nodes/decompress_chunk/compressed_batch.c
+++ b/tsl/src/nodes/decompress_chunk/compressed_batch.c
@@ -81,6 +81,7 @@ make_single_value_arrow(Oid pgtype, Datum datum, bool isnull)
 static void
 decompress_column(DecompressChunkState *chunk_state, DecompressBatchState *batch_state, int i)
 {
+	DecompressContext *dcontext = &chunk_state->decompress_context;
 	DecompressChunkColumnDescription *column_description = &chunk_state->template_columns[i];
 	CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
 	column_values->iterator = NULL;
@@ -115,11 +116,11 @@ decompress_column(DecompressChunkState *chunk_state, DecompressBatchState *batch
 	/* Decompress the entire batch if it is supported. */
 	CompressedDataHeader *header = (CompressedDataHeader *) PG_DETOAST_DATUM(value);
 	ArrowArray *arrow = NULL;
-	if (chunk_state->enable_bulk_decompression && column_description->bulk_decompression_supported)
+	if (dcontext->enable_bulk_decompression && column_description->bulk_decompression_supported)
 	{
-		if (chunk_state->bulk_decompression_context == NULL)
+		if (dcontext->bulk_decompression_context == NULL)
 		{
-			chunk_state->bulk_decompression_context = create_bulk_decompression_mctx(
+			dcontext->bulk_decompression_context = create_bulk_decompression_mctx(
 				MemoryContextGetParent(batch_state->per_batch_context));
 		}
 
@@ -128,13 +129,13 @@ decompress_column(DecompressChunkState *chunk_state, DecompressBatchState *batch
 		Assert(decompress_all != NULL);
 
 		MemoryContext context_before_decompression =
-			MemoryContextSwitchTo(chunk_state->bulk_decompression_context);
+			MemoryContextSwitchTo(dcontext->bulk_decompression_context);
 
 		arrow = decompress_all(PointerGetDatum(header),
 							   column_description->typid,
 							   batch_state->per_batch_context);
 
-		MemoryContextReset(chunk_state->bulk_decompression_context);
+		MemoryContextReset(dcontext->bulk_decompression_context);
 		MemoryContextSwitchTo(context_before_decompression);
 	}
 
@@ -158,8 +159,8 @@ decompress_column(DecompressChunkState *chunk_state, DecompressBatchState *batch
 	/* As a fallback, decompress row-by-row. */
 	column_values->iterator =
 		tsl_get_decompression_iterator_init(header->compression_algorithm,
-											chunk_state->reverse)(PointerGetDatum(header),
-																  column_description->typid);
+											dcontext->reverse)(PointerGetDatum(header),
+															   column_description->typid);
 }
 
 /*
@@ -330,6 +331,8 @@ void
 compressed_batch_set_compressed_tuple(DecompressChunkState *chunk_state,
									  DecompressBatchState *batch_state, TupleTableSlot *subslot)
 {
+	DecompressContext *dcontext = &chunk_state->decompress_context;
+
 	Assert(TupIsNull(batch_state->decompressed_scan_slot));
 
 	/*
@@ -340,7 +343,7 @@ compressed_batch_set_compressed_tuple(DecompressChunkState *chunk_state,
 	{
 		/* Init memory context */
 		batch_state->per_batch_context =
-			create_per_batch_mctx(chunk_state->batch_memory_context_bytes);
+			create_per_batch_mctx(dcontext->batch_memory_context_bytes);
 		Assert(batch_state->per_batch_context != NULL);
 
 		Assert(batch_state->compressed_slot == NULL);
@@ -502,6 +505,7 @@ compressed_batch_set_compressed_tuple(DecompressChunkState *chunk_state,
 static void
 make_next_tuple(DecompressChunkState *chunk_state, DecompressBatchState *batch_state)
 {
+	DecompressContext *dcontext = &chunk_state->decompress_context;
 	TupleTableSlot *decompressed_scan_slot = batch_state->decompressed_scan_slot;
 	Assert(decompressed_scan_slot != NULL);
 
@@ -509,9 +513,8 @@ make_next_tuple(DecompressChunkState *chunk_state, DecompressBatchState *batch_s
 	Assert(batch_state->next_batch_row < batch_state->total_batch_rows);
 
 	const int output_row = batch_state->next_batch_row;
-	const size_t arrow_row = unlikely(chunk_state->reverse) ?
-								 batch_state->total_batch_rows - 1 - output_row :
-								 output_row;
+	const size_t arrow_row =
+		unlikely(dcontext->reverse) ? batch_state->total_batch_rows - 1 - output_row : output_row;
 
 	const int num_compressed_columns = chunk_state->num_compressed_columns;
 	for (int i = 0; i < num_compressed_columns; i++)
@@ -588,12 +591,14 @@ make_next_tuple(DecompressChunkState *chunk_state, DecompressBatchState *batch_s
 static bool
 vector_qual(DecompressChunkState *chunk_state, DecompressBatchState *batch_state)
 {
+	DecompressContext *dcontext = &chunk_state->decompress_context;
+
 	Assert(batch_state->total_batch_rows > 0);
 	Assert(batch_state->next_batch_row < batch_state->total_batch_rows);
 
 	const int output_row = batch_state->next_batch_row;
 	const size_t arrow_row =
-		chunk_state->reverse ? batch_state->total_batch_rows - 1 - output_row : output_row;
+		dcontext->reverse ? batch_state->total_batch_rows - 1 - output_row : output_row;
 
 	if (!batch_state->vector_qual_result)
 	{
diff --git a/tsl/src/nodes/decompress_chunk/exec.c b/tsl/src/nodes/decompress_chunk/exec.c
index 70ae40e7ec5..9edd1fa54b5 100644
--- a/tsl/src/nodes/decompress_chunk/exec.c
+++ b/tsl/src/nodes/decompress_chunk/exec.c
@@ -155,9 +155,9 @@ decompress_chunk_state_create(CustomScan *cscan)
 	Assert(list_length(settings) == 6);
 	chunk_state->hypertable_id = linitial_int(settings);
 	chunk_state->chunk_relid = lsecond_int(settings);
-	chunk_state->reverse = lthird_int(settings);
+	chunk_state->decompress_context.reverse = lthird_int(settings);
 	chunk_state->batch_sorted_merge = lfourth_int(settings);
-	chunk_state->enable_bulk_decompression = lfifth_int(settings);
+	chunk_state->decompress_context.enable_bulk_decompression = lfifth_int(settings);
 	chunk_state->perform_vectorized_aggregation = lsixth_int(settings);
 
 	Assert(IsA(cscan->custom_exprs, List));
@@ -265,6 +265,7 @@ static void
 decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
 {
 	DecompressChunkState *chunk_state = (DecompressChunkState *) node;
+	DecompressContext *dcontext = &chunk_state->decompress_context;
 	CustomScan *cscan = castNode(CustomScan, node->ss.ps.plan);
 	Plan *compressed_scan = linitial(cscan->custom_plans);
 	Assert(list_length(cscan->custom_plans) == 1);
@@ -436,8 +437,8 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
	 *
	 * Start with the default size.
	 */
-	chunk_state->batch_memory_context_bytes = ALLOCSET_DEFAULT_INITSIZE;
-	if (chunk_state->enable_bulk_decompression)
+	dcontext->batch_memory_context_bytes = ALLOCSET_DEFAULT_INITSIZE;
+	if (dcontext->enable_bulk_decompression)
 	{
 		for (int i = 0; i < num_total; i++)
 		{
@@ -445,31 +446,31 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
 			if (column->bulk_decompression_supported)
 			{
 				/* Values array, with 64 element padding (actually we have less). */
-				chunk_state->batch_memory_context_bytes +=
+				dcontext->batch_memory_context_bytes +=
 					(GLOBAL_MAX_ROWS_PER_COMPRESSION + 64) * column->value_bytes;
 
 				/* Also nulls bitmap. */
-				chunk_state->batch_memory_context_bytes +=
+				dcontext->batch_memory_context_bytes +=
 					GLOBAL_MAX_ROWS_PER_COMPRESSION / (64 * sizeof(uint64));
 
 				/* Arrow data structure. */
-				chunk_state->batch_memory_context_bytes +=
+				dcontext->batch_memory_context_bytes +=
 					sizeof(ArrowArray) + sizeof(void *) * 2 /* buffers */;
 
 				/* Memory context header overhead for the above parts. */
-				chunk_state->batch_memory_context_bytes += sizeof(void *) * 3;
+				dcontext->batch_memory_context_bytes += sizeof(void *) * 3;
 			}
 		}
 	}
 	/* Round up to even number of 4k pages. */
-	chunk_state->batch_memory_context_bytes =
-		((chunk_state->batch_memory_context_bytes + 4095) / 4096) * 4096;
+	dcontext->batch_memory_context_bytes =
+		((dcontext->batch_memory_context_bytes + 4095) / 4096) * 4096;
 	/* As a precaution, limit it to 1MB. */
-	chunk_state->batch_memory_context_bytes =
-		Min(chunk_state->batch_memory_context_bytes, 1 * 1024 * 1024);
+	dcontext->batch_memory_context_bytes =
+		Min(dcontext->batch_memory_context_bytes, 1 * 1024 * 1024);
 	elog(DEBUG3,
 		 "Batch memory context has initial capacity of %zu bytes",
-		 chunk_state->batch_memory_context_bytes);
+		 dcontext->batch_memory_context_bytes);
 
 	/*
 	 * Choose which batch queue we are going to use: heap for batch sorted
@@ -545,6 +546,8 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
 static TupleTableSlot *
 perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
 {
+	DecompressContext *dcontext = &chunk_state->decompress_context;
+
 	Assert(chunk_state != NULL);
 	Assert(aggref != NULL);
 
@@ -558,24 +561,23 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
 	Assert(chunk_state->template_columns[1].type == COUNT_COLUMN);
 
 	/* Get a free batch slot */
-	const int new_batch_index = batch_array_get_unused_slot(&chunk_state->batch_array);
+	const int new_batch_index = batch_array_get_unused_slot(&dcontext->batch_array);
 
 	/* Nobody else should use batch states */
 	Assert(new_batch_index == 0);
-	DecompressBatchState *batch_state =
-		batch_array_get_at(&chunk_state->batch_array, new_batch_index);
+	DecompressBatchState *batch_state = batch_array_get_at(&dcontext->batch_array, new_batch_index);
 
 	/* Init per batch memory context */
 	Assert(batch_state != NULL);
 	Assert(batch_state->per_batch_context == NULL);
 	batch_state->per_batch_context =
-		create_per_batch_mctx(chunk_state->batch_array.batch_memory_context_bytes);
+		create_per_batch_mctx(dcontext->batch_array.batch_memory_context_bytes);
 	Assert(batch_state->per_batch_context != NULL);
 
 	/* Init bulk decompression memory context */
-	Assert(chunk_state->bulk_decompression_context == NULL);
-	chunk_state->bulk_decompression_context = create_bulk_decompression_mctx(CurrentMemoryContext);
-	Assert(chunk_state->bulk_decompression_context != NULL);
+	Assert(dcontext->bulk_decompression_context == NULL);
+	dcontext->bulk_decompression_context = create_bulk_decompression_mctx(CurrentMemoryContext);
+	Assert(dcontext->bulk_decompression_context != NULL);
 
 	/* Get a reference the the output TupleTableSlot */
 	TupleTableSlot *decompressed_scan_slot = chunk_state->csstate.ss.ss_ScanTupleSlot;
@@ -649,7 +651,7 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
 	}
 	else if (column_description->type == COMPRESSED_COLUMN)
 	{
-		Assert(chunk_state->enable_bulk_decompression);
+		Assert(dcontext->enable_bulk_decompression);
 		Assert(column_description->bulk_decompression_supported);
 		Assert(list_length(aggref->args) == 1);
 
@@ -681,7 +683,7 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
 		Assert(decompress_all != NULL);
 
 		MemoryContext context_before_decompression =
-			MemoryContextSwitchTo(chunk_state->bulk_decompression_context);
+			MemoryContextSwitchTo(dcontext->bulk_decompression_context);
 
 		arrow = decompress_all(PointerGetDatum(header),
 							   column_description->typid,
@@ -689,7 +691,7 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
 
 		Assert(arrow != NULL);
 
-		MemoryContextReset(chunk_state->bulk_decompression_context);
+		MemoryContextReset(dcontext->bulk_decompression_context);
 		MemoryContextSwitchTo(context_before_decompression);
 
 		/* A compressed batch consists of 1000 tuples (see MAX_ROWS_PER_COMPRESSION). The
@@ -741,6 +743,8 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref)
 static TupleTableSlot *
 perform_vectorized_aggregation(DecompressChunkState *chunk_state)
 {
+	DecompressContext *dcontext = &chunk_state->decompress_context;
+
 	Assert(list_length(chunk_state->custom_scan_tlist) == 1);
 
 	/* Checked by planner */
@@ -750,7 +754,7 @@ perform_vectorized_aggregation(DecompressChunkState *chunk_state)
 	/* When using vectorized aggregates, only one result tuple is produced. So, if we have
	 * already initialized a batch state, the aggregation was already performed.
	 */
-	if (batch_array_has_active_batches(&chunk_state->batch_array))
+	if (batch_array_has_active_batches(&dcontext->batch_array))
 	{
 		ExecClearTuple(chunk_state->csstate.ss.ss_ScanTupleSlot);
 		return chunk_state->csstate.ss.ss_ScanTupleSlot;
@@ -832,7 +836,7 @@ decompress_chunk_rescan(CustomScanState *node)
 	DecompressChunkState *chunk_state = (DecompressChunkState *) node;
 
 	chunk_state->batch_queue->reset(chunk_state);
-	batch_array_clear_all(&chunk_state->batch_array);
+	batch_array_clear_all(&chunk_state->decompress_context.batch_array);
 
 	if (node->ss.ps.chgParam != NULL)
 		UpdateChangedParamSet(linitial(node->custom_ps), node->ss.ps.chgParam);
@@ -893,7 +897,9 @@ decompress_chunk_explain(CustomScanState *node, List *ancestors, ExplainState *e
 
 	if (es->analyze && (es->verbose || es->format != EXPLAIN_FORMAT_TEXT))
 	{
-		ExplainPropertyBool("Bulk Decompression", chunk_state->enable_bulk_decompression, es);
+		ExplainPropertyBool("Bulk Decompression",
+							chunk_state->decompress_context.enable_bulk_decompression,
+							es);
 	}
 
 	if (chunk_state->perform_vectorized_aggregation)
diff --git a/tsl/src/nodes/decompress_chunk/exec.h b/tsl/src/nodes/decompress_chunk/exec.h
index 20457d8d56c..1610c6b8e86 100644
--- a/tsl/src/nodes/decompress_chunk/exec.h
+++ b/tsl/src/nodes/decompress_chunk/exec.h
@@ -45,6 +45,20 @@ typedef struct DecompressChunkColumnDescription
 	bool bulk_decompression_supported;
 } DecompressChunkColumnDescription;
 
+typedef struct DecompressContext
+{
+	BatchArray batch_array;
+	Size batch_memory_context_bytes;
+	bool reverse;
+	bool enable_bulk_decompression;
+
+	/*
+	 * Scratch space for bulk decompression which might need a lot of temporary
+	 * data.
+	 */
+	MemoryContext bulk_decompression_context;
+} DecompressContext;
+
 typedef struct DecompressChunkState
 {
 	CustomScanState csstate;
@@ -56,14 +70,12 @@ typedef struct DecompressChunkState
 	int num_total_columns;
 	int num_compressed_columns;
 
+	DecompressContext decompress_context;
 	DecompressChunkColumnDescription *template_columns;
 
-	bool reverse;
 	int hypertable_id;
 	Oid chunk_relid;
 
-	BatchArray batch_array;
-	Size batch_memory_context_bytes;
 
 	const struct BatchQueueFunctions *batch_queue;
 	CustomExecMethods exec_methods;
@@ -74,18 +86,10 @@ typedef struct DecompressChunkState
 	SortSupportData *sortkeys; /* Sort keys for binary heap compare function */
 	TupleTableSlot *last_batch_first_tuple;
 
-	bool enable_bulk_decompression;
-
 	/* Perform calculation of the aggregate directly in the decompress chunk node and emit partials
	 */
 	bool perform_vectorized_aggregation;
 
-	/*
-	 * Scratch space for bulk decompression which might need a lot of temporary
-	 * data.
-	 */
-	MemoryContext bulk_decompression_context;
-
 	/*
	 * For some predicates, we have more efficient implementation that work on
	 * the entire compressed batch in one go. They go to this list, and the rest