diff --git a/crates/subspace-farmer-components/src/file_ext.rs b/crates/subspace-farmer-components/src/file_ext.rs index 435e0c7ddb..40b703f678 100644 --- a/crates/subspace-farmer-components/src/file_ext.rs +++ b/crates/subspace-farmer-components/src/file_ext.rs @@ -9,26 +9,20 @@ pub trait OpenOptionsExt { /// undesirable, only has impact on Windows, for other operating systems see [`FileExt`] fn advise_random_access(&mut self) -> &mut Self; - /// Advise Windows to not use buffering for this file and that file access will be random. - /// - /// NOTE: There are major alignment requirements described here: - /// https://learn.microsoft.com/en-us/windows/win32/fileio/file-buffering#alignment-and-file-access-requirements - #[cfg(windows)] - fn advise_unbuffered(&mut self) -> &mut Self; - /// Advise OS/file system that file will use sequential access and read-ahead behavior is /// desirable, only has impact on Windows, for other operating systems see [`FileExt`] fn advise_sequential_access(&mut self) -> &mut Self; + + /// Use Direct I/O on Linux and disable buffering on Windows. + /// + /// NOTE: There are major alignment requirements described here: + /// https://learn.microsoft.com/en-us/windows/win32/fileio/file-buffering#alignment-and-file-access-requirements + /// https://man7.org/linux/man-pages/man2/open.2.html + fn use_direct_io(&mut self) -> &mut Self; } impl OpenOptionsExt for OpenOptions { - #[cfg(target_os = "linux")] - fn advise_random_access(&mut self) -> &mut Self { - // Not supported - self - } - - #[cfg(target_os = "macos")] + #[cfg(not(windows))] fn advise_random_access(&mut self) -> &mut Self { // Not supported self @@ -47,8 +41,20 @@ impl OpenOptionsExt for OpenOptions { ) } + #[cfg(not(windows))] + fn advise_sequential_access(&mut self) -> &mut Self { + // Not supported + self + } + #[cfg(windows)] - fn advise_unbuffered(&mut self) -> &mut Self { + fn advise_sequential_access(&mut self) -> &mut Self { + use std::os::windows::fs::OpenOptionsExt; + self.custom_flags(winapi::um::winbase::FILE_FLAG_SEQUENTIAL_SCAN) + } + + #[cfg(windows)] + fn use_direct_io(&mut self) -> &mut Self { use std::os::windows::fs::OpenOptionsExt; self.custom_flags( winapi::um::winbase::FILE_FLAG_WRITE_THROUGH @@ -57,22 +63,16 @@ impl OpenOptionsExt for OpenOptions { } #[cfg(target_os = "linux")] - fn advise_sequential_access(&mut self) -> &mut Self { - // Not supported - self + fn use_direct_io(&mut self) -> &mut Self { + use std::os::unix::fs::OpenOptionsExt; + self.custom_flags(libc::O_DIRECT) } - #[cfg(target_os = "macos")] - fn advise_sequential_access(&mut self) -> &mut Self { + #[cfg(not(any(target_os = "linux", windows)))] + fn use_direct_io(&mut self) -> &mut Self { // Not supported self } - - #[cfg(windows)] - fn advise_sequential_access(&mut self) -> &mut Self { - use std::os::windows::fs::OpenOptionsExt; - self.custom_flags(winapi::um::winbase::FILE_FLAG_SEQUENTIAL_SCAN) - } } /// Extension convenience trait that allows pre-allocating files, suggesting random access pattern @@ -92,6 +92,9 @@ pub trait FileExt { /// desirable, on Windows this can only be set when file is opened, see [`OpenOptionsExt`] fn advise_sequential_access(&self) -> Result<()>; + /// Disable cache on macOS + fn disable_cache(&self) -> Result<()>; + /// Read exact number of bytes at a specific offset fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<()>; @@ -163,6 +166,22 @@ impl FileExt for File { Ok(()) } + #[cfg(not(target_os = "macos"))] + fn disable_cache(&self) -> Result<()> { + // Not supported + Ok(()) + } + + #[cfg(target_os = "macos")] + fn disable_cache(&self) -> Result<()> { + use std::os::unix::io::AsRawFd; + if unsafe { libc::fcntl(self.as_raw_fd(), libc::F_NOCACHE, 1) } != 0 { + Err(std::io::Error::last_os_error()) + } else { + Ok(()) + } + } + #[cfg(unix)] fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<()> { std::os::unix::fs::FileExt::read_exact_at(self, buf, offset) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs index 2a29717493..72031b306d 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs @@ -11,9 +11,9 @@ use std::path::PathBuf; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_core_primitives::{Blake3Hash, Record, SolutionRange}; use subspace_erasure_coding::ErasureCoding; +use subspace_farmer::single_disk_farm::direct_io_file::DirectIoFile; use subspace_farmer::single_disk_farm::farming::rayon_files::RayonFiles; use subspace_farmer::single_disk_farm::farming::{PlotAudit, PlotAuditOptions}; -use subspace_farmer::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows; use subspace_farmer::single_disk_farm::{ SingleDiskFarm, SingleDiskFarmInfo, SingleDiskFarmSummary, }; @@ -209,10 +209,10 @@ where ) }); } - if cfg!(windows) { + { let plot = RayonFiles::open_with( &disk_farm.join(SingleDiskFarm::PLOT_FILE), - UnbufferedIoFileWindows::open, + DirectIoFile::open, ) .map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?; let plot_audit = PlotAudit::new(&plot); @@ -426,10 +426,10 @@ where ) }); } - if cfg!(windows) { + { let plot = RayonFiles::open_with( &disk_farm.join(SingleDiskFarm::PLOT_FILE), - UnbufferedIoFileWindows::open, + DirectIoFile::open, ) .map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?; let plot_audit = PlotAudit::new(&plot); diff --git a/crates/subspace-farmer/src/disk_piece_cache.rs b/crates/subspace-farmer/src/disk_piece_cache.rs index 21d9aa144b..4c476ccda3 100644 --- a/crates/subspace-farmer/src/disk_piece_cache.rs +++ b/crates/subspace-farmer/src/disk_piece_cache.rs @@ -13,9 +13,7 @@ mod tests; use crate::disk_piece_cache::metrics::DiskPieceCacheMetrics; use crate::farm; use crate::farm::{FarmError, PieceCacheId, PieceCacheOffset}; -#[cfg(windows)] -use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows; -use crate::single_disk_farm::unbuffered_io_file_windows::DISK_SECTOR_SIZE; +use crate::single_disk_farm::direct_io_file::{DirectIoFile, DISK_SECTOR_SIZE}; use crate::utils::AsyncJoinOnDrop; use async_trait::async_trait; use bytes::BytesMut; @@ -23,8 +21,6 @@ use futures::channel::mpsc; use futures::{stream, SinkExt, Stream, StreamExt}; use parking_lot::Mutex; use prometheus_client::registry::Registry; -#[cfg(not(windows))] -use std::fs::{File, OpenOptions}; use std::path::Path; use std::sync::Arc; use std::task::Poll; @@ -32,8 +28,6 @@ use std::{fs, io}; use subspace_core_primitives::crypto::blake3_hash_list; use subspace_core_primitives::{Blake3Hash, Piece, PieceIndex}; use subspace_farmer_components::file_ext::FileExt; -#[cfg(not(windows))] -use subspace_farmer_components::file_ext::OpenOptionsExt; use thiserror::Error; use tokio::runtime::Handle; use tokio::task; @@ -71,10 +65,7 @@ pub enum DiskPieceCacheError { #[derive(Debug)] struct Inner { id: PieceCacheId, - #[cfg(not(windows))] - file: File, - #[cfg(windows)] - file: UnbufferedIoFileWindows, + file: DirectIoFile, max_num_elements: u32, metrics: Option, } @@ -202,19 +193,7 @@ impl DiskPieceCache { return Err(DiskPieceCacheError::ZeroCapacity); } - #[cfg(not(windows))] - let file = OpenOptions::new() - .read(true) - .write(true) - .create(true) - .advise_random_access() - .open(directory.join(Self::FILE_NAME))?; - - #[cfg(not(windows))] - file.advise_random_access()?; - - #[cfg(windows)] - let file = UnbufferedIoFileWindows::open(&directory.join(Self::FILE_NAME))?; + let file = DirectIoFile::open(&directory.join(Self::FILE_NAME))?; let expected_size = u64::from(Self::element_size()) * u64::from(capacity); // Align plot file size for disk sector size diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index b7b95bcf20..b93f84aebb 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -4,6 +4,7 @@ //! a small piece cache. It fully manages farming and plotting process, including listening to node //! notifications, producing solutions and singing rewards. +pub mod direct_io_file; pub mod farming; pub mod identity; mod metrics; @@ -13,7 +14,6 @@ pub mod plot_cache; mod plotted_sectors; mod plotting; mod reward_signing; -pub mod unbuffered_io_file_windows; use crate::disk_piece_cache::{DiskPieceCache, DiskPieceCacheError}; use crate::farm::{ @@ -22,6 +22,7 @@ use crate::farm::{ }; use crate::node_client::NodeClient; use crate::plotter::Plotter; +use crate::single_disk_farm::direct_io_file::{DirectIoFile, DISK_SECTOR_SIZE}; use crate::single_disk_farm::farming::rayon_files::RayonFiles; use crate::single_disk_farm::farming::{ farming, slot_notification_forwarder, FarmingOptions, PlotAudit, @@ -37,9 +38,6 @@ use crate::single_disk_farm::plotting::{ plotting, plotting_scheduler, PlottingOptions, PlottingSchedulerOptions, SectorPlottingOptions, }; use crate::single_disk_farm::reward_signing::reward_signing; -#[cfg(windows)] -use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows; -use crate::single_disk_farm::unbuffered_io_file_windows::DISK_SECTOR_SIZE; use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop}; use crate::{farm, KNOWN_PEERS_CACHE_SIZE}; use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; @@ -75,8 +73,6 @@ use subspace_core_primitives::{ }; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::file_ext::FileExt; -#[cfg(not(windows))] -use subspace_farmer_components::file_ext::OpenOptionsExt; use subspace_farmer_components::reading::ReadSectorRecordChunksMode; use subspace_farmer_components::sector::{sector_size, SectorMetadata, SectorMetadataChecksummed}; use subspace_farmer_components::{FarmerProtocolInfo, ReadAtSync}; @@ -730,14 +726,8 @@ struct SingleDiskFarmInit { identity: Identity, single_disk_farm_info: SingleDiskFarmInfo, single_disk_farm_info_lock: Option, - #[cfg(not(windows))] - plot_file: Arc, - #[cfg(windows)] - plot_file: Arc, - #[cfg(not(windows))] - metadata_file: File, - #[cfg(windows)] - metadata_file: UnbufferedIoFileWindows, + plot_file: Arc, + metadata_file: DirectIoFile, metadata_header: PlotMetadataHeader, target_sector_count: u16, sectors_metadata: Arc>>, @@ -968,17 +958,7 @@ impl SingleDiskFarm { let farming_plot_fut = task::spawn_blocking(|| { farming_thread_pool .install(move || { - #[cfg(windows)] - { - RayonFiles::open_with( - &directory.join(Self::PLOT_FILE), - UnbufferedIoFileWindows::open, - ) - } - #[cfg(not(windows))] - { - RayonFiles::open(&directory.join(Self::PLOT_FILE)) - } + RayonFiles::open_with(&directory.join(Self::PLOT_FILE), DirectIoFile::open) }) .map(|farming_plot| (farming_plot, farming_thread_pool)) }); @@ -1392,19 +1372,7 @@ impl SingleDiskFarm { let target_sector_count = allocated_space_distribution.target_sector_count; let metadata_file_path = directory.join(Self::METADATA_FILE); - #[cfg(not(windows))] - let metadata_file = OpenOptions::new() - .read(true) - .write(true) - .create(true) - .advise_random_access() - .open(&metadata_file_path)?; - - #[cfg(not(windows))] - metadata_file.advise_random_access()?; - - #[cfg(windows)] - let metadata_file = UnbufferedIoFileWindows::open(&metadata_file_path)?; + let metadata_file = DirectIoFile::open(&metadata_file_path)?; let metadata_size = metadata_file.size()?; let expected_metadata_size = allocated_space_distribution.metadata_file_size; @@ -1494,19 +1462,7 @@ impl SingleDiskFarm { Arc::new(AsyncRwLock::new(sectors_metadata)) }; - #[cfg(not(windows))] - let plot_file = OpenOptions::new() - .read(true) - .write(true) - .create(true) - .advise_random_access() - .open(directory.join(Self::PLOT_FILE))?; - - #[cfg(not(windows))] - plot_file.advise_random_access()?; - - #[cfg(windows)] - let plot_file = UnbufferedIoFileWindows::open(&directory.join(Self::PLOT_FILE))?; + let plot_file = DirectIoFile::open(&directory.join(Self::PLOT_FILE))?; if plot_file.size()? != allocated_space_distribution.plot_file_size { // Allocating the whole file (`set_len` below can create a sparse file, which will cause @@ -1649,13 +1605,7 @@ impl SingleDiskFarm { pub fn read_all_sectors_metadata( directory: &Path, ) -> io::Result> { - #[cfg(not(windows))] - let metadata_file = OpenOptions::new() - .read(true) - .open(directory.join(Self::METADATA_FILE))?; - - #[cfg(windows)] - let metadata_file = UnbufferedIoFileWindows::open(&directory.join(Self::METADATA_FILE))?; + let metadata_file = DirectIoFile::open(&directory.join(Self::METADATA_FILE))?; let metadata_size = metadata_file.size()?; let sector_metadata_size = SectorMetadataChecksummed::encoded_size(); diff --git a/crates/subspace-farmer/src/single_disk_farm/unbuffered_io_file_windows.rs b/crates/subspace-farmer/src/single_disk_farm/direct_io_file.rs similarity index 80% rename from crates/subspace-farmer/src/single_disk_farm/unbuffered_io_file_windows.rs rename to crates/subspace-farmer/src/single_disk_farm/direct_io_file.rs index d92dc6bf43..1694bcc11a 100644 --- a/crates/subspace-farmer/src/single_disk_farm/unbuffered_io_file_windows.rs +++ b/crates/subspace-farmer/src/single_disk_farm/direct_io_file.rs @@ -1,13 +1,11 @@ -//! Wrapper data structure for unbuffered I/O on Windows +//! Wrapper data structure for direct/unbuffered I/O use parking_lot::Mutex; use static_assertions::const_assert_eq; use std::fs::{File, OpenOptions}; -use std::io; use std::path::Path; -use subspace_farmer_components::file_ext::FileExt; -#[cfg(windows)] -use subspace_farmer_components::file_ext::OpenOptionsExt; +use std::{io, mem}; +use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt}; use subspace_farmer_components::ReadAtSync; /// 4096 is as a relatively safe size due to sector size on SSDs commonly being 512 or 4096 bytes @@ -17,30 +15,49 @@ const MAX_READ_SIZE: usize = 1024 * 1024; const_assert_eq!(MAX_READ_SIZE % DISK_SECTOR_SIZE, 0); -/// Wrapper data structure for unbuffered I/O on Windows +#[derive(Debug, Copy, Clone)] +#[repr(C, align(4096))] +struct AlignedSectorSize([u8; DISK_SECTOR_SIZE]); + +const_assert_eq!(align_of::(), DISK_SECTOR_SIZE); + +impl Default for AlignedSectorSize { + fn default() -> Self { + Self([0; DISK_SECTOR_SIZE]) + } +} + +impl AlignedSectorSize { + fn slice_mut_to_repr(slice: &mut [Self]) -> &mut [[u8; DISK_SECTOR_SIZE]] { + // SAFETY: `AlignedSectorSize` is `#[repr(C)]` and its alignment is larger than inner value + unsafe { mem::transmute(slice) } + } +} + +/// Wrapper data structure for direct/unbuffered I/O #[derive(Debug)] -pub struct UnbufferedIoFileWindows { +pub struct DirectIoFile { file: File, physical_sector_size: usize, /// Scratch buffer of aligned memory for reads and writes - scratch_buffer: Mutex>, + scratch_buffer: Mutex>, } -impl ReadAtSync for UnbufferedIoFileWindows { +impl ReadAtSync for DirectIoFile { #[inline] fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> { self.read_exact_at(buf, offset) } } -impl ReadAtSync for &UnbufferedIoFileWindows { +impl ReadAtSync for &DirectIoFile { #[inline] fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> { (*self).read_at(buf, offset) } } -impl FileExt for UnbufferedIoFileWindows { +impl FileExt for DirectIoFile { fn size(&self) -> io::Result { Ok(self.file.metadata()?.len()) } @@ -59,6 +76,11 @@ impl FileExt for UnbufferedIoFileWindows { Ok(()) } + fn disable_cache(&self) -> io::Result<()> { + // Ignore, not supported + Ok(()) + } + fn read_exact_at(&self, buf: &mut [u8], mut offset: u64) -> io::Result<()> { if buf.is_empty() { return Ok(()); @@ -128,15 +150,14 @@ impl FileExt for UnbufferedIoFileWindows { } } -impl UnbufferedIoFileWindows { - /// Open file at specified path for random unbuffered access on Windows for reads to prevent - /// huge memory usage (if file doesn't exist, it will be created). +impl DirectIoFile { + /// Open file at specified path for direct/unbuffered I/O for reads (if file doesn't exist, it + /// will be created). /// - /// This abstraction is useless on other platforms and will just result in extra memory copies + /// This is especially important on Windows to prevent huge memory usage. pub fn open(path: &Path) -> io::Result { let mut open_options = OpenOptions::new(); - #[cfg(windows)] - open_options.advise_unbuffered(); + open_options.use_direct_io(); let file = open_options .read(true) .write(true) @@ -144,6 +165,8 @@ impl UnbufferedIoFileWindows { .truncate(false) .open(path)?; + file.disable_cache()?; + // Physical sector size on many SSDs is smaller than 4096 and should improve performance let physical_sector_size = if file.read_at(&mut [0; 512], 512).is_ok() { 512 @@ -156,7 +179,7 @@ impl UnbufferedIoFileWindows { physical_sector_size, // In many cases we'll want to read this much at once, so pre-allocate it right away scratch_buffer: Mutex::new(vec![ - [0; DISK_SECTOR_SIZE]; + AlignedSectorSize::default(); MAX_READ_SIZE / DISK_SECTOR_SIZE ]), }) @@ -169,7 +192,7 @@ impl UnbufferedIoFileWindows { fn read_exact_at_internal<'a>( &self, - scratch_buffer: &'a mut Vec<[u8; DISK_SECTOR_SIZE]>, + scratch_buffer: &'a mut Vec, bytes_to_read: usize, offset: u64, ) -> io::Result<&'a [u8]> { @@ -178,31 +201,39 @@ impl UnbufferedIoFileWindows { let offset_in_buffer = (offset % DISK_SECTOR_SIZE as u64) as usize; let desired_buffer_size = (bytes_to_read + offset_in_buffer).div_ceil(DISK_SECTOR_SIZE); if scratch_buffer.len() < desired_buffer_size { - scratch_buffer.resize(desired_buffer_size, [0; DISK_SECTOR_SIZE]); + scratch_buffer.resize_with(desired_buffer_size, AlignedSectorSize::default); } + let scratch_buffer = + AlignedSectorSize::slice_mut_to_repr(scratch_buffer).as_flattened_mut(); + // While buffer above is allocated with granularity of `MAX_DISK_SECTOR_SIZE`, reads are // done with granularity of physical sector size let offset_in_buffer = (offset % self.physical_sector_size as u64) as usize; self.file.read_exact_at( - &mut scratch_buffer.as_flattened_mut()[..(bytes_to_read + offset_in_buffer) + &mut scratch_buffer[..(bytes_to_read + offset_in_buffer) .div_ceil(self.physical_sector_size) * self.physical_sector_size], offset / self.physical_sector_size as u64 * self.physical_sector_size as u64, )?; - Ok(&scratch_buffer.as_flattened()[offset_in_buffer..][..bytes_to_read]) + Ok(&scratch_buffer[offset_in_buffer..][..bytes_to_read]) } /// Panics on writes over `MAX_READ_SIZE` (including padding on both ends) fn write_all_at_internal( &self, - scratch_buffer: &mut Vec<[u8; DISK_SECTOR_SIZE]>, + scratch_buffer: &mut Vec, bytes_to_write: &[u8], offset: u64, ) -> io::Result<()> { - // This is guaranteed by `UnbufferedIoFileWindows::open()` - assert!(scratch_buffer.as_flattened().len() >= MAX_READ_SIZE); + // This is guaranteed by constructor + assert!( + AlignedSectorSize::slice_mut_to_repr(scratch_buffer) + .as_flattened_mut() + .len() + >= MAX_READ_SIZE + ); let aligned_offset = offset / self.physical_sector_size as u64 * self.physical_sector_size as u64; @@ -212,13 +243,17 @@ impl UnbufferedIoFileWindows { * self.physical_sector_size; if padding == 0 && bytes_to_read == bytes_to_write.len() { - let scratch_buffer = &mut scratch_buffer.as_flattened_mut()[..bytes_to_read]; + let scratch_buffer = + AlignedSectorSize::slice_mut_to_repr(scratch_buffer).as_flattened_mut(); + let scratch_buffer = &mut scratch_buffer[..bytes_to_read]; scratch_buffer.copy_from_slice(bytes_to_write); self.file.write_all_at(scratch_buffer, offset)?; } else { // Read whole pages where `bytes_to_write` will be written self.read_exact_at_internal(scratch_buffer, bytes_to_read, aligned_offset)?; - let scratch_buffer = &mut scratch_buffer.as_flattened_mut()[..bytes_to_read]; + let scratch_buffer = + AlignedSectorSize::slice_mut_to_repr(scratch_buffer).as_flattened_mut(); + let scratch_buffer = &mut scratch_buffer[..bytes_to_read]; // Update contents of existing pages and write into the file scratch_buffer[padding..][..bytes_to_write.len()].copy_from_slice(bytes_to_write); self.file.write_all_at(scratch_buffer, aligned_offset)?; @@ -230,9 +265,7 @@ impl UnbufferedIoFileWindows { #[cfg(test)] mod tests { - use crate::single_disk_farm::unbuffered_io_file_windows::{ - UnbufferedIoFileWindows, MAX_READ_SIZE, - }; + use crate::single_disk_farm::direct_io_file::{DirectIoFile, MAX_READ_SIZE}; use rand::prelude::*; use std::fs; use subspace_farmer_components::file_ext::FileExt; @@ -246,7 +279,7 @@ mod tests { thread_rng().fill(data.as_mut_slice()); fs::write(&file_path, &data).unwrap(); - let mut file = UnbufferedIoFileWindows::open(&file_path).unwrap(); + let mut file = DirectIoFile::open(&file_path).unwrap(); for override_physical_sector_size in [None, Some(4096)] { if let Some(physical_sector_size) = override_physical_sector_size { diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs index 33bd304e77..80ff2cb834 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs @@ -1,15 +1,12 @@ //! Piece reader for single disk farm use crate::farm::{FarmError, PieceReader}; -#[cfg(windows)] -use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows; +use crate::single_disk_farm::direct_io_file::DirectIoFile; use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; use futures::{SinkExt, StreamExt}; use std::collections::HashSet; -#[cfg(not(windows))] -use std::fs::File; use std::future::Future; use std::sync::Arc; use subspace_core_primitives::{Piece, PieceOffset, PublicKey, SectorId, SectorIndex}; @@ -54,8 +51,7 @@ impl DiskPieceReader { pub(super) fn new( public_key: PublicKey, pieces_in_sector: u16, - #[cfg(not(windows))] plot_file: Arc, - #[cfg(windows)] plot_file: Arc, + plot_file: Arc, sectors_metadata: Arc>>, erasure_coding: ErasureCoding, sectors_being_modified: Arc>>, diff --git a/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs b/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs index dc26fce240..2ed66e02dc 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs @@ -4,16 +4,13 @@ mod tests; use crate::farm::{FarmError, MaybePieceStoredResult, PlotCache}; -#[cfg(windows)] -use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows; +use crate::single_disk_farm::direct_io_file::DirectIoFile; use crate::utils::AsyncJoinOnDrop; use async_lock::RwLock as AsyncRwLock; use async_trait::async_trait; use bytes::BytesMut; use parking_lot::RwLock; use std::collections::HashMap; -#[cfg(not(windows))] -use std::fs::File; use std::sync::{Arc, Weak}; use std::{io, mem}; use subspace_core_primitives::crypto::blake3_hash_list; @@ -50,10 +47,7 @@ struct CachedPieces { /// Additional piece cache that exploit part of the plot that does not contain sectors yet #[derive(Debug, Clone)] pub struct DiskPlotCache { - #[cfg(not(windows))] - file: Weak, - #[cfg(windows)] - file: Weak, + file: Weak, sectors_metadata: Weak>>, cached_pieces: Arc>, target_sector_count: SectorIndex, @@ -84,8 +78,7 @@ impl PlotCache for DiskPlotCache { impl DiskPlotCache { pub(crate) fn new( - #[cfg(not(windows))] file: &Arc, - #[cfg(windows)] file: &Arc, + file: &Arc, sectors_metadata: &Arc>>, target_sector_count: SectorIndex, sector_size: u64, @@ -308,8 +301,7 @@ impl DiskPlotCache { } fn read_piece_internal( - #[cfg(not(windows))] file: &File, - #[cfg(windows)] file: &UnbufferedIoFileWindows, + file: &DirectIoFile, offset: u32, element: &mut [u8], ) -> Result, DiskPlotCacheError> { diff --git a/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs b/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs index 82aa5b941f..d3b12d7e18 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs @@ -5,20 +5,14 @@ )] use crate::farm::MaybePieceStoredResult; +use crate::single_disk_farm::direct_io_file::{DirectIoFile, DISK_SECTOR_SIZE}; use crate::single_disk_farm::plot_cache::DiskPlotCache; -#[cfg(windows)] -use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows; -use crate::single_disk_farm::unbuffered_io_file_windows::DISK_SECTOR_SIZE; use rand::prelude::*; use std::assert_matches::assert_matches; -#[cfg(not(windows))] -use std::fs::OpenOptions; use std::num::NonZeroU64; use std::sync::Arc; use subspace_core_primitives::{HistorySize, Piece, PieceIndex, Record, SectorIndex}; use subspace_farmer_components::file_ext::FileExt; -#[cfg(not(windows))] -use subspace_farmer_components::file_ext::OpenOptionsExt; use subspace_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed}; use subspace_networking::libp2p::kad::RecordKey; use subspace_networking::utils::multihash::ToMultihash; @@ -37,17 +31,7 @@ async fn basic() { }); let tempdir = tempdir().unwrap(); - #[cfg(not(windows))] - let file = OpenOptions::new() - .read(true) - .write(true) - .create(true) - .advise_random_access() - .open(tempdir.path().join("plot.bin")) - .unwrap(); - - #[cfg(windows)] - let file = UnbufferedIoFileWindows::open(&tempdir.path().join("plot.bin")).unwrap(); + let file = DirectIoFile::open(&tempdir.path().join("plot.bin")).unwrap(); // Align plot file size for disk sector size file.preallocate( diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 157867b8b4..605e3164dc 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -1,9 +1,8 @@ use crate::farm::{SectorExpirationDetails, SectorPlottingDetails, SectorUpdate}; use crate::node_client::{Error as NodeClientError, NodeClient}; use crate::plotter::{Plotter, SectorPlottingProgress}; +use crate::single_disk_farm::direct_io_file::DirectIoFile; use crate::single_disk_farm::metrics::{SectorState, SingleDiskFarmMetrics}; -#[cfg(windows)] -use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows; use crate::single_disk_farm::{ BackgroundTaskError, Handlers, PlotMetadataHeader, RESERVED_PLOT_METADATA, }; @@ -13,8 +12,6 @@ use futures::stream::FuturesOrdered; use futures::{select, FutureExt, SinkExt, StreamExt}; use parity_scale_codec::Encode; use std::collections::HashSet; -#[cfg(not(windows))] -use std::fs::File; use std::future::{pending, Future}; use std::io; use std::ops::Range; @@ -88,14 +85,8 @@ pub(super) struct SectorPlottingOptions<'a, NC> { pub(super) node_client: &'a NC, pub(super) pieces_in_sector: u16, pub(super) sector_size: usize, - #[cfg(not(windows))] - pub(super) plot_file: Arc, - #[cfg(windows)] - pub(super) plot_file: Arc, - #[cfg(not(windows))] - pub(super) metadata_file: Arc, - #[cfg(windows)] - pub(super) metadata_file: Arc, + pub(super) plot_file: Arc, + pub(super) metadata_file: Arc, pub(super) handlers: &'a Handlers, pub(super) global_mutex: &'a AsyncMutex<()>, pub(super) plotter: Arc, @@ -196,8 +187,7 @@ where async fn process_plotting_result( sector_plotting_result: SectorPlottingResult, metadata_header: &mut PlotMetadataHeader, - #[cfg(not(windows))] metadata_file: Arc, - #[cfg(windows)] metadata_file: Arc, + metadata_file: Arc, ) -> Result<(), PlottingError> { let SectorPlottingResult { sector_index, @@ -491,10 +481,8 @@ where async fn plot_single_sector_internal( sector_index: SectorIndex, sector_size: usize, - #[cfg(not(windows))] plot_file: &Arc, - #[cfg(windows)] plot_file: &Arc, - #[cfg(not(windows))] metadata_file: &Arc, - #[cfg(windows)] metadata_file: &Arc, + plot_file: &Arc, + metadata_file: &Arc, handlers: &Handlers, sectors_being_modified: &AsyncRwLock>, global_mutex: &AsyncMutex<()>,