Skip to content

Commit

Permalink
Way Optimized loading
Browse files Browse the repository at this point in the history
  • Loading branch information
amigin committed Jun 20, 2022
1 parent 083ecaa commit e21c98e
Show file tree
Hide file tree
Showing 29 changed files with 531 additions and 444 deletions.
24 changes: 12 additions & 12 deletions JavaScript/HtmlMain.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion JavaScript/HtmlMain.js.map

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 15 additions & 12 deletions TypeScript/HtmlMain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,31 @@ class HtmlMain {

public static generateInit(model: INonInitializedModel): string {

var result = '<h1>Remains tables to load: ' + model.tablesRemains + '</h1>' +
'<h2>Table which files are being loaded: ' + model.tableBeingLoadedFiles + '</h2>' +
var result = '<h1>Total Tables amount: ' + model.tablesTotal + ' / ' + model.tablesLoaded + '</h1>' +
'<h2>Files to load: ' + model.filesTotal + ' / ' + model.filesLoaded + '</h2>' +
'<h2>Total loading time is: ' + this.formatSeconds(model.initializingSeconds) + '</h2>' +
'<table class="table table-striped table-bordered"><tr><th>TableName</th><th>Partitions loaded</th><th>Partitions total</th><th>Time gone</th><th>Time estimation</th></tr>'
'<h3>Est: ' + this.getInitRemains(model) + '</h3>';

for (let itm of model.progress.sort((a, b) => a.tableName > b.tableName ? 1 : -1)) {
result += '<tr><td style="width:50%">' + itm.tableName + '</td><td>' + itm.loaded + '</td><td>' + itm.toLoad + "</td><td>" + this.formatSeconds(itm.secondsGone) + "</td><td>" + this.getInitRemains(itm) + "</td></tr>"
}
return result;
}

return result + "</table>";

}

