Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ideas to improve the speed of shutdown sequence #12

Open
skywhale opened this issue Feb 19, 2021 · 4 comments · May be fixed by #85
Open

Ideas to improve the speed of shutdown sequence #12

skywhale opened this issue Feb 19, 2021 · 4 comments · May be fixed by #85

Comments

@skywhale
Copy link
Member

No description provided.

@facetious
Copy link

You should split apart the control message and the join so that all threads can receive their shutdown control first. This will allow the threads to clean up independently of one another. Collect all the pending thread handles together after sending the control message, then iterate through them to join them and ensure they've terminated.

                    match entry {
                        RegistryEntry::CurrentThread(_) => None,
                        RegistryEntry::BackgroundThread(_control_addr, thread_handle) => {
                            if thread_handle.thread().id() == current_thread.id() {
                                return None;
                            }

                            Some((actor_name, i, thread_handle))
                        },
                    }
                })
                .collect::<Vec<(String, usize, JoinHandle<()>)>>()
                .iter()
                .map(|(actor_name, i, thread_handle)| {
                    debug!("[{}] joining actor thread: {}", self.name, actor_name);

                    match thread_handle.join() {
                        Ok(Ok(())) => {
                            debug!("[{}] actor thread joined: {}", self.name, actor_name);
                            None
                        },
                        Ok(Err(e)) => {
                            error!("[{}] actor thread panicked: {} ({})", self.name, actor_name, e);
                            Some(actor_name)
                        },
                        Err(e) => {
                            error!(
                                "[{}] actor thread join failed: {} ({})",
                                self.name, actor_name, e
                            );
                            Some(actor_name)
                        },
                    }
                })

@strohel
Copy link
Member

strohel commented Feb 21, 2024

@facetious good idea! If you want to take a stab at implementation, patches are welcome!

@bschwind
Copy link
Member

I remember adding a patch for this a long time ago, before the actor crate got split out into its own repo. I'm actually not sure why it never got pulled along. Here was the original diff:

impl SystemHandle {
    /// Stops all actors spawned by this system.
    pub fn shutdown(&self) -> Result<(), Error> {
+        let shutdown_start = Instant::now();
+
        let current_thread = thread::current();
        let current_thread_name = current_thread.name().unwrap_or("Unknown thread id");
        info!("Thread [{}] shutting down the actor system", current_thread_name);
@@ -418,17 +420,22 @@ impl SystemHandle {
            let mut registry = self.registry.lock();
            debug!("[{}] joining {} actor threads.", self.name, registry.len());
            // Joining actors in the reverse order in which they are spawn.
+
+            for entry in registry.iter_mut().rev() {
+                let actor_name = entry.name();
+
+                if let Err(e) = entry.control_addr().stop() {
+                    warn!("control channel is closed: {} ({})", actor_name, e);
+                }
+            }

            registry
                .drain(..)
                .rev()
                .enumerate()
-                .filter_map(|(i, mut entry)| {
+                .filter_map(|(i, entry)| {
                    let actor_name = entry.name();

-                    if let Err(e) = entry.control_addr().stop() {
-                        warn!("control channel is closed: {} ({})", actor_name, e);
-                    }
-
                    match entry {
                        RegistryEntry::CurrentThread(_) => None,
                        RegistryEntry::BackgroundThread(_control_addr, thread_handle) => {
@@ -460,7 +467,7 @@ impl SystemHandle {
                .count()
        };

-        info!("[{}] system finished shutting down.", self.name);
+        info!("[{}] system finished shutting down in {:?}", self.name, shutdown_start.elapsed());

        if let Some(callback) = self.callbacks.postshutdown.as_ref() {
            info!("[{}] calling post-shutdown callback.", self.name);

@facetious
Copy link

lgtm

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants