Skip to content

Commit

Permalink
feat: add index crud to db (#110)
Browse files Browse the repository at this point in the history
  • Loading branch information
edwinkys authored Aug 4, 2024
2 parents d474896 + d909f6e commit c3df2c8
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 13 deletions.
21 changes: 21 additions & 0 deletions docs/refs/database.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,24 @@ avoid the overhead of loading the index from disk which can be slow.
By default, performing any operation related to an index like search or refresh
will load the index to the pool. If we want to release the index from the pool,
we can use the `release_indices` method.

## Advanced Data Operations

The Database also provides advanced operations to manage the indices directly,
without going through the SQL layer. These operations are useful when we want
to perform low-level updates on the records stored in the index data store.

These are the available advanced operations:

- Insert records into an index.
- Update records in an index.
- Delete records from an index by their IDs.

!!! warning "Low-level Operations"

Please note that these operations are not recommended for general use since
they bypass the SQL layer and can cause inconsistencies between the SQL
data source and the index, which may result in unexpected behavior.

With this in mind, you can technically use these operations to run OasysDB
without the SQL layer and interact with each index data store directly.
119 changes: 107 additions & 12 deletions src/db/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,12 @@ impl Database {
})?;

// Cloning is necessary here to avoid borrowing issues.
let IndexRef { algorithm, file, config } = index_ref.to_owned();

// It's safe to unwrap here because we validated that index exists by
// calling get_index_ref method above.
let index: Index = self.get_index(name).unwrap();
let IndexRef { config, .. } = index_ref.to_owned();

let (query, config) = {
// We wrap the index lock in a closure to make sure it's dropped
// before async functionalities are called.
let index: Index = self.get_index(name).unwrap();
let index = index.lock()?;
let meta = index.metadata();
let checkpoint = meta.last_inserted.unwrap_or_default();
Expand All @@ -253,12 +250,7 @@ impl Database {
records.insert(id, record);
}

// Update the index with new records and persist it.
// We might want to persist the index after every fit operation.
let mut index = index.lock()?;
index.insert(records)?;
algorithm.persist_index(file, index.as_ref())?;
Ok(())
self.insert_into_index(name, records)
}

/// Updates the index with new records from the source synchronously.
Expand All @@ -285,11 +277,58 @@ impl Database {
k: usize,
filters: impl Into<Filters>,
) -> Result<Vec<SearchResult>, Error> {
// These 2 lines are necessary to avoid memory issues.
let index: Index = self.try_get_index(name)?;
let index = index.lock()?;
index.search(query.into(), k, filters.into())
}

/// Adds the given records to the named index and persists the index.
/// - `name`: Index name.
/// - `records`: Records to insert, keyed by their record ID.
///
/// The index lock is held until the index has been written to its file.
pub fn insert_into_index(
    &self,
    name: impl AsRef<str>,
    records: HashMap<RecordID, Record>,
) -> Result<(), Error> {
    // Resolve the name once so it can be reused for lookup and persistence.
    let name = name.as_ref();
    let handle: Index = self.try_get_index(name)?;
    let mut guard = handle.lock()?;
    guard.insert(records)?;
    self.persist_existing_index(name, guard.as_ref())
}

/// Replaces the data of existing records in the named index and
/// persists the index.
/// - `name`: Index name.
/// - `records`: New record data, keyed by the ID of the record to update.
///
/// Each provided ID has its stored record data replaced by the new
/// record data. If the record doesn't exist in the index, it will be
/// ignored.
pub fn update_index(
    &self,
    name: impl AsRef<str>,
    records: HashMap<RecordID, Record>,
) -> Result<(), Error> {
    // Resolve the name once so it can be reused for lookup and persistence.
    let name = name.as_ref();
    let handle: Index = self.try_get_index(name)?;
    let mut guard = handle.lock()?;
    guard.update(records)?;
    self.persist_existing_index(name, guard.as_ref())
}

/// Removes the records with the given IDs from the named index and
/// persists the index.
/// - `name`: Index name.
/// - `ids`: IDs of the records to delete.
pub fn delete_from_index(
    &self,
    name: impl AsRef<str>,
    ids: Vec<RecordID>,
) -> Result<(), Error> {
    // Resolve the name once so it can be reused for lookup and persistence.
    let name = name.as_ref();
    let handle: Index = self.try_get_index(name)?;
    let mut guard = handle.lock()?;
    guard.delete(ids)?;
    self.persist_existing_index(name, guard.as_ref())
}

