Skip to content

Commit

Permalink
add the ability to passively listen to orderbook updates
Browse files Browse the repository at this point in the history
without having the full orderbook
  • Loading branch information
mariocynicys committed Sep 8, 2024
1 parent 8d24afe commit e5d247c
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 68 deletions.
154 changes: 106 additions & 48 deletions mm2src/mm2_main/src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ use std::time::Duration;
use trie_db::NodeCodec as NodeCodecT;
use uuid::Uuid;

use crate::mm2::lp_network::{broadcast_p2p_msg, request_any_relay, request_one_peer, subscribe_to_topic, P2PRequest,
P2PRequestError};
use crate::mm2::lp_network::{broadcast_p2p_msg, request_any_relay, request_one_peer, subscribe_to_topic,
unsubscribe_from_topic, P2PRequest, P2PRequestError};
use crate::mm2::lp_swap::maker_swap_v2::{self, MakerSwapStateMachine, MakerSwapStorage};
use crate::mm2::lp_swap::taker_swap_v2::{self, TakerSwapStateMachine, TakerSwapStorage};
use crate::mm2::lp_swap::{calc_max_maker_vol, check_balance_for_maker_swap, check_balance_for_taker_swap,
Expand Down Expand Up @@ -132,7 +132,6 @@ const BALANCE_REQUEST_INTERVAL: f64 = 30.;
const MAKER_ORDER_TIMEOUT: u64 = MIN_ORDER_KEEP_ALIVE_INTERVAL * 3;
const TAKER_ORDER_TIMEOUT: u64 = 30;
const ORDER_MATCH_TIMEOUT: u64 = 30;
const ORDERBOOK_REQUESTING_TIMEOUT: u64 = MIN_ORDER_KEEP_ALIVE_INTERVAL * 2;
const MAX_ORDERS_NUMBER_IN_ORDERBOOK_RESPONSE: usize = 1000;
#[cfg(not(test))]
const TRIE_STATE_HISTORY_TIMEOUT: u64 = 14400;
Expand Down Expand Up @@ -413,6 +412,15 @@ async fn request_and_fill_orderbook(ctx: &MmArc, base: &str, rel: &str) -> Resul
let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).unwrap();
let mut orderbook = ordermatch_ctx.orderbook.lock();

let topic = orderbook_topic_from_base_rel(base, rel);
if let Some(sub_state) = orderbook.topics_subscribed_to.get_mut(&topic) {
sub_state.mark_as_requested()
} else {
// We might have unsubscribed from the topic from elsewhere while get orders response was in flight.
// In this case, we should ignore the response.
return Ok(());
}

let my_pubsecp = mm2_internal_pubkey_hex(ctx, String::from).map_err(MmError::into_inner)?;

let alb_pair = alb_ordered_pair(base, rel);
Expand Down Expand Up @@ -442,11 +450,6 @@ async fn request_and_fill_orderbook(ctx: &MmArc, base: &str, rel: &str) -> Resul
let _new_root = process_pubkey_full_trie(&mut orderbook, orders, params);
}

let topic = orderbook_topic_from_base_rel(base, rel);
orderbook
.topics_subscribed_to
.insert(topic, OrderbookRequestingState::Requested);

Ok(())
}

Expand Down Expand Up @@ -2354,13 +2357,35 @@ struct OrderedByPriceOrder {
uuid: Uuid,
}

