Skip to content

Commit

Permalink
fix: batch updates
Browse files Browse the repository at this point in the history
  • Loading branch information
HashEngineering committed Dec 23, 2024
1 parent 5065024 commit 2460673
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 16 deletions.
29 changes: 29 additions & 0 deletions common/src/main/java/org/dash/wallet/common/util/FlowExt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlin.time.Duration

Expand Down Expand Up @@ -41,3 +42,31 @@ fun <T> Flow<T>.observe(lifecycleOwner: LifecycleOwner, collector: FlowCollector
}
}
}

/** from ChatGPT */
fun <T> Flow<T>.window(intervalMillis: Long): Flow<List<T>> = flow {
val buffer = mutableListOf<T>()
val lock = Any()
var lastEmitTime = System.currentTimeMillis()

this@window.collect { value ->
var batchToEmit: List<T>? = null

synchronized(lock) {
buffer.add(value)
val currentTime = System.currentTimeMillis()
if (currentTime - lastEmitTime >= intervalMillis) {
batchToEmit = buffer.toList()
buffer.clear()
lastEmitTime = currentTime
}
}

batchToEmit?.let { emit(it) }
}

val finalBatch = synchronized(lock) {
if (buffer.isNotEmpty()) buffer.toList() else null
}
finalBatch?.let { emit(it) }
}
80 changes: 64 additions & 16 deletions wallet/src/de/schildbach/wallet/ui/main/MainViewModel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ import org.dash.wallet.common.transactions.TransactionUtils.isEntirelySelf
import org.dash.wallet.common.transactions.TransactionWrapper
import org.dash.wallet.common.transactions.TransactionWrapperComparator
import org.dash.wallet.common.util.toBigDecimal
import org.dash.wallet.common.util.window
import org.dash.wallet.integrations.crowdnode.api.CrowdNodeApi
import org.dash.wallet.integrations.crowdnode.transactions.FullCrowdNodeSignUpTxSetFactory
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -203,7 +204,7 @@ class MainViewModel @Inject constructor(
val balance: LiveData<Coin>
get() = _balance

private var transactionViews: List<TransactionRowView> = listOf()
private var transactionViews: MutableList<TransactionRowView> = mutableListOf()
private lateinit var crowdNodeWrapperFactory: FullCrowdNodeSignUpTxSetFactory
private lateinit var coinJoinWrapperFactory: CoinJoinTxWrapperFactory
private val _mostRecentTransaction = MutableLiveData<Transaction>()
Expand Down Expand Up @@ -284,7 +285,10 @@ class MainViewModel @Inject constructor(
private var contactRequestTimer: AnalyticsTimer? = null

// end DashPay

val refreshTxWatch: Stopwatch = Stopwatch.createUnstarted()
var refreshLastUpdate = 0L
private var refreshNewTx = 0
private var refreshStatusTx = 0
init {
transactionsDirection = savedStateHandle[DIRECTION_KEY] ?: TxFilterType.ALL

Expand All @@ -295,15 +299,37 @@ class MainViewModel @Inject constructor(
val filter = TxDirectionFilter(direction, walletData.wallet!!)
refreshTransactions(filter, metadata)
walletData.observeTransactions(true, filter)
.window(500) // batch every 500 ms
.onEach {
log.info("observing transaction: {}", it.txId)
refreshTransaction(it, filter, metadata)
if (!refreshTxWatch.isRunning) { refreshTxWatch.start() }
val timeElapsed = refreshTxWatch.elapsed(TimeUnit.MILLISECONDS)
if (timeElapsed - refreshLastUpdate >= 1000) {
val interval = timeElapsed - refreshLastUpdate
log.info(
"observing transactions: {} txes, {} new tx/s, {} status tx/s",
it.size,
1000 * refreshNewTx / interval,
1000 * refreshStatusTx / interval
)
refreshNewTx = 0
refreshStatusTx = 0
refreshLastUpdate = timeElapsed
}
refreshTransactions(it, filter, metadata)
}
}
}
.catch { analytics.logError(it, "is wallet null: ${walletData.wallet == null}") }
.launchIn(viewModelWorkerScope)

walletData.observeWalletReset()
.filterNotNull()
.onEach {
val filter = TxDirectionFilter(transactionsDirection, walletData.wallet!!)
refreshTransactions(filter, mapOf())
}
.launchIn(viewModelWorkerScope)

blockchainStateProvider.observeState()
.filterNotNull()
.onEach { state ->
Expand Down Expand Up @@ -545,16 +571,38 @@ class MainViewModel @Inject constructor(
)
}

transactionViews = allTransactionViews
transactionViews = allTransactionViews.toMutableList()
log.info("refreshTransactions: {} ms", watch.elapsed(TimeUnit.MILLISECONDS))
_transactions.postValue(allTransactionViews)
_transactions.postValue(transactionViews)
}
}

