Skip to content

Commit

Permalink
Fast slow store directions
Browse files Browse the repository at this point in the history
There are multiple use cases where we don't want a fast-slow store to
persist to one of the stores in some direction.  For example, worker
nodes do not want to store build results on the local filesystem, just
with the upstream CAS.  Another case would be the re-use of prod action
cache in a dev environment, but not vice-versa.

This PR introduces options to the fast-slow store which default to the
existing behaviour, but allow customisation of each side of the
fast-slow store so that it persists only on get operations, only on put
operations, or is treated as read only.

Fixes #1577
  • Loading branch information
chrisstaite committed Feb 8, 2025
1 parent 7afe286 commit 251dda7
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 13 deletions.
1 change: 1 addition & 0 deletions deployment-examples/docker-compose/worker.json5
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
}
}
},
"fast_direction": "get",
"slow": {
"ref_store": {
"name": "GRPC_LOCAL_STORE"
Expand Down
1 change: 1 addition & 0 deletions kubernetes/components/worker/worker.json5
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
}
}
},
"fast_direction": "get",
"slow": {
"ref_store": {
"name": "GRPC_LOCAL_STORE"
Expand Down
30 changes: 30 additions & 0 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,16 +514,46 @@ pub struct FilesystemSpec {
pub block_size: u64,
}

/// Selects which kinds of operation are allowed to persist data into one
/// side (fast or slow) of a fast-slow store.
///
/// Variants are (de)serialized in `snake_case`, so a configuration file
/// refers to them as e.g. `"get"` or `"read_only"`. When the option is
/// omitted the default is [`StoreDirection::Both`].
#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum StoreDirection {
    /// The store operates normally and all get and put operations are
    /// handled by it.
    #[default]
    Both,
    /// Update operations will cause persistence to this store, but Get
    /// operations will be ignored (data read through the fast-slow store
    /// is not copied into this store).
    /// This only makes sense on the fast store as the slow store will
    /// never get written to on Get anyway.
    Update,
    /// Get operations will cause persistence to this store, but Update
    /// operations will be ignored (data written through the fast-slow
    /// store is not sent to this store).
    Get,
    /// Operate as a read only store: neither Get nor Update operations
    /// persist data to it. Only really makes sense if there's another way
    /// to write to it.
    ReadOnly,
}

/// Configuration for a fast-slow store: a pair of stores where the `fast`
/// store is consulted before the `slow` store.
///
/// Unknown fields are rejected during deserialization. Both direction
/// fields are optional in the configuration and fall back to
/// `StoreDirection::default()` when absent.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct FastSlowSpec {
    /// Fast store that will be attempted to be contacted before reaching
    /// out to the `slow` store.
    pub fast: StoreSpec,

    /// How to handle the fast store. This can be useful to set to Get for
    /// worker nodes such that results are persisted to the slow store only.
    /// Optional; uses the default direction when not specified.
    #[serde(default)]
    pub fast_direction: StoreDirection,

    /// If the object does not exist in the `fast` store it will try to
    /// get it from this store.
    pub slow: StoreSpec,

    /// How to handle the slow store. This can be useful if creating a diode
    /// and you wish to have an upstream read only store.
    /// Optional; uses the default direction when not specified.
    #[serde(default)]
    pub slow_direction: StoreDirection,
}

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
Expand Down
71 changes: 63 additions & 8 deletions nativelink-store/src/fast_slow_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::{Arc, Weak};

