Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Direct I/O on Linux and macOS #3064

Merged
merged 2 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 45 additions & 26 deletions crates/subspace-farmer-components/src/file_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,20 @@ pub trait OpenOptionsExt {
/// undesirable, only has impact on Windows, for other operating systems see [`FileExt`]
fn advise_random_access(&mut self) -> &mut Self;

/// Advise Windows to not use buffering for this file and that file access will be random.
///
/// NOTE: There are major alignment requirements described here:
/// https://learn.microsoft.com/en-us/windows/win32/fileio/file-buffering#alignment-and-file-access-requirements
#[cfg(windows)]
fn advise_unbuffered(&mut self) -> &mut Self;

/// Advise OS/file system that file will use sequential access and read-ahead behavior is
/// desirable, only has impact on Windows, for other operating systems see [`FileExt`]
fn advise_sequential_access(&mut self) -> &mut Self;

/// Use Direct I/O on Linux and disable buffering on Windows.
///
/// NOTE: There are major alignment requirements described here:
/// https://learn.microsoft.com/en-us/windows/win32/fileio/file-buffering#alignment-and-file-access-requirements
/// https://man7.org/linux/man-pages/man2/open.2.html
fn use_direct_io(&mut self) -> &mut Self;
}

impl OpenOptionsExt for OpenOptions {
#[cfg(target_os = "linux")]
fn advise_random_access(&mut self) -> &mut Self {
// Not supported
self
}

#[cfg(target_os = "macos")]
#[cfg(not(windows))]
fn advise_random_access(&mut self) -> &mut Self {
// Not supported
self
Expand All @@ -47,8 +41,20 @@ impl OpenOptionsExt for OpenOptions {
)
}

#[cfg(not(windows))]
fn advise_sequential_access(&mut self) -> &mut Self {
// Not supported
self
}

#[cfg(windows)]
fn advise_unbuffered(&mut self) -> &mut Self {
fn advise_sequential_access(&mut self) -> &mut Self {
use std::os::windows::fs::OpenOptionsExt;
self.custom_flags(winapi::um::winbase::FILE_FLAG_SEQUENTIAL_SCAN)
}

#[cfg(windows)]
fn use_direct_io(&mut self) -> &mut Self {
use std::os::windows::fs::OpenOptionsExt;
self.custom_flags(
winapi::um::winbase::FILE_FLAG_WRITE_THROUGH
Expand All @@ -57,22 +63,16 @@ impl OpenOptionsExt for OpenOptions {
}

#[cfg(target_os = "linux")]
fn advise_sequential_access(&mut self) -> &mut Self {
// Not supported
self
fn use_direct_io(&mut self) -> &mut Self {
use std::os::unix::fs::OpenOptionsExt;
self.custom_flags(libc::O_DIRECT)
}

#[cfg(target_os = "macos")]
fn advise_sequential_access(&mut self) -> &mut Self {
#[cfg(not(any(target_os = "linux", windows)))]
fn use_direct_io(&mut self) -> &mut Self {
// Not supported
self
}

#[cfg(windows)]
fn advise_sequential_access(&mut self) -> &mut Self {
use std::os::windows::fs::OpenOptionsExt;
self.custom_flags(winapi::um::winbase::FILE_FLAG_SEQUENTIAL_SCAN)
}
}

/// Extension convenience trait that allows pre-allocating files, suggesting random access pattern
Expand All @@ -92,6 +92,9 @@ pub trait FileExt {
/// desirable, on Windows this can only be set when file is opened, see [`OpenOptionsExt`]
fn advise_sequential_access(&self) -> Result<()>;

/// Disable cache on macOS
fn disable_cache(&self) -> Result<()>;

/// Read exact number of bytes at a specific offset
fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<()>;

Expand Down Expand Up @@ -163,6 +166,22 @@ impl FileExt for File {
Ok(())
}

#[cfg(not(target_os = "macos"))]
fn disable_cache(&self) -> Result<()> {
// Not supported
Ok(())
}

#[cfg(target_os = "macos")]
fn disable_cache(&self) -> Result<()> {
use std::os::unix::io::AsRawFd;
if unsafe { libc::fcntl(self.as_raw_fd(), libc::F_NOCACHE, 1) } != 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(())
}
}

#[cfg(unix)]
fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<()> {
std::os::unix::fs::FileExt::read_exact_at(self, buf, offset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use std::path::PathBuf;
use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg};
use subspace_core_primitives::{Record, SolutionRange};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer::single_disk_farm::direct_io_file::DirectIoFile;
use subspace_farmer::single_disk_farm::farming::rayon_files::RayonFiles;
use subspace_farmer::single_disk_farm::farming::{PlotAudit, PlotAuditOptions};
use subspace_farmer::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use subspace_farmer::single_disk_farm::{
SingleDiskFarm, SingleDiskFarmInfo, SingleDiskFarmSummary,
};
Expand Down Expand Up @@ -209,10 +209,10 @@ where
)
});
}
if cfg!(windows) {
{
let plot = RayonFiles::open_with(
&disk_farm.join(SingleDiskFarm::PLOT_FILE),
UnbufferedIoFileWindows::open,
DirectIoFile::open,
)
.map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?;
let plot_audit = PlotAudit::new(&plot);
Expand Down Expand Up @@ -424,10 +424,10 @@ where
)
});
}
if cfg!(windows) {
{
let plot = RayonFiles::open_with(
&disk_farm.join(SingleDiskFarm::PLOT_FILE),
UnbufferedIoFileWindows::open,
DirectIoFile::open,
)
.map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?;
let plot_audit = PlotAudit::new(&plot);
Expand Down
27 changes: 3 additions & 24 deletions crates/subspace-farmer/src/disk_piece_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,21 @@ mod tests;
use crate::disk_piece_cache::metrics::DiskPieceCacheMetrics;
use crate::farm;
use crate::farm::{FarmError, PieceCacheId, PieceCacheOffset};
#[cfg(windows)]
use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use crate::single_disk_farm::unbuffered_io_file_windows::DISK_SECTOR_SIZE;
use crate::single_disk_farm::direct_io_file::{DirectIoFile, DISK_SECTOR_SIZE};
use crate::utils::AsyncJoinOnDrop;
use async_trait::async_trait;
use bytes::BytesMut;
use futures::channel::mpsc;
use futures::{stream, SinkExt, Stream, StreamExt};
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
#[cfg(not(windows))]
use std::fs::{File, OpenOptions};
use std::path::Path;
use std::sync::Arc;
use std::task::Poll;
use std::{fs, io, mem};
use subspace_core_primitives::crypto::blake3_hash_list;
use subspace_core_primitives::{Blake3Hash, Piece, PieceIndex};
use subspace_farmer_components::file_ext::FileExt;
#[cfg(not(windows))]
use subspace_farmer_components::file_ext::OpenOptionsExt;
use thiserror::Error;
use tokio::runtime::Handle;
use tokio::task;
Expand Down Expand Up @@ -65,10 +59,7 @@ pub enum DiskPieceCacheError {
#[derive(Debug)]
struct Inner {
id: PieceCacheId,
#[cfg(not(windows))]
file: File,
#[cfg(windows)]
file: UnbufferedIoFileWindows,
file: DirectIoFile,
max_num_elements: u32,
metrics: Option<DiskPieceCacheMetrics>,
}
Expand Down Expand Up @@ -196,19 +187,7 @@ impl DiskPieceCache {
return Err(DiskPieceCacheError::ZeroCapacity);
}

#[cfg(not(windows))]
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.advise_random_access()
.open(directory.join(Self::FILE_NAME))?;

#[cfg(not(windows))]
file.advise_random_access()?;

#[cfg(windows)]
let file = UnbufferedIoFileWindows::open(&directory.join(Self::FILE_NAME))?;
let file = DirectIoFile::open(&directory.join(Self::FILE_NAME))?;

let expected_size = u64::from(Self::element_size()) * u64::from(capacity);
// Align plot file size for disk sector size
Expand Down
66 changes: 8 additions & 58 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! a small piece cache. It fully manages farming and plotting process, including listening to node
//! notifications, producing solutions and singing rewards.

pub mod direct_io_file;
pub mod farming;
pub mod identity;
mod metrics;
Expand All @@ -13,7 +14,6 @@ pub mod plot_cache;
mod plotted_sectors;
mod plotting;
mod reward_signing;
pub mod unbuffered_io_file_windows;

use crate::disk_piece_cache::{DiskPieceCache, DiskPieceCacheError};
use crate::farm::{
Expand All @@ -22,6 +22,7 @@ use crate::farm::{
};
use crate::node_client::NodeClient;
use crate::plotter::Plotter;
use crate::single_disk_farm::direct_io_file::{DirectIoFile, DISK_SECTOR_SIZE};
use crate::single_disk_farm::farming::rayon_files::RayonFiles;
use crate::single_disk_farm::farming::{
farming, slot_notification_forwarder, FarmingOptions, PlotAudit,
Expand All @@ -37,9 +38,6 @@ use crate::single_disk_farm::plotting::{
plotting, plotting_scheduler, PlottingOptions, PlottingSchedulerOptions, SectorPlottingOptions,
};
use crate::single_disk_farm::reward_signing::reward_signing;
#[cfg(windows)]
use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use crate::single_disk_farm::unbuffered_io_file_windows::DISK_SECTOR_SIZE;
use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop};
use crate::{farm, KNOWN_PEERS_CACHE_SIZE};
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
Expand Down Expand Up @@ -75,8 +73,6 @@ use subspace_core_primitives::{
};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::file_ext::FileExt;
#[cfg(not(windows))]
use subspace_farmer_components::file_ext::OpenOptionsExt;
use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
use subspace_farmer_components::sector::{sector_size, SectorMetadata, SectorMetadataChecksummed};
use subspace_farmer_components::{FarmerProtocolInfo, ReadAtSync};
Expand Down Expand Up @@ -730,14 +726,8 @@ struct SingleDiskFarmInit {
identity: Identity,
single_disk_farm_info: SingleDiskFarmInfo,
single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
#[cfg(not(windows))]
plot_file: Arc<File>,
#[cfg(windows)]
plot_file: Arc<UnbufferedIoFileWindows>,
#[cfg(not(windows))]
metadata_file: File,
#[cfg(windows)]
metadata_file: UnbufferedIoFileWindows,
plot_file: Arc<DirectIoFile>,
metadata_file: DirectIoFile,
metadata_header: PlotMetadataHeader,
target_sector_count: u16,
sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
Expand Down Expand Up @@ -968,17 +958,7 @@ impl SingleDiskFarm {
let farming_plot_fut = task::spawn_blocking(|| {
farming_thread_pool
.install(move || {
#[cfg(windows)]
{
RayonFiles::open_with(
&directory.join(Self::PLOT_FILE),
UnbufferedIoFileWindows::open,
)
}
#[cfg(not(windows))]
{
RayonFiles::open(&directory.join(Self::PLOT_FILE))
}
RayonFiles::open_with(&directory.join(Self::PLOT_FILE), DirectIoFile::open)
})
.map(|farming_plot| (farming_plot, farming_thread_pool))
});
Expand Down Expand Up @@ -1392,19 +1372,7 @@ impl SingleDiskFarm {
let target_sector_count = allocated_space_distribution.target_sector_count;

let metadata_file_path = directory.join(Self::METADATA_FILE);
#[cfg(not(windows))]
let metadata_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.advise_random_access()
.open(&metadata_file_path)?;

#[cfg(not(windows))]
metadata_file.advise_random_access()?;

#[cfg(windows)]
let metadata_file = UnbufferedIoFileWindows::open(&metadata_file_path)?;
let metadata_file = DirectIoFile::open(&metadata_file_path)?;

let metadata_size = metadata_file.size()?;
let expected_metadata_size = allocated_space_distribution.metadata_file_size;
Expand Down Expand Up @@ -1494,19 +1462,7 @@ impl SingleDiskFarm {
Arc::new(AsyncRwLock::new(sectors_metadata))
};

#[cfg(not(windows))]
let plot_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.advise_random_access()
.open(directory.join(Self::PLOT_FILE))?;

#[cfg(not(windows))]
plot_file.advise_random_access()?;

#[cfg(windows)]
let plot_file = UnbufferedIoFileWindows::open(&directory.join(Self::PLOT_FILE))?;
let plot_file = DirectIoFile::open(&directory.join(Self::PLOT_FILE))?;

if plot_file.size()? != allocated_space_distribution.plot_file_size {
// Allocating the whole file (`set_len` below can create a sparse file, which will cause
Expand Down Expand Up @@ -1649,13 +1605,7 @@ impl SingleDiskFarm {
pub fn read_all_sectors_metadata(
directory: &Path,
) -> io::Result<Vec<SectorMetadataChecksummed>> {
#[cfg(not(windows))]
let metadata_file = OpenOptions::new()
.read(true)
.open(directory.join(Self::METADATA_FILE))?;

#[cfg(windows)]
let metadata_file = UnbufferedIoFileWindows::open(&directory.join(Self::METADATA_FILE))?;
let metadata_file = DirectIoFile::open(&directory.join(Self::METADATA_FILE))?;

let metadata_size = metadata_file.size()?;
let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
Expand Down
Loading
Loading