Skip to content

Commit

Permalink
[experimental] feat: pinning index in memory when building
Browse files Browse the repository at this point in the history
Signed-off-by: usamoi <[email protected]>
  • Loading branch information
usamoi committed Dec 27, 2024
1 parent 3e23a14 commit 45444f2
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 6 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ nalgebra = "=0.33.0"
# lock rkyv version forever so that data is always compatible
rkyv = { version = "=0.7.45", features = ["validation"] }

bincode = "1.3.3"
half = { version = "2.4.1", features = ["rkyv"] }
log = "0.4.22"
paste = "1"
Expand Down
2 changes: 0 additions & 2 deletions src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ impl Page {
assert_eq!(offset_of!(Self, opaque), this.header.pd_special as usize);
this
}
#[allow(dead_code)]
pub fn clone_into_boxed(&self) -> Box<Self> {
let mut result = Box::new_uninit();
unsafe {
Expand Down Expand Up @@ -357,7 +356,6 @@ impl Relation {
}
}
}
#[allow(dead_code)]
pub fn len(&self) -> u32 {
unsafe {
pgrx::pg_sys::RelationGetNumberOfBlocksInFork(
Expand Down
201 changes: 197 additions & 4 deletions src/vchordrq/index/am.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::postgres::Relation;
use crate::vchordrq::algorithm;
use crate::postgres::{Page, Relation};
use crate::vchordrq::algorithm::build::{HeapRelation, Reporter};
use crate::vchordrq::algorithm::tuples::Vector;
use crate::vchordrq::algorithm::{self, PageGuard};
use crate::vchordrq::algorithm::{RelationRead, RelationWrite};
use crate::vchordrq::index::am_options::{Opfamily, Reloption};
use crate::vchordrq::index::am_scan::Scanner;
use crate::vchordrq::index::utils::{ctid_to_pointer, pointer_to_ctid};
Expand All @@ -12,6 +13,8 @@ use base::vector::VectOwned;
use half::f16;
use pgrx::datum::Internal;
use pgrx::pg_sys::Datum;
use std::collections::HashMap;
use std::ops::Deref;

static mut RELOPT_KIND_VCHORDRQ: pgrx::pg_sys::relopt_kind::Type = 0;

Expand Down Expand Up @@ -259,7 +262,52 @@ pub unsafe extern "C" fn ambuild(
reporter.clone(),
),
}
if let Some(leader) = unsafe { VchordrqLeader::enter(heap, index, (*index_info).ii_Concurrent) }
let cache = {
let n = index_relation.len();
let mut dir = HashMap::<u32, usize>::with_capacity(n as _);
let mut pages = Vec::<Box<Page>>::new();
{
use crate::vchordrq::algorithm::tuples::{Height1Tuple, MetaTuple};
let mut read = |id| {
let result = index_relation.read(id);
dir.insert(id, pages.len());
pages.push(result.clone_into_boxed());
result
};
let meta_guard = read(0);
let meta_tuple = meta_guard
.get(1)
.map(rkyv::check_archived_root::<MetaTuple>)
.expect("data corruption")
.expect("data corruption");
let mut firsts = vec![meta_tuple.first];
let mut make_firsts = |firsts| {
let mut results = Vec::new();
for first in firsts {
let mut current = first;
while current != u32::MAX {
let h1_guard = read(current);
for i in 1..=h1_guard.len() {
let h1_tuple = h1_guard
.get(i)
.map(rkyv::check_archived_root::<Height1Tuple>)
.expect("data corruption")
.expect("data corruption");
results.push(h1_tuple.first);
}
current = h1_guard.get_opaque().next;
}
}
results
};
for _ in (1..meta_tuple.height_of_root).rev() {
firsts = make_firsts(firsts);
}
}
(dir, pages)
};
if let Some(leader) =
unsafe { VchordrqLeader::enter(heap, index, (*index_info).ii_Concurrent, &cache) }
{
unsafe {
parallel_build(
Expand All @@ -269,6 +317,8 @@ pub unsafe extern "C" fn ambuild(
leader.tablescandesc,
leader.vchordrqshared,
Some(reporter),
&*leader.cache_0,
&*leader.cache_1,
);
leader.wait();
let nparticipants = leader.nparticipants;
Expand All @@ -290,6 +340,10 @@ pub unsafe extern "C" fn ambuild(
let mut indtuples = 0;
reporter.tuples_done(indtuples);
let relation = unsafe { Relation::new(index) };
let relation = CachingRelation {
cache: &(cache.0, cache.1.iter().map(|x| x.as_ref()).collect()),
relation,
};
match opfamily.vector_kind() {
VectorKind::Vecf32 => {
HeapRelation::<VectOwned<f32>>::traverse(
Expand Down Expand Up @@ -330,11 +384,78 @@ pub unsafe extern "C" fn ambuild(
unsafe { pgrx::pgbox::PgBox::<pgrx::pg_sys::IndexBuildResult>::alloc0().into_pg() }
}

/// A relation paired with a borrowed, read-only cache of pages.
///
/// Reads look the page id up in the cache first and only fall back to the
/// wrapped `relation` on a miss; writes are forwarded to `relation` unchanged.
#[derive(Clone)]
struct CachingRelation<'a, R> {
// `.0` maps a page id to an index into `.1`; `.1` holds the borrowed cached pages.
cache: &'a (HashMap<u32, usize>, Vec<&'a Page>),
// The wrapped relation that serves every access the cache cannot.
relation: R,
}

/// Read guard returned by `CachingRelation::read`.
///
/// It is either a guard produced by the wrapped relation or a direct borrow
/// of a page held in the cache.
enum CachingRelationReadGuard<'a, G> {
// Cache miss: the guard obtained from the wrapped relation.
Wrapping(G),
// Cache hit: the requested page id and the cached page it resolved to.
Cached(u32, &'a Page),
}

impl<G: PageGuard> PageGuard for CachingRelationReadGuard<'_, G> {
    /// Returns the id of the page behind this guard.
    ///
    /// A cached guard reports the id it was created with; a wrapping guard
    /// asks the inner guard.
    fn id(&self) -> u32 {
        match self {
            Self::Cached(page_id, _) => *page_id,
            Self::Wrapping(inner) => inner.id(),
        }
    }
}

impl<G: Deref<Target = Page>> Deref for CachingRelationReadGuard<'_, G> {
type Target = Page;

fn deref(&self) -> &Self::Target {
match self {
CachingRelationReadGuard::Wrapping(x) => x,
CachingRelationReadGuard::Cached(_, page) => page,
}
}
}

impl<R: RelationRead> RelationRead for CachingRelation<'_, R> {
    type ReadGuard<'a>
        = CachingRelationReadGuard<'a, R::ReadGuard<'a>>
    where
        Self: 'a;

    /// Reads page `id`, serving it from the cache when the id is listed there
    /// and delegating to the wrapped relation otherwise.
    ///
    /// Panics if the cache maps `id` to an index outside the cached page list.
    fn read(&self, id: u32) -> Self::ReadGuard<'_> {
        let (directory, pages) = self.cache;
        match directory.get(&id) {
            Some(&slot) => CachingRelationReadGuard::Cached(id, pages[slot]),
            None => CachingRelationReadGuard::Wrapping(self.relation.read(id)),
        }
    }
}

/// Write-side access never touches the cache: every method forwards to the
/// wrapped relation with the same arguments.
///
/// NOTE(review): cached pages are not refreshed by writes made through this
/// wrapper; presumably only pages that stay unmodified while the wrapper is
/// in use are cached — confirm against the callers.
impl<R: RelationWrite> RelationWrite for CachingRelation<'_, R> {
// Write guards are exactly the wrapped relation's write guards.
type WriteGuard<'a>
= R::WriteGuard<'a>
where
Self: 'a;

// Forwards to the wrapped relation's `write`.
fn write(&self, id: u32, tracking_freespace: bool) -> Self::WriteGuard<'_> {
self.relation.write(id, tracking_freespace)
}

// Forwards to the wrapped relation's `extend`.
fn extend(&self, tracking_freespace: bool) -> Self::WriteGuard<'_> {
self.relation.extend(tracking_freespace)
}

// Forwards to the wrapped relation's `search`.
fn search(&self, freespace: usize) -> Option<Self::WriteGuard<'_>> {
self.relation.search(freespace)
}
}

