From dc16d32dd5c63c646b172e2bcdd1feb5cd6ef090 Mon Sep 17 00:00:00 2001
From: Samuel <39674930+sagoez@users.noreply.github.com>
Date: Thu, 31 Oct 2024 20:05:49 +0000
Subject: [PATCH] fix: fixing start date to avoid event processing repetition
 (#187)

---
 integrationos-archiver/src/domain/config.rs |  19 ++-
 integrationos-archiver/src/event/chosen.rs  |  34 ++++
 integrationos-archiver/src/event/mod.rs     |  24 +++
 integrationos-archiver/src/event/started.rs |  15 +-
 integrationos-archiver/src/main.rs          | 177 +++++++++++++-------
 5 files changed, 199 insertions(+), 70 deletions(-)
 create mode 100644 integrationos-archiver/src/event/chosen.rs

diff --git a/integrationos-archiver/src/domain/config.rs b/integrationos-archiver/src/domain/config.rs
index 69ccb11d..3a2ae9fb 100644
--- a/integrationos-archiver/src/domain/config.rs
+++ b/integrationos-archiver/src/domain/config.rs
@@ -18,9 +18,15 @@ pub struct ArchiverConfig {
     pub db_config: DatabaseConfig,
     #[envconfig(from = "EVENT_COLLECTION_NAME", default = "external-events")]
     pub event_collection_name: String,
-    #[envconfig(from = "GS_STORAGE_BUCKET", default = "integrationos-zsk")]
+    #[envconfig(
+        from = "GS_STORAGE_BUCKET",
+        default = "integrationos-event-archives-local"
+    )]
     pub gs_storage_bucket: String,
-    #[envconfig(from = "GS_STORAGE_URI", default = "gs://integrationos-zsk")]
+    #[envconfig(
+        from = "GS_STORAGE_URI",
+        default = "gs://integrationos-event-archives-local"
+    )]
     pub gs_storage_uri: String,
     #[envconfig(from = "STORAGE_PROVIDER", default = "google-cloud")]
     pub storage_provider: StorageProvider,
