Skip to content

Commit

Permalink
fix(validator): backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed May 3, 2024
1 parent c51ae04 commit 6aa50a8
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 54 deletions.
5 changes: 5 additions & 0 deletions collator/src/validator/network/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use crate::validator::state::SessionInfo;
use crate::validator::{process_candidate_signature_response, ValidatorEventListener};
use everscale_types::models::BlockIdShort;
use std::sync::Arc;
use tracing::trace;
use tycho_network::Response;
use crate::tracing_targets;

pub async fn handle_signatures_query(
session: Option<Arc<SessionInfo>>,
Expand All @@ -21,6 +23,7 @@ where
signatures: vec![],
},
Some(session) => {
trace!(target: tracing_targets::VALIDATOR, "Processing signatures query for block {:?} with {} signatures", block_id_short, signatures.len());
process_candidate_signature_response(
session.clone(),
block_id_short,
Expand All @@ -29,12 +32,14 @@ where
)
.await?;

trace!(target: tracing_targets::VALIDATOR, "Getting valid signatures for block {:?}", block_id_short);
let signatures = session
.get_valid_signatures(&block_id_short)
.await
.into_iter()
.map(|(k, v)| (k.0, v.0))
.collect::<Vec<_>>();

SignaturesQuery {
session_seqno,
block_id_short,
Expand Down
49 changes: 13 additions & 36 deletions collator/src/validator/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ impl SessionInfo {
&self,
block_id_short: &BlockIdShort,
) -> anyhow::Result<ValidationResult> {
trace!("Getting validation status for block {:?}", block_id_short);
// Bind the lock result to a variable to extend its lifetime
// let block_signatures_guard = self.blocks_signatures;
let signatures = self.blocks_signatures.get(block_id_short);
Expand Down Expand Up @@ -216,44 +215,14 @@ impl SessionInfo {
}
}

/// Adds a signature for a block.
pub async fn add_signature(
&self,
block_id: &BlockId,
validator_id: HashBytes,
signature: Signature,
is_valid: bool,
) {
let block_header = block_id.as_short_id();
// let mut write_guard = self.blocks_signatures.write().await; // Hold onto the lock
let mut entry = self
.blocks_signatures
.entry(block_header) // Use the guard to access the map
.or_insert_with(|| {
(
*block_id,
SignatureMaps {
valid_signatures: FastHashMap::default(),
invalid_signatures: FastHashMap::default(),
event_dispatched: Mutex::new(false),
},
)
});

if is_valid {
entry.1.valid_signatures.insert(validator_id, signature);
} else {
entry.1.invalid_signatures.insert(validator_id, signature);
}
}

pub async fn process_signatures_and_update_status(
&self,
block_id_short: BlockIdShort,
signatures: Vec<([u8; 32], [u8; 64])>,
listeners: &[Arc<dyn ValidatorEventListener>],
) -> anyhow::Result<()> {
trace!(
debug!(
target: tracing_targets::VALIDATOR,
"Processing signatures for block in state {:?}",
block_id_short
);
Expand Down Expand Up @@ -289,14 +258,20 @@ impl SessionInfo {
let signature = Signature(sig_bytes);
let block_validation_candidate = BlockValidationCandidate::from(entry.0);

let is_valid = self
let validator = self
.get_validation_session_info()
.validators
.get(&validator_id)
.context("Validator not found")?
.get(&validator_id).context("Validator not found")?.clone();

let is_valid = validator
.public_key
.verify(block_validation_candidate.as_bytes(), &signature.0);

trace!(
target: tracing_targets::VALIDATOR,
"Adding signature for block {:?} from validator {:?} valid {}",
block_id_short, validator_id, is_valid);

if is_valid {
entry.1.valid_signatures.insert(validator_id, signature);
} else {
Expand All @@ -305,6 +280,7 @@ impl SessionInfo {
}

let validation_status = self.validation_status(&entry.1).await;

// Check if the validation status qualifies for dispatching the event
match validation_status {
ValidationResult::Valid => {
Expand Down Expand Up @@ -370,6 +346,7 @@ impl SessionInfo {
event: OnValidatedBlockEvent,
listeners: &[Arc<dyn ValidatorEventListener>],
) {
trace!(target: tracing_targets::VALIDATOR, "Notifying listeners about block validation");
for listener in listeners {
let cloned_event = event.clone();
let listener = listener.clone();
Expand Down
53 changes: 35 additions & 18 deletions collator/src/validator/validator.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::{bail, Context, Result};
use anyhow::{Context, Result};
use async_trait::async_trait;
use everscale_crypto::ed25519::KeyPair;
use everscale_crypto::ed25519::{KeyPair, PublicKey};
use everscale_types::cell::HashBytes;
use everscale_types::models::{BlockId, BlockIdShort, Signature};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -190,7 +190,7 @@ impl Validator for ValidatorStdImpl {
.add_private_overlay(&private_overlay.clone());

if !overlay_added {
warn!(target: tracing_targets::VALIDATOR, "Failed to add private overlay")
warn!(target: tracing_targets::VALIDATOR, "Failed to add private overlay");
// bail!("Failed to add private overlay");
}

Expand Down Expand Up @@ -244,18 +244,20 @@ async fn start_candidate_validation(

let cached_signatures = session.get_cached_signatures_by_block(&block_id.as_short_id());

trace!(target: tracing_targets::VALIDATOR, "Cached signatures len: {:?}", cached_signatures.as_ref().map(|x| x.1.len()));

if let Some(cached_signatures) = cached_signatures {
initial_signatures.extend(cached_signatures.1.into_iter().map(|(k, v)| (k.0, v.0)));
}

trace!(target: tracing_targets::VALIDATOR, "Initial signatures: {:?}", initial_signatures);
let is_validation_finished = process_candidate_signature_response(
session.clone(),
short_id,
vec![(current_validator_pubkey.0, our_signature.0)],
initial_signatures,
listeners,
)
.await?;
trace!(target: tracing_targets::VALIDATOR, "Validation finished: {:?}", is_validation_finished);

if is_validation_finished {
cancellation_token.cancel(); // Cancel all tasks if validation is finished
Expand Down Expand Up @@ -361,18 +363,12 @@ async fn start_candidate_validation(
}
}
Err(e) => {
warn!(target: tracing_targets::VALIDATOR, "Elapsed validator response {}: {e}", validator.public_key);
let delay = delay * 2_u32.pow(attempt);
let delay = std::cmp::min(delay, max_delay);
tokio::time::sleep(delay).await;
attempt += 1;
let error_message = format!("Elapsed validator response: {}", e);
handle_error_and_backoff(delay, max_delay, &mut attempt, &validator.public_key, &error_message).await;
}
Ok(Err(e)) => {
warn!(target: tracing_targets::VALIDATOR, "Error receiving signatures from validator {}: {e}", validator.public_key);
let delay = delay * 2_u32.pow(attempt);
let delay = std::cmp::min(delay, max_delay);
tokio::time::sleep(delay).await;
attempt += 1;
let error_message = format!("Error receiving signatures: {}", e);
handle_error_and_backoff(delay, max_delay, &mut attempt, &validator.public_key, &error_message).await;
}
}
tokio::time::sleep(delay).await;
Expand All @@ -390,31 +386,52 @@ async fn start_candidate_validation(
Ok(())
}

async fn handle_error_and_backoff(
delay: Duration,
max_delay: Duration,
attempt: &mut u32,
validator_public_key: &PublicKey,
error_message: &str,
) {
warn!(target: tracing_targets::VALIDATOR, "Error validator response: validator: {:x?}: {} ", validator_public_key, error_message);
let exponential_backoff = 2_u32.pow(*attempt);

let safe_delay = delay.checked_mul(exponential_backoff).unwrap_or(max_delay);

let new_delay = std::cmp::min(safe_delay, max_delay);
tokio::time::sleep(new_delay).await;
*attempt += 1;
}


pub async fn process_candidate_signature_response(
session: Arc<SessionInfo>,
block_id_short: BlockIdShort,
signatures: Vec<([u8; 32], [u8; 64])>,
listeners: &[Arc<dyn ValidatorEventListener>],
) -> Result<bool> {
trace!(target: tracing_targets::VALIDATOR, block = %block_id_short, "Processing candidate signature response");
debug!(target: tracing_targets::VALIDATOR, block = %block_id_short, "Processing candidate signature response");
let validation_status = session.get_validation_status(&block_id_short).await?;
trace!(target: tracing_targets::VALIDATOR, block = %block_id_short, "Validation status: {:?}", validation_status);
if validation_status == ValidationResult::Valid
|| validation_status == ValidationResult::Invalid
{
debug!(
trace!(
"Validation status is already set for block {:?}.",
block_id_short
);
return Ok(true);
}

if session.get_block(&block_id_short).await.is_some() {
trace!(target: tracing_targets::VALIDATOR,
"Block {:?} is already in the session. Processing signatures.",
block_id_short);
session
.process_signatures_and_update_status(block_id_short, signatures, listeners)
.await?;
} else {
trace!(target: tracing_targets::VALIDATOR, "Caching signatures for block {:?}", block_id_short);
debug!(target: tracing_targets::VALIDATOR, "Caching signatures for block {:?}", block_id_short);
if block_id_short.seqno > 0 {
let previous_block =
BlockIdShort::from((block_id_short.shard, block_id_short.seqno - 1));
Expand Down

0 comments on commit 6aa50a8

Please sign in to comment.