static getInitRemains(progress: ITableLoadProgress): String {
static getInitRemains(model: INonInitializedModel): String {

if (progress.toLoad == 0 || progress.loaded == 0) {
if (model.filesLoaded == 0 || model.filesTotal == 0) {
return "Unknown"
}

if (model.filesLoaded > model.filesTotal) {
return "Unknown"
}

let toLoad = model.filesTotal - model.filesLoaded;

let pieceDuration = progress.secondsGone / progress.loaded;
let pieceDuration = model.initializingSeconds / model.filesLoaded;

let remains = (progress.toLoad - progress.loaded) * pieceDuration;
let remains = toLoad * pieceDuration;

remains = this.trunc(remains);

Expand All @@ -40,6 +42,7 @@ class HtmlMain {
}



public static formatSecMin(value: number): String {
if (value < 10) {
return "0" + value.toFixed(0);
Expand Down
13 changes: 4 additions & 9 deletions TypeScript/contracts.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,14 @@ interface ILocationStatus {


interface INonInitializedModel {
tablesRemains: number,
tablesTotal: number,
tablesLoaded: number,
filesTotal: number,
filesLoaded: number,
initializingSeconds: number,
progress: ITableLoadProgress[]
tableBeingLoadedFiles: String
}


/**
 * Load progress of a single table while the service is still initializing.
 * One instance is rendered as one row of the init-status HTML table.
 */
interface ITableLoadProgress {
/** Table name; shown in the "TableName" column and used to sort the rows. */
tableName: String,
/** Amount already loaded; shown in the "Partitions loaded" column. */
loaded: number,
/** Total amount to load; shown in the "Partitions total" column. */
toLoad: number,
/** Seconds spent on this table so far; shown in the "Time gone" column. */
secondsGone: number
}

interface IStatus {
notInitialized: INonInitializedModel,
Expand Down
11 changes: 11 additions & 0 deletions src/db/db_table/db_table_attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ pub struct DbTableAttributes {
max_partitions_amount: AtomicI32,
created: AtomicDateTimeAsMicroseconds,
}

impl DbTableAttributes {
    /// Builds a fresh set of table attributes with default values:
    /// `created` is stamped with the current time, `persist` starts as `true`
    /// and `max_partitions_amount` starts at `0`.
    pub fn create_default() -> Self {
        // Take the timestamp first so the evaluation order matches the
        // order in which the values were originally produced.
        let created = AtomicDateTimeAsMicroseconds::now();
        let persist = AtomicBool::new(true);
        let max_partitions_amount = AtomicI32::new(0);

        Self {
            persist,
            max_partitions_amount,
            created,
        }
    }
}

#[derive(Debug, Clone)]
pub struct DbTableAttributesSnapshot {
pub persist: bool,
Expand Down
65 changes: 16 additions & 49 deletions src/http/controllers/status_controller/non_initialized/model.rs
Original file line number Diff line number Diff line change
@@ -1,67 +1,34 @@
use crate::{
app::AppContext, persist_operations::data_initializer::load_tasks::InitTableStateSnapshot,
};
use crate::app::AppContext;
use my_http_server_swagger::*;
use rust_extensions::date_time::DateTimeAsMicroseconds;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, MyHttpObjectStructure)]
pub struct NonInitializedModel {
#[serde(rename = "tablesRemains")]
tables_remains: usize,
#[serde(rename = "tablesTotal")]
tables_total: usize,
#[serde(rename = "tablesLoaded")]
tables_loaded: usize,
#[serde(rename = "filesTotal")]
files_total: usize,
#[serde(rename = "filesLoaded")]
files_loaded: usize,
#[serde(rename = "initializingSeconds")]
initializing_seconds: i64,
progress: Vec<TableProgressModel>,
#[serde(rename = "tableBeingLoadedFiles")]
table_being_loaded_files: Option<String>,
loading_time: i64,
}

impl NonInitializedModel {
pub async fn new(app: &AppContext) -> Self {
let snapshot = app.init_state.get_snapshot().await;

let mut progress = Vec::new();

let now = DateTimeAsMicroseconds::now();

for table_snapshot in snapshot.loading {
progress.push(TableProgressModel::new(table_snapshot));
}

return Self {
progress,
tables_remains: snapshot.to_load.len(),
initializing_seconds: now.seconds_before(app.created),
table_being_loaded_files: snapshot.table_being_loaded_files,
};
}
}

/// Per-table loading progress exposed through the status HTTP model.
/// Field names are renamed to camelCase on serialization so they match the
/// property names the front-end reads.
#[derive(Serialize, Deserialize, Debug, MyHttpObjectStructure)]
pub struct TableProgressModel {
// Name of the table this progress entry describes.
#[serde(rename = "tableName")]
table_name: String,
// Total amount to load, copied from the table's init snapshot.
// NOTE(review): the status page labels this "Partitions total" - confirm the unit.
#[serde(rename = "toLoad")]
to_load: usize,
// Amount already loaded, copied from the table's init snapshot.
loaded: usize,
// Seconds elapsed since loading of this table started; 0 if it has not started.
#[serde(rename = "secondsGone")]
seconds_gone: i64,
}

impl TableProgressModel {
pub fn new(table_snapshot: InitTableStateSnapshot) -> Self {
let seconds_gone = if let Some(init_started) = table_snapshot.init_started {
let now = DateTimeAsMicroseconds::now();
now.seconds_before(init_started)
} else {
0
};
let snapshot = app.init_state.get_snapshot().await;

Self {
table_name: table_snapshot.name,
loaded: table_snapshot.loaded,
to_load: table_snapshot.to_load,
seconds_gone,
tables_total: snapshot.tables_total,
files_total: snapshot.files_total,
files_loaded: snapshot.files_loaded,
tables_loaded: snapshot.tables_loaded,
loading_time: now.seconds_before(app.created),
}
}
}
2 changes: 1 addition & 1 deletion src/persist_io/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod persist_io_operations;
mod table_file;
mod with_retries;
pub use persist_io_operations::PersistIoOperations;
pub use persist_io_operations::{PersistIoOperations, TableListOfFilesUploader};
pub use table_file::TableFile;
20 changes: 14 additions & 6 deletions src/persist_io/persist_io_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ use std::sync::Arc;

use my_azure_storage_sdk::AzureStorageConnection;

use crate::{
app::logs::Logs, persist_io::TableFile,
persist_operations::data_initializer::load_tasks::TableToLoad,
};
use crate::{app::logs::Logs, persist_io::TableFile};

/// Entry point for persistence I/O: bundles the shared logger with the
/// Azure storage connection that the I/O operations are executed against.
pub struct PersistIoOperations {
// Shared application log sink handed to the retry helpers.
logs: Arc<Logs>,
// Shared connection to the backing Azure storage.
azure_connection: Arc<AzureStorageConnection>,
}

/// Sink for the list of files discovered for a table.
///
/// The file-listing routine pushes file names into an implementor as they
/// are read from storage and then signals that the listing is complete.
#[async_trait::async_trait]
pub trait TableListOfFilesUploader {
/// Receives one batch of file names found for `table_name`.
/// May be called several times for the same table when the listing arrives in chunks.
async fn add_files(&self, table_name: &str, files: Vec<String>);
/// Signals that every file of `table_name` has been reported and no more
/// `add_files` calls will follow for it.
async fn set_files_list_is_loaded(&self, table_name: &str);
}

impl PersistIoOperations {
pub fn new(azure_connection: Arc<AzureStorageConnection>, logs: Arc<Logs>) -> Self {
Self {
Expand All @@ -25,11 +28,16 @@ impl PersistIoOperations {
.await
}

pub async fn get_table_files(&self, table_to_load: &Arc<TableToLoad>) {
pub async fn get_table_files<TTableListOfFilesUploader: TableListOfFilesUploader>(
&self,
table_name: &str,
uploader: &TTableListOfFilesUploader,
) {
super::with_retries::get_list_of_files(
self.logs.as_ref(),
self.azure_connection.as_ref(),
table_to_load,
table_name,
uploader,
)
.await;
}
Expand Down
41 changes: 24 additions & 17 deletions src/persist_io/with_retries/get_list_of_files.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,67 @@
use std::sync::Arc;

use my_azure_storage_sdk::{
blob_container::BlobContainersApi, sdk_azure::blobs::AzureBlobsListReader,
AzureStorageConnection, AzureStorageConnectionData,
};

use crate::{app::logs::Logs, persist_operations::data_initializer::load_tasks::TableToLoad};
use crate::{app::logs::Logs, persist_io::persist_io_operations::TableListOfFilesUploader};

pub async fn get_list_of_files(
pub async fn get_list_of_files<TTableListOfFilesUploader: TableListOfFilesUploader>(
logs: &Logs,
azure_connection: &AzureStorageConnection,
table_to_load: &Arc<TableToLoad>,
table_name: &str,
uploader: &TTableListOfFilesUploader,
) {
match azure_connection {
AzureStorageConnection::AzureStorage(connection_data) => {
get_list_of_files_from_azure_blob_container(logs, connection_data, table_to_load).await;
get_list_of_files_from_azure_blob_container(
logs,
connection_data,
table_name,
uploader,
)
.await;
}
_ => {
let chunk = azure_connection
.get_list_of_blobs(table_to_load.table_name.as_str())
.get_list_of_blobs(table_name)
.await
.unwrap();

table_to_load.add_partitions_to_load(chunk).await;
table_to_load.set_files_list_is_loaded();
uploader.add_files(table_name, chunk).await;
uploader.set_files_list_is_loaded(table_name).await;
}
};
}

async fn get_list_of_files_from_azure_blob_container(
async fn get_list_of_files_from_azure_blob_container<
TTableListOfFilesUploader: TableListOfFilesUploader,
>(
logs: &Logs,
connection: &AzureStorageConnectionData,
table_to_load: &Arc<TableToLoad>,
table_name: &str,
uploader: &TTableListOfFilesUploader,
) {
let mut attempt_no: u8 = 0;
let mut blobs_list_reader =
AzureBlobsListReader::new(connection, table_to_load.table_name.as_str());
let mut blobs_list_reader = AzureBlobsListReader::new(connection, table_name);
loop {
let next_result = blobs_list_reader.get_next().await;
match next_result {
Ok(chunk) => {
if let Some(chunk) = chunk {
table_to_load.add_partitions_to_load(chunk).await;
uploader.add_files(table_name, chunk).await;
} else {
table_to_load.set_files_list_is_loaded();
uploader.set_files_list_is_loaded(table_name).await;
return;
}
}
Err(err) => {
super::attempt_handling::execute(
logs,
Some(table_to_load.table_name.to_string()),
Some(table_name.to_string()),
"get_list_of_files_from_azure_blob_container",
format!(
"Can not get list of files from azure blob container:[{}]. Err: {:?}",
table_to_load.table_name, err
table_name, err
),
attempt_no,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ use std::sync::Arc;
use crate::{
app::AppContext,
db::{DbTableAttributesSnapshot, DbTableData},
persist_operations::data_initializer::load_tasks::TableToLoad,
persist_operations::data_initializer::load_tasks::TableLoadingTask,
};
use rust_extensions::date_time::DateTimeAsMicroseconds;

use super::LoadedTableItem;

pub async fn load_table(app: &Arc<AppContext>, table_to_load: &Arc<TableToLoad>) {
pub async fn load_table(
app: &Arc<AppContext>,
table_loading_task: &Arc<TableLoadingTask>,
file_name: String,
) {
app.logs.add_info(
Some(table_to_load.table_name.to_string()),
crate::app::logs::SystemProcess::Init,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,3 @@ pub async fn load_table_file(
}
}
}

/// Converts the raw bytes of a persisted table file into a `LoadedTableItem`.
///
/// * `TableFile::TableAttributes` - the content is parsed as a table metadata
///   contract and converted into the attributes payload.
/// * `TableFile::DbPartition` - the content is deserialized into a partition
///   and paired with an owned copy of its partition key.
///
/// Returns `Err` with the deserializer's message when a partition file
/// cannot be deserialized.
fn get_item(table_file: &TableFile, content: &[u8]) -> Result<LoadedTableItem, String> {
    let item = match table_file {
        TableFile::TableAttributes => {
            LoadedTableItem::TableAttributes(TableMetadataFileContract::parse(content).into())
        }
        TableFile::DbPartition(partition_key) => {
            // Deserialize before copying the key so a failure exits early,
            // exactly as the statement order did before.
            let db_partition = serializers::db_partition::deserialize(content)?;

            LoadedTableItem::DbPartition {
                partition_key: partition_key.to_string(),
                db_partition,
            }
        }
    };

    Ok(item)
}
Loading

0 comments on commit e21c98e

Please sign in to comment.