Skip to content

Commit

Permalink
[RSDK-9593] move ota to config monitor, config monitor restart hook (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mattjperez authored Jan 7, 2025
1 parent 09ead6f commit a4d72d8
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 87 deletions.
64 changes: 51 additions & 13 deletions micro-rdk/src/common/config_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ use crate::{
};
use async_io::Timer;
use futures_lite::{Future, FutureExt};
use std::fmt::Debug;
use std::pin::Pin;
use std::time::Duration;
use std::{fmt::Debug, pin::Pin, time::Duration};

#[cfg(feature = "ota")]
use crate::common::{exec::Executor, ota};

pub struct ConfigMonitor<'a, Storage> {
curr_config: Box<RobotConfig>, //config for robot gotten from last robot startup, aka inputted from entry
storage: Storage,
#[cfg(feature = "ota")]
executor: Executor,
restart_hook: Box<dyn Fn() + 'a>,
}

Expand All @@ -27,11 +30,14 @@ where
pub fn new(
curr_config: Box<RobotConfig>,
storage: Storage,
#[cfg(feature = "ota")] executor: Executor,
restart_hook: impl Fn() + 'a,
) -> Self {
Self {
curr_config,
storage,
#[cfg(feature = "ota")]
executor,
restart_hook: Box::new(restart_hook),
}
}
Expand Down Expand Up @@ -71,16 +77,48 @@ where
})
.await?;

if new_config
.config
.is_some_and(|cfg| cfg != *self.curr_config)
{
if let Err(e) = self.storage.reset_robot_configuration() {
log::warn!(
"Failed to reset robot config after new config detected: {}",
e
);
} else {
if let Some(config) = new_config.as_ref().config.as_ref() {
let mut reboot = false;

#[cfg(feature = "ota")]
{
if let Some(service) = config
.services
.iter()
.find(|&service| service.model == *ota::OTA_MODEL_TRIPLET)
{
// TODO(RSDK-9676): new OtaService created at every invocation, not ideal
match ota::OtaService::from_config(
service,
self.storage.clone(),
self.executor.clone(),
) {
Ok(mut ota) => match ota.update().await {
Ok(needs_reboot) => reboot = needs_reboot,
Err(e) => log::error!("failed to complete ota update: {}", e),
},
Err(e) => log::error!(
"failed to create ota service from config:{} - {:?}",
e,
service,
),
}
}
}

if *config != *self.curr_config {
if let Err(e) = self.storage.reset_robot_configuration() {
log::warn!(
"Failed to reset robot config after new config detected: {}",
e
);
} else {
reboot = true;
}
}

if reboot {
// TODO(RSDK-9464): flush logs to app.viam before restarting
self.restart();
}
}
Expand Down
53 changes: 11 additions & 42 deletions micro-rdk/src/common/conn/viam.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use super::server::{IncomingConnectionManager, WebRtcConfiguration};
use crate::common::provisioning::server::AsNetwork;

#[cfg(feature = "ota")]
use crate::common::{credentials_storage::OtaMetadataStorage, ota};
use crate::common::credentials_storage::OtaMetadataStorage;

pub struct RobotCloudConfig {
local_fqdn: String,
Expand Down Expand Up @@ -364,8 +364,6 @@ where
http2_server_port: self.http2_server_port,
wifi_manager: self.wifi_manager.into(),
app_client_tasks: self.app_client_tasks,
#[cfg(feature = "ota")]
ota_service_task: Default::default(),
max_concurrent_connections: self.max_concurrent_connections,
network: Some(network),
}
Expand Down Expand Up @@ -400,8 +398,6 @@ where
http2_server_port: self.http2_server_port,
wifi_manager: Rc::new(self.wifi_manager),
app_client_tasks: self.app_client_tasks,
#[cfg(feature = "ota")]
ota_service_task: None,
max_concurrent_connections: self.max_concurrent_connections,
network: None,
}
Expand All @@ -420,8 +416,6 @@ pub struct ViamServer<Storage, C, M> {
http2_server_port: u16,
wifi_manager: Rc<Option<Box<dyn WifiManager>>>,
app_client_tasks: Vec<Box<dyn PeriodicAppClientTask>>,
#[cfg(feature = "ota")]
ota_service_task: Option<Task<()>>,
max_concurrent_connections: usize,
network: Option<Box<dyn Network>>,
}
Expand Down Expand Up @@ -486,6 +480,14 @@ where
self.provision().await;
}

