diff --git a/.github/workflows/tests_and_checks.yml b/.github/workflows/tests_and_checks.yml index f6788cca..512e1878 100644 --- a/.github/workflows/tests_and_checks.yml +++ b/.github/workflows/tests_and_checks.yml @@ -189,7 +189,7 @@ jobs: - name: Run Tests (no-default-features) if: ${{ matrix.default-features == 'none' }} - run: cargo nextest run --workspace --profile ci --no-default-features --features "test-utils" + run: cargo nextest run --profile ci --no-default-features --features "test-utils" - name: Run Doc Tests if: ${{ matrix.default-features == 'all' }} @@ -241,7 +241,7 @@ jobs: - name: Run Tests (no-default-features) if: ${{ matrix.default-features == 'none' }} - run: cargo nextest run --workspace --profile ci --no-default-features --features "test-utils" + run: cargo nextest run --profile ci --no-default-features --features "test-utils" - name: Run Doc Tests if: ${{ matrix.default-features == 'all' }} diff --git a/Cargo.lock b/Cargo.lock index f25862ae..8170d9fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3449,6 +3449,7 @@ dependencies = [ "quick-protobuf", "rand", "rw-stream-sink", + "serde", "smallvec", "thiserror", "unsigned-varint", @@ -3497,6 +3498,7 @@ dependencies = [ "quick-protobuf-codec", "rand", "regex", + "serde", "sha2 0.10.8", "smallvec", "unsigned-varint", @@ -3541,6 +3543,7 @@ dependencies = [ "multihash 0.19.1", "quick-protobuf", "rand", + "serde", "sha2 0.10.8", "thiserror", "zeroize", @@ -3567,6 +3570,7 @@ dependencies = [ "quick-protobuf", "quick-protobuf-codec", "rand", + "serde", "sha2 0.10.8", "smallvec", "thiserror", @@ -4307,6 +4311,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "076d548d76a0e2a0d4ab471d0b1c36c577786dfc4471242035d97a12a735c492" dependencies = [ "core2", + "serde", "unsigned-varint", ] diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 694ae5de..072e8502 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -101,7 +101,7 @@ hooks. Please run this before every commit and/or push. * **`nx-test`**, which translates to `cargo nextest run --workspace && cargo test --workspace --doc` * **`x-test`** for testing continuously as files change, translating to - `cargo watch -c -s "cargo nextest run --workspace --nocapture && cargo test --doc"` + `cargo watch -c -s "cargo nextest run --workspace --no-capture && cargo test --doc"` * **`x-`** for running a variety of `cargo watch` execution stages * **`nx-test-`**, which is just like `nx-test`, but adds `all` or `0` diff --git a/flake.nix b/flake.nix index f4fd8bb3..4bb6df8f 100644 --- a/flake.nix +++ b/flake.nix @@ -134,7 +134,7 @@ xFuncNoDefault = cmd: pkgs.writeScriptBin "x-${cmd}-0" '' #!${pkgs.stdenv.shell} - cargo watch -c -s "cargo ${cmd} --workspace --no-default-features" + cargo watch -c -s "cargo ${cmd} --no-default-features" ''; xFuncPackage = cmd: crate: @@ -145,19 +145,19 @@ xFuncTest = pkgs.writeScriptBin "x-test" '' #!${pkgs.stdenv.shell} - cargo watch -c -s "cargo nextest run --workspace --nocapture && cargo test --doc" + cargo watch -c -s "cargo nextest run --workspace --no-capture && cargo test --doc" ''; xFuncTestAll = pkgs.writeScriptBin "x-test-all" '' #!${pkgs.stdenv.shell} - cargo watch -c -s "cargo nextest run --workspace --all-features --nocapture \ + cargo watch -c -s "cargo nextest run --workspace --all-features --no-capture \ && cargo test --workspace --doc --all-features" ''; xFuncTestNoDefault = pkgs.writeScriptBin "x-test-0" '' #!${pkgs.stdenv.shell} - cargo watch -c -s "cargo nextest run --workspace --no-default-features --nocapture \ - && cargo test --workspace --doc --no-default-features" + cargo watch -c -s "cargo nextest run --no-default-features --no-capture \ + && cargo test --doc --no-default-features" ''; xFuncTestPackage = crate: @@ -175,14 +175,14 @@ nxTestAll = pkgs.writeScriptBin "nx-test-all" '' #!${pkgs.stdenv.shell} - cargo nextest run --workspace --all-features --nocapture + cargo nextest run --workspace --all-features --no-capture cargo test --workspace --doc --all-features ''; nxTestNoDefault = pkgs.writeScriptBin "nx-test-0" '' #!${pkgs.stdenv.shell} - cargo nextest run --workspace --no-default-features --nocapture - cargo test --workspace --doc --no-default-features + cargo nextest run --no-default-features --no-capture + cargo test --doc --no-default-features ''; wasmTest = pkgs.writeScriptBin "wasm-ex-test" '' diff --git a/homestar-runtime/Cargo.toml b/homestar-runtime/Cargo.toml index 9d146f3f..c5453f9f 100644 --- a/homestar-runtime/Cargo.toml +++ b/homestar-runtime/Cargo.toml @@ -102,6 +102,7 @@ libp2p = { version = "0.52", default-features = false, features = [ "noise", "cbor", "yamux", + "serde", ] } libsqlite3-sys = { version = "0.26", default-features = false, features = [ "bundled", @@ -166,6 +167,7 @@ tower = { version = "0.4", default-features = false, features = [ ] } tower-http = { version = "0.4", default-features = false, features = [ "trace", + "sensitive-headers", "catch-panic", "cors", ] } diff --git a/homestar-runtime/src/event_handler.rs b/homestar-runtime/src/event_handler.rs index 269bc736..f1838afb 100644 --- a/homestar-runtime/src/event_handler.rs +++ b/homestar-runtime/src/event_handler.rs @@ -25,6 +25,7 @@ pub(crate) mod cache; pub mod channel; pub(crate) mod error; pub(crate) mod event; +#[cfg(feature = "websocket-notify")] pub(crate) mod notification; pub(crate) mod swarm_event; pub(crate) use cache::{setup_cache, CacheValue}; @@ -80,7 +81,7 @@ pub(crate) struct EventHandler { p2p_provider_timeout: Duration, db: DB, swarm: Swarm, - cache: Cache, + cache: Arc>, sender: Arc>, receiver: channel::AsyncBoundedChannelReceiver, query_senders: FnvHashMap)>, @@ -228,7 +229,7 @@ where #[cfg(not(feature = "ipfs"))] pub(crate) async fn start(mut self) -> Result<()> { let handle = Handle::current(); - handle.spawn(poll_cache(self.cache.clone())); + handle.spawn(poll_cache(self.cache.clone(), self.poll_cache_interval)); loop { select! { diff --git a/homestar-runtime/src/event_handler/event.rs b/homestar-runtime/src/event_handler/event.rs index 7c222cf7..360d339d 100644 --- a/homestar-runtime/src/event_handler/event.rs +++ b/homestar-runtime/src/event_handler/event.rs @@ -7,7 +7,7 @@ use crate::event_handler::notification::emit_receipt; use crate::network::IpfsCli; use crate::{ db::Database, - event_handler::{Handler, P2PSender}, + event_handler::{Handler, P2PSender, ResponseEvent}, network::{ pubsub, swarm::{CapsuleTag, RequestResponseKey, TopicMessage}, @@ -127,8 +127,8 @@ impl Event { event_handler.shutdown().await; let _ = tx.send(()); } - Event::FindRecord(record) => record.find(event_handler), - Event::RemoveRecord(record) => record.remove(event_handler), + Event::FindRecord(record) => record.find(event_handler).await, + Event::RemoveRecord(record) => record.remove(event_handler).await, Event::OutboundRequest(PeerRequest { peer, request, @@ -144,7 +144,7 @@ impl Event { .request_response_senders .insert(request_id, (request, sender)); } - Event::GetProviders(record) => record.get_providers(event_handler), + Event::GetProviders(record) => record.get_providers(event_handler).await, Event::ProvideRecord(cid, sender, capsule_tag) => { let query_id = event_handler .swarm @@ -247,7 +247,7 @@ impl Captured { { emit_receipt( event_handler.ws_workflow_sender(), - receipt.clone(), + &receipt, self.metadata.to_owned(), ) } @@ -258,12 +258,15 @@ impl Captured { TopicMessage::CapturedReceipt(receipt), ) { Ok(msg_id) => info!( - "message {msg_id} published on {} topic for receipt with cid: {receipt_cid}", + cid = receipt_cid.to_string(), + "message {msg_id} published on {} topic for receipt", pubsub::RECEIPTS_TOPIC ), - Err(_err) => { - error!( - "message not published on {} topic for receipt with cid: {receipt_cid}", + Err(err) => { + warn!( + err=?err, + cid = receipt_cid.to_string(), + "message not published on {} topic for receipt", pubsub::RECEIPTS_TOPIC ) } @@ -328,7 +331,7 @@ impl Replay { Self { pointers, metadata } } - fn notify(self, event_handler: &EventHandler) -> Result<()> + fn notify(self, event_handler: &mut EventHandler) -> Result<()> where DB: Database, { @@ -350,7 +353,7 @@ impl Replay { ); #[cfg(feature = "websocket-notify")] - receipts.into_iter().for_each(|receipt| { + receipts.iter().for_each(|receipt| { emit_receipt( event_handler.ws_workflow_sender(), receipt, @@ -358,6 +361,28 @@ impl Replay { ); }); + // gossiping replayed receipts + receipts.into_iter().for_each(|receipt| { + if event_handler.pubsub_enabled { + let receipt_cid = receipt.cid().to_string(); + let _ = event_handler + .swarm + .behaviour_mut() + .gossip_publish( + pubsub::RECEIPTS_TOPIC, + TopicMessage::CapturedReceipt(receipt), + ) + .map(|msg_id| + info!(cid=receipt_cid, + "message {msg_id} published on {} topic for receipt", pubsub::RECEIPTS_TOPIC)) + .map_err( + |err| + warn!(err=?err, cid=receipt_cid, + "message not published on {} topic for receipt", pubsub::RECEIPTS_TOPIC), + ); + } + }); + Ok(()) } } @@ -372,10 +397,20 @@ impl QueryRecord { } } - fn find(self, event_handler: &mut EventHandler) + async fn find(self, event_handler: &mut EventHandler) where DB: Database, { + if event_handler.connections.peers.is_empty() { + info!("no connections to send request to"); + + if let Some(sender) = self.sender { + let _ = sender.send_async(ResponseEvent::NoPeersAvailable).await; + } + + return; + } + let id = event_handler .swarm .behaviour_mut() @@ -386,10 +421,20 @@ impl QueryRecord { event_handler.query_senders.insert(id, (key, self.sender)); } - fn remove(self, event_handler: &mut EventHandler) + async fn remove(self, event_handler: &mut EventHandler) where DB: Database, { + if event_handler.connections.peers.is_empty() { + info!("no connections to send request to"); + + if let Some(sender) = self.sender { + let _ = sender.send_async(ResponseEvent::NoPeersAvailable).await; + } + + return; + } + event_handler .swarm .behaviour_mut() @@ -403,10 +448,20 @@ impl QueryRecord { .stop_providing(&Key::new(&self.cid.to_bytes())); } - fn get_providers(self, event_handler: &mut EventHandler) + async fn get_providers(self, event_handler: &mut EventHandler) where DB: Database, { + if event_handler.connections.peers.is_empty() { + info!("no connections to send request to"); + + if let Some(sender) = self.sender { + let _ = sender.send_async(ResponseEvent::NoPeersAvailable).await; + } + + return; + } + let id = event_handler .swarm .behaviour_mut() diff --git a/homestar-runtime/src/event_handler/notification.rs b/homestar-runtime/src/event_handler/notification.rs index 8daa09a3..1d630d6d 100644 --- a/homestar-runtime/src/event_handler/notification.rs +++ b/homestar-runtime/src/event_handler/notification.rs @@ -31,10 +31,10 @@ const TIMESTAMP_KEY: &str = "timestamp"; /// Send receipt notification as bytes. pub(crate) fn emit_receipt( notifier: Notifier, - receipt: Receipt, + receipt: &Receipt, metadata: Option, ) { - let invocation_receipt = InvocationReceipt::from(&receipt); + let invocation_receipt = InvocationReceipt::from(receipt); let receipt_cid = receipt.cid(); let notification = ReceiptNotification::with(invocation_receipt, receipt_cid, metadata.clone()); diff --git a/homestar-runtime/src/event_handler/swarm_event.rs b/homestar-runtime/src/event_handler/swarm_event.rs index 2c22db6e..03379420 100644 --- a/homestar-runtime/src/event_handler/swarm_event.rs +++ b/homestar-runtime/src/event_handler/swarm_event.rs @@ -1,6 +1,8 @@ //! Internal libp2p [SwarmEvent] handling and [Handler] implementation. use super::EventHandler; +#[cfg(feature = "websocket-notify")] +use crate::event_handler::notification::{self, EventNotificationTyp, SwarmNotification}; #[cfg(feature = "ipfs")] use crate::network::IpfsCli; use crate::{ @@ -8,7 +10,6 @@ use crate::{ event_handler::{ cache::{self, CacheData, CacheValue}, event::QueryRecord, - notification::{self, EventNotificationTyp, SwarmNotification}, Event, Handler, RequestResponseError, }, libp2p::multiaddr::MultiaddrExt, @@ -40,6 +41,7 @@ use libp2p::{ swarm::{dial_opts::DialOpts, SwarmEvent}, PeerId, StreamProtocol, }; +#[cfg(feature = "websocket-notify")] use maplit::btreemap; use std::{ collections::{HashMap, HashSet}, @@ -56,6 +58,8 @@ const RENDEZVOUS_NAMESPACE: &str = "homestar"; pub(crate) enum ResponseEvent { /// Found [PeerRecord] on the DHT. Found(Result), + /// TODO + NoPeersAvailable, /// Found Providers/[PeerId]s on the DHT. Providers(Result>), } @@ -758,7 +762,7 @@ async fn handle_swarm_event( } => { debug!( peer_id = peer_id.to_string(), - "peer connection closed, cause: {cause:?}" + "peer connection closed, cause: {cause:#?}, endpoint: {endpoint:#?}" ); event_handler.connections.peers.remove_entry(&peer_id); diff --git a/homestar-runtime/src/network/ipfs.rs b/homestar-runtime/src/network/ipfs.rs index 1e018634..7e3aada1 100644 --- a/homestar-runtime/src/network/ipfs.rs +++ b/homestar-runtime/src/network/ipfs.rs @@ -2,14 +2,17 @@ //! //! [IpfsClient]: ipfs_api::IpfsClient +use crate::settings; use anyhow::Result; use futures::TryStreamExt; use homestar_core::workflow::Receipt; +use http::uri::Scheme; use ipfs_api::{ request::{DagCodec, DagPut}, response::DagPutResponse, IpfsApi, IpfsClient, }; +use ipfs_api_backend_hyper::TryFromUri; use libipld::{Cid, Ipld}; use std::{io::Cursor, sync::Arc}; use url::Url; @@ -20,15 +23,21 @@ const SHA3_256: &str = "sha3-256"; #[allow(missing_debug_implementations)] pub(crate) struct IpfsCli(Arc); -impl Clone for IpfsCli { - fn clone(&self) -> Self { - IpfsCli(Arc::clone(&self.0)) +impl IpfsCli { + /// Create a new [IpfsCli] from a [IpfsClient]. + pub(crate) fn new(settings: &settings::Ipfs) -> Result { + let cli = Self(Arc::new(IpfsClient::from_host_and_port( + Scheme::HTTP, + settings.host.as_str(), + settings.port, + )?)); + Ok(cli) } } -impl Default for IpfsCli { - fn default() -> Self { - Self(Arc::new(IpfsClient::default())) +impl Clone for IpfsCli { + fn clone(&self) -> Self { + IpfsCli(Arc::clone(&self.0)) } } diff --git a/homestar-runtime/src/network/pubsub.rs b/homestar-runtime/src/network/pubsub.rs index 710b2c41..ae570cae 100644 --- a/homestar-runtime/src/network/pubsub.rs +++ b/homestar-runtime/src/network/pubsub.rs @@ -34,9 +34,10 @@ pub(crate) fn new(keypair: Keypair, settings: &settings::Node) -> Result)), /// Acknowledgement of a [Workflow] run. AckWorkflow((Cid, FastStr)), + /// TODO + GetNodeInfo, + /// TODO + AckNodeInfo(NodeInfo), } /// WebSocket server fields. @@ -208,6 +224,7 @@ impl Server { rpc::METRICS_ENDPOINT, )?) .layer(cors) + .layer(SetSensitiveRequestHeadersLayer::new(once(AUTHORIZATION))) .timeout(self.webserver_timeout); let runtime_hdl = Handle::current(); @@ -235,11 +252,13 @@ fn port_available(host: IpAddr, port: u16) -> bool { #[cfg(test)] mod test { use super::*; - use crate::{event_handler::notification::ReceiptNotification, settings::Settings}; - use homestar_core::test_utils; + #[cfg(feature = "websocket-notify")] + use crate::event_handler::notification::ReceiptNotification; + use crate::{db::Database, settings::Settings}; #[cfg(feature = "websocket-notify")] use homestar_core::{ ipld::DagJson, + test_utils, workflow::{config::Resources, instruction::RunInstruction, prf::UcanPrf, Task}, }; #[cfg(feature = "websocket-notify")] @@ -249,14 +268,8 @@ mod test { use jsonrpsee::{core::client::ClientT, rpc_params, ws_client::WsClientBuilder}; #[cfg(feature = "websocket-notify")] use notifier::{self, Header}; - use serial_test::file_serial; use tokio::sync::mpsc; - fn set_ports(settings: &mut Settings) { - settings.node.network.metrics_port = test_utils::ports::get_port() as u16; - settings.node.network.webserver_port = test_utils::ports::get_port() as u16; - } - async fn metrics_handle(settings: Settings) -> PrometheusHandle { #[cfg(feature = "monitoring")] let metrics_hdl = crate::metrics::start(settings.monitoring(), settings.node.network()) @@ -271,237 +284,246 @@ mod test { metrics_hdl } - #[tokio::test] - #[file_serial] - async fn ws_connect() { - let mut settings = Settings::load().unwrap(); - set_ports(&mut settings); - let server = Server::new(settings.node().network()).unwrap(); - let metrics_hdl = metrics_handle(settings).await; - let (runner_tx, _runner_rx) = mpsc::channel(1); - server.start(runner_tx, metrics_hdl).await.unwrap(); - - let ws_url = format!("ws://{}", server.addr); - let http_url = format!("http://{}", server.addr); - - tokio_tungstenite::connect_async(ws_url.clone()) - .await - .unwrap(); - - let client = WsClientBuilder::default().build(ws_url).await.unwrap(); - let ws_resp: serde_json::Value = client - .request(rpc::HEALTH_ENDPOINT, rpc_params![]) - .await - .unwrap(); - - assert_eq!(ws_resp, serde_json::json!({"healthy": true})); - let http_resp = reqwest::get(format!("{}/health", http_url)).await.unwrap(); - assert_eq!(http_resp.status(), 200); - let http_resp = http_resp.json::().await.unwrap(); - assert_eq!(http_resp, serde_json::json!({"healthy": true})); + #[homestar_runtime_proc_macro::runner_test] + fn ws_connect() { + let TestRunner { runner, settings } = TestRunner::start(); + runner.runtime.block_on(async { + let server = Server::new(settings.node().network()).unwrap(); + let metrics_hdl = metrics_handle(settings).await; + let (runner_tx, _runner_rx) = mpsc::channel(1); + server.start(runner_tx, metrics_hdl).await.unwrap(); + + let ws_url = format!("ws://{}", server.addr); + let http_url = format!("http://{}", server.addr); + + tokio_tungstenite::connect_async(ws_url.clone()) + .await + .unwrap(); + + let client = WsClientBuilder::default().build(ws_url).await.unwrap(); + let ws_resp: serde_json::Value = client + .request(rpc::HEALTH_ENDPOINT, rpc_params![]) + .await + .unwrap(); + let peer_id = + libp2p::PeerId::from_str("12D3KooWRNw2pJC9748Fmq4WNV27HoSTcX3r37132FLkQMrbKAiC") + .unwrap(); + let nodeinfo = NodeInfo::new(peer_id); + assert_eq!( + ws_resp, + serde_json::json!({"healthy": true, "nodeInfo": nodeinfo}) + ); + let http_resp = reqwest::get(format!("{}/health", http_url)).await.unwrap(); + assert_eq!(http_resp.status(), 200); + let http_resp = http_resp.json::().await.unwrap(); + assert_eq!( + http_resp, + serde_json::json!({"healthy": true, "nodeInfo": nodeinfo}) + ); + }); unsafe { metrics::clear_recorder() } } #[cfg(feature = "monitoring")] - #[tokio::test] - #[file_serial] + #[homestar_runtime_proc_macro::runner_test] async fn ws_metrics_no_prefix() { - let mut settings = Settings::load().unwrap(); - set_ports(&mut settings); - settings.monitoring.process_collector_interval = Duration::from_millis(100); - let server = Server::new(settings.node().network()).unwrap(); - let metrics_hdl = metrics_handle(settings).await; - let (runner_tx, _runner_rx) = mpsc::channel(1); - server.start(runner_tx, metrics_hdl).await.unwrap(); - - let ws_url = format!("ws://{}", server.addr); - - // wait for interval to pass - std::thread::sleep(Duration::from_millis(100)); - - let client = WsClientBuilder::default().build(ws_url).await.unwrap(); - let ws_resp1: serde_json::Value = client - .request(rpc::METRICS_ENDPOINT, rpc_params![]) - .await - .unwrap(); - - let len = if let serde_json::Value::Array(array) = &ws_resp1["metrics"] { - array.len() - } else { - panic!("expected array"); - }; - - assert!(len > 0); - - unsafe { metrics::clear_recorder() } + let TestRunner { runner, settings } = TestRunner::start(); + runner.runtime.block_on(async { + let server = Server::new(settings.node().network()).unwrap(); + let metrics_hdl = metrics_handle(settings).await; + let (runner_tx, _runner_rx) = mpsc::channel(1); + server.start(runner_tx, metrics_hdl).await.unwrap(); + + let ws_url = format!("ws://{}", server.addr); + + // wait for interval to pass + std::thread::sleep(Duration::from_millis(150)); + + let client = WsClientBuilder::default().build(ws_url).await.unwrap(); + let ws_resp1: serde_json::Value = client + .request(rpc::METRICS_ENDPOINT, rpc_params![]) + .await + .unwrap(); + + let len = if let serde_json::Value::Array(array) = &ws_resp1["metrics"] { + array.len() + } else { + panic!("expected array"); + }; + + assert!(len > 0); + + unsafe { metrics::clear_recorder() } + }); } #[cfg(feature = "websocket-notify")] - #[tokio::test] - #[file_serial] + #[homestar_runtime_proc_macro::runner_test] async fn ws_subscribe_unsubscribe_network_events() { - let mut settings = Settings::load().unwrap(); - set_ports(&mut settings); - let server = Server::new(settings.node().network()).unwrap(); - let metrics_hdl = metrics_handle(settings).await; - let (runner_tx, _runner_rx) = mpsc::channel(1); - server.start(runner_tx, metrics_hdl).await.unwrap(); - - let ws_url = format!("ws://{}", server.addr); - - let client1 = WsClientBuilder::default().build(ws_url).await.unwrap(); - let mut sub: Subscription> = client1 - .subscribe( - rpc::SUBSCRIBE_NETWORK_EVENTS_ENDPOINT, - rpc_params![], - rpc::UNSUBSCRIBE_NETWORK_EVENTS_ENDPOINT, - ) - .await - .unwrap(); - - // send any bytes through (Vec) - let (invocation_receipt, runtime_receipt) = crate::test_utils::receipt::receipts(); - let receipt = ReceiptNotification::with(invocation_receipt, runtime_receipt.cid(), None); - server - .evt_notifier - .notify(notifier::Message::new( - Header::new( - notifier::SubscriptionTyp::EventSub( - rpc::SUBSCRIBE_NETWORK_EVENTS_ENDPOINT.to_string(), + let TestRunner { runner, settings } = TestRunner::start(); + runner.runtime.block_on(async { + let server = Server::new(settings.node().network()).unwrap(); + let metrics_hdl = metrics_handle(settings).await; + let (runner_tx, _runner_rx) = mpsc::channel(1); + server.start(runner_tx, metrics_hdl).await.unwrap(); + + let ws_url = format!("ws://{}", server.addr); + + let client1 = WsClientBuilder::default().build(ws_url).await.unwrap(); + let mut sub: Subscription> = client1 + .subscribe( + rpc::SUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + rpc_params![], + rpc::UNSUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + ) + .await + .unwrap(); + + // send any bytes through (Vec) + let (invocation_receipt, runtime_receipt) = crate::test_utils::receipt::receipts(); + let receipt = + ReceiptNotification::with(invocation_receipt, runtime_receipt.cid(), None); + server + .evt_notifier + .notify(notifier::Message::new( + Header::new( + notifier::SubscriptionTyp::EventSub( + rpc::SUBSCRIBE_NETWORK_EVENTS_ENDPOINT.to_string(), + ), + None, ), - None, - ), - receipt.to_json().unwrap(), - )) - .unwrap(); - - // send an unknown msg: this should be dropped - server - .evt_notifier - .notify(notifier::Message::new( - Header::new( - notifier::SubscriptionTyp::EventSub("test".to_string()), - None, - ), - vec![], - )) - .unwrap(); - - server - .evt_notifier - .notify(notifier::Message::new( - Header::new( - notifier::SubscriptionTyp::EventSub( - rpc::SUBSCRIBE_NETWORK_EVENTS_ENDPOINT.to_string(), + receipt.to_json().unwrap(), + )) + .unwrap(); + + // send an unknown msg: this should be dropped + server + .evt_notifier + .notify(notifier::Message::new( + Header::new( + notifier::SubscriptionTyp::EventSub("test".to_string()), + None, ), - None, - ), - receipt.to_json().unwrap(), - )) - .unwrap(); + vec![], + )) + .unwrap(); + + server + .evt_notifier + .notify(notifier::Message::new( + Header::new( + notifier::SubscriptionTyp::EventSub( + rpc::SUBSCRIBE_NETWORK_EVENTS_ENDPOINT.to_string(), + ), + None, + ), + receipt.to_json().unwrap(), + )) + .unwrap(); - let msg1 = sub.next().await.unwrap().unwrap(); - let returned1: ReceiptNotification = DagJson::from_json(&msg1).unwrap(); - assert_eq!(returned1, receipt); + let msg1 = sub.next().await.unwrap().unwrap(); + let returned1: ReceiptNotification = DagJson::from_json(&msg1).unwrap(); + assert_eq!(returned1, receipt); - let msg2 = sub.next().await.unwrap().unwrap(); - let _returned1: ReceiptNotification = DagJson::from_json(&msg2).unwrap(); + let msg2 = sub.next().await.unwrap().unwrap(); + let _returned1: ReceiptNotification = DagJson::from_json(&msg2).unwrap(); - assert!(sub.unsubscribe().await.is_ok()); + assert!(sub.unsubscribe().await.is_ok()); - unsafe { metrics::clear_recorder() } + unsafe { metrics::clear_recorder() } + }); } #[cfg(feature = "websocket-notify")] - #[tokio::test] - #[file_serial] + #[homestar_runtime_proc_macro::runner_test] async fn ws_subscribe_workflow_incorrect_params() { - let mut settings = Settings::load().unwrap(); - set_ports(&mut settings); - let server = Server::new(settings.node().network()).unwrap(); - let metrics_hdl = metrics_handle(settings).await; - let (runner_tx, _runner_rx) = mpsc::channel(1); - server.start(runner_tx, metrics_hdl).await.unwrap(); - - let ws_url = format!("ws://{}", server.addr); - - let client = WsClientBuilder::default().build(ws_url).await.unwrap(); - let sub: Result>, jsonrpsee::core::error::Error> = client - .subscribe( - rpc::SUBSCRIBE_RUN_WORKFLOW_ENDPOINT, - rpc_params![], - rpc::UNSUBSCRIBE_RUN_WORKFLOW_ENDPOINT, - ) - .await; - - assert!(sub.is_err()); - - if let Err(jsonrpsee::core::error::Error::Call(err)) = sub { - let check = ErrorCode::InvalidParams; - assert_eq!(err.code(), check.code()); - } else { - panic!("expected same error code"); - } - - unsafe { metrics::clear_recorder() } + let TestRunner { runner, settings } = TestRunner::start(); + runner.runtime.block_on(async { + let server = Server::new(settings.node().network()).unwrap(); + let metrics_hdl = metrics_handle(settings).await; + let (runner_tx, _runner_rx) = mpsc::channel(1); + server.start(runner_tx, metrics_hdl).await.unwrap(); + + let ws_url = format!("ws://{}", server.addr); + + let client = WsClientBuilder::default().build(ws_url).await.unwrap(); + let sub: Result>, jsonrpsee::core::error::Error> = client + .subscribe( + rpc::SUBSCRIBE_RUN_WORKFLOW_ENDPOINT, + rpc_params![], + rpc::UNSUBSCRIBE_RUN_WORKFLOW_ENDPOINT, + ) + .await; + + assert!(sub.is_err()); + + if let Err(jsonrpsee::core::error::Error::Call(err)) = sub { + let check = ErrorCode::InvalidParams; + assert_eq!(err.code(), check.code()); + } else { + panic!("expected same error code"); + } + + unsafe { metrics::clear_recorder() } + }); } #[cfg(feature = "websocket-notify")] - #[tokio::test] - #[file_serial] + #[homestar_runtime_proc_macro::runner_test] async fn ws_subscribe_workflow_runner_timeout() { - let mut settings = Settings::load().unwrap(); - set_ports(&mut settings); - let server = Server::new(settings.node().network()).unwrap(); - let metrics_hdl = metrics_handle(settings).await; - let (runner_tx, _runner_rx) = mpsc::channel(1); - server.start(runner_tx, metrics_hdl).await.unwrap(); - - let ws_url = format!("ws://{}", server.addr); - - let config = Resources::default(); - let instruction1 = test_utils::workflow::instruction::(); - let (instruction2, _) = test_utils::workflow::wasm_instruction_with_nonce::(); - - let task1 = Task::new( - RunInstruction::Expanded(instruction1), - config.clone().into(), - UcanPrf::default(), - ); - let task2 = Task::new( - RunInstruction::Expanded(instruction2), - config.into(), - UcanPrf::default(), - ); - - let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); - let run_str = format!( - r#"{{"name": "test","workflow": {}}}"#, - workflow.to_json_string().unwrap() - ); - - let run: serde_json::Value = serde_json::from_str(&run_str).unwrap(); - let client = WsClientBuilder::default().build(ws_url).await.unwrap(); - let sub: Result>, jsonrpsee::core::error::Error> = client - .subscribe( - rpc::SUBSCRIBE_RUN_WORKFLOW_ENDPOINT, - rpc_params![run], - rpc::UNSUBSCRIBE_RUN_WORKFLOW_ENDPOINT, - ) - .await; - - assert!(sub.is_err()); - - // Assure error is not on parse of params, but due to runner - // timeout (as runner is not available). - if let Err(jsonrpsee::core::error::Error::Call(err)) = sub { - let check = ErrorCode::InternalError; - assert_eq!(err.code(), check.code()); - } else { - panic!("expected same error code"); - } - - unsafe { metrics::clear_recorder() } + let TestRunner { runner, settings } = TestRunner::start(); + runner.runtime.block_on(async { + let server = Server::new(settings.node().network()).unwrap(); + let metrics_hdl = metrics_handle(settings).await; + let (runner_tx, _runner_rx) = mpsc::channel(1); + server.start(runner_tx, metrics_hdl).await.unwrap(); + + let ws_url = format!("ws://{}", server.addr); + + let config = Resources::default(); + let instruction1 = test_utils::workflow::instruction::(); + let (instruction2, _) = test_utils::workflow::wasm_instruction_with_nonce::(); + + let task1 = Task::new( + RunInstruction::Expanded(instruction1), + config.clone().into(), + UcanPrf::default(), + ); + let task2 = Task::new( + RunInstruction::Expanded(instruction2), + config.into(), + UcanPrf::default(), + ); + + let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); + let run_str = format!( + r#"{{"name": "test","workflow": {}}}"#, + workflow.to_json_string().unwrap() + ); + + let run: serde_json::Value = serde_json::from_str(&run_str).unwrap(); + let client = WsClientBuilder::default().build(ws_url).await.unwrap(); + let sub: Result>, jsonrpsee::core::error::Error> = client + .subscribe( + rpc::SUBSCRIBE_RUN_WORKFLOW_ENDPOINT, + rpc_params![run], + rpc::UNSUBSCRIBE_RUN_WORKFLOW_ENDPOINT, + ) + .await; + + assert!(sub.is_err()); + + // Assure error is not on parse of params, but due to runner + // timeout (as runner is not available). + if let Err(jsonrpsee::core::error::Error::Call(err)) = sub { + let check = ErrorCode::ServerIsBusy; + assert_eq!(err.code(), check.code()); + } else { + panic!("expected same error code"); + } + + unsafe { metrics::clear_recorder() } + }); } } diff --git a/homestar-runtime/src/network/webserver/rpc.rs b/homestar-runtime/src/network/webserver/rpc.rs index 67461f7a..509e1243 100644 --- a/homestar-runtime/src/network/webserver/rpc.rs +++ b/homestar-runtime/src/network/webserver/rpc.rs @@ -1,9 +1,7 @@ -use super::{listener, prom::PrometheusData}; #[cfg(feature = "websocket-notify")] -use super::{ - notifier::{self, Header, Notifier, SubscriptionTyp}, - Message, -}; +use super::notifier::{self, Header, Notifier, SubscriptionTyp}; +#[allow(unused_imports)] +use super::{listener, prom::PrometheusData, Message}; use crate::runner::WsSender; #[cfg(feature = "websocket-notify")] use anyhow::anyhow; @@ -16,7 +14,7 @@ use faststr::FastStr; use futures::StreamExt; use jsonrpsee::{ server::RpcModule, - types::{error::ErrorCode, ErrorObjectOwned}, + types::error::{ErrorCode, ErrorObject}, }; #[cfg(feature = "websocket-notify")] use jsonrpsee::{types::SubscriptionId, SubscriptionMessage, SubscriptionSink, TrySendError}; @@ -27,16 +25,18 @@ use metrics_exporter_prometheus::PrometheusHandle; use std::sync::Arc; use std::time::Duration; #[cfg(feature = "websocket-notify")] +use tokio::{runtime::Handle, select}; +#[allow(unused_imports)] use tokio::{ - runtime::Handle, - select, sync::oneshot, time::{self, Instant}, }; #[cfg(feature = "websocket-notify")] use tokio_stream::wrappers::BroadcastStream; +#[allow(unused_imports)] +use tracing::warn; #[cfg(feature = "websocket-notify")] -use tracing::{error, info, warn}; +use tracing::{debug, error, info}; /// Health endpoint. pub(crate) const HEALTH_ENDPOINT: &str = "health"; @@ -135,8 +135,34 @@ impl JsonRpc { async fn register(ctx: Context) -> Result> { let mut module = RpcModule::new(ctx); + #[cfg(not(test))] + module.register_async_method(HEALTH_ENDPOINT, |_, ctx| async move { + let (tx, rx) = oneshot::channel(); + ctx.runner_sender + .send((Message::GetNodeInfo, Some(tx))) + .await + .map_err(|err| internal_err(err.to_string()))?; + + if let Ok(Ok(Message::AckNodeInfo(info))) = + time::timeout_at(Instant::now() + ctx.receiver_timeout, rx).await + { + Ok(serde_json::json!({ "healthy": true, "nodeInfo": info})) + } else { + warn!(sub = HEALTH_ENDPOINT, "did not acknowledge message in time"); + Err(internal_err("failed to get node information".to_string())) + } + })?; + + #[cfg(test)] module.register_async_method(HEALTH_ENDPOINT, |_, _| async move { - serde_json::json!({ "healthy": true }) + use crate::runner::NodeInfo; + use std::str::FromStr; + let peer_id = + libp2p::PeerId::from_str("12D3KooWRNw2pJC9748Fmq4WNV27HoSTcX3r37132FLkQMrbKAiC") + .unwrap(); + Ok::>(serde_json::json!({ + "healthy": true, "nodeInfo": NodeInfo::new(peer_id) + })) })?; module.register_async_method(METRICS_ENDPOINT, |params, ctx| async move { @@ -144,12 +170,15 @@ impl JsonRpc { // TODO: Handle prefix specific metrics in parser. match params.one::() { - Ok(listener::MetricsPrefix { prefix: _prefix }) => { - PrometheusData::from_string(&render) - .map_err(|_err| ErrorObjectOwned::from(ErrorCode::InternalError)) - } + Ok(listener::MetricsPrefix { prefix }) => PrometheusData::from_string(&render) + .map_err(|err| { + internal_err(format!( + "failed to render metrics @prefix {} : {:#?}", + prefix, err + )) + }), Err(_) => PrometheusData::from_string(&render) - .map_err(|_err| ErrorObjectOwned::from(ErrorCode::InternalError)), + .map_err(|err| internal_err(format!("failed to render metrics: {:#?}", err))), } })?; @@ -182,7 +211,7 @@ impl JsonRpc { Ok(listener::Run { name, workflow }) => { let (tx, rx) = oneshot::channel(); ctx.runner_sender - .send((Message::RunWorkflow((name, workflow)), Some(tx))) + .send((Message::RunWorkflow((name.clone(), workflow)), Some(tx))) .await?; if let Ok(Ok(Message::AckWorkflow((cid, name)))) = @@ -195,11 +224,13 @@ impl JsonRpc { let stream = BroadcastStream::new(rx); Self::handle_workflow_subscription(sink, stream, ctx).await?; } else { - warn!("did not acknowledge message in time"); + warn!( + sub = SUBSCRIBE_RUN_WORKFLOW_ENDPOINT, + workflow_name = name.to_string(), + "did not acknowledge message in time" + ); let _ = pending - .reject(ErrorObjectOwned::from(ErrorObjectOwned::from( - ErrorCode::InternalError, - ))) + .reject(busy_err("workflow not able to run workflow: {cid}")) .await; } } @@ -287,6 +318,9 @@ impl JsonRpc { .and_then(|v| { let (v_cid, v_name) = v.value(); if v_cid == &cid && (Some(v_name) == ident.as_ref() || ident.is_none()) { + debug!(cid = cid.to_string(), + ident = ident.clone().unwrap_or( + "undefined".into()).to_string(), "received message"); Some(payload) } else { None @@ -329,3 +363,12 @@ impl JsonRpc { Ok(()) } } + +fn internal_err<'a, T: ToString>(msg: T) -> ErrorObject<'a> { + ErrorObject::owned(ErrorCode::InternalError.code(), msg.to_string(), None::<()>) +} + +#[allow(dead_code)] +fn busy_err<'a, T: ToString>(msg: T) -> ErrorObject<'a> { + ErrorObject::owned(ErrorCode::ServerIsBusy.code(), msg.to_string(), None::<()>) +} diff --git a/homestar-runtime/src/runner.rs b/homestar-runtime/src/runner.rs index 3511f26f..06f59ff1 100644 --- a/homestar-runtime/src/runner.rs +++ b/homestar-runtime/src/runner.rs @@ -8,15 +8,18 @@ use crate::{ db::Database, event_handler::{Event, EventHandler}, network::{rpc, swarm, webserver}, + tasks::Fetch, worker::WorkerMessage, - workflow, Settings, Worker, + workflow::{self, Resource}, + Settings, Worker, }; use anyhow::{anyhow, Context, Result}; use atomic_refcell::AtomicRefCell; use chrono::NaiveDateTime; use dashmap::DashMap; use faststr::FastStr; -use futures::future::poll_fn; +use fnv::FnvHashSet; +use futures::{future::poll_fn, FutureExt}; use homestar_core::Workflow; use homestar_wasm::io::Arg; use jsonrpsee::server::ServerHandle; @@ -40,8 +43,10 @@ use tracing::{error, info, warn}; mod error; pub(crate) mod file; +mod nodeinfo; pub(crate) mod response; pub(crate) use error::Error; +pub(crate) use nodeinfo::NodeInfo; #[cfg(not(test))] const HOMESTAR_THREAD: &str = "homestar-runtime"; @@ -100,10 +105,11 @@ impl ModifiedSet for RunningTaskSet { pub struct Runner { event_sender: Arc>, expiration_queue: Rc>>, + node_info: NodeInfo, running_tasks: Arc, running_workers: RunningWorkerSet, - runtime: tokio::runtime::Runtime, - settings: Arc, + pub(crate) runtime: tokio::runtime::Runtime, + pub(crate) settings: Arc, webserver: Arc, } @@ -158,6 +164,7 @@ impl Runner { runtime: tokio::runtime::Runtime, ) -> Result { let swarm = runtime.block_on(swarm::new(settings.node()))?; + let peer_id = *swarm.local_peer_id(); let webserver = webserver::Server::new(settings.node().network())?; @@ -178,7 +185,7 @@ impl Runner { #[cfg(feature = "ipfs")] let _event_handler_hdl = runtime.spawn({ - let ipfs = IpfsCli::default(); + let ipfs = IpfsCli::new(settings.node.network.ipfs())?; event_handler.start(ipfs) }); @@ -188,6 +195,7 @@ impl Runner { Ok(Self { event_sender, expiration_queue: Rc::new(AtomicRefCell::new(DelayQueue::new())), + node_info: NodeInfo::new(peer_id), running_tasks: DashMap::new().into(), running_workers: DashMap::new(), runtime, @@ -263,27 +271,39 @@ impl Runner { _ => {} } } - Some((webserver::Message::RunWorkflow((name, workflow)), Some(oneshot_tx))) = ws_receiver.recv() => { - info!("running workflow: {}", name); - // TODO: Parse this from the workflow data itself. - let workflow_settings = workflow::Settings::default(); - match self.run_worker( - workflow, - workflow_settings, - Some(name), - runner_worker_tx.clone(), - db.clone(), - ).await { - Ok(data) => { - info!("sending message to rpc server"); - let _ = oneshot_tx.send(webserver::Message::AckWorkflow((data.info.cid, data.name))); + Some(msg) = ws_receiver.recv() => { + println!("ws message: {:?}", msg); + match msg { + (webserver::Message::RunWorkflow((name, workflow)), Some(oneshot_tx)) => { + info!("running workflow: {}", name); + // TODO: Parse this from the workflow data itself. + let workflow_settings = workflow::Settings::default(); + match self.run_worker( + workflow, + workflow_settings, + Some(name), + runner_worker_tx.clone(), + db.clone(), + ).await { + Ok(data) => { + info!("sending message to rpc server"); + let _ = oneshot_tx.send(webserver::Message::AckWorkflow((data.info.cid, data.name))); + } + Err(err) => { + error!(err=?err, "error handling ws message"); + let _ = oneshot_tx.send(webserver::Message::RunErr(err.into())); + } + } + } - Err(err) => { - error!(err=?err, "error handling ws message"); - let _ = oneshot_tx.send(webserver::Message::RunErr(err.into())); + (webserver::Message::GetNodeInfo, Some(oneshot_tx)) => { + info!("getting node info"); + let _ = oneshot_tx.send(webserver::Message::AckNodeInfo(self.node_info.clone())); } + _ => () } } + // Handle messages from the worker. Some(msg) = runner_worker_rx.recv() => { match msg { @@ -584,6 +604,7 @@ impl Runner { let initial_info = Arc::clone(&worker.workflow_info); let workflow_timeout = worker.workflow_settings.timeout; let workflow_name = worker.workflow_name.clone(); + let workflow_settings = worker.workflow_settings.clone(); let timestamp = worker.workflow_started; // Spawn worker, which initializees the scheduler and runs @@ -602,7 +623,23 @@ impl Runner { )) .await?; - let handle = self.runtime.spawn(worker.run(self.running_tasks())); + #[cfg(feature = "ipfs")] + let fetch_fn = { + let settings = Arc::clone(&self.settings); + let ipfs = IpfsCli::new(settings.node.network.ipfs())?; + move |rscs: FnvHashSet| { + async move { Fetch::get_resources(rscs, workflow_settings, ipfs).await }.boxed() + } + }; + + #[cfg(not(feature = "ipfs"))] + let fetch_fn = |rscs: FnvHashSet| { + async move { Fetch::get_resources(rscs, workflow_settings).await }.boxed() + }; + + let handle = self + .runtime + .spawn(worker.run(self.running_tasks(), fetch_fn)); // Add Cid to expirations timing wheel let delay_key = self @@ -641,12 +678,10 @@ mod test { use crate::{network::rpc::Client, test_utils::WorkerBuilder}; use homestar_core::test_utils as core_test_utils; use rand::thread_rng; - use serial_test::file_serial; use std::net::SocketAddr; use tarpc::context; use tokio::net::TcpStream; - #[file_serial] #[homestar_runtime_proc_macro::runner_test] fn shutdown() { let TestRunner { runner, settings } = TestRunner::start(); @@ -730,10 +765,14 @@ mod test { let TestRunner { runner, settings } = TestRunner::start(); runner.runtime.block_on(async { - let worker = WorkerBuilder::new(settings.node).build().await; + let builder = WorkerBuilder::new(settings.node); + let fetch_fn = builder.fetch_fn(); + let worker = builder.build().await; let workflow_cid = worker.workflow_info.cid; let workflow_timeout = worker.workflow_settings.timeout; - let handle = runner.runtime.spawn(worker.run(runner.running_tasks())); + let handle = runner + .runtime + .spawn(worker.run(runner.running_tasks(), fetch_fn)); let delay_key = runner .expiration_queue .try_borrow_mut() @@ -763,10 +802,14 @@ mod test { let TestRunner { runner, settings } = TestRunner::start(); runner.runtime.block_on(async { - let worker = WorkerBuilder::new(settings.node).build().await; + let builder = WorkerBuilder::new(settings.node); + let fetch_fn = builder.fetch_fn(); + let worker = builder.build().await; let workflow_cid = worker.workflow_info.cid; let workflow_timeout = worker.workflow_settings.timeout; - let handle = runner.runtime.spawn(worker.run(runner.running_tasks())); + let handle = runner + .runtime + .spawn(worker.run(runner.running_tasks(), fetch_fn)); let delay_key = runner .expiration_queue .try_borrow_mut() @@ -787,10 +830,14 @@ mod test { let TestRunner { runner, settings } = TestRunner::start(); runner.runtime.block_on(async { - let worker = WorkerBuilder::new(settings.node).build().await; + let builder = WorkerBuilder::new(settings.node); + let fetch_fn = builder.fetch_fn(); + let worker = builder.build().await; let workflow_cid = worker.workflow_info.cid; let workflow_timeout = worker.workflow_settings.timeout; - let handle = runner.runtime.spawn(worker.run(runner.running_tasks())); + let handle = runner + .runtime + .spawn(worker.run(runner.running_tasks(), fetch_fn)); let delay_key = runner .expiration_queue .try_borrow_mut() @@ -826,8 +873,10 @@ mod test { fn gc_while_workers_finished() { let TestRunner { runner, settings } = TestRunner::start(); runner.runtime.block_on(async { - let worker = WorkerBuilder::new(settings.node).build().await; - let _ = worker.run(runner.running_tasks()).await; + let builder = WorkerBuilder::new(settings.node); + let fetch_fn = builder.fetch_fn(); + let worker = builder.build().await; + let _ = worker.run(runner.running_tasks(), fetch_fn).await; }); runner.running_tasks.iter().for_each(|handles| { diff --git a/homestar-runtime/src/runner/nodeinfo.rs b/homestar-runtime/src/runner/nodeinfo.rs new file mode 100644 index 00000000..fd916151 --- /dev/null +++ b/homestar-runtime/src/runner/nodeinfo.rs @@ -0,0 +1,21 @@ +use libp2p::PeerId; +use serde::{Deserialize, Serialize}; + +/// TODO +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) struct NodeInfo { + pub(crate) peer_id: PeerId, +} + +impl NodeInfo { + /// TODO + pub(crate) fn new(peer_id: PeerId) -> Self { + Self { peer_id } + } + + /// TODO + #[allow(dead_code)] + pub(crate) fn peer_id(&self) -> &PeerId { + &self.peer_id + } +} diff --git a/homestar-runtime/src/scheduler.rs b/homestar-runtime/src/scheduler.rs index 4ca8ca07..df3f48bf 100644 --- a/homestar-runtime/src/scheduler.rs +++ b/homestar-runtime/src/scheduler.rs @@ -106,7 +106,7 @@ impl<'a> TaskScheduler<'a> { let mut_graph = Arc::make_mut(&mut graph); let schedule: &mut Schedule<'a> = mut_graph.schedule.as_mut(); let schedule_length = schedule.len(); - let mut resources_to_fetch: FnvHashSet = FnvHashSet::default(); + let mut resources_to_fetch = vec![]; let resume = 'resume: { for (idx, vec) in schedule.iter().enumerate().rev() { @@ -117,7 +117,7 @@ impl<'a> TaskScheduler<'a> { .get(&cid) .map(|resource| { resource.iter().for_each(|rsc| { - resources_to_fetch.insert(rsc.to_owned()); + resources_to_fetch.push((cid, rsc)); }); ptrs.push(Pointer::new(cid)); }) @@ -131,6 +131,13 @@ impl<'a> TaskScheduler<'a> { let linkmap = found.iter().fold( LinkMap::>::new(), |mut map, receipt| { + if let Some(idx) = resources_to_fetch + .iter() + .position(|(cid, _rsc)| cid == &receipt.instruction().cid()) + { + resources_to_fetch.swap_remove(idx); + } + let _ = map.insert( receipt.instruction().cid(), receipt.output_as_arg(), @@ -160,6 +167,10 @@ impl<'a> TaskScheduler<'a> { ControlFlow::Continue(()) }; + let resources_to_fetch: FnvHashSet = resources_to_fetch + .into_iter() + .map(|(_, rsc)| rsc.to_owned()) + .collect(); let fetched = fetch_fn(resources_to_fetch).await?; match resume { diff --git a/homestar-runtime/src/settings.rs b/homestar-runtime/src/settings.rs index 3b2ecdea..ed301134 100644 --- a/homestar-runtime/src/settings.rs +++ b/homestar-runtime/src/settings.rs @@ -4,6 +4,8 @@ use config::{Config, ConfigError, Environment, File}; use http::Uri; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr, DurationMilliSeconds, DurationSeconds}; +#[cfg(feature = "ipfs")] +use std::net::Ipv4Addr; use std::{ net::{IpAddr, Ipv6Addr}, path::PathBuf, @@ -34,26 +36,17 @@ impl Settings { } /// Monitoring settings. -#[cfg(feature = "monitoring")] #[serde_as] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct Monitoring { /// Tokio console port. pub console_subscriber_port: u16, /// Monitoring collection interval in milliseconds. + #[cfg(feature = "monitoring")] #[serde_as(as = "DurationMilliSeconds")] pub process_collector_interval: Duration, } -/// Monitoring settings. -#[cfg(not(feature = "monitoring"))] -#[serde_as] -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct Monitoring { - /// Tokio console port. - pub console_subscriber_port: u16, -} - /// Server settings. #[serde_as] #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)] @@ -123,6 +116,14 @@ pub struct Network { /// Pub/sub idle timeout #[serde_as(as = "DurationSeconds")] pub(crate) pubsub_idle_timeout: Duration, + /// TODO + pub(crate) pubsub_mesh_n_low: usize, + /// TODO + pub(crate) pubsub_mesh_n_high: usize, + /// TODO + pub(crate) pubsub_mesh_n: usize, + /// TODO + pub(crate) pubsub_mesh_outbound_min: usize, /// Quorum for receipt records on the DHT. pub(crate) receipt_quorum: usize, /// RPC-server port. @@ -172,6 +173,20 @@ pub struct Network { /// Event handler poll cache interval in milliseconds. #[serde_as(as = "DurationMilliSeconds")] pub(crate) poll_cache_interval: Duration, + /// TODO + #[cfg(feature = "ipfs")] + pub(crate) ipfs: Ipfs, +} + +#[cfg(feature = "ipfs")] +#[serde_as] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[serde(default)] +pub(crate) struct Ipfs { + /// TODO + pub(crate) host: String, + /// TODO + pub(crate) port: u16, } /// Database-related settings for a homestar node. @@ -210,6 +225,16 @@ impl Default for Monitoring { } } +#[cfg(feature = "ipfs")] +impl Default for Ipfs { + fn default() -> Self { + Self { + host: Ipv4Addr::LOCALHOST.to_string(), + port: 5001, + } + } +} + impl Default for Database { fn default() -> Self { Self { @@ -239,6 +264,10 @@ impl Default for Network { pubsub_duplication_cache_time: Duration::new(1, 0), pubsub_heartbeat: Duration::new(60, 0), pubsub_idle_timeout: Duration::new(60 * 60 * 24, 0), + pubsub_mesh_n_low: 1, + pubsub_mesh_n_high: 10, + pubsub_mesh_n: 2, + pubsub_mesh_outbound_min: 1, receipt_quorum: 2, rpc_host: IpAddr::V6(Ipv6Addr::LOCALHOST), rpc_max_connections: 10, @@ -247,7 +276,7 @@ impl Default for Network { transport_connection_timeout: Duration::new(20, 0), webserver_host: Uri::from_static("127.0.0.1"), webserver_port: 1337, - webserver_timeout: Duration::new(60, 0), + webserver_timeout: Duration::new(120, 0), websocket_capacity: 1024, websocket_receiver_timeout: Duration::from_millis(200), workflow_quorum: 3, @@ -257,6 +286,8 @@ impl Default for Network { max_connected_peers: 32, max_announce_addresses: 10, poll_cache_interval: Duration::from_millis(1000), + #[cfg(feature = "ipfs")] + ipfs: Default::default(), } } } @@ -272,6 +303,14 @@ impl Node { } } +impl Network { + /// TODO + #[cfg(feature = "ipfs")] + pub(crate) fn ipfs(&self) -> &Ipfs { + &self.ipfs + } +} + fn default_shutdown_timeout() -> Duration { Duration::new(20, 0) } diff --git a/homestar-runtime/src/test_utils/worker_builder.rs b/homestar-runtime/src/test_utils/worker_builder.rs index 4546fdb6..587800cc 100644 --- a/homestar-runtime/src/test_utils/worker_builder.rs +++ b/homestar-runtime/src/test_utils/worker_builder.rs @@ -1,10 +1,20 @@ //! Module for building out [Worker]s for testing purposes. use super::{db::MemoryDb, event}; +#[cfg(feature = "ipfs")] +use crate::network::IpfsCli; use crate::{ - channel::AsyncBoundedChannelSender, db::Database, event_handler::Event, settings, - worker::WorkerMessage, workflow, Settings, Worker, + channel::AsyncBoundedChannelSender, + db::Database, + event_handler::Event, + settings, + tasks::Fetch, + worker::WorkerMessage, + workflow::{self, Resource}, + Settings, Worker, }; +use fnv::FnvHashSet; +use futures::{future::BoxFuture, FutureExt}; use homestar_core::{ ipld::DagCbor, test_utils::workflow as workflow_test_utils, @@ -12,9 +22,24 @@ use homestar_core::{ Workflow, }; use homestar_wasm::io::Arg; +use indexmap::IndexMap; use libipld::Cid; use tokio::sync::mpsc; +/// TODO +#[cfg(feature = "ipfs")] +pub(crate) struct WorkerBuilder<'a> { + db: MemoryDb, + event_sender: AsyncBoundedChannelSender, + ipfs: IpfsCli, + runner_sender: mpsc::Sender, + name: Option, + workflow: Workflow<'a, Arg>, + workflow_settings: workflow::Settings, +} + +/// TODO +#[cfg(not(feature = "ipfs"))] pub(crate) struct WorkerBuilder<'a> { db: MemoryDb, event_sender: AsyncBoundedChannelSender, @@ -31,12 +56,12 @@ impl<'a> WorkerBuilder<'a> { let (instruction1, instruction2, _) = workflow_test_utils::related_wasm_instructions::(); let task1 = Task::new( - RunInstruction::Expanded(instruction1), + RunInstruction::Expanded(instruction1.clone()), config.clone().into(), UcanPrf::default(), ); let task2 = Task::new( - RunInstruction::Expanded(instruction2), + RunInstruction::Expanded(instruction2.clone()), config.into(), UcanPrf::default(), ); @@ -46,13 +71,31 @@ impl<'a> WorkerBuilder<'a> { let workflow = Workflow::new(vec![task1, task2]); let workflow_cid = workflow.clone().to_cid().unwrap(); - Self { - db: MemoryDb::setup_connection_pool(&settings, None).unwrap(), - event_sender: evt_tx, - runner_sender: wk_tx, - name: Some(workflow_cid.to_string()), - workflow, - workflow_settings: workflow::Settings::default(), + + #[cfg(feature = "ipfs")] + { + let ipfs = IpfsCli::new(settings.network.ipfs()).unwrap(); + Self { + db: MemoryDb::setup_connection_pool(&settings, None).unwrap(), + event_sender: evt_tx, + ipfs: ipfs.clone(), + runner_sender: wk_tx, + name: Some(workflow_cid.to_string()), + workflow, + workflow_settings: workflow::Settings::default(), + } + } + + #[cfg(not(feature = "ipfs"))] + { + Self { + db: MemoryDb::setup_connection_pool(&settings, None).unwrap(), + event_sender: evt_tx, + runner_sender: wk_tx, + name: Some(workflow_cid.to_string()), + workflow, + workflow_settings: workflow::Settings::default(), + } } } @@ -71,6 +114,37 @@ impl<'a> WorkerBuilder<'a> { .unwrap() } + /// TODO + #[cfg(feature = "ipfs")] + #[allow(dead_code)] + pub(crate) fn fetch_fn( + &self, + ) -> impl FnOnce(FnvHashSet) -> BoxFuture<'a, anyhow::Result>>> + { + let fetch_settings = self.workflow_settings.clone().into(); + let ipfs = self.ipfs.clone(); + let fetch_fn = move |rscs: FnvHashSet| { + async move { Fetch::get_resources(rscs, fetch_settings, ipfs).await }.boxed() + }; + + fetch_fn + } + + /// TODO + #[cfg(not(feature = "ipfs"))] + #[allow(dead_code)] + pub(crate) fn fetch_fn( + &self, + ) -> impl FnOnce(FnvHashSet) -> BoxFuture<'a, anyhow::Result>>> + { + let fetch_settings = self.workflow_settings.clone().into(); + let fetch_fn = |rscs: FnvHashSet| { + async move { Fetch::get_resources(rscs, fetch_settings).await }.boxed() + }; + + fetch_fn + } + /// Get the [Cid] of the workflow from the builder state. #[allow(dead_code)] pub(crate) fn workflow_cid(&self) -> Cid { diff --git a/homestar-runtime/src/worker.rs b/homestar-runtime/src/worker.rs index f7e1b64a..e14042c7 100644 --- a/homestar-runtime/src/worker.rs +++ b/homestar-runtime/src/worker.rs @@ -6,8 +6,6 @@ #[cfg(feature = "websocket-notify")] use crate::event_handler::event::Replay; -#[cfg(feature = "ipfs")] -use crate::network::IpfsCli; use crate::{ channel::{AsyncBoundedChannel, AsyncBoundedChannelSender}, db::Database, @@ -19,7 +17,7 @@ use crate::{ network::swarm::CapsuleTag, runner::{ModifiedSet, RunningTaskSet}, scheduler::{ExecutionGraph, TaskScheduler}, - tasks::{Fetch, RegisteredTasks, WasmContext}, + tasks::{RegisteredTasks, WasmContext}, workflow::{self, Resource}, Db, Receipt, }; @@ -27,7 +25,7 @@ use anyhow::{anyhow, Result}; use chrono::NaiveDateTime; use faststr::FastStr; use fnv::FnvHashSet; -use futures::FutureExt; +use futures::{future::BoxFuture, FutureExt}; use homestar_core::{ bail, ipld::DagCbor, @@ -145,23 +143,10 @@ where /// /// [Instruction]: homestar_core::workflow::Instruction /// [Swarm]: crate::network::swarm - pub(crate) async fn run(self, running_tasks: Arc) -> Result<()> { - let workflow_settings_fetch = self.workflow_settings.clone(); - #[cfg(feature = "ipfs")] - let fetch_fn = { - let ipfs = IpfsCli::default(); - - move |rscs: FnvHashSet| { - async move { Fetch::get_resources(rscs, workflow_settings_fetch, ipfs).await } - .boxed() - } - }; - - #[cfg(not(feature = "ipfs"))] - let fetch_fn = |rscs: FnvHashSet| { - async move { Fetch::get_resources(rscs, workflow_settings_fetch).await }.boxed() - }; - + pub(crate) async fn run(self, running_tasks: Arc, fetch_fn: F) -> Result<()> + where + F: FnOnce(FnvHashSet) -> BoxFuture<'a, Result>>>, + { let scheduler_ctx = TaskScheduler::init( self.graph.clone(), // Arc'ed &mut self.db.conn()?, @@ -204,7 +189,7 @@ where ); if let Some(result) = linkmap.read().await.get(&cid) { - info!(cid = cid.to_string(), "found in in-memory linkmap"); + debug!(cid = cid.to_string(), "found in in-memory linkmap"); Ok(result.to_owned()) } else if let Some(bytes) = resources.read().await.get(&Resource::Cid(cid)) { Ok(InstructionResult::Ok(Arg::Ipld(Ipld::Bytes( @@ -237,6 +222,11 @@ where "failure in attempting to find event: {err}" ))) } + Ok(Ok(ResponseEvent::NoPeersAvailable)) => { + bail!(ResolveError::UnresolvedCid( + "no peers available to communicate with".to_string() + )) + } Ok(Ok(_)) => bail!(ResolveError::UnresolvedCid( "wrong or unexpected event message received".to_string(), )), @@ -465,6 +455,7 @@ mod test { let (tx, rx) = test_utils::event::setup_event_channel(settings.clone().node); let builder = WorkerBuilder::new(settings.node).with_event_sender(tx); + let fetch_fn = builder.fetch_fn(); let db = builder.db(); let worker = builder.build().await; let workflow_cid = worker.workflow_info.cid; @@ -484,7 +475,7 @@ mod test { let running_tasks = Arc::new(RunningTaskSet::new()); let worker_workflow_cid = worker.workflow_info.cid; - worker.run(running_tasks.clone()).await.unwrap(); + worker.run(running_tasks.clone(), fetch_fn).await.unwrap(); assert_eq!(running_tasks.len(), 1); assert!(running_tasks.contains_key(&worker_workflow_cid)); assert_eq!(running_tasks.get(&worker_workflow_cid).unwrap().len(), 2); @@ -582,6 +573,7 @@ mod test { let builder = WorkerBuilder::new(settings.node) .with_event_sender(tx) .with_tasks(vec![task1, task2]); + let fetch_fn = builder.fetch_fn(); let db = builder.db(); let workflow_cid = builder.workflow_cid(); @@ -626,7 +618,7 @@ mod test { let running_tasks = Arc::new(RunningTaskSet::new()); let worker_workflow_cid = worker.workflow_info.cid; - worker.run(running_tasks.clone()).await.unwrap(); + worker.run(running_tasks.clone(), fetch_fn).await.unwrap(); assert_eq!(running_tasks.len(), 1); assert!(running_tasks.contains_key(&worker_workflow_cid)); assert_eq!(running_tasks.get(&worker_workflow_cid).unwrap().len(), 1); diff --git a/homestar-runtime/src/workflow/settings.rs b/homestar-runtime/src/workflow/settings.rs index dd83d1b3..34d908fd 100644 --- a/homestar-runtime/src/workflow/settings.rs +++ b/homestar-runtime/src/workflow/settings.rs @@ -21,7 +21,7 @@ impl Default for Settings { retries: 10, retry_max_delay: Duration::new(60, 0), retry_initial_delay: Duration::from_millis(500), - p2p_timeout: Duration::new(60, 0), + p2p_timeout: Duration::new(5, 0), timeout: Duration::new(3600, 0), } } diff --git a/homestar-runtime/tests/webserver.rs b/homestar-runtime/tests/webserver.rs index 272e158b..28bc669a 100644 --- a/homestar-runtime/tests/webserver.rs +++ b/homestar-runtime/tests/webserver.rs @@ -64,7 +64,7 @@ fn test_workflow_run_serial() -> Result<()> { .arg("tests/fixtures/test_workflow2.toml") .arg("--db") .arg("homestar_test_workflow_run_serial.db") - .stdout(Stdio::piped()) + //.stdout(Stdio::piped()) .spawn() .unwrap(); @@ -111,11 +111,8 @@ fn test_workflow_run_serial() -> Result<()> { .for_each(|msg| async move { let json: serde_json::Value = serde_json::from_slice(&msg.unwrap()).unwrap(); let check = json.get("metadata").unwrap(); - let expected1 = serde_json::json!({"name": "test", "replayed": true, "workflow": {"/": "bafyrmicvwgispoezdciv5z6w3coutfjjtnhtmbegpcrrocqd76y7dvtknq"}}); - let expected2 = serde_json::json!({"name": "test", "replayed": false, "workflow": {"/": "bafyrmicvwgispoezdciv5z6w3coutfjjtnhtmbegpcrrocqd76y7dvtknq"}}); - if check != &expected1 && check != &expected2 { - panic!("JSONRPC response is not expected"); - } + let expected = serde_json::json!({"name": "test", "replayed": false, "workflow": {"/": "bafyrmicvwgispoezdciv5z6w3coutfjjtnhtmbegpcrrocqd76y7dvtknq"}}); + assert_eq!(check, &expected); }) .await; @@ -129,7 +126,12 @@ fn test_workflow_run_serial() -> Result<()> { .await .unwrap(); - assert!(sub2.next().await.is_some()); + let msg = sub2.next().await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&msg.unwrap()).unwrap(); + let check = json.get("metadata").unwrap(); + let expected = serde_json::json!({"name": "test", "replayed": true, "workflow": {"/": "bafyrmicvwgispoezdciv5z6w3coutfjjtnhtmbegpcrrocqd76y7dvtknq"}}); + assert_eq!(check, &expected); + assert!(sub2.next().await.is_some()); assert!(sub2.next().await.is_some()); @@ -148,9 +150,10 @@ fn test_workflow_run_serial() -> Result<()> { .with_timeout(std::time::Duration::from_millis(500)) .await .is_err(); + assert!(sub3.next().await.is_some()); - assert!(sub3.next().await.is_some()); - assert!(sub3.next().await.is_some()); + assert!(sub2.next().await.is_some()); + assert!(sub2.next().await.is_some()); let another_run_str = format!(r#"{{"name": "another_test","workflow": {}}}"#, json_string); let another_run: serde_json::Value = serde_json::from_str(&another_run_str).unwrap();