Skip to content

Commit

Permalink
add server
Browse files Browse the repository at this point in the history
  • Loading branch information
oiwn committed Jan 6, 2025
1 parent 04000fc commit dfc27cb
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 1 deletion.
16 changes: 15 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,27 @@ redb = { version = "2", optional = true }
derive_builder = "0.20"
dotenvy = "0.15"

# server
axum = { version = "0.8", optional = true }
tokio = { version = "1", features = ["full"], optional = true }
utoipa = { version = "5.3", features = ["axum_extras"], optional = true }
utoipa-swagger-ui = { version = "8.1", features = ["axum"], optional = true }
serde = { version = "1", features = ["derive"], optional = true }
serde_json = { version = "1", optional = true }

[dev-dependencies]
rand = "0.8"
expiring-bloom-rs = { path = ".", features = ["redb"] }
expiring-bloom-rs = { path = ".", features = ["redb", "server"] }
criterion = { version = "0.5", features = ["html_reports"] }

[[bin]]
name = "bloom-server"
path = "src/bin/server.rs"

[features]
default = ["redb", "server"]
redb = ["dep:redb"]
server = ["dep:axum", "dep:tokio", "dep:utoipa", "dep:utoipa-swagger-ui", "dep:serde", "dep:serde_json"]

[[bench]]
name = "bloom_benchmarks"
Expand Down
143 changes: 143 additions & 0 deletions src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
use axum::{
extract::{Path, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
Json, Router,
};
use std::sync::Arc;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;

use crate::filter::SlidingBloomFilter;
use crate::types::{AppState, ErrorResponse, InsertRequest, QueryResponse};

#[derive(OpenApi)]
#[openapi(
paths(
health_check,
insert_item,
query_item,
cleanup_expired,
),
components(
schemas(InsertRequest, QueryResponse, ErrorResponse)
),
tags(
(name = "bloom-filter", description = "Time-Decaying Bloom Filter API")
)
)]
struct ApiDoc;

/// Check API health
#[utoipa::path(
get,
path = "/health",
tag = "bloom-filter",
responses(
(status = 200, description = "API is healthy")
)
)]
async fn health_check() -> impl IntoResponse {
StatusCode::OK
}

/// Insert an item into the Bloom filter
#[utoipa::path(
post,
path = "/items",
tag = "bloom-filter",
request_body = InsertRequest,
responses(
(status = 200, description = "Item inserted successfully"),
(status = 500, description = "Internal server error", body = ErrorResponse)
)
)]
async fn insert_item(
State(state): State<Arc<AppState>>,
Json(request): Json<InsertRequest>,
) -> impl IntoResponse {
let mut filter = state.filter.lock().await;
match filter.insert(request.value.as_bytes()) {
Ok(_) => StatusCode::OK.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
message: e.to_string(),
}),
)
.into_response(),
}
}

/// Query if an item exists in the Bloom filter
#[utoipa::path(
get,
path = "/items/{value}",
tag = "bloom-filter",
params(
("value" = String, Path, description = "Value to query")
),
responses(
(status = 200, description = "Query successful", body = QueryResponse),
(status = 500, description = "Internal server error", body = ErrorResponse)
)
)]
async fn query_item(
State(state): State<Arc<AppState>>,
Path(value): Path<String>,
) -> impl IntoResponse {
let filter = state.filter.lock().await;
match filter.query(value.as_bytes()) {
Ok(exists) => {
(StatusCode::OK, Json(QueryResponse { exists })).into_response()
}
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
message: e.to_string(),
}),
)
.into_response(),
}
}

/// Clean up expired items
#[utoipa::path(
post,
path = "/cleanup",
tag = "bloom-filter",
responses(
(status = 200, description = "Cleanup successful"),
(status = 500, description = "Internal server error", body = ErrorResponse)
)
)]
async fn cleanup_expired(
State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
let mut filter = state.filter.lock().await;
match filter.cleanup_expired_levels() {
Ok(_) => StatusCode::OK.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
message: e.to_string(),
}),
)
.into_response(),
}
}

pub fn create_router(state: Arc<AppState>) -> Router {
let openapi = ApiDoc::openapi();

Router::new()
.merge(
SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", openapi),
)
.route("/health", get(health_check))
.route("/items", post(insert_item))
.route("/items/{value}", get(query_item))
.route("/cleanup", post(cleanup_expired))
.with_state(state)
}
34 changes: 34 additions & 0 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use expiring_bloom_rs::api::create_router;
use expiring_bloom_rs::types::AppState;
use expiring_bloom_rs::{FilterConfigBuilder, RedbSlidingBloomFilter};
use std::{sync::Arc, time::Duration};

#[tokio::main]
async fn main() {
// Initialize the Bloom filter
let config = FilterConfigBuilder::default()
.capacity(1000)
.false_positive_rate(0.01)
.level_duration(Duration::from_secs(60))
.max_levels(3)
.build()
.expect("Failed to build filter config");

let filter = RedbSlidingBloomFilter::new(config, "bloom.redb".into())
.expect("Failed to create filter");

// Create application state
let state = Arc::new(AppState {
filter: tokio::sync::Mutex::new(filter),
});

// Create router
let app = create_router(state);

// Start the server
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
println!("Server running on http://localhost:3000");
println!("API documentation available at http://localhost:3000/swagger-ui/");

axum::serve(listener, app).await.unwrap();
}
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@
//! access during sub-filter rotation.
//! * Since 32 bit hashes used, max capacity would be 2**32-1 (Not sure)
#[cfg(feature = "server")]
pub mod api;
mod error;
mod filter;
mod hash;
mod inmemory_filter;
#[cfg(feature = "redb")]
mod redb_filter;
mod storage;
#[cfg(feature = "server")]
pub mod types;

pub use error::{BloomError, Result};
pub use filter::{
Expand Down
23 changes: 23 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use crate::RedbSlidingBloomFilter;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use utoipa::ToSchema;

#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct InsertRequest {
pub value: String,
}

#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct QueryResponse {
pub exists: bool,
}

#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct ErrorResponse {
pub message: String,
}

pub struct AppState {
pub filter: Mutex<RedbSlidingBloomFilter>,
}

0 comments on commit dfc27cb

Please sign in to comment.