From 24606739d6758e73e9f0539a8e8e6d4d41fa9cd4 Mon Sep 17 00:00:00 2001 From: HashEngineering Date: Mon, 23 Dec 2024 14:34:37 -0800 Subject: [PATCH] fix: batch updates --- .../org/dash/wallet/common/util/FlowExt.kt | 29 +++++++ .../wallet/ui/main/MainViewModel.kt | 80 +++++++++++++++---- 2 files changed, 93 insertions(+), 16 deletions(-) diff --git a/common/src/main/java/org/dash/wallet/common/util/FlowExt.kt b/common/src/main/java/org/dash/wallet/common/util/FlowExt.kt index 65a16834bf..98b3932c18 100644 --- a/common/src/main/java/org/dash/wallet/common/util/FlowExt.kt +++ b/common/src/main/java/org/dash/wallet/common/util/FlowExt.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.flow.AbstractFlow import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.flow.flow import kotlinx.coroutines.launch import kotlin.time.Duration @@ -41,3 +42,31 @@ fun Flow.observe(lifecycleOwner: LifecycleOwner, collector: FlowCollector } } } + +/** from ChatGPT */ +fun Flow.window(intervalMillis: Long): Flow> = flow { + val buffer = mutableListOf() + val lock = Any() + var lastEmitTime = System.currentTimeMillis() + + this@window.collect { value -> + var batchToEmit: List? = null + + synchronized(lock) { + buffer.add(value) + val currentTime = System.currentTimeMillis() + if (currentTime - lastEmitTime >= intervalMillis) { + batchToEmit = buffer.toList() + buffer.clear() + lastEmitTime = currentTime + } + } + + batchToEmit?.let { emit(it) } + } + + val finalBatch = synchronized(lock) { + if (buffer.isNotEmpty()) buffer.toList() else null + } + finalBatch?.let { emit(it) } +} diff --git a/wallet/src/de/schildbach/wallet/ui/main/MainViewModel.kt b/wallet/src/de/schildbach/wallet/ui/main/MainViewModel.kt index 89b8576e45..3808272a68 100644 --- a/wallet/src/de/schildbach/wallet/ui/main/MainViewModel.kt +++ b/wallet/src/de/schildbach/wallet/ui/main/MainViewModel.kt @@ -105,6 +105,7 @@ import org.dash.wallet.common.transactions.TransactionUtils.isEntirelySelf import org.dash.wallet.common.transactions.TransactionWrapper import org.dash.wallet.common.transactions.TransactionWrapperComparator import org.dash.wallet.common.util.toBigDecimal +import org.dash.wallet.common.util.window import org.dash.wallet.integrations.crowdnode.api.CrowdNodeApi import org.dash.wallet.integrations.crowdnode.transactions.FullCrowdNodeSignUpTxSetFactory import org.slf4j.LoggerFactory @@ -203,7 +204,7 @@ class MainViewModel @Inject constructor( val balance: LiveData get() = _balance - private var transactionViews: List = listOf() + private var transactionViews: MutableList = mutableListOf() private lateinit var crowdNodeWrapperFactory: FullCrowdNodeSignUpTxSetFactory private lateinit var coinJoinWrapperFactory: CoinJoinTxWrapperFactory private val _mostRecentTransaction = MutableLiveData() @@ -284,7 +285,10 @@ class MainViewModel @Inject constructor( private var contactRequestTimer: AnalyticsTimer? = null // end DashPay - + val refreshTxWatch: Stopwatch = Stopwatch.createUnstarted() + var refreshLastUpdate = 0L + private var refreshNewTx = 0 + private var refreshStatusTx = 0 init { transactionsDirection = savedStateHandle[DIRECTION_KEY] ?: TxFilterType.ALL @@ -295,15 +299,37 @@ class MainViewModel @Inject constructor( val filter = TxDirectionFilter(direction, walletData.wallet!!) refreshTransactions(filter, metadata) walletData.observeTransactions(true, filter) + .window(500) // batch every 500 ms .onEach { - log.info("observing transaction: {}", it.txId) - refreshTransaction(it, filter, metadata) + if (!refreshTxWatch.isRunning) { refreshTxWatch.start() } + val timeElapsed = refreshTxWatch.elapsed(TimeUnit.MILLISECONDS) + if (timeElapsed - refreshLastUpdate >= 1000) { + val interval = timeElapsed - refreshLastUpdate + log.info( + "observing transactions: {} txes, {} new tx/s, {} status tx/s", + it.size, + 1000 * refreshNewTx / interval, + 1000 * refreshStatusTx / interval + ) + refreshNewTx = 0 + refreshStatusTx = 0 + refreshLastUpdate = timeElapsed + } + refreshTransactions(it, filter, metadata) } } } .catch { analytics.logError(it, "is wallet null: ${walletData.wallet == null}") } .launchIn(viewModelWorkerScope) + walletData.observeWalletReset() + .filterNotNull() + .onEach { + val filter = TxDirectionFilter(transactionsDirection, walletData.wallet!!) + refreshTransactions(filter, mapOf()) + } + .launchIn(viewModelWorkerScope) + blockchainStateProvider.observeState() .filterNotNull() .onEach { state -> @@ -545,16 +571,38 @@ class MainViewModel @Inject constructor( ) } - transactionViews = allTransactionViews + transactionViews = allTransactionViews.toMutableList() log.info("refreshTransactions: {} ms", watch.elapsed(TimeUnit.MILLISECONDS)) - _transactions.postValue(allTransactionViews) + _transactions.postValue(transactionViews) + } + } + + /** update a batch of transactions */ + private suspend fun refreshTransactions( + txList: List, + filter: TxDirectionFilter, + metadata: Map + ) { + var updateCount = 0 + log.info("refreshTransactions({}, but only {} unique tx)", txList.size, txList.toSet().size) + txList.toSet().forEach { tx -> + if (filter.matches(tx)) { + refreshTransaction(tx, transactionViews, filter, metadata) + updateCount++ + } + } + if (updateCount > 0) { + _transactions.postValue(transactionViews.toList()) } } /** - * this will either add a single tranasction or update the transaction on the current view + * this will either add a single transaction or update the transaction on the current view */ - private suspend fun refreshTransaction(tx: Transaction, filter: TxDirectionFilter, metadata: Map) { + private suspend fun refreshTransaction( + tx: Transaction, + updatedList: MutableList, + filter: TxDirectionFilter, metadata: Map) { Context.propagate(Constants.CONTEXT) val watch = Stopwatch.createStarted() // is the item currently in our list @@ -570,6 +618,7 @@ class MainViewModel @Inject constructor( if (txRowViewIndex != -1) { val currentTxRowView = transactionViews[txRowViewIndex] + refreshStatusTx++ if (currentTxRowView.txWrapper == null) { log.info("observing transaction refresh: update {}", tx.txId) // update the current item by replacing the current item @@ -585,11 +634,7 @@ class MainViewModel @Inject constructor( tx.txId, watch.elapsed(TimeUnit.MILLISECONDS) ) - // some how tell the UI to update this item - val updatedList = transactionViews.toMutableList() updatedList[txRowViewIndex] = newTransactionRow - transactionViews = updatedList - _transactions.postValue(transactionViews) } else { // do nothing for updated transactions inside a wrapper // we presume that value, timestamp and title and count do to change with updates @@ -647,7 +692,7 @@ class MainViewModel @Inject constructor( contact ) } - val updatedList = transactionViews.toMutableList() // make a copy + // is there a new row to add? if (newTransactionRow != null) { if (replaceRowIndex != -1) { @@ -672,8 +717,7 @@ class MainViewModel @Inject constructor( } else { log.info("observing transaction: refreshTransaction adding item to a txwrapper; {} ms", watch.elapsed(TimeUnit.MILLISECONDS)) } - transactionViews = updatedList - _transactions.postValue(updatedList) + refreshNewTx++ } } private fun updateSyncStatus(state: BlockchainState) { @@ -689,8 +733,12 @@ class MainViewModel @Inject constructor( if (state.replaying) { // remove all tx - transactionViews = listOf() + transactionViews = mutableListOf() _transactions.postValue(transactionViews) + //val filter = TxDirectionFilter(_transactionsDirection.value, walletData.wallet!!) + //viewModelScope.launch { + // refreshTransactions(filter, mapOf()) + //} } }