Improve variable naming in the Postgres scan code (#395)
This mostly makes the naming of variables clearer, so that it's harder to
get confused about the different indexes. It also changes two small
things:

1. Use `AttrNumber` to store Postgres column indexes; these are always 1-based (see the sketch after this list)
2. Simplify some of the assignments in `HeapTupleFetchNextColumnDatum`
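
As a quick illustration of point 1: Postgres attribute numbers (`AttrNumber`) are 1-based, while the column ids DuckDB hands to the scan are 0-based, and keying a map on the `AttrNumber` makes iteration follow Postgres column order. The following is a minimal standalone sketch of that mapping, not code from this commit; the type aliases and the example `duckdb_column_ids` input are stand-ins for the real Postgres/DuckDB definitions.

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <vector>

using AttrNumber = int16_t; // Postgres attribute numbers are 1-based
using idx_t = uint64_t;     // DuckDB indexes are 0-based

int main() {
	// Columns DuckDB wants to scan, in DuckDB's own scan order
	// (0-based Postgres column ids).
	std::vector<idx_t> duckdb_column_ids = {4, 0, 2};

	// Key: Postgres AttrNumber (1-based), value: DuckDB scanned index.
	// std::map keeps keys sorted, so iterating it visits the attributes in
	// Postgres column order, which is the order the heap tuple is deformed in.
	std::map<AttrNumber, idx_t> pg_column_order;
	idx_t scan_index = 0;
	for (idx_t pg_column : duckdb_column_ids) {
		pg_column_order[static_cast<AttrNumber>(pg_column + 1)] = scan_index++;
	}

	for (const auto &[attr_num, duckdb_scanned_index] : pg_column_order) {
		std::cout << "read Postgres attribute " << attr_num
		          << " into DuckDB slot " << duckdb_scanned_index << "\n";
	}
	// Prints: attribute 1 -> slot 1, attribute 3 -> slot 2, attribute 5 -> slot 0
}
```

The commit's `InitGlobalState` builds the same kind of ordered mapping with a `duckdb::map<AttrNumber, duckdb::idx_t>` before copying it into `m_columns_to_scan`.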
JelteF authored Nov 5, 2024
1 parent be2050b commit fb271ec
Showing 3 changed files with 78 additions and 77 deletions.
13 changes: 7 additions & 6 deletions include/pgduckdb/scan/postgres_scan.hpp
@@ -27,21 +27,22 @@ class PostgresScanGlobalState {
TupleDesc m_tuple_desc;
std::mutex m_lock; // Lock for one replacement scan
bool m_count_tuples_only;
/* Postgres column id to duckdb output vector idx */
std::vector<duckdb::pair<duckdb::column_t, duckdb::idx_t>> m_input_columns;
/* Postgres column id to duckdb scanned index. The scanned index is DuckDB
* its scan order of the columns. */
std::vector<duckdb::pair<AttrNumber, duckdb::idx_t>> m_columns_to_scan;
/* These are indexed by the DuckDB scan index */
std::vector<duckdb::TableFilter *> m_column_filters;
/* Duckdb output vector idx with information about postgres column id */
duckdb::vector<duckdb::pair<duckdb::idx_t, duckdb::column_t>> m_output_columns;
duckdb::vector<duckdb::pair<duckdb::idx_t, AttrNumber>> m_output_columns;
std::atomic<std::uint32_t> m_total_row_count;
duckdb::map<int, Datum> m_relation_missing_attrs;
};

class PostgresScanLocalState {
public:
PostgresScanLocalState(const PostgresScanGlobalState *psgs)
: m_output_vector_size(0), m_exhausted_scan(false) {
PostgresScanLocalState(const PostgresScanGlobalState *psgs) : m_output_vector_size(0), m_exhausted_scan(false) {
if (!psgs->m_count_tuples_only) {
const auto s = psgs->m_input_columns.size();
const auto s = psgs->m_columns_to_scan.size();
values.reserve(s);
nulls.reserve(s);
}
112 changes: 53 additions & 59 deletions src/pgduckdb_types.cpp
@@ -981,19 +981,14 @@ typedef struct HeapTupleReadState {

static Datum
HeapTupleFetchNextColumnDatum(TupleDesc tupleDesc, HeapTuple tuple, HeapTupleReadState &heap_tuple_read_state,
int att_num, bool *is_null, const duckdb::map<int, Datum> &missing_attrs) {
AttrNumber target_attr_num, bool *is_null, const duckdb::map<int, Datum> &missing_attrs) {
HeapTupleHeader tup = tuple->t_data;
bool hasnulls = HeapTupleHasNulls(tuple);
int natts = HeapTupleHeaderGetNatts(tup);
int attnum;
char *tp;
uint32 off;
bits8 *bp = tup->t_bits;
bool slow = false;
Datum value = (Datum)0;
bits8 *null_bitmap = tup->t_bits;

if (natts < att_num) {
if (auto missing_attr = missing_attrs.find(att_num - 1); missing_attr != missing_attrs.end()) {
if (natts < target_attr_num) {
if (auto missing_attr = missing_attrs.find(target_attr_num - 1); missing_attr != missing_attrs.end()) {
*is_null = false;
return missing_attr->second;
} else {
@@ -1002,65 +997,64 @@ HeapTupleFetchNextColumnDatum(TupleDesc tupleDesc, HeapTuple tuple, HeapTupleRea
}
}

attnum = heap_tuple_read_state.m_last_tuple_att;
/* Which tuple are we currently reading */
AttrNumber current_attr_num = heap_tuple_read_state.m_last_tuple_att + 1;
/* Either restore from previous fetch, or use the defaults of 0 and false */
uint32 current_tuple_offset = heap_tuple_read_state.m_page_tuple_offset;
bool slow = heap_tuple_read_state.m_slow;

if (attnum == 0) {
/* Start from the first attribute */
off = 0;
heap_tuple_read_state.m_slow = false;
} else {
/* Restore state from previous execution */
off = heap_tuple_read_state.m_page_tuple_offset;
slow = heap_tuple_read_state.m_slow;
}
/* Points to the start of the tuple data section, i.e. right after the
* tuple header */
char *tuple_data = (char *)tup + tup->t_hoff;

tp = (char *)tup + tup->t_hoff;

for (; attnum < att_num; attnum++) {
Form_pg_attribute thisatt = TupleDescAttr(tupleDesc, attnum);
Datum value = (Datum)0;
for (; current_attr_num <= target_attr_num; current_attr_num++) {
Form_pg_attribute thisatt = TupleDescAttr(tupleDesc, current_attr_num - 1);

if (hasnulls && att_isnull(attnum, bp)) {
if (hasnulls && att_isnull(current_attr_num - 1, null_bitmap)) {
value = (Datum)0;
*is_null = true;
slow = true; /* can't use attcacheoff anymore */
/*
* Can't use attcacheoff anymore. The hardcoded attribute offset
* assumes all attribute before it are present in the tuple. If
* they are NULL, they are not present.
*/
slow = true;
continue;
}

*is_null = false;

if (!slow && thisatt->attcacheoff >= 0) {
off = thisatt->attcacheoff;
current_tuple_offset = thisatt->attcacheoff;
} else if (thisatt->attlen == -1) {
if (!slow && off == att_align_nominal(off, thisatt->attalign)) {
thisatt->attcacheoff = off;
if (!slow && current_tuple_offset == att_align_nominal(current_tuple_offset, thisatt->attalign)) {
thisatt->attcacheoff = current_tuple_offset;
} else {
off = att_align_pointer(off, thisatt->attalign, -1, tp + off);
current_tuple_offset =
att_align_pointer(current_tuple_offset, thisatt->attalign, -1, tuple_data + current_tuple_offset);
slow = true;
}
} else {
off = att_align_nominal(off, thisatt->attalign);
current_tuple_offset = att_align_nominal(current_tuple_offset, thisatt->attalign);
if (!slow) {
thisatt->attcacheoff = off;
thisatt->attcacheoff = current_tuple_offset;
}
}

value = fetchatt(thisatt, tp + off);
value = fetchatt(thisatt, tuple_data + current_tuple_offset);

off = att_addlength_pointer(off, thisatt->attlen, tp + off);
current_tuple_offset =
att_addlength_pointer(current_tuple_offset, thisatt->attlen, tuple_data + current_tuple_offset);

if (thisatt->attlen <= 0) {
slow = true;
}
}

heap_tuple_read_state.m_last_tuple_att = att_num;
heap_tuple_read_state.m_page_tuple_offset = off;

if (slow) {
heap_tuple_read_state.m_slow = true;
} else {
heap_tuple_read_state.m_slow = false;
}
heap_tuple_read_state.m_last_tuple_att = target_attr_num;
heap_tuple_read_state.m_page_tuple_offset = current_tuple_offset;
heap_tuple_read_state.m_slow = slow;

return value;
}
@@ -1084,48 +1078,48 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, duckdb::shared_ptr<PostgresScanG
*/

/* Read heap tuple with all required columns. */
for (auto const &[attr_id, input_column_idx] : scan_global_state->m_input_columns) {
for (auto const &[attr_num, duckdb_scanned_index] : scan_global_state->m_columns_to_scan) {
bool is_null = false;
values[input_column_idx] =
HeapTupleFetchNextColumnDatum(scan_global_state->m_tuple_desc, tuple, heap_tuple_read_state, attr_id + 1,
values[duckdb_scanned_index] =
HeapTupleFetchNextColumnDatum(scan_global_state->m_tuple_desc, tuple, heap_tuple_read_state, attr_num,
&is_null, scan_global_state->m_relation_missing_attrs);
nulls[input_column_idx] = is_null;
auto filter = scan_global_state->m_column_filters[input_column_idx];
nulls[duckdb_scanned_index] = is_null;
auto filter = scan_global_state->m_column_filters[duckdb_scanned_index];
if (!filter) {
continue;
}

const auto valid_tuple = ApplyValueFilter(*filter, values[input_column_idx], is_null,
scan_global_state->m_tuple_desc->attrs[attr_id].atttypid);
const auto valid_tuple = ApplyValueFilter(*filter, values[duckdb_scanned_index], is_null,
scan_global_state->m_tuple_desc->attrs[attr_num - 1].atttypid);
if (!valid_tuple) {
return;
}
}

/* Write tuple columns in output vector. */
int i = 0;
for (auto const &[input_column_idx, attr_id] : scan_global_state->m_output_columns) {
auto &result = output.data[i];
if (nulls[input_column_idx]) {
int duckdb_output_index = 0;
for (auto const &[duckdb_scanned_index, attr_num] : scan_global_state->m_output_columns) {
auto &result = output.data[duckdb_output_index];
if (nulls[duckdb_scanned_index]) {
auto &array_mask = duckdb::FlatVector::Validity(result);
array_mask.SetInvalid(scan_local_state->m_output_vector_size);
} else {
auto attr = scan_global_state->m_tuple_desc->attrs[attr_id];
auto attr = scan_global_state->m_tuple_desc->attrs[attr_num - 1];
if (attr.attlen == -1) {
bool should_free = false;
values[input_column_idx] = DetoastPostgresDatum(
reinterpret_cast<varlena *>(values[input_column_idx]), &should_free);
ConvertPostgresToDuckValue(attr.atttypid, values[input_column_idx], result,
values[duckdb_scanned_index] =
DetoastPostgresDatum(reinterpret_cast<varlena *>(values[duckdb_scanned_index]), &should_free);
ConvertPostgresToDuckValue(attr.atttypid, values[duckdb_scanned_index], result,
scan_local_state->m_output_vector_size);
if (should_free) {
duckdb_free(reinterpret_cast<void *>(values[input_column_idx]));
duckdb_free(reinterpret_cast<void *>(values[duckdb_scanned_index]));
}
} else {
ConvertPostgresToDuckValue(attr.atttypid, values[input_column_idx], result,
ConvertPostgresToDuckValue(attr.atttypid, values[duckdb_scanned_index], result,
scan_local_state->m_output_vector_size);
}
}
i++;
duckdb_output_index++;
}

scan_local_state->m_output_vector_size++;
30 changes: 18 additions & 12 deletions src/scan/postgres_scan.cpp
@@ -39,26 +39,32 @@ PostgresScanGlobalState::InitGlobalState(duckdb::TableFunctionInitInput &input)
return;
}

/* We need ordered columns id for reading tuple. */
duckdb::map<duckdb::column_t, duckdb::idx_t> ordered_input_columns;
duckdb::idx_t i = 0;
for (const auto &input_column : input.column_ids) {
ordered_input_columns[input_column] = i++;
/*
* We need to read columns from the Postgres tuple in column order, but for
* outputting them we care about the DuckDB order. A map automatically
* orders them based on key, which in this case is the Postgres column
* order
*/
duckdb::map<AttrNumber, duckdb::idx_t> pg_column_order;
duckdb::idx_t scan_index = 0;
for (const auto &pg_column : input.column_ids) {
/* Postgres AttrNumbers are 1-based */
pg_column_order[pg_column + 1] = scan_index++;
}

auto table_filters = input.filters.get();
m_column_filters.resize(input.column_ids.size(), 0);

for (auto const &[attr_id, column_idx] : ordered_input_columns) {
m_input_columns.emplace_back(attr_id, column_idx);
for (auto const &[att_num, duckdb_scanned_index] : pg_column_order) {
m_columns_to_scan.emplace_back(att_num, duckdb_scanned_index);

if (!table_filters) {
continue;
}

auto column_filter_it = table_filters->filters.find(column_idx);
auto column_filter_it = table_filters->filters.find(duckdb_scanned_index);
if (column_filter_it != table_filters->filters.end()) {
m_column_filters[column_idx] = column_filter_it->second.get();
m_column_filters[duckdb_scanned_index] = column_filter_it->second.get();
}
}

@@ -70,12 +76,12 @@ PostgresScanGlobalState::InitGlobalState(duckdb::TableFunctionInitInput &input)
*/
if (input.CanRemoveFilterColumns()) {
for (const auto &projection_id : input.projection_ids) {
m_output_columns.emplace_back(projection_id, input.column_ids[projection_id]);
m_output_columns.emplace_back(projection_id, input.column_ids[projection_id] + 1);
}
} else {
duckdb::idx_t i = 0;
duckdb::idx_t output_index = 0;
for (const auto &column_id : input.column_ids) {
m_output_columns.emplace_back(i++, column_id);
m_output_columns.emplace_back(output_index++, column_id + 1);
}
}
}
