From 072a075885a06076e09cd6532c506c9803265943 Mon Sep 17 00:00:00 2001
From: amigin
Date: Mon, 2 Dec 2024 15:51:48 +0200
Subject: [PATCH] Added ability to restore a table or all tables from backup

---
 .../write/clean_partition_and_bulk_insert.rs  |   6 +-
 src/db_operations/write/mod.rs                |   4 +-
 src/db_sync/sync_attributes.rs                |   1 +
 .../restore_from_backup_action.rs             |   7 +-
 .../bulk/clean_and_bulk_insert_action.rs      |   2 +-
 src/operations/backup/mod.rs                  |   2 +
 src/operations/backup/restore.rs              | 139 +++++++++++++++++-
 src/operations/backup/restore_file_name.rs    |  96 ++++++++++++
 src/zip/mod.rs                                |   2 +
 src/zip/zip_reader.rs                         |  36 +++++
 10 files changed, 281 insertions(+), 14 deletions(-)
 create mode 100644 src/operations/backup/restore_file_name.rs
 create mode 100644 src/zip/zip_reader.rs

diff --git a/src/db_operations/write/clean_partition_and_bulk_insert.rs b/src/db_operations/write/clean_partition_and_bulk_insert.rs
index bc143a0..8c91bf2 100644
--- a/src/db_operations/write/clean_partition_and_bulk_insert.rs
+++ b/src/db_operations/write/clean_partition_and_bulk_insert.rs
@@ -10,10 +10,10 @@ use crate::{
     db_sync::{states::InitPartitionsSyncEventData, EventSource, SyncEvent},
 };
 
-pub async fn execute(
+pub async fn clean_partition_and_bulk_insert(
     app: &AppContext,
     db_table: Arc<DbTableWrapper>,
-    partition_key: String,
+    partition_to_clean: String,
     entities: Vec<(String, Vec<Arc<DbRow>>)>,
     event_src: EventSource,
     persist_moment: DateTimeAsMicroseconds,
@@ -22,7 +22,7 @@
     super::super::check_app_states(app)?;
 
     let mut table_data = db_table.data.write().await;
 
-    table_data.remove_partition(&partition_key, None);
+    table_data.remove_partition(&partition_to_clean, None);
 
     let mut partition_keys = Vec::new();
diff --git a/src/db_operations/write/mod.rs b/src/db_operations/write/mod.rs
index 3496a79..4162c52 100644
--- a/src/db_operations/write/mod.rs
+++ b/src/db_operations/write/mod.rs
@@ -1,6 +1,8 @@
 mod bulk_delete;
 pub mod bulk_insert_or_update;
-pub mod clean_partition_and_bulk_insert;
+
+mod clean_partition_and_bulk_insert;
+pub use clean_partition_and_bulk_insert::*;
 pub mod clean_table;
 pub mod clean_table_and_bulk_insert;
 mod delete_partitions;
diff --git a/src/db_sync/sync_attributes.rs b/src/db_sync/sync_attributes.rs
index c7337c0..66b0599 100644
--- a/src/db_sync/sync_attributes.rs
+++ b/src/db_sync/sync_attributes.rs
@@ -54,6 +54,7 @@ pub enum EventSource {
     ClientRequest(ClientRequestsSourceData),
     GarbageCollector,
     Subscriber,
+    Backup,
 }
 
 impl EventSource {
diff --git a/src/http/controllers/backup_controller/restore_from_backup_action.rs b/src/http/controllers/backup_controller/restore_from_backup_action.rs
index 97c7000..e6f9d28 100644
--- a/src/http/controllers/backup_controller/restore_from_backup_action.rs
+++ b/src/http/controllers/backup_controller/restore_from_backup_action.rs
@@ -56,9 +56,10 @@ async fn handle_request(
         }
     };
 
-    HttpOutput::as_text(format!("{:?}", restore_result))
-        .into_ok_result(true)
-        .into()
+    match restore_result {
+        Ok(_) => HttpOutput::Empty.into_ok_result(true).into(),
+        Err(err) => Err(HttpFailResult::as_fatal_error(format!("{:?}", err))),
+    }
 }
 
 #[derive(MyHttpInput)]
diff --git a/src/http/controllers/bulk/clean_and_bulk_insert_action.rs b/src/http/controllers/bulk/clean_and_bulk_insert_action.rs
index 3518b01..361755f 100644
--- a/src/http/controllers/bulk/clean_and_bulk_insert_action.rs
+++ b/src/http/controllers/bulk/clean_and_bulk_insert_action.rs
@@ -52,7 +52,7 @@ async fn handle_request(
 
     match input_data.partition_key {
         Some(partition_key) => {
-            crate::db_operations::write::clean_partition_and_bulk_insert::execute(
+            crate::db_operations::write::clean_partition_and_bulk_insert(
                 action.app.as_ref(),
                 db_table,
                 partition_key,
diff --git a/src/operations/backup/mod.rs b/src/operations/backup/mod.rs
index 7a6203b..1b65f3d 100644
--- a/src/operations/backup/mod.rs
+++ b/src/operations/backup/mod.rs
@@ -5,3 +5,5 @@ mod utils;
 pub use gc_backups::*;
 mod restore;
 pub use restore::*;
+mod restore_file_name;
+pub use restore_file_name::*;
diff --git a/src/operations/backup/restore.rs b/src/operations/backup/restore.rs
index fcb09ee..8846a20 100644
--- a/src/operations/backup/restore.rs
+++ b/src/operations/backup/restore.rs
@@ -1,21 +1,148 @@
-use crate::app::AppContext;
+use std::{collections::BTreeMap, sync::Arc, time::Duration};
+
+use my_no_sql_sdk::core::db_json_entity::DbJsonEntity;
+use my_no_sql_server_core::rust_extensions::date_time::DateTimeAsMicroseconds;
+
+use crate::{
+    app::AppContext, db_sync::EventSource,
+    scripts::serializers::table_attrs::TableMetadataFileContract, zip::ZipReader,
+};
+
+use super::RestoreFileName;
 
 #[derive(Debug)]
 pub enum BackupError {
     TableNotFoundInBackupFile,
+    #[allow(dead_code)]
+    InvalidFileName(String),
+    #[allow(dead_code)]
+    ZipArchiveError(String),
+    #[allow(dead_code)]
+    TableNotFoundToRestoreBackupAndNoMetadataFound(String),
+    #[allow(dead_code)]
+    InvalidFileContent {
+        file_name: String,
+        partition_key: String,
+        err: String,
+    },
 }
 
 pub async fn restore(
-    app: &AppContext,
+    app: &Arc<AppContext>,
     backup_content: Vec<u8>,
     table_name: Option<&str>,
 ) -> Result<(), BackupError> {
-    let zip_cursor = std::io::Cursor::new(backup_content);
+    let mut zip_reader = ZipReader::new(backup_content);
+
+    let mut partitions: BTreeMap<String, Vec<RestoreFileName>> = BTreeMap::new();
+
+    for file_name in zip_reader.get_file_names() {
+        let file_name =
+            RestoreFileName::new(file_name).map_err(|err| BackupError::InvalidFileName(err))?;
+
+        match partitions.get_mut(&file_name.table_name) {
+            Some(by_table) => {
+                if file_name.file_type.is_metadata() {
+                    by_table.insert(0, file_name);
+                } else {
+                    by_table.push(file_name)
+                }
+            }
+            None => {
+                partitions.insert(file_name.table_name.to_string(), vec![file_name]);
+            }
+        }
+    }
+
+    if partitions.is_empty() {
+        return Err(BackupError::TableNotFoundInBackupFile);
+    }
+
+    match table_name {
+        Some(table_name) => match partitions.remove(table_name) {
+            Some(files) => {
+                restore_to_db(&app, table_name, files, &mut zip_reader).await?;
+            }
+            None => {
+                return Err(BackupError::TableNotFoundInBackupFile);
+            }
+        },
+        None => {
+            for (table_name, files) in partitions {
+                restore_to_db(&app, table_name.as_str(), files, &mut zip_reader).await?;
+            }
+        }
+    }
+
+    Ok(())
+}
+
+async fn restore_to_db(
+    app: &Arc<AppContext>,
+    table_name: &str,
+    mut files: Vec<RestoreFileName>,
+    zip: &mut ZipReader,
+) -> Result<(), BackupError> {
+    let persist_moment = DateTimeAsMicroseconds::now().add(Duration::from_secs(5));
+    let db_table = if files.get(0).unwrap().file_type.is_metadata() {
+        let metadata_file = files.remove(0);
+
+        let content = zip
+            .get_content_as_vec(&metadata_file.file_name)
+            .map_err(|err| BackupError::ZipArchiveError(format!("{:?}", err)))?;
+
+        let table = TableMetadataFileContract::parse(content.as_slice());
+
+        let db_table = crate::db_operations::write::table::create_if_not_exist(
+            app,
+            table_name,
+            table.persist,
+            table.max_partitions_amount,
+            table.max_rows_per_partition_amount,
+            EventSource::Backup,
+            persist_moment,
+        )
+        .await
+        .unwrap();
+        db_table
+    } else {
+        let db_table = app.db.get_table(table_name).await;
+
+        if db_table.is_none() {
+            return Err(BackupError::TableNotFoundToRestoreBackupAndNoMetadataFound(
+                table_name.to_string(),
+            ));
+        }
+
+        db_table.unwrap()
+    };
+
+    for partition_file in files {
+        let partition_key = partition_file.file_type.unwrap_as_partition_key();
+
+        let content = zip
+            .get_content_as_vec(&partition_file.file_name)
+            .map_err(|err| BackupError::ZipArchiveError(format!("{:?}", err)))?;
 
-    let zip = zip::ZipArchive::new(zip_cursor).unwrap();
+        let db_rows = DbJsonEntity::restore_as_vec(content.as_slice()).map_err(|itm| {
+            BackupError::InvalidFileContent {
+                file_name: partition_file.file_name,
+                partition_key: partition_key.to_string(),
+                err: format!("{:?}", itm),
+            }
+        })?;
 
-    for itm in zip.file_names() {
-        println!("File: {}", itm);
+        crate::db_operations::write::clean_partition_and_bulk_insert(
+            app,
+            db_table.clone(),
+            partition_key.to_string(),
+            vec![(partition_key, db_rows)],
+            EventSource::Backup,
+            persist_moment,
+            DateTimeAsMicroseconds::now(),
+        )
+        .await
+        .unwrap();
     }
 
     Ok(())
 }
diff --git a/src/operations/backup/restore_file_name.rs b/src/operations/backup/restore_file_name.rs
new file mode 100644
index 0000000..86e0860
--- /dev/null
+++ b/src/operations/backup/restore_file_name.rs
@@ -0,0 +1,96 @@
+use my_no_sql_server_core::rust_extensions::base64::FromBase64;
+
+use crate::scripts::TABLE_METADATA_FILE_NAME;
+
+pub enum ArchiveFileType {
+    Metadata,
+    PartitionKey(String),
+}
+
+impl ArchiveFileType {
+    pub fn is_metadata(&self) -> bool {
+        matches!(self, Self::Metadata)
+    }
+
+    pub fn unwrap_as_partition_key(self) -> String {
+        match self {
+            Self::PartitionKey(key) => key,
+            _ => panic!("Can not unwrap partition key"),
+        }
+    }
+}
+
+pub struct RestoreFileName {
+    pub table_name: String,
+    pub file_type: ArchiveFileType,
+    pub file_name: String,
+}
+
+impl RestoreFileName {
+    pub fn new(file_name: &str) -> Result<Self, String> {
+        let table_separator = file_name.find(std::path::MAIN_SEPARATOR);
+
+        if table_separator.is_none() {
+            return Err(format!("Invalid table file_name [{}]", file_name));
+        }
+
+        let table_separator = table_separator.unwrap();
+
+        let partition_key = &file_name[table_separator + 1..];
+
+        if partition_key == TABLE_METADATA_FILE_NAME {
+            return Ok(Self {
+                table_name: file_name[..table_separator].to_string(),
+                file_type: ArchiveFileType::Metadata,
+                file_name: file_name.to_string(),
+            });
+        }
+
+        let partition_key = partition_key.from_base64();
+
+        if partition_key.is_err() {
+            return Err(format!(
+                "Invalid file_name key [{}]. Can not extract partition key",
+                file_name
+            ));
+        }
+
+        let partition_key = partition_key.unwrap();
+
+        let partition_key = match String::from_utf8(partition_key) {
+            Ok(result) => result,
+            Err(_) => {
+                return Err(format!(
+                    "Invalid file_name key [{}]. Can not convert partition key to string",
+                    file_name
+                ));
+            }
+        };
+
+        let result = Self {
+            table_name: file_name[..table_separator].to_string(),
+            file_type: ArchiveFileType::PartitionKey(partition_key),
+            file_name: file_name.to_string(),
+        };
+
+        Ok(result)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    #[test]
+    fn test_metadata_file() {
+        let result = super::RestoreFileName::new("key-value/.metadata").unwrap();
+        assert_eq!(result.table_name, "key-value");
+        assert!(result.file_type.is_metadata());
+    }
+
+    #[test]
+    fn test_partition_key() {
+        let result = super::RestoreFileName::new("key-value/Yw==").unwrap();
+        assert_eq!(result.table_name, "key-value");
+        assert_eq!(result.file_type.unwrap_as_partition_key(), "c");
+    }
+}
diff --git a/src/zip/mod.rs b/src/zip/mod.rs
index 959315f..8beda9d 100644
--- a/src/zip/mod.rs
+++ b/src/zip/mod.rs
@@ -2,3 +2,5 @@ mod db_zip_builder;
 mod vec_writer;
 pub use db_zip_builder::*;
 pub use vec_writer::*;
+mod zip_reader;
+pub use zip_reader::*;
diff --git a/src/zip/zip_reader.rs b/src/zip/zip_reader.rs
new file mode 100644
index 0000000..513bef6
--- /dev/null
+++ b/src/zip/zip_reader.rs
@@ -0,0 +1,36 @@
+use std::io::Read;
+
+pub struct ZipReader {
+    zip: zip::ZipArchive<std::io::Cursor<Vec<u8>>>,
+}
+
+impl ZipReader {
+    pub fn new(zip_content: Vec<u8>) -> Self {
+        let zip_cursor = std::io::Cursor::new(zip_content);
+        let zip = zip::ZipArchive::new(zip_cursor).unwrap();
+        Self { zip }
+    }
+
+    pub fn get_file_names(&mut self) -> impl Iterator<Item = &str> {
+        self.zip.file_names()
+    }
+
+    pub fn get_content_as_vec(&mut self, file_name: &str) -> Result<Vec<u8>, std::io::Error> {
+        let mut file = self.zip.by_name(file_name)?;
+        let file_size = file.size() as usize;
+        let mut content: Vec<u8> = vec![0u8; file_size];
+
+        let mut pos = 0;
+        while pos < file_size {
+            let size = file.read(&mut content[pos..])?;
+
+            if size == 0 {
+                break;
+            }
+
+            pos += size;
+        }
+
+        Ok(content)
+    }
+}
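
For context, a minimal sketch of how the new restore entry point might be driven once this patch is applied. Only `operations::backup::restore` and its `Option<&str>` table filter come from the patch itself; the wrapper function, the backup path, and the `tokio::fs::read` call are illustrative assumptions, not part of the change.

use std::sync::Arc;

use crate::app::AppContext;

// Hypothetical driver: load a backup archive from disk and restore it.
// Passing Some(table) restores that single table; None restores every
// table found in the archive (both paths are handled by restore()).
async fn restore_from_file(
    app: &Arc<AppContext>,
    path: &str,          // e.g. "/backups/2024-12-02.zip" (illustrative)
    table: Option<&str>, // e.g. Some("key-value")
) -> Result<(), String> {
    // restore() takes ownership of the archive bytes, so read the whole
    // file into memory first.
    let backup_content = tokio::fs::read(path)
        .await
        .map_err(|err| format!("can not read backup file: {:?}", err))?;

    crate::operations::backup::restore(app, backup_content, table)
        .await
        .map_err(|err| format!("restore failed: {:?}", err))
}

The patched HTTP action above exposes the same contract to API clients: a successful restore now returns an empty 200 response, while any BackupError surfaces as a fatal error instead of a 200 carrying a debug string.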