Simplify the logic and include client metrics
Signed-off-by: Monthon Klongklaew <[email protected]>
monthonk committed Sep 19, 2024
1 parent 874f74b commit f191fac
Showing 15 changed files with 231 additions and 215 deletions.
8 changes: 6 additions & 2 deletions mountpoint-s3-client/src/failure_client.rs
@@ -16,8 +16,8 @@ use pin_project::pin_project;
 use crate::object_client::{
     DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult,
     GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult,
-    ObjectAttribute, ObjectClientError, ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest,
-    PutObjectResult, UploadReview,
+    MemoryUsageStats, ObjectAttribute, ObjectClientError, ObjectClientResult, PutObjectError, PutObjectParams,
+    PutObjectRequest, PutObjectResult, UploadReview,
 };
 use crate::ObjectClient;
 
@@ -85,6 +85,10 @@ where
         self.client.initial_read_window_size()
     }
 
+    fn mem_usage_stats(&self) -> Option<MemoryUsageStats> {
+        self.client.mem_usage_stats()
+    }
+
     async fn delete_object(
         &self,
         bucket: &str,
10 changes: 7 additions & 3 deletions mountpoint-s3-client/src/mock_client.rs
@@ -26,9 +26,9 @@ use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
 use crate::object_client::{
     Checksum, ChecksumAlgorithm, DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError,
     GetObjectAttributesParts, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError,
-    HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientError,
-    ObjectClientResult, ObjectInfo, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult,
-    PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart,
+    HeadObjectResult, ListObjectsError, ListObjectsResult, MemoryUsageStats, ObjectAttribute, ObjectClient,
+    ObjectClientError, ObjectClientResult, ObjectInfo, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest,
+    PutObjectResult, PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart,
 };
 
 mod leaky_bucket;
@@ -572,6 +572,10 @@ impl ObjectClient for MockClient {
         }
     }
 
+    fn mem_usage_stats(&self) -> Option<MemoryUsageStats> {
+        None
+    }
+
     async fn delete_object(
         &self,
         bucket: &str,
6 changes: 5 additions & 1 deletion mountpoint-s3-client/src/mock_client/throughput_client.rs
@@ -13,7 +13,7 @@ use crate::mock_client::{MockClient, MockClientConfig, MockClientError, MockObje
 use crate::object_client::{
     DeleteObjectError, DeleteObjectResult, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult,
     GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult,
-    ObjectAttribute, ObjectClient, ObjectClientResult, PutObjectError, PutObjectParams,
+    MemoryUsageStats, ObjectAttribute, ObjectClient, ObjectClientResult, PutObjectError, PutObjectParams,
 };
 use crate::types::ETag;
 
@@ -113,6 +113,10 @@ impl ObjectClient for ThroughputMockClient {
         self.inner.initial_read_window_size()
    }
 
+    fn mem_usage_stats(&self) -> Option<MemoryUsageStats> {
+        self.inner.mem_usage_stats()
+    }
+
     async fn delete_object(
         &self,
         bucket: &str,
16 changes: 16 additions & 0 deletions mountpoint-s3-client/src/object_client.rs
@@ -63,6 +63,18 @@ impl FromStr for ETag {
     }
 }
 
+/// Memory usage stats for the client.
+pub struct MemoryUsageStats {
+    /// Total allocated memory for the client.
+    pub mem_allocated: u64,
+    /// Reserved memory for the client. For [S3CrtClient], this value is the sum of primary
+    /// storage and secondary storage reserved memory.
+    pub mem_reserved: u64,
+    /// Actual used memory for the client. For [S3CrtClient], this value is the sum of primary
+    /// storage and secondary storage used memory.
+    pub mem_used: u64,
+}
+
 /// A generic interface to S3-like object storage services.
 ///
 /// This trait defines the common methods that all object services implement.
@@ -89,6 +101,10 @@ pub trait ObjectClient {
     /// This can be `None` if backpressure is disabled.
     fn initial_read_window_size(&self) -> Option<usize>;
 
+    /// Query current memory usage stats for the client. This can be `None` if the client
+    /// does not record the stats.
+    fn mem_usage_stats(&self) -> Option<MemoryUsageStats>;
+
     /// Delete a single object from the object store.
     ///
     /// DeleteObject will succeed even if the object within the bucket does not exist.
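Taken together, this hunk defines the new stats type and the trait method every client now implements. A minimal sketch of how a caller generic over `ObjectClient` might consume it (the `log_mem_usage` helper is illustrative, not part of this commit, and the `mountpoint_s3_client::object_client` import path is assumed):

```rust
use mountpoint_s3_client::object_client::{MemoryUsageStats, ObjectClient};

/// Print a client's memory usage, if the client records it.
fn log_mem_usage<Client: ObjectClient>(client: &Client) {
    match client.mem_usage_stats() {
        Some(MemoryUsageStats {
            mem_allocated,
            mem_reserved,
            mem_used,
        }) => println!("allocated={mem_allocated} reserved={mem_reserved} used={mem_used}"),
        // Clients that don't track memory usage (e.g. `MockClient`) return `None`.
        None => println!("client does not record memory usage stats"),
    }
}
```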
21 changes: 21 additions & 0 deletions mountpoint-s3-client/src/s3_crt_client.rs
@@ -1208,6 +1208,27 @@ impl ObjectClient for S3CrtClient {
         }
     }
 
+    fn mem_usage_stats(&self) -> Option<MemoryUsageStats> {
+        let crt_buffer_pool_stats = self.inner.s3_client.poll_buffer_pool_usage_stats();
+        let mem_allocated = crt_buffer_pool_stats
+            .primary_allocated
+            .saturating_add(crt_buffer_pool_stats.secondary_reserved)
+            .saturating_add(crt_buffer_pool_stats.secondary_used)
+            .saturating_add(crt_buffer_pool_stats.forced_used);
+        let mem_reserved = crt_buffer_pool_stats
+            .primary_reserved
+            .saturating_add(crt_buffer_pool_stats.secondary_reserved);
+        let mem_used = crt_buffer_pool_stats
+            .primary_used
+            .saturating_add(crt_buffer_pool_stats.secondary_used)
+            .saturating_add(crt_buffer_pool_stats.forced_used);
+        Some(MemoryUsageStats {
+            mem_allocated,
+            mem_reserved,
+            mem_used,
+        })
+    }
+
     async fn delete_object(
         &self,
         bucket: &str,
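The mapping above reduces the CRT buffer pool counters to the three `MemoryUsageStats` figures: `mem_allocated = primary_allocated + secondary_reserved + secondary_used + forced_used`, `mem_reserved = primary_reserved + secondary_reserved`, and `mem_used = primary_used + secondary_used + forced_used`, all with saturating adds so the sums cannot wrap. A self-contained sketch with invented counter values, just to make the arithmetic concrete (`PoolStats` is a stand-in, not the real CRT stats type):

```rust
/// Invented counters standing in for `poll_buffer_pool_usage_stats()` output.
struct PoolStats {
    primary_allocated: u64,
    primary_reserved: u64,
    primary_used: u64,
    secondary_reserved: u64,
    secondary_used: u64,
    forced_used: u64,
}

fn main() {
    const MIB: u64 = 1024 * 1024;
    let s = PoolStats {
        primary_allocated: 512 * MIB,
        primary_reserved: 384 * MIB,
        primary_used: 256 * MIB,
        secondary_reserved: 64 * MIB,
        secondary_used: 32 * MIB,
        forced_used: 0,
    };
    // Same sums as `mem_usage_stats` above: 608 MiB, 448 MiB, and 288 MiB respectively.
    let mem_allocated = s.primary_allocated + s.secondary_reserved + s.secondary_used + s.forced_used;
    let mem_reserved = s.primary_reserved + s.secondary_reserved;
    let mem_used = s.primary_used + s.secondary_used + s.forced_used;
    println!("allocated={mem_allocated} reserved={mem_reserved} used={mem_used}");
}
```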
2 changes: 1 addition & 1 deletion mountpoint-s3/examples/prefetch_benchmark.rs
@@ -79,7 +79,7 @@ fn main() {
         config = config.part_size(part_size);
     }
     let client = Arc::new(S3CrtClient::new(config).expect("couldn't create client"));
-    let mem_limiter = Arc::new(MemoryLimiter::new(512 * 1024 * 1024));
+    let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), 512 * 1024 * 1024));
 
     for i in 0..iterations.unwrap_or(1) {
         let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
4 changes: 1 addition & 3 deletions mountpoint-s3/src/cli.rs
@@ -401,14 +401,12 @@ impl CliArgs {
         let mut filter = if self.debug {
             String::from("debug")
         } else {
-            String::from("info")
+            String::from("warn")
         };
         let crt_verbosity = if self.debug_crt { "debug" } else { "off" };
         filter.push_str(&format!(",{}={}", AWSCRT_LOG_TARGET, crt_verbosity));
         if self.log_metrics {
             filter.push_str(&format!(",{}=info", metrics::TARGET_NAME));
-        } else {
-            filter.push_str(&format!(",{}=off", metrics::TARGET_NAME));
         }
         filter
     };
4 changes: 2 additions & 2 deletions mountpoint-s3/src/fs.rs
@@ -536,7 +536,7 @@ where
 {
     config: S3FilesystemConfig,
     client: Arc<Client>,
-    mem_limiter: Arc<MemoryLimiter>,
+    mem_limiter: Arc<MemoryLimiter<Client>>,
     superblock: Superblock,
     prefetcher: Prefetcher,
     uploader: Uploader<Client>,
@@ -569,7 +569,7 @@ where
         let superblock = Superblock::new(bucket, prefix, superblock_config);
 
         let client = Arc::new(client);
-        let mem_limiter = Arc::new(MemoryLimiter::new(config.mem_limit));
+        let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), config.mem_limit));
 
         let uploader = Uploader::new(
             client.clone(),
81 changes: 48 additions & 33 deletions mountpoint-s3/src/mem_limiter.rs
@@ -1,48 +1,55 @@
-use std::sync::atomic::Ordering;
+use std::sync::{atomic::Ordering, Arc};
 
 use humansize::make_format;
 use metrics::atomics::AtomicU64;
+use mountpoint_s3_client::ObjectClient;
 use tracing::debug;
 
 /// `MemoryLimiter` tracks memory used by Mountpoint and decides whether a new memory reservation request can be accepted.
-/// Currently the only metric which we take into account is the memory reserved by prefetcher instances for the data requested or
-/// fetched from CRT client. Single instance of this struct is shared among all of the prefetchers (file handles).
+/// Currently, there are two metrics we take into account:
+/// 1) the memory reserved by prefetcher instances for the data requested or fetched from the CRT client.
+/// 2) the memory reserved by the S3 client, if it can report it.
+///
+/// A single instance of this struct is shared among all of the prefetchers (file handles).
 ///
 /// Each file handle upon creation makes an initial reservation request with a minimal read window size of `1MiB + 128KiB`. This
 /// is accepted unconditionally since we want to allow any file handle to make progress even if that means going over the memory
-/// limit. Additional reservations for a file handle arise when data is being read from fuse **faster** than it arrives from the
-/// client (PartQueueStall). Those reservations may be rejected if there is no available memory.
+/// limit. Additional reservations for a file handle arise when the backpressure read window is incremented to fetch more data
+/// from the underlying part streams. Those reservations may be rejected if there is no available memory.
 ///
 /// Release of the reserved memory happens on one of the following events:
-/// 1) prefetcher is destroyed (`PartQueue` holding the data should be dropped and the CRT request cancelled before this release)
-/// 2) prefetcher's read window is scaled down (we wait for the previously requested data to be consumed)
-/// 3) prefetcher is approaching the end of the request, in which case we can be sure that reservation in full won't be needed.
+/// 1) the prefetcher is destroyed (`RequestTask` will be dropped and remaining data in the backpressure read window will be released).
+/// 2) data is moved out of the part queue.
 ///
 /// The following is a visualisation of a single prefetcher instance's data stream:
 ///
-///  backwards_seek_start next_read_offset            in_part_queue                   window_end_offset            preferred_window_end_offset
-///  │                    │                           │                               │                            │
-/// ─┼────────────────────┼───────────────────────────┼───────────────────────────────┼────────────────────────────┼────────────►
-///  │                    ├───────────────────────────┤                               │                            │
-///  └────────────────────┤ certainly used memory     └───────────────────────────────┤                            │
-///  memory not accounted │ in CRT buffer, or callback queue                          └────────────────────────────┤
-///                       │ (usage may be less than reserved)                             will be used after the   │
-///                       │                                                                   window increase      │
-///                       └────────────────────────────────────────────────────────────────────────────────────────┘
-///                                        preferred_read_window_size (reserved in MemoryLimiter)
+///  backwards_seek_start           part_queue_start            in_part_queue                   window_end_offset            preferred_window_end_offset
+///  │                              │                           │                               │                            │
+/// ─┼──────────────────────────────┼───────────────────────────┼───────────────────────────────┼────────────────────────────┼────────────►
+///  │                              │                                                           │
+///  └──────────────────────────────┤                                                           │
+///  mem reserved by the part queue │                                                           │
+///                                 └───────────────────────────────────────────────────────────┤
+///                                          mem reserved by the backpressure controller
+///                                               (on `BackpressureFeedbackEvent`)
 ///
 #[derive(Debug)]
-pub struct MemoryLimiter {
+pub struct MemoryLimiter<Client: ObjectClient> {
     mem_limit: u64,
     /// Reserved memory for data we had requested via the request task but that may not
    /// have arrived yet.
     prefetcher_mem_reserved: AtomicU64,
     /// Additional reserved memory for other non-buffer usage like storing metadata
     additional_mem_reserved: u64,
+    // We also take the client's reserved memory into account: the prefetcher controls
+    // the entire read path, but we don't record or control memory usage on the write
+    // path today, so we rely on the client's stats for "other buffers" and adjust the
+    // prefetcher read window accordingly.
+    client: Arc<Client>,
 }
 
-impl MemoryLimiter {
-    pub fn new(mem_limit: u64) -> Self {
+impl<Client: ObjectClient> MemoryLimiter<Client> {
+    pub fn new(client: Arc<Client>, mem_limit: u64) -> Self {
         let min_reserved = 128 * 1024 * 1024;
         let reserved_mem = (mem_limit / 8).max(min_reserved);
         let formatter = make_format(humansize::BINARY);
@@ -52,6 +59,7 @@ impl MemoryLimiter {
             formatter(reserved_mem)
         );
         Self {
+            client,
             mem_limit,
             prefetcher_mem_reserved: AtomicU64::new(0),
             additional_mem_reserved: reserved_mem,
@@ -66,19 +74,19 @@
     }
 
     /// Reserve the memory for future uses. If there is not enough memory, returns `false`.
-    pub fn try_reserve(&self, size: u64, min_available: u64) -> bool {
+    pub fn try_reserve(&self, size: u64) -> bool {
+        let mut prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
         loop {
-            let prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
             let new_prefetcher_mem_reserved = prefetcher_mem_reserved.saturating_add(size);
-            let total_mem_usage = prefetcher_mem_reserved.saturating_add(self.additional_mem_reserved);
-            let new_total_mem_usage = new_prefetcher_mem_reserved.saturating_add(self.additional_mem_reserved);
-            if new_total_mem_usage > self.mem_limit - min_available {
-                debug!(
-                    "not enough memory to reserve, current usage: {}, new (if scaled up): {}, allowed diff: {}",
-                    total_mem_usage, new_total_mem_usage, min_available,
-                );
+            let client_mem_allocated = self.client_mem_allocated();
+            let new_total_mem_usage = new_prefetcher_mem_reserved
+                .saturating_add(client_mem_allocated)
+                .saturating_add(self.additional_mem_reserved);
+            if new_total_mem_usage > self.mem_limit {
+                debug!(new_total_mem_usage, "not enough memory to reserve");
                 return false;
             }
+            // Check that the value we have read is still the same before updating it
             match self.prefetcher_mem_reserved.compare_exchange_weak(
                 prefetcher_mem_reserved,
                 new_prefetcher_mem_reserved,
@@ -89,7 +97,7 @@
                     metrics::gauge!("prefetch.bytes_reserved").increment(size as f64);
                     return true;
                 }
-                Err(_) => continue, // another thread updated the atomic before us, trying again
+                Err(current) => prefetcher_mem_reserved = current, // another thread updated the atomic before us, try again
             }
         }
     }
@@ -100,10 +108,17 @@
         metrics::gauge!("prefetch.bytes_reserved").decrement(size as f64);
     }
 
+    /// Query available memory tracked by the memory limiter.
     pub fn available_mem(&self) -> u64 {
-        let fs_mem_usage = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
+        let prefetcher_mem_reserved = self.prefetcher_mem_reserved.load(Ordering::SeqCst);
+        let client_mem_allocated = self.client_mem_allocated();
         self.mem_limit
-            .saturating_sub(fs_mem_usage)
+            .saturating_sub(prefetcher_mem_reserved)
+            .saturating_sub(client_mem_allocated)
             .saturating_sub(self.additional_mem_reserved)
     }
+
+    fn client_mem_allocated(&self) -> u64 {
+        self.client.mem_usage_stats().map_or(0, |stats| stats.mem_allocated)
+    }
 }
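To see how the reworked limiter is driven end to end, here is a hedged sketch of the reserve/release cycle against a mock client. The `mountpoint_s3::mem_limiter` path, the `Default` implementation on `MockClientConfig`, and an unchanged `release(size)` counterpart (only partially visible in the hunk above) are assumptions; the sizes are invented:

```rust
use std::sync::Arc;

use mountpoint_s3::mem_limiter::MemoryLimiter;
use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig};

fn main() {
    // `MockClient::mem_usage_stats` returns `None`, so only prefetcher
    // reservations plus the fixed additional reservation count against the limit.
    let client = Arc::new(MockClient::new(MockClientConfig::default()));
    let limiter = MemoryLimiter::new(client, 512 * 1024 * 1024);

    // Initial backpressure read window for one file handle: 1 MiB + 128 KiB.
    let window: u64 = 1024 * 1024 + 128 * 1024;
    assert!(limiter.try_reserve(window));
    println!("available after reserve: {}", limiter.available_mem());

    // Released once the data is moved out of the part queue or the
    // prefetcher is dropped.
    limiter.release(window);
    println!("available after release: {}", limiter.available_mem());
}
```

Because `MockClient` reports no stats, `available_mem` here reflects only the prefetcher reservation and the fixed additional reservation; with `S3CrtClient`, the client's allocated buffer memory would be subtracted as well.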