struct VchordrqShared {
/* Immutable state */
heaprelid: pgrx::pg_sys::Oid,
indexrelid: pgrx::pg_sys::Oid,
isconcurrent: bool,
est_cache_0: usize,
est_cache_1: usize,

/* Worker progress */
workersdonecv: pgrx::pg_sys::ConditionVariable,
Expand All @@ -360,6 +481,8 @@ struct VchordrqLeader {
nparticipants: i32,
vchordrqshared: *mut VchordrqShared,
tablescandesc: *mut pgrx::pg_sys::ParallelTableScanDescData,
cache_0: *const [u8],
cache_1: *const [u8],
snapshot: pgrx::pg_sys::Snapshot,
}

Expand All @@ -368,7 +491,10 @@ impl VchordrqLeader {
heap: pgrx::pg_sys::Relation,
index: pgrx::pg_sys::Relation,
isconcurrent: bool,
cache: &(HashMap<u32, usize>, Vec<Box<Page>>),
) -> Option<Self> {
let cache_mapping: Vec<u8> = bincode::serialize(&cache.0).unwrap();

unsafe fn compute_parallel_workers(
heap: pgrx::pg_sys::Relation,
index: pgrx::pg_sys::Relation,
Expand Down Expand Up @@ -419,11 +545,17 @@ impl VchordrqLeader {
}
let est_tablescandesc =
unsafe { pgrx::pg_sys::table_parallelscan_estimate(heap, snapshot) };
let est_cache_0 = cache_mapping.len();
let est_cache_1 = cache.1.len() * size_of::<Page>();
unsafe {
estimate_chunk(&mut (*pcxt).estimator, size_of::<VchordrqShared>());
estimate_keys(&mut (*pcxt).estimator, 1);
estimate_chunk(&mut (*pcxt).estimator, est_tablescandesc);
estimate_keys(&mut (*pcxt).estimator, 1);
estimate_chunk(&mut (*pcxt).estimator, est_cache_0);
estimate_keys(&mut (*pcxt).estimator, 1);
estimate_chunk(&mut (*pcxt).estimator, est_cache_1);
estimate_keys(&mut (*pcxt).estimator, 1);
}

unsafe {
Expand All @@ -450,6 +582,8 @@ impl VchordrqLeader {
mutex: std::mem::zeroed(),
nparticipantsdone: 0,
indtuples: 0,
est_cache_0,
est_cache_1,
});
pgrx::pg_sys::ConditionVariableInit(&raw mut (*vchordrqshared).workersdonecv);
pgrx::pg_sys::SpinLockInit(&raw mut (*vchordrqshared).mutex);
Expand All @@ -463,9 +597,29 @@ impl VchordrqLeader {
tablescandesc
};

let cache_0 = unsafe {
let cache_0 = pgrx::pg_sys::shm_toc_allocate((*pcxt).toc, est_cache_0).cast::<u8>();
std::ptr::copy(cache_mapping.as_ptr(), cache_0, est_cache_0);
core::ptr::slice_from_raw_parts(cache_0, est_cache_0)
};

let cache_1 = unsafe {
let cache_1 = pgrx::pg_sys::shm_toc_allocate((*pcxt).toc, est_cache_1).cast::<u8>();
for i in 0..cache.1.len() {
std::ptr::copy(
(cache.1[i].deref() as *const Page).cast::<u8>(),
cache_1.cast::<Page>().add(i).cast(),
size_of::<Page>(),
);
}
core::ptr::slice_from_raw_parts(cache_1, est_cache_1)
};

unsafe {
pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000001, vchordrqshared.cast());
pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000002, tablescandesc.cast());
pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000003, cache_0 as _);
pgrx::pg_sys::shm_toc_insert((*pcxt).toc, 0xA000000000000004, cache_1 as _);
}

