Merge pull request #2929 from subspace/optimize-plot-cache-memory-usage
Free plot cache hashmap once it is no longer usable
nazar-pc committed Jul 17, 2024
2 parents cc50899 + fe19cfb commit 1d8bcd2
Showing 1 changed file with 36 additions and 19 deletions.
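
The change hinges on a Rust idiom: `HashMap::clear()` empties a map but keeps its backing allocation, while `std::mem::take()` swaps in a fresh, unallocated map and drops the old one, returning its memory to the allocator. Once the farm reaches `target_sector_count`, the plot cache can never store or serve pieces again, so the whole map is dropped. A rough sketch of the idiom, with hypothetical key/offset types standing in for the real ones:

```rust
use std::collections::HashMap;
use std::mem;

// Hypothetical stand-ins for the real cache key/offset types.
type RecordKey = [u8; 32];
type Offset = u32;

fn free_cache(map: &mut HashMap<RecordKey, Offset>) {
    // `map.clear()` would keep the bucket allocation around for reuse;
    // `mem::take` replaces the map with `HashMap::default()` (which owns no
    // allocation) and drops the old map, freeing its memory immediately.
    mem::take(map);
}
```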
55 changes: 36 additions & 19 deletions crates/subspace-farmer/src/single_disk_farm/plot_cache.rs
@@ -56,6 +56,7 @@ pub struct DiskPlotCache
     file: Weak<UnbufferedIoFileWindows>,
     sectors_metadata: Weak<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
     cached_pieces: Arc<RwLock<CachedPieces>>,
+    target_sector_count: SectorIndex,
     sector_size: u64,
 }

@@ -137,6 +138,7 @@ impl DiskPlotCache
             file: Arc::downgrade(file),
             sectors_metadata: Arc::downgrade(sectors_metadata),
             cached_pieces: Arc::new(RwLock::new(cached_pieces)),
+            target_sector_count,
             sector_size,
         }
     }
@@ -204,14 +206,20 @@

         let element_offset = u64::from(offset) * u64::from(Self::element_size());
         let sectors_metadata = sectors_metadata.read().await;
-        let plotted_bytes = self.sector_size * sectors_metadata.len() as u64;
+        let plotted_sectors_count = sectors_metadata.len() as SectorIndex;
+        let plotted_bytes = self.sector_size * u64::from(plotted_sectors_count);

         // Make sure offset is after anything that is already plotted
         if element_offset < plotted_bytes {
+            // Just to be safe, avoid any overlap of write locks
+            drop(sectors_metadata);
+            let mut cached_pieces = self.cached_pieces.write();
             // No space to store more pieces anymore
-            self.cached_pieces.write().next_offset.take();
+            cached_pieces.next_offset.take();
+            if plotted_sectors_count == self.target_sector_count {
+                // Free allocated memory once fully plotted
+                mem::take(&mut cached_pieces.map);
+            }
             return Ok(false);
         }
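
The newly added `drop(sectors_metadata)` in this hunk encodes a simple lock discipline: release the async read guard before taking the blocking write lock, so the two guards never overlap. A minimal sketch of the same discipline, with `std::sync::RwLock` standing in for the async and blocking locks used by the farmer:

```rust
use std::sync::{Arc, RwLock};

fn shrink_cache(metadata: &Arc<RwLock<Vec<u8>>>, cache: &Arc<RwLock<Vec<u8>>>) {
    // The read guard here is a temporary, dropped at the end of the statement.
    let plotted = metadata.read().unwrap().len();

    // Only now acquire the write lock; no other guard is held at this point.
    let mut cache = cache.write().unwrap();
    cache.truncate(plotted);
}
```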

@@ -253,25 +261,17 @@
         let offset = self.cached_pieces.read().map.get(key).copied()?;

         let file = self.file.upgrade()?;
-        let cached_pieces = Arc::clone(&self.cached_pieces);
-        let key = key.clone();

         let read_fn = move || {
             let mut element = BytesMut::zeroed(Self::element_size() as usize);
-            match Self::read_piece_internal(&file, offset, &mut element) {
-                Ok(Some(_piece_index)) => {
-                    let element = element.freeze();
-                    let piece = Piece::try_from(
-                        element.slice_ref(&element[PieceIndex::SIZE..][..Piece::SIZE]),
-                    )
-                    .expect("Correct length; qed");
-                    Some(piece)
-                }
-                _ => {
-                    // Remove entry just in case it was overridden with a sector already
-                    cached_pieces.write().map.remove(&key);
-                    None
-                }
-            }
+            if let Ok(Some(_piece_index)) = Self::read_piece_internal(&file, offset, &mut element) {
+                let element = element.freeze();
+                let piece =
+                    Piece::try_from(element.slice_ref(&element[PieceIndex::SIZE..][..Piece::SIZE]))
+                        .expect("Correct length; qed");
+                Some(piece)
+            } else {
+                None
+            }
         };
@@ -280,15 +280,32 @@
         // TODO: On Windows spawning blocking task that allows concurrent reads causes huge memory
         // (Nazar).
         // See https://github.com/subspace/subspace/issues/2813 and linked forum post for details.
         // This TODO exists in multiple files
-        if cfg!(windows) {
+        let maybe_piece = if cfg!(windows) {
             task::block_in_place(read_fn)
         } else {
             let read_fut = task::spawn_blocking(read_fn);

             AsyncJoinOnDrop::new(read_fut, false)
                 .await
                 .unwrap_or_default()
-        }
+        };
+
+        if maybe_piece.is_none()
+            && let Some(sectors_metadata) = self.sectors_metadata.upgrade()
+        {
+            let plotted_sectors_count = sectors_metadata.read().await.len() as SectorIndex;
+
+            let mut cached_pieces = self.cached_pieces.write();
+            if plotted_sectors_count == self.target_sector_count {
+                // Free allocated memory once fully plotted
+                mem::take(&mut cached_pieces.map);
+            } else {
+                // Remove entry just in case it was overridden with a sector already
+                cached_pieces.map.remove(key);
+            }
+        }
+
+        maybe_piece
     }

     fn read_piece_internal(
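
A note on the `cfg!(windows)` dispatch in the last hunk: on Windows the read runs inline on the current worker thread via `task::block_in_place`, since spawning blocking tasks for concurrent reads was observed to inflate memory usage (see the linked issue), while other platforms off-load the read to tokio's blocking pool. A simplified sketch of the pattern, assuming a multi-threaded tokio runtime and a plain `JoinHandle` in place of the repository's `AsyncJoinOnDrop` wrapper:

```rust
use tokio::task;

async fn read_maybe_blocking<T, F>(read_fn: F) -> Option<T>
where
    F: FnOnce() -> Option<T> + Send + 'static,
    T: Send + 'static,
{
    if cfg!(windows) {
        // Run inline on the current runtime worker thread; avoids the memory
        // growth seen with many concurrent spawned blocking reads on Windows.
        task::block_in_place(read_fn)
    } else {
        // Elsewhere, off-load to the blocking thread pool.
        task::spawn_blocking(read_fn)
            .await
            .unwrap_or_default() // a panicked/cancelled task counts as a miss
    }
}
```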
