Skip to content

Commit

Permalink
Interrupt handling
Browse files Browse the repository at this point in the history
When custom scan node start we are increasing `QueryCancelHoldoffCount`
to block canceling query. In fetch page loop we are monitoring
`QueryCancelPending` signal for thread to finish execution.
`QueryCancelHoldoffCount` counter will be decreased when node exits.
  • Loading branch information
mkaruza committed May 13, 2024
1 parent cd4417a commit bf55f86
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
13 changes: 10 additions & 3 deletions src/quack_heap_seq_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ PostgresHeapSeqScan::PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanIn

void
PostgresHeapSeqScan::InitParallelScanState(duckdb::TableFunctionInitInput &input) {
(void) GetRelation();
(void)GetRelation();
m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel);

/* SELECT COUNT(*) FROM */
Expand Down Expand Up @@ -111,7 +111,9 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc
threadScanInfo.m_read_next_page = true;
} else {
block = threadScanInfo.m_block_number;
page = BufferGetPage(threadScanInfo.m_buffer);
if (block != InvalidBlockNumber) {
page = BufferGetPage(threadScanInfo.m_buffer);
}
}

while (block != InvalidBlockNumber) {
Expand Down Expand Up @@ -159,7 +161,12 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc
UnlockReleaseBuffer(threadScanInfo.m_buffer);
m_parallel_scan_state.m_lock.unlock();
threadScanInfo.m_read_next_page = true;
block = threadScanInfo.m_block_number = m_parallel_scan_state.AssignNextBlockNumber();
/* Handle cancel request */
if (QueryCancelPending) {
block = threadScanInfo.m_block_number = InvalidBlockNumber;
} else {
block = threadScanInfo.m_block_number = m_parallel_scan_state.AssignNextBlockNumber();
}
}

/* We have collected STANDARD_VECTOR_SIZE */
Expand Down
3 changes: 2 additions & 1 deletion src/quack_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ void
Quack_BeginCustomScan(CustomScanState *cscanstate, EState *estate, int eflags) {
QuackScanState *quackScanState = (QuackScanState *)cscanstate;
quackScanState->css.ss.ps.ps_ResultTupleDesc = quackScanState->css.ss.ss_ScanTupleSlot->tts_tupleDescriptor;
HOLD_CANCEL_INTERRUPTS();
}

static TupleTableSlot *
Quack_ExecCustomScan(CustomScanState *node) {
QuackScanState *quackScanState = (QuackScanState *)node;
TupleTableSlot *slot = quackScanState->css.ss.ss_ScanTupleSlot;
MemoryContext oldContext;


if (!quackScanState->is_executed) {
quackScanState->queryResult = quackScanState->preparedStatement->Execute();
Expand Down Expand Up @@ -107,6 +107,7 @@ Quack_EndCustomScan(CustomScanState *node) {
quackScanState->queryResult.reset();
delete quackScanState->preparedStatement;
delete quackScanState->duckdb;
RESUME_CANCEL_INTERRUPTS();
}

void
Expand Down

0 comments on commit bf55f86

Please sign in to comment.