Skip to content

Commit

Permalink
copy_store_data_sync for NftTransferHistoryTable migration
Browse files Browse the repository at this point in the history
  • Loading branch information
laruh committed Sep 15, 2024
1 parent 9cebf62 commit df164d4
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 50 deletions.
34 changes: 13 additions & 21 deletions mm2src/coins/nft/storage/wasm/wasm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use crate::nft::storage::{get_offset_limit, NftListStorageOps, NftTokenAddrId, N
use async_trait::async_trait;
use common::is_initial_upgrade;
use ethereum_types::Address;
use futures_util::StreamExt;
use mm2_db::indexed_db::{BeBigUint, DbTable, DbUpgrader, MultiIndex, OnUpgradeResult, TableSignature};
use mm2_db::indexed_db::{copy_store_data_sync, BeBigUint, DbTable, DbUpgrader, MultiIndex, OnUpgradeResult,
TableSignature};
use mm2_err_handle::map_to_mm::MapToMmResult;
use mm2_err_handle::prelude::MmResult;
use mm2_number::BigUint;
Expand Down Expand Up @@ -959,7 +959,7 @@ pub(crate) struct NftTransferHistoryTable {
impl NftTransferHistoryTable {
// old prim key index for DB_VERSION: u32 = 1
const CHAIN_TX_HASH_LOG_INDEX_INDEX: &'static str = "chain_tx_hash_log_index_index";
// this is new prim key multi index. DB_VERSION = 2
// this is prim key multi index for DB_VERSION = 2
const CHAIN_TX_HASH_LOG_INDEX_TOKEN_ID_INDEX: &'static str = "chain_tx_hash_log_index_token_idindex";

fn from_transfer_history(transfer: &NftTransferHistory) -> WasmNftCacheResult<NftTransferHistoryTable> {
Expand Down Expand Up @@ -1035,19 +1035,17 @@ impl TableSignature for NftTransferHistoryTable {
temp_table.create_index("block_number", false)?;
temp_table.create_index("chain", false)?;

// Step 2: Open the old table and copy data to the temporary table using cursors
// Step 2: Copy data from the old store to the temp store
let old_store = upgrader.open_table(Self::TABLE_NAME)?;
let mut cursor = old_store.open_cursor()?; // Open cursor on the old store
while let Some(cursor_result) = cursor.next()? {
let key = cursor_result.key()?;
let value = cursor_result.value()?;
temp_table.add_with_key(&value, &key)?;
}
let temp_store = upgrader.open_table(&temp_table_name)?;

// TODO copy data from old_store to temp_store
copy_store_data_sync(&old_store.object_store, &temp_store.object_store)?;

// Step 3: Delete the old object store
upgrader.delete_table(Self::TABLE_NAME)?;

// Step 4: Create the new object store with the updated schema
// Step 4: Recreate the original object store with the new schema
let new_table = upgrader.create_table(Self::TABLE_NAME)?;
new_table.create_multi_index(
Self::CHAIN_TX_HASH_LOG_INDEX_TOKEN_ID_INDEX,
Expand All @@ -1066,17 +1064,11 @@ impl TableSignature for NftTransferHistoryTable {
new_table.create_index("block_number", false)?;
new_table.create_index("chain", false)?;

// Step 5: Copy data from the temporary table to the new table
let temp_store = upgrader.open_table(&temp_table_name)?;
let new_store = upgrader.open_table(Self::TABLE_NAME)?;
let mut temp_cursor = temp_store.open_cursor()?;
while let Some(cursor_result) = temp_cursor.next()? {
let key = cursor_result.key()?;
let value = cursor_result.value()?;
new_store.add_with_key(&value, &key)?;
}
// Step 5: Copy data back from the temp store to the new store
// TODO copy data from temp_store to new_table
copy_store_data_sync(&temp_store.object_store, &new_table.object_store)?;

// Step 6: Delete the temporary table
// Step 6: Delete the temporary store
upgrader.delete_table(&temp_table_name)?;
}
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_db/src/indexed_db/db_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use web_sys::{IdbDatabase, IdbTransactionMode};
pub use builder::{IdbDatabaseBuilder, InitDbError, InitDbResult};
pub use object_store::IdbObjectStoreImpl;
pub use transaction::{DbTransactionError, DbTransactionResult, IdbTransactionImpl};
pub use upgrader::{DbUpgrader, OnUpgradeError, OnUpgradeNeededCb, OnUpgradeResult};
pub use upgrader::{copy_store_data_sync, DbUpgrader, OnUpgradeError, OnUpgradeNeededCb, OnUpgradeResult};

lazy_static! {
static ref OPEN_DATABASES: Mutex<HashSet<String>> = Mutex::new(HashSet::new());
Expand Down
149 changes: 123 additions & 26 deletions mm2src/mm2_db/src/indexed_db/drivers/upgrader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use derive_more::Display;
use js_sys::Array;
use mm2_err_handle::prelude::*;
use wasm_bindgen::prelude::*;
use web_sys::{IdbDatabase, IdbIndexParameters, IdbObjectStore, IdbObjectStoreParameters, IdbRequest, IdbTransaction};
use wasm_bindgen::JsCast;
use web_sys::{IdbCursorWithValue, IdbDatabase, IdbIndexParameters, IdbObjectStore, IdbObjectStoreParameters,
IdbRequest, IdbTransaction};

const ITEM_KEY_PATH: &str = "_item_id";

Expand All @@ -13,11 +15,20 @@ pub type OnUpgradeNeededCb = Box<dyn FnOnce(&DbUpgrader, u32, u32) -> OnUpgradeR
#[derive(Debug, Display, PartialEq)]
pub enum OnUpgradeError {
#[display(fmt = "Error occurred due to creating the '{}' table: {}", table, description)]
ErrorCreatingTable { table: String, description: String },
ErrorCreatingTable {
table: String,
description: String,
},
#[display(fmt = "Error occurred due to opening the '{}' table: {}", table, description)]
ErrorOpeningTable { table: String, description: String },
ErrorOpeningTable {
table: String,
description: String,
},
#[display(fmt = "Error occurred due to creating the '{}' index: {}", index, description)]
ErrorCreatingIndex { index: String, description: String },
ErrorCreatingIndex {
index: String,
description: String,
},
#[display(
fmt = "Upgrade attempt to an unsupported version: {}, old: {}, new: {}",
unsupported_version,
Expand All @@ -30,11 +41,27 @@ pub enum OnUpgradeError {
new_version: u32,
},
#[display(fmt = "Error occurred due to deleting the '{}' table: {}", table, description)]
ErrorDeletingTable { table: String, description: String },
ErrorDeletingTable {
table: String,
description: String,
},
#[display(fmt = "Error occurred while opening the cursor: {}", description)]
ErrorOpeningCursor { description: String },
ErrorOpeningCursor {
description: String,
},
#[display(fmt = "Error occurred while adding data: {}", description)]
ErrorAddingData { description: String },
ErrorAddingData {
description: String,
},
ErrorGettingKey {
description: String,
},
ErrorGettingValue {
description: String,
},
ErrorAdvancingCursor {
description: String,
},
}

pub struct DbUpgrader {
Expand Down Expand Up @@ -86,7 +113,7 @@ impl DbUpgrader {
}

pub struct TableUpgrader {
object_store: IdbObjectStore,
pub object_store: IdbObjectStore,
}

impl TableUpgrader {
Expand Down Expand Up @@ -125,24 +152,94 @@ impl TableUpgrader {
description: stringify_js_error(&e),
})
}
}

/// Opens a cursor to iterate over the entries in the object store.
/// Provides a safe way to access the object store's cursor without exposing the field directly.
pub fn open_cursor(&self) -> OnUpgradeResult<IdbRequest> {
self.object_store
.open_cursor()
.map_to_mm(|e| OnUpgradeError::ErrorOpeningCursor {
description: stringify_js_error(&e),
})
}
// #[allow(dead_code)]
// pub async fn copy_store_data(
// source_store: &IdbObjectStore,
// target_store: &IdbObjectStore,
// ) -> Result<(), OnUpgradeError> {
// // Create a oneshot channel to signal when the data transfer is complete
// let (completion_sender, completion_receiver) = oneshot::channel::<()>();
//
// // Clone the target store for use in the closure
// let target_store = target_store.clone();
//
// // Move completion_sender into the closure
// let onsuccess_callback = Closure::wrap(Box::new(move |event: web_sys::Event| {
// let request = event.target().unwrap().unchecked_into::<IdbRequest>();
// let cursor_result = request.result().unwrap();
//
// if cursor_result.is_null() || cursor_result.is_undefined() {
// // No more entries; data transfer complete
// let _ = completion_sender.send(());
// return;
// }
//
// let cursor = cursor_result.unchecked_into::<IdbCursorWithValue>();
//
// let key = cursor.key().unwrap();
// let value = cursor.value().unwrap();
// target_store.add_with_key(&value, &key).unwrap();
//
// // Move to the next record
// cursor.continue_().unwrap();
// }) as Box<dyn FnMut(_)>);
//
// // Open the cursor on the source store
// let cursor_request = source_store.open_cursor().unwrap();
//
// // Attach the onsuccess callback
// cursor_request.set_onsuccess(Some(onsuccess_callback.as_ref().unchecked_ref()));
//
// // Prevent the closure from being dropped
// onsuccess_callback.forget();
//
// // Wait for the data transfer to complete
// completion_receiver.await.unwrap();
//
// Ok(())
// }

pub fn copy_store_data_sync(
source_store: &IdbObjectStore,
target_store: &IdbObjectStore,
) -> Result<(), OnUpgradeError> {
// Clone the target store for use in the closure
let target_store = target_store.clone();

// Define the onsuccess closure
let onsuccess_callback = Closure::wrap(Box::new(move |event: web_sys::Event| {
let request = event.target().unwrap().unchecked_into::<IdbRequest>();
let cursor_result = request.result().unwrap();

if cursor_result.is_null() || cursor_result.is_undefined() {
return;
}

/// Adds a value with the specified key to the object store.
/// Provides a safe way to add entries without exposing the object store directly.
pub fn add_with_key(&self, value: &JsValue, key: &JsValue) -> OnUpgradeResult<IdbRequest> {
self.object_store
.add_with_key(value, key)
.map_to_mm(|e| OnUpgradeError::ErrorAddingData {
description: stringify_js_error(&e),
})
}
let cursor = cursor_result.unchecked_into::<IdbCursorWithValue>();

let key = cursor.key().unwrap();
let value = cursor.value().unwrap();

// Insert the data into the target store
target_store.add_with_key(&value, &key).unwrap();

// Move to the next record
cursor.continue_().unwrap();
}) as Box<dyn FnMut(_)>);

// Open the cursor on the source store
let cursor_request = source_store.open_cursor().unwrap();

// Attach the onsuccess callback
cursor_request.set_onsuccess(Some(onsuccess_callback.as_ref().unchecked_ref()));

// Prevent the closure from being dropped
onsuccess_callback.forget();

// Note: We cannot block the function here to wait for completion.
// The transaction will remain open until all requests are completed.

Ok(())
}
4 changes: 2 additions & 2 deletions mm2src/mm2_db/src/indexed_db/indexed_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ mod db_lock;
mod indexed_cursor;

pub use be_big_uint::BeBigUint;
pub use db_driver::{DbTransactionError, DbTransactionResult, DbUpgrader, InitDbError, InitDbResult, ItemId,
OnUpgradeError, OnUpgradeResult};
pub use db_driver::{copy_store_data_sync, DbTransactionError, DbTransactionResult, DbUpgrader, InitDbError,
InitDbResult, ItemId, OnUpgradeError, OnUpgradeResult};
pub use db_lock::{ConstructibleDb, DbLocked, SharedDb, WeakDb};

use db_driver::{IdbDatabaseBuilder, IdbDatabaseImpl, IdbObjectStoreImpl, IdbTransactionImpl, OnUpgradeNeededCb};
Expand Down

0 comments on commit df164d4

Please sign in to comment.