From 396581fe269645e45018110a5ef92455e65c4f00 Mon Sep 17 00:00:00 2001 From: JannikSt Date: Mon, 28 Apr 2025 13:15:10 -0700 Subject: [PATCH 1/4] Feature: discovery updater (#308) * The discovery updater runs regularly when the node is not active in a pool --- crates/worker/src/cli/command.rs | 10 ++- crates/worker/src/services/discovery.rs | 24 +++-- .../worker/src/services/discovery_updater.rs | 89 +++++++++++++++++++ crates/worker/src/services/mod.rs | 1 + crates/worker/src/state/system_state.rs | 7 +- 5 files changed, 121 insertions(+), 10 deletions(-) create mode 100644 crates/worker/src/services/discovery_updater.rs diff --git a/crates/worker/src/cli/command.rs b/crates/worker/src/cli/command.rs index c54566bd..971ef459 100644 --- a/crates/worker/src/cli/command.rs +++ b/crates/worker/src/cli/command.rs @@ -10,6 +10,7 @@ use crate::operations::compute_node::ComputeNodeOperations; use crate::operations::heartbeat::service::HeartbeatService; use crate::operations::provider::ProviderOperations; use crate::services::discovery::DiscoveryService; +use crate::services::discovery_updater::DiscoveryUpdater; use crate::state::system_state::SystemState; use crate::TaskHandles; use alloy::primitives::U256; @@ -269,8 +270,12 @@ pub async fn execute_command( compute_node_state, ); + let discovery_wallet = node_wallet_instance.clone(); let discovery_service = - DiscoveryService::new(&node_wallet_instance, discovery_url.clone(), None); + DiscoveryService::new(discovery_wallet, discovery_url.clone(), None); + let discovery_state = state.clone(); + let discovery_updater = + DiscoveryUpdater::new(discovery_service.clone(), discovery_state.clone()); let pool_id = U256::from(*compute_pool_id as u32); let pool_info = loop { @@ -589,6 +594,7 @@ pub async fn execute_command( let mut attempts = 0; let max_attempts = 100; while attempts < max_attempts { + Console::title("📦 Uploading discovery info"); match discovery_service.upload_discovery_info(&node_config).await { Ok(_) => break, Err(e) => { @@ -622,6 +628,8 @@ pub async fn execute_command( "", ); + discovery_updater.start_auto_update(node_config); + if let Err(err) = { let heartbeat_clone = heartbeat_service.unwrap().clone(); if recover_last_state { diff --git a/crates/worker/src/services/discovery.rs b/crates/worker/src/services/discovery.rs index f29e0827..031ea86a 100644 --- a/crates/worker/src/services/discovery.rs +++ b/crates/worker/src/services/discovery.rs @@ -1,16 +1,16 @@ -use crate::console::Console; use shared::models::node::Node; use shared::security::request_signer::sign_request; use shared::web3::wallet::Wallet; +use std::sync::Arc; -pub struct DiscoveryService<'b> { - wallet: &'b Wallet, +pub struct DiscoveryService { + wallet: Arc, base_url: String, endpoint: String, } -impl<'b> DiscoveryService<'b> { - pub fn new(wallet: &'b Wallet, base_url: Option, endpoint: Option) -> Self { +impl DiscoveryService { + pub fn new(wallet: Arc, base_url: Option, endpoint: Option) -> Self { Self { wallet, base_url: base_url.unwrap_or_else(|| "http://localhost:8089".to_string()), @@ -22,13 +22,11 @@ impl<'b> DiscoveryService<'b> { &self, node_config: &Node, ) -> Result<(), Box> { - Console::title("📦 Uploading discovery info"); - let request_data = serde_json::to_value(node_config) .map_err(|e| Box::new(e) as Box)?; let signature_string = - sign_request(&self.endpoint, self.wallet, Some(&request_data)).await?; + sign_request(&self.endpoint, &self.wallet, Some(&request_data)).await?; let mut headers = reqwest::header::HeaderMap::new(); headers.insert( @@ -68,3 +66,13 @@ impl<'b> DiscoveryService<'b> { Ok(()) } } + +impl Clone for DiscoveryService { + fn clone(&self) -> Self { + Self { + wallet: self.wallet.clone(), + base_url: self.base_url.clone(), + endpoint: self.endpoint.clone(), + } + } +} diff --git a/crates/worker/src/services/discovery_updater.rs b/crates/worker/src/services/discovery_updater.rs new file mode 100644 index 00000000..6da25b0d --- /dev/null +++ b/crates/worker/src/services/discovery_updater.rs @@ -0,0 +1,89 @@ +use crate::services::discovery::DiscoveryService; +use crate::state::system_state::SystemState; +use log::{debug, error, info}; +use shared::models::node::Node; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::time::{sleep, Duration}; +use tokio_util::sync::CancellationToken; + +const INITIAL_UPDATE_DELAY: Duration = Duration::from_secs(120); +const UPDATE_INTERVAL: Duration = Duration::from_secs(120); + +pub struct DiscoveryUpdater { + discovery_service: Arc, + is_running: Arc, + system_state: Arc, + cancellation_token: Arc, +} + +impl DiscoveryUpdater { + pub fn new(discovery_service: DiscoveryService, system_state: Arc) -> Self { + Self { + discovery_service: Arc::new(discovery_service), + is_running: Arc::new(AtomicBool::new(false)), + system_state, + cancellation_token: Arc::new(CancellationToken::new()), + } + } + + pub fn start_auto_update(&self, node_config: Node) { + if self.is_running.load(Ordering::SeqCst) { + debug!("Auto update already running, skipping start"); + return; + } + + self.is_running.store(true, Ordering::SeqCst); + let is_running = self.is_running.clone(); + let discovery_service = self.discovery_service.clone(); + let system_state = self.system_state.clone(); + let cancellation_token = self.cancellation_token.clone(); + + tokio::spawn(async move { + debug!("Starting discovery info auto-update task"); + + // Initial delay before first update + tokio::select! { + _ = sleep(INITIAL_UPDATE_DELAY) => {}, + _ = cancellation_token.cancelled() => { + is_running.store(false, Ordering::SeqCst); + return; + } + } + + while is_running.load(Ordering::SeqCst) { + // Check if we're in a compute pool by checking the heartbeat endpoint + let should_update = !system_state.is_running().await; + + if should_update { + if let Err(e) = discovery_service.upload_discovery_info(&node_config).await { + error!("Failed to update discovery info: {}", e); + } else { + info!("Successfully updated discovery info"); + } + } + + // Sleep before next check, but check for cancellation + tokio::select! { + _ = sleep(UPDATE_INTERVAL) => {}, + _ = cancellation_token.cancelled() => { + is_running.store(false, Ordering::SeqCst); + break; + } + } + } + debug!("Discovery info auto-update task finished"); + }); + } +} + +impl Clone for DiscoveryUpdater { + fn clone(&self) -> Self { + Self { + discovery_service: self.discovery_service.clone(), + is_running: self.is_running.clone(), + system_state: self.system_state.clone(), + cancellation_token: self.cancellation_token.clone(), + } + } +} diff --git a/crates/worker/src/services/mod.rs b/crates/worker/src/services/mod.rs index fc4b5cb6..4903673d 100644 --- a/crates/worker/src/services/mod.rs +++ b/crates/worker/src/services/mod.rs @@ -1 +1,2 @@ pub mod discovery; +pub mod discovery_updater; diff --git a/crates/worker/src/state/system_state.rs b/crates/worker/src/state/system_state.rs index 77bc1be0..b74f87a9 100644 --- a/crates/worker/src/state/system_state.rs +++ b/crates/worker/src/state/system_state.rs @@ -121,7 +121,12 @@ impl SystemState { let mut is_running = self.is_running.write().await; let mut endpoint = self.endpoint.write().await; *is_running = running; - *endpoint = heartbeat_endpoint; + + if !running { + *endpoint = None; + } else { + *endpoint = heartbeat_endpoint; + } if endpoint.is_some() { if let Err(e) = self.save_state(endpoint.clone()) { From 6c95c9f3fecfbfcdd30d819d2ced6b66a59ec111 Mon Sep 17 00:00:00 2001 From: JannikSt Date: Mon, 28 Apr 2025 13:23:17 -0700 Subject: [PATCH 2/4] adjust installation urls (#309) --- README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index cbc6c519..75468685 100644 --- a/README.md +++ b/README.md @@ -45,12 +45,13 @@ For complete setup instructions, refer to our [Development Setup Guide](docs/dev ### Install Worker CLI: You can install the latest worker CLI using: ``` -curl -sSL https://raw.githubusercontent.com/PrimeIntellect-ai/protocol/main/worker/scripts/install.sh | bash +curl -sSL https://raw.githubusercontent.com/PrimeIntellect-ai/protocol/main/crates/worker/scripts/install.sh | bash ``` +This can also be used to upgrade the current installation to the latest release. For the latest dev build use: ``` -curl -sSL https://raw.githubusercontent.com/PrimeIntellect-ai/protocol/develop/worker/scripts/install.sh | bash -s -- --dev +curl -sSL https://raw.githubusercontent.com/PrimeIntellect-ai/protocol/main/crates/worker/scripts/install.sh | bash -s -- --dev ``` ## Documentation From 1525775ddbce3be37247ad36cf8f231b2769e6f2 Mon Sep 17 00:00:00 2001 From: Gabriel Segatti <71611489+gsegatti@users.noreply.github.com> Date: Mon, 28 Apr 2025 21:28:02 +0100 Subject: [PATCH 3/4] Improvement: concurrent node invitation (#294) - Leverage asynchronism instead of awaiting for every invitation - Solves #232 --- crates/orchestrator/src/node/invite.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/crates/orchestrator/src/node/invite.rs b/crates/orchestrator/src/node/invite.rs index 9f01e96a..a52f5397 100644 --- a/crates/orchestrator/src/node/invite.rs +++ b/crates/orchestrator/src/node/invite.rs @@ -6,6 +6,8 @@ use alloy::primitives::utils::keccak256 as keccak; use alloy::primitives::U256; use alloy::signers::Signer; use anyhow::Result; +use futures::stream; +use futures::StreamExt; use hex; use log::{debug, error, info, warn}; use reqwest::Client; @@ -20,6 +22,7 @@ use tokio::time::{interval, Duration}; // Timeout constants const REQUEST_TIMEOUT: u64 = 15; // 15 seconds for HTTP requests const CONNECTION_TIMEOUT: u64 = 10; // 10 seconds for establishing connections +const DEFAULT_INVITE_CONCURRENT_COUNT: usize = 32; // Max concurrent count of nodes being invited pub struct NodeInviter<'a> { wallet: &'a Wallet, @@ -75,7 +78,7 @@ impl<'a> NodeInviter<'a> { } } - async fn _generate_invite(&self, node: OrchestratorNode) -> Result<[u8; 65]> { + async fn _generate_invite(&self, node: &OrchestratorNode) -> Result<[u8; 65]> { let domain_id: [u8; 32] = U256::from(self.domain_id).to_be_bytes(); let pool_id: [u8; 32] = U256::from(self.pool_id).to_be_bytes(); @@ -92,8 +95,7 @@ impl<'a> NodeInviter<'a> { Ok(signature) } - async fn _send_invite(&self, node: OrchestratorNode) -> Result<(), anyhow::Error> { - let node_to_update = node.clone(); + async fn _send_invite(&self, node: &OrchestratorNode) -> Result<(), anyhow::Error> { let node_url = format!("http://{}:{}", node.ip_address, node.port); let invite_path = "/invite".to_string(); let invite_url = format!("{}{}", node_url, invite_path); @@ -151,7 +153,6 @@ impl<'a> NodeInviter<'a> { let status = response.status(); if status.is_success() { - let node = node_to_update.clone(); info!("Successfully invited node"); self.store_context .node_store @@ -185,22 +186,25 @@ impl<'a> NodeInviter<'a> { async fn process_uninvited_nodes(&self) -> Result<()> { let nodes = self.store_context.node_store.get_uninvited_nodes(); - let mut failed_nodes = Vec::new(); - for node in nodes { - // TODO: Eventually and carefully move this to tokio + let invited_nodes = stream::iter(nodes.into_iter().map(|node| async move { info!("Processing node {:?}", node.address); - match self._send_invite(node.clone()).await { + match self._send_invite(&node).await { Ok(_) => { info!("Successfully processed node {:?}", node.address); + Ok(()) } Err(e) => { error!("Failed to process node {:?}: {}", node.address, e); - failed_nodes.push((node, e)); + Err((node, e)) } } - } + })) + .buffer_unordered(DEFAULT_INVITE_CONCURRENT_COUNT) + .collect::>() + .await; + let failed_nodes: Vec<_> = invited_nodes.into_iter().filter_map(Result::err).collect(); if !failed_nodes.is_empty() { warn!( "Failed to process {} nodes: {:?}", From c5350fc7e31d782f7d02feaa51ecdecc84064a29 Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Mon, 28 Apr 2025 14:07:22 -0700 Subject: [PATCH 4/4] release 0.2.11 --- Cargo.lock | 8 ++++---- Cargo.toml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 43fbd095..0096499c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2364,7 +2364,7 @@ dependencies = [ [[package]] name = "discovery" -version = "0.2.10" +version = "0.2.11" dependencies = [ "actix-web", "alloy", @@ -4403,7 +4403,7 @@ checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" [[package]] name = "orchestrator" -version = "0.2.10" +version = "0.2.11" dependencies = [ "actix-web", "alloy", @@ -6562,7 +6562,7 @@ dependencies = [ [[package]] name = "validator" -version = "0.2.10" +version = "0.2.11" dependencies = [ "actix-web", "alloy", @@ -7206,7 +7206,7 @@ dependencies = [ [[package]] name = "worker" -version = "0.2.10" +version = "0.2.11" dependencies = [ "actix-web", "alloy", diff --git a/Cargo.toml b/Cargo.toml index 2a9d33c5..b9672aa3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ redis = "0.28.1" redis-test = "0.8.0" [workspace.package] -version = "0.2.10" +version = "0.2.11" edition = "2021" [workspace.features]