Skip to content

Commit

Permalink
fix merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
borngraced committed Apr 4, 2024
2 parents 1ca860f + e0cd391 commit 3a21e71
Show file tree
Hide file tree
Showing 16 changed files with 502 additions and 205 deletions.
10 changes: 5 additions & 5 deletions mm2src/coins/eth/eth_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ impl EthCoin {
async fn try_rpc_send(&self, method: &str, params: Vec<jsonrpc_core::Value>) -> Result<Value, web3::Error> {
let mut clients = self.web3_instances.lock().await;

let mut error = web3::Error::Unreachable;
for (i, client) in clients.clone().into_iter().enumerate() {
let execute_fut = match client.web3.transport() {
Web3Transport::Http(http) => http.execute(method, params.clone()),
Expand All @@ -35,8 +36,9 @@ impl EthCoin {
clients.rotate_left(i);
return Ok(r);
},
Ok(Err(rpc_error)) => {
debug!("Request on '{method}' failed. Error: {rpc_error}");
Ok(Err(err)) => {
debug!("Request on '{method}' failed. Error: {err}");
error = err;

if let Web3Transport::Websocket(socket_transport) = client.web3.transport() {
socket_transport.stop_connection_loop().await;
Expand All @@ -52,9 +54,7 @@ impl EthCoin {
};
}

Err(web3::Error::Transport(web3::error::TransportError::Message(format!(
"Request '{method}' failed due to not being able to find a living RPC client"
))))
Err(error)
}
}

Expand Down
1 change: 0 additions & 1 deletion mm2src/coins/my_tx_history_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,6 @@ where
})
}

#[cfg(not(target_arch = "wasm32"))]
pub async fn z_coin_tx_history_rpc(
ctx: MmArc,
request: MyTxHistoryRequestV2<i64>,
Expand Down
258 changes: 91 additions & 167 deletions mm2src/coins/z_coin.rs

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions mm2src/coins/z_coin/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@ pub mod blockdb;
pub use blockdb::*;

pub mod walletdb;
#[cfg(target_arch = "wasm32")] mod z_params;
#[cfg(target_arch = "wasm32")]
pub(crate) use z_params::ZcashParamsWasmImpl;

pub use walletdb::*;

use crate::z_coin::z_balance_streaming::ZBalanceEventSender;
use mm2_err_handle::mm_error::MmResult;
#[cfg(target_arch = "wasm32")]
use walletdb::wasm::storage::DataConnStmtCacheWasm;
Expand Down Expand Up @@ -55,7 +60,7 @@ pub struct CompactBlockRow {
#[derive(Clone)]
pub enum BlockProcessingMode {
Validate,
Scan(DataConnStmtCacheWrapper),
Scan(DataConnStmtCacheWrapper, Option<ZBalanceEventSender>),
}

/// Checks that the scanned blocks in the data database, when combined with the recent
Expand Down Expand Up @@ -114,7 +119,7 @@ pub async fn scan_cached_block(
params: &ZcoinConsensusParams,
block: &CompactBlock,
last_height: &mut BlockHeight,
) -> Result<(), ValidateBlocksError> {
) -> Result<usize, ValidateBlocksError> {
let mut data_guard = data.inner().clone();
// Fetch the ExtendedFullViewingKeys we are tracking
let extfvks = data_guard.get_extended_full_viewing_keys().await?;
Expand Down Expand Up @@ -201,5 +206,6 @@ pub async fn scan_cached_block(

*last_height = current_height;

Ok(())
// If there are any transactions in the block, return the transaction count
Ok(txs.len())
}
16 changes: 12 additions & 4 deletions mm2src/coins/z_coin/storage/blockdb/blockdb_idb_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::z_coin::storage::{scan_cached_block, validate_chain, BlockDbImpl, Blo
use crate::z_coin::z_coin_errors::ZcoinStorageError;

use async_trait::async_trait;
use futures_util::SinkExt;
use mm2_core::mm_ctx::MmArc;
use mm2_db::indexed_db::{BeBigUint, ConstructibleDb, DbIdentifier, DbInstance, DbLocked, DbUpgrader, IndexedDb,
IndexedDbBuilder, InitDbResult, MultiIndex, OnUpgradeResult, TableSignature};
Expand Down Expand Up @@ -123,7 +124,7 @@ impl BlockDbImpl {
}

/// Asynchronously rewinds the storage to a specified block height, effectively
/// removing data beyond the specified height from the storage.
/// removing data beyond the specified height from the storage.
pub async fn rewind_to_height(&self, height: BlockHeight) -> ZcoinStorageRes<usize> {
let locked_db = self.lock_db().await?;
let db_transaction = locked_db.get_inner().transaction().await?;
Expand Down Expand Up @@ -224,7 +225,7 @@ impl BlockDbImpl {
BlockProcessingMode::Validate => validate_from
.map(|(height, _)| height)
.unwrap_or(BlockHeight::from_u32(params.sapling_activation_height) - 1),
BlockProcessingMode::Scan(data) => data.inner().block_height_extrema().await.map(|opt| {
BlockProcessingMode::Scan(data, _) => data.inner().block_height_extrema().await.map(|opt| {
opt.map(|(_, max)| max)
.unwrap_or(BlockHeight::from_u32(params.sapling_activation_height) - 1)
})?,
Expand All @@ -250,8 +251,15 @@ impl BlockDbImpl {
BlockProcessingMode::Validate => {
validate_chain(block, &mut prev_height, &mut prev_hash).await?;
},
BlockProcessingMode::Scan(data) => {
scan_cached_block(data, &params, &block, &mut from_height).await?;
BlockProcessingMode::Scan(data, z_balance_change_sender) => {
let tx_size = scan_cached_block(data, &params, &block, &mut from_height).await?;
// If there is/are transactions present in the current scanned block(s),
// we trigger a `Triggered` event to update the balance change.
if tx_size > 0 {
if let Some(mut sender) = z_balance_change_sender.clone() {
sender.send(()).await.expect("No receiver is available/dropped");
};
};
},
}
}
Expand Down
14 changes: 11 additions & 3 deletions mm2src/coins/z_coin/storage/blockdb/blockdb_sql_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::z_coin::ZcoinConsensusParams;
use common::async_blocking;
use db_common::sqlite::rusqlite::{params, Connection};
use db_common::sqlite::{query_single_row, run_optimization_pragmas, rusqlite};
use futures_util::SinkExt;
use itertools::Itertools;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
Expand Down Expand Up @@ -193,7 +194,7 @@ impl BlockDbImpl {
BlockProcessingMode::Validate => validate_from
.map(|(height, _)| height)
.unwrap_or(BlockHeight::from_u32(params.sapling_activation_height) - 1),
BlockProcessingMode::Scan(data) => {
BlockProcessingMode::Scan(data, _) => {
let data = data.inner();
data.block_height_extrema().await.map(|opt| {
opt.map(|(_, max)| max)
Expand Down Expand Up @@ -224,8 +225,15 @@ impl BlockDbImpl {
BlockProcessingMode::Validate => {
validate_chain(block, &mut prev_height, &mut prev_hash).await?;
},
BlockProcessingMode::Scan(data) => {
scan_cached_block(data, &params, &block, &mut from_height).await?;
BlockProcessingMode::Scan(data, z_balance_change_sender) => {
let tx_size = scan_cached_block(data, &params, &block, &mut from_height).await?;
// If there are transactions present in the current scanned block,
// we send a `Triggered` event to update the balance change.
if tx_size > 0 {
if let Some(mut sender) = z_balance_change_sender.clone() {
sender.send(()).await.expect("No receiver is available/dropped");
};
};
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/z_coin/storage/walletdb/wasm/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl<'a> WalletIndexedDb {
Ok(db)
}

async fn lock_db(&self) -> ZcoinStorageRes<WalletDbInnerLocked<'_>> {
pub(crate) async fn lock_db(&self) -> ZcoinStorageRes<WalletDbInnerLocked<'_>> {
self.db
.get_or_initialize()
.await
Expand Down
File renamed without changes.
File renamed without changes.
110 changes: 110 additions & 0 deletions mm2src/coins/z_coin/z_balance_streaming.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use crate::common::Future01CompatExt;
use crate::hd_wallet::AsyncMutex;
use crate::z_coin::ZCoin;
use crate::{MarketCoinOps, MmCoin};

use async_trait::async_trait;
use common::executor::{AbortSettings, SpawnAbortable};
use common::log::{error, info};
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
use futures::channel::oneshot::{Receiver, Sender};
use futures_util::StreamExt;
use mm2_core::mm_ctx::MmArc;
use mm2_event_stream::behaviour::{EventBehaviour, EventInitStatus};
use mm2_event_stream::{Event, EventStreamConfiguration};
use std::sync::Arc;

pub type ZBalanceEventSender = UnboundedSender<()>;
pub type ZBalanceEventHandler = Arc<AsyncMutex<UnboundedReceiver<()>>>;

#[async_trait]
impl EventBehaviour for ZCoin {
const EVENT_NAME: &'static str = "COIN_BALANCE";
const ERROR_EVENT_NAME: &'static str = "COIN_BALANCE_ERROR";

async fn handle(self, _interval: f64, tx: Sender<EventInitStatus>) {
const RECEIVER_DROPPED_MSG: &str = "Receiver is dropped, which should never happen.";

macro_rules! send_status_on_err {
($match: expr, $sender: tt, $msg: literal) => {
match $match {
Some(t) => t,
None => {
$sender
.send(EventInitStatus::Failed($msg.to_owned()))
.expect(RECEIVER_DROPPED_MSG);
panic!("{}", $msg);
},
}
};
}

let ctx = send_status_on_err!(
MmArc::from_weak(&self.as_ref().ctx),
tx,
"MM context must have been initialized already."
);
let z_balance_change_handler = send_status_on_err!(
self.z_fields.z_balance_event_handler.as_ref(),
tx,
"Z balance change receiver can not be empty."
);

tx.send(EventInitStatus::Success).expect(RECEIVER_DROPPED_MSG);

// Locks the balance change handler, iterates through received events, and updates balance changes accordingly.
let mut bal = z_balance_change_handler.lock().await;
while (bal.next().await).is_some() {
match self.my_balance().compat().await {
Ok(balance) => {
let payload = json!({
"ticker": self.ticker(),
"address": self.my_z_address_encoded(),
"balance": { "spendable": balance.spendable, "unspendable": balance.unspendable }
});

ctx.stream_channel_controller
.broadcast(Event::new(Self::EVENT_NAME.to_string(), payload.to_string()))
.await;
},
Err(err) => {
let ticker = self.ticker();
error!("Failed getting balance for '{ticker}'. Error: {err}");
let e = serde_json::to_value(err).expect("Serialization should't fail.");
return ctx
.stream_channel_controller
.broadcast(Event::new(
format!("{}:{}", Self::ERROR_EVENT_NAME, ticker),
e.to_string(),
))
.await;
},
};
}
}

async fn spawn_if_active(self, config: &EventStreamConfiguration) -> EventInitStatus {
if let Some(event) = config.get_event(Self::EVENT_NAME) {
info!(
"{} event is activated for {} address {}. `stream_interval_seconds`({}) has no effect on this.",
Self::EVENT_NAME,
self.ticker(),
self.my_z_address_encoded(),
event.stream_interval_seconds
);

let (tx, rx): (Sender<EventInitStatus>, Receiver<EventInitStatus>) = oneshot::channel();
let fut = self.clone().handle(event.stream_interval_seconds, tx);
let settings =
AbortSettings::info_on_abort(format!("{} event is stopped for {}.", Self::EVENT_NAME, self.ticker()));
self.spawner().spawn_with_settings(fut, settings);

rx.await.unwrap_or_else(|e| {
EventInitStatus::Failed(format!("Event initialization status must be received: {}", e))
})
} else {
EventInitStatus::Inactive
}
}
}
36 changes: 22 additions & 14 deletions mm2src/coins/z_coin/z_coin_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ pub enum ZCoinBuildError {
ZCashParamsError(String),
ZDerivationPathNotSet,
SaplingParamsInvalidChecksum,
FailedSpawningBalanceEvents(String),
}

#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -272,27 +273,32 @@ impl From<ZcoinClientInitError> for ZCoinBuildError {
fn from(err: ZcoinClientInitError) -> Self { ZCoinBuildError::RpcClientInitErr(err) }
}

#[cfg(not(target_arch = "wasm32"))]
pub(super) enum SqlTxHistoryError {
#[derive(Debug, Display)]
pub(crate) enum ZTxHistoryError {
#[cfg(not(target_arch = "wasm32"))]
Sql(SqliteError),
#[cfg(target_arch = "wasm32")]
IndexedDbError(String),
FromIdDoesNotExist(i64),
}

#[cfg(not(target_arch = "wasm32"))]
impl From<SqliteError> for SqlTxHistoryError {
fn from(err: SqliteError) -> Self { SqlTxHistoryError::Sql(err) }
impl From<ZTxHistoryError> for MyTxHistoryErrorV2 {
fn from(err: ZTxHistoryError) -> Self { MyTxHistoryErrorV2::StorageError(err.to_string()) }
}

#[cfg(not(target_arch = "wasm32"))]
impl From<SqlTxHistoryError> for MyTxHistoryErrorV2 {
fn from(err: SqlTxHistoryError) -> Self {
match err {
SqlTxHistoryError::Sql(sql) => MyTxHistoryErrorV2::StorageError(sql.to_string()),
SqlTxHistoryError::FromIdDoesNotExist(id) => {
MyTxHistoryErrorV2::StorageError(format!("from_id {} does not exist", id))
},
}
}
impl From<SqliteError> for ZTxHistoryError {
fn from(err: SqliteError) -> Self { ZTxHistoryError::Sql(err) }
}

#[cfg(target_arch = "wasm32")]
impl From<DbTransactionError> for ZTxHistoryError {
fn from(err: DbTransactionError) -> Self { ZTxHistoryError::IndexedDbError(err.to_string()) }
}

#[cfg(target_arch = "wasm32")]
impl From<CursorError> for ZTxHistoryError {
fn from(err: CursorError) -> Self { ZTxHistoryError::IndexedDbError(err.to_string()) }
}

pub(super) struct NoInfoAboutTx(pub(super) H256Json);
Expand All @@ -316,6 +322,7 @@ pub enum ZCoinBalanceError {
impl From<ZcoinStorageError> for ZCoinBalanceError {
fn from(value: ZcoinStorageError) -> Self { ZCoinBalanceError::BalanceError(value.to_string()) }
}

/// The `ValidateBlocksError` enum encapsulates different types of errors that may occur
/// during the validation and scanning process of zcoin blocks.
#[derive(Debug, Display)]
Expand All @@ -342,6 +349,7 @@ pub enum ValidateBlocksError {
impl From<ValidateBlocksError> for ZcoinStorageError {
fn from(value: ValidateBlocksError) -> Self { Self::ValidateBlocksError(value) }
}

impl From<MmError<ZcoinStorageError>> for ValidateBlocksError {
fn from(value: MmError<ZcoinStorageError>) -> Self { Self::ZcoinStorageError(value.to_string()) }
}
Expand Down
Loading

0 comments on commit 3a21e71

Please sign in to comment.