Skip to content

Commit

Permalink
Merge pull request #6 from oiwn/dev
Browse files Browse the repository at this point in the history
Better API
  • Loading branch information
oiwn authored Jan 2, 2025
2 parents f78914a + 37e5e41 commit 6b114eb
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ redb = { version = "2", optional = true }
async-trait = { version = "0.1", optional = true }
rustis = { version = "0.13", features = ["tokio-runtime"], optional = true }
tokio = { version = "1.42.0", features = ["full"], optional = true }
derive_builder = "0.20.2"

[dev-dependencies]
rand = "0.8"
Expand Down
131 changes: 127 additions & 4 deletions src/redb_storage.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use crate::expiring_bloom::{BloomError, BloomFilterStorage, Result};
use crate::expiring_bloom::{
default_hash_function, BloomError, BloomFilterStorage, Result,
SlidingBloomFilter,
};
use derive_builder::Builder;
use redb::{Database, ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::{Duration, SystemTime};

const LEVELS_TABLE: TableDefinition<&[u8], &[u8]> =
TableDefinition::new("levels");
Expand All @@ -19,8 +24,72 @@ pub struct RedbStorage {
max_levels: usize,
}

// TODO: create builder to create SlidingBloomFilter with storage at once, i.e.
// API to create RedbExpiringBloomFilter
/// Configuration used to construct a [`RedbExpiringBloomFilter`].
///
/// Values are assembled through the derived
/// `RedbExpiringBloomFilterOptionsBuilder`, which uses the "owned" builder
/// pattern. `path`, `capacity` and `expiration_time` declare no default and
/// therefore must be set before `build()` succeeds; `false_positive_rate` and
/// `max_levels` fall back to the defaults noted on each field.
#[derive(Builder, Debug)]
#[builder(pattern = "owned")]
pub struct RedbExpiringBloomFilterOptions {
/// Path to the ReDB database file (required).
path: PathBuf,
/// Maximum number of elements the filter is expected to contain (required).
capacity: usize,
/// How long elements should stay in the filter (required).
/// The filter constructor splits this span across `max_levels` levels.
expiration_time: Duration,
/// Target false positive rate (default: 0.01).
#[builder(default = "0.01")]
false_positive_rate: f64,
/// Number of filter levels (default: 5).
#[builder(default = "5")]
max_levels: usize,
}

/// Expiring Bloom filter backed by a ReDB database file.
///
/// Thin wrapper that owns a `SlidingBloomFilter` parameterised with
/// `RedbStorage`, so callers can create the storage and the filter in one step
/// from a `RedbExpiringBloomFilterOptions` value.
pub struct RedbExpiringBloomFilter {
/// Underlying sliding filter; every public method delegates to it.
filter: SlidingBloomFilter<RedbStorage>,
}

impl RedbExpiringBloomFilter {
    /// Creates a new `RedbExpiringBloomFilter` from the provided options.
    ///
    /// The configured expiration time is divided evenly between `max_levels`
    /// levels; the resulting per-level duration, together with the remaining
    /// options, is handed to `SlidingBloomFilter::new` on top of a
    /// `RedbStorage` opened at `opts.path`.
    ///
    /// # Errors
    ///
    /// Returns `BloomError::StorageError` when `max_levels` is zero (or does
    /// not fit in a `u32`), or when the path is not valid UTF-8. Errors from
    /// `RedbStorage::open` and `SlidingBloomFilter::new` are propagated
    /// unchanged.
    pub fn new(opts: RedbExpiringBloomFilterOptions) -> Result<Self> {
        // Divide the full `Duration` rather than whole seconds: this keeps
        // sub-second precision (1s over 3 levels is ~333ms, not 0s) and
        // `checked_div` turns `max_levels == 0` into an error instead of a
        // division-by-zero panic.
        // NOTE(review): `StorageError` is the only string-carrying variant
        // visible here; switch to a dedicated configuration variant if
        // `BloomError` provides one.
        let level_duration = u32::try_from(opts.max_levels)
            .ok()
            .and_then(|levels| opts.expiration_time.checked_div(levels))
            .ok_or_else(|| {
                BloomError::StorageError(
                    "max_levels must be greater than zero".to_string(),
                )
            })?;

        // `RedbStorage::open` takes a `&str`, so the path must be valid UTF-8.
        let path = opts.path.to_str().ok_or_else(|| {
            BloomError::StorageError("Invalid path".to_string())
        })?;

        // Create ReDB storage
        let storage = RedbStorage::open(path, opts.capacity, opts.max_levels)?;

        // Create the sliding bloom filter
        let filter = SlidingBloomFilter::new(
            storage,
            opts.capacity,
            opts.false_positive_rate,
            level_duration,
            opts.max_levels,
            default_hash_function,
        )?;

        Ok(Self { filter })
    }

    /// Inserts an item into the filter.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the underlying filter.
    pub fn insert(&mut self, item: &[u8]) -> Result<()> {
        self.filter.insert(item)
    }

    /// Queries whether an item might be in the filter.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the underlying filter.
    pub fn query(&self, item: &[u8]) -> Result<bool> {
        self.filter.query(item)
    }

    /// Cleans up expired items by delegating to the underlying filter's
    /// `cleanup_expired_levels`.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the underlying filter.
    pub fn cleanup_expired(&mut self) -> Result<()> {
        self.filter.cleanup_expired_levels()
    }
}

impl RedbStorage {
pub fn open(path: &str, capacity: usize, max_levels: usize) -> Result<Self> {
Expand Down Expand Up @@ -135,6 +204,7 @@ impl BloomFilterStorage for RedbStorage {
self.save_level_data(level, &level_data)
}

#[inline]
fn get_bit(&self, level: usize, index: usize) -> Result<bool> {
if index >= self.capacity {
return Err(BloomError::IndexOutOfBounds {
Expand Down Expand Up @@ -200,3 +270,56 @@ impl BloomFilterStorage for RedbStorage {
self.max_levels
}
}

#[cfg(test)]
mod tests {
    use super::*;

    // These tests only exercise the generated options builder: they call
    // `build()` and inspect the resulting options, never constructing a
    // `RedbExpiringBloomFilter`.

    /// Required fields alone are enough, and the optional ones take their
    /// documented defaults.
    #[test]
    fn test_builder_required_fields() {
        let result = RedbExpiringBloomFilterOptionsBuilder::default()
            .path("filter_tests.redb".into())
            .capacity(1000)
            .expiration_time(Duration::from_secs(3600))
            .build();

        assert!(result.is_ok());
        let opts = result.unwrap();
        assert_eq!(opts.false_positive_rate, 0.01); // Check default value
        assert_eq!(opts.max_levels, 5); // Check default value
    }

    /// Explicitly supplied optional fields override the defaults.
    #[test]
    fn test_builder_custom_fields() {
        let result = RedbExpiringBloomFilterOptionsBuilder::default()
            .path("filter_tests.redb".into())
            .capacity(1000)
            .expiration_time(Duration::from_secs(3600))
            .false_positive_rate(0.001)
            .max_levels(10)
            .build();

        assert!(result.is_ok());
        let opts = result.unwrap();
        assert_eq!(opts.false_positive_rate, 0.001);
        assert_eq!(opts.max_levels, 10);
    }

    /// Omitting any one of the three required fields makes `build()` fail.
    #[test]
    fn test_builder_missing_required() {
        // Test missing path
        let result = RedbExpiringBloomFilterOptionsBuilder::default()
            .capacity(1000)
            .expiration_time(Duration::from_secs(3600))
            .build();
        assert!(result.is_err());

        // Test missing capacity
        let result = RedbExpiringBloomFilterOptionsBuilder::default()
            .path("filter_tests.redb".into())
            .expiration_time(Duration::from_secs(3600))
            .build();
        assert!(result.is_err());

        // Test missing expiration time
        let result = RedbExpiringBloomFilterOptionsBuilder::default()
            .path("filter_tests.redb".into())
            .capacity(1000)
            .build();
        assert!(result.is_err());
    }
}
91 changes: 75 additions & 16 deletions tests/redb_storage_tests.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#[cfg(test)]
mod tests {
use expiring_bloom_rs::redb_storage::RedbStorage;
use expiring_bloom_rs::redb_storage::RedbExpiringBloomFilterOptionsBuilder;
use expiring_bloom_rs::redb_storage::{RedbExpiringBloomFilter, RedbStorage};
use expiring_bloom_rs::{
default_hash_function, BloomFilterStorage, SlidingBloomFilter,
};
use rand::random;
use std::{
fs,
path::PathBuf,
sync::{Arc, Mutex},
thread,
time::{Duration, SystemTime},
Expand All @@ -17,7 +20,11 @@ mod tests {
(storage, path)
}

fn cleanup_db(path: &str) {
/// Builds a randomly named, relative `.redb` path so that each test works on
/// its own database file.
fn temp_db_path() -> PathBuf {
    let suffix: u64 = random();
    PathBuf::from(format!("test_bloom_{}.redb", suffix))
}

/// Best-effort removal of a test database file.
///
/// The outcome of the removal is deliberately discarded, so calling this for
/// a file that no longer exists is harmless.
fn cleanup_db(path: &PathBuf) {
    drop(fs::remove_file(path));
}

Expand All @@ -34,7 +41,7 @@ mod tests {
storage.clear_level(0).unwrap();
assert!(!storage.get_bit(0, 5).unwrap());

cleanup_db(&path);
cleanup_db(&path.into());
}

#[test]
Expand All @@ -53,7 +60,7 @@ mod tests {
assert!(storage.get_bit(0, 5).unwrap());
}

cleanup_db(&path);
cleanup_db(&path.into());
}

#[test]
Expand All @@ -71,7 +78,7 @@ mod tests {
storage.set_timestamp(0, time2).unwrap();
assert!(storage.get_timestamp(0).unwrap().unwrap() == time2);

cleanup_db(&path);
cleanup_db(&path.into());
}

#[test]
Expand All @@ -89,7 +96,7 @@ mod tests {
assert!(storage.set_bit(0, 2000).is_err());
assert!(storage.get_bit(0, 2000).is_err());

cleanup_db(&path);
cleanup_db(&path.into());
}

#[test]
Expand All @@ -112,7 +119,7 @@ mod tests {
handle.join().unwrap();
}

cleanup_db(&path);
cleanup_db(&path.into());
}

#[test]
Expand Down Expand Up @@ -142,7 +149,7 @@ mod tests {
assert!(bloom.query(b"item1").unwrap()); // First item should still be present

// Wait for another rotation
thread::sleep(Duration::from_millis(200));
thread::sleep(Duration::from_millis(150));

// Insert and verify third item
bloom.insert(b"item3").unwrap();
Expand All @@ -163,7 +170,7 @@ mod tests {
"Latest item should have expired"
);

cleanup_db(&path);
cleanup_db(&path.into());
}

#[test]
Expand Down Expand Up @@ -206,16 +213,68 @@ mod tests {
println!("Item {} exists: {}", i, exists);
}

cleanup_db(&path.into());
}

/// Smoke test for the high-level filter API: an inserted item is reported as
/// present and an item that was never inserted is reported as absent.
#[test]
fn test_filter_basic_operations() {
    let db_path = temp_db_path();

    // Only the required options are set; the rest use the builder defaults.
    let options = RedbExpiringBloomFilterOptionsBuilder::default()
        .path(db_path.clone())
        .capacity(1000)
        .expiration_time(Duration::from_secs(3600))
        .build()
        .unwrap();
    let mut bloom = RedbExpiringBloomFilter::new(options).unwrap();

    // Test insert and query
    bloom.insert(b"test_item").unwrap();
    let inserted_found = bloom.query(b"test_item").unwrap();
    assert!(inserted_found);
    let missing_found = bloom.query(b"nonexistent_item").unwrap();
    assert!(!missing_found);

    cleanup_db(&db_path);
}

// Helper function to simulate consistent hashing for test items
fn hash_item(item: &str) -> usize {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
// #[test]
/// This test runs for more than 60 seconds, definitely because the backend is poorly implemented.
fn test_false_positive_rate() {
let path = temp_db_path();

// Create filter with specific false positive rate
let opts = RedbExpiringBloomFilterOptionsBuilder::default()
.path(path.clone())
.capacity(10000)
.expiration_time(Duration::from_secs(3600))
.false_positive_rate(0.01)
.build()
.unwrap();

let mut filter = RedbExpiringBloomFilter::new(opts).unwrap();

// Insert some known items
let mut known_items = Vec::new();
for i in 0..1000 {
let item = format!("known_item_{}", i);
known_items.push(item.clone());
filter.insert(item.as_bytes()).unwrap();
}

let mut hasher = DefaultHasher::new();
item.hash(&mut hasher);
(hasher.finish() % 1000) as usize
// Test unknown items
let mut false_positives = 0;
let test_count = 10000;

for i in 0..test_count {
let unknown_item = format!("unknown_item_{}", i);
if filter.query(unknown_item.as_bytes()).unwrap() {
false_positives += 1;
}
}

let observed_fpr = false_positives as f64 / test_count as f64;
assert!(observed_fpr < 0.02); // Allow some margin above target 0.01

cleanup_db(&path);
}
}

0 comments on commit 6b114eb

Please sign in to comment.