Skip to content

Commit

Permalink
batch effects
Browse files Browse the repository at this point in the history
  • Loading branch information
umuro committed Jun 30, 2023
1 parent 8c82bad commit 0886f3b
Show file tree
Hide file tree
Showing 11 changed files with 369 additions and 66 deletions.
99 changes: 33 additions & 66 deletions src/db/server/source_sink/effects_sink/apply_effects.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,26 @@
use crate::db::reference::effect::{AccessEffect, Effect, Effects, MetaEffect};
use crate::db::reference::effect::{Effect, Effects, MetaEffect};
use crate::db::server::db_error_to_status::DbErrorToStatus;
use crate::db::server::lockable_db::transaction_or_db::TransactionOrDb;
use crate::db::DbError;
use crate::ondo_remote::EmptyMessage;
use rocksdb::TransactionDB;
use tonic::{Response, Status};

mod apply_effects_batch;
mod apply_effects_batch_db;
mod apply_effects_batch_transaction_or_db;
mod make_access_effect_batch;
mod make_column_value_effect_batch;
mod make_database_server_stored_effect_batch;
mod make_domain_stored_effect_batch;
mod make_index_value_effect_batch;
mod make_table_stored_effect_batch;
mod make_table_value_effect_batch;
mod optimize_delete_cf_effects;
mod split_effects;

pub(crate) fn apply_meta_effect(
db: &mut TransactionDB,
meta_effect: &MetaEffect,
) -> Result<(), Status> {
let cf_opts = rocksdb::Options::default();
match meta_effect {
MetaEffect::CreateCf(cf_name) => {
db.create_cf(cf_name, &cf_opts)
.map_err(|err| DbError::RocksDbError(err))
.map_db_err_to_status()?;
}
MetaEffect::DeleteCf(cf_name) => {
db.drop_cf(cf_name)
.map_err(|err| DbError::RocksDbError(err))
.map_db_err_to_status()?;
}
}
Ok(())
}

pub(crate) fn apply_access_effect<'a>(
db: &TransactionOrDb<'a>,
access_effect: &AccessEffect,
) -> Result<(), Status> {
match access_effect {
AccessEffect::DatabaseServerStoredEffect(effect) => {
super::super::database_server_sink::apply_effect(&db, effect).map_db_err_to_status()?;
}
AccessEffect::DomainStoredEffect(effect) => {
super::super::domain_sink::apply_effect(&db, effect).map_db_err_to_status()?;
}
AccessEffect::TableStoredEffect(effect) => {
super::super::table_sink::apply_effect(&db, effect).map_db_err_to_status()?;
}
AccessEffect::TableValueEffect(effect) => {
super::super::table_value_sink::apply_effect(&db, effect).map_db_err_to_status()?;
}
AccessEffect::IndexValueEffect(effect) => {
super::super::index_value_sink::apply_effect(&db, effect).map_db_err_to_status()?;
}
AccessEffect::ColumnValueEffect(effect) => {
super::super::column_value_sink::apply_effect(&db, effect).map_db_err_to_status()?;
}
}
Ok(())
}

pub(crate) fn apply_effects<'a>(
db: &TransactionOrDb<'a>,
transaction_or_db: &TransactionOrDb<'a>,
effects: Effects,
) -> Result<Response<EmptyMessage>, Status> {
let (meta_effects, access_effects) = split_effects::split_effects(effects);
Expand All @@ -66,13 +29,8 @@ pub(crate) fn apply_effects<'a>(
return Err(Status::invalid_argument("Meta effects are not allowed"));
}

for effect in access_effects {
println!("Effect: {:?}", effect);
match effect {
Effect::Access(access) => apply_access_effect(db, &access)?,
_ => unreachable!(),
}
}
apply_effects_batch::apply_effects_batch(&transaction_or_db, &access_effects)
.map_db_err_to_status()?;

Ok(Response::new(EmptyMessage {}))
}
Expand All @@ -92,16 +50,25 @@ pub(crate) fn apply_all_effects(
}

let transaction_or_db = TransactionOrDb::Db(db);
// apply_effects_batch(&TransactionOrDb::Db(db), &access_effects)?;
apply_effects_batch::apply_effects_batch(&transaction_or_db, &access_effects)
.map_db_err_to_status()?;
Ok(Response::new(EmptyMessage {}))
}

