Skip to content

Commit

Permalink
abort transaction properly
Browse files Browse the repository at this point in the history
  • Loading branch information
umuro committed Jun 30, 2023
1 parent 0886f3b commit 90f0cfb
Showing 1 changed file with 22 additions and 13 deletions.
35 changes: 22 additions & 13 deletions src/db/server/remote_server/ondo_remote_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,19 @@ impl ondo_remote_server::OndoRemote for MyServer {
let db = &*db_guard;
let transaction = db.transaction();

let mut network_error_occurred = false;

while let Some(request) = stream.next().await {
match request {
Ok(transaction_request) => {
match transaction_request.request_type {
Some(transaction_request::RequestType::TableValueOps(
table_value_ops,
)) => {
let transaction_or_db = TransactionOrDb::Transaction(&transaction, db);
let transaction_or_db =
TransactionOrDb::Transaction(&transaction, db);
let response_type = my_server_clone
.table_value_ops_sub_server(transaction_or_db)
.table_value_ops_sub_server(transaction_or_db)
.process_request(
answer_tx.clone(),
table_value_ops.request_type.unwrap(),
Expand All @@ -59,7 +62,8 @@ impl ondo_remote_server::OndoRemote for MyServer {
Some(transaction_request::RequestType::IndexedValueOps(
indexed_value_ops,
)) => {
let transaction_or_db = TransactionOrDb::Transaction(&transaction, db);
let transaction_or_db =
TransactionOrDb::Transaction(&transaction, db);
let response_type = my_server_clone
.indexed_value_ops_sub_server(transaction_or_db)
.process_request(
Expand All @@ -71,7 +75,8 @@ impl ondo_remote_server::OndoRemote for MyServer {
Some(transaction_request::RequestType::KeyPrefixOps(
key_prefix_ops,
)) => {
let transaction_or_db = TransactionOrDb::Transaction(&transaction, db);
let transaction_or_db =
TransactionOrDb::Transaction(&transaction, db);
let response_type = my_server_clone
.tabled_value_ops_sub_server(transaction_or_db)
.process_request(
Expand All @@ -88,19 +93,23 @@ impl ondo_remote_server::OndoRemote for MyServer {
Err(err) => {
// Handle stream error
eprintln!("Error receiving request: {:?}", err);
network_error_occurred = true;
break;
}
}
}
let db_result = transaction.commit();
match db_result {
Ok(_) => {
// Do nothing
}
Err(err) => {
let db_err = crate::db::db_error::DbError::RocksDbError(err);
let status: tonic::Status = crate::db::server::db_error_to_status::db_error_to_status(db_err);
super::send_response::send_status_response(answer_tx.clone(), status).await;
if !network_error_occurred {
let db_result = transaction.commit();
match db_result {
Ok(_) => {
// Do nothing
}
Err(err) => {
let db_err = crate::db::db_error::DbError::RocksDbError(err);
let status: tonic::Status =
crate::db::server::db_error_to_status::db_error_to_status(db_err);
super::send_response::send_status_response(answer_tx.clone(), status).await;
}
}
}
});
Expand Down

0 comments on commit 90f0cfb

Please sign in to comment.