Skip to content

Commit

Permalink
sigh
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Aug 23, 2024
1 parent e328bc1 commit 12301df
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 35 deletions.
44 changes: 22 additions & 22 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion op-bridge/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "example-exex-op-bridge"
name = "op-bridge"
version = "0.0.0"
publish = false
edition.workspace = true
Expand Down
48 changes: 36 additions & 12 deletions op-bridge/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{ops::Deref, sync::Arc};
use std::{ops::DerefMut, sync::Arc};

use alloy_sol_types::{sol, SolEventInterface};
use eyre::OptionExt;
use futures::{Future, StreamExt, TryStreamExt};
use futures::{Future, FutureExt, TryStreamExt};
use reth_execution_types::Chain;
use reth_exex::{BackfillJobFactory, ExExContext, ExExEvent};
use reth_node_api::FullNodeComponents;
Expand Down Expand Up @@ -41,6 +41,8 @@ async fn init<Node: FullNodeComponents>(
.transpose()?
.unwrap_or(0);

let connection = Arc::new(Mutex::new(connection));

// If head block is ahead of the highest processed block in the database, backfill the missing
// range of blocks
if ctx.head.number > highest_processed_block {
Expand All @@ -50,16 +52,24 @@ async fn init<Node: FullNodeComponents>(
// Convert the backfill job into a parallel stream
.into_stream()
// Convert the block execution error into `eyre`
.map_err(Into::into)
.map_err(eyre::Error::from)
// Process each block, returning early if an error occurs
.try_for_each(|chain| {
let ctx = &ctx;
let connection = connection.clone();
process_committed_chain(&ctx, &connection, &chain)?;
futures::ready(Ok(()))
async move {
let connection = connection.lock().await;
process_committed_chain(ctx, connection, &chain)?;
Ok(())
}
})
.await?;
}

let connection = Arc::into_inner(connection)
.ok_or_eyre("Arced connection still has references")?
.into_inner();

Ok(op_bridge_exex(ctx, connection))
}

Expand Down Expand Up @@ -140,7 +150,7 @@ fn create_tables(connection: &Connection) -> rusqlite::Result<()> {
/// and stores deposits and withdrawals in a SQLite database.
async fn op_bridge_exex<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
connection: Connection,
mut connection: Connection,
) -> eyre::Result<()> {
// Process all new chain state notifications
while let Some(notification) = ctx.notifications.recv().await {
Expand Down Expand Up @@ -178,7 +188,7 @@ async fn op_bridge_exex<Node: FullNodeComponents>(

// Insert all new deposits and withdrawals
if let Some(committed_chain) = notification.committed_chain() {
process_committed_chain(&ctx, &connection, &committed_chain)?;
process_committed_chain(&ctx, &mut connection, &committed_chain)?;
}
}

Expand All @@ -187,7 +197,7 @@ async fn op_bridge_exex<Node: FullNodeComponents>(

fn process_committed_chain<Node: FullNodeComponents>(
ctx: &ExExContext<Node>,
connection: &Connection,
mut connection: impl DerefMut<Target = Connection>,
chain: &Chain,
) -> eyre::Result<()> {
let events = decode_chain_into_events(chain);
Expand Down Expand Up @@ -255,7 +265,7 @@ fn process_committed_chain<Node: FullNodeComponents>(
VALUES (?)
"#,
(chain.tip().number,),
)
)?;
}

connection_tx.commit()?;
Expand Down Expand Up @@ -306,9 +316,23 @@ fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let handle = builder
.node(EthereumNode::default())
.install_exex("OPBridge", |ctx| async move {
let connection = Connection::open("op_bridge.db")?;
init(ctx, connection).await
.install_exex("OPBridge", move |ctx| {
// Rust seems to trigger a bogus higher-ranked lifetime error when using
// just an async closure here -- using `spawn_blocking` avoids this
// particular issue.
//
// To avoid the higher ranked lifetime error we use `spawn_blocking`
// which will move the closure to another blocking-allowed thread,
// then execute.
//
// Source: https://github.com/vados-cosmonic/wasmCloud/commit/440e8c377f6b02f45eacb02692e4d2fabd53a0ec
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
let connection = Connection::open("op_bridge.db")?;
init(ctx, connection).await
})
})
.map(|result| result.map_err(Into::into).and_then(|result| result))
})
.launch()
.await?;
Expand Down

0 comments on commit 12301df

Please sign in to comment.