for effect in access_effects {
println!("Effect: {:?}", effect);
match effect {
Effect::Access(access) => {
apply_access_effect(&transaction_or_db, &access)?;
}
_ => unreachable!(),
fn apply_meta_effect(db: &mut TransactionDB, meta_effect: &MetaEffect) -> Result<(), Status> {
let cf_opts = rocksdb::Options::default();
match meta_effect {
MetaEffect::CreateCf(cf_name) => {
db.create_cf(cf_name, &cf_opts)
.map_err(|err| DbError::RocksDbError(err))
.map_db_err_to_status()?;
}
MetaEffect::DeleteCf(cf_name) => {
db.drop_cf(cf_name)
.map_err(|err| DbError::RocksDbError(err))
.map_db_err_to_status()?;
}
}

Ok(Response::new(EmptyMessage {}))
}
Ok(())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use crate::db::{
db_error::DbError, reference::Effect, server::lockable_db::transaction_or_db::TransactionOrDb,
};

use super::{
apply_effects_batch_db::apply_effects_batch_db,
apply_effects_batch_transaction_or_db::apply_effects_batch_transaction_or_db,
};

pub(super) fn apply_effects_batch<'a>(
transaction_or_db: &TransactionOrDb<'a>,
effects: &[Effect],
) -> Result<(), DbError> {
match transaction_or_db {
TransactionOrDb::Transaction(_, _) => {
apply_effects_batch_transaction_or_db(transaction_or_db, effects)
}
TransactionOrDb::Db(db) => apply_effects_batch_db(db, effects),
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use crate::db::{db_error::DbError, reference::Effect};
use rocksdb::{TransactionDB, WriteBatchWithTransaction};

use super::make_access_effect_batch::make_access_effect_batch;

pub(super) fn apply_effects_batch_db(
db: &TransactionDB,
effects: &[Effect],
) -> Result<(), DbError> {
let mut batch = WriteBatchWithTransaction::default();

for effect in effects {
match effect {
Effect::Access(access) => {
make_access_effect_batch(db, access, &mut batch)?;
}
_ => unreachable!(),
}
}

db
.write(batch)
.map_err(|e| DbError::TantivyError(e.to_string()))?;

Ok(())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use tonic::Status;

use crate::db::{
db_error::DbError, reference::{Effect, AccessEffect}, server::{lockable_db::transaction_or_db::TransactionOrDb, db_error_to_status::DbErrorToStatus},
};

pub(super) fn apply_effects_batch_transaction_or_db<'a>(
transaction_or_db: &TransactionOrDb<'a>,
effects: &[Effect],
) -> Result<(), DbError> {
for effect in effects {
println!("Effect: {:?}", effect);
match effect {
Effect::Access(access) => {
apply_access_effect(&transaction_or_db, &access)
.map_err(|e| DbError::TantivyError(e.to_string()))?;
}
_ => unreachable!(),
}
}

Ok(())
}

pub(crate) fn apply_access_effect<'a>(
db: &TransactionOrDb<'a>,
access_effect: &AccessEffect,
) -> Result<(), Status> {
match access_effect {
AccessEffect::DatabaseServerStoredEffect(effect) => {
super::super::super::database_server_sink::apply_effect(&db, effect).map_db_err_to_status()?;
}
AccessEffect::DomainStoredEffect(effect) => {
super::super::super::domain_sink::apply_effect(&db, effect).map_db_err_to_status()?;
}
AccessEffect::TableStoredEffect(effect) => {
super::super::super::table_sink::apply_effect(&db, effect).map_db_err_to_status()?;
}
AccessEffect::TableValueEffect(effect) => {
super::super::super::table_value_sink::apply_effect(&db, effect).map_db_err_to_status()?;
}
AccessEffect::IndexValueEffect(effect) => {
super::super::super::index_value_sink::apply_effect(&db, effect).map_db_err_to_status()?;
}
AccessEffect::ColumnValueEffect(effect) => {
super::super::super::column_value_sink::apply_effect(&db, effect).map_db_err_to_status()?;
}
}
Ok(())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use super::{
make_column_value_effect_batch::make_column_value_effect_batch,
make_database_server_stored_effect_batch::make_database_server_stored_effect_batch,
make_domain_stored_effect_batch::make_domain_stored_effect_batch,
make_index_value_effect_batch::make_index_value_effect_batch,
make_table_stored_effect_batch::make_table_stored_effect_batch,
make_table_value_effect_batch::make_table_value_effect_batch,
};
use crate::db::{db_error::DbError, reference::AccessEffect};
use rocksdb::{TransactionDB, WriteBatchWithTransaction};

pub(super) fn make_access_effect_batch<'a>(
db: &TransactionDB,
access_effect: &AccessEffect,
batch: &mut WriteBatchWithTransaction<true>,
) -> Result<(), DbError> {
match access_effect {
AccessEffect::DatabaseServerStoredEffect(effect) => {
make_database_server_stored_effect_batch(db, effect, batch)?;
}
AccessEffect::DomainStoredEffect(effect) => {
make_domain_stored_effect_batch(db, effect, batch)?;
}
AccessEffect::TableStoredEffect(effect) => {
make_table_stored_effect_batch(db, effect, batch)?;
}
AccessEffect::TableValueEffect(effect) => {
make_table_value_effect_batch(db, effect, batch)?;
}
AccessEffect::IndexValueEffect(effect) => {
make_index_value_effect_batch(db, effect, batch)?;
}
AccessEffect::ColumnValueEffect(effect) => {
make_column_value_effect_batch(db, effect, batch)?;
}
}

Ok(())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use crate::db::{
db_error::DbError, entity::OndoKey, reference::ColumnValueEffect,
server::source_sink::ondo_serializer::OndoSerializer,
};
use rocksdb::WriteBatchWithTransaction;
use serde_json::Value;

pub(super) fn make_column_value_effect_batch(
db: &rocksdb::TransactionDB,
effect: &ColumnValueEffect,
batch: &mut WriteBatchWithTransaction<true>,
) -> Result<(), DbError> {
match effect {
ColumnValueEffect::Put(cf_name, key, value) => {
let ondo_key = OndoKey::ondo_serialize(key)?;
let ondo_value = Value::ondo_serialize(value)?;
let cf = db.cf_handle(cf_name).ok_or(DbError::CfNotFound)?;

batch.put_cf(&cf, ondo_key, ondo_value);
}
ColumnValueEffect::Delete(cf_name, key) => {
let ondo_key = OndoKey::ondo_serialize(key)?;
let cf = db.cf_handle(cf_name).ok_or(DbError::CfNotFound)?;

batch.delete_cf(&cf, ondo_key);
}
}

Ok(())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::db::{
db_error::DbError,
entity::DatabaseServerStored,
reference::{DatabaseServerName, DatabaseServerStoredEffect},
server::source_sink::ondo_serializer::OndoSerializer,
};
use rocksdb::{TransactionDB, WriteBatchWithTransaction};

pub(super) fn make_database_server_stored_effect_batch(
db: &TransactionDB,
effect: &DatabaseServerStoredEffect,
batch: &mut WriteBatchWithTransaction<true>,
) -> Result<(), DbError> {
match effect {
DatabaseServerStoredEffect::Put(cf_name, key, database_server_stored) => {
let ondo_key = DatabaseServerName::ondo_serialize(key)?;
let ondo_value = DatabaseServerStored::ondo_serialize(database_server_stored)?;
let cf = db
.cf_handle(cf_name)
.ok_or(DbError::CfNotFound)?;

batch.put_cf(&cf, ondo_key, ondo_value);
}
DatabaseServerStoredEffect::Delete(cf_name, key) => {
let ondo_key = DatabaseServerName::ondo_serialize(key)?;
let cf = db
.cf_handle(cf_name)
.ok_or(DbError::CfNotFound)?;

batch.delete_cf(&cf, ondo_key);
}
}

Ok(())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use crate::db::{
db_error::DbError,
entity::DomainStored,
reference::{DomainName, DomainStoredEffect},
server::{

source_sink::ondo_serializer::OndoSerializer,
},
};
use rocksdb::{WriteBatchWithTransaction, TransactionDB};

pub(super) fn make_domain_stored_effect_batch<'a>(
db: &TransactionDB,
effect: &DomainStoredEffect,
batch: &mut WriteBatchWithTransaction<true>,
) -> Result<(), DbError> {
match effect {
DomainStoredEffect::Put(cf_name, key, domain_stored) => {
let ondo_key = DomainName::ondo_serialize(key)?;
let ondo_value = DomainStored::ondo_serialize(domain_stored)?;
let cf = db
.cf_handle(cf_name)
.ok_or(DbError::CfNotFound)?;

batch.put_cf(&cf, ondo_key, ondo_value);
}
DomainStoredEffect::Delete(cf_name, key) => {
let ondo_key = DomainName::ondo_serialize(key)?;
let cf = db
.cf_handle(cf_name)
.ok_or(DbError::CfNotFound)?;

batch.delete_cf(&cf, ondo_key);
}
}

Ok(())
}
Loading

0 comments on commit 0886f3b

Please sign in to comment.