diff --git a/include/quack/quack_heap_scan.hpp b/include/quack/quack_heap_scan.hpp index 5aa4ff3..9d7ecac 100644 --- a/include/quack/quack_heap_scan.hpp +++ b/include/quack/quack_heap_scan.hpp @@ -14,7 +14,6 @@ extern "C" { // Postgres Relation - namespace quack { struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState { @@ -23,7 +22,7 @@ struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState { ~PostgresHeapScanLocalState() override; public: - PostgresHeapSeqScan & m_rel; + PostgresHeapSeqScan &m_rel; PostgresHeapSeqScanThreadInfo m_thread_seq_scan_info; bool m_exhausted_scan = false; }; @@ -31,7 +30,7 @@ struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState { // Global State struct PostgresHeapScanGlobalState : public duckdb::GlobalTableFunctionState { - explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation); + explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation, duckdb::TableFunctionInitInput &input); ~PostgresHeapScanGlobalState(); idx_t MaxThreads() const override { @@ -67,7 +66,7 @@ struct PostgresHeapScanFunction : public duckdb::TableFunction { // LocalTableFunctionState *lstate, GlobalTableFunctionState *gstate); static double PostgresProgress(ClientContext // &context, const FunctionData *bind_data_p, const GlobalTableFunctionState *gstate); static void PostgresHeapScanFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p, - duckdb::DataChunk &output); + duckdb::DataChunk &output); // static unique_ptr PostgresCardinality(ClientContext &context, const FunctionData *bind_data); // static idx_t PostgresGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p, // LocalTableFunctionState *local_state, GlobalTableFunctionState *global_state); static void diff --git a/include/quack/quack_heap_seq_scan.hpp b/include/quack/quack_heap_seq_scan.hpp index 4a30a9a..1cc88d2 100644 --- a/include/quack/quack_heap_seq_scan.hpp +++ b/include/quack/quack_heap_seq_scan.hpp @@ -41,6 +41,9 @@ class PostgresHeapSeqScan { std::mutex m_lock; BlockNumber m_nblocks; BlockNumber m_last_assigned_block_number; + duckdb::vector m_columns; + duckdb::vector m_projections; + duckdb::TableFilterSet *m_filters = nullptr; }; public: @@ -52,7 +55,8 @@ class PostgresHeapSeqScan { PostgresHeapSeqScan(PostgresHeapSeqScan &&other); public: - void InitParallelScanState(); + void InitParallelScanState(const duckdb::vector &columns, + const duckdb::vector &projections, duckdb::TableFilterSet *filters); void SetSnapshot(Snapshot snapshot) { m_snapshot = snapshot; diff --git a/include/quack/quack_types.hpp b/include/quack/quack_types.hpp index 05f3860..d4d6fe5 100644 --- a/include/quack/quack_types.hpp +++ b/include/quack/quack_types.hpp @@ -11,5 +11,7 @@ namespace quack { duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type); void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset); void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col); -void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, HeapTupleData *slot, idx_t offset); +void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, HeapTupleData *slot, idx_t offset, + duckdb::vector &columns, duckdb::vector &projections, + duckdb::TableFilterSet *filters); } // namespace quack \ No newline at end of file diff --git a/src/quack_heap_scan.cpp b/src/quack_heap_scan.cpp index 09d0606..3cb32cd 100644 --- a/src/quack_heap_scan.cpp +++ b/src/quack_heap_scan.cpp @@ -28,9 +28,11 @@ PostgresHeapScanFunctionData::~PostgresHeapScanFunctionData() { // PostgresHeapScanGlobalState // -PostgresHeapScanGlobalState::PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation) { - relation.InitParallelScanState(); +PostgresHeapScanGlobalState::PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation, + duckdb::TableFunctionInitInput &input) { elog(DEBUG3, "-- (DuckDB/PostgresHeapScanGlobalState) Running %lu threads -- ", MaxThreads()); + auto projections = input.projection_ids.empty() ? input.column_ids : input.projection_ids; + relation.InitParallelScanState(input.column_ids, projections, input.filters.get()); } PostgresHeapScanGlobalState::~PostgresHeapScanGlobalState() { @@ -58,7 +60,9 @@ PostgresHeapScanFunction::PostgresHeapScanFunction() PostgresHeapInitLocal) { named_parameters["table"] = duckdb::LogicalType::POINTER; named_parameters["snapshot"] = duckdb::LogicalType::POINTER; - // projection_pushdown = true; + projection_pushdown = true; + //filter_pushdown = true; + //filter_prune = true; } duckdb::unique_ptr @@ -94,7 +98,7 @@ duckdb::unique_ptr PostgresHeapScanFunction::PostgresHeapInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input) { auto &bind_data = input.bind_data->CastNoConst(); - return duckdb::make_uniq(bind_data.m_relation); + return duckdb::make_uniq(bind_data.m_relation, input); } duckdb::unique_ptr @@ -153,10 +157,11 @@ FindMatchingHeapRelation(List *tables, const duckdb::string &to_find) { if (!rel->rd_amhandler || (GetTableAmRoutine(rel->rd_amhandler) != GetHeapamTableAmRoutine())) { /* This doesn't have an access method handler, we cant read from this */ RelationClose(rel); - return nullptr; + continue; + } else { + RelationClose(rel); + return table; } - RelationClose(rel); - return table; } RelationClose(rel); } diff --git a/src/quack_heap_seq_scan.cpp b/src/quack_heap_seq_scan.cpp index 3cf9087..f2dc1ee 100644 --- a/src/quack_heap_seq_scan.cpp +++ b/src/quack_heap_seq_scan.cpp @@ -55,8 +55,13 @@ PostgresHeapSeqScan::PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanIn } void -PostgresHeapSeqScan::InitParallelScanState() { +PostgresHeapSeqScan::InitParallelScanState(const duckdb::vector &columns, + const duckdb::vector &projections, + duckdb::TableFilterSet *filters) { m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel); + m_parallel_scan_state.m_columns = columns; + m_parallel_scan_state.m_projections = projections; + m_parallel_scan_state.m_filters = filters; } bool @@ -115,7 +120,8 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc pgstat_count_heap_getnext(m_rel); InsertTupleIntoChunk(output, threadScanInfo.m_tuple_desc, &threadScanInfo.m_tuple, - threadScanInfo.m_output_vector_size); + threadScanInfo.m_output_vector_size, m_parallel_scan_state.m_columns, + m_parallel_scan_state.m_projections, m_parallel_scan_state.m_filters); } /* No more items on current page */ diff --git a/src/quack_select.cpp b/src/quack_select.cpp index 53b2c4d..c607e6a 100644 --- a/src/quack_select.cpp +++ b/src/quack_select.cpp @@ -25,7 +25,7 @@ namespace quack { static duckdb::unique_ptr quack_open_database() { duckdb::DBConfig config; - //config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr); + // config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr); return duckdb::make_uniq(nullptr, &config); } @@ -59,7 +59,10 @@ quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t co TupleTableSlot *slot = NULL; // FIXME: try-catch ? - auto res = connection->Query(query_desc->sourceText); + + duckdb::unique_ptr res = nullptr; + + res = connection->Query(query_desc->sourceText); if (res->HasError()) { return false; } diff --git a/src/quack_types.cpp b/src/quack_types.cpp index 11c9ac3..6248992 100644 --- a/src/quack_types.cpp +++ b/src/quack_types.cpp @@ -149,15 +149,15 @@ ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) { } } -typedef struct HeapTuplePageReadState { +typedef struct HeapTupleReadState { bool m_slow = 0; - int m_nvalid = 0; - uint32 m_offset = 0; -} HeapTuplePageReadState; + int m_last_tuple_att = 0; + uint32 m_page_tuple_offset = 0; +} HeapTupleReadState; static Datum -HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePageReadState &heapTupleReadState, - int natts, bool *isNull) { +HeapTupleFetchNextColumnDatum(TupleDesc tupleDesc, HeapTuple tuple, HeapTupleReadState &heapTupleReadState, int attNum, + bool *isNull) { HeapTupleHeader tup = tuple->t_data; bool hasnulls = HeapTupleHasNulls(tuple); @@ -168,23 +168,21 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage bool slow = false; Datum value = (Datum)0; - /* We can only fetch as many attributes as the tuple has. */ - natts = Min(HeapTupleHeaderGetNatts(tuple->t_data), natts); + attnum = heapTupleReadState.m_last_tuple_att; - attnum = heapTupleReadState.m_nvalid; if (attnum == 0) { /* Start from the first attribute */ off = 0; heapTupleReadState.m_slow = false; } else { /* Restore state from previous execution */ - off = heapTupleReadState.m_offset; + off = heapTupleReadState.m_page_tuple_offset; slow = heapTupleReadState.m_slow; } tp = (char *)tup + tup->t_hoff; - for (; attnum < natts; attnum++) { + for (; attnum < attNum; attnum++) { Form_pg_attribute thisatt = TupleDescAttr(tupleDesc, attnum); if (hasnulls && att_isnull(attnum, bp)) { @@ -199,7 +197,6 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage if (!slow && thisatt->attcacheoff >= 0) { off = thisatt->attcacheoff; } else if (thisatt->attlen == -1) { - if (!slow && off == att_align_nominal(off, thisatt->attalign)) { thisatt->attcacheoff = off; } else { @@ -208,7 +205,6 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage } } else { off = att_align_nominal(off, thisatt->attalign); - if (!slow) { thisatt->attcacheoff = off; } @@ -223,8 +219,8 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage } } - heapTupleReadState.m_nvalid = attnum; - heapTupleReadState.m_offset = off; + heapTupleReadState.m_last_tuple_att = attNum; + heapTupleReadState.m_page_tuple_offset = off; if (slow) { heapTupleReadState.m_slow = true; @@ -236,19 +232,31 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage } void -InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tupleDesc, HeapTupleData *slot, idx_t offset) { - HeapTuplePageReadState heapTupleReadState = {}; - for (int i = 0; i < tupleDesc->natts; i++) { +InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tupleDesc, HeapTupleData *slot, idx_t offset, + duckdb::vector &columns, duckdb::vector &projections, + duckdb::TableFilterSet *filters) { + HeapTupleReadState heapTupleReadState = {}; + Datum *values = (Datum *)palloc0(sizeof(Datum) * columns.size()); + bool *nulls = (bool *)palloc0(sizeof(bool) * columns.size()); + bool skipTuple = false; + + for (duckdb::column_t i = 0; i < columns.size(); i++) { + values[i] = HeapTupleFetchNextColumnDatum(tupleDesc, slot, heapTupleReadState, columns[i] + 1, &nulls[i]); + } + + /* Append tuple to output vector */ + for (duckdb::column_t i = 0; !skipTuple && i < projections.size(); i++) { auto &result = output.data[i]; - bool isNull = false; - Datum value = HeapTupleFetchNextDatumValue(tupleDesc, slot, heapTupleReadState, i + 1, &isNull); - if (isNull) { + if (nulls[i]) { auto &array_mask = duckdb::FlatVector::Validity(result); array_mask.SetInvalid(offset); } else { - ConvertPostgresToDuckValue(value, result, offset); + ConvertPostgresToDuckValue(values[i], result, offset); } } + + pfree(values); + pfree(nulls); } } // namespace quack