Skip to content

Commit

Permalink
Merge pull request #10 from oiwn/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
oiwn authored Jan 6, 2025
2 parents bb09af8 + 2b2113a commit 7b8b312
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 78 deletions.
10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@
name = "expiring-bloom-rs"
version = "0.1.0"
edition = "2021"
authors = ["oiwn"]
description = "A time-decaying Bloom filter implementation with multiple storage backends"
repository = "https://github.com/oiwn/expiring-bloom-rs"
documentation = "https://docs.rs/expiring-bloom-rs"
homepage = "https://github.com/yourusername/expiring-bloom-rs"
license = "MIT"
readme = "README.md"
keywords = ["bloom-filter", "probabilistic", "time-decay", "cache", "rate-limiting"]
categories = ["algorithms", "caching", "data-structures"]

[dependencies]
bincode = "1"
Expand All @@ -22,7 +31,6 @@ serde = { version = "1", features = ["derive"], optional = true }
serde_json = { version = "1", optional = true }
tracing-subscriber = "0.3.19"
tracing = "0.1.41"
# tower-http = { version = "0.6.2", features = ["trace"] }

[dev-dependencies]
rand = "0.8"
Expand Down
140 changes: 139 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,143 @@
# expiring-bloom-rs

[![Crates.io](https://img.shields.io/crates/v/expiring-bloom-rs.svg)](https://crates.io/crates/expiring-bloom-rs)
[![Documentation](https://docs.rs/expiring-bloom-rs/badge.svg)](https://docs.rs/expiring-bloom-rs)
[![codecov](https://codecov.io/gh/oiwn/expiring-bloom-rs/graph/badge.svg?token=5JMM0V5RFO)](https://codecov.io/gh/oiwn/expiring-bloom-rs)

Expiring bloom filter implementation in Rust
# Time-Decaying Bloom Filter

A Rust implementation of a time-decaying Bloom filter with multiple storage
backends and a high-performance HTTP API server.

## Overview

This crate provides a Bloom filter implementation that automatically expires
elements after a configurable time period using a sliding window approach. It's
particularly useful for rate limiting, caching, and tracking recently seen items
where older data becomes less relevant over time.

### Key Features

- Time-based automatic element expiration
- Multiple storage backends (In-memory and ReDB persistence)
- Configurable false positive rate
- Multi-level sliding window design
- High-performance HTTP API server with Swagger UI
- Comprehensive benchmarking suite
- Thread-safe concurrent operations

## How It Works

The time-decaying Bloom filter uses a sliding window approach with the following
characteristics:

1. **Sub-Filters**: The main Bloom filter is divided into N sub-filters (BF_1, BF_2, …, BF_N)
2. **Time Windows**: Each sub-filter corresponds to a fixed time window T (e.g., 1 minute)
3. **Rotation Mechanism**: Sub-filters are rotated in a circular manner to represent sliding time intervals

### Operations

- **Insertion**: Elements are added to the current active sub-filter with timestamps
- **Query**: Checks for element presence across all non-expired sub-filters
- **Cleanup**: Automatically removes expired elements based on configured time windows

## Usage

Add this to your `Cargo.toml`:

```toml
[dependencies]
expiring-bloom-rs = "0.1"
```

### Basic Example

```rust
use expiring_bloom_rs::{FilterConfigBuilder, InMemorySlidingBloomFilter, SlidingBloomFilter};
use std::time::Duration;

fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure the filter
let config = FilterConfigBuilder::default()
.capacity(1000)
.false_positive_rate(0.01)
.level_duration(Duration::from_secs(60))
.max_levels(3)
.build()?;

// Create an in-memory filter
let mut filter = InMemorySlidingBloomFilter::new(config)?;

// Insert and query items
filter.insert(b"test_item")?;
assert!(filter.query(b"test_item")?);

Ok(())
}
```

### Using the HTTP Server

```rust
use expiring_bloom_rs::{ServerConfigBuilder, FilterConfigBuilder};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure the server
let server_config = ServerConfigBuilder::default()
.server_host("127.0.0.1".to_string())
.server_port(3000)
.bloom_db_path("bloom.redb".to_string())
.bloom_capacity(10000)
.bloom_false_positive_rate(0.01)
.bloom_level_duration(Duration::from_secs(60))
.bloom_max_levels(3)
.build()?;

// Start the server
expiring_bloom_rs::run_server(server_config).await?;

Ok(())
}
```

## API Endpoints

The HTTP server provides the following REST endpoints:

- `GET /health` - Health check endpoint
- `POST /items` - Insert an item into the filter
- `GET /items/{value}` - Query if an item exists in the filter
- `POST /cleanup` - Manually trigger cleanup of expired items
- `/swagger-ui` - Interactive API documentation

## Configuration

The filter can be configured with the following parameters:

- `capacity`: Maximum number of elements (default: 1000)
- `false_positive_rate`: Desired false positive rate (default: 0.01)
- `level_duration`: Duration after which entries in a level expire (default: 60s)
- `max_levels`: Number of filter levels for time-based expiration (default: 3)

## Performance

Bro, it's 🦀🦀🦀 RUST 🦀🦀🦀 and its BLAZINGLY FAST 🚀🚀🚀

### Memory Usage

Memory usage is calculated as:

```
total_bits = capacity * max_levels
memory_bytes = total_bits / 8
```

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

## License

This project is licensed under the MIT License - see the LICENSE file for details.
88 changes: 18 additions & 70 deletions benches/bloom_benchmarks.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use expiring_bloom_rs::{
default_hash_function, BloomStorage, InMemoryStorage, SlidingBloomFilter,
FilterConfigBuilder, InMemorySlidingBloomFilter, SlidingBloomFilter,
};
use rand::{distributions::Alphanumeric, Rng};
use std::{time::Duration, time::SystemTime};
Expand All @@ -19,6 +19,19 @@ fn generate_test_data(count: usize) -> Vec<String> {
(0..count).map(|_| generate_random_string(32)).collect()
}

fn create_test_filter(capacity: usize) -> InMemorySlidingBloomFilter {
let config = FilterConfigBuilder::default()
.capacity(capacity)
.false_positive_rate(0.01)
.level_duration(Duration::from_secs(1))
.max_levels(5)
.build()
.expect("Failed to create config");

InMemorySlidingBloomFilter::new(config)
.expect("Failed to create Bloom filter")
}

// Helper to create "expired" timestamps
// see: https://github.com/rust-lang/rust/issues/100141
// to figure why this crap is so complicated.
Expand All @@ -44,20 +57,13 @@ fn bench_insert(c: &mut Criterion) {
let test_data = generate_test_data(capacity);

// Benchmark in-memory storage

group.bench_with_input(
BenchmarkId::new("inmemory", capacity),
&(capacity, &test_data),
|b, (cap, data)| {
b.iter_batched(
|| {
create_bloom_filter(
InMemoryStorage::new(*cap, 5)
.expect("Failed to create InMemory storage"),
*cap,
0.01,
)
.expect("Failed to create Bloom filter")
},
|| create_test_filter(*cap),
|mut filter| {
for item in data.iter() {
if let Err(e) = filter.insert(item.as_bytes()) {
Expand Down Expand Up @@ -87,12 +93,7 @@ fn bench_query(c: &mut Criterion) {
BenchmarkId::new("inmemory", capacity),
&(capacity, &known_data, &unknown_data),
|b, (cap, known, unknown)| {
let mut filter = create_bloom_filter(
InMemoryStorage::new(*cap, 5).unwrap(),
*cap,
0.01,
)
.unwrap();
let mut filter = create_test_filter(*cap);

// Insert known data
for item in known.iter() {
Expand All @@ -114,58 +115,5 @@ fn bench_query(c: &mut Criterion) {
group.finish();
}

fn bench_cleanup(c: &mut Criterion) {
let mut group = c.benchmark_group("cleanup_operations");

for capacity in [1_000, 100_000, 1_000_000] {
let test_data = generate_test_data(capacity);

group.bench_with_input(
BenchmarkId::new("inmemory", capacity),
&(capacity, &test_data),
|b, (cap, data)| {
b.iter_with_setup(
|| {
// Setup: Create filter and insert data with artificially expired timestamps
let mut filter = SlidingBloomFilter::new(
InMemoryStorage::new(*cap, 5).unwrap(),
*cap,
0.01,
Duration::from_secs(1),
5,
default_hash_function,
)
.unwrap();

// Insert test data
for item in data.iter() {
filter.insert(item.as_bytes()).unwrap();
}

// Artificially expire the timestamps
let expired_timestamps =
create_expired_timestamps(5, Duration::from_secs(6));
for (level, timestamp) in
expired_timestamps.iter().enumerate()
{
filter
.storage
.set_timestamp(level, *timestamp)
.unwrap();
}

filter
},
|mut filter| {
// Benchmark just the cleanup operation
filter.cleanup_expired_levels().unwrap();
},
);
},
);
}
group.finish();
}

criterion_group!(benches, bench_insert, bench_query, bench_cleanup);
criterion_group!(benches, bench_insert, bench_query);
criterion_main!(benches);
3 changes: 1 addition & 2 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ async fn main() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_target(false)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.init();
Expand Down Expand Up @@ -56,7 +55,7 @@ async fn main() {
,~~.
( 6 )-_,
(\_/)\ )\ / < Ready to filter with blazing speed! >
(\_/)\ )\ / < Ready to filter with blazing speed! >
\ | ) |
^^ ^^ ^^
Expand Down
8 changes: 4 additions & 4 deletions src/inmemory_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use std::time::SystemTime;

// Base filter implementation
pub struct InMemorySlidingBloomFilter {
storage: InMemoryStorage,
config: FilterConfig,
num_hashes: usize,
current_level_index: usize,
pub storage: InMemoryStorage,
pub config: FilterConfig,
pub num_hashes: usize,
pub current_level_index: usize,
}

impl InMemorySlidingBloomFilter {
Expand Down

0 comments on commit 7b8b312

Please sign in to comment.