Skip to content

Commit

Permalink
fixing
Browse files Browse the repository at this point in the history
  • Loading branch information
oiwn committed Jan 1, 2025
1 parent d57c4dd commit 008aca4
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 115 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ bincode = "1"
bitvec = "1"
fnv = "1"
murmur3 = "0.5"
async-trait = "0.1"
thiserror = "2"
serde = { version = "1", features = ["derive"] }
redb = { version = "2", optional = true }
async-trait = { version = "0.1", optional = true }
rustis = { version = "0.13", features = ["tokio-runtime"], optional = true }
thiserror = "2"
tokio = { version = "1.42.0", features = ["full"] }
tokio = { version = "1.42.0", features = ["full"], optional = true }

[dev-dependencies]
rand = "0.8"
expiring-bloom-rs = { path = ".", features = [] }

[features]
redis = ["dep:rustis"]
redis = ["dep:rustis", "dep:tokio", "dep:async-trait"]
redb = ["dep:redb"]
129 changes: 18 additions & 111 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,99 +212,6 @@ impl<S: BloomFilterStorage> SlidingBloomFilter<S> {
}
}

/*
impl<S: BloomFilterStorage> SlidingBloomFilter<S> {
pub fn new(
capacity: usize,
false_positive_rate: f64,
level_time: Duration,
max_levels: usize,
hash_function: HashFunction,
) -> Self {
let bit_vector_size =
optimal_bit_vector_size(capacity, false_positive_rate);
let num_hashes = optimal_num_hashes(capacity, bit_vector_size);
Self {
storage: S::new(bit_vector_size, max_levels),
hash_function,
capacity,
num_hashes,
false_positive_rate,
level_time,
max_levels,
current_level_index: 0,
}
}
pub fn cleanup_expired_levels(&mut self) {
let now = SystemTime::now();
for level in 0..self.max_levels {
if let Some(timestamp) = self.storage.get_timestamp(level) {
if now.duration_since(timestamp).unwrap()
>= self.level_time * self.max_levels as u32
{
self.storage.clear_level(level);
}
}
}
}
fn should_create_new_level(&self) -> bool {
let current_level = self.current_level_index;
if let Some(last_timestamp) = self.storage.get_timestamp(current_level) {
let now = SystemTime::now();
now.duration_since(last_timestamp).unwrap() >= self.level_time
} else {
true
}
}
fn create_new_level(&mut self) {
// Advance current level index in a circular manner
self.current_level_index =
(self.current_level_index + 1) % self.max_levels;
// Clear the level at the new current level index
self.storage.clear_level(self.current_level_index);
// Set the timestamp
self.storage
.set_timestamp(self.current_level_index, SystemTime::now());
}
pub fn insert(&mut self, item: &[u8]) {
if self.should_create_new_level() {
self.create_new_level();
}
let current_level = self.current_level_index;
let hashes = (self.hash_function)(item, self.num_hashes, self.capacity);
for hash in hashes {
self.storage.set_bit(current_level, hash as usize);
}
}
pub fn query(&self, item: &[u8]) -> bool {
let hashes = (self.hash_function)(item, self.num_hashes, self.capacity);
let now = SystemTime::now();
for level in 0..self.max_levels {
if let Some(timestamp) = self.storage.get_timestamp(level) {
#[allow(clippy::collapsible_if)]
if now.duration_since(timestamp).unwrap()
<= self.level_time * self.max_levels as u32
{
if hashes
.iter()
.all(|&hash| self.storage.get_bit(level, hash as usize))
{
return true;
}
}
}
}
false
}
}
*/

impl<B: BloomFilterStorage> std::fmt::Debug for SlidingBloomFilter<B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand All @@ -326,8 +233,8 @@ mod tests {
use rand::Rng;
use std::thread;

#[tokio::test]
async fn test_workflow() {
#[test]
fn test_workflow() {
let hash_function = |item: &[u8],
num_hashes_var: usize,
capacity_var: usize|
Expand Down Expand Up @@ -358,8 +265,8 @@ mod tests {
assert!(!bloom_filter.query(b"another").unwrap());
}

#[tokio::test]
async fn test_expiration_of_elements() {
#[test]
fn test_expiration_of_elements() {
let mut bloom_filter = SlidingBloomFilter::<InMemoryStorage>::new(
100,
0.01,
Expand All @@ -381,8 +288,8 @@ mod tests {
assert!(!bloom_filter.query(b"item1").unwrap());
}

#[tokio::test]
async fn test_no_false_negatives_within_decay_time() {
#[test]
fn test_no_false_negatives_within_decay_time() {
let mut bloom_filter = SlidingBloomFilter::<InMemoryStorage>::new(
1000,
0.01,
Expand Down Expand Up @@ -413,8 +320,8 @@ mod tests {
}
}

#[tokio::test]
async fn test_items_expire_after_decay_time() {
#[test]
fn test_items_expire_after_decay_time() {
let mut bloom_filter = SlidingBloomFilter::<InMemoryStorage>::new(
100,
0.01,
Expand All @@ -434,8 +341,8 @@ mod tests {
assert!(!bloom_filter.query(b"item_to_expire").unwrap());
}

#[tokio::test]
async fn test_immediate_expiration() {
#[test]
fn test_immediate_expiration() {
let mut bloom_filter = SlidingBloomFilter::<InMemoryStorage>::new(
100,
0.01,
Expand All @@ -457,8 +364,8 @@ mod tests {
);
}

#[tokio::test]
async fn test_partial_expiration() {
#[test]
fn test_partial_expiration() {
let mut bloom_filter = SlidingBloomFilter::<InMemoryStorage>::new(
100,
0.01,
Expand Down Expand Up @@ -507,8 +414,8 @@ mod tests {
}
}

#[tokio::test]
async fn test_continuous_insertion_and_query() {
#[test]
fn test_continuous_insertion_and_query() {
let mut bloom_filter = SlidingBloomFilter::<InMemoryStorage>::new(
1000,
0.01,
Expand Down Expand Up @@ -583,8 +490,8 @@ mod tests {
}
}

#[tokio::test]
async fn test_false_positive_rate() {
#[test]
fn test_false_positive_rate() {
const FALSE_POSITIVE_RATE: f64 = 0.05;

let mut bloom_filter = SlidingBloomFilter::<InMemoryStorage>::new(
Expand Down Expand Up @@ -632,8 +539,8 @@ mod tests {
);
}

#[tokio::test]
async fn test_concurrent_inserts() {
#[test]
fn test_concurrent_inserts() {
use std::sync::{Arc, Mutex};
use std::thread;

Expand Down

0 comments on commit 008aca4

Please sign in to comment.