/// The state of some orderbook subscription.
#[derive(Clone, Debug, PartialEq)]
enum OrderbookRequestingState {
/// The orderbook was requested from relays.
#[allow(dead_code)]
Requested,
/// We subscribed to a topic at `subscribed_at` time, but the orderbook was not requested.
NotRequested { subscribed_at: u64 },
struct OrderbookSubscriptionState {
/// At what time did we subscribe to the orderbook topic?
subscribed_at: u64,
/// Was the orderbook requested/fetched?
is_requested: bool,
/// Is this subscription in active mode?
///
/// Active mode means we want to fill the whole orderbook.
is_listening_actively: bool,
/// Is this subscription in passive mode?
///
/// Passive mode means that we are only interested updates but don't care if
/// we have the full orderbook or not.
is_listening_passively: bool,
}

impl OrderbookSubscriptionState {
fn new() -> Self {
OrderbookSubscriptionState {
subscribed_at: now_sec(),
is_requested: false,
is_listening_actively: false,
is_listening_passively: false,
}
}

fn mark_as_requested(&mut self) { self.is_requested = true; }
}

type H64 = [u8; 8];
Expand Down Expand Up @@ -2502,7 +2527,7 @@ struct Orderbook {
order_set: HashMap<Uuid, OrderbookItem>,
/// a map of orderbook states of known maker pubkeys
pubkeys_state: HashMap<String, OrderbookPubkeyState>,
topics_subscribed_to: HashMap<String, OrderbookRequestingState>,
topics_subscribed_to: HashMap<String, OrderbookSubscriptionState>,
/// MemoryDB instance to store Patricia Tries data
memory_db: MemoryDB<Blake2Hasher64>,
my_p2p_pubkeys: HashSet<String>,
Expand Down Expand Up @@ -2532,6 +2557,10 @@ impl Orderbook {

fn find_order_by_uuid(&self, uuid: &Uuid) -> Option<OrderbookItem> { self.order_set.get(uuid).cloned() }

fn purge_pair(&mut self, _base: &str, _rel: &str) {
// FIXME: To be filled! (ordered & unordred maps must have hashable keys)
}

fn insert_or_update_order_update_trie(&mut self, order: OrderbookItem) {
let zero = BigRational::from_integer(0.into());
if order.max_volume <= zero || order.price <= zero || order.min_volume < zero {
Expand Down Expand Up @@ -2688,22 +2717,30 @@ impl Orderbook {
Some(order)
}

fn is_subscribed_to(&self, topic: &str) -> bool { self.topics_subscribed_to.contains_key(topic) }
fn is_actively_subscribed_to(&self, topic: &str) -> bool {
self.topics_subscribed_to
.get(topic)
.map_or(false, |sub_state| sub_state.is_listening_actively)
}

fn process_keep_alive(
&mut self,
from_pubkey: &str,
message: new_protocol::PubkeyKeepAlive,
i_am_relay: bool,
) -> Option<OrdermatchRequest> {
let subscribed_to: HashSet<_> = message
.trie_roots
.keys()
.filter(|alb_pair| self.is_actively_subscribed_to(&orderbook_topic_from_ordered_pair(alb_pair)))
.cloned()
.collect();

let pubkey_state = pubkey_state_mut(&mut self.pubkeys_state, from_pubkey);
pubkey_state.last_keep_alive = now_sec();
let mut trie_roots_to_request = HashMap::new();
for (alb_pair, trie_root) in message.trie_roots {
let subscribed = self
.topics_subscribed_to
.contains_key(&orderbook_topic_from_ordered_pair(&alb_pair));
if !subscribed && !i_am_relay {
if !i_am_relay && !subscribed_to.contains(&alb_pair) {
continue;
}

Expand Down Expand Up @@ -3331,6 +3368,7 @@ pub async fn lp_ordermatch_loop(ctx: MmArc) {
// notify other nodes only if maker order is still there keeping maker_orders locked during the operation
if maker_orders.contains_key(&uuid) {
let topic = order.orderbook_topic();
// FIXME: Will we ever unsubscribe?
subscribe_to_topic(&ctx, topic);
maker_order_created_p2p_notify(
ctx.clone(),
Expand Down Expand Up @@ -4000,7 +4038,7 @@ pub async fn lp_auto_buy(

let request_orderbook = false;
try_s!(
subscribe_to_orderbook_topic(
subscribe_actively_to_orderbook_topic(
ctx,
order.base_orderbook_ticker(),
order.rel_orderbook_ticker(),
Expand Down Expand Up @@ -4743,7 +4781,7 @@ pub async fn create_maker_order(ctx: &MmArc, req: SetPriceReq) -> Result<MakerOr

let request_orderbook = false;
try_s!(
subscribe_to_orderbook_topic(
subscribe_actively_to_orderbook_topic(
ctx,
new_order.base_orderbook_ticker(),
new_order.rel_orderbook_ticker(),
Expand Down Expand Up @@ -5650,49 +5688,69 @@ pub async fn cancel_all_orders_rpc(ctx: MmArc, req: Json) -> Result<Response<Vec
/// # Safety
///
/// The function locks [`MmCtx::p2p_ctx`] and [`MmCtx::ordermatch_ctx`]
async fn subscribe_to_orderbook_topic(
async fn subscribe_actively_to_orderbook_topic(
ctx: &MmArc,
base: &str,
rel: &str,
request_orderbook: bool,
) -> Result<(), String> {
let current_timestamp = now_sec();
let topic = orderbook_topic_from_base_rel(base, rel);
let is_orderbook_filled = {
let was_orderbook_requested = {
let ordermatch_ctx = try_s!(OrdermatchContext::from_ctx(ctx));
let mut orderbook = ordermatch_ctx.orderbook.lock();

match orderbook.topics_subscribed_to.entry(topic.clone()) {
Entry::Vacant(e) => {
// we weren't subscribed to the topic yet
e.insert(OrderbookRequestingState::NotRequested {
subscribed_at: current_timestamp,
});
subscribe_to_topic(ctx, topic.clone());
// orderbook is not filled
false
},
Entry::Occupied(e) => match e.get() {
OrderbookRequestingState::Requested => {
// We are subscribed to the topic and the orderbook was requested already
true
},
OrderbookRequestingState::NotRequested { subscribed_at } => {
// We are subscribed to the topic. Also we didn't request the orderbook,
// True if enough time has passed for the orderbook to fill by OrdermatchRequest::SyncPubkeyOrderbookState.
*subscribed_at + ORDERBOOK_REQUESTING_TIMEOUT < current_timestamp
},
},
}
let entry = orderbook
.topics_subscribed_to
.entry(topic.clone())
.or_insert(OrderbookSubscriptionState::new());
subscribe_to_topic(ctx, topic);
entry.is_listening_actively = true;
entry.is_requested
};

if !is_orderbook_filled && request_orderbook {
if !was_orderbook_requested && request_orderbook {
try_s!(request_and_fill_orderbook(ctx, base, rel).await);
}

Ok(())
}

fn subscribe_passively_to_orderbook_topic(ctx: &MmArc, base: &str, rel: &str) -> Result<(), String> {
let topic = orderbook_topic_from_base_rel(base, rel);
let ordermatch_ctx = try_s!(OrdermatchContext::from_ctx(ctx));
let mut orderbook = ordermatch_ctx.orderbook.lock();

let entry = orderbook
.topics_subscribed_to
.entry(topic.clone())
.or_insert(OrderbookSubscriptionState::new());
subscribe_to_topic(ctx, topic);
entry.is_listening_passively = true;

Ok(())
}

fn unsubscribe_passively_from_orderbook_topic(ctx: &MmArc, base: &str, rel: &str) -> Result<(), String> {
let topic = orderbook_topic_from_base_rel(base, rel);
let ordermatch_ctx = try_s!(OrdermatchContext::from_ctx(ctx));
let mut orderbook = ordermatch_ctx.orderbook.lock();

let remove_pair_from_orderbook = match orderbook.topics_subscribed_to.get_mut(&topic) {
Some(sub_state) => {
sub_state.is_listening_passively = false;
!sub_state.is_listening_actively
},
None => false,
};

if remove_pair_from_orderbook {
orderbook.purge_pair(base, rel);
unsubscribe_from_topic(ctx, topic)
}

Ok(())
}

#[derive(Clone, Debug, Serialize)]
pub struct RpcOrderbookEntryV2 {
coin: String,
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_main/src/lp_ordermatch/orderbook_depth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub async fn orderbook_depth_rpc(ctx: MmArc, req: Json) -> Result<Response<Vec<u
.filter_map(|original_pair| {
let orderbook_pair = ordermatch_ctx.orderbook_pair_bypass(&original_pair);
let topic = orderbook_topic_from_base_rel(&orderbook_pair.0, &orderbook_pair.1);
if orderbook.is_subscribed_to(&topic) {
if orderbook.is_actively_subscribed_to(&topic) {
let asks = orderbook
.unordered
.get(&orderbook_pair)
Expand Down
16 changes: 9 additions & 7 deletions mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::{orderbook_topic_from_base_rel, subscribe_to_orderbook_topic, OrderbookP2PItem};
use super::{orderbook_topic_from_base_rel, subscribe_passively_to_orderbook_topic,
unsubscribe_passively_from_orderbook_topic, OrderbookP2PItem};
use coins::{is_wallet_only_ticker, lp_coinfind};
use common::log::error;
use mm2_core::mm_ctx::MmArc;
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput};

Expand Down Expand Up @@ -49,7 +51,7 @@ impl EventStreamer for OrderbookStreamer {
panic!("{}", err);
}
// We need to subscribe to the orderbook, otherwise we won't get any updates from the P2P network.
if let Err(err) = subscribe_to_orderbook_topic(&self.ctx, &self.base, &self.rel, false).await {
if let Err(err) = subscribe_passively_to_orderbook_topic(&self.ctx, &self.base, &self.rel) {
let err = format!("Subscribing to orderbook topic failed: {err:?}");
ready_tx.send(Err(err.clone())).expect(RECEIVER_DROPPED_MSG);
panic!("{}", err);
Expand Down Expand Up @@ -82,10 +84,10 @@ async fn sanity_checks(ctx: &MmArc, base: &str, rel: &str) -> Result<(), String>

impl Drop for OrderbookStreamer {
fn drop(&mut self) {
// FIXME(discuss): Do we want to unsubscribe from the orderbook topic when streaming is dropped?

// Note that the client enables orderbook streaming for all enabled coins when they query for best
// orders. These could potentially be a lot of pairs!
// Also, in the dev branch, we seem to never unsubscribe from an orderbook topic after doing an orderbook RPC!
// If no client is listening to the stream (streamer being dropped),
// we can unsubscribe from the orderbook topic.
unsubscribe_passively_from_orderbook_topic(&self.ctx, &self.base, &self.rel)
.map_err(|e| error!("Unsubscribing from orderbook topic failed: {e:?}."))
.ok();
}
}
6 changes: 3 additions & 3 deletions mm2src/mm2_main/src/lp_ordermatch/orderbook_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use num_traits::Zero;
use serde_json::{self as json, Value as Json};

use super::{addr_format_from_protocol_info, is_my_order, mm2_internal_pubkey_hex, orderbook_address,
subscribe_to_orderbook_topic, OrdermatchContext, RpcOrderbookEntryV2};
subscribe_actively_to_orderbook_topic, OrdermatchContext, RpcOrderbookEntryV2};

#[derive(Debug, Serialize)]
pub struct AggregatedOrderbookEntryV2 {
Expand Down Expand Up @@ -86,7 +86,7 @@ pub async fn orderbook_rpc(ctx: MmArc, req: Json) -> Result<Response<Vec<u8>>, S
return ERR!("Base and rel coins have the same orderbook tickers and protocols.");
}

try_s!(subscribe_to_orderbook_topic(&ctx, &base_ticker, &rel_ticker, request_orderbook).await);
try_s!(subscribe_actively_to_orderbook_topic(&ctx, &base_ticker, &rel_ticker, request_orderbook).await);

let my_pubsecp = mm2_internal_pubkey_hex(&ctx, String::from).map_err(MmError::into_inner)?;

Expand Down Expand Up @@ -251,7 +251,7 @@ pub async fn orderbook_rpc_v2(
}

let request_orderbook = true;
subscribe_to_orderbook_topic(&ctx, &base_ticker, &rel_ticker, request_orderbook)
subscribe_actively_to_orderbook_topic(&ctx, &base_ticker, &rel_ticker, request_orderbook)
.await
.map_to_mm(OrderbookRpcError::P2PSubscribeError)?;

Expand Down
21 changes: 12 additions & 9 deletions mm2src/mm2_main/src/ordermatch_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2142,7 +2142,7 @@ fn test_subscribe_to_ordermatch_topic_subscribed_not_filled() {
let expected = Some(OrderbookRequestingState::Requested);
assert_eq!(actual, expected);
// orderbook.topics_subscribed_to.insert(orderbook_topic("RICK", "MORTY"), OrderbookSubscriptionState::NotRequested {subscribed_at: now_ms() - 41});
// orderbook.topics_subscribed_to.insert(orderbook_topic("RICK", "MORTY"), OrderbookRequestingState::NotRequested {subscribed_at: now_ms() - 41});
}
#[test]
Expand Down Expand Up @@ -2413,10 +2413,11 @@ fn test_diff_should_not_be_written_if_hash_not_changed_on_remove() {
#[test]
fn test_orderbook_pubkey_sync_request() {
let mut orderbook = Orderbook::default();
orderbook.topics_subscribed_to.insert(
orderbook_topic_from_base_rel("C1", "C2"),
OrderbookRequestingState::Requested,
);
let mut sub_state = OrderbookSubscriptionState::new();
sub_state.is_listening_actively = true;
orderbook
.topics_subscribed_to
.insert(orderbook_topic_from_base_rel("C1", "C2"), sub_state);
let pubkey = "pubkey";

let mut trie_roots = HashMap::new();
Expand Down Expand Up @@ -2444,10 +2445,12 @@ fn test_orderbook_pubkey_sync_request() {
#[test]
fn test_orderbook_pubkey_sync_request_relay() {
let mut orderbook = Orderbook::default();
orderbook.topics_subscribed_to.insert(
orderbook_topic_from_base_rel("C1", "C2"),
OrderbookRequestingState::Requested,
);
let mut sub_state = OrderbookSubscriptionState::new();
// We aren't listening actively but as a relay we should still store the orderbook state.
sub_state.is_listening_actively = false;
orderbook
.topics_subscribed_to
.insert(orderbook_topic_from_base_rel("C1", "C2"), sub_state);
let pubkey = "pubkey";

let mut trie_roots = HashMap::new();
Expand Down

0 comments on commit e5d247c

Please sign in to comment.