refactor: sealing incrementally (#7)

Signed-off-by: Mingzhuo Yin <[email protected]>
silver-ymz authored Nov 28, 2024
1 parent 24e6576 commit b6955f7
Showing 23 changed files with 592 additions and 323 deletions.
8 changes: 6 additions & 2 deletions Cargo.toml
@@ -26,7 +26,6 @@ bytemuck = "1.18"
lazy_static = "1.5"
pgrx = { version = "=0.12.8", default-features = false, features = ["cshim"] }
regex = "1.11.1"
serde = { version = "1.0", features = ["derive"] }
stop-words = "0.8.0"
tantivy-stemmers = { version = "0.4.0", features = [
"default",
@@ -41,7 +40,12 @@ rand = "0.8"
[profile.release]
lto = "fat"
codegen-units = 1
debug = true

[profile.dev-opt]
inherits = "dev"
opt-level = 3
lto = "thin"
codegen-units = 8

[lints.clippy]
missing_safety_doc = "allow"
17 changes: 9 additions & 8 deletions src/index/build.rs
@@ -1,5 +1,3 @@
use std::ops::DerefMut;

use pgrx::{itemptr::item_pointer_to_u64, FromDatum, PgMemoryContexts};

use crate::{
@@ -10,7 +8,8 @@ use crate::{
},
segment::{
builder::IndexBuilder,
meta::{metapage_append_sealed_segment, MetaPageData, META_VERSION},
meta::{MetaPageData, META_VERSION},
sealed::SealedSegmentData,
},
token::vocab_len,
};
@@ -42,8 +41,9 @@ pub unsafe extern "C" fn ambuildempty(index: pgrx::pg_sys::Relation) {
current_doc_id: 0,
sealed_doc_id: 0,
growing_segment: None,
sealed_length: 0,
sealed_segment: [],
sealed_segment: SealedSegmentData {
term_info_blkno: pgrx::pg_sys::InvalidBlockNumber,
},
});
meta_page.header.pd_lower += std::mem::size_of::<MetaPageData>() as u16;
}
@@ -128,8 +128,9 @@ unsafe fn write_down(state: &BuildState) {
current_doc_id: doc_cnt,
sealed_doc_id: doc_cnt,
growing_segment: None,
sealed_length: 0,
sealed_segment: [],
sealed_segment: SealedSegmentData {
term_info_blkno: pgrx::pg_sys::InvalidBlockNumber,
},
});
meta_page.header.pd_lower += std::mem::size_of::<MetaPageData>() as u16;
}
@@ -155,5 +156,5 @@ unsafe fn write_down(state: &BuildState) {
meta.payload_blkno = payload_blkno;
meta.term_stat_blkno = term_stat_blkno;
meta.delete_bitmap_blkno = delete_bitmap_blkno;
metapage_append_sealed_segment(meta_page.deref_mut(), sealed_data);
meta.sealed_segment = sealed_data;
}
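
The build path above drops the variable-length sealed-segment list (sealed_length plus an empty sealed_segment array filled via metapage_append_sealed_segment) in favour of a single fixed-size SealedSegmentData initialised to InvalidBlockNumber. Below is a minimal sketch of that layout change; the field names come from the diff, while all types and sizes are illustrative assumptions:

```rust
// Hypothetical sketch of the metapage change. Only the field names visible in
// the diff are taken from the source; everything else is illustrative.
type BlockNumber = u32;
const INVALID_BLOCK_NUMBER: BlockNumber = u32::MAX; // cf. pg_sys::InvalidBlockNumber

// New layout: exactly one sealed segment, located by its term-info block.
struct SealedSegmentData {
    term_info_blkno: BlockNumber,
}

// Old layout (removed): a counter plus a variable-length tail of segments,
// appended to with metapage_append_sealed_segment. Modelled as a Vec here.
struct MetaPageDataOld {
    sealed_length: u32,
    sealed_segment: Vec<SealedSegmentData>,
}

// New layout: fixed size, so pd_lower is bumped once by
// size_of::<MetaPageData>() and never has to grow afterwards.
struct MetaPageDataNew {
    sealed_doc_id: u32,
    sealed_segment: SealedSegmentData,
}

fn main() {
    let empty = MetaPageDataNew {
        sealed_doc_id: 0,
        sealed_segment: SealedSegmentData {
            term_info_blkno: INVALID_BLOCK_NUMBER,
        },
    };
    assert_eq!(empty.sealed_segment.term_info_blkno, INVALID_BLOCK_NUMBER);
    let _ = MetaPageDataOld { sealed_length: 0, sealed_segment: Vec::new() };
}
```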
91 changes: 57 additions & 34 deletions src/index/insert.rs
@@ -2,24 +2,17 @@ use pgrx::{itemptr::item_pointer_to_u64, FromDatum};

use crate::{
datatype::Bm25VectorInput,
guc::SEGMENT_GROWING_MAX_PAGE_SIZE,
page::{page_free, page_read, page_write, VirtualPageWriter, METAPAGE_BLKNO},
segment::{
delete::extend_delete_bit,
field_norm::fieldnorm_to_id,
meta::{metapage_append_sealed_segment, MetaPageData},
field_norm::{fieldnorm_to_id, FieldNormReader},
growing::{GrowingSegmentData, GrowingSegmentReader},
meta::MetaPageData,
posting::{InvertedAppender, InvertedWriter},
term_stat::TermStatReader,
},
};

/// Insert Progress:
/// 1. lock metapage
/// 2. update doc_cnt, doc_term_cnt
/// 3. insert into growing segment
/// - if no growing segment, create one
/// - if growing segment is full, seal it
/// - otherwise, append to the last page
/// 4. write payload, field_norm, term_stat
#[allow(clippy::too_many_arguments)]
#[pgrx::pg_guard]
pub unsafe extern "C" fn aminsert(
@@ -47,7 +40,7 @@ pub unsafe extern "C" fn aminsert(
meta.doc_cnt += 1;
meta.doc_term_cnt += doc_len as u64;

let need_sealed = crate::segment::growing::growing_segment_insert(index, meta, &vector);
let growing_results = crate::segment::growing::growing_segment_insert(index, meta, &vector);

let payload_blkno = meta.payload_blkno;
let field_norm_blkno = meta.field_norm_blkno;
@@ -76,33 +69,63 @@ pub unsafe extern "C" fn aminsert(

extend_delete_bit(index, delete_bitmap_blkno, current_doc_id);

if need_sealed {
let metapage = metapage.degrade();
let (sealed_data, sealed_doc_id) =
crate::segment::growing::build_sealed_segment(index, metapage.as_ref());
let prev_growing_segment = *meta.growing_segment.as_ref().unwrap();
let sealed_doc_id = meta.sealed_doc_id;
drop(metapage);

if let Some(block_count) = growing_results {
let growing_reader = GrowingSegmentReader::new(index, &prev_growing_segment);
let mut doc_id = sealed_doc_id;

// check if any other process is sealing the segment
if !pgrx::pg_sys::ConditionalLockPage(
index,
METAPAGE_BLKNO,
pgrx::pg_sys::ExclusiveLock as _,
) {
return false;
}

let max_growing_segment_page_size = SEGMENT_GROWING_MAX_PAGE_SIZE.get() as u32;
let mut metapage = metapage.upgrade(index);
let meta: &mut MetaPageData = metapage.as_mut();
let mut writer = InvertedWriter::new();
let mut iter = growing_reader.into_iter(block_count);
while let Some(vector) = iter.next() {
writer.insert(doc_id, vector);
doc_id += 1;
}
writer.finalize();

let mut metapage = page_write(index, METAPAGE_BLKNO);
let meta: &mut MetaPageData = metapage.as_mut();
let fieldnorm_reader = FieldNormReader::new(index, field_norm_blkno);
let mut appender = InvertedAppender::new(
index,
meta.doc_cnt,
meta.avgdl(),
fieldnorm_reader,
meta.sealed_segment.term_info_blkno,
);
writer.serialize(&mut appender);

meta.sealed_doc_id = doc_id;
let growing_segment = meta.growing_segment.as_mut().unwrap();
let mut current_free_blkno = growing_segment.first_blkno;
let mut free_page_count = 0;
while free_page_count < max_growing_segment_page_size {
assert!(current_free_blkno != pgrx::pg_sys::InvalidBlockNumber);
let page = page_read(index, current_free_blkno);
let next_blkno = page.opaque.next_blkno;
page_free(index, current_free_blkno);
free_page_count += 1;
current_free_blkno = next_blkno;
}
assert!(current_free_blkno != pgrx::pg_sys::InvalidBlockNumber);
growing_segment.first_blkno = prev_growing_segment.last_blkno.try_into().unwrap();
growing_segment.growing_full_page_count -= block_count;
drop(metapage);

meta.sealed_doc_id = sealed_doc_id;
growing_segment.first_blkno = current_free_blkno;
growing_segment.growing_full_page_count -= free_page_count;
metapage_append_sealed_segment(&mut metapage, sealed_data);
pgrx::pg_sys::UnlockPage(index, METAPAGE_BLKNO, pgrx::pg_sys::ExclusiveLock as _);

free_growing_segment(index, prev_growing_segment);
}

false
}

fn free_growing_segment(index: pgrx::pg_sys::Relation, segment: GrowingSegmentData) {
let mut blkno = segment.first_blkno.get();
for _ in 0..segment.growing_full_page_count {
assert!(blkno != pgrx::pg_sys::InvalidBlockNumber);
let next_blkno = page_read(index, blkno).opaque.next_blkno;
page_free(index, blkno);
blkno = next_blkno;
}
}
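
This is the core of the incremental-sealing refactor: instead of returning a need_sealed flag, aminsert now seals in place. Once growing_segment_insert reports how many full growing pages exist, the backend takes ConditionalLockPage on the metapage (and bails out if another backend is already sealing), replays those pages through InvertedWriter and InvertedAppender into the one sealed segment, bumps sealed_doc_id, frees the consumed growing pages, and unlocks. The following schematic, self-contained model shows that flow; SealState, Index::try_seal, and the in-memory page representation are invented for illustration and are not the extension's API:

```rust
// Schematic model of incremental sealing. All types here are invented for
// illustration; the real code works on Postgres pages via pgrx.
use std::collections::VecDeque;
use std::sync::Mutex;

struct SealState {
    growing: VecDeque<Vec<(u32, u32)>>, // full "pages" of (doc_id, term_id) postings
    sealed: Vec<(u32, u32)>,            // the single sealed segment
    sealed_doc_id: u32,
}

struct Index {
    meta: Mutex<SealState>, // stands in for the metapage lock
    threshold: usize,       // cf. SEGMENT_GROWING_MAX_PAGE_SIZE
}

impl Index {
    /// Returns false when another "backend" already holds the lock, mirroring
    /// ConditionalLockPage: the insert simply skips sealing this time.
    fn try_seal(&self, block_count: usize) -> bool {
        let Ok(mut meta) = self.meta.try_lock() else {
            return false;
        };
        // Replay the first `block_count` growing pages into the sealed segment.
        for _ in 0..block_count {
            let page = meta.growing.pop_front().expect("growing page must exist");
            for (doc_id, term_id) in page {
                meta.sealed.push((doc_id, term_id));
                meta.sealed_doc_id = meta.sealed_doc_id.max(doc_id + 1);
            }
            // In the real code the consumed page is returned to the free list here.
        }
        true
    }
}

fn main() {
    let index = Index {
        meta: Mutex::new(SealState {
            growing: VecDeque::from(vec![vec![(0, 7)], vec![(1, 7), (1, 9)]]),
            sealed: Vec::new(),
            sealed_doc_id: 0,
        }),
        threshold: 2,
    };
    let full_pages = index.meta.lock().unwrap().growing.len();
    if full_pages >= index.threshold {
        assert!(index.try_seal(full_pages));
    }
    assert_eq!(index.meta.lock().unwrap().sealed_doc_id, 2);
}
```

An insert that loses the ConditionalLockPage race just returns and leaves sealing to the backend that already holds the lock, which keeps the hot insert path from blocking.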
60 changes: 28 additions & 32 deletions src/index/scan.rs
@@ -152,40 +152,36 @@ unsafe fn scan_main(index: pgrx::pg_sys::Relation, query_vector: Bm25VectorBorro
})
.collect::<Vec<_>>();

let mut sealed_sgement = meta.sealed_segment().to_vec();
sealed_sgement.reverse();
for sealed_data in sealed_sgement {
let sealed_reader = SealedSegmentReader::new(index, sealed_data);
let term_ids = query_vector.indexes();
let mut scorers = Vec::new();

for i in 0..term_ids.len() {
let term_id = term_ids[i];
if let Some(posting_reader) = sealed_reader.get_postings(term_id) {
let weight = bm25_weight[i];
scorers.push(SealedScorer {
posting: posting_reader,
weight,
max_score: weight.max_score(),
});
}
let sealed_reader = SealedSegmentReader::new(index, meta.sealed_segment);
let term_ids = query_vector.indexes();
let mut scorers = Vec::new();

for i in 0..term_ids.len() {
let term_id = term_ids[i];
if let Some(posting_reader) = sealed_reader.get_postings(term_id) {
let weight = bm25_weight[i];
scorers.push(SealedScorer {
posting: posting_reader,
weight,
max_score: weight.max_score(),
});
}
}

if scorers.len() == 1 {
block_wand_single(
scorers.into_iter().next().unwrap(),
&fieldnorm_reader,
&delete_bitmap_reader,
&mut computer,
);
} else {
block_wand(
scorers,
&fieldnorm_reader,
&delete_bitmap_reader,
&mut computer,
);
}
if scorers.len() == 1 {
block_wand_single(
scorers.into_iter().next().unwrap(),
&fieldnorm_reader,
&delete_bitmap_reader,
&mut computer,
);
} else {
block_wand(
scorers,
&fieldnorm_reader,
&delete_bitmap_reader,
&mut computer,
);
}

let payload_reader = PayloadReader::new(index, meta.payload_blkno);
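
The scan now reads exactly one sealed segment, so the outer loop over meta.sealed_segment() disappears, while each SealedScorer still carries max_score: weight.max_score(). The toy below shows why that upper bound matters: a posting list whose best possible score cannot beat the current best can be skipped without decoding it. This is a simplified per-term illustration, not the crate's block_wand implementation:

```rust
// Toy upper-bound pruning: a term whose max_score cannot beat the current
// best hit is skipped without decoding its postings. TermPosting is an
// invented stand-in, not the crate's SealedScorer.
struct TermPosting {
    max_score: f32,   // precomputed upper bound over every doc for this term
    scores: Vec<f32>, // actual per-document scores
}

fn best_score(postings: &[TermPosting]) -> f32 {
    let mut best = f32::MIN;
    for p in postings {
        if p.max_score <= best {
            continue; // nothing in this list can improve the answer
        }
        for &s in &p.scores {
            if s > best {
                best = s;
            }
        }
    }
    best
}

fn main() {
    let postings = vec![
        TermPosting { max_score: 2.0, scores: vec![0.5, 2.0, 1.0] },
        TermPosting { max_score: 1.5, scores: vec![1.5, 0.2] }, // pruned: 1.5 <= 2.0
    ];
    assert_eq!(best_score(&postings), 2.0);
}
```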
30 changes: 14 additions & 16 deletions src/index/vacuum.rs
@@ -103,27 +103,25 @@ pub unsafe extern "C" fn amvacuumcleanup(
}
}

for &sealed_data in meta.sealed_segment() {
let sealed_reader = SealedSegmentReader::new(index, sealed_data);
for i in 0..vocab_len() {
let Some(mut posting) = sealed_reader.get_postings_docid_only(i) else {
continue;
};
let sealed_reader = SealedSegmentReader::new(index, meta.sealed_segment);
for i in 0..vocab_len() {
let Some(mut posting) = sealed_reader.get_postings_docid_only(i) else {
continue;
};
loop {
posting.decode_block();
loop {
posting.decode_block();
loop {
let doc_id = posting.doc_id();
if !delete_bitmap_reader.is_delete(doc_id) {
term_stats[i as usize] += 1;
}
if !posting.advance_cur() {
break;
}
let doc_id = posting.doc_id();
if !delete_bitmap_reader.is_delete(doc_id) {
term_stats[i as usize] += 1;
}
if !posting.advance_block() {
if !posting.advance_cur() {
break;
}
}
if !posting.advance_block() {
break;
}
}
}

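
The vacuum path keeps the same traversal, just over the single sealed segment: decode one block, walk its entries with advance_cur, then move on with advance_block, counting only doc ids the delete bitmap does not mark. Below is a self-contained sketch of that two-level iteration over toy data; BlockPosting and its methods are invented stand-ins for the real posting reader:

```rust
// Toy two-level posting iteration mirroring the loop in amvacuumcleanup:
// decode a block, walk its entries with advance_cur, then advance_block.
use std::collections::HashSet;

struct BlockPosting {
    blocks: Vec<Vec<u32>>, // each inner Vec is one decoded block of doc ids
    block: usize,
    cur: usize,
}

impl BlockPosting {
    fn decode_block(&mut self) {
        self.cur = 0; // the real reader decompresses the current block here
    }
    fn doc_id(&self) -> u32 {
        self.blocks[self.block][self.cur]
    }
    fn advance_cur(&mut self) -> bool {
        self.cur += 1;
        self.cur < self.blocks[self.block].len()
    }
    fn advance_block(&mut self) -> bool {
        self.block += 1;
        self.block < self.blocks.len()
    }
}

fn live_doc_count(mut posting: BlockPosting, deleted: &HashSet<u32>) -> u64 {
    let mut count = 0;
    loop {
        posting.decode_block();
        loop {
            if !deleted.contains(&posting.doc_id()) {
                count += 1;
            }
            if !posting.advance_cur() {
                break;
            }
        }
        if !posting.advance_block() {
            break;
        }
    }
    count
}

fn main() {
    let posting = BlockPosting {
        blocks: vec![vec![0, 1, 2], vec![5, 8]],
        block: 0,
        cur: 0,
    };
    let deleted: HashSet<u32> = [1, 8].into_iter().collect();
    assert_eq!(live_doc_count(posting, &deleted), 3);
}
```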
10 changes: 6 additions & 4 deletions src/page/postgres.rs
@@ -19,10 +19,12 @@ bitflags::bitflags! {
const META = 1 << 0;
const PAYLOAD = 1 << 1;
const FIELD_NORM = 1 << 2;
const POSTINGS = 1 << 3;
const TERM_STATISTIC = 1 << 4;
const GROWING = 1 << 5;
const DELETE = 1 << 6;
const TERM_STATISTIC = 1 << 3;
const TERM_INFO = 1 << 4;
const SKIP_INFO = 1 << 5;
const BLOCK_DATA = 1 << 6;
const GROWING = 1 << 7;
const DELETE = 1 << 8;
const FREE = 1 << 15;
}
}
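
The single POSTINGS page kind is replaced by TERM_INFO, SKIP_INFO, and BLOCK_DATA, and TERM_STATISTIC, GROWING, and DELETE are renumbered to make room. Below is a minimal sketch of declaring and testing such flags with the bitflags crate the file already uses; PageKind and is_sealed_page are illustrative names, not the extension's actual items:

```rust
// Minimal bitflags sketch mirroring the new page-kind split. PageKind and
// is_sealed_page are illustrative names only.
use bitflags::bitflags;

bitflags! {
    pub struct PageKind: u16 {
        const META = 1 << 0;
        const TERM_INFO = 1 << 4;
        const SKIP_INFO = 1 << 5;
        const BLOCK_DATA = 1 << 6;
    }
}

fn is_sealed_page(kind: PageKind) -> bool {
    kind.intersects(PageKind::TERM_INFO | PageKind::SKIP_INFO | PageKind::BLOCK_DATA)
}

fn main() {
    assert!(is_sealed_page(PageKind::BLOCK_DATA));
    assert!(!is_sealed_page(PageKind::META));
}
```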
37 changes: 31 additions & 6 deletions src/page/virtual.rs
@@ -190,6 +190,23 @@ impl VirtualPageWriter {
}
}

pub fn page_count(&self) -> usize {
match &self.state {
VirtualPageWriterState::Direct([_, direct]) => direct.data().len() / 4,
VirtualPageWriterState::Indirect1([_, indirect1_page, indirect1_inode]) => {
indirect1_page.data().len() / 4
+ (indirect1_inode.data().len() / 4 - 1) * DIRECT_COUNT
}
VirtualPageWriterState::Indirect2(
[_, indirect1_page, indirect2_page, indirect2_inode],
) => {
indirect1_page.data().len() / 4
+ (indirect2_page.data().len() / 4 - 1) * DIRECT_COUNT
+ (indirect2_inode.data().len() / 4 - 1) * INDIRECT1_COUNT
}
}
}

pub fn finalize(self) -> u32 {
self.first_blkno
}
@@ -210,15 +227,23 @@ impl VirtualPageWriter {
}

// it will make sure the data is on the same page
pub fn write_no_cross(&mut self, data: &[u8]) {
assert!(data.len() <= bm25_page_size());
pub fn write_vectorized_no_cross(&mut self, data: &[&[u8]]) -> bool {
let mut change_page = false;
let len = data.iter().map(|d| d.len()).sum::<usize>();
assert!(len <= bm25_page_size());
let mut space = self.freespace_mut();
if space.len() < data.len() {
if space.len() < len {
change_page = true;
self.new_page();
space = self.freespace_mut();
}
space[..data.len()].copy_from_slice(data);
*self.offset() += data.len() as u16;
let mut offset = 0;
for d in data {
space[offset..][..d.len()].copy_from_slice(d);
offset += d.len();
}
*self.offset() += len as u16;
change_page
}

fn offset(&mut self) -> &mut u16 {
@@ -350,7 +375,7 @@
}
}

fn data_page(&mut self) -> &mut PageWriteGuard {
pub fn data_page(&mut self) -> &mut PageWriteGuard {
match &mut self.state {
VirtualPageWriterState::Direct(pages) => &mut pages[0],
VirtualPageWriterState::Indirect1(pages) => &mut pages[0],
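
write_no_cross becomes write_vectorized_no_cross: it takes several byte slices, still guarantees they land contiguously on a single page, and now reports whether a fresh page had to be started; the new page_count helper likewise derives the number of data pages from the direct and indirect inode entries. Below is a self-contained sketch of the no-cross contract on a toy fixed-size page; ToyPageWriter and PAGE_SIZE are invented for illustration:

```rust
// Toy model of write_vectorized_no_cross: all slices are written contiguously
// to a single fixed-size page, and the return value reports whether a new
// page had to be opened. ToyPageWriter and PAGE_SIZE are invented names.
const PAGE_SIZE: usize = 64;

struct ToyPageWriter {
    pages: Vec<Vec<u8>>,
}

impl ToyPageWriter {
    fn new() -> Self {
        Self { pages: vec![Vec::new()] }
    }

    fn write_vectorized_no_cross(&mut self, data: &[&[u8]]) -> bool {
        let len: usize = data.iter().map(|d| d.len()).sum();
        assert!(len <= PAGE_SIZE, "a vectorized write must fit on one page");
        let mut change_page = false;
        if self.pages.last().unwrap().len() + len > PAGE_SIZE {
            // Not enough free space: start a new page so the write never
            // crosses a page boundary.
            self.pages.push(Vec::new());
            change_page = true;
        }
        let page = self.pages.last_mut().unwrap();
        for d in data {
            page.extend_from_slice(d);
        }
        change_page
    }
}

fn main() {
    let (a, b, c) = ([1u8; 40], [2u8; 20], [3u8; 10]);
    let mut w = ToyPageWriter::new();
    assert!(!w.write_vectorized_no_cross(&[a.as_slice()]));
    // 40 + 30 bytes would cross the 64-byte boundary, so a new page starts.
    assert!(w.write_vectorized_no_cross(&[b.as_slice(), c.as_slice()]));
    assert_eq!(w.pages.len(), 2);
}
```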