Skip to content

Commit

Permalink
fix: replace batch updates method and only process tx confidence chan…
Browse files Browse the repository at this point in the history
…ges for the past hour of tx's
  • Loading branch information
HashEngineering committed Dec 24, 2024
1 parent 2460673 commit 5cf9bfe
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,17 @@

package org.dash.wallet.common.transactions

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.sample
import org.bitcoinj.core.Address
import org.bitcoinj.core.Sha256Hash
import org.bitcoinj.core.Transaction
import org.bitcoinj.core.TransactionBag
import org.bitcoinj.script.ScriptException
import java.util.concurrent.ConcurrentHashMap

object TransactionUtils {
fun getWalletAddressOfReceived(tx: Transaction, bag: TransactionBag): Address? {
Expand Down Expand Up @@ -118,4 +125,22 @@ object TransactionUtils {
}
return result
}
}

fun Flow<Transaction>.batchAndFilterUpdates(timeInterval: Long = 500): Flow<List<Transaction>> {
val latestTransactions = ConcurrentHashMap<Sha256Hash, Transaction>()

return this
.onEach { transaction ->
// Update the latest transaction for the hash
latestTransactions[transaction.txId] = transaction
}
.sample(timeInterval) // Emit events every 500ms
.map {
// Collect the latest transactions
latestTransactions.values.toList().also {
latestTransactions.clear() // Clear after collecting
}
}
.filter { it.isNotEmpty() } // Only emit non-empty lists
}
50 changes: 41 additions & 9 deletions wallet/src/de/schildbach/wallet/transactions/WalletObserver.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.catch
import org.bitcoinj.core.Context
import org.bitcoinj.core.Sha256Hash
import org.bitcoinj.core.Transaction
import org.bitcoinj.core.TransactionConfidence
import org.bitcoinj.core.listeners.TransactionConfidenceEventListener
import org.bitcoinj.utils.Threading
import org.bitcoinj.wallet.Wallet
Expand All @@ -36,6 +38,9 @@ import org.bitcoinj.wallet.listeners.WalletCoinsSentEventListener
import org.bitcoinj.wallet.listeners.WalletResetEventListener
import org.dash.wallet.common.transactions.filters.TransactionFilter
import org.slf4j.LoggerFactory
import java.util.Date
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

