diff --git a/src/db_operations/transactions/commit.rs b/src/db_operations/transactions/commit.rs index 19ab74e..0c64253 100644 --- a/src/db_operations/transactions/commit.rs +++ b/src/db_operations/transactions/commit.rs @@ -37,7 +37,7 @@ pub async fn commit( match event { TransactionalOperationStep::CleanTable { table_name } => { let db_table = tables.get(table_name.as_str()).unwrap(); - crate::db_operations::write::clean_table::execute( + crate::db_operations::write::clean_table( app, &db_table, event_src.clone(), diff --git a/src/db_operations/write/clean_partition_and_bulk_insert.rs b/src/db_operations/write/clean_partition_and_bulk_insert.rs index 8c91bf2..40c4cc1 100644 --- a/src/db_operations/write/clean_partition_and_bulk_insert.rs +++ b/src/db_operations/write/clean_partition_and_bulk_insert.rs @@ -12,7 +12,7 @@ use crate::{ pub async fn clean_partition_and_bulk_insert( app: &AppContext, - db_table: Arc, + db_table: &Arc, partition_to_clean: String, entities: Vec<(String, Vec>)>, event_src: EventSource, diff --git a/src/db_operations/write/clean_table.rs b/src/db_operations/write/clean_table.rs index 9797bd2..ca83d3a 100644 --- a/src/db_operations/write/clean_table.rs +++ b/src/db_operations/write/clean_table.rs @@ -9,7 +9,7 @@ use crate::{ db_sync::{states::InitTableEventSyncData, EventSource, SyncEvent}, }; -pub async fn execute( +pub async fn clean_table( app: &AppContext, db_table: &Arc, event_src: EventSource, diff --git a/src/db_operations/write/mod.rs b/src/db_operations/write/mod.rs index 4162c52..6166cc3 100644 --- a/src/db_operations/write/mod.rs +++ b/src/db_operations/write/mod.rs @@ -3,7 +3,8 @@ pub mod bulk_insert_or_update; mod clean_partition_and_bulk_insert; pub use clean_partition_and_bulk_insert::*; -pub mod clean_table; +mod clean_table; +pub use clean_table::*; pub mod clean_table_and_bulk_insert; mod delete_partitions; pub mod delete_row; diff --git a/src/http_server/controllers/backup/restore_from_backup_action.rs b/src/http_server/controllers/backup/restore_from_backup_action.rs index e6f9d28..35b0523 100644 --- a/src/http_server/controllers/backup/restore_from_backup_action.rs +++ b/src/http_server/controllers/backup/restore_from_backup_action.rs @@ -45,6 +45,7 @@ async fn handle_request( &action.app, backup_content, input_data.get_table_name(), + input_data.clean_table, ) .await } @@ -64,13 +65,17 @@ async fn handle_request( #[derive(MyHttpInput)] pub struct RestoreFromBackupInputData { - #[http_form_data(name = "fileName", description = "File in backup folder")] - pub file_name: String, #[http_form_data( name = "tableName", description = "Name of the table or '*' for all tables" )] pub table_name: String, + + #[http_form_data(name = "fileName", description = "File in backup folder")] + pub file_name: String, + + #[http_form_data(name = "cleanTable", description = "Clean table before restore")] + pub clean_table: bool, } impl RestoreFromBackupInputData { diff --git a/src/http_server/controllers/backup/restore_from_zip_action.rs b/src/http_server/controllers/backup/restore_from_zip_action.rs index f6cf789..c915443 100644 --- a/src/http_server/controllers/backup/restore_from_zip_action.rs +++ b/src/http_server/controllers/backup/restore_from_zip_action.rs @@ -37,6 +37,7 @@ async fn handle_request( &action.app, input_data.zip.content, table_name.as_deref(), + input_data.clean_table, ) .await; @@ -48,13 +49,17 @@ async fn handle_request( #[derive(MyHttpInput)] pub struct RestoreFromBackupZipFileInputData { - #[http_form_data(name = "fileName", description = "File in backup folder")] - pub zip: FileContent, #[http_form_data( name = "tableName", description = "Name of the table or '*' for all tables" )] pub table_name: String, + + #[http_form_data(name = "fileName", description = "File in backup folder")] + pub zip: FileContent, + + #[http_form_data(name = "cleanTable", description = "Clean table before restore")] + pub clean_table: bool, } impl RestoreFromBackupZipFileInputData { diff --git a/src/http_server/controllers/bulk/clean_and_bulk_insert_action.rs b/src/http_server/controllers/bulk/clean_and_bulk_insert_action.rs index 361755f..96e79d9 100644 --- a/src/http_server/controllers/bulk/clean_and_bulk_insert_action.rs +++ b/src/http_server/controllers/bulk/clean_and_bulk_insert_action.rs @@ -54,7 +54,7 @@ async fn handle_request( Some(partition_key) => { crate::db_operations::write::clean_partition_and_bulk_insert( action.app.as_ref(), - db_table, + &db_table, partition_key, rows_by_partition, event_src, diff --git a/src/http_server/controllers/tables_controller/clean_table_action.rs b/src/http_server/controllers/tables_controller/clean_table_action.rs index b86cee7..e336ecb 100644 --- a/src/http_server/controllers/tables_controller/clean_table_action.rs +++ b/src/http_server/controllers/tables_controller/clean_table_action.rs @@ -38,7 +38,7 @@ async fn handle_request( let event_src = EventSource::as_client_request(action.app.as_ref()); - crate::db_operations::write::clean_table::execute( + crate::db_operations::write::clean_table( action.app.as_ref(), &db_table, event_src, diff --git a/src/operations/backup/restore.rs b/src/operations/backup/restore.rs index 07014f6..fa351c3 100644 --- a/src/operations/backup/restore.rs +++ b/src/operations/backup/restore.rs @@ -31,6 +31,7 @@ pub async fn restore( app: &Arc, backup_content: Vec, table_name: Option<&str>, + clean_table: bool, ) -> Result<(), BackupError> { let mut zip_reader = ZipReader::new(backup_content); @@ -61,7 +62,7 @@ pub async fn restore( match table_name { Some(table_name) => match partitions.remove(table_name) { Some(files) => { - restore_to_db(&app, table_name, files, &mut zip_reader).await?; + restore_to_db(&app, table_name, files, &mut zip_reader, clean_table).await?; } None => { return Err(BackupError::TableNotFoundInBackupFile); @@ -69,7 +70,14 @@ pub async fn restore( }, None => { for (table_name, files) in partitions { - restore_to_db(&app, table_name.as_str(), files, &mut zip_reader).await?; + restore_to_db( + &app, + table_name.as_str(), + files, + &mut zip_reader, + clean_table, + ) + .await?; } } } @@ -81,6 +89,7 @@ async fn restore_to_db( table_name: &str, mut files: Vec, zip: &mut ZipReader, + clean_table: bool, ) -> Result<(), BackupError> { let persist_moment = DateTimeAsMicroseconds::now().add(Duration::from_secs(5)); let db_table = if files.get(0).unwrap().file_type.is_metadata() { @@ -116,6 +125,17 @@ async fn restore_to_db( db_table.unwrap() }; + if clean_table { + crate::db_operations::write::clean_table( + &app, + &db_table, + EventSource::Backup, + persist_moment, + ) + .await + .unwrap(); + } + for partition_file in files { let partition_key = partition_file.file_type.unwrap_as_partition_key(); @@ -133,7 +153,7 @@ async fn restore_to_db( crate::db_operations::write::clean_partition_and_bulk_insert( app, - db_table.clone(), + &db_table, partition_key.to_string(), vec![(partition_key, db_rows)], EventSource::Backup,