
Commit

Made Optimizations
amigin committed Jan 19, 2024
1 parent bf189c3 commit 4820c33
Showing 66 changed files with 831 additions and 822 deletions.
96 changes: 53 additions & 43 deletions src/background/gc_db_rows.rs
```diff
@@ -41,52 +41,62 @@ async fn gc_it(app: &AppContext) {
         table_data.get_data_to_gc(now)
     };
 
-    if let Some(data_to_gc) = data_to_gc.get_data_to_gc() {
-        let now = DateTimeAsMicroseconds::now();
-        let mut persist_moment = now.clone();
-        persist_moment.add_seconds(5);
-
-        if data_to_gc.partitions.len() > 0 {
-            if let Err(err) = crate::db_operations::write::delete_partitions(
-                app,
-                &table,
-                data_to_gc.partitions.iter().map(|x| x.0.as_str()),
-                EventSource::GarbageCollector,
-                persist_moment,
-                now,
-            )
-            .await
-            {
-                my_logger::LOGGER.write_error(
-                    "GcPartitions",
-                    format!("{:?}", err),
-                    LogEventCtx::new().add("tableName", table.name.as_str()),
-                );
-            }
-        }
+    if !data_to_gc.has_data_to_gc() {
+        continue;
+    }
+
+    let now = DateTimeAsMicroseconds::now();
+    let mut persist_moment = now.clone();
+    persist_moment.add_seconds(5);
+
+    if data_to_gc.partitions.len() > 0 {
+        if let Err(err) = crate::db_operations::write::delete_partitions(
+            app,
+            &table,
+            data_to_gc.partitions.into_vec().into_iter(),
+            EventSource::GarbageCollector,
+            persist_moment,
+            now,
+        )
+        .await
+        {
+            my_logger::LOGGER.write_error(
+                "GcPartitions",
+                format!("{:?}", err),
+                LogEventCtx::new().add("tableName", table.name.as_str()),
+            );
+        }
+    }
 
-        if data_to_gc.db_rows.len() > 0 {
-            println!("GcRows: {}", data_to_gc.db_rows.len());
-            if let Err(err) = crate::db_operations::write::bulk_delete(
-                app,
-                &table,
-                data_to_gc.db_rows,
-                EventSource::GarbageCollector,
-                persist_moment,
-                now,
-            )
-            .await
-            {
-                let mut ctx = HashMap::new();
-
-                ctx.insert("TableName".to_string(), table.name.to_string());
-
-                my_logger::LOGGER.write_error(
-                    "GcRows",
-                    format!("{:?}", err),
-                    LogEventCtx::new().add("tableName", table.name.as_str()),
-                )
-            }
-        }
-    }
+    if data_to_gc.db_rows.len() > 0 {
+        println!("GcRows: {}", data_to_gc.db_rows.len());
+        if let Err(err) = crate::db_operations::write::bulk_delete(
+            app,
+            &table,
+            data_to_gc.db_rows.into_vec().into_iter().map(|itm| {
+                let db_rows: Vec<_> = itm
+                    .rows
+                    .iter()
+                    .map(|itm| itm.get_row_key().to_string())
+                    .collect();
+
+                (itm.partition_key.to_string(), db_rows)
+            }),
+            EventSource::GarbageCollector,
+            persist_moment,
+            now,
+        )
+        .await
+        {
+            let mut ctx = HashMap::new();
+
+            ctx.insert("TableName".to_string(), table.name.to_string());
+
+            my_logger::LOGGER.write_error(
+                "GcRows",
+                format!("{:?}", err),
+                LogEventCtx::new().add("tableName", table.name.as_str()),
+            )
+        }
+    }
 }
```
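
The reshaped `gc_it` body swaps the nested `if let Some(...)` wrapper for an early `continue` guard and feeds `bulk_delete` an iterator of `(partition_key, row_keys)` pairs instead of a ready-made collection. A minimal standalone sketch of those two patterns follows — all names here (`DataToGc`, `PartitionRows`, the free `bulk_delete`) are illustrative stand-ins, not this repository's actual API:

```rust
// Stand-in types for illustration only; not the server's real structures.
struct PartitionRows {
    partition_key: String,
    rows: Vec<String>, // row keys scheduled for deletion
}

struct DataToGc {
    partitions: Vec<String>,
    db_rows: Vec<PartitionRows>,
}

impl DataToGc {
    fn has_data_to_gc(&self) -> bool {
        !self.partitions.is_empty() || !self.db_rows.is_empty()
    }
}

// Stand-in for the write operation: accepts any iterator of pairs.
fn bulk_delete(rows: impl Iterator<Item = (String, Vec<String>)>) {
    for (partition_key, row_keys) in rows {
        println!("deleting {} rows from {}", row_keys.len(), partition_key);
    }
}

fn gc_tables(tables: Vec<DataToGc>) {
    for data_to_gc in tables {
        // Early-exit guard: skip tables with nothing to collect instead of
        // nesting the whole body inside `if let Some(...)`.
        if !data_to_gc.has_data_to_gc() {
            continue;
        }

        // Feed (partition_key, row_keys) pairs straight into the delete
        // call, mirroring the `into_vec().into_iter().map(...)` shape above.
        bulk_delete(
            data_to_gc
                .db_rows
                .into_iter()
                .map(|itm| (itm.partition_key, itm.rows)),
        );
    }
}

fn main() {
    gc_tables(vec![DataToGc {
        partitions: vec![],
        db_rows: vec![PartitionRows {
            partition_key: "pk1".to_string(),
            rows: vec!["rk1".to_string(), "rk2".to_string()],
        }],
    }]);
}
```

The guard clause removes one indentation level from everything that follows, which accounts for most of the churn in the hunk above.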
4 changes: 2 additions & 2 deletions src/data_readers/http_connection/into_http_payload.rs
```diff
@@ -1,7 +1,7 @@
 use my_json::json_writer::{JsonArrayWriter, JsonObjectWriter};
 
 use crate::db_sync::{
-    states::{DeleteRowsEventSyncData, InitPartitionsSyncData, UpdateRowsSyncData},
+    states::{DeleteRowsEventSyncData, InitPartitionsSyncEventData, UpdateRowsSyncData},
     SyncEvent,
 };
@@ -45,7 +45,7 @@ fn write_init_table_result(table_name: &str, content: JsonArrayWriter) -> Vec<u8
     result
 }
 
-fn write_init_partitions_result(sync_data: &InitPartitionsSyncData) -> Vec<u8> {
+fn write_init_partitions_result(sync_data: &InitPartitionsSyncEventData) -> Vec<u8> {
     let mut result = Vec::new();
 
     let mut header_json = JsonObjectWriter::new();
```
16 changes: 8 additions & 8 deletions src/data_readers/tcp_connection/tcp_payload_to_send.rs
```diff
@@ -38,11 +38,11 @@ pub async fn serialize(sync_event: &SyncEvent, compress: bool) -> Vec<MyNoSqlTcpContract> {
         }
         SyncEvent::InitPartitions(data) => {
             let mut result = Vec::with_capacity(data.partitions_to_update.len());
-            for (partition_key, snapshot) in &data.partitions_to_update {
+            for partition in data.partitions_to_update.iter() {
                 let tcp_contract = MyNoSqlTcpContract::InitPartition {
-                    partition_key: partition_key.to_string(),
+                    partition_key: partition.partition_key.to_string(),
                     table_name: data.table_data.table_name.to_string(),
-                    data: if let Some(db_partition_snapshot) = snapshot {
+                    data: if let Some(db_partition_snapshot) = &partition.snapshot {
                         db_partition_snapshot
                             .db_rows_snapshot
                             .as_json_array()
@@ -77,7 +77,7 @@ pub async fn serialize(sync_event: &SyncEvent, compress: bool) -> Vec<MyNoSqlTcpContract> {
             let mut result = Vec::new();
 
             if let Some(deleted_partitions) = &data.deleted_partitions {
-                for (partition_key, _) in deleted_partitions {
+                for partition_key in deleted_partitions.iter() {
                     let contract = MyNoSqlTcpContract::InitPartition {
                         table_name: data.table_data.table_name.to_string(),
                         partition_key: partition_key.to_string(),
@@ -89,13 +89,13 @@ pub async fn serialize(sync_event: &SyncEvent, compress: bool) -> Vec<MyNoSqlTcpContract> {
                 }
             }
 
             if let Some(deleted_rows) = &data.deleted_rows {
-                for (partition_key, rows) in deleted_rows {
+                for deleted_row in deleted_rows.iter() {
                     let mut deleted_rows = Vec::new();
 
-                    for row_key in rows.keys() {
+                    for db_row in deleted_row.db_rows.iter() {
                         let contract = DeleteRowTcpContract {
-                            partition_key: partition_key.to_string(),
-                            row_key: row_key.to_string(),
+                            partition_key: deleted_row.partition_key.to_string(),
+                            row_key: db_row.get_row_key().to_string(),
                         };
 
                         deleted_rows.push(contract);
```
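
The serializer now reads fields off entries (`partition.partition_key`, `partition.snapshot`, `deleted_row.db_rows`) where it previously destructured map tuples, which suggests the sync-state collections changed from maps to vectors of structs. A sketch of that shape under simplified, assumed types:

```rust
// Simplified stand-ins for the commit's sync-state structures; the real
// snapshot type and TCP contract are not reproduced here.
struct PartitionToUpdate {
    partition_key: String,
    snapshot: Option<Vec<String>>, // stand-in for a partition snapshot
}

fn serialize_init_partitions(partitions_to_update: &[PartitionToUpdate]) -> Vec<String> {
    let mut result = Vec::with_capacity(partitions_to_update.len());
    for partition in partitions_to_update.iter() {
        // Field access replaces tuple destructuring: the key and its
        // optional snapshot now travel together in one entry.
        let data = match &partition.snapshot {
            Some(rows) => format!("[{}]", rows.join(",")),
            None => "[]".to_string(),
        };
        result.push(format!("{}:{}", partition.partition_key, data));
    }
    result
}

fn main() {
    let partitions = vec![PartitionToUpdate {
        partition_key: "pk1".to_string(),
        snapshot: Some(vec!["{\"id\":1}".to_string()]),
    }];
    for contract in serialize_init_partitions(&partitions) {
        println!("{contract}");
    }
}
```

Keeping the key and snapshot in one struct means the serializer cannot pair a key with the wrong snapshot, which a parallel-collections design would allow.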
15 changes: 4 additions & 11 deletions src/db_operations/gc/keep_max_partitions_amount.rs
```diff
@@ -1,6 +1,5 @@
 use std::sync::Arc;
 
-use my_no_sql_sdk::core::db::PartitionKey;
 use my_no_sql_server_core::DbTableWrapper;
 use rust_extensions::date_time::DateTimeAsMicroseconds;
@@ -15,25 +14,19 @@ pub async fn keep_max_partitions_amount(
 ) -> Result<(), DbOperationError> {
     super::super::check_app_states(app)?;
 
-    let partitions_to_gc: Option<Vec<PartitionKey>> = {
+    let partitions_to_gc = {
         let read_access = db_table.data.read().await;
 
-        let result = read_access
+        read_access
             .partitions
-            .get_partitions_to_gc_by_max_amount(max_partitions_amount);
-
-        if let Some(result) = result {
-            Some(result.into_iter().map(|p| p.clone()).collect())
-        } else {
-            None
-        }
+            .get_partitions_to_gc_by_max_amount(max_partitions_amount)
     };
 
     if let Some(partitions_to_gc) = partitions_to_gc {
         super::super::write::delete_partitions(
             app,
             db_table,
-            partitions_to_gc.iter().map(|itm| itm.as_str()),
+            partitions_to_gc.into_iter().map(|itm| itm.partition_key),
             event_src,
             persist_moment,
             DateTimeAsMicroseconds::now(),
```
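
The old body cloned every `PartitionKey` into a fresh `Vec` only to re-borrow each element as `&str` for `delete_partitions`; the new body returns the lookup result as-is and moves the owned keys out. A standalone sketch of the before/after call shape — `PartitionToGc`, `get_partitions_to_gc`, and `delete_partitions` below are stand-ins, not the SDK's actual API:

```rust
// Stand-in types: not the SDK's PartitionKey or delete_partitions.
struct PartitionToGc {
    partition_key: String,
}

fn get_partitions_to_gc(max: usize) -> Option<Vec<PartitionToGc>> {
    let all = vec![
        PartitionToGc { partition_key: "old-1".to_string() },
        PartitionToGc { partition_key: "old-2".to_string() },
    ];
    if all.len() > max { Some(all) } else { None }
}

// Takes owned keys, so the caller can move them instead of cloning.
fn delete_partitions(keys: impl Iterator<Item = String>) {
    for key in keys {
        println!("deleting partition {key}");
    }
}

fn main() {
    // Before: the result was cloned element-by-element into a second Vec,
    // then borrowed again as &str for the delete call.
    // After: pass the lookup result through unchanged and move each owned
    // key out, so no clone and no intermediate collection are needed.
    let partitions_to_gc = get_partitions_to_gc(1);

    if let Some(partitions_to_gc) = partitions_to_gc {
        delete_partitions(partitions_to_gc.into_iter().map(|itm| itm.partition_key));
    }
}
```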
21 changes: 6 additions & 15 deletions src/db_operations/gc/keep_partition_max_records.rs
```diff
@@ -1,5 +1,4 @@
-use std::collections::BTreeMap;
-
+use my_no_sql_sdk::core::db::PartitionKeyParameter;
 use my_no_sql_server_core::DbTableWrapper;
 use rust_extensions::date_time::DateTimeAsMicroseconds;
@@ -8,16 +7,16 @@ use crate::{app::AppContext, db_operations::DbOperationError, db_sync::EventSource};
 pub async fn execute(
     app: &AppContext,
     db_table: &DbTableWrapper,
-    partition_key: &String,
+    partition_key: impl PartitionKeyParameter,
     max_rows_amount: usize,
     event_source: EventSource,
     persist_moment: DateTimeAsMicroseconds,
 ) -> Result<(), DbOperationError> {
     super::super::check_app_states(app)?;
 
-    let rows_to_gc: Vec<String> = {
+    let rows_to_gc = {
         let table_data = db_table.data.read().await;
-        let partition = table_data.get_partition(partition_key);
+        let partition = table_data.get_partition(partition_key.as_str());
 
         if partition.is_none() {
             return Ok(());
@@ -32,21 +31,13 @@ pub async fn execute(
         return Ok(());
     }
 
-    let db_rows = db_rows.unwrap();
-
-    db_rows
-        .into_iter()
-        .map(|r| r.get_row_key().to_string())
-        .collect()
+    db_rows.unwrap()
     };
 
-    let mut row_to_delete = BTreeMap::new();
-    row_to_delete.insert(partition_key.to_string(), rows_to_gc);
-
     super::super::write::bulk_delete(
         app,
         db_table,
-        row_to_delete,
+        [(partition_key, rows_to_gc)].into_iter(),
         event_source,
         persist_moment,
         DateTimeAsMicroseconds::now(),
```
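
Building a `BTreeMap` to carry a single `(partition_key, rows)` entry is pure overhead; a one-element array converted with `into_iter()` hands `bulk_delete` the same pair without the map allocation. A sketch comparing the two call shapes — the `bulk_delete` signature below is a stand-in that accepts any iterator of pairs:

```rust
use std::collections::BTreeMap;

// Stand-in for the write operation; generic over any pair iterator.
fn bulk_delete(rows_to_delete: impl Iterator<Item = (String, Vec<String>)>) {
    for (partition_key, row_keys) in rows_to_delete {
        println!("{partition_key}: deleting {} rows", row_keys.len());
    }
}

fn main() {
    let partition_key = "pk1".to_string();
    let rows_to_gc = vec!["rk1".to_string(), "rk2".to_string()];

    // Before: allocate a whole BTreeMap for a single entry.
    let mut row_to_delete = BTreeMap::new();
    row_to_delete.insert(partition_key.clone(), rows_to_gc.clone());
    bulk_delete(row_to_delete.into_iter());

    // After: a one-element array turned into an iterator carries the same
    // pair with no map allocation at all.
    bulk_delete([(partition_key, rows_to_gc)].into_iter());
}
```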
8 changes: 4 additions & 4 deletions src/db_operations/parse_json_entity.rs
```diff
@@ -1,4 +1,4 @@
-use std::{collections::BTreeMap, sync::Arc};
+use std::sync::Arc;
 
 use my_no_sql_sdk::core::{
     db::DbRow,
@@ -7,11 +7,11 @@ use my_no_sql_sdk::core::{
 
 use super::DbOperationError;
 
-pub fn parse_as_btree_map(
+pub fn parse_grouped_by_partition_key(
     as_bytes: &[u8],
     inject_time_stamp: &JsonTimeStamp,
-) -> Result<BTreeMap<String, Vec<Arc<DbRow>>>, DbOperationError> {
-    match DbJsonEntity::parse_as_btreemap(as_bytes, inject_time_stamp) {
+) -> Result<Vec<(String, Vec<Arc<DbRow>>)>, DbOperationError> {
+    match DbJsonEntity::parse_grouped_by_partition_key(as_bytes, inject_time_stamp) {
         Ok(result) => Ok(result),
         Err(err) => {
             let result = DbOperationError::DbEntityParseFail(err);
```
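
The parse result changes from `BTreeMap<String, Vec<Arc<DbRow>>>` to `Vec<(String, Vec<Arc<DbRow>>)>`, so grouping by partition key no longer implies sorted-map bookkeeping. The SDK's `parse_grouped_by_partition_key` is not shown in this diff, so the following is only a guess at how such grouping might look, with a stand-in `Row` type in place of `Arc<DbRow>`:

```rust
// Stand-in type; the real code groups Arc<DbRow> values parsed from JSON.
struct Row {
    partition_key: String,
    row_key: String,
}

fn group_by_partition_key(rows: Vec<Row>) -> Vec<(String, Vec<Row>)> {
    let mut result: Vec<(String, Vec<Row>)> = Vec::new();
    for row in rows {
        // Linear scan keeps first-seen order; a single payload usually
        // spans few partitions, so this stays cheap in practice.
        let pos = result.iter().position(|(pk, _)| *pk == row.partition_key);
        match pos {
            Some(i) => result[i].1.push(row),
            None => {
                let pk = row.partition_key.clone();
                result.push((pk, vec![row]));
            }
        }
    }
    result
}

fn main() {
    let rows = vec![
        Row { partition_key: "a".into(), row_key: "1".into() },
        Row { partition_key: "b".into(), row_key: "2".into() },
        Row { partition_key: "a".into(), row_key: "3".into() },
    ];
    for (pk, group) in group_by_partition_key(rows) {
        let keys: Vec<&str> = group.iter().map(|r| r.row_key.as_str()).collect();
        println!("{pk}: {keys:?}");
    }
}
```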
27 changes: 17 additions & 10 deletions src/db_operations/read/get_highest_row_and_below.rs
```diff
@@ -1,6 +1,8 @@
 use std::sync::Arc;
 
+use my_json::json_writer::JsonArrayWriter;
 use my_no_sql_server_core::DbTableWrapper;
+use rust_extensions::date_time::DateTimeAsMicroseconds;
 
 use crate::{
     app::AppContext,
@@ -16,6 +18,7 @@ pub async fn get_highest_row_and_below(
     row_key: &String,
     limit: Option<usize>,
     update_statistics: UpdateStatistics,
+    now: DateTimeAsMicroseconds,
 ) -> Result<ReadOperationResult, DbOperationError> {
     super::super::check_app_states(app)?;
@@ -29,17 +32,21 @@ pub async fn get_highest_row_and_below(
 
     let db_partition = db_partition.unwrap();
 
-    let db_rows = db_partition.get_highest_row_and_below(row_key, limit);
-
-    return Ok(ReadOperationResult::compile_array_or_empty_from_partition(
-        app,
-        db_table_wrapper,
-        partition_key,
-        db_rows,
-        update_statistics,
-    )
-    .await);
+    let mut json_array_writer = JsonArrayWriter::new();
+    let mut count = 0;
+    for db_row in db_partition.get_highest_row_and_below(row_key) {
+        if let Some(limit) = limit {
+            if count >= limit {
+                break;
+            }
+        }
+        update_statistics.update(db_partition, Some(db_row), now);
+        json_array_writer.write(db_row.as_ref());
+
+        count += 1;
+    }
+
+    return Ok(ReadOperationResult::RowsArray(json_array_writer.build()));
     /*
     let mut json_array_writer = JsonArrayWriter::new();
```
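
Instead of collecting matching rows and handing them to `compile_array_or_empty_from_partition`, the read path now streams rows into a `JsonArrayWriter`, enforcing `limit` and updating per-row statistics inside a single loop. A simplified sketch of that streaming pattern — the writer below is a stand-in, not my_json's actual `JsonArrayWriter`:

```rust
// Minimal stand-in for a JSON array writer; the real one serializes DbRow
// payloads rather than preformatted strings.
struct JsonArrayWriter {
    items: Vec<String>,
}

impl JsonArrayWriter {
    fn new() -> Self {
        Self { items: Vec::new() }
    }
    fn write(&mut self, row: &str) {
        self.items.push(row.to_string());
    }
    fn build(self) -> String {
        format!("[{}]", self.items.join(","))
    }
}

fn read_rows<'a>(rows: impl Iterator<Item = &'a str>, limit: Option<usize>) -> String {
    let mut json_array_writer = JsonArrayWriter::new();
    let mut count = 0;
    for db_row in rows {
        // Stop as soon as the limit is reached: rows past it are never
        // serialized, and per-row statistics are never touched for them.
        if let Some(limit) = limit {
            if count >= limit {
                break;
            }
        }
        json_array_writer.write(db_row);
        count += 1;
    }
    json_array_writer.build()
}

fn main() {
    let rows = ["{\"id\":3}", "{\"id\":2}", "{\"id\":1}"];
    println!("{}", read_rows(rows.iter().copied(), Some(2))); // [{"id":3},{"id":2}]
}
```

Applying the limit during iteration avoids materializing rows that would immediately be discarded, which matters for large partitions with small limits.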
(Diff truncated: the remaining changed files of this commit were not loaded.)