Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust read window based on used memory #1013

Merged
merged 9 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions mountpoint-s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::task::{Context, Poll};

use async_trait::async_trait;
use futures::Stream;
use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use pin_project::pin_project;

use crate::object_client::{
Expand Down Expand Up @@ -85,6 +86,10 @@ where
self.client.initial_read_window_size()
}

/// Query current memory usage stats by delegating directly to the wrapped client.
/// Note this call has no failure-injection logic of its own.
fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
    self.client.mem_usage_stats()
}

async fn delete_object(
&self,
bucket: &str,
Expand Down
5 changes: 5 additions & 0 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use async_trait::async_trait;
use futures::{Stream, StreamExt};
use lazy_static::lazy_static;
use mountpoint_s3_crt::checksums::crc32c;
use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use rand::seq::SliceRandom;
use rand::SeedableRng;
use rand_chacha::ChaCha20Rng;
Expand Down Expand Up @@ -572,6 +573,10 @@ impl ObjectClient for MockClient {
}
}

/// The mock client does not record memory usage stats, so this always
/// returns `None`.
fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
    None
}

async fn delete_object(
&self,
bucket: &str,
Expand Down
5 changes: 5 additions & 0 deletions mountpoint-s3-client/src/mock_client/throughput_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::time::Duration;
use async_io::block_on;
use async_trait::async_trait;
use futures::Stream;
use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use pin_project::pin_project;

use crate::mock_client::leaky_bucket::LeakyBucket;
Expand Down Expand Up @@ -113,6 +114,10 @@ impl ObjectClient for ThroughputMockClient {
self.inner.initial_read_window_size()
}

/// Query current memory usage stats by delegating to the inner mock client
/// (which returns `None` — see `MockClient::mem_usage_stats`).
fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
    self.inner.mem_usage_stats()
}

