fix(server): Terminate on service panic (#4249)

Terminate the process when a panic occurs in one of the services.

Instead of implementing a sync `fn spawn_handler()` interface, each
service now implements an `async fn run()` interface. This simplifies
service implementations in most cases (except where services need to
spawn multiple tasks), and allows us to join on all service main tasks
to detect a panic.
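Schematically, the interface change looks like this; the sketch below is a paraphrase for illustration (in Relay both traits are named `Service`, and the real versions also carry an `Interface` associated type and a message receiver, which are omitted here):

```rust
// Before: a sync method that spawned the service's main task itself.
// The spawned task was detached, so a panic in it went unobserved.
trait SpawnHandlerStyle {
    fn spawn_handler(self);
}

// After: the service only *defines* its main task as a future. Spawning
// moves into a central runner, which can join on the task and see panics.
trait RunStyle {
    fn run(self) -> impl std::future::Future<Output = ()> + Send;
}
```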

The PR introduces a `ServiceRunner` utility that makes it easy to start
services and join on their main tasks.
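The real `ServiceRunner` lives in `relay_system` and is not shown on this page. As a minimal sketch of the mechanism under assumed names (not Relay's actual API), a runner like this can be built on `tokio::task::JoinSet`, whose `JoinError::is_panic()` makes a panic in any joined task observable:

```rust
use std::future::Future;
use std::time::Duration;

use tokio::task::{JoinError, JoinSet};

/// Spawns service main tasks into one JoinSet and joins on all of them,
/// so a panic in any task surfaces to the caller.
struct ServiceRunner {
    tasks: JoinSet<()>,
}

impl ServiceRunner {
    fn new() -> Self {
        Self { tasks: JoinSet::new() }
    }

    /// Spawns a service's main task (the future returned by its `run()`).
    fn start(&mut self, task: impl Future<Output = ()> + Send + 'static) {
        self.tasks.spawn(task);
    }

    /// Waits for all tasks; returns early with an error on the first panic.
    async fn join(&mut self) -> Result<(), JoinError> {
        while let Some(result) = self.tasks.join_next().await {
            if let Err(e) = result {
                if e.is_panic() {
                    return Err(e);
                }
            }
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let mut runner = ServiceRunner::new();
    // A healthy service: its main task just keeps running.
    runner.start(async {
        loop {
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    });
    // A crashing service: its main task panics.
    runner.start(async { panic!("service crashed") });

    // As in the commit: join on the service tasks and terminate on panic.
    if let Err(err) = runner.join().await {
        eprintln!("a service panicked, terminating: {err}");
        std::process::exit(1);
    }
}
```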

---------

Co-authored-by: Sebastian Zivota <[email protected]>
jjbayer and loewenheim authored Nov 19, 2024
1 parent 8acb6f0 commit 69c2e6a

Showing 26 changed files with 623 additions and 614 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@
 **Bug Fixes**:

 - Allow profile chunks without release. ([#4155](https://github.com/getsentry/relay/pull/4155))
+- Terminate the process when one of the services crashes. ([#4249](https://github.com/getsentry/relay/pull/4249))

 **Features**:

17 changes: 13 additions & 4 deletions relay-server/src/lib.rs
@@ -279,7 +279,7 @@ mod testutils;
 use std::sync::Arc;

 use relay_config::Config;
-use relay_system::{Controller, Service};
+use relay_system::Controller;

 use crate::service::ServiceState;
 use crate::services::server::HttpServer;
@@ -301,9 +301,18 @@ pub fn run(config: Config) -> anyhow::Result<()> {
     // information on all services.
     main_runtime.block_on(async {
         Controller::start(config.shutdown_timeout());
-        let service = ServiceState::start(config.clone())?;
-        HttpServer::new(config, service.clone())?.start();
-        Controller::shutdown_handle().finished().await;
+        let (state, mut runner) = ServiceState::start(config.clone())?;
+        runner.start(HttpServer::new(config, state.clone())?);
+
+        tokio::select! {
+            _ = runner.join() => {},
+            // NOTE: when every service implements a shutdown listener,
+            // awaiting on `finished` becomes unnecessary: We can simply join() and guarantee
+            // that every service finished its main task.
+            // See also https://github.com/getsentry/relay/issues/4050.
+            _ = Controller::shutdown_handle().finished() => {}
+        }
+
         anyhow::Ok(())
     })?;

117 changes: 63 additions & 54 deletions relay-server/src/service.rs
@@ -31,7 +31,7 @@ use relay_redis::redis::Script;
 #[cfg(feature = "processing")]
 use relay_redis::{PooledClient, RedisScripts};
 use relay_redis::{RedisError, RedisPool, RedisPools};
-use relay_system::{channel, Addr, Service};
+use relay_system::{channel, Addr, Service, ServiceRunner};
 use tokio::runtime::Runtime;
 use tokio::sync::mpsc;

@@ -134,7 +134,6 @@ fn create_store_pool(config: &Config) -> Result<ThreadPool> {
 struct StateInner {
     config: Arc<Config>,
     memory_checker: MemoryChecker,
-
     registry: Registry,
 }

@@ -146,9 +145,10 @@ pub struct ServiceState {

 impl ServiceState {
     /// Starts all services and returns addresses to all of them.
-    pub fn start(config: Arc<Config>) -> Result<Self> {
-        let upstream_relay = UpstreamRelayService::new(config.clone()).start();
-        let test_store = TestStoreService::new(config.clone()).start();
+    pub fn start(config: Arc<Config>) -> Result<(Self, ServiceRunner)> {
+        let mut runner = ServiceRunner::new();
+        let upstream_relay = runner.start(UpstreamRelayService::new(config.clone()));
+        let test_store = runner.start(TestStoreService::new(config.clone()));

         let redis_pools = config
             .redis()
@@ -173,42 +173,43 @@ impl ServiceState {
         // Create an address for the `EnvelopeProcessor`, which can be injected into the
         // other services.
         let (processor, processor_rx) = channel(EnvelopeProcessorService::name());
-        let outcome_producer = OutcomeProducerService::create(
+        let outcome_producer = runner.start(OutcomeProducerService::create(
             config.clone(),
             upstream_relay.clone(),
             processor.clone(),
-        )?
-        .start();
-        let outcome_aggregator = OutcomeAggregator::new(&config, outcome_producer.clone()).start();
+        )?);
+        let outcome_aggregator =
+            runner.start(OutcomeAggregator::new(&config, outcome_producer.clone()));

         let (global_config, global_config_rx) =
             GlobalConfigService::new(config.clone(), upstream_relay.clone());
         let global_config_handle = global_config.handle();
         // The global config service must start before dependant services are
         // started. Messages like subscription requests to the global config
         // service fail if the service is not running.
-        let global_config = global_config.start();
+        let global_config = runner.start(global_config);

         let (legacy_project_cache, legacy_project_cache_rx) =
             channel(legacy::ProjectCacheService::name());

-        let project_source = ProjectSource::start(
+        let project_source = ProjectSource::start_in(
+            &mut runner,
             Arc::clone(&config),
             upstream_relay.clone(),
             redis_pools
                 .as_ref()
                 .map(|pools| pools.project_configs.clone()),
         );
         let project_cache_handle =
-            ProjectCacheService::new(Arc::clone(&config), project_source).start();
+            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(&mut runner);

         let aggregator = RouterService::new(
             config.default_aggregator_config().clone(),
             config.secondary_aggregator_configs().clone(),
             Some(legacy_project_cache.clone().recipient()),
         );
         let aggregator_handle = aggregator.handle();
-        let aggregator = aggregator.start();
+        let aggregator = runner.start(aggregator);

         let metric_stats = MetricStats::new(
             config.clone(),
@@ -229,32 +230,34 @@
                     outcome_aggregator.clone(),
                     metric_outcomes.clone(),
                 )
-                .map(|s| s.start())
+                .map(|s| runner.start(s))
             })
             .transpose()?;

         let cogs = CogsService::new(&config);
-        let cogs = Cogs::new(CogsServiceRecorder::new(&config, cogs.start()));
+        let cogs = Cogs::new(CogsServiceRecorder::new(&config, runner.start(cogs)));

-        EnvelopeProcessorService::new(
-            create_processor_pool(&config)?,
-            config.clone(),
-            global_config_handle,
-            project_cache_handle.clone(),
-            cogs,
-            #[cfg(feature = "processing")]
-            redis_pools.clone(),
-            processor::Addrs {
-                outcome_aggregator: outcome_aggregator.clone(),
-                upstream_relay: upstream_relay.clone(),
-                test_store: test_store.clone(),
-                #[cfg(feature = "processing")]
-                store_forwarder: store.clone(),
-                aggregator: aggregator.clone(),
-            },
-            metric_outcomes.clone(),
-        )
-        .spawn_handler(processor_rx);
+        runner.start_with(
+            EnvelopeProcessorService::new(
+                create_processor_pool(&config)?,
+                config.clone(),
+                global_config_handle,
+                project_cache_handle.clone(),
+                cogs,
+                #[cfg(feature = "processing")]
+                redis_pools.clone(),
+                processor::Addrs {
+                    outcome_aggregator: outcome_aggregator.clone(),
+                    upstream_relay: upstream_relay.clone(),
+                    test_store: test_store.clone(),
+                    #[cfg(feature = "processing")]
+                    store_forwarder: store.clone(),
+                    aggregator: aggregator.clone(),
+                },
+                metric_outcomes.clone(),
+            ),
+            processor_rx,
+        );

         let (envelopes_tx, envelopes_rx) = mpsc::channel(config.spool_max_backpressure_envelopes());
         let envelope_buffer = EnvelopeBufferService::new(
@@ -268,7 +271,7 @@
                 test_store: test_store.clone(),
             },
         )
-        .map(|b| b.start_observable());
+        .map(|b| b.start_in(&mut runner));

         // Keep all the services in one context.
         let project_cache_services = legacy::Services {
@@ -280,34 +283,37 @@
             test_store: test_store.clone(),
         };

-        legacy::ProjectCacheService::new(
-            config.clone(),
-            MemoryChecker::new(memory_stat.clone(), config.clone()),
-            project_cache_handle.clone(),
-            project_cache_services,
-            global_config_rx,
-            envelopes_rx,
-        )
-        .spawn_handler(legacy_project_cache_rx);
+        runner.start_with(
+            legacy::ProjectCacheService::new(
+                config.clone(),
+                MemoryChecker::new(memory_stat.clone(), config.clone()),
+                project_cache_handle.clone(),
+                project_cache_services,
+                global_config_rx,
+                envelopes_rx,
+            ),
+            legacy_project_cache_rx,
+        );

-        let health_check = HealthCheckService::new(
+        let health_check = runner.start(HealthCheckService::new(
             config.clone(),
             MemoryChecker::new(memory_stat.clone(), config.clone()),
             aggregator_handle,
             upstream_relay.clone(),
             envelope_buffer.clone(),
-        )
-        .start();
+        ));

-        RelayStats::new(
+        runner.start(RelayStats::new(
             config.clone(),
             upstream_relay.clone(),
             #[cfg(feature = "processing")]
             redis_pools.clone(),
-        )
-        .start();
+        ));

-        let relay_cache = RelayCacheService::new(config.clone(), upstream_relay.clone()).start();
+        let relay_cache = runner.start(RelayCacheService::new(
+            config.clone(),
+            upstream_relay.clone(),
+        ));

         let registry = Registry {
             processor,
@@ -329,9 +335,12 @@
             registry,
         };

-        Ok(ServiceState {
-            inner: Arc::new(state),
-        })
+        Ok((
+            ServiceState {
+                inner: Arc::new(state),
+            },
+            runner,
+        ))
     }

     /// Returns a reference to the Relay configuration.