class WalletObserver(private val wallet: Wallet) {
companion object {
Expand Down Expand Up @@ -65,12 +70,14 @@ class WalletObserver(private val wallet: Wallet) {
}
}

/** observe new transactions (sent and received) and optionally transaction confidence changes for the past hour. */
fun observeTransactions(
observeTxConfidence: Boolean,
vararg filters: TransactionFilter
): Flow<Transaction> = callbackFlow {
log.info("observing transactions start {}", this@WalletObserver)
try {
val transactions = ConcurrentHashMap<Sha256Hash, Transaction>()
Threading.USER_THREAD.execute {
try {
Context.propagate(wallet.context)
Expand All @@ -83,10 +90,18 @@ class WalletObserver(private val wallet: Wallet) {
}
}

var transactionConfidenceListener: TransactionConfidence.Listener? = null

val coinsSentListener = WalletCoinsSentEventListener { _, tx: Transaction?, _, _ ->
try {
val oneHourAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1)
if (tx != null && (filters.isEmpty() || filters.any { it.matches(tx) })) {
// log.info("observing transaction sent: {} [=====] {}", tx.txId, this@WalletObserver)
if (tx.updateTime.time > oneHourAgo && observeTxConfidence) {
transactions[tx.txId] = tx
tx.confidence.addEventListener(Threading.USER_THREAD, transactionConfidenceListener)
// log.info("observing transaction: start listening to {}", tx.txId)
}
trySend(tx).onFailure {
log.error("Failed to send transaction sent event", it)
}
Expand All @@ -101,6 +116,12 @@ class WalletObserver(private val wallet: Wallet) {
try {
if (tx != null && (filters.isEmpty() || filters.any { it.matches(tx) })) {
// log.info("observing transaction received: {} [=====] {}", tx.txId, this@WalletObserver)
val oneHourAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1)
if (tx.updateTime.time > oneHourAgo && observeTxConfidence) {
transactions[tx.txId] = tx
tx.confidence.addEventListener(Threading.USER_THREAD, transactionConfidenceListener)
// log.info("observing transaction: start listening to {}", tx.txId)
}
trySend(tx).onFailure {
log.error("Failed to send transaction received event", it)
}
Expand All @@ -111,25 +132,32 @@ class WalletObserver(private val wallet: Wallet) {
}
}

var transactionConfidenceChangedListener: TransactionConfidenceEventListener? = null

if (observeTxConfidence) {
transactionConfidenceChangedListener = TransactionConfidenceEventListener { _, tx: Transaction? ->
transactionConfidenceListener = TransactionConfidence.Listener { transactionConfidence, changeReason ->
try {
val tx = transactions[transactionConfidence.transactionHash]
if (tx != null && (filters.isEmpty() || filters.any { it.matches(tx) })) {
// log.info("observing transaction conf {} [=====] {}", tx.txId, this@WalletObserver)
if (tx.getConfidence(wallet.context).depthInBlocks < 7) {
trySend(tx).onFailure {
log.error("Failed to send transaction confidence event", it)
}
trySend(tx).onFailure {
log.error("Failed to send transaction confidence event", it)
}
}
val shouldStopListening = when (changeReason) {
TransactionConfidence.Listener.ChangeReason.CHAIN_LOCKED -> transactionConfidence.isChainLocked
TransactionConfidence.Listener.ChangeReason.IX_TYPE -> transactionConfidence.isTransactionLocked
TransactionConfidence.Listener.ChangeReason.DEPTH -> transactionConfidence.depthInBlocks >= 6
else -> false
}
if (shouldStopListening) {
// log.info("observing transaction: stop listening to {}", transactionConfidence.transactionHash)
transactionConfidence.removeEventListener(transactionConfidenceListener)
transactions.remove(transactionConfidence.transactionHash)
}
} catch (e: Exception) {
log.error("Error in transactionConfidenceChangedListener", e)
close(e)
}
}
wallet.addTransactionConfidenceEventListener(Threading.USER_THREAD, transactionConfidenceChangedListener)
}

wallet.addCoinsSentEventListener(Threading.USER_THREAD, coinsSentListener)
Expand All @@ -140,7 +168,11 @@ class WalletObserver(private val wallet: Wallet) {
wallet.removeCoinsSentEventListener(coinsSentListener)
wallet.removeCoinsReceivedEventListener(coinsReceivedListener)
if (observeTxConfidence) {
wallet.removeTransactionConfidenceEventListener(transactionConfidenceChangedListener)
transactions.forEach { (_, tx) ->
tx.confidence.removeEventListener(transactionConfidenceListener)
// log.info("observing transaction: stop listening to {}", tx.txId)
}
transactions.clear()
}
}
} catch (e: Exception) {
Expand Down
5 changes: 2 additions & 3 deletions wallet/src/de/schildbach/wallet/ui/main/MainViewModel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ import org.dash.wallet.common.services.analytics.AnalyticsTimer
import org.dash.wallet.common.transactions.TransactionUtils.isEntirelySelf
import org.dash.wallet.common.transactions.TransactionWrapper
import org.dash.wallet.common.transactions.TransactionWrapperComparator
import org.dash.wallet.common.transactions.batchAndFilterUpdates
import org.dash.wallet.common.util.toBigDecimal
import org.dash.wallet.common.util.window
import org.dash.wallet.integrations.crowdnode.api.CrowdNodeApi
import org.dash.wallet.integrations.crowdnode.transactions.FullCrowdNodeSignUpTxSetFactory
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -160,7 +160,6 @@ class MainViewModel @Inject constructor(
val fiatFormat: MonetaryFormat = Constants.LOCAL_FORMAT.minDecimals(0).optionalDecimals(0, 2)

private val _transactions = MutableLiveData<List<TransactionRowView>>()
private val _modifyTransactionRow = MutableStateFlow<Pair<Boolean, TransactionRowView?>>(Pair(false, null))
val transactions: LiveData<List<TransactionRowView>>
get() = _transactions
private val _transactionsDirection = MutableStateFlow(TxFilterType.ALL)
Expand Down Expand Up @@ -299,7 +298,7 @@ class MainViewModel @Inject constructor(
val filter = TxDirectionFilter(direction, walletData.wallet!!)
refreshTransactions(filter, metadata)
walletData.observeTransactions(true, filter)
.window(500) // batch every 500 ms
.batchAndFilterUpdates(500) // batch every 500 ms
.onEach {
if (!refreshTxWatch.isRunning) { refreshTxWatch.start() }
val timeElapsed = refreshTxWatch.elapsed(TimeUnit.MILLISECONDS)
Expand Down

0 comments on commit 5cf9bfe

Please sign in to comment.