diff --git a/Cargo.lock b/Cargo.lock
index 2000a5d..6a70724 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4200,6 +4200,7 @@ checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
 dependencies = [
  "autocfg",
  "scopeguard",
+ "serde",
 ]
 
 [[package]]
@@ -4485,7 +4486,7 @@ dependencies = [
 
 [[package]]
 name = "network-scheduler"
-version = "1.0.12"
+version = "1.0.13"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -4506,6 +4507,7 @@ dependencies = [
  "lazy_static",
  "log",
  "nonempty",
+ "parking_lot",
  "prometheus-client",
  "rand 0.8.5",
  "random_choice",
diff --git a/crates/network-scheduler/Cargo.toml b/crates/network-scheduler/Cargo.toml
index a0dcb7e..623983f 100644
--- a/crates/network-scheduler/Cargo.toml
+++ b/crates/network-scheduler/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "network-scheduler"
-version = "1.0.12"
+version = "1.0.13"
 edition = "2021"
 
 [dependencies]
@@ -22,6 +22,7 @@ itertools = "0.12"
 lazy_static = "1.4.0"
 log = "0.4"
 nonempty = { version = "0.10", features = ["serde", "serialize"] }
+parking_lot = { version = "0.12", features = ["serde"] }
 prometheus-client = "0.22"
 rand = "0.8"
 random_choice = "0.3"
diff --git a/crates/network-scheduler/src/cli.rs b/crates/network-scheduler/src/cli.rs
index 0532b0b..644a220 100644
--- a/crates/network-scheduler/src/cli.rs
+++ b/crates/network-scheduler/src/cli.rs
@@ -95,16 +95,6 @@ pub struct Cli {
     )]
     pub metrics_path: Option<PathBuf>,
 
-    #[arg(
-        long,
-        env,
-        help = "Choose which metrics should be printed.",
-        value_delimiter = ',',
-        num_args = 0..,
-        default_value = "QuerySubmitted,QueryFinished,WorkersSnapshot"
-    )]
-    pub metrics: Vec<String>,
-
     #[arg(
         short,
         long,
diff --git a/crates/network-scheduler/src/main.rs b/crates/network-scheduler/src/main.rs
index fec19dd..229c37f 100644
--- a/crates/network-scheduler/src/main.rs
+++ b/crates/network-scheduler/src/main.rs
@@ -5,13 +5,11 @@ use prometheus_client::registry::Registry;
 use subsquid_network_transport::P2PTransportBuilder;
 
 use crate::cli::Cli;
-use crate::metrics::MetricsWriter;
 use crate::server::Server;
 use crate::storage::S3Storage;
 
 mod cli;
 mod data_chunk;
-mod metrics;
 mod metrics_server;
 mod prometheus_metrics;
 mod scheduler;
@@ -39,7 +37,6 @@ async fn main() -> anyhow::Result<()> {
     args.read_config().await?;
 
     // Open file for writing metrics
-    let metrics_writer = MetricsWriter::from_cli(&args).await?;
     let mut metrics_registry = Registry::default();
     subsquid_network_transport::metrics::register_metrics(&mut metrics_registry);
     prometheus_metrics::register_metrics(&mut metrics_registry);
@@ -61,7 +58,6 @@ async fn main() -> anyhow::Result<()> {
         incoming_units,
         transport_handle,
         scheduler,
-        metrics_writer,
     )
     .run(
         contract_client,
diff --git a/crates/network-scheduler/src/metrics.rs b/crates/network-scheduler/src/metrics.rs
deleted file mode 100644
index 99ca285..0000000
--- a/crates/network-scheduler/src/metrics.rs
+++ /dev/null
@@ -1,121 +0,0 @@
-use std::pin::Pin;
-use std::time::SystemTime;
-
-use derive_enum_from_into::EnumFrom;
-use serde::Serialize;
-use serde_with::{serde_as, TimestampMilliSeconds};
-use tokio::fs::OpenOptions;
-use tokio::io::{AsyncWrite, AsyncWriteExt};
-
-use subsquid_messages::{Ping, QueryFinished, QuerySubmitted};
-use subsquid_network_transport::PeerId;
-
-use crate::cli::Cli;
-use crate::worker_state::WorkerState;
-
-#[serde_as]
-#[derive(Debug, Clone, Serialize)]
-pub struct Metrics {
-    #[serde_as(as = "TimestampMilliSeconds")]
-    timestamp: SystemTime,
-    #[serde(flatten)]
-    event: MetricsEvent,
-}
-
-impl Metrics {
-    pub fn new(peer_id: Option<String>, event: impl Into<MetricsEvent>) -> anyhow::Result<Self> {
-        let event = event.into();
-        let expected_sender = match &event {
-            MetricsEvent::QuerySubmitted(QuerySubmitted { client_id, .. }) => Some(client_id),
-            MetricsEvent::QueryFinished(QueryFinished { client_id, .. }) => Some(client_id),
-            MetricsEvent::Ping(Ping { worker_id, .. }) => worker_id.as_ref(),
-            _ => None,
-        };
-        anyhow::ensure!(
-            peer_id.as_ref() == expected_sender,
-            "Invalid metrics message sender"
-        );
-
-        Ok(Self {
-            timestamp: SystemTime::now(),
-            event,
-        })
-    }
-
-    pub fn to_json_line(&self) -> anyhow::Result<Vec<u8>> {
-        let mut vec = serde_json::to_vec(self)?;
-        vec.push(b'\n');
-        Ok(vec)
-    }
-}
-
-#[derive(Debug, Clone, Serialize, EnumFrom)]
-#[serde(tag = "event")]
-pub enum MetricsEvent {
-    Ping(Ping),
-    QuerySubmitted(QuerySubmitted),
-    QueryFinished(QueryFinished),
-    WorkersSnapshot { active_workers: Vec<WorkerState> },
-}
-
-impl MetricsEvent {
-    pub fn name(&self) -> &'static str {
-        match self {
-            MetricsEvent::Ping(_) => "Ping",
-            MetricsEvent::QuerySubmitted(_) => "QuerySubmitted",
-            MetricsEvent::QueryFinished(_) => "QueryFinished",
-            MetricsEvent::WorkersSnapshot { .. } => "WorkersSnapshot",
-        }
-    }
-}
-
-impl From<Vec<WorkerState>> for MetricsEvent {
-    fn from(active_workers: Vec<WorkerState>) -> Self {
-        Self::WorkersSnapshot { active_workers }
-    }
-}
-
-pub struct MetricsWriter {
-    output: Pin<Box<dyn AsyncWrite + Send>>,
-    enabled_metrics: Vec<String>,
-}
-
-impl MetricsWriter {
-    pub async fn from_cli(cli: &Cli) -> anyhow::Result<Self> {
-        let output: Pin<Box<dyn AsyncWrite + Send>> = match &cli.metrics_path {
-            Some(path) => {
-                let metrics_file = OpenOptions::new()
-                    .create(true)
-                    .append(true)
-                    .open(path)
-                    .await?;
-                Box::pin(metrics_file)
-            }
-            None => Box::pin(tokio::io::stdout()),
-        };
-        let enabled_metrics = cli.metrics.clone();
-        Ok(Self {
-            output,
-            enabled_metrics,
-        })
-    }
-
-    fn metric_enabled(&self, event: &MetricsEvent) -> bool {
-        let event_name = event.name();
-        self.enabled_metrics.iter().any(|s| s == event_name)
-    }
-
-    pub async fn write_metrics(
-        &mut self,
-        peer_id: Option<PeerId>,
-        msg: impl Into<MetricsEvent>,
-    ) -> anyhow::Result<()> {
-        let peer_id = peer_id.map(|id| id.to_string());
-        let metrics = Metrics::new(peer_id, msg)?;
-        if self.metric_enabled(&metrics.event) {
-            let json_line = metrics.to_json_line()?;
-            self.output.write_all(json_line.as_slice()).await?;
-        }
-        Ok(())
-    }
-}
diff --git a/crates/network-scheduler/src/metrics_server.rs b/crates/network-scheduler/src/metrics_server.rs
index e51fb6f..e195ba4 100644
--- a/crates/network-scheduler/src/metrics_server.rs
+++ b/crates/network-scheduler/src/metrics_server.rs
@@ -21,14 +21,12 @@ const JAIL_INFO_FIELDS: [Field<'static, WorkerState>; 3] = [
     Field::new("jail_reason"),
 ];
 
-async fn active_workers(
-    Extension(scheduler): Extension<Arc<RwLock<Scheduler>>>,
-) -> Json<Vec<WorkerState>> {
-    Json(scheduler.read().await.active_workers())
+async fn active_workers(Extension(scheduler): Extension<Scheduler>) -> Json<Vec<WorkerState>> {
+    Json(scheduler.active_workers())
 }
 
-async fn workers_jail_info(Extension(scheduler): Extension<Arc<RwLock<Scheduler>>>) -> Response {
-    let active_workers = scheduler.read().await.active_workers();
+async fn workers_jail_info(Extension(scheduler): Extension<Scheduler>) -> Response {
+    let active_workers = scheduler.active_workers();
     let jail_info = active_workers
         .iter()
         .map(|w| w.with_fields(|_| JAIL_INFO_FIELDS))
@@ -37,9 +35,9 @@
 }
 
 async fn chunks(
-    Extension(scheduler): Extension<Arc<RwLock<Scheduler>>>,
+    Extension(scheduler): Extension<Scheduler>,
 ) -> Json<HashMap<String, Vec<ChunkStatus>>> {
-    let chunks_summary = scheduler.read().await.get_chunks_summary();
+    let chunks_summary = scheduler.get_chunks_summary();
     Json(chunks_summary)
 }
 
@@ -55,7 +53,7 @@ async fn get_metrics(Extension(metrics_registry): Extension<Arc<Registry>>) ->
 }
 
 pub async fn run_server(
-    scheduler: Arc<RwLock<Scheduler>>,
+    scheduler: Scheduler,
     addr: SocketAddr,
     metrics_registry: Registry,
     cancel_token: CancellationToken,
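A sketch, not part of the diff: the handlers above can now take `Extension<Scheduler>` by value because `Scheduler` derives `Clone` (see the scheduler.rs hunks further down, where every field is wrapped in `Arc`), so no `Arc<RwLock<...>>` and no `.read().await` is needed. The snippet below shows the general axum pattern this relies on; the route path, the `make_app` function, and the stripped-down `Scheduler` stand-in are illustrative, not taken from this repository:

    // Assumes the axum crate; mirrors the shape of active_workers() above.
    use axum::{routing::get, Extension, Json, Router};

    #[derive(Clone, Default)]
    struct Scheduler; // stand-in: the real one is Clone because its fields are Arc'd

    impl Scheduler {
        fn active_workers(&self) -> Vec<String> {
            Vec::new() // the real method reads DashMap-backed state through &self
        }
    }

    async fn active_workers(Extension(s): Extension<Scheduler>) -> Json<Vec<String>> {
        Json(s.active_workers())
    }

    fn make_app(scheduler: Scheduler) -> Router {
        Router::new()
            .route("/workers/pings", get(active_workers))
            // every request gets a cheap clone of the same shared handle
            .layer(Extension(scheduler))
    }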
diff --git a/crates/network-scheduler/src/prometheus_metrics.rs b/crates/network-scheduler/src/prometheus_metrics.rs
index 857b7a0..2914b60 100644
--- a/crates/network-scheduler/src/prometheus_metrics.rs
+++ b/crates/network-scheduler/src/prometheus_metrics.rs
@@ -1,12 +1,11 @@
-use std::collections::HashMap;
+use std::time::Duration;
 
 use prometheus_client::metrics::counter::Counter;
+use prometheus_client::metrics::family::Family;
 use prometheus_client::metrics::gauge::Gauge;
-use prometheus_client::metrics::histogram::{linear_buckets, Histogram};
+use prometheus_client::metrics::histogram::{exponential_buckets, linear_buckets, Histogram};
 use prometheus_client::registry::Registry;
 
-use crate::scheduling_unit::UnitId;
-
 lazy_static::lazy_static! {
     static ref WORKERS_PER_UNIT: Histogram = Histogram::new(linear_buckets(0.0, 1.0, 100));
     static ref TOTAL_UNITS: Gauge = Default::default();
@@ -14,6 +13,9 @@ lazy_static::lazy_static! {
     static ref REPLICATION_FACTOR: Gauge = Default::default();
     static ref PARTIALLY_ASSIGNED_UNITS: Gauge = Default::default();
     static ref S3_REQUESTS: Counter = Default::default();
+    static ref EXEC_TIMES: Family<Vec<(&'static str, &'static str)>, Histogram> = Family::new_with_constructor(
+        || Histogram::new(exponential_buckets(0.001, 2.0, 24))
+    );
 }
 
 pub fn register_metrics(registry: &mut Registry) {
@@ -47,11 +49,16 @@ pub fn register_metrics(registry: &mut Registry) {
         "s3_requests",
         "Total number of S3 API requests since application start",
         S3_REQUESTS.clone(),
-    )
+    );
+    registry.register(
+        "exec_times",
+        "Execution times of various procedures (ms)",
+        EXEC_TIMES.clone(),
+    );
 }
 
-pub fn units_assigned(counts: HashMap<&UnitId, usize>) {
-    for (_unit_id, count) in counts {
+pub fn units_assigned(counts: impl IntoIterator<Item = usize>) {
+    for count in counts {
         WORKERS_PER_UNIT.observe(count as f64);
     }
 }
@@ -75,3 +82,11 @@ pub fn partially_assigned_units(count: usize) {
 pub fn s3_request() {
     S3_REQUESTS.inc();
 }
+
+pub fn exec_time(procedure: &'static str, duration: Duration) {
+    let duration_millis = duration.as_micros() as f64 / 1000.;
+    log::trace!("Procedure {procedure} took {duration_millis:.3} ms");
+    EXEC_TIMES
+        .get_or_create(&vec![("procedure", procedure)])
+        .observe(duration_millis);
+}
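A sketch, not part of the diff: how the new `EXEC_TIMES` family and `exec_time()` helper fit together. The histogram family is keyed by a single `procedure` label and fed durations in milliseconds; `time_it` below is a hypothetical caller, while the real call sites are the `prometheus_metrics::exec_time(...)` calls added throughout scheduler.rs and server.rs in the hunks that follow:

    // Assumes the prometheus-client crate (0.22), as pinned in Cargo.toml above.
    use std::time::Instant;

    use prometheus_client::metrics::family::Family;
    use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};

    fn main() {
        // Same shape as EXEC_TIMES: one histogram per label value,
        // 24 exponential buckets starting at 0.001 ms.
        let exec_times: Family<Vec<(&'static str, &'static str)>, Histogram> =
            Family::new_with_constructor(|| Histogram::new(exponential_buckets(0.001, 2.0, 24)));

        let start = Instant::now();
        time_it();
        // Mirrors exec_time(): convert the elapsed time to ms, observe under the label.
        exec_times
            .get_or_create(&vec![("procedure", "time_it")])
            .observe(start.elapsed().as_micros() as f64 / 1000.);
    }

    fn time_it() { /* the procedure being measured */ }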
diff --git a/crates/network-scheduler/src/scheduler.rs b/crates/network-scheduler/src/scheduler.rs
index 93189d8..9c536b8 100644
--- a/crates/network-scheduler/src/scheduler.rs
+++ b/crates/network-scheduler/src/scheduler.rs
@@ -1,17 +1,21 @@
 use std::cmp::max;
 use std::collections::{BinaryHeap, HashMap, HashSet};
+use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::Arc;
 
+use dashmap::mapref::one::RefMut;
+use dashmap::DashMap;
 use iter_num_tools::lin_space;
 use itertools::Itertools;
+use parking_lot::RwLock;
+use prometheus_client::metrics::gauge::Atomic;
 use rand::prelude::SliceRandom;
 use rand::{thread_rng, Rng};
 use random_choice::random_choice;
 use serde::{Deserialize, Serialize};
+use tokio::time::Instant;
 
 use contract_client::Worker;
-use dashmap::mapref::one::RefMut;
-use dashmap::DashMap;
 use subsquid_messages::HttpHeader;
 use subsquid_messages::{pong::Status as WorkerStatus, Ping};
 use subsquid_network_transport::PeerId;
@@ -34,15 +38,13 @@ pub struct ChunkStatus {
     pub downloaded_by: Vec<Arc<PeerId>>, // Will deserialize duplicated, but it's short-lived
 }
 
-#[derive(Default, Serialize, Deserialize)]
+#[derive(Default, Clone, Serialize, Deserialize)]
 pub struct Scheduler {
-    known_units: HashMap<UnitId, SchedulingUnit>,
-    units_assignments: HashMap<UnitId, Vec<PeerId>>,
-    worker_states: DashMap<PeerId, WorkerState>,
-    #[serde(default)]
-    chunks_summary: HashMap<String, Vec<ChunkStatus>>, // dataset -> chunks statuses
-    #[serde(default)]
-    last_schedule_epoch: u32,
+    known_units: Arc<DashMap<UnitId, SchedulingUnit>>,
+    units_assignments: Arc<DashMap<UnitId, Vec<PeerId>>>,
+    worker_states: Arc<DashMap<PeerId, WorkerState>>,
+    chunks_summary: Arc<RwLock<HashMap<String, Vec<ChunkStatus>>>>, // dataset -> chunks statuses
+    last_schedule_epoch: Arc<AtomicU32>,
 }
 
 impl Scheduler {
@@ -55,7 +57,7 @@
     }
 
     pub fn last_schedule_epoch(&self) -> u32 {
-        self.last_schedule_epoch
+        self.last_schedule_epoch.get()
     }
 
     pub fn clear_deprecated_units(&mut self) {
@@ -67,16 +69,15 @@
         let deprecated_unit_ids: Vec<UnitId> = self
             .known_units
             .iter()
-            .filter_map(|(unit_id, unit)| {
-                (!dataset_urls.contains(unit.dataset_url())).then_some(*unit_id)
-            })
+            .filter_map(|unit| (!dataset_urls.contains(unit.dataset_url())).then_some(*unit.key()))
             .collect();
         for unit_id in deprecated_unit_ids.iter() {
-            let unit = self.known_units.remove(unit_id).expect("unknown unit");
+            let (_, unit) = self.known_units.remove(unit_id).expect("unknown unit");
             log::info!("Removing deprecated scheduling unit {unit}");
             let unit_size = unit.size_bytes();
             self.units_assignments
                 .remove(unit_id)
+                .map(|(_, workers)| workers)
                 .unwrap_or_default()
                 .into_iter()
                 .for_each(|worker_id| {
@@ -88,10 +89,12 @@
         }
     }
 
-    pub fn regenerate_signatures(&mut self) {
+    pub fn regenerate_signatures(&self) {
+        let start = Instant::now();
         for mut state in self.worker_states.iter_mut() {
             state.regenerate_signature();
         }
+        prometheus_metrics::exec_time("regen_signatures", start.elapsed());
     }
 
     /// Register ping msg from a worker. Returns worker status if ping was accepted, otherwise None
@@ -141,17 +144,24 @@
     pub fn worker_dialed(&self, worker_id: PeerId, reachable: bool) {
         log::info!("Dialed worker {worker_id}. reachable={reachable}");
+        let start = Instant::now();
         match self.worker_states.get_mut(&worker_id) {
             Some(mut worker_state) => worker_state.dialed(reachable),
             None => log::error!("Unknown worker dialed: {worker_id}"),
         }
+        prometheus_metrics::exec_time("worker_dialed", start.elapsed());
+    }
+
+    pub fn known_units(&self) -> DashMap<UnitId, SchedulingUnit> {
+        (*self.known_units).clone()
     }
 
-    pub fn known_units(&self) -> HashMap<UnitId, SchedulingUnit> {
-        self.known_units.clone()
+    pub fn total_data_size(&self) -> u64 {
+        self.known_units.iter().map(|u| u.size_bytes()).sum()
     }
 
-    pub fn new_unit(&mut self, unit: SchedulingUnit) {
+    pub fn new_unit(&self, unit: SchedulingUnit) {
+        let start = Instant::now();
         let unit_id = unit.id();
         let unit_size = unit.size_bytes();
         let unit_str = unit.to_string();
@@ -187,6 +197,7 @@
                 });
             }
         }
+        prometheus_metrics::exec_time("new_unit", start.elapsed());
     }
 
     pub fn all_workers(&self) -> Vec<WorkerState> {
@@ -201,7 +212,7 @@
             .collect()
     }
 
-    fn get_worker(&mut self, worker_id: &PeerId) -> RefMut<PeerId, WorkerState> {
+    fn get_worker(&self, worker_id: &PeerId) -> RefMut<PeerId, WorkerState> {
         self.worker_states
             .get_mut(worker_id)
             .expect("Unknown worker")
@@ -214,7 +225,7 @@
             .unwrap_or_default()
     }
 
-    pub fn schedule(&mut self, epoch: u32) {
+    pub fn schedule(&self, epoch: u32) {
         log::info!(
             "Starting scheduling. Total registered workers: {} Total units: {}",
             self.worker_states.len(),
@@ -223,63 +234,72 @@
         self.release_jailed_workers();
         self.mix_random_units();
         self.assign_units();
-        self.last_schedule_epoch = epoch;
+        self.last_schedule_epoch.store(epoch, Ordering::Relaxed);
     }
 
-    pub fn update_workers(&mut self, workers: Vec<Worker>) {
+    pub fn update_workers(&self, workers: Vec<Worker>) {
         log::info!("Updating workers");
-        let old_workers = std::mem::take(&mut self.worker_states);
-
-        // For each of the new workers, find an existing state or create a blank one
-        self.worker_states = workers
+        let new_workers: HashMap<_, _> = workers
             .into_iter()
-            .map(|w| {
-                let worker_state = match old_workers.remove(&w.peer_id) {
-                    Some((_, state)) => state,
-                    None => WorkerState::new(w.peer_id, w.address),
-                };
-                (w.peer_id, worker_state)
-            })
+            .map(|w| (w.peer_id, w.address))
             .collect();
+        let current_workers: HashSet<_> = self.worker_states.iter().map(|w| *w.key()).collect();
 
-        // Workers which remained in the map are no longer registered
-        for (_, worker) in old_workers {
-            log::info!("Worker unregistered: {worker:?}");
-            for unit_id in worker.assigned_units {
-                self.units_assignments
-                    .get_mut(&unit_id)
-                    .expect("unknown unit")
-                    .retain(|id| *id != worker.peer_id);
-            }
-        }
+        // Remove workers which are no longer registered
+        current_workers
+            .iter()
+            .filter(|peer_id| !new_workers.contains_key(peer_id))
+            .for_each(|peer_id| {
+                let (_, worker) = self
+                    .worker_states
+                    .remove(peer_id)
+                    .expect("current_workers are based of worker_states");
+                log::info!("Worker unregistered: {worker:?}");
+                for unit_id in worker.assigned_units {
+                    self.units_assignments
+                        .get_mut(&unit_id)
+                        .expect("unknown unit")
+                        .retain(|id| *id != worker.peer_id);
+                }
+            });
+
+        // Create blank states for newly registered workers
+        new_workers
+            .into_iter()
+            .filter(|w| !current_workers.contains(&w.0))
+            .for_each(|(peer_id, address)| {
+                self.worker_states
+                    .insert(peer_id, WorkerState::new(peer_id, address));
+            });
     }
 
     fn release_jailed_workers(&self) {
         log::info!("Releasing jailed workers");
+        let start = Instant::now();
         let release_unreachable = !Config::get().jail_unreachable;
         self.worker_states
             .iter_mut()
             .filter(|w| w.jailed && w.is_active() && (release_unreachable || !w.is_unreachable()))
             .for_each(|mut w| w.release());
+        prometheus_metrics::exec_time("release_workers", start.elapsed());
     }
 
     /// Jail workers which don't send pings.
-    pub fn jail_inactive_workers(&mut self) -> bool {
+    pub fn jail_inactive_workers(&self) -> bool {
         log::info!("Jailing inactive workers");
        self.jail_workers(|w| !w.is_active(), JailReason::Inactive)
     }
 
     /// Jail workers which don't make download progress.
-    pub fn jail_stale_workers(&mut self) -> bool {
+    pub fn jail_stale_workers(&self) -> bool {
         log::info!("Jailing stale workers");
-        let known_units = self.known_units.clone();
         self.jail_workers(
-            |w| !w.check_download_progress(&known_units),
+            |w| !w.check_download_progress(&self.known_units),
             JailReason::Stale,
         )
     }
 
-    pub fn jail_unreachable_workers(&mut self) -> bool {
+    pub fn jail_unreachable_workers(&self) -> bool {
         if Config::get().jail_unreachable {
             log::info!("Jailing unreachable workers");
             self.jail_workers(|w| w.is_unreachable(), JailReason::Unreachable)
@@ -294,10 +314,11 @@
     }
 
     fn jail_workers(
-        &mut self,
+        &self,
         mut criterion: impl FnMut(&mut WorkerState) -> bool,
         reason: JailReason,
     ) -> bool {
+        let start = Instant::now();
         let mut num_jailed_workers: usize = 0;
         let mut num_unassigned_units = 0;
 
@@ -327,23 +348,25 @@
         if num_unassigned_units > 0 {
             self.assign_units();
         }
+        prometheus_metrics::exec_time("jail_workers", start.elapsed());
         num_jailed_workers > 0
     }
 
-    fn mix_random_units(&mut self) {
+    fn mix_random_units(&self) {
         log::info!("Mixing random units");
+        let start = Instant::now();
 
         // Group units by dataset and unassign random fraction of units for each dataset
         let grouped_units = self
             .known_units
             .iter()
-            .filter(|(unit_id, _)| self.num_replicas(unit_id) > 0)
-            .into_group_map_by(|(_, unit)| unit.dataset_url());
+            .filter(|u| self.num_replicas(u.key()) > 0)
+            .into_group_map_by(|u| u.dataset_url().to_owned());
 
         for (dataset_url, mut dataset_units) in grouped_units {
             // Sort units from oldest to newest and give them weights making
             // the most recent units more likely to be re-assigned
-            dataset_units.sort_by_cached_key(|(_, unit)| unit.begin());
+            dataset_units.sort_by_cached_key(|u| u.begin());
             let num_units = dataset_units.len();
             let num_mixed = ((num_units as f64) * Config::get().mixed_units_ratio) as usize;
             let max_weight = Config::get().mixing_recent_unit_weight;
@@ -353,37 +376,42 @@
             log::info!("Mixing {num_mixed} out of {num_units} units for dataset {dataset_url}");
 
             // For each of the randomly selected units, remove one random replica
-            for (unit_id, unit) in mixed_units {
-                let holder_ids = self
+            for unit in mixed_units {
+                let mut holder_ids = self
                     .units_assignments
-                    .get_mut(*unit_id)
+                    .get_mut(unit.key())
                     .expect("no empty assignments");
                 let random_idx = thread_rng().gen_range(0..holder_ids.len());
                 let holder_id = holder_ids.remove(random_idx);
                 self.worker_states
                     .get_mut(&holder_id)
                     .expect("Unknown worker")
-                    .remove_unit(unit_id, unit.size_bytes());
+                    .remove_unit(unit.key(), unit.size_bytes());
             }
         }
+        prometheus_metrics::exec_time("mix_units", start.elapsed());
     }
 
-    fn clear_redundant_replicas(&mut self, rep_factor: usize) {
-        for (unit_id, holder_ids) in self.units_assignments.iter_mut() {
-            let unit = self.known_units.get(unit_id).expect("Unknown unit");
-            while holder_ids.len() > rep_factor {
-                let random_idx = thread_rng().gen_range(0..holder_ids.len());
-                let holder_id = holder_ids.remove(random_idx);
+    fn clear_redundant_replicas(&self, rep_factor: usize) {
+        for mut unit_holders in self.units_assignments.iter_mut() {
+            let unit = self
+                .known_units
+                .get(unit_holders.key())
+                .expect("Unknown unit");
+            while unit_holders.len() > rep_factor {
+                let random_idx = thread_rng().gen_range(0..unit_holders.len());
+                let holder_id = unit_holders.remove(random_idx);
                 self.worker_states
                     .get_mut(&holder_id)
                     .expect("Unknown worker")
-                    .remove_unit(unit_id, unit.size_bytes());
+                    .remove_unit(unit_holders.key(), unit.size_bytes());
             }
         }
     }
 
-    fn assign_units(&mut self) {
+    fn assign_units(&self) {
         log::info!("Assigning units");
+        let start = Instant::now();
 
         // Only active and non-jailed workers are eligible for assignment
         let mut workers: Vec<_> = self
@@ -401,7 +429,7 @@
             .collect();
 
         // Compute replication factor based on total available capacity
-        let data_size: u64 = self.known_units().values().map(|u| u.size_bytes()).sum();
+        let data_size = self.total_data_size();
         let rep_factor = match replication_factor(data_size, workers.len() as u64) {
             Some(rep_factor) => rep_factor,
             None => return,
@@ -414,11 +442,11 @@
         let mut units: BinaryHeap<(usize, u64, UnitId)> = self
             .known_units
             .iter()
-            .filter_map(|(unit_id, unit)| {
+            .filter_map(|unit| {
                 let missing_replicas = rep_factor
-                    .checked_sub(self.num_replicas(unit_id))
+                    .checked_sub(self.num_replicas(unit.key()))
                     .expect("Redundant replicas");
-                (missing_replicas > 0).then_some((missing_replicas, unit.size_bytes(), *unit_id))
+                (missing_replicas > 0).then_some((missing_replicas, unit.size_bytes(), *unit.key()))
             })
             .collect();
 
@@ -460,16 +488,15 @@
 
         let incomplete_units = self
             .units_assignments
-            .values()
-            .filter(|workers| workers.len() < rep_factor)
+            .iter()
+            .filter(|assignment| assignment.len() < rep_factor)
             .count();
         log::info!("Assignment complete. {incomplete_units} units are missing some replicas");
 
         prometheus_metrics::units_assigned(
             self.units_assignments
                 .iter()
-                .map(|(unit_id, workers)| (unit_id, workers.len()))
-                .collect(),
+                .map(|assignment| assignment.len()),
         );
         prometheus_metrics::partially_assigned_units(incomplete_units);
 
@@ -480,14 +507,15 @@
                 w.reset_download_progress(&self.known_units);
                 log::info!("{}", *w)
             });
+        prometheus_metrics::exec_time("assign_units", start.elapsed());
     }
 
     pub fn get_chunks_summary(&self) -> HashMap<String, Vec<ChunkStatus>> {
-        self.chunks_summary.clone()
+        self.chunks_summary.read().clone()
     }
 
-    pub fn update_chunks_summary(&mut self, summary: HashMap<String, Vec<ChunkStatus>>) {
-        self.chunks_summary = summary;
+    pub fn update_chunks_summary(&self, summary: HashMap<String, Vec<ChunkStatus>>) {
+        *self.chunks_summary.write() = summary;
     }
 }
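A sketch, not part of the diff: what the reworked `Scheduler` layout buys. Every field now sits behind an `Arc`, so deriving `Clone` copies five pointers rather than the data, all clones observe the same state, and mutation goes through `&self` (the DashMaps lock per shard, the RwLock guards the summary, and the epoch is an atomic). `SchedulerHandle` below is a made-up miniature with invented element types:

    // Assumes the dashmap crate.
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    use dashmap::DashMap;

    #[derive(Default, Clone)]
    struct SchedulerHandle {
        worker_states: Arc<DashMap<u64, String>>, // real map: PeerId -> WorkerState
        last_schedule_epoch: Arc<AtomicU32>,
    }

    fn main() {
        let scheduler = SchedulerHandle::default();
        let clone = scheduler.clone(); // cheap: just two Arc refcount bumps

        // Mutation through &self, no &mut and no outer RwLock required.
        clone.worker_states.insert(1, "worker".to_owned());
        clone.last_schedule_epoch.store(42, Ordering::Relaxed);

        // The original handle sees the updates immediately.
        assert_eq!(*scheduler.worker_states.get(&1).unwrap(), "worker");
        assert_eq!(scheduler.last_schedule_epoch.load(Ordering::Relaxed), 42);
    }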
diff --git a/crates/network-scheduler/src/server.rs b/crates/network-scheduler/src/server.rs
index 81cb239..b981367 100644
--- a/crates/network-scheduler/src/server.rs
+++ b/crates/network-scheduler/src/server.rs
@@ -1,14 +1,16 @@
-use futures::{Stream, StreamExt};
-use itertools::Itertools;
 use std::collections::HashMap;
 use std::net::SocketAddr;
 use std::sync::Arc;
 use std::time::Duration;
 
+use dashmap::DashMap;
+use futures::{Stream, StreamExt};
+use itertools::Itertools;
+use parking_lot::Mutex;
 use prometheus_client::registry::Registry;
 use tokio::signal::unix::{signal, SignalKind};
 use tokio::sync::mpsc::Receiver;
-use tokio::sync::{Mutex, RwLock, RwLockWriteGuard};
+use tokio::time::Instant;
 
 use subsquid_messages::signatures::msg_hash;
 use subsquid_messages::{Ping, Pong, RangeSet};
@@ -17,12 +19,11 @@
 use subsquid_network_transport::{PeerId, SchedulerEvent, SchedulerTransportHandle};
 
 use crate::cli::Config;
 use crate::data_chunk::{chunks_to_worker_state, DataChunk};
-use crate::metrics::{MetricsEvent, MetricsWriter};
-use crate::metrics_server;
 use crate::scheduler::{ChunkStatus, Scheduler};
 use crate::scheduling_unit::{SchedulingUnit, UnitId};
 use crate::storage::S3Storage;
 use crate::worker_state::WorkerState;
+use crate::{metrics_server, prometheus_metrics};
 
 const WORKER_REFRESH_INTERVAL: Duration = Duration::from_secs(60);
 const CHUNKS_SUMMARY_REFRESH_INTERVAL: Duration = Duration::from_secs(60);
@@ -31,8 +32,7 @@
 pub struct Server<S: Stream<Item = SchedulerEvent> + Send + Unpin + 'static> {
     incoming_events: S,
     incoming_units: Receiver<SchedulingUnit>,
     transport_handle: SchedulerTransportHandle,
-    scheduler: Arc<RwLock<Scheduler>>,
-    metrics_writer: Arc<RwLock<MetricsWriter>>,
+    scheduler: Scheduler,
     task_manager: TaskManager,
 }
 
@@ -42,16 +42,12 @@ impl<S: Stream<Item = SchedulerEvent> + Send + Unpin + 'static> Server<S> {
         incoming_units: Receiver<SchedulingUnit>,
         transport_handle: SchedulerTransportHandle,
         scheduler: Scheduler,
-        metrics_writer: MetricsWriter,
     ) -> Self {
-        let scheduler = Arc::new(RwLock::new(scheduler));
-        let metrics_writer = Arc::new(RwLock::new(metrics_writer));
         Self {
             incoming_events,
             incoming_units,
             transport_handle,
             scheduler,
-            metrics_writer,
             task_manager: Default::default(),
         }
     }
@@ -67,7 +63,7 @@ impl<S: Stream<Item = SchedulerEvent> + Send + Unpin + 'static> Server<S> {
 
         // Get worker set immediately to accept pings
         let workers = contract_client.active_workers().await?;
-        self.scheduler.write().await.update_workers(workers);
+        self.scheduler.update_workers(workers);
 
         self.spawn_scheduling_task(contract_client, storage_client.clone())
             .await?;
@@ -83,8 +79,8 @@
         let mut sigterm = signal(SignalKind::terminate())?;
         loop {
             tokio::select! {
-                Some(ev) = self.incoming_events.next() => self.on_incoming_event(ev).await, // FIXME: blocking event loop
-                Some(unit) = self.incoming_units.recv() => self.on_new_unit(unit).await, // FIXME: blocking event loop
+                Some(ev) = self.incoming_events.next() => self.on_incoming_event(ev),
+                Some(unit) = self.incoming_units.recv() => self.on_new_unit(unit),
                 _ = sigint.recv() => break,
                 _ = sigterm.recv() => break,
                 else => break
@@ -96,22 +92,20 @@
         Ok(())
     }
 
-    async fn on_incoming_event(&mut self, ev: SchedulerEvent) {
+    fn on_incoming_event(&self, ev: SchedulerEvent) {
         match ev {
-            SchedulerEvent::Ping { peer_id, ping } => self.ping(peer_id, ping).await,
-            SchedulerEvent::PeerProbed { peer_id, reachable } => self
-                .scheduler
-                .read()
-                .await
-                .worker_dialed(peer_id, reachable),
+            SchedulerEvent::Ping { peer_id, ping } => self.ping(peer_id, ping),
+            SchedulerEvent::PeerProbed { peer_id, reachable } => {
+                self.scheduler.worker_dialed(peer_id, reachable)
+            }
         }
     }
 
-    async fn ping(&mut self, peer_id: PeerId, ping: Ping) {
+    fn ping(&self, peer_id: PeerId, ping: Ping) {
         log::debug!("Got ping from {peer_id}");
+        let start = Instant::now();
         let ping_hash = msg_hash(&ping);
-        let status = self.scheduler.read().await.ping(peer_id, ping.clone());
-        self.write_metrics(peer_id, ping).await;
+        let status = self.scheduler.ping(peer_id, ping.clone());
        let pong = Pong {
             ping_hash,
             status: Some(status),
@@ -119,19 +113,11 @@
         self.transport_handle
             .send_pong(peer_id, pong)
             .unwrap_or_else(|_| log::error!("Error sending pong: queue full"));
+        prometheus_metrics::exec_time("ping", start.elapsed());
     }
 
-    async fn write_metrics(&mut self, peer_id: PeerId, msg: impl Into<MetricsEvent>) {
-        self.metrics_writer
-            .write()
-            .await
-            .write_metrics(Some(peer_id), msg)
-            .await
-            .unwrap_or_else(|e| log::error!("Error writing metrics: {e:?}"));
-    }
-
-    async fn on_new_unit(&self, unit: SchedulingUnit) {
-        self.scheduler.write().await.new_unit(unit)
+    fn on_new_unit(&self, unit: SchedulingUnit) {
+        self.scheduler.new_unit(unit)
     }
 
     async fn spawn_scheduling_task(
@@ -158,19 +144,17 @@
             };
 
             // Update workers every epoch
-            let mut last_epoch = last_epoch.lock().await;
-            if current_epoch > *last_epoch {
+            if current_epoch > *last_epoch.lock() {
                 match contract_client.active_workers().await {
-                    Ok(workers) => scheduler.write().await.update_workers(workers),
+                    Ok(workers) => scheduler.update_workers(workers),
                     Err(e) => log::error!("Error getting workers: {e:?}"),
                 }
-                *last_epoch = current_epoch;
+                *last_epoch.lock() = current_epoch;
             }
 
             // Schedule chunks every `schedule_interval_epochs`
-            let last_schedule_epoch = scheduler.read().await.last_schedule_epoch();
+            let last_schedule_epoch = scheduler.last_schedule_epoch();
             if current_epoch >= last_schedule_epoch + schedule_interval {
-                let mut scheduler = scheduler.write().await;
                 scheduler.schedule(current_epoch);
                 match scheduler.to_json() {
                     Ok(state) => storage_client.save_scheduler(state).await,
@@ -195,7 +179,7 @@
             let transport_handle = transport_handle.clone();
             async move {
                 log::info!("Dialing workers...");
-                let workers = scheduler.read().await.workers_to_dial();
+                let workers = scheduler.workers_to_dial();
                 for peer_id in workers {
                     transport_handle
                         .probe_peer(peer_id)
@@ -232,7 +216,7 @@
             let scheduler = scheduler.clone();
             async move {
                 log::info!("Regenerating signatures");
-                scheduler.write().await.regenerate_signatures();
+                scheduler.regenerate_signatures();
             }
         };
         self.task_manager
@@ -258,15 +242,14 @@
         &mut self,
         storage_client: S3Storage,
         interval: Duration,
-        jail_fn: fn(&mut RwLockWriteGuard<Scheduler>) -> bool,
+        jail_fn: fn(&Scheduler) -> bool,
     ) {
         let scheduler = self.scheduler.clone();
         let task = move |_| {
             let scheduler = scheduler.clone();
             let storage_client = storage_client.clone();
             async move {
-                let mut scheduler = scheduler.write().await;
-                if jail_fn(&mut scheduler) {
+                if jail_fn(&scheduler) {
                     // If any worker got jailed, save the changed scheduler state
                     match scheduler.to_json() {
                         Ok(state) => storage_client.save_scheduler(state).await,
@@ -284,10 +267,10 @@
             let scheduler = scheduler.clone();
             async move {
                 log::info!("Updating chunks summary");
-                let workers = scheduler.read().await.all_workers();
-                let units = scheduler.read().await.known_units();
+                let workers = scheduler.all_workers();
+                let units = scheduler.known_units();
                 let summary = build_chunks_summary(workers, units);
-                scheduler.write().await.update_chunks_summary(summary);
+                scheduler.update_chunks_summary(summary);
             }
         };
         self.task_manager
@@ -315,7 +298,7 @@ fn find_workers_with_chunk(
 
 fn build_chunks_summary(
     workers: Vec<WorkerState>,
-    units: HashMap<UnitId, SchedulingUnit>,
+    units: DashMap<UnitId, SchedulingUnit>,
 ) -> HashMap<String, Vec<ChunkStatus>> {
     let assigned_ranges = workers
         .iter()
@@ -341,8 +324,8 @@
         })
         .into_group_map();
     units
-        .into_values()
-        .flatten()
+        .into_iter()
+        .flat_map(|(_, unit)| unit)
         .map(|chunk| {
             let assigned_to = find_workers_with_chunk(&chunk, &assigned_ranges);
             let downloaded_by = find_workers_with_chunk(&chunk, &stored_ranges);
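A sketch, not part of the diff: the epoch counter above switches from `tokio::sync::Mutex` to `parking_lot::Mutex`. A synchronous lock is fine inside an async task when the critical section is a quick load or store and the guard never lives across an `.await`; the temporary guard in `*last_epoch.lock()` is dropped at the end of the enclosing expression. The function names below are invented:

    // Assumes the parking_lot and tokio crates.
    use std::sync::Arc;

    use parking_lot::Mutex;

    async fn epoch_tick(last_epoch: Arc<Mutex<u32>>, current_epoch: u32) {
        if current_epoch > *last_epoch.lock() {
            // the guard above is already dropped here, so awaiting is safe
            refresh_workers().await;
            *last_epoch.lock() = current_epoch;
        }
    }

    async fn refresh_workers() { /* stand-in for contract_client.active_workers() */ }

    #[tokio::main]
    async fn main() {
        epoch_tick(Arc::new(Mutex::new(0)), 1).await;
    }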
diff --git a/crates/network-scheduler/src/worker_state.rs b/crates/network-scheduler/src/worker_state.rs
index 89a8bf6..b35a3f1 100644
--- a/crates/network-scheduler/src/worker_state.rs
+++ b/crates/network-scheduler/src/worker_state.rs
@@ -7,6 +7,7 @@
 use serde_partial::SerializePartial;
 use serde_with::{serde_as, TimestampMilliSeconds};
 
 use contract_client::Address;
+use dashmap::DashMap;
 use subsquid_messages::{Ping, RangeSet};
 use subsquid_network_transport::PeerId;
@@ -185,7 +186,7 @@ impl WorkerState {
 
     pub fn assigned_chunks<'a>(
         &'a self,
-        units_map: &'a HashMap<UnitId, SchedulingUnit>,
+        units_map: &'a DashMap<UnitId, SchedulingUnit>,
     ) -> impl Iterator<Item = DataChunk> + 'a {
         self.assigned_units.iter().flat_map(|unit_id| {
             units_map
@@ -195,7 +196,7 @@
         })
     }
 
-    fn count_missing_chunks<'a>(&'a self, units: &'a HashMap<UnitId, SchedulingUnit>) -> u32 {
+    fn count_missing_chunks<'a>(&'a self, units: &'a DashMap<UnitId, SchedulingUnit>) -> u32 {
         self.assigned_chunks(units)
             .map(|chunk| match self.stored_ranges.get(&chunk.dataset_id) {
                 Some(range_set) if range_set.includes(chunk.block_range) => 0,
@@ -208,7 +209,7 @@
     /// Returns true iff the worker is fully synced or making progress.
     pub fn check_download_progress<'a>(
         &'a mut self,
-        units: &'a HashMap<UnitId, SchedulingUnit>,
+        units: &'a DashMap<UnitId, SchedulingUnit>,
     ) -> bool {
         assert!(!self.jailed);
         let Some(last_assignment) = self.last_assignment.as_ref() else {
@@ -244,7 +245,7 @@
         }
     }
 
-    pub fn reset_download_progress<'a>(&'a mut self, units: &'a HashMap<UnitId, SchedulingUnit>) {
+    pub fn reset_download_progress<'a>(&'a mut self, units: &'a DashMap<UnitId, SchedulingUnit>) {
         self.num_missing_chunks = self.count_missing_chunks(units);
         self.last_assignment = Some(SystemTime::now());
     }
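A final sketch, not part of the diff: worker_state.rs now borrows the shared `DashMap` directly instead of receiving a cloned `HashMap`. The main difference at the call site is that `DashMap::get` takes a shard read-lock and hands back a guard, so values are cloned out rather than referenced. Simplified, with invented key and value types; `chunk_count` is a made-up analogue of `assigned_chunks()`:

    // Assumes the dashmap crate.
    use dashmap::DashMap;

    fn chunk_count(assigned_units: &[u64], units_map: &DashMap<u64, Vec<&'static str>>) -> usize {
        assigned_units
            .iter()
            .flat_map(|unit_id| {
                units_map
                    .get(unit_id) // shard read-lock, held only for this expression
                    .map(|unit| unit.value().clone()) // clone out, releasing the guard
                    .unwrap_or_default()
            })
            .count()
    }

    fn main() {
        let units = DashMap::new();
        units.insert(1, vec!["chunk_a", "chunk_b"]);
        assert_eq!(chunk_count(&[1, 2], &units), 2);
    }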