Skip to content

Commit

Permalink
Adjust read window based on used memory (#1013)
Browse files Browse the repository at this point in the history
* Mem limiter prototype

Signed-off-by: Monthon Klongklaew <[email protected]>

* Clean up development logging

Signed-off-by: Vlad Volodkin <[email protected]>

* Scale up atomically, scale down after data was consumed

Signed-off-by: Vladislav Volodkin <[email protected]>

* Remove Client from MemoryLimiter, document this structure

Signed-off-by: Vladislav Volodkin <[email protected]>

* Simplify the logic and include client metrics

Signed-off-by: Monthon Klongklaew <[email protected]>

* Correct client mem usage stats

Signed-off-by: Monthon Klongklaew <[email protected]>

* PR comments

Signed-off-by: Monthon Klongklaew <[email protected]>

* Put the cli argument behind a feature flag

Signed-off-by: Monthon Klongklaew <[email protected]>

* Fix scaling logic and address comments

Signed-off-by: Monthon Klongklaew <[email protected]>

---------

Signed-off-by: Monthon Klongklaew <[email protected]>
Signed-off-by: Vlad Volodkin <[email protected]>
Signed-off-by: Vladislav Volodkin <[email protected]>
Co-authored-by: Vlad Volodkin <[email protected]>
Co-authored-by: Vladislav Volodkin <[email protected]>
  • Loading branch information
3 people authored Oct 2, 2024
1 parent e95560b commit fda5103
Show file tree
Hide file tree
Showing 18 changed files with 640 additions and 106 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions mountpoint-s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::task::{Context, Poll};

use async_trait::async_trait;
use futures::Stream;
use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use pin_project::pin_project;

use crate::object_client::{
Expand Down Expand Up @@ -85,6 +86,10 @@ where
self.client.initial_read_window_size()
}

/// Report buffer-pool memory usage by delegating to the wrapped client.
/// The failure-injection wrapper does not alter these statistics.
fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
self.client.mem_usage_stats()
}

async fn delete_object(
&self,
bucket: &str,
Expand Down
5 changes: 5 additions & 0 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use async_trait::async_trait;
use futures::{Stream, StreamExt};
use lazy_static::lazy_static;
use mountpoint_s3_crt::checksums::crc32c;
use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use rand::seq::SliceRandom;
use rand::SeedableRng;
use rand_chacha::ChaCha20Rng;
Expand Down Expand Up @@ -572,6 +573,10 @@ impl ObjectClient for MockClient {
}
}

/// The mock client does not maintain a CRT buffer pool, so it has no
/// memory usage statistics to report.
fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
None
}

async fn delete_object(
&self,
bucket: &str,
Expand Down
5 changes: 5 additions & 0 deletions mountpoint-s3-client/src/mock_client/throughput_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::time::Duration;
use async_io::block_on;
use async_trait::async_trait;
use futures::Stream;
use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use pin_project::pin_project;

use crate::mock_client::leaky_bucket::LeakyBucket;
Expand Down Expand Up @@ -113,6 +114,10 @@ impl ObjectClient for ThroughputMockClient {
self.inner.initial_read_window_size()
}

/// Report memory usage stats from the wrapped inner client; throughput
/// throttling does not affect these statistics.
fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
self.inner.mem_usage_stats()
}

async fn delete_object(
&self,
bucket: &str,
Expand Down
5 changes: 5 additions & 0 deletions mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
use async_trait::async_trait;
use auto_impl::auto_impl;
use futures::Stream;
use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use std::pin::Pin;
use std::str::FromStr;
use std::time::SystemTime;
Expand Down Expand Up @@ -89,6 +90,10 @@ pub trait ObjectClient {
/// This can be `None` if backpressure is disabled.
fn initial_read_window_size(&self) -> Option<usize>;

/// Query current memory usage stats for the client. This can be `None` if the client
/// does not record the stats.
///
/// The stats are reported as [`BufferPoolUsageStats`] from the CRT S3 client's
/// buffer pool; non-CRT implementations may have nothing to report.
fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats>;

/// Delete a single object from the object store.
///
/// DeleteObject will succeed even if the object within the bucket does not exist.
Expand Down
13 changes: 11 additions & 2 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions};
use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions};
use mountpoint_s3_crt::s3::client::{
init_signing_config, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions, MetaRequestResult,
MetaRequestType, RequestMetrics, RequestType,
init_signing_config, BufferPoolUsageStats, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions,
MetaRequestResult, MetaRequestType, RequestMetrics, RequestType,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -767,7 +767,9 @@ impl S3CrtClientInner {
.set(metrics.num_requests_streaming_response as f64);

// Buffer pool metrics
let start = Instant::now();
let buffer_pool_stats = s3_client.poll_buffer_pool_usage_stats();
metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us").record(start.elapsed().as_micros() as f64);
metrics::gauge!("s3.client.buffer_pool.mem_limit").set(buffer_pool_stats.mem_limit as f64);
metrics::gauge!("s3.client.buffer_pool.primary_cutoff").set(buffer_pool_stats.primary_cutoff as f64);
metrics::gauge!("s3.client.buffer_pool.primary_used").set(buffer_pool_stats.primary_used as f64);
Expand Down Expand Up @@ -1208,6 +1210,13 @@ impl ObjectClient for S3CrtClient {
}
}

/// Poll the CRT buffer pool for its current usage statistics, recording how
/// long the poll itself took as a latency histogram.
fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
let poll_start = Instant::now();
let stats = self.inner.s3_client.poll_buffer_pool_usage_stats();
let elapsed_us = poll_start.elapsed().as_micros() as f64;
metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us").record(elapsed_us);
Some(stats)
}

