-
Notifications
You must be signed in to change notification settings - Fork 94
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(event-streaming): new streams (orders, swaps, eth_fee_estimation) #2172
base: dev
Are you sure you want to change the base?
Conversation
it was confusing why a utxo standard coin was being used instead of the coin we are building, this makes a bit more sense
There was no point of the option wrapping. Simplifying stuff.
also does some housekeeping, file splitting and renaming
remove the intermediate bridge task and replace it with `filter_map` call. also properly implement the shutdown handle
also wraps the streamermanager mutable data inside Arc<RwLock< so to be used in concurrent code. RwLock was chosen here since we should be sending streaming data way more than editing the streamers
along with some cleanups and simplifications
Also use the `data_rx` for utxo balance events. A cleanup for the scripthash notification channels/logic will follow
this is very messy though (because of the current state of electurm code) and will be reverted, just recording this in the history for now
…nterface" This reverts commit 8972822.
made more sense to have it inside the manager. also now the manager provides the controller as a parameter in the handle method so different streamers don't have to keep a copy of ctx just to broadcast on the controller.
this is still not hooked up though. no data will be received on this rx channel till we do `event_stream_manager.send('BALANCE:ZCoin') (or whatever the coin name is)` somewhere.
compiles but streaming hasn't yet been enabled for the electrum coins
the streaming manager is passed up to ElectrumClientImpl, we could thoeretically pass it down the tree to each ElectrumConnection, but since these connections already have a ref to ElectrumClientImpl in their establish_connecion_loop lifetime, we don't really need to do so.
this commit makes it so that the client deletion logic is handled by the streaming manager when initializing the client. the return type of client initialization has an extra field that will run the client delection logic on Drop. this fixes the problem with clients never getting removed as when the clients disconnect and the body stream is no longer drived further thus will not hit the `remove_client` call.
5d72fb9
to
c2f0244
Compare
dd5ac82
to
39fa8b1
Compare
ignoring the non-wasm test for now since it needs zcash params downloaded which we don't have an auto way to do. would be better if we can download them in the same way we do for wasm (right from kdf).
naming was used loosely, pirate was used as if it was zombie, a previous fix in the url constants led to this breaking.
0d3ac5f
to
159445b
Compare
mm2src/coins/utxo/qtum.rs
Outdated
@@ -903,7 +903,9 @@ impl MarketCoinOps for QtumCoin { | |||
impl MmCoin for QtumCoin { | |||
fn is_asset_chain(&self) -> bool { utxo_common::is_asset_chain(&self.utxo_arc) } | |||
|
|||
fn spawner(&self) -> CoinFutSpawner { CoinFutSpawner::new(&self.as_ref().abortable_system) } | |||
fn get_ctx(&self) -> Option<MmArc> { MmArc::from_weak(&self.as_ref().ctx) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if I mentioned this before (since my last review was a month ago), but why are we implementing this at the trait level? Coins don't create MmArc instances—they're already available at higher levels and some coins just receive them during initialization. We could likely pass MmArc where needed, as we do for certain coins. Adding this function seems unnecessary and inconsistent across different coins.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needed in MmCoin so we can access it in generic impls over MmCoins, e.g.:
let streaming_manager = ctx.coin.get_ctx().unwrap().event_stream_manager.clone(); |
i think i have answered that somewhere in the threads above though, could you please go over it and resolve addressed comments.
p.s. in the e.g. above, we could add it in the UtxoCoinOps
instead, but this completely makes no sense IMO. We could also opt for making it get_streaming_manager
but i opted for get_ctx
since we already have global states other than just the streaming manager and some coins already accesses this global state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why it's not in
komodo-defi-framework/mm2src/coins/utxo/utxo_tx_history_v2.rs
Lines 145 to 152 in 547fc2b
struct UtxoTxHistoryStateMachine<Coin: UtxoTxHistoryOps, Storage: TxHistoryStorage> { | |
coin: Coin, | |
storage: Storage, | |
metrics: MetricsArc, | |
/// Last requested balances of the activated coin's addresses. | |
/// TODO add a `CoinBalanceState` structure and replace [`HashMap<String, BigDecimal>`] everywhere. | |
balances: HashMap<String, BigDecimal>, | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's a good place as well. im down to storing it (streaming manager) there if u are keen on the coins to expose no ctx
:(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer storing it where it's needed rather than forcing it into an unrelated trait.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pass the streaming manager down to where it should be used instead this does so for utxo balance history, tendermint to come
3a467dd
to
e65ace0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
next iteration. I tried to focus on optimizations in this review.
|
||
/// Returns a human readable unique identifier for the event streamer. | ||
/// No other event streamer should have the same identifier. | ||
fn streamer_id(&self) -> String; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
streamer_id
is being cloned too often, please convert to Arc<str>
future::ready( | ||
if let Ok(input_data) = any_input_data.downcast() { | ||
Some(*input_data) | ||
} else { | ||
error!("Couldn't downcast a received message to {}. This message wasn't intended to be sent to this streamer ({streamer_id}).", any::type_name::<<S as EventStreamer>::DataInType>()); | ||
None | ||
} | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
future::ready( | |
if let Ok(input_data) = any_input_data.downcast() { | |
Some(*input_data) | |
} else { | |
error!("Couldn't downcast a received message to {}. This message wasn't intended to be sent to this streamer ({streamer_id}).", any::type_name::<<S as EventStreamer>::DataInType>()); | |
None | |
} | |
) | |
future::ready( | |
any_input_data | |
.downcast() | |
.map(|input_data| *input_data) | |
.map_err(|_| { error!( | |
"Couldn't downcast a received message to {}. This message wasn't intended to be sent to this streamer ({streamer_id}).", | |
any::type_name::<S::DataInType>() | |
); | |
}).ok() | |
) |
})?; | ||
|
||
// If the handler takes no input data, return `None` for the data sender. | ||
if any::TypeId::of::<<S as EventStreamer>::DataInType>() == any::TypeId::of::<NoDataIn>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be relaxed a lil bit
if any::TypeId::of::<<S as EventStreamer>::DataInType>() == any::TypeId::of::<NoDataIn>() { | |
if any::TypeId::of::<S::DataInType>() == any::TypeId::of::<NoDataIn>() { |
#[derive(Default)] | ||
pub struct Event { | ||
/// The type of the event (balance, network, swap, etc...). | ||
event_type: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be converted to Arc<str>
? I see it's mostly constructed via streamer_id
which I also suggested to be made into an Arc as it's being cloned too often.
pub fn get(&self) -> (String, Json) { | ||
let prefix = if self.error { "ERROR:" } else { "" }; | ||
(format!("{prefix}{}", self.event_type), self.message.clone()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please return Json
as ref and avoid cloning
{ | ||
let mut this = self.write(); | ||
match this.clients.get(&client_id) { | ||
// We don't know that client. We don't have a connection to it. | ||
None => return Err(StreamingManagerError::UnknownClient), | ||
// The client is already listening to that streamer. | ||
Some(client_info) if client_info.listens_to(&streamer_id) => { | ||
return Err(StreamingManagerError::ClientAlreadyListening); | ||
}, | ||
_ => (), | ||
} | ||
|
||
// If a streamer is already up and running, we won't spawn another one. | ||
if let Some(streamer_info) = this.streamers.get_mut(&streamer_id) { | ||
// Register the client as a listener to the streamer. | ||
streamer_info.add_client(client_id); | ||
// Register the streamer as listened-to by the client. | ||
if let Some(client_info) = this.clients.get_mut(&client_id) { | ||
client_info.add_streamer(streamer_id.clone()); | ||
} | ||
return Ok(streamer_id); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did you think about read first” is to avoid unnecessarily taking a write lock when a read lock suffices or is this on purpose ?
let mut this = self.write(); | ||
if this.clients.contains_key(&client_id) { | ||
return Err(StreamingManagerError::ClientExists); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above.
let mut this = self.write(); | ||
let Some(streamer_info) = this.streamers.get(streamer_id) else { return }; | ||
if !streamer_info.is_down() { | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
// Only `try_send` here. If the channel is full (client is slow), the message | ||
// will be dropped and the client won't receive it. | ||
// This avoids blocking the broadcast to other receivers. | ||
self.channel.try_send(event).ok(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not self.channel.try_send(event).error_log()
This PR aims to support event streaming in some of MM2's API endpoints.
The work in this PR till now is only a refactor of the event streaming code after multiple iterations of discussing how this should end up looking like.
The flow goes as follows:
The role of the
StreamingManager
:cid
when it first connects through SSE).cid
,sid
) pair (translates to: client with idcid
wants to listen to the streamer with idsid
). If such a streamer withsid
is already running, it is just instructed that a new client is gonna be listening along, otherwise, it will be spawned in the background.cid
,sid
) pair (= client withcid
no longer wants to receive events fromstreamer
with idsid
, if no more clients are listening, the streamer will die).StreamingManager::send(streamer_id, arbitrary_data)
is used.Breaking Changes:
event_stream_configuration
in MM2 json config. Requested streams are initialized and configured dynamically through the API.event_stream_configuration.access_control_allow_origin
config was moved toaccess_control_allow_origin
(one scope out) b9d1218.event_stream_configuration.worker_path
was changed toevent_stream_worker_path
(one scope out) ee32fd1.mm2.com/event-stream?filter=NETWORK,BALANCE,DATA_NEEDED:datatype
->mm2.com/event-stream?id=1
), but a client id is now required (a random u64 identifier for [& generated by] the client) (e.g.mm2.com/event-stream?id=799384531
)stream::enable::balance
).broadcast_all
). example of these events areDATA_NEEDED:datatype
, andHEARTBEAT
should be similar later as well.(so for keplr integration, only
?id=<RANDOM_U64>
must be added to the SSE init request and everything should work.?filter=
shall be removed)(the
<RANDOM_U64>
should be stored by the client since it's required in any event streamer activation/deactivation. If for keplr only though, this isn't necessary for now sinceDATA_NEEDED:datatype
event streamer is a special event that's broadcasted to all clients and doesn't require any sort of activation/subscription before hand)