
Commit

Move import attachment upload logic to separate file
tuta-sudipg authored and ganthern committed Feb 12, 2025
1 parent c0e4894 commit 4abd506
Showing 4 changed files with 181 additions and 144 deletions.
150 changes: 25 additions & 125 deletions packages/node-mimimi/src/importer.rs
@@ -1,4 +1,4 @@
use crate::importer::importable_mail::{ImportableMailWithPath, KeyedImportableMailAttachment};
use crate::importer::importable_mail::ImportableMailWithPath;
use crate::reduce_to_chunks::{KeyedImportMailData, MailUploadDataWithAttachment};
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use base64::Engine;
@@ -8,19 +8,19 @@ use std::fs;
use std::fs::DirEntry;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tutasdk::blobs::blob_facade::FileData;
use tutasdk::crypto::aes;
use tutasdk::crypto::aes::Iv;
use tutasdk::crypto::key::{GenericAesKey, VersionedAesKey};
use tutasdk::crypto::randomizer_facade::RandomizerFacade;
use tutasdk::entities::generated::sys::{BlobReferenceTokenWrapper, StringWrapper};
use tutasdk::entities::generated::sys::StringWrapper;

use crate::importer::attachment_importer::PerChunkAttachmentImporter;
use crate::importer::messages::{
ImportErrorKind, ImportOkKind, MailImportErrorMessage, PreparationError,
};
use crate::importer_api::TutaCredentials;
use tutasdk::entities::generated::tutanota::{
ImportAttachment, ImportMailGetIn, ImportMailPostIn, ImportMailPostOut, ImportMailState,
ImportMailGetIn, ImportMailPostIn, ImportMailPostOut, ImportMailState,
};
use tutasdk::entities::json_size_estimator::estimate_json_size;
use tutasdk::net::native_rest_client::NativeRestClient;
@@ -31,6 +31,7 @@ use tutasdk::{ApiCallError, CustomId, GeneratedId, IdTupleGenerated, LoggedInSdk

pub mod messages;

mod attachment_importer;
pub mod file_reader;
mod filename_producer;
pub mod importable_mail;
Expand Down Expand Up @@ -153,106 +154,6 @@ impl ImportEssential {
}
}

/// Upload all attachments for this chunk.
/// Steps:
/// 1. flatten all attachments of all mails in this chunk
/// 2. upload them via `BlobFacade::encrypt_and_upload_multiple` and get back reference tokens for all blobs
/// 3. assemble the reference tokens onto the correct attachments
async fn upload_attachments_for_chunk(
&self,
importable_chunk: Vec<MailUploadDataWithAttachment>,
) -> Result<Vec<KeyedImportMailData>, MailImportErrorMessage> {
let session_keys_for_all_attachments = std::iter::repeat_with(|| {
GenericAesKey::Aes256(aes::Aes256Key::generate(&self.randomizer_facade))
})
.take(importable_chunk.iter().map(|u| u.attachments.len()).sum())
.collect::<Vec<_>>();

// aggregate attachment data from multiple mails to upload in fewer requests to the BlobService
let flattened_attachments = Self::flatten_attachments_for_chunk(
session_keys_for_all_attachments.iter(),
&importable_chunk,
);

// upload all attachments in this chunk in one call to the blob_facade
// the blob_facade chunks them into efficient requests to the BlobService
let reference_tokens_per_attachment_flattened = self
.logged_in_sdk
.blob_facade()
.encrypt_and_upload_multiple(
ArchiveDataType::Attachments,
&self.target_owner_group,
flattened_attachments.iter().map(|a| &a.file_data),
)
.await
.map_err(|e| MailImportErrorMessage::sdk("fail to upload multiple attachments", e))?;

let keyed_import_mail_data = self.assemble_import_mail_data_with_attachments(
importable_chunk,
session_keys_for_all_attachments,
reference_tokens_per_attachment_flattened,
);

Ok(keyed_import_mail_data)
}

fn flatten_attachments_for_chunk<'a>(
mut session_keys: impl Iterator<Item = &'a GenericAesKey>,
importable_chunk: &'a [MailUploadDataWithAttachment],
) -> Vec<KeyedImportableMailAttachment<'a>> {
importable_chunk
.iter()
.flat_map(|mail_upload_data_with_attachment| {
mail_upload_data_with_attachment
.attachments
.iter()
.map(|attachment| KeyedImportableMailAttachment {
file_data: FileData {
session_key: session_keys.next().expect("Not enough session keys"),
data: &attachment.content,
},
meta_data: &attachment.meta_data,
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>()
}

fn assemble_import_mail_data_with_attachments(
&self,
importable_chunk: Vec<MailUploadDataWithAttachment>,
session_keys_for_all_attachments: Vec<GenericAesKey>,
mut reference_tokens_per_attachment_flattened: Vec<Vec<BlobReferenceTokenWrapper>>,
) -> Vec<KeyedImportMailData> {
importable_chunk
.into_iter()
.map(|mail_upload_data| {
let mut attachment_session_keys_iter = session_keys_for_all_attachments.iter();
let mut attachments_reference_tokens_iter =
reference_tokens_per_attachment_flattened
.drain(0..mail_upload_data.attachments.len());

let import_attachments = mail_upload_data
.attachments
.into_iter()
.map(|attachment| {
let session_key = attachment_session_keys_iter
.next()
.expect("More attachments than we have session keys for");
let reference_tokens = attachments_reference_tokens_iter
.next()
.expect("Not enough reference tokens");
attachment.make_import_attachment_data(self, session_key, reference_tokens)
})
.collect::<Vec<ImportAttachment>>();

let mut keyed_import_data = mail_upload_data.keyed_import_mail_data;
keyed_import_data.import_mail_data.importedAttachments = import_attachments;
keyed_import_data
})
.collect::<Vec<_>>()
}

fn make_serialized_chunk(
&self,
importable_chunk: Vec<KeyedImportMailData>,
@@ -608,33 +509,30 @@ impl Importer {
.len()
.try_into()
.expect("item count in single chunk will never exceed i64::max");

let eml_file_paths = chunked_import_data
.iter()
.map(|id| id.keyed_import_mail_data.eml_file_path.clone())
.collect();

let mut failed_count = 0;
let unit_import_data = import_essentials
.upload_attachments_for_chunk(chunked_import_data)
.await
.inspect_err(|_e| failed_count += import_count_in_this_chunk)?;
let importable_post_data = import_essentials
.make_serialized_chunk(unit_import_data)
.inspect_err(|_e| failed_count += import_count_in_this_chunk)?;

let import_data_with_attachment_token =
PerChunkAttachmentImporter::upload_attachments_for_chunk(
&self.essentials,
chunked_import_data,
)
.await?;
let post_data =
import_essentials.make_serialized_chunk(import_data_with_attachment_token)?;
import_essentials
.make_import_service_call(importable_post_data)
.await
.inspect_err(|_e| failed_count += import_count_in_this_chunk)?;
.make_import_service_call(post_data)
.await?;

self.essentials
.update_remote_state(move |state| {
state.failedMails += failed_count;
state.successfulMails += import_count_in_this_chunk;
true
})
.await?;

Ok(Some(eml_file_paths))
},
}
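
For orientation, a minimal sketch of the per-chunk pipeline this hunk now delegates to. The function names `upload_attachments`, `serialize`, and `post` below are hypothetical stand-ins for `PerChunkAttachmentImporter::upload_attachments_for_chunk`, `make_serialized_chunk`, and `make_import_service_call`; any error short-circuits via `?` and is routed to `handle_err_while_importing_chunk`:

// Simplified stand-in types; the real code works on Vec<MailUploadDataWithAttachment>.
struct Chunk(Vec<String>);

fn upload_attachments(chunk: Chunk) -> Result<Vec<String>, String> {
	Ok(chunk.0) // stand-in for PerChunkAttachmentImporter::upload_attachments_for_chunk
}

fn serialize(mails: Vec<String>) -> Result<String, String> {
	Ok(mails.join("\n")) // stand-in for make_serialized_chunk
}

fn post(payload: String) -> Result<(), String> {
	println!("POST {} bytes", payload.len()); // stand-in for make_import_service_call
	Ok(())
}

fn import_chunk(chunk: Chunk) -> Result<(), String> {
	let with_tokens = upload_attachments(chunk)?; // 1. upload attachments, collect blob reference tokens
	let payload = serialize(with_tokens)?; // 2. serialize the chunk into the service payload
	post(payload) // 3. one import service call per chunk
}

fn main() {
	import_chunk(Chunk(vec!["mail1.eml".into(), "mail2.eml".into()])).unwrap();
}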
@@ -729,9 +627,10 @@ impl Importer {
Ok(())
}

/// called if any chunk fails to import for any reason. if it returns `Ok`, we should continue with the next
/// chunk, if it returns `Err`, the error should be propagated to the node process to maybe be displayed and
/// the import should stop for now.
/// Decide whether this error should abort the import process or whether we should move on to the next chunk.
/// - if it returns `Ok`, we should continue with the next chunk; if it returns `Err`, the error should
/// be propagated to the node process to maybe be displayed, and the import should stop for now.
/// - if the mails involved in this error should not be picked up when the user retries, move them to FAILED_MAILS_SUB_DIR
async fn handle_err_while_importing_chunk(
&self,
import_error: MailImportErrorMessage,
@@ -741,10 +640,11 @@
// if the import is (temporarily) disabled, we should give up and let the user try again later
ImportErrorKind::ImportFeatureDisabled => Err(ImportErrorKind::ImportFeatureDisabled)?,

// these are errors we can do nothing about
ImportErrorKind::EmptyBlobServerList | ImportErrorKind::SdkError => {
Err(ImportErrorKind::SdkError)?
},
// If the server does not give us any available blob storage servers (which we also use to make the service call),
// there is nothing we can do about it; we should error out in the client
ImportErrorKind::EmptyBlobServerList => Err(ImportErrorKind::SdkError)?,

ImportErrorKind::SdkError => Err(import_error)?,

// if something is too big, we just rename it as failed,
// and it is fine to continue importing the next one. The user will get an import-incomplete notification at the end
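The decision logic above, reduced to a self-contained sketch. The enum and handler below are simplified stand-ins, not the actual `ImportErrorKind` definition, and the too-big case is abbreviated from the truncated arm:

// Hedged sketch of the continue-vs-abort decision; variant names follow the diff
// above, but this enum and handler are simplified stand-ins for the real ones.
enum ErrorKind {
	ImportFeatureDisabled, // give up and let the user retry later
	EmptyBlobServerList,   // no blob storage server available: nothing to do client-side
	SdkError,              // propagate to the node process
	TooBigChunk,           // rename the mail as failed and keep importing
}

fn handle_chunk_error(kind: ErrorKind) -> Result<(), ErrorKind> {
	match kind {
		// abort: surface these to the node process
		ErrorKind::ImportFeatureDisabled
		| ErrorKind::EmptyBlobServerList
		| ErrorKind::SdkError => Err(kind),
		// continue: move the offending mails to FAILED_MAILS_SUB_DIR and go on
		ErrorKind::TooBigChunk => Ok(()),
	}
}

fn main() {
	assert!(handle_chunk_error(ErrorKind::TooBigChunk).is_ok());
	assert!(handle_chunk_error(ErrorKind::SdkError).is_err());
}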
137 changes: 137 additions & 0 deletions packages/node-mimimi/src/importer/attachment_importer.rs
@@ -0,0 +1,137 @@
use crate::importer::importable_mail::KeyedImportableMailAttachment;
use crate::importer::messages::MailImportErrorMessage;
use crate::importer::ImportEssential;
use crate::reduce_to_chunks::{KeyedImportMailData, MailUploadDataWithAttachment};
use tutasdk::blobs::blob_facade::FileData;
use tutasdk::crypto::aes;
use tutasdk::crypto::key::GenericAesKey;
use tutasdk::crypto::randomizer_facade::RandomizerFacade;
use tutasdk::entities::generated::sys::BlobReferenceTokenWrapper;
use tutasdk::entities::generated::tutanota::ImportAttachment;
use tutasdk::tutanota_constants::ArchiveDataType;

/// Upload all attachments for this chunk.
/// Steps:
/// 1. flatten all attachments of all mails in this chunk
/// 2. upload them via `BlobFacade::encrypt_and_upload_multiple` and get back reference tokens for all blobs
/// 3. assemble the reference tokens onto the correct attachments
#[must_use]
pub struct PerChunkAttachmentImporter;

impl PerChunkAttachmentImporter {
/// upload all attachments in each chunk item to blob storage and assign the blob reference tokens
pub async fn upload_attachments_for_chunk(
import_essential: &ImportEssential,
importable_chunk: Vec<MailUploadDataWithAttachment>,
) -> Result<Vec<KeyedImportMailData>, MailImportErrorMessage> {
// aggregate attachment data from multiple mails to upload in fewer requests to the BlobService
let flattened_attachments = Self::flatten_attachments_for_chunk(
&import_essential.randomizer_facade,
&importable_chunk,
);

// upload all attachments in this chunk in one call to the blob_facade
// the blob_facade chunks them into efficient requests to the BlobService
let reference_tokens_per_attachment_flattened = import_essential
.logged_in_sdk
.blob_facade()
.encrypt_and_upload_multiple(
ArchiveDataType::Attachments,
&import_essential.target_owner_group,
flattened_attachments.iter().map(|a| &a.file_data),
)
.await
.map_err(|e| MailImportErrorMessage::sdk("fail to upload multiple attachments", e))?;

let session_keys_for_all_attachments = flattened_attachments
.iter()
.map(|a| a.file_data.session_key.clone())
.collect();
let keyed_import_mail_data = Self::assemble_import_mail_data_with_attachments(
import_essential,
importable_chunk,
session_keys_for_all_attachments,
reference_tokens_per_attachment_flattened,
);

Ok(keyed_import_mail_data)
}
}

impl PerChunkAttachmentImporter {
/// Visit all attachments of all mails in this chunk,
/// map them to `KeyedImportableMailAttachment`, and return a flat list in as-is order
fn flatten_attachments_for_chunk<'a>(
randomizer_facade: &RandomizerFacade,
importable_chunk: &'a [MailUploadDataWithAttachment],
) -> Vec<KeyedImportableMailAttachment<'a>> {
importable_chunk
.iter()
.flat_map(|mail_upload_data_with_attachment| {
mail_upload_data_with_attachment
.attachments
.iter()
.map(|attachment| KeyedImportableMailAttachment {
file_data: FileData {
session_key: GenericAesKey::Aes256(aes::Aes256Key::generate(
randomizer_facade,
)),
data: &attachment.content,
},
meta_data: &attachment.meta_data,
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>()
}

/// Given the list of items in one chunk, assign each `MailData::importedAttachments` its reference tokens.
///
/// Assumes that `reference_tokens_per_attachment` is in the same flattened order as `importable_chunk`'s attachments.
///
/// - `importable_chunk`:
/// Original list of upload data in this chunk
/// - `reference_tokens_per_attachment`:
/// Collection of reference tokens for the flattened attachments in `importable_chunk`.
/// Note: one attachment might have been broken into multiple blobs (see `blob_facade.rs`), hence there might be
/// multiple reference tokens for a single attachment
fn assemble_import_mail_data_with_attachments(
import_essential: &ImportEssential,
importable_chunk: Vec<MailUploadDataWithAttachment>,
session_keys_for_all_attachments: Vec<GenericAesKey>,
mut reference_tokens_per_attachment: Vec<Vec<BlobReferenceTokenWrapper>>,
) -> Vec<KeyedImportMailData> {
let mut attachment_session_keys_iter = session_keys_for_all_attachments.iter();

importable_chunk
.into_iter()
.map(|mail_upload_data| {
let mut attachments_reference_tokens_iter =
reference_tokens_per_attachment.drain(..mail_upload_data.attachments.len());

let import_attachments = mail_upload_data
.attachments
.into_iter()
.map(|attachment| {
let session_key = attachment_session_keys_iter
.next()
.expect("More attachments than we have session keys for");
let reference_tokens = attachments_reference_tokens_iter
.next()
.expect("Not enough reference tokens");

attachment.make_import_attachment_data(
import_essential,
session_key,
reference_tokens,
)
})
.collect::<Vec<ImportAttachment>>();

let mut keyed_import_mail_data = mail_upload_data.keyed_import_mail_data;
keyed_import_mail_data.import_mail_data.importedAttachments = import_attachments;
keyed_import_mail_data
})
.collect::<Vec<_>>()
}
}
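
The assembly step depends on one invariant: the reference tokens come back in the same flattened order in which the attachments were uploaded, and `drain(..n)` peels off each mail's share in turn. A self-contained toy version of that regrouping, with plain integers standing in for `BlobReferenceTokenWrapper`s:

// Toy model of assemble_import_mail_data_with_attachments: regroup a flat,
// per-attachment token list back onto its mails by draining counts in order.
fn regroup(attachment_counts: &[usize], mut flat_tokens: Vec<Vec<u32>>) -> Vec<Vec<Vec<u32>>> {
	attachment_counts
		.iter()
		.map(|&count| flat_tokens.drain(..count).collect())
		.collect()
}

fn main() {
	// two mails: the first has 2 attachments, the second has 1; one inner Vec
	// per attachment, since one attachment may span multiple blobs
	let tokens = vec![vec![1], vec![2, 3], vec![4]];
	let grouped = regroup(&[2, 1], tokens);
	assert_eq!(grouped, vec![vec![vec![1], vec![2, 3]], vec![vec![4]]]);
}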