/// Deletes an index from the database.
/// - `name`: Index name.
///
Expand Down Expand Up @@ -383,6 +422,29 @@ impl Database {
fn indices_dir(&self) -> PathBuf {
// The directory is the `indices` entry directly under the database root.
self.root.join("indices")
}

/// Persists an existing index to its file.
/// - `name`: Index name.
/// - `index`: Index trait object.
///
/// This method requires the reference to the index with the given
/// name to exist in the database state. If the index doesn't exist,
/// this method will return a not found error.
fn persist_existing_index(
&self,
name: impl AsRef<str>,
index: &dyn VectorIndex,
) -> Result<(), Error> {
let name = name.as_ref();
let IndexRef { algorithm, file, .. } =
self.get_index_ref(name).ok_or_else(|| {
let code = ErrorCode::NotFound;
let message = format!("Index might not exists: {name}.");
Error::new(code, message)
})?;

algorithm.persist_index(file, index)
}
}

/// The state of the vector database.
Expand Down Expand Up @@ -514,7 +576,6 @@ impl IndexRef {
#[cfg(test)]
mod tests {
use super::*;
use crate::prelude::RecordID;
use sqlx::{Executor, Row};
use std::sync::MutexGuard;

Expand Down Expand Up @@ -581,6 +642,40 @@ mod tests {
assert_eq!(results[0].id, RecordID(51));
}

#[test]
fn test_database_insert_into_index() -> Result<(), Error> {
    let db = create_test_database()?;

    // A single 128-dimension record under ID 101; the assertion below
    // expects it to be added as a new entry.
    let record = Record {
        vector: Vector::from(vec![100.0; 128]),
        data: HashMap::from([(
            "number".to_string(),
            Some(DataValue::Integer(1100)),
        )]),
    };

    let records = HashMap::from([(RecordID(101), record)]);
    db.insert_into_index(TEST_INDEX, records)?;

    // Inspect the index directly to confirm its size after the insert.
    let handle: Index = db.try_get_index(TEST_INDEX)?;
    let guard = handle.lock()?;
    assert_eq!(guard.len(), 101);
    Ok(())
}

#[test]
fn test_database_delete_from_index() -> Result<(), Error> {
    let db = create_test_database()?;

    // Delete two records by ID, then check that the index shrank to 98.
    db.delete_from_index(TEST_INDEX, vec![RecordID(1), RecordID(2)])?;

    let handle: Index = db.try_get_index(TEST_INDEX)?;
    let guard = handle.lock()?;
    assert_eq!(guard.len(), 98);
    Ok(())
}

#[test]
fn test_database_delete_index() {
let db = create_test_database().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::indices::*;
use crate::types::err::{Error, ErrorCode};
use crate::types::filter::Filters;
use crate::types::record::Vector;
use crate::types::record::*;
use crate::utils::file;
use serde::{Deserialize, Serialize};
use sqlx::{AnyConnection as SourceConnection, Connection};
Expand Down
6 changes: 6 additions & 0 deletions src/indices/idx_flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ impl VectorIndex for IndexFlat {
return Ok(());
}

// Filter only records that are not already in the index.
let records: HashMap<RecordID, Record> = records
.into_iter()
.filter(|(id, _)| !self.data.contains_key(id))
.collect();

self.metadata.last_inserted = records.keys().max().copied();
self.data.par_extend(records);
Ok(())
Expand Down
12 changes: 12 additions & 0 deletions src/indices/idx_ivfpq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ impl VectorIndex for IndexIVFPQ {
return Err(Error::new(code, message));
}

// Only process records that are not already in the index.
let records: HashMap<RecordID, Record> = records
.into_iter()
.filter(|(id, _)| !self.data.contains_key(id))
.collect();

for (id, record) in records.iter() {
let vector = &record.vector;
let cid = self.find_nearest_centroid(vector).to_usize();
Expand Down Expand Up @@ -256,6 +262,12 @@ impl VectorIndex for IndexIVFPQ {
&mut self,
records: HashMap<RecordID, Record>,
) -> Result<(), Error> {
// Only process records that are already in the index.
let records: HashMap<RecordID, Record> = records
.into_iter()
.filter(|(id, _)| self.data.contains_key(id))
.collect();

let ids: Vec<RecordID> = records.keys().cloned().collect();
self.delete(ids)?;
self.insert(records)
Expand Down
8 changes: 8 additions & 0 deletions src/indices/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,13 +405,21 @@ pub trait VectorIndex: Debug + Send + Sync {

/// Inserts new records into the index incrementally.
/// - `records`: Records to insert into the index, keyed by record ID.
///
/// This method will only insert records that are not already
/// in the index. If a record with the same ID already exists in
/// the index, it will be skipped rather than overwritten. To
/// replace the data of an existing record, use `update` instead.
fn insert(
&mut self,
records: HashMap<RecordID, Record>,
) -> Result<(), Error>;

/// Updates records in the index with new values.
/// - `records`: Records to update along with their new values.
///
/// This method will update the records with the provided IDs to
/// the new values. If the record does not exist in the index,
/// it will be skipped.
fn update(
&mut self,
records: HashMap<RecordID, Record>,
Expand Down

0 comments on commit c3df2c8

Please sign in to comment.