Skip to content

Commit

Permalink
Direct I/O on Linux and macOS
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Sep 25, 2024
1 parent 84ea07b commit f2b5ff7
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 187 deletions.
71 changes: 45 additions & 26 deletions crates/subspace-farmer-components/src/file_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,20 @@ pub trait OpenOptionsExt {
/// undesirable, only has impact on Windows, for other operating systems see [`FileExt`]
fn advise_random_access(&mut self) -> &mut Self;

/// Advise Windows to not use buffering for this file and that file access will be random.
///
/// NOTE: There are major alignment requirements described here:
/// https://learn.microsoft.com/en-us/windows/win32/fileio/file-buffering#alignment-and-file-access-requirements
#[cfg(windows)]
fn advise_unbuffered(&mut self) -> &mut Self;

/// Advise OS/file system that file will use sequential access and read-ahead behavior is
/// desirable, only has impact on Windows, for other operating systems see [`FileExt`]
fn advise_sequential_access(&mut self) -> &mut Self;

/// Use Direct I/O on Linux and disable buffering on Windows.
///
/// NOTE: There are major alignment requirements described here:
/// https://learn.microsoft.com/en-us/windows/win32/fileio/file-buffering#alignment-and-file-access-requirements
/// https://man7.org/linux/man-pages/man2/open.2.html
fn use_direct_io(&mut self) -> &mut Self;
}

