Skip to content

Commit

Permalink
refactor: making pusher and scheduler singletons
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez committed Dec 9, 2024
1 parent 9b1a63a commit b957dcb
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 5 deletions.
3 changes: 2 additions & 1 deletion integrationos-emit/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,10 @@ impl Server {
pusher.start(&config, s).await
}));

let config = server.state.config.clone();
subsys.start(SubsystemBuilder::new(
"SchedulerSubsystem",
|s| async move { scheduler.start(s).await },
|s| async move { scheduler.start(&config, s).await },
));

subsys.start(SubsystemBuilder::new("ServerSubsystem", |s| async move {
Expand Down
2 changes: 2 additions & 0 deletions integrationos-emit/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use integrationos_domain::{Id, IntegrationOSError, Unit};
use strum::{AsRefStr, Display, EnumIter, EnumString};
use tokio_graceful_shutdown::SubsystemHandle;

pub const SINGLETON_ID: u32 = 0;

#[async_trait]
pub trait EventStreamExt<T = EventEntity> {
async fn publish(
Expand Down
7 changes: 6 additions & 1 deletion integrationos-emit/src/stream/pusher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::EventStreamExt;
use super::{EventStreamExt, SINGLETON_ID};
use crate::{
domain::{config::EmitterConfig, deduplication::Deduplication, event::EventEntity},
stream::EventStreamTopic,
Expand Down Expand Up @@ -26,6 +26,11 @@ impl EventPusher {
config: &EmitterConfig,
subsys: SubsystemHandle,
) -> Result<Unit, IntegrationOSError> {
if config.partition()? != SINGLETON_ID {
tracing::info!("Limiting events to singleton id {}. Publisher proccessing finished.", SINGLETON_ID);
return Ok(());
}

match self.process(config).cancel_on_shutdown(&subsys).await {
Ok(result) => {
tracing::info!("Scheduled event publisher finished");
Expand Down
21 changes: 18 additions & 3 deletions integrationos-emit/src/stream/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use super::EventStreamExt;
use crate::{domain::event::ScheduledEvent, stream::EventStreamTopic};
use super::{EventStreamExt, SINGLETON_ID};
use crate::{
domain::{config::EmitterConfig, event::ScheduledEvent},
stream::EventStreamTopic,
};
use chrono::Utc;
use futures::{StreamExt, TryStreamExt};
use integrationos_domain::{IntegrationOSError, InternalError, MongoStore, Unit};
Expand All @@ -18,7 +21,19 @@ pub struct PublishScheduler {
}

impl PublishScheduler {
pub async fn start(&self, subsys: SubsystemHandle) -> Result<Unit, IntegrationOSError> {
pub async fn start(
&self,
config: &EmitterConfig,
subsys: SubsystemHandle,
) -> Result<Unit, IntegrationOSError> {
if config.partition()? != SINGLETON_ID {
tracing::info!(
"Limiting events to singleton id {}. Scheduling proccessing finished.",
SINGLETON_ID
);
return Ok(());
}

match self.process().cancel_on_shutdown(&subsys).await {
Ok(result) => {
tracing::info!("Scheduled event publisher finished");
Expand Down

0 comments on commit b957dcb

Please sign in to comment.