async fn delete_object(
&self,
bucket: &str,
Expand Down
2 changes: 2 additions & 0 deletions mountpoint-s3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ tracing = { version = "0.1.35", features = ["log"] }
tracing-log = "0.2.0"
tracing-subscriber = { version = "0.3.14", features = ["env-filter"] }
async-stream = "0.3.5"
humansize = "2.1.3"

[target.'cfg(target_os = "linux")'.dependencies]
procfs = { version = "0.16.0", default-features = false }
Expand Down Expand Up @@ -83,6 +84,7 @@ built = { version = "0.7.1", features = ["git2"] }
express_cache = ["block_size"]
block_size = []
event_log = []
mem_limiter = []
# Features for choosing tests
fips_tests = []
fuse_tests = []
Expand Down
29 changes: 28 additions & 1 deletion mountpoint-s3/examples/prefetch_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ use std::time::Instant;

use clap::{Arg, Command};
use futures::executor::block_on;
use mountpoint_s3::mem_limiter::MemoryLimiter;
use mountpoint_s3::object::ObjectId;
use mountpoint_s3::prefetch::{default_prefetch, Prefetch, PrefetchResult};
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::types::ETag;
use mountpoint_s3_client::{ObjectClient, S3CrtClient};
use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter;
use sysinfo::{RefreshKind, System};
use tracing_subscriber::fmt::Subscriber;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
Expand Down Expand Up @@ -39,6 +42,11 @@ fn main() {
.long("throughput-target-gbps")
.help("Desired throughput in Gbps"),
)
.arg(
Arg::new("max-memory-target")
.long("max-memory-target")
.help("Maximum memory usage target in MiB"),
)
.arg(
Arg::new("part-size")
.long("part-size")
Expand All @@ -63,6 +71,9 @@ fn main() {
let throughput_target_gbps = matches
.get_one::<String>("throughput-target-gbps")
.map(|s| s.parse::<f64>().expect("throughput target must be an f64"));
// Optional memory usage target, given on the CLI in MiB.
// Fix: the expect message was copy-pasted from the throughput-target arg
// above and wrongly said "throughput target".
let max_memory_target = matches
.get_one::<String>("max-memory-target")
.map(|s| s.parse::<u64>().expect("max memory target must be a u64"));
let part_size = matches
.get_one::<String>("part-size")
.map(|s| s.parse::<usize>().expect("part size must be a usize"));
Expand Down Expand Up @@ -92,6 +103,15 @@ fn main() {
}
let client = Arc::new(S3CrtClient::new(config).expect("couldn't create client"));

// Resolve the memory target: the CLI value is in MiB, so convert to bytes.
let max_memory_target = if let Some(target) = max_memory_target {
target * 1024 * 1024
} else {
// Default to 95% of total system memory
// NOTE(review): unlike cli.rs, no minimum floor (MINIMUM_MEM_LIMIT) is
// applied to the default here — confirm that's intentional for the benchmark.
let sys = System::new_with_specifics(RefreshKind::everything());
(sys.total_memory() as f64 * 0.95) as u64
};
// The limiter shares the client so it can include client memory in its accounting.
let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), max_memory_target));

