From 69c2e6ae290b385fa90fb6eeaceac1488e1271d4 Mon Sep 17 00:00:00 2001
From: Joris Bayer <joris.bayer@sentry.io>
Date: Tue, 19 Nov 2024 08:22:19 +0100
Subject: [PATCH] fix(server): Terminate on service panic (#4249)

Terminate the process when a panic occurs in one of the services.

Instead of implementing a sync `fn spawn_handler()` interface, each
service now implements an `async fn run()` interface. This simplifies
service implementations in most cases (except where services need to
spawn multiple tasks), and allows us to join on all service main tasks
to detect a panic.
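
For illustration, a service implementation now looks roughly like this
(`MyService`, `MyMessage`, and `handle_message` are placeholders; compare e.g.
the `CogsService` change in the diff below):

```rust
impl Service for MyService {
    type Interface = MyMessage;

    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
        // The main loop is the body of `run`, rather than being wrapped in
        // `relay_system::spawn!` inside the old `fn spawn_handler`.
        while let Some(message) = rx.recv().await {
            self.handle_message(message);
        }
    }
}
```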

The PR introduces a `ServiceRunner` utility that makes it easy to start
services and join on their main tasks.
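
A condensed sketch of the startup flow (condensed from the
`relay-server/src/service.rs` and `relay-server/src/lib.rs` changes below;
most arguments and error handling elided):

```rust
let mut runner = ServiceRunner::new();

// `start` spawns the service's `run` future and returns its `Addr`:
let upstream_relay = runner.start(UpstreamRelayService::new(config.clone()));

// `start_with` attaches a service to a pre-created channel:
let (processor, processor_rx) = channel(EnvelopeProcessorService::name());
runner.start_with(EnvelopeProcessorService::new(/* ... */), processor_rx);

// Relay now exits on shutdown *or* when joining the service tasks completes,
// which is what terminates the process if a service panics:
tokio::select! {
    _ = runner.join() => {},
    _ = Controller::shutdown_handle().finished() => {}
}
```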

---------

Co-authored-by: Sebastian Zivota <loewenheim@users.noreply.github.com>
---
 CHANGELOG.md                                  |   1 +
 relay-server/src/lib.rs                       |  17 +-
 relay-server/src/service.rs                   | 117 +++++-----
 relay-server/src/services/buffer/mod.rs       | 160 ++++++-------
 relay-server/src/services/cogs.rs             |  10 +-
 relay-server/src/services/global_config.rs    |  84 ++++---
 relay-server/src/services/health_check.rs     |  28 +--
 .../src/services/metrics/aggregator.rs        |  54 ++---
 relay-server/src/services/metrics/router.rs   |  38 ++--
 relay-server/src/services/outcome.rs          |  66 +++---
 .../src/services/outcome_aggregator.rs        |  34 ++-
 relay-server/src/services/processor.rs        |  18 +-
 .../src/services/projects/cache/legacy.rs     | 215 +++++++++---------
 .../src/services/projects/cache/service.rs    |  44 ++--
 .../src/services/projects/source/local.rs     |  26 +--
 .../src/services/projects/source/mod.rs       |  15 +-
 .../src/services/projects/source/upstream.rs  |  27 ++-
 relay-server/src/services/relays.rs           |  30 ++-
 relay-server/src/services/server/mod.rs       |  12 +-
 relay-server/src/services/spooler/mod.rs      |  52 ++---
 relay-server/src/services/stats.rs            |  20 +-
 relay-server/src/services/store.rs            |  20 +-
 relay-server/src/services/test_store.rs       |  10 +-
 relay-server/src/services/upstream.rs         |  20 +-
 relay-system/src/controller.rs                |  23 +-
 relay-system/src/service.rs                   |  96 +++++---
 26 files changed, 623 insertions(+), 614 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d2a36ffe80b..63d576b58ed 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@
 **Bug Fixes**:
 
 - Allow profile chunks without release. ([#4155](https://github.com/getsentry/relay/pull/4155))
+- Terminate the process when one of the services crashes. ([#4249](https://github.com/getsentry/relay/pull/4249))
 
 **Features**:
 
diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs
index 986db2e5d4d..d675c220bf8 100644
--- a/relay-server/src/lib.rs
+++ b/relay-server/src/lib.rs
@@ -279,7 +279,7 @@ mod testutils;
 use std::sync::Arc;
 
 use relay_config::Config;
-use relay_system::{Controller, Service};
+use relay_system::Controller;
 
 use crate::service::ServiceState;
 use crate::services::server::HttpServer;
@@ -301,9 +301,18 @@ pub fn run(config: Config) -> anyhow::Result<()> {
     // information on all services.
     main_runtime.block_on(async {
         Controller::start(config.shutdown_timeout());
-        let service = ServiceState::start(config.clone())?;
-        HttpServer::new(config, service.clone())?.start();
-        Controller::shutdown_handle().finished().await;
+        let (state, mut runner) = ServiceState::start(config.clone())?;
+        runner.start(HttpServer::new(config, state.clone())?);
+
+        tokio::select! {
+            _ = runner.join() => {},
+            // NOTE: when every service implements a shutdown listener,
+            // awaiting on `finished` becomes unnecessary: We can simply join() and guarantee
+            // that every service finished its main task.
+            // See also https://github.com/getsentry/relay/issues/4050.
+            _ = Controller::shutdown_handle().finished() => {}
+        }
+
         anyhow::Ok(())
     })?;
 
diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs
index a736fd17f3b..4376ec7394e 100644
--- a/relay-server/src/service.rs
+++ b/relay-server/src/service.rs
@@ -31,7 +31,7 @@ use relay_redis::redis::Script;
 #[cfg(feature = "processing")]
 use relay_redis::{PooledClient, RedisScripts};
 use relay_redis::{RedisError, RedisPool, RedisPools};
-use relay_system::{channel, Addr, Service};
+use relay_system::{channel, Addr, Service, ServiceRunner};
 use tokio::runtime::Runtime;
 use tokio::sync::mpsc;
 
@@ -134,7 +134,6 @@ fn create_store_pool(config: &Config) -> Result<ThreadPool> {
 struct StateInner {
     config: Arc<Config>,
     memory_checker: MemoryChecker,
-
     registry: Registry,
 }
 
@@ -146,9 +145,10 @@ pub struct ServiceState {
 
 impl ServiceState {
     /// Starts all services and returns addresses to all of them.
-    pub fn start(config: Arc<Config>) -> Result<Self> {
-        let upstream_relay = UpstreamRelayService::new(config.clone()).start();
-        let test_store = TestStoreService::new(config.clone()).start();
+    pub fn start(config: Arc<Config>) -> Result<(Self, ServiceRunner)> {
+        let mut runner = ServiceRunner::new();
+        let upstream_relay = runner.start(UpstreamRelayService::new(config.clone()));
+        let test_store = runner.start(TestStoreService::new(config.clone()));
 
         let redis_pools = config
             .redis()
@@ -173,13 +173,13 @@ impl ServiceState {
         // Create an address for the `EnvelopeProcessor`, which can be injected into the
         // other services.
         let (processor, processor_rx) = channel(EnvelopeProcessorService::name());
-        let outcome_producer = OutcomeProducerService::create(
+        let outcome_producer = runner.start(OutcomeProducerService::create(
             config.clone(),
             upstream_relay.clone(),
             processor.clone(),
-        )?
-        .start();
-        let outcome_aggregator = OutcomeAggregator::new(&config, outcome_producer.clone()).start();
+        )?);
+        let outcome_aggregator =
+            runner.start(OutcomeAggregator::new(&config, outcome_producer.clone()));
 
         let (global_config, global_config_rx) =
             GlobalConfigService::new(config.clone(), upstream_relay.clone());
@@ -187,12 +187,13 @@ impl ServiceState {
         // The global config service must start before dependant services are
         // started. Messages like subscription requests to the global config
         // service fail if the service is not running.
-        let global_config = global_config.start();
+        let global_config = runner.start(global_config);
 
         let (legacy_project_cache, legacy_project_cache_rx) =
             channel(legacy::ProjectCacheService::name());
 
-        let project_source = ProjectSource::start(
+        let project_source = ProjectSource::start_in(
+            &mut runner,
             Arc::clone(&config),
             upstream_relay.clone(),
             redis_pools
@@ -200,7 +201,7 @@ impl ServiceState {
                 .map(|pools| pools.project_configs.clone()),
         );
         let project_cache_handle =
-            ProjectCacheService::new(Arc::clone(&config), project_source).start();
+            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(&mut runner);
 
         let aggregator = RouterService::new(
             config.default_aggregator_config().clone(),
@@ -208,7 +209,7 @@ impl ServiceState {
             Some(legacy_project_cache.clone().recipient()),
         );
         let aggregator_handle = aggregator.handle();
-        let aggregator = aggregator.start();
+        let aggregator = runner.start(aggregator);
 
         let metric_stats = MetricStats::new(
             config.clone(),
@@ -229,32 +230,34 @@ impl ServiceState {
                     outcome_aggregator.clone(),
                     metric_outcomes.clone(),
                 )
-                .map(|s| s.start())
+                .map(|s| runner.start(s))
             })
             .transpose()?;
 
         let cogs = CogsService::new(&config);
-        let cogs = Cogs::new(CogsServiceRecorder::new(&config, cogs.start()));
-
-        EnvelopeProcessorService::new(
-            create_processor_pool(&config)?,
-            config.clone(),
-            global_config_handle,
-            project_cache_handle.clone(),
-            cogs,
-            #[cfg(feature = "processing")]
-            redis_pools.clone(),
-            processor::Addrs {
-                outcome_aggregator: outcome_aggregator.clone(),
-                upstream_relay: upstream_relay.clone(),
-                test_store: test_store.clone(),
+        let cogs = Cogs::new(CogsServiceRecorder::new(&config, runner.start(cogs)));
+
+        runner.start_with(
+            EnvelopeProcessorService::new(
+                create_processor_pool(&config)?,
+                config.clone(),
+                global_config_handle,
+                project_cache_handle.clone(),
+                cogs,
                 #[cfg(feature = "processing")]
-                store_forwarder: store.clone(),
-                aggregator: aggregator.clone(),
-            },
-            metric_outcomes.clone(),
-        )
-        .spawn_handler(processor_rx);
+                redis_pools.clone(),
+                processor::Addrs {
+                    outcome_aggregator: outcome_aggregator.clone(),
+                    upstream_relay: upstream_relay.clone(),
+                    test_store: test_store.clone(),
+                    #[cfg(feature = "processing")]
+                    store_forwarder: store.clone(),
+                    aggregator: aggregator.clone(),
+                },
+                metric_outcomes.clone(),
+            ),
+            processor_rx,
+        );
 
         let (envelopes_tx, envelopes_rx) = mpsc::channel(config.spool_max_backpressure_envelopes());
         let envelope_buffer = EnvelopeBufferService::new(
@@ -268,7 +271,7 @@ impl ServiceState {
                 test_store: test_store.clone(),
             },
         )
-        .map(|b| b.start_observable());
+        .map(|b| b.start_in(&mut runner));
 
         // Keep all the services in one context.
         let project_cache_services = legacy::Services {
@@ -280,34 +283,37 @@ impl ServiceState {
             test_store: test_store.clone(),
         };
 
-        legacy::ProjectCacheService::new(
-            config.clone(),
-            MemoryChecker::new(memory_stat.clone(), config.clone()),
-            project_cache_handle.clone(),
-            project_cache_services,
-            global_config_rx,
-            envelopes_rx,
-        )
-        .spawn_handler(legacy_project_cache_rx);
+        runner.start_with(
+            legacy::ProjectCacheService::new(
+                config.clone(),
+                MemoryChecker::new(memory_stat.clone(), config.clone()),
+                project_cache_handle.clone(),
+                project_cache_services,
+                global_config_rx,
+                envelopes_rx,
+            ),
+            legacy_project_cache_rx,
+        );
 
-        let health_check = HealthCheckService::new(
+        let health_check = runner.start(HealthCheckService::new(
             config.clone(),
             MemoryChecker::new(memory_stat.clone(), config.clone()),
             aggregator_handle,
             upstream_relay.clone(),
             envelope_buffer.clone(),
-        )
-        .start();
+        ));
 
-        RelayStats::new(
+        runner.start(RelayStats::new(
             config.clone(),
             upstream_relay.clone(),
             #[cfg(feature = "processing")]
             redis_pools.clone(),
-        )
-        .start();
+        ));
 
-        let relay_cache = RelayCacheService::new(config.clone(), upstream_relay.clone()).start();
+        let relay_cache = runner.start(RelayCacheService::new(
+            config.clone(),
+            upstream_relay.clone(),
+        ));
 
         let registry = Registry {
             processor,
@@ -329,9 +335,12 @@ impl ServiceState {
             registry,
         };
 
-        Ok(ServiceState {
-            inner: Arc::new(state),
-        })
+        Ok((
+            ServiceState {
+                inner: Arc::new(state),
+            },
+            runner,
+        ))
     }
 
     /// Returns a reference to the Relay configuration.
diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs
index 519f5bcf902..968bc06d395 100644
--- a/relay-server/src/services/buffer/mod.rs
+++ b/relay-server/src/services/buffer/mod.rs
@@ -10,6 +10,7 @@ use chrono::DateTime;
 use chrono::Utc;
 use relay_base_schema::project::ProjectKey;
 use relay_config::Config;
+use relay_system::ServiceRunner;
 use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service};
 use relay_system::{Controller, Shutdown};
 use tokio::sync::mpsc::Permit;
@@ -144,12 +145,11 @@ impl EnvelopeBufferService {
     }
 
     /// Returns both the [`Addr`] to this service, and a reference to the capacity flag.
-    pub fn start_observable(self) -> ObservableEnvelopeBuffer {
+    pub fn start_in(self, runner: &mut ServiceRunner) -> ObservableEnvelopeBuffer {
         let has_capacity = self.has_capacity.clone();
-        ObservableEnvelopeBuffer {
-            addr: self.start(),
-            has_capacity,
-        }
+
+        let addr = runner.start(self);
+        ObservableEnvelopeBuffer { addr, has_capacity }
     }
 
     /// Wait for the configured amount of time and make sure the project cache is ready to receive.
@@ -378,7 +378,7 @@ fn is_expired(last_received_at: DateTime<Utc>, config: &Config) -> bool {
 impl Service for EnvelopeBufferService {
     type Interface = EnvelopeBuffer;
 
-    fn spawn_handler(mut self, mut rx: Receiver<Self::Interface>) {
+    async fn run(mut self, mut rx: Receiver<Self::Interface>) {
         let config = self.config.clone();
         let memory_checker = MemoryChecker::new(self.memory_stat.clone(), config.clone());
         let mut global_config_rx = self.global_config_rx.clone();
@@ -387,81 +387,14 @@ impl Service for EnvelopeBufferService {
         let dequeue = Arc::<AtomicBool>::new(true.into());
         let dequeue1 = dequeue.clone();
 
-        relay_system::spawn!(async move {
-            let buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker).await;
-
-            let mut buffer = match buffer {
-                Ok(buffer) => buffer,
-                Err(error) => {
-                    relay_log::error!(
-                        error = &error as &dyn std::error::Error,
-                        "failed to start the envelope buffer service",
-                    );
-                    std::process::exit(1);
-                }
-            };
-            buffer.initialize().await;
-
-            let mut shutdown = Controller::shutdown_handle();
-            let mut project_changes = self.services.project_cache_handle.changes();
-
-            relay_log::info!("EnvelopeBufferService: starting");
-            loop {
-                let used_capacity = self.services.envelopes_tx.max_capacity()
-                    - self.services.envelopes_tx.capacity();
-                relay_statsd::metric!(
-                    histogram(RelayHistograms::BufferBackpressureEnvelopesCount) =
-                        used_capacity as u64
-                );
-
-                let mut sleep = Duration::MAX;
-                tokio::select! {
-                    // NOTE: we do not select a bias here.
-                    // On the one hand, we might want to prioritize dequeuing over enqueuing
-                    // so we do not exceed the buffer capacity by starving the dequeue.
-                    // on the other hand, prioritizing old messages violates the LIFO design.
-                    Some(permit) = self.ready_to_pop(&buffer, dequeue.load(Ordering::Relaxed)) => {
-                        match Self::try_pop(&config, &mut buffer, &services, permit).await {
-                            Ok(new_sleep) => {
-                                sleep = new_sleep;
-                            }
-                            Err(error) => {
-                                relay_log::error!(
-                                error = &error as &dyn std::error::Error,
-                                "failed to pop envelope"
-                            );
-                            }
-                        }
-                    }
-                    change = project_changes.recv() => {
-                        if let Ok(ProjectChange::Ready(project_key)) = change {
-                            buffer.mark_ready(&project_key, true);
-                        }
-                        sleep = Duration::ZERO;
-                    }
-                    Some(message) = rx.recv() => {
-                        Self::handle_message(&mut buffer, &services, message).await;
-                        sleep = Duration::ZERO;
-                    }
-                    shutdown = shutdown.notified() => {
-                        // In case the shutdown was handled, we break out of the loop signaling that
-                        // there is no need to process anymore envelopes.
-                        if Self::handle_shutdown(&mut buffer, shutdown).await {
-                            break;
-                        }
-                    }
-                    Ok(()) = global_config_rx.changed() => {
-                        sleep = Duration::ZERO;
-                    }
-                    else => break,
-                }
+        let mut buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker)
+            .await
+            .expect("failed to start the envelope buffer service");
 
-                self.sleep = sleep;
-                self.update_observable_state(&mut buffer);
-            }
+        buffer.initialize().await;
 
-            relay_log::info!("EnvelopeBufferService: stopping");
-        });
+        let mut shutdown = Controller::shutdown_handle();
+        let mut project_changes = self.services.project_cache_handle.changes();
 
         #[cfg(unix)]
         relay_system::spawn!(async move {
@@ -475,6 +408,62 @@ impl Service for EnvelopeBufferService {
                 relay_log::info!("SIGUSR1 receive, dequeue={}", deq);
             }
         });
+
+        relay_log::info!("EnvelopeBufferService: starting");
+        loop {
+            let used_capacity =
+                self.services.envelopes_tx.max_capacity() - self.services.envelopes_tx.capacity();
+            relay_statsd::metric!(
+                histogram(RelayHistograms::BufferBackpressureEnvelopesCount) = used_capacity as u64
+            );
+
+            let mut sleep = Duration::MAX;
+            tokio::select! {
+                // NOTE: we do not select a bias here.
+                // On the one hand, we might want to prioritize dequeuing over enqueuing
+                // so we do not exceed the buffer capacity by starving the dequeue.
+                // on the other hand, prioritizing old messages violates the LIFO design.
+                Some(permit) = self.ready_to_pop(&buffer, dequeue.load(Ordering::Relaxed)) => {
+                    match Self::try_pop(&config, &mut buffer, &services, permit).await {
+                        Ok(new_sleep) => {
+                            sleep = new_sleep;
+                        }
+                        Err(error) => {
+                            relay_log::error!(
+                            error = &error as &dyn std::error::Error,
+                            "failed to pop envelope"
+                        );
+                        }
+                    }
+                }
+                change = project_changes.recv() => {
+                    if let Ok(ProjectChange::Ready(project_key)) = change {
+                        buffer.mark_ready(&project_key, true);
+                    }
+                    sleep = Duration::ZERO;
+                }
+                Some(message) = rx.recv() => {
+                    Self::handle_message(&mut buffer, &services, message).await;
+                    sleep = Duration::ZERO;
+                }
+                shutdown = shutdown.notified() => {
+                    // In case the shutdown was handled, we break out of the loop signaling that
+                    // there is no need to process any more envelopes.
+                    if Self::handle_shutdown(&mut buffer, shutdown).await {
+                        break;
+                    }
+                }
+                Ok(()) = global_config_rx.changed() => {
+                    sleep = Duration::ZERO;
+                }
+                else => break,
+            }
+
+            self.sleep = sleep;
+            self.update_observable_state(&mut buffer);
+        }
+
+        relay_log::info!("EnvelopeBufferService: stopping");
     }
 }
 
@@ -556,7 +545,8 @@ mod tests {
 
         service.has_capacity.store(false, Ordering::Relaxed);
 
-        let ObservableEnvelopeBuffer { has_capacity, .. } = service.start_observable();
+        let ObservableEnvelopeBuffer { has_capacity, .. } =
+            service.start_in(&mut ServiceRunner::new());
         assert!(!has_capacity.load(Ordering::Relaxed));
 
         tokio::time::advance(Duration::from_millis(100)).await;
@@ -580,7 +570,7 @@ mod tests {
             ..
         } = envelope_buffer_service(None, global_config::Status::Pending);
 
-        let addr = service.start();
+        let addr = service.start_detached();
 
         let envelope = new_envelope(false, "foo");
         let project_key = envelope.meta().public_key();
@@ -624,7 +614,7 @@ mod tests {
             global_config::Status::Ready(Arc::new(GlobalConfig::default())),
         );
 
-        let addr = service.start();
+        let addr = service.start_detached();
 
         let envelope = new_envelope(false, "foo");
         let project_key = envelope.meta().public_key();
@@ -657,7 +647,7 @@ mod tests {
         );
 
         let config = service.config.clone();
-        let addr = service.start();
+        let addr = service.start_detached();
 
         tokio::time::sleep(Duration::from_millis(100)).await;
 
@@ -690,7 +680,7 @@ mod tests {
             global_config::Status::Ready(Arc::new(GlobalConfig::default())),
         );
 
-        let addr = service.start();
+        let addr = service.start_detached();
 
         let envelope = new_envelope(false, "foo");
         let project_key = envelope.meta().public_key();
@@ -725,7 +715,7 @@ mod tests {
             global_config::Status::Ready(Arc::new(GlobalConfig::default())),
         );
 
-        let addr = service.start();
+        let addr = service.start_detached();
 
         let envelope = new_envelope(false, "foo");
         let project_key = envelope.meta().public_key();
diff --git a/relay-server/src/services/cogs.rs b/relay-server/src/services/cogs.rs
index 93462ddd899..5dc804f9274 100644
--- a/relay-server/src/services/cogs.rs
+++ b/relay-server/src/services/cogs.rs
@@ -54,12 +54,10 @@ impl CogsService {
 impl Service for CogsService {
     type Interface = CogsReport;
 
-    fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
-        relay_system::spawn!(async move {
-            while let Some(message) = rx.recv().await {
-                self.handle_report(message);
-            }
-        });
+    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
+        while let Some(message) = rx.recv().await {
+            self.handle_report(message);
+        }
     }
 }
 
diff --git a/relay-server/src/services/global_config.rs b/relay-server/src/services/global_config.rs
index 06b9d72cd6c..07520bb8ba7 100644
--- a/relay-server/src/services/global_config.rs
+++ b/relay-server/src/services/global_config.rs
@@ -338,53 +338,51 @@ impl GlobalConfigService {
 impl Service for GlobalConfigService {
     type Interface = GlobalConfigManager;
 
-    fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
-        relay_system::spawn!(async move {
-            let mut shutdown_handle = Controller::shutdown_handle();
-
-            relay_log::info!("global config service starting");
-            if self.config.relay_mode() == RelayMode::Managed {
-                relay_log::info!("requesting global config from upstream");
-                self.request_global_config();
-            } else {
-                match GlobalConfig::load(self.config.path()) {
-                    Ok(Some(from_file)) => {
-                        relay_log::info!("serving static global config loaded from file");
-                        self.global_config_watch
-                            .send_replace(Status::Ready(Arc::new(from_file)));
-                    }
-                    Ok(None) => {
-                        relay_log::info!(
-                                "serving default global configs due to lacking static global config file"
-                            );
-                        self.global_config_watch
-                            .send_replace(Status::Ready(Arc::default()));
-                    }
-                    Err(e) => {
-                        relay_log::error!("failed to load global config from file: {}", e);
-                        relay_log::info!(
+    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
+        let mut shutdown_handle = Controller::shutdown_handle();
+
+        relay_log::info!("global config service starting");
+        if self.config.relay_mode() == RelayMode::Managed {
+            relay_log::info!("requesting global config from upstream");
+            self.request_global_config();
+        } else {
+            match GlobalConfig::load(self.config.path()) {
+                Ok(Some(from_file)) => {
+                    relay_log::info!("serving static global config loaded from file");
+                    self.global_config_watch
+                        .send_replace(Status::Ready(Arc::new(from_file)));
+                }
+                Ok(None) => {
+                    relay_log::info!(
+                        "serving default global configs due to lacking static global config file"
+                    );
+                    self.global_config_watch
+                        .send_replace(Status::Ready(Arc::default()));
+                }
+                Err(e) => {
+                    relay_log::error!("failed to load global config from file: {}", e);
+                    relay_log::info!(
                                 "serving default global configs due to failure to load global config from file"
                             );
-                        self.global_config_watch
-                            .send_replace(Status::Ready(Arc::default()));
-                    }
+                    self.global_config_watch
+                        .send_replace(Status::Ready(Arc::default()));
                 }
-            };
+            }
+        };
 
-            loop {
-                tokio::select! {
-                    biased;
+        loop {
+            tokio::select! {
+                biased;
 
-                    () = &mut self.fetch_handle => self.request_global_config(),
-                    Some(result) = self.internal_rx.recv() => self.handle_result(result),
-                    Some(message) = rx.recv() => self.handle_message(message),
-                    _ = shutdown_handle.notified() => self.handle_shutdown(),
+                () = &mut self.fetch_handle => self.request_global_config(),
+                Some(result) = self.internal_rx.recv() => self.handle_result(result),
+                Some(message) = rx.recv() => self.handle_message(message),
+                _ = shutdown_handle.notified() => self.handle_shutdown(),
 
-                    else => break,
-                }
+                else => break,
             }
-            relay_log::info!("global config service stopped");
-        });
+        }
+        relay_log::info!("global config service stopped");
     }
 }
 
@@ -421,7 +419,7 @@ mod tests {
 
         let service = GlobalConfigService::new(Arc::new(config), upstream)
             .0
-            .start();
+            .start_detached();
 
         assert!(service.send(Get).await.is_ok());
 
@@ -452,7 +450,7 @@ mod tests {
         let fetch_interval = config.global_config_fetch_interval();
         let service = GlobalConfigService::new(Arc::new(config), upstream)
             .0
-            .start();
+            .start_detached();
         service.send(Get).await.unwrap();
 
         tokio::time::sleep(fetch_interval * 2).await;
@@ -479,7 +477,7 @@ mod tests {
 
         let service = GlobalConfigService::new(Arc::new(config), upstream)
             .0
-            .start();
+            .start_detached();
         service.send(Get).await.unwrap();
 
         tokio::time::sleep(fetch_interval * 2).await;
diff --git a/relay-server/src/services/health_check.rs b/relay-server/src/services/health_check.rs
index 8f3403f76be..b586d22429b 100644
--- a/relay-server/src/services/health_check.rs
+++ b/relay-server/src/services/health_check.rs
@@ -91,8 +91,6 @@ pub struct HealthCheckService {
 
 impl HealthCheckService {
     /// Creates a new instance of the HealthCheck service.
-    ///
-    /// The service does not run. To run the service, use [`start`](Self::start).
     pub fn new(
         config: Arc<Config>,
         memory_checker: MemoryChecker,
@@ -193,7 +191,7 @@ impl HealthCheckService {
 impl Service for HealthCheckService {
     type Interface = HealthCheck;
 
-    fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
+    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
         let (update_tx, update_rx) = watch::channel(StatusUpdate::new(Status::Unhealthy));
         let check_interval = self.config.health_refresh_interval();
         // Add 10% buffer to the internal timeouts to avoid race conditions.
@@ -216,19 +214,17 @@ impl Service for HealthCheckService {
             update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok();
         });
 
-        relay_system::spawn!(async move {
-            while let Some(HealthCheck(message, sender)) = rx.recv().await {
-                let update = update_rx.borrow();
-
-                sender.send(if matches!(message, IsHealthy::Liveness) {
-                    Status::Healthy
-                } else if update.instant.elapsed() >= status_timeout {
-                    Status::Unhealthy
-                } else {
-                    update.status
-                });
-            }
-        });
+        while let Some(HealthCheck(message, sender)) = rx.recv().await {
+            let update = update_rx.borrow();
+
+            sender.send(if matches!(message, IsHealthy::Liveness) {
+                Status::Healthy
+            } else if update.instant.elapsed() >= status_timeout {
+                Status::Unhealthy
+            } else {
+                update.status
+            });
+        }
     }
 }
 
diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs
index 2c170bd20bb..9c433c9ab3d 100644
--- a/relay-server/src/services/metrics/aggregator.rs
+++ b/relay-server/src/services/metrics/aggregator.rs
@@ -246,25 +246,23 @@ impl AggregatorService {
 impl Service for AggregatorService {
     type Interface = Aggregator;
 
-    fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
-        relay_system::spawn!(async move {
-            let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms));
-            let mut shutdown = Controller::shutdown_handle();
-
-            // Note that currently this loop never exits and will run till the tokio runtime shuts
-            // down. This is about to change with the refactoring for the shutdown process.
-            loop {
-                tokio::select! {
-                    biased;
-
-                    _ = ticker.tick() => self.try_flush(),
-                    Some(message) = rx.recv() => self.handle_message(message),
-                    shutdown = shutdown.notified() => self.handle_shutdown(shutdown),
-
-                    else => break,
-                }
+    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
+        let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms));
+        let mut shutdown = Controller::shutdown_handle();
+
+        // Note that currently this loop never exits and will run till the tokio runtime shuts
+        // down. This is about to change with the refactoring for the shutdown process.
+        loop {
+            tokio::select! {
+                biased;
+
+                _ = ticker.tick() => self.try_flush(),
+                Some(message) = rx.recv() => self.handle_message(message),
+                shutdown = shutdown.notified() => self.handle_shutdown(shutdown),
+
+                else => break,
             }
-        });
+        }
     }
 }
 
@@ -361,16 +359,14 @@ mod tests {
     impl Service for TestReceiver {
         type Interface = TestInterface;
 
-        fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
-            relay_system::spawn!(async move {
-                while let Some(message) = rx.recv().await {
-                    let buckets = message.0.buckets;
-                    relay_log::debug!(?buckets, "received buckets");
-                    if !self.reject_all {
-                        self.add_buckets(buckets);
-                    }
+        async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
+            while let Some(message) = rx.recv().await {
+                let buckets = message.0.buckets;
+                relay_log::debug!(?buckets, "received buckets");
+                if !self.reject_all {
+                    self.add_buckets(buckets);
                 }
-            });
+            }
         }
     }
 
@@ -392,7 +388,7 @@ mod tests {
         tokio::time::pause();
 
         let receiver = TestReceiver::default();
-        let recipient = receiver.clone().start().recipient();
+        let recipient = receiver.clone().start_detached().recipient();
 
         let config = AggregatorServiceConfig {
             aggregator: AggregatorConfig {
@@ -402,7 +398,7 @@ mod tests {
             },
             ..Default::default()
         };
-        let aggregator = AggregatorService::new(config, Some(recipient)).start();
+        let aggregator = AggregatorService::new(config, Some(recipient)).start_detached();
 
         let mut bucket = some_bucket();
         bucket.timestamp = UnixTimestamp::now();
diff --git a/relay-server/src/services/metrics/router.rs b/relay-server/src/services/metrics/router.rs
index cb9578e2e8d..8820dc9a1a5 100644
--- a/relay-server/src/services/metrics/router.rs
+++ b/relay-server/src/services/metrics/router.rs
@@ -4,7 +4,7 @@
 use relay_config::aggregator::Condition;
 use relay_config::{AggregatorServiceConfig, ScopedAggregatorConfig};
 use relay_metrics::MetricNamespace;
-use relay_system::{Addr, NoResponse, Recipient, Service};
+use relay_system::{Addr, NoResponse, Recipient, Service, ServiceRunner};
 
 use crate::services::metrics::{
     Aggregator, AggregatorHandle, AggregatorService, FlushBuckets, MergeBuckets,
@@ -53,26 +53,24 @@ impl RouterService {
 impl Service for RouterService {
     type Interface = Aggregator;
 
-    fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
-        relay_system::spawn!(async move {
-            let mut router = StartedRouter::start(self);
-            relay_log::info!("metrics router started");
+    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
+        let mut router = StartedRouter::start_in(self, &mut ServiceRunner::new());
+        relay_log::info!("metrics router started");
 
-            // Note that currently this loop never exists and will run till the tokio runtime shuts
-            // down. This is about to change with the refactoring for the shutdown process.
-            loop {
-                tokio::select! {
-                    biased;
+        // Note that currently this loop never exits and will run till the tokio runtime shuts
+        // down. This is about to change with the refactoring for the shutdown process.
+        loop {
+            tokio::select! {
+                biased;
 
-                    Some(message) = rx.recv() => {
-                        router.handle_message(message)
-                    },
+                Some(message) = rx.recv() => {
+                    router.handle_message(message)
+                },
 
-                    else => break,
-                }
+                else => break,
             }
-            relay_log::info!("metrics router stopped");
-        });
+        }
+        relay_log::info!("metrics router stopped");
     }
 }
 
@@ -83,7 +81,7 @@ struct StartedRouter {
 }
 
 impl StartedRouter {
-    fn start(router: RouterService) -> Self {
+    fn start_in(router: RouterService, runner: &mut ServiceRunner) -> Self {
         let RouterService { default, secondary } = router;
 
         let secondary = secondary
@@ -94,12 +92,12 @@ impl StartedRouter {
                     .filter(|&namespace| condition.matches(Some(namespace)))
                     .collect();
 
-                (aggregator.start(), namespaces)
+                (runner.start(aggregator), namespaces)
             })
             .collect();
 
         Self {
-            default: default.start(),
+            default: runner.start(default),
             secondary,
         }
     }
diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs
index 22ec72b3e76..bd7b34bf112 100644
--- a/relay-server/src/services/outcome.rs
+++ b/relay-server/src/services/outcome.rs
@@ -683,19 +683,17 @@ impl HttpOutcomeProducer {
 impl Service for HttpOutcomeProducer {
     type Interface = TrackRawOutcome;
 
-    fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
-        relay_system::spawn!(async move {
-            loop {
-                tokio::select! {
-                    // Prioritize flush over receiving messages to prevent starving.
-                    biased;
-
-                    () = &mut self.flush_handle => self.send_batch(),
-                    Some(message) = rx.recv() => self.handle_message(message),
-                    else => break,
-                }
+    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
+        loop {
+            tokio::select! {
+                // Prioritize flush over receiving messages to prevent starving.
+                biased;
+
+                () = &mut self.flush_handle => self.send_batch(),
+                Some(message) = rx.recv() => self.handle_message(message),
+                else => break,
             }
-        });
+        }
     }
 }
 
@@ -776,19 +774,17 @@ impl ClientReportOutcomeProducer {
 impl Service for ClientReportOutcomeProducer {
     type Interface = TrackOutcome;
 
-    fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
-        relay_system::spawn!(async move {
-            loop {
-                tokio::select! {
-                    // Prioritize flush over receiving messages to prevent starving.
-                    biased;
-
-                    () = &mut self.flush_handle => self.flush(),
-                    Some(message) = rx.recv() => self.handle_message(message),
-                    else => break,
-                }
+    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
+        loop {
+            tokio::select! {
+                // Prioritize flush over receiving messages to prevent starving.
+                biased;
+
+                () = &mut self.flush_handle => self.flush(),
+                Some(message) = rx.recv() => self.handle_message(message),
+                else => break,
             }
-        });
+        }
     }
 }
 
@@ -980,8 +976,10 @@ impl ProducerInner {
         match self {
             #[cfg(feature = "processing")]
             ProducerInner::Kafka(inner) => OutcomeBroker::Kafka(inner),
-            ProducerInner::Http(inner) => OutcomeBroker::Http(inner.start()),
-            ProducerInner::ClientReport(inner) => OutcomeBroker::ClientReport(inner.start()),
+            ProducerInner::Http(inner) => OutcomeBroker::Http(inner.start_detached()),
+            ProducerInner::ClientReport(inner) => {
+                OutcomeBroker::ClientReport(inner.start_detached())
+            }
             ProducerInner::Disabled => OutcomeBroker::Disabled,
         }
     }
@@ -1035,18 +1033,16 @@ impl OutcomeProducerService {
 impl Service for OutcomeProducerService {
     type Interface = OutcomeProducer;
 
-    fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
+    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
         let Self { config, inner } = self;
 
-        relay_system::spawn!(async move {
-            let broker = inner.start();
+        let broker = inner.start();
 
-            relay_log::info!("OutcomeProducer started.");
-            while let Some(message) = rx.recv().await {
-                broker.handle_message(message, &config);
-            }
-            relay_log::info!("OutcomeProducer stopped.");
-        });
+        relay_log::info!("OutcomeProducer started.");
+        while let Some(message) = rx.recv().await {
+            broker.handle_message(message, &config);
+        }
+        relay_log::info!("OutcomeProducer stopped.");
     }
 }
 
diff --git a/relay-server/src/services/outcome_aggregator.rs b/relay-server/src/services/outcome_aggregator.rs
index 91b7561e0c0..788facb967d 100644
--- a/relay-server/src/services/outcome_aggregator.rs
+++ b/relay-server/src/services/outcome_aggregator.rs
@@ -138,25 +138,23 @@ impl OutcomeAggregator {
 impl Service for OutcomeAggregator {
     type Interface = TrackOutcome;
 
-    fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
-        relay_system::spawn!(async move {
-            let mut shutdown = Controller::shutdown_handle();
-            relay_log::info!("outcome aggregator started");
-
-            loop {
-                tokio::select! {
-                    // Prioritize flush over receiving messages to prevent starving. Shutdown can be
-                    // last since it is not vital if there are still messages in the channel.
-                    biased;
-
-                    () = &mut self.flush_handle => self.flush(),
-                    Some(message) = rx.recv() => self.handle_track_outcome(message),
-                    shutdown = shutdown.notified() => self.handle_shutdown(shutdown),
-                    else => break,
-                }
+    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
+        let mut shutdown = Controller::shutdown_handle();
+        relay_log::info!("outcome aggregator started");
+
+        loop {
+            tokio::select! {
+                // Prioritize flush over receiving messages to prevent starving. Shutdown can be
+                // last since it is not vital if there are still messages in the channel.
+                biased;
+
+                () = &mut self.flush_handle => self.flush(),
+                Some(message) = rx.recv() => self.handle_track_outcome(message),
+                shutdown = shutdown.notified() => self.handle_shutdown(shutdown),
+                else => break,
             }
+        }
 
-            relay_log::info!("outcome aggregator stopped");
-        });
+        relay_log::info!("outcome aggregator stopped");
     }
 }
diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs
index bbe574bfdd5..171b9574394 100644
--- a/relay-server/src/services/processor.rs
+++ b/relay-server/src/services/processor.rs
@@ -2820,16 +2820,14 @@ impl EnvelopeProcessorService {
 impl Service for EnvelopeProcessorService {
     type Interface = EnvelopeProcessor;
 
-    fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
-        relay_system::spawn!(async move {
-            while let Some(message) = rx.recv().await {
-                let service = self.clone();
-                self.inner
-                    .workers
-                    .spawn(move || service.handle_message(message))
-                    .await;
-            }
-        });
+    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
+        while let Some(message) = rx.recv().await {
+            let service = self.clone();
+            self.inner
+                .workers
+                .spawn(move || service.handle_message(message))
+                .await;
+        }
     }
 }
 
diff --git a/relay-server/src/services/projects/cache/legacy.rs b/relay-server/src/services/projects/cache/legacy.rs
index faf59d45de8..33e9aeb4b03 100644
--- a/relay-server/src/services/projects/cache/legacy.rs
+++ b/relay-server/src/services/projects/cache/legacy.rs
@@ -680,7 +680,7 @@ impl ProjectCacheService {
 impl Service for ProjectCacheService {
     type Interface = ProjectCache;
 
-    fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
+    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
         let Self {
             config,
             memory_checker,
@@ -694,120 +694,121 @@ impl Service for ProjectCacheService {
         let outcome_aggregator = services.outcome_aggregator.clone();
         let test_store = services.test_store.clone();
 
-        relay_system::spawn!(async move {
-            relay_log::info!("project cache started");
+        relay_log::info!("project cache started");
 
-            let global_config = match global_config_rx.borrow().clone() {
-                global_config::Status::Ready(_) => {
-                    relay_log::info!("global config received");
-                    GlobalConfigStatus::Ready
-                }
-                global_config::Status::Pending => {
-                    relay_log::info!("waiting for global config");
-                    GlobalConfigStatus::Pending
-                }
-            };
+        let global_config = match global_config_rx.borrow().clone() {
+            global_config::Status::Ready(_) => {
+                relay_log::info!("global config received");
+                GlobalConfigStatus::Ready
+            }
+            global_config::Status::Pending => {
+                relay_log::info!("waiting for global config");
+                GlobalConfigStatus::Pending
+            }
+        };
 
-            let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel();
-            let spool_v1 = match config.spool_v2() {
-                true => None,
-                false => Some({
-                    // Channel for envelope buffering.
-                    let buffer_services = spooler::Services {
-                        outcome_aggregator,
-                        project_cache,
-                        test_store,
-                    };
-                    let buffer = match BufferService::create(
-                        memory_checker.clone(),
-                        buffer_services,
-                        config.clone(),
-                    )
-                    .await
-                    {
-                        Ok(buffer) => buffer.start(),
-                        Err(err) => {
-                            relay_log::error!(
-                                error = &err as &dyn Error,
-                                "failed to start buffer service",
-                            );
-                            // NOTE: The process will exit with error if the buffer file could not be
-                            // opened or the migrations could not be run.
-                            std::process::exit(1);
-                        }
-                    };
+        let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel();
+        let spool_v1 = match config.spool_v2() {
+            true => None,
+            false => Some({
+                // Channel for envelope buffering.
+                let buffer_services = spooler::Services {
+                    outcome_aggregator,
+                    project_cache,
+                    test_store,
+                };
+                let buffer = match BufferService::create(
+                    memory_checker.clone(),
+                    buffer_services,
+                    config.clone(),
+                )
+                .await
+                {
+                    Ok(buffer) => {
+                        // NOTE: This service is not monitored by the service runner.
+                        buffer.start_detached()
+                    }
+                    Err(err) => {
+                        relay_log::error!(
+                            error = &err as &dyn Error,
+                            "failed to start buffer service",
+                        );
+                        // NOTE: The process will exit with error if the buffer file could not be
+                        // opened or the migrations could not be run.
+                        std::process::exit(1);
+                    }
+                };
 
-                    // Request the existing index from the spooler.
-                    buffer.send(RestoreIndex);
+                // Request the existing index from the spooler.
+                buffer.send(RestoreIndex);
 
-                    SpoolV1 {
-                        buffer_tx,
-                        index: HashSet::new(),
-                        buffer_unspool_backoff: RetryBackoff::new(config.http_max_retry_interval()),
-                        buffer,
-                    }
-                }),
-            };
+                SpoolV1 {
+                    buffer_tx,
+                    index: HashSet::new(),
+                    buffer_unspool_backoff: RetryBackoff::new(config.http_max_retry_interval()),
+                    buffer,
+                }
+            }),
+        };
 
-            let mut broker = ProjectCacheBroker {
-                config: config.clone(),
-                memory_checker,
-                projects: project_cache_handle,
-                services,
-                spool_v1_unspool_handle: SleepHandle::idle(),
-                spool_v1,
-                global_config,
-            };
+        let mut broker = ProjectCacheBroker {
+            config: config.clone(),
+            memory_checker,
+            projects: project_cache_handle,
+            services,
+            spool_v1_unspool_handle: SleepHandle::idle(),
+            spool_v1,
+            global_config,
+        };
 
-            loop {
-                tokio::select! {
-                    biased;
-
-                    Ok(()) = global_config_rx.changed() => {
-                        metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "update_global_config", {
-                            match global_config_rx.borrow().clone() {
-                                global_config::Status::Ready(_) => broker.set_global_config_ready(),
-                                // The watch should only be updated if it gets a new value.
-                                // This would imply a logical bug.
-                                global_config::Status::Pending => relay_log::error!("still waiting for the global config"),
-                            }
-                        })
-                    },
-                    project_change = project_changes.recv() => {
-                        metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_project_change", {
-                            if let Ok(project_change) = project_change {
-                                broker.handle_project_change(project_change);
-                            }
-                        })
-                    }
-                    // Buffer will not dequeue the envelopes from the spool if there is not enough
-                    // permits in `BufferGuard` available. Currently this is 50%.
-                    Some(UnspooledEnvelope { managed_envelope, .. }) = buffer_rx.recv() => {
-                        metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_processing", {
-                            broker.handle_processing(managed_envelope)
-                        })
-                    },
-                    () = &mut broker.spool_v1_unspool_handle => {
-                        metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "periodic_unspool", {
-                            broker.handle_periodic_unspool()
-                        })
-                    }
-                    Some(message) = rx.recv() => {
-                        metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_message", {
-                            broker.handle_message(message)
-                        })
-                    }
-                    Some(message) = envelopes_rx.recv() => {
-                        metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_envelope", {
-                            broker.handle_envelope(message)
-                        })
-                    }
-                    else => break,
+        loop {
+            tokio::select! {
+                biased;
+
+                Ok(()) = global_config_rx.changed() => {
+                    metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "update_global_config", {
+                        match global_config_rx.borrow().clone() {
+                            global_config::Status::Ready(_) => broker.set_global_config_ready(),
+                            // The watch should only be updated if it gets a new value.
+                            // This would imply a logical bug.
+                            global_config::Status::Pending => relay_log::error!("still waiting for the global config"),
+                        }
+                    })
+                },
+                project_change = project_changes.recv() => {
+                    metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_project_change", {
+                        if let Ok(project_change) = project_change {
+                            broker.handle_project_change(project_change);
+                        }
+                    })
+                }
+                // Buffer will not dequeue the envelopes from the spool if there are not enough
+                // permits available in `BufferGuard`. Currently this is 50%.
+                Some(UnspooledEnvelope { managed_envelope, .. }) = buffer_rx.recv() => {
+                    metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_processing", {
+                        broker.handle_processing(managed_envelope)
+                    })
+                },
+                () = &mut broker.spool_v1_unspool_handle => {
+                    metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "periodic_unspool", {
+                        broker.handle_periodic_unspool()
+                    })
+                }
+                Some(message) = rx.recv() => {
+                    metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_message", {
+                        broker.handle_message(message)
+                    })
+                }
+                Some(message) = envelopes_rx.recv() => {
+                    metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_envelope", {
+                        broker.handle_envelope(message)
+                    })
                 }
+                else => break,
             }
+        }
 
-            relay_log::info!("project cache stopped");
-        });
+        relay_log::info!("project cache stopped");
     }
 }
 
@@ -867,7 +868,7 @@ mod tests {
             match BufferService::create(memory_checker.clone(), buffer_services, config.clone())
                 .await
             {
-                Ok(buffer) => buffer.start(),
+                Ok(buffer) => buffer.start_detached(),
                 Err(err) => {
                     relay_log::error!(error = &err as &dyn Error, "failed to start buffer service");
                     // NOTE: The process will exit with error if the buffer file could not be
diff --git a/relay-server/src/services/projects/cache/service.rs b/relay-server/src/services/projects/cache/service.rs
index 3d0c0744c58..435355c1067 100644
--- a/relay-server/src/services/projects/cache/service.rs
+++ b/relay-server/src/services/projects/cache/service.rs
@@ -5,7 +5,7 @@ use futures::StreamExt as _;
 use relay_base_schema::project::ProjectKey;
 use relay_config::Config;
 use relay_statsd::metric;
-use relay_system::Service;
+use relay_system::{Service, ServiceRunner};
 use tokio::sync::broadcast;
 
 use crate::services::projects::cache::handle::ProjectCacheHandle;
@@ -91,7 +91,7 @@ impl ProjectCacheService {
     /// Consumes and starts a [`ProjectCacheService`].
     ///
     /// Returns a [`ProjectCacheHandle`] to access the cache concurrently.
-    pub fn start(self) -> ProjectCacheHandle {
+    pub fn start_in(self, runner: &mut ServiceRunner) -> ProjectCacheHandle {
         let (addr, addr_rx) = relay_system::channel(Self::name());
 
         let handle = ProjectCacheHandle {
@@ -101,7 +101,7 @@ impl ProjectCacheService {
             project_changes: self.project_events_tx.clone(),
         };
 
-        self.spawn_handler(addr_rx);
+        runner.start_with(self, addr_rx);
 
         handle
     }
@@ -194,7 +194,7 @@ impl ProjectCacheService {
 impl relay_system::Service for ProjectCacheService {
     type Interface = ProjectCache;
 
-    fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
+    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
         macro_rules! timed {
             ($task:expr, $body:expr) => {{
                 let task_name = $task;
@@ -206,25 +206,23 @@ impl relay_system::Service for ProjectCacheService {
             }};
         }
 
-        relay_system::spawn!(async move {
-            loop {
-                tokio::select! {
-                    biased;
-
-                    Some(fetch) = self.scheduled_fetches.next() => timed!(
-                        "completed_fetch",
-                        self.handle_completed_fetch(fetch)
-                    ),
-                    Some(message) = rx.recv() => timed!(
-                        message.variant(),
-                        self.handle_message(message)
-                    ),
-                    Some(eviction) = self.store.next_eviction() => timed!(
-                        "eviction",
-                        self.handle_eviction(eviction)
-                    ),
-                }
+        loop {
+            tokio::select! {
+                biased;
+
+                Some(fetch) = self.scheduled_fetches.next() => timed!(
+                    "completed_fetch",
+                    self.handle_completed_fetch(fetch)
+                ),
+                Some(message) = rx.recv() => timed!(
+                    message.variant(),
+                    self.handle_message(message)
+                ),
+                Some(eviction) = self.store.next_eviction() => timed!(
+                    "eviction",
+                    self.handle_eviction(eviction)
+                ),
             }
-        });
+        }
     }
 }
diff --git a/relay-server/src/services/projects/source/local.rs b/relay-server/src/services/projects/source/local.rs
index c4c89db31c6..a288d57d052 100644
--- a/relay-server/src/services/projects/source/local.rs
+++ b/relay-server/src/services/projects/source/local.rs
@@ -171,28 +171,26 @@ async fn spawn_poll_local_states(
 impl Service for LocalProjectSourceService {
     type Interface = LocalProjectSource;
 
-    fn spawn_handler(mut self, mut rx: Receiver<Self::Interface>) {
+    async fn run(mut self, mut rx: Receiver<Self::Interface>) {
         // Use a channel with size 1. If the channel is full because the consumer does not
         // collect the result, the producer will block, which is acceptable.
         let (state_tx, mut state_rx) = mpsc::channel(1);
 
-        relay_system::spawn!(async move {
-            relay_log::info!("project local cache started");
+        relay_log::info!("project local cache started");
 
-            // Start the background task that periodically reloads projects from disk:
-            spawn_poll_local_states(&self.config, state_tx).await;
+        // Start the background task that periodically reloads projects from disk:
+        spawn_poll_local_states(&self.config, state_tx).await;
 
-            loop {
-                tokio::select! {
-                    biased;
-                    Some(message) = rx.recv() => self.handle_message(message),
-                    Some(states) = state_rx.recv() => self.local_states = states,
+        loop {
+            tokio::select! {
+                biased;
+                Some(message) = rx.recv() => self.handle_message(message),
+                Some(states) = state_rx.recv() => self.local_states = states,
 
-                    else => break,
-                }
+                else => break,
             }
-            relay_log::info!("project local cache stopped");
-        });
+        }
+        relay_log::info!("project local cache stopped");
     }
 }
 
diff --git a/relay-server/src/services/projects/source/mod.rs b/relay-server/src/services/projects/source/mod.rs
index 1311acfc401..2720e39c58e 100644
--- a/relay-server/src/services/projects/source/mod.rs
+++ b/relay-server/src/services/projects/source/mod.rs
@@ -6,7 +6,7 @@ use relay_base_schema::project::ProjectKey;
 use relay_config::RedisConfigRef;
 use relay_config::{Config, RelayMode};
 use relay_redis::RedisPool;
-use relay_system::{Addr, Service as _};
+use relay_system::{Addr, ServiceRunner};
 #[cfg(feature = "processing")]
 use tokio::sync::Semaphore;
 
@@ -40,15 +40,18 @@ pub struct ProjectSource {
 }
 
 impl ProjectSource {
-    /// Starts all project source services in the current runtime.
-    pub fn start(
+    /// Starts all project source services in the given `ServiceRunner`.
+    pub fn start_in(
+        runner: &mut ServiceRunner,
         config: Arc<Config>,
         upstream_relay: Addr<UpstreamRelay>,
         _redis: Option<RedisPool>,
     ) -> Self {
-        let local_source = LocalProjectSourceService::new(config.clone()).start();
-        let upstream_source =
-            UpstreamProjectSourceService::new(config.clone(), upstream_relay).start();
+        let local_source = runner.start(LocalProjectSourceService::new(config.clone()));
+        let upstream_source = runner.start(UpstreamProjectSourceService::new(
+            config.clone(),
+            upstream_relay,
+        ));
 
         #[cfg(feature = "processing")]
         let redis_max_connections = config
diff --git a/relay-server/src/services/projects/source/upstream.rs b/relay-server/src/services/projects/source/upstream.rs
index 5b1e6d76ce3..ac760ab6491 100644
--- a/relay-server/src/services/projects/source/upstream.rs
+++ b/relay-server/src/services/projects/source/upstream.rs
@@ -635,22 +635,20 @@ impl UpstreamProjectSourceService {
 impl Service for UpstreamProjectSourceService {
     type Interface = UpstreamProjectSource;
 
-    fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
-        relay_system::spawn!(async move {
-            relay_log::info!("project upstream cache started");
-            loop {
-                tokio::select! {
-                    biased;
+    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
+        relay_log::info!("project upstream cache started");
+        loop {
+            tokio::select! {
+                biased;
 
-                    () = &mut self.fetch_handle => self.do_fetch(),
-                    Some(responses) = self.inner_rx.recv() => self.handle_responses(responses),
-                    Some(message) = rx.recv() => self.handle_message(message),
+                () = &mut self.fetch_handle => self.do_fetch(),
+                Some(responses) = self.inner_rx.recv() => self.handle_responses(responses),
+                Some(message) = rx.recv() => self.handle_message(message),
 
-                    else => break,
-                }
+                else => break,
             }
-            relay_log::info!("project upstream cache stopped");
-        });
+        }
+        relay_log::info!("project upstream cache stopped");
     }
 }
 
@@ -688,7 +686,8 @@ mod tests {
             }};
         }
 
-        let service = UpstreamProjectSourceService::new(Arc::clone(&config), upstream_addr).start();
+        let service =
+            UpstreamProjectSourceService::new(Arc::clone(&config), upstream_addr).start_detached();
 
         let mut response1 = service.send(FetchProjectState {
             project_key,
diff --git a/relay-server/src/services/relays.rs b/relay-server/src/services/relays.rs
index 8d10a33b334..7f160625ad6 100644
--- a/relay-server/src/services/relays.rs
+++ b/relay-server/src/services/relays.rs
@@ -334,23 +334,21 @@ impl RelayCacheService {
 impl Service for RelayCacheService {
     type Interface = RelayCache;
 
-    fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
-        relay_system::spawn!(async move {
-            relay_log::info!("key cache started");
-
-            loop {
-                tokio::select! {
-                    // Prioritize flush over receiving messages to prevent starving.
-                    biased;
-
-                    Some(result) = self.fetch_channel.1.recv() => self.handle_fetch_result(result),
-                    () = &mut self.delay => self.fetch_relays(),
-                    Some(message) = rx.recv() => self.get_or_fetch(message.0, message.1),
-                    else => break,
-                }
+    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
+        relay_log::info!("key cache started");
+
+        loop {
+            tokio::select! {
+                // Prioritize flush over receiving messages to prevent starving.
+                biased;
+
+                Some(result) = self.fetch_channel.1.recv() => self.handle_fetch_result(result),
+                () = &mut self.delay => self.fetch_relays(),
+                Some(message) = rx.recv() => self.get_or_fetch(message.0, message.1),
+                else => break,
             }
+        }
 
-            relay_log::info!("key cache stopped");
-        });
+        relay_log::info!("key cache stopped");
     }
 }
diff --git a/relay-server/src/services/server/mod.rs b/relay-server/src/services/server/mod.rs
index d29787c38c3..02d22353bd7 100644
--- a/relay-server/src/services/server/mod.rs
+++ b/relay-server/src/services/server/mod.rs
@@ -112,7 +112,7 @@ fn listen(config: &Config) -> Result<TcpListener, ServerError> {
     Ok(socket.listen(config.tcp_listen_backlog())?.into_std()?)
 }
 
-fn serve(listener: TcpListener, app: App, config: Arc<Config>) {
+async fn serve(listener: TcpListener, app: App, config: Arc<Config>) {
     let handle = Handle::new();
 
     let acceptor = self::acceptor::RelayAcceptor::new()
@@ -139,7 +139,6 @@ fn serve(listener: TcpListener, app: App, config: Arc<Config>) {
         .keep_alive_timeout(config.keepalive_timeout());
 
     let service = ServiceExt::<Request>::into_make_service_with_connect_info::<SocketAddr>(app);
-    relay_system::spawn!(server.serve(service));
 
     relay_system::spawn!(emit_active_connections_metric(
         config.metrics_periodic_interval(),
@@ -155,6 +154,11 @@ fn serve(listener: TcpListener, app: App, config: Arc<Config>) {
             None => handle.shutdown(),
         }
     });
+
+    server
+        .serve(service)
+        .await
+        .expect("failed to start axum server");
 }
 
 /// HTTP server service.
@@ -182,7 +186,7 @@ impl HttpServer {
 impl Service for HttpServer {
     type Interface = ();
 
-    fn spawn_handler(self, _rx: relay_system::Receiver<Self::Interface>) {
+    async fn run(self, _rx: relay_system::Receiver<Self::Interface>) {
         let Self {
             config,
             service,
@@ -194,7 +198,7 @@ impl Service for HttpServer {
         relay_statsd::metric!(counter(RelayCounters::ServerStarting) += 1);
 
         let app = make_app(service);
-        serve(listener, app, config);
+        serve(listener, app, config).await;
     }
 }
 
diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs
index 0bfbd6b82a4..40741d2693e 100644
--- a/relay-server/src/services/spooler/mod.rs
+++ b/relay-server/src/services/spooler/mod.rs
@@ -1264,34 +1264,32 @@ impl BufferService {
 impl Service for BufferService {
     type Interface = Buffer;
 
-    fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
-        relay_system::spawn!(async move {
-            let mut shutdown = Controller::shutdown_handle();
-
-            loop {
-                tokio::select! {
-                    biased;
-
-                    Some(message) = rx.recv() => {
-                        if let Err(err) = self.handle_message(message).await {
-                            relay_log::error!(
-                                error = &err as &dyn Error,
-                                "failed to handle an incoming message",
-                            );
-                        }
+    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
+        let mut shutdown = Controller::shutdown_handle();
+
+        loop {
+            tokio::select! {
+                biased;
+
+                Some(message) = rx.recv() => {
+                    if let Err(err) = self.handle_message(message).await {
+                        relay_log::error!(
+                            error = &err as &dyn Error,
+                            "failed to handle an incoming message",
+                        );
                     }
-                    _ = shutdown.notified() => {
-                       if let Err(err) = self.handle_shutdown().await {
-                            relay_log::error!(
-                                error = &err as &dyn Error,
-                                "failed while shutting down the service",
-                            );
-                        }
+                }
+                _ = shutdown.notified() => {
+                    if let Err(err) = self.handle_shutdown().await {
+                        relay_log::error!(
+                            error = &err as &dyn Error,
+                            "failed while shutting down the service",
+                        );
                     }
-                    else => break,
                 }
+                else => break,
             }
-        });
+        }
     }
 }
 
@@ -1394,7 +1392,7 @@ mod tests {
         let service = BufferService::create(memory_checker, services(), config)
             .await
             .unwrap();
-        let addr = service.start();
+        let addr = service.start_detached();
         let (tx, mut rx) = mpsc::unbounded_channel();
 
         // Test cases:
@@ -1609,7 +1607,7 @@ mod tests {
         let buffer = BufferService::create(memory_checker, services, config)
             .await
             .unwrap();
-        let addr = buffer.start();
+        let addr = buffer.start_detached();
         addr.send(RestoreIndex);
         // Give some time to process the message
         tokio::time::sleep(Duration::from_millis(500)).await;
@@ -1654,7 +1652,7 @@ mod tests {
         let buffer = BufferService::create(memory_checker, services, config)
             .await
             .unwrap();
-        let addr = buffer.start();
+        let addr = buffer.start_detached();
 
         let mut keys = HashSet::new();
         for _ in 1..=300 {
diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs
index 3b05388d81c..6ae5183c302 100644
--- a/relay-server/src/services/stats.rs
+++ b/relay-server/src/services/stats.rs
@@ -134,20 +134,18 @@ impl RelayStats {
 impl Service for RelayStats {
     type Interface = ();
 
-    fn spawn_handler(self, _rx: relay_system::Receiver<Self::Interface>) {
+    async fn run(self, _rx: relay_system::Receiver<Self::Interface>) {
         let Some(mut ticker) = self.config.metrics_periodic_interval().map(interval) else {
             return;
         };
 
-        relay_system::spawn!(async move {
-            loop {
-                let _ = tokio::join!(
-                    self.upstream_status(),
-                    self.tokio_metrics(),
-                    self.redis_pools(),
-                );
-                ticker.tick().await;
-            }
-        });
+        loop {
+            let _ = tokio::join!(
+                self.upstream_status(),
+                self.tokio_metrics(),
+                self.redis_pools(),
+            );
+            ticker.tick().await;
+        }
     }
 }
diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs
index 20007b23ad9..d20133535eb 100644
--- a/relay-server/src/services/store.rs
+++ b/relay-server/src/services/store.rs
@@ -1048,21 +1048,19 @@ impl StoreService {
 impl Service for StoreService {
     type Interface = Store;
 
-    fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
+    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
         let this = Arc::new(self);
 
-        relay_system::spawn!(async move {
-            relay_log::info!("store forwarder started");
+        relay_log::info!("store forwarder started");
 
-            while let Some(message) = rx.recv().await {
-                let service = Arc::clone(&this);
-                this.workers
-                    .spawn(move || service.handle_message(message))
-                    .await;
-            }
+        while let Some(message) = rx.recv().await {
+            let service = Arc::clone(&this);
+            this.workers
+                .spawn(move || service.handle_message(message))
+                .await;
+        }
 
-            relay_log::info!("store forwarder stopped");
-        });
+        relay_log::info!("store forwarder stopped");
     }
 }
 
diff --git a/relay-server/src/services/test_store.rs b/relay-server/src/services/test_store.rs
index f9a2cc9d126..4a8ec18c0ee 100644
--- a/relay-server/src/services/test_store.rs
+++ b/relay-server/src/services/test_store.rs
@@ -134,11 +134,9 @@ impl TestStoreService {
 impl relay_system::Service for TestStoreService {
     type Interface = TestStore;
 
-    fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
-        relay_system::spawn!(async move {
-            while let Some(message) = rx.recv().await {
-                self.handle_message(message);
-            }
-        });
+    async fn run(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
+        while let Some(message) = rx.recv().await {
+            self.handle_message(message);
+        }
     }
 }
diff --git a/relay-server/src/services/upstream.rs b/relay-server/src/services/upstream.rs
index 599186660fc..d33ac175b7f 100644
--- a/relay-server/src/services/upstream.rs
+++ b/relay-server/src/services/upstream.rs
@@ -1498,7 +1498,7 @@ impl UpstreamRelayService {
 impl Service for UpstreamRelayService {
     type Interface = UpstreamRelay;
 
-    fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
+    async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
         let Self { config } = self;
 
         let client = SharedClient::build(config.clone());
@@ -1528,18 +1528,16 @@ impl Service for UpstreamRelayService {
             action_tx,
         };
 
-        relay_system::spawn!(async move {
-            loop {
-                tokio::select! {
-                    biased;
+        loop {
+            tokio::select! {
+                biased;
 
-                    Some(action) = action_rx.recv() => broker.handle_action(action),
-                    Some(request) = broker.next_request() => broker.execute(request),
-                    Some(message) = rx.recv() => broker.handle_message(message).await,
+                Some(action) = action_rx.recv() => broker.handle_action(action),
+                Some(request) = broker.next_request() => broker.execute(request),
+                Some(message) = rx.recv() => broker.handle_message(message).await,
 
-                    else => break,
-                }
+                else => break,
             }
-        });
+        }
     }
 }
diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs
index 3997c8c2869..edef9521ff2 100644
--- a/relay-system/src/controller.rs
+++ b/relay-system/src/controller.rs
@@ -121,7 +121,7 @@ impl ShutdownHandle {
 /// ### Example
 ///
 /// ```
-/// use relay_system::{Controller, Service, Shutdown, ShutdownMode};
+/// use relay_system::{Controller, Service, ServiceRunner, Shutdown, ShutdownMode};
 /// use std::time::Duration;
 ///
 /// struct MyService;
@@ -129,17 +129,14 @@ impl ShutdownHandle {
 /// impl Service for MyService {
 ///     type Interface = ();
 ///
-///     fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
-///         relay_system::spawn!(async move {
-///             let mut shutdown = Controller::shutdown_handle();
-///
-///             loop {
-///                 tokio::select! {
-///                     shutdown = shutdown.notified() => break, // Handle shutdown here
-///                     Some(message) = rx.recv() => (),         // Process incoming message
-///                 }
+///     async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
+///         let mut shutdown = Controller::shutdown_handle();
+///         loop {
+///             tokio::select! {
+///                 shutdown = shutdown.notified() => break, // Handle shutdown here
+///                 Some(message) = rx.recv() => (),         // Process incoming message
 ///             }
-///         });
+///         }
 ///     }
 /// }
 ///
@@ -151,9 +148,9 @@ impl ShutdownHandle {
 ///
 ///     // Start all other services. Controller::shutdown_handle will use the same controller
 ///     // instance and receives the configured shutdown timeout.
-///     let addr = MyService.start();
+///     let addr = ServiceRunner::new().start(MyService);
 ///
-///     // By triggering a shutdown, all attached services will be notified. This happens
+///     // By triggering a shutdown, all subscribed services will be notified. This happens
 ///     // automatically when a signal is sent to the process (e.g. SIGINT or SIGTERM).
 ///     Controller::shutdown(ShutdownMode::Graceful);
 ///
diff --git a/relay-system/src/service.rs b/relay-system/src/service.rs
index 388a07459fd..9add08d35c9 100644
--- a/relay-system/src/service.rs
+++ b/relay-system/src/service.rs
@@ -8,11 +8,13 @@ use std::task::{Context, Poll};
 use std::time::Duration;
 
 use futures::future::Shared;
-use futures::FutureExt;
-use tokio::runtime::Runtime;
+use futures::stream::FuturesUnordered;
+use futures::{FutureExt, StreamExt};
 use tokio::sync::{mpsc, oneshot};
+use tokio::task::JoinHandle;
 use tokio::time::MissedTickBehavior;
 
+use crate::spawn;
 use crate::statsd::SystemGauges;
 
 /// Interval for recording backlog metrics on service channels.
@@ -818,7 +820,7 @@ where
 ///
 /// This channel is meant to be polled in a [`Service`].
 ///
-/// Instances are created automatically when [spawning](Service::spawn_handler) a service, or can be
+/// Instances are created automatically when [spawning](ServiceRunner::start) a service, or can be
 /// created through [`channel`]. The channel closes when all associated [`Addr`]s are dropped.
 pub struct Receiver<I: Interface> {
     rx: mpsc::UnboundedReceiver<I>,
@@ -903,7 +905,7 @@ pub fn channel<I: Interface>(name: &'static str) -> (Addr<I>, Receiver<I>) {
 /// Individual messages can have a response which will be sent once the message is handled by the
 /// service. The sender can asynchronously await the responses of such messages.
 ///
-/// To start a service, create an instance of the service and use [`Service::start`].
+/// To start a service, create a service runner and call [`ServiceRunner::start`].
 ///
 /// # Implementing Services
 ///
@@ -912,7 +914,7 @@ pub fn channel<I: Interface>(name: &'static str) -> (Addr<I>, Receiver<I>) {
 /// synchronous, so that this needs to spawn at least one task internally:
 ///
 /// ```no_run
-/// use relay_system::{FromMessage, Interface, NoResponse, Receiver, Service};
+/// use relay_system::{FromMessage, Interface, NoResponse, Receiver, Service, ServiceRunner};
 ///
 /// struct MyMessage;
 ///
@@ -931,16 +933,14 @@ pub fn channel<I: Interface>(name: &'static str) -> (Addr<I>, Receiver<I>) {
 /// impl Service for MyService {
 ///     type Interface = MyMessage;
 ///
-///     fn spawn_handler(self, mut rx: Receiver<Self::Interface>) {
-///         relay_system::spawn!(async move {
-///             while let Some(message) = rx.recv().await {
-///                 // handle the message
-///             }
-///         });
+///     async fn run(self, mut rx: Receiver<Self::Interface>) {
+///         while let Some(message) = rx.recv().await {
+///             // handle the message
+///         }
 ///     }
 /// }
 ///
-/// let addr = MyService.start();
+/// let addr = ServiceRunner::new().start(MyService);
 /// ```
 ///
 /// ## Debounce and Caching
@@ -997,25 +997,22 @@ pub trait Service: Sized {
     /// can be handled by this service.
     type Interface: Interface;
 
-    /// Spawns a task to handle service messages.
+    /// Defines the main task of this service.
     ///
-    /// Receives an inbound channel for all messages sent through the service's [`Addr`]. Note
-    /// that this function is synchronous, so that this needs to spawn a task internally.
-    fn spawn_handler(self, rx: Receiver<Self::Interface>);
+    /// `run` typically contains a loop that reads from `rx`, or a `select!` that reads
+    /// from multiple sources at once.
+    fn run(self, rx: Receiver<Self::Interface>) -> impl Future<Output = ()> + Send + 'static;
 
     /// Starts the service in the current runtime and returns an address for it.
-    fn start(self) -> Addr<Self::Interface> {
+    ///
+    /// The service runs in a detached tokio task that cannot be joined on. This is mainly useful
+    /// for tests.
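+    ///
+    /// A minimal sketch of detached startup in a test, assuming a `MyService` type that
+    /// implements [`Service`] (hypothetical, for illustration only):
+    ///
+    /// ```ignore
+    /// // No `ServiceRunner` is involved; the task is spawned and immediately detached.
+    /// let addr = MyService.start_detached();
+    /// ```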
+    fn start_detached(self) -> Addr<Self::Interface> {
         let (addr, rx) = channel(Self::name());
-        self.spawn_handler(rx);
+        spawn!(self.run(rx));
         addr
     }
 
-    /// Starts the service in the given runtime and returns an address for it.
-    fn start_in(self, runtime: &Runtime) -> Addr<Self::Interface> {
-        let _guard = runtime.enter();
-        self.start()
-    }
-
     /// Returns a unique name for this service implementation.
     ///
     /// This is used for internal diagnostics and uses the fully qualified type name of the service
@@ -1025,6 +1022,45 @@ pub trait Service: Sized {
     }
 }
 
+/// Keeps track of running services.
+///
+/// Joining the runner via [`ServiceRunner::join`] awaits all running services and
+/// propagates a panic if any of them panicked.
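+///
+/// # Example
+///
+/// A minimal usage sketch, assuming a `MyService` type that implements [`Service`]
+/// (`MyService` is hypothetical and only serves as an illustration):
+///
+/// ```ignore
+/// let mut runner = ServiceRunner::new();
+/// let addr = runner.start(MyService);
+/// // ... interact with the service through `addr` ...
+/// // Joining waits for all started services and re-raises a panic if any of them panicked.
+/// runner.join().await;
+/// ```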
+#[derive(Debug, Default)]
+pub struct ServiceRunner(FuturesUnordered<JoinHandle<()>>);
+
+impl ServiceRunner {
+    /// Creates a new service runner.
+    pub fn new() -> Self {
+        Self(FuturesUnordered::new())
+    }
+
+    /// Starts a service, tracks its join handle, and returns an [`Addr`] for message passing.
+    pub fn start<S: Service>(&mut self, service: S) -> Addr<S::Interface> {
+        let (addr, rx) = channel(S::name());
+        self.start_with(service, rx);
+        addr
+    }
+
+    /// Starts a service with a predefined receiver and tracks its join handle.
+    pub fn start_with<S: Service>(&mut self, service: S, rx: Receiver<S::Interface>) {
+        self.0.push(spawn!(service.run(rx)));
+    }
+
+    /// Awaits until all services have finished.
+    ///
+    /// Panics if one of the spawned services has panicked.
+    pub async fn join(&mut self) {
+        while let Some(res) = self.0.next().await {
+            if let Err(e) = res {
+                if e.is_panic() {
+                    // Re-trigger panic to terminate the process:
+                    std::panic::resume_unwind(e.into_panic());
+                }
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -1046,12 +1082,10 @@ mod tests {
     impl Service for MockService {
         type Interface = MockMessage;
 
-        fn spawn_handler(self, mut rx: Receiver<Self::Interface>) {
-            crate::spawn!(async move {
-                while rx.recv().await.is_some() {
-                    tokio::time::sleep(BACKLOG_INTERVAL * 2).await;
-                }
-            });
+        async fn run(self, mut rx: Receiver<Self::Interface>) {
+            while rx.recv().await.is_some() {
+                tokio::time::sleep(BACKLOG_INTERVAL * 2).await;
+            }
         }
 
         fn name() -> &'static str {
@@ -1070,7 +1104,7 @@ mod tests {
         tokio::time::pause();
 
         // Mock service takes 2 * BACKLOG_INTERVAL for every message
-        let addr = MockService.start();
+        let addr = MockService.start_detached();
 
         // Advance the timer by a tiny offset to trigger the first metric emission.
         let captures = relay_statsd::with_capturing_test_client(|| {