#[cfg(feature = "ota")]
{
match self.storage.get_ota_metadata() {
Ok(metadata) => log::info!("firmware version: {}", metadata.version),
Err(e) => log::warn!("not OTA firmware metadata available: {}", e),
};
}

// Since provisioning was run and completed, credentials are properly populated
// if wifi manager is configured loop forever until wifi is connected
if let Some(wifi) = self.wifi_manager.as_ref().as_ref() {
Expand Down Expand Up @@ -576,45 +578,12 @@ where
let config_monitor_task = Box::new(ConfigMonitor::new(
config.clone(),
self.storage.clone(),
#[cfg(feature = "ota")]
self.executor.clone(),
|| std::process::exit(0),
));
self.app_client_tasks.push(config_monitor_task);

#[cfg(feature = "ota")]
{
log::debug!("ota feature enabled");

if let Some(service) = config
.services
.iter()
.find(|&service| service.model == *ota::OTA_MODEL_TRIPLET)
{
match ota::OtaService::from_config(
service,
self.storage.clone(),
self.executor.clone(),
) {
Ok(mut ota) => {
self.ota_service_task
.replace(self.executor.spawn(async move {
if let Err(e) = ota.update().await {
log::error!("failed to complete ota update {}", e);
}
}));
}
Err(e) => {
log::error!("failed to build ota service: {}", e.to_string());
log::error!("ota service config: {:?}", service);
}
};
} else {
log::error!(
"ota enabled, but no service of type `{}` found in robot config",
ota::OTA_MODEL_TYPE
);
}
}

let mut robot = LocalRobot::from_cloud_config(
self.executor.clone(),
robot_creds.robot_id.clone(),
Expand Down
84 changes: 52 additions & 32 deletions micro-rdk/src/common/ota.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ use thiserror::Error;
#[cfg(not(feature = "esp32"))]
use {bincode::Decode, futures_lite::AsyncWriteExt};

const CONN_RETRY_SECS: u64 = 60;
const CONN_RETRY_SECS: u64 = 1;
const NUM_RETRY_CONN: usize = 3;
const SIZEOF_APPDESC: usize = 256;
const MAX_VER_LEN: usize = 128;
pub const OTA_MODEL_TYPE: &str = "ota_service";
Expand Down Expand Up @@ -121,7 +122,7 @@ pub(crate) enum ConfigError {

#[allow(dead_code)]
#[derive(Error, Debug)]
pub(crate) enum OtaError {
pub(crate) enum OtaError<S: OtaMetadataStorage> {
#[error("error occured during abort process: {0}")]
AbortError(String),
#[error("{0}")]
Expand All @@ -136,6 +137,10 @@ pub(crate) enum OtaError {
InvalidImageSize(usize, usize),
#[error("failed to retrieve firmware header info from binary, firmware may not be valid for this system: {0}")]
InvalidFirmware(String),
#[error("failed to update OTA metadata: expected updated version to be `{0}`, found `{1}`")]
UpdateMetadata(String, String),
#[error(transparent)]
StorageError(<S as OtaMetadataStorage>::Error),
#[error("error writing firmware to update partition: {0}")]
WriteError(String),
#[error("{0}")]
Expand All @@ -156,9 +161,6 @@ impl OtaMetadata {
pub fn new(version: String) -> Self {
Self { version }
}
pub(crate) fn version(&self) -> &str {
&self.version
}
}

pub(crate) struct OtaService<S: OtaMetadataStorage> {
Expand All @@ -172,13 +174,23 @@ pub(crate) struct OtaService<S: OtaMetadataStorage> {
}

impl<S: OtaMetadataStorage> OtaService<S> {
pub(crate) fn stored_metadata(&self) -> Result<OtaMetadata, OtaError<S>> {
if !self.storage.has_ota_metadata() {
log::info!("no OTA metadata currently stored in NVS");
}

self.storage
.get_ota_metadata()
.map_err(OtaError::StorageError)
}

pub(crate) fn from_config(
new_config: &ServiceConfig,
storage: S,
exec: Executor,
) -> Result<Self, OtaError> {
) -> Result<Self, OtaError<S>> {
let kind = new_config.attributes.as_ref().ok_or_else(|| {
ConfigError::Other("ota service config has no attributes".to_string())
ConfigError::Other("OTA service config has no attributes".to_string())
})?;

let url = kind
Expand Down Expand Up @@ -259,28 +271,17 @@ impl<S: OtaMetadataStorage> OtaService<S> {
})
}

pub(crate) async fn update(&mut self) -> Result<(), OtaError> {
let stored_metadata = if !self.storage.has_ota_metadata() {
log::info!("no ota metadata currently stored in NVS");
OtaMetadata::default()
} else {
self.storage
.get_ota_metadata()
.inspect_err(|e| log::warn!("failed to get ota metadata from nvs: {}", e))
.unwrap_or_default()
};
pub(crate) fn needs_update(&self) -> bool {
self.stored_metadata().unwrap_or_default().version != self.pending_version
}

if self.pending_version == stored_metadata.version() {
log::info!("firmware is up-to-date: `{}`", stored_metadata.version);
return Ok(());
/// Attempts to perform an OTA update.
/// On success, returns an `Ok(true)` or `Ok(false)` indicating if a reboot is necessary.
pub(crate) async fn update(&mut self) -> Result<bool, OtaError<S>> {
if !(self.needs_update()) {
return Ok(false);
}

log::info!(
"firmware is out of date, proceeding with update from version `{}` to version `{}`",
stored_metadata.version,
self.pending_version
);

let mut uri = self
.url
.parse::<hyper::Uri>()
Expand All @@ -304,11 +305,19 @@ impl<S: OtaMetadataStorage> OtaService<S> {
uri = hyper::Uri::from_parts(parts).map_err(|e| OtaError::Other(e.to_string()))?;
};

let mut num_tries = 0;
let (mut sender, conn) = loop {
num_tries += 1;
if num_tries == NUM_RETRY_CONN + 1 {
return Err(OtaError::Other(
"failed to establish connection".to_string(),
));
}
match self.connector.connect_to(&uri) {
Ok(connection) => {
match connection.await {
Ok(io) => {
// TODO(RSDK-9617): add timeout for stalled download
match http2::Builder::new(self.exec.clone())
.max_frame_size(16_384) // lowest configurable
.timer(H2Timer)
Expand Down Expand Up @@ -519,16 +528,27 @@ impl<S: OtaMetadataStorage> OtaService<S> {
})
.map_err(|e| OtaError::Other(e.to_string()))?;

log::info!("firmware update complete");
// verifies nvs was stored correctly
let curr_metadata = self
.stored_metadata()
.inspect_err(|e| log::error!("OTA update failed to store new metadata: {e}"))?;
if curr_metadata.version != self.pending_version {
return Err(OtaError::UpdateMetadata(
self.pending_version.clone(),
curr_metadata.version,
));
};
log::info!(
"firmware update successful: version `{}`",
curr_metadata.version
);

// Test experimental ffi accesses here to be recoverable without flashing
// Note: test experimental ota ffi accesses here to be recoverable without flashing
#[cfg(feature = "esp32")]
{
log::info!("rebooting to load firmware from `{:#x}`", self.address);
// TODO(RSDK-9464): flush logs to app.viam before restarting
esp_idf_svc::hal::reset::restart();
log::info!("next reboot will load firmware from `{:#x}`", self.address);
}

Ok(())
Ok(true)
}
}

0 comments on commit a4d72d8

Please sign in to comment.