Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move cluster maintenance utilities from binary to library code #3229

Merged
merged 1 commit into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
mod caches;
mod farms;

use crate::commands::cluster::controller::caches::maintain_caches;
use crate::commands::cluster::controller::farms::{maintain_farms, FarmIndex};
use crate::commands::cluster::cache::CACHE_IDENTIFICATION_BROADCAST_INTERVAL;
use crate::commands::cluster::farmer::FARMER_IDENTIFICATION_BROADCAST_INTERVAL;
use crate::commands::shared::derive_libp2p_keypair;
use crate::commands::shared::network::{configure_network, NetworkArgs};
use anyhow::anyhow;
Expand All @@ -18,7 +15,9 @@ use std::path::PathBuf;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::time::Duration;
use subspace_farmer::cluster::controller::caches::maintain_caches;
use subspace_farmer::cluster::controller::controller_service;
use subspace_farmer::cluster::controller::farms::{maintain_farms, FarmIndex};
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::farm::plotted_pieces::PlottedPieces;
use subspace_farmer::farmer_cache::FarmerCache;
Expand Down Expand Up @@ -229,13 +228,29 @@ pub(super) async fn controller(
{
let nats_client = nats_client.clone();

move || async move { maintain_farms(&instance, &nats_client, &plotted_pieces).await }
move || async move {
maintain_farms(
&instance,
&nats_client,
&plotted_pieces,
FARMER_IDENTIFICATION_BROADCAST_INTERVAL,
)
.await
}
},
"controller-farms".to_string(),
)?;

let caches_fut = run_future_in_dedicated_thread(
move || async move { maintain_caches(&cache_group, &nats_client, farmer_cache).await },
move || async move {
maintain_caches(
&cache_group,
&nats_client,
farmer_cache,
CACHE_IDENTIFICATION_BROADCAST_INTERVAL,
)
.await
},
"controller-caches".to_string(),
)?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ const MAX_SPACE_PLEDGED_FOR_PLOT_CACHE_ON_WINDOWS: u64 = 7 * 1024 * 1024 * 1024
const FARM_ERROR_PRINT_INTERVAL: Duration = Duration::from_secs(30);
const PLOTTING_RETRY_INTERVAL: Duration = Duration::from_secs(5);

type FarmIndex = u8;

