Skip to content

Commit

Permalink
add some traces to relay
Browse files Browse the repository at this point in the history
  • Loading branch information
svyatonik committed Mar 20, 2024
1 parent 3aa8e56 commit d296c4d
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 65 deletions.
82 changes: 60 additions & 22 deletions relays/finality/src/finality_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
use async_trait::async_trait;
use backoff::{backoff::Backoff, ExponentialBackoff};
use futures::{future::Fuse, select, Future, FutureExt};
use num_traits::Saturating;
use num_traits::{Saturating, Zero};
use relay_utils::{
metrics::MetricsParams, relay_loop::Client as RelayClient, retry_backoff, FailedClient,
HeaderId, MaybeConnectionError, TrackedTransactionStatus, TransactionTracker,
Expand Down Expand Up @@ -128,11 +128,6 @@ pub struct SyncInfo<P: FinalitySyncPipeline> {
pub best_number_at_target: P::Number,
/// Whether the target client follows the same fork as the source client do.
pub is_using_same_fork: bool,
/// Free headers interval. We assume that the submission of header `N`, divisible
/// by `free_headers_interval` will be free for submitter. May be `None` if runtime
/// is configured to not allow free headers. If it is `Some(_)`, it is guaranteed
/// not to be zero.
pub free_headers_interval: Option<P::Number>,
}

impl<P: FinalitySyncPipeline> SyncInfo<P> {
Expand Down Expand Up @@ -171,22 +166,11 @@ impl<P: FinalitySyncPipeline> SyncInfo<P> {
target_client.best_finalized_source_block_id().await.map_err(Error::Target)?;
let best_number_at_target = best_id_at_target.0;

let mut free_headers_interval =
target_client.free_source_headers_interval().await.map_err(Error::Target)?;
if free_headers_interval == Some(0.into()) {
free_headers_interval = None;
}

let is_using_same_fork = Self::is_on_same_fork(source_client, &best_id_at_target)
.await
.map_err(Error::Source)?;

Ok(Self {
best_number_at_source,
best_number_at_target,
free_headers_interval,
is_using_same_fork,
})
Ok(Self { best_number_at_source, best_number_at_target, is_using_same_fork })
}

fn update_metrics(&self, metrics_sync: &Option<SyncLoopMetrics>) {
Expand Down Expand Up @@ -331,6 +315,7 @@ impl<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> Finality
pub async fn select_header_to_submit(
&mut self,
info: &SyncInfo<P>,
free_headers_interval: Option<P::Number>,
) -> Result<Option<JustifiedHeader<P>>, Error<P, SC::Error, TC::Error>> {
// to see that the loop is progressing
log::trace!(
Expand All @@ -345,6 +330,7 @@ impl<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> Finality
&self.source_client,
info,
self.sync_params.headers_to_relay,
free_headers_interval,
)
.await?;
// if we see that the header schedules GRANDPA change, we need to submit it
Expand All @@ -356,8 +342,11 @@ impl<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> Finality
// => even if we have already selected some header and its persistent finality proof,
// we may try to select better header by reading non-persistent proofs from the stream
self.finality_proofs_buf.fill(&mut self.finality_proofs_stream);
let maybe_justified_header =
selector.select(info, self.sync_params.headers_to_relay, &self.finality_proofs_buf);
let maybe_justified_header = selector.select(
self.sync_params.headers_to_relay,
free_headers_interval,
&self.finality_proofs_buf,
);

// remove obsolete 'recent' finality proofs + keep its size under certain limit
let oldest_finality_proof_to_keep = maybe_justified_header
Expand All @@ -374,6 +363,7 @@ impl<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> Finality

pub async fn run_iteration(
&mut self,
free_headers_interval: Option<P::Number>,
) -> Result<
Option<Transaction<TC::TransactionTracker, P::Number>>,
Error<P, SC::Error, TC::Error>,
Expand All @@ -390,7 +380,7 @@ impl<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> Finality
}

// submit new header if we have something new
match self.select_header_to_submit(&info).await? {
match self.select_header_to_submit(&info, free_headers_interval).await? {
Some(header) => {
let transaction = Transaction::submit(
&self.target_client,
Expand Down Expand Up @@ -427,9 +417,11 @@ impl<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> Finality
let exit_signal = exit_signal.fuse();
futures::pin_mut!(exit_signal, proof_submission_tx_tracker);

let free_headers_interval = free_headers_interval(&self.target_client).await?;

loop {
// run loop iteration
let next_tick = match self.run_iteration().await {
let next_tick = match self.run_iteration(free_headers_interval).await {
Ok(Some(tx)) => {
proof_submission_tx_tracker
.set(tx.track::<P, SC, _>(self.target_client.clone()).fuse());
Expand Down Expand Up @@ -482,6 +474,52 @@ impl<P: FinalitySyncPipeline, SC: SourceClient<P>, TC: TargetClient<P>> Finality
}
}

async fn free_headers_interval<P: FinalitySyncPipeline>(
target_client: &impl TargetClient<P>,
) -> Result<Option<P::Number>, FailedClient> {
match target_client.free_source_headers_interval().await {
Ok(Some(free_headers_interval)) if !free_headers_interval.is_zero() => {
log::trace!(
target: "bridge",
"Free headers interval for {} headers at {} is: {:?}",
P::SOURCE_NAME,
P::TARGET_NAME,
free_headers_interval,
);
Ok(Some(free_headers_interval))
},
Ok(Some(_free_headers_interval)) => {
log::trace!(
target: "bridge",
"Free headers interval for {} headers at {} is zero. Not submitting any free headers",
P::SOURCE_NAME,
P::TARGET_NAME,
);
Ok(None)
},
Ok(None) => {
log::trace!(
target: "bridge",
"Free headers interval for {} headers at {} is None. Not submitting any free headers",
P::SOURCE_NAME,
P::TARGET_NAME,
);

Ok(None)
},
Err(e) => {
log::error!(
target: "bridge",
"Failed to read free headers interval for {} headers at {}: {:?}",
P::SOURCE_NAME,
P::TARGET_NAME,
e,
);
Err(FailedClient::Target)
},
}
}

/// Run finality proofs synchronization loop.
pub async fn run<P: FinalitySyncPipeline>(
source_client: impl SourceClient<P>,
Expand Down
19 changes: 14 additions & 5 deletions relays/finality/src/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl<P: FinalitySyncPipeline> JustifiedHeaderSelector<P> {
source_client: &SC,
info: &SyncInfo<P>,
headers_to_relay: HeadersToRelay,
free_headers_interval: Option<P::Number>,
) -> Result<Self, Error<P, SC::Error, TC::Error>> {
let mut unjustified_headers = Vec::new();
let mut maybe_justified_header = None;
Expand All @@ -74,7 +75,9 @@ impl<P: FinalitySyncPipeline> JustifiedHeaderSelector<P> {
return Ok(Self::Mandatory(JustifiedHeader { header, proof }))
},
(true, None) => return Err(Error::MissingMandatoryFinalityProof(header.number())),
(false, Some(proof)) if need_to_relay(info, headers_to_relay, &header) => {
(false, Some(proof))
if need_to_relay::<P>(headers_to_relay, free_headers_interval, &header) =>
{
log::trace!(target: "bridge", "Header {:?} has persistent finality proof", header_number);
unjustified_headers.clear();
maybe_justified_header = Some(JustifiedHeader { header, proof });
Expand Down Expand Up @@ -113,8 +116,8 @@ impl<P: FinalitySyncPipeline> JustifiedHeaderSelector<P> {
/// justifications stream.
pub fn select(
self,
info: &SyncInfo<P>,
headers_to_relay: HeadersToRelay,
free_headers_interval: Option<P::Number>,
buf: &FinalityProofsBuf<P>,
) -> Option<JustifiedHeader<P>> {
let (unjustified_headers, maybe_justified_header) = match self {
Expand All @@ -134,7 +137,13 @@ impl<P: FinalitySyncPipeline> JustifiedHeaderSelector<P> {
(maybe_finality_proof, maybe_unjustified_header)
{
match finality_proof.target_header_number().cmp(&unjustified_header.number()) {
Ordering::Equal if need_to_relay(info, headers_to_relay, &unjustified_header) => {
Ordering::Equal
if need_to_relay::<P>(
headers_to_relay,
free_headers_interval,
&unjustified_header,
) =>
{
log::trace!(
target: "bridge",
"Managed to improve selected {} finality proof {:?} to {:?}.",
Expand Down Expand Up @@ -170,16 +179,16 @@ impl<P: FinalitySyncPipeline> JustifiedHeaderSelector<P> {

/// Returns true if we want to relay header `header_number`.
fn need_to_relay<P: FinalitySyncPipeline>(
info: &SyncInfo<P>,
headers_to_relay: HeadersToRelay,
free_headers_interval: Option<P::Number>,
header: &P::Header,
) -> bool {
match headers_to_relay {
HeadersToRelay::All => true,
HeadersToRelay::Mandatory => header.is_mandatory(),
HeadersToRelay::Free =>
header.is_mandatory() ||
info.free_headers_interval
free_headers_interval
.map(|free_headers_interval| {
!header.number().is_zero() &&
(header.number() % free_headers_interval).is_zero()
Expand Down
102 changes: 64 additions & 38 deletions relays/parachains/src/parachains_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,18 +193,36 @@ where
// free parachain header = header, available (proved) at free relay chain block. Let's
// read interval of free source relay chain blocks from target client
let free_source_relay_headers_interval = if only_free_headers {
let free_source_relay_headers_interval = target_client
.free_source_relay_headers_interval()
.await
.map_err(|e| {
log::warn!(target: "bridge", "Failed to read free {} headers interval at {}: {:?}", P::SourceRelayChain::NAME, P::TargetChain::NAME, e);
let free_source_relay_headers_interval =
target_client.free_source_relay_headers_interval().await.map_err(|e| {
log::warn!(
target: "bridge",
"Failed to read free {} headers interval at {}: {:?}",
P::SourceRelayChain::NAME,
P::TargetChain::NAME,
e,
);
FailedClient::Target
})?;
match free_source_relay_headers_interval {
Some(free_source_relay_headers_interval) if free_source_relay_headers_interval != 0 =>
free_source_relay_headers_interval,
Some(free_source_relay_headers_interval) if free_source_relay_headers_interval != 0 => {
log::trace!(
target: "bridge",
"Free {} headers interval at {}: {:?}",
P::SourceRelayChain::NAME,
P::TargetChain::NAME,
free_source_relay_headers_interval,
);
free_source_relay_headers_interval
},
_ => {
log::warn!(target: "bridge", "Invalid free {} headers interval at {}: {:?}", P::SourceRelayChain::NAME, P::TargetChain::NAME, free_source_relay_headers_interval);
log::warn!(
target: "bridge",
"Invalid free {} headers interval at {}: {:?}",
P::SourceRelayChain::NAME,
P::TargetChain::NAME,
free_source_relay_headers_interval,
);
return Err(FailedClient::Target)
},
}
Expand Down Expand Up @@ -305,39 +323,47 @@ where
// head, available at best free source relay chain header, known to the
// target chain
let prove_at_relay_block = if only_free_headers {
let relay_of_head_at_target = match relay_of_head_at_target {
Some(relay_of_head_at_target) => relay_of_head_at_target,
match relay_of_head_at_target {
Some(relay_of_head_at_target) => {
// find last free relay chain header in the range that we are interested in
let scan_range_begin = relay_of_head_at_target.number() + 1;
let scan_range_end = best_finalized_relay_block_at_target.number();
let last_free_source_relay_header_number = (scan_range_end /
free_source_relay_headers_interval) *
free_source_relay_headers_interval;
if last_free_source_relay_header_number < scan_range_begin {
// there are no new **free** relay chain headers in the range
log::trace!(
target: "bridge",
"Waiting for new free {} headers at {}: scanned {:?}..={:?}",
P::SourceRelayChain::NAME,
P::TargetChain::NAME,
scan_range_begin,
scan_range_end,
);
continue;
}

// ok - we know the relay chain header number, now let's get its full id
source_client
.relay_header_id(last_free_source_relay_header_number)
.await
.map_err(|e| {
log::warn!(
target: "bridge",
"Failed to get full header id of {} block #{:?}: {:?}",
P::SourceRelayChain::NAME,
last_free_source_relay_header_number,
e,
);
FailedClient::Source
})?
},
None => {
// no relay headers available at target => wait
continue
// no parachain head at target => let's submit first one
best_finalized_relay_block_at_target
},
};

// find last free relay chain header in the range that we are interested in
let scan_range_begin = relay_of_head_at_target.number() + 1;
let scan_range_end = best_finalized_relay_block_at_target.number();
let last_free_source_relay_header_number = (scan_range_end /
free_source_relay_headers_interval) *
free_source_relay_headers_interval;
if last_free_source_relay_header_number < scan_range_begin {
// there are no new **free** relay chain headers in the range
continue;
}

// ok - we know the relay chain header number, now let's get its full id
source_client
.relay_header_id(last_free_source_relay_header_number)
.await
.map_err(|e| {
log::warn!(
target: "bridge",
"Failed to get full header id of {} block #{:?}: {:?}",
P::SourceRelayChain::NAME,
last_free_source_relay_header_number,
e,
);
FailedClient::Source
})?
} else {
best_finalized_relay_block_at_target
};
Expand Down

0 comments on commit d296c4d

Please sign in to comment.