From 610db31241f77be09e49cfce7e8cce0db9e223ea Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Thu, 11 Apr 2024 13:55:49 +0200 Subject: [PATCH] Don't copy compressed slot to compressed batch struct (#6806) There is overhead associated with copying the heap tuple and (un)pinning the respective heap buffers, which becomes apparent in vectorized aggregation. Instead of this, it is enough to copy the by-reference segmentby values to the per-batch context. Also we have to copy in the rare case where the compressed data is inlined into the compressed row and not toasted. --- tsl/src/compression/compression.c | 8 +- .../nodes/decompress_chunk/compressed_batch.c | 136 ++++++++++-------- .../nodes/decompress_chunk/compressed_batch.h | 8 +- .../decompress_chunk/decompress_context.h | 18 +-- tsl/src/nodes/decompress_chunk/detoaster.c | 20 ++- tsl/src/nodes/decompress_chunk/detoaster.h | 4 +- tsl/src/nodes/decompress_chunk/exec.c | 11 +- 7 files changed, 102 insertions(+), 103 deletions(-) diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index bf6bb65cd79..08214bbf60d 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -1652,10 +1652,10 @@ decompress_batch(RowDecompressor *decompressor) /* Normal compressed column. */ Datum compressed_datum = PointerGetDatum( - detoaster_detoast_attr((struct varlena *) DatumGetPointer( - decompressor->compressed_datums[input_column]), - &decompressor->detoaster, - CurrentMemoryContext)); + detoaster_detoast_attr_copy((struct varlena *) DatumGetPointer( + decompressor->compressed_datums[input_column]), + &decompressor->detoaster, + CurrentMemoryContext)); CompressedDataHeader *header = get_compressed_data_header(compressed_datum); column_info->iterator = definitions[header->compression_algorithm] diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.c b/tsl/src/nodes/decompress_chunk/compressed_batch.c index af5ae3d5127..ad38433534e 100644 --- a/tsl/src/nodes/decompress_chunk/compressed_batch.c +++ b/tsl/src/nodes/decompress_chunk/compressed_batch.c @@ -156,7 +156,8 @@ get_max_text_datum_size(ArrowArray *text_array) } static void -decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state, int i) +decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state, + TupleTableSlot *compressed_slot, int i) { CompressionColumnDescription *column_description = &dcontext->template_columns[i]; CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; @@ -168,9 +169,7 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state Assert(value_bytes != 0); bool isnull; - Datum value = slot_getattr(batch_state->compressed_slot, - column_description->compressed_scan_attno, - &isnull); + Datum value = slot_getattr(compressed_slot, column_description->compressed_scan_attno, &isnull); if (isnull) { @@ -188,9 +187,9 @@ decompress_column(DecompressContext *dcontext, DecompressBatchState *batch_state } /* Detoast the compressed datum. */ - value = PointerGetDatum(detoaster_detoast_attr((struct varlena *) DatumGetPointer(value), - &dcontext->detoaster, - batch_state->per_batch_context)); + value = PointerGetDatum(detoaster_detoast_attr_copy((struct varlena *) DatumGetPointer(value), + &dcontext->detoaster, + batch_state->per_batch_context)); /* Decompress the entire batch if it is supported. */ CompressedDataHeader *header = (CompressedDataHeader *) value; @@ -330,8 +329,8 @@ translate_bitmap_from_dictionary(const ArrowArray *arrow, const uint64 *dict_res } static void -compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, Node *qual, - uint64 *restrict result) +compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, + TupleTableSlot *compressed_slot, Node *qual, uint64 *restrict result) { /* * Some predicates can be evaluated to a Const at run time. @@ -423,7 +422,7 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat * skip decompressing some columns if the entire batch doesn't pass * the quals. */ - decompress_column(dcontext, batch_state, column_index); + decompress_column(dcontext, batch_state, compressed_slot, column_index); Assert(column_values->decompression_type != DT_Invalid); } @@ -566,16 +565,16 @@ compute_plain_qual(DecompressContext *dcontext, DecompressBatchState *batch_stat } static void compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, - Node *qual, uint64 *restrict result); + TupleTableSlot *compressed_slot, Node *qual, uint64 *restrict result); static void compute_qual_conjunction(DecompressContext *dcontext, DecompressBatchState *batch_state, - List *quals, uint64 *restrict result) + TupleTableSlot *compressed_slot, List *quals, uint64 *restrict result) { ListCell *lc; foreach (lc, quals) { - compute_one_qual(dcontext, batch_state, lfirst(lc), result); + compute_one_qual(dcontext, batch_state, compressed_slot, lfirst(lc), result); if (get_vector_qual_summary(result, batch_state->total_batch_rows) == NoRowsPass) { /* @@ -589,7 +588,7 @@ compute_qual_conjunction(DecompressContext *dcontext, DecompressBatchState *batc static void compute_qual_disjunction(DecompressContext *dcontext, DecompressBatchState *batch_state, - List *quals, uint64 *restrict result) + TupleTableSlot *compressed_slot, List *quals, uint64 *restrict result) { const size_t n_rows = batch_state->total_batch_rows; const size_t n_result_words = (n_rows + 63) / 64; @@ -608,7 +607,7 @@ compute_qual_disjunction(DecompressContext *dcontext, DecompressBatchState *batc { one_qual_result[i] = (uint64) -1; } - compute_one_qual(dcontext, batch_state, lfirst(lc), one_qual_result); + compute_one_qual(dcontext, batch_state, compressed_slot, lfirst(lc), one_qual_result); for (size_t i = 0; i < n_result_words; i++) { or_result[i] |= one_qual_result[i]; @@ -631,19 +630,19 @@ compute_qual_disjunction(DecompressContext *dcontext, DecompressBatchState *batc } static void -compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, Node *qual, - uint64 *restrict result) +compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, + TupleTableSlot *compressed_slot, Node *qual, uint64 *restrict result) { if (!IsA(qual, BoolExpr)) { - compute_plain_qual(dcontext, batch_state, qual, result); + compute_plain_qual(dcontext, batch_state, compressed_slot, qual, result); return; } BoolExpr *boolexpr = castNode(BoolExpr, qual); if (boolexpr->boolop == AND_EXPR) { - compute_qual_conjunction(dcontext, batch_state, boolexpr->args, result); + compute_qual_conjunction(dcontext, batch_state, compressed_slot, boolexpr->args, result); return; } @@ -652,7 +651,7 @@ compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, * NOT and consider it non-vectorizable at planning time. So only OR is left. */ Ensure(boolexpr->boolop == OR_EXPR, "expected OR"); - compute_qual_disjunction(dcontext, batch_state, boolexpr->args, result); + compute_qual_disjunction(dcontext, batch_state, compressed_slot, boolexpr->args, result); } /* @@ -661,7 +660,8 @@ compute_one_qual(DecompressContext *dcontext, DecompressBatchState *batch_state, * optimizations. */ static VectorQualSummary -compute_vector_quals(DecompressContext *dcontext, DecompressBatchState *batch_state) +compute_vector_quals(DecompressContext *dcontext, DecompressBatchState *batch_state, + TupleTableSlot *compressed_slot) { /* * Allocate the bitmap that will hold the vectorized qual results. We will @@ -688,6 +688,7 @@ compute_vector_quals(DecompressContext *dcontext, DecompressBatchState *batch_st */ compute_qual_conjunction(dcontext, batch_state, + compressed_slot, dcontext->vectorized_quals_constified, batch_state->vector_qual_result); @@ -709,7 +710,6 @@ compressed_batch_discard_tuples(DecompressBatchState *batch_state) if (batch_state->per_batch_context != NULL) { - ExecClearTuple(batch_state->compressed_slot); ExecClearTuple(&batch_state->decompressed_scan_slot_data.base); MemoryContextReset(batch_state->per_batch_context); } @@ -720,7 +720,6 @@ compressed_batch_discard_tuples(DecompressBatchState *batch_state) */ Assert(IsA(&batch_state->decompressed_scan_slot_data, Invalid)); Assert(batch_state->decompressed_scan_slot_data.base.tts_ops == NULL); - Assert(batch_state->compressed_slot == NULL); } } @@ -730,24 +729,12 @@ compressed_batch_discard_tuples(DecompressBatchState *batch_state) * relatively expensive. */ static void -compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *batch_state, - TupleTableSlot *compressed_slot) +compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *batch_state) { /* Init memory context */ batch_state->per_batch_context = create_per_batch_mctx(dcontext); Assert(batch_state->per_batch_context != NULL); - Assert(batch_state->compressed_slot == NULL); - - /* Create a non ref-counted copy of the compressed tuple descriptor */ - if (dcontext->compressed_slot_tdesc == NULL) - dcontext->compressed_slot_tdesc = - CreateTupleDescCopyConstr(compressed_slot->tts_tupleDescriptor); - Assert(dcontext->compressed_slot_tdesc->tdrefcount == -1); - - batch_state->compressed_slot = - MakeSingleTupleTableSlot(dcontext->compressed_slot_tdesc, compressed_slot->tts_ops); - /* Get a reference to the output TupleTableSlot */ TupleTableSlot *decompressed_slot = dcontext->decompressed_slot; @@ -771,11 +758,19 @@ compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *ba slot->tts_mcxt = CurrentMemoryContext; slot->tts_nvalid = 0; - slot->tts_values = palloc(MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(Datum)) + - MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(bool))); + slot->tts_values = palloc0(MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(Datum)) + + MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(bool))); slot->tts_isnull = (bool *) ((char *) slot->tts_values) + MAXALIGN(slot->tts_tupleDescriptor->natts * sizeof(Datum)); + /* + * Have to initially set nulls to true, because this is the uncompressed chunk + * tuple, and some of its columns might be not even decompressed. The tuple + * slot functions will get confused by them, because they expect a non-null + * value for attributes not marked as null. + */ + memset(slot->tts_isnull, true, slot->tts_tupleDescriptor->natts * sizeof(bool)); + /* * DecompressChunk produces virtual tuple slots. */ @@ -788,7 +783,8 @@ compressed_batch_lazy_init(DecompressContext *dcontext, DecompressBatchState *ba */ void compressed_batch_set_compressed_tuple(DecompressContext *dcontext, - DecompressBatchState *batch_state, TupleTableSlot *subslot) + DecompressBatchState *batch_state, + TupleTableSlot *compressed_slot) { Assert(TupIsNull(compressed_batch_current_tuple(batch_state))); @@ -798,23 +794,10 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, */ if (batch_state->per_batch_context == NULL) { - compressed_batch_lazy_init(dcontext, batch_state, subslot); - } - else - { - Assert(batch_state->compressed_slot != NULL); + compressed_batch_lazy_init(dcontext, batch_state); } - - /* Ensure that all fields are empty. Calling ExecClearTuple is not enough - * because some attributes might not be populated (e.g., due to a dropped - * column) and these attributes need to be set to null. */ TupleTableSlot *decompressed_tuple = compressed_batch_current_tuple(batch_state); Assert(decompressed_tuple != NULL); - ExecStoreAllNullTuple(decompressed_tuple); - ExecClearTuple(decompressed_tuple); - - ExecCopySlot(batch_state->compressed_slot, subslot); - Assert(!TupIsNull(batch_state->compressed_slot)); batch_state->total_batch_rows = 0; batch_state->next_batch_row = 0; @@ -849,15 +832,43 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, */ AttrNumber attr = AttrNumberGetAttrOffset(column_description->output_attno); decompressed_tuple->tts_values[attr] = - slot_getattr(batch_state->compressed_slot, + slot_getattr(compressed_slot, column_description->compressed_scan_attno, &decompressed_tuple->tts_isnull[attr]); + + /* + * Note that if it's not a by-value type, we should copy it into + * the slot context. + */ + if (!column_description->by_value && + DatumGetPointer(decompressed_tuple->tts_values[attr]) != NULL) + { + if (column_description->value_bytes < 0) + { + /* This is a varlena type. */ + decompressed_tuple->tts_values[attr] = PointerGetDatum( + detoaster_detoast_attr_copy((struct varlena *) + decompressed_tuple->tts_values[attr], + &dcontext->detoaster, + batch_state->per_batch_context)); + } + else + { + /* This is a fixed-length by-reference type. */ + void *tmp = MemoryContextAlloc(batch_state->per_batch_context, + column_description->value_bytes); + memcpy(tmp, + DatumGetPointer(decompressed_tuple->tts_values[attr]), + column_description->value_bytes); + decompressed_tuple->tts_values[attr] = PointerGetDatum(tmp); + } + } break; } case COUNT_COLUMN: { bool isnull; - Datum value = slot_getattr(batch_state->compressed_slot, + Datum value = slot_getattr(compressed_slot, column_description->compressed_scan_attno, &isnull); /* count column should never be NULL */ @@ -885,9 +896,10 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, } } - VectorQualSummary vector_qual_summary = dcontext->vectorized_quals_constified != NIL ? - compute_vector_quals(dcontext, batch_state) : - AllRowsPass; + VectorQualSummary vector_qual_summary = + dcontext->vectorized_quals_constified != NIL ? + compute_vector_quals(dcontext, batch_state, compressed_slot) : + AllRowsPass; if (vector_qual_summary == NoRowsPass && !dcontext->batch_sorted_merge) { /* @@ -917,7 +929,7 @@ compressed_batch_set_compressed_tuple(DecompressContext *dcontext, CompressedColumnValues *column_values = &batch_state->compressed_columns[i]; if (column_values->decompression_type == DT_Invalid) { - decompress_column(dcontext, batch_state, i); + decompress_column(dcontext, batch_state, compressed_slot, i); Assert(column_values->decompression_type != DT_Invalid); } } @@ -1225,16 +1237,14 @@ compressed_batch_destroy(DecompressBatchState *batch_state) batch_state->per_batch_context = NULL; } - if (batch_state->compressed_slot != NULL) + if (batch_state->decompressed_scan_slot_data.base.tts_values != NULL) { /* * Can be separately NULL in the current simplified prototype for * vectorized aggregation, but ideally it should change together with * per-batch context. */ - ExecDropSingleTupleTableSlot(batch_state->compressed_slot); - batch_state->compressed_slot = NULL; - pfree(batch_state->decompressed_scan_slot_data.base.tts_values); + batch_state->decompressed_scan_slot_data.base.tts_values = NULL; } } diff --git a/tsl/src/nodes/decompress_chunk/compressed_batch.h b/tsl/src/nodes/decompress_chunk/compressed_batch.h index bbde12a7119..486f3e9c637 100644 --- a/tsl/src/nodes/decompress_chunk/compressed_batch.h +++ b/tsl/src/nodes/decompress_chunk/compressed_batch.h @@ -82,12 +82,6 @@ typedef struct DecompressBatchState */ VirtualTupleTableSlot decompressed_scan_slot_data; - /* - * Compressed target slot. We have to keep a local copy when doing batch - * sorted merge, because the segmentby column values might reference the - * original tuple, and a batch outlives its source tuple. - */ - TupleTableSlot *compressed_slot; uint16 total_batch_rows; uint16 next_batch_row; MemoryContext per_batch_context; @@ -104,7 +98,7 @@ typedef struct DecompressBatchState extern void compressed_batch_set_compressed_tuple(DecompressContext *dcontext, DecompressBatchState *batch_state, - TupleTableSlot *subslot); + TupleTableSlot *compressed_slot); extern void compressed_batch_advance(DecompressContext *dcontext, DecompressBatchState *batch_state); diff --git a/tsl/src/nodes/decompress_chunk/decompress_context.h b/tsl/src/nodes/decompress_chunk/decompress_context.h index 7670163b0e2..0d084ace1ff 100644 --- a/tsl/src/nodes/decompress_chunk/decompress_context.h +++ b/tsl/src/nodes/decompress_chunk/decompress_context.h @@ -27,7 +27,8 @@ typedef struct CompressionColumnDescription { CompressionColumnType type; Oid typid; - int value_bytes; + int16 value_bytes; + bool by_value; /* * Attno of the decompressed column in the output of DecompressChunk node. @@ -63,21 +64,6 @@ typedef struct DecompressContext TupleTableSlot *decompressed_slot; - /* - * Make non-refcounted copies of the tupdesc for reuse across all batch states - * and avoid spending CPU in ResourceOwner when creating a big number of table - * slots. This happens because each new slot pins its tuple descriptor using - * PinTupleDesc, and for reference-counting tuples this involves adding a new - * reference to ResourceOwner, which is not very efficient for a large number of - * references. - * - * We don't have to do this for the decompressed slot tuple descriptor, - * because there we use custom tuple slot (de)initialization functions, which - * don't use reference counting and just use a raw pointer to the tuple - * descriptor. - */ - TupleDesc compressed_slot_tdesc; - PlanState *ps; /* Set for filtering and instrumentation */ Detoaster detoaster; diff --git a/tsl/src/nodes/decompress_chunk/detoaster.c b/tsl/src/nodes/decompress_chunk/detoaster.c index b04d3369937..981998ce362 100644 --- a/tsl/src/nodes/decompress_chunk/detoaster.c +++ b/tsl/src/nodes/decompress_chunk/detoaster.c @@ -217,7 +217,7 @@ ts_fetch_toast(Detoaster *detoaster, struct varatt_external *toast_pointer, stru /* * The memory context is used to store intermediate data, and is supposed to - * live over the calls to detoaster_detoast_attr(). + * live over the calls to detoaster_detoast_attr_copy(). * That function itself can be called in a short-lived memory context. */ void @@ -338,15 +338,25 @@ ts_toast_decompress_datum(struct varlena *attr) /* * Modification of Postgres' detoast_attr() where we use the stateful Detoaster - * and skip some cases that don't occur for the toasted compressed data. + * and skip some cases that don't occur for the toasted compressed data. Even if + * the data is inline and no detoasting is needed, copies it into the destination + * memory context. */ struct varlena * -detoaster_detoast_attr(struct varlena *attr, Detoaster *detoaster, MemoryContext dest_mctx) +detoaster_detoast_attr_copy(struct varlena *attr, Detoaster *detoaster, MemoryContext dest_mctx) { if (!VARATT_IS_EXTENDED(attr)) { - /* Nothing to do here. */ - return attr; + /* + * This case is unlikely because the compressed data is almost always + * toasted and not inline, but we still have to copy the data into the + * destination memory context. The source compressed tuple may have + * independent unknown lifetime. + */ + Size len = VARSIZE(attr); + struct varlena *result = (struct varlena *) MemoryContextAlloc(dest_mctx, len); + memcpy(result, attr, len); + return result; } if (VARATT_IS_EXTERNAL_ONDISK(attr)) diff --git a/tsl/src/nodes/decompress_chunk/detoaster.h b/tsl/src/nodes/decompress_chunk/detoaster.h index b27ff6ffa5f..90c78d54590 100644 --- a/tsl/src/nodes/decompress_chunk/detoaster.h +++ b/tsl/src/nodes/decompress_chunk/detoaster.h @@ -27,5 +27,5 @@ typedef struct Detoaster void detoaster_init(Detoaster *detoaster, MemoryContext mctx); void detoaster_close(Detoaster *detoaster); -struct varlena *detoaster_detoast_attr(struct varlena *attr, Detoaster *detoaster, - MemoryContext dest_mctx); +struct varlena *detoaster_detoast_attr_copy(struct varlena *attr, Detoaster *detoaster, + MemoryContext dest_mctx); diff --git a/tsl/src/nodes/decompress_chunk/exec.c b/tsl/src/nodes/decompress_chunk/exec.c index 238b52f927b..e9bebc5bb06 100644 --- a/tsl/src/nodes/decompress_chunk/exec.c +++ b/tsl/src/nodes/decompress_chunk/exec.c @@ -305,7 +305,7 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags) TupleDescAttr(desc, AttrNumberGetAttrOffset(column.output_attno)); column.typid = attribute->atttypid; - column.value_bytes = get_typlen(column.typid); + get_typlenbyval(column.typid, &column.value_bytes, &column.by_value); } if (list_nth_int(chunk_state->is_segmentby_column, compressed_index)) @@ -543,11 +543,10 @@ perform_vectorized_sum_int4(DecompressChunkState *chunk_state, Aggref *aggref) /* We have at least one value */ decompressed_scan_slot->tts_isnull[0] = false; - CompressedDataHeader *header = - (CompressedDataHeader *) detoaster_detoast_attr((struct varlena *) DatumGetPointer( - value), - &dcontext->detoaster, - CurrentMemoryContext); + CompressedDataHeader *header = (CompressedDataHeader *) + detoaster_detoast_attr_copy((struct varlena *) DatumGetPointer(value), + &dcontext->detoaster, + CurrentMemoryContext); ArrowArray *arrow = NULL;