Skip to content

Release/v0.2.11 #311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ redis = "0.28.1"
redis-test = "0.8.0"

[workspace.package]
version = "0.2.10"
version = "0.2.11"
edition = "2021"

[workspace.features]
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ For complete setup instructions, refer to our [Development Setup Guide](docs/dev
### Install Worker CLI:
You can install the latest worker CLI using:
```
curl -sSL https://raw.githubusercontent.com/PrimeIntellect-ai/protocol/main/worker/scripts/install.sh | bash
curl -sSL https://raw.githubusercontent.com/PrimeIntellect-ai/protocol/main/crates/worker/scripts/install.sh | bash
```
This can also be used to upgrade the current installation to the latest release.

For the latest dev build use:
```
curl -sSL https://raw.githubusercontent.com/PrimeIntellect-ai/protocol/develop/worker/scripts/install.sh | bash -s -- --dev
curl -sSL https://raw.githubusercontent.com/PrimeIntellect-ai/protocol/main/crates/worker/scripts/install.sh | bash -s -- --dev
```

## Documentation
Expand Down
24 changes: 14 additions & 10 deletions crates/orchestrator/src/node/invite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use alloy::primitives::utils::keccak256 as keccak;
use alloy::primitives::U256;
use alloy::signers::Signer;
use anyhow::Result;
use futures::stream;
use futures::StreamExt;
use hex;
use log::{debug, error, info, warn};
use reqwest::Client;
Expand All @@ -20,6 +22,7 @@ use tokio::time::{interval, Duration};
// Timeout constants
const REQUEST_TIMEOUT: u64 = 15; // 15 seconds for HTTP requests
const CONNECTION_TIMEOUT: u64 = 10; // 10 seconds for establishing connections
const DEFAULT_INVITE_CONCURRENT_COUNT: usize = 32; // Max concurrent count of nodes being invited

pub struct NodeInviter<'a> {
wallet: &'a Wallet,
Expand Down Expand Up @@ -75,7 +78,7 @@ impl<'a> NodeInviter<'a> {
}
}

