Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread priority #2566

Merged
merged 2 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions crates/sc-proof-of-time/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ derive_more = "0.99.17"
futures = "0.3.29"
lru = "0.12.1"
parity-scale-codec = { version = "3.6.9", features = ["derive"] }
parking_lot = "0.12.1"
rayon = "1.8.1"
sc-client-api = { version = "4.0.0-dev", git = "https://github.com/subspace/polkadot-sdk", rev = "d6b500960579d73c43fc4ef550b703acfa61c4c8" }
sc-consensus-slots = { version = "0.10.0-dev", git = "https://github.com/subspace/polkadot-sdk", rev = "d6b500960579d73c43fc4ef550b703acfa61c4c8" }
sc-network = { version = "0.10.0-dev", git = "https://github.com/subspace/polkadot-sdk", rev = "d6b500960579d73c43fc4ef550b703acfa61c4c8" }
Expand All @@ -30,6 +32,5 @@ sp-inherents = { version = "4.0.0-dev", git = "https://github.com/subspace/polka
sp-runtime = { version = "24.0.0", git = "https://github.com/subspace/polkadot-sdk", rev = "d6b500960579d73c43fc4ef550b703acfa61c4c8" }
subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" }
subspace-proof-of-time = { version = "0.1.0", path = "../subspace-proof-of-time" }
parking_lot = "0.12.1"
rayon = "1.8.1"
thread-priority = "0.16.0"
tracing = "0.1.40"
9 changes: 9 additions & 0 deletions crates/sc-proof-of-time/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::marker::PhantomData;
use std::sync::Arc;
use std::thread;
use subspace_core_primitives::PotCheckpoints;
use thread_priority::{set_current_thread_priority, ThreadPriority};
use tracing::{debug, error, trace, warn};

const LOCAL_PROOFS_CHANNEL_CAPACITY: usize = 10;
Expand Down Expand Up @@ -150,6 +151,14 @@ where
}
}

if let Err(error) = set_current_thread_priority(ThreadPriority::Max) {
warn!(
%error,
"Failed to set thread priority, timekeeper performance may be \
negatively impacted by other software running on this machine",
);
}

if let Err(error) =
run_timekeeper(state, pot_verifier, timekeeper_proofs_sender)
{
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ substrate-bip39 = "0.4.5"
supports-color = "2.1.0"
tempfile = "3.9.0"
thiserror = "1.0.56"
thread-priority = "0.16.0"
tokio = { version = "1.35.1", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{FutureExt, StreamExt};
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use std::fs;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::num::{NonZeroU8, NonZeroUsize};
use std::path::PathBuf;
use std::pin::pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::{fmt, fs};
use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg};
use subspace_core_primitives::{PublicKey, Record, SectorIndex};
use subspace_erasure_coding::ErasureCoding;
Expand All @@ -47,6 +47,7 @@ use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::libp2p::Multiaddr;
use subspace_networking::utils::piece_provider::PieceProvider;
use subspace_proof_of_space::Table;
use thread_priority::ThreadPriority;
use tokio::sync::Semaphore;
use tracing::{debug, error, info, info_span, warn};
use zeroize::Zeroizing;
Expand All @@ -66,6 +67,50 @@ fn should_farm_during_initial_plotting() -> bool {
total_cpu_cores > 8
}

/// Plotting thread priority
#[derive(Debug, Parser, Copy, Clone)]
enum PlottingThreadPriority {
/// Minimum priority
Min,
/// Default priority
Default,
/// Max priority (not recommended)
Max,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any specific reason why reference implementation wants to use this plotting when its clearly not recommended? Maybe this should not be an option at all ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is actually a much more fine-grained control available over priority. But for now I decided to give extreme option for users to try and experiment with. It will likely evolve over time, we may end up removing it if not helpful. This might be desirable to accelerate plotting at all costs, but it will likely make farming while plotting not possible at all (though not guaranteed).

}

impl FromStr for PlottingThreadPriority {
type Err = String;

fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> {
match s {
"min" => Ok(Self::Min),
"default" => Ok(Self::Default),
"max" => Ok(Self::Max),
s => Err(format!("Thread priority {s} is not valid")),
}
}
}

impl fmt::Display for PlottingThreadPriority {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
Self::Min => "min",
Self::Default => "default",
Self::Max => "max",
})
}
}