#[derive(Debug, Parser)]
struct CpuPlottingOptions {
/// How many sectors a farmer will download concurrently. Limits memory usage of
Expand Down Expand Up @@ -757,7 +759,7 @@ where

info!("Finished collecting already plotted pieces successfully");

let mut farms_stream = (0u8..)
let mut farms_stream = (FarmIndex::MIN..)
.zip(farms)
.map(|(farm_index, farm)| {
let plotted_pieces = Arc::clone(&plotted_pieces);
Expand Down
8 changes: 1 addition & 7 deletions crates/subspace-farmer/src/bin/subspace-farmer/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
#![feature(
duration_constructors,
extract_if,
hash_extract_if,
let_chains,
type_changing_struct_update
)]
#![feature(duration_constructors, type_changing_struct_update)]

mod commands;
mod utils;
Expand Down
3 changes: 3 additions & 0 deletions crates/subspace-farmer/src/cluster/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
//! client implementations designed to work with cluster controller and a service function to drive
//! the backend part of the controller.

pub mod caches;
pub mod farms;

use crate::cluster::cache::{ClusterCacheReadPieceRequest, ClusterCacheReadPiecesRequest};
use crate::cluster::nats_client::{
GenericBroadcast, GenericNotification, GenericRequest, GenericStreamRequest, NatsClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
//! cache addition and removal, tries to reduce number of reinitializations that result in potential
//! piece cache sync, etc.

use crate::commands::cluster::cache::CACHE_IDENTIFICATION_BROADCAST_INTERVAL;
use crate::cluster::cache::{ClusterCacheIdentifyBroadcast, ClusterPieceCache};
use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast;
use crate::cluster::nats_client::NatsClient;
use crate::farm::{PieceCache, PieceCacheId};
use crate::farmer_cache::FarmerCache;
use anyhow::anyhow;
use futures::channel::oneshot;
use futures::future::FusedFuture;
Expand All @@ -15,11 +19,6 @@ use std::future::{ready, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::time::{Duration, Instant};
use subspace_farmer::cluster::cache::{ClusterCacheIdentifyBroadcast, ClusterPieceCache};
use subspace_farmer::cluster::controller::ClusterControllerCacheIdentifyBroadcast;
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::farm::{PieceCache, PieceCacheId};
use subspace_farmer::farmer_cache::FarmerCache;
use tokio::time::MissedTickBehavior;
use tracing::{info, trace, warn};

Expand All @@ -32,12 +31,20 @@ struct KnownCache {
piece_cache: Arc<ClusterPieceCache>,
}

#[derive(Debug, Default)]
#[derive(Debug)]
struct KnownCaches {
identification_broadcast_interval: Duration,
known_caches: Vec<KnownCache>,
}

impl KnownCaches {
fn new(identification_broadcast_interval: Duration) -> Self {
Self {
identification_broadcast_interval,
known_caches: Vec::new(),
}
}

fn get_all(&self) -> Vec<Arc<dyn PieceCache>> {
self.known_caches
.iter()
Expand Down Expand Up @@ -78,17 +85,19 @@ impl KnownCaches {

fn remove_expired(&mut self) -> impl Iterator<Item = KnownCache> + '_ {
self.known_caches.extract_if(|known_cache| {
known_cache.last_identification.elapsed() > CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2
known_cache.last_identification.elapsed() > self.identification_broadcast_interval * 2
})
}
}

pub(super) async fn maintain_caches(
/// Utility function for maintaining caches by controller in a cluster environment
pub async fn maintain_caches(
cache_group: &str,
nats_client: &NatsClient,
farmer_cache: FarmerCache,
identification_broadcast_interval: Duration,
) -> anyhow::Result<()> {
let mut known_caches = KnownCaches::default();
let mut known_caches = KnownCaches::new(identification_broadcast_interval);

let mut scheduled_reinitialization_for = None;
// Farm that is being added/removed right now (if any)
Expand All @@ -110,8 +119,8 @@ pub(super) async fn maintain_caches(

let mut cache_identify_subscription = cache_identify_subscription.fuse();
let mut cache_pruning_interval = tokio::time::interval_at(
(Instant::now() + CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2).into(),
CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2,
(Instant::now() + identification_broadcast_interval * 2).into(),
identification_broadcast_interval * 2,
);
cache_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
//! about which pieces are plotted in which sectors of which farm up to date. Implementation
//! automatically handles dynamic farm addition and removal, etc.

use crate::commands::cluster::farmer::FARMER_IDENTIFICATION_BROADCAST_INTERVAL;
use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
use crate::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBroadcast};
use crate::cluster::nats_client::NatsClient;
use crate::farm::plotted_pieces::PlottedPieces;
use crate::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate};
use anyhow::anyhow;
use async_lock::RwLock as AsyncRwLock;
use futures::channel::oneshot;
Expand All @@ -18,22 +22,18 @@ use std::future::{ready, Future};
use std::mem;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use subspace_core_primitives::hashes::Blake3Hash;
use subspace_core_primitives::sectors::SectorIndex;
use subspace_farmer::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
use subspace_farmer::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBroadcast};
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::farm::plotted_pieces::PlottedPieces;
use subspace_farmer::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate};
use tokio::task;
use tokio::time::MissedTickBehavior;
use tracing::{error, info, trace, warn};

type AddRemoveFuture<'a> =
Pin<Box<dyn Future<Output = Option<(FarmIndex, oneshot::Receiver<()>, ClusterFarm)>> + 'a>>;

pub(super) type FarmIndex = u16;
/// Number of farms in a cluster is currently limited to 2^16
pub type FarmIndex = u16;

#[derive(Debug)]
struct KnownFarm {
Expand All @@ -55,12 +55,20 @@ enum KnownFarmInsertResult {
NotInserted,
}

#[derive(Debug, Default)]
#[derive(Debug)]
struct KnownFarms {
identification_broadcast_interval: Duration,
known_farms: HashMap<FarmIndex, KnownFarm>,
}

impl KnownFarms {
fn new(identification_broadcast_interval: Duration) -> Self {
Self {
identification_broadcast_interval,
known_farms: HashMap::new(),
}
}

fn insert_or_update(
&mut self,
farm_id: FarmId,
Expand Down Expand Up @@ -117,7 +125,7 @@ impl KnownFarms {

fn remove_expired(&mut self) -> impl Iterator<Item = (FarmIndex, KnownFarm)> + '_ {
self.known_farms.extract_if(|_farm_index, known_farm| {
known_farm.last_identification.elapsed() > FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2
known_farm.last_identification.elapsed() > self.identification_broadcast_interval * 2
})
}

Expand All @@ -126,18 +134,20 @@ impl KnownFarms {
}
}

pub(super) async fn maintain_farms(
/// Utility function for maintaining farms by controller in a cluster environment
pub async fn maintain_farms(
instance: &str,
nats_client: &NatsClient,
plotted_pieces: &Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
identification_broadcast_interval: Duration,
) -> anyhow::Result<()> {
let mut known_farms = KnownFarms::default();
let mut known_farms = KnownFarms::new(identification_broadcast_interval);

// Futures that need to be processed sequentially in order to add/remove farms, if farm was
// added, future will resolve with `Some`, `None` if removed
let mut farms_to_add_remove = VecDeque::<AddRemoveFuture>::new();
let mut farms_to_add_remove = VecDeque::<AddRemoveFuture<'_>>::new();
// Farm that is being added/removed right now (if any)
let mut farm_add_remove_in_progress = (Box::pin(ready(None)) as AddRemoveFuture).fuse();
let mut farm_add_remove_in_progress = (Box::pin(ready(None)) as AddRemoveFuture<'_>).fuse();
// Initialize with pending future so it never ends
let mut farms = FuturesUnordered::new();

Expand All @@ -158,8 +168,8 @@ pub(super) async fn maintain_farms(

let mut farmer_identify_subscription = farmer_identify_subscription.fuse();
let mut farm_pruning_interval = tokio::time::interval_at(
(Instant::now() + FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2).into(),
FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2,
(Instant::now() + identification_broadcast_interval * 2).into(),
identification_broadcast_interval * 2,
);
farm_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

Expand Down
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
assert_matches,
btree_extract_if,
duration_constructors,
extract_if,
exact_size_is_empty,
fmt_helpers_for_derive,
future_join,
Expand Down
Loading