/** update a batch of transactions */
private suspend fun refreshTransactions(
txList: List<Transaction>,
filter: TxDirectionFilter,
metadata: Map<Sha256Hash, PresentableTxMetadata>
) {
var updateCount = 0
log.info("refreshTransactions({}, but only {} unique tx)", txList.size, txList.toSet().size)
txList.toSet().forEach { tx ->
if (filter.matches(tx)) {
refreshTransaction(tx, transactionViews, filter, metadata)
updateCount++
}
}
if (updateCount > 0) {
_transactions.postValue(transactionViews.toList())
}
}

/**
* this will either add a single tranasction or update the transaction on the current view
* this will either add a single transaction or update the transaction on the current view
*/
private suspend fun refreshTransaction(tx: Transaction, filter: TxDirectionFilter, metadata: Map<Sha256Hash, PresentableTxMetadata>) {
private suspend fun refreshTransaction(
tx: Transaction,
updatedList: MutableList<TransactionRowView>,
filter: TxDirectionFilter, metadata: Map<Sha256Hash, PresentableTxMetadata>) {
Context.propagate(Constants.CONTEXT)
val watch = Stopwatch.createStarted()
// is the item currently in our list
Expand All @@ -570,6 +618,7 @@ class MainViewModel @Inject constructor(

if (txRowViewIndex != -1) {
val currentTxRowView = transactionViews[txRowViewIndex]
refreshStatusTx++
if (currentTxRowView.txWrapper == null) {
log.info("observing transaction refresh: update {}", tx.txId)
// update the current item by replacing the current item
Expand All @@ -585,11 +634,7 @@ class MainViewModel @Inject constructor(
tx.txId,
watch.elapsed(TimeUnit.MILLISECONDS)
)
// some how tell the UI to update this item
val updatedList = transactionViews.toMutableList()
updatedList[txRowViewIndex] = newTransactionRow
transactionViews = updatedList
_transactions.postValue(transactionViews)
} else {
// do nothing for updated transactions inside a wrapper
// we presume that value, timestamp and title and count do to change with updates
Expand Down Expand Up @@ -647,7 +692,7 @@ class MainViewModel @Inject constructor(
contact
)
}
val updatedList = transactionViews.toMutableList() // make a copy

// is there a new row to add?
if (newTransactionRow != null) {
if (replaceRowIndex != -1) {
Expand All @@ -672,8 +717,7 @@ class MainViewModel @Inject constructor(
} else {
log.info("observing transaction: refreshTransaction adding item to a txwrapper; {} ms", watch.elapsed(TimeUnit.MILLISECONDS))
}
transactionViews = updatedList
_transactions.postValue(updatedList)
refreshNewTx++
}
}
private fun updateSyncStatus(state: BlockchainState) {
Expand All @@ -689,8 +733,12 @@ class MainViewModel @Inject constructor(

if (state.replaying) {
// remove all tx
transactionViews = listOf()
transactionViews = mutableListOf()
_transactions.postValue(transactionViews)
//val filter = TxDirectionFilter(_transactionsDirection.value, walletData.wallet!!)
//viewModelScope.launch {
// refreshTransactions(filter, mapOf())
//}
}
}

Expand Down

0 comments on commit 2460673

Please sign in to comment.