Skip to content

Commit

Permalink
better code
Browse files Browse the repository at this point in the history
  • Loading branch information
oiwn committed Jan 6, 2025
1 parent dfc27cb commit ddcaa36
Show file tree
Hide file tree
Showing 8 changed files with 381 additions and 37 deletions.
8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@ murmur3 = "0.5"
thiserror = "2"
redb = { version = "2", optional = true }
derive_builder = "0.20"
dotenvy = "0.15"

# server
dotenvy = { version = "0.15", optional = true }
axum = { version = "0.8", optional = true }
tokio = { version = "1", features = ["full"], optional = true }
utoipa = { version = "5.3", features = ["axum_extras"], optional = true }
utoipa-swagger-ui = { version = "8.1", features = ["axum"], optional = true }
serde = { version = "1", features = ["derive"], optional = true }
serde_json = { version = "1", optional = true }
tracing-subscriber = "0.3.19"
tracing = "0.1.41"
tower-http = { version = "0.6.2", features = ["trace"] }

[dev-dependencies]
rand = "0.8"
expiring-bloom-rs = { path = ".", features = ["redb", "server"] }
criterion = { version = "0.5", features = ["html_reports"] }
tower = "0.5.2"

[[bin]]
name = "bloom-server"
Expand All @@ -33,7 +37,7 @@ path = "src/bin/server.rs"
[features]
default = ["redb", "server"]
redb = ["dep:redb"]
server = ["dep:axum", "dep:tokio", "dep:utoipa", "dep:utoipa-swagger-ui", "dep:serde", "dep:serde_json"]
server = ["dep:axum", "dep:tokio", "dep:utoipa", "dep:utoipa-swagger-ui", "dep:serde", "dep:serde_json", "dep:dotenvy"]

[[bench]]
name = "bloom_benchmarks"
Expand Down
19 changes: 1 addition & 18 deletions benches/bloom_benchmarks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use expiring_bloom_rs::{
default_hash_function, inmemory_storage::InMemoryStorage, BloomFilterStorage,
SlidingBloomFilter,
default_hash_function, BloomStorage, InMemoryStorage, SlidingBloomFilter,
};
use rand::{distributions::Alphanumeric, Rng};
use std::{time::Duration, time::SystemTime};
Expand Down Expand Up @@ -37,22 +36,6 @@ fn create_expired_timestamps(
vec![past; num_levels]
}

// Helper to create a bloom filter with given storage
fn create_bloom_filter<S: BloomFilterStorage>(
storage: S,
capacity: usize,
fpr: f64,
) -> Result<SlidingBloomFilter<S>, Box<dyn std::error::Error>> {
Ok(SlidingBloomFilter::new(
storage,
capacity,
fpr,
Duration::from_secs(1),
5,
default_hash_function,
)?)
}

fn bench_insert(c: &mut Criterion) {
let mut group = c.benchmark_group("insert_operations");

Expand Down
4 changes: 4 additions & 0 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use axum::{
Json, Router,
};
use std::sync::Arc;
use tracing::debug;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;

Expand Down Expand Up @@ -39,6 +40,7 @@ struct ApiDoc;
)
)]
async fn health_check() -> impl IntoResponse {
debug!("Health check");
StatusCode::OK
}