async fn _generate_invite(&self, node: OrchestratorNode) -> Result<[u8; 65]> {
async fn _generate_invite(&self, node: &OrchestratorNode) -> Result<[u8; 65]> {
let domain_id: [u8; 32] = U256::from(self.domain_id).to_be_bytes();
let pool_id: [u8; 32] = U256::from(self.pool_id).to_be_bytes();

Expand All @@ -92,8 +95,7 @@ impl<'a> NodeInviter<'a> {
Ok(signature)
}

async fn _send_invite(&self, node: OrchestratorNode) -> Result<(), anyhow::Error> {
let node_to_update = node.clone();
async fn _send_invite(&self, node: &OrchestratorNode) -> Result<(), anyhow::Error> {
let node_url = format!("http://{}:{}", node.ip_address, node.port);
let invite_path = "/invite".to_string();
let invite_url = format!("{}{}", node_url, invite_path);
Expand Down Expand Up @@ -151,7 +153,6 @@ impl<'a> NodeInviter<'a> {
let status = response.status();

if status.is_success() {
let node = node_to_update.clone();
info!("Successfully invited node");
self.store_context
.node_store
Expand Down Expand Up @@ -185,22 +186,25 @@ impl<'a> NodeInviter<'a> {

async fn process_uninvited_nodes(&self) -> Result<()> {
let nodes = self.store_context.node_store.get_uninvited_nodes();
let mut failed_nodes = Vec::new();

for node in nodes {
// TODO: Eventually and carefully move this to tokio
let invited_nodes = stream::iter(nodes.into_iter().map(|node| async move {
info!("Processing node {:?}", node.address);
match self._send_invite(node.clone()).await {
match self._send_invite(&node).await {
Ok(_) => {
info!("Successfully processed node {:?}", node.address);
Ok(())
}
Err(e) => {
error!("Failed to process node {:?}: {}", node.address, e);
failed_nodes.push((node, e));
Err((node, e))
}
}
}
}))
.buffer_unordered(DEFAULT_INVITE_CONCURRENT_COUNT)
.collect::<Vec<_>>()
.await;

let failed_nodes: Vec<_> = invited_nodes.into_iter().filter_map(Result::err).collect();
if !failed_nodes.is_empty() {
warn!(
"Failed to process {} nodes: {:?}",
Expand Down
10 changes: 9 additions & 1 deletion crates/worker/src/cli/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::operations::compute_node::ComputeNodeOperations;
use crate::operations::heartbeat::service::HeartbeatService;
use crate::operations::provider::ProviderOperations;
use crate::services::discovery::DiscoveryService;
use crate::services::discovery_updater::DiscoveryUpdater;
use crate::state::system_state::SystemState;
use crate::TaskHandles;
use alloy::primitives::U256;
Expand Down Expand Up @@ -269,8 +270,12 @@ pub async fn execute_command(
compute_node_state,
);

let discovery_wallet = node_wallet_instance.clone();
let discovery_service =
DiscoveryService::new(&node_wallet_instance, discovery_url.clone(), None);
DiscoveryService::new(discovery_wallet, discovery_url.clone(), None);
let discovery_state = state.clone();
let discovery_updater =
DiscoveryUpdater::new(discovery_service.clone(), discovery_state.clone());
let pool_id = U256::from(*compute_pool_id as u32);

let pool_info = loop {
Expand Down Expand Up @@ -589,6 +594,7 @@ pub async fn execute_command(
let mut attempts = 0;
let max_attempts = 100;
while attempts < max_attempts {
Console::title("📦 Uploading discovery info");
match discovery_service.upload_discovery_info(&node_config).await {
Ok(_) => break,
Err(e) => {
Expand Down Expand Up @@ -622,6 +628,8 @@ pub async fn execute_command(
"",
);

discovery_updater.start_auto_update(node_config);

if let Err(err) = {
let heartbeat_clone = heartbeat_service.unwrap().clone();
if recover_last_state {
Expand Down
24 changes: 16 additions & 8 deletions crates/worker/src/services/discovery.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use crate::console::Console;
use shared::models::node::Node;
use shared::security::request_signer::sign_request;
use shared::web3::wallet::Wallet;
use std::sync::Arc;

pub struct DiscoveryService<'b> {
wallet: &'b Wallet,
pub struct DiscoveryService {
wallet: Arc<Wallet>,
base_url: String,
endpoint: String,
}

impl<'b> DiscoveryService<'b> {
pub fn new(wallet: &'b Wallet, base_url: Option<String>, endpoint: Option<String>) -> Self {
impl DiscoveryService {
pub fn new(wallet: Arc<Wallet>, base_url: Option<String>, endpoint: Option<String>) -> Self {
Self {
wallet,
base_url: base_url.unwrap_or_else(|| "http://localhost:8089".to_string()),
Expand All @@ -22,13 +22,11 @@ impl<'b> DiscoveryService<'b> {
&self,
node_config: &Node,
) -> Result<(), Box<dyn std::error::Error>> {
Console::title("📦 Uploading discovery info");

let request_data = serde_json::to_value(node_config)
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;

let signature_string =
sign_request(&self.endpoint, self.wallet, Some(&request_data)).await?;
sign_request(&self.endpoint, &self.wallet, Some(&request_data)).await?;

let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
Expand Down Expand Up @@ -68,3 +66,13 @@ impl<'b> DiscoveryService<'b> {
Ok(())
}
}

impl Clone for DiscoveryService {
fn clone(&self) -> Self {
Self {
wallet: self.wallet.clone(),
base_url: self.base_url.clone(),
endpoint: self.endpoint.clone(),
}
}
}
89 changes: 89 additions & 0 deletions crates/worker/src/services/discovery_updater.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use crate::services::discovery::DiscoveryService;
use crate::state::system_state::SystemState;
use log::{debug, error, info};
use shared::models::node::Node;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::time::{sleep, Duration};
use tokio_util::sync::CancellationToken;

const INITIAL_UPDATE_DELAY: Duration = Duration::from_secs(120);
const UPDATE_INTERVAL: Duration = Duration::from_secs(120);

pub struct DiscoveryUpdater {
discovery_service: Arc<DiscoveryService>,
is_running: Arc<AtomicBool>,
system_state: Arc<SystemState>,
cancellation_token: Arc<CancellationToken>,
}

impl DiscoveryUpdater {
pub fn new(discovery_service: DiscoveryService, system_state: Arc<SystemState>) -> Self {
Self {
discovery_service: Arc::new(discovery_service),
is_running: Arc::new(AtomicBool::new(false)),
system_state,
cancellation_token: Arc::new(CancellationToken::new()),
}
}

pub fn start_auto_update(&self, node_config: Node) {
if self.is_running.load(Ordering::SeqCst) {
debug!("Auto update already running, skipping start");
return;
}

self.is_running.store(true, Ordering::SeqCst);
let is_running = self.is_running.clone();
let discovery_service = self.discovery_service.clone();
let system_state = self.system_state.clone();
let cancellation_token = self.cancellation_token.clone();

tokio::spawn(async move {
debug!("Starting discovery info auto-update task");

// Initial delay before first update
tokio::select! {
_ = sleep(INITIAL_UPDATE_DELAY) => {},
_ = cancellation_token.cancelled() => {
is_running.store(false, Ordering::SeqCst);
return;
}
}

while is_running.load(Ordering::SeqCst) {
// Check if we're in a compute pool by checking the heartbeat endpoint
let should_update = !system_state.is_running().await;

if should_update {
if let Err(e) = discovery_service.upload_discovery_info(&node_config).await {
error!("Failed to update discovery info: {}", e);
} else {
info!("Successfully updated discovery info");
}
}

// Sleep before next check, but check for cancellation
tokio::select! {
_ = sleep(UPDATE_INTERVAL) => {},
_ = cancellation_token.cancelled() => {
is_running.store(false, Ordering::SeqCst);
break;
}
}
}
debug!("Discovery info auto-update task finished");
});
}
}

impl Clone for DiscoveryUpdater {
fn clone(&self) -> Self {
Self {
discovery_service: self.discovery_service.clone(),
is_running: self.is_running.clone(),
system_state: self.system_state.clone(),
cancellation_token: self.cancellation_token.clone(),
}
}
}
1 change: 1 addition & 0 deletions crates/worker/src/services/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod discovery;
pub mod discovery_updater;
7 changes: 6 additions & 1 deletion crates/worker/src/state/system_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,12 @@ impl SystemState {
let mut is_running = self.is_running.write().await;
let mut endpoint = self.endpoint.write().await;
*is_running = running;
*endpoint = heartbeat_endpoint;

if !running {
*endpoint = None;
} else {
*endpoint = heartbeat_endpoint;
}

if endpoint.is_some() {
if let Err(e) = self.save_state(endpoint.clone()) {
Expand Down