Skip to content

Commit

Permalink
feature(consensus): signal to retry download after validation complete
Browse files Browse the repository at this point in the history
  • Loading branch information
Mododo committed Jul 16, 2024
1 parent f6bfa23 commit 3b91a1d
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 34 deletions.
10 changes: 5 additions & 5 deletions consensus/src/intercom/core/responder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl Responder {
MPResponse::Broadcast
}
MPQuery::PointById(point_id) => MPResponse::PointById(match inner {
None => PointByIdResponse(None),
None => PointByIdResponse::TryLater,
Some(inner) => {
Uploader::find(&peer_id, &point_id, &inner.top_dag_round, &inner.effects)
}
Expand Down Expand Up @@ -124,12 +124,12 @@ impl EngineContext {
MPResponse::Signature(
SignatureResponse::Signature(_) | SignatureResponse::Rejected(_),
) => "tycho_mempool_signature_query_responder_data_time",
MPResponse::PointById(PointByIdResponse(Some(_))) => {
MPResponse::PointById(PointByIdResponse::Defined(Some(_))) => {
"tycho_mempool_download_query_responder_some_time"
}
MPResponse::PointById(PointByIdResponse(None)) => {
"tycho_mempool_download_query_responder_none_time"
}
MPResponse::PointById(
PointByIdResponse::Defined(None) | PointByIdResponse::TryLater,
) => "tycho_mempool_download_query_responder_none_time",
};
metrics::histogram!(metric_name).record(elapsed);
}
Expand Down
54 changes: 31 additions & 23 deletions consensus/src/intercom/dependency/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,24 +302,32 @@ impl DownloadTask {
peer_id: &PeerId,
resolved: anyhow::Result<PointByIdResponse>,
) -> Option<Point> {
let response = match resolved {
Ok(response) => response,
Err(network_err) => {
let status = self
.undone_peers
.get_mut(peer_id)
.unwrap_or_else(|| panic!("Coding error: peer not in map {}", peer_id.alt()));
status.is_in_flight = false;
status.failed_queries = status.failed_queries.saturating_add(1);
metrics::counter!(DownloadContext::FAILED_QUERY).increment(1);
tracing::warn!(
peer = display(peer_id.alt()),
error = display(network_err),
"network error",
);
return None;
}
};
let defined_response =
match resolved {
Ok(PointByIdResponse::Defined(response)) => response,
Ok(PointByIdResponse::TryLater) => {
let status = self.undone_peers.get_mut(peer_id).unwrap_or_else(|| {
panic!("Coding error: peer not in map {}", peer_id.alt())
});
status.is_in_flight = false;
tracing::trace!(peer = display(peer_id.alt()), "try later");
return None;
}
Err(network_err) => {
let status = self.undone_peers.get_mut(peer_id).unwrap_or_else(|| {
panic!("Coding error: peer not in map {}", peer_id.alt())
});
status.is_in_flight = false;
status.failed_queries = status.failed_queries.saturating_add(1);
metrics::counter!(DownloadContext::FAILED_QUERY).increment(1);
tracing::warn!(
peer = display(peer_id.alt()),
error = display(network_err),
"network error",
);
return None;
}
};

let Some(status) = self.undone_peers.remove(peer_id) else {
panic!("peer {} was removed, concurrent download?", peer_id.alt());
Expand All @@ -330,8 +338,8 @@ impl DownloadTask {
peer_id.alt(),
);

match response {
PointByIdResponse(None) => {
match defined_response {
None => {
if status.is_depender {
// if points are persisted in storage - it's a ban;
// else - peer evicted this point from its cache, as the point
Expand All @@ -342,11 +350,11 @@ impl DownloadTask {
tracing::warn!(peer = display(peer_id.alt()), "must have returned");
} else {
self.reliably_not_found = self.reliably_not_found.saturating_add(1);
tracing::debug!(peer = display(peer_id.alt()), "didn't return");
tracing::trace!(peer = display(peer_id.alt()), "didn't return");
}
None
}
PointByIdResponse(Some(point)) if point.id() != self.point_id => {
Some(point) if point.id() != self.point_id => {
// it's a ban
self.unreliable_peers = self.unreliable_peers.saturating_add(1);
tracing::error!(
Expand All @@ -358,7 +366,7 @@ impl DownloadTask {
);
None
}
PointByIdResponse(Some(point)) => {
Some(point) => {
match Verifier::verify(&point, &self.parent.inner.peer_schedule) {
Err(dag_point) => {
// reliable peer won't return unverifiable point
Expand Down
12 changes: 7 additions & 5 deletions consensus/src/intercom/dependency/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@ impl Uploader {
digest = display(point_id.digest.alt()),
"upload",
);
PointByIdResponse(
ready
.and_then(|dag_point| dag_point.into_trusted())
.map(|valid| valid.point),
)
match ready {
Some(dag_point) => {
PointByIdResponse::Defined(dag_point.into_trusted().map(|valid| valid.point))
}
None if task_found => PointByIdResponse::TryLater,
None => PointByIdResponse::Defined(None),
}
}
}
5 changes: 4 additions & 1 deletion consensus/src/intercom/dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use crate::effects::{AltFmt, AltFormat};
use crate::models::{Point, Signature};

#[derive(Debug, Serialize, Deserialize)]
pub struct PointByIdResponse(pub Option<Point>);
pub enum PointByIdResponse {
Defined(Option<Point>),
TryLater,
}

/// Denotes that broadcasts should be done via network query, not send message.
/// Because initiator must not duplicate its broadcasts, thus should wait for receiver to respond.
Expand Down

0 comments on commit 3b91a1d

Please sign in to comment.