
feat(event-streaming): new streams (orders, swaps, eth_fee_estimation) #2172

Open
wants to merge 132 commits into dev from event-streaming

Conversation

@mariocynicys (Collaborator) commented Jul 22, 2024

This PR aims to support event streaming in some of MM2's API endpoints.

So far, the work in this PR is mainly a refactor of the event streaming code after multiple iterations of discussing how it should end up looking.

The flow goes as follows:

  • A client initially opens an SSE connection with MM2 to receive events on. For wasm, which doesn't support SSE, we use a worker to deliver the events to the client (we can consider it SSE too).
  • If a client wants to receive some events (balance change, network topology change, new gas estimations, etc.), they need to subscribe to these events (enable these streamers) through the API.
  • A streamer is a background task responsible for firing some events. Initially MM2 has no streamers running. When a client enables a streamer (= subscribes to a particular event), they start receiving events from that streamer, until they unsubscribe from it.
  • If no client is listening to an active streamer, it shuts down. A new streamer will be started again if a client re-subscribes.
  • (a little irrelevant for now) Multiple clients subscribing to the same event on MM2 doesn't mean we get a new streamer per client. Only one streamer is booted up for the first subscribing client, and later subscribers share the same streamer.

The role of the StreamingManager:

  • Manage different clients (each client gets an id cid when it first connects through SSE).
  • Add a new (cid, sid) pair (translates to: client with id cid wants to listen to the streamer with id sid). If a streamer with that sid is already running, it is just told that a new client will be listening as well; otherwise, it is spawned in the background.
  • Remove a (cid, sid) pair (= client with id cid no longer wants to receive events from the streamer with id sid; if no more clients are listening, the streamer dies).
  • Manage different streamers.
  • Manage the connections between the streamers (the background tasks) and MM2. Streamers either work periodically (do some job, possibly fire some events, sleep for a while, and repeat) or they need to receive notifications from MM2 (e.g. UTXO balance event streaming relies on electrum subscriptions: the electrum server notifies MM2 when an address balance has changed, and MM2 in turn forwards this notification to the streamer responsible for firing balance events). To send some data to a streamer, StreamingManager::send(streamer_id, arbitrary_data) is used. A minimal sketch of this surface follows the list.
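
As referenced in the last point, here is a minimal, self-contained sketch of that surface in Rust. Only `send` and the Arc<RwLock<..>> wrapping (mentioned in the commits below) come from this PR; the other method names, error handling, and map layout are hypothetical, and the actual spawning/shutdown of streamer tasks is elided.

use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};

/// Hypothetical sketch of the manager's shared state: many readers (broadcasting), few writers.
#[derive(Clone, Default)]
pub struct StreamingManager(Arc<RwLock<Inner>>);

#[derive(Default)]
struct Inner {
    /// streamer_id -> ids of the clients currently listening to it.
    streamers: HashMap<String, HashSet<u64>>,
    /// client_id -> ids of the streamers the client listens to.
    clients: HashMap<u64, HashSet<String>>,
}

impl StreamingManager {
    /// Registers a new SSE client (cid) when it first connects.
    pub fn new_client(&self, client_id: u64) {
        self.0.write().unwrap().clients.entry(client_id).or_default();
    }

    /// Adds a (cid, sid) pair. The real code also spawns the streamer task if it isn't running yet.
    pub fn add(&self, client_id: u64, streamer_id: String) -> Result<(), &'static str> {
        let mut this = self.0.write().unwrap();
        if !this.clients.contains_key(&client_id) {
            return Err("unknown client");
        }
        this.streamers.entry(streamer_id.clone()).or_default().insert(client_id);
        this.clients.get_mut(&client_id).expect("checked above").insert(streamer_id);
        Ok(())
    }

    /// Removes a (cid, sid) pair; a streamer with no listeners left is dropped (shut down).
    pub fn stop(&self, client_id: u64, streamer_id: &str) {
        let mut this = self.0.write().unwrap();
        if let Some(subs) = this.clients.get_mut(&client_id) {
            subs.remove(streamer_id);
        }
        let now_empty = this
            .streamers
            .get_mut(streamer_id)
            .map(|listeners| {
                listeners.remove(&client_id);
                listeners.is_empty()
            })
            .unwrap_or(false);
        if now_empty {
            this.streamers.remove(streamer_id);
        }
    }

    /// Forwards arbitrary data (e.g. an electrum scripthash notification) to a running streamer.
    pub fn send(&self, _streamer_id: &str, _data: Box<dyn Any + Send>) {
        // In the real manager this is delivered over the streamer's input channel.
    }
}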

Breaking Changes:

  • No more event_stream_configuration in MM2 json config. Requested streams are initialized and configured dynamically through the API.
  • Thus event_stream_configuration.access_control_allow_origin config was moved to access_control_allow_origin (one scope out) b9d1218.
  • And event_stream_configuration.worker_path was changed to event_stream_worker_path (one scope out) ee32fd1.
  • No more filters are required in the initial SSE setup with MM2 (e.g. mm2.com/event-stream?filter=NETWORK,BALANCE,DATA_NEEDED:datatype -> mm2.com/event-stream?id=1), but a client id is now required (a random u64 identifier for [& generated by] the client), e.g. mm2.com/event-stream?id=799384531.
  • How are the wanted events filtered then (the client gets no events by default)? The client needs to activate specific event streamers through RPC (e.g. stream::enable::balance).
  • Contrary to the note in the previous point (client gets no events by default), some special events are delivered to the client even if they never requested/filtered them (this happens via broadcast_all). Examples of these events are DATA_NEEDED:datatype, and HEARTBEAT should behave similarly later as well.

(So for the Keplr integration, only ?id=<RANDOM_U64> must be added to the SSE init request and everything should work; ?filter= should be removed.)
(The <RANDOM_U64> should be stored by the client since it's required in any event streamer activation/deactivation. For Keplr alone this isn't necessary for now, since DATA_NEEDED:datatype is a special event that's broadcast to all clients and doesn't require any sort of activation/subscription beforehand. A sketch of the new flow follows.)
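
To show the new client flow end to end, a small Rust sketch follows. The SSE URL and the method name stream::enable::balance come from this description; the RPC body shape (userpass/params/coin/client_id) is only an illustrative guess, since the exact request format isn't spelled out here.

use serde_json::json;

fn main() {
    // 1. The client picks a random u64 id and opens the SSE connection with it:
    //      GET mm2.com/event-stream?id=799384531
    //    (no ?filter= parameter anymore).
    let client_id: u64 = 799_384_531;

    // 2. It then enables the streamers it cares about over RPC, e.g. stream::enable::balance.
    //    NOTE: the field names below are assumptions, not the PR's documented request format.
    let enable_balance = json!({
        "userpass": "RPC_PASSWORD",
        "method": "stream::enable::balance",
        "params": { "coin": "KMD", "client_id": client_id },
    });
    println!("{enable_balance:#}");

    // 3. Special events such as DATA_NEEDED:datatype are broadcast to every connected
    //    client (broadcast_all) and need no such activation.
}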

it was confusing why a utxo standard coin was being used instead of the coin we are building; this makes a bit more sense
There was no point in the option wrapping. Simplifying stuff.
also does some housekeeping, file splitting and renaming
remove the intermediate bridge task and replace it with `filter_map` call.
also properly implement the shutdown handle
also wraps the StreamingManager's mutable data inside Arc<RwLock<..>> so it can be used in concurrent code.
RwLock was chosen here since we send streaming data far more often than we edit the streamers
along with some cleanups and simplifications
Also use the `data_rx` for utxo balance events.
A cleanup for the scripthash notification channels/logic will follow
this is very messy though (because of the current state of the electrum code) and will be reverted; just recording this in the history for now
made more sense to have it inside the manager. also, the manager now provides the controller as a parameter in the handle method, so different streamers don't have to keep a copy of ctx just to broadcast on the controller.
this is still not hooked up though. no data will be received on this rx channel until we call
`event_stream_manager.send('BALANCE:ZCoin')` (or whatever the coin name is) somewhere.
compiles but streaming hasn't yet been enabled for the electrum coins
the streaming manager is passed up to ElectrumClientImpl. we could
theoretically pass it down the tree to each ElectrumConnection, but
since these connections already have a ref to ElectrumClientImpl in
their establish_connection_loop lifetime, we don't really need to do so.
this commit makes it so that the client deletion logic is handled by the
streaming manager when initializing the client. the return type of
client initialization has an extra field that will run the client
deletion logic on Drop.

this fixes the problem of clients never getting removed: when a client disconnects,
the body stream is no longer driven further and thus never hits the `remove_client` call.
@mariocynicys force-pushed the event-streaming branch 6 times, most recently from dd5ac82 to 39fa8b1 on November 1, 2024 18:10
ignoring the non-wasm test for now since it needs the zcash params
downloaded, which we don't have an automated way to do. it would be better if we
could download them the same way we do for wasm (right from kdf).
naming was used loosely: pirate was used as if it were zombie, and a previous
fix in the url constants led to this breaking.
@mariocynicys force-pushed the event-streaming branch 2 times, most recently from 0d3ac5f to 159445b on November 2, 2024 09:17
@@ -903,7 +903,9 @@ impl MarketCoinOps for QtumCoin {
impl MmCoin for QtumCoin {
fn is_asset_chain(&self) -> bool { utxo_common::is_asset_chain(&self.utxo_arc) }

fn spawner(&self) -> CoinFutSpawner { CoinFutSpawner::new(&self.as_ref().abortable_system) }
fn get_ctx(&self) -> Option<MmArc> { MmArc::from_weak(&self.as_ref().ctx) }
Member

I'm not sure if I mentioned this before (since my last review was a month ago), but why are we implementing this at the trait level? Coins don't create MmArc instances—they're already available at higher levels and some coins just receive them during initialization. We could likely pass MmArc where needed, as we do for certain coins. Adding this function seems unnecessary and inconsistent across different coins.

@mariocynicys (Collaborator, Author) commented Nov 6, 2024

needed in MmCoin so we can access it in generic impls over MmCoins, e.g.:

let streaming_manager = ctx.coin.get_ctx().unwrap().event_stream_manager.clone();

I think I have answered that somewhere in the threads above though; could you please go over it and resolve the addressed comments.

P.S. in the e.g. above, we could add it to UtxoCoinOps instead, but that makes no sense IMO. We could also opt for making it get_streaming_manager, but I opted for get_ctx since we already have global state other than just the streaming manager, and some coins already access this global state.

Member

Why is it not in

struct UtxoTxHistoryStateMachine<Coin: UtxoTxHistoryOps, Storage: TxHistoryStorage> {
coin: Coin,
storage: Storage,
metrics: MetricsArc,
/// Last requested balances of the activated coin's addresses.
/// TODO add a `CoinBalanceState` structure and replace [`HashMap<String, BigDecimal>`] everywhere.
balances: HashMap<String, BigDecimal>,
}
?

Collaborator (Author)

that's a good place as well. I'm down to storing it (the streaming manager) there if you are keen on the coins not exposing ctx :(

Member

I'd prefer storing it where it's needed rather than forcing it into an unrelated trait.

Collaborator (Author)

> into an unrelated trait

gonna have to disagree with that, but I guess that's a topic for another day :)

addressed in 65bd662 & e65ace0

pass the streaming manager down to where it should be used instead

this does so for utxo balance history, tendermint to come
@borngraced (Member) left a comment

next iteration. I tried to focus on optimizations in this review.


/// Returns a human readable unique identifier for the event streamer.
/// No other event streamer should have the same identifier.
fn streamer_id(&self) -> String;
@borngraced (Member) commented Nov 15, 2024

streamer_id is being cloned too often, please convert to Arc<str>
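
A sketch of how that suggestion could look (the trait and streamer below are simplified stand-ins, not the PR's actual definitions): the id is built once, and handing it out becomes a refcount bump rather than a String clone.

use std::sync::Arc;

trait EventStreamer {
    /// Human-readable unique identifier for the event streamer.
    fn streamer_id(&self) -> Arc<str>;
}

struct BalanceStreamer {
    /// Built once at construction; cloning it later is cheap.
    id: Arc<str>,
}

impl BalanceStreamer {
    fn new(coin: &str) -> Self {
        Self { id: Arc::from(format!("BALANCE:{coin}")) }
    }
}

impl EventStreamer for BalanceStreamer {
    fn streamer_id(&self) -> Arc<str> { Arc::clone(&self.id) }
}

fn main() {
    let streamer = BalanceStreamer::new("ZCoin");
    let id = streamer.streamer_id();
    let id_for_another_map = Arc::clone(&id); // no new allocation
    assert_eq!(&*id, "BALANCE:ZCoin");
    assert_eq!(id, id_for_another_map);
}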

Comment on lines +68 to +75
future::ready(
if let Ok(input_data) = any_input_data.downcast() {
Some(*input_data)
} else {
error!("Couldn't downcast a received message to {}. This message wasn't intended to be sent to this streamer ({streamer_id}).", any::type_name::<<S as EventStreamer>::DataInType>());
None
}
)
Member

Suggested change
future::ready(
    any_input_data
        .downcast()
        .map(|input_data| *input_data)
        .map_err(|_| {
            error!(
                "Couldn't downcast a received message to {}. This message wasn't intended to be sent to this streamer ({streamer_id}).",
                any::type_name::<S::DataInType>()
            );
        })
        .ok()
)

})?;

// If the handler takes no input data, return `None` for the data sender.
if any::TypeId::of::<<S as EventStreamer>::DataInType>() == any::TypeId::of::<NoDataIn>() {
Member

This can be relaxed a lil bit

Suggested change
if any::TypeId::of::<<S as EventStreamer>::DataInType>() == any::TypeId::of::<NoDataIn>() {
if any::TypeId::of::<S::DataInType>() == any::TypeId::of::<NoDataIn>() {

#[derive(Default)]
pub struct Event {
/// The type of the event (balance, network, swap, etc...).
event_type: String,
Member

Can this be converted to Arc<str>? I see it's mostly constructed via streamer_id, which I also suggested converting to Arc since it's being cloned too often.

Comment on lines +43 to +46
pub fn get(&self) -> (String, Json) {
let prefix = if self.error { "ERROR:" } else { "" };
(format!("{prefix}{}", self.event_type), self.message.clone())
}
Member

please return Json as ref and avoid cloning
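
One way that suggestion could look, assuming (as elsewhere in the codebase) that Json is an alias for serde_json::Value; the Event here keeps only the fields that get() touches:

use serde_json::{json, Value as Json};

pub struct Event {
    /// The type of the event (balance, network, swap, etc...).
    event_type: String,
    /// The event's payload.
    message: Json,
    /// Whether this event is an error event.
    error: bool,
}

impl Event {
    /// Same shape as the PR's `get`, but borrowing the payload so callers decide whether to clone.
    pub fn get(&self) -> (String, &Json) {
        let prefix = if self.error { "ERROR:" } else { "" };
        (format!("{prefix}{}", self.event_type), &self.message)
    }
}

fn main() {
    let event = Event {
        event_type: "BALANCE:ZCoin".to_string(),
        message: json!({ "balance": "1.23" }),
        error: false,
    };
    let (event_type, payload) = event.get();
    println!("{event_type}: {payload}");
}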

Comment on lines +127 to +149
{
let mut this = self.write();
match this.clients.get(&client_id) {
// We don't know that client. We don't have a connection to it.
None => return Err(StreamingManagerError::UnknownClient),
// The client is already listening to that streamer.
Some(client_info) if client_info.listens_to(&streamer_id) => {
return Err(StreamingManagerError::ClientAlreadyListening);
},
_ => (),
}

// If a streamer is already up and running, we won't spawn another one.
if let Some(streamer_info) = this.streamers.get_mut(&streamer_id) {
// Register the client as a listener to the streamer.
streamer_info.add_client(client_id);
// Register the streamer as listened-to by the client.
if let Some(client_info) = this.clients.get_mut(&client_id) {
client_info.add_streamer(streamer_id.clone());
}
return Ok(streamer_id);
}
}
Member

Did you think about a "read first" approach, to avoid unnecessarily taking a write lock when a read lock suffices, or is this on purpose?
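
For reference, a self-contained illustration of that "read first" idea (simplified names, std RwLock rather than the PR's lock type): the common rejection paths only take a read lock, and the checks must be repeated after upgrading because the state can change between the two locks, which is also why doing everything under one write lock is a defensible choice.

use std::collections::HashMap;
use std::sync::RwLock;

struct Manager {
    /// client_id -> streamer ids the client is subscribed to.
    clients: RwLock<HashMap<u64, Vec<String>>>,
}

impl Manager {
    fn subscribe(&self, client_id: u64, streamer_id: &str) -> Result<(), &'static str> {
        {
            // Fast path: validate under a read lock so concurrent broadcasts aren't blocked.
            let clients = self.clients.read().unwrap();
            let subs = clients.get(&client_id).ok_or("unknown client")?;
            if subs.iter().any(|s| s == streamer_id) {
                return Err("client already listening");
            }
        } // read lock dropped here

        // Slow path: take the write lock and re-check, since another thread may have
        // subscribed this client in the meantime.
        let mut clients = self.clients.write().unwrap();
        let subs = clients.get_mut(&client_id).ok_or("unknown client")?;
        if subs.iter().any(|s| s == streamer_id) {
            return Err("client already listening");
        }
        subs.push(streamer_id.to_string());
        Ok(())
    }
}

fn main() {
    let manager = Manager { clients: RwLock::new(HashMap::from([(1, Vec::new())])) };
    assert!(manager.subscribe(1, "BALANCE:ZCoin").is_ok());
    assert!(manager.subscribe(1, "BALANCE:ZCoin").is_err()); // already listening
    assert!(manager.subscribe(2, "BALANCE:ZCoin").is_err()); // unknown client
}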

Comment on lines +269 to +272
let mut this = self.write();
if this.clients.contains_key(&client_id) {
return Err(StreamingManagerError::ClientExists);
}
Member

same as above.

Comment on lines +318 to +322
let mut this = self.write();
let Some(streamer_info) = this.streamers.get(streamer_id) else { return };
if !streamer_info.is_down() {
return;
}
Member

same as above

// Only `try_send` here. If the channel is full (client is slow), the message
// will be dropped and the client won't receive it.
// This avoids blocking the broadcast to other receivers.
self.channel.try_send(event).ok();
Member

why not self.channel.try_send(event).error_log()?
