Skip to content

Commit

Permalink
more renames
Browse files Browse the repository at this point in the history
add CoreContext to reduce params
  • Loading branch information
sapinb committed Sep 27, 2024
1 parent 55a4797 commit cdbc2d1
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 166 deletions.
5 changes: 3 additions & 2 deletions crates/consensus-logic/src/fork_choice_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ pub fn tracker_task<D: Database, E: ExecEngineCtl>(
l2_block_manager: Arc<L2BlockManager>,
engine: Arc<E>,
mut fcm_rx: mpsc::Receiver<ForkChoiceMessage>,
csm_ctl: Arc<CsmController>,
csm_controller: Arc<CsmController>,
params: Arc<Params>,
) -> anyhow::Result<()> {
// Wait until the CSM gives us a state we can start from.
Expand Down Expand Up @@ -273,7 +273,8 @@ pub fn tracker_task<D: Database, E: ExecEngineCtl>(
};
info!(%finalized_blockid, "forkchoice manager started");

if let Err(e) = forkchoice_manager_task_inner(&shutdown, fcm, engine.as_ref(), fcm_rx, &csm_ctl)
if let Err(e) =
forkchoice_manager_task_inner(&shutdown, fcm, engine.as_ref(), fcm_rx, &csm_controller)
{
error!(err = ?e, "tracker aborted");
return Err(e);
Expand Down
47 changes: 27 additions & 20 deletions crates/consensus-logic/src/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
pub struct SyncManager {
params: Arc<Params>,
fc_manager_tx: mpsc::Sender<ForkChoiceMessage>,
csm_ctl: Arc<CsmController>,
csm_controller: Arc<CsmController>,
cupdate_rx: broadcast::Receiver<Arc<ClientUpdateNotif>>,
status_tx: Arc<StatusTx>,
status_rx: Arc<StatusRx>,
Expand All @@ -40,12 +40,12 @@ impl SyncManager {

/// Gets a ref to the CSM controller.
pub fn csm_controller(&self) -> &CsmController {
&self.csm_ctl
&self.csm_controller
}

/// Gets a clone of the CSM controller.
pub fn get_csm_ctl(&self) -> Arc<CsmController> {
self.csm_ctl.clone()
self.csm_controller.clone()
}

/// Returns a new broadcast `Receiver` handle to the consensus update
Expand Down Expand Up @@ -87,59 +87,66 @@ pub fn start_sync_tasks<
pool: threadpool::ThreadPool,
params: Arc<Params>,
status_bundle: (Arc<StatusTx>, Arc<StatusRx>),
checkpoint_db_manager: Arc<CheckpointDbManager>,
checkpoint_manager: Arc<CheckpointDbManager>,
) -> anyhow::Result<SyncManager> {
// Create channels.
let (fcm_tx, fcm_rx) = mpsc::channel::<ForkChoiceMessage>(64);
let (csm_tx, csm_rx) = mpsc::channel::<CsmMessage>(64);
let csm_ctl = Arc::new(CsmController::new(database.clone(), pool, csm_tx));
let csm_controller = Arc::new(CsmController::new(database.clone(), pool, csm_tx));

// TODO should this be in an `Arc`? it's already fairly compact so we might
// not be benefitting from the reduced cloning
let (cupdate_tx, cupdate_rx) = broadcast::channel::<Arc<ClientUpdateNotif>>(64);

// Start the fork choice manager thread. If we haven't done genesis yet
// this will just wait until the CSM says we have.
let fcm_db = database.clone();
let fcm_l2blkman = l2_block_manager.clone();
let fcm_eng = engine.clone();
let fcm_csm_ctl = csm_ctl.clone();
let fcm_database = database.clone();
let fcm_l2_block_manager = l2_block_manager.clone();
let fcm_engine = engine.clone();
let fcm_csm_controller = csm_controller.clone();
let fcm_params = params.clone();
executor.spawn_critical("fork_choice_manager::tracker_task", |shutdown| {
// TODO this should be simplified into a builder or something
fork_choice_manager::tracker_task(
shutdown,
fcm_db,
fcm_l2blkman,
fcm_eng,
fcm_database,
fcm_l2_block_manager,
fcm_engine,
fcm_rx,
fcm_csm_ctl,
fcm_csm_controller,
fcm_params,
)
});

// Prepare the client worker state and start the thread for that.
let cw_state = worker::WorkerState::open(
let client_worker_state = worker::WorkerState::open(
params.clone(),
database.clone(),
database,
l2_block_manager,
cupdate_tx,
checkpoint_db_manager,
checkpoint_manager,
)?;

let csm_eng = engine.clone();
let csm_engine = engine.clone();
let csm_fcm_tx = fcm_tx.clone();

let status_tx = status_bundle.0.clone();
executor.spawn_critical("client_worker_task", |shutdown| {
worker::client_worker_task(shutdown, cw_state, csm_eng, csm_rx, status_tx, csm_fcm_tx)
.map_err(Into::into)
worker::client_worker_task(
shutdown,
client_worker_state,
csm_engine,
csm_rx,
status_tx,
csm_fcm_tx,
)
.map_err(Into::into)
});

Ok(SyncManager {
params,
fc_manager_tx: fcm_tx,
csm_ctl,
csm_controller,
cupdate_rx,
status_tx: status_bundle.0,
status_rx: status_bundle.1,
Expand Down
4 changes: 2 additions & 2 deletions sequencer/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ pub fn init_engine_controller(
db: Arc<CommonDb>,
params: &Params,
l2_block_manager: Arc<L2BlockManager>,
rt: &Runtime,
runtime: &Runtime,
) -> anyhow::Result<Arc<RpcExecEngineCtl<EngineRpcClient>>> {
let reth_jwtsecret = load_jwtsecret(&config.exec.reth.secret)?;
let client = EngineRpcClient::from_url_secret(
Expand All @@ -265,7 +265,7 @@ pub fn init_engine_controller(
let eng_ctl = alpen_express_evmexec::engine::RpcExecEngineCtl::new(
client,
initial_fcs,
rt.handle().clone(),
runtime.handle().clone(),
l2_block_manager.clone(),
);
let eng_ctl = Arc::new(eng_ctl);
Expand Down
Loading

0 comments on commit cdbc2d1

Please sign in to comment.