let head_object_result = block_on(client.head_object(bucket, key)).expect("HeadObject failed");
let size = head_object_result.object.size;
let etag = ETag::from_str(&head_object_result.object.etag).unwrap();
Expand All @@ -103,10 +123,17 @@ fn main() {

let start = Instant::now();

let object_id = ObjectId::new(key.clone(), etag.clone());
thread::scope(|scope| {
for _ in 0..downloads {
let received_bytes = received_bytes.clone();
let mut request = manager.prefetch(client.clone(), bucket, key, size, etag.clone());
let mut request = manager.prefetch(
client.clone(),
mem_limiter.clone(),
bucket.clone(),
object_id.clone(),
size,
);

scope.spawn(|| {
futures::executor::block_on(async move {
Expand Down
22 changes: 22 additions & 0 deletions mountpoint-s3/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
use nix::sys::signal::Signal;
use nix::unistd::ForkResult;
use regex::Regex;
use sysinfo::{RefreshKind, System};

use crate::build_info;
use crate::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, ManagedCacheDir};
use crate::fs::{CacheConfig, S3FilesystemConfig, ServerSideEncryption, TimeToLive};
use crate::fuse::session::FuseSession;
use crate::fuse::S3FuseFilesystem;
use crate::logging::{init_logging, LoggingConfig};
use crate::mem_limiter::MINIMUM_MEM_LIMIT;
use crate::prefetch::{caching_prefetch, default_prefetch, Prefetch};
use crate::prefix::Prefix;
use crate::s3::S3Personality;
Expand Down Expand Up @@ -156,6 +158,17 @@ pub struct CliArgs {
)]
pub max_threads: u64,

// This config is still unstable
#[cfg(feature = "mem_limiter")]
#[clap(
long,
help = "Maximum memory usage target [default: 95% of total system memory with a minimum of 512 MiB]",
value_name = "MiB",
value_parser = value_parser!(u64).range(512..),
help_heading = CLIENT_OPTIONS_HEADER
)]
pub max_memory_target: Option<u64>,

#[clap(
long,
help = "Part size for multi-part GET and PUT in bytes",
Expand Down Expand Up @@ -775,6 +788,15 @@ where
filesystem_config.s3_personality = s3_personality;
filesystem_config.server_side_encryption = ServerSideEncryption::new(args.sse.clone(), args.sse_kms_key_id.clone());

// Default the memory limit to 95% of total system memory, floored at
// MINIMUM_MEM_LIMIT so small machines still get a workable budget.
let sys = System::new_with_specifics(RefreshKind::everything());
let default_mem_target = (sys.total_memory() as f64 * 0.95) as u64;
filesystem_config.mem_limit = default_mem_target.max(MINIMUM_MEM_LIMIT);

// An explicit CLI target (in MiB) replaces the default entirely; clap already
// enforces a 512 MiB minimum on the argument.
// NOTE(review): the sysinfo query above runs even when this override applies —
// harmless, but it could be skipped.
#[cfg(feature = "mem_limiter")]
if let Some(max_mem_target) = args.max_memory_target {
filesystem_config.mem_limit = max_mem_target * 1024 * 1024;
}

// Written in this awkward way to force us to update it if we add new checksum types
filesystem_config.use_upload_checksums = match args.upload_checksums {
Some(UploadChecksums::Crc32c) | None => true,
Expand Down
20 changes: 16 additions & 4 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use mountpoint_s3_client::ObjectClient;

use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_LOOKUP_NONEXISTENT};
use crate::logging;
use crate::mem_limiter::{MemoryLimiter, MINIMUM_MEM_LIMIT};
use crate::object::ObjectId;
use crate::prefetch::{Prefetch, PrefetchResult};
use crate::prefix::Prefix;
use crate::s3::S3Personality;
Expand Down Expand Up @@ -151,9 +153,14 @@ where
None => return Err(err!(libc::EBADF, "no E-Tag for inode {}", lookup.inode.ino())),
Some(etag) => ETag::from_str(etag).expect("E-Tag should be set"),
};
let request = fs
.prefetcher
.prefetch(fs.client.clone(), &fs.bucket, &full_key, object_size, etag.clone());
let object_id = ObjectId::new(full_key, etag);
let request = fs.prefetcher.prefetch(
fs.client.clone(),
fs.mem_limiter.clone(),
fs.bucket.clone(),
object_id,
object_size,
);
let handle = FileHandleState::Read { handle, request };
metrics::gauge!("fs.current_handles", "type" => "read").increment(1.0);
Ok(handle)
Expand Down Expand Up @@ -393,6 +400,8 @@ pub struct S3FilesystemConfig {
pub server_side_encryption: ServerSideEncryption,
/// Use additional checksums for uploads
pub use_upload_checksums: bool,
/// Memory limit
pub mem_limit: u64,
}

impl Default for S3FilesystemConfig {
Expand All @@ -413,6 +422,7 @@ impl Default for S3FilesystemConfig {
s3_personality: S3Personality::default(),
server_side_encryption: Default::default(),
use_upload_checksums: true,
mem_limit: MINIMUM_MEM_LIMIT,
}
}
}
Expand Down Expand Up @@ -526,6 +536,7 @@ where
{
config: S3FilesystemConfig,
client: Client,
mem_limiter: Arc<MemoryLimiter<Client>>,
superblock: Superblock,
prefetcher: Prefetcher,
uploader: Uploader<Client>,
Expand Down Expand Up @@ -556,7 +567,7 @@ where
s3_personality: config.s3_personality,
};
let superblock = Superblock::new(bucket, prefix, superblock_config);

let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), config.mem_limit));
let uploader = Uploader::new(
client.clone(),
config.storage_class.to_owned(),
Expand All @@ -567,6 +578,7 @@ where
Self {
config,
client,
mem_limiter,
superblock,
prefetcher,
uploader,
Expand Down
3 changes: 2 additions & 1 deletion mountpoint-s3/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ pub mod data_cache;
pub mod fs;
pub mod fuse;
pub mod logging;
pub mod mem_limiter;
pub mod metrics;
mod object;
pub mod object;
pub mod prefetch;
pub mod prefix;
pub mod s3;
Expand Down
Loading

0 comments on commit fda5103

Please sign in to comment.