diff --git a/CHANGELOG.md b/CHANGELOG.md index d2a36ffe80..63d576b58e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ **Bug Fixes**: - Allow profile chunks without release. ([#4155](https://github.com/getsentry/relay/pull/4155)) +- Terminate the process when one of the services crashes. ([#4249](https://github.com/getsentry/relay/pull/4249)) **Features**: diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs index 986db2e5d4..d675c220bf 100644 --- a/relay-server/src/lib.rs +++ b/relay-server/src/lib.rs @@ -279,7 +279,7 @@ mod testutils; use std::sync::Arc; use relay_config::Config; -use relay_system::{Controller, Service}; +use relay_system::Controller; use crate::service::ServiceState; use crate::services::server::HttpServer; @@ -301,9 +301,18 @@ pub fn run(config: Config) -> anyhow::Result<()> { // information on all services. main_runtime.block_on(async { Controller::start(config.shutdown_timeout()); - let service = ServiceState::start(config.clone())?; - HttpServer::new(config, service.clone())?.start(); - Controller::shutdown_handle().finished().await; + let (state, mut runner) = ServiceState::start(config.clone())?; + runner.start(HttpServer::new(config, state.clone())?); + + tokio::select! { + _ = runner.join() => {}, + // NOTE: when every service implements a shutdown listener, + // awaiting on `finished` becomes unnecessary: We can simply join() and guarantee + // that every service finished its main task. + // See also https://github.com/getsentry/relay/issues/4050. + _ = Controller::shutdown_handle().finished() => {} + } + anyhow::Ok(()) })?; diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index a736fd17f3..4376ec7394 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -31,7 +31,7 @@ use relay_redis::redis::Script; #[cfg(feature = "processing")] use relay_redis::{PooledClient, RedisScripts}; use relay_redis::{RedisError, RedisPool, RedisPools}; -use relay_system::{channel, Addr, Service}; +use relay_system::{channel, Addr, Service, ServiceRunner}; use tokio::runtime::Runtime; use tokio::sync::mpsc; @@ -134,7 +134,6 @@ fn create_store_pool(config: &Config) -> Result { struct StateInner { config: Arc, memory_checker: MemoryChecker, - registry: Registry, } @@ -146,9 +145,10 @@ pub struct ServiceState { impl ServiceState { /// Starts all services and returns addresses to all of them. - pub fn start(config: Arc) -> Result { - let upstream_relay = UpstreamRelayService::new(config.clone()).start(); - let test_store = TestStoreService::new(config.clone()).start(); + pub fn start(config: Arc) -> Result<(Self, ServiceRunner)> { + let mut runner = ServiceRunner::new(); + let upstream_relay = runner.start(UpstreamRelayService::new(config.clone())); + let test_store = runner.start(TestStoreService::new(config.clone())); let redis_pools = config .redis() @@ -173,13 +173,13 @@ impl ServiceState { // Create an address for the `EnvelopeProcessor`, which can be injected into the // other services. let (processor, processor_rx) = channel(EnvelopeProcessorService::name()); - let outcome_producer = OutcomeProducerService::create( + let outcome_producer = runner.start(OutcomeProducerService::create( config.clone(), upstream_relay.clone(), processor.clone(), - )? 
- .start(); - let outcome_aggregator = OutcomeAggregator::new(&config, outcome_producer.clone()).start(); + )?); + let outcome_aggregator = + runner.start(OutcomeAggregator::new(&config, outcome_producer.clone())); let (global_config, global_config_rx) = GlobalConfigService::new(config.clone(), upstream_relay.clone()); @@ -187,12 +187,13 @@ impl ServiceState { // The global config service must start before dependant services are // started. Messages like subscription requests to the global config // service fail if the service is not running. - let global_config = global_config.start(); + let global_config = runner.start(global_config); let (legacy_project_cache, legacy_project_cache_rx) = channel(legacy::ProjectCacheService::name()); - let project_source = ProjectSource::start( + let project_source = ProjectSource::start_in( + &mut runner, Arc::clone(&config), upstream_relay.clone(), redis_pools @@ -200,7 +201,7 @@ impl ServiceState { .map(|pools| pools.project_configs.clone()), ); let project_cache_handle = - ProjectCacheService::new(Arc::clone(&config), project_source).start(); + ProjectCacheService::new(Arc::clone(&config), project_source).start_in(&mut runner); let aggregator = RouterService::new( config.default_aggregator_config().clone(), @@ -208,7 +209,7 @@ impl ServiceState { Some(legacy_project_cache.clone().recipient()), ); let aggregator_handle = aggregator.handle(); - let aggregator = aggregator.start(); + let aggregator = runner.start(aggregator); let metric_stats = MetricStats::new( config.clone(), @@ -229,32 +230,34 @@ impl ServiceState { outcome_aggregator.clone(), metric_outcomes.clone(), ) - .map(|s| s.start()) + .map(|s| runner.start(s)) }) .transpose()?; let cogs = CogsService::new(&config); - let cogs = Cogs::new(CogsServiceRecorder::new(&config, cogs.start())); - - EnvelopeProcessorService::new( - create_processor_pool(&config)?, - config.clone(), - global_config_handle, - project_cache_handle.clone(), - cogs, - #[cfg(feature = "processing")] - redis_pools.clone(), - processor::Addrs { - outcome_aggregator: outcome_aggregator.clone(), - upstream_relay: upstream_relay.clone(), - test_store: test_store.clone(), + let cogs = Cogs::new(CogsServiceRecorder::new(&config, runner.start(cogs))); + + runner.start_with( + EnvelopeProcessorService::new( + create_processor_pool(&config)?, + config.clone(), + global_config_handle, + project_cache_handle.clone(), + cogs, #[cfg(feature = "processing")] - store_forwarder: store.clone(), - aggregator: aggregator.clone(), - }, - metric_outcomes.clone(), - ) - .spawn_handler(processor_rx); + redis_pools.clone(), + processor::Addrs { + outcome_aggregator: outcome_aggregator.clone(), + upstream_relay: upstream_relay.clone(), + test_store: test_store.clone(), + #[cfg(feature = "processing")] + store_forwarder: store.clone(), + aggregator: aggregator.clone(), + }, + metric_outcomes.clone(), + ), + processor_rx, + ); let (envelopes_tx, envelopes_rx) = mpsc::channel(config.spool_max_backpressure_envelopes()); let envelope_buffer = EnvelopeBufferService::new( @@ -268,7 +271,7 @@ impl ServiceState { test_store: test_store.clone(), }, ) - .map(|b| b.start_observable()); + .map(|b| b.start_in(&mut runner)); // Keep all the services in one context. 
let project_cache_services = legacy::Services { @@ -280,34 +283,37 @@ impl ServiceState { test_store: test_store.clone(), }; - legacy::ProjectCacheService::new( - config.clone(), - MemoryChecker::new(memory_stat.clone(), config.clone()), - project_cache_handle.clone(), - project_cache_services, - global_config_rx, - envelopes_rx, - ) - .spawn_handler(legacy_project_cache_rx); + runner.start_with( + legacy::ProjectCacheService::new( + config.clone(), + MemoryChecker::new(memory_stat.clone(), config.clone()), + project_cache_handle.clone(), + project_cache_services, + global_config_rx, + envelopes_rx, + ), + legacy_project_cache_rx, + ); - let health_check = HealthCheckService::new( + let health_check = runner.start(HealthCheckService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), aggregator_handle, upstream_relay.clone(), envelope_buffer.clone(), - ) - .start(); + )); - RelayStats::new( + runner.start(RelayStats::new( config.clone(), upstream_relay.clone(), #[cfg(feature = "processing")] redis_pools.clone(), - ) - .start(); + )); - let relay_cache = RelayCacheService::new(config.clone(), upstream_relay.clone()).start(); + let relay_cache = runner.start(RelayCacheService::new( + config.clone(), + upstream_relay.clone(), + )); let registry = Registry { processor, @@ -329,9 +335,12 @@ impl ServiceState { registry, }; - Ok(ServiceState { - inner: Arc::new(state), - }) + Ok(( + ServiceState { + inner: Arc::new(state), + }, + runner, + )) } /// Returns a reference to the Relay configuration. diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 519f5bcf90..968bc06d39 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -10,6 +10,7 @@ use chrono::DateTime; use chrono::Utc; use relay_base_schema::project::ProjectKey; use relay_config::Config; +use relay_system::ServiceRunner; use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service}; use relay_system::{Controller, Shutdown}; use tokio::sync::mpsc::Permit; @@ -144,12 +145,11 @@ impl EnvelopeBufferService { } /// Returns both the [`Addr`] to this service, and a reference to the capacity flag. - pub fn start_observable(self) -> ObservableEnvelopeBuffer { + pub fn start_in(self, runner: &mut ServiceRunner) -> ObservableEnvelopeBuffer { let has_capacity = self.has_capacity.clone(); - ObservableEnvelopeBuffer { - addr: self.start(), - has_capacity, - } + + let addr = runner.start(self); + ObservableEnvelopeBuffer { addr, has_capacity } } /// Wait for the configured amount of time and make sure the project cache is ready to receive. 
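Note on the startup flow: the `ServiceState::start` signature change above means the caller now receives the `ServiceRunner` that owns the join handles of every spawned service. The sketch below is condensed from the `relay-server/src/lib.rs` hunk at the top of this diff (error handling and comments trimmed); all names come from the hunks themselves.

```rust
// Caller-side view of the new startup sequence in `run()`.
main_runtime.block_on(async {
    Controller::start(config.shutdown_timeout());

    // `start` now returns the state *and* the runner tracking all services.
    let (state, mut runner) = ServiceState::start(config.clone())?;
    runner.start(HttpServer::new(config, state.clone())?);

    tokio::select! {
        // Resolves as soon as any tracked service stops or panics.
        _ = runner.join() => {}
        // Resolves once a graceful shutdown has finished.
        _ = Controller::shutdown_handle().finished() => {}
    }

    anyhow::Ok(())
})?;
```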
@@ -378,7 +378,7 @@ fn is_expired(last_received_at: DateTime, config: &Config) -> bool { impl Service for EnvelopeBufferService { type Interface = EnvelopeBuffer; - fn spawn_handler(mut self, mut rx: Receiver) { + async fn run(mut self, mut rx: Receiver) { let config = self.config.clone(); let memory_checker = MemoryChecker::new(self.memory_stat.clone(), config.clone()); let mut global_config_rx = self.global_config_rx.clone(); @@ -387,81 +387,14 @@ impl Service for EnvelopeBufferService { let dequeue = Arc::::new(true.into()); let dequeue1 = dequeue.clone(); - relay_system::spawn!(async move { - let buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker).await; - - let mut buffer = match buffer { - Ok(buffer) => buffer, - Err(error) => { - relay_log::error!( - error = &error as &dyn std::error::Error, - "failed to start the envelope buffer service", - ); - std::process::exit(1); - } - }; - buffer.initialize().await; - - let mut shutdown = Controller::shutdown_handle(); - let mut project_changes = self.services.project_cache_handle.changes(); - - relay_log::info!("EnvelopeBufferService: starting"); - loop { - let used_capacity = self.services.envelopes_tx.max_capacity() - - self.services.envelopes_tx.capacity(); - relay_statsd::metric!( - histogram(RelayHistograms::BufferBackpressureEnvelopesCount) = - used_capacity as u64 - ); - - let mut sleep = Duration::MAX; - tokio::select! { - // NOTE: we do not select a bias here. - // On the one hand, we might want to prioritize dequeuing over enqueuing - // so we do not exceed the buffer capacity by starving the dequeue. - // on the other hand, prioritizing old messages violates the LIFO design. - Some(permit) = self.ready_to_pop(&buffer, dequeue.load(Ordering::Relaxed)) => { - match Self::try_pop(&config, &mut buffer, &services, permit).await { - Ok(new_sleep) => { - sleep = new_sleep; - } - Err(error) => { - relay_log::error!( - error = &error as &dyn std::error::Error, - "failed to pop envelope" - ); - } - } - } - change = project_changes.recv() => { - if let Ok(ProjectChange::Ready(project_key)) = change { - buffer.mark_ready(&project_key, true); - } - sleep = Duration::ZERO; - } - Some(message) = rx.recv() => { - Self::handle_message(&mut buffer, &services, message).await; - sleep = Duration::ZERO; - } - shutdown = shutdown.notified() => { - // In case the shutdown was handled, we break out of the loop signaling that - // there is no need to process anymore envelopes. 
- if Self::handle_shutdown(&mut buffer, shutdown).await { - break; - } - } - Ok(()) = global_config_rx.changed() => { - sleep = Duration::ZERO; - } - else => break, - } + let mut buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker) + .await + .expect("failed to start the envelope buffer service"); - self.sleep = sleep; - self.update_observable_state(&mut buffer); - } + buffer.initialize().await; - relay_log::info!("EnvelopeBufferService: stopping"); - }); + let mut shutdown = Controller::shutdown_handle(); + let mut project_changes = self.services.project_cache_handle.changes(); #[cfg(unix)] relay_system::spawn!(async move { @@ -475,6 +408,62 @@ impl Service for EnvelopeBufferService { relay_log::info!("SIGUSR1 receive, dequeue={}", deq); } }); + + relay_log::info!("EnvelopeBufferService: starting"); + loop { + let used_capacity = + self.services.envelopes_tx.max_capacity() - self.services.envelopes_tx.capacity(); + relay_statsd::metric!( + histogram(RelayHistograms::BufferBackpressureEnvelopesCount) = used_capacity as u64 + ); + + let mut sleep = Duration::MAX; + tokio::select! { + // NOTE: we do not select a bias here. + // On the one hand, we might want to prioritize dequeuing over enqueuing + // so we do not exceed the buffer capacity by starving the dequeue. + // on the other hand, prioritizing old messages violates the LIFO design. + Some(permit) = self.ready_to_pop(&buffer, dequeue.load(Ordering::Relaxed)) => { + match Self::try_pop(&config, &mut buffer, &services, permit).await { + Ok(new_sleep) => { + sleep = new_sleep; + } + Err(error) => { + relay_log::error!( + error = &error as &dyn std::error::Error, + "failed to pop envelope" + ); + } + } + } + change = project_changes.recv() => { + if let Ok(ProjectChange::Ready(project_key)) = change { + buffer.mark_ready(&project_key, true); + } + sleep = Duration::ZERO; + } + Some(message) = rx.recv() => { + Self::handle_message(&mut buffer, &services, message).await; + sleep = Duration::ZERO; + } + shutdown = shutdown.notified() => { + // In case the shutdown was handled, we break out of the loop signaling that + // there is no need to process anymore envelopes. + if Self::handle_shutdown(&mut buffer, shutdown).await { + break; + } + } + Ok(()) = global_config_rx.changed() => { + sleep = Duration::ZERO; + } + else => break, + } + + self.sleep = sleep; + self.update_observable_state(&mut buffer); + } + + relay_log::info!("EnvelopeBufferService: stopping"); } } @@ -556,7 +545,8 @@ mod tests { service.has_capacity.store(false, Ordering::Relaxed); - let ObservableEnvelopeBuffer { has_capacity, .. } = service.start_observable(); + let ObservableEnvelopeBuffer { has_capacity, .. } = + service.start_in(&mut ServiceRunner::new()); assert!(!has_capacity.load(Ordering::Relaxed)); tokio::time::advance(Duration::from_millis(100)).await; @@ -580,7 +570,7 @@ mod tests { .. 
} = envelope_buffer_service(None, global_config::Status::Pending); - let addr = service.start(); + let addr = service.start_detached(); let envelope = new_envelope(false, "foo"); let project_key = envelope.meta().public_key(); @@ -624,7 +614,7 @@ mod tests { global_config::Status::Ready(Arc::new(GlobalConfig::default())), ); - let addr = service.start(); + let addr = service.start_detached(); let envelope = new_envelope(false, "foo"); let project_key = envelope.meta().public_key(); @@ -657,7 +647,7 @@ mod tests { ); let config = service.config.clone(); - let addr = service.start(); + let addr = service.start_detached(); tokio::time::sleep(Duration::from_millis(100)).await; @@ -690,7 +680,7 @@ mod tests { global_config::Status::Ready(Arc::new(GlobalConfig::default())), ); - let addr = service.start(); + let addr = service.start_detached(); let envelope = new_envelope(false, "foo"); let project_key = envelope.meta().public_key(); @@ -725,7 +715,7 @@ mod tests { global_config::Status::Ready(Arc::new(GlobalConfig::default())), ); - let addr = service.start(); + let addr = service.start_detached(); let envelope = new_envelope(false, "foo"); let project_key = envelope.meta().public_key(); diff --git a/relay-server/src/services/cogs.rs b/relay-server/src/services/cogs.rs index 93462ddd89..5dc804f927 100644 --- a/relay-server/src/services/cogs.rs +++ b/relay-server/src/services/cogs.rs @@ -54,12 +54,10 @@ impl CogsService { impl Service for CogsService { type Interface = CogsReport; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn!(async move { - while let Some(message) = rx.recv().await { - self.handle_report(message); - } - }); + async fn run(mut self, mut rx: relay_system::Receiver) { + while let Some(message) = rx.recv().await { + self.handle_report(message); + } } } diff --git a/relay-server/src/services/global_config.rs b/relay-server/src/services/global_config.rs index 06b9d72cd6..07520bb8ba 100644 --- a/relay-server/src/services/global_config.rs +++ b/relay-server/src/services/global_config.rs @@ -338,53 +338,51 @@ impl GlobalConfigService { impl Service for GlobalConfigService { type Interface = GlobalConfigManager; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn!(async move { - let mut shutdown_handle = Controller::shutdown_handle(); - - relay_log::info!("global config service starting"); - if self.config.relay_mode() == RelayMode::Managed { - relay_log::info!("requesting global config from upstream"); - self.request_global_config(); - } else { - match GlobalConfig::load(self.config.path()) { - Ok(Some(from_file)) => { - relay_log::info!("serving static global config loaded from file"); - self.global_config_watch - .send_replace(Status::Ready(Arc::new(from_file))); - } - Ok(None) => { - relay_log::info!( - "serving default global configs due to lacking static global config file" - ); - self.global_config_watch - .send_replace(Status::Ready(Arc::default())); - } - Err(e) => { - relay_log::error!("failed to load global config from file: {}", e); - relay_log::info!( + async fn run(mut self, mut rx: relay_system::Receiver) { + let mut shutdown_handle = Controller::shutdown_handle(); + + relay_log::info!("global config service starting"); + if self.config.relay_mode() == RelayMode::Managed { + relay_log::info!("requesting global config from upstream"); + self.request_global_config(); + } else { + match GlobalConfig::load(self.config.path()) { + Ok(Some(from_file)) => { + relay_log::info!("serving static global config 
loaded from file"); + self.global_config_watch + .send_replace(Status::Ready(Arc::new(from_file))); + } + Ok(None) => { + relay_log::info!( + "serving default global configs due to lacking static global config file" + ); + self.global_config_watch + .send_replace(Status::Ready(Arc::default())); + } + Err(e) => { + relay_log::error!("failed to load global config from file: {}", e); + relay_log::info!( "serving default global configs due to failure to load global config from file" ); - self.global_config_watch - .send_replace(Status::Ready(Arc::default())); - } + self.global_config_watch + .send_replace(Status::Ready(Arc::default())); } - }; + } + }; - loop { - tokio::select! { - biased; + loop { + tokio::select! { + biased; - () = &mut self.fetch_handle => self.request_global_config(), - Some(result) = self.internal_rx.recv() => self.handle_result(result), - Some(message) = rx.recv() => self.handle_message(message), - _ = shutdown_handle.notified() => self.handle_shutdown(), + () = &mut self.fetch_handle => self.request_global_config(), + Some(result) = self.internal_rx.recv() => self.handle_result(result), + Some(message) = rx.recv() => self.handle_message(message), + _ = shutdown_handle.notified() => self.handle_shutdown(), - else => break, - } + else => break, } - relay_log::info!("global config service stopped"); - }); + } + relay_log::info!("global config service stopped"); } } @@ -421,7 +419,7 @@ mod tests { let service = GlobalConfigService::new(Arc::new(config), upstream) .0 - .start(); + .start_detached(); assert!(service.send(Get).await.is_ok()); @@ -452,7 +450,7 @@ mod tests { let fetch_interval = config.global_config_fetch_interval(); let service = GlobalConfigService::new(Arc::new(config), upstream) .0 - .start(); + .start_detached(); service.send(Get).await.unwrap(); tokio::time::sleep(fetch_interval * 2).await; @@ -479,7 +477,7 @@ mod tests { let service = GlobalConfigService::new(Arc::new(config), upstream) .0 - .start(); + .start_detached(); service.send(Get).await.unwrap(); tokio::time::sleep(fetch_interval * 2).await; diff --git a/relay-server/src/services/health_check.rs b/relay-server/src/services/health_check.rs index 8f3403f76b..b586d22429 100644 --- a/relay-server/src/services/health_check.rs +++ b/relay-server/src/services/health_check.rs @@ -91,8 +91,6 @@ pub struct HealthCheckService { impl HealthCheckService { /// Creates a new instance of the HealthCheck service. - /// - /// The service does not run. To run the service, use [`start`](Self::start). pub fn new( config: Arc, memory_checker: MemoryChecker, @@ -193,7 +191,7 @@ impl HealthCheckService { impl Service for HealthCheckService { type Interface = HealthCheck; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + async fn run(mut self, mut rx: relay_system::Receiver) { let (update_tx, update_rx) = watch::channel(StatusUpdate::new(Status::Unhealthy)); let check_interval = self.config.health_refresh_interval(); // Add 10% buffer to the internal timeouts to avoid race conditions. 
@@ -216,19 +214,17 @@ impl Service for HealthCheckService { update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok(); }); - relay_system::spawn!(async move { - while let Some(HealthCheck(message, sender)) = rx.recv().await { - let update = update_rx.borrow(); - - sender.send(if matches!(message, IsHealthy::Liveness) { - Status::Healthy - } else if update.instant.elapsed() >= status_timeout { - Status::Unhealthy - } else { - update.status - }); - } - }); + while let Some(HealthCheck(message, sender)) = rx.recv().await { + let update = update_rx.borrow(); + + sender.send(if matches!(message, IsHealthy::Liveness) { + Status::Healthy + } else if update.instant.elapsed() >= status_timeout { + Status::Unhealthy + } else { + update.status + }); + } } } diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs index 2c170bd20b..9c433c9ab3 100644 --- a/relay-server/src/services/metrics/aggregator.rs +++ b/relay-server/src/services/metrics/aggregator.rs @@ -246,25 +246,23 @@ impl AggregatorService { impl Service for AggregatorService { type Interface = Aggregator; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn!(async move { - let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms)); - let mut shutdown = Controller::shutdown_handle(); - - // Note that currently this loop never exits and will run till the tokio runtime shuts - // down. This is about to change with the refactoring for the shutdown process. - loop { - tokio::select! { - biased; - - _ = ticker.tick() => self.try_flush(), - Some(message) = rx.recv() => self.handle_message(message), - shutdown = shutdown.notified() => self.handle_shutdown(shutdown), - - else => break, - } + async fn run(mut self, mut rx: relay_system::Receiver) { + let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms)); + let mut shutdown = Controller::shutdown_handle(); + + // Note that currently this loop never exits and will run till the tokio runtime shuts + // down. This is about to change with the refactoring for the shutdown process. + loop { + tokio::select! 
{ + biased; + + _ = ticker.tick() => self.try_flush(), + Some(message) = rx.recv() => self.handle_message(message), + shutdown = shutdown.notified() => self.handle_shutdown(shutdown), + + else => break, } - }); + } } } @@ -361,16 +359,14 @@ mod tests { impl Service for TestReceiver { type Interface = TestInterface; - fn spawn_handler(self, mut rx: relay_system::Receiver) { - relay_system::spawn!(async move { - while let Some(message) = rx.recv().await { - let buckets = message.0.buckets; - relay_log::debug!(?buckets, "received buckets"); - if !self.reject_all { - self.add_buckets(buckets); - } + async fn run(self, mut rx: relay_system::Receiver) { + while let Some(message) = rx.recv().await { + let buckets = message.0.buckets; + relay_log::debug!(?buckets, "received buckets"); + if !self.reject_all { + self.add_buckets(buckets); } - }); + } } } @@ -392,7 +388,7 @@ mod tests { tokio::time::pause(); let receiver = TestReceiver::default(); - let recipient = receiver.clone().start().recipient(); + let recipient = receiver.clone().start_detached().recipient(); let config = AggregatorServiceConfig { aggregator: AggregatorConfig { @@ -402,7 +398,7 @@ mod tests { }, ..Default::default() }; - let aggregator = AggregatorService::new(config, Some(recipient)).start(); + let aggregator = AggregatorService::new(config, Some(recipient)).start_detached(); let mut bucket = some_bucket(); bucket.timestamp = UnixTimestamp::now(); diff --git a/relay-server/src/services/metrics/router.rs b/relay-server/src/services/metrics/router.rs index cb9578e2e8..8820dc9a1a 100644 --- a/relay-server/src/services/metrics/router.rs +++ b/relay-server/src/services/metrics/router.rs @@ -4,7 +4,7 @@ use relay_config::aggregator::Condition; use relay_config::{AggregatorServiceConfig, ScopedAggregatorConfig}; use relay_metrics::MetricNamespace; -use relay_system::{Addr, NoResponse, Recipient, Service}; +use relay_system::{Addr, NoResponse, Recipient, Service, ServiceRunner}; use crate::services::metrics::{ Aggregator, AggregatorHandle, AggregatorService, FlushBuckets, MergeBuckets, @@ -53,26 +53,24 @@ impl RouterService { impl Service for RouterService { type Interface = Aggregator; - fn spawn_handler(self, mut rx: relay_system::Receiver) { - relay_system::spawn!(async move { - let mut router = StartedRouter::start(self); - relay_log::info!("metrics router started"); + async fn run(self, mut rx: relay_system::Receiver) { + let mut router = StartedRouter::start_in(self, &mut ServiceRunner::new()); + relay_log::info!("metrics router started"); - // Note that currently this loop never exists and will run till the tokio runtime shuts - // down. This is about to change with the refactoring for the shutdown process. - loop { - tokio::select! { - biased; + // Note that currently this loop never exists and will run till the tokio runtime shuts + // down. This is about to change with the refactoring for the shutdown process. + loop { + tokio::select! 
{ + biased; - Some(message) = rx.recv() => { - router.handle_message(message) - }, + Some(message) = rx.recv() => { + router.handle_message(message) + }, - else => break, - } + else => break, } - relay_log::info!("metrics router stopped"); - }); + } + relay_log::info!("metrics router stopped"); } } @@ -83,7 +81,7 @@ struct StartedRouter { } impl StartedRouter { - fn start(router: RouterService) -> Self { + fn start_in(router: RouterService, runner: &mut ServiceRunner) -> Self { let RouterService { default, secondary } = router; let secondary = secondary @@ -94,12 +92,12 @@ impl StartedRouter { .filter(|&namespace| condition.matches(Some(namespace))) .collect(); - (aggregator.start(), namespaces) + (runner.start(aggregator), namespaces) }) .collect(); Self { - default: default.start(), + default: runner.start(default), secondary, } } diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs index 22ec72b3e7..bd7b34bf11 100644 --- a/relay-server/src/services/outcome.rs +++ b/relay-server/src/services/outcome.rs @@ -683,19 +683,17 @@ impl HttpOutcomeProducer { impl Service for HttpOutcomeProducer { type Interface = TrackRawOutcome; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn!(async move { - loop { - tokio::select! { - // Prioritize flush over receiving messages to prevent starving. - biased; - - () = &mut self.flush_handle => self.send_batch(), - Some(message) = rx.recv() => self.handle_message(message), - else => break, - } + async fn run(mut self, mut rx: relay_system::Receiver) { + loop { + tokio::select! { + // Prioritize flush over receiving messages to prevent starving. + biased; + + () = &mut self.flush_handle => self.send_batch(), + Some(message) = rx.recv() => self.handle_message(message), + else => break, } - }); + } } } @@ -776,19 +774,17 @@ impl ClientReportOutcomeProducer { impl Service for ClientReportOutcomeProducer { type Interface = TrackOutcome; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn!(async move { - loop { - tokio::select! { - // Prioritize flush over receiving messages to prevent starving. - biased; - - () = &mut self.flush_handle => self.flush(), - Some(message) = rx.recv() => self.handle_message(message), - else => break, - } + async fn run(mut self, mut rx: relay_system::Receiver) { + loop { + tokio::select! { + // Prioritize flush over receiving messages to prevent starving. 
+ biased; + + () = &mut self.flush_handle => self.flush(), + Some(message) = rx.recv() => self.handle_message(message), + else => break, } - }); + } } } @@ -980,8 +976,10 @@ impl ProducerInner { match self { #[cfg(feature = "processing")] ProducerInner::Kafka(inner) => OutcomeBroker::Kafka(inner), - ProducerInner::Http(inner) => OutcomeBroker::Http(inner.start()), - ProducerInner::ClientReport(inner) => OutcomeBroker::ClientReport(inner.start()), + ProducerInner::Http(inner) => OutcomeBroker::Http(inner.start_detached()), + ProducerInner::ClientReport(inner) => { + OutcomeBroker::ClientReport(inner.start_detached()) + } ProducerInner::Disabled => OutcomeBroker::Disabled, } } @@ -1035,18 +1033,16 @@ impl OutcomeProducerService { impl Service for OutcomeProducerService { type Interface = OutcomeProducer; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + async fn run(self, mut rx: relay_system::Receiver) { let Self { config, inner } = self; - relay_system::spawn!(async move { - let broker = inner.start(); + let broker = inner.start(); - relay_log::info!("OutcomeProducer started."); - while let Some(message) = rx.recv().await { - broker.handle_message(message, &config); - } - relay_log::info!("OutcomeProducer stopped."); - }); + relay_log::info!("OutcomeProducer started."); + while let Some(message) = rx.recv().await { + broker.handle_message(message, &config); + } + relay_log::info!("OutcomeProducer stopped."); } } diff --git a/relay-server/src/services/outcome_aggregator.rs b/relay-server/src/services/outcome_aggregator.rs index 91b7561e0c..788facb967 100644 --- a/relay-server/src/services/outcome_aggregator.rs +++ b/relay-server/src/services/outcome_aggregator.rs @@ -138,25 +138,23 @@ impl OutcomeAggregator { impl Service for OutcomeAggregator { type Interface = TrackOutcome; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn!(async move { - let mut shutdown = Controller::shutdown_handle(); - relay_log::info!("outcome aggregator started"); - - loop { - tokio::select! { - // Prioritize flush over receiving messages to prevent starving. Shutdown can be - // last since it is not vital if there are still messages in the channel. - biased; - - () = &mut self.flush_handle => self.flush(), - Some(message) = rx.recv() => self.handle_track_outcome(message), - shutdown = shutdown.notified() => self.handle_shutdown(shutdown), - else => break, - } + async fn run(mut self, mut rx: relay_system::Receiver) { + let mut shutdown = Controller::shutdown_handle(); + relay_log::info!("outcome aggregator started"); + + loop { + tokio::select! { + // Prioritize flush over receiving messages to prevent starving. Shutdown can be + // last since it is not vital if there are still messages in the channel. 
+ biased; + + () = &mut self.flush_handle => self.flush(), + Some(message) = rx.recv() => self.handle_track_outcome(message), + shutdown = shutdown.notified() => self.handle_shutdown(shutdown), + else => break, } + } - relay_log::info!("outcome aggregator stopped"); - }); + relay_log::info!("outcome aggregator stopped"); } } diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index bbe574bfdd..171b957439 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -2820,16 +2820,14 @@ impl EnvelopeProcessorService { impl Service for EnvelopeProcessorService { type Interface = EnvelopeProcessor; - fn spawn_handler(self, mut rx: relay_system::Receiver) { - relay_system::spawn!(async move { - while let Some(message) = rx.recv().await { - let service = self.clone(); - self.inner - .workers - .spawn(move || service.handle_message(message)) - .await; - } - }); + async fn run(self, mut rx: relay_system::Receiver) { + while let Some(message) = rx.recv().await { + let service = self.clone(); + self.inner + .workers + .spawn(move || service.handle_message(message)) + .await; + } } } diff --git a/relay-server/src/services/projects/cache/legacy.rs b/relay-server/src/services/projects/cache/legacy.rs index faf59d45de..33e9aeb4b0 100644 --- a/relay-server/src/services/projects/cache/legacy.rs +++ b/relay-server/src/services/projects/cache/legacy.rs @@ -680,7 +680,7 @@ impl ProjectCacheService { impl Service for ProjectCacheService { type Interface = ProjectCache; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + async fn run(self, mut rx: relay_system::Receiver) { let Self { config, memory_checker, @@ -694,120 +694,121 @@ impl Service for ProjectCacheService { let outcome_aggregator = services.outcome_aggregator.clone(); let test_store = services.test_store.clone(); - relay_system::spawn!(async move { - relay_log::info!("project cache started"); + relay_log::info!("project cache started"); - let global_config = match global_config_rx.borrow().clone() { - global_config::Status::Ready(_) => { - relay_log::info!("global config received"); - GlobalConfigStatus::Ready - } - global_config::Status::Pending => { - relay_log::info!("waiting for global config"); - GlobalConfigStatus::Pending - } - }; + let global_config = match global_config_rx.borrow().clone() { + global_config::Status::Ready(_) => { + relay_log::info!("global config received"); + GlobalConfigStatus::Ready + } + global_config::Status::Pending => { + relay_log::info!("waiting for global config"); + GlobalConfigStatus::Pending + } + }; - let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel(); - let spool_v1 = match config.spool_v2() { - true => None, - false => Some({ - // Channel for envelope buffering. - let buffer_services = spooler::Services { - outcome_aggregator, - project_cache, - test_store, - }; - let buffer = match BufferService::create( - memory_checker.clone(), - buffer_services, - config.clone(), - ) - .await - { - Ok(buffer) => buffer.start(), - Err(err) => { - relay_log::error!( - error = &err as &dyn Error, - "failed to start buffer service", - ); - // NOTE: The process will exit with error if the buffer file could not be - // opened or the migrations could not be run. - std::process::exit(1); - } - }; + let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel(); + let spool_v1 = match config.spool_v2() { + true => None, + false => Some({ + // Channel for envelope buffering. 
+ let buffer_services = spooler::Services { + outcome_aggregator, + project_cache, + test_store, + }; + let buffer = match BufferService::create( + memory_checker.clone(), + buffer_services, + config.clone(), + ) + .await + { + Ok(buffer) => { + // NOTE: This service is not monitored by the service runner. + buffer.start_detached() + } + Err(err) => { + relay_log::error!( + error = &err as &dyn Error, + "failed to start buffer service", + ); + // NOTE: The process will exit with error if the buffer file could not be + // opened or the migrations could not be run. + std::process::exit(1); + } + }; - // Request the existing index from the spooler. - buffer.send(RestoreIndex); + // Request the existing index from the spooler. + buffer.send(RestoreIndex); - SpoolV1 { - buffer_tx, - index: HashSet::new(), - buffer_unspool_backoff: RetryBackoff::new(config.http_max_retry_interval()), - buffer, - } - }), - }; + SpoolV1 { + buffer_tx, + index: HashSet::new(), + buffer_unspool_backoff: RetryBackoff::new(config.http_max_retry_interval()), + buffer, + } + }), + }; - let mut broker = ProjectCacheBroker { - config: config.clone(), - memory_checker, - projects: project_cache_handle, - services, - spool_v1_unspool_handle: SleepHandle::idle(), - spool_v1, - global_config, - }; + let mut broker = ProjectCacheBroker { + config: config.clone(), + memory_checker, + projects: project_cache_handle, + services, + spool_v1_unspool_handle: SleepHandle::idle(), + spool_v1, + global_config, + }; - loop { - tokio::select! { - biased; - - Ok(()) = global_config_rx.changed() => { - metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "update_global_config", { - match global_config_rx.borrow().clone() { - global_config::Status::Ready(_) => broker.set_global_config_ready(), - // The watch should only be updated if it gets a new value. - // This would imply a logical bug. - global_config::Status::Pending => relay_log::error!("still waiting for the global config"), - } - }) - }, - project_change = project_changes.recv() => { - metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_project_change", { - if let Ok(project_change) = project_change { - broker.handle_project_change(project_change); - } - }) - } - // Buffer will not dequeue the envelopes from the spool if there is not enough - // permits in `BufferGuard` available. Currently this is 50%. - Some(UnspooledEnvelope { managed_envelope, .. }) = buffer_rx.recv() => { - metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_processing", { - broker.handle_processing(managed_envelope) - }) - }, - () = &mut broker.spool_v1_unspool_handle => { - metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "periodic_unspool", { - broker.handle_periodic_unspool() - }) - } - Some(message) = rx.recv() => { - metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_message", { - broker.handle_message(message) - }) - } - Some(message) = envelopes_rx.recv() => { - metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_envelope", { - broker.handle_envelope(message) - }) - } - else => break, + loop { + tokio::select! { + biased; + + Ok(()) = global_config_rx.changed() => { + metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "update_global_config", { + match global_config_rx.borrow().clone() { + global_config::Status::Ready(_) => broker.set_global_config_ready(), + // The watch should only be updated if it gets a new value. + // This would imply a logical bug. 
+ global_config::Status::Pending => relay_log::error!("still waiting for the global config"), + } + }) + }, + project_change = project_changes.recv() => { + metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_project_change", { + if let Ok(project_change) = project_change { + broker.handle_project_change(project_change); + } + }) + } + // Buffer will not dequeue the envelopes from the spool if there is not enough + // permits in `BufferGuard` available. Currently this is 50%. + Some(UnspooledEnvelope { managed_envelope, .. }) = buffer_rx.recv() => { + metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_processing", { + broker.handle_processing(managed_envelope) + }) + }, + () = &mut broker.spool_v1_unspool_handle => { + metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "periodic_unspool", { + broker.handle_periodic_unspool() + }) + } + Some(message) = rx.recv() => { + metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_message", { + broker.handle_message(message) + }) + } + Some(message) = envelopes_rx.recv() => { + metric!(timer(RelayTimers::LegacyProjectCacheTaskDuration), task = "handle_envelope", { + broker.handle_envelope(message) + }) } + else => break, } + } - relay_log::info!("project cache stopped"); - }); + relay_log::info!("project cache stopped"); } } @@ -867,7 +868,7 @@ mod tests { match BufferService::create(memory_checker.clone(), buffer_services, config.clone()) .await { - Ok(buffer) => buffer.start(), + Ok(buffer) => buffer.start_detached(), Err(err) => { relay_log::error!(error = &err as &dyn Error, "failed to start buffer service"); // NOTE: The process will exit with error if the buffer file could not be diff --git a/relay-server/src/services/projects/cache/service.rs b/relay-server/src/services/projects/cache/service.rs index 3d0c0744c5..435355c106 100644 --- a/relay-server/src/services/projects/cache/service.rs +++ b/relay-server/src/services/projects/cache/service.rs @@ -5,7 +5,7 @@ use futures::StreamExt as _; use relay_base_schema::project::ProjectKey; use relay_config::Config; use relay_statsd::metric; -use relay_system::Service; +use relay_system::{Service, ServiceRunner}; use tokio::sync::broadcast; use crate::services::projects::cache::handle::ProjectCacheHandle; @@ -91,7 +91,7 @@ impl ProjectCacheService { /// Consumes and starts a [`ProjectCacheService`]. /// /// Returns a [`ProjectCacheHandle`] to access the cache concurrently. - pub fn start(self) -> ProjectCacheHandle { + pub fn start_in(self, runner: &mut ServiceRunner) -> ProjectCacheHandle { let (addr, addr_rx) = relay_system::channel(Self::name()); let handle = ProjectCacheHandle { @@ -101,7 +101,7 @@ impl ProjectCacheService { project_changes: self.project_events_tx.clone(), }; - self.spawn_handler(addr_rx); + runner.start_with(self, addr_rx); handle } @@ -194,7 +194,7 @@ impl ProjectCacheService { impl relay_system::Service for ProjectCacheService { type Interface = ProjectCache; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + async fn run(mut self, mut rx: relay_system::Receiver) { macro_rules! timed { ($task:expr, $body:expr) => {{ let task_name = $task; @@ -206,25 +206,23 @@ impl relay_system::Service for ProjectCacheService { }}; } - relay_system::spawn!(async move { - loop { - tokio::select! 
{ - biased; - - Some(fetch) = self.scheduled_fetches.next() => timed!( - "completed_fetch", - self.handle_completed_fetch(fetch) - ), - Some(message) = rx.recv() => timed!( - message.variant(), - self.handle_message(message) - ), - Some(eviction) = self.store.next_eviction() => timed!( - "eviction", - self.handle_eviction(eviction) - ), - } + loop { + tokio::select! { + biased; + + Some(fetch) = self.scheduled_fetches.next() => timed!( + "completed_fetch", + self.handle_completed_fetch(fetch) + ), + Some(message) = rx.recv() => timed!( + message.variant(), + self.handle_message(message) + ), + Some(eviction) = self.store.next_eviction() => timed!( + "eviction", + self.handle_eviction(eviction) + ), } - }); + } } } diff --git a/relay-server/src/services/projects/source/local.rs b/relay-server/src/services/projects/source/local.rs index c4c89db31c..a288d57d05 100644 --- a/relay-server/src/services/projects/source/local.rs +++ b/relay-server/src/services/projects/source/local.rs @@ -171,28 +171,26 @@ async fn spawn_poll_local_states( impl Service for LocalProjectSourceService { type Interface = LocalProjectSource; - fn spawn_handler(mut self, mut rx: Receiver) { + async fn run(mut self, mut rx: Receiver) { // Use a channel with size 1. If the channel is full because the consumer does not // collect the result, the producer will block, which is acceptable. let (state_tx, mut state_rx) = mpsc::channel(1); - relay_system::spawn!(async move { - relay_log::info!("project local cache started"); + relay_log::info!("project local cache started"); - // Start the background task that periodically reloads projects from disk: - spawn_poll_local_states(&self.config, state_tx).await; + // Start the background task that periodically reloads projects from disk: + spawn_poll_local_states(&self.config, state_tx).await; - loop { - tokio::select! { - biased; - Some(message) = rx.recv() => self.handle_message(message), - Some(states) = state_rx.recv() => self.local_states = states, + loop { + tokio::select! { + biased; + Some(message) = rx.recv() => self.handle_message(message), + Some(states) = state_rx.recv() => self.local_states = states, - else => break, - } + else => break, } - relay_log::info!("project local cache stopped"); - }); + } + relay_log::info!("project local cache stopped"); } } diff --git a/relay-server/src/services/projects/source/mod.rs b/relay-server/src/services/projects/source/mod.rs index 1311acfc40..2720e39c58 100644 --- a/relay-server/src/services/projects/source/mod.rs +++ b/relay-server/src/services/projects/source/mod.rs @@ -6,7 +6,7 @@ use relay_base_schema::project::ProjectKey; use relay_config::RedisConfigRef; use relay_config::{Config, RelayMode}; use relay_redis::RedisPool; -use relay_system::{Addr, Service as _}; +use relay_system::{Addr, ServiceRunner}; #[cfg(feature = "processing")] use tokio::sync::Semaphore; @@ -40,15 +40,18 @@ pub struct ProjectSource { } impl ProjectSource { - /// Starts all project source services in the current runtime. - pub fn start( + /// Starts all project source services in the given `ServiceRunner`. 
+ pub fn start_in( + runner: &mut ServiceRunner, config: Arc, upstream_relay: Addr, _redis: Option, ) -> Self { - let local_source = LocalProjectSourceService::new(config.clone()).start(); - let upstream_source = - UpstreamProjectSourceService::new(config.clone(), upstream_relay).start(); + let local_source = runner.start(LocalProjectSourceService::new(config.clone())); + let upstream_source = runner.start(UpstreamProjectSourceService::new( + config.clone(), + upstream_relay, + )); #[cfg(feature = "processing")] let redis_max_connections = config diff --git a/relay-server/src/services/projects/source/upstream.rs b/relay-server/src/services/projects/source/upstream.rs index 5b1e6d76ce..ac760ab649 100644 --- a/relay-server/src/services/projects/source/upstream.rs +++ b/relay-server/src/services/projects/source/upstream.rs @@ -635,22 +635,20 @@ impl UpstreamProjectSourceService { impl Service for UpstreamProjectSourceService { type Interface = UpstreamProjectSource; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn!(async move { - relay_log::info!("project upstream cache started"); - loop { - tokio::select! { - biased; + async fn run(mut self, mut rx: relay_system::Receiver) { + relay_log::info!("project upstream cache started"); + loop { + tokio::select! { + biased; - () = &mut self.fetch_handle => self.do_fetch(), - Some(responses) = self.inner_rx.recv() => self.handle_responses(responses), - Some(message) = rx.recv() => self.handle_message(message), + () = &mut self.fetch_handle => self.do_fetch(), + Some(responses) = self.inner_rx.recv() => self.handle_responses(responses), + Some(message) = rx.recv() => self.handle_message(message), - else => break, - } + else => break, } - relay_log::info!("project upstream cache stopped"); - }); + } + relay_log::info!("project upstream cache stopped"); } } @@ -688,7 +686,8 @@ mod tests { }}; } - let service = UpstreamProjectSourceService::new(Arc::clone(&config), upstream_addr).start(); + let service = + UpstreamProjectSourceService::new(Arc::clone(&config), upstream_addr).start_detached(); let mut response1 = service.send(FetchProjectState { project_key, diff --git a/relay-server/src/services/relays.rs b/relay-server/src/services/relays.rs index 8d10a33b33..7f160625ad 100644 --- a/relay-server/src/services/relays.rs +++ b/relay-server/src/services/relays.rs @@ -334,23 +334,21 @@ impl RelayCacheService { impl Service for RelayCacheService { type Interface = RelayCache; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn!(async move { - relay_log::info!("key cache started"); - - loop { - tokio::select! { - // Prioritize flush over receiving messages to prevent starving. - biased; - - Some(result) = self.fetch_channel.1.recv() => self.handle_fetch_result(result), - () = &mut self.delay => self.fetch_relays(), - Some(message) = rx.recv() => self.get_or_fetch(message.0, message.1), - else => break, - } + async fn run(mut self, mut rx: relay_system::Receiver) { + relay_log::info!("key cache started"); + + loop { + tokio::select! { + // Prioritize flush over receiving messages to prevent starving. 
+ biased; + + Some(result) = self.fetch_channel.1.recv() => self.handle_fetch_result(result), + () = &mut self.delay => self.fetch_relays(), + Some(message) = rx.recv() => self.get_or_fetch(message.0, message.1), + else => break, } + } - relay_log::info!("key cache stopped"); - }); + relay_log::info!("key cache stopped"); } } diff --git a/relay-server/src/services/server/mod.rs b/relay-server/src/services/server/mod.rs index d29787c38c..02d22353bd 100644 --- a/relay-server/src/services/server/mod.rs +++ b/relay-server/src/services/server/mod.rs @@ -112,7 +112,7 @@ fn listen(config: &Config) -> Result { Ok(socket.listen(config.tcp_listen_backlog())?.into_std()?) } -fn serve(listener: TcpListener, app: App, config: Arc) { +async fn serve(listener: TcpListener, app: App, config: Arc) { let handle = Handle::new(); let acceptor = self::acceptor::RelayAcceptor::new() @@ -139,7 +139,6 @@ fn serve(listener: TcpListener, app: App, config: Arc) { .keep_alive_timeout(config.keepalive_timeout()); let service = ServiceExt::::into_make_service_with_connect_info::(app); - relay_system::spawn!(server.serve(service)); relay_system::spawn!(emit_active_connections_metric( config.metrics_periodic_interval(), @@ -155,6 +154,11 @@ fn serve(listener: TcpListener, app: App, config: Arc) { None => handle.shutdown(), } }); + + server + .serve(service) + .await + .expect("failed to start axum server"); } /// HTTP server service. @@ -182,7 +186,7 @@ impl HttpServer { impl Service for HttpServer { type Interface = (); - fn spawn_handler(self, _rx: relay_system::Receiver) { + async fn run(self, _rx: relay_system::Receiver) { let Self { config, service, @@ -194,7 +198,7 @@ impl Service for HttpServer { relay_statsd::metric!(counter(RelayCounters::ServerStarting) += 1); let app = make_app(service); - serve(listener, app, config); + serve(listener, app, config).await; } } diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs index 0bfbd6b82a..40741d2693 100644 --- a/relay-server/src/services/spooler/mod.rs +++ b/relay-server/src/services/spooler/mod.rs @@ -1264,34 +1264,32 @@ impl BufferService { impl Service for BufferService { type Interface = Buffer; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn!(async move { - let mut shutdown = Controller::shutdown_handle(); - - loop { - tokio::select! { - biased; - - Some(message) = rx.recv() => { - if let Err(err) = self.handle_message(message).await { - relay_log::error!( - error = &err as &dyn Error, - "failed to handle an incoming message", - ); - } + async fn run(mut self, mut rx: relay_system::Receiver) { + let mut shutdown = Controller::shutdown_handle(); + + loop { + tokio::select! 
{ + biased; + + Some(message) = rx.recv() => { + if let Err(err) = self.handle_message(message).await { + relay_log::error!( + error = &err as &dyn Error, + "failed to handle an incoming message", + ); } - _ = shutdown.notified() => { - if let Err(err) = self.handle_shutdown().await { - relay_log::error!( - error = &err as &dyn Error, - "failed while shutting down the service", - ); - } + } + _ = shutdown.notified() => { + if let Err(err) = self.handle_shutdown().await { + relay_log::error!( + error = &err as &dyn Error, + "failed while shutting down the service", + ); } - else => break, } + else => break, } - }); + } } } @@ -1394,7 +1392,7 @@ mod tests { let service = BufferService::create(memory_checker, services(), config) .await .unwrap(); - let addr = service.start(); + let addr = service.start_detached(); let (tx, mut rx) = mpsc::unbounded_channel(); // Test cases: @@ -1609,7 +1607,7 @@ mod tests { let buffer = BufferService::create(memory_checker, services, config) .await .unwrap(); - let addr = buffer.start(); + let addr = buffer.start_detached(); addr.send(RestoreIndex); // Give some time to process the message tokio::time::sleep(Duration::from_millis(500)).await; @@ -1654,7 +1652,7 @@ mod tests { let buffer = BufferService::create(memory_checker, services, config) .await .unwrap(); - let addr = buffer.start(); + let addr = buffer.start_detached(); let mut keys = HashSet::new(); for _ in 1..=300 { diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs index 3b05388d81..6ae5183c30 100644 --- a/relay-server/src/services/stats.rs +++ b/relay-server/src/services/stats.rs @@ -134,20 +134,18 @@ impl RelayStats { impl Service for RelayStats { type Interface = (); - fn spawn_handler(self, _rx: relay_system::Receiver) { + async fn run(self, _rx: relay_system::Receiver) { let Some(mut ticker) = self.config.metrics_periodic_interval().map(interval) else { return; }; - relay_system::spawn!(async move { - loop { - let _ = tokio::join!( - self.upstream_status(), - self.tokio_metrics(), - self.redis_pools(), - ); - ticker.tick().await; - } - }); + loop { + let _ = tokio::join!( + self.upstream_status(), + self.tokio_metrics(), + self.redis_pools(), + ); + ticker.tick().await; + } } } diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 20007b23ad..d20133535e 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -1048,21 +1048,19 @@ impl StoreService { impl Service for StoreService { type Interface = Store; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + async fn run(self, mut rx: relay_system::Receiver) { let this = Arc::new(self); - relay_system::spawn!(async move { - relay_log::info!("store forwarder started"); + relay_log::info!("store forwarder started"); - while let Some(message) = rx.recv().await { - let service = Arc::clone(&this); - this.workers - .spawn(move || service.handle_message(message)) - .await; - } + while let Some(message) = rx.recv().await { + let service = Arc::clone(&this); + this.workers + .spawn(move || service.handle_message(message)) + .await; + } - relay_log::info!("store forwarder stopped"); - }); + relay_log::info!("store forwarder stopped"); } } diff --git a/relay-server/src/services/test_store.rs b/relay-server/src/services/test_store.rs index f9a2cc9d12..4a8ec18c0e 100644 --- a/relay-server/src/services/test_store.rs +++ b/relay-server/src/services/test_store.rs @@ -134,11 +134,9 @@ impl TestStoreService { impl relay_system::Service for 
TestStoreService { type Interface = TestStore; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn!(async move { - while let Some(message) = rx.recv().await { - self.handle_message(message); - } - }); + async fn run(mut self, mut rx: relay_system::Receiver) { + while let Some(message) = rx.recv().await { + self.handle_message(message); + } } } diff --git a/relay-server/src/services/upstream.rs b/relay-server/src/services/upstream.rs index 599186660f..d33ac175b7 100644 --- a/relay-server/src/services/upstream.rs +++ b/relay-server/src/services/upstream.rs @@ -1498,7 +1498,7 @@ impl UpstreamRelayService { impl Service for UpstreamRelayService { type Interface = UpstreamRelay; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + async fn run(self, mut rx: relay_system::Receiver) { let Self { config } = self; let client = SharedClient::build(config.clone()); @@ -1528,18 +1528,16 @@ impl Service for UpstreamRelayService { action_tx, }; - relay_system::spawn!(async move { - loop { - tokio::select! { - biased; + loop { + tokio::select! { + biased; - Some(action) = action_rx.recv() => broker.handle_action(action), - Some(request) = broker.next_request() => broker.execute(request), - Some(message) = rx.recv() => broker.handle_message(message).await, + Some(action) = action_rx.recv() => broker.handle_action(action), + Some(request) = broker.next_request() => broker.execute(request), + Some(message) = rx.recv() => broker.handle_message(message).await, - else => break, - } + else => break, } - }); + } } } diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index 3997c8c286..edef9521ff 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -121,7 +121,7 @@ impl ShutdownHandle { /// ### Example /// /// ``` -/// use relay_system::{Controller, Service, Shutdown, ShutdownMode}; +/// use relay_system::{Controller, Service, ServiceRunner, Shutdown, ShutdownMode}; /// use std::time::Duration; /// /// struct MyService; @@ -129,17 +129,14 @@ impl ShutdownHandle { /// impl Service for MyService { /// type Interface = (); /// -/// fn spawn_handler(self, mut rx: relay_system::Receiver) { -/// relay_system::spawn!(async move { -/// let mut shutdown = Controller::shutdown_handle(); -/// -/// loop { -/// tokio::select! { -/// shutdown = shutdown.notified() => break, // Handle shutdown here -/// Some(message) = rx.recv() => (), // Process incoming message -/// } +/// async fn run(self, mut rx: relay_system::Receiver) { +/// let mut shutdown = Controller::shutdown_handle(); +/// loop { +/// tokio::select! { +/// shutdown = shutdown.notified() => break, // Handle shutdown here +/// Some(message) = rx.recv() => (), // Process incoming message /// } -/// }); +/// } /// } /// } /// @@ -151,9 +148,9 @@ impl ShutdownHandle { /// /// // Start all other services. Controller::shutdown_handle will use the same controller /// // instance and receives the configured shutdown timeout. -/// let addr = MyService.start(); +/// let addr = ServiceRunner::new().start(MyService); /// -/// // By triggering a shutdown, all attached services will be notified. This happens +/// // By triggering a shutdown, all subscribed services will be notified. This happens /// // automatically when a signal is sent to the process (e.g. SIGINT or SIGTERM). 
/// Controller::shutdown(ShutdownMode::Graceful); /// diff --git a/relay-system/src/service.rs b/relay-system/src/service.rs index 388a07459f..9add08d35c 100644 --- a/relay-system/src/service.rs +++ b/relay-system/src/service.rs @@ -8,11 +8,13 @@ use std::task::{Context, Poll}; use std::time::Duration; use futures::future::Shared; -use futures::FutureExt; -use tokio::runtime::Runtime; +use futures::stream::FuturesUnordered; +use futures::{FutureExt, StreamExt}; use tokio::sync::{mpsc, oneshot}; +use tokio::task::JoinHandle; use tokio::time::MissedTickBehavior; +use crate::spawn; use crate::statsd::SystemGauges; /// Interval for recording backlog metrics on service channels. @@ -818,7 +820,7 @@ where /// /// This channel is meant to be polled in a [`Service`]. /// -/// Instances are created automatically when [spawning](Service::spawn_handler) a service, or can be +/// Instances are created automatically when [spawning](ServiceRunner::start) a service, or can be /// created through [`channel`]. The channel closes when all associated [`Addr`]s are dropped. pub struct Receiver { rx: mpsc::UnboundedReceiver, @@ -903,7 +905,7 @@ pub fn channel(name: &'static str) -> (Addr, Receiver) { /// Individual messages can have a response which will be sent once the message is handled by the /// service. The sender can asynchronously await the responses of such messages. /// -/// To start a service, create an instance of the service and use [`Service::start`]. +/// To start a service, create a service runner and call [`ServiceRunner::start`]. /// /// # Implementing Services /// @@ -912,7 +914,7 @@ pub fn channel(name: &'static str) -> (Addr, Receiver) { /// synchronous, so that this needs to spawn at least one task internally: /// /// ```no_run -/// use relay_system::{FromMessage, Interface, NoResponse, Receiver, Service}; +/// use relay_system::{FromMessage, Interface, NoResponse, Receiver, Service, ServiceRunner}; /// /// struct MyMessage; /// @@ -931,16 +933,14 @@ pub fn channel(name: &'static str) -> (Addr, Receiver) { /// impl Service for MyService { /// type Interface = MyMessage; /// -/// fn spawn_handler(self, mut rx: Receiver) { -/// relay_system::spawn!(async move { -/// while let Some(message) = rx.recv().await { -/// // handle the message -/// } -/// }); +/// async fn run(self, mut rx: Receiver) { +/// while let Some(message) = rx.recv().await { +/// // handle the message +/// } /// } /// } /// -/// let addr = MyService.start(); +/// let addr = ServiceRunner::new().start(MyService); /// ``` /// /// ## Debounce and Caching @@ -997,25 +997,22 @@ pub trait Service: Sized { /// can be handled by this service. type Interface: Interface; - /// Spawns a task to handle service messages. + /// Defines the main task of this service. /// - /// Receives an inbound channel for all messages sent through the service's [`Addr`]. Note - /// that this function is synchronous, so that this needs to spawn a task internally. - fn spawn_handler(self, rx: Receiver); + /// `run` typically contains a loop that reads from `rx`, or a `select!` that reads + /// from multiple sources at once. + fn run(self, rx: Receiver) -> impl Future + Send + 'static; /// Starts the service in the current runtime and returns an address for it. - fn start(self) -> Addr { + /// + /// The service runs in a detached tokio task that cannot be joined on. This is mainly useful + /// for tests. 
+ fn start_detached(self) -> Addr { let (addr, rx) = channel(Self::name()); - self.spawn_handler(rx); + spawn!(self.run(rx)); addr } - /// Starts the service in the given runtime and returns an address for it. - fn start_in(self, runtime: &Runtime) -> Addr { - let _guard = runtime.enter(); - self.start() - } - /// Returns a unique name for this service implementation. /// /// This is used for internal diagnostics and uses the fully qualified type name of the service @@ -1025,6 +1022,45 @@ pub trait Service: Sized { } } +/// Keeps track of running services. +/// +/// Exposes information about crashed services. +#[derive(Debug, Default)] +pub struct ServiceRunner(FuturesUnordered>); + +impl ServiceRunner { + /// Creates a new service runner. + pub fn new() -> Self { + Self(FuturesUnordered::new()) + } + + /// Starts a service and starts tracking its join handle, exposing an [Addr] for message passing. + pub fn start(&mut self, service: S) -> Addr { + let (addr, rx) = channel(S::name()); + self.start_with(service, rx); + addr + } + + /// Starts a service and starts tracking its join handle, given a predefined receiver. + pub fn start_with(&mut self, service: S, rx: Receiver) { + self.0.push(spawn!(service.run(rx))); + } + + /// Awaits until all services have finished. + /// + /// Panics if one of the spawned services has panicked. + pub async fn join(&mut self) { + while let Some(res) = self.0.next().await { + if let Err(e) = res { + if e.is_panic() { + // Re-trigger panic to terminate the process: + std::panic::resume_unwind(e.into_panic()); + } + } + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -1046,12 +1082,10 @@ mod tests { impl Service for MockService { type Interface = MockMessage; - fn spawn_handler(self, mut rx: Receiver) { - crate::spawn!(async move { - while rx.recv().await.is_some() { - tokio::time::sleep(BACKLOG_INTERVAL * 2).await; - } - }); + async fn run(self, mut rx: Receiver) { + while rx.recv().await.is_some() { + tokio::time::sleep(BACKLOG_INTERVAL * 2).await; + } } fn name() -> &'static str { @@ -1070,7 +1104,7 @@ mod tests { tokio::time::pause(); // Mock service takes 2 * BACKLOG_INTERVAL for every message - let addr = MockService.start(); + let addr = MockService.start_detached(); // Advance the timer by a tiny offset to trigger the first metric emission. let captures = relay_statsd::with_capturing_test_client(|| {
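Tying the `ServiceRunner` pieces back to the changelog entry at the top ("Terminate the process when one of the services crashes"): `join()` drains the `FuturesUnordered<JoinHandle<()>>` and re-raises the first panic it encounters, and because `run()` in `lib.rs` races `runner.join()` against the shutdown handle, that panic now unwinds out of `block_on` and takes the whole process down instead of leaving a dead service behind. The following is a standalone sketch of that mechanism using plain tokio/futures types, not the actual `relay_system` code:

```rust
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::task::JoinHandle;

// Simplified stand-in for `ServiceRunner::join`: await every tracked task and
// re-raise the panic of the first one that crashed. Cancellation errors are
// ignored, mirroring the real implementation above.
async fn join_all(mut handles: FuturesUnordered<JoinHandle<()>>) {
    while let Some(result) = handles.next().await {
        if let Err(e) = result {
            if e.is_panic() {
                std::panic::resume_unwind(e.into_panic());
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let handles = FuturesUnordered::new();
    handles.push(tokio::spawn(async { panic!("service crashed") }));
    // A healthy, long-running service that never returns.
    handles.push(tokio::spawn(futures::future::pending::<()>()));

    // Completes (by panicking) as soon as the crashed task is joined, rather than
    // waiting forever on the healthy one; this is why the `tokio::select!` in
    // `lib.rs` now terminates the process when a service crashes.
    join_all(handles).await;
}
```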