diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index dc36644ebf..fb2b52674a 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -39,6 +39,7 @@ use futures::{compat::Future01CompatExt, lock::Mutex as AsyncMutex, TryFutureExt use hash256_std_hasher::Hash256StdHasher; use hash_db::Hasher; use http::Response; +use itertools::Itertools; use keys::{AddressFormat, KeyPair}; use mm2_core::mm_ctx::{from_ctx, MmArc, MmWeak}; use mm2_err_handle::prelude::*; @@ -2554,8 +2555,44 @@ impl Orderbook { fn find_order_by_uuid(&self, uuid: &Uuid) -> Option { self.order_set.get(uuid).cloned() } - fn purge_pair(&mut self, _base: &str, _rel: &str) { - // FIXME: To be filled! (ordered & unordred maps must have hashable keys) + fn purge_pair(&mut self, base: &str, rel: &str, delete_sub: bool) { + let base_rel = (base.to_string(), rel.to_string()); + let rel_base = (rel.to_string(), base.to_string()); + let topic = orderbook_topic_from_base_rel(base, rel); + let alb_pair = alb_ordered_pair(base, rel); + // Only delete the subscription if instructed to. + if delete_sub { + self.topics_subscribed_to.remove(&topic); + } + // Remove the pair from the ordered map. + self.ordered.remove(&base_rel); + self.ordered.remove(&rel_base); + // Remove the pair from the unordred map and get their orders' uuids. + let uuids = { + let base_rel_uuids = self.unordered.remove(&base_rel).unwrap_or_default(); + let rel_base_uuids = self.unordered.remove(&rel_base).unwrap_or_default(); + base_rel_uuids.into_iter().chain(rel_base_uuids.into_iter()) + }; + let pubkeys_for_pair = uuids + // Remove these uuids from the orders set, and collect the pubkeys to update the pubkeys state. + .filter_map(|uuid| self.order_set.remove(&uuid).map(|order| (order.pubkey, uuid))) + .into_group_map(); + for (pubkey, uuids) in pubkeys_for_pair { + let pubkey_state = match self.pubkeys_state.get_mut(&pubkey) { + Some(state) => state, + None => continue, + }; + + if pubkey_state.trie_roots.get(&alb_pair).is_none() { + continue; + } + for uuid in uuids { + pubkey_state.orders_uuids.remove(&(uuid, alb_pair.clone())); + } + + pubkey_state.order_pairs_trie_state_history.remove(alb_pair.clone()); + pubkey_state.trie_roots.remove(&alb_pair); + } } fn insert_or_update_order_update_trie(&mut self, order: OrderbookItem) { @@ -5712,6 +5749,29 @@ async fn subscribe_actively_to_orderbook_topic( Ok(()) } +#[allow(dead_code)] +fn unsubscribe_actively_from_orderbook_topic(ctx: &MmArc, base: &str, rel: &str) -> Result<(), String> { + let topic = orderbook_topic_from_base_rel(base, rel); + let ordermatch_ctx = try_s!(OrdermatchContext::from_ctx(ctx)); + let mut orderbook = ordermatch_ctx.orderbook.lock(); + + let no_one_is_listening = match orderbook.topics_subscribed_to.get_mut(&topic) { + Some(sub_state) => { + sub_state.is_listening_actively = false; + !sub_state.is_listening_passively + }, + None => false, + }; + + // If we don't have a passive listener to this topic we want to delete the subscription. + orderbook.purge_pair(base, rel, no_one_is_listening); + if no_one_is_listening { + unsubscribe_from_topic(ctx, topic) + } + + Ok(()) +} + fn subscribe_passively_to_orderbook_topic(ctx: &MmArc, base: &str, rel: &str) -> Result<(), String> { let topic = orderbook_topic_from_base_rel(base, rel); let ordermatch_ctx = try_s!(OrdermatchContext::from_ctx(ctx)); @@ -5741,7 +5801,7 @@ fn unsubscribe_passively_from_orderbook_topic(ctx: &MmArc, base: &str, rel: &str }; if remove_pair_from_orderbook { - orderbook.purge_pair(base, rel); + orderbook.purge_pair(base, rel, true); unsubscribe_from_topic(ctx, topic) }