diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7e7b3a8e5..f54b388de 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -222,6 +222,8 @@ jobs: name: frontend path: tests/server/build/ - name: Run orchestrator unit tests + env: + TESTS_MAX_CONCURRENCY: 3 run: chmod +x ./server/unit_tests_orchestrator && ./server/unit_tests_orchestrator - name: Run ui unit tests run: chmod +x ./server/unit_tests_ui && ./server/unit_tests_ui diff --git a/ci/workflows.yml b/ci/workflows.yml index 9121e9ec0..77c1a3112 100644 --- a/ci/workflows.yml +++ b/ci/workflows.yml @@ -312,6 +312,8 @@ workflows: path: tests/server/build/ - name: "Run orchestrator unit tests" + env: + TESTS_MAX_CONCURRENCY: 3 run: |- chmod +x ./server/unit_tests_orchestrator && ./server/unit_tests_orchestrator diff --git a/compiler/base/orchestrator/Cargo.lock b/compiler/base/orchestrator/Cargo.lock index cf6ab4c06..f01de3890 100644 --- a/compiler/base/orchestrator/Cargo.lock +++ b/compiler/base/orchestrator/Cargo.lock @@ -316,6 +316,7 @@ dependencies = [ "bincode", "futures", "modify-cargo-toml", + "once_cell", "serde", "snafu", "tempdir", diff --git a/compiler/base/orchestrator/Cargo.toml b/compiler/base/orchestrator/Cargo.toml index cea0c744c..5f720ce29 100644 --- a/compiler/base/orchestrator/Cargo.toml +++ b/compiler/base/orchestrator/Cargo.toml @@ -22,5 +22,6 @@ tracing = { version = "0.1.37", default-features = false, features = ["attribute [dev-dependencies] assert_matches = "1.5.0" assertables = "7.0.1" +once_cell = "1.18.0" tempdir = "0.3.7" tracing-subscriber = "0.3.17" diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs index e12d536d4..0551aae22 100644 --- a/compiler/base/orchestrator/src/coordinator.rs +++ b/compiler/base/orchestrator/src/coordinator.rs @@ -1709,8 +1709,10 @@ fn spawn_io_queue(stdin: ChildStdin, stdout: ChildStdout, token: CancellationTok mod tests { use assertables::*; use futures::{future::try_join_all, Future, FutureExt}; - use std::{sync::Once, time::Duration}; + use once_cell::sync::Lazy; + use std::{env, sync::Once, time::Duration}; use tempdir::TempDir; + use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use super::*; @@ -1774,15 +1776,78 @@ mod tests { } } - async fn new_coordinator() -> Coordinator { + const MAX_CONCURRENT_TESTS: Lazy = Lazy::new(|| { + env::var("TESTS_MAX_CONCURRENCY") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(5) + }); + + static CONCURRENT_TEST_SEMAPHORE: Lazy> = + Lazy::new(|| Arc::new(Semaphore::new(*MAX_CONCURRENT_TESTS))); + + struct RestrictedCoordinator { + _permit: OwnedSemaphorePermit, + coordinator: Coordinator, + } + + impl RestrictedCoordinator + where + T: Backend, + { + async fn with(f: F) -> Self + where + F: FnOnce() -> Fut, + Fut: Future>, + { + let semaphore = CONCURRENT_TEST_SEMAPHORE.clone(); + let permit = semaphore + .acquire_owned() + .await + .expect("Unable to acquire permit"); + let coordinator = f().await; + Self { + _permit: permit, + coordinator, + } + } + + async fn shutdown(self) -> super::Result { + self.coordinator.shutdown().await + } + } + + impl ops::Deref for RestrictedCoordinator { + type Target = Coordinator; + + fn deref(&self) -> &Self::Target { + &self.coordinator + } + } + + impl ops::DerefMut for RestrictedCoordinator { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.coordinator + } + } + + async fn new_coordinator_test() -> RestrictedCoordinator { + RestrictedCoordinator::with(|| Coordinator::new(TestBackend::new())).await + } + + async fn new_coordinator_docker() -> RestrictedCoordinator { + RestrictedCoordinator::with(|| Coordinator::new_docker()).await + } + + async fn new_coordinator() -> RestrictedCoordinator { #[cfg(not(force_docker))] { - Coordinator::new(TestBackend::new()).await + new_coordinator_test().await } #[cfg(force_docker)] { - Coordinator::new_docker().await + new_coordinator_docker().await } } @@ -2472,7 +2537,7 @@ mod tests { #[snafu::report] async fn compile_wasm() -> Result<()> { // cargo-wasm only exists inside the container - let coordinator = Coordinator::new_docker().await; + let coordinator = new_coordinator_docker().await; let req = CompileRequest { target: CompileTarget::Wasm, @@ -2703,7 +2768,7 @@ mod tests { #[snafu::report] async fn network_connections_are_disabled() -> Result<()> { // The limits are only applied to the container - let coordinator = Coordinator::new_docker().await; + let coordinator = new_coordinator_docker().await; let req = ExecuteRequest { code: r#" @@ -2729,7 +2794,7 @@ mod tests { #[snafu::report] async fn memory_usage_is_limited() -> Result<()> { // The limits are only applied to the container - let coordinator = Coordinator::new_docker().await; + let coordinator = new_coordinator_docker().await; let req = ExecuteRequest { code: r#" @@ -2756,7 +2821,7 @@ mod tests { #[snafu::report] async fn number_of_pids_is_limited() -> Result<()> { // The limits are only applied to the container - let coordinator = Coordinator::new_docker().await; + let coordinator = new_coordinator_docker().await; let req = ExecuteRequest { code: r##"