Skip to content

Commit

Permalink
Fix deadlock in finalization migration (#4576)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul committed Aug 8, 2023
1 parent 18e64e6 commit bba1526
Showing 1 changed file with 10 additions and 42 deletions.
52 changes: 10 additions & 42 deletions beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
let tx_thread = if config.blocking {
None
} else {
Some(Mutex::new(Self::spawn_thread(
db.clone(),
prev_migration.clone(),
log.clone(),
)))
Some(Mutex::new(Self::spawn_thread(db.clone(), log.clone())))
};
Self {
db,
Expand Down Expand Up @@ -236,11 +232,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho

// Restart the background thread if it has crashed.
if let Err(tx_err) = tx.send(notif) {
let (new_tx, new_thread) = Self::spawn_thread(
self.db.clone(),
self.prev_migration.clone(),
self.log.clone(),
);
let (new_tx, new_thread) = Self::spawn_thread(self.db.clone(), self.log.clone());

*tx = new_tx;
let old_thread = mem::replace(thread, new_thread);
Expand Down Expand Up @@ -292,6 +284,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
drop(prev_migration);

debug!(log, "Database consolidation started");
let timer = std::time::Instant::now();

let finalized_state_root = notif.finalized_state_root;
let finalized_block_root = notif.finalized_checkpoint.root;
Expand Down Expand Up @@ -358,7 +351,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => {
debug!(
log,
"Database migration postponed, unaligned finalized block";
"Database migration postponed due to unaligned finalized block";
"slot" => slot.as_u64()
);
}
Expand All @@ -379,18 +372,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
notif.finalized_checkpoint.epoch,
log,
) {
warn!(log, "Database compaction failed"; "error" => format!("{:?}", e));
warn!(log, "Database compaction failed"; "error" => ?e);
}

debug!(log, "Database consolidation complete");
debug!(
log,
"Database consolidation complete";
"running_time_ms" => timer.elapsed().as_millis()
);
}

/// Spawn a new child thread to run the migration process.
///
/// Return a channel handle for sending requests to the thread.
fn spawn_thread(
db: Arc<HotColdDB<E, Hot, Cold>>,
prev_migration: Arc<Mutex<PrevMigration>>,
log: Logger,
) -> (
crossbeam_channel::Sender<Notification>,
Expand Down Expand Up @@ -458,35 +454,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho

// Do the finalization migration.
if let Some(notif) = migrate_notif {
let timer = std::time::Instant::now();

let mut prev_migration = prev_migration.lock();

// Do not run too frequently.
let epoch = notif.finalized_checkpoint.epoch;
if epoch < prev_migration.epoch + prev_migration.epochs_per_migration {
debug!(
log,
"Finalization migration deferred";
"last_finalized_epoch" => prev_migration.epoch,
"new_finalized_epoch" => epoch,
"epochs_per_migration" => prev_migration.epochs_per_migration,
);
continue;
}

// We intend to run at this epoch, update the in-memory record of the last epoch
// at which we ran. This value isn't tracked on disk so we will always migrate
// on the first finalization after startup.
prev_migration.epoch = epoch;

Self::run_migration(db.clone(), notif.to_owned(), &log);

info!(
log,
"Finished finalization migration";
"running_time_ms" => timer.elapsed().as_millis()
);
}
}
});
Expand Down

0 comments on commit bba1526

Please sign in to comment.