From ddcaa36b37ff47471cc441c693b817e47e5dd6dd Mon Sep 17 00:00:00 2001 From: oiwn Date: Mon, 6 Jan 2025 22:26:39 +0700 Subject: [PATCH] better code --- Cargo.toml | 8 +- benches/bloom_benchmarks.rs | 19 +--- src/api.rs | 4 + src/bin/server.rs | 128 ++++++++++++++++++++--- src/lib.rs | 4 + src/redb_filter.rs | 2 +- src/types.rs | 56 ++++++++++ tests/server_tests.rs | 197 ++++++++++++++++++++++++++++++++++++ 8 files changed, 381 insertions(+), 37 deletions(-) create mode 100644 tests/server_tests.rs diff --git a/Cargo.toml b/Cargo.toml index c9a2b41..67cb0bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,20 +11,24 @@ murmur3 = "0.5" thiserror = "2" redb = { version = "2", optional = true } derive_builder = "0.20" -dotenvy = "0.15" # server +dotenvy = { version = "0.15", optional = true } axum = { version = "0.8", optional = true } tokio = { version = "1", features = ["full"], optional = true } utoipa = { version = "5.3", features = ["axum_extras"], optional = true } utoipa-swagger-ui = { version = "8.1", features = ["axum"], optional = true } serde = { version = "1", features = ["derive"], optional = true } serde_json = { version = "1", optional = true } +tracing-subscriber = "0.3.19" +tracing = "0.1.41" +tower-http = { version = "0.6.2", features = ["trace"] } [dev-dependencies] rand = "0.8" expiring-bloom-rs = { path = ".", features = ["redb", "server"] } criterion = { version = "0.5", features = ["html_reports"] } +tower = "0.5.2" [[bin]] name = "bloom-server" @@ -33,7 +37,7 @@ path = "src/bin/server.rs" [features] default = ["redb", "server"] redb = ["dep:redb"] -server = ["dep:axum", "dep:tokio", "dep:utoipa", "dep:utoipa-swagger-ui", "dep:serde", "dep:serde_json"] +server = ["dep:axum", "dep:tokio", "dep:utoipa", "dep:utoipa-swagger-ui", "dep:serde", "dep:serde_json", "dep:dotenvy"] [[bench]] name = "bloom_benchmarks" diff --git a/benches/bloom_benchmarks.rs b/benches/bloom_benchmarks.rs index 6c4a82a..b7ce166 100644 --- a/benches/bloom_benchmarks.rs +++ b/benches/bloom_benchmarks.rs @@ -1,7 +1,6 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use expiring_bloom_rs::{ - default_hash_function, inmemory_storage::InMemoryStorage, BloomFilterStorage, - SlidingBloomFilter, + default_hash_function, BloomStorage, InMemoryStorage, SlidingBloomFilter, }; use rand::{distributions::Alphanumeric, Rng}; use std::{time::Duration, time::SystemTime}; @@ -37,22 +36,6 @@ fn create_expired_timestamps( vec![past; num_levels] } -// Helper to create a bloom filter with given storage -fn create_bloom_filter( - storage: S, - capacity: usize, - fpr: f64, -) -> Result, Box> { - Ok(SlidingBloomFilter::new( - storage, - capacity, - fpr, - Duration::from_secs(1), - 5, - default_hash_function, - )?) -} - fn bench_insert(c: &mut Criterion) { let mut group = c.benchmark_group("insert_operations"); diff --git a/src/api.rs b/src/api.rs index a990ad7..97ba1fb 100644 --- a/src/api.rs +++ b/src/api.rs @@ -6,6 +6,7 @@ use axum::{ Json, Router, }; use std::sync::Arc; +use tracing::debug; use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; @@ -39,6 +40,7 @@ struct ApiDoc; ) )] async fn health_check() -> impl IntoResponse { + debug!("Health check"); StatusCode::OK } @@ -57,6 +59,7 @@ async fn insert_item( State(state): State>, Json(request): Json, ) -> impl IntoResponse { + debug!("Inserting item: {}", &request.value); let mut filter = state.filter.lock().await; match filter.insert(request.value.as_bytes()) { Ok(_) => StatusCode::OK.into_response(), @@ -87,6 +90,7 @@ async fn query_item( State(state): State>, Path(value): Path, ) -> impl IntoResponse { + debug!("Querying item: {}", &value); let filter = state.filter.lock().await; match filter.query(value.as_bytes()) { Ok(exists) => { diff --git a/src/bin/server.rs b/src/bin/server.rs index 2ddda45..7a3973e 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -1,34 +1,130 @@ use expiring_bloom_rs::api::create_router; use expiring_bloom_rs::types::AppState; -use expiring_bloom_rs::{FilterConfigBuilder, RedbSlidingBloomFilter}; -use std::{sync::Arc, time::Duration}; +use expiring_bloom_rs::{ + FilterConfigBuilder, RedbSlidingBloomFilter, ServerConfig, +}; +use std::sync::Arc; +use tracing::{info, Level}; +use tracing_subscriber::FmtSubscriber; #[tokio::main] async fn main() { - // Initialize the Bloom filter - let config = FilterConfigBuilder::default() - .capacity(1000) - .false_positive_rate(0.01) - .level_duration(Duration::from_secs(60)) - .max_levels(3) + // Initialize tracing + let subscriber = FmtSubscriber::builder() + .with_max_level(Level::INFO) + .with_target(false) + .with_thread_ids(true) + .with_file(true) + .with_line_number(true) + .pretty() + .init(); + + // load configuration from environment variables + let config = ServerConfig::from_env().expect("Failed to load configuration"); + + // Initialize the Bloom filter with configuration + let filter_config = FilterConfigBuilder::default() + .capacity(config.bloom_capacity) + .false_positive_rate(config.bloom_false_positive_rate) + .level_duration(config.bloom_level_duration) + .max_levels(config.bloom_max_levels) .build() .expect("Failed to build filter config"); - let filter = RedbSlidingBloomFilter::new(config, "bloom.redb".into()) - .expect("Failed to create filter"); + let filter = RedbSlidingBloomFilter::new( + filter_config.clone(), + config.bloom_db_path.clone().into(), + ) + .expect("Failed to create filter"); // Create application state let state = Arc::new(AppState { filter: tokio::sync::Mutex::new(filter), }); - // Create router - let app = create_router(state); + // Create router with logging middleware + let app = create_router(state.clone()).layer( + tower_http::trace::TraceLayer::new_for_http() + .make_span_with(|request: &axum::http::Request<_>| { + tracing::info_span!( + "http_request", + method = %request.method(), + uri = %request.uri(), + version = ?request.version(), + ) + }) + .on_response( + |response: &axum::http::Response<_>, + latency: std::time::Duration, + _span: &tracing::Span| { + tracing::info!( + status = %response.status(), + latency = ?latency, + "response generated" + ); + }, + ), + ); + + // Build address string + let addr = format!("{}:{}", config.server_host, config.server_port); + let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); + + // Calculate the memory usage estimation + let bits_per_level = filter_config.capacity; + let total_bits = bits_per_level * filter_config.max_levels; + let estimated_memory_kb = (total_bits as f64 / 8.0 / 1024.0).ceil(); + + let level_duration = filter_config.level_duration; + let max_levels = filter_config.max_levels; + let false_positive_rate = filter_config.false_positive_rate.clone(); + + info!( + r#" + 🦀 Time-Decaying Bloom Filter Server Starting! 🚀 + + ,~~. + ( 6 )-_, + (\_/)\ )\ / < Ready to filter with blazing speed! > + \ | ) | + ^^ ^^ ^^ + + 📊 Filter Configuration: + • Capacity: {:>16} items + • False Positive Rate: {:>8.4}% + • Levels: {:>18} + • Level Duration: {:>12?} + • Estimated Memory: {:>8.2} KB + • Database Path: {:>14} - // Start the server - let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); - println!("Server running on http://localhost:3000"); - println!("API documentation available at http://localhost:3000/swagger-ui/"); + 🌐 Server Information: + • Listening on: http://{} + • Swagger UI: http://{}/swagger-ui/ + • Health Check: http://{}/health + + 🎯 API Endpoints: + • POST /items - Insert item + • GET /items/:value - Query item + • POST /cleanup - Cleanup expired items + + 🔧 Performance Mode: {} + "#, + bits_per_level, + false_positive_rate * 100.0, + max_levels, + level_duration, + estimated_memory_kb, + &config.bloom_db_path, + addr, + addr, + addr, + if cfg!(debug_assertions) { + "DEBUG" + } else { + "RELEASE" + } + ); + info!("Starting server on {}", addr); axum::serve(listener, app).await.unwrap(); } diff --git a/src/lib.rs b/src/lib.rs index f64c4ce..6a62bda 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,3 +49,7 @@ pub use inmemory_filter::InMemorySlidingBloomFilter; #[cfg(feature = "redb")] pub use redb_filter::RedbSlidingBloomFilter; pub use storage::{BloomStorage, InMemoryStorage}; +#[cfg(feature = "server")] +pub use types::{ + AppState, ServerConfig, ServerConfigBuilder, ServerConfigBuilderError, +}; diff --git a/src/redb_filter.rs b/src/redb_filter.rs index 70d73d4..d33295f 100644 --- a/src/redb_filter.rs +++ b/src/redb_filter.rs @@ -2,7 +2,7 @@ use crate::error::{BloomError, Result}; use crate::filter::{FilterConfig, SlidingBloomFilter}; use crate::hash::{optimal_bit_vector_size, optimal_num_hashes}; use crate::storage::{BloomStorage, InMemoryStorage}; -use redb::{Database, ReadableTable, TableDefinition}; +use redb::{Database, TableDefinition}; use std::{path::PathBuf, sync::Arc, time::SystemTime}; // Define table schemas for ReDB diff --git a/src/types.rs b/src/types.rs index 68d4d0a..20e6af8 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,5 +1,7 @@ use crate::RedbSlidingBloomFilter; +use derive_builder::Builder; use serde::{Deserialize, Serialize}; +use std::time::Duration; use tokio::sync::Mutex; use utoipa::ToSchema; @@ -21,3 +23,57 @@ pub struct ErrorResponse { pub struct AppState { pub filter: Mutex, } + +#[derive(Builder, Clone)] +#[builder(pattern = "owned")] +pub struct ServerConfig { + #[builder(default = "\"127.0.0.1\".to_string()")] + pub server_host: String, + #[builder(default = "3000")] + pub server_port: u16, + #[builder(default = "\"bloom.redb\".to_string()")] + pub bloom_db_path: String, + #[builder(default = "10000")] + pub bloom_capacity: usize, + #[builder(default = "0.01")] + pub bloom_false_positive_rate: f64, + #[builder(default = "Duration::from_secs(60)")] + pub bloom_level_duration: Duration, + #[builder(default = "3")] + pub bloom_max_levels: usize, +} + +impl ServerConfig { + pub fn from_env() -> Result { + dotenvy::dotenv().ok(); + + Ok(Self { + server_host: std::env::var("SERVER_HOST") + .unwrap_or_else(|_| "127.0.0.1".to_string()), + server_port: std::env::var("SERVER_PORT") + .unwrap_or_else(|_| "3000".to_string()) + .parse() + .map_err(|_| "Invalid SERVER_PORT")?, + bloom_db_path: std::env::var("BLOOM_DB_PATH") + .unwrap_or_else(|_| "bloom.redb".to_string()), + bloom_capacity: std::env::var("BLOOM_CAPACITY") + .unwrap_or_else(|_| "10000".to_string()) + .parse() + .map_err(|_| "Invalid BLOOM_CAPACITY")?, + bloom_false_positive_rate: std::env::var("BLOOM_FALSE_POSITIVE_RATE") + .unwrap_or_else(|_| "0.01".to_string()) + .parse() + .map_err(|_| "Invalid BLOOM_FALSE_POSITIVE_RATE")?, + bloom_level_duration: Duration::from_secs( + std::env::var("BLOOM_LEVEL_DURATION_SECS") + .unwrap_or_else(|_| "60".to_string()) + .parse() + .map_err(|_| "Invalid BLOOM_LEVEL_DURATION_SECS")?, + ), + bloom_max_levels: std::env::var("BLOOM_MAX_LEVELS") + .unwrap_or_else(|_| "3".to_string()) + .parse() + .map_err(|_| "Invalid BLOOM_MAX_LEVELS")?, + }) + } +} diff --git a/tests/server_tests.rs b/tests/server_tests.rs new file mode 100644 index 0000000..3b12ae3 --- /dev/null +++ b/tests/server_tests.rs @@ -0,0 +1,197 @@ +#[cfg(test)] +mod tests { + use axum::{ + body::{self, Body}, + http::{Request, StatusCode}, + Router, + }; + use expiring_bloom_rs::api::create_router; + use expiring_bloom_rs::{ + AppState, FilterConfigBuilder, RedbSlidingBloomFilter, + ServerConfigBuilder, + }; + use serde_json::json; + use std::{sync::Arc, time::Duration}; + use tower::util::ServiceExt; + + async fn setup_test_app() -> Router { + use rand::random; + + let test_config = ServerConfigBuilder::default() + .server_port(random::() % 10000 + 50000) // Random high port + .bloom_db_path(format!("test_bloom_{}.redb", random::())) + .build() + .unwrap(); + let config = FilterConfigBuilder::default() + .capacity(100) + .false_positive_rate(0.01) + .level_duration(Duration::from_secs(1)) + .max_levels(3) + .build() + .unwrap(); + + let filter = + RedbSlidingBloomFilter::new(config, test_config.bloom_db_path.into()) + .unwrap(); + let state = Arc::new(AppState { + filter: tokio::sync::Mutex::new(filter), + }); + + create_router(state) + } + + #[tokio::test] + async fn test_health_check() { + let app = setup_test_app().await; + + let response = app + .oneshot( + Request::builder() + .uri("/health") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + } + + #[tokio::test] + async fn test_insert_and_query() { + let app = setup_test_app().await; + let test_value = "test_item"; + + // Test insert + let insert_response = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/items") + .header("content-type", "application/json") + .body(Body::from(json!({ "value": test_value }).to_string())) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(insert_response.status(), StatusCode::OK); + + // Test query for inserted item + let query_response = app + .clone() + .oneshot( + Request::builder() + .uri(format!("/items/{}", test_value)) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(query_response.status(), StatusCode::OK); + + let body_bytes = body::to_bytes(query_response.into_body(), 100) + .await + .unwrap(); + let response: serde_json::Value = + serde_json::from_slice(&body_bytes).unwrap(); + + assert_eq!(response["exists"], true); + + // Test query for non-existent item + let query_response = app + .oneshot( + Request::builder() + .uri("/items/nonexistent") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(query_response.status(), StatusCode::OK); + + let body_bytes = body::to_bytes(query_response.into_body(), 100) + .await + .unwrap(); + let response: serde_json::Value = + serde_json::from_slice(&body_bytes).unwrap(); + + assert_eq!(response["exists"], false); + } + + #[tokio::test] + async fn test_cleanup() { + let app = setup_test_app().await; + + let response = app + .oneshot( + Request::builder() + .method("POST") + .uri("/cleanup") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + } + + #[tokio::test] + async fn test_expiration() { + let app = setup_test_app().await; + let test_value = "expiring_item"; + + // Insert item + let _ = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/items") + .header("content-type", "application/json") + .body(Body::from(json!({ "value": test_value }).to_string())) + .unwrap(), + ) + .await + .unwrap(); + + // Wait for expiration + tokio::time::sleep(Duration::from_secs(4)).await; + + // Trigger cleanup + let _ = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/cleanup") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + // Query expired item + let query_response = app + .oneshot( + Request::builder() + .uri(format!("/items/{}", test_value)) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let body_bytes = body::to_bytes(query_response.into_body(), 100) + .await + .unwrap(); + let response: serde_json::Value = + serde_json::from_slice(&body_bytes).unwrap(); + + assert_eq!(response["exists"], false); + } +}