Expand All @@ -57,6 +59,7 @@ async fn insert_item(
State(state): State<Arc<AppState>>,
Json(request): Json<InsertRequest>,
) -> impl IntoResponse {
debug!("Inserting item: {}", &request.value);
let mut filter = state.filter.lock().await;
match filter.insert(request.value.as_bytes()) {
Ok(_) => StatusCode::OK.into_response(),
Expand Down Expand Up @@ -87,6 +90,7 @@ async fn query_item(
State(state): State<Arc<AppState>>,
Path(value): Path<String>,
) -> impl IntoResponse {
debug!("Querying item: {}", &value);
let filter = state.filter.lock().await;
match filter.query(value.as_bytes()) {
Ok(exists) => {
Expand Down
128 changes: 112 additions & 16 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,130 @@
use expiring_bloom_rs::api::create_router;
use expiring_bloom_rs::types::AppState;
use expiring_bloom_rs::{FilterConfigBuilder, RedbSlidingBloomFilter};
use std::{sync::Arc, time::Duration};
use expiring_bloom_rs::{
FilterConfigBuilder, RedbSlidingBloomFilter, ServerConfig,
};
use std::sync::Arc;
use tracing::{info, Level};
use tracing_subscriber::FmtSubscriber;

#[tokio::main]
async fn main() {
// Initialize the Bloom filter
let config = FilterConfigBuilder::default()
.capacity(1000)
.false_positive_rate(0.01)
.level_duration(Duration::from_secs(60))
.max_levels(3)
// Initialize tracing
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::INFO)
.with_target(false)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.pretty()
.init();

// load configuration from environment variables
let config = ServerConfig::from_env().expect("Failed to load configuration");

// Initialize the Bloom filter with configuration
let filter_config = FilterConfigBuilder::default()
.capacity(config.bloom_capacity)
.false_positive_rate(config.bloom_false_positive_rate)
.level_duration(config.bloom_level_duration)
.max_levels(config.bloom_max_levels)
.build()
.expect("Failed to build filter config");

let filter = RedbSlidingBloomFilter::new(config, "bloom.redb".into())
.expect("Failed to create filter");
let filter = RedbSlidingBloomFilter::new(
filter_config.clone(),
config.bloom_db_path.clone().into(),
)
.expect("Failed to create filter");

// Create application state
let state = Arc::new(AppState {
filter: tokio::sync::Mutex::new(filter),
});

// Create router
let app = create_router(state);
// Create router with logging middleware
let app = create_router(state.clone()).layer(
tower_http::trace::TraceLayer::new_for_http()
.make_span_with(|request: &axum::http::Request<_>| {
tracing::info_span!(
"http_request",
method = %request.method(),
uri = %request.uri(),
version = ?request.version(),
)
})
.on_response(
|response: &axum::http::Response<_>,
latency: std::time::Duration,
_span: &tracing::Span| {
tracing::info!(
status = %response.status(),
latency = ?latency,
"response generated"
);
},
),
);

// Build address string
let addr = format!("{}:{}", config.server_host, config.server_port);
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();

// Calculate the memory usage estimation
let bits_per_level = filter_config.capacity;
let total_bits = bits_per_level * filter_config.max_levels;
let estimated_memory_kb = (total_bits as f64 / 8.0 / 1024.0).ceil();

let level_duration = filter_config.level_duration;
let max_levels = filter_config.max_levels;
let false_positive_rate = filter_config.false_positive_rate.clone();

info!(
r#"
🦀 Time-Decaying Bloom Filter Server Starting! 🚀
,~~.
( 6 )-_,
(\_/)\ )\ / < Ready to filter with blazing speed! >
\ | ) |
^^ ^^ ^^
📊 Filter Configuration:
• Capacity: {:>16} items
• False Positive Rate: {:>8.4}%
• Levels: {:>18}
• Level Duration: {:>12?}
• Estimated Memory: {:>8.2} KB
• Database Path: {:>14}
// Start the server
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
println!("Server running on http://localhost:3000");
println!("API documentation available at http://localhost:3000/swagger-ui/");
🌐 Server Information:
• Listening on: http://{}
• Swagger UI: http://{}/swagger-ui/
• Health Check: http://{}/health
🎯 API Endpoints:
• POST /items - Insert item
• GET /items/:value - Query item
• POST /cleanup - Cleanup expired items
🔧 Performance Mode: {}
"#,
bits_per_level,
false_positive_rate * 100.0,
max_levels,
level_duration,
estimated_memory_kb,
&config.bloom_db_path,
addr,
addr,
addr,
if cfg!(debug_assertions) {
"DEBUG"
} else {
"RELEASE"
}
);

info!("Starting server on {}", addr);
axum::serve(listener, app).await.unwrap();
}
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,7 @@ pub use inmemory_filter::InMemorySlidingBloomFilter;
#[cfg(feature = "redb")]
pub use redb_filter::RedbSlidingBloomFilter;
pub use storage::{BloomStorage, InMemoryStorage};
#[cfg(feature = "server")]
pub use types::{
AppState, ServerConfig, ServerConfigBuilder, ServerConfigBuilderError,
};
2 changes: 1 addition & 1 deletion src/redb_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::error::{BloomError, Result};
use crate::filter::{FilterConfig, SlidingBloomFilter};
use crate::hash::{optimal_bit_vector_size, optimal_num_hashes};
use crate::storage::{BloomStorage, InMemoryStorage};
use redb::{Database, ReadableTable, TableDefinition};
use redb::{Database, TableDefinition};
use std::{path::PathBuf, sync::Arc, time::SystemTime};

// Define table schemas for ReDB
Expand Down
56 changes: 56 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::RedbSlidingBloomFilter;
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio::sync::Mutex;
use utoipa::ToSchema;

Expand All @@ -21,3 +23,57 @@ pub struct ErrorResponse {
pub struct AppState {
pub filter: Mutex<RedbSlidingBloomFilter>,
}

#[derive(Builder, Clone)]
#[builder(pattern = "owned")]
pub struct ServerConfig {
#[builder(default = "\"127.0.0.1\".to_string()")]
pub server_host: String,
#[builder(default = "3000")]
pub server_port: u16,
#[builder(default = "\"bloom.redb\".to_string()")]
pub bloom_db_path: String,
#[builder(default = "10000")]
pub bloom_capacity: usize,
#[builder(default = "0.01")]
pub bloom_false_positive_rate: f64,
#[builder(default = "Duration::from_secs(60)")]
pub bloom_level_duration: Duration,
#[builder(default = "3")]
pub bloom_max_levels: usize,
}

impl ServerConfig {
pub fn from_env() -> Result<Self, String> {
dotenvy::dotenv().ok();

Ok(Self {
server_host: std::env::var("SERVER_HOST")
.unwrap_or_else(|_| "127.0.0.1".to_string()),
server_port: std::env::var("SERVER_PORT")
.unwrap_or_else(|_| "3000".to_string())
.parse()
.map_err(|_| "Invalid SERVER_PORT")?,
bloom_db_path: std::env::var("BLOOM_DB_PATH")
.unwrap_or_else(|_| "bloom.redb".to_string()),
bloom_capacity: std::env::var("BLOOM_CAPACITY")
.unwrap_or_else(|_| "10000".to_string())
.parse()
.map_err(|_| "Invalid BLOOM_CAPACITY")?,
bloom_false_positive_rate: std::env::var("BLOOM_FALSE_POSITIVE_RATE")
.unwrap_or_else(|_| "0.01".to_string())
.parse()
.map_err(|_| "Invalid BLOOM_FALSE_POSITIVE_RATE")?,
bloom_level_duration: Duration::from_secs(
std::env::var("BLOOM_LEVEL_DURATION_SECS")
.unwrap_or_else(|_| "60".to_string())
.parse()
.map_err(|_| "Invalid BLOOM_LEVEL_DURATION_SECS")?,
),
bloom_max_levels: std::env::var("BLOOM_MAX_LEVELS")
.unwrap_or_else(|_| "3".to_string())
.parse()
.map_err(|_| "Invalid BLOOM_MAX_LEVELS")?,
})
}
}
Loading

0 comments on commit ddcaa36

Please sign in to comment.