Skip to content

Commit

Permalink
[Breaking] Remove ResumeableFileSlot and rely on high ulimits
Browse files Browse the repository at this point in the history
ResumeableFileSlot became too difficult to manage. Instead of
managing resources this way, we use a much higher
DEFAULT_OPEN_FILE_PERMITS, set ulimit (on Unix) and warn the
user if the limits are likely too low.

Breaking: Removed idle_file_descriptor_timeout_millis from config.

closes: #1288, #513, #1298, #527
  • Loading branch information
allada committed Feb 9, 2025
1 parent 7afe286 commit 3e2df54
Show file tree
Hide file tree
Showing 21 changed files with 554 additions and 961 deletions.
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 0 additions & 15 deletions nativelink-config/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,21 +716,6 @@ pub struct GlobalConfig {
#[serde(deserialize_with = "convert_numeric_with_shellexpand")]
pub max_open_files: usize,

/// If a file descriptor is idle for this many milliseconds, it will be closed.
/// In the event a client or store takes a long time to send or receive data
/// the file descriptor will be closed, and since `max_open_files` blocks new
/// `open_file` requests until a slot opens up, it will allow new requests to be
/// processed. If a read or write is attempted on a closed file descriptor, the
/// file will be reopened and the operation will continue.
///
/// On services where worker(s) and scheduler(s) live in the same process, this
/// also prevents deadlocks if a file->file copy is happening, but cannot open
/// a new file descriptor because the limit has been reached.
///
/// Default: 1000 (1 second)
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub idle_file_descriptor_timeout_millis: u64,

/// This flag can be used to prevent metrics from being collected at runtime.
/// Metrics are still able to be collected, but this flag prevents metrics that
/// are collected at runtime (performance metrics) from being tallied. The
Expand Down
4 changes: 1 addition & 3 deletions nativelink-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@ pub fn nativelink_test(attr: TokenStream, item: TokenStream) -> TokenStream {
async fn #fn_name(#fn_inputs) #fn_output {
// Error means already initialized, which is ok.
let _ = nativelink_util::init_tracing();
// If already set it's ok.
let _ = nativelink_util::fs::set_idle_file_descriptor_timeout(std::time::Duration::from_millis(100));

#[warn(clippy::disallowed_methods)]
::std::sync::Arc::new(::nativelink_util::origin_context::OriginContext::new()).wrap_async(
::nativelink_util::__tracing::trace_span!("test"), async move {
::nativelink_util::__tracing::error_span!(stringify!(#fn_name)), async move {
#fn_block
}
)
Expand Down
3 changes: 0 additions & 3 deletions nativelink-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ aws-sdk-s3 = { version = "=1.68.0", features = [
"rt-tokio",
], default-features = false }
aws-smithy-runtime-api = "1.7.3"
serial_test = { version = "3.2.0", features = [
"async",
], default-features = false }
serde_json = "1.0.135"
fred = { version = "10.0.3", default-features = false, features = ["mocks"] }
tracing-subscriber = { version = "0.3.19", default-features = false }
10 changes: 6 additions & 4 deletions nativelink-store/src/fast_slow_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::borrow::BorrowMut;
use std::cmp::{max, min};
use std::ffi::OsString;
use std::ops::Range;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
Expand Down Expand Up @@ -224,9 +225,10 @@ impl StoreDriver for FastSlowStore {
async fn update_with_whole_file(
self: Pin<&Self>,
key: StoreKey<'_>,
mut file: fs::ResumeableFileSlot,
path: OsString,
mut file: fs::FileSlot,
upload_size: UploadSizeInfo,
) -> Result<Option<fs::ResumeableFileSlot>, Error> {
) -> Result<Option<fs::FileSlot>, Error> {
if self
.fast_store
.optimized_for(StoreOptimizations::FileUpdates)
Expand All @@ -246,7 +248,7 @@ impl StoreDriver for FastSlowStore {
}
return self
.fast_store
.update_with_whole_file(key, file, upload_size)
.update_with_whole_file(key, path, file, upload_size)
.await;
}

Expand All @@ -269,7 +271,7 @@ impl StoreDriver for FastSlowStore {
}
return self
.slow_store
.update_with_whole_file(key, file, upload_size)
.update_with_whole_file(key, path, file, upload_size)
.await;
}

Expand Down
132 changes: 34 additions & 98 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::fmt::{Debug, Formatter};
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, SystemTime};
use std::time::SystemTime;

use async_lock::RwLock;
use async_trait::async_trait;
Expand All @@ -39,8 +39,7 @@ use nativelink_util::store_trait::{
StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo,
};
use nativelink_util::{background_spawn, spawn_blocking};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
use tokio::time::{sleep, timeout, Sleep};
use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
use tokio_stream::wrappers::ReadDirStream;
use tracing::{event, Level};

Expand Down Expand Up @@ -168,7 +167,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
) -> impl Future<Output = Result<(Self, fs::ResumeableFileSlot, OsString), Error>> + Send
) -> impl Future<Output = Result<(Self, fs::FileSlot, OsString), Error>> + Send
where
Self: Sized;

Expand All @@ -186,7 +185,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
&self,
offset: u64,
length: u64,
) -> impl Future<Output = Result<fs::ResumeableFileSlot, Error>> + Send;
) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send;

