Skip to content

Commit

Permalink
Introduce interrupt and retry handling to initial definition sync. (#321
Browse files Browse the repository at this point in the history
)

* Introduce interrupt and retry handling to initial definition sync.

We have been quite good with regards to handling interrupts / SIGTERMs almost from the very start in MSD.  However, a small oversight with regards to initial sync caused #314 , which in turn causes the daemon to freeze upon shutdown if any definition failed its initial sync.  The root cause is that the thread terminates on its own, not polling its end of the ender channel used to shut it down.

This PR fixes the issue.

In addition to that, we now also retry initial syncs just as failed incremental syncs are retried.  This adds robustness in the face of temporary network outages just as MSD is starting to execute.

* Commit clippy simplification making tests not pass.
  • Loading branch information
DFINITYManu authored Apr 17, 2024
1 parent b8bb7e0 commit b2bdd05
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 20 deletions.
76 changes: 57 additions & 19 deletions rs/ic-observability/multiservice-discovery/src/definition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,9 @@ impl RunningDefinition {
async fn initial_registry_sync(&self, use_current_version: bool) -> Result<(), SyncError> {
info!(
self.definition.log,
"Syncing local registry for {} started", self.definition.name
);
info!(
self.definition.log,
"Using local registry path: {}",
self.definition.registry_path.display()
"Syncing local registry for {} (to local registry path {}) started",
self.definition.name,
self.definition.registry_path.display(),
);

let r = sync_local_registry(
Expand All @@ -298,17 +295,29 @@ impl RunningDefinition {
self.definition.log,
"Syncing local registry for {} completed", self.definition.name,
);
self.metrics.observe_sync(self.name(), true)
self.metrics.observe_sync(self.name(), true);
Ok(())
}
Err(_) => {
warn!(
self.definition.log,
"Interrupted initial sync of definition {}", self.definition.name
);
self.metrics.observe_sync(self.name(), false)
Err(e) => {
match e {
SyncError::PublicKey(ref pkey) => {
error!(
self.definition.log,
"Failure in initial sync of {}: {}", self.definition.name, pkey,
);
// Note failure in metrics. On the other leg of the match
// we do not note either success or failure, since we don't
// know yet whether it was successful or not.
self.metrics.observe_sync(self.name(), false);
}
SyncError::Interrupted => info!(
self.definition.log,
"Interrupted initial sync of {}", self.definition.name
),
};
Err(e)
}
}
r
}

async fn poll_loop(&self) {
Expand Down Expand Up @@ -352,13 +361,42 @@ impl RunningDefinition {
// Syncs the registry and keeps running, syncing as new
// registry versions come in.
async fn run(&self) {
if self.initial_registry_sync(false).await.is_err() {
// Initial sync was interrupted.
self.metrics.observe_end(self.name());
return;
// Loop to do retries of initial sync and handle cancellation.
// We keep retries outside the callee to make the callee easier
// to test and more solid state.
while let Err(e) = self.initial_registry_sync(false).await {
match e {
SyncError::Interrupted => {
// Signal sent to callee via channel, initial sync interrupted.
// We signal observation end because we are going to return.
self.metrics.observe_end(self.name());
return;
}
SyncError::PublicKey(_) => {
// Initial sync failed.
error!(
self.definition.log,
"Will retry sync of {} until successful after {:#?}",
self.definition.name,
self.definition.poll_interval,
);
// Wait a prudent interval before retrying, but watch for
// termination during that wait.
let interval = crossbeam::channel::tick(self.definition.poll_interval);
crossbeam::select! {
recv(self.stop_signal) -> _ => {
// Terminated! Note the event and mark sync end.
info!(self.definition.log, "Received shutdown signal while waiting for initial sync retry of definition {}", self.definition.name);
self.metrics.observe_end(self.name());
return;
},
recv(interval) -> _ => continue,
}
}
}
}
self.metrics.observe_sync(self.name(), true);

// Ready to incrementally sync.
info!(
self.definition.log,
"Starting to watch for changes for definition {}", self.definition.name
Expand Down
7 changes: 6 additions & 1 deletion rs/ic-observability/service-discovery/src/registry_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ pub async fn sync_local_registry(
let local_store = Arc::new(LocalStoreImpl::new(local_path.clone()));
let registry_canister = RegistryCanister::new(nns_urls.to_vec());

if stop_signal.try_recv().is_ok() {
// Interrupted early. Let's get out of here.
return Err(SyncError::Interrupted);
}

let mut latest_version = if !Path::new(&local_path).exists() {
ZERO_REGISTRY_VERSION
} else {
Expand Down Expand Up @@ -85,7 +90,7 @@ pub async fn sync_local_registry(

loop {
if stop_signal.try_recv().is_ok() {
// Interrupted early. Let's get out of here.
// Interrupted. Let's get out of here.
return Err(SyncError::Interrupted);
}

Expand Down

0 comments on commit b2bdd05

Please sign in to comment.