Skip to content

Commit

Permalink
Merge pull request #474 from kinode-dao/dr/eth-0.8.7
Browse files Browse the repository at this point in the history
eth provider fixes
  • Loading branch information
nick1udwig authored Aug 8, 2024
2 parents e876891 + a288fc1 commit f553ade
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 24 deletions.
90 changes: 68 additions & 22 deletions kinode/src/eth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ async fn handle_message(
)
.await;
}
Ok(())
}
Message::Request(req) => {
let timeout = req.expects_response.unwrap_or(60);
Expand All @@ -330,7 +331,7 @@ async fn handle_message(
};
match req {
IncomingReq::EthAction(eth_action) => {
return handle_eth_action(state, km, timeout, eth_action).await;
handle_eth_action(state, km, timeout, eth_action).await
}
IncomingReq::EthConfigAction(eth_config_action) => {
kernel_message(
Expand All @@ -344,50 +345,72 @@ async fn handle_message(
&state.send_to_loop,
)
.await;
Ok(())
}
IncomingReq::EthSubResult(eth_sub_result) => {
// forward this to rsvp, if we have the sub id in our active subs
let Some(rsvp) = km.rsvp else {
verbose_print(
&state.print_tx,
"eth: got eth_sub_result with no rsvp, ignoring",
)
.await;
return Ok(()); // no rsvp, no need to forward
};
let sub_id = match eth_sub_result {
Ok(EthSub { id, .. }) => id,
Err(EthSubError { id, .. }) => id,
};
if let Some(sub_map) = state.active_subscriptions.get(&rsvp) {
if let Some(ActiveSub::Remote {
provider_node,
sender,
..
}) = sub_map.get(&sub_id)
{
if provider_node == &km.source.node {
if let Ok(()) = sender.send(eth_sub_result).await {
// successfully sent a subscription update from a
// remote provider to one of our processes
return Ok(());
if let Some(mut sub_map) = state.active_subscriptions.get_mut(&rsvp) {
if let Some(sub) = sub_map.get(&sub_id) {
if let ActiveSub::Remote {
provider_node,
sender,
..
} = sub
{
if provider_node == &km.source.node {
if let Ok(()) = sender.send(eth_sub_result).await {
// successfully sent a subscription update from a
// remote provider to one of our processes
return Ok(());
}
}
// failed to send subscription update to process,
// unsubscribe from provider and close
verbose_print(
&state.print_tx,
"eth: got eth_sub_result but provider node did not match or local sub was already closed",
)
.await;
sub.close(sub_id, state).await;
sub_map.remove(&sub_id);
return Ok(());
}
}
}
// tell the remote provider that we don't have this sub
// so they can stop sending us updates
verbose_print(
&state.print_tx,
"eth: got eth_sub_result but no matching sub found, unsubscribing",
&format!(
"eth: got eth_sub_result but no matching sub {} found, unsubscribing",
sub_id
),
)
.await;
kernel_message(
&state.our.clone(),
&state.our,
km.id,
km.source.clone(),
km.source,
None,
true,
None,
EthAction::UnsubscribeLogs(sub_id),
&state.send_to_loop,
)
.await;
Ok(())
}
IncomingReq::SubKeepalive(sub_id) => {
// source expects that we have a local sub for them with this id
Expand Down Expand Up @@ -420,11 +443,11 @@ async fn handle_message(
&state.send_to_loop,
)
.await;
Ok(())
}
}
}
}
Ok(())
}

async fn handle_eth_action(
Expand Down Expand Up @@ -479,12 +502,32 @@ async fn handle_eth_action(
.await;
}
EthAction::UnsubscribeLogs(sub_id) => {
let mut sub_map = state
.active_subscriptions
.entry(km.source.clone())
.or_insert(HashMap::new());
let Some(mut sub_map) = state.active_subscriptions.get_mut(&km.source) else {
verbose_print(
&state.print_tx,
&format!(
"eth: got unsubscribe from {} but no subscription found",
km.source
),
)
.await;
error_message(
&state.our,
km.id,
km.source,
EthError::MalformedRequest,
&state.send_to_loop,
)
.await;
return Ok(());
};
if let Some(sub) = sub_map.remove(&sub_id) {
sub.close(sub_id, state).await;
verbose_print(
&state.print_tx,
&format!("eth: closed subscription {} for {}", sub_id, km.source.node),
)
.await;
kernel_message(
&state.our,
km.id,
Expand All @@ -499,7 +542,10 @@ async fn handle_eth_action(
} else {
verbose_print(
&state.print_tx,
"eth: got unsubscribe but no matching subscription found",
&format!(
"eth: got unsubscribe from {} but no subscription {} found",
km.source, sub_id
),
)
.await;
error_message(
Expand Down
20 changes: 18 additions & 2 deletions kinode/src/eth/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ pub async fn create_new_subscription(
let (keepalive_err_sender, keepalive_err_receiver) =
tokio::sync::mpsc::channel(1);
response_channels.insert(keepalive_km_id, keepalive_err_sender);
let response_channels = response_channels.clone();
subs.insert(
remote_sub_id,
ActiveSub::Remote {
Expand Down Expand Up @@ -471,7 +470,7 @@ async fn maintain_remote_subscription(
true,
Some(30),
IncomingReq::SubKeepalive(remote_sub_id),
&send_to_loop,
send_to_loop,
).await;
}
_incoming = net_error_rx.recv() => {
Expand All @@ -488,6 +487,23 @@ async fn maintain_remote_subscription(
}
}
};
// tell provider node we don't need their services anymore
// (in case they did not close the subscription on their side,
// such as in the 2-hour timeout case)
kernel_message(
our,
rand::random(),
Address {
node: provider_node.to_string(),
process: ETH_PROCESS_ID.clone(),
},
None,
true,
None,
EthAction::UnsubscribeLogs(remote_sub_id),
send_to_loop,
)
.await;
active_subscriptions
.entry(target.clone())
.and_modify(|sub_map| {
Expand Down

0 comments on commit f553ade

Please sign in to comment.