diff --git a/integrationos-emit/src/server.rs b/integrationos-emit/src/server.rs index 9400339d..db08ea3e 100644 --- a/integrationos-emit/src/server.rs +++ b/integrationos-emit/src/server.rs @@ -193,9 +193,10 @@ impl Server { pusher.start(&config, s).await })); + let config = server.state.config.clone(); subsys.start(SubsystemBuilder::new( "SchedulerSubsystem", - |s| async move { scheduler.start(s).await }, + |s| async move { scheduler.start(&config, s).await }, )); subsys.start(SubsystemBuilder::new("ServerSubsystem", |s| async move { diff --git a/integrationos-emit/src/stream/mod.rs b/integrationos-emit/src/stream/mod.rs index 2b6c178b..05d7ca91 100644 --- a/integrationos-emit/src/stream/mod.rs +++ b/integrationos-emit/src/stream/mod.rs @@ -9,6 +9,8 @@ use integrationos_domain::{Id, IntegrationOSError, Unit}; use strum::{AsRefStr, Display, EnumIter, EnumString}; use tokio_graceful_shutdown::SubsystemHandle; +pub const SINGLETON_ID: u32 = 0; + #[async_trait] pub trait EventStreamExt { async fn publish( diff --git a/integrationos-emit/src/stream/pusher.rs b/integrationos-emit/src/stream/pusher.rs index fda181aa..069c481c 100644 --- a/integrationos-emit/src/stream/pusher.rs +++ b/integrationos-emit/src/stream/pusher.rs @@ -1,4 +1,4 @@ -use super::EventStreamExt; +use super::{EventStreamExt, SINGLETON_ID}; use crate::{ domain::{config::EmitterConfig, deduplication::Deduplication, event::EventEntity}, stream::EventStreamTopic, @@ -26,6 +26,11 @@ impl EventPusher { config: &EmitterConfig, subsys: SubsystemHandle, ) -> Result { + if config.partition()? != SINGLETON_ID { + tracing::info!("Limiting events to singleton id {}. Publisher proccessing finished.", SINGLETON_ID); + return Ok(()); + } + match self.process(config).cancel_on_shutdown(&subsys).await { Ok(result) => { tracing::info!("Scheduled event publisher finished"); diff --git a/integrationos-emit/src/stream/scheduler.rs b/integrationos-emit/src/stream/scheduler.rs index 2f4c6864..d407b560 100644 --- a/integrationos-emit/src/stream/scheduler.rs +++ b/integrationos-emit/src/stream/scheduler.rs @@ -1,5 +1,8 @@ -use super::EventStreamExt; -use crate::{domain::event::ScheduledEvent, stream::EventStreamTopic}; +use super::{EventStreamExt, SINGLETON_ID}; +use crate::{ + domain::{config::EmitterConfig, event::ScheduledEvent}, + stream::EventStreamTopic, +}; use chrono::Utc; use futures::{StreamExt, TryStreamExt}; use integrationos_domain::{IntegrationOSError, InternalError, MongoStore, Unit}; @@ -18,7 +21,19 @@ pub struct PublishScheduler { } impl PublishScheduler { - pub async fn start(&self, subsys: SubsystemHandle) -> Result { + pub async fn start( + &self, + config: &EmitterConfig, + subsys: SubsystemHandle, + ) -> Result { + if config.partition()? != SINGLETON_ID { + tracing::info!( + "Limiting events to singleton id {}. Scheduling proccessing finished.", + SINGLETON_ID + ); + return Ok(()); + } + match self.process().cancel_on_shutdown(&subsys).await { Ok(result) => { tracing::info!("Scheduled event publisher finished");