Skip to content

Commit

Permalink
Refactor option to skip compressed in Hypercore TAM scans
Browse files Browse the repository at this point in the history
Replace the scankey flag used to skip compressed data when starting a
Hypercore scan with a function that sets this option on the scan
descriptor. Internally, use the scan flags instead of scankey flags to
convey this setting.

Overloading scankey flags was not ideal since this is supposed to be
per-column settings and not overall scan settings.

Note that it is possible to set the scan flags when calling the TAM's
beginscan callback, but the table_beginscan() wrapper does not expose
flags and instead there's a separate function for each flag
settings. Hypercore could define its own beginscan function to do the
same, but this is left for the future.
  • Loading branch information
erimatnor committed Nov 21, 2024
1 parent b1b2380 commit c0353d8
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 66 deletions.
12 changes: 4 additions & 8 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1141,13 +1141,8 @@ fetch_unmatched_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tuples
TableScanDesc scan;
TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);
Snapshot snapshot = GetLatestSnapshot();
ScanKeyData scankey = {
/* Let compression TAM know it should only return tuples from the
* non-compressed relation. No actual scankey necessary */
.sk_flags = SK_NO_COMPRESSED,
};

scan = table_beginscan(uncompressed_chunk_rel, snapshot, 0, &scankey);
scan = table_beginscan(uncompressed_chunk_rel, snapshot, 0, NULL);
hypercore_scan_set_skip_compressed(scan, true);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
{
Expand Down Expand Up @@ -1216,8 +1211,9 @@ fetch_matching_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tupleso
snapshot = GetLatestSnapshot();
/* Let compression TAM know it should only return tuples from the
* non-compressed relation. */
scankey->sk_flags = SK_NO_COMPRESSED;

scan = table_beginscan(uncompressed_chunk_rel, snapshot, nsegbycols_nonnull, scankey);
hypercore_scan_set_skip_compressed(scan, true);
TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
Expand Down
16 changes: 4 additions & 12 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,8 @@ static void
RelationDeleteAllRows(Relation rel, Snapshot snap)
{
TupleTableSlot *slot = table_slot_create(rel, NULL);
ScanKeyData scankey = {
/* Let compression TAM know it should only return tuples from the
* non-compressed relation. No actual scankey necessary */
.sk_flags = SK_NO_COMPRESSED,
};
TableScanDesc scan = table_beginscan(rel, snap, 0, &scankey);
TableScanDesc scan = table_beginscan(rel, snap, 0, NULL);
hypercore_scan_set_skip_compressed(scan, true);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
{
Expand Down Expand Up @@ -580,13 +576,9 @@ compress_chunk_sort_relation(CompressionSettings *settings, Relation in_rel)
Tuplesortstate *tuplesortstate;
TableScanDesc scan;
TupleTableSlot *slot;
ScanKeyData scankey = {
/* Let compression TAM know it should only return tuples from the
* non-compressed relation. No actual scankey necessary */
.sk_flags = SK_NO_COMPRESSED,
};
tuplesortstate = compression_create_tuplesort_state(settings, in_rel);
scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, &scankey);
scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, NULL);
hypercore_scan_set_skip_compressed(scan, true);
slot = table_slot_create(in_rel, NULL);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
Expand Down
43 changes: 31 additions & 12 deletions tsl/src/hypercore/hypercore_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ typedef struct HypercoreScanDescData
int32 compressed_row_count;
HypercoreScanState hs_scan_state;
bool reset;
bool skip_compressed; /* Skip compressed data when scanning */
#if PG17_GE
/* These fields are only used for ANALYZE */
ReadStream *canalyze_read_stream;
Expand All @@ -394,6 +395,34 @@ static bool hypercore_getnextslot_noncompressed(HypercoreScanDesc scan, ScanDire
static bool hypercore_getnextslot_compressed(HypercoreScanDesc scan, ScanDirection direction,
TupleTableSlot *slot);

/*
* Skip scanning compressed data in a table scan.
*
* This function can be called on a scan descriptor to skip scanning of
* compressed data. Typically called directly after table_beginscan().
*/
void
hypercore_scan_set_skip_compressed(TableScanDesc scan, bool skip)
{
HypercoreScanDesc hscan;

if (!REL_IS_HYPERCORE(scan->rs_rd))
return;

hscan = (HypercoreScanDesc) scan;

if (skip)
{
scan->rs_flags |= SO_HYPERCORE_SKIP_COMPRESSED;
hscan->hs_scan_state = HYPERCORE_SCAN_NON_COMPRESSED;
}
else
{
scan->rs_flags &= ~SO_HYPERCORE_SKIP_COMPRESSED;
hscan->hs_scan_state = HYPERCORE_SCAN_START;
}
}

#if PG17_GE
static int
compute_targrows(Relation rel)
Expand Down Expand Up @@ -526,8 +555,7 @@ hypercore_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key
HypercoreInfo *hsinfo = RelationGetHypercoreInfo(relation);
scan->compressed_rel = table_open(hsinfo->compressed_relid, AccessShareLock);

if ((ts_guc_enable_transparent_decompression == 2) ||
(keys && keys->sk_flags & SK_NO_COMPRESSED))
if ((ts_guc_enable_transparent_decompression == 2) || (flags & SO_HYPERCORE_SKIP_COMPRESSED))
{
/*
* Don't read compressed data if transparent decompression is enabled
Expand Down Expand Up @@ -582,16 +610,7 @@ hypercore_rescan(TableScanDesc sscan, ScanKey key, bool set_params, bool allow_s
initscan(scan, key, scan->rs_base.rs_nkeys);
scan->reset = true;

/* Check if there's a change in "skip compressed" */
if (key)
{
if (key->sk_flags & SK_NO_COMPRESSED)
scan->rs_base.rs_flags = SO_HYPERCORE_SKIP_COMPRESSED;
else
scan->rs_base.rs_flags &= ~SO_HYPERCORE_SKIP_COMPRESSED;
}

if (scan->rs_base.rs_flags & SO_HYPERCORE_SKIP_COMPRESSED)
if (sscan->rs_flags & SO_HYPERCORE_SKIP_COMPRESSED)
scan->hs_scan_state = HYPERCORE_SCAN_NON_COMPRESSED;
else
scan->hs_scan_state = HYPERCORE_SCAN_START;
Expand Down
6 changes: 1 addition & 5 deletions tsl/src/hypercore/hypercore_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@

#include "hypertable.h"

/* Scan key flag (skey.h) to indicate that a table scan should only return
* tuples from the non-compressed relation. Bits 16-31 are reserved for
* individual access methods, so use bit 16. */
#define SK_NO_COMPRESSED 0x8000

typedef enum HypercoreScanOptions
{
/* Normal scan options stretch to 9th bit. Start at bit 15 out of 32 to be
Expand All @@ -33,6 +28,7 @@ extern void hypercore_alter_access_method_finish(Oid relid, bool to_other_am);
extern Datum hypercore_handler(PG_FUNCTION_ARGS);
extern void hypercore_xact_event(XactEvent event, void *arg);
extern bool hypercore_set_truncate_compressed(bool onoff);
extern void hypercore_scan_set_skip_compressed(TableScanDesc scan, bool skip);

typedef struct ColumnCompressionSettings
{
Expand Down
36 changes: 7 additions & 29 deletions tsl/test/src/test_hypercore.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ test_rescan_hypercore(Oid relid)
TupleTableSlot *slot = table_slot_create(rel, NULL);
TableScanDesc scan;
Snapshot snapshot = GetTransactionSnapshot();
ScanKeyData scankey = {
/* Let compression TAM know it should only return tuples from the
* non-compressed relation. No actual scankey necessary */
.sk_flags = SK_NO_COMPRESSED,
};
unsigned int compressed_tuple_count = 0;
unsigned int noncompressed_tuple_count = 0;
unsigned int prev_noncompressed_tuple_count = 0;
Expand All @@ -39,7 +34,8 @@ test_rescan_hypercore(Oid relid)
TestAssertTrue(TTS_IS_ARROWTUPLE(slot));

/* Scan only non-compressed data */
scan = table_beginscan(rel, snapshot, 0, &scankey);
scan = table_beginscan(rel, snapshot, 0, NULL);
hypercore_scan_set_skip_compressed(scan, true);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
{
Expand All @@ -56,26 +52,8 @@ test_rescan_hypercore(Oid relid)
compressed_tuple_count = 0;
noncompressed_tuple_count = 0;

/* Rescan only non-compressed data */
table_rescan(scan, &scankey);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
{
if (is_compressed_tid(&slot->tts_tid))
compressed_tuple_count++;
else
noncompressed_tuple_count++;
}

TestAssertTrue(compressed_tuple_count == 0);
TestAssertTrue(noncompressed_tuple_count == prev_noncompressed_tuple_count);
TestAssertTrue(compressed_tuple_count == prev_compressed_tuple_count);
prev_noncompressed_tuple_count = noncompressed_tuple_count;
prev_compressed_tuple_count = compressed_tuple_count;
compressed_tuple_count = 0;
noncompressed_tuple_count = 0;

/* Rescan only non-compressed data even though giving no new scan key */
/* Skipping compressed data should be sticky, so a rescan should also
* return only non-compressed data */
table_rescan(scan, NULL);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
Expand All @@ -94,9 +72,9 @@ test_rescan_hypercore(Oid relid)
compressed_tuple_count = 0;
noncompressed_tuple_count = 0;

/* Rescan both compressed and non-compressed data by specifying new flag */
scankey.sk_flags = 0;
table_rescan(scan, &scankey);
/* Rescan both compressed and non-compressed */
hypercore_scan_set_skip_compressed(scan, false);
table_rescan(scan, NULL);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
{
Expand Down

0 comments on commit c0353d8

Please sign in to comment.