Transport upgrade (throttling gossipsub) + don't lock scheduler for pings

Wiezzel committed Jul 15, 2024
1 parent 470d9d4 commit 7ed5e67
Showing 6 changed files with 66 additions and 46 deletions.
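
The core of the change is the locking model: `Scheduler::worker_states` moves from a plain `HashMap` to a `dashmap::DashMap`, which shards its entries behind internal locks. Per-worker state can then be mutated through `&self`, so `ping` and `worker_dialed` lose their `&mut self` receivers, and the server (see `server.rs` below) can take a read lock on the scheduler instead of an exclusive write lock for every ping. A minimal sketch of that pattern, using stand-in types rather than code from this repo:

```rust
use dashmap::DashMap;

struct WorkerState {
    pings: u64,
}

struct Scheduler {
    // Stand-in for the real map, which is keyed by PeerId.
    workers: DashMap<u64, WorkerState>,
}

impl Scheduler {
    // Note the &self receiver: DashMap's get_mut hands out a per-entry
    // guard, so no exclusive borrow of the whole Scheduler is needed.
    fn ping(&self, worker_id: u64) -> Option<u64> {
        let mut state = self.workers.get_mut(&worker_id)?;
        state.pings += 1;
        Some(state.pings)
    }
}

fn main() {
    let scheduler = Scheduler { workers: DashMap::new() };
    scheduler.workers.insert(7, WorkerState { pings: 0 });
    assert_eq!(scheduler.ping(7), Some(1));
}
```
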
30 changes: 23 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -5,4 +5,4 @@ resolver = "2"
[workspace.dependencies]
contract-client = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.3" }
subsquid-messages = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.1" }
-subsquid-network-transport = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.4" }
+subsquid-network-transport = { git = "https://github.com/subsquid/subsquid-network.git", version = "1.0.5" }
2 changes: 1 addition & 1 deletion crates/logs-collector/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "logs-collector"
-version = "1.0.3"
+version = "1.0.4"
edition = "2021"

[dependencies]
3 changes: 2 additions & 1 deletion crates/network-scheduler/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "network-scheduler"
-version = "1.0.11"
+version = "1.0.12"
edition = "2021"

[dependencies]
@@ -11,6 +11,7 @@ aws-sdk-s3 = "1"
axum = { version = "0.7", features = ["json"] }
base64 = "0.22.1"
clap = { version = "4", features = ["derive", "env"] }
+dashmap = { version = "6", features = ["serde"] }
derive-enum-from-into = "0.1"
env_logger = "0.11"
futures = "0.3"
71 changes: 37 additions & 34 deletions crates/network-scheduler/src/scheduler.rs
@@ -10,6 +10,8 @@ use random_choice::random_choice;
use serde::{Deserialize, Serialize};

use contract_client::Worker;
+use dashmap::mapref::one::RefMut;
+use dashmap::DashMap;
use subsquid_messages::HttpHeader;
use subsquid_messages::{pong::Status as WorkerStatus, Ping};
use subsquid_network_transport::PeerId;
@@ -36,7 +38,7 @@ pub struct ChunkStatus {
pub struct Scheduler {
known_units: HashMap<UnitId, SchedulingUnit>,
units_assignments: HashMap<UnitId, Vec<PeerId>>,
-worker_states: HashMap<PeerId, WorkerState>,
+worker_states: DashMap<PeerId, WorkerState>,
#[serde(default)]
chunks_summary: HashMap<String, Vec<ChunkStatus>>, // dataset -> chunks statuses
#[serde(default)]
@@ -87,19 +89,19 @@ impl Scheduler {
}

pub fn regenerate_signatures(&mut self) {
-for state in self.worker_states.values_mut() {
+for mut state in self.worker_states.iter_mut() {
state.regenerate_signature();
}
}

/// Register ping msg from a worker. Returns worker status if ping was accepted, otherwise None
-pub fn ping(&mut self, worker_id: PeerId, msg: Ping) -> WorkerStatus {
+pub fn ping(&self, worker_id: PeerId, msg: Ping) -> WorkerStatus {
let version = msg.sem_version();
if !Config::get().supported_worker_versions.matches(&version) {
log::debug!("Worker {worker_id} version not supported: {}", version);
return WorkerStatus::UnsupportedVersion(());
}
-let worker_state = match self.worker_states.get_mut(&worker_id) {
+let mut worker_state = match self.worker_states.get_mut(&worker_id) {
None => {
log::debug!("Worker {worker_id} not registered");
return WorkerStatus::NotRegistered(());
@@ -112,35 +114,35 @@ impl Scheduler {
}
let assigned_chunks = worker_state.assigned_chunks(&self.known_units);
let mut assignment = chunks_to_assignment(assigned_chunks);
-add_signature_headers(&mut assignment, &worker_id, worker_state);
+add_signature_headers(&mut assignment, &worker_id, worker_state.value());
WorkerStatus::Active(assignment)
}

pub fn workers_to_dial(&self) -> Vec<PeerId> {
self.worker_states
.iter()
-.filter_map(|(worker_id, state)| {
-if !state.is_active() {
+.filter_map(|worker_ref| {
+if !worker_ref.is_active() {
return None; // Worker not active - don't dial
}
-let Some(last_dial) = state.last_dial_time else {
-return Some(*worker_id); // Worker has never been dialed - dial now
+let Some(last_dial) = worker_ref.last_dial_time else {
+return Some(worker_ref.peer_id); // Worker has never been dialed - dial now
};
let time_since_last_dial = last_dial.elapsed().expect("time doesn't go backwards");
-let retry_interval = if state.last_dial_ok {
+let retry_interval = if worker_ref.last_dial_ok {
Config::get().successful_dial_retry
} else {
Config::get().failed_dial_retry
};
-(time_since_last_dial > retry_interval).then_some(*worker_id)
+(time_since_last_dial > retry_interval).then_some(worker_ref.peer_id)
})
.collect()
}

-pub fn worker_dialed(&mut self, worker_id: PeerId, reachable: bool) {
+pub fn worker_dialed(&self, worker_id: PeerId, reachable: bool) {
log::info!("Dialed worker {worker_id}. reachable={reachable}");
match self.worker_states.get_mut(&worker_id) {
-Some(worker_state) => worker_state.dialed(reachable),
+Some(mut worker_state) => worker_state.dialed(reachable),
None => log::error!("Unknown worker dialed: {worker_id}"),
}
}
@@ -175,7 +177,7 @@ impl Scheduler {
.get_mut(&unit_id)
.expect("No assignment entry for unit")
.retain(|worker_id| {
-let worker = self
+let mut worker = self
.worker_states
.get_mut(worker_id)
.expect("Unknown worker");
@@ -188,18 +190,18 @@ impl Scheduler {
}

pub fn all_workers(&self) -> Vec<WorkerState> {
-self.worker_states.values().cloned().collect()
+self.worker_states.iter().map(|w| w.clone()).collect()
}

pub fn active_workers(&self) -> Vec<WorkerState> {
self.worker_states
-.values()
+.iter()
.filter(|w| w.is_active())
-.cloned()
+.map(|w| w.clone())
.collect()
}

-fn get_worker(&mut self, worker_id: &PeerId) -> &mut WorkerState {
+fn get_worker(&mut self, worker_id: &PeerId) -> RefMut<PeerId, WorkerState> {
self.worker_states
.get_mut(worker_id)
.expect("Unknown worker")
@@ -226,15 +228,16 @@

pub fn update_workers(&mut self, workers: Vec<Worker>) {
log::info!("Updating workers");
-let mut old_workers = std::mem::take(&mut self.worker_states);
+let old_workers = std::mem::take(&mut self.worker_states);

// For each of the new workers, find an existing state or create a blank one
self.worker_states = workers
.into_iter()
.map(|w| {
-let worker_state = old_workers
-.remove(&w.peer_id)
-.unwrap_or_else(|| WorkerState::new(w.peer_id, w.address));
+let worker_state = match old_workers.remove(&w.peer_id) {
+Some((_, state)) => state,
+None => WorkerState::new(w.peer_id, w.address),
+};
(w.peer_id, worker_state)
})
.collect();
@@ -251,13 +254,13 @@
}
}

-fn release_jailed_workers(&mut self) {
+fn release_jailed_workers(&self) {
log::info!("Releasing jailed workers");
let release_unreachable = !Config::get().jail_unreachable;
self.worker_states
-.values_mut()
+.iter_mut()
.filter(|w| w.jailed && w.is_active() && (release_unreachable || !w.is_unreachable()))
-.for_each(|w| w.release());
+.for_each(|mut w| w.release());
}

/// Jail workers which don't send pings.
@@ -283,7 +286,7 @@
} else {
log::info!("Jailing unreachable workers is disabled");
self.worker_states
-.values()
+.iter()
.filter(|w| w.ever_been_active() && w.is_unreachable())
.for_each(|w| log::info!("Worker {} is unreachable", w.peer_id));
false
@@ -299,13 +302,13 @@
let mut num_unassigned_units = 0;

self.worker_states
-.values_mut()
+.iter_mut()
.filter(|w| !w.jailed)
-.for_each(|w| {
+.for_each(|mut w| {
if !w.ever_been_active() {
return; // Don't jail workers that haven't been started yet
}
-if !criterion(w) {
+if !criterion(&mut w) {
return;
}

@@ -383,9 +386,9 @@
log::info!("Assigning units");

// Only active and non-jailed workers are eligible for assignment
-let mut workers: Vec<&WorkerState> = self
+let mut workers: Vec<_> = self
.worker_states
-.values()
+.iter()
.filter(|w| w.is_active() && !w.jailed)
.collect();

@@ -471,11 +474,11 @@
prometheus_metrics::partially_assigned_units(incomplete_units);

self.worker_states
-.values_mut()
+.iter_mut()
.filter(|w| w.is_active() && !w.jailed)
-.for_each(|w| {
+.for_each(|mut w| {
w.reset_download_progress(&self.known_units);
log::info!("{w}")
log::info!("{}", *w)
});
}

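The `|mut w|` closures and explicit `*w` deref above fall out of DashMap's iteration API: `iter_mut` yields guard objects rather than plain `&mut` references, so they are bound mutably and dereferenced explicitly. A standalone sketch with toy types (not from this repo):

```rust
use dashmap::DashMap;

fn main() {
    let states: DashMap<&str, u32> = DashMap::new();
    states.insert("worker-a", 1);
    states.insert("worker-b", 2);

    // iter_mut() yields RefMut-style guards: bind them `mut` for
    // DerefMut access, and deref explicitly when formatting.
    for mut entry in states.iter_mut() {
        *entry += 1;
        println!("{} -> {}", entry.key(), *entry);
    }
}
```

One caveat with this style (per the dashmap docs): calling `get_mut` or similar while already holding a guard into the map can deadlock, so guards should stay short-lived — which the small closure bodies in this diff do naturally.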
4 changes: 2 additions & 2 deletions crates/network-scheduler/src/server.rs
@@ -101,7 +101,7 @@ impl<S: Stream<Item = SchedulerEvent> + Send + Unpin + 'static> Server<S> {
SchedulerEvent::Ping { peer_id, ping } => self.ping(peer_id, ping).await,
SchedulerEvent::PeerProbed { peer_id, reachable } => self
.scheduler
-.write()
+.read()
.await
.worker_dialed(peer_id, reachable),
}
@@ -110,7 +110,7 @@ impl<S: Stream<Item = SchedulerEvent> + Send + Unpin + 'static> Server<S> {
async fn ping(&mut self, peer_id: PeerId, ping: Ping) {
log::debug!("Got ping from {peer_id}");
let ping_hash = msg_hash(&ping);
-let status = self.scheduler.write().await.ping(peer_id, ping.clone());
+let status = self.scheduler.read().await.ping(peer_id, ping.clone());
self.write_metrics(peer_id, ping).await;
let pong = Pong {
ping_hash,
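The `write()` → `read()` switch is the payoff of the refactor: with `ping` and `worker_dialed` taking `&self`, event handlers need only shared access to the scheduler, so concurrent pings contend on individual DashMap shards instead of serializing on the outer lock. A sketch of the resulting locking shape, assuming a `tokio::sync::RwLock` like the `.await`-able lock used here (stand-in `Scheduler` type; needs tokio's macro and runtime features):

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

struct Scheduler; // stand-in; the real one keeps a DashMap inside

impl Scheduler {
    // The &self receiver is what makes the read lock below sufficient.
    fn ping(&self, worker_id: u64) {
        println!("ping from {worker_id}");
    }
}

#[tokio::main]
async fn main() {
    let scheduler = Arc::new(RwLock::new(Scheduler));

    // Many tasks can hold read guards at once; only methods that still
    // take &mut self (e.g. unit reassignment) need write().
    let handles: Vec<_> = (0..4)
        .map(|id| {
            let scheduler = scheduler.clone();
            tokio::spawn(async move { scheduler.read().await.ping(id) })
        })
        .collect();
    for handle in handles {
        handle.await.unwrap();
    }
}
```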
