diff --git a/crates/artemis-core/src/engine.rs b/crates/artemis-core/src/engine.rs index 32bcba8..697e62d 100644 --- a/crates/artemis-core/src/engine.rs +++ b/crates/artemis-core/src/engine.rs @@ -109,7 +109,7 @@ where loop { match event_receiver.recv().await { Ok(event) => { - if let Some(action) = strategy.process_event(event).await { + for action in strategy.process_event(event).await { match action_sender.send(action) { Ok(_) => {} Err(e) => error!("error sending action: {}", e), diff --git a/crates/artemis-core/src/executors/mev_share_executor.rs b/crates/artemis-core/src/executors/mev_share_executor.rs index 9dc9c5a..c7a5448 100644 --- a/crates/artemis-core/src/executors/mev_share_executor.rs +++ b/crates/artemis-core/src/executors/mev_share_executor.rs @@ -2,7 +2,6 @@ use crate::types::Executor; use anyhow::Result; use async_trait::async_trait; use ethers::signers::Signer; -use futures::{stream, StreamExt}; use jsonrpsee::http_client::{ transport::{self}, HttpClientBuilder, @@ -34,24 +33,14 @@ impl MevshareExecutor { } #[async_trait] -impl Executor> for MevshareExecutor { +impl Executor for MevshareExecutor { /// Send bundles to the matchmaker. - async fn execute(&self, action: Vec) -> Result<()> { - let bodies = stream::iter(action) - .map(|bundle| { - let client = &self.mev_share_client; - async move { client.send_bundle(bundle).await } - }) - .buffer_unordered(5); - - bodies - .for_each(|b| async { - match b { - Ok(b) => info!("Bundle response: {:?}", b), - Err(e) => error!("Bundle error: {}", e), - } - }) - .await; + async fn execute(&self, action: SendBundleRequest) -> Result<()> { + let body = self.mev_share_client.send_bundle(action).await; + match body { + Ok(body) => info!("Bundle response: {:?}", body), + Err(e) => error!("Bundle error: {}", e), + }; Ok(()) } } diff --git a/crates/artemis-core/src/types.rs b/crates/artemis-core/src/types.rs index f005f34..a26118e 100644 --- a/crates/artemis-core/src/types.rs +++ b/crates/artemis-core/src/types.rs @@ -28,7 +28,7 @@ pub trait Strategy: Send + Sync { async fn sync_state(&mut self) -> Result<()>; /// Process an event, and return an action if needed. - async fn process_event(&mut self, event: E) -> Option; + async fn process_event(&mut self, event: E) -> Vec; } /// Executor trait, responsible for executing actions returned by strategies. diff --git a/crates/strategies/mev-share-uni-arb/src/strategy.rs b/crates/strategies/mev-share-uni-arb/src/strategy.rs index d535f03..51d3108 100644 --- a/crates/strategies/mev-share-uni-arb/src/strategy.rs +++ b/crates/strategies/mev-share-uni-arb/src/strategy.rs @@ -83,26 +83,29 @@ impl Strategy } // Process incoming events, seeing if we can arb new orders. - async fn process_event(&mut self, event: Event) -> Option { + async fn process_event(&mut self, event: Event) -> Vec { match event { Event::MEVShareEvent(event) => { info!("Received mev share event: {:?}", event); // skip if event has no logs if event.logs.is_empty() { - return None; + return vec![]; } let address = event.logs[0].address; // skip if address is not a v3 pool if !self.pool_map.contains_key(&address) { - return None; + return vec![]; } // if it's a v3 pool we care about, submit bundles info!( "Found a v3 pool match at address {:?}, submitting bundles", address ); - let bundles = self.generate_bundles(address, event.hash).await; - return Some(Action::SubmitBundles(bundles)); + self.generate_bundles(address, event.hash) + .await + .into_iter() + .map(Action::SubmitBundle) + .collect() } } } diff --git a/crates/strategies/mev-share-uni-arb/src/types.rs b/crates/strategies/mev-share-uni-arb/src/types.rs index f33c810..714fa9a 100644 --- a/crates/strategies/mev-share-uni-arb/src/types.rs +++ b/crates/strategies/mev-share-uni-arb/src/types.rs @@ -11,7 +11,7 @@ pub enum Event { /// Core Action enum for the current strategy. #[derive(Debug, Clone)] pub enum Action { - SubmitBundles(Vec), + SubmitBundle(SendBundleRequest), } #[derive(Debug, serde::Deserialize)] diff --git a/crates/strategies/opensea-sudo-arb/src/strategy.rs b/crates/strategies/opensea-sudo-arb/src/strategy.rs index 87d5ff5..c8939fb 100644 --- a/crates/strategies/opensea-sudo-arb/src/strategy.rs +++ b/crates/strategies/opensea-sudo-arb/src/strategy.rs @@ -105,11 +105,14 @@ impl Strategy for OpenseaSudoArb { } // Process incoming events, seeing if we can arb new orders, and updating the internal state on new blocks. - async fn process_event(&mut self, event: Event) -> Option { + async fn process_event(&mut self, event: Event) -> Vec { match event { - Event::OpenseaOrder(order) => self.process_order_event(*order).await, + Event::OpenseaOrder(order) => self + .process_order_event(*order) + .await + .map_or(vec![], |a| vec![a]), Event::NewBlock(block) => match self.process_new_block_event(block).await { - Ok(_) => None, + Ok(_) => vec![], Err(e) => { panic!("Strategy is out of sync {}", e); } @@ -217,7 +220,7 @@ impl OpenseaSudoArb { let quotes = self.quoter.get_multiple_sell_quotes(pools.clone()).await?; let res = pools .into_iter() - .zip(quotes.into_iter()) + .zip(quotes) .collect::>(); Ok(res) } diff --git a/examples/mev-share-arb/src/main.rs b/examples/mev-share-arb/src/main.rs index f2c29f9..7359064 100644 --- a/examples/mev-share-arb/src/main.rs +++ b/examples/mev-share-arb/src/main.rs @@ -82,7 +82,7 @@ async fn main() -> Result<()> { // Set up executor. let mev_share_executor = Box::new(MevshareExecutor::new(fb_signer)); let mev_share_executor = ExecutorMap::new(mev_share_executor, |action| match action { - Action::SubmitBundles(bundles) => Some(bundles), + Action::SubmitBundle(bundle) => Some(bundle), }); engine.add_executor(Box::new(mev_share_executor));