Skip to content

Commit

Permalink
Integrate new Seen Tx Checkpoint API
Browse files Browse the repository at this point in the history
  • Loading branch information
samholmes committed Jan 3, 2025
1 parent 13d9ede commit e089ee1
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

- added: Implement `updateInfoPayload` for `EdgeCurrencyEngine` to get currency info updates from info-server.
- changed: Implement new Seen Tx Checkpoint API for all currencies.
- fixed: Fixed cleaner failure for getInfo Blockbook request.

## 3.4.5 (2024-12-12)
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
"base-x": "^4.0.0",
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"edge-core-js": "^2.22.1",
"edge-core-js": "../edge-core-js",
"esbuild-loader": "^4.1.0",
"eslint": "^7.14.0",
"eslint-config-standard-kit": "0.15.1",
Expand Down
38 changes: 31 additions & 7 deletions src/common/plugin/EngineEmitter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
EdgeCurrencyEngineCallbacks,
EdgeTransaction,
EdgeTransactionEvent,
EdgeTxidMap
} from 'edge-core-js/types'
import { EventEmitter } from 'events'
Expand All @@ -9,9 +10,17 @@ import { SubscribeAddressResponse } from '../utxobased/network/blockbookApi'

export declare interface EngineEmitter {
emit: ((
event: EngineEvent.TRANSACTIONS_CHANGED,
transactions: EdgeTransaction[]
event: EngineEvent.SEEN_TX_CHECKPOINT,
checkpoint: string
) => boolean) &
((
event: EngineEvent.TRANSACTIONS,
transactionEvents: EdgeTransactionEvent[]
) => boolean) &
((
event: EngineEvent.TRANSACTIONS_CHANGED,
transactions: EdgeTransaction[]
) => boolean) &
((
event: EngineEvent.ADDRESS_BALANCE_CHANGED,
currencyCode: string,
Expand All @@ -36,9 +45,19 @@ export declare interface EngineEmitter {
((event: EngineEvent.TXIDS_CHANGED, txids: EdgeTxidMap) => boolean)

on: ((
event: EngineEvent.TRANSACTIONS_CHANGED,
listener: (transactions: EdgeTransaction[]) => Promise<void> | void
event: EngineEvent.SEEN_TX_CHECKPOINT,
listener: (checkpoint: string) => Promise<void> | void
) => this) &
((
event: EngineEvent.TRANSACTIONS,
listener: (
transactionEvents: EdgeTransactionEvent[]
) => Promise<void> | void
) => boolean) &
((
event: EngineEvent.TRANSACTIONS_CHANGED,
listener: (transactions: EdgeTransaction[]) => Promise<void> | void
) => this) &
((
event: EngineEvent.ADDRESS_BALANCE_CHANGED,
listener: (
Expand Down Expand Up @@ -76,6 +95,9 @@ export declare interface EngineEmitter {
export class EngineEmitter extends EventEmitter {}

export enum EngineEvent {
SEEN_TX_CHECKPOINT = 'seen:tx:checkpoint',
TRANSACTIONS = 'transactions',
/** @deprecated Use TRANSACTIONS */
TRANSACTIONS_CHANGED = 'transactions:changed',
WALLET_BALANCE_CHANGED = 'wallet:balance:changed',
ADDRESS_BALANCE_CHANGED = 'address:balance:changed',
Expand All @@ -93,16 +115,18 @@ export const makeEngineEmitter = (
): EngineEmitter => {
const emitter = new EngineEmitter()

emitter.on(EngineEvent.TRANSACTIONS_CHANGED, callbacks.onTransactionsChanged)
emitter.on(EngineEvent.WALLET_BALANCE_CHANGED, callbacks.onBalanceChanged)
emitter.on(EngineEvent.ADDRESSES_CHECKED, callbacks.onAddressesChecked)
emitter.on(
EngineEvent.BLOCK_HEIGHT_CHANGED,
(_uri: string, height: number) => {
callbacks.onBlockHeightChanged(height)
}
)
emitter.on(EngineEvent.ADDRESSES_CHECKED, callbacks.onAddressesChecked)
emitter.on(EngineEvent.SEEN_TX_CHECKPOINT, callbacks.onSeenTxCheckpoint)
emitter.on(EngineEvent.TRANSACTIONS, callbacks.onTransactions)
emitter.on(EngineEvent.TRANSACTIONS_CHANGED, callbacks.onTransactionsChanged)
emitter.on(EngineEvent.TXIDS_CHANGED, callbacks.onTxidsChanged)
emitter.on(EngineEvent.WALLET_BALANCE_CHANGED, callbacks.onBalanceChanged)

return emitter
}
27 changes: 22 additions & 5 deletions src/common/utxobased/engine/UtxoEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,24 @@ export async function makeUtxoEngine(
// private keys.
let nonceDataLayer: DataLayer | undefined

// This cached value allows the engine to resync using the same checkpoint as
// what the core gave the plugin when the core started plugin.
const seenTxCheckpoint = ((state?: string) => (
value?: string
): string | undefined => {
if (value != null) state = value
return state
})()

emitter.on(EngineEvent.SEEN_TX_CHECKPOINT, checkpoint => {
seenTxCheckpoint(checkpoint)
})

const engineProcessor = makeUtxoEngineProcessor({
...config,
dataLayer,
pluginState,
seenTxCheckpoint,
walletTools,
walletInfo
})
Expand Down Expand Up @@ -350,7 +364,9 @@ export async function makeUtxoEngine(
}
},

async startEngine(): Promise<void> {
async startEngine(opts): Promise<void> {
seenTxCheckpoint(opts?.seenTxCheckpoint)

emitter.emit(
EngineEvent.WALLET_BALANCE_CHANGED,
currencyInfo.currencyCode,
Expand Down Expand Up @@ -718,7 +734,7 @@ export async function makeUtxoEngine(
await pluginState.refreshServers()

// Restart the engine
await engine.startEngine()
await engine.startEngine({ seenTxCheckpoint: seenTxCheckpoint() })
},

async saveTx(edgeTx: EdgeTransaction): Promise<void> {
Expand All @@ -741,7 +757,9 @@ export async function makeUtxoEngine(
walletId: walletInfo.id,
walletTools
})
emitter.emit(EngineEvent.TRANSACTIONS_CHANGED, [rbfEdgeTx])
emitter.emit(EngineEvent.TRANSACTIONS, [
{ isNew: false, transaction: rbfEdgeTx }
])
}
}

Expand All @@ -751,8 +769,6 @@ export async function makeUtxoEngine(
scriptPubkeys: edgeTx.otherParams?.ourScriptPubkeys
})

emitter.emit(EngineEvent.TRANSACTIONS_CHANGED, [edgeTx])

/*
Get the wallet's UTXOs from the new transaction and save them to the processsor.
*/
Expand Down Expand Up @@ -988,6 +1004,7 @@ export async function makeUtxoEngine(
gapLimit: 0
}
},
seenTxCheckpoint: () => '0',
walletTools: tmpWalletTools,
walletInfo: tmpWalletInfo
})
Expand Down
73 changes: 66 additions & 7 deletions src/common/utxobased/engine/UtxoEngineProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ export interface UtxoEngineProcessor {

export interface UtxoEngineProcessorConfig extends EngineConfig {
dataLayer: DataLayer
seenTxCheckpoint: (value?: string) => string | undefined
walletTools: UtxoWalletTools
walletInfo: SafeWalletInfo
}
Expand All @@ -96,6 +97,7 @@ export function makeUtxoEngineProcessor(
engineOptions,
pluginState,
pluginInfo,
seenTxCheckpoint,
walletInfo,
walletTools
} = config
Expand Down Expand Up @@ -170,6 +172,27 @@ export function makeUtxoEngineProcessor(
}
}

const updateSeenTxCheckpoint = (): void => {
// Only update the seenTxCheckpoint if the wallet is fully synced.
// This ensure that initial syncs without a defined seenTxCheckpoint,
// will not incorrectly update the seenTxCheckpoint in the middle of an
// initial sync.
if (processedPercent < 1) return

const seenTxCheckpoint = common.seenTxCheckpoint()
const seenTxBlockHeight =
seenTxCheckpoint != null ? parseInt(seenTxCheckpoint) : undefined

// Update the seenTxCheckpoint
if (
seenTxBlockHeight == null ||
common.maxSeenTxBlockHeight > seenTxBlockHeight
) {
const newSeenTxCheckpoint = common.maxSeenTxBlockHeight.toString()
common.emitter.emit(EngineEvent.SEEN_TX_CHECKPOINT, newSeenTxCheckpoint)
}
}

const lock = new AwaitLock()

const serverStates = makeServerStates({
Expand All @@ -189,8 +212,11 @@ export function makeUtxoEngineProcessor(
emitter,
taskCache,
updateProgressRatio,
updateSeenTxCheckpoint,
io,
log,
maxSeenTxBlockHeight: 0,
seenTxCheckpoint,
serverStates,
walletFormats,
lock
Expand Down Expand Up @@ -603,8 +629,11 @@ interface CommonParams {
emitter: EngineEmitter
taskCache: TaskCache
updateProgressRatio: () => void
updateSeenTxCheckpoint: () => void
io: EdgeIo
log: EdgeLog
maxSeenTxBlockHeight: number
seenTxCheckpoint: (value?: string) => string | undefined
serverStates: ServerStates
walletFormats: CurrencyFormat[]
lock: AwaitLock
Expand All @@ -617,7 +646,7 @@ interface TaskCache {
readonly blockbookUtxoCache: BlockbookUtxoCache
readonly dataLayerUtxoCache: DataLayerUtxoCache
readonly transactionSpecificUpdateCache: TransactionSpecificUpdateCache
readonly transactionUpdateCache: TransactionUpdateCache
readonly transactionUpdateCache: transactionUpdateCache
}

interface AddressForTransactionsCache {
Expand Down Expand Up @@ -654,7 +683,7 @@ interface BlockbookUtxoCache {
interface TransactionSpecificUpdateCache {
[key: string]: { processing: boolean }
}
interface TransactionUpdateCache {
interface transactionUpdateCache {
[key: string]: { processing: boolean }
}

Expand Down Expand Up @@ -960,7 +989,7 @@ export async function* pickNextTask(
hasProcessedAtLeastOnce = true
cacheItem.processing = true
removeItem(transactionUpdateCache, txId)
yield* processTransactionUpdate(common, {
yield* processCheckTransactionConfirmation(common, {
serverState,
serverUri,
txId
Expand Down Expand Up @@ -1038,7 +1067,9 @@ async function* processTransactionsSpecificUpdate(
walletId: common.walletInfo.id,
walletTools: common.walletTools
})
common.emitter.emit(EngineEvent.TRANSACTIONS_CHANGED, [edgeTx])
common.emitter.emit(EngineEvent.TRANSACTIONS, [
{ isNew: false, transaction: edgeTx }
])

// Add the txid to the server cache
serverState.txids.add(txId)
Expand All @@ -1060,7 +1091,7 @@ async function* processTransactionsSpecificUpdate(
* It updates the transaction and all of the transaction's UTXO with the
* blockHeight received from the network.
*/
async function* processTransactionUpdate(
async function* processCheckTransactionConfirmation(
common: CommonParams,
args: {
serverState: ServerState
Expand Down Expand Up @@ -1105,7 +1136,9 @@ async function* processTransactionUpdate(
walletId: common.walletInfo.id,
walletTools: common.walletTools
})
common.emitter.emit(EngineEvent.TRANSACTIONS_CHANGED, [edgeTx])
common.emitter.emit(EngineEvent.TRANSACTIONS, [
{ isNew: false, transaction: edgeTx }
])

if (needsTxSpecific(common)) {
// Add task to grab transactionSpecific payload
Expand Down Expand Up @@ -1174,8 +1207,16 @@ async function* processAddressForTransactions(
addressData.used = true
}

const seenTxCheckpoint = common.seenTxCheckpoint()
const seenTxBlockHeight =
seenTxCheckpoint != null ? parseInt(seenTxCheckpoint) : undefined

// Process and save the address's transactions
for (const txResponse of transactions) {
const [existingTx] = await common.dataLayer.fetchTransactions({
txId: txResponse.txid
})

const tx = processTransactionResponse(common, { txResponse })
const processedTx = await common.dataLayer.saveTransaction({
tx,
Expand All @@ -1188,7 +1229,22 @@ async function* processAddressForTransactions(
walletId: common.walletInfo.id,
walletTools: common.walletTools
})
common.emitter.emit(EngineEvent.TRANSACTIONS_CHANGED, [edgeTx])

// Keep track of transactions which are determined to be unseen:
const isNew =
seenTxBlockHeight != null &&
// Unseen in the DataLayer
existingTx == null &&
// The tx unconfirmed or confirmed after/at the last seenTxCheckpoint
(tx.blockHeight === 0 || tx.blockHeight > seenTxBlockHeight)

common.emitter.emit(EngineEvent.TRANSACTIONS, [
{ isNew, transaction: edgeTx }
])

if (edgeTx.blockHeight > common.maxSeenTxBlockHeight) {
common.maxSeenTxBlockHeight = edgeTx.blockHeight
}

if (needsTxSpecific(common)) {
// Add task to grab transactionSpecific payload
Expand All @@ -1198,6 +1254,9 @@ async function* processAddressForTransactions(
}
}

// Make sure to update the seenTxCheckpoint after processing the transactions
common.updateSeenTxCheckpoint()

// Halt on finishing the processing of address transaction until
// we have progressed through all of the blockbook pages
if (page < totalPages) {
Expand Down
7 changes: 6 additions & 1 deletion test/common/utxobased/engine/engine.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,13 @@ describe('engine.spec', function () {
emitter.emit('onBlockHeightChange', height)
},
onNewTokens() {},
onSeenTxCheckpoint() {},
onStakingStatusChanged() {},
onTokenBalanceChanged() {},
onTransactions(transactionEvents) {
fakeLogger.info('onTransactions:', transactionEvents)
emitter.emit('onTransactions', transactionEvents)
},
onTransactionsChanged(transactionList) {
fakeLogger.info('onTransactionsChanged:', transactionList)
emitter.emit('onTransactionsChanged', transactionList)
Expand Down Expand Up @@ -335,7 +340,7 @@ describe('engine.spec', function () {
done() // Can be "done" since the promise resolves before the event fires but just be on the safe side
}
})
engine.startEngine().catch(e => {
engine.startEngine({ seenTxCheckpoint: '' }).catch(e => {
fakeLogger.info('startEngine error', e, e.message)
})
}
Expand Down
4 changes: 1 addition & 3 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3863,10 +3863,8 @@ ecc-jsbn@~0.1.1:
typeforce "^1.18.0"
wif "^2.0.6"

edge-core-js@^2.22.1:
edge-core-js@../edge-core-js:
version "2.22.1"
resolved "https://registry.yarnpkg.com/edge-core-js/-/edge-core-js-2.22.1.tgz#325f7ef2826a23acce10a8c93f89ea2808c1ace5"
integrity sha512-fxCbkLKwWiGYyB72T1VKEuekXqCm+kCpX6ZRWyNVqkNHAlhER01jCvVcdpFX4DBgoWU2au1K4cMMqvPeU70+cQ==
dependencies:
aes-js "^3.1.0"
base-x "^4.0.0"
Expand Down

0 comments on commit e089ee1

Please sign in to comment.