diff --git a/mountpoint-s3-client/src/failure_client.rs b/mountpoint-s3-client/src/failure_client.rs
index b7d78a25d..bbc5cf254 100644
--- a/mountpoint-s3-client/src/failure_client.rs
+++ b/mountpoint-s3-client/src/failure_client.rs
@@ -16,8 +16,8 @@ use pin_project::pin_project;
 use crate::object_client::{
     DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult,
     GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult,
-    ObjectAttribute, ObjectClientError, ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest,
-    PutObjectResult, UploadReview,
+    MemoryUsageStats, ObjectAttribute, ObjectClientError, ObjectClientResult, PutObjectError, PutObjectParams,
+    PutObjectRequest, PutObjectResult, UploadReview,
 };
 
 use crate::ObjectClient;
@@ -85,6 +85,10 @@ where
         self.client.initial_read_window_size()
     }
 
+    fn mem_usage_stats(&self) -> Option<MemoryUsageStats> {
+        self.client.mem_usage_stats()
+    }
+
     async fn delete_object(
         &self,
         bucket: &str,
diff --git a/mountpoint-s3-client/src/mock_client.rs b/mountpoint-s3-client/src/mock_client.rs
index b663a9463..75d8d47e3 100644
--- a/mountpoint-s3-client/src/mock_client.rs
+++ b/mountpoint-s3-client/src/mock_client.rs
@@ -26,9 +26,9 @@ use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
 use crate::object_client::{
     Checksum, ChecksumAlgorithm, DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError,
     GetObjectAttributesParts, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError,
-    HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientError,
-    ObjectClientResult, ObjectInfo, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult,
-    PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart,
+    HeadObjectResult, ListObjectsError, ListObjectsResult, MemoryUsageStats, ObjectAttribute, ObjectClient,
+    ObjectClientError, ObjectClientResult, ObjectInfo, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest,
+    PutObjectResult, PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart,
 };
 
 mod leaky_bucket;
@@ -572,6 +572,10 @@ impl ObjectClient for MockClient {
         }
     }
 
+    fn mem_usage_stats(&self) -> Option<MemoryUsageStats> {
+        None
+    }
+
     async fn delete_object(
         &self,
         bucket: &str,
diff --git a/mountpoint-s3-client/src/mock_client/throughput_client.rs b/mountpoint-s3-client/src/mock_client/throughput_client.rs
index 065791069..610f81ef4 100644
--- a/mountpoint-s3-client/src/mock_client/throughput_client.rs
+++ b/mountpoint-s3-client/src/mock_client/throughput_client.rs
@@ -13,7 +13,7 @@ use crate::mock_client::{MockClient, MockClientConfig, MockClientError, MockObje
 use crate::object_client::{
     DeleteObjectError, DeleteObjectResult, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult,
     GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult,
-    ObjectAttribute, ObjectClient, ObjectClientResult, PutObjectError, PutObjectParams,
+    MemoryUsageStats, ObjectAttribute, ObjectClient, ObjectClientResult, PutObjectError, PutObjectParams,
 };
 use crate::types::ETag;
@@ -113,6 +113,10 @@ impl ObjectClient for ThroughputMockClient {
         self.inner.initial_read_window_size()
     }
 
+    fn mem_usage_stats(&self) -> Option<MemoryUsageStats> {
+        self.inner.mem_usage_stats()
+    }
+
     async fn delete_object(
         &self,
         bucket: &str,
diff --git a/mountpoint-s3-client/src/object_client.rs b/mountpoint-s3-client/src/object_client.rs
index b92bf796c..22b10d9f0 100644
--- a/mountpoint-s3-client/src/object_client.rs
+++ b/mountpoint-s3-client/src/object_client.rs
@@ -63,6 +63,16 @@ impl FromStr for ETag {
     }
 }
 
+/// Memory usage stats for the client
+pub struct MemoryUsageStats {
+    /// Reserved memory for the client. For [S3CrtClient], this value is a sum of primary storage
+    /// and secondary storage reserved memory.
+    pub mem_reserved: u64,
+    /// Actual used memory for the client. For [S3CrtClient], this value is a sum of primary
+    /// storage and secondary storage used memory.
+    pub mem_used: u64,
+}
+
 /// A generic interface to S3-like object storage services.
 ///
 /// This trait defines the common methods that all object services implement.
@@ -89,6 +99,10 @@ pub trait ObjectClient {
     /// This can be `None` if backpressure is disabled.
     fn initial_read_window_size(&self) -> Option<usize>;
 
+    /// Query current memory usage stats for the client. This can be `None` if the client
+    /// does not record the stats.
+    fn mem_usage_stats(&self) -> Option<MemoryUsageStats>;
+
     /// Delete a single object from the object store.
     ///
     /// DeleteObject will succeed even if the object within the bucket does not exist.
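The struct and trait method above define the reporting surface, and the three client implementations simply delegate through wrappers. A minimal consumer-side sketch (the helper below is hypothetical, not part of this change):

    // Hypothetical helper: log buffer-pool usage for any client that reports stats.
    fn log_mem_usage<Client: ObjectClient>(client: &Client) {
        if let Some(stats) = client.mem_usage_stats() {
            tracing::debug!(
                mem_reserved = stats.mem_reserved, // reserved, possibly not yet used
                mem_used = stats.mem_used,         // actually in use
                "client memory usage"
            );
        }
    }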
diff --git a/mountpoint-s3-client/src/s3_crt_client.rs b/mountpoint-s3-client/src/s3_crt_client.rs
index 0568c0870..93bdabff7 100644
--- a/mountpoint-s3-client/src/s3_crt_client.rs
+++ b/mountpoint-s3-client/src/s3_crt_client.rs
@@ -1207,6 +1207,13 @@ impl ObjectClient for S3CrtClient {
         }
     }
 
+    fn mem_usage_stats(&self) -> Option<MemoryUsageStats> {
+        let crt_buffer_pool_stats = self.inner.s3_client.poll_buffer_pool_usage_stats();
+        let mem_reserved = crt_buffer_pool_stats.primary_reserved + crt_buffer_pool_stats.secondary_reserved;
+        let mem_used = crt_buffer_pool_stats.primary_used + crt_buffer_pool_stats.secondary_used;
+        Some(MemoryUsageStats { mem_reserved, mem_used })
+    }
+
     async fn delete_object(
         &self,
         bucket: &str,
diff --git a/mountpoint-s3/examples/prefetch_benchmark.rs b/mountpoint-s3/examples/prefetch_benchmark.rs
index 77397d194..b220c3a0c 100644
--- a/mountpoint-s3/examples/prefetch_benchmark.rs
+++ b/mountpoint-s3/examples/prefetch_benchmark.rs
@@ -79,7 +79,7 @@ fn main() {
         config = config.part_size(part_size);
     }
     let client = Arc::new(S3CrtClient::new(config).expect("couldn't create client"));
-    let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024));
+    let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024));
 
     for i in 0..iterations.unwrap_or(1) {
         let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
diff --git a/mountpoint-s3/src/cli.rs b/mountpoint-s3/src/cli.rs
index 66a601a83..add44066c 100644
--- a/mountpoint-s3/src/cli.rs
+++ b/mountpoint-s3/src/cli.rs
@@ -402,14 +402,12 @@ impl CliArgs {
             let mut filter = if self.debug {
                 String::from("debug")
             } else {
-                String::from("info")
+                String::from("warn")
             };
             let crt_verbosity = if self.debug_crt { "debug" } else { "off" };
             filter.push_str(&format!(",{}={}", AWSCRT_LOG_TARGET, crt_verbosity));
             if self.log_metrics {
                 filter.push_str(&format!(",{}=info", metrics::TARGET_NAME));
-            } else {
-                filter.push_str(&format!(",{}=off", metrics::TARGET_NAME));
             }
             filter
         };
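For reference, assuming `AWSCRT_LOG_TARGET` resolves to "awscrt" and `metrics::TARGET_NAME` to "mountpoint_s3::metrics" (assumptions — the constants' values are not shown in this diff), the filter strings built above come out roughly as:

    // default:        "warn,awscrt=off"
    // --debug:        "debug,awscrt=off"
    // --log-metrics:  "warn,awscrt=off,mountpoint_s3::metrics=info"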
diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs
index 61e316e71..bc810d3a5 100644
--- a/mountpoint-s3/src/fs.rs
+++ b/mountpoint-s3/src/fs.rs
@@ -536,7 +536,7 @@ where
 {
     config: S3FilesystemConfig,
     client: Arc<Client>,
-    mem_limiter: Arc<MemoryLimiter>,
+    mem_limiter: Arc<MemoryLimiter<Client>>,
     superblock: Superblock,
     prefetcher: Prefetcher,
     uploader: Uploader<Client>,
@@ -569,7 +569,7 @@ where
         let superblock = Superblock::new(bucket, prefix, superblock_config);
 
         let client = Arc::new(client);
-        let mem_limiter = Arc::new(MemoryLimiter::new(config.mem_limit));
+        let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), config.mem_limit));
 
         let uploader = Uploader::new(
             client.clone(),
diff --git a/mountpoint-s3/src/mem_limiter.rs b/mountpoint-s3/src/mem_limiter.rs
index bd1810fcd..70d1aaeed 100644
--- a/mountpoint-s3/src/mem_limiter.rs
+++ b/mountpoint-s3/src/mem_limiter.rs
@@ -1,48 +1,55 @@
-use std::sync::atomic::Ordering;
+use std::sync::{atomic::Ordering, Arc};
 
 use humansize::make_format;
 use metrics::atomics::AtomicU64;
+use mountpoint_s3_client::ObjectClient;
 use tracing::debug;
 
 /// `MemoryLimiter` tracks memory used by Mountpoint and makes decisions if a new memory reservation request can be accepted.
-/// Currently the only metric which we take into account is the memory reserved by prefetcher instances for the data requested or
-/// fetched from CRT client. Single instance of this struct is shared among all of the prefetchers (file handles).
+/// Currently, there are two metrics we take into account:
+/// 1) the memory reserved by prefetcher instances for the data requested or fetched from the CRT client.
+/// 2) the memory reserved by the S3 client, if it can report it.
+///
+/// A single instance of this struct is shared among all of the prefetchers (file handles).
 ///
 /// Each file handle upon creation makes an initial reservation request with a minimal read window size of `1MiB + 128KiB`. This
 /// is accepted unconditionally since we want to allow any file handle to make progress even if that means going over the memory
-/// limit. Additional reservations for a file handle arise when data is being read from fuse **faster** than it arrives from the
-/// client (PartQueueStall). Those reservations may be rejected if there is no available memory.
+/// limit. Additional reservations for a file handle arise when the backpressure read window is incremented to fetch more data
+/// from underlying part streams. Those reservations may be rejected if there is no available memory.
 ///
 /// Release of the reserved memory happens on one of the following events:
-/// 1) prefetcher is destroyed (`PartQueue` holding the data should be dropped and the CRT request cancelled before this release)
-/// 2) prefetcher's read window is scaled down (we wait for the previously requested data to be consumed)
-/// 3) prefetcher is approaching the end of the request, in which case we can be sure that reservation in full won't be needed.
+/// 1) the prefetcher is destroyed (`RequestTask` will be dropped and remaining data in the backpressure read window will be released).
+/// 2) data is moved out of the part queue.
 ///
 /// Following is the visualisation of a single prefetcher instance's data stream:
 ///
-/// backwards_seek_start next_read_offset       in_part_queue                 window_end_offset       preferred_window_end_offset
-///  │                    │                           │                               │                            │
-/// ─┼────────────────────┼───────────────────────────┼───────────────────────────────┼────────────────────────────┼───────────-►
-///  │                    ├───────────────────────────┤                               │                            │
-///  └────────────────────┤ certainly used memory     └───────────────────────────────┤                            │
-///  memory not accounted │ in CRT buffer, or callback queue                          └────────────────────────────┤
-///                       │ (usage may be less than reserved)                          will be used after the      │
-///                       │                                                            window increase             │
-///                       └────────────────────────────────────────────────────────────────────────────────────────┘
-///                                         preferred_read_window_size (reserved in MemoryLimiter)
+/// backwards_seek_start    part_queue_start              in_part_queue                 window_end_offset       preferred_window_end_offset
+///  │                              │                           │                               │                            │
+/// ─┼──────────────────────────────┼───────────────────────────┼───────────────────────────────┼────────────────────────────┼───────────-►
+///  │                              │                                                           │
+///  └──────────────────────────────┤                                                           │
+///  mem reserved by the part queue │                                                           │
+///                                 └───────────────────────────────────────────────────────────┤
+///                                   mem reserved by the backpressure controller
+///                                   (on `BackpressureFeedbackEvent`)
 ///
 #[derive(Debug)]
-pub struct MemoryLimiter {
+pub struct MemoryLimiter<Client: ObjectClient> {
     mem_limit: u64,
     /// Reserved memory for data we had requested via the request task but may not
     /// arrived yet.
     prefetcher_mem_reserved: AtomicU64,
     /// Additional reserved memory for other non-buffer usage like storing metadata
     additional_mem_reserved: u64,
+    // We also take the client's reserved memory into account: the prefetcher controls
+    // the read path, but we don't record or control memory usage on the write path
+    // today, so we rely on the client's stats for "other buffers" and adjust the
+    // prefetcher read window accordingly.
+    client: Arc<Client>,
 }
 
-impl MemoryLimiter {
-    pub fn new(mem_limit: u64) -> Self {
+impl<Client: ObjectClient> MemoryLimiter<Client> {
+    pub fn new(client: Arc<Client>, mem_limit: u64) -> Self {
         let min_reserved = 128 * 1024 * 1024;
         let reserved_mem = (mem_limit / 8).max(min_reserved);
         let formatter = make_format(humansize::BINARY);
@@ -52,6 +59,7 @@ impl MemoryLimiter {
             formatter(reserved_mem)
         );
         Self {
+            client,
             mem_limit,
             prefetcher_mem_reserved: AtomicU64::new(0),
             additional_mem_reserved: reserved_mem,
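The constructor now takes the client handle so the limiter can poll its stats. A minimal construction sketch, under the same assumptions the test hunks below use (MockClient, 512 MiB limit):

    let client = Arc::new(MockClient::new(Default::default()));
    let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024));
    assert!(mem_limiter.try_reserve(1024 * 1024)); // accepted while under the limit
    mem_limiter.release(1024 * 1024);              // every reservation is balanced by a release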
@@ -66,19 +74,19 @@ impl MemoryLimiter {
     }
 
     /// Reserve the memory for future uses. If there is not enough memory returns `false`.
-    pub fn try_reserve(&self, size: u64, min_available: u64) -> bool {
+    pub fn try_reserve(&self, size: u64) -> bool {
+        let mut prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
         loop {
-            let prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
             let new_prefetcher_mem_reserved = prefetcher_mem_reserved.saturating_add(size);
-            let total_mem_usage = prefetcher_mem_reserved.saturating_add(self.additional_mem_reserved);
-            let new_total_mem_usage = new_prefetcher_mem_reserved.saturating_add(self.additional_mem_reserved);
-            if new_total_mem_usage > self.mem_limit - min_available {
-                debug!(
-                    "not enough memory to reserve, current usage: {}, new (if scaled up): {}, allowed diff: {}",
-                    total_mem_usage, new_total_mem_usage, min_available,
-                );
+            let client_mem_reserved = self.client_mem_reserved();
+            let new_total_mem_usage = new_prefetcher_mem_reserved
+                .saturating_add(client_mem_reserved)
+                .saturating_add(self.additional_mem_reserved);
+            if new_total_mem_usage > self.mem_limit {
+                debug!(new_total_mem_usage, "not enough memory to reserve");
                 return false;
             }
+            // Check that the value we have read is still the same before updating it
             match self.prefetcher_mem_reserved.compare_exchange_weak(
                 prefetcher_mem_reserved,
                 new_prefetcher_mem_reserved,
@@ -89,7 +97,7 @@ impl MemoryLimiter {
                     metrics::gauge!("prefetch.bytes_reserved").increment(size as f64);
                     return true;
                 }
-                Err(_) => continue, // another thread updated the atomic before us, trying again
+                Err(current) => prefetcher_mem_reserved = current, // another thread updated the atomic before us, trying again
             }
         }
     }
@@ -100,10 +108,17 @@ impl MemoryLimiter {
         metrics::gauge!("prefetch.bytes_reserved").decrement(size as f64);
     }
 
+    /// Query available memory tracked by the memory limiter.
     pub fn available_mem(&self) -> u64 {
-        let fs_mem_usage = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
+        let prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
+        let client_mem_reserved = self.client_mem_reserved();
         self.mem_limit
-            .saturating_sub(fs_mem_usage)
+            .saturating_sub(prefetcher_mem_reserved)
+            .saturating_sub(client_mem_reserved)
             .saturating_sub(self.additional_mem_reserved)
     }
+
+    fn client_mem_reserved(&self) -> u64 {
+        self.client.mem_usage_stats().map_or(0, |stats| stats.mem_reserved)
+    }
 }
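Stripped of the client and metadata bookkeeping, the new reservation path is a standard compare-and-swap retry loop. Distilled into a standalone form (simplified: the real check also adds the client's and the additional reserved memory before comparing against the limit):

    use std::sync::atomic::{AtomicU64, Ordering};

    fn try_reserve(reserved: &AtomicU64, limit: u64, size: u64) -> bool {
        let mut current = reserved.load(Ordering::SeqCst);
        loop {
            let new = current.saturating_add(size);
            if new > limit {
                return false; // not enough memory
            }
            match reserved.compare_exchange_weak(current, new, Ordering::SeqCst, Ordering::SeqCst) {
                Ok(_) => return true,
                // Another thread updated the atomic before us; retry with the value it wrote.
                Err(actual) => current = actual,
            }
        }
    }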
diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs
index 4e1866bad..9dd89abfb 100644
--- a/mountpoint-s3/src/prefetch.rs
+++ b/mountpoint-s3/src/prefetch.rs
@@ -70,7 +70,7 @@ pub trait Prefetch {
     fn prefetch<Client>(
         &self,
         client: Arc<Client>,
-        mem_limiter: Arc<MemoryLimiter>,
+        mem_limiter: Arc<MemoryLimiter<Client>>,
         bucket: String,
         object_id: ObjectId,
         size: u64,
@@ -203,7 +203,7 @@ where
     fn prefetch<Client>(
         &self,
         client: Arc<Client>,
-        mem_limiter: Arc<MemoryLimiter>,
+        mem_limiter: Arc<MemoryLimiter<Client>>,
         bucket: String,
         object_id: ObjectId,
         size: u64,
@@ -229,9 +229,9 @@ where
 pub struct PrefetchGetObject<Client: ObjectClient, Stream: ObjectPartStream> {
     client: Arc<Client>,
     part_stream: Arc<Stream>,
-    mem_limiter: Arc<MemoryLimiter>,
+    mem_limiter: Arc<MemoryLimiter<Client>>,
     config: PrefetcherConfig,
-    backpressure_task: Option<RequestTask<Client::ClientError>>,
+    backpressure_task: Option<RequestTask<Client::ClientError, Client>>,
     // Invariant: the offset of the last byte in this window is always
     // self.next_sequential_read_offset - 1.
     backward_seek_window: SeekWindow,
@@ -283,7 +283,7 @@ where
     fn new(
         client: Arc<Client>,
         part_stream: Arc<Stream>,
-        mem_limiter: Arc<MemoryLimiter>,
+        mem_limiter: Arc<MemoryLimiter<Client>>,
         config: PrefetcherConfig,
         bucket: String,
         object_id: ObjectId,
@@ -384,7 +384,7 @@ where
     /// We will be using flow-control window to control how much data we want to download into the prefetcher.
     fn spawn_read_backpressure_request(
         &mut self,
-    ) -> Result<RequestTask<Client::ClientError>, PrefetchReadError<Client::ClientError>> {
+    ) -> Result<RequestTask<Client::ClientError, Client>, PrefetchReadError<Client::ClientError>> {
         let start = self.next_sequential_read_offset;
         let object_size = self.size as usize;
         let read_part_size = self.client.read_part_size().unwrap_or(8 * 1024 * 1024);
@@ -579,7 +579,7 @@ mod tests {
             ..Default::default()
         };
         let client = Arc::new(MockClient::new(config));
-        let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);
+        let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
 
         let object = MockObject::ramp(0xaa, size as usize, ETag::for_tests());
         let etag = object.etag();
@@ -674,7 +674,7 @@ mod tests {
         Stream: ObjectPartStream + Send + Sync + 'static,
     {
         let client = Arc::new(MockClient::new(client_config));
-        let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);
+        let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
         let read_size = 1 * MB;
         let object_size = 8 * MB;
         let object = MockObject::ramp(0xaa, object_size, ETag::for_tests());
@@ -781,7 +781,7 @@ mod tests {
             HashMap::new(),
             HashMap::new(),
         ));
-        let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);
+        let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
 
         let prefetcher_config = PrefetcherConfig {
             max_read_window_size: test_config.max_read_window_size,
@@ -906,7 +906,7 @@ mod tests {
             ..Default::default()
        };
         let client = Arc::new(MockClient::new(config));
-        let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);
+        let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
 
         let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
         let etag = object.etag();
@@ -1090,7 +1090,7 @@ mod tests {
             HashMap::new(),
             HashMap::new(),
         ));
-        let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);
+        let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
 
         let prefetcher = Prefetcher::new(default_stream(), Default::default());
         let mem_limiter = Arc::new(mem_limiter);
@@ -1143,7 +1143,7 @@ mod tests {
             ..Default::default()
         };
         let client = Arc::new(MockClient::new(config));
-        let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024));
+        let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024));
         let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
         let etag = object.etag();
@@ -1185,7 +1185,7 @@ mod tests {
             ..Default::default()
         };
         let client = Arc::new(MockClient::new(config));
-        let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024));
+        let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024));
         let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
         let etag = object.etag();
@@ -1247,7 +1247,7 @@ mod tests {
             ..Default::default()
         };
         let client = Arc::new(MockClient::new(config));
-        let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);
+        let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
 
         let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
         let file_etag = object.etag();
@@ -1313,7 +1313,7 @@ mod tests {
             ..Default::default()
         };
         let client = Arc::new(MockClient::new(config));
-        let mem_limiter = MemoryLimiter::new(512 * 1024 * 1024);
+        let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
 
         let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
         let file_etag = object.etag();
diff --git a/mountpoint-s3/src/prefetch/backpressure_controller.rs b/mountpoint-s3/src/prefetch/backpressure_controller.rs
index 7fadacb99..131e03d5f 100644
--- a/mountpoint-s3/src/prefetch/backpressure_controller.rs
+++ b/mountpoint-s3/src/prefetch/backpressure_controller.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
 
 use async_channel::{unbounded, Receiver, Sender};
 use humansize::make_format;
+use mountpoint_s3_client::ObjectClient;
 use tracing::{debug, trace};
 
 use crate::mem_limiter::MemoryLimiter;
@@ -32,7 +33,7 @@ pub struct BackpressureConfig {
 }
 
 #[derive(Debug)]
-pub struct BackpressureController {
+pub struct BackpressureController<Client: ObjectClient> {
     read_window_updater: Sender<usize>,
     preferred_read_window_size: usize,
     min_read_window_size: usize,
@@ -48,7 +49,7 @@ pub struct BackpressureController {
     /// data up to this offset *exclusively*.
     request_end_offset: u64,
     read_part_size: usize,
-    mem_limiter: Arc<MemoryLimiter>,
+    mem_limiter: Arc<MemoryLimiter<Client>>,
 }
 
 #[derive(Debug)]
@@ -74,10 +75,12 @@ pub struct BackpressureLimiter {
 /// [BackpressureController] will be given to the consumer side of the object stream.
 /// It can be used anywhere to set preferred read window size for the stream and tell the
 /// producer when its read window should be increased.
-pub fn new_backpressure_controller(
+pub fn new_backpressure_controller<Client: ObjectClient>(
     config: BackpressureConfig,
-    mem_limiter: Arc<MemoryLimiter>,
-) -> (BackpressureController, BackpressureLimiter) {
+    mem_limiter: Arc<MemoryLimiter<Client>>,
+) -> (BackpressureController<Client>, BackpressureLimiter) {
+    // Minimum window size multiplier; scaling up and down won't work if the multiplier is 1.
+    const MIN_WINDOW_SIZE_MULTIPLIER: usize = 2;
     let read_window_end_offset = config.request_range.start + config.initial_read_window_size as u64;
     let (read_window_updater, read_window_incrementing_queue) = unbounded();
     mem_limiter.reserve(config.initial_read_window_size as u64);
@@ -86,7 +89,7 @@ pub fn new_backpressure_controller(
         preferred_read_window_size: config.initial_read_window_size,
         min_read_window_size: config.min_read_window_size,
         max_read_window_size: config.max_read_window_size,
-        read_window_size_multiplier: config.read_window_size_multiplier,
+        read_window_size_multiplier: config.read_window_size_multiplier.max(MIN_WINDOW_SIZE_MULTIPLIER),
         read_window_end_offset,
         next_read_offset: config.request_range.start,
         request_end_offset: config.request_range.end,
@@ -101,7 +104,7 @@ pub fn new_backpressure_controller(
     (controller, limiter)
 }
 
-impl BackpressureController {
+impl<Client: ObjectClient> BackpressureController<Client> {
     pub fn read_window_end_offset(&self) -> u64 {
         self.read_window_end_offset
     }
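The multiplier clamp matters because scaling is multiplicative in both directions; a one-line illustration of why `MIN_WINDOW_SIZE_MULTIPLIER` is 2:

    // With a multiplier of 1, neither direction can change the window size.
    let preferred = 8 * 1024 * 1024_usize;
    assert_eq!(preferred * 1, preferred); // scale up: no change
    assert_eq!(preferred / 1, preferred); // scale down: no change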
@@ -112,81 +115,57 @@ impl BackpressureController {
         match event {
             // Note, that this may come from a backwards seek, so offsets observed by this method are not necessarily ascending
             BackpressureFeedbackEvent::DataRead { offset, length } => {
-                // Step 2. of scale down, including the case when we're approaching the request end. See `self.scale_down` for the logic.
-                let next_read_offset = offset + length as u64;
-                // We don't update `self.next_read_offset` if this feedback arrived from read after a backwards seek
-                if next_read_offset > self.next_read_offset {
-                    self.next_read_offset = next_read_offset;
-                }
-                if self.next_read_offset >= self.request_end_offset {
-                    self.next_read_offset = self.request_end_offset;
-                }
-                let remaining_window = self.read_window_end_offset.saturating_sub(next_read_offset) as usize;
-
-                let preffered_window_end_offset = self
-                    .next_read_offset
-                    .saturating_add(self.preferred_read_window_size as u64);
-                let over_reserved = self.read_window_end_offset.saturating_sub(preffered_window_end_offset);
-                if over_reserved > 0 {
-                    self.mem_limiter.release((length as u64).min(over_reserved));
-                }
-                if self.request_end_offset < preffered_window_end_offset {
-                    // We won't need the full `preffered_window_end_offset` as we're approaching the request's end.
-                    self.mem_limiter.release(length as u64);
-                }
+                self.next_read_offset = offset + length as u64;
+                self.mem_limiter.release(length as u64);
+                let remaining_window = self.read_window_end_offset.saturating_sub(self.next_read_offset) as usize;
 
                 // Increment the read window only if the remaining window reaches some threshold i.e. half of it left.
-                if remaining_window < (self.preferred_read_window_size / 2)
+                while remaining_window < (self.preferred_read_window_size / 2)
                     && self.read_window_end_offset < self.request_end_offset
                 {
-                    // If there is not enough available memory in the system, we'll try to reduce the read window of the current request.
-                    // We define "not enough memory" as a situation where no new request with a minimum window may fit in the limit.
-                    //
-                    // Scaling down is best effort, meaning that there is no guarantee that after this action such a
-                    // request will fit in memory. This may not be the case if during the scale down a new memory reservation was made by
-                    // another request.
-                    //
-                    // We reduce the frequency of scale downs by only performing it when sufficient amount of data (half of read_window)
-                    // was read.
-                    let mut available_mem = self.mem_limiter.available_mem();
-                    let mut new_read_window_size = self.preferred_read_window_size; // new_preferred_read_window_size is just too wordy
-                    while available_mem < self.min_read_window_size as u64 && self.read_window_size_multiplier > 1 {
-                        let scaled_down = new_read_window_size / self.read_window_size_multiplier;
-                        if scaled_down < self.min_read_window_size {
-                            break;
-                        }
-                        available_mem += (new_read_window_size - scaled_down) as u64;
-                        new_read_window_size = scaled_down;
-                    }
-                    if new_read_window_size != self.preferred_read_window_size {
-                        self.scale_down(new_read_window_size);
-                    }
-
-                    let new_read_window_end_offset = next_read_offset
+                    let new_read_window_end_offset = self
+                        .next_read_offset
                         .saturating_add(self.preferred_read_window_size as u64)
                         .min(self.request_end_offset);
+                    // We can skip if the new `read_window_end_offset` is less than or equal to the current one; this
+                    // could happen after the read window is scaled down.
+                    if new_read_window_end_offset <= self.read_window_end_offset {
+                        break;
+                    }
+                    let to_increase = new_read_window_end_offset.saturating_sub(self.read_window_end_offset) as usize;
+
+                    // Force incrementing the read window regardless of available memory when we are already at the
+                    // minimum read window size.
+                    if self.preferred_read_window_size <= self.min_read_window_size {
+                        self.mem_limiter.reserve(to_increase as u64);
+                        self.increment_read_window(to_increase).await;
+                        break;
+                    }
 
-                    if self.read_window_end_offset < new_read_window_end_offset {
-                        let to_increase =
-                            new_read_window_end_offset.saturating_sub(self.read_window_end_offset) as usize;
-                        trace!(
-                            preferred_read_window_size = self.preferred_read_window_size,
-                            next_read_offset = self.next_read_offset,
-                            read_window_end_offset = self.read_window_end_offset,
-                            to_increase,
-                            "incrementing read window"
-                        );
+                    // Try to reserve the memory for the length we want to increase before sending the request;
+                    // scale down the read window if it fails.
+                    if self.mem_limiter.try_reserve(to_increase as u64) {
                         self.increment_read_window(to_increase).await;
+                    } else {
+                        self.scale_down();
                     }
                 }
             }
-            BackpressureFeedbackEvent::PartQueueStall => self.try_scaling_up(),
+            BackpressureFeedbackEvent::PartQueueStall => self.scale_up(),
         }
         Ok(())
     }
 
     // Send an increment read window request to the stream producer
     async fn increment_read_window(&mut self, len: usize) {
+        trace!(
+            preferred_read_window_size = self.preferred_read_window_size,
+            next_read_offset = self.next_read_offset,
+            read_window_end_offset = self.read_window_end_offset,
+            len,
+            "incrementing read window"
+        );
+
         // This should not block since the channel is unbounded
         let _ = self
             .read_window_updater
@@ -196,21 +175,21 @@ impl BackpressureController {
         self.read_window_end_offset += len as u64;
     }
 
-    // Try scaling up preferred read window size with a multiplier configured at initialization.
+    // Scale up preferred read window size with a multiplier configured at initialization.
     // Scaling up fails silently if there is no enough free memory to perform it.
-    fn try_scaling_up(&mut self) {
+    fn scale_up(&mut self) {
         if self.preferred_read_window_size < self.max_read_window_size {
             let new_read_window_size = self.preferred_read_window_size * self.read_window_size_multiplier;
             // Also align the new read window size to the client part size
             let new_read_window_size =
                 align(new_read_window_size, self.read_part_size, false).min(self.max_read_window_size);
-            // Only scale up when there is enough memory
+            // Only scale up when there is enough memory. We don't have to reserve the memory here,
+            // because only `preferred_read_window_size` is increased; the actual read window will
+            // be updated later on a `DataRead` event (where we do reserve memory).
             let to_increase = (new_read_window_size - self.preferred_read_window_size) as u64;
-            if self
-                .mem_limiter
-                .try_reserve(to_increase, self.min_read_window_size as u64)
-            {
+            let available_mem = self.mem_limiter.available_mem();
+            if available_mem >= to_increase {
                 let formatter = make_format(humansize::BINARY);
                 debug!(
                     current_size = formatter(self.preferred_read_window_size),
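Both `scale_up` above and the new `scale_down` below align the result to the client part size. A sketch of that arithmetic, assuming `align(size, to, false)` rounds down to a multiple of `to` (the crate's `align` helper is not shown in this diff, so this implementation is an assumption):

    // Assumed behavior of `align`: round down to a multiple of `to` when `up`
    // is false, round up otherwise.
    fn align(size: usize, to: usize, up: bool) -> usize {
        let down = size / to * to;
        if up && down < size { down + to } else { down }
    }

    fn scaled_up(preferred: usize, multiplier: usize, part_size: usize, max: usize) -> usize {
        align(preferred * multiplier, part_size, false).min(max)
    }

    fn scaled_down(preferred: usize, multiplier: usize, part_size: usize, min: usize) -> usize {
        align(preferred / multiplier, part_size, false).max(min)
    }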
@@ -224,72 +203,34 @@ impl BackpressureController {
         }
     }
 
-    pub fn scale_down(&mut self, new_read_window_size: usize) {
-        /*
-        Scaling down is performed in 2 steps, one in this method and another on read. Note that `window_end_offset` is the value
-        which is set in CRT and it may not be decreased. This function implements step 1.
-
-        0. Before scale down:
-
-        read_until                                              window_end_offset preferred_window_end_offset
-            │                                                           │               │
-        ────┼───────────────────────────────────────────────────────────┼───────────────┼─────────────────────────────────►
-            │                                                                           │
-            └───────────────────────────────────────────────────────────────────────────┘
-                                      preferred_read_window_size
-
-        1. Scaling down (`new_read_window_size` is applied):
-
-        read_until          preferred_window_end_offset window_end_offset preferred_window_end_offset_old
-            │                                      │                    │               │
-        ────┼──────────────────────────────────────┼────────────────────┼───────────────┼─────────────────────────────────►
-                                                   │                    ├───────────────┘
-                                                   └────────────────────┘  released immediatelly
-                                                        over_reserved
-
-        2. Part read:
-
-        read_until(old) read_until                preferred_window_end_offset window_end_offset
-            │            │                                     │        │
-        ────┼────────────┼─────────────────────────────────────┼────────┼─────────────────────────────────────────────────►
-            └────────────┘                                     └────────┘
-            released on read:                              over_reserved (new)
-            1. if over_reserved > 0
-            2. min(part.size(), over_reserved) is to deduct
-        */
-        // Align the new read window size to the client part size
-        let new_read_window_size =
-            align(new_read_window_size, self.read_part_size, false).max(self.min_read_window_size);
+    // Scale down preferred read window size by a multiplier configured at initialization.
+    fn scale_down(&mut self) {
+        if self.preferred_read_window_size > self.min_read_window_size {
+            assert!(self.read_window_size_multiplier > 1);
+            let new_read_window_size = self.preferred_read_window_size / self.read_window_size_multiplier;
+            // Also align the new read window size to the client part size
+            let new_read_window_size =
+                align(new_read_window_size, self.read_part_size, false).max(self.min_read_window_size);
 
-        let formatter = make_format(humansize::BINARY);
-        debug!(
-            current_size = formatter(self.preferred_read_window_size),
-            new_size = formatter(new_read_window_size),
-            "scaling down read window"
-        );
-        let preferred_window_end_offset_old = self
-            .next_read_offset
-            .saturating_add(self.preferred_read_window_size as u64);
-        let preferred_window_end_offset = self.next_read_offset.saturating_add(new_read_window_size as u64);
-        // In most cases we'll keep memory reserved for `self.read_window_end_offset`, but if the new
-        // `preferred_window_end_offset` is greater, we'll reserve for it instead.
-        let reserve_until_offset = self.read_window_end_offset.max(preferred_window_end_offset);
-        let to_release = preferred_window_end_offset_old.saturating_sub(reserve_until_offset);
-        self.mem_limiter.release(to_release);
-        self.preferred_read_window_size = new_read_window_size;
-        metrics::histogram!("prefetch.window_after_decrease_mib")
-            .record((self.preferred_read_window_size / 1024 / 1024) as f64);
+            let formatter = make_format(humansize::BINARY);
+            debug!(
+                current_size = formatter(self.preferred_read_window_size),
+                new_size = formatter(new_read_window_size),
+                "scaling down read window"
+            );
+            self.preferred_read_window_size = new_read_window_size;
+            metrics::histogram!("prefetch.window_after_decrease_mib")
+                .record((self.preferred_read_window_size / 1024 / 1024) as f64);
+        }
     }
 }
 
-impl Drop for BackpressureController {
+impl<Client: ObjectClient> Drop for BackpressureController<Client> {
     fn drop(&mut self) {
-        // When approaching request end we have less memory still reserved than `self.preferred_read_window_size`.
+        // Free up memory we have reserved for the read window.
         debug_assert!(self.request_end_offset >= self.next_read_offset);
-        let remaining_in_request = self.request_end_offset.saturating_sub(self.next_read_offset);
-
-        self.mem_limiter
-            .release((self.preferred_read_window_size as u64).min(remaining_in_request));
+        let remaining_window = self.read_window_end_offset.saturating_sub(self.next_read_offset);
+        self.mem_limiter.release(remaining_window);
     }
 }
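Putting the pieces together, the controller now follows one loop: release what the reader consumed, top the window up once less than half of it remains, reserve before growing, and shrink when a reservation is refused. A self-contained toy model (illustrative only; field names do not match the real BackpressureController, and the limiter is folded into a plain counter):

    struct WindowModel {
        next_read_offset: u64,
        window_end: u64,
        preferred_window: u64,
        min_window: u64,
        multiplier: u64,
        reserved: u64, // stands in for the MemoryLimiter ledger
        limit: u64,
    }

    impl WindowModel {
        fn on_data_read(&mut self, offset: u64, length: u64) {
            self.next_read_offset = offset + length;
            self.reserved = self.reserved.saturating_sub(length); // release consumed bytes
            let remaining = self.window_end.saturating_sub(self.next_read_offset);
            if remaining < self.preferred_window / 2 {
                let target = self.next_read_offset + self.preferred_window;
                let to_increase = target.saturating_sub(self.window_end);
                if self.reserved + to_increase <= self.limit {
                    // Reserve first, then tell the producer to grow the window.
                    self.reserved += to_increase;
                    self.window_end = target;
                } else if self.preferred_window > self.min_window {
                    // Reservation refused: scale the preferred window down instead.
                    self.preferred_window = (self.preferred_window / self.multiplier).max(self.min_window);
                }
            }
        }
    }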
diff --git a/mountpoint-s3/src/prefetch/caching_stream.rs b/mountpoint-s3/src/prefetch/caching_stream.rs
index 01c3f3452..02309749f 100644
--- a/mountpoint-s3/src/prefetch/caching_stream.rs
+++ b/mountpoint-s3/src/prefetch/caching_stream.rs
@@ -46,8 +46,8 @@ where
         &self,
         client: Arc<Client>,
         config: RequestTaskConfig,
-        mem_limiter: Arc<MemoryLimiter>,
-    ) -> RequestTask<<Client as ObjectClient>::ClientError>
+        mem_limiter: Arc<MemoryLimiter<Client>>,
+    ) -> RequestTask<<Client as ObjectClient>::ClientError, Client>
     where
         Client: ObjectClient + Send + Sync + 'static,
     {
@@ -63,7 +63,7 @@ where
         };
         let (backpressure_controller, backpressure_limiter) =
             new_backpressure_controller(backpressure_config, mem_limiter.clone());
-        let (part_queue, part_queue_producer) = unbounded_part_queue();
+        let (part_queue, part_queue_producer) = unbounded_part_queue(mem_limiter);
         trace!(?range, "spawning request");
 
         let request_task = {
@@ -434,7 +434,7 @@ mod tests {
             ..Default::default()
         };
         let mock_client = Arc::new(MockClient::new(config));
-        let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024));
+        let mem_limiter = Arc::new(MemoryLimiter::new(mock_client.clone(), 512 * 1024 * 1024));
         mock_client.add_object(key, object.clone());
 
         let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
@@ -515,7 +515,7 @@ mod tests {
             ..Default::default()
         };
         let mock_client = Arc::new(MockClient::new(config));
-        let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024));
+        let mem_limiter = Arc::new(MemoryLimiter::new(mock_client.clone(), 512 * 1024 * 1024));
         mock_client.add_object(key, object.clone());
 
         let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
@@ -539,10 +539,10 @@ mod tests {
         }
     }
 
-    fn compare_read<E: std::error::Error + Send + Sync>(
+    fn compare_read<E: std::error::Error + Send + Sync, Client: ObjectClient>(
         id: &ObjectId,
         object: &MockObject,
-        mut request_task: RequestTask<E>,
+        mut request_task: RequestTask<E, Client>,
     ) {
         let mut offset = request_task.start_offset();
         let mut remaining = request_task.total_size();
diff --git a/mountpoint-s3/src/prefetch/part_queue.rs b/mountpoint-s3/src/prefetch/part_queue.rs
index 08778666a..99fc5ed8d 100644
--- a/mountpoint-s3/src/prefetch/part_queue.rs
+++ b/mountpoint-s3/src/prefetch/part_queue.rs
@@ -1,7 +1,9 @@
 use std::time::Instant;
 
+use mountpoint_s3_client::ObjectClient;
 use tracing::trace;
 
+use crate::mem_limiter::MemoryLimiter;
 use crate::prefetch::part::Part;
 use crate::prefetch::PrefetchReadError;
 use crate::sync::async_channel::{unbounded, Receiver, RecvError, Sender};
@@ -11,7 +13,7 @@ use crate::sync::Arc;
 /// A queue of [Part]s where the first part can be partially read from if the reader doesn't want
 /// the entire part in one shot.
 #[derive(Debug)]
-pub struct PartQueue<E: std::error::Error> {
+pub struct PartQueue<E: std::error::Error, Client: ObjectClient> {
     /// The auxiliary queue that supports pushing parts to the front of the part queue in order to
     /// allow partial reads and backwards seeks.
     front_queue: Vec<Part>,
@@ -20,6 +22,7 @@ pub struct PartQueue<E: std::error::Error> {
     failed: bool,
     /// The total number of bytes sent to the underlying queue of `self.receiver`
     bytes_received: Arc<AtomicUsize>,
+    mem_limiter: Arc<MemoryLimiter<Client>>,
 }
 
 /// Producer side of the queue of [Part]s.
@@ -31,7 +34,9 @@ pub struct PartQueueProducer<E: std::error::Error> {
 }
 
 /// Creates an unbounded [PartQueue] and its related [PartQueueProducer].
-pub fn unbounded_part_queue<E: std::error::Error>() -> (PartQueue<E>, PartQueueProducer<E>) {
+pub fn unbounded_part_queue<E: std::error::Error, Client: ObjectClient>(
+    mem_limiter: Arc<MemoryLimiter<Client>>,
+) -> (PartQueue<E, Client>, PartQueueProducer<E>) {
     let (sender, receiver) = unbounded();
     let bytes_counter = Arc::new(AtomicUsize::new(0));
     let part_queue = PartQueue {
@@ -39,6 +44,7 @@ pub fn unbounded_part_queue<E: std::error::Error>() -> (PartQueue<E>, PartQueueP
         receiver,
         failed: false,
         bytes_received: Arc::clone(&bytes_counter),
+        mem_limiter,
     };
     let part_queue_producer = PartQueueProducer {
         sender,
@@ -47,7 +53,7 @@ pub fn unbounded_part_queue<E: std::error::Error>() -> (PartQueue<E>, PartQueueP
     (part_queue, part_queue_producer)
 }
 
-impl<E: std::error::Error + Send + Sync> PartQueue<E> {
+impl<E: std::error::Error + Send + Sync, Client: ObjectClient> PartQueue<E, Client> {
     /// Read up to `length` bytes from the queue at the current offset. This function always returns
     /// a contiguous [Bytes], and so may return fewer than `length` bytes if it would need to copy
     /// or reallocate to make the return value contiguous. This function blocks only if the queue is
@@ -96,6 +102,9 @@ impl<E: std::error::Error + Send + Sync> PartQueue<E> {
         assert!(!self.failed, "cannot use a PartQueue after failure");
         metrics::gauge!("prefetch.bytes_in_queue").increment(part.len() as f64);
 
+        // The backpressure controller is not aware of the parts from backwards seek,
+        // so we have to reserve memory for them here.
+        self.mem_limiter.reserve(part.len() as u64);
         self.front_queue.push(part);
         Ok(())
     }
@@ -121,7 +130,7 @@ impl PartQueueProducer {
         }
     }
 }
 
-impl<E: std::error::Error> Drop for PartQueue<E> {
+impl<E: std::error::Error, Client: ObjectClient> Drop for PartQueue<E, Client> {
     fn drop(&mut self) {
         // close the channel and drain remaining parts from the main queue
         self.receiver.close();
@@ -148,6 +157,7 @@ mod tests {
 
     use bytes::Bytes;
     use futures::executor::block_on;
+    use mountpoint_s3_client::mock_client::MockClient;
     use mountpoint_s3_client::types::ETag;
     use proptest::proptest;
     use proptest_derive::Arbitrary;
@@ -164,8 +174,10 @@ mod tests {
     enum DummyError {}
 
     async fn run_test(ops: Vec<Op>) {
+        let client = Arc::new(MockClient::new(Default::default()));
+        let mem_limiter = MemoryLimiter::new(client.clone(), 512 * 1024 * 1024);
         let part_id = ObjectId::new("key".to_owned(), ETag::for_tests());
-        let (mut part_queue, part_queue_producer) = unbounded_part_queue::<DummyError>();
+        let (mut part_queue, part_queue_producer) = unbounded_part_queue::<DummyError, _>(mem_limiter.into());
         let mut current_offset = 0;
         let mut current_length = 0;
         for op in ops {
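The `push_front` reservation closes a gap in the accounting: the controller releases bytes when they are first read, so parts re-queued by a backwards seek would otherwise be untracked. A simplified model of that step (types as used in this diff; not the real method):

    // Re-reserve memory for a part that re-enters the queue via a backwards
    // seek, since the backpressure controller already released it on first read.
    fn push_front<Client: ObjectClient>(
        front_queue: &mut Vec<Part>,
        mem_limiter: &MemoryLimiter<Client>,
        part: Part,
    ) {
        mem_limiter.reserve(part.len() as u64);
        front_queue.push(part);
    }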
diff --git a/mountpoint-s3/src/prefetch/part_stream.rs b/mountpoint-s3/src/prefetch/part_stream.rs
index 9dd9a0944..8f1f22a08 100644
--- a/mountpoint-s3/src/prefetch/part_stream.rs
+++ b/mountpoint-s3/src/prefetch/part_stream.rs
@@ -28,8 +28,8 @@ pub trait ObjectPartStream {
         &self,
         client: Arc<Client>,
         config: RequestTaskConfig,
-        mem_limiter: Arc<MemoryLimiter>,
-    ) -> RequestTask<Client::ClientError>
+        mem_limiter: Arc<MemoryLimiter<Client>>,
+    ) -> RequestTask<Client::ClientError, Client>
     where
         Client: ObjectClient + Send + Sync + 'static;
 }
@@ -185,8 +185,8 @@ where
         &self,
         client: Arc<Client>,
         config: RequestTaskConfig,
-        mem_limiter: Arc<MemoryLimiter>,
-    ) -> RequestTask<Client::ClientError>
+        mem_limiter: Arc<MemoryLimiter<Client>>,
+    ) -> RequestTask<Client::ClientError, Client>
     where
         Client: ObjectClient + Send + Sync + 'static,
     {
@@ -206,7 +206,7 @@ where
         };
         let (backpressure_controller, mut backpressure_limiter) =
             new_backpressure_controller(backpressure_config, mem_limiter.clone());
-        let (part_queue, part_queue_producer) = unbounded_part_queue();
+        let (part_queue, part_queue_producer) = unbounded_part_queue(mem_limiter);
         trace!(?range, "spawning request");
 
         let span = debug_span!("prefetch", ?range);
diff --git a/mountpoint-s3/src/prefetch/task.rs b/mountpoint-s3/src/prefetch/task.rs
index cd858d7bc..bf47190fe 100644
--- a/mountpoint-s3/src/prefetch/task.rs
+++ b/mountpoint-s3/src/prefetch/task.rs
@@ -1,4 +1,5 @@
 use futures::future::RemoteHandle;
+use mountpoint_s3_client::ObjectClient;
 
 use crate::prefetch::backpressure_controller::BackpressureFeedbackEvent::{DataRead, PartQueueStall};
 use crate::prefetch::part::Part;
@@ -10,22 +11,22 @@ use super::part_stream::RequestRange;
 
 /// A single GetObject request submitted to the S3 client
 #[derive(Debug)]
-pub struct RequestTask<E: std::error::Error> {
+pub struct RequestTask<E: std::error::Error, Client: ObjectClient> {
     /// Handle on the task/future. The future is cancelled when handle is dropped. This is None if
     /// the request is fake (created by seeking backwards in the stream)
     _task_handle: RemoteHandle<()>,
     remaining: usize,
     range: RequestRange,
-    part_queue: PartQueue<E>,
-    backpressure_controller: BackpressureController,
+    part_queue: PartQueue<E, Client>,
+    backpressure_controller: BackpressureController<Client>,
 }
 
-impl<E: std::error::Error + Send + Sync> RequestTask<E> {
+impl<E: std::error::Error + Send + Sync, Client: ObjectClient> RequestTask<E, Client> {
     pub fn from_handle(
         task_handle: RemoteHandle<()>,
         range: RequestRange,
-        part_queue: PartQueue<E>,
-        backpressure_controller: BackpressureController,
+        part_queue: PartQueue<E, Client>,
+        backpressure_controller: BackpressureController<Client>,
     ) -> Self {
         Self {
             _task_handle: task_handle,