From 154050d1b5dffe33eb79ff089687a96f6589bbd3 Mon Sep 17 00:00:00 2001 From: Chris Beck Date: Wed, 1 Apr 2020 13:48:03 -0700 Subject: [PATCH 1/2] Expose an API in grpcio::env that shuts-down AND joins the threads `grpcio::env` impl of `Drop` issues commands to request all the completion queues to shutdown, but does not actually join the threads. For a lot of webservers this works fine, but for some tests, it creates a problem. In my usecase I have a server containing SGX enclaves and a database, and I want to validate that even if the server goes down and comes back repeatedly, the users are able to recover their data from the database. ``` let users = ... mock user set for phase_count in 0..NUM_PHASES { log::info!(logger, "Phase {}/{}", phase_count + 1, NUM_PHASES); // First make grpcio env let grpcio_env = mobile_acct_api::make_env(); ... make server, make client, ... make requests for each mock user, ... validate results } ``` Unfortunately for me, even though `grpcio_env` is scoped to the loop body, the threads actually leak out because the implementation of `Drop` does not join the threads. Unfortunately, this consistently causes crashes in tests because intel sgx sdk contains a `SimEnclaveMgr` object which has a static lifetime and is torn down at process destruction. I believe that with the current API, I cannot guarantee that my grpcio threads are torn down BEFORE that object is. The only way that I can do that is if there is some API on `grpcio::Environment` that actually joins the threads. In the actual rust tests that validate `grpcio::Environment`, you yourselves have written code that joins the join handles. I would like to be able to do that in my tests at the end of my loop body. This commit exposes an API on grpcio::Environment that both issues the shutdown command, AND joins the join handles. It also makes the rust unit test, in that same file, use this API. This is not a breaking change, since we don't change the implementation of `Drop` or any other public api. Signed-off-by: Chris Beck --- src/env.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/env.rs b/src/env.rs index 8bad45ec1..799a0adb3 100644 --- a/src/env.rs +++ b/src/env.rs @@ -129,6 +129,17 @@ impl Environment { let idx = self.idx.fetch_add(1, Ordering::Relaxed); self.cqs[idx % self.cqs.len()].clone() } + + /// Shutdown the completion queues and join all threads + pub fn shutdown_and_join(&mut self) { + for cq in self.completion_queues() { + cq.shutdown(); + } + + for handle in self._handles.drain(..) { + handle.join().unwrap(); + } + } } impl Drop for Environment { @@ -163,12 +174,6 @@ mod tests { } assert_eq!(env.completion_queues().len(), 2); - for cq in env.completion_queues() { - cq.shutdown(); - } - - for handle in env._handles.drain(..) { - handle.join().unwrap(); - } + env.shutdown_and_join(); } } From 1e473dab71cade9e0a0f7e1c43dd16ae26961883 Mon Sep 17 00:00:00 2001 From: Chris Beck Date: Thu, 2 Apr 2020 09:34:11 -0700 Subject: [PATCH 2/2] Instead of previous commit, `join` worker threads in `drop` Includes a test for whether any of them is the current thread before joining. Signed-off-by: Chris Beck --- src/env.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/env.rs b/src/env.rs index 799a0adb3..66e0be226 100644 --- a/src/env.rs +++ b/src/env.rs @@ -129,17 +129,6 @@ impl Environment { let idx = self.idx.fetch_add(1, Ordering::Relaxed); self.cqs[idx % self.cqs.len()].clone() } - - /// Shutdown the completion queues and join all threads - pub fn shutdown_and_join(&mut self) { - for cq in self.completion_queues() { - cq.shutdown(); - } - - for handle in self._handles.drain(..) { - handle.join().unwrap(); - } - } } impl Drop for Environment { @@ -148,6 +137,15 @@ impl Drop for Environment { // it's safe to shutdown more than once. cq.shutdown() } + + // Join our threads when we leave scope + // Try not to join the current thread + let current_thread_id = std::thread::current().id(); + for handle in self._handles.drain(..) { + if handle.thread().id() != current_thread_id { + handle.join().unwrap(); + } + } } } @@ -157,7 +155,7 @@ mod tests { #[test] fn test_basic_loop() { - let mut env = Environment::new(2); + let env = Environment::new(2); let q1 = env.pick_cq(); let q2 = env.pick_cq(); @@ -174,6 +172,5 @@ mod tests { } assert_eq!(env.completion_queues().len(), 2); - env.shutdown_and_join(); } }