Use correct output column id list for query processing
* When reading Postgres tables we need to know which columns have to be
  written to the output vector. Depending on filtering and the query, this
  information comes from either the `projection_ids` or the `column_ids`
  list. The projection ids list is used when columns are fetched from the
  table but are not needed in further query processing; otherwise the
  column ids list is used.
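
As an illustration (a hypothetical table t(a, b, c) with attribute numbers
0..2; the exact list ordering is assumed): for `SELECT b FROM t WHERE a = 2`
DuckDB can drop the filter column after the scan, so the scan receives

    column_ids     = [0, 1]   -- a is read only for the filter, b for output
    projection_ids = [1]      -- only column_ids[1] (column b) is written out

whereas for `SELECT a, b FROM t WHERE b = 't1'` every fetched column is also
an output column and `column_ids` is used directly.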
mkaruza committed Sep 20, 2024
1 parent 49f9c50 commit 7303512
Showing 7 changed files with 101 additions and 24 deletions.
4 changes: 2 additions & 2 deletions include/pgduckdb/scan/postgres_scan.hpp
@@ -23,8 +23,8 @@ class PostgresScanGlobalState {
TupleDesc m_tuple_desc;
std::mutex m_lock; // Lock for one replacement scan
bool m_count_tuples_only;
duckdb::map<duckdb::idx_t, duckdb::idx_t> m_columns;
duckdb::map<duckdb::idx_t, duckdb::idx_t> m_projections;
duckdb::map<duckdb::idx_t, duckdb::column_t> m_read_columns_ids;
duckdb::map<duckdb::idx_t, duckdb::column_t> m_output_columns_ids;
duckdb::TableFilterSet *m_filters = nullptr;
std::atomic<std::uint32_t> m_total_row_count;
};
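
For the hypothetical `SELECT b FROM t WHERE a = 2` from the commit message
(column_ids = [0, 1], projection_ids = [1], all assumed), the two maps would
end up holding:

    m_read_columns_ids   = {0 -> 0, 1 -> 1}  // table column id -> slot in the fetched values[] array
    m_output_columns_ids = {0 -> 1}          // output vector position -> table column id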
36 changes: 20 additions & 16 deletions src/pgduckdb_types.cpp
@@ -982,12 +982,18 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, duckdb::shared_ptr<PostgresScanG
}

/* FIXME: all calls to duckdb_malloc/duckdb_free should be changed in future */
Datum *values = (Datum *)duckdb_malloc(sizeof(Datum) * scan_global_state->m_columns.size());
bool *nulls = (bool *)duckdb_malloc(sizeof(bool) * scan_global_state->m_columns.size());
Datum *values = (Datum *)duckdb_malloc(sizeof(Datum) * scan_global_state->m_read_columns_ids.size());
bool *nulls = (bool *)duckdb_malloc(sizeof(bool) * scan_global_state->m_read_columns_ids.size());

bool valid_tuple = true;

for (auto const &[columnIdx, valueIdx] : scan_global_state->m_columns) {
/* First we fetch all required columns, ordered by column id, and then
 * write the tuple into the output vector. The output column id list may
 * be out of order, so each output column has to be matched back to the
 * slot it was fetched into.
 */

/* Read heap tuple with all required columns. */
for (auto const &[columnIdx, valueIdx] : scan_global_state->m_read_columns_ids) {
values[valueIdx] = HeapTupleFetchNextColumnDatum(scan_global_state->m_tuple_desc, tuple, heap_tuple_read_state,
columnIdx + 1, &nulls[valueIdx]);
if (scan_global_state->m_filters &&
@@ -1002,36 +1008,34 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, duckdb::shared_ptr<PostgresScanG
}
}

for (idx_t idx = 0; valid_tuple && idx < scan_global_state->m_projections.size(); idx++) {
/* Write the tuple columns into the output vector. */
for (idx_t idx = 0; valid_tuple && idx < scan_global_state->m_output_columns_ids.size(); idx++) {
auto &result = output.data[idx];
if (nulls[idx]) {
auto &array_mask = duckdb::FlatVector::Validity(result);
array_mask.SetInvalid(scan_local_state->m_output_vector_size);
} else {
idx_t projectionColumnIdx = scan_global_state->m_columns[scan_global_state->m_projections[idx]];
if (scan_global_state->m_tuple_desc->attrs[scan_global_state->m_projections[idx]].attlen == -1) {
idx_t output_column_idx =
scan_global_state->m_read_columns_ids[scan_global_state->m_output_columns_ids[idx]];
if (scan_global_state->m_tuple_desc->attrs[scan_global_state->m_output_columns_ids[idx]].attlen == -1) {
bool should_free = false;
values[projectionColumnIdx] =
DetoastPostgresDatum(reinterpret_cast<varlena *>(values[projectionColumnIdx]), &should_free);
ConvertPostgresToDuckValue(values[projectionColumnIdx], result, scan_local_state->m_output_vector_size);
values[output_column_idx] =
DetoastPostgresDatum(reinterpret_cast<varlena *>(values[output_column_idx]), &should_free);
ConvertPostgresToDuckValue(values[output_column_idx], result, scan_local_state->m_output_vector_size);
if (should_free) {
duckdb_free(reinterpret_cast<void *>(values[projectionColumnIdx]));
duckdb_free(reinterpret_cast<void *>(values[output_column_idx]));
}
} else {
ConvertPostgresToDuckValue(values[projectionColumnIdx], result, scan_local_state->m_output_vector_size);
ConvertPostgresToDuckValue(values[output_column_idx], result, scan_local_state->m_output_vector_size);
}
}
}

if (valid_tuple) {
scan_local_state->m_output_vector_size++;
scan_global_state->m_total_row_count++;
}

output.SetCardinality(scan_local_state->m_output_vector_size);
output.Verify();

scan_global_state->m_total_row_count++;

duckdb_free(values);
duckdb_free(nulls);
}
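A standalone sketch of the two-map indirection used in the write loop above,
mirroring `m_read_columns_ids[m_output_columns_ids[idx]]`; std::map stands in
for duckdb::map, and the table, query, and list contents (t(a, b, c),
`SELECT c, a FROM t WHERE b = 't1'`) are hypothetical:

    #include <cstddef>
    #include <cstdio>
    #include <map>
    #include <vector>

    int main() {
        // column_ids as the scan might receive them: a (0), b (1), c (2).
        std::vector<std::size_t> column_ids = {0, 1, 2};
        // Only c and a survive the scan: positions 2 and 0 of column_ids.
        std::vector<std::size_t> projection_ids = {2, 0};

        std::map<std::size_t, std::size_t> read_columns_ids;   // table column id -> values[] slot
        std::map<std::size_t, std::size_t> output_columns_ids; // output position -> table column id
        for (std::size_t i = 0; i < column_ids.size(); i++) {
            read_columns_ids[column_ids[i]] = i;
        }
        for (std::size_t i = 0; i < projection_ids.size(); i++) {
            output_columns_ids[i] = column_ids[projection_ids[i]];
        }

        // Resolve each output position to the slot holding the fetched datum.
        for (std::size_t idx = 0; idx < output_columns_ids.size(); idx++) {
            std::size_t slot = read_columns_ids[output_columns_ids[idx]];
            std::printf("output[%zu] <- values[%zu] (table column %zu)\n",
                        idx, slot, output_columns_ids[idx]);
        }
        return 0;
    }

Running it prints `output[0] <- values[2] (table column 2)` and
`output[1] <- values[0] (table column 0)`: the output order (c, a) does not
match the order the columns were fetched in, which is exactly why the extra
lookup is needed.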
2 changes: 2 additions & 0 deletions src/scan/heap_reader.cpp
@@ -139,6 +139,7 @@ HeapReader::ReadPageTuples(duckdb::DataChunk &output) {
/* We have collected STANDARD_VECTOR_SIZE */
if (m_local_state->m_output_vector_size == STANDARD_VECTOR_SIZE) {
output.SetCardinality(m_local_state->m_output_vector_size);
output.Verify();
m_local_state->m_output_vector_size = 0;
return true;
}
@@ -147,6 +148,7 @@
/* Next assigned block number is InvalidBlockNumber so we check whether we wrote any tuples into the output vector */
if (m_local_state->m_output_vector_size) {
output.SetCardinality(m_local_state->m_output_vector_size);
output.Verify();
m_local_state->m_output_vector_size = 0;
}

1 change: 1 addition & 0 deletions src/scan/postgres_index_scan.cpp
@@ -196,6 +196,7 @@ PostgresIndexScanFunction::PostgresIndexScanFunc(duckdb::ClientContext &context,
}

output.SetCardinality(local_state.m_local_state->m_output_vector_size);
output.Verify();
}

duckdb::unique_ptr<duckdb::NodeStatistics>
16 changes: 11 additions & 5 deletions src/scan/postgres_scan.cpp
@@ -37,18 +37,24 @@ PostgresScanGlobalState::InitGlobalState(duckdb::TableFunctionInitInput &input)
return;
}

/* We need ordered columns ids for tuple fetch */
/* We need ordered column ids for reading the tuple. */
for (duckdb::idx_t i = 0; i < input.column_ids.size(); i++) {
m_columns[input.column_ids[i]] = i;
m_read_columns_ids[input.column_ids[i]] = i;
}

/* Decide whether the projection_ids or the column_ids list is used for
 * writing to the output vector. The projection ids list is used when
 * columns fetched for query filtering are not needed afterwards; otherwise
 * the column ids list is used and all read tuple columns are passed
 * to the upper layers of query execution.
 */
if (input.CanRemoveFilterColumns()) {
for (duckdb::idx_t i = 0; i < input.projection_ids.size(); i++) {
m_projections[i] = input.column_ids[input.projection_ids[i]];
m_output_columns_ids[i] = input.column_ids[input.projection_ids[i]];
}
} else {
for (duckdb::idx_t i = 0; i < input.projection_ids.size(); i++) {
m_projections[i] = input.column_ids[i];
for (duckdb::idx_t i = 0; i < input.column_ids.size(); i++) {
m_output_columns_ids[i] = input.column_ids[i];
}
}

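To make the two branches concrete, on the hypothetical t(a, b, c) from the
sketch above (list ordering assumed):

    -- SELECT c, a FROM t WHERE b = 't1': filter column b is dropped after the scan
    column_ids = [0, 1, 2], projection_ids = [2, 0]
      -> projection_ids branch: m_output_columns_ids = {0 -> 2, 1 -> 0}

    -- SELECT a, b FROM t WHERE b = 't1': every fetched column is also an output column
    column_ids = [0, 1], projection_ids = []
      -> else branch copies column_ids verbatim: m_output_columns_ids = {0 -> 0, 1 -> 1}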
48 changes: 48 additions & 0 deletions test/regression/expected/query_filter.out
@@ -28,3 +28,51 @@ SELECT COUNT(*) FROM query_filter_float WHERE a < 1.1;
(1 row)

DROP TABLE query_filter_float;
CREATE TABLE query_filter_varchar(a VARCHAR);
INSERT INTO query_filter_varchar VALUES ('t1'), ('t2'), ('t1');
SELECT COUNT(*) FROM query_filter_varchar WHERE a = 't1';
count
-------
2
(1 row)

SELECT COUNT(a) FROM query_filter_varchar WHERE a = 't1';
count
-------
2
(1 row)

SELECT a, COUNT(*) FROM query_filter_varchar WHERE a = 't1' GROUP BY a;
a | count
----+-------
t1 | 2
(1 row)

DROP TABLE query_filter_varchar;
CREATE TABLE query_filter_output_column(a INT, b VARCHAR, c FLOAT8);
INSERT INTO query_filter_output_column VALUES (1, 't1', 1.0), (2, 't1', 2.0), (2, 't2', 1.0);
-- Projection ids list will be used (column `a` is not needed after the scan)
SELECT b FROM query_filter_output_column WHERE a = 2;
b
----
t1
t2
(2 rows)

-- Column ids list will be used because both fetched columns are used after the scan
SELECT a, b FROM query_filter_output_column WHERE b = 't1';
a | b
---+----
1 | t1
2 | t1
(2 rows)

-- Projection ids list will be used (column `b` is not needed after the scan)
SELECT a, c FROM query_filter_output_column WHERE b = 't1';
a | c
---+---
1 | 1
2 | 2
(2 rows)

DROP TABLE query_filter_output_column;
18 changes: 17 additions & 1 deletion test/regression/sql/query_filter.sql
@@ -8,5 +8,21 @@ INSERT INTO query_filter_float VALUES (0.9), (1.0), (1.1);
SELECT COUNT(*) FROM query_filter_float WHERE a < 1.0;
SELECT COUNT(*) FROM query_filter_float WHERE a <= 1.0;
SELECT COUNT(*) FROM query_filter_float WHERE a < 1.1;

DROP TABLE query_filter_float;

CREATE TABLE query_filter_varchar(a VARCHAR);
INSERT INTO query_filter_varchar VALUES ('t1'), ('t2'), ('t1');
SELECT COUNT(*) FROM query_filter_varchar WHERE a = 't1';
SELECT COUNT(a) FROM query_filter_varchar WHERE a = 't1';
SELECT a, COUNT(*) FROM query_filter_varchar WHERE a = 't1' GROUP BY a;
DROP TABLE query_filter_varchar;

CREATE TABLE query_filter_output_column(a INT, b VARCHAR, c FLOAT8);
INSERT INTO query_filter_output_column VALUES (1, 't1', 1.0), (2, 't1', 2.0), (2, 't2', 1.0);
-- Projection ids list will be used (column `a` is not needed after the scan)
SELECT b FROM query_filter_output_column WHERE a = 2;
-- Column ids list will be used because both fetched columns are used after the scan
SELECT a, b FROM query_filter_output_column WHERE b = 't1';
-- Projection ids list will be used (column `b` is not needed after the scan)
SELECT a, c FROM query_filter_output_column WHERE b = 't1';
DROP TABLE query_filter_output_column;
