Modify archiver to support fast-sync. #2722

Closed · wants to merge 5 commits
Changes from 1 commit
176 changes: 123 additions & 53 deletions crates/sc-consensus-subspace/src/archiver.rs
@@ -49,6 +49,7 @@
 #[cfg(test)]
 mod tests;

+use crate::block_import::BlockImportingNotification;
 use crate::slot_worker::SubspaceSyncOracle;
 use crate::{SubspaceLink, SubspaceNotificationSender};
 use codec::{Decode, Encode};
@@ -78,7 +79,7 @@ use subspace_archiving::archiver::{Archiver, NewArchivedSegment};
 use subspace_core_primitives::crypto::kzg::Kzg;
 use subspace_core_primitives::objects::BlockObjectMapping;
 use subspace_core_primitives::{BlockNumber, RecordedHistorySegment, SegmentHeader, SegmentIndex};
-use tracing::{debug, info, warn};
+use tracing::{debug, error, info, warn};

 /// Number of WASM instances is 8, this is a bit lower to avoid warnings exceeding number of
 /// instances
@@ -520,6 +521,7 @@ fn initialize_archiver<Block, Client, AS>(
     segment_headers_store: &SegmentHeadersStore<AS>,
     subspace_link: &SubspaceLink<Block>,
     client: &Client,
+    overridden_last_archived_block: Option<NumberFor<Block>>,
 ) -> sp_blockchain::Result<InitializedArchiver<Block>>
 where
     Block: BlockT,
@@ -536,11 +538,16 @@ where
         .chain_constants(best_block_hash)?
         .confirmation_depth_k();

-    let maybe_last_archived_block = find_last_archived_block(
-        client,
-        segment_headers_store,
-        best_block_number.saturating_sub(confirmation_depth_k.into()),
-    )?;
+    let best_block_to_archive =
+        if let Some(overridden_last_archived_block) = overridden_last_archived_block {
+            overridden_last_archived_block
+        } else {
+            best_block_number.saturating_sub(confirmation_depth_k.into())
+        };
+
+    let maybe_last_archived_block =
+        find_last_archived_block(client, segment_headers_store, best_block_to_archive)?;
+
     let have_last_segment_header = maybe_last_archived_block.is_some();
     let mut best_archived_block = None;

@@ -783,64 +790,124 @@ where
     AS: AuxStore + Send + Sync + 'static,
     SO: SyncOracle + Send + Sync + 'static,
 {
-    let maybe_archiver = if segment_headers_store.max_segment_index().is_none() {
+    let mut maybe_archiver = if segment_headers_store.max_segment_index().is_none() {
         Some(initialize_archiver(
             &segment_headers_store,
             &subspace_link,
             client.as_ref(),
+            None,
         )?)
     } else {
         None
     };

-    let mut block_importing_notification_stream = subspace_link
+    let block_importing_notification_stream = subspace_link
         .block_importing_notification_stream
         .subscribe();

     Ok(async move {
-        let archiver = match maybe_archiver {
-            Some(archiver) => archiver,
-            None => initialize_archiver(&segment_headers_store, &subspace_link, client.as_ref())?,
-        };
-
-        let InitializedArchiver {
-            confirmation_depth_k,
-            mut archiver,
-            older_archived_segments,
-            best_archived_block: (mut best_archived_block_hash, mut best_archived_block_number),
-        } = archiver;
-
-        let archived_segment_notification_sender =
-            subspace_link.archived_segment_notification_sender.clone();
-
-        // Farmers may have not received all previous segments, send them now.
-        for archived_segment in older_archived_segments {
-            send_archived_segment_notification(
-                &archived_segment_notification_sender,
-                archived_segment,
-            )
-            .await;
-        }
-
-        while let Some(ref block_import_notification) =
-            block_importing_notification_stream.next().await
-        {
-            archive_block(
-                &mut archiver,
-                segment_headers_store.clone(),
-                client.clone(),
-                &sync_oracle,
-                telemetry.clone(),
-                archived_segment_notification_sender.clone(),
-                confirmation_depth_k,
-                &mut best_archived_block_number,
-                &mut best_archived_block_hash,
-                block_import_notification.block_number,
-            )
-            .await?;
-        }
-
-        Ok(())
+        let mut saved_block_import_notification = None;
+
+        let mut block_importing_notification_stream = block_importing_notification_stream;
+
+        'initialization_recovery: loop {
+            let overridden_last_archived_block = saved_block_import_notification
+                .as_ref()
+                .map(|notification: &BlockImportingNotification<Block>| notification.block_number);
+            let archived_segment_notification_sender =
+                subspace_link.archived_segment_notification_sender.clone();
+
+            let archiver = match maybe_archiver.take() {
+                Some(archiver) => archiver,
+                None => initialize_archiver(
+                    &segment_headers_store,
+                    &subspace_link,
+                    client.as_ref(),
+                    overridden_last_archived_block,
+                )?,
+            };
+
+            let InitializedArchiver {
+                confirmation_depth_k,
+                mut archiver,
+                older_archived_segments,
+                best_archived_block: (mut best_archived_block_hash, mut best_archived_block_number),
+            } = archiver;
+
+            debug!(%best_archived_block_number, ?best_archived_block_hash, "Archiver initialized.");
+
+            if let Some(block_import_notification) = saved_block_import_notification.take() {
+                let success = archive_block(
+                    &mut archiver,
+                    segment_headers_store.clone(),
+                    client.clone(),
+                    &sync_oracle,
+                    telemetry.clone(),
+                    archived_segment_notification_sender.clone(),
+                    confirmation_depth_k,
+                    &mut best_archived_block_number,
+                    &mut best_archived_block_hash,
+                    block_import_notification.block_number,
+                )
+                .await?;
+
+                if !success {
+                    let error = format!(
+                        "Failed to recover the archiver for block number: {}",
+                        block_import_notification.block_number
+                    );
+                    error!(error);

Member: Not sure we need to log an error here if we exit with it anyway. The whole node will crash and I believe we will log the error at a higher level anyway.

+                    return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
+                        error.into(),
+                    )));
Member: My expectation was that the archiver would just try again. For that to happen we don't need to save the block import notification; we can simply wait for the next notification before doing re-initialization, and try that in a loop until it succeeds.

Member Author: Not sure I follow: if we don't save the import notification, we skip it and fail later. The current loop structure was agreed previously.

Member: This structure was primarily needed for the override. Off the top of my head I don't see the reason why this is needed, and it does make the code much harder to follow even if it works correctly.

Member Author: I don't understand this thread's question, please rephrase. Meanwhile, I'll try to explain how it works: when the archiver detects a gap between blocks, it returns control to the calling function, which in turn saves the current block notification. After that, it reinitializes the archiver (via the initialization loop) and uses the saved block notification to archive the block again. After that, it resumes the normal processing of block notifications. If we don't save the problematic block notification, it would be lost and the archiving process would be broken.

Member: > If we don't save the problematic block notification, it would be lost and the archiving process would be broken.

It will not be if we wait for the next block import as I suggested. The archiver will restart in a fully deterministic way and will continue operation just fine.

The issue with the line I commented on is that it returns an error, meaning the archiver will exit and the node will crash with that error, whereas I think the archiver should just restart in a loop until it succeeds.
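To make the suggested alternative concrete, the following is a minimal, self-contained sketch of that restart-in-a-loop behavior. Plain u32 block numbers, a toy initialize_archiver_toy, and a bare futures stream stand in for the PR's NumberFor<Block>, initialize_archiver, and block import notification stream; all names here are illustrative assumptions, not the PR's API.

    use futures::{Stream, StreamExt};

    // Toy stand-in for archiver initialization: deterministic given chain
    // state, but allowed to fail while the chain is in a state it cannot
    // handle yet.
    fn initialize_archiver_toy() -> Result<u32, String> {
        Ok(0) // pretend the last archived block is genesis
    }

    async fn run_archiver(
        mut block_imports: impl Stream<Item = u32> + Unpin,
    ) -> Result<(), String> {
        'recovery: loop {
            // (Re-)initialize from persisted state; no saved notification is
            // replayed afterwards.
            let mut last_archived = match initialize_archiver_toy() {
                Ok(block_number) => block_number,
                Err(error) => {
                    // Instead of crashing the node, wait for the next imported
                    // block and then retry initialization.
                    eprintln!("archiver init failed: {error}; retrying after next import");
                    if block_imports.next().await.is_none() {
                        return Ok(()); // stream ended, nothing left to archive
                    }
                    continue 'recovery;
                }
            };

            while let Some(block_number) = block_imports.next().await {
                if block_number != last_archived + 1 {
                    // Gap detected: drop this notification and re-initialize;
                    // the correct position is re-derived from persisted state
                    // rather than replayed from a saved notification.
                    continue 'recovery;
                }
                last_archived = block_number;
            }

            return Ok(());
        }
    }

    fn main() {
        let imports = futures::stream::iter([1u32, 2, 3]);
        futures::executor::block_on(run_archiver(imports)).unwrap();
    }

The design point under debate is visible in the gap branch: nothing is saved or replayed, because a deterministic re-initialization recovers the archiver's position on its own.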
+                } else {
+                    debug!(
+                        "Saved block import notification succeeded: #{}",
+                        block_import_notification.block_number
+                    );
+                }
+            }

+            // Farmers may have not received all previous segments, send them now.
+            for archived_segment in older_archived_segments {
+                send_archived_segment_notification(
+                    &archived_segment_notification_sender,
+                    archived_segment,
+                )
+                .await;
+            }
+
+            while let Some(block_import_notification) =
+                block_importing_notification_stream.next().await
+            {
+                let success = archive_block(
+                    &mut archiver,
+                    segment_headers_store.clone(),
+                    client.clone(),
+                    &sync_oracle,
+                    telemetry.clone(),
+                    archived_segment_notification_sender.clone(),
+                    confirmation_depth_k,
+                    &mut best_archived_block_number,
+                    &mut best_archived_block_hash,
+                    block_import_notification.block_number,
+                )
+                .await?;
+
+                if !success {
+                    warn!(
+                        "Archiver detected an error. \
+                        Attempting recovery with block number = {}...",
+                        block_import_notification.block_number
+                    );
+
+                    saved_block_import_notification = Some(block_import_notification);
+
+                    continue 'initialization_recovery;
+                }
+            }
+        }
     })
 }

@@ -885,14 +952,16 @@ where
         return Ok(true);
     }

-    *best_archived_block_number = block_number_to_archive;
+    let maybe_block_hash = client.hash(block_number_to_archive)?;
+
+    let Some(block_hash) = maybe_block_hash else {
Member: I'm not sure this is the correct, or maybe exhaustive, condition. The fact that we don't have a block number to archive doesn't mean we can restart the archiver either.

What I think this should check is whether the gap between the last archived block and the current block to archive is not 1. If it isn't, the archiver state will be inconsistent even if the block to archive exists (which is theoretically possible). From there we need to retry archiver initialization until it succeeds (because, again, it is not guaranteed to in case a block was imported in such a way that the archiver can't be initialized).

The logic here is very fragile and has implicit assumptions that are not obvious, the main one being that the block import notification will be about the first block in the segment header, or else the archiver will either fail to initialize or initialize in the wrong state (not sure which one, and too lazy to analyze all the code paths right now).

I actually don't think this will work for fast sync, from what I recall, because in fast sync the first block we import manually, bypassing all the checks, is the block at which the archiver should be initialized, and the block import notification will be fired with the block that follows. So you have to let the archiver pick the last archived block and re-initialize itself properly instead of overriding the last archived block like done in this PR.

Member Author: > What I think this should check is whether the gap between the last archived block and the current block to archive is not 1. ...

I agree here - this would be an improvement. I'll change it when we agree on the other points.

> From there we need to retry archiver initialization until it succeeds ...

I don't think I understand here. Why do you think we need to try to reinitialize the archiver until it succeeds when it fails after the first reinitialization? The current loop allows failing exactly once for each block import attempt, because a subsequent initialization won't change anything and it's better to fail fast here.

> The logic here is very fragile and has implicit assumptions that are not obvious ...

This confuses me a lot, because it's pretty much my own argument from when we discussed this approach in contrast to the explicit reinitialization of the previous version.

> I actually don't think this will work for fast sync ...

I tested the PR by applying all the rest of the fast-sync solution and it works as expected. The overriding emerges when we need to deal with the confirmation_depth_k subtraction - I don't see a better way to work around this operation and am happy to implement it differently.

Overall, after the last two refactoring PRs the current solution is very close to what we discussed previously (at least from my perspective) as an alternative to the event-based explicit initialization. The deviation from that (the best-block-to-archive override and the saved block import notification) emerged with the practical implementation of the original sketch. Please let me know what you think is missing.

Member: > I don't think I understand here. Why do you think we need to try to reinitialize the archiver until it succeeds when it fails after the first reinitialization? ...

I should have mentioned that each new attempt should be made after newly imported blocks. Does it make more sense with this context?

> This confuses me a lot, because it's pretty much my own argument ...

Ideally we would have neither explicit reinitialization nor the issues mentioned, and I do believe it is possible.

> I tested the PR by applying all the rest of the fast-sync solution and it works as expected. ...

I suspect it worked as expected until it didn't. It would have failed on the next segment header, which would happen at a different point/with a different state. This can be verified by modifying your fast-sync test to import a block from the pre-last segment instead, so you can check whether the next segment is processed correctly. I bet it will not succeed.

> Overall, after the last two refactoring PRs the current solution is very close to what we discussed previously ...

I agree, just trying to analyze the code paths and see if there are issues/improvements in what I see.

It would be great to have tests here to check such cases, but there are quite a few bounds in this function that make that difficult.
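As a concrete illustration of the gap check proposed in this thread, here is a toy sketch with plain integers standing in for NumberFor<Block>; the helper name is hypothetical, not code from this PR.

    // Hypothetical helper: recovery is needed not only when the block hash is
    // missing, but whenever the block to archive does not directly follow the
    // last archived block.
    fn needs_recovery(last_archived: u32, block_to_archive: u32) -> bool {
        block_to_archive != last_archived + 1
    }

    fn main() {
        assert!(!needs_recovery(100, 101)); // contiguous: archive normally
        assert!(needs_recovery(100, 103)); // gap: archiver state is inconsistent
        assert!(needs_recovery(100, 100)); // repeat/reorg: also inconsistent
    }

With this condition, the else branch below would trigger recovery for any non-contiguous block, not only for a missing block hash.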

+        warn!("Archiver detected an error: older block by number must always exist. ");
+
+        return Ok(false);
+    };

     let block = client
-        .block(
-            client
-                .hash(block_number_to_archive)?
-                .expect("Older block by number must always exist"),
-        )?
+        .block(block_hash)?
         .expect("Older block by number must always exist");

     let parent_block_hash = *block.block.header().parent_hash();
@@ -914,6 +983,7 @@
         )));
     }

+    *best_archived_block_number = block_number_to_archive;
     *best_archived_block_hash = block_hash_to_archive;

     let block_object_mappings = client