diff --git a/Cargo.lock b/Cargo.lock index 97abdc5db..eb1142658 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2057,6 +2057,15 @@ dependencies = [ "url", ] +[[package]] +name = "humansize" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7" +dependencies = [ + "libm", +] + [[package]] name = "humantime" version = "2.1.0" @@ -2482,6 +2491,7 @@ dependencies = [ "hdrhistogram", "hex", "httpmock", + "humansize", "lazy_static", "libc", "linked-hash-map", diff --git a/mountpoint-s3-client/src/failure_client.rs b/mountpoint-s3-client/src/failure_client.rs index b7d78a25d..bd1345a7f 100644 --- a/mountpoint-s3-client/src/failure_client.rs +++ b/mountpoint-s3-client/src/failure_client.rs @@ -11,6 +11,7 @@ use std::task::{Context, Poll}; use async_trait::async_trait; use futures::Stream; +use mountpoint_s3_crt::s3::client::BufferPoolUsageStats; use pin_project::pin_project; use crate::object_client::{ @@ -85,6 +86,10 @@ where self.client.initial_read_window_size() } + fn mem_usage_stats(&self) -> Option { + self.client.mem_usage_stats() + } + async fn delete_object( &self, bucket: &str, diff --git a/mountpoint-s3-client/src/mock_client.rs b/mountpoint-s3-client/src/mock_client.rs index b663a9463..08e7c8be2 100644 --- a/mountpoint-s3-client/src/mock_client.rs +++ b/mountpoint-s3-client/src/mock_client.rs @@ -14,6 +14,7 @@ use async_trait::async_trait; use futures::{Stream, StreamExt}; use lazy_static::lazy_static; use mountpoint_s3_crt::checksums::crc32c; +use mountpoint_s3_crt::s3::client::BufferPoolUsageStats; use rand::seq::SliceRandom; use rand::SeedableRng; use rand_chacha::ChaCha20Rng; @@ -572,6 +573,10 @@ impl ObjectClient for MockClient { } } + fn mem_usage_stats(&self) -> Option { + None + } + async fn delete_object( &self, bucket: &str, diff --git a/mountpoint-s3-client/src/mock_client/throughput_client.rs b/mountpoint-s3-client/src/mock_client/throughput_client.rs index 065791069..fc2c74831 100644 --- a/mountpoint-s3-client/src/mock_client/throughput_client.rs +++ b/mountpoint-s3-client/src/mock_client/throughput_client.rs @@ -6,6 +6,7 @@ use std::time::Duration; use async_io::block_on; use async_trait::async_trait; use futures::Stream; +use mountpoint_s3_crt::s3::client::BufferPoolUsageStats; use pin_project::pin_project; use crate::mock_client::leaky_bucket::LeakyBucket; @@ -113,6 +114,10 @@ impl ObjectClient for ThroughputMockClient { self.inner.initial_read_window_size() } + fn mem_usage_stats(&self) -> Option { + self.inner.mem_usage_stats() + } + async fn delete_object( &self, bucket: &str, diff --git a/mountpoint-s3-client/src/object_client.rs b/mountpoint-s3-client/src/object_client.rs index b92bf796c..1b34bbdae 100644 --- a/mountpoint-s3-client/src/object_client.rs +++ b/mountpoint-s3-client/src/object_client.rs @@ -2,6 +2,7 @@ use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata}; use async_trait::async_trait; use auto_impl::auto_impl; use futures::Stream; +use mountpoint_s3_crt::s3::client::BufferPoolUsageStats; use std::pin::Pin; use std::str::FromStr; use std::time::SystemTime; @@ -89,6 +90,10 @@ pub trait ObjectClient { /// This can be `None` if backpressure is disabled. fn initial_read_window_size(&self) -> Option; + /// Query current memory usage stats for the client. This can be `None` if the client + /// does not record the stats. + fn mem_usage_stats(&self) -> Option; + /// Delete a single object from the object store. 
/// /// DeleteObject will succeed even if the object within the bucket does not exist. diff --git a/mountpoint-s3-client/src/s3_crt_client.rs b/mountpoint-s3-client/src/s3_crt_client.rs index caf6bab45..d38742edf 100644 --- a/mountpoint-s3-client/src/s3_crt_client.rs +++ b/mountpoint-s3-client/src/s3_crt_client.rs @@ -25,8 +25,8 @@ use mountpoint_s3_crt::io::event_loop::EventLoopGroup; use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions}; use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions}; use mountpoint_s3_crt::s3::client::{ - init_signing_config, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions, MetaRequestResult, - MetaRequestType, RequestMetrics, RequestType, + init_signing_config, BufferPoolUsageStats, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions, + MetaRequestResult, MetaRequestType, RequestMetrics, RequestType, }; use async_trait::async_trait; @@ -767,7 +767,9 @@ impl S3CrtClientInner { .set(metrics.num_requests_streaming_response as f64); // Buffer pool metrics + let start = Instant::now(); let buffer_pool_stats = s3_client.poll_buffer_pool_usage_stats(); + metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us").record(start.elapsed().as_micros() as f64); metrics::gauge!("s3.client.buffer_pool.mem_limit").set(buffer_pool_stats.mem_limit as f64); metrics::gauge!("s3.client.buffer_pool.primary_cutoff").set(buffer_pool_stats.primary_cutoff as f64); metrics::gauge!("s3.client.buffer_pool.primary_used").set(buffer_pool_stats.primary_used as f64); @@ -1208,6 +1210,13 @@ impl ObjectClient for S3CrtClient { } } + fn mem_usage_stats(&self) -> Option { + let start = Instant::now(); + let crt_buffer_pool_stats = self.inner.s3_client.poll_buffer_pool_usage_stats(); + metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us").record(start.elapsed().as_micros() as f64); + Some(crt_buffer_pool_stats) + } + async fn delete_object( &self, bucket: &str, diff --git a/mountpoint-s3/Cargo.toml b/mountpoint-s3/Cargo.toml index 9e28aaca5..cf64f5e5f 100644 --- a/mountpoint-s3/Cargo.toml +++ b/mountpoint-s3/Cargo.toml @@ -44,6 +44,7 @@ tracing = { version = "0.1.35", features = ["log"] } tracing-log = "0.2.0" tracing-subscriber = { version = "0.3.14", features = ["env-filter"] } async-stream = "0.3.5" +humansize = "2.1.3" [target.'cfg(target_os = "linux")'.dependencies] procfs = { version = "0.16.0", default-features = false } @@ -83,6 +84,7 @@ built = { version = "0.7.1", features = ["git2"] } express_cache = ["block_size"] block_size = [] event_log = [] +mem_limiter = [] # Features for choosing tests fips_tests = [] fuse_tests = [] diff --git a/mountpoint-s3/examples/prefetch_benchmark.rs b/mountpoint-s3/examples/prefetch_benchmark.rs index fb1b93c2b..f9db56d36 100644 --- a/mountpoint-s3/examples/prefetch_benchmark.rs +++ b/mountpoint-s3/examples/prefetch_benchmark.rs @@ -6,11 +6,14 @@ use std::time::Instant; use clap::{Arg, Command}; use futures::executor::block_on; +use mountpoint_s3::mem_limiter::MemoryLimiter; +use mountpoint_s3::object::ObjectId; use mountpoint_s3::prefetch::{default_prefetch, Prefetch, PrefetchResult}; use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig}; use mountpoint_s3_client::types::ETag; use mountpoint_s3_client::{ObjectClient, S3CrtClient}; use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter; +use sysinfo::{RefreshKind, System}; use tracing_subscriber::fmt::Subscriber; use 
tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; @@ -39,6 +42,11 @@ fn main() { .long("throughput-target-gbps") .help("Desired throughput in Gbps"), ) + .arg( + Arg::new("max-memory-target") + .long("max-memory-target") + .help("Maximum memory usage target in MiB"), + ) .arg( Arg::new("part-size") .long("part-size") @@ -63,6 +71,9 @@ fn main() { let throughput_target_gbps = matches .get_one::<String>("throughput-target-gbps") .map(|s| s.parse::<f64>().expect("throughput target must be an f64")); + let max_memory_target = matches + .get_one::<String>("max-memory-target") + .map(|s| s.parse::<u64>().expect("max memory target must be a u64")); let part_size = matches .get_one::<String>("part-size") .map(|s| s.parse::<usize>().expect("part size must be a usize")); @@ -92,6 +103,15 @@ fn main() { } let client = Arc::new(S3CrtClient::new(config).expect("couldn't create client")); + let max_memory_target = if let Some(target) = max_memory_target { + target * 1024 * 1024 + } else { + // Default to 95% of total system memory + let sys = System::new_with_specifics(RefreshKind::everything()); + (sys.total_memory() as f64 * 0.95) as u64 + }; + let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), max_memory_target)); + let head_object_result = block_on(client.head_object(bucket, key)).expect("HeadObject failed"); let size = head_object_result.object.size; let etag = ETag::from_str(&head_object_result.object.etag).unwrap(); @@ -103,10 +123,17 @@ fn main() { let start = Instant::now(); + let object_id = ObjectId::new(key.clone(), etag.clone()); thread::scope(|scope| { for _ in 0..downloads { let received_bytes = received_bytes.clone(); - let mut request = manager.prefetch(client.clone(), bucket, key, size, etag.clone()); + let mut request = manager.prefetch( + client.clone(), + mem_limiter.clone(), + bucket.clone(), + object_id.clone(), + size, + ); scope.spawn(|| { futures::executor::block_on(async move { diff --git a/mountpoint-s3/src/cli.rs index 79132536e..2df7f2c0c 100644 --- a/mountpoint-s3/src/cli.rs +++ b/mountpoint-s3/src/cli.rs @@ -24,6 +24,7 @@ use mountpoint_s3_crt::io::event_loop::EventLoopGroup; use nix::sys::signal::Signal; use nix::unistd::ForkResult; use regex::Regex; +use sysinfo::{RefreshKind, System}; use crate::build_info; use crate::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, ManagedCacheDir}; @@ -31,6 +32,7 @@ use crate::fs::{CacheConfig, S3FilesystemConfig, ServerSideEncryption, TimeToLiv use crate::fuse::session::FuseSession; use crate::fuse::S3FuseFilesystem; use crate::logging::{init_logging, LoggingConfig}; +use crate::mem_limiter::MINIMUM_MEM_LIMIT; use crate::prefetch::{caching_prefetch, default_prefetch, Prefetch}; use crate::prefix::Prefix; use crate::s3::S3Personality; @@ -156,6 +158,17 @@ pub struct CliArgs { )] pub max_threads: u64, + // This config is still unstable + #[cfg(feature = "mem_limiter")] + #[clap( + long, + help = "Maximum memory usage target [default: 95% of total system memory with a minimum of 512 MiB]", + value_name = "MiB", + value_parser = value_parser!(u64).range(512..), + help_heading = CLIENT_OPTIONS_HEADER + )] + pub max_memory_target: Option<u64>, + #[clap( long, help = "Part size for multi-part GET and PUT in bytes", @@ -775,6 +788,15 @@ where filesystem_config.s3_personality = s3_personality; filesystem_config.server_side_encryption = ServerSideEncryption::new(args.sse.clone(), args.sse_kms_key_id.clone()); + let sys = System::new_with_specifics(RefreshKind::everything()); + let
default_mem_target = (sys.total_memory() as f64 * 0.95) as u64; + filesystem_config.mem_limit = default_mem_target.max(MINIMUM_MEM_LIMIT); + + #[cfg(feature = "mem_limiter")] + if let Some(max_mem_target) = args.max_memory_target { + filesystem_config.mem_limit = max_mem_target * 1024 * 1024; + } + // Written in this awkward way to force us to update it if we add new checksum types filesystem_config.use_upload_checksums = match args.upload_checksums { Some(UploadChecksums::Crc32c) | None => true, diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index c42488f6c..fc3d7457d 100644 --- a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -18,6 +18,8 @@ use mountpoint_s3_client::ObjectClient; use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_LOOKUP_NONEXISTENT}; use crate::logging; +use crate::mem_limiter::{MemoryLimiter, MINIMUM_MEM_LIMIT}; +use crate::object::ObjectId; use crate::prefetch::{Prefetch, PrefetchResult}; use crate::prefix::Prefix; use crate::s3::S3Personality; @@ -151,9 +153,14 @@ where None => return Err(err!(libc::EBADF, "no E-Tag for inode {}", lookup.inode.ino())), Some(etag) => ETag::from_str(etag).expect("E-Tag should be set"), }; - let request = fs - .prefetcher - .prefetch(fs.client.clone(), &fs.bucket, &full_key, object_size, etag.clone()); + let object_id = ObjectId::new(full_key, etag); + let request = fs.prefetcher.prefetch( + fs.client.clone(), + fs.mem_limiter.clone(), + fs.bucket.clone(), + object_id, + object_size, + ); let handle = FileHandleState::Read { handle, request }; metrics::gauge!("fs.current_handles", "type" => "read").increment(1.0); Ok(handle) @@ -393,6 +400,8 @@ pub struct S3FilesystemConfig { pub server_side_encryption: ServerSideEncryption, /// Use additional checksums for uploads pub use_upload_checksums: bool, + /// Memory limit + pub mem_limit: u64, } impl Default for S3FilesystemConfig { @@ -413,6 +422,7 @@ impl Default for S3FilesystemConfig { s3_personality: S3Personality::default(), server_side_encryption: Default::default(), use_upload_checksums: true, + mem_limit: MINIMUM_MEM_LIMIT, } } } @@ -526,6 +536,7 @@ where { config: S3FilesystemConfig, client: Client, + mem_limiter: Arc>, superblock: Superblock, prefetcher: Prefetcher, uploader: Uploader, @@ -556,7 +567,7 @@ where s3_personality: config.s3_personality, }; let superblock = Superblock::new(bucket, prefix, superblock_config); - + let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), config.mem_limit)); let uploader = Uploader::new( client.clone(), config.storage_class.to_owned(), @@ -567,6 +578,7 @@ where Self { config, client, + mem_limiter, superblock, prefetcher, uploader, diff --git a/mountpoint-s3/src/lib.rs b/mountpoint-s3/src/lib.rs index 2bd553d4f..dc452c890 100644 --- a/mountpoint-s3/src/lib.rs +++ b/mountpoint-s3/src/lib.rs @@ -6,8 +6,9 @@ pub mod data_cache; pub mod fs; pub mod fuse; pub mod logging; +pub mod mem_limiter; pub mod metrics; -mod object; +pub mod object; pub mod prefetch; pub mod prefix; pub mod s3; diff --git a/mountpoint-s3/src/mem_limiter.rs b/mountpoint-s3/src/mem_limiter.rs new file mode 100644 index 000000000..780071184 --- /dev/null +++ b/mountpoint-s3/src/mem_limiter.rs @@ -0,0 +1,136 @@ +use std::{sync::atomic::Ordering, time::Instant}; + +use humansize::make_format; +use metrics::atomics::AtomicU64; +use mountpoint_s3_client::ObjectClient; +use tracing::debug; + +pub const MINIMUM_MEM_LIMIT: u64 = 512 * 1024 * 1024; + +/// `MemoryLimiter` tracks memory used by Mountpoint and makes decisions if a new memory 
reservation request can be accepted. +/// Currently, there are two metrics we take into account: +/// 1) the memory reserved by prefetcher instances for the data requested or fetched from the CRT client. +/// 2) the memory reserved by the S3 client, if it can report it. +/// +/// A single instance of this struct is shared among all of the prefetchers (file handles). +/// +/// Upon creation, each file handle makes an initial reservation request with a minimal read window size of `1MiB + 128KiB`. This +/// is accepted unconditionally since we want to allow any file handle to make progress even if that means going over the memory +/// limit. Additional reservations for a file handle arise when the backpressure read window is incremented to fetch more data +/// from the underlying part streams. Those reservations may be rejected if there is no available memory. +/// +/// Release of the reserved memory happens on one of the following events: +/// 1) the prefetcher is destroyed (`RequestTask` will be dropped and remaining data in the backpressure read window will be released). +/// 2) data is moved out of the part queue. +/// +/// The following is a visualisation of a single prefetcher instance's data stream: +/// +/// backwards_seek_start part_queue_start in_part_queue window_end_offset preferred_window_end_offset +/// │ │ │ │ │ +/// ─┼──────────────────────────────┼───────────────────────────┼───────────────────────────────┼────────────────────────────┼───────────-► +/// │ │ │ +/// └──────────────────────────────┤ │ +/// mem reserved by the part queue │ │ +/// └───────────────────────────────────────────────────────────┤ +/// mem reserved by the backpressure controller +/// (on `BackpressureFeedbackEvent`) +/// +#[derive(Debug)] +pub struct MemoryLimiter<Client: ObjectClient> { + mem_limit: u64, + /// Memory reserved for data that we have requested via the request task but that may not + /// have arrived yet. + prefetcher_mem_reserved: AtomicU64, + /// Additional memory reserved for other non-buffer usage, like storing metadata. + additional_mem_reserved: u64, + // We also take the client's reserved memory into account: the prefetcher controls + // the entire read path, but we don't record or control memory usage on the write + // path today, so we rely on the client's stats for "other buffers" and adjust the + // prefetcher read window accordingly. + client: Client, +} + +impl<Client: ObjectClient> MemoryLimiter<Client> { + pub fn new(client: Client, mem_limit: u64) -> Self { + let min_reserved = 128 * 1024 * 1024; + let reserved_mem = (mem_limit / 8).max(min_reserved); + let formatter = make_format(humansize::BINARY); + debug!( + "target memory usage is {} with {} reserved memory", + formatter(mem_limit), + formatter(reserved_mem) + ); + Self { + client, + mem_limit, + prefetcher_mem_reserved: AtomicU64::new(0), + additional_mem_reserved: reserved_mem, + } + } + + /// Reserve memory for future use. This always succeeds, even if it means going beyond + /// the configured memory limit. + pub fn reserve(&self, size: u64) { + self.prefetcher_mem_reserved.fetch_add(size, Ordering::SeqCst); + metrics::gauge!("prefetch.bytes_reserved").increment(size as f64); + } + + /// Reserve memory for future use. Returns `false` if there is not enough available memory.
+ pub fn try_reserve(&self, size: u64) -> bool { + let start = Instant::now(); + let mut prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst); + loop { + let new_prefetcher_mem_reserved = prefetcher_mem_reserved.saturating_add(size); + let client_mem_allocated = self.client_mem_allocated(); + let new_total_mem_usage = new_prefetcher_mem_reserved + .saturating_add(client_mem_allocated) + .saturating_add(self.additional_mem_reserved); + if new_total_mem_usage > self.mem_limit { + debug!(new_total_mem_usage, "not enough memory to reserve"); + metrics::histogram!("prefetch.mem_reserve_latency_us").record(start.elapsed().as_micros() as f64); + return false; + } + // Check that the value we read is still the same before updating it. + match self.prefetcher_mem_reserved.compare_exchange_weak( + prefetcher_mem_reserved, + new_prefetcher_mem_reserved, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => { + metrics::gauge!("prefetch.bytes_reserved").increment(size as f64); + metrics::histogram!("prefetch.mem_reserve_latency_us").record(start.elapsed().as_micros() as f64); + return true; + } + Err(current) => prefetcher_mem_reserved = current, // another thread updated the atomic before us, try again + } + } + } + + /// Release the reserved memory. + pub fn release(&self, size: u64) { + self.prefetcher_mem_reserved.fetch_sub(size, Ordering::SeqCst); + metrics::gauge!("prefetch.bytes_reserved").decrement(size as f64); + } + + /// Query the available memory tracked by the memory limiter. + pub fn available_mem(&self) -> u64 { + let prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst); + let client_mem_allocated = self.client_mem_allocated(); + self.mem_limit + .saturating_sub(prefetcher_mem_reserved) + .saturating_sub(client_mem_allocated) + .saturating_sub(self.additional_mem_reserved) + } + + // Get the memory allocated by the client. Currently, only the CRT client is able to report its buffer pool stats. + // The CRT allocates memory in two areas. The first is primary storage, where memory is allocated in blocks + // and the number of allocated bytes is reported by the `primary_allocated` stat. The other is secondary storage, + // where the amount allocated is exactly equal to the memory used. So the total allocated memory for the CRT client + // would be `primary_allocated` + `secondary_used`.
+ fn client_mem_allocated(&self) -> u64 { + self.client + .mem_usage_stats() + .map_or(0, |stats| stats.primary_allocated.saturating_add(stats.secondary_used)) + } +} diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs index 1dea5df9b..d6aac7fa7 100644 --- a/mountpoint-s3/src/prefetch.rs +++ b/mountpoint-s3/src/prefetch.rs @@ -46,15 +46,15 @@ use async_trait::async_trait; use futures::task::Spawn; use metrics::{counter, histogram}; use mountpoint_s3_client::error::{GetObjectError, ObjectClientError}; -use mountpoint_s3_client::types::ETag; use mountpoint_s3_client::ObjectClient; use part::PartOperationError; use part_stream::RequestTaskConfig; use thiserror::Error; -use tracing::trace; +use tracing::{debug, trace}; use crate::checksums::{ChecksummedBytes, IntegrityError}; use crate::data_cache::DataCache; +use crate::mem_limiter::MemoryLimiter; use crate::object::ObjectId; use crate::prefetch::caching_stream::CachingPartStream; use crate::prefetch::part_stream::{ClientPartStream, ObjectPartStream, RequestRange}; @@ -70,10 +70,10 @@ pub trait Prefetch { fn prefetch( &self, client: Client, - bucket: &str, - key: &str, + mem_limiter: Arc>, + bucket: String, + object_id: ObjectId, size: u64, - etag: ETag, ) -> Self::PrefetchResult where Client: ObjectClient + Clone + Send + Sync + 'static; @@ -164,7 +164,7 @@ impl Default for PrefetcherConfig { fn default() -> Self { Self { max_read_window_size: determine_max_read_size(), - sequential_prefetch_multiplier: 8, + sequential_prefetch_multiplier: 2, read_timeout: Duration::from_secs(60), // We want these large enough to tolerate a single out-of-order Linux readahead, which // is at most 256KiB backwards and then 512KiB forwards. For forwards seeks, we're also @@ -239,15 +239,23 @@ where fn prefetch( &self, client: Client, - bucket: &str, - key: &str, + mem_limiter: Arc>, + bucket: String, + object_id: ObjectId, size: u64, - etag: ETag, ) -> Self::PrefetchResult where Client: ObjectClient + Clone + Send + Sync + 'static, { - PrefetchGetObject::new(client, self.part_stream.clone(), self.config, bucket, key, size, etag) + PrefetchGetObject::new( + client, + self.part_stream.clone(), + mem_limiter, + self.config, + bucket, + object_id, + size, + ) } } @@ -257,8 +265,9 @@ where pub struct PrefetchGetObject { client: Client, part_stream: Arc, + mem_limiter: Arc>, config: PrefetcherConfig, - backpressure_task: Option>, + backpressure_task: Option>, // Invariant: the offset of the last byte in this window is always // self.next_sequential_read_offset - 1. backward_seek_window: SeekWindow, @@ -310,15 +319,16 @@ where fn new( client: Client, part_stream: Arc, + mem_limiter: Arc>, config: PrefetcherConfig, - bucket: &str, - key: &str, + bucket: String, + object_id: ObjectId, size: u64, - etag: ETag, ) -> Self { PrefetchGetObject { client, part_stream, + mem_limiter, config, backpressure_task: None, backward_seek_window: SeekWindow::new(config.max_backward_seek_distance as usize), @@ -326,8 +336,8 @@ where sequential_read_start_offset: 0, next_sequential_read_offset: 0, next_request_offset: 0, - bucket: bucket.to_owned(), - object_id: ObjectId::new(key.to_owned(), etag), + bucket, + object_id, size, } } @@ -359,7 +369,7 @@ where if self.try_seek(offset).await? 
{ trace!("seek succeeded"); } else { - trace!( + debug!( expected = self.next_sequential_read_offset, actual = offset, "out-of-order read, resetting prefetch" @@ -410,9 +420,10 @@ where /// We will be using flow-control window to control how much data we want to download into the prefetcher. fn spawn_read_backpressure_request( &mut self, - ) -> Result, PrefetchReadError> { + ) -> Result, PrefetchReadError> { let start = self.next_sequential_read_offset; let object_size = self.size as usize; + let read_part_size = self.client.read_part_size().unwrap_or(8 * 1024 * 1024); let range = RequestRange::new(object_size, start, object_size); // The prefetcher now relies on backpressure mechanism so it must be enabled @@ -431,12 +442,15 @@ where bucket: self.bucket.clone(), object_id: self.object_id.clone(), range, + read_part_size, preferred_part_size: self.preferred_part_size, initial_read_window_size, max_read_window_size: self.config.max_read_window_size, read_window_size_multiplier: self.config.sequential_prefetch_multiplier, }; - Ok(self.part_stream.spawn_get_object_request(&self.client, config)) + Ok(self + .part_stream + .spawn_get_object_request(&self.client, config, self.mem_limiter.clone())) } /// Reset this prefetch request to a new offset, clearing any existing tasks queued. @@ -514,6 +528,10 @@ where trace!("seek failed: not enough data in backwards seek window"); return Ok(false); }; + // This also increase `prefetcher_mem_reserved` value in memory limiter. + // At least one subsequent `RequestTask::read` is required for memory tracking to work correctly + // because `BackpressureController::drop` needs to know the start offset of the part queue to + // release the right amount of memory. task.push_front(parts).await?; self.next_sequential_read_offset = offset; Ok(true) @@ -542,6 +560,7 @@ mod tests { #![allow(clippy::identity_op)] use crate::data_cache::InMemoryDataCache; + use crate::mem_limiter::MINIMUM_MEM_LIMIT; use super::caching_stream::CachingPartStream; use super::*; @@ -549,6 +568,7 @@ mod tests { use mountpoint_s3_client::error::GetObjectError; use mountpoint_s3_client::failure_client::{countdown_failure_client, RequestFailureMap}; use mountpoint_s3_client::mock_client::{ramp_bytes, MockClient, MockClientConfig, MockClientError, MockObject}; + use mountpoint_s3_client::types::ETag; use proptest::proptest; use proptest::strategy::{Just, Strategy}; use proptest_derive::Arbitrary; @@ -600,6 +620,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); + let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT); let object = MockObject::ramp(0xaa, size as usize, ETag::for_tests()); let etag = object.etag(); @@ -614,7 +635,8 @@ mod tests { }; let prefetcher = Prefetcher::new(part_stream, prefetcher_config); - let mut request = prefetcher.prefetch(client, "test-bucket", "hello", size, etag); + let object_id = ObjectId::new("hello".to_owned(), etag); + let mut request = prefetcher.prefetch(client, mem_limiter.into(), "test-bucket".to_owned(), object_id, size); let mut next_offset = 0; loop { @@ -692,7 +714,8 @@ mod tests { ) where Stream: ObjectPartStream + Send + Sync + 'static, { - let client = MockClient::new(client_config); + let client = Arc::new(MockClient::new(client_config)); + let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT); let read_size = 1 * MB; let object_size = 8 * MB; let object = MockObject::ramp(0xaa, object_size, ETag::for_tests()); @@ -705,7 +728,14 @@ mod tests { }; let prefetcher = 
Prefetcher::new(part_stream, prefetcher_config); - let mut request = prefetcher.prefetch(Arc::new(client), "test-bucket", "hello", object_size as u64, etag); + let object_id = ObjectId::new("hello".to_owned(), etag); + let mut request = prefetcher.prefetch( + client, + mem_limiter.into(), + "test-bucket".to_owned(), + object_id, + object_size as u64, + ); let result = block_on(request.read(0, read_size)); assert!(matches!(result, Err(PrefetchReadError::BackpressurePreconditionFailed))); } @@ -785,7 +815,14 @@ mod tests { client.add_object("hello", object); - let client = countdown_failure_client(client, get_failures, HashMap::new(), HashMap::new(), HashMap::new()); + let client = Arc::new(countdown_failure_client( + client, + get_failures, + HashMap::new(), + HashMap::new(), + HashMap::new(), + )); + let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT); let prefetcher_config = PrefetcherConfig { max_read_window_size: test_config.max_read_window_size, @@ -794,7 +831,8 @@ mod tests { }; let prefetcher = Prefetcher::new(part_stream, prefetcher_config); - let mut request = prefetcher.prefetch(Arc::new(client), "test-bucket", "hello", size, etag); + let object_id = ObjectId::new("hello".to_owned(), etag); + let mut request = prefetcher.prefetch(client, mem_limiter.into(), "test-bucket".to_owned(), object_id, size); let mut next_offset = 0; loop { @@ -909,6 +947,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); + let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT); let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests()); let etag = object.etag(); @@ -923,7 +962,14 @@ mod tests { }; let prefetcher = Prefetcher::new(part_stream, prefetcher_config); - let mut request = prefetcher.prefetch(client, "test-bucket", "hello", object_size, etag); + let object_id = ObjectId::new("hello".to_owned(), etag); + let mut request = prefetcher.prefetch( + client, + mem_limiter.into(), + "test-bucket".to_owned(), + object_id, + object_size, + ); for (offset, length) in reads { assert!(offset < object_size); @@ -1085,10 +1131,19 @@ mod tests { HashMap::new(), HashMap::new(), )); + let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT); let prefetcher = Prefetcher::new(default_stream(), Default::default()); + let mem_limiter = Arc::new(mem_limiter); block_on(async { - let mut request = prefetcher.prefetch(client, "test-bucket", "hello", OBJECT_SIZE as u64, etag.clone()); + let object_id = ObjectId::new("hello".to_owned(), etag.clone()); + let mut request = prefetcher.prefetch( + client, + mem_limiter, + "test-bucket".to_owned(), + object_id, + OBJECT_SIZE as u64, + ); // The first read should trigger the prefetcher to try and get the whole object (in 2 parts). 
_ = request.read(0, 1).await.expect("first read should succeed"); @@ -1129,6 +1184,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); + let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT)); let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests()); let etag = object.etag(); @@ -1138,8 +1194,14 @@ mod tests { // Try every possible seek from first_read_size for offset in first_read_size + 1..OBJECT_SIZE { - let mut request = - prefetcher.prefetch(client.clone(), "test-bucket", "hello", OBJECT_SIZE as u64, etag.clone()); + let object_id = ObjectId::new("hello".to_owned(), etag.clone()); + let mut request = prefetcher.prefetch( + client.clone(), + mem_limiter.clone(), + "test-bucket".to_owned(), + object_id, + OBJECT_SIZE as u64, + ); if first_read_size > 0 { let _first_read = block_on(request.read(0, first_read_size)).unwrap(); } @@ -1164,6 +1226,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); + let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT)); let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests()); let etag = object.etag(); @@ -1173,8 +1236,14 @@ mod tests { // Try every possible seek from first_read_size for offset in 0..first_read_size { - let mut request = - prefetcher.prefetch(client.clone(), "test-bucket", "hello", OBJECT_SIZE as u64, etag.clone()); + let object_id = ObjectId::new("hello".to_owned(), etag.clone()); + let mut request = prefetcher.prefetch( + client.clone(), + mem_limiter.clone(), + "test-bucket".to_owned(), + object_id, + OBJECT_SIZE as u64, + ); if first_read_size > 0 { let _first_read = block_on(request.read(0, first_read_size)).unwrap(); } @@ -1219,6 +1288,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); + let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT); let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests()); let file_etag = object.etag(); @@ -1233,7 +1303,14 @@ mod tests { }; let prefetcher = Prefetcher::new(ClientPartStream::new(ShuttleRuntime), prefetcher_config); - let mut request = prefetcher.prefetch(client, "test-bucket", "hello", object_size, file_etag); + let object_id = ObjectId::new("hello".to_owned(), file_etag); + let mut request = prefetcher.prefetch( + client, + mem_limiter.into(), + "test-bucket".to_owned(), + object_id, + object_size, + ); let mut next_offset = 0; loop { @@ -1277,6 +1354,7 @@ mod tests { ..Default::default() }; let client = Arc::new(MockClient::new(config)); + let mem_limiter = MemoryLimiter::new(client.clone(), MINIMUM_MEM_LIMIT); let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests()); let file_etag = object.etag(); @@ -1291,7 +1369,14 @@ mod tests { }; let prefetcher = Prefetcher::new(ClientPartStream::new(ShuttleRuntime), prefetcher_config); - let mut request = prefetcher.prefetch(client, "test-bucket", "hello", object_size, file_etag); + let object_id = ObjectId::new("hello".to_owned(), file_etag); + let mut request = prefetcher.prefetch( + client, + mem_limiter.into(), + "test-bucket".to_owned(), + object_id, + object_size, + ); let num_reads = rng.gen_range(10usize..50); for _ in 0..num_reads { diff --git a/mountpoint-s3/src/prefetch/backpressure_controller.rs b/mountpoint-s3/src/prefetch/backpressure_controller.rs index 269bf459d..86db54031 100644 --- a/mountpoint-s3/src/prefetch/backpressure_controller.rs +++ 
b/mountpoint-s3/src/prefetch/backpressure_controller.rs @@ -1,7 +1,12 @@ use std::ops::Range; +use std::sync::Arc; use async_channel::{unbounded, Receiver, Sender}; -use tracing::trace; +use humansize::make_format; +use mountpoint_s3_client::ObjectClient; +use tracing::{debug, trace}; + +use crate::mem_limiter::MemoryLimiter; use super::PrefetchReadError; @@ -16,6 +21,8 @@ pub enum BackpressureFeedbackEvent { pub struct BackpressureConfig { /// Backpressure's initial read window size pub initial_read_window_size: usize, + /// Minimum read window size that the backpressure controller is allowed to scale down to + pub min_read_window_size: usize, /// Maximum read window size that the backpressure controller is allowed to scale up to pub max_read_window_size: usize, /// Factor to increase the read window size by when the part queue is stalled @@ -25,17 +32,22 @@ pub struct BackpressureConfig { } #[derive(Debug)] -pub struct BackpressureController { +pub struct BackpressureController { read_window_updater: Sender, preferred_read_window_size: usize, + min_read_window_size: usize, max_read_window_size: usize, read_window_size_multiplier: usize, /// Upper bound of the current read window. The request can return data up to this /// offset *exclusively*. This value must be advanced to continue fetching new data. read_window_end_offset: u64, + /// Next offset of the data to be read. It is used for tracking how many bytes of + /// data has been read out of the stream. + next_read_offset: u64, /// End offset for the request we want to apply backpressure. The request can return /// data up to this offset *exclusively*. request_end_offset: u64, + mem_limiter: Arc>, } #[derive(Debug)] @@ -61,16 +73,25 @@ pub struct BackpressureLimiter { /// [BackpressureController] will be given to the consumer side of the object stream. /// It can be used anywhere to set preferred read window size for the stream and tell the /// producer when its read window should be increased. -pub fn new_backpressure_controller(config: BackpressureConfig) -> (BackpressureController, BackpressureLimiter) { +pub fn new_backpressure_controller( + config: BackpressureConfig, + mem_limiter: Arc>, +) -> (BackpressureController, BackpressureLimiter) { + // Minimum window size multiplier as the scaling up and down won't work if the multiplier is 1. 
+ const MIN_WINDOW_SIZE_MULTIPLIER: usize = 2; let read_window_end_offset = config.request_range.start + config.initial_read_window_size as u64; let (read_window_updater, read_window_incrementing_queue) = unbounded(); + mem_limiter.reserve(config.initial_read_window_size as u64); let controller = BackpressureController { read_window_updater, preferred_read_window_size: config.initial_read_window_size, + min_read_window_size: config.min_read_window_size, max_read_window_size: config.max_read_window_size, - read_window_size_multiplier: config.read_window_size_multiplier, + read_window_size_multiplier: config.read_window_size_multiplier.max(MIN_WINDOW_SIZE_MULTIPLIER), read_window_end_offset, + next_read_offset: config.request_range.start, request_end_offset: config.request_range.end, + mem_limiter, }; let limiter = BackpressureLimiter { read_window_incrementing_queue, @@ -80,7 +101,7 @@ pub fn new_backpressure_controller(config: BackpressureConfig) -> (BackpressureC (controller, limiter) } -impl BackpressureController { +impl BackpressureController { pub fn read_window_end_offset(&self) -> u64 { self.read_window_end_offset } @@ -91,58 +112,124 @@ impl BackpressureController { match event { // Note, that this may come from a backwards seek, so offsets observed by this method are not necessarily ascending BackpressureFeedbackEvent::DataRead { offset, length } => { - let next_read_offset = offset + length as u64; - let remaining_window = self.read_window_end_offset.saturating_sub(next_read_offset) as usize; + self.next_read_offset = offset + length as u64; + self.mem_limiter.release(length as u64); + let remaining_window = self.read_window_end_offset.saturating_sub(self.next_read_offset) as usize; + // Increment the read window only if the remaining window reaches some threshold i.e. half of it left. - if remaining_window < (self.preferred_read_window_size / 2) + // When memory is low the `preferred_read_window_size` will be scaled down so we have to keep trying + // until we have enough read window. + while remaining_window < (self.preferred_read_window_size / 2) && self.read_window_end_offset < self.request_end_offset { - let new_read_window_end_offset = next_read_offset + let new_read_window_end_offset = self + .next_read_offset .saturating_add(self.preferred_read_window_size as u64) .min(self.request_end_offset); - debug_assert!(self.read_window_end_offset < new_read_window_end_offset); + // We can skip if the new `read_window_end_offset` is less than or equal to the current one, this + // could happen after the read window is scaled down. + if new_read_window_end_offset <= self.read_window_end_offset { + break; + } let to_increase = new_read_window_end_offset.saturating_sub(self.read_window_end_offset) as usize; - trace!( - preferred_read_window_size = self.preferred_read_window_size, - next_read_offset, - read_window_end_offset = self.read_window_end_offset, - to_increase, - "incrementing read window" - ); - self.increment_read_window(to_increase).await; - self.read_window_end_offset = new_read_window_end_offset; + + // Force incrementing read window regardless of available memory when we are already at minimum + // read window size. + if self.preferred_read_window_size <= self.min_read_window_size { + self.mem_limiter.reserve(to_increase as u64); + self.increment_read_window(to_increase).await; + break; + } + + // Try to reserve the memory for the length we want to increase before sending the request, + // scale down the read window if it fails. 
+ if self.mem_limiter.try_reserve(to_increase as u64) { + self.increment_read_window(to_increase).await; + break; + } else { + self.scale_down(); + } } } - BackpressureFeedbackEvent::PartQueueStall => self.try_scaling_up(), + BackpressureFeedbackEvent::PartQueueStall => self.scale_up(), } Ok(()) } // Send an increment read window request to the stream producer - async fn increment_read_window(&self, len: usize) { + async fn increment_read_window(&mut self, len: usize) { + trace!( + preferred_read_window_size = self.preferred_read_window_size, + next_read_offset = self.next_read_offset, + read_window_end_offset = self.read_window_end_offset, + len, + "incrementing read window" + ); + // This should not block since the channel is unbounded let _ = self .read_window_updater .send(len) .await .inspect_err(|_| trace!("read window incrementing queue is already closed")); + self.read_window_end_offset += len as u64; } - // Try scaling up preferred read window size with a multiplier configured at initialization. - fn try_scaling_up(&mut self) { + // Scale up preferred read window size with a multiplier configured at initialization. + // Scaling up fails silently if there is no enough free memory to perform it. + fn scale_up(&mut self) { if self.preferred_read_window_size < self.max_read_window_size { - let new_read_window_size = - (self.preferred_read_window_size * self.read_window_size_multiplier).min(self.max_read_window_size); - trace!( - current_size = self.preferred_read_window_size, - new_size = new_read_window_size, - "scaling up preferred read window" + let new_read_window_size = (self.preferred_read_window_size * self.read_window_size_multiplier) + .max(self.min_read_window_size) + .min(self.max_read_window_size); + // Only scale up when there is enough memory. We don't have to reserve the memory here + // because only `preferred_read_window_size` is increased but the actual read window will + // be updated later on `DataRead` event (where we do reserve memory). + let to_increase = (new_read_window_size - self.preferred_read_window_size) as u64; + let available_mem = self.mem_limiter.available_mem(); + if available_mem >= to_increase { + let formatter = make_format(humansize::BINARY); + debug!( + current_size = formatter(self.preferred_read_window_size), + new_size = formatter(new_read_window_size), + "scaled up preferred read window" + ); + self.preferred_read_window_size = new_read_window_size; + metrics::histogram!("prefetch.window_after_increase_mib") + .record((self.preferred_read_window_size / 1024 / 1024) as f64); + } + } + } + + // Scale down preferred read window size by a multiplier configured at initialization. + fn scale_down(&mut self) { + if self.preferred_read_window_size > self.min_read_window_size { + assert!(self.read_window_size_multiplier > 1); + let new_read_window_size = (self.preferred_read_window_size / self.read_window_size_multiplier) + .max(self.min_read_window_size) + .min(self.max_read_window_size); + let formatter = make_format(humansize::BINARY); + debug!( + current_size = formatter(self.preferred_read_window_size), + new_size = formatter(new_read_window_size), + "scaling down read window" ); self.preferred_read_window_size = new_read_window_size; + metrics::histogram!("prefetch.window_after_decrease_mib") + .record((self.preferred_read_window_size / 1024 / 1024) as f64); } } } +impl Drop for BackpressureController { + fn drop(&mut self) { + // Free up memory we have reserved for the read window. 
+ debug_assert!(self.request_end_offset >= self.next_read_offset); + let remaining_window = self.read_window_end_offset.saturating_sub(self.next_read_offset); + self.mem_limiter.release(remaining_window); + } +} + impl BackpressureLimiter { pub fn read_window_end_offset(&self) -> u64 { self.read_window_end_offset @@ -184,3 +271,88 @@ impl BackpressureLimiter { Ok(Some(self.read_window_end_offset)) } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig}; + use test_case::test_case; + + use crate::mem_limiter::MemoryLimiter; + + use super::{new_backpressure_controller, BackpressureConfig, BackpressureController, BackpressureLimiter}; + + #[test_case(1024 * 1024 + 128 * 1024, 2)] // real config + #[test_case(3 * 1024 * 1024, 4)] + #[test_case(8 * 1024 * 1024, 8)] + #[test_case(2 * 1024 * 1024 * 1024, 2)] + fn test_read_window_scale_up(initial_read_window_size: usize, read_window_size_multiplier: usize) { + let request_range = 0..(5 * 1024 * 1024 * 1024); + let backpressure_config = BackpressureConfig { + initial_read_window_size, + min_read_window_size: 8 * 1024 * 1024, + max_read_window_size: 2 * 1024 * 1024 * 1024, + read_window_size_multiplier, + request_range, + }; + + let (mut backpressure_controller, _backpressure_limiter) = + new_backpressure_controller_for_test(backpressure_config); + while backpressure_controller.preferred_read_window_size < backpressure_controller.max_read_window_size { + backpressure_controller.scale_up(); + assert!(backpressure_controller.preferred_read_window_size >= backpressure_controller.min_read_window_size); + assert!(backpressure_controller.preferred_read_window_size <= backpressure_controller.max_read_window_size); + } + assert_eq!( + backpressure_controller.preferred_read_window_size, backpressure_controller.max_read_window_size, + "should have scaled up to max read window size" + ); + } + + #[test_case(2 * 1024 * 1024 * 1024, 2)] + #[test_case(15 * 1024 * 1024 * 1024, 2)] + #[test_case(2 * 1024 * 1024 * 1024, 8)] + #[test_case(8 * 1024 * 1024, 8)] + fn test_read_window_scale_down(initial_read_window_size: usize, read_window_size_multiplier: usize) { + let request_range = 0..(5 * 1024 * 1024 * 1024); + let backpressure_config = BackpressureConfig { + initial_read_window_size, + min_read_window_size: 8 * 1024 * 1024, + max_read_window_size: 2 * 1024 * 1024 * 1024, + read_window_size_multiplier, + request_range, + }; + + let (mut backpressure_controller, _backpressure_limiter) = + new_backpressure_controller_for_test(backpressure_config); + while backpressure_controller.preferred_read_window_size > backpressure_controller.min_read_window_size { + backpressure_controller.scale_down(); + assert!(backpressure_controller.preferred_read_window_size <= backpressure_controller.max_read_window_size); + assert!(backpressure_controller.preferred_read_window_size >= backpressure_controller.min_read_window_size); + } + assert_eq!( + backpressure_controller.preferred_read_window_size, backpressure_controller.min_read_window_size, + "should have scaled down to min read window size" + ); + } + + fn new_backpressure_controller_for_test( + backpressure_config: BackpressureConfig, + ) -> (BackpressureController, BackpressureLimiter) { + let config = MockClientConfig { + bucket: "test-bucket".to_string(), + part_size: 8 * 1024 * 1024, + enable_backpressure: true, + initial_read_window_size: backpressure_config.initial_read_window_size, + ..Default::default() + }; + + let client = MockClient::new(config); 
+ let mem_limiter = Arc::new(MemoryLimiter::new( + client, + backpressure_config.max_read_window_size as u64, + )); + new_backpressure_controller(backpressure_config, mem_limiter.clone()) + } +} diff --git a/mountpoint-s3/src/prefetch/caching_stream.rs b/mountpoint-s3/src/prefetch/caching_stream.rs index 38caf3c4a..7d7d044c8 100644 --- a/mountpoint-s3/src/prefetch/caching_stream.rs +++ b/mountpoint-s3/src/prefetch/caching_stream.rs @@ -9,6 +9,7 @@ use tracing::{debug_span, trace, warn, Instrument}; use crate::checksums::ChecksummedBytes; use crate::data_cache::{BlockIndex, DataCache}; +use crate::mem_limiter::MemoryLimiter; use crate::object::ObjectId; use crate::prefetch::backpressure_controller::{new_backpressure_controller, BackpressureConfig, BackpressureLimiter}; use crate::prefetch::part::Part; @@ -45,7 +46,8 @@ where &self, client: &Client, config: RequestTaskConfig, - ) -> RequestTask<::ClientError> + mem_limiter: Arc>, + ) -> RequestTask where Client: ObjectClient + Clone + Send + Sync + 'static, { @@ -53,12 +55,14 @@ where let backpressure_config = BackpressureConfig { initial_read_window_size: config.initial_read_window_size, + min_read_window_size: config.read_part_size, max_read_window_size: config.max_read_window_size, read_window_size_multiplier: config.read_window_size_multiplier, request_range: range.into(), }; - let (backpressure_controller, backpressure_limiter) = new_backpressure_controller(backpressure_config); - let (part_queue, part_queue_producer) = unbounded_part_queue(); + let (backpressure_controller, backpressure_limiter) = + new_backpressure_controller(backpressure_config, mem_limiter.clone()); + let (part_queue, part_queue_producer) = unbounded_part_queue(mem_limiter); trace!(?range, "spawning request"); let request_task = { @@ -387,7 +391,11 @@ mod tests { }; use test_case::test_case; - use crate::{data_cache::InMemoryDataCache, object::ObjectId}; + use crate::{ + data_cache::InMemoryDataCache, + mem_limiter::{MemoryLimiter, MINIMUM_MEM_LIMIT}, + object::ObjectId, + }; use super::*; @@ -428,6 +436,7 @@ mod tests { ..Default::default() }; let mock_client = Arc::new(MockClient::new(config)); + let mem_limiter = Arc::new(MemoryLimiter::new(mock_client.clone(), MINIMUM_MEM_LIMIT)); mock_client.add_object(key, object.clone()); let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); @@ -443,12 +452,13 @@ mod tests { bucket: bucket.to_owned(), object_id: id.clone(), range, + read_part_size: client_part_size, preferred_part_size: 256 * KB, initial_read_window_size, max_read_window_size, read_window_size_multiplier, }; - let request_task = stream.spawn_get_object_request(&mock_client, config); + let request_task = stream.spawn_get_object_request(&mock_client, config, mem_limiter.clone()); compare_read(&id, &object, request_task); get_object_counter.count() }; @@ -468,12 +478,13 @@ mod tests { bucket: bucket.to_owned(), object_id: id.clone(), range, + read_part_size: client_part_size, preferred_part_size: 256 * KB, initial_read_window_size, max_read_window_size, read_window_size_multiplier, }; - let request_task = stream.spawn_get_object_request(&mock_client, config); + let request_task = stream.spawn_get_object_request(&mock_client, config, mem_limiter.clone()); compare_read(&id, &object, request_task); get_object_counter.count() }; @@ -506,6 +517,7 @@ mod tests { ..Default::default() }; let mock_client = Arc::new(MockClient::new(config)); + let mem_limiter = Arc::new(MemoryLimiter::new(mock_client.clone(), MINIMUM_MEM_LIMIT)); mock_client.add_object(key, 
object.clone()); let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); @@ -517,22 +529,19 @@ mod tests { bucket: bucket.to_owned(), object_id: id.clone(), range: RequestRange::new(object_size, offset as u64, preferred_size), + read_part_size: client_part_size, preferred_part_size: 256 * KB, initial_read_window_size, max_read_window_size, read_window_size_multiplier, }; - let request_task = stream.spawn_get_object_request(&mock_client, config); + let request_task = stream.spawn_get_object_request(&mock_client, config, mem_limiter.clone()); compare_read(&id, &object, request_task); } } } - fn compare_read( - id: &ObjectId, - object: &MockObject, - mut request_task: RequestTask, - ) { + fn compare_read(id: &ObjectId, object: &MockObject, mut request_task: RequestTask) { let mut offset = request_task.start_offset(); let mut remaining = request_task.total_size(); while remaining > 0 { diff --git a/mountpoint-s3/src/prefetch/part_queue.rs b/mountpoint-s3/src/prefetch/part_queue.rs index 08778666a..3f8b2e16d 100644 --- a/mountpoint-s3/src/prefetch/part_queue.rs +++ b/mountpoint-s3/src/prefetch/part_queue.rs @@ -1,7 +1,9 @@ use std::time::Instant; +use mountpoint_s3_client::ObjectClient; use tracing::trace; +use crate::mem_limiter::MemoryLimiter; use crate::prefetch::part::Part; use crate::prefetch::PrefetchReadError; use crate::sync::async_channel::{unbounded, Receiver, RecvError, Sender}; @@ -11,15 +13,16 @@ use crate::sync::Arc; /// A queue of [Part]s where the first part can be partially read from if the reader doesn't want /// the entire part in one shot. #[derive(Debug)] -pub struct PartQueue { +pub struct PartQueue { /// The auxiliary queue that supports pushing parts to the front of the part queue in order to /// allow partial reads and backwards seeks. front_queue: Vec, /// The main queue that receives parts from [super::ObjectPartStream] - receiver: Receiver>>, + receiver: Receiver>>, failed: bool, /// The total number of bytes sent to the underlying queue of `self.receiver` bytes_received: Arc, + mem_limiter: Arc>, } /// Producer side of the queue of [Part]s. @@ -31,7 +34,9 @@ pub struct PartQueueProducer { } /// Creates an unbounded [PartQueue] and its related [PartQueueProducer]. -pub fn unbounded_part_queue() -> (PartQueue, PartQueueProducer) { +pub fn unbounded_part_queue( + mem_limiter: Arc>, +) -> (PartQueue, PartQueueProducer) { let (sender, receiver) = unbounded(); let bytes_counter = Arc::new(AtomicUsize::new(0)); let part_queue = PartQueue { @@ -39,6 +44,7 @@ pub fn unbounded_part_queue() -> (PartQueue, PartQueueP receiver, failed: false, bytes_received: Arc::clone(&bytes_counter), + mem_limiter, }; let part_queue_producer = PartQueueProducer { sender, @@ -47,14 +53,14 @@ pub fn unbounded_part_queue() -> (PartQueue, PartQueueP (part_queue, part_queue_producer) } -impl PartQueue { +impl PartQueue { /// Read up to `length` bytes from the queue at the current offset. This function always returns /// a contiguous [Bytes], and so may return fewer than `length` bytes if it would need to copy /// or reallocate to make the return value contiguous. This function blocks only if the queue is /// empty. /// /// If this method returns an Err, the PartQueue must never be accessed again. 
- pub async fn read(&mut self, length: usize) -> Result> { + pub async fn read(&mut self, length: usize) -> Result> { assert!(!self.failed, "cannot use a PartQueue after failure"); // Read from the auxiliary queue first if it's not empty @@ -92,10 +98,13 @@ impl PartQueue { } /// Push a new [Part] onto the front of the queue - pub async fn push_front(&mut self, part: Part) -> Result<(), PrefetchReadError> { + pub async fn push_front(&mut self, part: Part) -> Result<(), PrefetchReadError> { assert!(!self.failed, "cannot use a PartQueue after failure"); metrics::gauge!("prefetch.bytes_in_queue").increment(part.len() as f64); + // The backpressure controller is not aware of the parts from backwards seek, + // so we have to reserve memory for them here. + self.mem_limiter.reserve(part.len() as u64); self.front_queue.push(part); Ok(()) } @@ -121,7 +130,7 @@ impl PartQueueProducer { } } -impl Drop for PartQueue { +impl Drop for PartQueue { fn drop(&mut self) { // close the channel and drain remaining parts from the main queue self.receiver.close(); @@ -142,16 +151,17 @@ impl Drop for PartQueue { #[cfg(test)] mod tests { use crate::checksums::ChecksummedBytes; + use crate::mem_limiter::MINIMUM_MEM_LIMIT; use crate::object::ObjectId; use super::*; use bytes::Bytes; use futures::executor::block_on; + use mountpoint_s3_client::mock_client::MockClient; use mountpoint_s3_client::types::ETag; use proptest::proptest; use proptest_derive::Arbitrary; - use thiserror::Error; #[derive(Debug, Arbitrary)] enum Op { @@ -160,12 +170,11 @@ mod tests { Push(#[proptest(strategy = "1usize..8192")] usize), } - #[derive(Debug, Error)] - enum DummyError {} - async fn run_test(ops: Vec) { + let client = MockClient::new(Default::default()); + let mem_limiter = MemoryLimiter::new(client, MINIMUM_MEM_LIMIT); let part_id = ObjectId::new("key".to_owned(), ETag::for_tests()); - let (mut part_queue, part_queue_producer) = unbounded_part_queue::(); + let (mut part_queue, part_queue_producer) = unbounded_part_queue::(mem_limiter.into()); let mut current_offset = 0; let mut current_length = 0; for op in ops { diff --git a/mountpoint-s3/src/prefetch/part_stream.rs b/mountpoint-s3/src/prefetch/part_stream.rs index a01b03668..b5eb32e01 100644 --- a/mountpoint-s3/src/prefetch/part_stream.rs +++ b/mountpoint-s3/src/prefetch/part_stream.rs @@ -4,10 +4,12 @@ use futures::task::{Spawn, SpawnExt}; use futures::{pin_mut, Stream, StreamExt}; use mountpoint_s3_client::{types::GetObjectRequest, ObjectClient}; use std::marker::{Send, Sync}; +use std::sync::Arc; use std::{fmt::Debug, ops::Range}; use tracing::{debug_span, error, trace, Instrument}; use crate::checksums::ChecksummedBytes; +use crate::mem_limiter::MemoryLimiter; use crate::object::ObjectId; use crate::prefetch::backpressure_controller::{new_backpressure_controller, BackpressureConfig}; use crate::prefetch::part::Part; @@ -26,7 +28,8 @@ pub trait ObjectPartStream { &self, client: &Client, config: RequestTaskConfig, - ) -> RequestTask + mem_limiter: Arc>, + ) -> RequestTask where Client: ObjectClient + Clone + Send + Sync + 'static; } @@ -37,6 +40,7 @@ pub struct RequestTaskConfig { pub bucket: String, pub object_id: ObjectId, pub range: RequestRange, + pub read_part_size: usize, pub preferred_part_size: usize, pub initial_read_window_size: usize, pub max_read_window_size: usize, @@ -181,7 +185,8 @@ where &self, client: &Client, config: RequestTaskConfig, - ) -> RequestTask + mem_limiter: Arc>, + ) -> RequestTask where Client: ObjectClient + Clone + Send + Sync + 'static, { @@ 
-191,12 +196,16 @@ where let backpressure_config = BackpressureConfig { initial_read_window_size: config.initial_read_window_size, + // We don't want to completely block the stream so let's use + // the read part size as minimum read window. + min_read_window_size: config.read_part_size, max_read_window_size: config.max_read_window_size, read_window_size_multiplier: config.read_window_size_multiplier, request_range: range.into(), }; - let (backpressure_controller, mut backpressure_limiter) = new_backpressure_controller(backpressure_config); - let (part_queue, part_queue_producer) = unbounded_part_queue(); + let (backpressure_controller, mut backpressure_limiter) = + new_backpressure_controller(backpressure_config, mem_limiter.clone()); + let (part_queue, part_queue_producer) = unbounded_part_queue(mem_limiter); trace!(?range, "spawning request"); let span = debug_span!("prefetch", ?range); @@ -236,7 +245,10 @@ struct ClientPartComposer { preferred_part_size: usize, } -impl ClientPartComposer { +impl ClientPartComposer +where + E: std::error::Error + Send + Sync, +{ async fn try_compose_parts(&self, request_stream: impl Stream>) { if let Err(e) = self.compose_parts(request_stream).await { trace!(error=?e, "part stream task failed"); @@ -374,6 +386,13 @@ fn read_from_request<'a, Client: ObjectClient + 'a>( if next_offset == request_range.end { break; } + + // The CRT could return data more than what we have requested in the read window + // which means unaccounted memory, so we want to record them here. + let excess_bytes = next_offset.saturating_sub(backpressure_limiter.read_window_end_offset()); + if excess_bytes > 0 { + metrics::histogram!("s3.client.read_window_excess_bytes").record(excess_bytes as f64); + } // Blocks if read window increment if it's not enough to read the next offset if let Some(next_read_window_offset) = backpressure_limiter.wait_for_read_window_increment(next_offset).await? { let diff = next_read_window_offset.saturating_sub(request.as_ref().read_window_end_offset()) as usize; diff --git a/mountpoint-s3/src/prefetch/task.rs b/mountpoint-s3/src/prefetch/task.rs index cd858d7bc..9ff2718ee 100644 --- a/mountpoint-s3/src/prefetch/task.rs +++ b/mountpoint-s3/src/prefetch/task.rs @@ -1,4 +1,5 @@ use futures::future::RemoteHandle; +use mountpoint_s3_client::ObjectClient; use crate::prefetch::backpressure_controller::BackpressureFeedbackEvent::{DataRead, PartQueueStall}; use crate::prefetch::part::Part; @@ -10,22 +11,22 @@ use super::part_stream::RequestRange; /// A single GetObject request submitted to the S3 client #[derive(Debug)] -pub struct RequestTask { +pub struct RequestTask { /// Handle on the task/future. The future is cancelled when handle is dropped. 
This is None if /// the request is fake (created by seeking backwards in the stream) _task_handle: RemoteHandle<()>, remaining: usize, range: RequestRange, - part_queue: PartQueue, - backpressure_controller: BackpressureController, + part_queue: PartQueue, + backpressure_controller: BackpressureController, } -impl RequestTask { +impl RequestTask { pub fn from_handle( task_handle: RemoteHandle<()>, range: RequestRange, - part_queue: PartQueue, - backpressure_controller: BackpressureController, + part_queue: PartQueue, + backpressure_controller: BackpressureController, ) -> Self { Self { _task_handle: task_handle, @@ -37,7 +38,7 @@ impl RequestTask { } // Push a given list of parts in front of the part queue - pub async fn push_front(&mut self, parts: Vec) -> Result<(), PrefetchReadError> { + pub async fn push_front(&mut self, parts: Vec) -> Result<(), PrefetchReadError> { // Iterate backwards to push each part to the front of the part queue for part in parts.into_iter().rev() { self.remaining += part.len(); @@ -46,7 +47,7 @@ impl RequestTask { Ok(()) } - pub async fn read(&mut self, length: usize) -> Result> { + pub async fn read(&mut self, length: usize) -> Result> { let part = self.part_queue.read(length).await?; debug_assert!(part.len() <= self.remaining); self.remaining -= part.len();
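
Taken together, the `MemoryLimiter` added in this change exposes a small reservation API: `reserve` always succeeds, `try_reserve` refuses any reservation that would push the prefetcher's reservations plus the client's reported buffer-pool usage plus the fixed headroom over the configured limit, and `release` returns previously reserved bytes. Below is a minimal usage sketch against the mock client, mirroring how the tests in this diff construct a limiter; it assumes the mock client is available to the caller (as it is in this repository's tests), and the 8 MiB increment and the surrounding control flow are purely illustrative, not part of the change:

    use std::sync::Arc;

    use mountpoint_s3::mem_limiter::{MemoryLimiter, MINIMUM_MEM_LIMIT};
    use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig};

    fn main() {
        // The limiter is backed by a client so that the client's reported buffer-pool
        // usage (via `ObjectClient::mem_usage_stats`) is folded into its accounting.
        let client = Arc::new(MockClient::new(MockClientConfig::default()));
        let limiter = Arc::new(MemoryLimiter::new(client, MINIMUM_MEM_LIMIT));

        let window_increment: u64 = 8 * 1024 * 1024; // hypothetical read window increment
        if limiter.try_reserve(window_increment) {
            // ...ask the part stream for `window_increment` more bytes...
            // Once the data has been read out of the part queue, give the bytes back.
            limiter.release(window_increment);
        } else {
            // Not enough headroom: a backpressure controller would scale down its
            // preferred read window here instead of asking for more data.
            println!("available memory: {} bytes", limiter.available_mem());
        }
    }

This is the same accounting the backpressure controller in this diff relies on: `reserve`/`release` pair up around the read window, and a failed `try_reserve` is what triggers `scale_down`.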