Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(wasm): add #4

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,483 changes: 1,221 additions & 262 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ members = [
"op-bridge",
"remote",
"rollup",
"wasm",
"wasm/wasm_exex"
]
resolver = "2"

Expand All @@ -24,6 +26,7 @@ reth-discv5 = { git = "https://github.com/paradigmxyz/reth" }
reth-execution-errors = { git = "https://github.com/paradigmxyz/reth" }
reth-execution-types = { git = "https://github.com/paradigmxyz/reth" }
reth-exex = { git = "https://github.com/paradigmxyz/reth", features = ["serde"] }
reth-exex-types = { git = "https://github.com/paradigmxyz/reth" }
reth-network-peers = { git = "https://github.com/paradigmxyz/reth" }
reth-node-api = { git = "https://github.com/paradigmxyz/reth" }
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth" }
Expand Down
33 changes: 33 additions & 0 deletions wasm/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
name = "wasm"
version = "0.0.0"
publish = false
edition.workspace = true
license.workspace = true

[dependencies]
# reth
reth-execution-types.workspace = true
reth-exex.workspace = true
reth-node-api.workspace = true
reth-node-ethereum.workspace = true
reth-tracing.workspace = true
reth.workspace = true

# async
async-trait = "0.1"
futures.workspace = true
tokio.workspace = true

# wasm
wasi-common = "22"
wasmtime = "22"

# misc
eyre.workspace = true
jsonrpsee = { version = "0.23", features = ["server", "macros"] }
serde_json = "1"

[dev-dependencies]
reth-exex-test-utils.workspace = true

148 changes: 148 additions & 0 deletions wasm/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
mod rpc;
mod wasm;

use std::{collections::HashMap, path::PathBuf};

use jsonrpsee::core::RpcResult;
use reth::dirs::{LogsDir, PlatformPath};
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::{error, info};
use rpc::{rpc_internal_error_format, ExExRpcExt, ExExRpcExtApiServer, RpcMessage};
use tokio::sync::{mpsc, oneshot};
use wasi_common::WasiCtx;
use wasm::RunningExEx;
use wasmtime::{Engine, Linker, Module};

struct WasmExEx<Node: FullNodeComponents> {
ctx: ExExContext<Node>,
rpc_messages: mpsc::UnboundedReceiver<(RpcMessage, oneshot::Sender<RpcResult<()>>)>,
logs_directory: PathBuf,

engine: Engine,
linker: Linker<WasiCtx>,

installed_exexes: HashMap<String, Module>,
running_exexes: HashMap<String, RunningExEx>,
}

impl<Node: FullNodeComponents> WasmExEx<Node> {
fn new(
ctx: ExExContext<Node>,
rpc_messages: mpsc::UnboundedReceiver<(RpcMessage, oneshot::Sender<RpcResult<()>>)>,
logs_directory: PathBuf,
) -> eyre::Result<Self> {
let engine = Engine::default();
let mut linker = Linker::<WasiCtx>::new(&engine);
wasi_common::sync::add_to_linker(&mut linker, |s| s)
.map_err(|err| eyre::eyre!("failed to add WASI: {err}"))?;

Ok(Self {
ctx,
rpc_messages,
logs_directory,
engine,
linker,
installed_exexes: HashMap::new(),
running_exexes: HashMap::new(),
})
}

async fn start(mut self) -> eyre::Result<()> {
loop {
tokio::select! {
Some(notification) = self.ctx.notifications.recv() => {
self.handle_notification(notification).await?
}
Some((rpc_message, tx)) = self.rpc_messages.recv() => {
let _ = tx
.send(self.handle_rpc_message(rpc_message).await)
.inspect_err(|err| error!("failed to send response: {err:?}"));
},
}
}
}

async fn handle_notification(&mut self, notification: ExExNotification) -> eyre::Result<()> {
let committed_chain_tip = notification.committed_chain().map(|chain| chain.tip().number);

for exex in self.running_exexes.values_mut() {
if let Err(err) = exex.process_notification(&notification) {
error!(name = %exex.name, %err, "failed to process notification")
}
}

if let Some(tip) = committed_chain_tip {
self.ctx.events.send(ExExEvent::FinishedHeight(tip))?;
}

info!(?committed_chain_tip, "Handled notification");

Ok(())
}

async fn handle_rpc_message(&mut self, rpc_message: RpcMessage) -> RpcResult<()> {
match &rpc_message {
RpcMessage::Install(name, bytecode) => {
let module = Module::new(&self.engine, bytecode).map_err(|err| {
rpc_internal_error_format!("failed to create module for {name}: {err}")
})?;
self.installed_exexes.insert(name.clone(), module);
}
RpcMessage::Start(name) => {
let module = self
.installed_exexes
.get(name)
.ok_or_else(|| rpc_internal_error_format!("ExEx {name} not installed"))?;

let exex = RunningExEx::new(
name.clone(),
&self.engine,
module,
&self.linker,
&self.logs_directory,
)
.map_err(|err| {
rpc_internal_error_format!("failed to create exex for {name}: {err}")
})?;

self.running_exexes.insert(name.clone(), exex);
}
RpcMessage::Stop(name) => {
self.running_exexes.remove(name).ok_or_else(|| {
rpc_internal_error_format!("no running exex found for {name}")
})?;
}
}

info!(%rpc_message, "Handled RPC message");

Ok(())
}
}

fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let (rpc_tx, rpc_rx) = mpsc::unbounded_channel();

let handle = builder
.node(EthereumNode::default())
.extend_rpc_modules(move |ctx| {
ctx.modules.merge_configured(ExExRpcExt { to_exex: rpc_tx }.into_rpc())?;
Ok(())
})
.install_exex("Minimal", |ctx| async move {
// TODO(alexey): obviously bad but we don't have access to log args in the context
let logs_directory = PlatformPath::<LogsDir>::default()
.with_chain(ctx.config.chain.chain, ctx.config.datadir.clone())
.as_ref()
.to_path_buf();
Ok(WasmExEx::new(ctx, rpc_rx, logs_directory)?.start())
})
.launch()
.await?;

handle.wait_for_node_exit().await
})
}
83 changes: 83 additions & 0 deletions wasm/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use std::fmt::Display;

use async_trait::async_trait;
use jsonrpsee::{
core::RpcResult,
proc_macros::rpc,
types::{error::INTERNAL_ERROR_CODE, ErrorObject, ErrorObjectOwned},
};
use tokio::sync::{mpsc, oneshot};

#[rpc(server, namespace = "exex")]
trait ExExRpcExtApi {
#[method(name = "install")]
async fn install(&self, name: String, bytecode: Vec<u8>) -> RpcResult<()>;

#[method(name = "start")]
async fn start(&self, name: String) -> RpcResult<()>;

#[method(name = "stop")]
async fn stop(&self, name: String) -> RpcResult<()>;
}

pub struct ExExRpcExt {
pub to_exex: mpsc::UnboundedSender<(RpcMessage, oneshot::Sender<RpcResult<()>>)>,
}

#[derive(Debug)]
pub enum RpcMessage {
Install(String, Vec<u8>),
Start(String),
Stop(String),
}

impl Display for RpcMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RpcMessage::Install(name, bytecode) => {
write!(f, "install {name} with bytecode of length {}", bytecode.len())
}
RpcMessage::Start(name) => write!(f, "start {name}"),
RpcMessage::Stop(name) => write!(f, "stop {name}"),
}
}
}

#[async_trait]
impl ExExRpcExtApiServer for ExExRpcExt {
async fn install(&self, name: String, bytecode: Vec<u8>) -> RpcResult<()> {
let (tx, rx) = oneshot::channel();
let _ = self
.to_exex
.send((RpcMessage::Install(name, bytecode), tx))
.map_err(|_| rpc_internal_error())?;
rx.await.map_err(|_| rpc_internal_error())?
}

async fn start(&self, name: String) -> RpcResult<()> {
let (tx, rx) = oneshot::channel();
let _ =
self.to_exex.send((RpcMessage::Start(name), tx)).map_err(|_| rpc_internal_error())?;
rx.await.map_err(|_| rpc_internal_error())?
}

async fn stop(&self, name: String) -> RpcResult<()> {
let (tx, rx) = oneshot::channel();
let _ =
self.to_exex.send((RpcMessage::Stop(name), tx)).map_err(|_| rpc_internal_error())?;
rx.await.map_err(|_| rpc_internal_error())?
}
}

