diff --git a/Cargo.lock b/Cargo.lock index 963e02305..50372e0a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -655,6 +655,46 @@ dependencies = [ "libloading", ] +[[package]] +name = "clap" +version = "4.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c937d4061031a6d0c8da4b9a4f98a172fc2976dfb1c19213a9cf7d0d3c837e36" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85379ba512b21a328adf887e85f7742d12e96eb31f3ef077df4ffc26b506ffed" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "501d359d5f3dcaf6ecdeee48833ae73ec6e42723a1e52419c79abf9507eec0a0" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.72", +] + +[[package]] +name = "clap_lex" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" + [[package]] name = "cli" version = "0.0.0" @@ -683,6 +723,7 @@ dependencies = [ "base", "log", "memmap2", + "parking_lot", "rand", "rustix 0.38.34", "serde", @@ -1151,6 +1192,17 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-tests" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eeb39ec0dacc89541b6eced815ab9e97f6b7d44078628abb090c6437763fd050" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "getrandom" version = "0.2.15" @@ -1192,7 +1244,6 @@ version = "0.0.0" dependencies = [ "base", "common", - "parking_lot", "rand", "stoppable_rayon", ] @@ -1242,6 +1293,12 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1537,6 +1594,7 @@ dependencies = [ "quantization", "rabitq", "rand", + "seismic", "serde", "serde_json", "stoppable_rayon", @@ -2347,6 +2405,22 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "qwt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7038d350ddeead336e47f04e5149329ce6b7af1d1b5e7ad0fddc3dcd1bed64fe" +dependencies = [ + "bincode", + "clap", + "generic-tests", + "num-traits", + "paste", + "rand", + "serde", + "serde-big-array", +] + [[package]] name = "rabitq" version = "0.0.0" @@ -2682,6 +2756,22 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" +[[package]] +name = "seismic" +version = "0.0.0" +dependencies = [ + "base", + "bincode", + "common", + "log", + "quantization", + "qwt", + "rand", + "serde", + "stoppable_rayon", + "storage", +] + [[package]] name = "semver" version = "0.11.0" @@ -2718,6 +2808,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-big-array" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11fc7cc2c76d73e0f27ee52abbd64eec84d46f370c88371120433196934e4b7f" +dependencies = [ + "serde", +] + [[package]] name = "serde_cbor" version = "0.11.2" diff --git a/crates/base/src/index.rs b/crates/base/src/index.rs index 554e5fc12..976139277 100644 --- a/crates/base/src/index.rs +++ b/crates/base/src/index.rs @@ -160,6 +160,18 @@ impl IndexOptions { )); } } + IndexingOptions::Seismic(_) => { + if !matches!(self.vector.d, DistanceKind::Dot) { + return Err(ValidationError::new( + "seismic is not support for distance that is not negative dot product", + )); + } + if !matches!(self.vector.v, VectorKind::SVecf32) { + return Err(ValidationError::new( + "seismic is not support for vectors that are not sparse vectors", + )); + } + } } Ok(()) } @@ -294,6 +306,7 @@ pub enum IndexingOptions { Hnsw(HnswIndexingOptions), InvertedIndex(InvertedIndexingOptions), Rabitq(RabitqIndexingOptions), + Seismic(SeismicIndexingOptions), } impl IndexingOptions { @@ -321,6 +334,12 @@ impl IndexingOptions { }; x } + pub fn unwrap_seismic(self) -> SeismicIndexingOptions { + let IndexingOptions::Seismic(x) = self else { + unreachable!() + }; + x + } } impl Default for IndexingOptions { @@ -337,6 +356,7 @@ impl Validate for IndexingOptions { Self::Hnsw(x) => x.validate(), Self::InvertedIndex(x) => x.validate(), Self::Rabitq(x) => x.validate(), + Self::Seismic(x) => x.validate(), } } } @@ -462,6 +482,51 @@ impl Default for RabitqIndexingOptions { } } +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] +#[validate(schema(function = "Self::validate_self"))] +#[serde(deny_unknown_fields)] +pub struct SeismicIndexingOptions { + #[serde(default = "SeismicIndexingOptions::default_n_postings")] + #[validate(range(min = 100, max = 100_000))] + pub n_postings: u32, + #[serde(default = "SeismicIndexingOptions::default_centroid_fraction")] + #[validate(range(min = 0.01, max = 1.))] + pub centroid_fraction: f32, + #[serde(default = "SeismicIndexingOptions::default_summary_energy")] + #[validate(range(min = 0., max = 1.))] + pub summary_energy: f32, +} + +impl SeismicIndexingOptions { + fn default_n_postings() -> u32 { + 4000 + } + fn default_centroid_fraction() -> f32 { + 0.1 + } + fn default_summary_energy() -> f32 { + 0.4 + } + fn validate_self(&self) -> Result<(), ValidationError> { + if (self.n_postings as f32 * self.centroid_fraction) as u32 > 65535 { + return Err(ValidationError::new( + "centroids number cannot exceed 65535 in seismic indexing", + )); + } + Ok(()) + } +} + +impl Default for SeismicIndexingOptions { + fn default() -> Self { + Self { + n_postings: Self::default_n_postings(), + centroid_fraction: Self::default_centroid_fraction(), + summary_energy: Self::default_summary_energy(), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields)] #[serde(rename_all = "snake_case")] @@ -583,6 +648,12 @@ pub struct SearchOptions { pub rabitq_fast_scan: bool, #[validate(range(min = 1, max = 65535))] pub diskann_ef_search: u32, + #[validate(range(min = 1, max = 65535))] + pub seismic_heap_size: u32, + #[validate(range(min = 1, max = 100_000))] + pub seismic_q_cut: u32, + #[validate(range(min = 0.01, max = 1.))] + pub seismic_heap_factor: f32, } #[derive(Debug, Serialize, Deserialize)] diff --git a/crates/cli/src/args.rs b/crates/cli/src/args.rs index e0f8fb7d3..5b374a90f 100644 --- a/crates/cli/src/args.rs +++ b/crates/cli/src/args.rs @@ -144,6 +144,9 @@ impl QueryArguments { ivf_pq_fast_scan: false, rabitq_fast_scan: true, rabitq_nprobe: self.probe, + seismic_heap_size: 100, + seismic_q_cut: 3, + seismic_heap_factor: 1.0, } } } diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 85227e1ba..eed371438 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -8,6 +8,7 @@ base = { path = "../base" } log.workspace = true memmap2.workspace = true +parking_lot.workspace = true rand.workspace = true rustix.workspace = true serde.workspace = true diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 4917bc514..dab581f66 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -1,11 +1,15 @@ +#![cfg_attr(target_arch = "aarch64", feature(stdarch_aarch64_prefetch))] + pub mod always_equal; pub mod clean; pub mod dir_ops; pub mod file_atomic; pub mod json; pub mod mmap_array; +pub mod prefetch; pub mod rand; pub mod remap; pub mod sample; pub mod variants; pub mod vec2; +pub mod visited; diff --git a/crates/common/src/prefetch.rs b/crates/common/src/prefetch.rs new file mode 100644 index 000000000..1ca7ac0e3 --- /dev/null +++ b/crates/common/src/prefetch.rs @@ -0,0 +1,26 @@ +#[allow(clippy::not_unsafe_ptr_arg_deref)] +#[allow(non_snake_case)] +#[inline] +pub fn prefetch_read_NTA(ptr: *const i8) { + #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] + { + #[cfg(target_arch = "x86")] + use core::arch::x86::{_mm_prefetch, _MM_HINT_NTA}; + + #[cfg(target_arch = "x86_64")] + use core::arch::x86_64::{_mm_prefetch, _MM_HINT_NTA}; + + unsafe { + _mm_prefetch(ptr, _MM_HINT_NTA); + } + } + + #[cfg(target_arch = "aarch64")] + { + use core::arch::aarch64::{_prefetch, _PREFETCH_LOCALITY0, _PREFETCH_READ}; + + unsafe { + _prefetch(ptr, _PREFETCH_READ, _PREFETCH_LOCALITY0); + } + } +} diff --git a/crates/graph/src/visited.rs b/crates/common/src/visited.rs similarity index 100% rename from crates/graph/src/visited.rs rename to crates/common/src/visited.rs diff --git a/crates/graph/Cargo.toml b/crates/graph/Cargo.toml index 4616a2451..a1069a286 100644 --- a/crates/graph/Cargo.toml +++ b/crates/graph/Cargo.toml @@ -4,7 +4,6 @@ version.workspace = true edition.workspace = true [dependencies] -parking_lot.workspace = true rand.workspace = true base = { path = "../base" } diff --git a/crates/graph/src/lib.rs b/crates/graph/src/lib.rs index 3ba53ab94..25109c055 100644 --- a/crates/graph/src/lib.rs +++ b/crates/graph/src/lib.rs @@ -2,4 +2,3 @@ pub mod prune; pub mod search; -pub mod visited; diff --git a/crates/graph/src/search.rs b/crates/graph/src/search.rs index acf7cacfe..62fd2dcc2 100644 --- a/crates/graph/src/search.rs +++ b/crates/graph/src/search.rs @@ -1,9 +1,9 @@ -use crate::visited::VisitedGuard; -use crate::visited::VisitedPool; use base::scalar::F32; use base::search::Element; use base::search::GraphReranker; use base::search::Payload; +use common::visited::VisitedGuard; +use common::visited::VisitedPool; use std::cmp::Reverse; use std::collections::BinaryHeap; diff --git a/crates/hnsw/src/lib.rs b/crates/hnsw/src/lib.rs index 656f2ac4d..3679f2021 100644 --- a/crates/hnsw/src/lib.rs +++ b/crates/hnsw/src/lib.rs @@ -9,7 +9,7 @@ use base::vector::VectorBorrowed; use common::json::Json; use common::mmap_array::MmapArray; use common::remap::RemappedCollection; -use graph::visited::VisitedPool; +use common::visited::VisitedPool; use num_traits::Float; use parking_lot::RwLock; use quantization::operator::OperatorQuantization; diff --git a/crates/index/Cargo.toml b/crates/index/Cargo.toml index 29d9faa58..3f969fe17 100644 --- a/crates/index/Cargo.toml +++ b/crates/index/Cargo.toml @@ -31,6 +31,7 @@ hnsw = { path = "../hnsw" } inverted = { path = "../inverted" } ivf = { path = "../ivf" } rabitq = { path = "../rabitq" } +seismic = { path = "../seismic" } [lints] workspace = true diff --git a/crates/index/src/indexing/sealed.rs b/crates/index/src/indexing/sealed.rs index e11094bd4..4e75bbe47 100644 --- a/crates/index/src/indexing/sealed.rs +++ b/crates/index/src/indexing/sealed.rs @@ -7,6 +7,7 @@ use hnsw::Hnsw; use inverted::InvertedIndex; use ivf::Ivf; use rabitq::Rabitq; +use seismic::Seismic; use std::path::Path; pub enum SealedIndexing { @@ -15,6 +16,7 @@ pub enum SealedIndexing { Hnsw(Hnsw), InvertedIndex(InvertedIndex), Rabitq(Rabitq), + Seismic(Seismic), } impl SealedIndexing { @@ -31,6 +33,7 @@ impl SealedIndexing { Self::InvertedIndex(InvertedIndex::create(path, options, source)) } IndexingOptions::Rabitq(_) => Self::Rabitq(Rabitq::create(path, options, source)), + IndexingOptions::Seismic(_) => Self::Seismic(Seismic::create(path, options, source)), } } @@ -41,6 +44,7 @@ impl SealedIndexing { IndexingOptions::Hnsw(_) => Self::Hnsw(Hnsw::open(path)), IndexingOptions::InvertedIndex(_) => Self::InvertedIndex(InvertedIndex::open(path)), IndexingOptions::Rabitq(_) => Self::Rabitq(Rabitq::open(path)), + IndexingOptions::Seismic(_) => Self::Seismic(Seismic::open(path)), } } @@ -55,6 +59,7 @@ impl SealedIndexing { SealedIndexing::Hnsw(x) => x.vbase(vector, opts), SealedIndexing::InvertedIndex(x) => x.vbase(vector, opts), SealedIndexing::Rabitq(x) => x.vbase(vector, opts), + SealedIndexing::Seismic(x) => x.vbase(vector, opts), } } @@ -65,6 +70,7 @@ impl SealedIndexing { SealedIndexing::Hnsw(x) => x.len(), SealedIndexing::InvertedIndex(x) => x.len(), SealedIndexing::Rabitq(x) => x.len(), + SealedIndexing::Seismic(x) => x.len(), } } @@ -75,6 +81,7 @@ impl SealedIndexing { SealedIndexing::Hnsw(x) => x.vector(i), SealedIndexing::InvertedIndex(x) => x.vector(i), SealedIndexing::Rabitq(x) => x.vector(i), + SealedIndexing::Seismic(x) => x.vector(i), } } @@ -85,6 +92,7 @@ impl SealedIndexing { SealedIndexing::Hnsw(x) => x.payload(i), SealedIndexing::InvertedIndex(x) => x.payload(i), SealedIndexing::Rabitq(x) => x.payload(i), + SealedIndexing::Seismic(x) => x.payload(i), } } } diff --git a/crates/index/src/lib.rs b/crates/index/src/lib.rs index 1d0cc9961..988a291e5 100644 --- a/crates/index/src/lib.rs +++ b/crates/index/src/lib.rs @@ -30,6 +30,7 @@ use ivf::operator::OperatorIvf; use parking_lot::Mutex; use quantization::operator::OperatorQuantization; use rabitq::operator::OperatorRabitq; +use seismic::operator::OperatorSeismic; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::collections::HashSet; @@ -50,6 +51,7 @@ pub trait Op: + OperatorIvf + OperatorInvertedIndex + OperatorRabitq + + OperatorSeismic { } @@ -59,7 +61,8 @@ impl< + OperatorStorage + OperatorIvf + OperatorInvertedIndex - + OperatorRabitq, + + OperatorRabitq + + OperatorSeismic, > Op for T { } diff --git a/crates/index/src/segment/sealed.rs b/crates/index/src/segment/sealed.rs index 4fa8ee476..6906a1ae3 100644 --- a/crates/index/src/segment/sealed.rs +++ b/crates/index/src/segment/sealed.rs @@ -123,6 +123,7 @@ impl SealedSegment { SealedIndexing::Hnsw(x) => x, SealedIndexing::InvertedIndex(x) => x, SealedIndexing::Rabitq(x) => x, + SealedIndexing::Seismic(x) => x, } } } diff --git a/crates/seismic/Cargo.toml b/crates/seismic/Cargo.toml new file mode 100644 index 000000000..17b8f97a7 --- /dev/null +++ b/crates/seismic/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "seismic" +version.workspace = true +edition.workspace = true + +[dependencies] +bincode.workspace = true +log.workspace = true +rand.workspace = true +serde.workspace = true + +base = { path = "../base" } +common = { path = "../common" } +quantization = { path = "../quantization" } +stoppable_rayon = { path = "../stoppable_rayon" } +storage = { path = "../storage" } + +qwt = "0.2" + +[lints] +workspace = true diff --git a/crates/seismic/src/lib.rs b/crates/seismic/src/lib.rs new file mode 100644 index 000000000..52cda4a6c --- /dev/null +++ b/crates/seismic/src/lib.rs @@ -0,0 +1,395 @@ +#![allow(clippy::len_without_is_empty)] + +pub mod operator; +mod quantized_summary; + +use base::index::*; +use base::operator::*; +use base::scalar::F32; +use base::search::*; +use base::vector::SVecf32Borrowed; +use base::vector::SVecf32Owned; +use common::mmap_array::MmapArray; +use common::remap::RemappedCollection; +use common::visited::VisitedGuardChecker; +use common::visited::VisitedPool; +use operator::OperatorSeismic; +use quantized_summary::QuantizedSummary; +use rand::seq::IteratorRandom; +use serde::Deserialize; +use serde::Serialize; +use std::cmp::Reverse; +use std::collections::BinaryHeap; +use std::collections::HashMap; +use std::fs::create_dir; +use std::path::Path; +use stoppable_rayon as rayon; +use stoppable_rayon::iter::IntoParallelIterator; +use stoppable_rayon::iter::IntoParallelRefMutIterator; +use stoppable_rayon::iter::ParallelIterator; +use storage::Storage; + +const MIN_CLUSTER_SIZE: usize = 2; + +#[derive(Debug, Serialize, Deserialize)] +struct PostingList { + postings: Box<[u32]>, + block_offsets: Box<[usize]>, + summaries: QuantizedSummary, +} +pub struct Seismic { + storage: O::Storage, + payloads: MmapArray, + // ---------------------- + posting_lists: Box<[PostingList]>, + // ---------------------- + visited: VisitedPool, +} + +impl Seismic { + pub fn create(path: impl AsRef, options: IndexOptions, source: &impl Source) -> Self { + let remapped = RemappedCollection::from_source(source); + from_nothing(path.as_ref(), options, &remapped) + } + + pub fn open(path: impl AsRef) -> Self { + open(path.as_ref()) + } + + pub fn vbase<'a>( + &'a self, + vector: Borrowed<'a, O>, + opts: &'a SearchOptions, + ) -> (Vec, Box<(dyn Iterator + 'a)>) { + vbase( + self, + vector, + opts.seismic_heap_size, + opts.seismic_q_cut, + opts.seismic_heap_factor, + ) + } + + pub fn len(&self) -> u32 { + self.storage.len() + } + + pub fn vector(&self, i: u32) -> Borrowed<'_, O> { + self.storage.vector(i) + } + + pub fn payload(&self, i: u32) -> Payload { + self.payloads[i as usize] + } +} + +fn from_nothing( + path: &Path, + options: IndexOptions, + collection: &(impl Collection + Sync), +) -> Seismic { + create_dir(path).unwrap(); + let SeismicIndexingOptions { + n_postings, + centroid_fraction, + summary_energy, + } = options.indexing.clone().unwrap_seismic(); + let n_postings = n_postings as usize; + let centroids = (n_postings as f32 * centroid_fraction) as usize; + let dims = collection.dims() as usize; + rayon::check(); + + // 1. Static Pruning + let mut postings = Vec::new(); + for i in 0..collection.len() { + let vector = O::cast_svec(collection.vector(i)); + for (&index, &value) in vector.indexes().iter().zip(vector.values()) { + postings.push((value, i, index)); + } + for v in vector.values() { + assert!(v.0 > 0., "Seismic index doesn't support negative values"); + } + } + let tot_postings = std::cmp::min(dims * n_postings, postings.len() - 1); + postings.select_nth_unstable_by(tot_postings, |a, b| b.0.cmp(&a.0)); + let mut inverted_lists: Vec> = vec![Vec::new(); dims]; + for (val, id, index) in postings.into_iter().take(tot_postings) { + inverted_lists[index as usize].push((val, id)); + } + inverted_lists.par_iter_mut().for_each(|vec| { + vec.sort_by(|a, b| b.0.cmp(&a.0)); + vec.truncate(n_postings * 3 / 2); + vec.shrink_to_fit(); + }); + + let posting_lists = inverted_lists + .into_par_iter() + .map(|postings| build_posting_list(&postings, centroids, summary_energy, collection)) + .collect(); + + rayon::check(); + let storage = O::Storage::create(path.join("storage"), collection); + rayon::check(); + let payloads = MmapArray::create( + path.join("payloads"), + (0..collection.len()).map(|i| collection.payload(i)), + ); + rayon::check(); + let posting_lists_file = std::fs::File::create_new(path.join("posting_lists")).unwrap(); + let posting_lists_writer = std::io::BufWriter::new(posting_lists_file); + bincode::serialize_into(posting_lists_writer, &posting_lists).unwrap(); + rayon::check(); + let visited = VisitedPool::new(storage.len()); + + Seismic { + storage, + payloads, + posting_lists, + visited, + } +} + +fn build_posting_list( + postings: &[(F32, u32)], + centroids: usize, + summary_energy: f32, + collection: &(impl Collection + Sync), +) -> PostingList { + // 2. Blocking of Inverted Lists + let mut postings = postings.iter().map(|&(_, i)| i).collect::>(); + let mut block_offsets = Vec::new(); + if !postings.is_empty() { + let mut reordered_postings = Vec::with_capacity(postings.len()); + block_offsets = Vec::with_capacity(centroids + 1); + let clustering_results = random_kmeans(&postings, centroids, collection); + block_offsets.push(0); + for cluster in clustering_results { + if cluster.is_empty() { + continue; + } + reordered_postings.extend(cluster); + block_offsets.push(reordered_postings.len()); + } + postings.copy_from_slice(&reordered_postings); + } + + // 3. Per-block Summary Vectors + let summary_vectors = block_offsets + .windows(2) + .map(|w| { + let mut map = HashMap::new(); + for &id in &postings[w[0]..w[1]] { + let vector = O::cast_svec(collection.vector(id)); + for i in 0..vector.len() { + let index = vector.indexes()[i as usize]; + let value = vector.values()[i as usize]; + map.entry(index) + .and_modify(|v| *v = std::cmp::max(*v, value)) + .or_insert(value); + } + } + + // alpha mass + let min_l1 = map.iter().map(|x| x.1 .0).sum::() * summary_energy; + let mut indexes = map.keys().copied().collect::>(); + indexes.sort_unstable_by_key(|&i| Reverse(map[&i])); + let mut l1 = 0.; + let mut j = 0; + while j < indexes.len() { + let val = map[&indexes[j]].0; + l1 += val; + j += 1; + if l1 >= min_l1 { + break; + } + } + indexes.truncate(j); + indexes.sort_unstable(); + let values = indexes.iter().map(|i| map[i]).collect::>(); + SVecf32Owned::new(collection.dims(), indexes, values) + }) + .collect::>(); + let summaries = QuantizedSummary::create(collection.dims(), &summary_vectors); + + PostingList { + postings, + block_offsets: block_offsets.into_boxed_slice(), + summaries, + } +} + +fn random_kmeans( + postings: &[u32], + centroids: usize, + collection: &(impl Collection + Sync), +) -> Vec> { + let mut rng = rand::thread_rng(); + let centroid_ids = postings + .iter() + .copied() + .choose_multiple(&mut rng, centroids); + + let mut inverted_lists = vec![Vec::new(); centroid_ids.len()]; + for &id in postings { + let vector = O::cast_svec(collection.vector(id)); + // Because `SVecf32Dot::distance` calculates (-1 * dot), we use `min_by_key` instead of `max_by_key` here + let argmax = (0..centroid_ids.len()) + .min_by_key(|&j| { + let centroid = O::cast_svec(collection.vector(centroid_ids[j])); + SVecf32Dot::distance(centroid, vector) + }) + .unwrap(); + inverted_lists[argmax].push(id); + } + + let mut to_be_replaced = Vec::new(); // ids that belong to too small clusters + for inverted_list in inverted_lists.iter_mut() { + if !inverted_list.is_empty() && inverted_list.len() <= MIN_CLUSTER_SIZE { + to_be_replaced.extend(inverted_list.iter()); + inverted_list.clear(); + } + } + + for id in to_be_replaced { + let vector = O::cast_svec(collection.vector(id)); + let argmax = (0..centroid_ids.len()) + .min_by_key(|&j| { + if inverted_lists[j].len() <= MIN_CLUSTER_SIZE { + return F32(0.); + } + let centroid = O::cast_svec(collection.vector(centroid_ids[j])); + SVecf32Dot::distance(centroid, vector) + }) + .unwrap(); + inverted_lists[argmax].push(id); + } + + inverted_lists +} + +fn open(path: &Path) -> Seismic { + let storage = O::Storage::open(path.join("storage")); + let payloads = MmapArray::open(path.join("payloads")); + let posting_lists_file = std::fs::File::open(path.join("posting_lists")).unwrap(); + let posting_lists_reader = std::io::BufReader::new(posting_lists_file); + let posting_lists = bincode::deserialize_from(posting_lists_reader).unwrap(); + let visited = VisitedPool::new(storage.len()); + Seismic { + storage, + payloads, + posting_lists, + visited, + } +} + +fn vbase<'a, O: OperatorSeismic>( + s: &'a Seismic, + vector: Borrowed<'a, O>, + k: u32, + q_cut: u32, + heap_factor: f32, +) -> (Vec, Box<(dyn Iterator + 'a)>) { + let mut visited = s.visited.fetch_guard_checker(); + let vector = O::cast_svec(vector); + let mut perm = (0..vector.len()).collect::>(); + perm.sort_unstable_by_key(|&i| Reverse(vector.values()[i as usize])); + perm.truncate(q_cut as usize); + let top_cut = perm.into_iter().map(|i| vector.indexes()[i as usize]); + let mut heap = ElementHeap::new(k as usize); + for i in top_cut { + let posting_list = &s.posting_lists[i as usize]; + let mut blocks_to_evaluate = None; + let summary_dots = posting_list.summaries.matmul(vector); + + for (j, &dot) in summary_dots.iter().enumerate() { + if !heap.check(-dot / heap_factor) { + continue; + } + let offset1 = posting_list.block_offsets[j]; + let offset2 = posting_list.block_offsets[j + 1]; + let posting_block = &posting_list.postings[offset1..offset2]; + + if let Some(block) = std::mem::replace(&mut blocks_to_evaluate, Some(posting_block)) { + vbase_block(s, block, vector, &mut heap, &mut visited); + } + + for i in (0..posting_block.len()).step_by(8) { + let ptr = posting_block.as_ptr().wrapping_add(i); + common::prefetch::prefetch_read_NTA(ptr as *const i8); + } + } + + if let Some(block) = blocks_to_evaluate { + vbase_block(s, block, vector, &mut heap, &mut visited); + } + } + + (heap.into_vec(), Box::new(std::iter::empty())) +} + +#[inline] +fn vbase_block( + s: &Seismic, + block: &[u32], + vector: SVecf32Borrowed, + heap: &mut ElementHeap, + visited: &mut VisitedGuardChecker<'_>, +) { + let mut prev_id = block[0]; + for &id in block.iter().skip(1) { + O::prefetch(&s.storage, id); + + if visited.check(prev_id) { + let distance = SVecf32Dot::distance(vector, O::cast_svec(s.storage.vector(prev_id))); + if heap.check(distance) { + let payload = s.payload(prev_id); + heap.push(Element { distance, payload }); + } + visited.mark(prev_id); + } + + prev_id = id; + } + + if visited.check(prev_id) { + let distance = SVecf32Dot::distance(vector, O::cast_svec(s.storage.vector(prev_id))); + if heap.check(distance) { + let payload = s.payload(prev_id); + heap.push(Element { distance, payload }); + } + visited.mark(prev_id); + } +} + +pub struct ElementHeap { + binary_heap: BinaryHeap, + k: usize, +} + +impl ElementHeap { + pub fn new(k: usize) -> Self { + assert!(k != 0); + Self { + binary_heap: BinaryHeap::new(), + k, + } + } + pub fn check(&self, distance: F32) -> bool { + self.binary_heap.len() < self.k || distance < self.binary_heap.peek().unwrap().distance + } + pub fn push(&mut self, element: Element) -> Option { + self.binary_heap.push(element); + if self.binary_heap.len() == self.k + 1 { + self.binary_heap.pop() + } else { + None + } + } + pub fn into_reversed_heap(self) -> BinaryHeap> { + self.binary_heap.into_iter().map(Reverse).collect() + } + + pub fn into_vec(self) -> Vec { + self.binary_heap.into_sorted_vec() + } +} diff --git a/crates/seismic/src/operator.rs b/crates/seismic/src/operator.rs new file mode 100644 index 000000000..f0f015576 --- /dev/null +++ b/crates/seismic/src/operator.rs @@ -0,0 +1,43 @@ +use base::operator::*; +use base::vector::SVecf32Borrowed; +use quantization::operator::OperatorQuantization; +use storage::OperatorStorage; + +pub trait OperatorSeismic: OperatorQuantization + OperatorStorage { + fn cast_svec(vec: Borrowed<'_, Self>) -> SVecf32Borrowed; + + fn prefetch(storage: &Self::Storage, i: u32); +} + +impl OperatorSeismic for SVecf32Dot { + fn cast_svec(vec: Borrowed<'_, Self>) -> SVecf32Borrowed { + vec + } + + fn prefetch(storage: &Self::Storage, i: u32) { + storage.prefetch(i) + } +} + +macro_rules! unimpl_operator_seismic { + ($t:ty) => { + impl OperatorSeismic for $t { + fn cast_svec(_: Borrowed<'_, Self>) -> SVecf32Borrowed { + unimplemented!() + } + + fn prefetch(_: &Self::Storage, _: u32) { + unimplemented!() + } + } + }; +} + +unimpl_operator_seismic!(SVecf32L2); +unimpl_operator_seismic!(BVectorDot); +unimpl_operator_seismic!(BVectorJaccard); +unimpl_operator_seismic!(BVectorHamming); +unimpl_operator_seismic!(Vecf32Dot); +unimpl_operator_seismic!(Vecf32L2); +unimpl_operator_seismic!(Vecf16Dot); +unimpl_operator_seismic!(Vecf16L2); diff --git a/crates/seismic/src/quantized_summary.rs b/crates/seismic/src/quantized_summary.rs new file mode 100644 index 000000000..05bb02978 --- /dev/null +++ b/crates/seismic/src/quantized_summary.rs @@ -0,0 +1,88 @@ +use base::{ + scalar::F32, + vector::{SVecf32Borrowed, SVecf32Owned, VectorBorrowed}, +}; +use qwt::{DArray, SelectBin}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct QuantizedSummary { + dims: u32, + mins: Box<[F32]>, + quants: Box<[F32]>, // quantization step + summary_ids: Box<[u16]>, + codes: Box<[u8]>, + offsets: DArray, +} + +impl QuantizedSummary { + pub fn create(dims: u32, vectors: &[SVecf32Owned]) -> Self { + let mut summary_ids = vec![Vec::new(); dims as usize]; + let mut codes = vec![Vec::new(); dims as usize]; + let mut mins = Vec::with_capacity(vectors.len()); + let mut quants = Vec::with_capacity(vectors.len()); + + for (id, vec) in vectors.iter().enumerate() { + let indexes = vec.indexes(); + let values = vec.values(); + + let (min, max) = values + .iter() + .fold((values[0], values[0]), |(min, max), &value| { + (min.min(value), max.max(value)) + }); + let quant = (max - min) / 256.; + let quant_codes = values + .iter() + .map(|&value| (((value - min) / quant).0 as u32).clamp(0, 255) as u8); + + mins.push(min); + quants.push(quant); + for (&idx, code) in indexes.iter().zip(quant_codes) { + summary_ids[idx as usize].push(id as u16); + codes[idx as usize].push(code); + } + } + + let offsets = std::iter::once(0) + .chain(summary_ids.iter().map(|ids| ids.len()).scan(0, |state, x| { + *state += x + 1; + Some(*state) + })) + .collect(); + let summary_ids = summary_ids.into_iter().flatten().collect(); + let codes = codes.into_iter().flatten().collect(); + + QuantizedSummary { + dims, + mins: mins.into_boxed_slice(), + quants: quants.into_boxed_slice(), + summary_ids, + codes, + offsets, + } + } + + pub fn len(&self) -> usize { + self.mins.len() + } + + pub fn matmul(&self, query: SVecf32Borrowed) -> Vec { + assert!(query.dims() == self.dims); + let mut results = vec![F32(0.); self.len()]; + + for (&qi, &qv) in query.indexes().iter().zip(query.values()) { + let start = self.offsets.select1(qi as usize).unwrap() - qi as usize; + let end = self.offsets.select1(qi as usize + 1).unwrap() - qi as usize - 1; + let current_summary_ids = &self.summary_ids[start..end]; + let current_codes = &self.codes[start..end]; + + for (&sid, &v) in current_summary_ids.iter().zip(current_codes) { + let val = F32(v as f32) * self.quants[sid as usize] + self.mins[sid as usize]; + results[sid as usize] += val * qv; + } + } + + results + } +} diff --git a/crates/storage/src/svec.rs b/crates/storage/src/svec.rs index a76bebff2..11995067c 100644 --- a/crates/storage/src/svec.rs +++ b/crates/storage/src/svec.rs @@ -79,3 +79,16 @@ impl> Storage for SVecStorage { } } } + +impl SVecStorage { + pub fn prefetch(&self, i: u32) { + let s = self.offsets[i as usize]; + let e = self.offsets[i as usize + 1]; + for i in (s..e).step_by(16) { + let index_ptr = self.indexes.as_ptr().wrapping_add(i); + let value_ptr = self.values.as_ptr().wrapping_add(i); + common::prefetch::prefetch_read_NTA(index_ptr as *const i8); + common::prefetch::prefetch_read_NTA(value_ptr as *const i8); + } + } +} diff --git a/src/gucs/executing.rs b/src/gucs/executing.rs index d4f9391a3..80b0aec09 100644 --- a/src/gucs/executing.rs +++ b/src/gucs/executing.rs @@ -27,6 +27,12 @@ static RABITQ_FAST_SCAN: GucSetting = GucSetting::::new(true); static DISKANN_EF_SEARCH: GucSetting = GucSetting::::new(100); +static SEISMIC_HEAP_SIZE: GucSetting = GucSetting::::new(100); + +static SEISMIC_Q_CUT: GucSetting = GucSetting::::new(3); + +static SEISMIC_HEAP_FACTOR: GucSetting = GucSetting::::new(1.0); + pub unsafe fn init() { GucRegistry::define_int_guc( "vectors.flat_sq_rerank_size", @@ -148,6 +154,36 @@ pub unsafe fn init() { GucContext::Userset, GucFlags::default(), ); + GucRegistry::define_int_guc( + "vectors.seismic_heap_size", + "The heap size of Seismic algorithm.", + "https://docs.pgvecto.rs/usage/search.html", + &SEISMIC_HEAP_SIZE, + 1, + u16::MAX as _, + GucContext::Userset, + GucFlags::default(), + ); + GucRegistry::define_int_guc( + "vectors.seismic_q_cut", + "`q_cut` argument of Seismic algorithm.", + "https://docs.pgvecto.rs/usage/search.html", + &SEISMIC_Q_CUT, + 1, + 100_000, + GucContext::Userset, + GucFlags::default(), + ); + GucRegistry::define_float_guc( + "vectors.seismic_heap_factor", + "`heap_factor` argument of Seismic algorithm.", + "https://docs.pgvecto.rs/usage/search.html", + &SEISMIC_HEAP_FACTOR, + 0.01, + 1.0, + GucContext::Userset, + GucFlags::default(), + ); } pub fn search_options() -> SearchOptions { @@ -165,5 +201,8 @@ pub fn search_options() -> SearchOptions { rabitq_nprobe: RABITQ_NPROBE.get() as u32, rabitq_fast_scan: RABITQ_FAST_SCAN.get(), diskann_ef_search: DISKANN_EF_SEARCH.get() as u32, + seismic_heap_size: SEISMIC_HEAP_SIZE.get() as u32, + seismic_q_cut: SEISMIC_Q_CUT.get() as u32, + seismic_heap_factor: SEISMIC_HEAP_FACTOR.get() as f32, } } diff --git a/tests/sqllogictest/seismic.slt b/tests/sqllogictest/seismic.slt new file mode 100644 index 000000000..f520d3cc3 --- /dev/null +++ b/tests/sqllogictest/seismic.slt @@ -0,0 +1,20 @@ +statement ok +SET search_path TO pg_temp, vectors; + +statement ok +CREATE TABLE t (val svector(6)); + +statement ok +INSERT INTO t (val) SELECT ARRAY[0, random(), 0, 0, random(), random()]::real[]::vector::svector FROM generate_series(1, 1000); + +statement ok +CREATE INDEX ON t USING vectors (val svector_dot_ops) +WITH (options = "[indexing.seismic]"); + +query I +SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <#> '{1:3,2:1}/6'::svector limit 10) t2; +---- +10 + +statement ok +DROP TABLE t;