diff --git a/Cargo.lock b/Cargo.lock
index 751fed588..408401584 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2199,7 +2199,6 @@ dependencies = [
"rand",
"serde",
"serde_json",
- "serial_test",
"sha2",
"tokio",
"tokio-stream",
@@ -2240,6 +2239,7 @@ dependencies = [
"prost",
"prost-types",
"rand",
+ "rlimit",
"serde",
"serde_json",
"sha2",
@@ -2280,6 +2280,7 @@ dependencies = [
"scopeguard",
"serde",
"serde_json5",
+ "serial_test",
"shlex",
"tokio",
"tokio-stream",
@@ -2834,6 +2835,15 @@ dependencies = [
"windows-sys 0.52.0",
]
+[[package]]
+name = "rlimit"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7043b63bd0cd1aaa628e476b80e6d4023a3b50eb32789f2728908107bd0c793a"
+dependencies = [
+ "libc",
+]
+
[[package]]
name = "roxmltree"
version = "0.14.1"
diff --git a/nativelink-config/src/cas_server.rs b/nativelink-config/src/cas_server.rs
index f64dbe7e2..a2dfd6bc5 100644
--- a/nativelink-config/src/cas_server.rs
+++ b/nativelink-config/src/cas_server.rs
@@ -716,21 +716,6 @@ pub struct GlobalConfig {
#[serde(deserialize_with = "convert_numeric_with_shellexpand")]
pub max_open_files: usize,
- /// If a file descriptor is idle for this many milliseconds, it will be closed.
- /// In the event a client or store takes a long time to send or receive data
- /// the file descriptor will be closed, and since `max_open_files` blocks new
- /// `open_file` requests until a slot opens up, it will allow new requests to be
- /// processed. If a read or write is attempted on a closed file descriptor, the
- /// file will be reopened and the operation will continue.
- ///
- /// On services where worker(s) and scheduler(s) live in the same process, this
- /// also prevents deadlocks if a file->file copy is happening, but cannot open
- /// a new file descriptor because the limit has been reached.
- ///
- /// Default: 1000 (1 second)
- #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
- pub idle_file_descriptor_timeout_millis: u64,
-
/// This flag can be used to prevent metrics from being collected at runtime.
/// Metrics are still able to be collected, but this flag prevents metrics that
/// are collected at runtime (performance metrics) from being tallied. The
diff --git a/nativelink-macro/src/lib.rs b/nativelink-macro/src/lib.rs
index f37175569..79a87e9b5 100644
--- a/nativelink-macro/src/lib.rs
+++ b/nativelink-macro/src/lib.rs
@@ -36,8 +36,6 @@ pub fn nativelink_test(attr: TokenStream, item: TokenStream) -> TokenStream {
async fn #fn_name(#fn_inputs) #fn_output {
// Error means already initialized, which is ok.
let _ = nativelink_util::init_tracing();
- // If already set it's ok.
- let _ = nativelink_util::fs::set_idle_file_descriptor_timeout(std::time::Duration::from_millis(100));
#[warn(clippy::disallowed_methods)]
::std::sync::Arc::new(::nativelink_util::origin_context::OriginContext::new()).wrap_async(
diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml
index a9ee0aeaa..0648a502e 100644
--- a/nativelink-store/Cargo.toml
+++ b/nativelink-store/Cargo.toml
@@ -78,9 +78,6 @@ aws-sdk-s3 = { version = "=1.68.0", features = [
"rt-tokio",
], default-features = false }
aws-smithy-runtime-api = "1.7.3"
-serial_test = { version = "3.2.0", features = [
- "async",
-], default-features = false }
serde_json = "1.0.135"
fred = { version = "10.0.3", default-features = false, features = ["mocks"] }
tracing-subscriber = { version = "0.3.19", default-features = false }
diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs
index 60789e3a5..e2eec4b6f 100644
--- a/nativelink-store/src/fast_slow_store.rs
+++ b/nativelink-store/src/fast_slow_store.rs
@@ -14,6 +14,7 @@
use std::borrow::BorrowMut;
use std::cmp::{max, min};
+use std::ffi::OsString;
use std::ops::Range;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
@@ -224,9 +225,10 @@ impl StoreDriver for FastSlowStore {
async fn update_with_whole_file(
self: Pin<&Self>,
key: StoreKey<'_>,
- mut file: fs::ResumeableFileSlot,
+ path: OsString,
+ mut file: fs::FileSlot,
upload_size: UploadSizeInfo,
- ) -> Result<Option<fs::ResumeableFileSlot>, Error> {
+ ) -> Result<Option<fs::FileSlot>, Error> {
if self
.fast_store
.optimized_for(StoreOptimizations::FileUpdates)
@@ -246,7 +248,7 @@ impl StoreDriver for FastSlowStore {
}
return self
.fast_store
- .update_with_whole_file(key, file, upload_size)
+ .update_with_whole_file(key, path, file, upload_size)
.await;
}
@@ -269,7 +271,7 @@ impl StoreDriver for FastSlowStore {
}
return self
.slow_store
- .update_with_whole_file(key, file, upload_size)
+ .update_with_whole_file(key, path, file, upload_size)
.await;
}
diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs
index b5c7261dc..8e6f0a1d6 100644
--- a/nativelink-store/src/filesystem_store.rs
+++ b/nativelink-store/src/filesystem_store.rs
@@ -18,7 +18,7 @@ use std::fmt::{Debug, Formatter};
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};
-use std::time::{Duration, SystemTime};
+use std::time::SystemTime;
use async_lock::RwLock;
use async_trait::async_trait;
@@ -39,8 +39,7 @@ use nativelink_util::store_trait::{
StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo,
};
use nativelink_util::{background_spawn, spawn_blocking};
-use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
-use tokio::time::{sleep, timeout, Sleep};
+use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
use tokio_stream::wrappers::ReadDirStream;
use tracing::{event, Level};
@@ -168,7 +167,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
- ) -> impl Future<Output = Result<(Self, fs::ResumeableFileSlot, OsString), Error>> + Send
+ ) -> impl Future<Output = Result<(Self, fs::FileSlot, OsString), Error>> + Send
where
Self: Sized;
@@ -186,7 +185,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
&self,
offset: u64,
length: u64,
- ) -> impl Future<Output = Result<fs::ResumeableFileSlot, Error>> + Send;
+ ) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send;
/// This function is a safe way to extract the file name of the underlying file. To protect users from
/// accidentally creating undefined behavior we encourage users to do the logic they need to do with
@@ -231,7 +230,7 @@ impl FileEntry for FileEntryImpl {
async fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
- ) -> Result<(FileEntryImpl, fs::ResumeableFileSlot, OsString), Error> {
+ ) -> Result<(FileEntryImpl, fs::FileSlot, OsString), Error> {
let temp_full_path = encoded_file_path.get_file_path().to_os_string();
let temp_file_result = fs::create_file(temp_full_path.clone())
.or_else(|mut err| async {
@@ -276,30 +275,19 @@ impl FileEntry for FileEntryImpl {
&self.encoded_file_path
}
- async fn read_file_part(
+ fn read_file_part(
&self,
offset: u64,
length: u64,
- ) -> Result<fs::ResumeableFileSlot, Error> {
- let (mut file, full_content_path_for_debug_only) = self
- .get_file_path_locked(|full_content_path| async move {
- let file = fs::open_file(full_content_path.clone(), length)
- .await
- .err_tip(|| {
- format!("Failed to open file in filesystem store {full_content_path:?}")
- })?;
- Ok((file, full_content_path))
- })
- .await?;
-
- file.as_reader()
- .await
- .err_tip(|| "Could not seek file in read_file_part()")?
- .get_mut()
- .seek(SeekFrom::Start(offset))
- .await
- .err_tip(|| format!("Failed to seek file: {full_content_path_for_debug_only:?}"))?;
- Ok(file)
+ ) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send {
+ self.get_file_path_locked(move |full_content_path| async move {
+ let file = fs::open_file(&full_content_path, offset, length)
+ .await
+ .err_tip(|| {
+ format!("Failed to open file in filesystem store {full_content_path:?}")
+ })?;
+ Ok(file)
+ })
}
async fn get_file_path_locked<
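The hunk above replaces the open-then-seek dance: `fs::open_file` now takes the start offset and a byte limit and returns a `tokio::io::Take`, so callers read a bounded window directly. A minimal sketch of that pattern (wrapper names as in this diff, `err_tip` as used throughout the codebase):

```rust
use nativelink_error::{Error, ResultExt};
use nativelink_util::fs;
use tokio::io::AsyncReadExt;

// Read up to `length` bytes starting at `offset`; the Take returned by
// open_file enforces the byte limit, so no separate seek or manual cap is needed.
async fn read_window(
    path: &std::ffi::OsStr,
    offset: u64,
    length: u64,
) -> Result<Vec<u8>, Error> {
    let mut file = fs::open_file(path, offset, length).await?;
    let mut data = Vec::new();
    file.read_to_end(&mut data)
        .await
        .err_tip(|| "Error reading file window to end")?;
    Ok(data)
}
```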
@@ -524,6 +512,7 @@ async fn add_files_to_cache(
);
}
};
+
Result::<(String, SystemTime, u64, bool), Error>::Ok((
file_name,
atime,
@@ -668,19 +657,16 @@ pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
#[metric(help = "Size of the configured read buffer size")]
read_buffer_size: usize,
weak_self: Weak<Self>,
- sleep_fn: fn(Duration) -> Sleep,
rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
}
impl<Fe: FileEntry> FilesystemStore<Fe> {
pub async fn new(spec: &FilesystemSpec) -> Result<Arc<Self>, Error> {
- Self::new_with_timeout_and_rename_fn(spec, sleep, |from, to| std::fs::rename(from, to))
- .await
+ Self::new_with_timeout_and_rename_fn(spec, |from, to| std::fs::rename(from, to)).await
}
pub async fn new_with_timeout_and_rename_fn(
spec: &FilesystemSpec,
- sleep_fn: fn(Duration) -> Sleep,
rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
) -> Result<Arc<Self>, Error> {
async fn create_subdirs(path: &str) -> Result<(), Error> {
@@ -735,7 +721,6 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
block_size,
read_buffer_size,
weak_self: weak_self.clone(),
- sleep_fn,
rename_fn,
}))
}
@@ -754,50 +739,34 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
async fn update_file<'a>(
self: Pin<&'a Self>,
mut entry: Fe,
- mut resumeable_temp_file: fs::ResumeableFileSlot,
+ mut temp_file: fs::FileSlot,
final_key: StoreKey<'static>,
mut reader: DropCloserReadHalf,
) -> Result<(), Error> {
let mut data_size = 0;
loop {
- let Ok(data_result) = timeout(fs::idle_file_descriptor_timeout(), reader.recv()).await
- else {
- // In the event we timeout, we want to close the writing file, to prevent
- // the file descriptor left open for long periods of time.
- // This is needed because we wrap `fs` so only a fixed number of file
- // descriptors may be open at any given time. If we are streaming from
- // File -> File, it can cause a deadlock if the Write file is not sending
- // data because it is waiting for a file descriptor to open before sending data.
- resumeable_temp_file.close_file().await.err_tip(|| {
- "Could not close file due to timeout in FileSystemStore::update_file"
- })?;
- continue;
- };
- let mut data = data_result.err_tip(|| "Failed to receive data in filesystem store")?;
+ let mut data = reader
+ .recv()
+ .await
+ .err_tip(|| "Failed to receive data in filesystem store")?;
let data_len = data.len();
if data_len == 0 {
break; // EOF.
}
- resumeable_temp_file
- .as_writer()
- .await
- .err_tip(|| "in filesystem_store::update_file")?
+ temp_file
.write_all_buf(&mut data)
.await
.err_tip(|| "Failed to write data into filesystem store")?;
data_size += data_len as u64;
}
- resumeable_temp_file
- .as_writer()
- .await
- .err_tip(|| "in filesystem_store::update_file")?
+ temp_file
.as_ref()
.sync_all()
.await
.err_tip(|| "Failed to sync_data in filesystem store")?;
- drop(resumeable_temp_file);
+ drop(temp_file);
*entry.data_size_mut() = data_size;
self.emplace_file(final_key, Arc::new(entry)).await
@@ -942,19 +911,13 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
async fn update_with_whole_file(
self: Pin<&Self>,
key: StoreKey<'_>,
- mut file: fs::ResumeableFileSlot,
+ path: OsString,
+ file: fs::FileSlot,
upload_size: UploadSizeInfo,
- ) -> Result<Option<fs::ResumeableFileSlot>, Error> {
- let path = file.get_path().as_os_str().to_os_string();
+ ) -> Result<Option<fs::FileSlot>, Error> {
let file_size = match upload_size {
UploadSizeInfo::ExactSize(size) => size,
UploadSizeInfo::MaxSize(_) => file
- .as_reader()
- .await
- .err_tip(|| {
- format!("While getting metadata for {path:?} in update_with_whole_file")
- })?
- .get_ref()
.as_ref()
.metadata()
.await
@@ -995,7 +958,6 @@ impl StoreDriver for FilesystemStore {
.err_tip(|| "Failed to send zero EOF in filesystem store get_part")?;
return Ok(());
}
-
let entry = self.evicting_map.get(&key).await.ok_or_else(|| {
make_err!(
Code::NotFound,
@@ -1004,47 +966,21 @@ impl StoreDriver for FilesystemStore {
)
})?;
let read_limit = length.unwrap_or(u64::MAX);
- let mut resumeable_temp_file = entry.read_file_part(offset, read_limit).await?;
+ let mut temp_file = entry.read_file_part(offset, read_limit).await?;
loop {
let mut buf = BytesMut::with_capacity(self.read_buffer_size);
- resumeable_temp_file
- .as_reader()
- .await
- .err_tip(|| "In FileSystemStore::get_part()")?
+ temp_file
.read_buf(&mut buf)
.await
.err_tip(|| "Failed to read data in filesystem store")?;
if buf.is_empty() {
break; // EOF.
}
- // In the event it takes a while to send the data to the client, we want to close the
- // reading file, to prevent the file descriptor left open for long periods of time.
- // Failing to do so might cause deadlocks if the receiver is unable to receive data
- // because it is waiting for a file descriptor to open before receiving data.
- // Using `ResumeableFileSlot` will re-open the file in the event it gets closed on the
- // next iteration.
- let buf_content = buf.freeze();
- loop {
- let sleep_fn = (self.sleep_fn)(fs::idle_file_descriptor_timeout());
- tokio::pin!(sleep_fn);
- tokio::select! {
- () = &mut (sleep_fn) => {
- resumeable_temp_file
- .close_file()
- .await
- .err_tip(|| "Could not close file due to timeout in FileSystemStore::get_part")?;
- }
- res = writer.send(buf_content.clone()) => {
- match res {
- Ok(()) => break,
- Err(err) => {
- return Err(err).err_tip(|| "Failed to send chunk in filesystem store get_part");
- }
- }
- }
- }
- }
+ writer
+ .send(buf.freeze())
+ .await
+ .err_tip(|| "Failed to send chunk in filesystem store get_part")?;
}
writer
.send_eof()
diff --git a/nativelink-store/tests/ac_utils_test.rs b/nativelink-store/tests/ac_utils_test.rs
index 131e2688d..09b23ba65 100644
--- a/nativelink-store/tests/ac_utils_test.rs
+++ b/nativelink-store/tests/ac_utils_test.rs
@@ -40,7 +40,7 @@ async fn make_temp_path(data: &str) -> OsString {
const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef";
const HASH1_SIZE: i64 = 147;
-// Regression test for bug created when implementing ResumeableFileSlot
+// Regression test for bug created when implementing FileSlot
// where the timeout() success condition was breaking out of the outer
// loop resulting in the file always being created with <= 4096 bytes.
#[nativelink_test]
@@ -62,11 +62,15 @@ async fn upload_file_to_store_with_large_file() -> Result<(), Error> {
}
{
// Upload our file.
- let resumeable_file = fs::open_file(filepath, u64::MAX).await?;
+ let file = fs::open_file(&filepath, 0, u64::MAX)
+ .await
+ .unwrap()
+ .into_inner();
store
.update_with_whole_file(
digest,
- resumeable_file,
+ filepath,
+ file,
UploadSizeInfo::ExactSize(expected_data.len() as u64),
)
.await?;
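The updated test shows the new call shape: `open_file` yields a `Take<FileSlot>`, while `update_with_whole_file` wants the bare `FileSlot` plus its path. A hedged sketch of that flow as a helper (names from this diff; `Ok(Some(_))` from the store would mean the file came back unconsumed):

```rust
use std::ffi::OsString;

use nativelink_error::Error;
use nativelink_util::common::DigestInfo;
use nativelink_util::fs;
use nativelink_util::store_trait::{StoreLike, UploadSizeInfo};

async fn upload_whole_file(
    store: &impl StoreLike,
    digest: DigestInfo,
    path: OsString,
    size: u64,
) -> Result<(), Error> {
    // into_inner() unwraps the Take so the store receives the raw FileSlot.
    let file = fs::open_file(&path, 0, u64::MAX).await?.into_inner();
    // A Some(file) return means the store declined ownership; dropping it
    // releases the open-file permit either way.
    let _maybe_returned = store
        .update_with_whole_file(digest, path, file, UploadSizeInfo::ExactSize(size))
        .await?;
    Ok(())
}
```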
diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs
index f02b32e82..3ea662464 100644
--- a/nativelink-store/tests/filesystem_store_test.rs
+++ b/nativelink-store/tests/filesystem_store_test.rs
@@ -27,10 +27,9 @@ use filetime::{set_file_atime, FileTime};
use futures::executor::block_on;
use futures::task::Poll;
use futures::{poll, Future, FutureExt};
-use nativelink_config::stores::{FastSlowSpec, FilesystemSpec, MemorySpec, StoreSpec};
+use nativelink_config::stores::FilesystemSpec;
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_macro::nativelink_test;
-use nativelink_store::fast_slow_store::FastSlowStore;
use nativelink_store::filesystem_store::{
key_from_file, EncodedFilePath, FileEntry, FileEntryImpl, FileType, FilesystemStore,
DIGEST_FOLDER, STR_FOLDER,
@@ -44,9 +43,8 @@ use nativelink_util::{background_spawn, spawn};
use parking_lot::Mutex;
use pretty_assertions::assert_eq;
use rand::{thread_rng, Rng};
-use serial_test::serial;
use sha2::{Digest, Sha256};
-use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
+use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Take};
use tokio::sync::Barrier;
use tokio::time::sleep;
use tokio_stream::wrappers::ReadDirStream;
@@ -86,7 +84,7 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> FileEntry for TestFileEntry<
async fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
- ) -> Result<(Self, fs::ResumeableFileSlot, OsString), Error> {
+ ) -> Result<(Self, fs::FileSlot, OsString), Error> {
let (inner, file_slot, path) =
FileEntryImpl::make_and_open_file(block_size, encoded_file_path).await?;
Ok((
@@ -111,11 +109,7 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> FileEntry for TestFileEntry<
self.inner.as_ref().unwrap().get_encoded_file_path()
}
- async fn read_file_part(
- &self,
- offset: u64,
- length: u64,
- ) -> Result<fs::ResumeableFileSlot, Error> {
+ async fn read_file_part(&self, offset: u64, length: u64) -> Result<Take<fs::FileSlot>, Error> {
self.inner
.as_ref()
.unwrap()
@@ -204,13 +198,11 @@ fn make_temp_path(data: &str) -> String {
}
async fn read_file_contents(file_name: &OsStr) -> Result<Vec<u8>, Error> {
- let mut file = fs::open_file(file_name, u64::MAX)
+ let mut file = fs::open_file(file_name, 0, u64::MAX)
.await
.err_tip(|| format!("Failed to open file: {file_name:?}"))?;
let mut data = vec![];
- file.as_reader()
- .await?
- .read_to_end(&mut data)
+ file.read_to_end(&mut data)
.await
.err_tip(|| "Error reading file to end")?;
Ok(data)
@@ -265,7 +257,6 @@ const VALUE1: &str = "0123456789";
const VALUE2: &str = "9876543210";
const STRING_NAME: &str = "String_Filename";
-#[serial]
#[nativelink_test]
async fn valid_results_after_shutdown_test() -> Result<(), Error> {
let digest = DigestInfo::try_new(HASH1, VALUE1.len())?;
@@ -282,7 +273,6 @@ async fn valid_results_after_shutdown_test() -> Result<(), Error> {
})
.await?,
);
-
// Insert dummy value into store.
store.update_oneshot(digest, VALUE1.into()).await?;
@@ -314,7 +304,6 @@ async fn valid_results_after_shutdown_test() -> Result<(), Error> {
Ok(())
}
-#[serial]
#[nativelink_test]
async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> {
static DELETES_FINISHED: AtomicU32 = AtomicU32::new(0);
@@ -381,7 +370,6 @@ async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> {
// This test ensures that if a file is overridden and an open stream to the file already
// exists, the open stream will continue to work properly and when the stream is done the
// temporary file (of the object that was deleted) is cleaned up.
-#[serial]
#[nativelink_test]
async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error> {
static DELETES_FINISHED: AtomicU32 = AtomicU32::new(0);
@@ -418,7 +406,7 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error>
let digest1_clone = digest1;
background_spawn!(
"file_continues_to_stream_on_content_replace_test_store_get",
- async move { store_clone.get(digest1_clone, writer).await },
+ async move { store_clone.get(digest1_clone, writer).await.unwrap() },
);
{
@@ -489,7 +477,6 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error>
// Eviction has a different code path than a file replacement, so we check that if a
// file is evicted and has an open stream on it, it will stay alive and eventually
// get deleted.
-#[serial]
#[nativelink_test]
async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
static DELETES_FINISHED: AtomicU32 = AtomicU32::new(0);
@@ -520,14 +507,14 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
);
// Insert data into store.
- store.update_oneshot(digest1, VALUE1.into()).await?;
+ store.update_oneshot(digest1, VALUE1.into()).await.unwrap();
let mut reader = {
let (writer, reader) = make_buf_channel_pair();
let store_clone = store.clone();
background_spawn!(
"file_gets_cleans_up_on_cache_eviction_store_get",
- async move { store_clone.get(digest1, writer).await },
+ async move { store_clone.get(digest1, writer).await.unwrap() },
);
reader
};
@@ -583,7 +570,6 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
check_temp_empty(&temp_path).await
}
-#[serial]
#[nativelink_test]
async fn atime_updates_on_get_part_test() -> Result<(), Error> {
let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?;
@@ -630,7 +616,6 @@ async fn atime_updates_on_get_part_test() -> Result<(), Error> {
Ok(())
}
-#[serial]
#[nativelink_test]
async fn eviction_drops_file_test() -> Result<(), Error> {
let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?;
@@ -680,7 +665,6 @@ async fn eviction_drops_file_test() -> Result<(), Error> {
// Test to ensure that if we are holding a reference to `FileEntry` and the contents are
// replaced, the `FileEntry` continues to use the old data.
// `FileEntry` file contents should be immutable for the lifetime of the object.
-#[serial]
#[nativelink_test]
async fn digest_contents_replaced_continues_using_old_data() -> Result<(), Error> {
let digest = DigestInfo::try_new(HASH1, VALUE1.len())?;
@@ -701,11 +685,7 @@ async fn digest_contents_replaced_continues_using_old_data() -> Result<(), Error
// The file contents should equal our initial data.
let mut reader = file_entry.read_file_part(0, u64::MAX).await?;
let mut file_contents = String::new();
- reader
- .as_reader()
- .await?
- .read_to_string(&mut file_contents)
- .await?;
+ reader.read_to_string(&mut file_contents).await?;
assert_eq!(file_contents, VALUE1);
}
@@ -716,18 +696,13 @@ async fn digest_contents_replaced_continues_using_old_data() -> Result<(), Error
// The file contents still equal our old data.
let mut reader = file_entry.read_file_part(0, u64::MAX).await?;
let mut file_contents = String::new();
- reader
- .as_reader()
- .await?
- .read_to_string(&mut file_contents)
- .await?;
+ reader.read_to_string(&mut file_contents).await?;
assert_eq!(file_contents, VALUE1);
}
Ok(())
}
-#[serial]
#[nativelink_test]
async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> {
const SMALL_VALUE: &str = "01";
@@ -788,7 +763,6 @@ async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> {
Ok(())
}
-#[serial]
#[nativelink_test]
#[allow(clippy::await_holding_refcell_ref)]
async fn rename_on_insert_fails_due_to_filesystem_error_proper_cleanup_happens() -> Result<(), Error>
@@ -814,18 +788,11 @@ async fn rename_on_insert_fails_due_to_filesystem_error_proper_cleanup_happens()
let dir_entry = dir_entry?;
{
// Some filesystems won't sync automatically, so force it.
- let mut file_handle =
- fs::open_file(dir_entry.path().into_os_string(), u64::MAX)
- .await
- .err_tip(|| "Failed to open temp file")?;
+ let file_handle = fs::open_file(dir_entry.path().into_os_string(), 0, u64::MAX)
+ .await
+ .err_tip(|| "Failed to open temp file")?;
// We don't care if it fails, this is only best attempt.
- let _ = file_handle
- .as_reader()
- .await?
- .get_ref()
- .as_ref()
- .sync_all()
- .await;
+ let _ = file_handle.get_ref().as_ref().sync_all().await;
}
// Ensure we have written to the file too. This ensures we have an open file handle.
// Failing to do this may result in the file existing, but the `update_fut` not actually
@@ -924,7 +891,6 @@ async fn rename_on_insert_fails_due_to_filesystem_error_proper_cleanup_happens()
Ok(())
}
-#[serial]
#[nativelink_test]
async fn get_part_timeout_test() -> Result<(), Error> {
let large_value = "x".repeat(1024);
@@ -940,7 +906,6 @@ async fn get_part_timeout_test() -> Result<(), Error> {
read_buffer_size: 1,
..Default::default()
},
- |_| sleep(Duration::ZERO),
|from, to| std::fs::rename(from, to),
)
.await?,
@@ -972,7 +937,6 @@ async fn get_part_timeout_test() -> Result<(), Error> {
Ok(())
}
-#[serial]
#[nativelink_test]
async fn get_part_is_zero_digest() -> Result<(), Error> {
let digest = DigestInfo::new(Sha256::new().finalize().into(), 0);
@@ -987,7 +951,6 @@ async fn get_part_is_zero_digest() -> Result<(), Error> {
read_buffer_size: 1,
..Default::default()
},
- |_| sleep(Duration::ZERO),
|from, to| std::fs::rename(from, to),
)
.await?,
@@ -1014,7 +977,6 @@ async fn get_part_is_zero_digest() -> Result<(), Error> {
Ok(())
}
-#[serial]
#[nativelink_test]
async fn has_with_results_on_zero_digests() -> Result<(), Error> {
async fn wait_for_empty_content_file<
@@ -1055,7 +1017,6 @@ async fn has_with_results_on_zero_digests() -> Result<(), Error> {
read_buffer_size: 1,
..Default::default()
},
- |_| sleep(Duration::ZERO),
|from, to| std::fs::rename(from, to),
)
.await?,
@@ -1079,7 +1040,6 @@ async fn has_with_results_on_zero_digests() -> Result<(), Error> {
}
/// Regression test for: https://github.com/TraceMachina/nativelink/issues/495.
-#[serial]
#[nativelink_test(flavor = "multi_thread")]
async fn update_file_future_drops_before_rename() -> Result<(), Error> {
// Mutex can be used to signal to the rename function to pause execution.
@@ -1098,7 +1058,6 @@ async fn update_file_future_drops_before_rename() -> Result<(), Error> {
eviction_policy: None,
..Default::default()
},
- |_| sleep(Duration::ZERO),
|from, to| {
// If someone locked our mutex, it means we need to pause, so we
// simply request a lock on the same mutex.
@@ -1167,7 +1126,6 @@ async fn update_file_future_drops_before_rename() -> Result<(), Error> {
Ok(())
}
-#[serial]
#[nativelink_test]
async fn deleted_file_removed_from_store() -> Result<(), Error> {
let digest = DigestInfo::try_new(HASH1, VALUE1.len())?;
@@ -1182,7 +1140,6 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {
read_buffer_size: 1,
..Default::default()
},
- |_| sleep(Duration::ZERO),
|from, to| std::fs::rename(from, to),
)
.await?,
@@ -1193,11 +1150,8 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {
let stored_file_path = OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest}"));
std::fs::remove_file(stored_file_path)?;
- let digest_result = store
- .has(digest)
- .await
- .err_tip(|| "Failed to execute has")?;
- assert!(digest_result.is_none());
+ let get_part_res = store.get_part_unchunked(digest, 0, None).await;
+ assert_eq!(get_part_res.unwrap_err().code, Code::NotFound);
// Repeat with a string typed key.
@@ -1205,16 +1159,14 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {
store
.update_oneshot(string_key.borrow(), VALUE2.into())
- .await?;
+ .await
+ .unwrap();
let stored_file_path = OsString::from(format!("{content_path}/{STR_FOLDER}/{STRING_NAME}"));
std::fs::remove_file(stored_file_path)?;
- let string_result = store
- .has(string_key)
- .await
- .err_tip(|| "Failed to execute has")?;
- assert!(string_result.is_none());
+ let string_digest_get_part_res = store.get_part_unchunked(string_key, 0, None).await;
+ assert_eq!(string_digest_get_part_res.unwrap_err().code, Code::NotFound);
Ok(())
}
@@ -1224,7 +1176,6 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {
// assume block size 4K
// 1B data size = 4K size on disk
// 5K data size = 8K size on disk
-#[serial]
#[nativelink_test]
async fn get_file_size_uses_block_size() -> Result<(), Error> {
let content_path = make_temp_path("content_path");
@@ -1244,7 +1195,6 @@ async fn get_file_size_uses_block_size() -> Result<(), Error> {
read_buffer_size: 1,
..Default::default()
},
- |_| sleep(Duration::ZERO),
|from, to| std::fs::rename(from, to),
)
.await?,
@@ -1260,7 +1210,6 @@ async fn get_file_size_uses_block_size() -> Result<(), Error> {
Ok(())
}
-#[serial]
#[nativelink_test]
async fn update_with_whole_file_closes_file() -> Result<(), Error> {
let mut permits = vec![];
@@ -1294,87 +1243,27 @@ async fn update_with_whole_file_closes_file() -> Result<(), Error> {
);
store.update_oneshot(digest, value.clone().into()).await?;
- let mut file = fs::create_file(OsString::from(format!("{temp_path}/dummy_file"))).await?;
- {
- let writer = file.as_writer().await?;
- writer.write_all(value.as_bytes()).await?;
- writer.as_mut().sync_all().await?;
- writer.seek(tokio::io::SeekFrom::Start(0)).await?;
- }
-
- store
- .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len() as u64))
- .await?;
- Ok(())
-}
-
-#[serial]
-#[nativelink_test]
-async fn update_with_whole_file_slow_path_when_low_file_descriptors() -> Result<(), Error> {
- let mut permits = vec![];
- // Grab all permits to ensure only 1 permit is available.
- {
- wait_for_no_open_files().await?;
- while fs::OPEN_FILE_SEMAPHORE.available_permits() > 1 {
- permits.push(fs::get_permit().await);
- }
- assert_eq!(
- fs::OPEN_FILE_SEMAPHORE.available_permits(),
- 1,
- "Expected 1 permit to be available"
- );
- }
-
- let value = "x".repeat(1024);
-
- let digest = DigestInfo::try_new(HASH1, value.len())?;
-
- let store = FastSlowStore::new(
- // Note: The config is not needed for this test, so use dummy data.
- &FastSlowSpec {
- fast: StoreSpec::memory(MemorySpec::default()),
- slow: StoreSpec::memory(MemorySpec::default()),
- },
- Store::new(
- FilesystemStore::<FileEntryImpl>::new(&FilesystemSpec {
- content_path: make_temp_path("content_path"),
- temp_path: make_temp_path("temp_path"),
- read_buffer_size: 1,
- ..Default::default()
- })
- .await?,
- ),
- Store::new(
- FilesystemStore::<FileEntryImpl>::new(&FilesystemSpec {
- content_path: make_temp_path("content_path1"),
- temp_path: make_temp_path("temp_path1"),
- read_buffer_size: 1,
- ..Default::default()
- })
- .await?,
- ),
- );
- store.update_oneshot(digest, value.clone().into()).await?;
-
- let temp_path = make_temp_path("temp_path2");
- fs::create_dir_all(&temp_path).await?;
- let mut file = fs::create_file(OsString::from(format!("{temp_path}/dummy_file"))).await?;
+ let file_path = OsString::from(format!("{temp_path}/dummy_file"));
+ let mut file = fs::create_file(&file_path).await?;
{
- let writer = file.as_writer().await?;
- writer.write_all(value.as_bytes()).await?;
- writer.as_mut().sync_all().await?;
- writer.seek(tokio::io::SeekFrom::Start(0)).await?;
+ file.write_all(value.as_bytes()).await?;
+ file.as_mut().sync_all().await?;
+ file.seek(tokio::io::SeekFrom::Start(0)).await?;
}
store
- .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len() as u64))
+ .update_with_whole_file(
+ digest,
+ file_path,
+ file,
+ UploadSizeInfo::ExactSize(value.len() as u64),
+ )
.await?;
Ok(())
}
// Ensure that update_with_whole_file() moves the file without making a copy.
#[cfg(target_family = "unix")]
-#[serial]
#[nativelink_test]
async fn update_with_whole_file_uses_same_inode() -> Result<(), Error> {
use std::os::unix::fs::MetadataExt;
@@ -1393,36 +1282,35 @@ async fn update_with_whole_file_uses_same_inode() -> Result<(), Error> {
read_buffer_size: 1,
..Default::default()
},
- |_| sleep(Duration::ZERO),
|from, to| std::fs::rename(from, to),
)
.await?,
);
- let mut file = fs::create_file(OsString::from(format!("{temp_path}/dummy_file"))).await?;
- let original_inode = file
- .as_reader()
- .await?
- .get_ref()
- .as_ref()
- .metadata()
- .await?
- .ino();
+ let file_path = OsString::from(format!("{temp_path}/dummy_file"));
+ let original_inode = {
+ let file = fs::create_file(&file_path).await?;
+ let original_inode = file.as_ref().metadata().await?.ino();
- let result = store
- .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len() as u64))
- .await?;
- assert!(
- result.is_none(),
- "Expected filesystem store to consume the file"
- );
+ let result = store
+ .update_with_whole_file(
+ digest,
+ file_path,
+ file,
+ UploadSizeInfo::ExactSize(value.len() as u64),
+ )
+ .await?;
+ assert!(
+ result.is_none(),
+ "Expected filesystem store to consume the file"
+ );
+ original_inode
+ };
let expected_file_name = OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest}"));
let new_inode = fs::create_file(expected_file_name)
- .await?
- .as_reader()
- .await?
- .get_ref()
+ .await
+ .unwrap()
.as_ref()
.metadata()
.await?
diff --git a/nativelink-util/BUILD.bazel b/nativelink-util/BUILD.bazel
index 2029ce230..d8e40c4e5 100644
--- a/nativelink-util/BUILD.bazel
+++ b/nativelink-util/BUILD.bazel
@@ -66,6 +66,7 @@ rust_library(
"@crates//:prost",
"@crates//:prost-types",
"@crates//:rand",
+ "@crates//:rlimit",
"@crates//:serde",
"@crates//:sha2",
"@crates//:tokio",
@@ -87,7 +88,6 @@ rust_test_suite(
"tests/common_test.rs",
"tests/evicting_map_test.rs",
"tests/fastcdc_test.rs",
- "tests/fs_test.rs",
"tests/health_utils_test.rs",
"tests/operation_id_tests.rs",
"tests/origin_event_test.rs",
diff --git a/nativelink-util/Cargo.toml b/nativelink-util/Cargo.toml
index 704632614..953f1004e 100644
--- a/nativelink-util/Cargo.toml
+++ b/nativelink-util/Cargo.toml
@@ -33,6 +33,7 @@ pin-project-lite = "0.2.16"
prost = { version = "0.13.4", default-features = false }
prost-types = { version = "0.13.4", default-features = false }
rand = { version = "0.8.5", default-features = false }
+rlimit = { version = "0.10.2", default-features = false }
serde = { version = "1.0.217", default-features = false }
sha2 = { version = "0.10.8", default-features = false }
tokio = { version = "1.43.0", features = ["fs", "rt-multi-thread", "signal", "io-util"], default-features = false }
diff --git a/nativelink-util/src/buf_channel.rs b/nativelink-util/src/buf_channel.rs
index 178acc8fd..69f1cb6d7 100644
--- a/nativelink-util/src/buf_channel.rs
+++ b/nativelink-util/src/buf_channel.rs
@@ -372,7 +372,7 @@ impl DropCloserReadHalf {
let mut chunk = self
.recv()
.await
- .err_tip(|| "During first read of buf_channel::take()")?;
+ .err_tip(|| "During next read of buf_channel::take()")?;
if chunk.is_empty() {
break; // EOF.
}
diff --git a/nativelink-util/src/digest_hasher.rs b/nativelink-util/src/digest_hasher.rs
index 280311f7b..85882daec 100644
--- a/nativelink-util/src/digest_hasher.rs
+++ b/nativelink-util/src/digest_hasher.rs
@@ -25,7 +25,7 @@ use nativelink_metric::{
use nativelink_proto::build::bazel::remote::execution::v2::digest_function::Value as ProtoDigestFunction;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
-use tokio::io::{AsyncRead, AsyncReadExt};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt};
use crate::common::DigestInfo;
use crate::origin_context::{ActiveOriginContext, OriginContext};
@@ -187,9 +187,10 @@ pub trait DigestHasher {
/// the file and feed it into the hasher.
fn digest_for_file(
self,
- file: fs::ResumeableFileSlot,
+ file_path: impl AsRef<Path>,
+ file: fs::FileSlot,
size_hint: Option<u64>,
- ) -> impl Future<Output = Result<(DigestInfo, fs::ResumeableFileSlot), Error>>;
+ ) -> impl Future<Output = Result<(DigestInfo, fs::FileSlot), Error>>;
/// Utility function to compute a hash from a generic reader.
fn compute_from_reader(
@@ -229,11 +230,10 @@ impl DigestHasherImpl {
#[inline]
async fn hash_file(
&mut self,
- mut file: fs::ResumeableFileSlot,
- ) -> Result<(DigestInfo, fs::ResumeableFileSlot), Error> {
- let reader = file.as_reader().await.err_tip(|| "In digest_for_file")?;
+ mut file: fs::FileSlot,
+ ) -> Result<(DigestInfo, fs::FileSlot), Error> {
let digest = self
- .compute_from_reader(reader)
+ .compute_from_reader(&mut file)
.await
.err_tip(|| "In digest_for_file")?;
Ok((digest, file))
@@ -263,9 +263,10 @@ impl DigestHasher for DigestHasherImpl {
async fn digest_for_file(
mut self,
- mut file: fs::ResumeableFileSlot,
+ file_path: impl AsRef<Path>,
+ mut file: fs::FileSlot,
size_hint: Option<u64>,
- ) -> Result<(DigestInfo, fs::ResumeableFileSlot), Error> {
+ ) -> Result<(DigestInfo, fs::FileSlot), Error> {
let file_position = file
.stream_position()
.await
@@ -280,11 +281,12 @@ impl DigestHasher for DigestHasherImpl {
return self.hash_file(file).await;
}
}
+ let file_path = file_path.as_ref().to_path_buf();
match self.hash_func_impl {
DigestHasherFuncImpl::Sha256(_) => self.hash_file(file).await,
DigestHasherFuncImpl::Blake3(mut hasher) => {
spawn_blocking!("digest_for_file", move || {
- hasher.update_mmap(file.get_path()).map_err(|e| {
+ hasher.update_mmap(file_path).map_err(|e| {
make_err!(Code::Internal, "Error in blake3's update_mmap: {e:?}")
})?;
Result::<_, Error>::Ok((
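With the path travelling separately from the open file, hashing looks roughly like the sketch below (assumes the trait shape in this diff; `hasher` is any `DigestHasher`). The path matters only to the blake3 branch, which mmaps the file instead of reading it:

```rust
use std::path::Path;

use nativelink_error::Error;
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::DigestHasher;
use nativelink_util::fs;

async fn hash_path(
    hasher: impl DigestHasher,
    path: &Path,
    size_hint: Option<u64>,
) -> Result<DigestInfo, Error> {
    let file = fs::open_file(path, 0, u64::MAX).await?.into_inner();
    // digest_for_file takes the path alongside the FileSlot so the blake3
    // branch can hand it to update_mmap without a ResumeableFileSlot.
    let (digest, _file) = hasher.digest_for_file(path, file, size_hint).await?;
    Ok(digest)
}
```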
diff --git a/nativelink-util/src/fs.rs b/nativelink-util/src/fs.rs
index e28fbcc74..d5765b2e5 100644
--- a/nativelink-util/src/fs.rs
+++ b/nativelink-util/src/fs.rs
@@ -13,26 +13,20 @@
// limitations under the License.
use std::fs::Metadata;
-use std::io::IoSlice;
+use std::io::{IoSlice, Seek};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::OnceLock;
use std::task::{Context, Poll};
-use std::time::Duration;
-use bytes::BytesMut;
-use futures::Future;
use nativelink_error::{make_err, Code, Error, ResultExt};
+use rlimit::increase_nofile_limit;
/// We wrap all `tokio::fs` items in our own wrapper so we can limit the number of outstanding
/// open files at any given time. This will greatly reduce the chance we'll hit open file limit
/// issues.
pub use tokio::fs::DirEntry;
-use tokio::io::{
- AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, ReadBuf, SeekFrom, Take,
-};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite, ReadBuf, SeekFrom, Take};
use tokio::sync::{Semaphore, SemaphorePermit};
-use tokio::time::timeout;
use tracing::{event, Level};
use crate::spawn_blocking;
@@ -40,171 +34,6 @@ use crate::spawn_blocking;
/// Default read buffer size when reading to/from disk.
pub const DEFAULT_READ_BUFF_SIZE: usize = 16384;
-type StreamPosition = u64;
-type BytesRemaining = u64;
-
-#[derive(Debug)]
-enum MaybeFileSlot {
- Open(Take<FileSlot>),
- Closed((StreamPosition, BytesRemaining)),
-}
-
-/// A wrapper around a generic `FileSlot`. This gives us the ability to
-/// close a file and then resume it later. Specifically useful for cases
-/// piping data from one location to another and one side is slow at
-/// reading or writing the data, we can have a timeout, close the file
-/// and then reopen it later.
-///
-/// Note: This wraps both files opened for read and write, so we always
-/// need to know how the original file was opened and the location of
-/// the file. To simplify the code significantly we always require the
-/// file to be a `Take`.
-#[derive(Debug)]
-pub struct ResumeableFileSlot {
- maybe_file_slot: MaybeFileSlot,
- path: PathBuf,
- is_write: bool,
-}
-
-impl ResumeableFileSlot {
- pub fn new(file: FileSlot, path: PathBuf, is_write: bool) -> Self {
- Self {
- maybe_file_slot: MaybeFileSlot::Open(file.take(u64::MAX)),
- path,
- is_write,
- }
- }
-
- pub fn new_with_take(file: Take<FileSlot>, path: PathBuf, is_write: bool) -> Self {
- Self {
- maybe_file_slot: MaybeFileSlot::Open(file),
- path,
- is_write,
- }
- }
-
- /// Returns the path of the file.
- pub fn get_path(&self) -> &Path {
- Path::new(&self.path)
- }
-
- /// Returns the current read position of a file.
- pub async fn stream_position(&mut self) -> Result<u64, Error> {
- let file_slot = match &mut self.maybe_file_slot {
- MaybeFileSlot::Open(file_slot) => file_slot,
- MaybeFileSlot::Closed((pos, _)) => return Ok(*pos),
- };
- file_slot
- .get_mut()
- .inner
- .stream_position()
- .await
- .err_tip(|| "Failed to get file position in digest_for_file")
- }
-
- pub async fn close_file(&mut self) -> Result<(), Error> {
- let MaybeFileSlot::Open(file_slot) = &mut self.maybe_file_slot else {
- return Ok(());
- };
- let position = file_slot
- .get_mut()
- .inner
- .stream_position()
- .await
- .err_tip(|| format!("Failed to get file position {:?}", self.path))?;
- self.maybe_file_slot = MaybeFileSlot::Closed((position, file_slot.limit()));
- Ok(())
- }
-
- #[inline]
- pub async fn as_reader(&mut self) -> Result<&mut Take<FileSlot>, Error> {
- let (stream_position, bytes_remaining) = match self.maybe_file_slot {
- MaybeFileSlot::Open(ref mut file_slot) => return Ok(file_slot),
- MaybeFileSlot::Closed(pos) => pos,
- };
- let permit = OPEN_FILE_SEMAPHORE
- .acquire()
- .await
- .map_err(|e| make_err!(Code::Internal, "Open file semaphore closed {:?}", e))?;
- let inner = tokio::fs::OpenOptions::new()
- .write(self.is_write)
- .read(!self.is_write)
- .open(&self.path)
- .await
- .err_tip(|| format!("Could not open after resume {:?}", self.path))?;
- let mut file_slot = FileSlot {
- _permit: permit,
- inner,
- };
- file_slot
- .inner
- .seek(SeekFrom::Start(stream_position))
- .await
- .err_tip(|| {
- format!(
- "Failed to seek to position {stream_position} {:?}",
- self.path
- )
- })?;
-
- self.maybe_file_slot = MaybeFileSlot::Open(file_slot.take(bytes_remaining));
- match &mut self.maybe_file_slot {
- MaybeFileSlot::Open(file_slot) => Ok(file_slot),
- MaybeFileSlot::Closed(_) => unreachable!(),
- }
- }
-
- #[inline]
- pub async fn as_writer(&mut self) -> Result<&mut FileSlot, Error> {
- Ok(self.as_reader().await?.get_mut())
- }
-
- /// Utility function to read data from a handler and handles file descriptor
- /// timeouts. Chunk size is based on the `buf`'s capacity.
- /// Note: If the `handler` changes `buf`s capacity, it is responsible for reserving
- /// more before returning.
- pub async fn read_buf_cb<'b, T, F, Fut>(
- &'b mut self,
- (mut buf, mut state): (BytesMut, T),
- mut handler: F,
- ) -> Result<(BytesMut, T), Error>
- where
- F: (FnMut((BytesMut, T)) -> Fut) + 'b,
- Fut: Future<Output = Result<(BytesMut, T), Error>> + 'b,
- {
- loop {
- buf.clear();
- self.as_reader()
- .await
- .err_tip(|| "Could not get reader from file slot in read_buf_cb")?
- .read_buf(&mut buf)
- .await
- .err_tip(|| "Could not read chunk during read_buf_cb")?;
- if buf.is_empty() {
- return Ok((buf, state));
- }
- let handler_fut = handler((buf, state));
- tokio::pin!(handler_fut);
- loop {
- match timeout(idle_file_descriptor_timeout(), &mut handler_fut).await {
- Ok(Ok(output)) => {
- (buf, state) = output;
- break;
- }
- Ok(Err(err)) => {
- return Err(err).err_tip(|| "read_buf_cb's handler returned an error")
- }
- Err(_) => {
- self.close_file()
- .await
- .err_tip(|| "Could not close file due to timeout in read_buf_cb")?;
- }
- }
- }
- }
- }
-}
-
#[derive(Debug)]
pub struct FileSlot {
// We hold the permit because once it is dropped it goes back into the queue.
@@ -283,7 +112,9 @@ impl AsyncWrite for FileSlot {
}
}
-const DEFAULT_OPEN_FILE_PERMITS: usize = 10;
+// Note: If the default changes make sure you update the documentation in
+// `config/cas_server.rs`.
+pub const DEFAULT_OPEN_FILE_PERMITS: usize = 24 * 1024; // 24k.
static TOTAL_FILE_SEMAPHORES: AtomicUsize = AtomicUsize::new(DEFAULT_OPEN_FILE_PERMITS);
pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FILE_PERMITS);
@@ -309,6 +140,42 @@ where
}
pub fn set_open_file_limit(limit: usize) {
+ let new_limit = {
+ // We increase the limit by 20% to give extra
+ // room for other file descriptors like sockets,
+ // pipes, and other things.
+ let fs_ulimit =
+ u64::try_from(limit.saturating_add(limit / 5)).expect("set_open_file_limit too large");
+ match increase_nofile_limit(fs_ulimit) {
+ Ok(new_fs_ulimit) => {
+ event!(
+ Level::INFO,
+ "set_open_file_limit({limit})::ulimit success. New fs.ulimit: {fs_ulimit} (20% increase of {limit}).",
+ );
+ usize::try_from(new_fs_ulimit).expect("new_fs_ulimit too large")
+ }
+ Err(e) => {
+ event!(
+ Level::ERROR,
+ "set_open_file_limit({limit})::ulimit failed. Maybe system does not have ulimits, continuing anyway. - {e:?}",
+ );
+ limit
+ }
+ }
+ };
+ if new_limit < DEFAULT_OPEN_FILE_PERMITS {
+ event!(
+ Level::WARN,
+ "set_open_file_limit({limit}) succeeded, but this is below the default limit of {DEFAULT_OPEN_FILE_PERMITS}. Will continue, but we recommend increasing the limit to at least the default.",
+ );
+ }
+ if new_limit < limit {
+ event!(
+ Level::WARN,
+ "set_open_file_limit({limit}) succeeded, but new open file limit is {new_limit}. Will continue, but likely a config or system options (ie: ulimit) needs updated.",
+ );
+ }
+
let current_total = TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire);
if limit < current_total {
event!(
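The headroom math above is plain integer arithmetic on the requested permit count; a worked example with the new 24k default:

```rust
fn main() {
    let limit: usize = 24 * 1024; // DEFAULT_OPEN_FILE_PERMITS (24_576)
    // limit / 5 adds ~20% headroom for sockets, pipes, and other descriptors.
    let fs_ulimit = limit + limit / 5;
    assert_eq!(fs_ulimit, 29_491); // the value passed to increase_nofile_limit()
}
```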
@@ -327,45 +194,33 @@ pub fn get_open_files_for_test() -> usize {
TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire) - OPEN_FILE_SEMAPHORE.available_permits()
}
-/// How long a file descriptor can be open without being used before it is closed.
-static IDLE_FILE_DESCRIPTOR_TIMEOUT: OnceLock = OnceLock::new();
-
-pub fn idle_file_descriptor_timeout() -> Duration {
- *IDLE_FILE_DESCRIPTOR_TIMEOUT.get_or_init(|| Duration::MAX)
-}
-
-/// Set the idle file descriptor timeout. This is the amount of time
-/// a file descriptor can be open without being used before it is closed.
-pub fn set_idle_file_descriptor_timeout(timeout: Duration) -> Result<(), Error> {
- IDLE_FILE_DESCRIPTOR_TIMEOUT
- .set(timeout)
- .map_err(|_| make_err!(Code::Internal, "idle_file_descriptor_timeout already set"))
-}
-
-pub async fn open_file(path: impl AsRef<Path>, limit: u64) -> Result<ResumeableFileSlot, Error> {
+pub async fn open_file(
+ path: impl AsRef<Path>,
+ start: u64,
+ limit: u64,
+) -> Result<Take<FileSlot>, Error> {
let path = path.as_ref().to_owned();
- let (permit, os_file, path) = call_with_permit(move |permit| {
- Ok((
- permit,
- std::fs::File::open(&path).err_tip(|| format!("Could not open {path:?}"))?,
- path,
- ))
+ let (permit, os_file) = call_with_permit(move |permit| {
+ let mut os_file =
+ std::fs::File::open(&path).err_tip(|| format!("Could not open {path:?}"))?;
+ if start > 0 {
+ os_file
+ .seek(std::io::SeekFrom::Start(start))
+ .err_tip(|| format!("Could not seek to {start} in {path:?}"))?;
+ }
+ Ok((permit, os_file))
})
.await?;
- Ok(ResumeableFileSlot::new_with_take(
- FileSlot {
- _permit: permit,
- inner: tokio::fs::File::from_std(os_file),
- }
- .take(limit),
- path,
- false, /* is_write */
- ))
+ Ok(FileSlot {
+ _permit: permit,
+ inner: tokio::fs::File::from_std(os_file),
+ }
+ .take(limit))
}
-pub async fn create_file(path: impl AsRef<Path>) -> Result<ResumeableFileSlot, Error> {
+pub async fn create_file(path: impl AsRef<Path>) -> Result<FileSlot, Error> {
let path = path.as_ref().to_owned();
- let (permit, os_file, path) = call_with_permit(move |permit| {
+ let (permit, os_file) = call_with_permit(move |permit| {
Ok((
permit,
std::fs::File::options()
@@ -375,18 +230,13 @@ pub async fn create_file(path: impl AsRef<Path>) -> Result<ResumeableFileSlot, Error> {
pub async fn hard_link(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> {
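After this file's changes the wrapper API is reduced to two entry points; a hedged roundtrip sketch under the signatures in the hunks above (error plumbing via `err_tip`, as elsewhere):

```rust
use std::path::Path;

use nativelink_error::{Error, ResultExt};
use nativelink_util::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

async fn roundtrip(path: &Path) -> Result<String, Error> {
    let mut out = fs::create_file(path).await?;
    out.write_all(b"hello").await.err_tip(|| "write failed")?;
    out.as_ref().sync_all().await.err_tip(|| "sync failed")?;
    drop(out); // dropping a FileSlot releases its semaphore permit

    let mut input = fs::open_file(path, 0, u64::MAX).await?;
    let mut contents = String::new();
    input
        .read_to_string(&mut contents)
        .await
        .err_tip(|| "read failed")?;
    Ok(contents)
}
```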
diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs
index 4f01194b3..39fad9b9c 100644
--- a/nativelink-util/src/store_trait.rs
+++ b/nativelink-util/src/store_trait.rs
@@ -15,6 +15,7 @@
use std::borrow::{Borrow, BorrowMut, Cow};
use std::collections::hash_map::DefaultHasher as StdHasher;
use std::convert::Into;
+use std::ffi::OsString;
use std::hash::{Hash, Hasher};
use std::ops::{Bound, RangeBounds};
use std::pin::Pin;
@@ -23,20 +24,18 @@ use std::sync::{Arc, OnceLock};
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
-use futures::future::{select, Either};
use futures::{join, try_join, Future, FutureExt, Stream};
use nativelink_error::{error_if, make_err, Code, Error, ResultExt};
use nativelink_metric::MetricsComponent;
use rand::rngs::StdRng;
use rand::{RngCore, SeedableRng};
use serde::{Deserialize, Serialize};
-use tokio::io::AsyncSeekExt;
-use tokio::time::timeout;
+use tokio::io::{AsyncReadExt, AsyncSeekExt};
use crate::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
use crate::common::DigestInfo;
use crate::digest_hasher::{default_digest_hasher_func, DigestHasher, DigestHasherFunc};
-use crate::fs::{self, idle_file_descriptor_timeout};
+use crate::fs;
use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
static DEFAULT_DIGEST_SIZE_HEALTH_CHECK: OnceLock<usize> = OnceLock::new();
@@ -79,54 +78,37 @@ pub enum UploadSizeInfo {
pub async fn slow_update_store_with_file<S: StoreDriver + ?Sized>(
store: Pin<&S>,
digest: impl Into<StoreKey<'_>>,
- file: &mut fs::ResumeableFileSlot,
+ file: &mut fs::FileSlot,
upload_size: UploadSizeInfo,
) -> Result<(), Error> {
- file.as_writer()
- .await
- .err_tip(|| "Failed to get writer in upload_file_to_store")?
- .rewind()
+ file.rewind()
.await
.err_tip(|| "Failed to rewind in upload_file_to_store")?;
- let (tx, rx) = make_buf_channel_pair();
+ let (mut tx, rx) = make_buf_channel_pair();
- let mut update_fut = store
+ let update_fut = store
.update(digest.into(), rx, upload_size)
.map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store"));
- let read_result = {
- let read_data_fut = async {
- let (_, mut tx) = file
- .read_buf_cb(
- (BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx),
- move |(chunk, mut tx)| async move {
- tx.send(chunk.freeze())
- .await
- .err_tip(|| "Failed to send in upload_file_to_store")?;
- Ok((BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx))
- },
- )
+ let read_data_fut = async move {
+ loop {
+ let mut buf = BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE);
+ let read = file
+ .read_buf(&mut buf)
.await
- .err_tip(|| "Error in upload_file_to_store::read_buf_cb section")?;
- tx.send_eof()
- .err_tip(|| "Could not send EOF to store in upload_file_to_store")?;
- Ok(())
- };
- tokio::pin!(read_data_fut);
- match select(&mut update_fut, read_data_fut).await {
- Either::Left((update_result, read_data_fut)) => {
- return update_result.merge(read_data_fut.await)
+ .err_tip(|| "Failed to read in upload_file_to_store")?;
+ if read == 0 {
+ break;
}
- Either::Right((read_result, _)) => read_result,
+ tx.send(buf.freeze())
+ .await
+ .err_tip(|| "Failed to send in upload_file_to_store")?;
}
+ tx.send_eof()
+ .err_tip(|| "Could not send EOF to store in upload_file_to_store")
};
- if let Ok(update_result) = timeout(idle_file_descriptor_timeout(), &mut update_fut).await {
- update_result.merge(read_result)
- } else {
- file.close_file()
- .await
- .err_tip(|| "Failed to close file in upload_file_to_store")?;
- update_fut.await.merge(read_result)
- }
+ tokio::pin!(read_data_fut);
+ let (update_res, read_res) = tokio::join!(update_fut, read_data_fut);
+ update_res.merge(read_res)
}
/// Optimizations that stores may want to expose to the callers.
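The removed select/timeout machinery reduces to running producer and consumer to completion together. The underlying shape, as a generic standalone sketch (the codebase's `merge` is replaced here by plain `Result::and`):

```rust
// Drive two cooperating futures concurrently and surface a failure from
// either side; tokio::join! polls both to completion on the same task.
async fn pump<E>(
    producer: impl std::future::Future<Output = Result<(), E>>,
    consumer: impl std::future::Future<Output = Result<(), E>>,
) -> Result<(), E> {
    let (produced, consumed) = tokio::join!(producer, consumer);
    produced.and(consumed)
}
```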
@@ -495,18 +477,19 @@ pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
self.as_store_driver_pin().optimized_for(optimization)
}
- /// Specialized version of `.update()` which takes a `ResumeableFileSlot`.
+ /// Specialized version of `.update()` which takes a `FileSlot`.
/// This is useful if the underlying store can optimize the upload process
/// when it knows the data is coming from a file.
#[inline]
fn update_with_whole_file<'a>(
&'a self,
digest: impl Into<StoreKey<'a>>,
- file: fs::ResumeableFileSlot,
+ path: OsString,
+ file: fs::FileSlot,
upload_size: UploadSizeInfo,
- ) -> impl Future<Output = Result<Option<fs::ResumeableFileSlot>, Error>> + Send + 'a {
+ ) -> impl Future<Output = Result<Option<fs::FileSlot>, Error>> + Send + 'a {
self.as_store_driver_pin()
- .update_with_whole_file(digest.into(), file, upload_size)
+ .update_with_whole_file(digest.into(), path, file, upload_size)
}
/// Utility to send all the data to the store when you have all the bytes.
@@ -635,9 +618,10 @@ pub trait StoreDriver:
async fn update_with_whole_file(
self: Pin<&Self>,
key: StoreKey<'_>,
- mut file: fs::ResumeableFileSlot,
+ path: OsString,
+ mut file: fs::FileSlot,
upload_size: UploadSizeInfo,
- ) -> Result<Option<fs::ResumeableFileSlot>, Error> {
+ ) -> Result<Option<fs::FileSlot>, Error> {
let inner_store = self.inner_store(Some(key.borrow()));
if inner_store.optimized_for(StoreOptimizations::FileUpdates) {
error_if!(
@@ -645,7 +629,7 @@ pub trait StoreDriver:
"Store::inner_store() returned self when optimization present"
);
return Pin::new(inner_store)
- .update_with_whole_file(key, file, upload_size)
+ .update_with_whole_file(key, path, file, upload_size)
.await;
}
slow_update_store_with_file(self, key, &mut file, upload_size).await?;
diff --git a/nativelink-util/tests/fs_test.rs b/nativelink-util/tests/fs_test.rs
deleted file mode 100644
index c215e537f..000000000
--- a/nativelink-util/tests/fs_test.rs
+++ /dev/null
@@ -1,180 +0,0 @@
-// Copyright 2024 The NativeLink Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::env;
-use std::ffi::OsString;
-use std::io::SeekFrom;
-use std::str::from_utf8;
-
-use nativelink_error::Error;
-use nativelink_macro::nativelink_test;
-use nativelink_util::common::fs;
-use pretty_assertions::assert_eq;
-use rand::{thread_rng, Rng};
-use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
-use tokio::sync::Semaphore;
-
-/// Get temporary path from either `TEST_TMPDIR` or best effort temp directory if
-/// not set.
-async fn make_temp_path(data: &str) -> OsString {
- let dir = format!(
- "{}/{}",
- env::var("TEST_TMPDIR").unwrap_or(env::temp_dir().to_str().unwrap().to_string()),
- thread_rng().gen::<u64>(),
- );
- fs::create_dir_all(&dir).await.unwrap();
- OsString::from(format!("{dir}/{data}"))
-}
-
-static TEST_EXCLUSIVE_SEMAPHORE: Semaphore = Semaphore::const_new(1);
-
-#[nativelink_test]
-async fn resumeable_file_slot_write_close_write_test() -> Result<(), Error> {
- let _permit = TEST_EXCLUSIVE_SEMAPHORE.acquire().await; // One test at a time.
- let filename = make_temp_path("test_file.txt").await;
- {
- let mut file = fs::create_file(&filename).await?;
- file.as_writer().await?.write_all(b"Hello").await?;
- file.close_file().await?;
- assert_eq!(fs::get_open_files_for_test(), 0);
- file.as_writer().await?.write_all(b"Goodbye").await?;
- assert_eq!(fs::get_open_files_for_test(), 1);
- file.as_writer().await?.as_mut().sync_all().await?;
- }
- assert_eq!(fs::get_open_files_for_test(), 0);
- {
- let mut file = fs::open_file(&filename, u64::MAX).await?;
- let mut contents = String::new();
- file.as_reader()
- .await?
- .read_to_string(&mut contents)
- .await?;
- assert_eq!(contents, "HelloGoodbye");
- }
- Ok(())
-}
-
-#[nativelink_test]
-async fn resumeable_file_slot_read_close_read_test() -> Result<(), Error> {
- const DUMMYDATA: &str = "DummyDataTest";
- let _permit = TEST_EXCLUSIVE_SEMAPHORE.acquire().await; // One test at a time.
- let filename = make_temp_path("test_file.txt").await;
- {
- let mut file = fs::create_file(&filename).await?;
- file.as_writer()
- .await?
- .write_all(DUMMYDATA.as_bytes())
- .await?;
- file.as_writer().await?.as_mut().sync_all().await?;
- }
- {
- let mut file = fs::open_file(&filename, u64::MAX).await?;
- let mut contents = [0u8; 5];
- {
- assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5);
- assert_eq!(from_utf8(&contents[..]).unwrap(), "Dummy");
- }
- file.close_file().await?;
- {
- assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5);
- assert_eq!(from_utf8(&contents[..]).unwrap(), "DataT");
- }
- file.close_file().await?;
- {
- assert_eq!(file.as_reader().await?.read(&mut contents).await?, 3);
- assert_eq!(from_utf8(&contents[..3]).unwrap(), "est");
- }
- }
- Ok(())
-}
-
-#[nativelink_test]
-async fn resumeable_file_slot_read_close_read_with_take_test() -> Result<(), Error> {
- const DUMMYDATA: &str = "DummyDataTest";
- let _permit = TEST_EXCLUSIVE_SEMAPHORE.acquire().await; // One test at a time.
- let filename = make_temp_path("test_file.txt").await;
- {
- let mut file = fs::create_file(&filename).await?;
- file.as_writer()
- .await?
- .write_all(DUMMYDATA.as_bytes())
- .await?;
- file.as_writer().await?.as_mut().sync_all().await?;
- }
- {
- let mut file = fs::open_file(&filename, 11).await?;
- let mut contents = [0u8; 5];
- {
- assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5);
- assert_eq!(from_utf8(&contents[..]).unwrap(), "Dummy");
- }
- assert_eq!(fs::get_open_files_for_test(), 1);
- file.close_file().await?;
- assert_eq!(fs::get_open_files_for_test(), 0);
- {
- assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5);
- assert_eq!(from_utf8(&contents[..]).unwrap(), "DataT");
- }
- assert_eq!(fs::get_open_files_for_test(), 1);
- file.close_file().await?;
- assert_eq!(fs::get_open_files_for_test(), 0);
- {
- assert_eq!(file.as_reader().await?.read(&mut contents).await?, 1);
- assert_eq!(from_utf8(&contents[..1]).unwrap(), "e");
- }
- }
- Ok(())
-}
-
-#[nativelink_test]
-async fn resumeable_file_slot_read_close_read_with_take_and_seek_test() -> Result<(), Error> {
- const DUMMYDATA: &str = "DummyDataTest";
- let _permit = TEST_EXCLUSIVE_SEMAPHORE.acquire().await; // One test at a time.
- let filename = make_temp_path("test_file.txt").await;
- {
- let mut file = fs::create_file(&filename).await?;
- file.as_writer()
- .await?
- .write_all(DUMMYDATA.as_bytes())
- .await?;
- file.as_writer().await?.as_mut().sync_all().await?;
- }
- {
- let mut file = fs::open_file(&filename, 11).await?;
- file.as_reader()
- .await?
- .get_mut()
- .seek(SeekFrom::Start(2))
- .await?;
- let mut contents = [0u8; 5];
- {
- assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5);
- assert_eq!(from_utf8(&contents[..]).unwrap(), "mmyDa");
- }
- assert_eq!(fs::get_open_files_for_test(), 1);
- file.close_file().await?;
- assert_eq!(fs::get_open_files_for_test(), 0);
- {
- assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5);
- assert_eq!(from_utf8(&contents[..]).unwrap(), "taTes");
- }
- file.close_file().await?;
- assert_eq!(fs::get_open_files_for_test(), 0);
- {
- assert_eq!(file.as_reader().await?.read(&mut contents).await?, 1);
- assert_eq!(from_utf8(&contents[..1]).unwrap(), "t");
- }
- }
- Ok(())
-}
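The resume/close assertions in the deleted file have no direct equivalent, since a `FileSlot` can no longer be closed and reopened; the invariant still worth testing is permit accounting on drop. A hedged sketch reusing the deleted file's scaffolding (`make_temp_path`, one test at a time) plus `tokio::io::AsyncWriteExt`:

```rust
#[nativelink_test]
async fn file_slot_releases_permit_on_drop() -> Result<(), Error> {
    let filename = make_temp_path("test_file.txt").await;
    {
        let mut file = fs::create_file(&filename).await?;
        file.write_all(b"Hello").await?;
        assert_eq!(fs::get_open_files_for_test(), 1);
    }
    // Dropping the FileSlot, not an explicit close, returns the permit.
    assert_eq!(fs::get_open_files_for_test(), 0);
    Ok(())
}
```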
diff --git a/nativelink-worker/BUILD.bazel b/nativelink-worker/BUILD.bazel
index 111f96763..fd054831d 100644
--- a/nativelink-worker/BUILD.bazel
+++ b/nativelink-worker/BUILD.bazel
@@ -76,6 +76,7 @@ rust_test_suite(
"@crates//:prost",
"@crates//:prost-types",
"@crates//:rand",
+ "@crates//:serial_test",
"@crates//:tokio",
"@crates//:tonic",
],
diff --git a/nativelink-worker/Cargo.toml b/nativelink-worker/Cargo.toml
index e136ec431..a59aa957c 100644
--- a/nativelink-worker/Cargo.toml
+++ b/nativelink-worker/Cargo.toml
@@ -39,3 +39,4 @@ hyper-util = "0.1.10"
pretty_assertions = { version = "1.4.1", features = ["std"] }
prost-types = { version = "0.13.4", default-features = false }
rand = { version = "0.8.5", default-features = false }
+serial_test = { version = "3.2.0", features = ["async"], default-features = false }
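`serial_test` enters this crate because several worker tests now mutate process-wide state (the global open-file permit pool, `$PATH`-dependent child processes), so they must not interleave. A minimal usage sketch, assuming the `async` feature enabled above; it is shown with a plain `#[tokio::test]` rather than the project's `#[nativelink_test]` macro:

```rust
use serial_test::serial;

// Tests marked #[serial] run one at a time, even across modules, while
// unmarked tests may still run in parallel with each other.
#[serial]
#[tokio::test]
async fn exclusive_test() {
    // Touch process-global state here without racing other #[serial] tests.
}
```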
diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs
index 044f54785..f95974ea3 100644
--- a/nativelink-worker/src/running_actions_manager.rs
+++ b/nativelink-worker/src/running_actions_manager.rs
@@ -261,24 +261,17 @@ async fn upload_file(
) -> Result<FileInfo, Error> {
let is_executable = is_executable(&metadata, &full_path);
let file_size = metadata.len();
- let resumeable_file = fs::open_file(&full_path, u64::MAX)
+ let file = fs::open_file(&full_path, 0, u64::MAX)
.await
.err_tip(|| format!("Could not open file {full_path:?}"))?;
- let (digest, mut resumeable_file) = hasher
+ let (digest, mut file) = hasher
.hasher()
- .digest_for_file(resumeable_file, Some(file_size))
+ .digest_for_file(&full_path, file.into_inner(), Some(file_size))
.await
.err_tip(|| format!("Failed to hash file in digest_for_file failed for {full_path:?}"))?;
- resumeable_file
- .as_reader()
- .await
- .err_tip(|| "Could not get reader from file slot in RunningActionsManager::upload_file()")?
- .get_mut()
- .rewind()
- .await
- .err_tip(|| "Could not rewind file")?;
+ file.rewind().await.err_tip(|| "Could not rewind file")?;
// Note: For unknown reasons we appear to be hitting:
// https://github.com/rust-lang/rust/issues/92096
@@ -288,7 +281,8 @@ async fn upload_file(
.as_store_driver_pin()
.update_with_whole_file(
digest.into(),
- resumeable_file,
+ full_path.as_ref().into(),
+ file,
UploadSizeInfo::ExactSize(digest.size_bytes()),
)
.await
@@ -490,7 +484,7 @@ async fn process_side_channel_file(
let mut json_contents = String::new();
{
// Note: Scoping `file_slot` allows the file_slot semaphore to be released faster.
- let mut file_slot = match fs::open_file(side_channel_file, u64::MAX).await {
+ let mut file_slot = match fs::open_file(side_channel_file, 0, u64::MAX).await {
Ok(file_slot) => file_slot,
Err(e) => {
if e.code != Code::NotFound {
@@ -500,11 +494,7 @@ async fn process_side_channel_file(
return Ok(None);
}
};
- let reader = file_slot
- .as_reader()
- .await
- .err_tip(|| "Error getting reader from side channel file (maybe permissions?)")?;
- reader
+ file_slot
.read_to_string(&mut json_contents)
.await
.err_tip(|| "Error reading side channel file")?;
@@ -992,12 +982,6 @@ impl RunningActionImpl {
?err,
"Could not kill process",
);
- } else {
- event!(
- Level::ERROR,
- operation_id = ?self.operation_id,
- "Could not get child process id, maybe already dead?",
- );
}
{
let mut state = self.state.lock();
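The reworked `upload_file` earlier in this file follows a hash-rewind-stream shape: digest the open handle, seek it back to zero, then hand the same descriptor to the store. A tokio-and-sha2-only sketch of that pattern; the names here are illustrative, not the nativelink API:

```rust
use sha2::{Digest, Sha256};
use tokio::io::{AsyncReadExt, AsyncSeekExt};

async fn hash_then_rewind(path: &std::path::Path) -> std::io::Result<[u8; 32]> {
    let mut file = tokio::fs::File::open(path).await?;
    let mut hasher = Sha256::new();
    let mut buf = [0u8; 8192];
    loop {
        let n = file.read(&mut buf).await?;
        if n == 0 {
            break;
        }
        hasher.update(&buf[..n]);
    }
    // Reuse the same descriptor for the subsequent upload by seeking back,
    // instead of paying for a second open(2).
    file.rewind().await?;
    Ok(hasher.finalize().into())
}
```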
diff --git a/nativelink-worker/tests/local_worker_test.rs b/nativelink-worker/tests/local_worker_test.rs
index 7d0350c94..6e90e9949 100644
--- a/nativelink-worker/tests/local_worker_test.rs
+++ b/nativelink-worker/tests/local_worker_test.rs
@@ -75,7 +75,6 @@ fn make_temp_path(data: &str) -> String {
)
}
-#[cfg_attr(feature = "nix", ignore)]
#[nativelink_test]
async fn platform_properties_smoke_test() -> Result<(), Error> {
let mut platform_properties = HashMap::new();
@@ -132,7 +131,7 @@ async fn platform_properties_smoke_test() -> Result<(), Error> {
}
#[nativelink_test]
-async fn reconnect_on_server_disconnect_test() -> Result<(), Box<dyn std::error::Error>> {
+async fn reconnect_on_server_disconnect_test() -> Result<(), Error> {
let mut test_context = setup_local_worker(HashMap::new()).await;
let streaming_response = test_context.maybe_streaming_response.take().unwrap();
@@ -162,7 +161,7 @@ async fn reconnect_on_server_disconnect_test() -> Result<(), Box<dyn std::error::Error>> {
-async fn kill_all_called_on_disconnect() -> Result<(), Box<dyn std::error::Error>> {
+async fn kill_all_called_on_disconnect() -> Result<(), Error> {
let mut test_context = setup_local_worker(HashMap::new()).await;
let streaming_response = test_context.maybe_streaming_response.take().unwrap();
@@ -179,11 +178,14 @@ async fn kill_all_called_on_disconnect() -> Result<(), Box<dyn std::error::Error>> {
-async fn blake3_digest_function_registerd_properly() -> Result<(), Box<dyn std::error::Error>> {
+async fn blake3_digest_function_registerd_properly() -> Result<(), Error> {
let mut test_context = setup_local_worker(HashMap::new()).await;
let streaming_response = test_context.maybe_streaming_response.take().unwrap();
@@ -217,11 +219,14 @@ async fn blake3_digest_function_registerd_properly() -> Result<(), Box<dyn std::error::Error>> {
-async fn simple_worker_start_action_test() -> Result<(), Box<dyn std::error::Error>> {
+async fn simple_worker_start_action_test() -> Result<(), Error> {
let mut test_context = setup_local_worker(HashMap::new()).await;
let streaming_response = test_context.maybe_streaming_response.take().unwrap();
@@ -301,11 +309,14 @@ async fn simple_worker_start_action_test() -> Result<(), Box<dyn std::error::Error>> {
-async fn new_local_worker_creates_work_directory_test() -> Result<(), Box<dyn std::error::Error>> {
+async fn new_local_worker_creates_work_directory_test() -> Result<(), Error> {
let cas_store = Store::new(FastSlowStore::new(
&FastSlowSpec {
// Note: These are not needed for this test, so we put dummy memory stores here.
@@ -450,8 +464,7 @@ async fn new_local_worker_creates_work_directory_test() -> Result<(), Box<dyn std::error::Error>> {
-async fn new_local_worker_removes_work_directory_before_start_test(
-) -> Result<(), Box<dyn std::error::Error>> {
+async fn new_local_worker_removes_work_directory_before_start_test() -> Result<(), Error> {
let cas_store = Store::new(FastSlowStore::new(
&FastSlowSpec {
// Note: These are not needed for this test, so we put dummy memory stores here.
@@ -473,8 +486,8 @@ async fn new_local_worker_removes_work_directory_before_start_test(
fs::create_dir_all(format!("{}/{}", work_directory, "another_dir")).await?;
let mut file =
fs::create_file(OsString::from(format!("{}/{}", work_directory, "foo.txt"))).await?;
- file.as_writer().await?.write_all(b"Hello, world!").await?;
- file.as_writer().await?.as_mut().sync_all().await?;
+ file.write_all(b"Hello, world!").await?;
+ file.as_mut().sync_all().await?;
drop(file);
new_local_worker(
Arc::new(LocalWorkerConfig {
@@ -498,7 +511,7 @@ async fn new_local_worker_removes_work_directory_before_start_test(
}
#[nativelink_test]
-async fn experimental_precondition_script_fails() -> Result<(), Box<dyn std::error::Error>> {
+async fn experimental_precondition_script_fails() -> Result<(), Error> {
#[cfg(target_family = "unix")]
const EXPECTED_MSG: &str = "Preconditions script returned status exit status: 1 - ";
#[cfg(target_family = "windows")]
@@ -516,8 +529,7 @@ async fn experimental_precondition_script_fails() -> Result<(), Box<dyn std::error::Error>> {
-async fn kill_action_request_kills_action() -> Result<(), Box<dyn std::error::Error>> {
+async fn kill_action_request_kills_action() -> Result<(), Error> {
let mut test_context = setup_local_worker(HashMap::new()).await;
let streaming_response = test_context.maybe_streaming_response.take().unwrap();
@@ -638,11 +654,14 @@ async fn kill_action_request_kills_action() -> Result<(), Box<dyn std::error::Error>> {
diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs
--- a/nativelink-worker/tests/running_actions_manager_test.rs
+++ b/nativelink-worker/tests/running_actions_manager_test.rs
@@ ... @@ fn monotonic_clock(counter: &AtomicU64) -> SystemTime {
previous_time
}
+#[serial]
#[nativelink_test]
async fn download_to_directory_file_download_test() -> Result<(), Box<dyn std::error::Error>> {
const FILE1_NAME: &str = "file1.txt";
@@ -242,6 +245,7 @@ async fn download_to_directory_file_download_test() -> Result<(), Box<dyn std::error::Error>> {
+#[serial]
#[nativelink_test]
async fn download_to_directory_folder_download_test() -> Result<(), Box<dyn std::error::Error>> {
const DIRECTORY1_NAME: &str = "folder1";
@@ -340,6 +344,7 @@ async fn download_to_directory_folder_download_test() -> Result<(), Box<dyn std::error::Error>> {
+#[serial]
#[nativelink_test]
async fn download_to_directory_symlink_download_test() -> Result<(), Box<dyn std::error::Error>> {
const FILE_NAME: &str = "file.txt";
@@ -411,6 +416,7 @@ async fn download_to_directory_symlink_download_test() -> Result<(), Box<dyn std::error::Error>> {
+#[serial]
#[nativelink_test]
async fn ensure_output_files_full_directories_are_created_no_working_directory_test(
) -> Result<(), Box<dyn std::error::Error>> {
@@ -449,6 +455,10 @@ async fn ensure_output_files_full_directories_are_created_no_working_directory_t
let command = Command {
arguments: vec!["touch".to_string(), "./some/path/test.txt".to_string()],
output_files: vec!["some/path/test.txt".to_string()],
+ environment_variables: vec![EnvironmentVariable {
+ name: "PATH".to_string(),
+ value: std::env::var("PATH").unwrap(),
+ }],
..Default::default()
};
let command_digest = serialize_and_upload_message(
@@ -528,6 +538,7 @@ async fn ensure_output_files_full_directories_are_created_no_working_directory_t
Ok(())
}
+#[serial]
#[nativelink_test]
async fn ensure_output_files_full_directories_are_created_test(
) -> Result<(), Box<dyn std::error::Error>> {
@@ -568,6 +579,10 @@ async fn ensure_output_files_full_directories_are_created_test(
arguments: vec!["touch".to_string(), "./some/path/test.txt".to_string()],
output_files: vec!["some/path/test.txt".to_string()],
working_directory: working_directory.to_string(),
+ environment_variables: vec![EnvironmentVariable {
+ name: "PATH".to_string(),
+ value: std::env::var("PATH").unwrap(),
+ }],
..Default::default()
};
let command_digest = serialize_and_upload_message(
@@ -648,6 +663,7 @@ async fn ensure_output_files_full_directories_are_created_test(
Ok(())
}
+#[serial]
#[nativelink_test]
async fn blake3_upload_files() -> Result<(), Box<dyn std::error::Error>> {
const WORKER_ID: &str = "foo_worker_id";
@@ -702,6 +718,10 @@ async fn blake3_upload_files() -> Result<(), Box<dyn std::error::Error>> {
arguments,
output_paths: vec!["test.txt".to_string()],
working_directory: working_directory.to_string(),
+ environment_variables: vec![EnvironmentVariable {
+ name: "PATH".to_string(),
+ value: std::env::var("PATH").unwrap(),
+ }],
..Default::default()
};
let command_digest = serialize_and_upload_message(
@@ -823,6 +843,7 @@ async fn blake3_upload_files() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
+#[serial]
#[nativelink_test]
async fn upload_files_from_above_cwd_test() -> Result<(), Box<dyn std::error::Error>> {
const WORKER_ID: &str = "foo_worker_id";
@@ -877,6 +898,10 @@ async fn upload_files_from_above_cwd_test() -> Result<(), Box<dyn std::error::Error>> {
+#[serial]
#[nativelink_test]
async fn upload_dir_and_symlink_test() -> Result<(), Box<dyn std::error::Error>> {
const WORKER_ID: &str = "foo_worker_id";
@@ -1206,6 +1231,7 @@ async fn upload_dir_and_symlink_test() -> Result<(), Box<dyn std::error::Error>>
Ok(())
}
+#[serial]
#[nativelink_test]
async fn cleanup_happens_on_job_failure() -> Result<(), Box<dyn std::error::Error>> {
const WORKER_ID: &str = "foo_worker_id";
@@ -1251,6 +1277,10 @@ async fn cleanup_happens_on_job_failure() -> Result<(), Box<dyn std::error::Error>> {
+#[serial]
#[nativelink_test]
async fn kill_ends_action() -> Result<(), Box<dyn std::error::Error>> {
const WORKER_ID: &str = "foo_worker_id";
@@ -1365,11 +1395,17 @@ async fn kill_ends_action() -> Result<(), Box<dyn std::error::Error>> {
timeout_handled_externally: false,
})?);
+ let process_started_file = {
+ let tmp_dir = make_temp_path("root_action_directory");
+ fs::create_dir_all(&tmp_dir).await.unwrap();
+ format!("{tmp_dir}/process_started")
+ };
+
#[cfg(target_family = "unix")]
let arguments = vec![
"sh".to_string(),
"-c".to_string(),
- "sleep infinity".to_string(),
+ format!("touch {process_started_file} && sleep infinity"),
];
#[cfg(target_family = "windows")]
// Windows is weird with timeout, so we use ping. See:
@@ -1384,6 +1420,10 @@ async fn kill_ends_action() -> Result<(), Box<dyn std::error::Error>> {
arguments,
output_paths: vec![],
working_directory: ".".to_string(),
+ environment_variables: vec![EnvironmentVariable {
+ name: "PATH".to_string(),
+ value: std::env::var("PATH").unwrap(),
+ }],
..Default::default()
};
let command_digest = serialize_and_upload_message(
@@ -1430,16 +1470,27 @@ async fn kill_ends_action() -> Result<(), Box<dyn std::error::Error>> {
)
.await?;
- // Start the action and kill it at the same time.
- let result = futures::join!(
- run_action(running_action_impl),
- running_actions_manager.kill_all()
- )
- .0?;
+ let run_action_fut = run_action(running_action_impl);
+ tokio::pin!(run_action_fut);
+ loop {
+ assert_eq!(poll!(&mut run_action_fut), Poll::Pending);
+ tokio::task::yield_now().await;
+ match fs::metadata(&process_started_file).await {
+ Ok(_) => break,
+ Err(err) => {
+ assert_eq!(err.code, Code::NotFound, "Unknown error {err:?}");
+ tokio::time::sleep(Duration::from_millis(1)).await;
+ }
+ }
+ }
+
+ let result = futures::join!(run_action_fut, running_actions_manager.kill_all())
+ .0
+ .unwrap();
// Check that the action was killed.
#[cfg(target_family = "unix")]
- assert_eq!(9, result.exit_code);
+ assert_eq!(9, result.exit_code, "Wrong exit_code - {result:?}");
// Note: Windows kill command returns exit code 1.
#[cfg(target_family = "windows")]
assert_eq!(1, result.exit_code);
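The loop added above replaces the old start-and-kill race: the test now polls the action future, asserting it stays `Pending`, until the spawned shell touches a sentinel file, and only then issues `kill_all`. Extracted as a generic helper, the pattern looks roughly like this (a sketch, not the test's exact code):

```rust
use std::task::Poll;
use std::time::Duration;

use futures::poll;

// Drive `fut` without completing it until `sentinel` exists on disk,
// i.e. until the child process has provably started.
async fn wait_for_sentinel<F: std::future::Future + Unpin>(
    fut: &mut F,
    sentinel: &str,
) -> std::io::Result<()> {
    loop {
        // The action future must stay pending until it is killed.
        assert!(matches!(poll!(&mut *fut), Poll::Pending));
        tokio::task::yield_now().await;
        match tokio::fs::metadata(sentinel).await {
            Ok(_) => return Ok(()), // Process touched the file; safe to kill.
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
            Err(err) => return Err(err),
        }
    }
}
```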
@@ -1451,7 +1502,7 @@ async fn kill_ends_action() -> Result<(), Box<dyn std::error::Error>> {
// The wrapper script will print a constant string to stderr, and the test itself will
// print to stdout. We then check the results of both to make sure the shell script was
// invoked and the actual command was invoked under the shell script.
-#[cfg_attr(feature = "nix", ignore)]
+#[serial]
#[nativelink_test]
async fn entrypoint_does_invoke_if_set() -> Result<(), Box<dyn std::error::Error>> {
#[cfg(target_family = "unix")]
@@ -1505,9 +1556,6 @@ exit 0
test_wrapper_script
};
- // TODO(#527) Sleep to reduce flakey chances.
- tokio::time::sleep(Duration::from_millis(250)).await;
-
let running_actions_manager =
Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs {
root_action_directory: root_action_directory.clone(),
@@ -1601,7 +1649,7 @@ exit 0
Ok(())
}
-#[cfg_attr(feature = "nix", ignore)]
+#[serial]
#[nativelink_test]
async fn entrypoint_injects_properties() -> Result<(), Box<dyn std::error::Error>> {
#[cfg(target_family = "unix")]
@@ -1656,9 +1704,6 @@ exit 0
test_wrapper_script
};
- // TODO(#527) Sleep to reduce flakey chances.
- tokio::time::sleep(Duration::from_millis(250)).await;
-
let running_actions_manager =
Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs {
root_action_directory: root_action_directory.clone(),
@@ -1701,6 +1746,10 @@ exit 0
let command = Command {
arguments,
working_directory: ".".to_string(),
+ environment_variables: vec![EnvironmentVariable {
+ name: "PATH".to_string(),
+ value: std::env::var("PATH").unwrap(),
+ }],
..Default::default()
};
let command_digest = serialize_and_upload_message(
@@ -1783,7 +1832,7 @@ exit 0
Ok(())
}
-#[cfg_attr(feature = "nix", ignore)]
+#[serial]
#[nativelink_test]
async fn entrypoint_sends_timeout_via_side_channel() -> Result<(), Box<dyn std::error::Error>> {
#[cfg(target_family = "unix")]
@@ -1826,9 +1875,6 @@ exit 1
test_wrapper_script
};
- // TODO(#527) Sleep to reduce flakey chances.
- tokio::time::sleep(Duration::from_millis(250)).await;
-
let running_actions_manager =
Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs {
root_action_directory: root_action_directory.clone(),
@@ -1854,6 +1900,10 @@ exit 1
let command = Command {
arguments,
working_directory: ".".to_string(),
+ environment_variables: vec![EnvironmentVariable {
+ name: "PATH".to_string(),
+ value: std::env::var("PATH").unwrap(),
+ }],
..Default::default()
};
let command_digest = serialize_and_upload_message(
@@ -1909,6 +1959,7 @@ exit 1
Ok(())
}
+#[serial]
#[nativelink_test]
async fn caches_results_in_action_cache_store() -> Result<(), Box<dyn std::error::Error>> {
let (_, _, cas_store, ac_store) = setup_stores().await?;
@@ -1980,6 +2031,7 @@ async fn caches_results_in_action_cache_store() -> Result<(), Box<dyn std::error::Error>> {
+#[serial]
#[nativelink_test]
async fn failed_action_does_not_cache_in_action_cache() -> Result<(), Box<dyn std::error::Error>> {
let (_, _, cas_store, ac_store) = setup_stores().await?;
@@ -2051,6 +2103,7 @@ async fn failed_action_does_not_cache_in_action_cache() -> Result<(), Box<dyn std::error::Error>> {
+#[serial]
#[nativelink_test]
async fn success_does_cache_in_historical_results() -> Result<(), Box<dyn std::error::Error>> {
let (_, _, cas_store, ac_store) = setup_stores().await?;
@@ -2150,6 +2203,7 @@ async fn success_does_cache_in_historical_results() -> Result<(), Box<dyn std::error::Error>> {
+#[serial]
#[nativelink_test]
async fn failure_does_not_cache_in_historical_results() -> Result<(), Box<dyn std::error::Error>> {
let (_, _, cas_store, ac_store) = setup_stores().await?;
@@ -2189,6 +2243,7 @@ async fn failure_does_not_cache_in_historical_results() -> Result<(), Box<dyn std::error::Error>> {
+#[serial]
#[nativelink_test]
async fn infra_failure_does_cache_in_historical_results() -> Result<(), Box<dyn std::error::Error>>
{
@@ -2257,6 +2312,7 @@ async fn infra_failure_does_cache_in_historical_results() -> Result<(), Box<dyn std::error::Error>> {
+#[serial]
#[nativelink_test]
async fn action_result_has_used_in_message() -> Result<(), Box<dyn std::error::Error>> {
let (_, _, cas_store, ac_store) = setup_stores().await?;
@@ -2307,7 +2363,7 @@ async fn action_result_has_used_in_message() -> Result<(), Box<dyn std::error::Error>> {
-#[cfg_attr(feature = "nix", ignore)]
+#[serial]
#[nativelink_test]
async fn ensure_worker_timeout_chooses_correct_values() -> Result<(), Box<dyn std::error::Error>> {
const WORKER_ID: &str = "foo_worker_id";
@@ -2336,6 +2392,10 @@ async fn ensure_worker_timeout_chooses_correct_values() -> Result<(), Box<dyn std::error::Error>> {
+#[serial]
#[nativelink_test]
async fn worker_times_out() -> Result<(), Box<dyn std::error::Error>> {
const WORKER_ID: &str = "foo_worker_id";
@@ -2661,6 +2722,10 @@ async fn worker_times_out() -> Result<(), Box<dyn std::error::Error>> {
arguments,
output_paths: vec![],
working_directory: ".".to_string(),
+ environment_variables: vec![EnvironmentVariable {
+ name: "PATH".to_string(),
+ value: std::env::var("PATH").unwrap(),
+ }],
..Default::default()
};
let command_digest = serialize_and_upload_message(
@@ -2729,7 +2794,7 @@ async fn worker_times_out() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
-#[cfg_attr(feature = "nix", ignore)]
+#[serial]
#[nativelink_test]
async fn kill_all_waits_for_all_tasks_to_finish() -> Result<(), Box<dyn std::error::Error>> {
const WORKER_ID: &str = "foo_worker_id";
@@ -2896,7 +2961,7 @@ async fn kill_all_waits_for_all_tasks_to_finish() -> Result<(), Box<dyn std::error::Error>> {
-#[cfg_attr(feature = "nix", ignore)]
+#[serial]
#[nativelink_test]
async fn unix_executable_file_test() -> Result<(), Box<dyn std::error::Error>> {
const WORKER_ID: &str = "foo_worker_id";
@@ -2999,6 +3064,7 @@ async fn unix_executable_file_test() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
+#[serial]
#[nativelink_test]
async fn action_directory_contents_are_cleaned() -> Result<(), Box<dyn std::error::Error>> {
const WORKER_ID: &str = "foo_worker_id";
@@ -3096,11 +3162,11 @@ async fn action_directory_contents_are_cleaned() -> Result<(), Box<dyn std::error::Error>> {
-#[cfg_attr(feature = "nix", ignore)]
+#[serial]
#[nativelink_test]
async fn upload_with_single_permit() -> Result<(), Box<dyn std::error::Error>> {
const WORKER_ID: &str = "foo_worker_id";
@@ -3161,6 +3227,10 @@ async fn upload_with_single_permit() -> Result<(), Box<dyn std::error::Error>> {
arguments,
output_paths: vec!["test.txt".to_string(), "tst".to_string()],
working_directory: working_directory.to_string(),
+ environment_variables: vec![EnvironmentVariable {
+ name: "PATH".to_string(),
+ value: std::env::var("PATH").unwrap(),
+ }],
..Default::default()
};
let command_digest = serialize_and_upload_message(
@@ -3287,7 +3357,7 @@ async fn upload_with_single_permit() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
-#[cfg_attr(feature = "nix", ignore)]
+#[serial]
#[nativelink_test]
async fn running_actions_manager_respects_action_timeout() -> Result<(), Box<dyn std::error::Error>>
{
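A pattern worth noting in the hunks above: every test command now forwards the host's `PATH` into the action's `environment_variables`. Under sandboxed builds (the nix feature these tests previously ignored), children were spawned with a scrubbed environment and could not find `touch` or `sh`. A std-only sketch of the failure mode and the fix:

```rust
use std::process::{Command, ExitStatus};

fn spawn_with_host_path() -> std::io::Result<ExitStatus> {
    Command::new("sh")
        .arg("-c")
        .arg("touch ./some/path/test.txt")
        // Without an explicit PATH the child may fail to resolve `touch`
        // when the parent process was started with a scrubbed environment.
        .env("PATH", std::env::var("PATH").unwrap_or_default())
        .status()
}
```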
diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs
index f3d1dc931..e3d3dea6d 100644
--- a/src/bin/nativelink.rs
+++ b/src/bin/nativelink.rs
@@ -48,7 +48,7 @@ use nativelink_service::health_server::HealthServer;
use nativelink_service::worker_api_server::WorkerApiServer;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::store_manager::StoreManager;
-use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit};
+use nativelink_util::common::fs::set_open_file_limit;
use nativelink_util::digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc};
use nativelink_util::health_utils::HealthRegistryBuilder;
use nativelink_util::metrics_utils::{set_metrics_enabled_for_this_thread, Counter};
@@ -61,7 +61,7 @@ use nativelink_util::store_trait::{
set_default_digest_size_health_check, DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG,
};
use nativelink_util::task::TaskExecutor;
-use nativelink_util::{background_spawn, init_tracing, spawn, spawn_blocking};
+use nativelink_util::{background_spawn, fs, init_tracing, spawn, spawn_blocking};
use nativelink_worker::local_worker::new_local_worker;
use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::SdkMeterProvider;
@@ -1009,20 +1009,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut cfg = futures::executor::block_on(get_config())?;
- let (mut metrics_enabled, max_blocking_threads) = {
- // Note: If the default changes make sure you update the documentation in
- // `config/cas_server.rs`.
- const DEFAULT_MAX_OPEN_FILES: usize = 512;
- // Note: If the default changes make sure you update the documentation in
- // `config/cas_server.rs`.
- const DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS: u64 = 1000;
+ let mut metrics_enabled = {
let global_cfg = if let Some(global_cfg) = &mut cfg.global {
if global_cfg.max_open_files == 0 {
- global_cfg.max_open_files = DEFAULT_MAX_OPEN_FILES;
- }
- if global_cfg.idle_file_descriptor_timeout_millis == 0 {
- global_cfg.idle_file_descriptor_timeout_millis =
- DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS;
+ global_cfg.max_open_files = fs::DEFAULT_OPEN_FILE_PERMITS;
}
if global_cfg.default_digest_size_health_check == 0 {
global_cfg.default_digest_size_health_check = DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG;
@@ -1031,8 +1021,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
*global_cfg
} else {
GlobalConfig {
- max_open_files: DEFAULT_MAX_OPEN_FILES,
- idle_file_descriptor_timeout_millis: DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS,
+ max_open_files: fs::DEFAULT_OPEN_FILE_PERMITS,
disable_metrics: cfg.servers.iter().all(|v| {
let Some(service) = &v.services else {
return true;
@@ -1044,17 +1033,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
};
set_open_file_limit(global_cfg.max_open_files);
- set_idle_file_descriptor_timeout(Duration::from_millis(
- global_cfg.idle_file_descriptor_timeout_millis,
- ))?;
set_default_digest_hasher_func(DigestHasherFunc::from(
global_cfg
.default_digest_hash_function
.unwrap_or(ConfigDigestHashFunction::sha256),
))?;
set_default_digest_size_health_check(global_cfg.default_digest_size_health_check)?;
- // TODO (#513): prevent deadlocks by assigning max blocking threads number of open files * ten
- (!global_cfg.disable_metrics, global_cfg.max_open_files * 10)
+ !global_cfg.disable_metrics
};
// Override metrics enabled if the environment variable is set.
if std::env::var(METRICS_DISABLE_ENV).is_ok() {
@@ -1067,7 +1052,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
#[allow(clippy::disallowed_methods)]
{
let runtime = tokio::runtime::Builder::new_multi_thread()
- .max_blocking_threads(max_blocking_threads)
.enable_all()
.on_thread_start(move || set_metrics_enabled_for_this_thread(metrics_enabled))
.build()?;
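With `idle_file_descriptor_timeout_millis` gone, the default open-file budget moves into `fs::DEFAULT_OPEN_FILE_PERMITS`, and the new `rlimit` dependency suggests the process now negotiates `RLIMIT_NOFILE` with the OS instead of trusting a hard-coded 512. A hedged sketch of that negotiation; the function name and fallback value are illustrative, not nativelink's exact logic:

```rust
// Try to raise the soft RLIMIT_NOFILE to the requested budget and size the
// permit pool from whatever the kernel actually grants.
fn negotiated_open_file_budget(requested: u64) -> u64 {
    match rlimit::increase_nofile_limit(requested) {
        Ok(granted) => granted,
        Err(err) => {
            eprintln!("could not raise RLIMIT_NOFILE: {err}; using a conservative budget");
            requested.min(1024)
        }
    }
}
```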