Skip to content

Commit

Permalink
feat(prover): Add ProverJobMonitor (#2666)
Browse files Browse the repository at this point in the history
ProverJobMonitor will be house keeper's counter part in prover
subsystem. TL;DR; it's a singleton component, monitoring prover
subsystem jobs.

The TL;DR; is that prover and core won't share any databases. This
enables:
- core deployments without affecting prover
- removing prover infrastructure (DB) in proverless envs

The release plan is as follows:
- release a component (PJM) that runs in parallel with HK
- migrate all jobs/metrics/dashboards to PJM
- delete their counterparts in HK
- remove redundant infrastructure

This PR contains:
- a new component (PJM)
- fixes for bugs/issues with old metrics (backported to HK)
- refactoring of metrics (PJM metrics cover same metrics as HK, but they
are different, as we can cover more with less)
- various other small nits

P.S. Name is up for discussion, feel free to suggest better name.
  • Loading branch information
EmilLuta authored Aug 19, 2024
1 parent 56d8ee8 commit e22cfb6
Show file tree
Hide file tree
Showing 59 changed files with 2,047 additions and 182 deletions.
1 change: 1 addition & 0 deletions core/bin/zksync_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,5 +210,6 @@ fn load_env_config() -> anyhow::Result<TempConfigStore> {
external_price_api_client_config: ExternalPriceApiClientConfig::from_env().ok(),
external_proof_integration_api_config: ExternalProofIntegrationApiConfig::from_env().ok(),
experimental_vm_config: ExperimentalVmConfig::from_env().ok(),
prover_job_monitor_config: None,
})
}
2 changes: 1 addition & 1 deletion core/bin/zksync_server/src/node_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ impl MainNodeBuilder {
fn add_house_keeper_layer(mut self) -> anyhow::Result<Self> {
let house_keeper_config = try_load_config!(self.configs.house_keeper_config);
let fri_prover_config = try_load_config!(self.configs.prover_config);
let fri_witness_generator_config = try_load_config!(self.configs.witness_generator);
let fri_witness_generator_config = try_load_config!(self.configs.witness_generator_config);
let fri_prover_group_config = try_load_config!(self.configs.prover_group_config);
let fri_proof_compressor_config = try_load_config!(self.configs.proof_compressor_config);

Expand Down
170 changes: 168 additions & 2 deletions core/lib/basic_types/src/basic_fri_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,19 @@

// TODO (PLA-773): Should be moved to the prover workspace.

use std::{convert::TryFrom, str::FromStr};
use std::{
collections::{hash_map::IntoIter, HashMap},
convert::TryFrom,
iter::once,
str::FromStr,
};

use serde::{Deserialize, Serialize};

use crate::protocol_version::{ProtocolSemanticVersion, ProtocolVersionId, VersionPatch};
use crate::{
protocol_version::{ProtocolSemanticVersion, ProtocolVersionId, VersionPatch},
prover_dal::JobCountStatistics,
};

const BLOB_CHUNK_SIZE: usize = 31;
const ELEMENTS_PER_4844_BLOCK: usize = 4096;
Expand Down Expand Up @@ -127,6 +135,14 @@ impl From<u8> for AggregationRound {
}

impl AggregationRound {
pub const ALL_ROUNDS: [AggregationRound; 5] = [
AggregationRound::BasicCircuits,
AggregationRound::LeafAggregation,
AggregationRound::NodeAggregation,
AggregationRound::RecursionTip,
AggregationRound::Scheduler,
];

pub fn next(&self) -> Option<AggregationRound> {
match self {
AggregationRound::BasicCircuits => Some(AggregationRound::LeafAggregation),
Expand Down Expand Up @@ -187,6 +203,156 @@ impl TryFrom<i32> for AggregationRound {
}
}

/// Wrapper for mapping from protocol version to prover circuits job stats
#[derive(Debug)]
pub struct ProtocolVersionedCircuitProverStats {
protocol_versioned_circuit_stats: HashMap<ProtocolSemanticVersion, CircuitProverStats>,
}

impl FromIterator<CircuitProverStatsEntry> for ProtocolVersionedCircuitProverStats {
fn from_iter<I: IntoIterator<Item = CircuitProverStatsEntry>>(iter: I) -> Self {
let mut mapping = HashMap::new();
for entry in iter {
let protocol_semantic_version = entry.protocol_semantic_version;
let circuit_prover_stats: &mut CircuitProverStats =
mapping.entry(protocol_semantic_version).or_default();
circuit_prover_stats.add(entry.circuit_id_round_tuple, entry.job_count_statistics);
}
Self {
protocol_versioned_circuit_stats: mapping,
}
}
}

impl IntoIterator for ProtocolVersionedCircuitProverStats {
type Item = (ProtocolSemanticVersion, CircuitProverStats);
type IntoIter = IntoIter<ProtocolSemanticVersion, CircuitProverStats>;

fn into_iter(self) -> Self::IntoIter {
self.protocol_versioned_circuit_stats.into_iter()
}
}

/// Wrapper for mapping between circuit/aggregation round to number of such jobs (queued and in progress)
#[derive(Debug)]
pub struct CircuitProverStats {
circuits_prover_stats: HashMap<CircuitIdRoundTuple, JobCountStatistics>,
}

impl IntoIterator for CircuitProverStats {
type Item = (CircuitIdRoundTuple, JobCountStatistics);
type IntoIter = IntoIter<CircuitIdRoundTuple, JobCountStatistics>;

fn into_iter(self) -> Self::IntoIter {
self.circuits_prover_stats.into_iter()
}
}

impl CircuitProverStats {
fn add(
&mut self,
circuit_id_round_tuple: CircuitIdRoundTuple,
job_count_statistics: JobCountStatistics,
) {
let stats = self
.circuits_prover_stats
.entry(circuit_id_round_tuple)
.or_default();
stats.queued += job_count_statistics.queued;
stats.in_progress += job_count_statistics.in_progress;
}
}

impl Default for CircuitProverStats {
fn default() -> Self {
let mut stats = HashMap::new();
for circuit in (1..=15).chain(once(255)) {
stats.insert(
CircuitIdRoundTuple::new(circuit, 0),
JobCountStatistics::default(),
);
}
for circuit in 3..=18 {
stats.insert(
CircuitIdRoundTuple::new(circuit, 1),
JobCountStatistics::default(),
);
}
stats.insert(
CircuitIdRoundTuple::new(2, 2),
JobCountStatistics::default(),
);
stats.insert(
CircuitIdRoundTuple::new(255, 3),
JobCountStatistics::default(),
);
stats.insert(
CircuitIdRoundTuple::new(1, 4),
JobCountStatistics::default(),
);
Self {
circuits_prover_stats: stats,
}
}
}

/// DTO for communication between DAL and prover_job_monitor.
/// Represents an entry -- count (queued & in progress) of jobs (circuit_id, aggregation_round) for a given protocol version.
#[derive(Debug)]
pub struct CircuitProverStatsEntry {
circuit_id_round_tuple: CircuitIdRoundTuple,
protocol_semantic_version: ProtocolSemanticVersion,
job_count_statistics: JobCountStatistics,
}

impl CircuitProverStatsEntry {
pub fn new(
circuit_id: i16,
aggregation_round: i16,
protocol_version: i32,
protocol_version_patch: i32,
status: &str,
count: i64,
) -> Self {
let mut queued = 0;
let mut in_progress = 0;
match status {
"queued" => queued = count as usize,
"in_progress" => in_progress = count as usize,
_ => unreachable!("received {:?}, expected only 'queued'/'in_progress' from DB as part of query filter", status),
};

let job_count_statistics = JobCountStatistics {
queued,
in_progress,
};
let protocol_semantic_version = ProtocolSemanticVersion::new(
ProtocolVersionId::try_from(protocol_version as u16)
.expect("received protocol version is broken"),
VersionPatch(protocol_version_patch as u32),
);

// BEWARE, HERE BE DRAGONS.
// In database, the `circuit_id` stored is the circuit for which the aggregation is done,
// not the circuit which is running.
// There is a single node level aggregation circuit, which is circuit 2.
// This can aggregate multiple leaf nodes (which may belong to different circuits).
// This "conversion" is a forced hacky way to use `circuit_id` 2 for nodes.
// A proper fix will be later provided to solve this once new auto-scaler is in place.
let circuit_id = if aggregation_round == 2 {
2
} else {
circuit_id as u8
};
let circuit_id_round_tuple = CircuitIdRoundTuple::new(circuit_id, aggregation_round as u8);
CircuitProverStatsEntry {
circuit_id_round_tuple,
protocol_semantic_version,
job_count_statistics,
}
}
}

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, Hash)]
pub struct JobIdentifiers {
pub circuit_id: u8,
Expand Down
2 changes: 2 additions & 0 deletions core/lib/basic_types/src/prover_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub struct StuckJobs {
pub status: String,
pub attempts: u64,
pub circuit_id: Option<u32>,
pub picked_by: Option<String>,
pub error: Option<String>,
}

// TODO (PLA-774): Redundant structure, should be replaced with `std::net::SocketAddr`.
Expand Down
4 changes: 3 additions & 1 deletion core/lib/config/src/configs/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
da_dispatcher::DADispatcherConfig,
fri_prover_group::FriProverGroupConfig,
house_keeper::HouseKeeperConfig,
prover_job_monitor::ProverJobMonitorConfig,
pruning::PruningConfig,
snapshot_recovery::SnapshotRecoveryConfig,
vm_runner::{BasicWitnessInputProducerConfig, ProtectiveReadsWriterConfig},
Expand Down Expand Up @@ -33,7 +34,7 @@ pub struct GeneralConfig {
pub prover_gateway: Option<FriProverGatewayConfig>,
pub witness_vector_generator: Option<FriWitnessVectorGeneratorConfig>,
pub prover_group_config: Option<FriProverGroupConfig>,
pub witness_generator: Option<FriWitnessGeneratorConfig>,
pub witness_generator_config: Option<FriWitnessGeneratorConfig>,
pub prometheus_config: Option<PrometheusConfig>,
pub proof_data_handler_config: Option<ProofDataHandlerConfig>,
pub db_config: Option<DBConfig>,
Expand All @@ -52,4 +53,5 @@ pub struct GeneralConfig {
pub consensus_config: Option<ConsensusConfig>,
pub external_proof_integration_api_config: Option<ExternalProofIntegrationApiConfig>,
pub experimental_vm_config: Option<ExperimentalVmConfig>,
pub prover_job_monitor_config: Option<ProverJobMonitorConfig>,
}
2 changes: 2 additions & 0 deletions core/lib/config/src/configs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub use self::{
object_store::ObjectStoreConfig,
observability::{ObservabilityConfig, OpentelemetryConfig},
proof_data_handler::ProofDataHandlerConfig,
prover_job_monitor::ProverJobMonitorConfig,
pruning::PruningConfig,
secrets::{DatabaseSecrets, L1Secrets, Secrets},
snapshot_recovery::SnapshotRecoveryConfig,
Expand Down Expand Up @@ -57,6 +58,7 @@ pub mod house_keeper;
pub mod object_store;
pub mod observability;
pub mod proof_data_handler;
pub mod prover_job_monitor;
pub mod pruning;
pub mod secrets;
pub mod snapshot_recovery;
Expand Down
Loading

0 comments on commit e22cfb6

Please sign in to comment.