Skip to content

Commit

Permalink
feat(agent): ensure node actually starts up for reconcile to succeed
Browse files Browse the repository at this point in the history
  • Loading branch information
Meshiest committed Dec 5, 2024
1 parent 0f4cb0b commit 55f3ee3
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 5 deletions.
1 change: 1 addition & 0 deletions crates/agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ async fn main() {
endpoint,
queue_reconcile_tx,
loki: Mutex::new(db.loki_url()),
last_node_status: RwLock::new(None),
env_info: RwLock::new(
db.env_info()
.inspect_err(|e| {
Expand Down
30 changes: 26 additions & 4 deletions crates/agent/src/reconcile/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,14 @@ impl AgentStateReconciler {

// If this reconcile was triggered by a reconcile request, post the status
if let Some(client) = self.state.get_ws_client().await {
let res = res.clone().map(|s| s.replace_inner(self.is_node_running()));
let node_is_started = self
.state
.get_node_status()
.await
.is_some_and(|s| s.is_started());
let res = res
.clone()
.map(|s| s.replace_inner(self.is_node_running() && node_is_started));

// TODO: throttle this broadcast
tokio::spawn(async move {
Expand Down Expand Up @@ -221,6 +228,8 @@ impl AgentStateReconciler {
// If the process has exited, clear the process context
if res.inner.is_some() {
self.context.process = None;
self.state.set_node_status(None).await;
self.context.shutdown_pending = false;
}
});
}
Expand Down Expand Up @@ -347,6 +356,7 @@ impl Reconcile<(), ReconcileError> for AgentStateReconciler {
// If the process has exited, clear the process context
if res.inner.is_some() {
self.context.process = None;
self.state.set_node_status(None).await;
self.context.shutdown_pending = false;
}
});
Expand Down Expand Up @@ -403,7 +413,17 @@ impl Reconcile<(), ReconcileError> for AgentStateReconciler {

// Prevent other reconcilers from running while the node is running
if self.state.is_node_online() {
return Ok(ReconcileStatus::default().add_scope("agent_state/running"));
let Some(node_status) = self.state.get_node_status().await else {
return Ok(ReconcileStatus::empty().add_scope("agent_state/node/booting"));
};

let rec = if node_status.is_started() {
ReconcileStatus::default()
} else {
ReconcileStatus::empty()
};

return Ok(rec.add_scope(format!("agent_state/node/{}", node_status.label())));
}

// If the node is not online, the process is still running, but the node
Expand All @@ -413,7 +433,7 @@ impl Reconcile<(), ReconcileError> for AgentStateReconciler {
return Ok(ReconcileStatus::empty()
.requeue_after(Duration::from_secs(1))
.add_condition(ReconcileCondition::PendingStartup)
.add_scope("agent_state/starting"));
.add_scope("agent_state/node/booting"));
}

let storage_path = self
Expand Down Expand Up @@ -496,10 +516,12 @@ impl Reconcile<(), ReconcileError> for AgentStateReconciler {
.await?;

let process = ProcessContext::new(command)?;
// Clear the last node running status (it was shut down)
self.state.set_node_status(None).await;
self.context.process = Some(process);
self.context.shutdown_pending = false;
Ok(ReconcileStatus::empty()
.add_scope("agent_state/starting")
.add_scope("agent_state/node/booting")
.requeue_after(Duration::from_secs(1)))
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/agent/src/rpc/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ impl AgentNodeService for AgentNodeRpcServer {
return Ok(()); // ignore if client is not available
};

// Update the last node status
self.state.set_node_status(Some(status.clone())).await;

client
.post_node_status(context::current(), status.into())
.await
Expand Down
14 changes: 13 additions & 1 deletion crates/agent/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use reqwest::Url;
use snops_common::{
api::AgentEnvInfo,
rpc::{agent::node::NodeServiceClient, control::ControlServiceClient, error::ReconcileError},
state::{AgentId, AgentPeer, AgentState, EnvId, ReconcileOptions, TransferId, TransferStatus},
state::{
snarkos_status::SnarkOSStatus, AgentId, AgentPeer, AgentState, EnvId, ReconcileOptions,
TransferId, TransferStatus,
},
util::OpaqueDebug,
};
use tarpc::context;
Expand Down Expand Up @@ -52,6 +55,7 @@ pub struct GlobalState {
pub transfers: Arc<DashMap<TransferId, TransferStatus>>,

pub node_client: RwLock<Option<NodeServiceClient>>,
pub last_node_status: RwLock<Option<(Instant, SnarkOSStatus)>>,
pub log_level_handler: ReloadHandler,
/// A oneshot sender to shutdown the agent.
pub shutdown: RwLock<Option<oneshot::Sender<()>>>,
Expand Down Expand Up @@ -222,4 +226,12 @@ impl GlobalState {
error!("failed to save resolved addrs to db: {e}");
}
}

/// Records the latest status reported by the node, or clears it.
///
/// `Some(status)` is stored together with the `Instant` at which this
/// method was called; `None` erases any previously stored status.
pub async fn set_node_status(&self, status: Option<SnarkOSStatus>) {
    // Stamp the value first so the timestamp is taken before waiting on
    // the write lock.
    let stamped = status.map(|node_status| (Instant::now(), node_status));
    let mut slot = self.last_node_status.write().await;
    *slot = stamped;
}

/// Returns a copy of the most recently stored node status, if any.
///
/// The timestamp stored alongside the status is discarded.
pub async fn get_node_status(&self) -> Option<SnarkOSStatus> {
    let guard = self.last_node_status.read().await;
    guard.as_ref().map(|(_, node_status)| node_status.clone())
}
}
16 changes: 16 additions & 0 deletions crates/common/src/state/snarkos_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,22 @@ pub enum SnarkOSStatus {
Halted(Option<String>),
}

impl SnarkOSStatus {
pub fn is_started(&self) -> bool {
matches!(self, SnarkOSStatus::Started)
}

pub fn label(&self) -> &'static str {
match self {
SnarkOSStatus::Starting => "starting",
SnarkOSStatus::LedgerLoading => "loading",
SnarkOSStatus::LedgerFailure(_) => "failure",
SnarkOSStatus::Started => "started",
SnarkOSStatus::Halted(_) => "halted",
}
}
}

/// Messages from snarkos to the agent, containing information about the status
/// of the node
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand Down

0 comments on commit 55f3ee3

Please sign in to comment.