From 2bf80e3c8004b8bbf7de71ee036780a6fbe5e7a7 Mon Sep 17 00:00:00 2001 From: oiwn Date: Thu, 2 Jan 2025 18:52:50 +0700 Subject: [PATCH] add builder and api to redb based filter --- Cargo.toml | 1 + src/redb_storage.rs | 78 ++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 75 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a58eb73..cee3e30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ redb = { version = "2", optional = true } async-trait = { version = "0.1", optional = true } rustis = { version = "0.13", features = ["tokio-runtime"], optional = true } tokio = { version = "1.42.0", features = ["full"], optional = true } +derive_builder = "0.20.2" [dev-dependencies] rand = "0.8" diff --git a/src/redb_storage.rs b/src/redb_storage.rs index 756bf91..e5e0187 100644 --- a/src/redb_storage.rs +++ b/src/redb_storage.rs @@ -1,8 +1,13 @@ -use crate::expiring_bloom::{BloomError, BloomFilterStorage, Result}; +use crate::expiring_bloom::{ + default_hash_function, BloomError, BloomFilterStorage, Result, + SlidingBloomFilter, +}; +use derive_builder::Builder; use redb::{Database, ReadableTable, TableDefinition}; use serde::{Deserialize, Serialize}; +use std::path::PathBuf; use std::sync::Arc; -use std::time::SystemTime; +use std::time::{Duration, SystemTime}; const LEVELS_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("levels"); @@ -19,8 +24,72 @@ pub struct RedbStorage { max_levels: usize, } -// TODO: create builder to create SlidingBloomFilter with storage at once, i.e. -// API to create RedbExpiringBloomFilter +#[derive(Builder, Debug)] +#[builder(pattern = "owned")] +pub struct RedbExpiringBloomFilterOptions { + /// Path to the ReDB database file + path: PathBuf, + /// Maximum number of elements the filter is expected to contain + capacity: usize, + /// How long elements should stay in the filter + expiration_time: Duration, + /// False positive rate (default: 0.01) + #[builder(default = "0.01")] + false_positive_rate: f64, + /// Number of filter levels (default: 5) + #[builder(default = "5")] + max_levels: usize, +} + +pub struct RedbExpiringBloomFilter { + filter: SlidingBloomFilter, +} + +impl RedbExpiringBloomFilter { + /// Creates a new RedbExpiringBloomFilter from the provided options + pub fn new(opts: RedbExpiringBloomFilterOptions) -> Result { + // Calculate level duration based on expiration time and max levels + let level_duration = Duration::from_secs( + opts.expiration_time.as_secs() / opts.max_levels as u64, + ); + + // Create ReDB storage + let storage = RedbStorage::open( + opts.path.to_str().ok_or_else(|| { + BloomError::StorageError("Invalid path".to_string()) + })?, + opts.capacity, + opts.max_levels, + )?; + + // Create the sliding bloom filter + let filter = SlidingBloomFilter::new( + storage, + opts.capacity, + opts.false_positive_rate, + level_duration, + opts.max_levels, + default_hash_function, + )?; + + Ok(Self { filter }) + } + + /// Insert an item into the filter + pub fn insert(&mut self, item: &[u8]) -> Result<()> { + self.filter.insert(item) + } + + /// Query if an item might be in the filter + pub fn query(&self, item: &[u8]) -> Result { + self.filter.query(item) + } + + /// Clean up expired items from the filter + pub fn cleanup_expired(&mut self) -> Result<()> { + self.filter.cleanup_expired_levels() + } +} impl RedbStorage { pub fn open(path: &str, capacity: usize, max_levels: usize) -> Result { @@ -135,6 +204,7 @@ impl BloomFilterStorage for RedbStorage { self.save_level_data(level, &level_data) } + #[inline] fn get_bit(&self, level: usize, index: usize) -> Result { if index >= self.capacity { return Err(BloomError::IndexOutOfBounds {