From 5dcab372fda1a939c7324797d845303a82a76a7b Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 12 Nov 2024 07:35:32 +0200 Subject: [PATCH] Move cluster maintenance utilities from binary to library code --- .../commands/cluster/controller.rs | 29 +++++++++---- .../src/bin/subspace-farmer/commands/farm.rs | 4 +- .../src/bin/subspace-farmer/main.rs | 8 +--- .../subspace-farmer/src/cluster/controller.rs | 3 ++ .../cluster/controller/caches.rs | 33 +++++++++------ .../commands => }/cluster/controller/farms.rs | 42 ++++++++++++------- crates/subspace-farmer/src/lib.rs | 1 + 7 files changed, 77 insertions(+), 43 deletions(-) rename crates/subspace-farmer/src/{bin/subspace-farmer/commands => }/cluster/controller/caches.rs (86%) rename crates/subspace-farmer/src/{bin/subspace-farmer/commands => }/cluster/controller/farms.rs (92%) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs index beffc2281e..6fa95bc119 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs @@ -1,8 +1,5 @@ -mod caches; -mod farms; - -use crate::commands::cluster::controller::caches::maintain_caches; -use crate::commands::cluster::controller::farms::{maintain_farms, FarmIndex}; +use crate::commands::cluster::cache::CACHE_IDENTIFICATION_BROADCAST_INTERVAL; +use crate::commands::cluster::farmer::FARMER_IDENTIFICATION_BROADCAST_INTERVAL; use crate::commands::shared::derive_libp2p_keypair; use crate::commands::shared::network::{configure_network, NetworkArgs}; use anyhow::anyhow; @@ -18,7 +15,9 @@ use std::path::PathBuf; use std::pin::{pin, Pin}; use std::sync::Arc; use std::time::Duration; +use subspace_farmer::cluster::controller::caches::maintain_caches; use subspace_farmer::cluster::controller::controller_service; +use subspace_farmer::cluster::controller::farms::{maintain_farms, FarmIndex}; use subspace_farmer::cluster::nats_client::NatsClient; use subspace_farmer::farm::plotted_pieces::PlottedPieces; use subspace_farmer::farmer_cache::FarmerCache; @@ -229,13 +228,29 @@ pub(super) async fn controller( { let nats_client = nats_client.clone(); - move || async move { maintain_farms(&instance, &nats_client, &plotted_pieces).await } + move || async move { + maintain_farms( + &instance, + &nats_client, + &plotted_pieces, + FARMER_IDENTIFICATION_BROADCAST_INTERVAL, + ) + .await + } }, "controller-farms".to_string(), )?; let caches_fut = run_future_in_dedicated_thread( - move || async move { maintain_caches(&cache_group, &nats_client, farmer_cache).await }, + move || async move { + maintain_caches( + &cache_group, + &nats_client, + farmer_cache, + CACHE_IDENTIFICATION_BROADCAST_INTERVAL, + ) + .await + }, "controller-caches".to_string(), )?; diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index a9da959518..1379f42025 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -69,6 +69,8 @@ const MAX_SPACE_PLEDGED_FOR_PLOT_CACHE_ON_WINDOWS: u64 = 7 * 1024 * 1024 * 1024 const FARM_ERROR_PRINT_INTERVAL: Duration = Duration::from_secs(30); const PLOTTING_RETRY_INTERVAL: Duration = Duration::from_secs(5); +type FarmIndex = u8; + #[derive(Debug, Parser)] struct CpuPlottingOptions { /// How many sectors a farmer will download concurrently. Limits memory usage of @@ -757,7 +759,7 @@ where info!("Finished collecting already plotted pieces successfully"); - let mut farms_stream = (0u8..) + let mut farms_stream = (FarmIndex::MIN..) .zip(farms) .map(|(farm_index, farm)| { let plotted_pieces = Arc::clone(&plotted_pieces); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs index bab9a7abd2..440309fb91 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs @@ -1,10 +1,4 @@ -#![feature( - duration_constructors, - extract_if, - hash_extract_if, - let_chains, - type_changing_struct_update -)] +#![feature(duration_constructors, type_changing_struct_update)] mod commands; mod utils; diff --git a/crates/subspace-farmer/src/cluster/controller.rs b/crates/subspace-farmer/src/cluster/controller.rs index 5c0420e003..fba5e02563 100644 --- a/crates/subspace-farmer/src/cluster/controller.rs +++ b/crates/subspace-farmer/src/cluster/controller.rs @@ -6,6 +6,9 @@ //! client implementations designed to work with cluster controller and a service function to drive //! the backend part of the controller. +pub mod caches; +pub mod farms; + use crate::cluster::cache::{ClusterCacheReadPieceRequest, ClusterCacheReadPiecesRequest}; use crate::cluster::nats_client::{ GenericBroadcast, GenericNotification, GenericRequest, GenericStreamRequest, NatsClient, diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs b/crates/subspace-farmer/src/cluster/controller/caches.rs similarity index 86% rename from crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs rename to crates/subspace-farmer/src/cluster/controller/caches.rs index 17a1109f2c..a4a398743f 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/caches.rs +++ b/crates/subspace-farmer/src/cluster/controller/caches.rs @@ -5,7 +5,11 @@ //! cache addition and removal, tries to reduce number of reinitializations that result in potential //! piece cache sync, etc. -use crate::commands::cluster::cache::CACHE_IDENTIFICATION_BROADCAST_INTERVAL; +use crate::cluster::cache::{ClusterCacheIdentifyBroadcast, ClusterPieceCache}; +use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast; +use crate::cluster::nats_client::NatsClient; +use crate::farm::{PieceCache, PieceCacheId}; +use crate::farmer_cache::FarmerCache; use anyhow::anyhow; use futures::channel::oneshot; use futures::future::FusedFuture; @@ -15,11 +19,6 @@ use std::future::{ready, Future}; use std::pin::{pin, Pin}; use std::sync::Arc; use std::time::{Duration, Instant}; -use subspace_farmer::cluster::cache::{ClusterCacheIdentifyBroadcast, ClusterPieceCache}; -use subspace_farmer::cluster::controller::ClusterControllerCacheIdentifyBroadcast; -use subspace_farmer::cluster::nats_client::NatsClient; -use subspace_farmer::farm::{PieceCache, PieceCacheId}; -use subspace_farmer::farmer_cache::FarmerCache; use tokio::time::MissedTickBehavior; use tracing::{info, trace, warn}; @@ -32,12 +31,20 @@ struct KnownCache { piece_cache: Arc, } -#[derive(Debug, Default)] +#[derive(Debug)] struct KnownCaches { + identification_broadcast_interval: Duration, known_caches: Vec, } impl KnownCaches { + fn new(identification_broadcast_interval: Duration) -> Self { + Self { + identification_broadcast_interval, + known_caches: Vec::new(), + } + } + fn get_all(&self) -> Vec> { self.known_caches .iter() @@ -78,17 +85,19 @@ impl KnownCaches { fn remove_expired(&mut self) -> impl Iterator + '_ { self.known_caches.extract_if(|known_cache| { - known_cache.last_identification.elapsed() > CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2 + known_cache.last_identification.elapsed() > self.identification_broadcast_interval * 2 }) } } -pub(super) async fn maintain_caches( +/// Utility function for maintaining caches by controller in a cluster environment +pub async fn maintain_caches( cache_group: &str, nats_client: &NatsClient, farmer_cache: FarmerCache, + identification_broadcast_interval: Duration, ) -> anyhow::Result<()> { - let mut known_caches = KnownCaches::default(); + let mut known_caches = KnownCaches::new(identification_broadcast_interval); let mut scheduled_reinitialization_for = None; // Farm that is being added/removed right now (if any) @@ -110,8 +119,8 @@ pub(super) async fn maintain_caches( let mut cache_identify_subscription = cache_identify_subscription.fuse(); let mut cache_pruning_interval = tokio::time::interval_at( - (Instant::now() + CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2).into(), - CACHE_IDENTIFICATION_BROADCAST_INTERVAL * 2, + (Instant::now() + identification_broadcast_interval * 2).into(), + identification_broadcast_interval * 2, ); cache_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs b/crates/subspace-farmer/src/cluster/controller/farms.rs similarity index 92% rename from crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs rename to crates/subspace-farmer/src/cluster/controller/farms.rs index 8f211e5046..4c725e3d49 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs +++ b/crates/subspace-farmer/src/cluster/controller/farms.rs @@ -4,7 +4,11 @@ //! about which pieces are plotted in which sectors of which farm up to date. Implementation //! automatically handles dynamic farm addition and removal, etc. -use crate::commands::cluster::farmer::FARMER_IDENTIFICATION_BROADCAST_INTERVAL; +use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast; +use crate::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBroadcast}; +use crate::cluster::nats_client::NatsClient; +use crate::farm::plotted_pieces::PlottedPieces; +use crate::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate}; use anyhow::anyhow; use async_lock::RwLock as AsyncRwLock; use futures::channel::oneshot; @@ -18,14 +22,9 @@ use std::future::{ready, Future}; use std::mem; use std::pin::{pin, Pin}; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use subspace_core_primitives::hashes::Blake3Hash; use subspace_core_primitives::sectors::SectorIndex; -use subspace_farmer::cluster::controller::ClusterControllerFarmerIdentifyBroadcast; -use subspace_farmer::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBroadcast}; -use subspace_farmer::cluster::nats_client::NatsClient; -use subspace_farmer::farm::plotted_pieces::PlottedPieces; -use subspace_farmer::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate}; use tokio::task; use tokio::time::MissedTickBehavior; use tracing::{error, info, trace, warn}; @@ -33,7 +32,8 @@ use tracing::{error, info, trace, warn}; type AddRemoveFuture<'a> = Pin, ClusterFarm)>> + 'a>>; -pub(super) type FarmIndex = u16; +/// Number of farms in a cluster is currently limited to 2^16 +pub type FarmIndex = u16; #[derive(Debug)] struct KnownFarm { @@ -55,12 +55,20 @@ enum KnownFarmInsertResult { NotInserted, } -#[derive(Debug, Default)] +#[derive(Debug)] struct KnownFarms { + identification_broadcast_interval: Duration, known_farms: HashMap, } impl KnownFarms { + fn new(identification_broadcast_interval: Duration) -> Self { + Self { + identification_broadcast_interval, + known_farms: HashMap::new(), + } + } + fn insert_or_update( &mut self, farm_id: FarmId, @@ -117,7 +125,7 @@ impl KnownFarms { fn remove_expired(&mut self) -> impl Iterator + '_ { self.known_farms.extract_if(|_farm_index, known_farm| { - known_farm.last_identification.elapsed() > FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2 + known_farm.last_identification.elapsed() > self.identification_broadcast_interval * 2 }) } @@ -126,18 +134,20 @@ impl KnownFarms { } } -pub(super) async fn maintain_farms( +/// Utility function for maintaining farms by controller in a cluster environment +pub async fn maintain_farms( instance: &str, nats_client: &NatsClient, plotted_pieces: &Arc>>, + identification_broadcast_interval: Duration, ) -> anyhow::Result<()> { - let mut known_farms = KnownFarms::default(); + let mut known_farms = KnownFarms::new(identification_broadcast_interval); // Futures that need to be processed sequentially in order to add/remove farms, if farm was // added, future will resolve with `Some`, `None` if removed - let mut farms_to_add_remove = VecDeque::::new(); + let mut farms_to_add_remove = VecDeque::>::new(); // Farm that is being added/removed right now (if any) - let mut farm_add_remove_in_progress = (Box::pin(ready(None)) as AddRemoveFuture).fuse(); + let mut farm_add_remove_in_progress = (Box::pin(ready(None)) as AddRemoveFuture<'_>).fuse(); // Initialize with pending future so it never ends let mut farms = FuturesUnordered::new(); @@ -158,8 +168,8 @@ pub(super) async fn maintain_farms( let mut farmer_identify_subscription = farmer_identify_subscription.fuse(); let mut farm_pruning_interval = tokio::time::interval_at( - (Instant::now() + FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2).into(), - FARMER_IDENTIFICATION_BROADCAST_INTERVAL * 2, + (Instant::now() + identification_broadcast_interval * 2).into(), + identification_broadcast_interval * 2, ); farm_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); diff --git a/crates/subspace-farmer/src/lib.rs b/crates/subspace-farmer/src/lib.rs index 3b53ef4be4..8046e897cf 100644 --- a/crates/subspace-farmer/src/lib.rs +++ b/crates/subspace-farmer/src/lib.rs @@ -4,6 +4,7 @@ assert_matches, btree_extract_if, duration_constructors, + extract_if, exact_size_is_empty, fmt_helpers_for_derive, future_join,