Graceful shutdown
naps62 committed Dec 3, 2023
1 parent 572ba0a · commit 8652600
Showing 8 changed files with 93 additions and 49 deletions.
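This commit replaces the per-task `start` + `JoinHandle` pattern with a shared `tokio_util::sync::CancellationToken` and a `tokio_util::task::TaskTracker`: every long-running job receives a clone of the token, `main` waits for Ctrl-C, cancels the token, and then waits for the tracker to drain. A minimal, self-contained sketch of that pattern (placeholder workers, not the project's actual jobs; assumes `tokio` with the `macros`, `rt-multi-thread`, `signal`, and `time` features, plus `tokio-util` with `rt`):

```rust
use std::time::Duration;
use tokio::{signal, time::sleep};
use tokio_util::{sync::CancellationToken, task::TaskTracker};

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let tracker = TaskTracker::new();

    // each worker owns a clone of the token and checks it between units of work
    for id in 0..3 {
        let token = token.clone();
        tracker.spawn(async move {
            while !token.is_cancelled() {
                // ... one unit of work would go here ...
                sleep(Duration::from_millis(100)).await;
            }
            println!("worker {id}: shutting down");
        });
    }

    // on Ctrl-C: signal cancellation, stop accepting new tasks, wait for everything to finish
    signal::ctrl_c().await.expect("failed to listen for ctrl-c");
    token.cancel();
    tracker.close();
    tracker.wait().await;
}
```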
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -44,6 +44,7 @@ alloy-primitives = { version = "0.5.0", features = ["serde"] }
# cuckoo
scalable_cuckoo_filter = "0.2.3"
rand = { version = "0.8.5", default-features = false, features = ["std_rng"] }
tokio-util = { version = "0.7.10", features = ["rt"] }

[dev-dependencies]
lazy_static = "1.4.0"
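Both `CancellationToken` and `TaskTracker`, used throughout the rest of the diff, come from `tokio-util`; the `rt` feature enabled here is what provides `TaskTracker` and its `spawn` helper.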
4 changes: 1 addition & 3 deletions src/api/mod.rs
@@ -7,8 +7,7 @@ use tokio::task::JoinHandle;
use tracing::instrument;
use tracing_actix_web::TracingLogger;

use crate::config::HttpConfig;
use crate::db::Db;
use crate::{config::HttpConfig, db::Db};

#[instrument(name = "api", skip(db, config), fields(port = config.port))]
pub fn start(db: Db, config: HttpConfig) -> JoinHandle<std::result::Result<(), std::io::Error>> {
@@ -26,7 +25,6 @@ pub fn start(db: Db, config: HttpConfig) -> JoinHandle<std::result::Result<(), std::io::Error>> {
.service(routes::register)
.app_data(web::Data::new(db.clone()))
})
.disable_signals()
.bind(("0.0.0.0", config.port))
.unwrap();

33 changes: 23 additions & 10 deletions src/main.rs
@@ -5,13 +5,15 @@ mod rearrange;
mod sync;

use color_eyre::eyre::Result;
use tokio::sync::mpsc;
use tokio::{signal, sync::mpsc};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing::info;
use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};

use config::Config;

use self::db::Db;
use self::sync::{BackfillManager, Forward};
use self::sync::{BackfillManager, Forward, SyncJob};

#[tokio::main]
async fn main() -> Result<()> {
@@ -22,16 +24,27 @@ async fn main() -> Result<()> {
let (account_tx, account_rx) = mpsc::unbounded_channel();
let (job_tx, job_rx) = mpsc::unbounded_channel();
let db = Db::connect(&config, account_tx, job_tx).await?;
let chain = db.setup_chain(&config.chain).await?;

let sync = Forward::start(db.clone(), &config, account_rx).await?;
let backfill = BackfillManager::start(db.clone(), &config, job_rx).await?;
let api = config.http.map(|c| api::start(db, c));
// setup each task
let token = CancellationToken::new();
let sync = Forward::new(db.clone(), &config, chain, account_rx, token.clone()).await?;
let backfill = BackfillManager::new(db.clone(), &config, job_rx, token.clone());
let api = config.http.map(|c| api::start(db.clone(), c));

sync.await??;
backfill.await??;
if let Some(api) = api {
api.await??
};
// spawn and track all tasks
let tracker = TaskTracker::new();
tracker.spawn(sync.run());
tracker.spawn(backfill.run());
api.map(|t| tracker.spawn(t));

// termination handling
signal::ctrl_c().await?;
token.cancel();
tracker.close();
tracker.wait().await;

info!("graceful shutdown achieved. Closing");

Ok(())
}
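`main` now resolves the chain up front with `db.setup_chain` and hands it to `Forward::new`, so the constructors no longer spawn anything themselves; the `TaskTracker` in `main` is the single place tasks are spawned. Shutdown is then a three-step sequence: `token.cancel()` asks every task to stop, `tracker.close()` stops the tracker from accepting new tasks, and `tracker.wait().await` returns once every tracked future has finished.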
56 changes: 31 additions & 25 deletions src/sync/backfill.rs
@@ -4,11 +4,11 @@ use async_trait::async_trait;
use color_eyre::eyre::Result;
use tokio::select;
use tokio::{
sync::{broadcast, mpsc::UnboundedReceiver, RwLock, Semaphore},
task::JoinHandle,
sync::{mpsc::UnboundedReceiver, RwLock, Semaphore},
time::sleep,
};
use tracing::instrument;
use tokio_util::sync::CancellationToken;
use tracing::{info, instrument};

use crate::{
config::Config,
@@ -27,29 +27,30 @@ pub struct BackfillManager {
concurrency: usize,
jobs_rcv: UnboundedReceiver<()>,
config: Arc<RwLock<Config>>,
cancellation_token: CancellationToken,
}

impl BackfillManager {
pub async fn start(
pub fn new(
db: Db,
config: &Config,
jobs_rcv: UnboundedReceiver<()>,
) -> Result<JoinHandle<Result<()>>> {
let sync = Self {
cancellation_token: CancellationToken,
) -> Self {
Self {
db,
jobs_rcv,
config: Arc::new(RwLock::new(config.clone())),
concurrency: config.sync.backfill_concurrency,
};

Ok(tokio::spawn(async move { sync.run().await }))
cancellation_token,
}
}

#[instrument(name = "backfill", skip(self))]
async fn run(mut self) -> Result<()> {
pub async fn run(mut self) -> Result<()> {
loop {
let semaphore = Arc::new(Semaphore::new(self.concurrency));
let (shutdown, _) = broadcast::channel(1);
let inner_cancel = CancellationToken::new();

self.db.rorg_backfill_jobs().await?;

@@ -62,15 +63,13 @@
let db = self.db.clone();
let semaphore = semaphore.clone();
let config = self.config.clone();
let mut shutdown = shutdown.subscribe();
let token = inner_cancel.clone();
tokio::spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
if shutdown.try_recv().is_ok() {
if token.is_cancelled() {
return Ok(());
}
let worker = Backfill::new_worker(db, config, job, shutdown)
.await
.unwrap();
let worker = Backfill::new_worker(db, config, job, token).await.unwrap();
worker.run().await
})
})
@@ -79,33 +78,40 @@
// wait for a new job, or a preset delay, whichever comes first
let timeout = sleep(Duration::from_secs(10 * 60));
select! {
_ = timeout => {dbg!("timeout");}
Some(_) = self.jobs_rcv.recv() => {dbg!("rcv");}
_ = self.cancellation_token.cancelled() => {}
_ = timeout => {}
Some(_) = self.jobs_rcv.recv() => {}
}

// shutdown, time to re-org and reprioritize
dbg!("sending");
shutdown.send(())?;
inner_cancel.cancel();
for worker in workers {
worker.await.unwrap().unwrap();
}
if self.cancellation_token.is_cancelled() {
break;
} else {
info!("rotating backfill workers");
}
}

info!("closing backfill manager");
Ok(())
}
}

pub struct Backfill {
job_id: i32,
high: u64,
low: u64,
shutdown: broadcast::Receiver<()>,
}
#[async_trait]
impl SyncJob for Worker<Backfill> {
#[instrument(skip(self), fields(chain_id = self.chain.chain_id))]
async fn run(mut self) -> Result<()> {
for block in (self.inner.low..self.inner.high).rev() {
// start by checking shutdown signal
if self.inner.shutdown.try_recv().is_ok() {
if self.cancellation_token.is_cancelled() {
break;
}

@@ -118,6 +124,7 @@ impl SyncJob for Worker<Backfill> {
// this needs to be part of BackfillJob, not just Inner
self.flush(self.inner.low).await?;

info!("closing backfill worker");
Ok(())
}
}
@@ -149,7 +156,7 @@ impl Backfill {
db: Db,
config: Arc<RwLock<Config>>,
job: BackfillJobWithId,
shutdown: broadcast::Receiver<()>,
cancellation_token: CancellationToken,
) -> Result<Worker<Self>> {
let config = config.read().await;
let chain = db.setup_chain(&config.chain).await?;
@@ -158,9 +165,8 @@
job_id: job.id,
high: job.high as u64,
low: job.low as u64,
shutdown,
};

Worker::new(s, db, &config, chain).await
Worker::new(s, db, &config, chain, cancellation_token).await
}
}
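The manager layers two tokens: the process-wide `cancellation_token` handed down from `main`, and a fresh `inner_cancel` per loop iteration that only governs the current batch of workers, so the same wake-up path serves both shutdown and worker rotation. A rough sketch of that select-and-rotate loop (same tokio/tokio-util feature assumptions as above), with the semaphore-guarded worker spawning elided:

```rust
use std::time::Duration;
use tokio::{select, sync::mpsc::UnboundedReceiver, time::sleep};
use tokio_util::sync::CancellationToken;

async fn manager_loop(shutdown: CancellationToken, mut jobs_rcv: UnboundedReceiver<()>) {
    loop {
        // a fresh token that only governs the workers spawned for this iteration
        let inner_cancel = CancellationToken::new();

        // (spawn the current batch of backfill workers here, each holding inner_cancel.clone())

        // wake on process shutdown, on a new job arriving, or after a fixed delay
        let timeout = sleep(Duration::from_secs(10 * 60));
        select! {
            _ = shutdown.cancelled() => {}
            _ = timeout => {}
            Some(_) = jobs_rcv.recv() => {}
        }

        // stop the current batch (and await its handles), then either exit or rotate
        inner_cancel.cancel();
        if shutdown.is_cancelled() {
            break;
        }
    }
}
```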
28 changes: 18 additions & 10 deletions src/sync/forward.rs
@@ -1,9 +1,11 @@
use alloy_primitives::Address;
use async_trait::async_trait;
use color_eyre::eyre::Result;
use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle};
use tracing::instrument;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_util::sync::CancellationToken;
use tracing::{info, instrument};

use crate::db::models::Chain;
use crate::{config::Config, db::Db};

use super::provider::Provider;
@@ -28,6 +30,10 @@ impl SyncJob for Worker<Forward> {
self.inner.next_block = (self.chain.last_known_block as u64) + 1;

loop {
if self.cancellation_token.is_cancelled() {
break;
}

self.process_new_accounts().await?;

match self.provider.block_header(self.inner.next_block)? {
@@ -45,6 +51,9 @@
}
}
}

info!("closing");
Ok(())
}
}

@@ -95,24 +104,23 @@ impl Worker<Forward> {
}

impl Forward {
pub async fn start(
pub async fn new(
db: Db,
config: &Config,
chain: Chain,
accounts_rcv: UnboundedReceiver<Address>,
) -> Result<JoinHandle<Result<()>>> {
let chain = db.setup_chain(&config.chain).await?;

let sync = Worker::new(
cancellation_token: CancellationToken,
) -> Result<Worker<Self>> {
Worker::new(
Forward {
accounts_rcv,
next_block: (chain.last_known_block as u64) + 1,
},
db,
config,
chain,
cancellation_token,
)
.await?;

Ok(tokio::spawn(async move { sync.run().await }))
.await
}
}
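`Forward::new`, like `BackfillManager::new`, now returns the worker instead of spawning it, and takes the already-resolved `Chain` and a `CancellationToken` from the caller; the forward sync loop checks `is_cancelled()` at the top of each iteration so it can exit cleanly between blocks.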
13 changes: 12 additions & 1 deletion src/sync/mod.rs
@@ -15,6 +15,7 @@ use rand::{rngs::StdRng, SeedableRng};
use reth_primitives::Header;
use scalable_cuckoo_filter::{DefaultHasher, ScalableCuckooFilter, ScalableCuckooFilterBuilder};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::trace;

use crate::{
@@ -51,6 +52,9 @@ pub struct Worker<T> {

/// Desired buffer capacity, and threshold at which to flush it
buffer_capacity: usize,

/// Cancellation token for graceful shutdown
cancellation_token: CancellationToken,
}

/// A match between an address and a transaction
@@ -67,7 +71,13 @@ pub trait SyncJob {
}

impl<T> Worker<T> {
async fn new(inner: T, db: Db, config: &Config, chain: Chain) -> Result<Self> {
async fn new(
inner: T,
db: Db,
config: &Config,
chain: Chain,
cancellation_token: CancellationToken,
) -> Result<Self> {
let provider = RethDBProvider::new(config, &chain)?;

let addresses: BTreeSet<_> = db.get_addresses().await?.into_iter().map(|a| a.0).collect();
@@ -89,6 +99,7 @@ impl<T> Worker<T> {
provider,
buffer: Vec::with_capacity(config.sync.buffer_size),
buffer_capacity: config.sync.buffer_size,
cancellation_token,
})
}

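Since `Worker<T>` now carries the token as a field, the forward and backfill jobs share the same cancellation check without any change to the `SyncJob` trait itself.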
4 changes: 4 additions & 0 deletions src/task.rs
@@ -0,0 +1,4 @@
pub struct Task {
pub handle: JoinHandle<()>,
pub cancelation_token: CancellationToken,
}
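This new `src/task.rs` is not referenced anywhere else in the diff, and as shown it would not compile in isolation because nothing imports `JoinHandle` or `CancellationToken`. If the intent is to pair a spawned handle with the token that shuts it down, the complete file would presumably look like the following (the imports and doc comment are assumptions, not part of the commit):

```rust
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

/// A spawned task paired with the token used to request its shutdown.
pub struct Task {
    pub handle: JoinHandle<()>,
    pub cancelation_token: CancellationToken,
}
```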