async fn delete_object(
&self,
bucket: &str,
Expand Down
5 changes: 5 additions & 0 deletions mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
use async_trait::async_trait;
use auto_impl::auto_impl;
use futures::Stream;
use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use std::pin::Pin;
use std::str::FromStr;
use std::time::SystemTime;
Expand Down Expand Up @@ -89,6 +90,10 @@ pub trait ObjectClient {
/// This can be `None` if backpressure is disabled.
fn initial_read_window_size(&self) -> Option<usize>;

/// Query the current memory usage stats for the client.
///
/// Returns `None` if the client does not record memory usage stats.
fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats>;

/// Delete a single object from the object store.
///
/// DeleteObject will succeed even if the object within the bucket does not exist.
Expand Down
13 changes: 11 additions & 2 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions};
use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions};
use mountpoint_s3_crt::s3::client::{
init_signing_config, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions, MetaRequestResult,
MetaRequestType, RequestMetrics, RequestType,
init_signing_config, BufferPoolUsageStats, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions,
MetaRequestResult, MetaRequestType, RequestMetrics, RequestType,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -767,7 +767,9 @@ impl S3CrtClientInner {
.set(metrics.num_requests_streaming_response as f64);

// Buffer pool metrics
let start = Instant::now();
let buffer_pool_stats = s3_client.poll_buffer_pool_usage_stats();
metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us").record(start.elapsed().as_micros() as f64);
metrics::gauge!("s3.client.buffer_pool.mem_limit").set(buffer_pool_stats.mem_limit as f64);
metrics::gauge!("s3.client.buffer_pool.primary_cutoff").set(buffer_pool_stats.primary_cutoff as f64);
metrics::gauge!("s3.client.buffer_pool.primary_used").set(buffer_pool_stats.primary_used as f64);
Expand Down Expand Up @@ -1208,6 +1210,13 @@ impl ObjectClient for S3CrtClient {
}
}

fn mem_usage_stats(&self) -> Option<BufferPoolUsageStats> {
let start = Instant::now();
let crt_buffer_pool_stats = self.inner.s3_client.poll_buffer_pool_usage_stats();
vladem marked this conversation as resolved.
Show resolved Hide resolved
metrics::histogram!("s3.client.buffer_pool.get_usage_latency_us").record(start.elapsed().as_micros() as f64);
Some(crt_buffer_pool_stats)
}

async fn delete_object(
&self,
bucket: &str,
Expand Down
2 changes: 2 additions & 0 deletions mountpoint-s3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ tracing = { version = "0.1.35", features = ["log"] }
tracing-log = "0.2.0"
tracing-subscriber = { version = "0.3.14", features = ["env-filter"] }
async-stream = "0.3.5"
humansize = "2.1.3"

[target.'cfg(target_os = "linux")'.dependencies]
procfs = { version = "0.16.0", default-features = false }
Expand Down Expand Up @@ -83,6 +84,7 @@ built = { version = "0.7.1", features = ["git2"] }
express_cache = ["block_size"]
block_size = []
event_log = []
mem_limiter = []
# Features for choosing tests
fips_tests = []
fuse_tests = []
Expand Down
29 changes: 28 additions & 1 deletion mountpoint-s3/examples/prefetch_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ use std::time::Instant;

use clap::{Arg, Command};
use futures::executor::block_on;
use mountpoint_s3::mem_limiter::MemoryLimiter;
use mountpoint_s3::object::ObjectId;
use mountpoint_s3::prefetch::{default_prefetch, Prefetch, PrefetchResult};
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::types::ETag;
use mountpoint_s3_client::{ObjectClient, S3CrtClient};
use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter;
use sysinfo::{RefreshKind, System};
use tracing_subscriber::fmt::Subscriber;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
Expand Down Expand Up @@ -39,6 +42,11 @@ fn main() {
.long("throughput-target-gbps")
.help("Desired throughput in Gbps"),
)
.arg(
Arg::new("max-memory-target")
.long("max-memory-target")
.help("Maximum memory usage target in MiB"),
)
.arg(
Arg::new("part-size")
.long("part-size")
Expand All @@ -63,6 +71,9 @@ fn main() {
let throughput_target_gbps = matches
.get_one::<String>("throughput-target-gbps")
.map(|s| s.parse::<f64>().expect("throughput target must be an f64"));
// Parse the optional `--max-memory-target` argument (MiB).
// Fixed: the panic message previously said "throughput target", copy-pasted
// from the `--throughput-target-gbps` parse above.
let max_memory_target = matches
    .get_one::<String>("max-memory-target")
    .map(|s| s.parse::<u64>().expect("max memory target must be a u64"));
let part_size = matches
.get_one::<String>("part-size")
.map(|s| s.parse::<usize>().expect("part size must be a usize"));
Expand Down Expand Up @@ -92,6 +103,15 @@ fn main() {
}
let client = Arc::new(S3CrtClient::new(config).expect("couldn't create client"));

let max_memory_target = if let Some(target) = max_memory_target {
target * 1024 * 1024
} else {
// Default to 95% of total system memory
let sys = System::new_with_specifics(RefreshKind::everything());
(sys.total_memory() as f64 * 0.95) as u64
};
let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), max_memory_target));

let head_object_result = block_on(client.head_object(bucket, key)).expect("HeadObject failed");
let size = head_object_result.object.size;
let etag = ETag::from_str(&head_object_result.object.etag).unwrap();
Expand All @@ -103,10 +123,17 @@ fn main() {

let start = Instant::now();

let object_id = ObjectId::new(key.clone(), etag.clone());
thread::scope(|scope| {
for _ in 0..downloads {
let received_bytes = received_bytes.clone();
let mut request = manager.prefetch(client.clone(), bucket, key, size, etag.clone());
let mut request = manager.prefetch(
client.clone(),
mem_limiter.clone(),
bucket.clone(),
object_id.clone(),
size,
);

scope.spawn(|| {
futures::executor::block_on(async move {
Expand Down
22 changes: 22 additions & 0 deletions mountpoint-s3/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
use nix::sys::signal::Signal;
use nix::unistd::ForkResult;
use regex::Regex;
use sysinfo::{RefreshKind, System};

use crate::build_info;
use crate::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, ManagedCacheDir};
use crate::fs::{CacheConfig, S3FilesystemConfig, ServerSideEncryption, TimeToLive};
use crate::fuse::session::FuseSession;
use crate::fuse::S3FuseFilesystem;
use crate::logging::{init_logging, LoggingConfig};
use crate::mem_limiter::MINIMUM_MEM_LIMIT;
use crate::prefetch::{caching_prefetch, default_prefetch, Prefetch};
use crate::prefix::Prefix;
use crate::s3::S3Personality;
Expand Down Expand Up @@ -156,6 +158,17 @@ pub struct CliArgs {
)]
pub max_threads: u64,

// This config is still unstable
#[cfg(feature = "mem_limiter")]
#[clap(
long,
help = "Maximum memory usage target [default: 95% of total system memory with a minimum of 512 MiB]",
value_name = "MiB",
value_parser = value_parser!(u64).range(512..),
help_heading = CLIENT_OPTIONS_HEADER
)]
pub max_memory_target: Option<u64>,

#[clap(
long,
help = "Part size for multi-part GET and PUT in bytes",
Expand Down Expand Up @@ -775,6 +788,15 @@ where
filesystem_config.s3_personality = s3_personality;
filesystem_config.server_side_encryption = ServerSideEncryption::new(args.sse.clone(), args.sse_kms_key_id.clone());

let sys = System::new_with_specifics(RefreshKind::everything());
let default_mem_target = (sys.total_memory() as f64 * 0.95) as u64;
filesystem_config.mem_limit = default_mem_target.max(MINIMUM_MEM_LIMIT);

#[cfg(feature = "mem_limiter")]
if let Some(max_mem_target) = args.max_memory_target {
filesystem_config.mem_limit = max_mem_target * 1024 * 1024;
}

// Written in this awkward way to force us to update it if we add new checksum types
filesystem_config.use_upload_checksums = match args.upload_checksums {
Some(UploadChecksums::Crc32c) | None => true,
Expand Down
20 changes: 16 additions & 4 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use crate::inode::{
Inode, InodeError, InodeKind, LookedUp, ReadHandle, ReaddirHandle, Superblock, SuperblockConfig, WriteHandle,
};
use crate::logging;
use crate::mem_limiter::{MemoryLimiter, MINIMUM_MEM_LIMIT};
use crate::object::ObjectId;
use crate::prefetch::{Prefetch, PrefetchResult};
use crate::prefix::Prefix;
use crate::s3::S3Personality;
Expand Down Expand Up @@ -151,9 +153,14 @@ where
None => return Err(err!(libc::EBADF, "no E-Tag for inode {}", lookup.inode.ino())),
Some(etag) => ETag::from_str(etag).expect("E-Tag should be set"),
};
let request = fs
.prefetcher
.prefetch(fs.client.clone(), &fs.bucket, &full_key, object_size, etag.clone());
let object_id = ObjectId::new(full_key, etag);
let request = fs.prefetcher.prefetch(
fs.client.clone(),
fs.mem_limiter.clone(),
fs.bucket.clone(),
object_id,
object_size,
);
let handle = FileHandleState::Read { handle, request };
metrics::gauge!("fs.current_handles", "type" => "read").increment(1.0);
Ok(handle)
Expand Down Expand Up @@ -393,6 +400,8 @@ pub struct S3FilesystemConfig {
pub server_side_encryption: ServerSideEncryption,
/// Use additional checksums for uploads
pub use_upload_checksums: bool,
/// Memory limit
pub mem_limit: u64,
}

impl Default for S3FilesystemConfig {
Expand All @@ -413,6 +422,7 @@ impl Default for S3FilesystemConfig {
s3_personality: S3Personality::default(),
server_side_encryption: Default::default(),
use_upload_checksums: true,
mem_limit: MINIMUM_MEM_LIMIT,
}
}
}
Expand Down Expand Up @@ -526,6 +536,7 @@ where
{
config: S3FilesystemConfig,
client: Client,
mem_limiter: Arc<MemoryLimiter<Client>>,
superblock: Superblock,
prefetcher: Prefetcher,
uploader: Uploader<Client>,
Expand Down Expand Up @@ -556,7 +567,7 @@ where
s3_personality: config.s3_personality,
};
let superblock = Superblock::new(bucket, prefix, superblock_config);

let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), config.mem_limit));
let uploader = Uploader::new(
client.clone(),
config.storage_class.to_owned(),
Expand All @@ -567,6 +578,7 @@ where
Self {
config,
client,
mem_limiter,
superblock,
prefetcher,
uploader,
Expand Down
3 changes: 2 additions & 1 deletion mountpoint-s3/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ pub mod fs;
pub mod fuse;
mod inode;
pub mod logging;
pub mod mem_limiter;
pub mod metrics;
mod object;
pub mod object;
pub mod prefetch;
pub mod prefix;
pub mod s3;
Expand Down
Loading
Loading