#[inline]
fn rpc_internal_error() -> ErrorObjectOwned {
ErrorObject::owned(INTERNAL_ERROR_CODE, "internal error", Some(""))
}

macro_rules! rpc_internal_error_format {
($($arg:tt)*) => {
jsonrpsee::types::error::ErrorObject::owned(jsonrpsee::types::error::INTERNAL_ERROR_CODE, format!($($arg)*), Some(""))
};
}

pub(crate) use rpc_internal_error_format;
88 changes: 88 additions & 0 deletions wasm/src/wasm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::{fs::File, path::Path};

use reth_tracing::tracing::debug;
use wasi_common::{pipe::WritePipe, sync::WasiCtxBuilder, WasiCtx};
use wasmtime::{Engine, Linker, Memory, Module, Store, TypedFunc};

type AllocParams = (i64,);
type AllocReturn = i64;
type NotificationParams = (i64, i64);
type NotificationReturn = i64;

pub struct RunningExEx {
pub name: String,
pub store: Store<WasiCtx>,
pub memory: Memory,
pub alloc_func: TypedFunc<AllocParams, AllocReturn>,
pub process_func: TypedFunc<NotificationParams, NotificationReturn>,
}

impl RunningExEx {
/// Creates a new instance of a running WASM-powered ExEx.
///
/// Initializes a WASM instance with WASI support, prepares the memory and the typed
/// functions.
pub fn new(
name: String,
engine: &Engine,
module: &Module,
linker: &Linker<WasiCtx>,
logs_directory: impl AsRef<Path>,
) -> eyre::Result<Self> {
// TODO(alexey): ideally setup tracer with a span
let file = File::create(logs_directory.as_ref().join(format!("{name}.log")))?;
let wasi = WasiCtxBuilder::new().stdout(Box::new(WritePipe::new(file))).build();
let mut store = Store::new(engine, wasi);

let instance = linker
.instantiate(&mut store, module)
.map_err(|err| eyre::eyre!("failed to instantiate: {err}"))?;

let memory = instance
.get_memory(&mut store, "memory")
.ok_or_else(|| eyre::eyre!("failed to get memory"))?;
let alloc_func = instance
.get_typed_func::<AllocParams, AllocReturn>(&mut store, "alloc")
.map_err(|err| eyre::eyre!("failed to get alloc func: {err}"))?;
let process_func = instance
.get_typed_func::<NotificationParams, NotificationReturn>(&mut store, "process")
.map_err(|err| eyre::eyre!("failed to get process func: {err}"))?;

Ok(Self { name, store, memory, alloc_func, process_func })
}

/// Processes an [`ExExNotification`] using the WASM instance.
// TODO(alexey): we can probably use shared memory here to avoid copying the data into every
// WASM instance memory. I tried it for a while and it didn't work straight away. Maybe we can
// share a portion of linear memory, but the rest is up to the WASM instance to manage?
pub fn process_notification(
&mut self,
notification: &reth_exex::ExExNotification,
) -> eyre::Result<()> {
// TODO(alexey): serialize to bincode or just cast to bytes directly. Can't do it now
// because `ExExNotification` can't be used inside WASM.
let serialized_notification =
// Can't even do JSON encode of a full struct smh, "key must be a string"
serde_json::to_vec(&notification.committed_chain().map(|chain| chain.tip().header.clone()))?;

// Allocate memory for the notification.
let data_size = serialized_notification.len() as i64;
let data_ptr = self
.alloc_func
.call(&mut self.store, (data_size,))
.map_err(|err| eyre::eyre!("failed to call alloc func: {err}"))?;

// Write the notification to the allocated memory.
self.memory.write(&mut self.store, data_ptr as usize, &serialized_notification)?;

// Call the notification function that will read the allocated memoyry.
let output = self
.process_func
.call(&mut self.store, (data_ptr, data_size))
.map_err(|err| eyre::eyre!("failed to call notification func: {err}"))?;

debug!(target: "wasm", name = %self.name, ?data_ptr, ?data_size, ?output, "Processed notification");

Ok(())
}
}
13 changes: 13 additions & 0 deletions wasm/wasm_exex/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "wasm-exex"
version = "0.0.0"
publish = false
edition.workspace = true
license.workspace = true

[dependencies]
reth-tracing.workspace = true

once_cell = "1"
wee_alloc = "0.4"

Loading
Loading