diff --git a/src/vchordrqfscan/algorithm/build.rs b/src/vchordrqfscan/algorithm/build.rs index a245d77..e46d6a7 100644 --- a/src/vchordrqfscan/algorithm/build.rs +++ b/src/vchordrqfscan/algorithm/build.rs @@ -72,10 +72,7 @@ pub fn build( }; let mut meta = Tape::create(&relation, false); assert_eq!(meta.first(), 0); - let mut forwards = Tape::::create(&relation, false); - assert_eq!(forwards.first(), 1); let mut vectors = Tape::create(&relation, true); - assert_eq!(vectors.first(), 2); let mut pointer_of_means = Vec::>::new(); for i in 0..structures.len() { let mut level = Vec::new(); @@ -166,13 +163,11 @@ pub fn build( } pointer_of_firsts.push(level); } - forwards.head.get_opaque_mut().skip = vectors.first(); meta.push(&MetaTuple { dims, height_of_root: structures.len() as u32, is_residual, vectors_first: vectors.first(), - forwards_first: forwards.first(), mean: pointer_of_means.last().unwrap()[0], first: pointer_of_firsts.last().unwrap()[0], }); @@ -244,20 +239,32 @@ impl Structure { ) -> Vec { use std::collections::BTreeMap; let VchordrqfscanExternalBuildOptions { table } = external_build; - let query = format!("SELECT id, parent, vector FROM {table};"); let mut parents = BTreeMap::new(); let mut vectors = BTreeMap::new(); pgrx::spi::Spi::connect(|client| { use crate::datatype::memory_pgvector_vector::PgvectorVectorOutput; use pgrx::pg_sys::panic::ErrorReportable; use vector::VectorBorrowed; - let table = client.select(&query, None, None).unwrap_or_report(); - for row in table { + let schema_query = "SELECT n.nspname::TEXT + FROM pg_catalog.pg_extension e + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = e.extnamespace + WHERE e.extname = 'vector';"; + let pgvector_schema: String = client + .select(schema_query, None, None) + .unwrap_or_report() + .first() + .get_by_name("nspname") + .expect("external build: cannot get schema of pgvector") + .expect("external build: cannot get schema of pgvector"); + let dump_query = + format!("SELECT id, parent, vector::{pgvector_schema}.vector FROM {table};"); + let centroids = client.select(&dump_query, None, None).unwrap_or_report(); + for row in centroids { let id: Option = row.get_by_name("id").unwrap(); let parent: Option = row.get_by_name("parent").unwrap(); let vector: Option = row.get_by_name("vector").unwrap(); - let id = id.expect("extern build: id could not be NULL"); - let vector = vector.expect("extern build: vector could not be NULL"); + let id = id.expect("external build: id could not be NULL"); + let vector = vector.expect("external build: vector could not be NULL"); let pop = parents.insert(id, parent); if pop.is_some() { pgrx::error!( @@ -265,11 +272,34 @@ impl Structure { ); } if vector_options.dims != vector.as_borrowed().dims() { - pgrx::error!("extern build: incorrect dimension, id = {id}"); + pgrx::error!("external build: incorrect dimension, id = {id}"); } vectors.insert(id, crate::projection::project(vector.as_borrowed().slice())); } }); + if parents.len() >= 2 && parents.values().all(|x| x.is_none()) { + // if there are more than one vertexs and no edges, + // assume there is an implicit root + let n = parents.len(); + let mut result = Vec::new(); + result.push(Structure { + means: vectors.values().cloned().collect::>(), + children: vec![Vec::new(); n], + }); + result.push(Structure { + means: vec![{ + // compute the vector on root, without normalizing it + let mut sum = vec![0.0f32; vector_options.dims as _]; + for vector in vectors.values() { + f32::vector_add_inplace(&mut sum, vector); + } + f32::vector_mul_scalar_inplace(&mut sum, 1.0 / n as f32); + sum + }], + children: vec![(0..n as u32).collect()], + }); + return result; + } let mut children = parents .keys() .map(|x| (*x, Vec::new())) @@ -293,7 +323,7 @@ impl Structure { } } let Some(root) = root else { - pgrx::error!("extern build: there are no root"); + pgrx::error!("external build: there are no root"); }; let mut heights = BTreeMap::<_, _>::new(); fn dfs_for_heights( @@ -302,7 +332,7 @@ impl Structure { u: i32, ) { if heights.contains_key(&u) { - pgrx::error!("extern build: detect a cycle, id = {u}"); + pgrx::error!("external build: detect a cycle, id = {u}"); } heights.insert(u, None); let mut height = None; @@ -311,7 +341,7 @@ impl Structure { let new = heights[&v].unwrap() + 1; if let Some(height) = height { if height != new { - pgrx::error!("extern build: two heights, id = {u}"); + pgrx::error!("external build: two heights, id = {u}"); } } else { height = Some(new); @@ -329,7 +359,7 @@ impl Structure { .collect::>(); if !(1..=8).contains(&(heights[&root] - 1)) { pgrx::error!( - "extern build: unexpected tree height, height = {}", + "external build: unexpected tree height, height = {}", heights[&root] ); } @@ -367,7 +397,7 @@ impl Structure { } } -struct Tape<'a, 'b, T, R: 'b + RelationWrite> { +struct Tape<'a: 'b, 'b, T, R: 'b + RelationWrite> { relation: &'a R, head: R::WriteGuard<'b>, first: u32, @@ -377,7 +407,8 @@ struct Tape<'a, 'b, T, R: 'b + RelationWrite> { impl<'a: 'b, 'b, T, R: 'b + RelationWrite> Tape<'a, 'b, T, R> { fn create(relation: &'a R, tracking_freespace: bool) -> Self { - let head = relation.extend(tracking_freespace); + let mut head = relation.extend(tracking_freespace); + head.get_opaque_mut().skip = head.id(); let first = head.id(); Self { relation, diff --git a/src/vchordrqfscan/algorithm/insert.rs b/src/vchordrqfscan/algorithm/insert.rs index ee44760..88cf3a6 100644 --- a/src/vchordrqfscan/algorithm/insert.rs +++ b/src/vchordrqfscan/algorithm/insert.rs @@ -12,10 +12,11 @@ use std::collections::BinaryHeap; use std::num::NonZeroU64; pub fn insert( - relation: impl RelationWrite, + relation: impl RelationWrite + Clone, payload: NonZeroU64, vector: Vec, distance_kind: DistanceKind, + in_building: bool, ) { let meta_guard = relation.read(0); let meta_tuple = meta_guard @@ -32,56 +33,20 @@ pub fn insert( } else { None }; - let h0_vector = 'h0_vector: { + let h0_vector = { let tuple = rkyv::to_bytes::<_, 8192>(&VectorTuple { vector: vector.clone(), payload: Some(payload), }) .unwrap(); - if let Some(mut write) = relation.search(tuple.len()) { - let i = write.alloc(&tuple).unwrap(); - break 'h0_vector (write.id(), i); - } - let mut current = relation.read(meta_tuple.forwards_first).get_opaque().skip; - let mut changed = false; - loop { - let read = relation.read(current); - let flag = 'flag: { - if read.freespace() as usize >= tuple.len() { - break 'flag true; - } - if read.get_opaque().next == u32::MAX { - break 'flag true; - } - false - }; - if flag { - drop(read); - let mut write = relation.write(current, true); - if let Some(i) = write.alloc(&tuple) { - break (current, i); - } - if write.get_opaque().next == u32::MAX { - if changed { - relation - .write(meta_tuple.forwards_first, false) - .get_opaque_mut() - .skip = write.id(); - } - let mut extend = relation.extend(true); - write.get_opaque_mut().next = extend.id(); - if let Some(i) = extend.alloc(&tuple) { - break (extend.id(), i); - } else { - panic!("a tuple cannot even be fit in a fresh page"); - } - } - current = write.get_opaque().next; - } else { - current = read.get_opaque().next; - } - changed = true; - } + append( + relation.clone(), + meta_tuple.vectors_first, + &tuple, + true, + true, + true, + ) }; let h0_payload = payload; let mut list = ( @@ -186,19 +151,93 @@ pub fn insert( t: vec![0; (dims.div_ceil(4) * 16) as usize], }) .unwrap(); - let first = list.0; + append_by_update( + relation.clone(), + list.0, + &dummy, + in_building, + in_building, + |bytes| { + let t = rkyv::check_archived_root::(bytes).expect("data corruption"); + t.mask.iter().any(|x| *x) + }, + |bytes| put(bytes, dims, &code, h0_vector, h0_payload), + ); +} + +fn append( + relation: impl RelationWrite, + first: u32, + tuple: &[u8], + tracking_freespace: bool, + skipping_traversal: bool, + updating_skip: bool, +) -> (u32, u16) { + if tracking_freespace { + if let Some(mut write) = relation.search(tuple.len()) { + let i = write.alloc(tuple).unwrap(); + return (write.id(), i); + } + } + assert!(first != u32::MAX); + let mut current = first; + loop { + let read = relation.read(current); + if read.freespace() as usize >= tuple.len() || read.get_opaque().next == u32::MAX { + drop(read); + let mut write = relation.write(current, tracking_freespace); + if let Some(i) = write.alloc(tuple) { + return (current, i); + } + if write.get_opaque().next == u32::MAX { + let mut extend = relation.extend(tracking_freespace); + write.get_opaque_mut().next = extend.id(); + drop(write); + if let Some(i) = extend.alloc(tuple) { + let result = (extend.id(), i); + drop(extend); + if updating_skip { + let mut past = relation.write(first, tracking_freespace); + let skip = &mut past.get_opaque_mut().skip; + assert!(*skip != u32::MAX); + *skip = std::cmp::max(*skip, result.0); + } + return result; + } else { + panic!("a tuple cannot even be fit in a fresh page"); + } + } + if skipping_traversal && current == first && write.get_opaque().skip != first { + current = write.get_opaque().skip; + } else { + current = write.get_opaque().next; + } + } else { + if skipping_traversal && current == first && read.get_opaque().skip != first { + current = read.get_opaque().skip; + } else { + current = read.get_opaque().next; + } + } + } +} + +fn append_by_update( + relation: impl RelationWrite, + first: u32, + dummy: &[u8], + skipping_traversal: bool, + updating_skip: bool, + can_update: impl Fn(&[u8]) -> bool, + mut update: impl FnMut(&mut [u8]) -> bool, +) { assert!(first != u32::MAX); let mut current = first; loop { let read = relation.read(current); let flag = 'flag: { for i in 1..=read.len() { - let h0_tuple = read - .get(i) - .map(rkyv::check_archived_root::) - .expect("data corruption") - .expect("data corruption"); - if h0_tuple.mask.iter().any(|x| *x) { + if can_update(read.get(i).expect("data corruption")) { break 'flag true; } } @@ -214,48 +253,47 @@ pub fn insert( drop(read); let mut write = relation.write(current, false); for i in 1..=write.len() { - let flag = put( - write.get_mut(i).expect("data corruption"), - dims, - &code, - h0_vector, - h0_payload, - ); - if flag { + if update(write.get_mut(i).expect("data corruption")) { return; } } if let Some(i) = write.alloc(&dummy) { - let flag = put( - write.get_mut(i).expect("data corruption"), - dims, - &code, - h0_vector, - h0_payload, - ); - assert!(flag, "a put fails even on a fresh tuple"); - return; + if update(write.get_mut(i).expect("data corruption")) { + return; + } + panic!("a put fails even on a fresh tuple"); } if write.get_opaque().next == u32::MAX { let mut extend = relation.extend(false); write.get_opaque_mut().next = extend.id(); + drop(write); if let Some(i) = extend.alloc(&dummy) { - let flag = put( - extend.get_mut(i).expect("data corruption"), - dims, - &code, - h0_vector, - h0_payload, - ); - assert!(flag, "a put fails even on a fresh tuple"); - return; - } else { - panic!("a tuple cannot even be fit in a fresh page"); + if update(extend.get_mut(i).expect("data corruption")) { + let id = extend.id(); + drop(extend); + if updating_skip { + let mut past = relation.write(first, false); + let skip = &mut past.get_opaque_mut().skip; + assert!(*skip != u32::MAX); + *skip = std::cmp::max(*skip, id); + } + return; + } + panic!("a put fails even on a fresh tuple"); } + panic!("a tuple cannot even be fit in a fresh page"); + } + if skipping_traversal && current == first && write.get_opaque().skip != first { + current = write.get_opaque().skip; + } else { + current = write.get_opaque().next; } - current = write.get_opaque().next; } else { - current = read.get_opaque().next; + if skipping_traversal && current == first && read.get_opaque().skip != first { + current = read.get_opaque().skip; + } else { + current = read.get_opaque().next; + } } } } diff --git a/src/vchordrqfscan/algorithm/tuples.rs b/src/vchordrqfscan/algorithm/tuples.rs index ff94713..7d1c97a 100644 --- a/src/vchordrqfscan/algorithm/tuples.rs +++ b/src/vchordrqfscan/algorithm/tuples.rs @@ -10,7 +10,6 @@ pub struct MetaTuple { pub height_of_root: u32, pub is_residual: bool, pub vectors_first: u32, - pub forwards_first: u32, // raw vector pub mean: (u32, u16), // for meta tuple, it's pointers to next level diff --git a/src/vchordrqfscan/index/am.rs b/src/vchordrqfscan/index/am.rs index 7011e26..5f25bb8 100644 --- a/src/vchordrqfscan/index/am.rs +++ b/src/vchordrqfscan/index/am.rs @@ -287,6 +287,7 @@ pub unsafe extern "C" fn ambuild( payload, vector, opfamily.distance_kind(), + true, ); indtuples += 1; reporter.tuples_done(indtuples); @@ -624,6 +625,7 @@ unsafe fn parallel_build( payload, vector, opfamily.distance_kind(), + true, ); unsafe { let indtuples; @@ -706,6 +708,7 @@ pub unsafe extern "C" fn aminsert( pointer, vector.into_vec(), opfamily.distance_kind(), + false, ); } false