diff --git a/Cargo.toml b/Cargo.toml index a58eb73..cee3e30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ redb = { version = "2", optional = true } async-trait = { version = "0.1", optional = true } rustis = { version = "0.13", features = ["tokio-runtime"], optional = true } tokio = { version = "1.42.0", features = ["full"], optional = true } +derive_builder = "0.20.2" [dev-dependencies] rand = "0.8" diff --git a/src/redb_storage.rs b/src/redb_storage.rs index 756bf91..8521560 100644 --- a/src/redb_storage.rs +++ b/src/redb_storage.rs @@ -1,8 +1,13 @@ -use crate::expiring_bloom::{BloomError, BloomFilterStorage, Result}; +use crate::expiring_bloom::{ + default_hash_function, BloomError, BloomFilterStorage, Result, + SlidingBloomFilter, +}; +use derive_builder::Builder; use redb::{Database, ReadableTable, TableDefinition}; use serde::{Deserialize, Serialize}; +use std::path::PathBuf; use std::sync::Arc; -use std::time::SystemTime; +use std::time::{Duration, SystemTime}; const LEVELS_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("levels"); @@ -19,8 +24,72 @@ pub struct RedbStorage { max_levels: usize, } -// TODO: create builder to create SlidingBloomFilter with storage at once, i.e. 
-// API to create RedbExpiringBloomFilter +#[derive(Builder, Debug)] +#[builder(pattern = "owned")] +pub struct RedbExpiringBloomFilterOptions { + /// Path to the ReDB database file + path: PathBuf, + /// Maximum number of elements the filter is expected to contain + capacity: usize, + /// How long elements should stay in the filter + expiration_time: Duration, + /// False positive rate (default: 0.01) + #[builder(default = "0.01")] + false_positive_rate: f64, + /// Number of filter levels (default: 5) + #[builder(default = "5")] + max_levels: usize, +} + +pub struct RedbExpiringBloomFilter { + filter: SlidingBloomFilter<RedbStorage>, +} + +impl RedbExpiringBloomFilter { + /// Creates a new RedbExpiringBloomFilter from the provided options + pub fn new(opts: RedbExpiringBloomFilterOptions) -> Result<Self> { + // Calculate level duration based on expiration time and max levels + let level_duration = Duration::from_secs( + opts.expiration_time.as_secs() / opts.max_levels as u64, + ); + + // Create ReDB storage + let storage = RedbStorage::open( + opts.path.to_str().ok_or_else(|| { + BloomError::StorageError("Invalid path".to_string()) + })?, + opts.capacity, + opts.max_levels, + )?; + + // Create the sliding bloom filter + let filter = SlidingBloomFilter::new( + storage, + opts.capacity, + opts.false_positive_rate, + level_duration, + opts.max_levels, + default_hash_function, + )?; + + Ok(Self { filter }) + } + + /// Insert an item into the filter + pub fn insert(&mut self, item: &[u8]) -> Result<()> { + self.filter.insert(item) + } + + /// Query if an item might be in the filter + pub fn query(&self, item: &[u8]) -> Result<bool> { + self.filter.query(item) + } + + /// Clean up expired items from the filter + pub fn cleanup_expired(&mut self) -> Result<()> { + self.filter.cleanup_expired_levels() + } +} impl RedbStorage { pub fn open(path: &str, capacity: usize, max_levels: usize) -> Result<Self> { @@ -135,6 +204,7 @@ impl BloomFilterStorage for RedbStorage { self.save_level_data(level,
&level_data) } + #[inline] fn get_bit(&self, level: usize, index: usize) -> Result<bool> { if index >= self.capacity { return Err(BloomError::IndexOutOfBounds { @@ -200,3 +270,56 @@ impl BloomFilterStorage for RedbStorage { self.max_levels } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_builder_required_fields() { + // Test builder with only required fields + let result = RedbExpiringBloomFilterOptionsBuilder::default() + .path("filter_tests.redb".into()) + .capacity(1000) + .expiration_time(Duration::from_secs(3600)) + .build(); + + assert!(result.is_ok()); + let opts = result.unwrap(); + assert_eq!(opts.false_positive_rate, 0.01); // Check default value + assert_eq!(opts.max_levels, 5); // Check default value + } + + #[test] + fn test_builder_custom_fields() { + let result = RedbExpiringBloomFilterOptionsBuilder::default() + .path("filter_tests.redb".into()) + .capacity(1000) + .expiration_time(Duration::from_secs(3600)) + .false_positive_rate(0.001) + .max_levels(10) + .build(); + + assert!(result.is_ok()); + let opts = result.unwrap(); + assert_eq!(opts.false_positive_rate, 0.001); + assert_eq!(opts.max_levels, 10); + } + + #[test] + fn test_builder_missing_required() { + // Test missing path + let result = RedbExpiringBloomFilterOptionsBuilder::default() + .capacity(1000) + .expiration_time(Duration::from_secs(3600)) + .build(); + assert!(result.is_err()); + + // Test missing capacity + let result = RedbExpiringBloomFilterOptionsBuilder::default() + .path("filter_tests.redb".into()) + .expiration_time(Duration::from_secs(3600)) + .build(); + assert!(result.is_err()); + } +} diff --git a/tests/redb_storage_tests.rs b/tests/redb_storage_tests.rs index 87e6f86..56ab594 100644 --- a/tests/redb_storage_tests.rs +++ b/tests/redb_storage_tests.rs @@ -1,11 +1,14 @@ #[cfg(test)] mod tests { - use expiring_bloom_rs::redb_storage::RedbStorage; + use expiring_bloom_rs::redb_storage::RedbExpiringBloomFilterOptionsBuilder; + use
expiring_bloom_rs::redb_storage::{RedbExpiringBloomFilter, RedbStorage}; use expiring_bloom_rs::{ default_hash_function, BloomFilterStorage, SlidingBloomFilter, }; + use rand::random; use std::{ fs, + path::PathBuf, sync::{Arc, Mutex}, thread, time::{Duration, SystemTime}, @@ -17,7 +20,11 @@ mod tests { (storage, path) } - fn cleanup_db(path: &str) { + fn temp_db_path() -> PathBuf { + format!("test_bloom_{}.redb", random::<u64>()).into() + } + + fn cleanup_db(path: &PathBuf) { let _ = fs::remove_file(path); } @@ -34,7 +41,7 @@ mod tests { storage.clear_level(0).unwrap(); assert!(!storage.get_bit(0, 5).unwrap()); - cleanup_db(&path); + cleanup_db(&path.into()); } #[test] @@ -53,7 +60,7 @@ mod tests { assert!(storage.get_bit(0, 5).unwrap()); } - cleanup_db(&path); + cleanup_db(&path.into()); } #[test] @@ -71,7 +78,7 @@ mod tests { storage.set_timestamp(0, time2).unwrap(); assert!(storage.get_timestamp(0).unwrap().unwrap() == time2); - cleanup_db(&path); + cleanup_db(&path.into()); } #[test] @@ -89,7 +96,7 @@ mod tests { assert!(storage.set_bit(0, 2000).is_err()); assert!(storage.get_bit(0, 2000).is_err()); - cleanup_db(&path); + cleanup_db(&path.into()); } #[test] @@ -112,7 +119,7 @@ mod tests { handle.join().unwrap(); } - cleanup_db(&path); + cleanup_db(&path.into()); } #[test] @@ -142,7 +149,7 @@ mod tests { assert!(bloom.query(b"item1").unwrap()); // First item should still be present // Wait for another rotation - thread::sleep(Duration::from_millis(200)); + thread::sleep(Duration::from_millis(150)); // Insert and verify third item bloom.insert(b"item3").unwrap(); @@ -163,7 +170,7 @@ mod tests { "Latest item should have expired" ); - cleanup_db(&path); + cleanup_db(&path.into()); } #[test] @@ -206,16 +213,68 @@ mod tests { println!("Item {} exists: {}", i, exists); } + cleanup_db(&path.into()); + } + + #[test] + fn test_filter_basic_operations() { + let path = temp_db_path(); + + let opts = RedbExpiringBloomFilterOptionsBuilder::default() + .path(path.clone()) + 
.capacity(1000) + .expiration_time(Duration::from_secs(3600)) + .build() + .unwrap(); + + let mut filter = RedbExpiringBloomFilter::new(opts).unwrap(); + + // Test insert and query + filter.insert(b"test_item").unwrap(); + assert!(filter.query(b"test_item").unwrap()); + assert!(!filter.query(b"nonexistent_item").unwrap()); + cleanup_db(&path); } - // Helper function to simulate consistent hashing for test items - fn hash_item(item: &str) -> usize { - use std::collections::hash_map::DefaultHasher; - use std::hash::{Hash, Hasher}; + // #[test] + /// This test runs for more than 60 seconds, most likely because the backend is poorly implemented + fn test_false_positive_rate() { + let path = temp_db_path(); + + // Create filter with specific false positive rate + let opts = RedbExpiringBloomFilterOptionsBuilder::default() + .path(path.clone()) + .capacity(10000) + .expiration_time(Duration::from_secs(3600)) + .false_positive_rate(0.01) + .build() + .unwrap(); + + let mut filter = RedbExpiringBloomFilter::new(opts).unwrap(); + + // Insert some known items + let mut known_items = Vec::new(); + for i in 0..1000 { + let item = format!("known_item_{}", i); + known_items.push(item.clone()); + filter.insert(item.as_bytes()).unwrap(); + } - let mut hasher = DefaultHasher::new(); - item.hash(&mut hasher); - (hasher.finish() % 1000) as usize + // Test unknown items + let mut false_positives = 0; + let test_count = 10000; + + for i in 0..test_count { + let unknown_item = format!("unknown_item_{}", i); + if filter.query(unknown_item.as_bytes()).unwrap() { + false_positives += 1; + } + } + + let observed_fpr = false_positives as f64 / test_count as f64; + assert!(observed_fpr < 0.02); // Allow some margin above target 0.01 + + cleanup_db(&path); } }