Skip to content

Commit

Permalink
auto subscribe for orderbook topic when orderbook streaming is enabled
Browse files Browse the repository at this point in the history
this creates a side-effect which might not be favoured.
  • Loading branch information
mariocynicys committed Sep 8, 2024
1 parent 4a82590 commit 588c85c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 6 deletions.
48 changes: 43 additions & 5 deletions mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use super::OrderbookP2PItem;
use super::{subscribe_to_orderbook_topic, OrderbookP2PItem};
use coins::{is_wallet_only_ticker, lp_coinfind};
use mm2_core::mm_ctx::MmArc;
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput};

use async_trait::async_trait;
Expand All @@ -7,12 +9,13 @@ use futures::StreamExt;
use uuid::Uuid;

pub struct OrderbookStreamer {
ctx: MmArc,
base: String,
rel: String,
}

impl OrderbookStreamer {
pub fn new(base: String, rel: String) -> Self { Self { base, rel } }
pub fn new(ctx: MmArc, base: String, rel: String) -> Self { Self { ctx, base, rel } }

pub fn derive_streamer_id(base: &str, rel: &str) -> String { format!("ORDERBOOK_UPDATE/{base}:{rel}") }
}
Expand All @@ -38,9 +41,18 @@ impl EventStreamer for OrderbookStreamer {
ready_tx: oneshot::Sender<Result<(), String>>,
mut data_rx: impl StreamHandlerInput<Self::DataInType>,
) {
ready_tx
.send(Ok(()))
.expect("Receiver is dropped, which should never happen.");
const RECEIVER_DROPPED_MSG: &str = "Receiver is dropped, which should never happen.";
if let Err(err) = sanity_checks(&self.ctx, &self.base, &self.rel).await {
ready_tx.send(Err(err.clone())).expect(RECEIVER_DROPPED_MSG);
panic!("{}", err);
}
// We need to subscribe to the orderbook, otherwise we won't get any updates from the P2P network.
if let Err(err) = subscribe_to_orderbook_topic(&self.ctx, &self.base, &self.rel, false).await {
let err = format!("Subscribing to orderbook topic failed: {err:?}");
ready_tx.send(Err(err.clone())).expect(RECEIVER_DROPPED_MSG);
panic!("{}", err);
}
ready_tx.send(Ok(())).expect(RECEIVER_DROPPED_MSG);

while let Some(orderbook_update) = data_rx.next().await {
let event_data = serde_json::to_value(orderbook_update).expect("Serialization shouldn't fail.");
Expand All @@ -49,3 +61,29 @@ impl EventStreamer for OrderbookStreamer {
}
}
}

async fn sanity_checks(ctx: &MmArc, base: &str, rel: &str) -> Result<(), String> {
lp_coinfind(ctx, base)
.await
.map_err(|e| format!("Coin {base} not found: {e}"))?;
if is_wallet_only_ticker(ctx, base) {
return Err(format!("Coin {base} is wallet-only."));
}
lp_coinfind(ctx, rel)
.await
.map_err(|e| format!("Coin {base} not found: {e}"))?;
if is_wallet_only_ticker(ctx, rel) {
return Err(format!("Coin {rel} is wallet-only."));
}
Ok(())
}

impl Drop for OrderbookStreamer {
fn drop(&mut self) {
// FIXME(discuss): Do we want to unsubscribe from the orderbook topic when streaming is dropped?

// Note that the client enables orderbook streaming for all enabled coins when they query for best
// orders. These could potentially be a lot of pairs!
// Also, in the dev branch, we seem to never unsubscribe from an orderbook topic after doing an orderbook RPC!
}
}
2 changes: 1 addition & 1 deletion mm2src/mm2_main/src/rpc/streaming_activations/orderbook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub async fn enable_orderbook(
ctx: MmArc,
req: EnableOrderbookStreamingRequest,
) -> MmResult<EnableStreamingResponse, OrderbookStreamingRequestError> {
let order_status_streamer = OrderbookStreamer::new(req.base, req.rel);
let order_status_streamer = OrderbookStreamer::new(ctx.clone(), req.base, req.rel);
ctx.event_stream_manager
.add(req.client_id, order_status_streamer, ctx.spawner())
.await
Expand Down

0 comments on commit 588c85c

Please sign in to comment.