diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index f2132f9298..f7fdd976d5 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -72,8 +72,8 @@ use std::time::Duration; use trie_db::NodeCodec as NodeCodecT; use uuid::Uuid; -use crate::mm2::lp_network::{broadcast_p2p_msg, request_any_relay, request_one_peer, subscribe_to_topic, P2PRequest, - P2PRequestError}; +use crate::mm2::lp_network::{broadcast_p2p_msg, request_any_relay, request_one_peer, subscribe_to_topic, + unsubscribe_from_topic, P2PRequest, P2PRequestError}; use crate::mm2::lp_swap::maker_swap_v2::{self, MakerSwapStateMachine, MakerSwapStorage}; use crate::mm2::lp_swap::taker_swap_v2::{self, TakerSwapStateMachine, TakerSwapStorage}; use crate::mm2::lp_swap::{calc_max_maker_vol, check_balance_for_maker_swap, check_balance_for_taker_swap, @@ -132,7 +132,6 @@ const BALANCE_REQUEST_INTERVAL: f64 = 30.; const MAKER_ORDER_TIMEOUT: u64 = MIN_ORDER_KEEP_ALIVE_INTERVAL * 3; const TAKER_ORDER_TIMEOUT: u64 = 30; const ORDER_MATCH_TIMEOUT: u64 = 30; -const ORDERBOOK_REQUESTING_TIMEOUT: u64 = MIN_ORDER_KEEP_ALIVE_INTERVAL * 2; const MAX_ORDERS_NUMBER_IN_ORDERBOOK_RESPONSE: usize = 1000; #[cfg(not(test))] const TRIE_STATE_HISTORY_TIMEOUT: u64 = 14400; @@ -413,6 +412,15 @@ async fn request_and_fill_orderbook(ctx: &MmArc, base: &str, rel: &str) -> Resul let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).unwrap(); let mut orderbook = ordermatch_ctx.orderbook.lock(); + let topic = orderbook_topic_from_base_rel(base, rel); + if let Some(sub_state) = orderbook.topics_subscribed_to.get_mut(&topic) { + sub_state.mark_as_requested() + } else { + // We might have unsubscribed from the topic from elsewhere while get orders response was in flight. + // In this case, we should ignore the response. + return Ok(()); + } + let my_pubsecp = mm2_internal_pubkey_hex(ctx, String::from).map_err(MmError::into_inner)?; let alb_pair = alb_ordered_pair(base, rel); @@ -442,11 +450,6 @@ async fn request_and_fill_orderbook(ctx: &MmArc, base: &str, rel: &str) -> Resul let _new_root = process_pubkey_full_trie(&mut orderbook, orders, params); } - let topic = orderbook_topic_from_base_rel(base, rel); - orderbook - .topics_subscribed_to - .insert(topic, OrderbookRequestingState::Requested); - Ok(()) } @@ -2354,13 +2357,35 @@ struct OrderedByPriceOrder { uuid: Uuid, } +/// The state of some orderbook subscription. #[derive(Clone, Debug, PartialEq)] -enum OrderbookRequestingState { - /// The orderbook was requested from relays. - #[allow(dead_code)] - Requested, - /// We subscribed to a topic at `subscribed_at` time, but the orderbook was not requested. - NotRequested { subscribed_at: u64 }, +struct OrderbookSubscriptionState { + /// At what time did we subscribe to the orderbook topic? + subscribed_at: u64, + /// Was the orderbook requested/fetched? + is_requested: bool, + /// Is this subscription in active mode? + /// + /// Active mode means we want to fill the whole orderbook. + is_listening_actively: bool, + /// Is this subscription in passive mode? + /// + /// Passive mode means that we are only interested updates but don't care if + /// we have the full orderbook or not. + is_listening_passively: bool, +} + +impl OrderbookSubscriptionState { + fn new() -> Self { + OrderbookSubscriptionState { + subscribed_at: now_sec(), + is_requested: false, + is_listening_actively: false, + is_listening_passively: false, + } + } + + fn mark_as_requested(&mut self) { self.is_requested = true; } } type H64 = [u8; 8]; @@ -2502,7 +2527,7 @@ struct Orderbook { order_set: HashMap, /// a map of orderbook states of known maker pubkeys pubkeys_state: HashMap, - topics_subscribed_to: HashMap, + topics_subscribed_to: HashMap, /// MemoryDB instance to store Patricia Tries data memory_db: MemoryDB, my_p2p_pubkeys: HashSet, @@ -2532,6 +2557,10 @@ impl Orderbook { fn find_order_by_uuid(&self, uuid: &Uuid) -> Option { self.order_set.get(uuid).cloned() } + fn purge_pair(&mut self, _base: &str, _rel: &str) { + // FIXME: To be filled! (ordered & unordred maps must have hashable keys) + } + fn insert_or_update_order_update_trie(&mut self, order: OrderbookItem) { let zero = BigRational::from_integer(0.into()); if order.max_volume <= zero || order.price <= zero || order.min_volume < zero { @@ -2688,7 +2717,11 @@ impl Orderbook { Some(order) } - fn is_subscribed_to(&self, topic: &str) -> bool { self.topics_subscribed_to.contains_key(topic) } + fn is_actively_subscribed_to(&self, topic: &str) -> bool { + self.topics_subscribed_to + .get(topic) + .map_or(false, |sub_state| sub_state.is_listening_actively) + } fn process_keep_alive( &mut self, @@ -2696,14 +2729,18 @@ impl Orderbook { message: new_protocol::PubkeyKeepAlive, i_am_relay: bool, ) -> Option { + let subscribed_to: HashSet<_> = message + .trie_roots + .keys() + .filter(|alb_pair| self.is_actively_subscribed_to(&orderbook_topic_from_ordered_pair(alb_pair))) + .cloned() + .collect(); + let pubkey_state = pubkey_state_mut(&mut self.pubkeys_state, from_pubkey); pubkey_state.last_keep_alive = now_sec(); let mut trie_roots_to_request = HashMap::new(); for (alb_pair, trie_root) in message.trie_roots { - let subscribed = self - .topics_subscribed_to - .contains_key(&orderbook_topic_from_ordered_pair(&alb_pair)); - if !subscribed && !i_am_relay { + if !i_am_relay && !subscribed_to.contains(&alb_pair) { continue; } @@ -3331,6 +3368,7 @@ pub async fn lp_ordermatch_loop(ctx: MmArc) { // notify other nodes only if maker order is still there keeping maker_orders locked during the operation if maker_orders.contains_key(&uuid) { let topic = order.orderbook_topic(); + // FIXME: Will we ever unsubscribe? subscribe_to_topic(&ctx, topic); maker_order_created_p2p_notify( ctx.clone(), @@ -4000,7 +4038,7 @@ pub async fn lp_auto_buy( let request_orderbook = false; try_s!( - subscribe_to_orderbook_topic( + subscribe_actively_to_orderbook_topic( ctx, order.base_orderbook_ticker(), order.rel_orderbook_ticker(), @@ -4743,7 +4781,7 @@ pub async fn create_maker_order(ctx: &MmArc, req: SetPriceReq) -> Result Result Result<(), String> { - let current_timestamp = now_sec(); let topic = orderbook_topic_from_base_rel(base, rel); - let is_orderbook_filled = { + let was_orderbook_requested = { let ordermatch_ctx = try_s!(OrdermatchContext::from_ctx(ctx)); let mut orderbook = ordermatch_ctx.orderbook.lock(); - match orderbook.topics_subscribed_to.entry(topic.clone()) { - Entry::Vacant(e) => { - // we weren't subscribed to the topic yet - e.insert(OrderbookRequestingState::NotRequested { - subscribed_at: current_timestamp, - }); - subscribe_to_topic(ctx, topic.clone()); - // orderbook is not filled - false - }, - Entry::Occupied(e) => match e.get() { - OrderbookRequestingState::Requested => { - // We are subscribed to the topic and the orderbook was requested already - true - }, - OrderbookRequestingState::NotRequested { subscribed_at } => { - // We are subscribed to the topic. Also we didn't request the orderbook, - // True if enough time has passed for the orderbook to fill by OrdermatchRequest::SyncPubkeyOrderbookState. - *subscribed_at + ORDERBOOK_REQUESTING_TIMEOUT < current_timestamp - }, - }, - } + let entry = orderbook + .topics_subscribed_to + .entry(topic.clone()) + .or_insert(OrderbookSubscriptionState::new()); + subscribe_to_topic(ctx, topic); + entry.is_listening_actively = true; + entry.is_requested }; - if !is_orderbook_filled && request_orderbook { + if !was_orderbook_requested && request_orderbook { try_s!(request_and_fill_orderbook(ctx, base, rel).await); } Ok(()) } +fn subscribe_passively_to_orderbook_topic(ctx: &MmArc, base: &str, rel: &str) -> Result<(), String> { + let topic = orderbook_topic_from_base_rel(base, rel); + let ordermatch_ctx = try_s!(OrdermatchContext::from_ctx(ctx)); + let mut orderbook = ordermatch_ctx.orderbook.lock(); + + let entry = orderbook + .topics_subscribed_to + .entry(topic.clone()) + .or_insert(OrderbookSubscriptionState::new()); + subscribe_to_topic(ctx, topic); + entry.is_listening_passively = true; + + Ok(()) +} + +fn unsubscribe_passively_from_orderbook_topic(ctx: &MmArc, base: &str, rel: &str) -> Result<(), String> { + let topic = orderbook_topic_from_base_rel(base, rel); + let ordermatch_ctx = try_s!(OrdermatchContext::from_ctx(ctx)); + let mut orderbook = ordermatch_ctx.orderbook.lock(); + + let remove_pair_from_orderbook = match orderbook.topics_subscribed_to.get_mut(&topic) { + Some(sub_state) => { + sub_state.is_listening_passively = false; + !sub_state.is_listening_actively + }, + None => false, + }; + + if remove_pair_from_orderbook { + orderbook.purge_pair(base, rel); + unsubscribe_from_topic(ctx, topic) + } + + Ok(()) +} + #[derive(Clone, Debug, Serialize)] pub struct RpcOrderbookEntryV2 { coin: String, diff --git a/mm2src/mm2_main/src/lp_ordermatch/orderbook_depth.rs b/mm2src/mm2_main/src/lp_ordermatch/orderbook_depth.rs index dcc991a361..e1719817af 100644 --- a/mm2src/mm2_main/src/lp_ordermatch/orderbook_depth.rs +++ b/mm2src/mm2_main/src/lp_ordermatch/orderbook_depth.rs @@ -54,7 +54,7 @@ pub async fn orderbook_depth_rpc(ctx: MmArc, req: Json) -> Result Result<(), String> impl Drop for OrderbookStreamer { fn drop(&mut self) { - // FIXME(discuss): Do we want to unsubscribe from the orderbook topic when streaming is dropped? - - // Note that the client enables orderbook streaming for all enabled coins when they query for best - // orders. These could potentially be a lot of pairs! - // Also, in the dev branch, we seem to never unsubscribe from an orderbook topic after doing an orderbook RPC! + // If no client is listening to the stream (streamer being dropped), + // we can unsubscribe from the orderbook topic. + unsubscribe_passively_from_orderbook_topic(&self.ctx, &self.base, &self.rel) + .map_err(|e| error!("Unsubscribing from orderbook topic failed: {e:?}.")) + .ok(); } } diff --git a/mm2src/mm2_main/src/lp_ordermatch/orderbook_rpc.rs b/mm2src/mm2_main/src/lp_ordermatch/orderbook_rpc.rs index d76af5dfba..8c8a745a30 100644 --- a/mm2src/mm2_main/src/lp_ordermatch/orderbook_rpc.rs +++ b/mm2src/mm2_main/src/lp_ordermatch/orderbook_rpc.rs @@ -11,7 +11,7 @@ use num_traits::Zero; use serde_json::{self as json, Value as Json}; use super::{addr_format_from_protocol_info, is_my_order, mm2_internal_pubkey_hex, orderbook_address, - subscribe_to_orderbook_topic, OrdermatchContext, RpcOrderbookEntryV2}; + subscribe_actively_to_orderbook_topic, OrdermatchContext, RpcOrderbookEntryV2}; #[derive(Debug, Serialize)] pub struct AggregatedOrderbookEntryV2 { @@ -86,7 +86,7 @@ pub async fn orderbook_rpc(ctx: MmArc, req: Json) -> Result>, S return ERR!("Base and rel coins have the same orderbook tickers and protocols."); } - try_s!(subscribe_to_orderbook_topic(&ctx, &base_ticker, &rel_ticker, request_orderbook).await); + try_s!(subscribe_actively_to_orderbook_topic(&ctx, &base_ticker, &rel_ticker, request_orderbook).await); let my_pubsecp = mm2_internal_pubkey_hex(&ctx, String::from).map_err(MmError::into_inner)?; @@ -251,7 +251,7 @@ pub async fn orderbook_rpc_v2( } let request_orderbook = true; - subscribe_to_orderbook_topic(&ctx, &base_ticker, &rel_ticker, request_orderbook) + subscribe_actively_to_orderbook_topic(&ctx, &base_ticker, &rel_ticker, request_orderbook) .await .map_to_mm(OrderbookRpcError::P2PSubscribeError)?; diff --git a/mm2src/mm2_main/src/ordermatch_tests.rs b/mm2src/mm2_main/src/ordermatch_tests.rs index b0657b0d00..4c56b0fc6d 100644 --- a/mm2src/mm2_main/src/ordermatch_tests.rs +++ b/mm2src/mm2_main/src/ordermatch_tests.rs @@ -2142,7 +2142,7 @@ fn test_subscribe_to_ordermatch_topic_subscribed_not_filled() { let expected = Some(OrderbookRequestingState::Requested); assert_eq!(actual, expected); - // orderbook.topics_subscribed_to.insert(orderbook_topic("RICK", "MORTY"), OrderbookSubscriptionState::NotRequested {subscribed_at: now_ms() - 41}); + // orderbook.topics_subscribed_to.insert(orderbook_topic("RICK", "MORTY"), OrderbookRequestingState::NotRequested {subscribed_at: now_ms() - 41}); } #[test] @@ -2413,10 +2413,11 @@ fn test_diff_should_not_be_written_if_hash_not_changed_on_remove() { #[test] fn test_orderbook_pubkey_sync_request() { let mut orderbook = Orderbook::default(); - orderbook.topics_subscribed_to.insert( - orderbook_topic_from_base_rel("C1", "C2"), - OrderbookRequestingState::Requested, - ); + let mut sub_state = OrderbookSubscriptionState::new(); + sub_state.is_listening_actively = true; + orderbook + .topics_subscribed_to + .insert(orderbook_topic_from_base_rel("C1", "C2"), sub_state); let pubkey = "pubkey"; let mut trie_roots = HashMap::new(); @@ -2444,10 +2445,12 @@ fn test_orderbook_pubkey_sync_request() { #[test] fn test_orderbook_pubkey_sync_request_relay() { let mut orderbook = Orderbook::default(); - orderbook.topics_subscribed_to.insert( - orderbook_topic_from_base_rel("C1", "C2"), - OrderbookRequestingState::Requested, - ); + let mut sub_state = OrderbookSubscriptionState::new(); + // We aren't listening actively but as a relay we should still store the orderbook state. + sub_state.is_listening_actively = false; + orderbook + .topics_subscribed_to + .insert(orderbook_topic_from_base_rel("C1", "C2"), sub_state); let pubkey = "pubkey"; let mut trie_roots = HashMap::new();