Skip to content

Commit

Permalink
fix: fixing start date to avoid event processing repetition (#187)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez authored Oct 31, 2024
1 parent 4144457 commit dc16d32
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 70 deletions.
19 changes: 16 additions & 3 deletions integrationos-archiver/src/domain/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@ pub struct ArchiverConfig {
pub db_config: DatabaseConfig,
#[envconfig(from = "EVENT_COLLECTION_NAME", default = "external-events")]
pub event_collection_name: String,
#[envconfig(from = "GS_STORAGE_BUCKET", default = "integrationos-zsk")]
#[envconfig(
from = "GS_STORAGE_BUCKET",
default = "integrationos-event-archives-local"
)]
pub gs_storage_bucket: String,
#[envconfig(from = "GS_STORAGE_URI", default = "gs://integrationos-zsk")]
#[envconfig(
from = "GS_STORAGE_URI",
default = "gs://integrationos-event-archives-local"
)]
pub gs_storage_uri: String,
#[envconfig(from = "STORAGE_PROVIDER", default = "google-cloud")]
pub storage_provider: StorageProvider,
Expand All @@ -34,9 +40,11 @@ pub struct ArchiverConfig {
pub min_date_days: i64,
#[envconfig(from = "CHUNK_SIZE_MINUTES", default = "5")]
pub chunk_size_minutes: i64,
#[envconfig(from = "CHUNK_TO_PROCESS_IN_DAYS", default = "1")]
pub chunk_to_process_in_days: i64,
#[envconfig(from = "CONCURRENT_CHUNKS", default = "10")]
pub concurrent_chunks: usize,
#[envconfig(from = "SLEEP_AFTER_FINISH_DUMP_SECS", default = "60")]
#[envconfig(from = "SLEEP_AFTER_FINISH_DUMP_SECS", default = "10")]
pub sleep_after_finish: u64,
#[envconfig(from = "MODE", default = "dump")]
pub mode: Mode,
Expand All @@ -60,6 +68,11 @@ impl Display for ArchiverConfig {
writeln!(f, "READ_BUFFER_SIZE_BYTES: {}", self.read_buffer_size)?;
writeln!(f, "MIN_DATE_DAYS: {}", self.min_date_days)?;
writeln!(f, "CHUNK_SIZE_MINUTES: {}", self.chunk_size_minutes)?;
writeln!(
f,
"CHUNK_TO_PROCESS_IN_DAYS: {}",
self.chunk_to_process_in_days
)?;
writeln!(f, "EVENT_COLLECTION_NAME: {}", self.event_collection_name)?;
writeln!(
f,
Expand Down
34 changes: 34 additions & 0 deletions integrationos-archiver/src/event/chosen.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use super::EventMetadata;
use integrationos_domain::{prefix::IdPrefix, Id};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct DateChosen {
#[serde(rename = "_id")]
id: Id,
reference: Id,
starts_from: i64,
ends_at: i64,
}

impl DateChosen {
pub fn new(reference: Id, starts_from: i64, ends_at: i64) -> Self {
Self {
id: Id::now(IdPrefix::Archive),
reference,
starts_from,
ends_at,
}
}

pub fn event_date(&self) -> i64 {
self.ends_at
}
}

impl EventMetadata for DateChosen {
fn reference(&self) -> Id {
self.reference
}
}
24 changes: 24 additions & 0 deletions integrationos-archiver/src/event/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
pub mod chosen;
pub mod completed;
pub mod dumped;
pub mod failed;
pub mod finished;
pub mod started;
pub mod uploaded;

use chosen::DateChosen;
use completed::Completed;
use dumped::Dumped;
use failed::Failed;
Expand All @@ -23,6 +25,8 @@ pub trait EventMetadata {
pub enum Event {
/// Archive process started event. Emitted when the archive process is started.
Started(Started),
/// Archive process has chosen the date to dump. Emitted when the archive process has chosen the date to dump.
DateChosen(DateChosen),
/// Archive process dumped event. Emitted when mongodump finishes dumping the database.
Dumped(Dumped),
/// Archive process failed event. Emitted when the archive process fails in some way.
Expand All @@ -34,3 +38,23 @@ pub enum Event {
/// Archive process finished event. Emitted when the archive process is finished.
Finished(Finished),
}

impl Event {
pub fn is_finished(&self) -> bool {
matches!(self, Event::Finished(_))
}
}

impl EventMetadata for Event {
fn reference(&self) -> Id {
match self {
Event::Started(event) => event.reference(),
Event::DateChosen(event) => event.reference(),
Event::Dumped(event) => event.reference(),
Event::Failed(event) => event.reference(),
Event::Uploaded(event) => event.reference(),
Event::Completed(event) => event.reference(),
Event::Finished(event) => event.reference(),
}
}
}
15 changes: 6 additions & 9 deletions integrationos-archiver/src/event/started.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,31 @@
use super::EventMetadata;
use anyhow::Result;
use chrono::{DateTime, Utc};
use integrationos_domain::{prefix::IdPrefix, Id, Store};
use integrationos_domain::{prefix::IdPrefix, Id};
use serde::{Deserialize, Serialize};
use std::str::FromStr;

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Started {
#[serde(rename = "_id")]
id: Id,
started_at: DateTime<Utc>,
collection: Store,
collection: String,
}

impl Started {
pub fn new(collection: String) -> Result<Self> {
let store = Store::from_str(&collection).map_err(|e| anyhow::anyhow!(e))?;
Ok(Self {
pub fn new(store: String) -> Self {
Self {
id: Id::now(IdPrefix::Archive),
started_at: Utc::now(),
collection: store,
})
}
}

pub fn started_at(&self) -> DateTime<Utc> {
self.started_at
}

pub fn collection(&self) -> &Store {
pub fn collection(&self) -> &str {
&self.collection
}
}
Expand Down
Loading

0 comments on commit dc16d32

Please sign in to comment.