impl OpenOptionsExt for OpenOptions {
#[cfg(target_os = "linux")]
fn advise_random_access(&mut self) -> &mut Self {
// Not supported
self
}

#[cfg(target_os = "macos")]
#[cfg(not(windows))]
fn advise_random_access(&mut self) -> &mut Self {
// Not supported
self
Expand All @@ -47,8 +41,20 @@ impl OpenOptionsExt for OpenOptions {
)
}

#[cfg(not(windows))]
fn advise_sequential_access(&mut self) -> &mut Self {
// Not supported
self
}

#[cfg(windows)]
fn advise_unbuffered(&mut self) -> &mut Self {
fn advise_sequential_access(&mut self) -> &mut Self {
use std::os::windows::fs::OpenOptionsExt;
self.custom_flags(winapi::um::winbase::FILE_FLAG_SEQUENTIAL_SCAN)
}

#[cfg(windows)]
fn use_direct_io(&mut self) -> &mut Self {
use std::os::windows::fs::OpenOptionsExt;
self.custom_flags(
winapi::um::winbase::FILE_FLAG_WRITE_THROUGH
Expand All @@ -57,22 +63,16 @@ impl OpenOptionsExt for OpenOptions {
}

#[cfg(target_os = "linux")]
fn advise_sequential_access(&mut self) -> &mut Self {
// Not supported
self
fn use_direct_io(&mut self) -> &mut Self {
use std::os::unix::fs::OpenOptionsExt;
self.custom_flags(libc::O_DIRECT)
}

#[cfg(target_os = "macos")]
fn advise_sequential_access(&mut self) -> &mut Self {
#[cfg(not(any(target_os = "linux", windows)))]
fn use_direct_io(&mut self) -> &mut Self {
// Not supported
self
}

#[cfg(windows)]
fn advise_sequential_access(&mut self) -> &mut Self {
use std::os::windows::fs::OpenOptionsExt;
self.custom_flags(winapi::um::winbase::FILE_FLAG_SEQUENTIAL_SCAN)
}
}

/// Extension convenience trait that allows pre-allocating files, suggesting random access pattern
Expand All @@ -92,6 +92,9 @@ pub trait FileExt {
/// desirable, on Windows this can only be set when file is opened, see [`OpenOptionsExt`]
fn advise_sequential_access(&self) -> Result<()>;

/// Disable cache on macOS
fn disable_cache(&self) -> Result<()>;

/// Read exact number of bytes at a specific offset
fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<()>;

Expand Down Expand Up @@ -163,6 +166,22 @@ impl FileExt for File {
Ok(())
}

#[cfg(not(target_os = "macos"))]
fn disable_cache(&self) -> Result<()> {
// Not supported
Ok(())
}

#[cfg(target_os = "macos")]
fn disable_cache(&self) -> Result<()> {
use std::os::unix::io::AsRawFd;
if unsafe { libc::fcntl(self.as_raw_fd(), libc::F_NOCACHE, 1) } != 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(())
}
}

#[cfg(unix)]
fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<()> {
std::os::unix::fs::FileExt::read_exact_at(self, buf, offset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use std::path::PathBuf;
use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg};
use subspace_core_primitives::{Record, SolutionRange};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer::single_disk_farm::direct_io_file::DirectIoFile;
use subspace_farmer::single_disk_farm::farming::rayon_files::RayonFiles;
use subspace_farmer::single_disk_farm::farming::{PlotAudit, PlotAuditOptions};
use subspace_farmer::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use subspace_farmer::single_disk_farm::{
SingleDiskFarm, SingleDiskFarmInfo, SingleDiskFarmSummary,
};
Expand Down Expand Up @@ -212,10 +212,10 @@ where
)
});
}
if cfg!(windows) {
{
let plot = RayonFiles::open_with(
&disk_farm.join(SingleDiskFarm::PLOT_FILE),
UnbufferedIoFileWindows::open,
DirectIoFile::open,
)
.map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?;
let plot_audit = PlotAudit::new(&plot);
Expand Down Expand Up @@ -430,10 +430,10 @@ where
)
});
}
if cfg!(windows) {
{
let plot = RayonFiles::open_with(
&disk_farm.join(SingleDiskFarm::PLOT_FILE),
UnbufferedIoFileWindows::open,
DirectIoFile::open,
)
.map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?;
let plot_audit = PlotAudit::new(&plot);
Expand Down
27 changes: 3 additions & 24 deletions crates/subspace-farmer/src/disk_piece_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,21 @@ mod tests;
use crate::disk_piece_cache::metrics::DiskPieceCacheMetrics;
use crate::farm;
use crate::farm::{FarmError, PieceCacheId, PieceCacheOffset};
#[cfg(windows)]
use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use crate::single_disk_farm::unbuffered_io_file_windows::DISK_SECTOR_SIZE;
use crate::single_disk_farm::direct_io_file::{DirectIoFile, DISK_SECTOR_SIZE};
use crate::utils::AsyncJoinOnDrop;
use async_trait::async_trait;
use bytes::BytesMut;
use futures::channel::mpsc;
use futures::{stream, SinkExt, Stream, StreamExt};
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
#[cfg(not(windows))]
use std::fs::{File, OpenOptions};
use std::path::Path;
use std::sync::Arc;
use std::task::Poll;
use std::{fs, io, mem};
use subspace_core_primitives::crypto::blake3_hash_list;
use subspace_core_primitives::{Blake3Hash, Piece, PieceIndex};
use subspace_farmer_components::file_ext::FileExt;
#[cfg(not(windows))]
use subspace_farmer_components::file_ext::OpenOptionsExt;
use thiserror::Error;
use tokio::runtime::Handle;
use tokio::task;
Expand Down Expand Up @@ -65,10 +59,7 @@ pub enum DiskPieceCacheError {
#[derive(Debug)]
struct Inner {
id: PieceCacheId,
#[cfg(not(windows))]
file: File,
#[cfg(windows)]
file: UnbufferedIoFileWindows,
file: DirectIoFile,
max_num_elements: u32,
metrics: Option<DiskPieceCacheMetrics>,
}
Expand Down Expand Up @@ -196,19 +187,7 @@ impl DiskPieceCache {
return Err(DiskPieceCacheError::ZeroCapacity);
}

#[cfg(not(windows))]
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.advise_random_access()
.open(directory.join(Self::FILE_NAME))?;

#[cfg(not(windows))]
file.advise_random_access()?;

#[cfg(windows)]
let file = UnbufferedIoFileWindows::open(&directory.join(Self::FILE_NAME))?;
let file = DirectIoFile::open(&directory.join(Self::FILE_NAME))?;

let expected_size = u64::from(Self::element_size()) * u64::from(capacity);
// Align plot file size for disk sector size
Expand Down
66 changes: 8 additions & 58 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! a small piece cache. It fully manages farming and plotting process, including listening to node
//! notifications, producing solutions and singing rewards.

pub mod direct_io_file;
pub mod farming;
pub mod identity;
mod metrics;
Expand All @@ -13,7 +14,6 @@ pub mod plot_cache;
mod plotted_sectors;
mod plotting;
mod reward_signing;
pub mod unbuffered_io_file_windows;

use crate::disk_piece_cache::{DiskPieceCache, DiskPieceCacheError};
use crate::farm::{
Expand All @@ -22,6 +22,7 @@ use crate::farm::{
};
use crate::node_client::NodeClient;
use crate::plotter::Plotter;
use crate::single_disk_farm::direct_io_file::{DirectIoFile, DISK_SECTOR_SIZE};
use crate::single_disk_farm::farming::rayon_files::RayonFiles;
use crate::single_disk_farm::farming::{
farming, slot_notification_forwarder, FarmingOptions, PlotAudit,
Expand All @@ -37,9 +38,6 @@ use crate::single_disk_farm::plotting::{
plotting, plotting_scheduler, PlottingOptions, PlottingSchedulerOptions, SectorPlottingOptions,
};
use crate::single_disk_farm::reward_signing::reward_signing;
#[cfg(windows)]
use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use crate::single_disk_farm::unbuffered_io_file_windows::DISK_SECTOR_SIZE;
use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop};
use crate::{farm, KNOWN_PEERS_CACHE_SIZE};
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
Expand Down Expand Up @@ -75,8 +73,6 @@ use subspace_core_primitives::{
};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::file_ext::FileExt;
#[cfg(not(windows))]
use subspace_farmer_components::file_ext::OpenOptionsExt;
use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
use subspace_farmer_components::sector::{sector_size, SectorMetadata, SectorMetadataChecksummed};
use subspace_farmer_components::{FarmerProtocolInfo, ReadAtSync};
Expand Down Expand Up @@ -753,14 +749,8 @@ struct SingleDiskFarmInit {
identity: Identity,
single_disk_farm_info: SingleDiskFarmInfo,
single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
#[cfg(not(windows))]
plot_file: Arc<File>,
#[cfg(windows)]
plot_file: Arc<UnbufferedIoFileWindows>,
#[cfg(not(windows))]
metadata_file: File,
#[cfg(windows)]
metadata_file: UnbufferedIoFileWindows,
plot_file: Arc<DirectIoFile>,
metadata_file: DirectIoFile,
metadata_header: PlotMetadataHeader,
target_sector_count: u16,
sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
Expand Down Expand Up @@ -993,17 +983,7 @@ impl SingleDiskFarm {
let farming_plot_fut = task::spawn_blocking(|| {
farming_thread_pool
.install(move || {
#[cfg(windows)]
{
RayonFiles::open_with(
&directory.join(Self::PLOT_FILE),
UnbufferedIoFileWindows::open,
)
}
#[cfg(not(windows))]
{
RayonFiles::open(&directory.join(Self::PLOT_FILE))
}
RayonFiles::open_with(&directory.join(Self::PLOT_FILE), DirectIoFile::open)
})
.map(|farming_plot| (farming_plot, farming_thread_pool))
});
Expand Down Expand Up @@ -1474,19 +1454,7 @@ impl SingleDiskFarm {
let target_sector_count = allocated_space_distribution.target_sector_count;

let metadata_file_path = directory.join(Self::METADATA_FILE);
#[cfg(not(windows))]
let metadata_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.advise_random_access()
.open(&metadata_file_path)?;

#[cfg(not(windows))]
metadata_file.advise_random_access()?;

#[cfg(windows)]
let metadata_file = UnbufferedIoFileWindows::open(&metadata_file_path)?;
let metadata_file = DirectIoFile::open(&metadata_file_path)?;

let metadata_size = metadata_file.size()?;
let expected_metadata_size = allocated_space_distribution.metadata_file_size;
Expand Down Expand Up @@ -1576,19 +1544,7 @@ impl SingleDiskFarm {
Arc::new(AsyncRwLock::new(sectors_metadata))
};

#[cfg(not(windows))]
let plot_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.advise_random_access()
.open(directory.join(Self::PLOT_FILE))?;

#[cfg(not(windows))]
plot_file.advise_random_access()?;

#[cfg(windows)]
let plot_file = UnbufferedIoFileWindows::open(&directory.join(Self::PLOT_FILE))?;
let plot_file = DirectIoFile::open(&directory.join(Self::PLOT_FILE))?;

if plot_file.size()? != allocated_space_distribution.plot_file_size {
// Allocating the whole file (`set_len` below can create a sparse file, which will cause
Expand Down Expand Up @@ -1731,13 +1687,7 @@ impl SingleDiskFarm {
pub fn read_all_sectors_metadata(
directory: &Path,
) -> io::Result<Vec<SectorMetadataChecksummed>> {
#[cfg(not(windows))]
let metadata_file = OpenOptions::new()
.read(true)
.open(directory.join(Self::METADATA_FILE))?;

#[cfg(windows)]
let metadata_file = UnbufferedIoFileWindows::open(&directory.join(Self::METADATA_FILE))?;
let metadata_file = DirectIoFile::open(&directory.join(Self::METADATA_FILE))?;

let metadata_size = metadata_file.size()?;
let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
Expand Down
Loading

0 comments on commit f2b5ff7

Please sign in to comment.