Skip to content

Commit

Permalink
fix: do not release resources when an interrupt comes (#49)
Browse files Browse the repository at this point in the history
Signed-off-by: usamoi <[email protected]>
  • Loading branch information
usamoi authored Nov 7, 2024
1 parent d6100ee commit 9161a67
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 17 deletions.
1 change: 0 additions & 1 deletion src/algorithm/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ pub fn build<T: HeapRelation, R: Reporter>(
let mut samples = Vec::new();
let mut number_of_samples = 0_u32;
heap_relation.traverse(|(_, vector)| {
pgrx::check_for_interrupts!();
assert_eq!(dims as usize, vector.len(), "invalid vector dimensions",);
let vector = rabitq::project(&vector);
if number_of_samples < max_number_of_samples {
Expand Down
30 changes: 14 additions & 16 deletions src/index/am.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ pub unsafe extern "C" fn ambuild(
) where
F: FnMut((Pointer, Vec<f32>)),
{
pgrx::check_for_interrupts!();
use base::vector::OwnedVector;
let state = unsafe { &mut *state.cast::<State<F>>() };
let vector = unsafe {
Expand Down Expand Up @@ -247,10 +246,10 @@ pub unsafe extern "C" fn ambuild(
unsafe { RabbitholeLeader::enter(heap, index, (*index_info).ii_Concurrent) }
{
unsafe {
let nparticipanttuplesorts = leader.nparticipanttuplesorts;
let nparticipants = leader.nparticipants;
loop {
pgrx::pg_sys::SpinLockAcquire(&raw mut (*leader.rabbitholeshared).mutex);
if (*leader.rabbitholeshared).nparticipantsdone == nparticipanttuplesorts {
if (*leader.rabbitholeshared).nparticipantsdone == nparticipants {
pgrx::pg_sys::SpinLockRelease(&raw mut (*leader.rabbitholeshared).mutex);
break;
}
Expand All @@ -266,7 +265,6 @@ pub unsafe extern "C" fn ambuild(
let mut tuples_done = 0;
reporter.tuples_done(tuples_done);
heap_relation.traverse(|(payload, vector)| {
pgrx::check_for_interrupts!();
algorithm::insert::insert(
index_relation.clone(),
payload,
Expand Down Expand Up @@ -306,7 +304,7 @@ fn is_mvcc_snapshot(snapshot: *mut pgrx::pg_sys::SnapshotData) -> bool {

struct RabbitholeLeader {
pcxt: *mut pgrx::pg_sys::ParallelContext,
nparticipanttuplesorts: i32,
nparticipants: i32,
rabbitholeshared: *mut RabbitholeShared,
snapshot: pgrx::pg_sys::Snapshot,
}
Expand Down Expand Up @@ -419,10 +417,10 @@ impl RabbitholeLeader {
pgrx::pg_sys::LaunchParallelWorkers(pcxt);
}

let nparticipanttuplesorts = unsafe { (*pcxt).nworkers_launched };
let nparticipants = unsafe { (*pcxt).nworkers_launched };

unsafe {
if nparticipanttuplesorts == 0 {
if nparticipants == 0 {
pgrx::pg_sys::WaitForParallelWorkersToFinish(pcxt);
if is_mvcc_snapshot(snapshot) {
pgrx::pg_sys::UnregisterSnapshot(snapshot);
Expand All @@ -437,7 +435,7 @@ impl RabbitholeLeader {
}
Some(Self {
pcxt,
nparticipanttuplesorts,
nparticipants,
rabbitholeshared,
snapshot,
})
Expand All @@ -446,13 +444,15 @@ impl RabbitholeLeader {

impl Drop for RabbitholeLeader {
fn drop(&mut self) {
unsafe {
pgrx::pg_sys::WaitForParallelWorkersToFinish(self.pcxt);
if is_mvcc_snapshot(self.snapshot) {
pgrx::pg_sys::UnregisterSnapshot(self.snapshot);
if !std::thread::panicking() {
unsafe {
pgrx::pg_sys::WaitForParallelWorkersToFinish(self.pcxt);
if is_mvcc_snapshot(self.snapshot) {
pgrx::pg_sys::UnregisterSnapshot(self.snapshot);
}
pgrx::pg_sys::DestroyParallelContext(self.pcxt);
pgrx::pg_sys::ExitParallelMode();
}
pgrx::pg_sys::DestroyParallelContext(self.pcxt);
pgrx::pg_sys::ExitParallelMode();
}
}
}
Expand Down Expand Up @@ -514,7 +514,6 @@ pub unsafe extern "C" fn rabbithole_parallel_build_main(
) where
F: FnMut((Pointer, Vec<f32>)),
{
pgrx::check_for_interrupts!();
use base::vector::OwnedVector;
let state = unsafe { &mut *state.cast::<State<F>>() };
let vector = unsafe {
Expand Down Expand Up @@ -568,7 +567,6 @@ pub unsafe extern "C" fn rabbithole_parallel_build_main(
scan,
};
heap_relation.traverse(|(payload, vector)| {
pgrx::check_for_interrupts!();
algorithm::insert::insert(
index_relation.clone(),
payload,
Expand Down

0 comments on commit 9161a67

Please sign in to comment.