Skip to content

Commit

Permalink
wasm migration wip
Browse files Browse the repository at this point in the history
  • Loading branch information
laruh committed Sep 15, 2024
1 parent 5cae2d4 commit 9cebf62
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 16 deletions.
2 changes: 1 addition & 1 deletion mm2src/coins/nft/storage/wasm/nft_idb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_trait::async_trait;
use mm2_db::indexed_db::InitDbResult;
use mm2_db::indexed_db::{DbIdentifier, DbInstance, DbLocked, IndexedDb, IndexedDbBuilder};

const DB_VERSION: u32 = 1;
const DB_VERSION: u32 = 2; // TODO need to set it as 2, as we need to change prim key in NftTransferHistoryTable

/// Represents a locked instance of the `NftCacheIDB` database.
///
Expand Down
97 changes: 84 additions & 13 deletions mm2src/coins/nft/storage/wasm/wasm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::nft::storage::{get_offset_limit, NftListStorageOps, NftTokenAddrId, N
use async_trait::async_trait;
use common::is_initial_upgrade;
use ethereum_types::Address;
use futures_util::StreamExt;
use mm2_db::indexed_db::{BeBigUint, DbTable, DbUpgrader, MultiIndex, OnUpgradeResult, TableSignature};
use mm2_err_handle::map_to_mm::MapToMmResult;
use mm2_err_handle::prelude::MmResult;
Expand Down Expand Up @@ -881,8 +882,8 @@ pub(crate) struct NftListTable {
}

impl NftListTable {
const CHAIN_ANIMATION_DOMAIN_INDEX: &str = "chain_animation_domain_index";
const CHAIN_EXTERNAL_DOMAIN_INDEX: &str = "chain_external_domain_index";
const CHAIN_ANIMATION_DOMAIN_INDEX: &'static str = "chain_animation_domain_index";
const CHAIN_EXTERNAL_DOMAIN_INDEX: &'static str = "chain_external_domain_index";

fn from_nft(nft: &Nft) -> WasmNftCacheResult<NftListTable> {
let details_json = json::to_value(nft).map_to_mm(|e| WasmNftCacheError::ErrorSerializing(e.to_string()))?;
Expand All @@ -907,8 +908,8 @@ impl NftListTable {
impl TableSignature for NftListTable {
const TABLE_NAME: &'static str = "nft_list_cache_table";

fn on_upgrade_needed(upgrader: &DbUpgrader, old_version: u32, new_version: u32) -> OnUpgradeResult<()> {
if is_initial_upgrade(old_version, new_version) {
fn on_upgrade_needed(upgrader: &DbUpgrader, old_version: u32, _new_version: u32) -> OnUpgradeResult<()> {
if is_initial_upgrade(old_version) {
let table = upgrader.create_table(Self::TABLE_NAME)?;
table.create_multi_index(
CHAIN_TOKEN_ADD_TOKEN_ID_INDEX,
Expand Down Expand Up @@ -956,7 +957,10 @@ pub(crate) struct NftTransferHistoryTable {
}

impl NftTransferHistoryTable {
const CHAIN_TX_HASH_LOG_INDEX_TOKEN_ID_INDEX: &str = "chain_tx_hash_log_index_token_idindex";
// old prim key index for DB_VERSION: u32 = 1
const CHAIN_TX_HASH_LOG_INDEX_INDEX: &'static str = "chain_tx_hash_log_index_index";
// this is new prim key multi index. DB_VERSION = 2
const CHAIN_TX_HASH_LOG_INDEX_TOKEN_ID_INDEX: &'static str = "chain_tx_hash_log_index_token_idindex";

fn from_transfer_history(transfer: &NftTransferHistory) -> WasmNftCacheResult<NftTransferHistoryTable> {
let details_json =
Expand Down Expand Up @@ -989,24 +993,91 @@ impl TableSignature for NftTransferHistoryTable {
const TABLE_NAME: &'static str = "nft_transfer_history_cache_table";

fn on_upgrade_needed(upgrader: &DbUpgrader, old_version: u32, new_version: u32) -> OnUpgradeResult<()> {
if is_initial_upgrade(old_version, new_version) {
if is_initial_upgrade(old_version) {
// Initial creation of the table with the new schema
let table = upgrader.create_table(Self::TABLE_NAME)?;
table.create_multi_index(
CHAIN_TOKEN_ADD_TOKEN_ID_INDEX,
&["chain", "token_address", "token_id"],
false,
)?;
table.create_multi_index(
Self::CHAIN_TX_HASH_LOG_INDEX_TOKEN_ID_INDEX,
&["chain", "transaction_hash", "log_index", "token_id"],
true,
)?;
table.create_multi_index(
CHAIN_TOKEN_ADD_TOKEN_ID_INDEX,
&["chain", "token_address", "token_id"],
false,
)?;
table.create_multi_index(CHAIN_BLOCK_NUMBER_INDEX, &["chain", "block_number"], false)?;
table.create_multi_index(CHAIN_TOKEN_ADD_INDEX, &["chain", "token_address"], false)?;
table.create_multi_index(CHAIN_TOKEN_DOMAIN_INDEX, &["chain", "token_domain"], false)?;
table.create_multi_index(CHAIN_IMAGE_DOMAIN_INDEX, &["chain", "image_domain"], false)?;
table.create_index("block_number", false)?;
table.create_index("chain", false)?;
} else if old_version == 1 && new_version == 2 {
// Migration from version 1 to version 2

// Step 1: Create a temporary table to hold data with the old schema
let temp_table_name = format!("{}_temp", Self::TABLE_NAME);
let temp_table = upgrader.create_table(&temp_table_name)?;
temp_table.create_multi_index(
Self::CHAIN_TX_HASH_LOG_INDEX_INDEX, // old primary key index
&["chain", "transaction_hash", "log_index"],
true,
)?;
temp_table.create_multi_index(
CHAIN_TOKEN_ADD_TOKEN_ID_INDEX,
&["chain", "token_address", "token_id"],
false,
)?;
temp_table.create_multi_index(CHAIN_BLOCK_NUMBER_INDEX, &["chain", "block_number"], false)?;
temp_table.create_multi_index(CHAIN_TOKEN_ADD_INDEX, &["chain", "token_address"], false)?;
temp_table.create_multi_index(CHAIN_TOKEN_DOMAIN_INDEX, &["chain", "token_domain"], false)?;
temp_table.create_multi_index(CHAIN_IMAGE_DOMAIN_INDEX, &["chain", "image_domain"], false)?;
temp_table.create_index("block_number", false)?;
temp_table.create_index("chain", false)?;

// Step 2: Open the old table and copy data to the temporary table using cursors
let old_store = upgrader.open_table(Self::TABLE_NAME)?;
let mut cursor = old_store.open_cursor()?; // Open cursor on the old store
while let Some(cursor_result) = cursor.next()? {
let key = cursor_result.key()?;
let value = cursor_result.value()?;
temp_table.add_with_key(&value, &key)?;
}

// Step 3: Delete the old object store
upgrader.delete_table(Self::TABLE_NAME)?;

// Step 4: Create the new object store with the updated schema
let new_table = upgrader.create_table(Self::TABLE_NAME)?;
new_table.create_multi_index(
Self::CHAIN_TX_HASH_LOG_INDEX_TOKEN_ID_INDEX,
&["chain", "transaction_hash", "log_index", "token_id"],
true,
)?;
new_table.create_multi_index(
CHAIN_TOKEN_ADD_TOKEN_ID_INDEX,
&["chain", "token_address", "token_id"],
false,
)?;
new_table.create_multi_index(CHAIN_BLOCK_NUMBER_INDEX, &["chain", "block_number"], false)?;
new_table.create_multi_index(CHAIN_TOKEN_ADD_INDEX, &["chain", "token_address"], false)?;
new_table.create_multi_index(CHAIN_TOKEN_DOMAIN_INDEX, &["chain", "token_domain"], false)?;
new_table.create_multi_index(CHAIN_IMAGE_DOMAIN_INDEX, &["chain", "image_domain"], false)?;
new_table.create_index("block_number", false)?;
new_table.create_index("chain", false)?;

// Step 5: Copy data from the temporary table to the new table
let temp_store = upgrader.open_table(&temp_table_name)?;
let new_store = upgrader.open_table(Self::TABLE_NAME)?;
let mut temp_cursor = temp_store.open_cursor()?;
while let Some(cursor_result) = temp_cursor.next()? {
let key = cursor_result.key()?;
let value = cursor_result.value()?;
new_store.add_with_key(&value, &key)?;
}

// Step 6: Delete the temporary table
upgrader.delete_table(&temp_table_name)?;
}
Ok(())
}
Expand All @@ -1021,8 +1092,8 @@ pub(crate) struct LastScannedBlockTable {
impl TableSignature for LastScannedBlockTable {
const TABLE_NAME: &'static str = "last_scanned_block_table";

fn on_upgrade_needed(upgrader: &DbUpgrader, old_version: u32, new_version: u32) -> OnUpgradeResult<()> {
if is_initial_upgrade(old_version, new_version) {
fn on_upgrade_needed(upgrader: &DbUpgrader, old_version: u32, _new_version: u32) -> OnUpgradeResult<()> {
if is_initial_upgrade(old_version) {
let table = upgrader.create_table(Self::TABLE_NAME)?;
table.create_index("chain", true)?;
}
Expand Down
2 changes: 1 addition & 1 deletion mm2src/common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ pub fn parse_rfc3339_to_timestamp(date_str: &str) -> Result<u64, ParseRfc3339Err
/// `is_initial_upgrade` function checks if the database is being upgraded from version 0 to 1.
/// This function returns a boolean indicating whether the database is being upgraded from version 0 to 1.
#[cfg(target_arch = "wasm32")]
pub fn is_initial_upgrade(old_version: u32, new_version: u32) -> bool { old_version == 0 && new_version == 1 }
pub fn is_initial_upgrade(old_version: u32) -> bool { old_version == 0 }

/// Takes `http:Uri` and converts it into `String` of websocket address
///
Expand Down
39 changes: 38 additions & 1 deletion mm2src/mm2_db/src/indexed_db/drivers/upgrader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use derive_more::Display;
use js_sys::Array;
use mm2_err_handle::prelude::*;
use wasm_bindgen::prelude::*;
use web_sys::{IdbDatabase, IdbIndexParameters, IdbObjectStore, IdbObjectStoreParameters, IdbTransaction};
use web_sys::{IdbDatabase, IdbIndexParameters, IdbObjectStore, IdbObjectStoreParameters, IdbRequest, IdbTransaction};

const ITEM_KEY_PATH: &str = "_item_id";

Expand All @@ -29,6 +29,12 @@ pub enum OnUpgradeError {
old_version: u32,
new_version: u32,
},
#[display(fmt = "Error occurred due to deleting the '{}' table: {}", table, description)]
ErrorDeletingTable { table: String, description: String },
#[display(fmt = "Error occurred while opening the cursor: {}", description)]
ErrorOpeningCursor { description: String },
#[display(fmt = "Error occurred while adding data: {}", description)]
ErrorAddingData { description: String },
}

pub struct DbUpgrader {
Expand Down Expand Up @@ -66,6 +72,17 @@ impl DbUpgrader {
}),
}
}

/// Deletes an object store (table) from the database.
pub fn delete_table(&self, table: &str) -> OnUpgradeResult<()> {
match self.db.delete_object_store(table) {
Ok(_) => Ok(()),
Err(e) => MmError::err(OnUpgradeError::ErrorDeletingTable {
table: table.to_owned(),
description: stringify_js_error(&e),
}),
}
}
}

pub struct TableUpgrader {
Expand Down Expand Up @@ -108,4 +125,24 @@ impl TableUpgrader {
description: stringify_js_error(&e),
})
}

/// Opens a cursor to iterate over the entries in the object store.
/// Provides a safe way to access the object store's cursor without exposing the field directly.
pub fn open_cursor(&self) -> OnUpgradeResult<IdbRequest> {
self.object_store
.open_cursor()
.map_to_mm(|e| OnUpgradeError::ErrorOpeningCursor {
description: stringify_js_error(&e),
})
}

/// Adds a value with the specified key to the object store.
/// Provides a safe way to add entries without exposing the object store directly.
pub fn add_with_key(&self, value: &JsValue, key: &JsValue) -> OnUpgradeResult<IdbRequest> {
self.object_store
.add_with_key(value, key)
.map_to_mm(|e| OnUpgradeError::ErrorAddingData {
description: stringify_js_error(&e),
})
}
}

0 comments on commit 9cebf62

Please sign in to comment.