Commit da6cfd1

Merge pull request #1665 from subspace/archival-storage-lookups2

Archival storage look-ups.

nazar-pc authored Jul 25, 2023
Parents: a7a28b7 + 921e6dc
Showing 19 changed files with 310 additions and 557 deletions.
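In outline: instead of announcing each plotted piece index to the DSN individually, a farmer now advertises a cuckoo filter of the piece indexes it stores; other peers cache these filters per peer ID in `ArchivalStorageInfo` and consult them during piece look-ups. A minimal sketch of the advertised structure, assuming the `cuckoofilter` crate; the real wrapper is `ArchivalStoragePieces` in `subspace-farmer`, and its internals may differ:

```rust
use cuckoofilter::CuckooFilter;
use std::collections::hash_map::DefaultHasher;

fn main() {
    // Capacity is derived from how many pieces the farm can hold (the diff
    // below computes `<plot bytes> / Piece::SIZE + 1`); 1024 is a stand-in.
    let mut stored_pieces = CuckooFilter::<DefaultHasher>::with_capacity(1024);

    // As sectors are plotted, their piece indexes are added to the filter.
    for piece_index in [1u64, 17, 42] {
        stored_pieces
            .add(&piece_index)
            .expect("capacity not exceeded");
    }

    // Remote peers test membership: no false negatives, rare false positives.
    assert!(stored_pieces.contains(&42u64));
}
```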
102 changes: 28 additions & 74 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -5,7 +5,6 @@ use crate::commands::shared::print_disk_farm_info;
use crate::utils::{get_required_plot_space_with_overhead, shutdown_signal};
use crate::{DiskFarm, FarmingArgs};
use anyhow::{anyhow, Context, Result};
use futures::future::{select, Either};
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use lru::LruCache;
@@ -22,10 +21,10 @@ use subspace_erasure_coding::ErasureCoding;
use subspace_farmer::single_disk_plot::{
SingleDiskPlot, SingleDiskPlotError, SingleDiskPlotOptions,
};
use subspace_farmer::utils::archival_storage_info::ArchivalStorageInfo;
use subspace_farmer::utils::archival_storage_pieces::ArchivalStoragePieces;
use subspace_farmer::utils::farmer_piece_cache::FarmerPieceCache;
use subspace_farmer::utils::farmer_piece_getter::FarmerPieceGetter;
use subspace_farmer::utils::node_piece_getter::NodePieceGetter;
use subspace_farmer::utils::piece_cache::PieceCache;
use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator;
use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
@@ -34,13 +33,11 @@ use subspace_farmer::{Identity, NodeClient, NodeRpcClient};
use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy, PlottedSector};
use subspace_networking::libp2p::identity::{ed25519, Keypair};
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::piece_announcement::announce_single_piece_index_hash_with_backoff;
use subspace_networking::utils::piece_provider::PieceProvider;
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
use subspace_networking::Node;
use subspace_proof_of_space::Table;
use tokio::sync::{broadcast, OwnedSemaphorePermit};
use tokio::time::sleep;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use tracing::{debug, error, info, info_span, trace, warn};
use zeroize::Zeroizing;

const RECORDS_ROOTS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1_000_000).expect("Not zero; qed");
@@ -83,10 +80,6 @@ where
info!(url = %node_rpc_url, "Connecting to node RPC");
let node_client = NodeRpcClient::new(&node_rpc_url).await?;

let concurrent_plotting_semaphore = Arc::new(tokio::sync::Semaphore::new(
farming_args.max_concurrent_plots.get(),
));

let farmer_app_info = node_client
.farmer_app_info()
.await
@@ -99,6 +92,7 @@
/ Piece::SIZE
+ 1usize;
let archival_storage_pieces = ArchivalStoragePieces::new(cuckoo_filter_capacity);
let archival_storage_info = ArchivalStorageInfo::default();

let (node, mut node_runner, piece_cache) = {
// TODO: Temporary networking identity derivation from the first disk farm identity.
@@ -123,6 +117,7 @@
&readers_and_pieces,
node_client.clone(),
archival_storage_pieces.clone(),
archival_storage_info.clone(),
)?
};

@@ -145,9 +140,12 @@
segment_commitments_cache,
)),
);

let piece_getter = Arc::new(FarmerPieceGetter::new(
NodePieceGetter::new(piece_provider),
node.clone(),
piece_provider,
piece_cache.clone(),
archival_storage_info,
));

let last_segment_index = farmer_app_info.protocol_info.history_size.segment_index();
@@ -209,7 +207,6 @@
kzg: kzg.clone(),
erasure_coding: erasure_coding.clone(),
piece_getter: piece_getter.clone(),
concurrent_plotting_semaphore: Arc::clone(&concurrent_plotting_semaphore),
},
disk_farm_index,
);
@@ -300,71 +297,28 @@ where
             "More than 256 plots are not supported, this is checked above already; qed",
         );
         let readers_and_pieces = Arc::clone(&readers_and_pieces);
-        let node = node.clone();
         let span = info_span!("farm", %disk_farm_index);
 
-        // We are not going to send anything here, but dropping of sender on dropping of
-        // corresponding `SingleDiskPlot` will allow us to stop background tasks.
-        let (dropped_sender, _dropped_receiver) = broadcast::channel::<()>(1);
-
-        // Collect newly plotted pieces
-        let on_plotted_sector_callback = move |(
-            plotted_sector,
-            maybe_old_plotted_sector,
-            plotting_permit,
-        ): &(
-            PlottedSector,
-            Option<PlottedSector>,
-            Arc<OwnedSemaphorePermit>,
-        )| {
-            let _span_guard = span.enter();
-            let plotting_permit = Arc::clone(plotting_permit);
-            let node = node.clone();
-            let sector_index = plotted_sector.sector_index;
-
-            let mut dropped_receiver = dropped_sender.subscribe();
-
-            {
-                let mut readers_and_pieces = readers_and_pieces.lock();
-                let readers_and_pieces = readers_and_pieces
-                    .as_mut()
-                    .expect("Initial value was populated above; qed");
-
-                if let Some(old_plotted_sector) = maybe_old_plotted_sector {
-                    readers_and_pieces.delete_sector(disk_farm_index, old_plotted_sector);
-                }
-                readers_and_pieces.add_sector(disk_farm_index, plotted_sector);
-            }
-
-            let piece_indexes = plotted_sector.piece_indexes.clone();
-            // TODO: Remove when we no longer need announcements
-            let publish_fut = async move {
-                let mut pieces_publishing_futures = piece_indexes
-                    .iter()
-                    .map(|piece_index| {
-                        announce_single_piece_index_hash_with_backoff(piece_index.hash(), &node)
-                    })
-                    .collect::<FuturesUnordered<_>>();
-
-                while pieces_publishing_futures.next().await.is_some() {
-                    // Nothing is needed here, just driving all futures to completion
-                }
-
-                info!(?sector_index, "Sector publishing was successful.");
-
-                // Release only after publishing is finished
-                drop(plotting_permit);
-            }
-            .in_current_span();
-
-            tokio::spawn(async move {
-                let result =
-                    select(Box::pin(publish_fut), Box::pin(dropped_receiver.recv())).await;
-                if matches!(result, Either::Right(_)) {
-                    debug!("Piece publishing was cancelled due to shutdown.");
-                }
-            });
-        };
+        let on_plotted_sector_callback =
+            move |(plotted_sector, maybe_old_plotted_sector): &(
+                PlottedSector,
+                Option<PlottedSector>,
+            )| {
+                let _span_guard = span.enter();
+
+                {
+                    let mut readers_and_pieces = readers_and_pieces.lock();
+                    let readers_and_pieces = readers_and_pieces
+                        .as_mut()
+                        .expect("Initial value was populated above; qed");
+
+                    if let Some(old_plotted_sector) = maybe_old_plotted_sector {
+                        readers_and_pieces.delete_sector(disk_farm_index, old_plotted_sector);
+                    }
+                    readers_and_pieces.add_sector(disk_farm_index, plotted_sector);
+                }
+            };

single_disk_plot
.on_sector_plotted(Arc::new(on_plotted_sector_callback))
@@ -429,13 +383,13 @@ fn derive_libp2p_keypair(schnorrkel_sk: &schnorrkel::SecretKey) -> Keypair {
/// Populates piece cache on startup. It waits for the new segment index and check all pieces from
/// previous segments to see if they are already in the cache. If they are not, they are added
/// from DSN.
async fn populate_pieces_cache<PG, PC>(
async fn populate_pieces_cache<PV, PC>(
node: Node,
segment_index: SegmentIndex,
piece_getter: Arc<FarmerPieceGetter<PG, PC>>,
piece_getter: Arc<FarmerPieceGetter<PV, PC>>,
piece_cache: Arc<tokio::sync::Mutex<FarmerPieceCache>>,
) where
PG: PieceGetter + Send + Sync,
PV: PieceValidator + Send + Sync + 'static,
PC: PieceCache + Send + 'static,
{
// Give some time to obtain DSN connection.
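The reworked `FarmerPieceGetter` above now takes the node, the DSN piece provider, the local piece cache, and `ArchivalStorageInfo`. A hypothetical model of one plausible look-up order this enables; all types and names here are illustrative stand-ins, not `subspace-farmer` APIs:

```rust
use std::collections::{HashMap, HashSet};

type PieceIndex = u64;
type PeerId = u64;
type Piece = Vec<u8>;

struct ArchivalStorageInfo {
    // Peer -> piece indexes the peer's cuckoo filter claims (modeled as a set).
    filters: HashMap<PeerId, HashSet<PieceIndex>>,
}

impl ArchivalStorageInfo {
    /// Peers whose advertised filters (probabilistically) contain the piece.
    fn candidates(&self, piece_index: PieceIndex) -> Vec<PeerId> {
        self.filters
            .iter()
            .filter(|(_, claimed)| claimed.contains(&piece_index))
            .map(|(peer_id, _)| *peer_id)
            .collect()
    }
}

fn get_piece(
    local_cache: &HashMap<PieceIndex, Piece>,
    archival_storage_info: &ArchivalStorageInfo,
    piece_index: PieceIndex,
) -> Option<Piece> {
    // 1. Local piece cache first.
    if let Some(piece) = local_cache.get(&piece_index) {
        return Some(piece.clone());
    }
    // 2. Farmers whose filters claim the piece; a hit may be a false positive,
    //    so a failed fetch just moves on to the next candidate.
    for _peer_id in archival_storage_info.candidates(piece_index) {
        // ... request the piece from this peer over the DSN ...
    }
    // 3. Fall back to the generic DSN piece provider.
    None
}

fn main() {
    let info = ArchivalStorageInfo { filters: HashMap::new() };
    assert_eq!(get_piece(&HashMap::new(), &info, 42), None);
}
```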
crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs
@@ -7,6 +7,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use subspace_core_primitives::SegmentIndex;
use subspace_farmer::utils::archival_storage_info::ArchivalStorageInfo;
use subspace_farmer::utils::archival_storage_pieces::ArchivalStoragePieces;
use subspace_farmer::utils::farmer_piece_cache::FarmerPieceCache;
use subspace_farmer::utils::farmer_provider_storage::FarmerProviderStorage;
@@ -49,6 +50,7 @@ pub(super) fn configure_dsn(
readers_and_pieces: &Arc<Mutex<Option<ReadersAndPieces>>>,
node_client: NodeRpcClient,
archival_storage_pieces: ArchivalStoragePieces,
archival_storage_info: ArchivalStorageInfo,
) -> Result<
(
Node,
@@ -318,6 +320,34 @@ pub(super) fn configure_dsn(
}))
.detach();

node.on_peer_info(Arc::new({
let archival_storage_info = archival_storage_info.clone();

move |new_peer_info| {
let peer_id = new_peer_info.peer_id;
let peer_info = &new_peer_info.peer_info;

if let PeerInfo::Farmer { cuckoo_filter } = peer_info {
archival_storage_info.update_cuckoo_filter(peer_id, cuckoo_filter.clone());

debug!(%peer_id, ?peer_info, "Peer info cached",);
}
}
}))
.detach();

node.on_disconnected_peer(Arc::new({
let archival_storage_info = archival_storage_info.clone();

move |peer_id| {
if archival_storage_info.remove_peer_filter(peer_id) {
debug!(%peer_id, "Peer filter removed.",);
}
}
}))
.detach();

// Consider returning HandlerId instead of each `detach()` calls for other usages.
(node, node_runner, piece_cache)
})
.map_err(Into::into)
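The two handlers added above keep the filter cache current: a `PeerInfo::Farmer` broadcast installs or refreshes that peer's cuckoo filter, and a disconnect removes it. A sketch of the shared state this implies, with assumed internals (the real `ArchivalStorageInfo` lives in `subspace_farmer::utils::archival_storage_info` and may differ); only the two method names come from the diff:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type PeerId = String;
type CuckooFilterBytes = Vec<u8>; // serialized filter as received in peer info

#[derive(Clone, Default)]
struct ArchivalStorageInfo {
    // Cloning shares one map, which is why the diff can hand clones to
    // `on_peer_info`, `on_disconnected_peer`, and `FarmerPieceGetter`.
    filters: Arc<Mutex<HashMap<PeerId, CuckooFilterBytes>>>,
}

impl ArchivalStorageInfo {
    fn update_cuckoo_filter(&self, peer_id: PeerId, filter: CuckooFilterBytes) {
        self.filters.lock().unwrap().insert(peer_id, filter);
    }

    /// Returns true if a filter was actually removed.
    fn remove_peer_filter(&self, peer_id: &PeerId) -> bool {
        self.filters.lock().unwrap().remove(peer_id).is_some()
    }
}

fn main() {
    let info = ArchivalStorageInfo::default();
    info.update_cuckoo_filter("peer-a".into(), vec![0xde, 0xad]);
    assert!(info.remove_peer_filter(&"peer-a".into()));
    assert!(!info.remove_peer_filter(&"peer-a".into()));
}
```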
18 changes: 3 additions & 15 deletions crates/subspace-farmer/src/single_disk_plot.rs
@@ -43,7 +43,7 @@ use subspace_proof_of_space::Table;
use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse};
use thiserror::Error;
use tokio::runtime::Handle;
use tokio::sync::{broadcast, OwnedSemaphorePermit};
use tokio::sync::broadcast;
use tracing::{debug, error, info, info_span, warn, Instrument, Span};
use ulid::Ulid;

@@ -264,8 +262,6 @@ pub struct SingleDiskPlotOptions<NC, PG> {
pub kzg: Kzg,
/// Erasure coding instance to use.
pub erasure_coding: ErasureCoding,
/// Semaphore to limit concurrency of plotting process.
pub concurrent_plotting_semaphore: Arc<tokio::sync::Semaphore>,
}

/// Errors happening when trying to create/open single disk plot
@@ -384,11 +382,7 @@ type Handler<A> = Bag<HandlerFn<A>, A>;

#[derive(Default, Debug)]
struct Handlers {
sector_plotted: Handler<(
PlottedSector,
Option<PlottedSector>,
Arc<OwnedSemaphorePermit>,
)>,
sector_plotted: Handler<(PlottedSector, Option<PlottedSector>)>,
solution: Handler<SolutionResponse>,
}

@@ -455,7 +449,6 @@ impl SingleDiskPlot {
piece_getter,
kzg,
erasure_coding,
concurrent_plotting_semaphore,
} = options;
fs::create_dir_all(&directory)?;

@@ -708,7 +701,6 @@ impl SingleDiskPlot {
erasure_coding,
handlers,
modifying_sector_index,
concurrent_plotting_semaphore,
sectors_to_plot_receiver,
)
.await
@@ -953,11 +945,7 @@ impl SingleDiskPlot {
/// throttling of the plotting process is desired.
pub fn on_sector_plotted(
&self,
callback: HandlerFn<(
PlottedSector,
Option<PlottedSector>,
Arc<OwnedSemaphorePermit>,
)>,
callback: HandlerFn<(PlottedSector, Option<PlottedSector>)>,
) -> HandlerId {
self.handlers.sector_plotted.add(callback)
}
21 changes: 3 additions & 18 deletions crates/subspace-farmer/src/single_disk_plot/plotting.rs
@@ -30,7 +30,6 @@ use subspace_farmer_components::plotting::{
use subspace_farmer_components::sector::SectorMetadata;
use subspace_proof_of_space::Table;
use thiserror::Error;
use tokio::sync::Semaphore;
use tracing::{debug, info, trace, warn};

const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500);
@@ -95,7 +94,6 @@ pub(super) async fn plotting<NC, PG, PosTable>(
erasure_coding: ErasureCoding,
handlers: Arc<Handlers>,
modifying_sector_index: Arc<RwLock<Option<SectorIndex>>>,
concurrent_plotting_semaphore: Arc<Semaphore>,
mut sectors_to_plot: mpsc::Receiver<(SectorIndex, oneshot::Sender<()>)>,
) -> Result<(), PlottingError>
where
@@ -122,17 +120,6 @@
.len(sector_metadata_size)
.map_mut(&metadata_file)?
};
let plotting_permit = match concurrent_plotting_semaphore.clone().acquire_owned().await {
Ok(plotting_permit) => plotting_permit,
Err(error) => {
warn!(
%sector_index,
%error,
"Semaphore was closed, interrupting plotting"
);
return Ok(());
}
};

let maybe_old_sector_metadata = sectors_metadata.read().get(sector_index as usize).cloned();

@@ -233,11 +220,9 @@
info!(%sector_index, "Sector plotted successfully");
}

handlers.sector_plotted.call_simple(&(
plotted_sector,
maybe_old_plotted_sector,
Arc::new(plotting_permit),
));
handlers
.sector_plotted
.call_simple(&(plotted_sector, maybe_old_plotted_sector));
}

Ok(())
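For reference, the throttling mechanism removed across the three files above reduces to a standard tokio pattern: a permit acquired before plotting a sector and dropped only once downstream work (the now-deleted piece announcements) finished, which capped concurrent plots. A minimal sketch in plain tokio, not the farmer's actual code:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(2)); // at most 2 sectors in flight

    let mut tasks = Vec::new();
    for sector_index in 0..4u16 {
        // Blocks here until a running task releases its permit.
        let permit = Arc::clone(&semaphore)
            .acquire_owned()
            .await
            .expect("semaphore is never closed here");
        tasks.push(tokio::spawn(async move {
            // ... plot the sector, then announce its pieces ...
            println!("plotted sector {sector_index}");
            // Dropping the permit is what lets the next sector start.
            drop(permit);
        }));
    }

    for task in tasks {
        task.await.expect("task does not panic");
    }
}
```

Removing the permit from the `sector_plotted` handler tuple is what shrinks the callback signature in `single_disk_plot.rs` and `farm.rs`.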
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/utils.rs
@@ -1,8 +1,8 @@
pub mod archival_storage_info;
pub mod archival_storage_pieces;
pub mod farmer_piece_cache;
pub mod farmer_piece_getter;
pub mod farmer_provider_storage;
pub mod node_piece_getter;
pub mod parity_db_store;
pub mod piece_cache;
pub mod piece_validator;
[Diffs for the remaining 14 changed files not shown.]