@@ -34,9 +40,11 @@ pub struct ArchiverConfig {
     pub min_date_days: i64,
     #[envconfig(from = "CHUNK_SIZE_MINUTES", default = "5")]
     pub chunk_size_minutes: i64,
+    #[envconfig(from = "CHUNK_TO_PROCESS_IN_DAYS", default = "1")]
+    pub chunk_to_process_in_days: i64,
     #[envconfig(from = "CONCURRENT_CHUNKS", default = "10")]
     pub concurrent_chunks: usize,
-    #[envconfig(from = "SLEEP_AFTER_FINISH_DUMP_SECS", default = "60")]
+    #[envconfig(from = "SLEEP_AFTER_FINISH_DUMP_SECS", default = "10")]
     pub sleep_after_finish: u64,
     #[envconfig(from = "MODE", default = "dump")]
     pub mode: Mode,
@@ -60,6 +68,11 @@ impl Display for ArchiverConfig {
         writeln!(f, "READ_BUFFER_SIZE_BYTES: {}", self.read_buffer_size)?;
         writeln!(f, "MIN_DATE_DAYS: {}", self.min_date_days)?;
         writeln!(f, "CHUNK_SIZE_MINUTES: {}", self.chunk_size_minutes)?;
+        writeln!(
+            f,
+            "CHUNK_TO_PROCESS_IN_DAYS: {}",
+            self.chunk_to_process_in_days
+        )?;
         writeln!(f, "EVENT_COLLECTION_NAME: {}", self.event_collection_name)?;
         writeln!(
             f,
diff --git a/integrationos-archiver/src/event/chosen.rs b/integrationos-archiver/src/event/chosen.rs
new file mode 100644
index 00000000..780f83db
--- /dev/null
+++ b/integrationos-archiver/src/event/chosen.rs
@@ -0,0 +1,34 @@
+use super::EventMetadata;
+use integrationos_domain::{prefix::IdPrefix, Id};
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct DateChosen {
+    #[serde(rename = "_id")]
+    id: Id,
+    reference: Id,
+    starts_from: i64,
+    ends_at: i64,
+}
+
+impl DateChosen {
+    pub fn new(reference: Id, starts_from: i64, ends_at: i64) -> Self {
+        Self {
+            id: Id::now(IdPrefix::Archive),
+            reference,
+            starts_from,
+            ends_at,
+        }
+    }
+
+    pub fn event_date(&self) -> i64 {
+        self.ends_at
+    }
+}
+
+impl EventMetadata for DateChosen {
+    fn reference(&self) -> Id {
+        self.reference
+    }
+}
diff --git a/integrationos-archiver/src/event/mod.rs b/integrationos-archiver/src/event/mod.rs
index cd18bf1d..76ae1f23 100644
--- a/integrationos-archiver/src/event/mod.rs
+++ b/integrationos-archiver/src/event/mod.rs
@@ -1,3 +1,4 @@
+pub mod chosen;
 pub mod completed;
 pub mod dumped;
 pub mod failed;
@@ -5,6 +6,7 @@ pub mod finished;
 pub mod started;
 pub mod uploaded;
 
+use chosen::DateChosen;
 use completed::Completed;
 use dumped::Dumped;
 use failed::Failed;
@@ -23,6 +25,8 @@ pub trait EventMetadata {
 pub enum Event {
     /// Archive process started event. Emitted when the archive process is started.
     Started(Started),
+    /// Archive process has chosen the date to dump. Emitted when the archive process has chosen the date to dump.
+    DateChosen(DateChosen),
     /// Archive process dumped event. Emitted when mongodump finishes dumping the database.
     Dumped(Dumped),
     /// Archive process failed event. Emitted when the archive process fails in some way.
@@ -34,3 +38,23 @@ pub enum Event {
     /// Archive process finished event. Emitted when the archive process is finished.
     Finished(Finished),
 }
+
+impl Event {
+    pub fn is_finished(&self) -> bool {
+        matches!(self, Event::Finished(_))
+    }
+}
+
+impl EventMetadata for Event {
+    fn reference(&self) -> Id {
+        match self {
+            Event::Started(event) => event.reference(),
+            Event::DateChosen(event) => event.reference(),
+            Event::Dumped(event) => event.reference(),
+            Event::Failed(event) => event.reference(),
+            Event::Uploaded(event) => event.reference(),
+            Event::Completed(event) => event.reference(),
+            Event::Finished(event) => event.reference(),
+        }
+    }
+}
diff --git a/integrationos-archiver/src/event/started.rs b/integrationos-archiver/src/event/started.rs
index 850315aa..cdb4047b 100644
--- a/integrationos-archiver/src/event/started.rs
+++ b/integrationos-archiver/src/event/started.rs
@@ -1,9 +1,7 @@
 use super::EventMetadata;
-use anyhow::Result;
 use chrono::{DateTime, Utc};
-use integrationos_domain::{prefix::IdPrefix, Id, Store};
+use integrationos_domain::{prefix::IdPrefix, Id};
 use serde::{Deserialize, Serialize};
-use std::str::FromStr;
 
 #[derive(Debug, Serialize, Deserialize, Clone)]
 #[serde(rename_all = "camelCase")]
@@ -11,24 +9,23 @@ pub struct Started {
     #[serde(rename = "_id")]
     id: Id,
     started_at: DateTime<Utc>,
-    collection: Store,
+    collection: String,
 }
 
 impl Started {
-    pub fn new(collection: String) -> Result<Self> {
-        let store = Store::from_str(&collection).map_err(|e| anyhow::anyhow!(e))?;
-        Ok(Self {
+    pub fn new(store: String) -> Self {
+        Self {
             id: Id::now(IdPrefix::Archive),
             started_at: Utc::now(),
             collection: store,
-        })
+        }
     }
 
     pub fn started_at(&self) -> DateTime<Utc> {
         self.started_at
     }
 
-    pub fn collection(&self) -> &Store {
+    pub fn collection(&self) -> &str {
         &self.collection
     }
 }
diff --git a/integrationos-archiver/src/main.rs b/integrationos-archiver/src/main.rs
index d20e1989..062ec7b3 100644
--- a/integrationos-archiver/src/main.rs
+++ b/integrationos-archiver/src/main.rs
@@ -10,6 +10,7 @@ use chrono::offset::LocalResult;
 use chrono::{DateTime, Duration as CDuration, TimeZone, Utc};
 use dotenvy::dotenv;
 use envconfig::Envconfig;
+use event::chosen::DateChosen;
 use event::completed::Completed;
 use event::dumped::Dumped;
 use event::failed::Failed;
@@ -21,8 +22,10 @@ use futures::stream::{self, Stream};
 use futures::{StreamExt, TryStreamExt};
 use integrationos_domain::telemetry::{get_subscriber, init_subscriber};
 use integrationos_domain::{MongoStore, Store, Unit};
+use mongodb::options::FindOneOptions;
 use mongodb::Client;
 use std::process::Command;
+use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
 use storage::google_cloud::GoogleCloudStorage;
@@ -47,14 +50,16 @@ async fn main() -> Result<Unit> {
     let archives: Arc<MongoStore<Event>> =
         Arc::new(MongoStore::new(&database, &Store::Archives).await?);
 
-    let started = Started::new(config.event_collection_name.clone())?;
+    let store = Store::from_str(&config.event_collection_name).map_err(|e| anyhow::anyhow!(e))?;
     let target_store: Arc<MongoStore<Document>> =
-        Arc::new(MongoStore::new(&database, started.collection()).await?);
-    archives
-        .create_one(&Event::Started(started.clone()))
-        .await?;
+        Arc::new(MongoStore::new(&database, &store).await?);
 
     loop {
+        let started = Started::new(config.event_collection_name.clone());
+        archives
+            .create_one(&Event::Started(started.clone()))
+            .await?;
+
         let res = match config.mode {
             Mode::Dump => dump(&config, &archives, &started, &storage, &target_store, false).await,
             Mode::DumpDelete => {
@@ -113,6 +118,7 @@ async fn dump(
         Some(document) => document
             .get_i64("createdAt")
             .map_err(|e| anyhow!("Failed to get createdAt from document: {e}"))?,
+
         None => {
             tracing::info!(
                 "No events found in collection {}",
@@ -127,11 +133,65 @@
         }
     };
 
-    let start = match Utc.timestamp_millis_opt(start) {
+    let last_chosen_date_event = archives
+        .collection
+        .find_one(
+            doc! {
+                "type": "DateChosen"
+            },
+            FindOneOptions::builder()
+                .sort(doc! { "endsAt": -1 })
+                .build(),
+        )
+        .await?;
+
+    tracing::info!("Last chosen date event: {:?}", last_chosen_date_event);
+
+    let started_at = match last_chosen_date_event {
+        Some(event) => match event {
+            Event::DateChosen(e) => {
+                let finished = archives
+                    .collection
+                    .find_one(
+                        doc! {
+                            "type": "Finished",
+                            "reference": e.reference().to_string()
+                        },
+                        None,
+                    )
+                    .await?
+                    .map(|e| e.is_finished())
+                    .unwrap_or(false);
+
+                tracing::info!("Date chosen event is finished: {}", finished);
+
+                if finished {
+                    e.event_date()
+                } else {
+                    0
+                }
+            }
+            _ => return Err(anyhow!("Invalid event type, DateChosen expected")),
+        },
+        _ => 0,
+    };
+
+    let start = match Utc.timestamp_millis_opt(start.max(started_at)) {
         LocalResult::Single(date) => date,
         _ => return Err(anyhow!("Invalid timestamp")),
     };
-    let end = Utc::now() - CDuration::days(config.min_date_days); // 30 days ago by default
+
+    // End date should be a chunk of size config.chunk_to_process_in_days and not bigger than 30 days ago
+    let max_end = Utc::now() - CDuration::days(config.min_date_days);
+    let end = (start + CDuration::days(config.chunk_to_process_in_days)).min(max_end);
+
+    archives
+        .create_one(&Event::DateChosen(DateChosen::new(
+            started.reference(),
+            start.timestamp_millis(),
+            end.timestamp_millis(),
+        )))
+        .await?;
 
     if start.timestamp_millis() >= end.timestamp_millis() {
         // If the very first event is after the end time, exit
@@ -139,56 +199,60 @@
         return Ok(());
     }
 
-    let chunks = start.divide_by_stream(CDuration::minutes(config.chunk_size_minutes), end); // Chunk size is 20 minutes by default
+    tracing::info!("Start date: {}, End date: {}", start, end);
 
-    let stream = chunks.map(|(start_time, end_time)| async move {
-        tracing::info!(
-            "Processing events between {} - ({}) and {} - ({})",
-            start_time,
-            start_time.timestamp_millis(),
-            end_time,
-            end_time.timestamp_millis()
-        );
-        let saved = save(
-            config,
-            archives,
-            storage,
-            target_store,
-            started,
-            &start_time,
-            &end_time,
-        )
-        .await;
+    let chunks = start.divide_by_stream(CDuration::minutes(config.chunk_size_minutes), end); // Chunk size is 20 minutes by default
 
-        match saved {
-            Ok(0) => {
-                tracing::warn!("No events found between {} and {}", start_time, end_time);
-                return Ok(());
-            }
-            Ok(count) => {
-                tracing::info!("Archive saved successfully, saved {} events", count);
-            }
-            Err(e) => {
-                tracing::error!("Failed to save archive: {e}");
+    let stream = chunks
+        .enumerate()
+        .map(|(index, (start_time, end_time))| async move {
+            tracing::info!(
+                "Processing events between {} - ({}) and {} - ({})",
+                start_time,
+                start_time.timestamp_millis(),
+                end_time,
+                end_time.timestamp_millis()
+            );
+            let saved = save(
+                config,
+                archives,
+                storage,
+                target_store,
+                started,
+                (&start_time, &end_time),
+                index,
+            )
+            .await;
+
+            match saved {
+                Ok(0) => {
+                    tracing::warn!("No events found between {} and {}", start_time, end_time);
+                    return Ok(());
+                }
+                Ok(count) => {
+                    tracing::info!("Archive saved successfully, saved {} events", count);
+                }
+                Err(e) => {
+                    tracing::error!("Failed to save archive: {e}");
 
-            return Err(e);
-        }
-    };
-
-    if destructive {
-        tracing::warn!("Deleting old events as destructive mode is enabled");
-        let filter = doc! {
-            "createdAt": {
-                "$gte": start_time.timestamp_millis(),
-                "$lt": end_time.timestamp_millis()
+                    return Err(e);
                 }
             };
 
-            target_store.collection.delete_many(filter, None).await?;
-            tracing::warn!("Old events deleted successfully");
-        }
-        Ok::<_, anyhow::Error>(())
-    });
+            if destructive {
+                tracing::warn!("Deleting old events as destructive mode is enabled");
+                let filter = doc! {
+                    "createdAt": {
+                        "$gte": start_time.timestamp_millis(),
+                        "$lt": end_time.timestamp_millis()
+                    }
+                };
+
+                target_store.collection.delete_many(filter, None).await?;
+                tracing::warn!("Old events deleted successfully");
+            }
+            Ok::<_, anyhow::Error>(())
+        });
 
     let errors = stream
         .buffer_unordered(config.concurrent_chunks)
@@ -222,9 +286,10 @@ async fn save(
     storage: &Arc<impl Storage>,
     target_store: &MongoStore<Document>,
     started_event: &Started,
-    start_time: &DateTime<Utc>,
-    end_time: &DateTime<Utc>,
+    times: (&DateTime<Utc>, &DateTime<Utc>),
+    part: usize,
 ) -> Result<u64> {
+    let (start_time, end_time) = times;
     let tmp_dir = TempDir::new()?;
     let filter = doc! {
         "createdAt": {
@@ -293,11 +358,7 @@ async fn save(
         .join(&config.db_config.event_db_name)
         .join(&config.event_collection_name);
 
-    let suffix = format!(
-        "{}-{}",
-        start_time.timestamp_millis(),
-        end_time.timestamp_millis()
-    );
+    let suffix = format!("{}-part-{}", start_time.timestamp_millis(), part);
 
     if let Err(e) = storage
         .upload_file(&base_path, &Extension::Bson, config, suffix.clone())