Skip to content

Commit

Permalink
fix review notes
Browse files Browse the repository at this point in the history
  • Loading branch information
borngraced committed Jul 9, 2024
1 parent e643ab3 commit a78704d
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 16 deletions.
3 changes: 1 addition & 2 deletions mm2src/coins/eth/v2_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -926,12 +926,11 @@ fn compress_public_key(uncompressed: H520) -> MmResult<H264, EthActivationV2Erro
async fn run_db_migration_for_new_eth_pubkey(ctx: &MmArc, db_id: String) -> MmResult<(), EthActivationV2Error> {
info!("Public key hash: {db_id:?}");

let db_migration_sender = ctx
let mut db_migration_sender = ctx
.db_migration_watcher
.as_option()
.expect("Db migration watcher isn't intialized yet!")
.get_sender();
let mut db_migration_sender = db_migration_sender.lock().await;
db_migration_sender
.send(db_id)
.await
Expand Down
3 changes: 1 addition & 2 deletions mm2src/coins/utxo/utxo_builder/utxo_coin_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1024,12 +1024,11 @@ async fn wait_for_protocol_version_checked(client: &ElectrumClientImpl) -> Resul
pub async fn run_db_migration_for_new_utxo_pubkey(ctx: &MmArc, db_id: String) -> MmResult<(), UtxoCoinBuildError> {
info!("Public key hash: {db_id:?}");

let db_migration_sender = ctx
let mut db_migration_sender = ctx
.db_migration_watcher
.as_option()
.expect("Db migration watcher isn't intialized yet!")
.get_sender();
let mut db_migration_sender = db_migration_sender.lock().await;
db_migration_sender
.send(db_id)
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,13 +416,11 @@ async fn run_db_migration_for_new_tendermint_pubkey(
ticker: String,
) -> MmResult<(), TendermintInitError> {
info!("Public key hash: {db_id:?}");

let db_migration_sender = ctx
let mut db_migration_sender = ctx
.db_migration_watcher
.as_option()
.expect("Db migration watcher isn't intialized yet!")
.get_sender();
let mut db_migration_sender = db_migration_sender.lock().await;
db_migration_sender
.send(db_id)
.await
Expand Down
26 changes: 20 additions & 6 deletions mm2src/mm2_core/src/sql_connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ impl SqliteConnPool {

// Connection pool is already initialized, insert new connection.
if let Some(pool) = ctx.sqlite_conn_pool.as_option() {
let conns = pool.connections.read().await;
if conns.get(&db_id).is_some() {
return Ok(());
}

let conn = Self::open_connection(sqlite_file_path);
let mut pool = pool.connections.write().unwrap();
pool.insert(db_id, conn);
Expand Down Expand Up @@ -193,6 +198,11 @@ impl AsyncSqliteConnPool {
let db_id = db_id.map(|e| e.to_owned()).unwrap_or_else(|| ctx.rmd160.to_string());

if let Some(pool) = ctx.async_sqlite_conn_pool.as_option() {
let conns = pool.connections.read().await;
if conns.get(&db_id).is_some() {
return Ok(());
}

let conn = Self::open_connection(&pool.sqlite_file_path).await;
let mut pool = pool.connections.write().await;
pool.insert(db_id, conn);
Expand All @@ -217,6 +227,12 @@ impl AsyncSqliteConnPool {
let db_id = db_id.map(|e| e.to_owned()).unwrap_or_else(|| ctx.rmd160.to_string());

if let Some(pool) = ctx.async_sqlite_conn_pool.as_option() {
let conns = pool.connections.read().await;
if conns.get(&db_id).is_some() {
return Ok(());
}
drop(conns);

let mut pool = pool.connections.write().await;
let conn = Arc::new(AsyncMutex::new(AsyncConnection::open_in_memory().await.unwrap()));
pool.insert(db_id, conn);
Expand Down Expand Up @@ -273,8 +289,8 @@ impl AsyncSqliteConnPool {
}
}

pub type DbMigrationHandler = Arc<AsyncMutex<Receiver<String>>>;
pub type DbMigrationSender = Arc<AsyncMutex<Sender<String>>>;
pub type DbMigrationHandler = Receiver<String>;
pub type DbMigrationSender = Sender<String>;

pub struct DbMigrationWatcher {
sender: DbMigrationSender,
Expand All @@ -284,12 +300,10 @@ impl DbMigrationWatcher {
pub fn init(ctx: &MmCtx) -> Result<DbMigrationHandler, String> {
let (sender, receiver) = channel(1);

let selfi = Arc::new(Self {
sender: Arc::new(AsyncMutex::new(sender)),
});
let selfi = Arc::new(Self { sender });
try_s!(ctx.db_migration_watcher.pin(selfi));

Ok(Arc::new(AsyncMutex::new(receiver)))
Ok(receiver)
}

pub fn get_sender(&self) -> DbMigrationSender { self.sender.clone() }
Expand Down
5 changes: 2 additions & 3 deletions mm2src/mm2_main/src/lp_native_dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,12 +458,11 @@ async fn init_db_migration_watcher_loop(ctx: MmArc) {
use std::collections::HashSet;

let mut migrations = HashSet::new();
let receiver = &ctx
let mut receiver = ctx
.init_db_migration_watcher()
.expect("db_m igration_watcher initialization failed");
let mut guard = receiver.lock().await;

while let Some(db_id) = guard.next().await {
while let Some(db_id) = receiver.next().await {
if migrations.contains(&db_id) {
debug!("{} migrated, skipping migration..", db_id);
continue;
Expand Down

0 comments on commit a78704d

Please sign in to comment.