Skip to content

Commit

Permalink
z_coin wasm tx_history streaming test
Browse files Browse the repository at this point in the history
ignoring the non-wasm test for now since it needs zcash params
downloaded which we don't have an auto way to do. would be better if we
can download them in the same way we do for wasm (right from kdf).
  • Loading branch information
mariocynicys committed Nov 1, 2024
1 parent c29cf81 commit 6a700f7
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 122 deletions.
8 changes: 5 additions & 3 deletions mm2src/coins/z_coin.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
pub mod storage;
pub mod tx_history_events;
#[cfg_attr(not(target_arch = "wasm32"), cfg(test))]
mod tx_streaming_tests;
pub mod z_balance_streaming;
mod z_coin_errors;
#[cfg(all(test, feature = "zhtlc-native-tests"))]
#[cfg(all(test, not(target_arch = "wasm32"), feature = "zhtlc-native-tests"))]
mod z_coin_native_tests;
mod z_htlc;
mod z_rpc;
Expand Down Expand Up @@ -780,8 +782,8 @@ impl Default for ZcoinActivationParams {
requires_notarization: None,
zcash_params_path: None,
scan_blocks_per_iteration: NonZeroU32::new(1000).expect("1000 is a valid value"),
scan_interval_ms: 0,
account: 0,
scan_interval_ms: Default::default(),
account: Default::default(),
}
}
}
Expand Down
41 changes: 21 additions & 20 deletions mm2src/coins/z_coin/storage/walletdb/wasm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ mod wasm_test {
use crate::z_coin::{ValidateBlocksError, ZcoinConsensusParams, ZcoinStorageError};
use crate::ZcoinProtocolInfo;
use mm2_core::mm_ctx::MmArc;
use mm2_event_stream::StreamingManager;
use mm2_test_helpers::for_tests::mm_ctx_with_custom_db;
use protobuf::Message;
use std::path::PathBuf;
Expand Down Expand Up @@ -255,7 +256,7 @@ mod wasm_test {
blockdb
.process_blocks_with_mode(
consensus_params.clone(),
BlockProcessingMode::Scan(scan, None),
BlockProcessingMode::Scan(scan, StreamingManager::default()),
None,
None,
)
Expand Down Expand Up @@ -300,7 +301,7 @@ mod wasm_test {
blockdb
.process_blocks_with_mode(
consensus_params.clone(),
BlockProcessingMode::Scan(scan, None),
BlockProcessingMode::Scan(scan, StreamingManager::default()),
None,
None,
)
Expand Down Expand Up @@ -359,7 +360,7 @@ mod wasm_test {
blockdb
.process_blocks_with_mode(
consensus_params.clone(),
BlockProcessingMode::Scan(scan, None),
BlockProcessingMode::Scan(scan, StreamingManager::default()),
None,
None,
)
Expand Down Expand Up @@ -453,7 +454,7 @@ mod wasm_test {
blockdb
.process_blocks_with_mode(
consensus_params.clone(),
BlockProcessingMode::Scan(scan, None),
BlockProcessingMode::Scan(scan, StreamingManager::default()),
None,
None,
)
Expand Down Expand Up @@ -542,7 +543,7 @@ mod wasm_test {
blockdb
.process_blocks_with_mode(
consensus_params.clone(),
BlockProcessingMode::Scan(scan, None),
BlockProcessingMode::Scan(scan, StreamingManager::default()),
None,
None,
)
Expand Down Expand Up @@ -572,7 +573,7 @@ mod wasm_test {
blockdb
.process_blocks_with_mode(
consensus_params.clone(),
BlockProcessingMode::Scan(scan, None),
BlockProcessingMode::Scan(scan, StreamingManager::default()),
None,
None,
)
Expand Down Expand Up @@ -611,7 +612,7 @@ mod wasm_test {
blockdb
.process_blocks_with_mode(
consensus_params.clone(),
BlockProcessingMode::Scan(scan, None),
BlockProcessingMode::Scan(scan, StreamingManager::default()),
None,
None,
)
Expand All @@ -629,7 +630,7 @@ mod wasm_test {
let scan = blockdb
.process_blocks_with_mode(
consensus_params.clone(),
BlockProcessingMode::Scan(scan, None),
BlockProcessingMode::Scan(scan, StreamingManager::default()),
None,
None,
)
Expand All @@ -653,7 +654,7 @@ mod wasm_test {
assert!(blockdb
.process_blocks_with_mode(
consensus_params.clone(),
BlockProcessingMode::Scan(scan, None),
BlockProcessingMode::Scan(scan, StreamingManager::default()),
None,
None
)
Expand Down Expand Up @@ -697,7 +698,7 @@ mod wasm_test {
assert!(blockdb
.process_blocks_with_mode(
consensus_params.clone(),
BlockProcessingMode::Scan(scan, None),
BlockProcessingMode::Scan(scan, StreamingManager::default()),
None,
None
)
Expand All @@ -718,7 +719,7 @@ mod wasm_test {
assert!(blockdb
.process_blocks_with_mode(
consensus_params.clone(),
BlockProcessingMode::Scan(scan, None),
BlockProcessingMode::Scan(scan, StreamingManager::default()),
None,
None
)
Expand Down Expand Up @@ -760,7 +761,7 @@ mod wasm_test {
assert!(blockdb
.process_blocks_with_mode(
consensus_params.clone(),
BlockProcessingMode::Scan(scan, None),
BlockProcessingMode::Scan(scan, StreamingManager::default()),
None,
None
)
Expand Down Expand Up @@ -790,7 +791,7 @@ mod wasm_test {
let scan = blockdb
.process_blocks_with_mode(
consensus_params.clone(),
BlockProcessingMode::Scan(scan, None),
BlockProcessingMode::Scan(scan, StreamingManager::default()),
None,
None,
)
Expand Down Expand Up @@ -832,7 +833,7 @@ mod wasm_test {
// // Scan the cache
// let scan = DataConnStmtCacheWrapper::new(DataConnStmtCacheWasm(walletdb.clone()));
// assert!(blockdb
// .process_blocks_with_mode(consensus_params.clone(), BlockProcessingMode::Scan(scan, None), None, None)
// .process_blocks_with_mode(consensus_params.clone(), BlockProcessingMode::Scan(scan, StreamingManager::default()), None, None)
// .await
// .is_ok());
//
Expand All @@ -852,7 +853,7 @@ mod wasm_test {
// // Scan the cache
// let scan = DataConnStmtCacheWrapper::new(DataConnStmtCacheWasm(walletdb.clone()));
// assert!(blockdb
// .process_blocks_with_mode(consensus_params.clone(), BlockProcessingMode::Scan(scan, None), None, None)
// .process_blocks_with_mode(consensus_params.clone(), BlockProcessingMode::Scan(scan, StreamingManager::default()), None, None)
// .await
// .is_ok());
//
Expand Down Expand Up @@ -897,7 +898,7 @@ mod wasm_test {
// // Scan the cache
// let scan = DataConnStmtCacheWrapper::new(DataConnStmtCacheWasm(walletdb.clone()));
// assert!(blockdb
// .process_blocks_with_mode(consensus_params.clone(), BlockProcessingMode::Scan(scan, None), None, None)
// .process_blocks_with_mode(consensus_params.clone(), BlockProcessingMode::Scan(scan, StreamingManager::default()), None, None)
// .await
// .is_ok());
//
Expand Down Expand Up @@ -928,7 +929,7 @@ mod wasm_test {
// // Scan the cache
// let scan = DataConnStmtCacheWrapper::new(DataConnStmtCacheWasm(walletdb.clone()));
// assert!(blockdb
// .process_blocks_with_mode(consensus_params.clone(), BlockProcessingMode::Scan(scan, None), None, None)
// .process_blocks_with_mode(consensus_params.clone(), BlockProcessingMode::Scan(scan, StreamingManager::default()), None, None)
// .await
// .is_ok());
//
Expand Down Expand Up @@ -1098,7 +1099,7 @@ mod wasm_test {
// // Scan the cache
// let scan = DataConnStmtCacheWrapper::new(DataConnStmtCacheWasm(walletdb.clone()));
// blockdb
// .process_blocks_with_mode(consensus_params.clone(), BlockProcessingMode::Scan(scan, None), None, None)
// .process_blocks_with_mode(consensus_params.clone(), BlockProcessingMode::Scan(scan, StreamingManager::default()), None, None)
// .await
// .unwrap();
// assert_eq!(walletdb.get_balance(AccountId(0)).await.unwrap(), value);
Expand Down Expand Up @@ -1155,7 +1156,7 @@ mod wasm_test {
// // Scan the cache
// let scan = DataConnStmtCacheWrapper::new(DataConnStmtCacheWasm(walletdb.clone()));
// blockdb
// .process_blocks_with_mode(consensus_params.clone(), BlockProcessingMode::Scan(scan, None), None, None)
// .process_blocks_with_mode(consensus_params.clone(), BlockProcessingMode::Scan(scan, StreamingManager::default()), None, None)
// .await
// .unwrap();
//
Expand Down Expand Up @@ -1191,7 +1192,7 @@ mod wasm_test {
// // Scan the cache
// let scan = DataConnStmtCacheWrapper::new(DataConnStmtCacheWasm(walletdb.clone()));
// blockdb
// .process_blocks_with_mode(consensus_params.clone(), BlockProcessingMode::Scan(scan, None), None, None)
// .process_blocks_with_mode(consensus_params.clone(), BlockProcessingMode::Scan(scan, StreamingManager::default()), None, None)
// .await
// .unwrap();
//
Expand Down
30 changes: 30 additions & 0 deletions mm2src/coins/z_coin/tx_streaming_tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#[cfg(not(target_arch = "wasm32"))] mod native;
#[cfg(target_arch = "wasm32")] mod wasm;

use common::now_sec;
use mm2_test_helpers::for_tests::{PIRATE_ELECTRUMS, PIRATE_LIGHTWALLETD_URLS};

use crate::utxo::rpc_clients::ElectrumConnectionSettings;
use crate::z_coin::{ZcoinActivationParams, ZcoinRpcMode};

fn light_zcoin_activation_params() -> ZcoinActivationParams {
ZcoinActivationParams {
mode: ZcoinRpcMode::Light {
electrum_servers: PIRATE_ELECTRUMS
.iter()
.map(|s| ElectrumConnectionSettings {
url: s.to_string(),
protocol: Default::default(),
disable_cert_verification: Default::default(),
timeout_sec: None,
})
.collect(),
min_connected: None,
max_connected: None,
light_wallet_d_servers: PIRATE_LIGHTWALLETD_URLS.iter().map(|s| s.to_string()).collect(),
sync_params: Some(crate::z_coin::SyncStartPoint::Date(now_sec() - 24 * 60 * 60)),
skip_sync_params: None,
},
..Default::default()
}
}
73 changes: 73 additions & 0 deletions mm2src/coins/z_coin/tx_streaming_tests/native.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use common::custom_futures::timeout::FutureTimerExt;
use common::{block_on, Future01CompatExt};
use mm2_core::mm_ctx::MmCtxBuilder;
use mm2_test_helpers::for_tests::{pirate_conf, ARRR};
use std::time::Duration;

use super::light_zcoin_activation_params;
use crate::z_coin::tx_history_events::ZCoinTxHistoryEventStreamer;
use crate::z_coin::z_coin_from_conf_and_params;
use crate::z_coin::z_htlc::z_send_dex_fee;
use crate::{CoinProtocol, MarketCoinOps, MmCoin, PrivKeyBuildPolicy};

#[test]
#[ignore] // Ignored because we don't have zcash params in CI. TODO: Why not download them on demand like how we do in wasm (see download_and_save_params).
fn test_zcoin_tx_streaming() {
let ctx = MmCtxBuilder::default().into_mm_arc();
let conf = pirate_conf();
let params = light_zcoin_activation_params();
// Address: RQX5MnqnxEk6P33LSEAxC2vqA7DfSdWVyH
// Or: zs1n2azlwcj9pvl2eh36qvzgeukt2cpzmw44hya8wyu52j663d0dfs4d5hjx6tr04trz34jxyy433j
let priv_key_policy =
PrivKeyBuildPolicy::IguanaPrivKey("6d862798ef956fb60fb17bcc417dd6d44bfff066a4a49301cd2528e41a4a3e45".into());
let protocol_info = match serde_json::from_value::<CoinProtocol>(conf["protocol"].clone()).unwrap() {
CoinProtocol::ZHTLC(protocol_info) => protocol_info,
other_protocol => panic!("Failed to get protocol from config: {:?}", other_protocol),
};

let coin = block_on(z_coin_from_conf_and_params(
&ctx,
ARRR,
&conf,
&params,
protocol_info,
priv_key_policy,
))
.unwrap();

// Wait till we are synced with the sapling state.
while !block_on(coin.is_sapling_state_synced()) {
std::thread::sleep(Duration::from_secs(1));
}

// Query the block height to make sure our electrums are actually connected.
log!("current block = {:?}", block_on(coin.current_block().compat()).unwrap());

// Add a new client to use it for listening to tx history events.
let client_id = 1;
let mut event_receiver = ctx.event_stream_manager.new_client(client_id).unwrap();
// Add the streamer that will stream the tx history events.
let streamer = ZCoinTxHistoryEventStreamer::new(coin.clone());
// Subscribe the client to the streamer.
block_on(ctx.event_stream_manager.add(client_id, streamer, coin.spawner())).unwrap();

// Send a tx to have it in the tx history.
let tx = block_on(z_send_dex_fee(&coin, "0.0001".parse().unwrap(), &[1; 16])).unwrap();

// Wait for the tx history event (should be streamed next block).
let event = block_on(Box::pin(event_receiver.recv()).timeout_secs(120.))
.expect("timed out waiting for tx to showup")
.expect("tx history sender shutdown");

log!("{:?}", event.get());
let (event_type, event_data) = event.get();
// Make sure this is not an error event,
assert!(!event_type.starts_with("ERROR_"));
// from the expected streamer,
assert_eq!(
event_type,
ZCoinTxHistoryEventStreamer::derive_streamer_id(coin.ticker())
);
// and has the expected data.
assert_eq!(event_data["tx_hash"].as_str().unwrap(), tx.txid().to_string());
}
74 changes: 74 additions & 0 deletions mm2src/coins/z_coin/tx_streaming_tests/wasm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use common::custom_futures::timeout::FutureTimerExt;
use common::{executor::Timer, Future01CompatExt};
use mm2_core::mm_ctx::MmCtxBuilder;
use mm2_test_helpers::for_tests::{pirate_conf, ARRR};
use wasm_bindgen_test::*;

use super::light_zcoin_activation_params;
use crate::z_coin::tx_history_events::ZCoinTxHistoryEventStreamer;
use crate::z_coin::z_coin_from_conf_and_params;
use crate::z_coin::z_htlc::z_send_dex_fee;
use crate::PrivKeyBuildPolicy;
use crate::{CoinProtocol, MarketCoinOps, MmCoin};

#[wasm_bindgen_test]
async fn test_zcoin_tx_streaming() {
let ctx = MmCtxBuilder::default().into_mm_arc();
let conf = pirate_conf();
let params = light_zcoin_activation_params();
// Address: RQX5MnqnxEk6P33LSEAxC2vqA7DfSdWVyH
// Or: zs1n2azlwcj9pvl2eh36qvzgeukt2cpzmw44hya8wyu52j663d0dfs4d5hjx6tr04trz34jxyy433j
let priv_key_policy =
PrivKeyBuildPolicy::IguanaPrivKey("6d862798ef956fb60fb17bcc417dd6d44bfff066a4a49301cd2528e41a4a3e45".into());
let protocol_info = match serde_json::from_value::<CoinProtocol>(conf["protocol"].clone()).unwrap() {
CoinProtocol::ZHTLC(protocol_info) => protocol_info,
other_protocol => panic!("Failed to get protocol from config: {:?}", other_protocol),
};

let coin = z_coin_from_conf_and_params(&ctx, ARRR, &conf, &params, protocol_info, priv_key_policy)
.await
.unwrap();

// Wait till we are synced with the sapling state.
while !coin.is_sapling_state_synced().await {
Timer::sleep(1.).await;
}

// Query the block height to make sure our electrums are actually connected.
log!("current block = {:?}", coin.current_block().compat().await.unwrap());

// Add a new client to use it for listening to tx history events.
let client_id = 1;
let mut event_receiver = ctx.event_stream_manager.new_client(client_id).unwrap();
// Add the streamer that will stream the tx history events.
let streamer = ZCoinTxHistoryEventStreamer::new(coin.clone());
// Subscribe the client to the streamer.
ctx.event_stream_manager
.add(client_id, streamer, coin.spawner())
.await
.unwrap();

// Send a tx to have it in the tx history.
let tx = z_send_dex_fee(&coin, "0.0001".parse().unwrap(), &[1; 16])
.await
.unwrap();

// Wait for the tx history event (should be streamed next block).
let event = Box::pin(event_receiver.recv())
.timeout_secs(120.)
.await
.expect("timed out waiting for tx to showup")
.expect("tx history sender shutdown");

log!("{:?}", event.get());
let (event_type, event_data) = event.get();
// Make sure this is not an error event,
assert!(!event_type.starts_with("ERROR_"));
// from the expected streamer,
assert_eq!(
event_type,
ZCoinTxHistoryEventStreamer::derive_streamer_id(coin.ticker())
);
// and has the expected data.
assert_eq!(event_data["tx_hash"].as_str().unwrap(), tx.txid().to_string());
}
Loading

0 comments on commit 6a700f7

Please sign in to comment.