diff --git a/Cargo.lock b/Cargo.lock index 194c94c..2000a5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1698,7 +1698,7 @@ checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" [[package]] name = "contract-client" version = "1.0.3" -source = "git+https://github.com/subsquid/subsquid-network.git#7f7303c72f75e6d03843ab2b245b30e088a12a42" +source = "git+https://github.com/subsquid/subsquid-network.git#225f01c7911d8b4e04834138127a57b0a7168078" dependencies = [ "async-trait", "clap", @@ -1902,6 +1902,21 @@ dependencies = [ "syn 2.0.68", ] +[[package]] +name = "dashmap" +version = "6.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", + "serde", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -3239,7 +3254,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.7", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -4195,7 +4210,7 @@ checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" [[package]] name = "logs-collector" -version = "1.0.3" +version = "1.0.4" dependencies = [ "anyhow", "async-trait", @@ -4470,7 +4485,7 @@ dependencies = [ [[package]] name = "network-scheduler" -version = "1.0.11" +version = "1.0.12" dependencies = [ "anyhow", "async-trait", @@ -4480,6 +4495,7 @@ dependencies = [ "base64 0.22.1", "clap", "contract-client", + "dashmap", "derive-enum-from-into", "env_logger", "futures", @@ -6590,7 +6606,7 @@ dependencies = [ [[package]] name = "subsquid-messages" version = "1.0.1" -source = "git+https://github.com/subsquid/subsquid-network.git#7f7303c72f75e6d03843ab2b245b30e088a12a42" +source = "git+https://github.com/subsquid/subsquid-network.git#225f01c7911d8b4e04834138127a57b0a7168078" dependencies = [ "anyhow", "hex", @@ -6604,8 +6620,8 @@ dependencies = [ [[package]] name = "subsquid-network-transport" -version = "1.0.4" -source = "git+https://github.com/subsquid/subsquid-network.git#7f7303c72f75e6d03843ab2b245b30e088a12a42" +version = "1.0.5" +source = "git+https://github.com/subsquid/subsquid-network.git#225f01c7911d8b4e04834138127a57b0a7168078" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 9f33764..e487c03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,4 +5,4 @@ resolver = "2" [workspace.dependencies] contract-client = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.3" } subsquid-messages = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.1" } -subsquid-network-transport = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.4" } +subsquid-network-transport = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.5" } diff --git a/crates/logs-collector/Cargo.toml b/crates/logs-collector/Cargo.toml index eace1f0..4946435 100644 --- a/crates/logs-collector/Cargo.toml +++ b/crates/logs-collector/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "logs-collector" -version = "1.0.3" +version = "1.0.4" edition = "2021" [dependencies] diff --git a/crates/network-scheduler/Cargo.toml b/crates/network-scheduler/Cargo.toml index 7b3810d..a0dcb7e 100644 --- a/crates/network-scheduler/Cargo.toml +++ b/crates/network-scheduler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "network-scheduler" -version = "1.0.11" +version = "1.0.12" edition = "2021" [dependencies] @@ -11,6 +11,7 @@ aws-sdk-s3 = "1" axum = { version = "0.7", features = ["json"] } base64 = "0.22.1" clap = { version = "4", features = ["derive", "env"] } +dashmap = { version = "6", features = ["serde"] } derive-enum-from-into = "0.1" env_logger = "0.11" futures = "0.3" diff --git a/crates/network-scheduler/src/scheduler.rs b/crates/network-scheduler/src/scheduler.rs index 803808b..93189d8 100644 --- a/crates/network-scheduler/src/scheduler.rs +++ b/crates/network-scheduler/src/scheduler.rs @@ -10,6 +10,8 @@ use random_choice::random_choice; use serde::{Deserialize, Serialize}; use contract_client::Worker; +use dashmap::mapref::one::RefMut; +use dashmap::DashMap; use subsquid_messages::HttpHeader; use subsquid_messages::{pong::Status as WorkerStatus, Ping}; use subsquid_network_transport::PeerId; @@ -36,7 +38,7 @@ pub struct ChunkStatus { pub struct Scheduler { known_units: HashMap, units_assignments: HashMap>, - worker_states: HashMap, + worker_states: DashMap, #[serde(default)] chunks_summary: HashMap>, // dataset -> chunks statuses #[serde(default)] @@ -87,19 +89,19 @@ impl Scheduler { } pub fn regenerate_signatures(&mut self) { - for state in self.worker_states.values_mut() { + for mut state in self.worker_states.iter_mut() { state.regenerate_signature(); } } /// Register ping msg from a worker. Returns worker status if ping was accepted, otherwise None - pub fn ping(&mut self, worker_id: PeerId, msg: Ping) -> WorkerStatus { + pub fn ping(&self, worker_id: PeerId, msg: Ping) -> WorkerStatus { let version = msg.sem_version(); if !Config::get().supported_worker_versions.matches(&version) { log::debug!("Worker {worker_id} version not supported: {}", version); return WorkerStatus::UnsupportedVersion(()); } - let worker_state = match self.worker_states.get_mut(&worker_id) { + let mut worker_state = match self.worker_states.get_mut(&worker_id) { None => { log::debug!("Worker {worker_id} not registered"); return WorkerStatus::NotRegistered(()); @@ -112,35 +114,35 @@ impl Scheduler { } let assigned_chunks = worker_state.assigned_chunks(&self.known_units); let mut assignment = chunks_to_assignment(assigned_chunks); - add_signature_headers(&mut assignment, &worker_id, worker_state); + add_signature_headers(&mut assignment, &worker_id, worker_state.value()); WorkerStatus::Active(assignment) } pub fn workers_to_dial(&self) -> Vec { self.worker_states .iter() - .filter_map(|(worker_id, state)| { - if !state.is_active() { + .filter_map(|worker_ref| { + if !worker_ref.is_active() { return None; // Worker not active - don't dial } - let Some(last_dial) = state.last_dial_time else { - return Some(*worker_id); // Worker has never been dialed - dial now + let Some(last_dial) = worker_ref.last_dial_time else { + return Some(worker_ref.peer_id); // Worker has never been dialed - dial now }; let time_since_last_dial = last_dial.elapsed().expect("time doesn't go backwards"); - let retry_interval = if state.last_dial_ok { + let retry_interval = if worker_ref.last_dial_ok { Config::get().successful_dial_retry } else { Config::get().failed_dial_retry }; - (time_since_last_dial > retry_interval).then_some(*worker_id) + (time_since_last_dial > retry_interval).then_some(worker_ref.peer_id) }) .collect() } - pub fn worker_dialed(&mut self, worker_id: PeerId, reachable: bool) { + pub fn worker_dialed(&self, worker_id: PeerId, reachable: bool) { log::info!("Dialed worker {worker_id}. reachable={reachable}"); match self.worker_states.get_mut(&worker_id) { - Some(worker_state) => worker_state.dialed(reachable), + Some(mut worker_state) => worker_state.dialed(reachable), None => log::error!("Unknown worker dialed: {worker_id}"), } } @@ -175,7 +177,7 @@ impl Scheduler { .get_mut(&unit_id) .expect("No assignment entry for unit") .retain(|worker_id| { - let worker = self + let mut worker = self .worker_states .get_mut(worker_id) .expect("Unknown worker"); @@ -188,18 +190,18 @@ impl Scheduler { } pub fn all_workers(&self) -> Vec { - self.worker_states.values().cloned().collect() + self.worker_states.iter().map(|w| w.clone()).collect() } pub fn active_workers(&self) -> Vec { self.worker_states - .values() + .iter() .filter(|w| w.is_active()) - .cloned() + .map(|w| w.clone()) .collect() } - fn get_worker(&mut self, worker_id: &PeerId) -> &mut WorkerState { + fn get_worker(&mut self, worker_id: &PeerId) -> RefMut { self.worker_states .get_mut(worker_id) .expect("Unknown worker") @@ -226,15 +228,16 @@ impl Scheduler { pub fn update_workers(&mut self, workers: Vec) { log::info!("Updating workers"); - let mut old_workers = std::mem::take(&mut self.worker_states); + let old_workers = std::mem::take(&mut self.worker_states); // For each of the new workers, find an existing state or create a blank one self.worker_states = workers .into_iter() .map(|w| { - let worker_state = old_workers - .remove(&w.peer_id) - .unwrap_or_else(|| WorkerState::new(w.peer_id, w.address)); + let worker_state = match old_workers.remove(&w.peer_id) { + Some((_, state)) => state, + None => WorkerState::new(w.peer_id, w.address), + }; (w.peer_id, worker_state) }) .collect(); @@ -251,13 +254,13 @@ impl Scheduler { } } - fn release_jailed_workers(&mut self) { + fn release_jailed_workers(&self) { log::info!("Releasing jailed workers"); let release_unreachable = !Config::get().jail_unreachable; self.worker_states - .values_mut() + .iter_mut() .filter(|w| w.jailed && w.is_active() && (release_unreachable || !w.is_unreachable())) - .for_each(|w| w.release()); + .for_each(|mut w| w.release()); } /// Jail workers which don't send pings. @@ -283,7 +286,7 @@ impl Scheduler { } else { log::info!("Jailing unreachable workers is disabled"); self.worker_states - .values() + .iter() .filter(|w| w.ever_been_active() && w.is_unreachable()) .for_each(|w| log::info!("Worker {} is unreachable", w.peer_id)); false @@ -299,13 +302,13 @@ impl Scheduler { let mut num_unassigned_units = 0; self.worker_states - .values_mut() + .iter_mut() .filter(|w| !w.jailed) - .for_each(|w| { + .for_each(|mut w| { if !w.ever_been_active() { return; // Don't jail workers that haven't been started yet } - if !criterion(w) { + if !criterion(&mut w) { return; } @@ -383,9 +386,9 @@ impl Scheduler { log::info!("Assigning units"); // Only active and non-jailed workers are eligible for assignment - let mut workers: Vec<&WorkerState> = self + let mut workers: Vec<_> = self .worker_states - .values() + .iter() .filter(|w| w.is_active() && !w.jailed) .collect(); @@ -471,11 +474,11 @@ impl Scheduler { prometheus_metrics::partially_assigned_units(incomplete_units); self.worker_states - .values_mut() + .iter_mut() .filter(|w| w.is_active() && !w.jailed) - .for_each(|w| { + .for_each(|mut w| { w.reset_download_progress(&self.known_units); - log::info!("{w}") + log::info!("{}", *w) }); } diff --git a/crates/network-scheduler/src/server.rs b/crates/network-scheduler/src/server.rs index 75bc05c..81cb239 100644 --- a/crates/network-scheduler/src/server.rs +++ b/crates/network-scheduler/src/server.rs @@ -101,7 +101,7 @@ impl + Send + Unpin + 'static> Server { SchedulerEvent::Ping { peer_id, ping } => self.ping(peer_id, ping).await, SchedulerEvent::PeerProbed { peer_id, reachable } => self .scheduler - .write() + .read() .await .worker_dialed(peer_id, reachable), } @@ -110,7 +110,7 @@ impl + Send + Unpin + 'static> Server { async fn ping(&mut self, peer_id: PeerId, ping: Ping) { log::debug!("Got ping from {peer_id}"); let ping_hash = msg_hash(&ping); - let status = self.scheduler.write().await.ping(peer_id, ping.clone()); + let status = self.scheduler.read().await.ping(peer_id, ping.clone()); self.write_metrics(peer_id, ping).await; let pong = Pong { ping_hash,