Skip to content

Commit

Permalink
Merge pull request #3133 from autonomys/remove-farmer-piece-getter-de…
Browse files Browse the repository at this point in the history
…duplication

Remove farmer piece getter deduplication
  • Loading branch information
nazar-pc authored Oct 16, 2024
2 parents 68f25f4 + 45877d9 commit 51f7beb
Showing 1 changed file with 29 additions and 168 deletions.
197 changes: 29 additions & 168 deletions crates/subspace-farmer/src/farmer_piece_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@
use crate::farm::plotted_pieces::PlottedPieces;
use crate::farmer_cache::FarmerCache;
use crate::node_client::NodeClient;
use async_lock::{
Mutex as AsyncMutex, MutexGuardArc as AsyncMutexGuardArc, RwLock as AsyncRwLock, Semaphore,
};
use async_lock::{RwLock as AsyncRwLock, Semaphore};
use async_trait::async_trait;
use backoff::backoff::Backoff;
use backoff::future::retry;
use backoff::ExponentialBackoff;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::fmt;
use std::hash::Hash;
use std::num::NonZeroUsize;
Expand All @@ -27,71 +23,6 @@ pub mod piece_validator;

const MAX_RANDOM_WALK_ROUNDS: usize = 15;

struct InProgressPieceGetting<'a> {
piece_index: PieceIndex,
in_progress_piece: AsyncMutexGuardArc<Option<Piece>>,
in_progress_pieces: &'a Mutex<HashMap<PieceIndex, Arc<AsyncMutex<Option<Piece>>>>>,
}

impl<'a> Drop for InProgressPieceGetting<'a> {
fn drop(&mut self) {
self.in_progress_pieces.lock().remove(&self.piece_index);
}
}

impl<'a> InProgressPieceGetting<'a> {
fn store_piece_getting_result(mut self, maybe_piece: &Option<Piece>) {
self.in_progress_piece.clone_from(maybe_piece);
}
}

enum InProgressPiece<'a> {
Getting(InProgressPieceGetting<'a>),
// If piece is already in progress, just wait for it
Waiting {
in_progress_piece_mutex: Arc<AsyncMutex<Option<Piece>>>,
},
}

impl<'a> InProgressPiece<'a> {
fn new(
piece_index: PieceIndex,
in_progress_pieces: &'a Mutex<HashMap<PieceIndex, Arc<AsyncMutex<Option<Piece>>>>>,
) -> Self {
let in_progress_piece_mutex = Arc::new(AsyncMutex::new(None));
// Take lock before anything else, set to `None` when another piece getting is already in
// progress
let mut local_in_progress_piece_guard = Some(
in_progress_piece_mutex
.try_lock_arc()
.expect("Just created; qed"),
);
let in_progress_piece_mutex = in_progress_pieces
.lock()
.entry(piece_index)
.and_modify(|_mutex| {
local_in_progress_piece_guard.take();
})
.or_insert_with(|| Arc::clone(&in_progress_piece_mutex))
.clone();

if let Some(in_progress_piece) = local_in_progress_piece_guard {
// Store guard and details necessary to remove entry from `in_progress_pieces` after
// piece getting is complete (in `Drop` impl)
Self::Getting(InProgressPieceGetting {
piece_index,
in_progress_piece,
in_progress_pieces,
})
} else {
// Piece getting is already in progress, only mutex is needed to wait for its result
Self::Waiting {
in_progress_piece_mutex,
}
}
}
}

/// Retry policy for getting pieces from DSN cache
#[derive(Debug)]
pub struct DsnCacheRetryPolicy {
Expand All @@ -107,7 +38,6 @@ struct Inner<FarmIndex, CacheIndex, PV, NC> {
node_client: NC,
plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
dsn_cache_retry_policy: DsnCacheRetryPolicy,
in_progress_pieces: Mutex<HashMap<PieceIndex, Arc<AsyncMutex<Option<Piece>>>>>,
request_semaphore: Arc<Semaphore>,
}

Expand Down Expand Up @@ -163,7 +93,6 @@ where
node_client,
plotted_pieces,
dsn_cache_retry_policy,
in_progress_pieces: Mutex::default(),
request_semaphore,
}),
}
Expand All @@ -173,23 +102,7 @@ where
pub async fn get_piece_fast(&self, piece_index: PieceIndex) -> Option<Piece> {
let _guard = self.inner.request_semaphore.acquire().await;

match InProgressPiece::new(piece_index, &self.inner.in_progress_pieces) {
InProgressPiece::Getting(in_progress_piece_getting) => {
// Try to get the piece without releasing lock to make sure successfully
// downloaded piece gets stored
let maybe_piece = self.get_piece_fast_internal(piece_index).await;
// Store the result for others to observe
in_progress_piece_getting.store_piece_getting_result(&maybe_piece);
maybe_piece
}
InProgressPiece::Waiting {
in_progress_piece_mutex,
} => {
trace!(%piece_index, "Piece is already in progress, waiting for result #1");
// Doesn't matter if it was successful or not here
in_progress_piece_mutex.lock().await.clone()
}
}
self.get_piece_fast_internal(piece_index).await
}

