chore: introducing claimedBy to avoid double publish
sagojez committed Dec 10, 2024
1 parent 63aa3b5 commit f3e7353
Showing 3 changed files with 95 additions and 46 deletions.
11 changes: 6 additions & 5 deletions integrationos-emit/src/domain/event.rs
@@ -19,6 +19,7 @@ impl Event {
entity: self.clone(),
entity_id: Id::now(IdPrefix::PipelineEvent),
outcome: EventStatus::Created,
claimed_by: None,
metadata: RecordMetadata::default(),
}
}
@@ -39,17 +40,21 @@ pub struct EventEntity {
pub entity_id: Id,
pub entity: Event,
pub outcome: EventStatus,
#[serde(skip_serializing_if = "Option::is_none", default)]
// TODO: Index this field
pub claimed_by: Option<u32>,
#[serde(flatten, default)]
pub metadata: RecordMetadata,
}

impl EventEntity {
pub fn with_outcome(&self, outcome: EventStatus) -> Self {
pub fn with_status(&self, outcome: EventStatus) -> Self {
let mut metadata = self.metadata.clone();
metadata.mark_updated("system");
Self {
entity_id: self.entity_id,
entity: self.entity.clone(),
claimed_by: self.claimed_by,
outcome,
metadata,
}
@@ -66,10 +71,6 @@ impl EventEntity {
pub fn error(&self) -> Option<String> {
self.outcome.error()
}

pub fn is_created(&self) -> bool {
matches!(self.outcome, EventStatus::Created)
}
}

#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, EnumString, AsRefStr)]
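The new `claimed_by` field is optional and skipped during serialization when unset, so a sparse index is a natural fit for the `// TODO: Index this field` note above. A minimal sketch of that index, assuming the `mongodb` 2.x driver and a handle to the events collection (both assumptions, not part of this commit):

```rust
use mongodb::{bson::doc, options::IndexOptions, Collection, IndexModel};

// Hypothetical helper: creates the index the TODO refers to. Sparse, because
// `claimedBy` is only serialized when present (`skip_serializing_if`).
async fn ensure_claimed_by_index(
    events: &Collection<EventEntity>,
) -> mongodb::error::Result<()> {
    let index = IndexModel::builder()
        .keys(doc! { "claimedBy": 1 })
        .options(IndexOptions::builder().sparse(true).build())
        .build();
    // mongodb 2.x signature; 3.x drops the explicit options argument.
    events.create_index(index, None).await?;
    Ok(())
}
```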
47 changes: 31 additions & 16 deletions integrationos-emit/src/stream/fluvio_driver.rs
@@ -45,13 +45,15 @@ pub struct FluvioDriverImpl {
pub dlq_consumer: ConsumerConfig,
pub tgt_producer: TargetProducer,
pub dlq_producer: DlqProducer,
metrics: MetricsRegistry,
pub partition: u32,
pub metrics: MetricsRegistry,
}

impl FluvioDriverImpl {
pub async fn new(config: &EmitterConfig) -> Result<Self, IntegrationOSError> {
let fluvio_config = FluvioConfig::new(config.fluvio.endpoint());
let fluvio_client = Fluvio::connect_with_config(&fluvio_config).await?;
let partition = config.partition()?;

let tgt_producer = match &config.fluvio.producer_topic {
Some(producer_topic) => {
@@ -114,7 +116,7 @@ impl FluvioDriverImpl {

let ext = ConsumerConfigExtBuilder::default()
.topic(consumer_topic)
.partition(config.partition()?)
.partition(partition)
.offset_start(offset)
.offset_consumer(consumer_id)
.offset_strategy(OffsetManagementStrategy::Manual)
@@ -147,7 +149,7 @@

let ext = ConsumerConfigExtBuilder::default()
.topic(&topic)
.partition(config.partition()?)
.partition(partition)
.offset_start(Offset::beginning())
.offset_consumer(consumer_id)
.offset_strategy(OffsetManagementStrategy::Manual)
@@ -166,6 +168,7 @@
dlq_consumer,
tgt_producer,
dlq_producer,
partition,
metrics: MetricsRegistry::default(),
})
}
@@ -241,6 +244,7 @@ impl FluvioDriverImpl {

if subsys.is_shutdown_requested() {
tracing::info!("Consumer for {} cancelled by external request. Breaking the loop", target.as_ref());
unset_claimed_by(ctx, self.partition).await?;
break Ok(());
}
}
@@ -392,6 +396,8 @@ impl EventStreamExt for FluvioDriverImpl {
return Ok(());
}

// TODO: Update claimedBy

let insert_result = ctx
.app_stores
.deduplication
@@ -431,7 +437,7 @@
})
.await;

update_event_outcome(ctx, event, EventStatus::executed()).await?;
update_event_status(ctx, event, EventStatus::executed()).await?;

if let Err(e) = result {
self.metrics.errored(1);
@@ -440,8 +446,8 @@
);
delete_deduplication_record(ctx, event).await?;

let outcome = EventStatus::errored(e.to_string(), 1);
let event = event.with_outcome(outcome.clone());
let status = EventStatus::errored(e.to_string(), 1);
let event = event.with_status(status.clone());

tracing::debug!(
"Event with id {} is in DLQ, with number of retries {}",
@@ -453,15 +459,14 @@

tracing::debug!("Event with id {} is published to DLQ", event.entity_id);

update_event_outcome(ctx, &event, outcome).await?;
update_event_status(ctx, &event, status).await?;

tracing::debug!("Event with id {} is updated to DLQ", event.entity_id);

return Ok(());
}

update_event_outcome(ctx, event, EventStatus::succeded(event.retries()))
.await?;
update_event_status(ctx, event, EventStatus::succeded(event.retries())).await?;
}
EventStreamTopic::Dlq => {
tracing::info!("Event with id {} is in DLQ", event.entity_id);
@@ -482,7 +487,7 @@
event.retries()
);

let event = event.with_outcome(outcome.clone());
let event = event.with_status(outcome.clone());

self.publish(event.clone(), EventStreamTopic::Dlq).await?;

@@ -491,14 +496,14 @@
event.entity_id
);

update_event_outcome(ctx, &event, outcome).await?;
update_event_status(ctx, &event, outcome).await?;

tracing::debug!("Event with id {} is updated to DLQ", event.entity_id);

return Ok(());
}

update_event_outcome(ctx, event, EventStatus::succeded(event.retries()))
update_event_status(ctx, event, EventStatus::succeded(event.retries()))
.await?;
} else {
tracing::info!("Giving up on event with id {}", event.entity_id);
@@ -507,14 +512,12 @@
let error = event.error().unwrap_or_default()
+ ".\n Exhausted retries, cannot process event";

update_event_outcome(
update_event_status(
ctx,
event,
EventStatus::errored(error, event.retries()),
)
.await?;

// TODO: create an alert on grafana
}
}
}
@@ -535,6 +538,18 @@
}
}

async fn unset_claimed_by(ctx: &AppState, claimer: u32) -> Result<Unit, IntegrationOSError> {
ctx.app_stores
.events
.update_many(
doc! { "claimedBy": claimer },
doc! { "$unset": { "claimedBy": "" } },
)
.await?;

Ok(())
}

async fn delete_deduplication_record(
ctx: &AppState,
event: &EventEntity,
@@ -550,7 +565,7 @@ async fn delete_deduplication_record(
Ok(())
}

async fn update_event_outcome(
async fn update_event_status(
ctx: &AppState,
event: &EventEntity,
outcome: EventStatus,
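The `// TODO: Update claimedBy` in the consumer path is not implemented in this commit; paired with `unset_claimed_by` above, the claim would presumably be stamped when consumption of an event begins. A sketch of what that counterpart might look like, reusing the same `update_one` call that pusher.rs makes (the helper name is hypothetical):

```rust
// Hypothetical counterpart to unset_claimed_by: stamp the consuming
// partition onto the event before processing, so the pusher skips it.
async fn set_claimed_by(
    ctx: &AppState,
    event: &EventEntity,
    claimer: u32,
) -> Result<Unit, IntegrationOSError> {
    ctx.app_stores
        .events
        .update_one(
            &event.entity_id.to_string(),
            doc! { "$set": { "claimedBy": claimer } },
        )
        .await?;

    Ok(())
}
```

Note that `unset_claimed_by` only runs on a graceful shutdown request, so a crashed consumer would leave its events claimed; this commit leaves that recovery path open.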
83 changes: 58 additions & 25 deletions integrationos-emit/src/stream/pusher.rs
@@ -53,6 +53,7 @@ impl EventPusher {
let events_store = self.events.clone();
let deduplication_store = self.deduplication.clone();
let event_stream = Arc::clone(&self.event_stream);
let claimer = config.partition()?;

let max_concurrent_tasks = self.max_concurrent_tasks;
let max_chunk_size = self.max_chunk_size;
@@ -77,13 +78,19 @@
{
"createdAt": { "$lt": before.timestamp_millis() }
},
{
"claimedBy": { "$exists": false }
}
]},
{"$and": [
{
"outcome.type": "executed"
},
{
"createdAt": { "$lt": before.timestamp_millis() }
},
{
"claimedBy": { "$exists": false }
}
]},
{"$and": [
@@ -92,6 +99,9 @@
},
{
"createdAt": { "$lt": before.timestamp_millis() }
},
{
"claimedBy": { "$exists": false }
}
]}
]
@@ -102,25 +112,33 @@
if let Ok(events) = events {
let event_stream = Arc::clone(&event_stream);
let deduplication_store = deduplication_store.clone();

let result =
events
.try_chunks(max_chunk_size)
.map(|result| {
let event_stream = Arc::clone(&event_stream);
let deduplication_store = deduplication_store.clone();

let result =
result.map_err(|e| InternalError::io_err(&e.to_string(), None));
async move {
process_chunk(result, &event_stream, &deduplication_store).await
}
})
.buffer_unordered(max_concurrent_tasks)
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<Result<Vec<Unit>, IntegrationOSError>>();
let events_store = events_store.clone();

let result = events
.try_chunks(max_chunk_size)
.map(|result| {
let event_stream = Arc::clone(&event_stream);
let deduplication_store = deduplication_store.clone();
let events_store = events_store.clone();

let result =
result.map_err(|e| InternalError::io_err(&e.to_string(), None));
async move {
process_chunk(
result,
&event_stream,
&deduplication_store,
&events_store,
claimer,
)
.await
}
})
.buffer_unordered(max_concurrent_tasks)
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<Result<Vec<Unit>, IntegrationOSError>>();

if let Err(e) = result {
tracing::error!("Failed to publish one or more event chunks: {e}");
@@ -138,17 +156,32 @@ async fn process_chunk(
result: Result<Vec<EventEntity>, IntegrationOSError>,
event_stream: &Arc<dyn EventStreamExt + Sync + Send>,
deduplication_store: &MongoStore<Deduplication>,
events_store: &MongoStore<EventEntity>,
claimer: u32,
) -> Result<Unit, IntegrationOSError> {
match result {
Ok(chunk) => {
tracing::info!("Publishing {} event(s)", chunk.len());
for event in chunk {
// Double check mechanism to prevent duplicated events
if events_store
.get_one_by_id(&event.entity_id.to_string())
.await?
.map(|e| e.claimed_by.is_some())
.unwrap_or(false)
{
tracing::warn!("Event with id {} is already published", event.entity_id);
continue;
}

events_store
.update_one(
&event.entity_id.to_string(),
doc! { "$set": { "claimedBy": claimer } },
)
.await?;

let entity_id = event.entity_id;
let topic = if event.is_created() {
EventStreamTopic::Target
} else {
EventStreamTopic::Dlq
};

let deleted = deduplication_store
.collection
@@ -161,7 +194,7 @@
);

event_stream
.publish(event, topic)
.publish(event, EventStreamTopic::Dlq)
.await
.inspect(|_| {
tracing::info!("Event with id {} is published", entity_id);
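The double-check in `process_chunk` reads `claimed_by` with `get_one_by_id` and then sets it with a separate `update_one`, leaving a small window in which two pushers can both pass the check. A minimal sketch of an atomic alternative using `find_one_and_update` on the underlying collection, which this code exposes via `.collection`; the `entityId` field name and the mongodb 2.x call signature are assumptions:

```rust
use mongodb::bson::doc;

// Hypothetical atomic claim: matches only while the event is unclaimed and
// sets claimedBy in the same operation, so exactly one pusher wins.
async fn try_claim(
    events_store: &MongoStore<EventEntity>,
    entity_id: &str,
    claimer: u32,
) -> mongodb::error::Result<bool> {
    let claimed = events_store
        .collection
        .find_one_and_update(
            doc! { "entityId": entity_id, "claimedBy": { "$exists": false } },
            doc! { "$set": { "claimedBy": claimer } },
            None,
        )
        .await?;

    // Some(_) means this pusher won the claim; None means another one did.
    Ok(claimed.is_some())
}
```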