/// This function is a safe way to extract the file name of the underlying file. To protect users from
/// accidentally creating undefined behavior we encourage users to do the logic they need to do with
Expand Down Expand Up @@ -231,7 +230,7 @@ impl FileEntry for FileEntryImpl {
async fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
) -> Result<(FileEntryImpl, fs::ResumeableFileSlot, OsString), Error> {
) -> Result<(FileEntryImpl, fs::FileSlot, OsString), Error> {
let temp_full_path = encoded_file_path.get_file_path().to_os_string();
let temp_file_result = fs::create_file(temp_full_path.clone())
.or_else(|mut err| async {
Expand Down Expand Up @@ -276,30 +275,19 @@ impl FileEntry for FileEntryImpl {
&self.encoded_file_path
}

async fn read_file_part(
fn read_file_part(
&self,
offset: u64,
length: u64,
) -> Result<fs::ResumeableFileSlot, Error> {
let (mut file, full_content_path_for_debug_only) = self
.get_file_path_locked(|full_content_path| async move {
let file = fs::open_file(full_content_path.clone(), length)
.await
.err_tip(|| {
format!("Failed to open file in filesystem store {full_content_path:?}")
})?;
Ok((file, full_content_path))
})
.await?;

file.as_reader()
.await
.err_tip(|| "Could not seek file in read_file_part()")?
.get_mut()
.seek(SeekFrom::Start(offset))
.await
.err_tip(|| format!("Failed to seek file: {full_content_path_for_debug_only:?}"))?;
Ok(file)
) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send {
self.get_file_path_locked(move |full_content_path| async move {
let file = fs::open_file(&full_content_path, offset, length)
.await
.err_tip(|| {
format!("Failed to open file in filesystem store {full_content_path:?}")
})?;
Ok(file)
})
}

async fn get_file_path_locked<
Expand Down Expand Up @@ -524,6 +512,7 @@ async fn add_files_to_cache<Fe: FileEntry>(
);
}
};

Result::<(String, SystemTime, u64, bool), Error>::Ok((
file_name,
atime,
Expand Down Expand Up @@ -668,19 +657,16 @@ pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
#[metric(help = "Size of the configured read buffer size")]
read_buffer_size: usize,
weak_self: Weak<Self>,
sleep_fn: fn(Duration) -> Sleep,
rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
}

impl<Fe: FileEntry> FilesystemStore<Fe> {
pub async fn new(spec: &FilesystemSpec) -> Result<Arc<Self>, Error> {
Self::new_with_timeout_and_rename_fn(spec, sleep, |from, to| std::fs::rename(from, to))
.await
Self::new_with_timeout_and_rename_fn(spec, |from, to| std::fs::rename(from, to)).await
}

pub async fn new_with_timeout_and_rename_fn(
spec: &FilesystemSpec,
sleep_fn: fn(Duration) -> Sleep,
rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
) -> Result<Arc<Self>, Error> {
async fn create_subdirs(path: &str) -> Result<(), Error> {
Expand Down Expand Up @@ -735,7 +721,6 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
block_size,
read_buffer_size,
weak_self: weak_self.clone(),
sleep_fn,
rename_fn,
}))
}
Expand All @@ -754,50 +739,34 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
async fn update_file<'a>(
self: Pin<&'a Self>,
mut entry: Fe,
mut resumeable_temp_file: fs::ResumeableFileSlot,
mut temp_file: fs::FileSlot,
final_key: StoreKey<'static>,
mut reader: DropCloserReadHalf,
) -> Result<(), Error> {
let mut data_size = 0;
loop {
let Ok(data_result) = timeout(fs::idle_file_descriptor_timeout(), reader.recv()).await
else {
// In the event we timeout, we want to close the writing file, to prevent
// the file descriptor left open for long periods of time.
// This is needed because we wrap `fs` so only a fixed number of file
// descriptors may be open at any given time. If we are streaming from
// File -> File, it can cause a deadlock if the Write file is not sending
// data because it is waiting for a file descriptor to open before sending data.
resumeable_temp_file.close_file().await.err_tip(|| {
"Could not close file due to timeout in FileSystemStore::update_file"
})?;
continue;
};
let mut data = data_result.err_tip(|| "Failed to receive data in filesystem store")?;
let mut data = reader
.recv()
.await
.err_tip(|| "Failed to receive data in filesystem store")?;
let data_len = data.len();
if data_len == 0 {
break; // EOF.
}
resumeable_temp_file
.as_writer()
.await
.err_tip(|| "in filesystem_store::update_file")?
temp_file
.write_all_buf(&mut data)
.await
.err_tip(|| "Failed to write data into filesystem store")?;
data_size += data_len as u64;
}

resumeable_temp_file
.as_writer()
.await
.err_tip(|| "in filesystem_store::update_file")?
temp_file
.as_ref()
.sync_all()
.await
.err_tip(|| "Failed to sync_data in filesystem store")?;

drop(resumeable_temp_file);
drop(temp_file);

*entry.data_size_mut() = data_size;
self.emplace_file(final_key, Arc::new(entry)).await
Expand Down Expand Up @@ -942,19 +911,13 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
async fn update_with_whole_file(
self: Pin<&Self>,
key: StoreKey<'_>,
mut file: fs::ResumeableFileSlot,
path: OsString,
file: fs::FileSlot,
upload_size: UploadSizeInfo,
) -> Result<Option<fs::ResumeableFileSlot>, Error> {
let path = file.get_path().as_os_str().to_os_string();
) -> Result<Option<fs::FileSlot>, Error> {
let file_size = match upload_size {
UploadSizeInfo::ExactSize(size) => size,
UploadSizeInfo::MaxSize(_) => file
.as_reader()
.await
.err_tip(|| {
format!("While getting metadata for {path:?} in update_with_whole_file")
})?
.get_ref()
.as_ref()
.metadata()
.await
Expand Down Expand Up @@ -995,7 +958,6 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
.err_tip(|| "Failed to send zero EOF in filesystem store get_part")?;
return Ok(());
}

let entry = self.evicting_map.get(&key).await.ok_or_else(|| {
make_err!(
Code::NotFound,
Expand All @@ -1004,47 +966,21 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
)
})?;
let read_limit = length.unwrap_or(u64::MAX);
let mut resumeable_temp_file = entry.read_file_part(offset, read_limit).await?;
let mut temp_file = entry.read_file_part(offset, read_limit).await?;

loop {
let mut buf = BytesMut::with_capacity(self.read_buffer_size);
resumeable_temp_file
.as_reader()
.await
.err_tip(|| "In FileSystemStore::get_part()")?
temp_file
.read_buf(&mut buf)
.await
.err_tip(|| "Failed to read data in filesystem store")?;
if buf.is_empty() {
break; // EOF.
}
// In the event it takes a while to send the data to the client, we want to close the
// reading file, to prevent the file descriptor left open for long periods of time.
// Failing to do so might cause deadlocks if the receiver is unable to receive data
// because it is waiting for a file descriptor to open before receiving data.
// Using `ResumeableFileSlot` will re-open the file in the event it gets closed on the
// next iteration.
let buf_content = buf.freeze();
loop {
let sleep_fn = (self.sleep_fn)(fs::idle_file_descriptor_timeout());
tokio::pin!(sleep_fn);
tokio::select! {
() = & mut (sleep_fn) => {
resumeable_temp_file
.close_file()
.await
.err_tip(|| "Could not close file due to timeout in FileSystemStore::get_part")?;
}
res = writer.send(buf_content.clone()) => {
match res {
Ok(()) => break,
Err(err) => {
return Err(err).err_tip(|| "Failed to send chunk in filesystem store get_part");
}
}
}
}
}
writer
.send(buf.freeze())
.await
.err_tip(|| "Failed to send chunk in filesystem store get_part")?;
}
writer
.send_eof()
Expand Down
Loading

0 comments on commit 3e2df54

Please sign in to comment.