Skip to content

Commit

Permalink
refactor: use multi-threaded runtime for blocking feature (#667)
Browse files Browse the repository at this point in the history
We need this to support log-consumers/followers, since it requires the
spawned tasks to run regardless of the `block_on` call.
  • Loading branch information
DDtKey authored Jun 17, 2024
1 parent e5217e3 commit ec466bc
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 9 deletions.
7 changes: 5 additions & 2 deletions testcontainers/src/core/containers/sync_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ where
}

impl<I: Image> Container<I> {
pub(crate) fn new(runtime: tokio::runtime::Runtime, async_impl: ContainerAsync<I>) -> Self {
pub(crate) fn new(
runtime: Arc<tokio::runtime::Runtime>,
async_impl: ContainerAsync<I>,
) -> Self {
Self {
inner: Some(ActiveContainer {
runtime: Arc::new(runtime),
runtime,
async_impl,
}),
}
Expand Down
41 changes: 34 additions & 7 deletions testcontainers/src/runners/sync_runner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
use crate::{core::error::Result, Container, ContainerRequest, Image};
use std::sync::{Arc, Mutex, OnceLock, Weak};

use crate::{core::error::Result, Container, ContainerRequest, Image, TestcontainersError};

// We use `Weak` in order not to prevent `Drop` of being called.
// Instead, we re-create the runtime if it was dropped and asked one more time.
// This way we provide on `Drop` guarantees and avoid unnecessary instantiation at the same time.
static ASYNC_RUNTIME: OnceLock<Mutex<Weak<tokio::runtime::Runtime>>> = OnceLock::new();

/// Helper trait to start containers synchronously.
///
Expand Down Expand Up @@ -30,22 +37,40 @@ where
I: Image,
{
fn start(self) -> Result<Container<I>> {
let runtime = build_sync_runner()?;
let runtime = lazy_sync_runner()?;
let async_container = runtime.block_on(super::AsyncRunner::start(self))?;

Ok(Container::new(runtime, async_container))
}

fn pull_image(self) -> Result<ContainerRequest<I>> {
let runtime = build_sync_runner()?;
let runtime = lazy_sync_runner()?;
runtime.block_on(super::AsyncRunner::pull_image(self))
}
}

fn build_sync_runner() -> Result<tokio::runtime::Runtime> {
Ok(tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?)
fn lazy_sync_runner() -> Result<Arc<tokio::runtime::Runtime>> {
let mut guard = ASYNC_RUNTIME
.get_or_init(|| Mutex::new(Weak::new()))
.lock()
.map_err(|e| {
TestcontainersError::other(format!("failed to build a runtime for sync-runner: {e}"))
})?;

match guard.upgrade() {
Some(runtime) => Ok(runtime),
None => {
let runtime = Arc::new(
// we need to use multi-thread runtime,
// because we may spawn background tasks that must keep running regardless of `block_on` calls
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?,
);
*guard = Arc::downgrade(&runtime);
Ok(runtime)
}
}
}

#[cfg(test)]
Expand All @@ -71,6 +96,8 @@ mod tests {
fn runtime() -> &'static Runtime {
RUNTIME.get_or_init(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("testcontainers-worker")
.worker_threads(2)
.enable_all()
.build()
.unwrap()
Expand Down

0 comments on commit ec466bc

Please sign in to comment.