feat(snapshot-recovery): support for local snapshots #2792

Open · wants to merge 2 commits into base: main
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions core/lib/snapshots_applier/Cargo.toml
@@ -28,6 +28,7 @@ tokio = { workspace = true, features = ["time"] }
tracing.workspace = true
thiserror.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true

[dev-dependencies]
assert_matches.workspace = true
115 changes: 106 additions & 9 deletions core/lib/snapshots_applier/src/lib.rs
@@ -34,6 +34,8 @@ mod metrics;
#[cfg(test)]
mod tests;

pub const LOCAL_SNAPSHOT_HEADER_FILE_NAME: &str = "snapshot_header.json";

#[derive(Debug, Serialize)]
struct SnapshotsApplierHealthDetails {
snapshot_l2_block: L2BlockNumber,
@@ -260,6 +262,7 @@ pub struct SnapshotsApplierTask {
connection_pool: ConnectionPool<Core>,
main_node_client: Box<dyn SnapshotsApplierMainNodeClient>,
blob_store: Arc<dyn ObjectStore>,
local_snapshot_dir: Option<String>,
}

impl SnapshotsApplierTask {
@@ -268,6 +271,7 @@ impl SnapshotsApplierTask {
connection_pool: ConnectionPool<Core>,
main_node_client: Box<dyn SnapshotsApplierMainNodeClient>,
blob_store: Arc<dyn ObjectStore>,
local_snapshot_dir: Option<String>,
) -> Self {
Self {
snapshot_l1_batch: None,
Expand All @@ -277,6 +281,7 @@ impl SnapshotsApplierTask {
connection_pool,
main_node_client,
blob_store,
local_snapshot_dir,
}
}

@@ -407,6 +412,12 @@ impl SnapshotsApplierTask {
}
}

/// How snapshot data is sourced during recovery.
enum SnapshotRecoveryType {
Remote,
Local { snapshot_dir: String },
}

/// Strategy determining how snapshot recovery should proceed.
#[derive(Debug, Clone, Copy)]
enum SnapshotRecoveryStrategy {
@@ -423,6 +434,7 @@ impl SnapshotRecoveryStrategy {
storage: &mut Connection<'_, Core>,
main_node_client: &dyn SnapshotsApplierMainNodeClient,
snapshot_l1_batch: Option<L1BatchNumber>,
recovery_type: SnapshotRecoveryType,
) -> Result<(Self, SnapshotRecoveryStatus), SnapshotsApplierError> {
let latency =
METRICS.initial_stage_duration[&InitialStage::FetchMetadataFromMainNode].start();
@@ -438,12 +450,17 @@ impl SnapshotRecoveryStrategy {
}

let l1_batch_number = applied_snapshot_status.l1_batch_number;
let snapshot_header = main_node_client
.fetch_snapshot(l1_batch_number)
.await?
.with_context(|| {
format!("snapshot for L1 batch #{l1_batch_number} is no longer present on main node")
})?;
let snapshot_header = match recovery_type {
SnapshotRecoveryType::Local { snapshot_dir } => read_local_snapshot_header(&snapshot_dir)?,
SnapshotRecoveryType::Remote => {
main_node_client
.fetch_snapshot(l1_batch_number)
.await?
.with_context(|| {
format!("snapshot for L1 batch #{l1_batch_number} is no longer present on main node")
})?
},
};
// Old snapshots can theoretically be removed by the node, but in this case the snapshot data may be removed as well,
// so returning an error looks appropriate here.
let snapshot_version = Self::check_snapshot_version(snapshot_header.version)?;
@@ -460,8 +477,15 @@ impl SnapshotRecoveryStrategy {
return Err(SnapshotsApplierError::Fatal(err));
}

let (recovery_status, snapshot_version) =
Self::create_fresh_recovery_status(main_node_client, snapshot_l1_batch).await?;
let (recovery_status, snapshot_version) = match recovery_type {
SnapshotRecoveryType::Remote => {
Self::create_fresh_recovery_status(main_node_client, snapshot_l1_batch).await?
}
SnapshotRecoveryType::Local { snapshot_dir } => {
Self::create_fresh_recovery_status_from_local(main_node_client, snapshot_dir)
.await?
}
};

let storage_logs_count = storage
.storage_logs_dal()
@@ -482,6 +506,60 @@ impl SnapshotRecoveryStrategy {
}
}

async fn create_fresh_recovery_status_from_local(
main_node_client: &dyn SnapshotsApplierMainNodeClient,
snapshot_dir: String,
) -> Result<(SnapshotRecoveryStatus, SnapshotVersion), SnapshotsApplierError> {
let snapshot = read_local_snapshot_header(&snapshot_dir)?;
let l1_batch_number = snapshot.l1_batch_number;
let l2_block_number = snapshot.l2_block_number;
tracing::info!(
"Found snapshot with data up to L1 batch #{l1_batch_number}, L2 block #{l2_block_number}, \
version {version}, storage logs are divided into {chunk_count} chunk(s)",
version = snapshot.version,
chunk_count = snapshot.storage_logs_chunks.len()
);
let snapshot_version = Self::check_snapshot_version(snapshot.version)?;

let l1_batch = main_node_client
.fetch_l1_batch_details(l1_batch_number)
.await?
.with_context(|| format!("L1 batch #{l1_batch_number} is missing on main node"))?;
let l1_batch_root_hash = l1_batch
.base
.root_hash
.context("snapshot L1 batch fetched from main node doesn't have root hash set")?;
let l2_block = main_node_client
.fetch_l2_block_details(l2_block_number)
.await?
.with_context(|| format!("L2 block #{l2_block_number} is missing on main node"))?;
let l2_block_hash = l2_block
.base
.root_hash
.context("snapshot L2 block fetched from main node doesn't have hash set")?;
let protocol_version = l2_block.protocol_version.context(
"snapshot L2 block fetched from main node doesn't have protocol version set",
)?;
if l2_block.l1_batch_number != l1_batch_number {
let err = anyhow::anyhow!(
"snapshot L2 block returned by main node doesn't belong to expected L1 batch #{l1_batch_number}: {l2_block:?}"
);
return Err(err.into());
}

let status = SnapshotRecoveryStatus {
l1_batch_number,
l1_batch_timestamp: l1_batch.base.timestamp,
l1_batch_root_hash,
l2_block_number: snapshot.l2_block_number,
l2_block_timestamp: l2_block.base.timestamp,
l2_block_hash,
protocol_version,
storage_logs_chunks_processed: vec![false; snapshot.storage_logs_chunks.len()],
};
Ok((status, snapshot_version))
}

async fn create_fresh_recovery_status(
main_node_client: &dyn SnapshotsApplierMainNodeClient,
snapshot_l1_batch: Option<L1BatchNumber>,
@@ -677,10 +755,19 @@ impl<'a> SnapshotsApplier<'a> {
.await?;
let mut storage_transaction = storage.start_transaction().await?;

let recovery_type = if let Some(snapshot_dir) = &task.local_snapshot_dir {
SnapshotRecoveryType::Local {
snapshot_dir: snapshot_dir.clone(),
}
} else {
SnapshotRecoveryType::Remote
};
// TODO: determine the recovery type from config.
let (strategy, applied_snapshot_status) = SnapshotRecoveryStrategy::new(
&mut storage_transaction,
main_node_client,
task.snapshot_l1_batch,
recovery_type,
)
.await?;
tracing::info!("Chosen snapshot recovery strategy: {strategy:?} with status: {applied_snapshot_status:?}");
@@ -892,7 +979,7 @@ impl<'a> SnapshotsApplier<'a> {
}
let latency = latency.observe();
tracing::info!(
"Loaded {} storage logs from GCS for chunk {chunk_id} in {latency:?}",
"Loaded {} storage logs for chunk {chunk_id} in {latency:?}",
storage_logs.len()
);

@@ -1040,3 +1127,13 @@ impl<'a> SnapshotsApplier<'a> {
Ok(())
}
}

fn read_local_snapshot_header(snapshot_dir: &str) -> Result<SnapshotHeader, SnapshotsApplierError> {
let snapshot_header_path =
format!("{snapshot_dir}/storage_logs_snapshots/{LOCAL_SNAPSHOT_HEADER_FILE_NAME}");
let snapshot_header_file = std::fs::read_to_string(&snapshot_header_path)
.with_context(|| format!("failed to read snapshot header at `{snapshot_header_path}`"))?;
let snapshot: SnapshotHeader = serde_json::from_str(&snapshot_header_file)
.context("failed to deserialize snapshot header file")?;
Ok(snapshot)
}
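
Note (reviewer sketch): `read_local_snapshot_header` resolves the header at `<snapshot_dir>/storage_logs_snapshots/snapshot_header.json`. Below is a minimal, self-contained sketch of that lookup using a hypothetical mirror of the `SnapshotHeader` fields this diff actually reads; the real struct lives elsewhere in the workspace and has more fields, so only the path layout and the JSON encoding are established by the diff itself.

```rust
use anyhow::Context as _;
use serde::Deserialize;

/// Hypothetical mirror of the `SnapshotHeader` fields read by this PR;
/// field names/types here are assumptions for illustration only.
#[derive(Debug, Deserialize)]
struct LocalSnapshotHeader {
    version: u16,
    l1_batch_number: u32,
    l2_block_number: u32,
    storage_logs_chunks: Vec<serde_json::Value>,
}

fn main() -> anyhow::Result<()> {
    // Assumed example directory; in the PR this comes from `local_snapshot_dir`.
    let snapshot_dir = "/var/lib/zksync/snapshot";
    let path = format!("{snapshot_dir}/storage_logs_snapshots/snapshot_header.json");
    let raw = std::fs::read_to_string(&path)
        .with_context(|| format!("failed to read snapshot header at `{path}`"))?;
    let header: LocalSnapshotHeader =
        serde_json::from_str(&raw).context("failed to deserialize snapshot header file")?;
    println!(
        "snapshot covers L1 batch #{}, L2 block #{}, {} storage log chunk(s)",
        header.l1_batch_number,
        header.l2_block_number,
        header.storage_logs_chunks.len()
    );
    Ok(())
}
```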
19 changes: 19 additions & 0 deletions core/lib/snapshots_applier/src/tests/mod.rs
@@ -82,6 +82,7 @@ async fn snapshots_creator_can_successfully_recover_db(
pool.clone(),
Box::new(client.clone()),
object_store.clone(),
None,
);
let task_health = task.health_check();
let (_stop_sender, stop_receiver) = watch::channel(false);
@@ -138,6 +139,7 @@ async fn snapshots_creator_can_successfully_recover_db(
pool.clone(),
Box::new(client.clone()),
object_store.clone(),
None,
);

let (_stop_sender, stop_receiver) = watch::channel(false);
@@ -163,6 +165,7 @@ async fn snapshots_creator_can_successfully_recover_db(
pool.clone(),
Box::new(client),
object_store,
None,
);
let (_stop_sender, stop_receiver) = watch::channel(false);
let stats = task.run(stop_receiver).await.unwrap();
@@ -182,6 +185,7 @@ async fn applier_recovers_v0_snapshot(drop_storage_key_preimages: bool) {
pool.clone(),
Box::new(client),
object_store,
None,
);
if drop_storage_key_preimages {
task.drop_storage_key_preimages();
@@ -229,6 +233,7 @@ async fn applier_recovers_explicitly_specified_snapshot() {
pool.clone(),
Box::new(client),
object_store,
None,
);
task.set_snapshot_l1_batch(expected_status.l1_batch_number);
let (_stop_sender, stop_receiver) = watch::channel(false);
@@ -255,6 +260,7 @@ async fn applier_error_for_missing_explicitly_specified_snapshot() {
pool,
Box::new(client),
object_store,
None,
);
task.set_snapshot_l1_batch(expected_status.l1_batch_number + 1);

@@ -283,6 +289,7 @@ async fn snapshot_applier_recovers_after_stopping() {
pool.clone(),
Box::new(client.clone()),
Arc::new(stopping_object_store),
None,
);
let (_stop_sender, task_stop_receiver) = watch::channel(false);
let task_handle = tokio::spawn(task.run(task_stop_receiver));
@@ -320,6 +327,7 @@ async fn snapshot_applier_recovers_after_stopping() {
pool.clone(),
Box::new(client.clone()),
Arc::new(stopping_object_store),
None,
);
let (_stop_sender, task_stop_receiver) = watch::channel(false);
let task_handle = tokio::spawn(task.run(task_stop_receiver));
@@ -347,6 +355,7 @@ async fn snapshot_applier_recovers_after_stopping() {
pool.clone(),
Box::new(client.clone()),
Arc::new(stopping_object_store),
None,
);
task.set_snapshot_l1_batch(expected_status.l1_batch_number); // check that this works fine
let (_stop_sender, stop_receiver) = watch::channel(false);
@@ -419,6 +428,7 @@ async fn health_status_immediately_after_task_start() {
ConnectionPool::<Core>::test_pool().await,
Box::new(client.clone()),
object_store,
None,
);
let task_health = task.health_check();
let (_stop_sender, task_stop_receiver) = watch::channel(false);
@@ -475,6 +485,7 @@ async fn applier_errors_after_genesis() {
pool,
Box::new(client),
object_store,
None,
);
let (_stop_sender, task_stop_receiver) = watch::channel(false);
task.run(task_stop_receiver).await.unwrap_err();
@@ -491,6 +502,7 @@ async fn applier_errors_without_snapshots() {
pool,
Box::new(client),
object_store,
None,
);
let (_stop_sender, stop_receiver) = watch::channel(false);
task.run(stop_receiver).await.unwrap_err();
@@ -511,6 +523,7 @@ async fn applier_errors_with_unrecognized_snapshot_version() {
pool,
Box::new(client),
object_store,
None,
);
let (_stop_sender, stop_receiver) = watch::channel(false);
task.run(stop_receiver).await.unwrap_err();
@@ -531,6 +544,7 @@ async fn applier_returns_error_on_fatal_object_store_error() {
pool,
Box::new(client),
Arc::new(object_store),
None,
);
let (_stop_sender, stop_receiver) = watch::channel(false);
let err = task.run(stop_receiver).await.unwrap_err();
@@ -560,6 +574,7 @@ async fn applier_returns_error_after_too_many_object_store_retries() {
pool,
Box::new(client),
Arc::new(object_store),
None,
);
let (_stop_sender, stop_receiver) = watch::channel(false);
let err = task.run(stop_receiver).await.unwrap_err();
@@ -600,6 +615,7 @@ async fn recovering_tokens() {
pool.clone(),
Box::new(client.clone()),
object_store.clone(),
None,
);
let (_stop_sender, stop_receiver) = watch::channel(false);
let task_result = task.run(stop_receiver).await;
@@ -617,6 +633,7 @@ async fn recovering_tokens() {
pool.clone(),
Box::new(client.clone()),
object_store.clone(),
None,
);
let (_stop_sender, stop_receiver) = watch::channel(false);
task.run(stop_receiver).await.unwrap();
@@ -652,6 +669,7 @@ async fn recovering_tokens() {
pool,
Box::new(client),
object_store,
None,
);
let (_stop_sender, stop_receiver) = watch::channel(false);
task.run(stop_receiver).await.unwrap();
@@ -674,6 +692,7 @@ async fn snapshot_applier_can_be_canceled() {
pool.clone(),
Box::new(client.clone()),
Arc::new(stopping_object_store),
None,
);
let (task_stop_sender, task_stop_receiver) = watch::channel(false);
let task_handle = tokio::spawn(task.run(task_stop_receiver));
@@ -74,6 +74,7 @@ impl WiringLayer for ExternalNodeInitStrategyLayer {
client: client.clone(),
pool: pool.clone(),
});

let snapshot_recovery = match self.snapshot_recovery_config {
Some(recovery_config) => {
// Add a connection for checking whether the storage is initialized.
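
Note (reviewer sketch): the diff leaves recovery-type selection as a TODO in `lib.rs`, so the new `local_snapshot_dir` constructor argument currently has no config source. One plausible wiring, sketched purely as an assumption (the `EN_LOCAL_SNAPSHOT_DIR` variable name is hypothetical and not part of this PR), is to derive it from the node's environment:

```rust
/// Sketch only: derive the `local_snapshot_dir` argument from the environment.
/// The variable name is an assumption; this PR does not introduce it.
fn local_snapshot_dir_from_env() -> Option<String> {
    std::env::var("EN_LOCAL_SNAPSHOT_DIR")
        .ok()
        .filter(|dir| !dir.is_empty())
}

fn main() {
    // `SnapshotsApplierTask::new` would receive this as its final argument:
    // `Some(dir)` selects local recovery, `None` keeps the remote object store path.
    match local_snapshot_dir_from_env() {
        Some(dir) => println!("would recover from local snapshot at `{dir}`"),
        None => println!("would recover from the remote object store"),
    }
}
```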