Skip to content

Commit

Permalink
Merge pull request #460 from kinode-dao/v0.8.7
Browse files Browse the repository at this point in the history
v0.8.7
  • Loading branch information
dr-frmr committed Aug 8, 2024
2 parents 25f6ff2 + f553ade commit fb44d67
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 41 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "kinode_lib"
authors = ["KinodeDAO"]
version = "0.8.6"
version = "0.8.7"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"
Expand Down
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
FROM debian:12-slim AS downloader
ARG VERSION

WORKDIR /tmp/download

RUN apt-get update
RUN apt-get install wget curl openssl jq unzip -y
RUN apt-get install unzip -y

ADD https://api.github.com/repos/kinode-dao/kinode/releases releases.json
RUN wget "https://github.com/kinode-dao/kinode/releases/download/$(cat releases.json | jq -r '.[0].tag_name')/kinode-x86_64-unknown-linux-gnu.zip"
ADD "https://github.com/kinode-dao/kinode/releases/download/${VERSION}/kinode-x86_64-unknown-linux-gnu.zip" kinode-x86_64-unknown-linux-gnu.zip
RUN unzip kinode-x86_64-unknown-linux-gnu.zip

FROM debian:12-slim
Expand Down
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,17 @@ The image includes EXPOSE directives for TCP port `8080` and TCP port `9000`. Po
If you are running a direct node, you must map port `9000` to the same port on the host and on your router. Otherwise, your Kinode will not be able to connect to the rest of the network as connection info is written to the chain, and this information is based on the view from inside the Docker container.

To build a local Docker image, run the following command in this project root.
```
docker build -t 0xlynett/kinode .
```bash
# The `VERSION` may be replaced with the tag of a GitHub release
docker build -t 0xlynett/kinode . --build-arg VERSION=v0.8.6
```

For example:
```

