Skip to content

Commit

Permalink
Interrupt handling
Browse files Browse the repository at this point in the history
* Declare QuackNode to HOLD_INTERRUPTS for duration of scan. Once
  interrupt is observed we exit execution after heap pages are closed.
  • Loading branch information
mkaruza committed May 8, 2024
1 parent 78ddeda commit 583f4b4
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
11 changes: 8 additions & 3 deletions src/quack_heap_seq_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,13 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc
threadScanInfo.m_read_next_page = true;
} else {
block = threadScanInfo.m_block_number;
page = BufferGetPage(threadScanInfo.m_buffer);
if (block != InvalidBlockNumber) {
page = BufferGetPage(threadScanInfo.m_buffer);
}
}

while (block != InvalidBlockNumber) {
if (threadScanInfo.m_read_next_page) {
CHECK_FOR_INTERRUPTS();
m_parallel_scan_state.m_lock.lock();
block = threadScanInfo.m_block_number;
threadScanInfo.m_buffer =
Expand Down Expand Up @@ -160,7 +161,11 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc
UnlockReleaseBuffer(threadScanInfo.m_buffer);
m_parallel_scan_state.m_lock.unlock();
threadScanInfo.m_read_next_page = true;
block = threadScanInfo.m_block_number = m_parallel_scan_state.AssignNextBlockNumber();
if (INTERRUPTS_PENDING_CONDITION()) {
block = threadScanInfo.m_block_number = InvalidBlockNumber;
} else {
block = threadScanInfo.m_block_number = m_parallel_scan_state.AssignNextBlockNumber();
}
}

/* We have collected STANDARD_VECTOR_SIZE */
Expand Down
2 changes: 2 additions & 0 deletions src/quack_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ void
Quack_BeginCustomScan(CustomScanState *cscanstate, EState *estate, int eflags) {
QuackScanState *quackScanState = (QuackScanState *)cscanstate;
quackScanState->css.ss.ps.ps_ResultTupleDesc = quackScanState->css.ss.ss_ScanTupleSlot->tts_tupleDescriptor;
HOLD_INTERRUPTS();
}

static TupleTableSlot *
Expand Down Expand Up @@ -99,6 +100,7 @@ Quack_EndCustomScan(CustomScanState *node) {
quackScanState->queryResult.reset();
delete quackScanState->preparedStatement;
delete quackScanState->duckdb;
RESUME_INTERRUPTS();
}

void
Expand Down

0 comments on commit 583f4b4

Please sign in to comment.