Skip to content

Commit

Permalink
add builder and api to redb based filter
Browse files Browse the repository at this point in the history
  • Loading branch information
oiwn committed Jan 2, 2025
1 parent e8893a7 commit 2bf80e3
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ redb = { version = "2", optional = true }
async-trait = { version = "0.1", optional = true }
rustis = { version = "0.13", features = ["tokio-runtime"], optional = true }
tokio = { version = "1.42.0", features = ["full"], optional = true }
derive_builder = "0.20.2"

[dev-dependencies]
rand = "0.8"
Expand Down
78 changes: 74 additions & 4 deletions src/redb_storage.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use crate::expiring_bloom::{BloomError, BloomFilterStorage, Result};
use crate::expiring_bloom::{
default_hash_function, BloomError, BloomFilterStorage, Result,
SlidingBloomFilter,
};
use derive_builder::Builder;
use redb::{Database, ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::{Duration, SystemTime};

const LEVELS_TABLE: TableDefinition<&[u8], &[u8]> =
TableDefinition::new("levels");
Expand All @@ -19,8 +24,72 @@ pub struct RedbStorage {
max_levels: usize,
}

// TODO: create builder to create SlidingBloomFilter with storage at once, i.e.
// API to create RedbExpiringBloomFilter
#[derive(Builder, Debug)]
#[builder(pattern = "owned")]
pub struct RedbExpiringBloomFilterOptions {
/// Path to the ReDB database file
path: PathBuf,
/// Maximum number of elements the filter is expected to contain
capacity: usize,
/// How long elements should stay in the filter
expiration_time: Duration,
/// False positive rate (default: 0.01)
#[builder(default = "0.01")]
false_positive_rate: f64,
/// Number of filter levels (default: 5)
#[builder(default = "5")]
max_levels: usize,
}

pub struct RedbExpiringBloomFilter {
filter: SlidingBloomFilter<RedbStorage>,
}

impl RedbExpiringBloomFilter {
/// Creates a new RedbExpiringBloomFilter from the provided options
pub fn new(opts: RedbExpiringBloomFilterOptions) -> Result<Self> {
// Calculate level duration based on expiration time and max levels
let level_duration = Duration::from_secs(
opts.expiration_time.as_secs() / opts.max_levels as u64,
);

// Create ReDB storage
let storage = RedbStorage::open(
opts.path.to_str().ok_or_else(|| {
BloomError::StorageError("Invalid path".to_string())
})?,
opts.capacity,
opts.max_levels,
)?;

// Create the sliding bloom filter
let filter = SlidingBloomFilter::new(
storage,
opts.capacity,
opts.false_positive_rate,
level_duration,
opts.max_levels,
default_hash_function,
)?;

Ok(Self { filter })
}

/// Insert an item into the filter
pub fn insert(&mut self, item: &[u8]) -> Result<()> {
self.filter.insert(item)
}

/// Query if an item might be in the filter
pub fn query(&self, item: &[u8]) -> Result<bool> {
self.filter.query(item)
}

/// Clean up expired items from the filter
pub fn cleanup_expired(&mut self) -> Result<()> {
self.filter.cleanup_expired_levels()
}
}

impl RedbStorage {
pub fn open(path: &str, capacity: usize, max_levels: usize) -> Result<Self> {
Expand Down Expand Up @@ -135,6 +204,7 @@ impl BloomFilterStorage for RedbStorage {
self.save_level_data(level, &level_data)
}

#[inline]
fn get_bit(&self, level: usize, index: usize) -> Result<bool> {
if index >= self.capacity {
return Err(BloomError::IndexOutOfBounds {
Expand Down

0 comments on commit 2bf80e3

Please sign in to comment.