Skip to content

Commit

Permalink
Lock-free scheduler, removed metrics writer
Browse files Browse the repository at this point in the history
  • Loading branch information
Wiezzel committed Jul 17, 2024
1 parent 7ed5e67 commit 3ba380e
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 286 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/network-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "network-scheduler"
version = "1.0.12"
version = "1.0.13"
edition = "2021"

[dependencies]
Expand All @@ -22,6 +22,7 @@ itertools = "0.12"
lazy_static = "1.4.0"
log = "0.4"
nonempty = { version = "0.10", features = ["serde", "serialize"] }
parking_lot = { version = "0.12", features = ["serde"] }
prometheus-client = "0.22"
rand = "0.8"
random_choice = "0.3"
Expand Down
10 changes: 0 additions & 10 deletions crates/network-scheduler/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,6 @@ pub struct Cli {
)]
pub metrics_path: Option<PathBuf>,

#[arg(
long,
env,
help = "Choose which metrics should be printed.",
value_delimiter = ',',
num_args = 0..,
default_value = "QuerySubmitted,QueryFinished,WorkersSnapshot"
)]
pub metrics: Vec<String>,

#[arg(
short,
long,
Expand Down
4 changes: 0 additions & 4 deletions crates/network-scheduler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ use prometheus_client::registry::Registry;
use subsquid_network_transport::P2PTransportBuilder;

use crate::cli::Cli;
use crate::metrics::MetricsWriter;
use crate::server::Server;
use crate::storage::S3Storage;

mod cli;
mod data_chunk;
mod metrics;
mod metrics_server;
mod prometheus_metrics;
mod scheduler;
Expand Down Expand Up @@ -39,7 +37,6 @@ async fn main() -> anyhow::Result<()> {
args.read_config().await?;

// Open file for writing metrics
let metrics_writer = MetricsWriter::from_cli(&args).await?;
let mut metrics_registry = Registry::default();
subsquid_network_transport::metrics::register_metrics(&mut metrics_registry);
prometheus_metrics::register_metrics(&mut metrics_registry);
Expand All @@ -61,7 +58,6 @@ async fn main() -> anyhow::Result<()> {
incoming_units,
transport_handle,
scheduler,
metrics_writer,
)
.run(
contract_client,
Expand Down
121 changes: 0 additions & 121 deletions crates/network-scheduler/src/metrics.rs

This file was deleted.

16 changes: 7 additions & 9 deletions crates/network-scheduler/src/metrics_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@ const JAIL_INFO_FIELDS: [Field<'static, WorkerState>; 3] = [
Field::new("jail_reason"),
];

async fn active_workers(
Extension(scheduler): Extension<Arc<RwLock<Scheduler>>>,
) -> Json<Vec<WorkerState>> {
Json(scheduler.read().await.active_workers())
async fn active_workers(Extension(scheduler): Extension<Scheduler>) -> Json<Vec<WorkerState>> {
Json(scheduler.active_workers())
}

async fn workers_jail_info(Extension(scheduler): Extension<Arc<RwLock<Scheduler>>>) -> Response {
let active_workers = scheduler.read().await.active_workers();
async fn workers_jail_info(Extension(scheduler): Extension<Scheduler>) -> Response {
let active_workers = scheduler.active_workers();
let jail_info = active_workers
.iter()
.map(|w| w.with_fields(|_| JAIL_INFO_FIELDS))
Expand All @@ -37,9 +35,9 @@ async fn workers_jail_info(Extension(scheduler): Extension<Arc<RwLock<Scheduler>
}

async fn chunks(
Extension(scheduler): Extension<Arc<RwLock<Scheduler>>>,
Extension(scheduler): Extension<Scheduler>,
) -> Json<HashMap<String, Vec<ChunkStatus>>> {
let chunks_summary = scheduler.read().await.get_chunks_summary();
let chunks_summary = scheduler.get_chunks_summary();
Json(chunks_summary)
}

Expand All @@ -55,7 +53,7 @@ async fn get_metrics(Extension(metrics_registry): Extension<Arc<RwLock<Registry>
}

pub async fn run_server(
scheduler: Arc<RwLock<Scheduler>>,
scheduler: Scheduler,
addr: SocketAddr,
metrics_registry: Registry,
cancel_token: CancellationToken,
Expand Down
29 changes: 22 additions & 7 deletions crates/network-scheduler/src/prometheus_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
use std::collections::HashMap;
use std::time::Duration;

use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::{linear_buckets, Histogram};
use prometheus_client::metrics::histogram::{exponential_buckets, linear_buckets, Histogram};
use prometheus_client::registry::Registry;

use crate::scheduling_unit::UnitId;

// Process-wide Prometheus metric handles, lazily initialised on first access.
lazy_static::lazy_static! {
// Distribution of per-unit worker counts recorded by `units_assigned`:
// 100 linear buckets of width 1.0 starting at 0.0.
static ref WORKERS_PER_UNIT: Histogram = Histogram::new(linear_buckets(0.0, 1.0, 100));
static ref TOTAL_UNITS: Gauge = Default::default();
static ref ACTIVE_WORKERS: Gauge = Default::default();
static ref REPLICATION_FACTOR: Gauge = Default::default();
static ref PARTIALLY_ASSIGNED_UNITS: Gauge = Default::default();
// Registered as "s3_requests"; incremented by `s3_request`.
static ref S3_REQUESTS: Counter = Default::default();
// Registered as "exec_times"; one histogram per label set, created on demand.
// `exec_time` observes values in milliseconds, so the 24 exponential buckets
// (start 0.001, factor 2.0) range from 0.001 ms up to 0.001 * 2^23 ≈ 8388.6 ms.
static ref EXEC_TIMES: Family<Vec<(&'static str, &'static str)>, Histogram> = Family::new_with_constructor(
|| Histogram::new(exponential_buckets(0.001, 2.0, 24))
);
}

pub fn register_metrics(registry: &mut Registry) {
Expand Down Expand Up @@ -47,11 +49,16 @@ pub fn register_metrics(registry: &mut Registry) {
"s3_requests",
"Total number of S3 API requests since application start",
S3_REQUESTS.clone(),
)
);
registry.register(
"exec_times",
"Execution times of various procedures (ms)",
EXEC_TIMES.clone(),
);
}

pub fn units_assigned(counts: HashMap<&UnitId, usize>) {
for (_unit_id, count) in counts {
pub fn units_assigned(counts: impl IntoIterator<Item = usize>) {
for count in counts {
WORKERS_PER_UNIT.observe(count as f64);
}
}
Expand All @@ -75,3 +82,11 @@ pub fn partially_assigned_units(count: usize) {
/// Records one S3 API request by incrementing the `S3_REQUESTS` counter
/// (exported as "s3_requests").
pub fn s3_request() {
S3_REQUESTS.inc();
}

/// Reports the execution time of `procedure` to the `EXEC_TIMES` histogram family.
///
/// `duration` is truncated to whole microseconds and expressed in milliseconds.
/// The value is logged at trace level and then observed in the histogram
/// labelled `("procedure", procedure)`.
pub fn exec_time(procedure: &'static str, duration: Duration) {
    // Microsecond resolution matches the smallest histogram bucket (0.001 ms).
    let whole_micros = duration.as_micros();
    let duration_millis = whole_micros as f64 / 1000.;
    log::trace!("Procedure {procedure} took {duration_millis:.3} ms");

    let label_set = vec![("procedure", procedure)];
    EXEC_TIMES.get_or_create(&label_set).observe(duration_millis);
}
Loading

0 comments on commit 3ba380e

Please sign in to comment.