unsafe {
Expand All @@ -491,6 +645,8 @@ impl VchordrqLeader {
nparticipants: nworkers_launched + 1,
vchordrqshared,
tablescandesc,
cache_0,
cache_1,
snapshot,
})
}
Expand Down Expand Up @@ -530,6 +686,18 @@ pub unsafe extern "C" fn vchordrq_parallel_build_main(
pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000002, false)
.cast::<pgrx::pg_sys::ParallelTableScanDescData>()
};
let cache_0 = unsafe {
std::slice::from_raw_parts(
pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000003, false).cast::<u8>(),
(*vchordrqshared).est_cache_0,
)
};
let cache_1 = unsafe {
std::slice::from_raw_parts(
pgrx::pg_sys::shm_toc_lookup(toc, 0xA000000000000004, false).cast::<u8>(),
(*vchordrqshared).est_cache_1,
)
};
let heap_lockmode;
let index_lockmode;
if unsafe { !(*vchordrqshared).isconcurrent } {
Expand All @@ -547,7 +715,16 @@ pub unsafe extern "C" fn vchordrq_parallel_build_main(
}

unsafe {
parallel_build(index, heap, index_info, tablescandesc, vchordrqshared, None);
parallel_build(
index,
heap,
index_info,
tablescandesc,
vchordrqshared,
None,
cache_0,
cache_1,
);
}

unsafe {
Expand All @@ -563,6 +740,8 @@ unsafe fn parallel_build(
tablescandesc: *mut pgrx::pg_sys::ParallelTableScanDescData,
vchordrqshared: *mut VchordrqShared,
mut reporter: Option<PgReporter>,
cache_0: &[u8],
cache_1: &[u8],
) {
#[derive(Debug, Clone)]
pub struct Heap {
Expand Down Expand Up @@ -628,6 +807,20 @@ unsafe fn parallel_build(
}

let index_relation = unsafe { Relation::new(index) };
let index_relation = CachingRelation {
cache: {
let cache_0: HashMap<u32, usize> = bincode::deserialize(cache_0).unwrap();
assert!(cache_1.len() % size_of::<Page>() == 0);
let n = cache_1.len() / size_of::<Page>();
let cache_1 = unsafe {
(0..n)
.map(|i| &*cache_1.as_ptr().cast::<Page>().add(i))
.collect::<Vec<&Page>>()
};
&(cache_0, cache_1)
},
relation: index_relation,
};

let scan = unsafe { pgrx::pg_sys::table_beginscan_parallel(heap, tablescandesc) };
let opfamily = unsafe { am_options::opfamily(index) };
Expand Down

0 comments on commit 45444f2

Please sign in to comment.