impl From<PlottingThreadPriority> for Option<ThreadPriority> {
fn from(value: PlottingThreadPriority) -> Self {
match value {
PlottingThreadPriority::Min => Some(ThreadPriority::Min),
PlottingThreadPriority::Default => None,
PlottingThreadPriority::Max => Some(ThreadPriority::Max),
}
}
}

/// Arguments for farmer
#[derive(Debug, Parser)]
pub(crate) struct FarmingArgs {
Expand Down Expand Up @@ -193,6 +238,10 @@ pub(crate) struct FarmingArgs {
/// each with a pair of CPU cores.
#[arg(long, conflicts_with_all = & ["sector_encoding_concurrency", "replotting_thread_pool_size"])]
replotting_cpu_cores: Option<String>,
/// Plotting thread priority, by default de-prioritizes plotting threads in order to make sure
/// farming is successful and computer can be used comfortably for other things
#[arg(long, default_value_t = PlottingThreadPriority::Min)]
plotting_thread_priority: PlottingThreadPriority,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to change this feature if we consider it useful?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because there different circumstances and min will likely slightly decrease plotting performance. Users should be able to change behavior to the old one without downgrading while we're figuring out the best defaults. Otherwise they'll run outdated software and will not be able to benefit from other optimizations.

/// Disable farm locking, for example if file system doesn't support it
#[arg(long)]
disable_farm_locking: bool,
Expand Down Expand Up @@ -346,6 +395,7 @@ where
plotting_cpu_cores,
replotting_thread_pool_size,
replotting_cpu_cores,
plotting_thread_priority,
disable_farm_locking,
} = farming_args;

Expand Down Expand Up @@ -571,6 +621,7 @@ where
plotting_thread_pool_core_indices
.into_iter()
.zip(replotting_thread_pool_core_indices),
plotting_thread_priority.into(),
)?;
let farming_thread_pool_size = farming_thread_pool_size
.map(|farming_thread_pool_size| farming_thread_pool_size.get())
Expand Down
10 changes: 10 additions & 0 deletions crates/subspace-farmer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::pin::{pin, Pin};
use std::str::FromStr;
use std::task::{Context, Poll};
use std::{io, thread};
use thread_priority::{set_current_thread_priority, ThreadPriority};
use tokio::runtime::Handle;
use tokio::task;
use tracing::debug;
Expand Down Expand Up @@ -371,6 +372,7 @@ fn create_plotting_thread_pool_manager_thread_pool_pair(
thread_prefix: &'static str,
thread_pool_index: usize,
cpu_core_set: CpuCoreSet,
thread_priority: Option<ThreadPriority>,
) -> Result<ThreadPool, ThreadPoolBuildError> {
ThreadPoolBuilder::new()
.thread_name(move |thread_index| {
Expand All @@ -386,6 +388,11 @@ fn create_plotting_thread_pool_manager_thread_pool_pair(

move || {
cpu_core_set.pin_current_thread();
if let Some(thread_priority) = thread_priority {
if let Err(error) = set_current_thread_priority(thread_priority) {
warn!(%error, "Failed to set thread priority");
}
}
drop(cpu_core_set);

let _guard = handle.enter();
Expand All @@ -405,6 +412,7 @@ fn create_plotting_thread_pool_manager_thread_pool_pair(
/// support for user customizations is desired. They will then have to be composed into pairs for this function.
pub fn create_plotting_thread_pool_manager<I>(
mut cpu_core_sets: I,
thread_priority: Option<ThreadPriority>,
) -> Result<PlottingThreadPoolManager, ThreadPoolBuildError>
where
I: ExactSizeIterator<Item = (CpuCoreSet, CpuCoreSet)>,
Expand All @@ -422,11 +430,13 @@ where
"plotting",
thread_pool_index,
plotting_cpu_core_set,
thread_priority,
)?,
replotting: create_plotting_thread_pool_manager_thread_pool_pair(
"replotting",
thread_pool_index,
replotting_cpu_core_set,
thread_priority,
)?,
})
},
Expand Down
Loading