diff --git a/rs/ic-observability/multiservice-discovery/src/definition.rs b/rs/ic-observability/multiservice-discovery/src/definition.rs index 564d94a5c..8750bb2ea 100644 --- a/rs/ic-observability/multiservice-discovery/src/definition.rs +++ b/rs/ic-observability/multiservice-discovery/src/definition.rs @@ -275,12 +275,9 @@ impl RunningDefinition { async fn initial_registry_sync(&self, use_current_version: bool) -> Result<(), SyncError> { info!( self.definition.log, - "Syncing local registry for {} started", self.definition.name - ); - info!( - self.definition.log, - "Using local registry path: {}", - self.definition.registry_path.display() + "Syncing local registry for {} (to local registry path {}) started", + self.definition.name, + self.definition.registry_path.display(), ); let r = sync_local_registry( @@ -298,17 +295,29 @@ impl RunningDefinition { self.definition.log, "Syncing local registry for {} completed", self.definition.name, ); - self.metrics.observe_sync(self.name(), true) + self.metrics.observe_sync(self.name(), true); + Ok(()) } - Err(_) => { - warn!( - self.definition.log, - "Interrupted initial sync of definition {}", self.definition.name - ); - self.metrics.observe_sync(self.name(), false) + Err(e) => { + match e { + SyncError::PublicKey(ref pkey) => { + error!( + self.definition.log, + "Failure in initial sync of {}: {}", self.definition.name, pkey, + ); + // Note failure in metrics. On the other leg of the match + // we do not note either success or failure, since we don't + // know yet whether it was successful or not. + self.metrics.observe_sync(self.name(), false); + } + SyncError::Interrupted => info!( + self.definition.log, + "Interrupted initial sync of {}", self.definition.name + ), + }; + Err(e) } } - r } async fn poll_loop(&self) { @@ -352,13 +361,42 @@ impl RunningDefinition { // Syncs the registry and keeps running, syncing as new // registry versions come in. async fn run(&self) { - if self.initial_registry_sync(false).await.is_err() { - // Initial sync was interrupted. - self.metrics.observe_end(self.name()); - return; + // Loop to do retries of initial sync and handle cancellation. + // We keep retries outside the callee to make the callee easier + // to test and more solid state. + while let Err(e) = self.initial_registry_sync(false).await { + match e { + SyncError::Interrupted => { + // Signal sent to callee via channel, initial sync interrupted. + // We signal observation end because we are going to return. + self.metrics.observe_end(self.name()); + return; + } + SyncError::PublicKey(_) => { + // Initial sync failed. + error!( + self.definition.log, + "Will retry sync of {} until successful after {:#?}", + self.definition.name, + self.definition.poll_interval, + ); + // Wait a prudent interval before retrying, but watch for + // termination during that wait. + let interval = crossbeam::channel::tick(self.definition.poll_interval); + crossbeam::select! { + recv(self.stop_signal) -> _ => { + // Terminated! Note the event and mark sync end. + info!(self.definition.log, "Received shutdown signal while waiting for initial sync retry of definition {}", self.definition.name); + self.metrics.observe_end(self.name()); + return; + }, + recv(interval) -> _ => continue, + } + } + } } - self.metrics.observe_sync(self.name(), true); + // Ready to incrementally sync. info!( self.definition.log, "Starting to watch for changes for definition {}", self.definition.name diff --git a/rs/ic-observability/service-discovery/src/registry_sync.rs b/rs/ic-observability/service-discovery/src/registry_sync.rs index 45a36a7e2..75ac55ba4 100644 --- a/rs/ic-observability/service-discovery/src/registry_sync.rs +++ b/rs/ic-observability/service-discovery/src/registry_sync.rs @@ -50,6 +50,11 @@ pub async fn sync_local_registry( let local_store = Arc::new(LocalStoreImpl::new(local_path.clone())); let registry_canister = RegistryCanister::new(nns_urls.to_vec()); + if stop_signal.try_recv().is_ok() { + // Interrupted early. Let's get out of here. + return Err(SyncError::Interrupted); + } + let mut latest_version = if !Path::new(&local_path).exists() { ZERO_REGISTRY_VERSION } else { @@ -85,7 +90,7 @@ pub async fn sync_local_registry( loop { if stop_signal.try_recv().is_ok() { - // Interrupted early. Let's get out of here. + // Interrupted. Let's get out of here. return Err(SyncError::Interrupted); }