diff --git a/Cargo.toml b/Cargo.toml index 93ed9f9..11eb277 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,3 +19,4 @@ clap = { version = "4.4.8", features = ["derive"] } serde_json = "1.0.113" log = "0.4.20" futures-util = "0.3.30" +RustQuant = "0.0.45" \ No newline at end of file diff --git a/src/behaviors/mod.rs b/src/behaviors/mod.rs index f2e83bb..068dd40 100644 --- a/src/behaviors/mod.rs +++ b/src/behaviors/mod.rs @@ -4,10 +4,12 @@ use serde::{Deserialize, Serialize}; pub mod deployer; pub mod pool_admin; +pub mod price_changer; pub mod token_admin; use deployer::Deployer; use pool_admin::PoolAdmin; +use price_changer::PriceChanger; use token_admin::TokenAdmin; #[derive(Behaviors, Debug, Serialize, Deserialize)] @@ -15,4 +17,5 @@ pub enum Behaviors { Deployer(Deployer), TokenAdmin(TokenAdmin), PoolAdmin(PoolAdmin), + PriceChanger(PriceChanger), } diff --git a/src/behaviors/price_changer.rs b/src/behaviors/price_changer.rs new file mode 100644 index 0000000..862b13b --- /dev/null +++ b/src/behaviors/price_changer.rs @@ -0,0 +1,117 @@ +use std::{fmt, sync::Arc}; + +use anyhow::Result; +use arbiter_core::middleware::ArbiterMiddleware; +use arbiter_engine::messager::{Message, Messager}; +use ethers::types::H160; +use RustQuant::{ + models::*, + stochastics::{process::Trajectories, *}, +}; + +use super::*; +use crate::bindings::liquid_exchange::LiquidExchange; + +#[derive(Serialize, Deserialize)] +pub struct PriceChanger { + #[serde(skip)] + pub params: OrnsteinUhlenbeckParams, + + #[serde(skip)] + #[serde(default = "trajectory_default")] + pub current_chunk: Trajectories, + + #[serde(skip)] + pub client: Option>, + + cursor: usize, + value: f64, +} + +fn trajectory_default() -> Trajectories { + Trajectories { + times: Vec::new(), + paths: Vec::new(), + } +} + +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct OrnsteinUhlenbeckParams { + mu: f64, + sigma: f64, + theta: f64, +} + +impl fmt::Debug for PriceChanger { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", serde_json::to_string(self).unwrap()) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct PriceUpdate { + liquid_exchange: H160, +} + +impl PriceChanger { + /// Public constructor function to create a [`PriceChanger`] behaviour. + pub fn new(initial_value: f64, params: OrnsteinUhlenbeckParams) -> Self { + let ou = OrnsteinUhlenbeck::new(params.mu, params.sigma, params.theta); + + // Chunk our price trajectory over 100 price points. + let current_chunk = ou.euler_maruyama(initial_value, 0.0, 100.0, 100_usize, 1_usize, false); + + Self { + params, + current_chunk, + cursor: 0, + client: None, + value: initial_value, + } + } +} + +#[async_trait::async_trait] +impl Behavior for PriceChanger { + async fn startup( + &mut self, + _client: Arc, + _messager: Messager, + ) -> Result>> { + Ok(None) + } + + async fn process(&mut self, event: Message) -> Result { + let ou = OrnsteinUhlenbeck::new(self.params.mu, self.params.sigma, self.params.theta); + + let query: PriceUpdate = match serde_json::from_str(&event.data) { + Ok(query) => query, + Err(_) => { + eprintln!("Failed to deserialize the event data into a PriceUpdate"); + return Ok(ControlFlow::Continue); + } + }; + + if self.cursor >= 99 { + self.cursor = 0; + self.value = self.current_chunk.paths.clone()[0][self.cursor]; + self.current_chunk = + ou.euler_maruyama(self.value, 0.0, 100.0, 100_usize, 1_usize, false); + } + + let liquid_exchange = + LiquidExchange::new(query.liquid_exchange, self.client.clone().unwrap()); + + let price = self.current_chunk.paths.clone()[0][self.cursor]; + + liquid_exchange + .set_price(ethers::utils::parse_ether(price)?) + .send() + .await? + .await?; + + self.cursor += 1; + + Ok(ControlFlow::Continue) + } +}