From 588c85c88b34f2000b0d685b55c51aa031e60e46 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Sat, 7 Sep 2024 14:16:28 +0300 Subject: [PATCH] auto subscribe for orderbook topic when orderbook streaming is enabled this creates a side-effect which might not be favoured. --- .../src/lp_ordermatch/orderbook_events.rs | 48 +++++++++++++++++-- .../rpc/streaming_activations/orderbook.rs | 2 +- 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs b/mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs index 5773c6fc7e..69a880c1c9 100644 --- a/mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs +++ b/mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs @@ -1,4 +1,6 @@ -use super::OrderbookP2PItem; +use super::{subscribe_to_orderbook_topic, OrderbookP2PItem}; +use coins::{is_wallet_only_ticker, lp_coinfind}; +use mm2_core::mm_ctx::MmArc; use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput}; use async_trait::async_trait; @@ -7,12 +9,13 @@ use futures::StreamExt; use uuid::Uuid; pub struct OrderbookStreamer { + ctx: MmArc, base: String, rel: String, } impl OrderbookStreamer { - pub fn new(base: String, rel: String) -> Self { Self { base, rel } } + pub fn new(ctx: MmArc, base: String, rel: String) -> Self { Self { ctx, base, rel } } pub fn derive_streamer_id(base: &str, rel: &str) -> String { format!("ORDERBOOK_UPDATE/{base}:{rel}") } } @@ -38,9 +41,18 @@ impl EventStreamer for OrderbookStreamer { ready_tx: oneshot::Sender>, mut data_rx: impl StreamHandlerInput, ) { - ready_tx - .send(Ok(())) - .expect("Receiver is dropped, which should never happen."); + const RECEIVER_DROPPED_MSG: &str = "Receiver is dropped, which should never happen."; + if let Err(err) = sanity_checks(&self.ctx, &self.base, &self.rel).await { + ready_tx.send(Err(err.clone())).expect(RECEIVER_DROPPED_MSG); + panic!("{}", err); + } + // We need to subscribe to the orderbook, otherwise we won't get any updates from the P2P network. + if let Err(err) = subscribe_to_orderbook_topic(&self.ctx, &self.base, &self.rel, false).await { + let err = format!("Subscribing to orderbook topic failed: {err:?}"); + ready_tx.send(Err(err.clone())).expect(RECEIVER_DROPPED_MSG); + panic!("{}", err); + } + ready_tx.send(Ok(())).expect(RECEIVER_DROPPED_MSG); while let Some(orderbook_update) = data_rx.next().await { let event_data = serde_json::to_value(orderbook_update).expect("Serialization shouldn't fail."); @@ -49,3 +61,29 @@ impl EventStreamer for OrderbookStreamer { } } } + +async fn sanity_checks(ctx: &MmArc, base: &str, rel: &str) -> Result<(), String> { + lp_coinfind(ctx, base) + .await + .map_err(|e| format!("Coin {base} not found: {e}"))?; + if is_wallet_only_ticker(ctx, base) { + return Err(format!("Coin {base} is wallet-only.")); + } + lp_coinfind(ctx, rel) + .await + .map_err(|e| format!("Coin {base} not found: {e}"))?; + if is_wallet_only_ticker(ctx, rel) { + return Err(format!("Coin {rel} is wallet-only.")); + } + Ok(()) +} + +impl Drop for OrderbookStreamer { + fn drop(&mut self) { + // FIXME(discuss): Do we want to unsubscribe from the orderbook topic when streaming is dropped? + + // Note that the client enables orderbook streaming for all enabled coins when they query for best + // orders. These could potentially be a lot of pairs! + // Also, in the dev branch, we seem to never unsubscribe from an orderbook topic after doing an orderbook RPC! + } +} diff --git a/mm2src/mm2_main/src/rpc/streaming_activations/orderbook.rs b/mm2src/mm2_main/src/rpc/streaming_activations/orderbook.rs index 6f57c77de2..73174bfa84 100644 --- a/mm2src/mm2_main/src/rpc/streaming_activations/orderbook.rs +++ b/mm2src/mm2_main/src/rpc/streaming_activations/orderbook.rs @@ -28,7 +28,7 @@ pub async fn enable_orderbook( ctx: MmArc, req: EnableOrderbookStreamingRequest, ) -> MmResult { - let order_status_streamer = OrderbookStreamer::new(req.base, req.rel); + let order_status_streamer = OrderbookStreamer::new(ctx.clone(), req.base, req.rel); ctx.event_stream_manager .add(req.client_id, order_status_streamer, ctx.spawner()) .await