async fn get_piece_fast_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
Expand Down Expand Up @@ -246,31 +159,7 @@ where
pub async fn get_piece_slow(&self, piece_index: PieceIndex) -> Option<Piece> {
let _guard = self.inner.request_semaphore.acquire().await;

match InProgressPiece::new(piece_index, &self.inner.in_progress_pieces) {
InProgressPiece::Getting(in_progress_piece_getting) => {
// Try to get the piece without releasing lock to make sure successfully
// downloaded piece gets stored
let maybe_piece = self.get_piece_slow_internal(piece_index).await;
// Store the result for others to observe
in_progress_piece_getting.store_piece_getting_result(&maybe_piece);
maybe_piece
}
InProgressPiece::Waiting {
in_progress_piece_mutex,
} => {
trace!(%piece_index, "Piece is already in progress, waiting for result #2");
if let Some(piece) = in_progress_piece_mutex.lock().await.clone() {
trace!(
%piece_index,
"Piece was already in progress and downloaded successfully #1"
);
return Some(piece);
}

// Try again just in case
self.get_piece_slow_internal(piece_index).await
}
}
self.get_piece_slow_internal(piece_index).await
}

/// Slow way to get piece using archival storage
Expand Down Expand Up @@ -314,7 +203,29 @@ where
None
}

async fn get_piece_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
/// Downgrade to [`WeakFarmerPieceGetter`] in order to break reference cycles with internally
/// used [`Arc`]
pub fn downgrade(&self) -> WeakFarmerPieceGetter<FarmIndex, CacheIndex, PV, NC> {
WeakFarmerPieceGetter {
inner: Arc::downgrade(&self.inner),
}
}
}

#[async_trait]
impl<FarmIndex, CacheIndex, PV, NC> PieceGetter for FarmerPieceGetter<FarmIndex, CacheIndex, PV, NC>
where
FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
usize: From<FarmIndex>,
CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
usize: From<CacheIndex>,
CacheIndex: TryFrom<usize>,
PV: PieceValidator + Send + 'static,
NC: NodeClient,
{
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
let _guard = self.inner.request_semaphore.acquire().await;

{
let retries = AtomicU32::new(0);
let max_retries = u32::from(self.inner.dsn_cache_retry_policy.max_retries);
Expand Down Expand Up @@ -347,69 +258,19 @@ where

if let Ok(Some(piece)) = maybe_piece_fut.await {
trace!(%piece_index, "Got piece from cache successfully");
return Some(piece);
return Ok(Some(piece));
}
};

if let Some(piece) = self.get_piece_slow_internal(piece_index).await {
return Some(piece);
return Ok(Some(piece));
}

debug!(
%piece_index,
"Cannot acquire piece: all methods yielded empty result"
);
None
}

/// Downgrade to [`WeakFarmerPieceGetter`] in order to break reference cycles with internally
/// used [`Arc`]
pub fn downgrade(&self) -> WeakFarmerPieceGetter<FarmIndex, CacheIndex, PV, NC> {
WeakFarmerPieceGetter {
inner: Arc::downgrade(&self.inner),
}
}
}

#[async_trait]
impl<FarmIndex, CacheIndex, PV, NC> PieceGetter for FarmerPieceGetter<FarmIndex, CacheIndex, PV, NC>
where
FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
usize: From<FarmIndex>,
CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
usize: From<CacheIndex>,
CacheIndex: TryFrom<usize>,
PV: PieceValidator + Send + 'static,
NC: NodeClient,
{
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
let _guard = self.inner.request_semaphore.acquire().await;

match InProgressPiece::new(piece_index, &self.inner.in_progress_pieces) {
InProgressPiece::Getting(in_progress_piece_getting) => {
// Try to get the piece without releasing lock to make sure successfully
// downloaded piece gets stored
let maybe_piece = self.get_piece_internal(piece_index).await;
// Store the result for others to observe
in_progress_piece_getting.store_piece_getting_result(&maybe_piece);
Ok(maybe_piece)
}
InProgressPiece::Waiting {
in_progress_piece_mutex,
} => {
trace!(%piece_index, "Piece is already in progress, waiting for result #3");
if let Some(piece) = in_progress_piece_mutex.lock().await.clone() {
trace!(
%piece_index,
"Piece was already in progress and downloaded successfully #2"
);
return Ok(Some(piece));
}

// Try again just in case
Ok(self.get_piece_internal(piece_index).await)
}
}
Ok(None)
}
}

Expand Down

0 comments on commit 51f7beb

Please sign in to comment.