```bash
docker volume create kinode-volume

docker run -d -p 8080:8080 -it --name my-kinode \
--mount type=volume,source=kinode-volume,destination=/kinode-home \
0xlynett/kinode
```
```
2 changes: 1 addition & 1 deletion kinode/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "kinode"
authors = ["KinodeDAO"]
version = "0.8.6"
version = "0.8.7"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"
Expand Down
97 changes: 73 additions & 24 deletions kinode/src/eth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ async fn handle_message(
)
.await;
}
Ok(())
}
Message::Request(req) => {
let timeout = req.expects_response.unwrap_or(60);
Expand All @@ -330,7 +331,7 @@ async fn handle_message(
};
match req {
IncomingReq::EthAction(eth_action) => {
return handle_eth_action(state, km, timeout, eth_action).await;
handle_eth_action(state, km, timeout, eth_action).await
}
IncomingReq::EthConfigAction(eth_config_action) => {
kernel_message(
Expand All @@ -344,50 +345,72 @@ async fn handle_message(
&state.send_to_loop,
)
.await;
Ok(())
}
IncomingReq::EthSubResult(eth_sub_result) => {
// forward this to rsvp, if we have the sub id in our active subs
let Some(rsvp) = km.rsvp else {
verbose_print(
&state.print_tx,
"eth: got eth_sub_result with no rsvp, ignoring",
)
.await;
return Ok(()); // no rsvp, no need to forward
};
let sub_id = match eth_sub_result {
Ok(EthSub { id, .. }) => id,
Err(EthSubError { id, .. }) => id,
};
if let Some(sub_map) = state.active_subscriptions.get(&rsvp) {
if let Some(ActiveSub::Remote {
provider_node,
sender,
..
}) = sub_map.get(&sub_id)
{
if provider_node == &km.source.node {
if let Ok(()) = sender.send(eth_sub_result).await {
// successfully sent a subscription update from a
// remote provider to one of our processes
return Ok(());
if let Some(mut sub_map) = state.active_subscriptions.get_mut(&rsvp) {
if let Some(sub) = sub_map.get(&sub_id) {
if let ActiveSub::Remote {
provider_node,
sender,
..
} = sub
{
if provider_node == &km.source.node {
if let Ok(()) = sender.send(eth_sub_result).await {
// successfully sent a subscription update from a
// remote provider to one of our processes
return Ok(());
}
}
// failed to send subscription update to process,
// unsubscribe from provider and close
verbose_print(
&state.print_tx,
"eth: got eth_sub_result but provider node did not match or local sub was already closed",
)
.await;
sub.close(sub_id, state).await;
sub_map.remove(&sub_id);
return Ok(());
}
}
}
// tell the remote provider that we don't have this sub
// so they can stop sending us updates
verbose_print(
&state.print_tx,
"eth: got eth_sub_result but no matching sub found, unsubscribing",
&format!(
"eth: got eth_sub_result but no matching sub {} found, unsubscribing",
sub_id
),
)
.await;
kernel_message(
&state.our.clone(),
&state.our,
km.id,
km.source.clone(),
km.source,
None,
true,
None,
EthAction::UnsubscribeLogs(sub_id),
&state.send_to_loop,
)
.await;
Ok(())
}
IncomingReq::SubKeepalive(sub_id) => {
// source expects that we have a local sub for them with this id
Expand Down Expand Up @@ -420,11 +443,11 @@ async fn handle_message(
&state.send_to_loop,
)
.await;
Ok(())
}
}
}
}
Ok(())
}

async fn handle_eth_action(
Expand Down Expand Up @@ -479,12 +502,32 @@ async fn handle_eth_action(
.await;
}
EthAction::UnsubscribeLogs(sub_id) => {
let mut sub_map = state
.active_subscriptions
.entry(km.source.clone())
.or_insert(HashMap::new());
let Some(mut sub_map) = state.active_subscriptions.get_mut(&km.source) else {
verbose_print(
&state.print_tx,
&format!(
"eth: got unsubscribe from {} but no subscription found",
km.source
),
)
.await;
error_message(
&state.our,
km.id,
km.source,
EthError::MalformedRequest,
&state.send_to_loop,
)
.await;
return Ok(());
};
if let Some(sub) = sub_map.remove(&sub_id) {
sub.close(sub_id, state).await;
verbose_print(
&state.print_tx,
&format!("eth: closed subscription {} for {}", sub_id, km.source.node),
)
.await;
kernel_message(
&state.our,
km.id,
Expand All @@ -499,7 +542,10 @@ async fn handle_eth_action(
} else {
verbose_print(
&state.print_tx,
"eth: got unsubscribe but no matching subscription found",
&format!(
"eth: got unsubscribe from {} but no subscription {} found",
km.source, sub_id
),
)
.await;
error_message(
Expand Down Expand Up @@ -626,8 +672,11 @@ async fn fulfill_request(
is_replacement_successful = false;
return ();
};
aps.urls.remove(index);
aps.urls.insert(0, url_provider.clone());
let old_provider = aps.urls.remove(index);
match old_provider.pubsub {
None => aps.urls.insert(0, url_provider.clone()),
Some(_) => aps.urls.insert(0, old_provider),
}
});
if !is_replacement_successful {
verbose_print(
Expand Down
27 changes: 23 additions & 4 deletions kinode/src/eth/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ pub async fn create_new_subscription(
let (keepalive_err_sender, keepalive_err_receiver) =
tokio::sync::mpsc::channel(1);
response_channels.insert(keepalive_km_id, keepalive_err_sender);
let response_channels = response_channels.clone();
subs.insert(
remote_sub_id,
ActiveSub::Remote {
Expand Down Expand Up @@ -232,8 +231,11 @@ async fn build_subscription(
is_replacement_successful = false;
return ();
};
aps.urls.remove(index);
aps.urls.insert(0, url_provider.clone());
let old_provider = aps.urls.remove(index);
match old_provider.pubsub {
None => aps.urls.insert(0, url_provider.clone()),
Some(_) => aps.urls.insert(0, old_provider),
}
});
if !is_replacement_successful {
verbose_print(
Expand Down Expand Up @@ -468,7 +470,7 @@ async fn maintain_remote_subscription(
true,
Some(30),
IncomingReq::SubKeepalive(remote_sub_id),
&send_to_loop,
send_to_loop,
).await;
}
_incoming = net_error_rx.recv() => {
Expand All @@ -485,6 +487,23 @@ async fn maintain_remote_subscription(
}
}
};
// tell provider node we don't need their services anymore
// (in case they did not close the subscription on their side,
// such as in the 2-hour timeout case)
kernel_message(
our,
rand::random(),
Address {
node: provider_node.to_string(),
process: ETH_PROCESS_ID.clone(),
},
None,
true,
None,
EthAction::UnsubscribeLogs(remote_sub_id),
send_to_loop,
)
.await;
active_subscriptions
.entry(target.clone())
.and_modify(|sub_map| {
Expand Down
2 changes: 1 addition & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "lib"
authors = ["KinodeDAO"]
version = "0.8.6"
version = "0.8.7"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"
Expand Down

0 comments on commit fb44d67

Please sign in to comment.