Break out decompress state from DecompressChunkState

This change continues the refactor of transparent decompression to
make it more modular. It introduces a new struct, DecompressContext,
that holds the state needed for execution-time decompression. The
context can be passed to related code without handing over the full
DecompressChunkState node. This is not yet fully realized, however:
DecompressChunkState is still passed across many modules, which will
be addressed in follow-on changes.
erimatnor committed Nov 30, 2023
1 parent 94884f4 commit 01a65d8
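
For orientation, a sketch of the split this commit makes. It is inferred
from the fields the diff touches (batch_array, batch_memory_context_bytes,
reverse, enable_bulk_decompression, bulk_decompression_context), not the
verbatim definitions from the repository headers; field types and the
csstate base member are assumptions:

/* Hedged sketch only: inferred from the members this diff accesses. */
typedef struct DecompressContext
{
	BatchArray batch_array;            /* moved out of DecompressChunkState */
	Size batch_memory_context_bytes;   /* per-batch memory budget */
	bool reverse;                      /* emit batch rows back to front */
	bool enable_bulk_decompression;    /* allow whole-batch decompression */
	MemoryContext bulk_decompression_context; /* created lazily on first use */
} DecompressContext;

typedef struct DecompressChunkState
{
	CustomScanState csstate;              /* assumed custom-scan base node */
	DecompressContext decompress_context; /* new: execution-time state */
	int num_compressed_columns;           /* still accessed via the node */
	/* ... planner- and executor-level fields stay behind ... */
} DecompressChunkState;
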
Showing 5 changed files with 87 additions and 65 deletions.
21 changes: 13 additions & 8 deletions tsl/src/nodes/decompress_chunk/batch_queue_fifo.h
@@ -14,28 +14,32 @@
 inline static void
 batch_queue_fifo_create(DecompressChunkState *chunk_state)
 {
-	batch_array_init(&chunk_state->batch_array,
+	DecompressContext *dcontext = &chunk_state->decompress_context;
+
+	batch_array_init(&dcontext->batch_array,
 					 1,
 					 chunk_state->num_compressed_columns,
-					 chunk_state->batch_memory_context_bytes);
+					 dcontext->batch_memory_context_bytes);
 }
 
 inline static void
 batch_queue_fifo_free(DecompressChunkState *chunk_state)
 {
-	batch_array_destroy(&chunk_state->batch_array);
+	batch_array_destroy(&chunk_state->decompress_context.batch_array);
 }
 
 inline static bool
 batch_queue_fifo_needs_next_batch(DecompressChunkState *chunk_state)
 {
-	return TupIsNull(batch_array_get_at(&chunk_state->batch_array, 0)->decompressed_scan_slot);
+	return TupIsNull(batch_array_get_at(&chunk_state->decompress_context.batch_array, 0)
+						 ->decompressed_scan_slot);
 }
 
 inline static void
 batch_queue_fifo_pop(DecompressChunkState *chunk_state)
 {
-	DecompressBatchState *batch_state = batch_array_get_at(&chunk_state->batch_array, 0);
+	DecompressBatchState *batch_state =
+		batch_array_get_at(&chunk_state->decompress_context.batch_array, 0);
 	if (TupIsNull(batch_state->decompressed_scan_slot))
 	{
 		/* Allow this function to be called on the initial empty queue. */
@@ -48,7 +52,7 @@ batch_queue_fifo_pop(DecompressChunkState *chunk_state)
 inline static void
 batch_queue_fifo_push_batch(DecompressChunkState *chunk_state, TupleTableSlot *compressed_slot)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 	DecompressBatchState *batch_state = batch_array_get_at(batch_array, 0);
 	Assert(TupIsNull(batch_array_get_at(batch_array, 0)->decompressed_scan_slot));
 	compressed_batch_set_compressed_tuple(chunk_state, batch_state, compressed_slot);
@@ -58,11 +62,12 @@ batch_queue_fifo_push_batch(DecompressChunkState *chunk_state, TupleTableSlot *compressed_slot)
 inline static void
 batch_queue_fifo_reset(DecompressChunkState *chunk_state)
 {
-	batch_array_clear_at(&chunk_state->batch_array, 0);
+	batch_array_clear_at(&chunk_state->decompress_context.batch_array, 0);
 }
 
 inline static TupleTableSlot *
 batch_queue_fifo_top_tuple(DecompressChunkState *chunk_state)
 {
-	return batch_array_get_at(&chunk_state->batch_array, 0)->decompressed_scan_slot;
+	return batch_array_get_at(&chunk_state->decompress_context.batch_array, 0)
+			   ->decompressed_scan_slot;
 }
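
Every FIFO function above still takes the full executor node and reaches
the batch array through chunk_state->decompress_context. The commit
message says follow-on changes will pass the context itself; a
hypothetical sketch of that end state (not part of this commit):

/* Hypothetical follow-up shape: the queue needs only DecompressContext,
 * so the executor node drops out of the signature entirely. */
inline static void
batch_queue_fifo_free(DecompressContext *dcontext)
{
	batch_array_destroy(&dcontext->batch_array);
}
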
18 changes: 10 additions & 8 deletions tsl/src/nodes/decompress_chunk/batch_queue_heap.c
@@ -58,7 +58,7 @@ static int32
 decompress_binaryheap_compare_heap_pos(Datum a, Datum b, void *arg)
 {
 	DecompressChunkState *chunk_state = (DecompressChunkState *) arg;
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 	int batchA = DatumGetInt32(a);
 	Assert(batchA <= batch_array->n_batch_states);
 
@@ -95,7 +95,7 @@ binaryheap_add_unordered_autoresize(binaryheap *heap, Datum d)
 void
 batch_queue_heap_pop(DecompressChunkState *chunk_state)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 
 	if (binaryheap_empty(chunk_state->merge_heap))
 	{
@@ -124,7 +124,7 @@ batch_queue_heap_pop(DecompressChunkState *chunk_state)
 bool
 batch_queue_heap_needs_next_batch(DecompressChunkState *chunk_state)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 
 	if (binaryheap_empty(chunk_state->merge_heap))
 	{
@@ -156,7 +156,7 @@ batch_queue_heap_needs_next_batch(DecompressChunkState *chunk_state)
 void
 batch_queue_heap_push_batch(DecompressChunkState *chunk_state, TupleTableSlot *compressed_slot)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 
 	Assert(!TupIsNull(compressed_slot));
 
@@ -182,7 +182,7 @@ batch_queue_heap_push_batch(DecompressChunkState *chunk_state, TupleTableSlot *compressed_slot)
 TupleTableSlot *
 batch_queue_heap_top_tuple(DecompressChunkState *chunk_state)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 
 	if (binaryheap_empty(chunk_state->merge_heap))
 	{
@@ -198,10 +198,12 @@ batch_queue_heap_top_tuple(DecompressChunkState *chunk_state)
 void
 batch_queue_heap_create(DecompressChunkState *chunk_state)
 {
-	batch_array_init(&chunk_state->batch_array,
+	DecompressContext *dcontext = &chunk_state->decompress_context;
+
+	batch_array_init(&dcontext->batch_array,
 					 INITIAL_BATCH_CAPACITY,
 					 chunk_state->num_compressed_columns,
-					 chunk_state->batch_memory_context_bytes);
+					 dcontext->batch_memory_context_bytes);
 
 	chunk_state->merge_heap = binaryheap_allocate(INITIAL_BATCH_CAPACITY,
 												  decompress_binaryheap_compare_heap_pos,
@@ -225,7 +227,7 @@ batch_queue_heap_reset(DecompressChunkState *chunk_state)
 void
 batch_queue_heap_free(DecompressChunkState *chunk_state)
 {
-	BatchArray *batch_array = &chunk_state->batch_array;
+	BatchArray *batch_array = &chunk_state->decompress_context.batch_array;
 
 	elog(DEBUG3, "Heap has capacity of %d", chunk_state->merge_heap->bh_space);
 	elog(DEBUG3, "Created batch states %d", batch_array->n_batch_states);
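
Note how batch_queue_heap_create still hands the whole node to
binaryheap_allocate as the opaque comparator argument: merge_heap and the
sort keys live on DecompressChunkState, while the batch states being
compared now live in the context. A minimal restatement of that
PostgreSQL binaryheap pattern, with a hypothetical comparator body:

/* Sketch of the binaryheap wiring used above: the void *arg given to
 * binaryheap_allocate() is handed back into every comparison, which is
 * how decompress_binaryheap_compare_heap_pos() recovers chunk_state. */
static int32
compare_batch_positions(Datum a, Datum b, void *arg)
{
	DecompressChunkState *chunk_state = (DecompressChunkState *) arg;
	/* Compare the current tuples of batches DatumGetInt32(a) and
	 * DatumGetInt32(b) using the node's sort keys; body elided. */
	(void) chunk_state;
	return 0;
}
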
29 changes: 17 additions & 12 deletions tsl/src/nodes/decompress_chunk/compressed_batch.c
@@ -81,6 +81,7 @@ make_single_value_arrow(Oid pgtype, Datum datum, bool isnull)
 static void
 decompress_column(DecompressChunkState *chunk_state, DecompressBatchState *batch_state, int i)
 {
+	DecompressContext *dcontext = &chunk_state->decompress_context;
 	DecompressChunkColumnDescription *column_description = &chunk_state->template_columns[i];
 	CompressedColumnValues *column_values = &batch_state->compressed_columns[i];
 	column_values->iterator = NULL;
@@ -115,11 +116,11 @@ decompress_column(DecompressChunkState *chunk_state, DecompressBatchState *batch_state, int i)
 	/* Decompress the entire batch if it is supported. */
 	CompressedDataHeader *header = (CompressedDataHeader *) PG_DETOAST_DATUM(value);
 	ArrowArray *arrow = NULL;
-	if (chunk_state->enable_bulk_decompression && column_description->bulk_decompression_supported)
+	if (dcontext->enable_bulk_decompression && column_description->bulk_decompression_supported)
 	{
-		if (chunk_state->bulk_decompression_context == NULL)
+		if (dcontext->bulk_decompression_context == NULL)
 		{
-			chunk_state->bulk_decompression_context = create_bulk_decompression_mctx(
+			dcontext->bulk_decompression_context = create_bulk_decompression_mctx(
 				MemoryContextGetParent(batch_state->per_batch_context));
 		}
 
@@ -128,13 +129,13 @@ decompress_column(DecompressChunkState *chunk_state, DecompressBatchState *batch_state, int i)
 		Assert(decompress_all != NULL);
 
 		MemoryContext context_before_decompression =
-			MemoryContextSwitchTo(chunk_state->bulk_decompression_context);
+			MemoryContextSwitchTo(dcontext->bulk_decompression_context);
 
 		arrow = decompress_all(PointerGetDatum(header),
 							   column_description->typid,
 							   batch_state->per_batch_context);
 
-		MemoryContextReset(chunk_state->bulk_decompression_context);
+		MemoryContextReset(dcontext->bulk_decompression_context);
 
 		MemoryContextSwitchTo(context_before_decompression);
 	}
@@ -158,8 +159,8 @@ decompress_column(DecompressChunkState *chunk_state, DecompressBatchState *batch_state, int i)
 	/* As a fallback, decompress row-by-row. */
 	column_values->iterator =
 		tsl_get_decompression_iterator_init(header->compression_algorithm,
-											chunk_state->reverse)(PointerGetDatum(header),
-																  column_description->typid);
+											dcontext->reverse)(PointerGetDatum(header),
+															   column_description->typid);
 }
 
 /*
@@ -330,6 +331,8 @@ void
 compressed_batch_set_compressed_tuple(DecompressChunkState *chunk_state,
 									  DecompressBatchState *batch_state, TupleTableSlot *subslot)
 {
+	DecompressContext *dcontext = &chunk_state->decompress_context;
+
 	Assert(TupIsNull(batch_state->decompressed_scan_slot));
 
 	/*
@@ -340,7 +343,7 @@ compressed_batch_set_compressed_tuple(DecompressChunkState *chunk_state,
 	{
 		/* Init memory context */
 		batch_state->per_batch_context =
-			create_per_batch_mctx(chunk_state->batch_memory_context_bytes);
+			create_per_batch_mctx(dcontext->batch_memory_context_bytes);
 		Assert(batch_state->per_batch_context != NULL);
 
 		Assert(batch_state->compressed_slot == NULL);
@@ -502,16 +505,16 @@ compressed_batch_set_compressed_tuple(DecompressChunkState *chunk_state,
 static void
 make_next_tuple(DecompressChunkState *chunk_state, DecompressBatchState *batch_state)
 {
+	DecompressContext *dcontext = &chunk_state->decompress_context;
 	TupleTableSlot *decompressed_scan_slot = batch_state->decompressed_scan_slot;
 	Assert(decompressed_scan_slot != NULL);
 
 	Assert(batch_state->total_batch_rows > 0);
 	Assert(batch_state->next_batch_row < batch_state->total_batch_rows);
 
 	const int output_row = batch_state->next_batch_row;
-	const size_t arrow_row = unlikely(chunk_state->reverse) ?
-								 batch_state->total_batch_rows - 1 - output_row :
-								 output_row;
+	const size_t arrow_row =
+		unlikely(dcontext->reverse) ? batch_state->total_batch_rows - 1 - output_row : output_row;
 
 	const int num_compressed_columns = chunk_state->num_compressed_columns;
 	for (int i = 0; i < num_compressed_columns; i++)
@@ -588,12 +591,14 @@ make_next_tuple(DecompressChunkState *chunk_state, DecompressBatchState *batch_state)
 static bool
 vector_qual(DecompressChunkState *chunk_state, DecompressBatchState *batch_state)
 {
+	DecompressContext *dcontext = &chunk_state->decompress_context;
+
 	Assert(batch_state->total_batch_rows > 0);
 	Assert(batch_state->next_batch_row < batch_state->total_batch_rows);
 
 	const int output_row = batch_state->next_batch_row;
 	const size_t arrow_row =
-		chunk_state->reverse ? batch_state->total_batch_rows - 1 - output_row : output_row;
+		dcontext->reverse ? batch_state->total_batch_rows - 1 - output_row : output_row;
 
 	if (!batch_state->vector_qual_result)
 	{
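
Both make_next_tuple and vector_qual now read the scan direction from the
context, but the row mapping itself is unchanged: batches are stored in
forward order, so a reverse scan reads them back to front. A standalone
restatement (hypothetical helper, not in the diff):

/* Hypothetical helper restating the arrow_row computation above. */
static inline size_t
arrow_row_for(bool reverse, uint64 total_batch_rows, int output_row)
{
	return reverse ? total_batch_rows - 1 - output_row : (size_t) output_row;
}

/* E.g., with total_batch_rows = 1000 and reverse = true, output rows
 * 0, 1, 2 map to arrow rows 999, 998, 997. */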