diff --git a/Cargo.lock b/Cargo.lock index 466e2849b6c..edabe9aa6c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3055,11 +3055,10 @@ dependencies = [ [[package]] name = "foyer" -version = "0.15.3" +version = "0.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb3b4e79fe76576b3970ada6437b30bee8cefe28685535290c3188d9b79b981e" +checksum = "635c7077026867cb5e5ea576c461f29b1c4151fce7a9d7cc3a1b1a9902d95c65" dependencies = [ - "ahash", "anyhow", "equivalent", "fastrace", @@ -3076,11 +3075,12 @@ dependencies = [ [[package]] name = "foyer-common" -version = "0.15.3" +version = "0.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e71666dc1f66539cc9a21668fa59d11dbf9e1fd7cb9048d3e2293e28080f5d0" +checksum = "7ed2316785e80137c7b91bb74dab1dc1967c3272df05825397b73ae8fc527041" dependencies = [ "ahash", + "bincode", "bytes", "cfg-if", "fastrace", @@ -3090,6 +3090,7 @@ dependencies = [ "parking_lot", "pin-project", "serde", + "thiserror 2.0.4", "tokio", ] @@ -3104,9 +3105,9 @@ dependencies = [ [[package]] name = "foyer-memory" -version = "0.15.3" +version = "0.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e9332460864db122775ef95b0603562288674daad7a8df0add81adaca7e5e09" +checksum = "090cf5b89d49fd61e7da9bfae3a1aef605f03196d542b2f8171c74f3add013f4" dependencies = [ "ahash", "arc-swap", @@ -3130,16 +3131,15 @@ dependencies = [ [[package]] name = "foyer-storage" -version = "0.15.3" +version = "0.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfae1b6d15d65c014e72047939fcf937f964582c6561942e736fc5dec1fc8c3c" +checksum = "095e857c97d6339d4a4a6424b88d08fe08ad0366bfbfaf65d6ddf55baf3d2a38" dependencies = [ "ahash", "allocator-api2", "anyhow", "array-util", "auto_enums", - "bincode", "bytes", "clap", "equivalent", @@ -4507,7 +4507,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -4861,9 +4861,9 @@ dependencies = [ [[package]] name = "mixtrics" -version = "0.0.4" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "285f6f175bfc4a068c0aa66d1fc48b6f4af0193d9577f8a86fa3a80014be9321" +checksum = "749ed12bab176c8a42c13a679dd2de12876d5ad4abe7525548e31ae001a9ebbf" dependencies = [ "itertools 0.14.0", "opentelemetry", @@ -7307,7 +7307,7 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03c3c6b7927ffe7ecaa769ee0e3994da3b8cafc8f444578982c83ecb161af917" dependencies = [ - "heck 0.4.1", + "heck 0.5.0", "proc-macro2 1.0.92", "quote 1.0.37", "syn 2.0.101", diff --git a/rust/blockstore/src/arrow/block/types.rs b/rust/blockstore/src/arrow/block/types.rs index 6bc71073a3e..46638d78e07 100644 --- a/rust/blockstore/src/arrow/block/types.rs +++ b/rust/blockstore/src/arrow/block/types.rs @@ -855,3 +855,30 @@ where Ok(()) } + +#[cfg(test)] +mod tests { + + use std::sync::Arc; + + use arrow::{ + array::Int32Array, + datatypes::{DataType, Field, Schema}, + }; + + use super::*; + + #[test] + fn test_block_serde() { + let batch = RecordBatch::try_new( + Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])), + vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))], + ) + .unwrap(); + let b1 = Block::from_record_batch(Uuid::new_v4(), batch.clone()); + let bytes = bincode::serialize(&b1).unwrap(); + let b2 = bincode::deserialize::(&bytes).unwrap(); + assert_eq!(b1.id, b2.id); + assert_eq!(b1.data.0, b2.data.0); + } +} diff --git a/rust/cache/Cargo.toml b/rust/cache/Cargo.toml index 2f9bf328d2f..1411b75cc05 100644 --- a/rust/cache/Cargo.toml +++ b/rust/cache/Cargo.toml @@ -8,8 +8,8 @@ path = "src/lib.rs" [dependencies] clap = { workspace = true } -foyer = { version = "0.15.3", features = ["tracing"] } -mixtrics = { version = "0.0.4", features = ["opentelemetry_0_27"] } +foyer = { version = "0.17.3", features = ["tracing", "serde"] } +mixtrics = { version = "0.1.0", features = ["opentelemetry_0_27"] } anyhow = "1.0" opentelemetry = { version = "0.27.0", default-features = false, features = ["trace", "metrics"] } diff --git a/rust/cache/src/foyer.rs b/rust/cache/src/foyer.rs index 96281ed613f..c2f6646e4f3 100644 --- a/rust/cache/src/foyer.rs +++ b/rust/cache/src/foyer.rs @@ -5,8 +5,8 @@ use chroma_tracing::util::Stopwatch; use clap::Parser; use foyer::{ CacheBuilder, DirectFsDeviceOptions, Engine, FifoConfig, FifoPicker, HybridCacheBuilder, - InvalidRatioPicker, LargeEngineOptions, LfuConfig, LruConfig, RateLimitPicker, S3FifoConfig, - StorageKey, StorageValue, TracingOptions, + InvalidRatioPicker, LargeEngineOptions, LfuConfig, LruConfig, S3FifoConfig, StorageKey, + StorageValue, Throttle, TracingOptions, }; use opentelemetry::global; use serde::{Deserialize, Serialize}; @@ -303,7 +303,7 @@ where K: Clone + Send + Sync + StorageKey + Eq + PartialEq + Hash + 'static, V: Clone + Send + Sync + StorageValue + Weighted + 'static, { - cache: foyer::HybridCache, + cache: foyer::HybridCache, cache_hit: opentelemetry::metrics::Counter, cache_miss: opentelemetry::metrics::Counter, get_latency: opentelemetry::metrics::Histogram, @@ -376,7 +376,7 @@ where ); builder.with_hash_builder(rs) } - false => builder, + false => builder.with_hash_builder(RandomState::new()), }; let Some(dir) = config.dir.as_ref() else { @@ -385,14 +385,19 @@ where ))); }; - let mut builder = builder + let mut device_options = DirectFsDeviceOptions::new(dir) + .with_capacity(config.disk * MIB) + .with_file_size(config.file_size * MIB); + if config.admission_rate_limit > 0 { + device_options = device_options.with_throttle( + Throttle::new().with_write_throughput(config.admission_rate_limit * MIB), + ); + } + + let builder = builder .with_weighter(|_, v| v.weight()) .storage(Engine::Large) - .with_device_options( - DirectFsDeviceOptions::new(dir) - .with_capacity(config.disk * MIB) - .with_file_size(config.file_size * MIB), - ) + .with_device_options(device_options) .with_flush(config.flush) .with_recover_mode(foyer::RecoverMode::Strict) .with_large_object_disk_cache_options( @@ -408,11 +413,6 @@ where ]), ); - if config.admission_rate_limit > 0 { - builder = builder.with_admission_picker(Arc::new(RateLimitPicker::new( - config.admission_rate_limit * MIB, - ))); - } let cache = builder.build().await.map_err(|e| { CacheError::InvalidCacheConfig(format!("builder failed: {:?}", e)).boxed() })?;