use async_trait::async_trait;
use futures::{join, FutureExt};
use nativelink_config::stores::FastSlowSpec;
use nativelink_config::stores::{FastSlowSpec, StoreDirection};
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_metric::MetricsComponent;
use nativelink_util::buf_channel::{
Expand All @@ -45,18 +45,22 @@ use nativelink_util::store_trait::{
// Store that multiplexes between a fast store and a slow store, with a
// per-side direction controlling which operations may persist data to it.
pub struct FastSlowStore {
    // Store that is consulted first.
    #[metric(group = "fast_store")]
    fast_store: Store,
    // Which operations are allowed to persist data into `fast_store`;
    // taken from `FastSlowSpec::fast_direction` at construction.
    fast_direction: StoreDirection,
    // Store used when the fast store cannot serve the request.
    #[metric(group = "slow_store")]
    slow_store: Store,
    // Which operations are allowed to persist data into `slow_store`;
    // taken from `FastSlowSpec::slow_direction` at construction.
    slow_direction: StoreDirection,
    // Weak handle back to this store, filled in by `Arc::new_cyclic` in `new`.
    weak_self: Weak<Self>,
    // Counters describing fast/slow store activity.
    #[metric]
    metrics: FastSlowStoreMetrics,
}

impl FastSlowStore {
pub fn new(_spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc<Self> {
pub fn new(spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc<Self> {
Arc::new_cyclic(|weak_self| Self {
fast_store,
fast_direction: spec.fast_direction.clone(),
slow_store,
slow_direction: spec.slow_direction.clone(),
weak_self: weak_self.clone(),
metrics: FastSlowStoreMetrics::default(),
})
Expand Down Expand Up @@ -155,12 +159,31 @@ impl StoreDriver for FastSlowStore {
) -> Result<(), Error> {
// If either one of our stores is a noop store, bypass the multiplexing
// and just use the store that is not a noop store.
let slow_store = self.slow_store.inner_store(Some(key.borrow()));
if slow_store.optimized_for(StoreOptimizations::NoopUpdates) {
let ignore_slow = self
.slow_store
.inner_store(Some(key.borrow()))
.optimized_for(StoreOptimizations::NoopUpdates)
|| self.slow_direction == StoreDirection::ReadOnly
|| self.slow_direction == StoreDirection::Get;
let ignore_fast = self
.fast_store
.inner_store(Some(key.borrow()))
.optimized_for(StoreOptimizations::NoopUpdates)
|| self.fast_direction == StoreDirection::ReadOnly
|| self.fast_direction == StoreDirection::Get;
if ignore_slow && ignore_fast {
// We need to drain the reader to avoid the writer complaining that we dropped
// the connection prematurely.
reader
.drain()
.await
.err_tip(|| "In FastFlowStore::update")?;
return Ok(());
}
if ignore_slow {
return self.fast_store.update(key, reader, size_info).await;
}
let fast_store = self.fast_store.inner_store(Some(key.borrow()));
if fast_store.optimized_for(StoreOptimizations::NoopUpdates) {
if ignore_fast {
return self.slow_store.update(key, reader, size_info).await;
}

Expand Down Expand Up @@ -233,7 +256,10 @@ impl StoreDriver for FastSlowStore {
{
if !self
.slow_store
.inner_store(Some(key.borrow()))
.optimized_for(StoreOptimizations::NoopUpdates)
&& self.slow_direction != StoreDirection::ReadOnly
&& self.slow_direction != StoreDirection::Get
{
slow_update_store_with_file(
self.slow_store.as_store_driver_pin(),
Expand All @@ -244,6 +270,11 @@ impl StoreDriver for FastSlowStore {
.await
.err_tip(|| "In FastSlowStore::update_with_whole_file slow_store")?;
}
if self.fast_direction == StoreDirection::ReadOnly
|| self.fast_direction == StoreDirection::Get
{
return Ok(Some(file));
}
return self
.fast_store
.update_with_whole_file(key, file, upload_size)
Expand All @@ -254,10 +285,13 @@ impl StoreDriver for FastSlowStore {
.slow_store
.optimized_for(StoreOptimizations::FileUpdates)
{
if !self
let ignore_fast = self
.fast_store
.inner_store(Some(key.borrow()))
.optimized_for(StoreOptimizations::NoopUpdates)
{
|| self.fast_direction == StoreDirection::ReadOnly
|| self.fast_direction == StoreDirection::Get;
if !ignore_fast {
slow_update_store_with_file(
self.fast_store.as_store_driver_pin(),
key.borrow(),
Expand All @@ -267,6 +301,11 @@ impl StoreDriver for FastSlowStore {
.await
.err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?;
}
let ignore_slow = self.slow_direction == StoreDirection::ReadOnly
|| self.slow_direction == StoreDirection::Get;
if ignore_slow {
return Ok(Some(file));
}
return self
.slow_store
.update_with_whole_file(key, file, upload_size)
Expand Down Expand Up @@ -317,6 +356,22 @@ impl StoreDriver for FastSlowStore {
.slow_store_hit_count
.fetch_add(1, Ordering::Acquire);

if self
.fast_store
.inner_store(Some(key.borrow()))
.optimized_for(StoreOptimizations::NoopUpdates)
|| self.fast_direction == StoreDirection::ReadOnly
|| self.fast_direction == StoreDirection::Update
{
self.slow_store
.get_part(key, writer.borrow_mut(), offset, length)
.await?;
self.metrics
.slow_store_downloaded_bytes
.fetch_add(writer.get_bytes_written(), Ordering::Acquire);
return Ok(());
}

let send_range = offset..length.map_or(u64::MAX, |length| length + offset);
let mut bytes_received: u64 = 0;

Expand Down
111 changes: 109 additions & 2 deletions nativelink-store/tests/fast_slow_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use bytes::Bytes;
use nativelink_config::stores::{FastSlowSpec, MemorySpec, NoopSpec, StoreSpec};
use nativelink_config::stores::{FastSlowSpec, MemorySpec, NoopSpec, StoreDirection, StoreSpec};
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_macro::nativelink_test;
use nativelink_metric::MetricsComponent;
Expand All @@ -35,20 +35,29 @@ use rand::{Rng, SeedableRng};

const MEGABYTE_SZ: usize = 1024 * 1024;

fn make_stores() -> (Store, Store, Store) {
/// Creates two in-memory stores and a `FastSlowStore` layered over them,
/// configured with the given direction for each side.
///
/// Returns `(fast_slow_store, fast_store, slow_store)` so tests can drive
/// the combined store and then inspect each backing store directly.
fn make_stores_direction(
    fast_direction: StoreDirection,
    slow_direction: StoreDirection,
) -> (Store, Store, Store) {
    let spec = FastSlowSpec {
        fast: StoreSpec::memory(MemorySpec::default()),
        fast_direction,
        slow: StoreSpec::memory(MemorySpec::default()),
        slow_direction,
    };
    let memory_spec = MemorySpec::default();
    let fast_store = Store::new(MemoryStore::new(&memory_spec));
    let slow_store = Store::new(MemoryStore::new(&memory_spec));
    let combined = FastSlowStore::new(&spec, fast_store.clone(), slow_store.clone());
    (Store::new(combined), fast_store, slow_store)
}

/// Creates the `(fast_slow_store, fast_store, slow_store)` triple with both
/// sides using the default direction, `StoreDirection::Both`.
fn make_stores() -> (Store, Store, Store) {
    make_stores_direction(StoreDirection::Both, StoreDirection::Both)
}

fn make_random_data(sz: usize) -> Vec<u8> {
let mut value = vec![0u8; sz];
let mut rng = SmallRng::seed_from_u64(1);
Expand Down Expand Up @@ -331,7 +340,9 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> {
let fast_slow_store = FastSlowStore::new(
&FastSlowSpec {
fast: StoreSpec::memory(MemorySpec::default()),
fast_direction: StoreDirection::default(),
slow: StoreSpec::memory(MemorySpec::default()),
slow_direction: StoreDirection::default(),
},
fast_store,
slow_store,
Expand Down Expand Up @@ -372,7 +383,9 @@ async fn ignore_value_in_fast_store() -> Result<(), Error> {
let fast_slow_store = Arc::new(FastSlowStore::new(
&FastSlowSpec {
fast: StoreSpec::memory(MemorySpec::default()),
fast_direction: StoreDirection::default(),
slow: StoreSpec::memory(MemorySpec::default()),
slow_direction: StoreDirection::default(),
},
fast_store.clone(),
slow_store,
Expand All @@ -395,7 +408,9 @@ async fn has_checks_fast_store_when_noop() -> Result<(), Error> {
let slow_store = Store::new(NoopStore::new());
let fast_slow_store_config = FastSlowSpec {
fast: StoreSpec::memory(MemorySpec::default()),
fast_direction: StoreDirection::default(),
slow: StoreSpec::noop(NoopSpec::default()),
slow_direction: StoreDirection::default(),
};
let fast_slow_store = Arc::new(FastSlowStore::new(
&fast_slow_store_config,
Expand Down Expand Up @@ -430,3 +445,95 @@ async fn has_checks_fast_store_when_noop() -> Result<(), Error> {
);
Ok(())
}

/// With the fast side set to `Get`, an update through the fast-slow store
/// must land in the slow store only.
#[nativelink_test]
async fn fast_get_only_not_updated() -> Result<(), Error> {
    let (fast_slow_store, fast_store, slow_store) =
        make_stores_direction(StoreDirection::Get, StoreDirection::Both);
    let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap();
    let payload = make_random_data(100);
    fast_slow_store
        .update_oneshot(digest, payload.into())
        .await?;

    let in_fast = fast_store.has(digest).await?;
    assert!(in_fast.is_none(), "Expected data to not be in the fast store");

    let in_slow = slow_store.has(digest).await?;
    assert!(in_slow.is_some(), "Expected data in the slow store");
    Ok(())
}

/// With the fast side set to `ReadOnly`, an update through the fast-slow
/// store must land in the slow store only.
#[nativelink_test]
async fn fast_readonly_only_not_updated() -> Result<(), Error> {
    let (fast_slow_store, fast_store, slow_store) =
        make_stores_direction(StoreDirection::ReadOnly, StoreDirection::Both);
    let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap();
    let payload = make_random_data(100);
    fast_slow_store
        .update_oneshot(digest, payload.into())
        .await?;

    let in_fast = fast_store.has(digest).await?;
    assert!(in_fast.is_none(), "Expected data to not be in the fast store");

    let in_slow = slow_store.has(digest).await?;
    assert!(in_slow.is_some(), "Expected data in the slow store");
    Ok(())
}

/// With the slow side set to `ReadOnly`, an update through the fast-slow
/// store must land in the fast store only.
#[nativelink_test]
async fn slow_readonly_only_not_updated() -> Result<(), Error> {
    let (fast_slow_store, fast_store, slow_store) =
        make_stores_direction(StoreDirection::Both, StoreDirection::ReadOnly);
    let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap();
    let payload = make_random_data(100);
    fast_slow_store
        .update_oneshot(digest, payload.into())
        .await?;

    let in_fast = fast_store.has(digest).await?;
    assert!(in_fast.is_some(), "Expected data to be in the fast store");

    let in_slow = slow_store.has(digest).await?;
    assert!(in_slow.is_none(), "Expected data to not be in the slow store");
    Ok(())
}

/// With the slow side set to `Get`, an update through the fast-slow store
/// must land in the fast store only.
#[nativelink_test]
async fn slow_get_only_not_updated() -> Result<(), Error> {
    let (fast_slow_store, fast_store, slow_store) =
        make_stores_direction(StoreDirection::Both, StoreDirection::Get);
    let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap();
    let payload = make_random_data(100);
    fast_slow_store
        .update_oneshot(digest, payload.into())
        .await?;

    let in_fast = fast_store.has(digest).await?;
    assert!(in_fast.is_some(), "Expected data to be in the fast store");

    let in_slow = slow_store.has(digest).await?;
    assert!(in_slow.is_none(), "Expected data to not be in the slow store");
    Ok(())
}

#[nativelink_test]
async fn fast_put_only_not_updated() -> Result<(), Error> {
let (fast_slow_store, fast_store, slow_store) =
make_stores_direction(StoreDirection::Update, StoreDirection::Both);
let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap();
slow_store
.update_oneshot(digest, make_random_data(100).into())
.await?;
let _ = fast_slow_store.get_part_unchunked(digest, 0, None).await;
assert!(
fast_store.has(digest).await?.is_none(),
"Expected data to not be in the fast store"
);
Ok(())
}
6 changes: 5 additions & 1 deletion nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use filetime::{set_file_atime, FileTime};
use futures::executor::block_on;
use futures::task::Poll;
use futures::{poll, Future, FutureExt};
use nativelink_config::stores::{FastSlowSpec, FilesystemSpec, MemorySpec, StoreSpec};
use nativelink_config::stores::{
FastSlowSpec, FilesystemSpec, MemorySpec, StoreDirection, StoreSpec,
};
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_macro::nativelink_test;
use nativelink_store::fast_slow_store::FastSlowStore;
Expand Down Expand Up @@ -1333,7 +1335,9 @@ async fn update_with_whole_file_slow_path_when_low_file_descriptors() -> Result<
// Note: The config is not needed for this test, so use dummy data.
&FastSlowSpec {
fast: StoreSpec::memory(MemorySpec::default()),
fast_direction: StoreDirection::default(),
slow: StoreSpec::memory(MemorySpec::default()),
slow_direction: StoreDirection::default(),
},
Store::new(
FilesystemStore::<FileEntryImpl>::new(&FilesystemSpec {
Expand Down
Loading

0 comments on commit 251dda7

Please sign in to comment.