From 3b91a1d32fb4633b35f12dffd7c8e5ca79991594 Mon Sep 17 00:00:00 2001 From: Kirill Mikheev Date: Tue, 16 Jul 2024 17:13:38 +0300 Subject: [PATCH] feature(consensus): signal to retry download after validation complete --- consensus/src/intercom/core/responder.rs | 10 ++-- .../src/intercom/dependency/downloader.rs | 54 +++++++++++-------- consensus/src/intercom/dependency/uploader.rs | 12 +++-- consensus/src/intercom/dto.rs | 5 +- 4 files changed, 47 insertions(+), 34 deletions(-) diff --git a/consensus/src/intercom/core/responder.rs b/consensus/src/intercom/core/responder.rs index 4859f557f..f1a837eee 100644 --- a/consensus/src/intercom/core/responder.rs +++ b/consensus/src/intercom/core/responder.rs @@ -91,7 +91,7 @@ impl Responder { MPResponse::Broadcast } MPQuery::PointById(point_id) => MPResponse::PointById(match inner { - None => PointByIdResponse(None), + None => PointByIdResponse::TryLater, Some(inner) => { Uploader::find(&peer_id, &point_id, &inner.top_dag_round, &inner.effects) } @@ -124,12 +124,12 @@ impl EngineContext { MPResponse::Signature( SignatureResponse::Signature(_) | SignatureResponse::Rejected(_), ) => "tycho_mempool_signature_query_responder_data_time", - MPResponse::PointById(PointByIdResponse(Some(_))) => { + MPResponse::PointById(PointByIdResponse::Defined(Some(_))) => { "tycho_mempool_download_query_responder_some_time" } - MPResponse::PointById(PointByIdResponse(None)) => { - "tycho_mempool_download_query_responder_none_time" - } + MPResponse::PointById( + PointByIdResponse::Defined(None) | PointByIdResponse::TryLater, + ) => "tycho_mempool_download_query_responder_none_time", }; metrics::histogram!(metric_name).record(elapsed); } diff --git a/consensus/src/intercom/dependency/downloader.rs b/consensus/src/intercom/dependency/downloader.rs index 2e47e435d..18cf6256b 100644 --- a/consensus/src/intercom/dependency/downloader.rs +++ b/consensus/src/intercom/dependency/downloader.rs @@ -302,24 +302,32 @@ impl DownloadTask { peer_id: &PeerId, resolved: anyhow::Result, ) -> Option { - let response = match resolved { - Ok(response) => response, - Err(network_err) => { - let status = self - .undone_peers - .get_mut(peer_id) - .unwrap_or_else(|| panic!("Coding error: peer not in map {}", peer_id.alt())); - status.is_in_flight = false; - status.failed_queries = status.failed_queries.saturating_add(1); - metrics::counter!(DownloadContext::FAILED_QUERY).increment(1); - tracing::warn!( - peer = display(peer_id.alt()), - error = display(network_err), - "network error", - ); - return None; - } - }; + let defined_response = + match resolved { + Ok(PointByIdResponse::Defined(response)) => response, + Ok(PointByIdResponse::TryLater) => { + let status = self.undone_peers.get_mut(peer_id).unwrap_or_else(|| { + panic!("Coding error: peer not in map {}", peer_id.alt()) + }); + status.is_in_flight = false; + tracing::trace!(peer = display(peer_id.alt()), "try later"); + return None; + } + Err(network_err) => { + let status = self.undone_peers.get_mut(peer_id).unwrap_or_else(|| { + panic!("Coding error: peer not in map {}", peer_id.alt()) + }); + status.is_in_flight = false; + status.failed_queries = status.failed_queries.saturating_add(1); + metrics::counter!(DownloadContext::FAILED_QUERY).increment(1); + tracing::warn!( + peer = display(peer_id.alt()), + error = display(network_err), + "network error", + ); + return None; + } + }; let Some(status) = self.undone_peers.remove(peer_id) else { panic!("peer {} was removed, concurrent download?", peer_id.alt()); @@ -330,8 +338,8 @@ impl DownloadTask { peer_id.alt(), ); - match response { - PointByIdResponse(None) => { + match defined_response { + None => { if status.is_depender { // if points are persisted in storage - it's a ban; // else - peer evicted this point from its cache, as the point @@ -342,11 +350,11 @@ impl DownloadTask { tracing::warn!(peer = display(peer_id.alt()), "must have returned"); } else { self.reliably_not_found = self.reliably_not_found.saturating_add(1); - tracing::debug!(peer = display(peer_id.alt()), "didn't return"); + tracing::trace!(peer = display(peer_id.alt()), "didn't return"); } None } - PointByIdResponse(Some(point)) if point.id() != self.point_id => { + Some(point) if point.id() != self.point_id => { // it's a ban self.unreliable_peers = self.unreliable_peers.saturating_add(1); tracing::error!( @@ -358,7 +366,7 @@ impl DownloadTask { ); None } - PointByIdResponse(Some(point)) => { + Some(point) => { match Verifier::verify(&point, &self.parent.inner.peer_schedule) { Err(dag_point) => { // reliable peer won't return unverifiable point diff --git a/consensus/src/intercom/dependency/uploader.rs b/consensus/src/intercom/dependency/uploader.rs index a26f6ec29..1d1ff1dd1 100644 --- a/consensus/src/intercom/dependency/uploader.rs +++ b/consensus/src/intercom/dependency/uploader.rs @@ -61,10 +61,12 @@ impl Uploader { digest = display(point_id.digest.alt()), "upload", ); - PointByIdResponse( - ready - .and_then(|dag_point| dag_point.into_trusted()) - .map(|valid| valid.point), - ) + match ready { + Some(dag_point) => { + PointByIdResponse::Defined(dag_point.into_trusted().map(|valid| valid.point)) + } + None if task_found => PointByIdResponse::TryLater, + None => PointByIdResponse::Defined(None), + } } } diff --git a/consensus/src/intercom/dto.rs b/consensus/src/intercom/dto.rs index 5cfbf2ded..a829c7071 100644 --- a/consensus/src/intercom/dto.rs +++ b/consensus/src/intercom/dto.rs @@ -6,7 +6,10 @@ use crate::effects::{AltFmt, AltFormat}; use crate::models::{Point, Signature}; #[derive(Debug, Serialize, Deserialize)] -pub struct PointByIdResponse(pub Option); +pub enum PointByIdResponse { + Defined(Option), + TryLater, +} /// Denotes that broadcasts should be done via network query, not send message. /// Because initiator must not duplicate its broadcasts, thus should wait for receiver to respond.