diff --git a/packages/neuron-wallet/src/block-sync-renderer/sync/connector.ts b/packages/neuron-wallet/src/block-sync-renderer/sync/connector.ts index 0f1a7ab08f..fd19a734f2 100644 --- a/packages/neuron-wallet/src/block-sync-renderer/sync/connector.ts +++ b/packages/neuron-wallet/src/block-sync-renderer/sync/connector.ts @@ -1,5 +1,13 @@ -import { SyncAddressType } from '../../database/chain/entities/sync-progress' import { Subject } from 'rxjs' +import { queue, QueueObject } from 'async' +import { Indexer as CkbIndexer, CellCollector } from '@ckb-lumos/ckb-indexer' +import { QueryOptions } from '@ckb-lumos/base' +import AddressMeta from '../../database/address/meta' +import { Address } from '../../models/address' +import { SyncAddressType } from '../../database/chain/entities/sync-progress' +import IndexerCacheService from './indexer-cache-service' +import logger from '../../utils/logger' +import IndexerTxHashCache from '../../database/chain/entities/indexer-tx-hash-cache' export interface BlockTips { cacheTipNumber: number @@ -41,15 +49,146 @@ export interface AppendScript { scriptType: CKBRPC.ScriptType } -export abstract class Connector { - abstract blockTipsSubject: Subject - abstract transactionsSubject: Subject<{ txHashes: CKBComponents.Hash[]; params: TransactionsSubjectParam }> +export abstract class Connector { + public readonly blockTipsSubject: Subject = new Subject() + public readonly transactionsSubject = new Subject<{ txHashes: CKBComponents.Hash[]; params: string }>() + protected indexer: CkbIndexer + protected processNextBlockNumberQueue: QueueObject + protected processingBlockNumber?: string + protected addressesByWalletId: Map = new Map() + protected pollingIndexer: boolean = false + private indexerQueryQueue: QueueObject | undefined abstract connect(): Promise - abstract notifyCurrentBlockNumberProcessed(param: TransactionsSubjectParam): void - abstract stop(): void - abstract getLiveCellsByScript(query: LumosCellQuery): Promise + abstract processTxsInNextBlockNumber(): Promise + protected abstract upsertTxHashes(): Promise + public abstract notifyCurrentBlockNumberProcessed(blockNumber: string): Promise async appendScript(_scripts: AppendScript[]) { // do nothing } + + constructor({ addresses, nodeUrl, indexerUrl }: { addresses: Address[]; nodeUrl: string; indexerUrl: string }) { + this.indexer = new CkbIndexer(nodeUrl, indexerUrl) + this.addressesByWalletId = addresses + .map(address => AddressMeta.fromObject(address)) + .reduce((addressesByWalletId, addressMeta) => { + if (!addressesByWalletId.has(addressMeta.walletId)) { + addressesByWalletId.set(addressMeta.walletId, []) + } + + const addressMetas = addressesByWalletId.get(addressMeta.walletId) + addressMetas!.push(addressMeta) + + return addressesByWalletId + }, new Map()) + + this.processNextBlockNumberQueue = queue(async () => this.processTxsInNextBlockNumber(), 1) + this.processNextBlockNumberQueue.error((err: any) => { + logger.error(`Connector: \tError in processing next block number queue: ${err}`) + }) + + this.indexerQueryQueue = queue(async (query: any) => { + return await this.collectLiveCellsByScript(query) + }) + } + + public stop(): void { + this.pollingIndexer = false + } + + protected async processNextBlockNumber() { + // the processNextBlockNumberQueue is a queue to ensure that ONLY one + // block processing task runs at a time to avoid the data conflict while syncing + this.processNextBlockNumberQueue?.push() + await this.processNextBlockNumberQueue?.drain() + } + + protected async getTxHashesWithNextUnprocessedBlockNumber(): Promise<[string | undefined, string[]]> { + const txHashCachesByNextBlockNumberAndAddress = await Promise.all( + [...this.addressesByWalletId.keys()].map(async walletId => + IndexerCacheService.nextUnprocessedTxsGroupedByBlockNumber(walletId) + ) + ) + const groupedTxHashCaches = txHashCachesByNextBlockNumberAndAddress.flat().reduce((grouped, txHashCache) => { + if (!grouped.get(txHashCache.blockNumber.toString())) { + grouped.set(txHashCache.blockNumber.toString(), []) + } + grouped.get(txHashCache.blockNumber.toString())!.push(txHashCache) + + return grouped + }, new Map>()) + + const nextUnprocessedBlockNumber = [...groupedTxHashCaches.keys()].sort((a, b) => parseInt(a) - parseInt(b)).shift() + + if (!nextUnprocessedBlockNumber) { + return [undefined, []] + } + + const txHashCachesInNextUnprocessedBlockNumber = groupedTxHashCaches.get(nextUnprocessedBlockNumber) + + return [nextUnprocessedBlockNumber, txHashCachesInNextUnprocessedBlockNumber!.map(({ txHash }) => txHash)] + } + + protected async notifyAndSyncNext(indexerTipNumber: number) { + const nextUnprocessedBlockNumber = await IndexerCacheService.nextUnprocessedBlock([ + ...this.addressesByWalletId.keys(), + ]) + if (nextUnprocessedBlockNumber) { + this.blockTipsSubject.next({ + cacheTipNumber: parseInt(nextUnprocessedBlockNumber), + indexerTipNumber, + }) + if (!this.processingBlockNumber) { + await this.processNextBlockNumber() + } + return true + } + this.blockTipsSubject.next({ + cacheTipNumber: indexerTipNumber, + indexerTipNumber, + }) + return false + } + + public async getLiveCellsByScript(query: LumosCellQuery) { + return new Promise((resolve, reject) => { + this.indexerQueryQueue!.push(query, (err: any, result: unknown) => { + if (err) { + return reject(err) + } + resolve(result) + }) + }) + } + + private async collectLiveCellsByScript(query: LumosCellQuery) { + const { lock, type, data } = query + if (!lock && !type) { + throw new Error('at least one parameter is required') + } + + const queries: QueryOptions = { + ...(lock ? { lock } : {}), + ...(type ? { type } : {}), + data: data || 'any', + } + + const collector = new CellCollector(this.indexer, queries) + + const result = [] + for await (const cell of collector.collect()) { + //somehow the lumos indexer returns an invalid hash type "lock" for hash type "data" + //for now we have to fix it here + const cellOutput = cell.cellOutput + // FIXME + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-expect-error + if (cellOutput.type?.hashType === 'lock') { + console.error('Unexpected hash type "lock" found with the query', JSON.stringify(queries)) + cellOutput.type.hashType = 'data' + } + result.push(cell) + } + return result + } } diff --git a/packages/neuron-wallet/src/block-sync-renderer/sync/indexer-cache-service.ts b/packages/neuron-wallet/src/block-sync-renderer/sync/indexer-cache-service.ts index 0bd628201f..4775f70a67 100644 --- a/packages/neuron-wallet/src/block-sync-renderer/sync/indexer-cache-service.ts +++ b/packages/neuron-wallet/src/block-sync-renderer/sync/indexer-cache-service.ts @@ -1,4 +1,4 @@ -import { getConnection } from 'typeorm' +import { In, getConnection } from 'typeorm' import { queue } from 'async' import AddressMeta from '../../database/address/meta' import IndexerTxHashCache from '../../database/chain/entities/indexer-tx-hash-cache' @@ -27,19 +27,17 @@ export default class IndexerCacheService { this.indexer = indexer } - private async getTxHashes(): Promise { + private static async getTxHashes(walletIds: string[]): Promise { return getConnection() .getRepository(IndexerTxHashCache) .createQueryBuilder() .where({ - walletId: this.walletId, + walletId: In(walletIds), }) .getMany() } - public static async nextUnprocessedBlock( - walletIds: string[] - ): Promise<{ blockNumber: string; blockHash: string } | undefined> { + public static async nextUnprocessedBlock(walletIds: string[]): Promise { const result = await getConnection() .getRepository(IndexerTxHashCache) .createQueryBuilder() @@ -51,10 +49,7 @@ export default class IndexerCacheService { return } - return { - blockNumber: result.blockNumber.toString(), - blockHash: result.blockHash, - } + return result.blockNumber.toString() } public static async updateCacheProcessed(txHash: string) { @@ -183,7 +178,7 @@ export default class IndexerCacheService { await this.saveCacheBlockNumber(tipBlockNumber) return [] } - const txMetasCaches = await this.getTxHashes() + const txMetasCaches = await IndexerCacheService.getTxHashes([this.walletId]) const cachedTxHashes = txMetasCaches.map(meta => meta.txHash.toString()) const cachedTxHashesSet = new Set(cachedTxHashes) @@ -218,7 +213,7 @@ export default class IndexerCacheService { const indexerCaches: IndexerTxHashCache[] = [] for (const txWithStatus of txsWithStatus) { - const { transaction, txStatus } = txWithStatus + const { transaction } = txWithStatus const mappings = mappingsByTxHash.get(transaction.hash!) if (!mappings) { continue @@ -229,8 +224,6 @@ export default class IndexerCacheService { IndexerTxHashCache.fromObject({ txHash: transaction.hash!, blockNumber: parseInt(transaction.blockNumber!), - blockHash: txStatus.blockHash!, - blockTimestamp: transaction.timestamp!, lockHash, address, walletId: this.walletId, @@ -245,6 +238,44 @@ export default class IndexerCacheService { return newTxHashes } + public static async upsertIndexerCache( + txs: { + txHash: string + txIndex: string + blockNumber: string + lockHash: string + address: string + walletId: string + }[] + ): Promise { + if (!txs.length) { + return [] + } + const walletIds = txs.map(v => v.walletId) + const txMetasCaches = await IndexerCacheService.getTxHashes(walletIds) + const cachedTxHashes = txMetasCaches.map(meta => meta.txHash.toString()) + + const cachedTxHashesSet = new Set(cachedTxHashes) + + const newTxHashes = txs.filter(({ txHash }) => !cachedTxHashesSet.has(txHash)) + + if (!newTxHashes.length) { + return [] + } + const indexerCaches: IndexerTxHashCache[] = newTxHashes.map(v => + IndexerTxHashCache.fromObject({ + txHash: v.txHash, + blockNumber: parseInt(v.blockNumber!), + lockHash: v.lockHash, + address: v.address, + walletId: v.walletId, + }) + ) + indexerCaches.sort((a, b) => a.blockNumber - b.blockNumber) + await getConnection().manager.save(indexerCaches, { chunk: 100 }) + return newTxHashes.map(v => v.txHash) + } + public async updateProcessedTxHashes(blockNumber: string) { await getConnection() .createQueryBuilder() @@ -259,13 +290,13 @@ export default class IndexerCacheService { .execute() } - public async nextUnprocessedTxsGroupedByBlockNumber(): Promise { + public static async nextUnprocessedTxsGroupedByBlockNumber(walletId: string): Promise { const cache = await getConnection() .getRepository(IndexerTxHashCache) .createQueryBuilder() .where({ isProcessed: false, - walletId: this.walletId, + walletId, }) .orderBy('blockNumber', 'ASC') .getOne() @@ -281,7 +312,7 @@ export default class IndexerCacheService { .where({ blockNumber, isProcessed: false, - walletId: this.walletId, + walletId, }) .getMany() } diff --git a/packages/neuron-wallet/src/block-sync-renderer/sync/indexer-connector.ts b/packages/neuron-wallet/src/block-sync-renderer/sync/indexer-connector.ts index 1233c9ecd3..cec772ad50 100644 --- a/packages/neuron-wallet/src/block-sync-renderer/sync/indexer-connector.ts +++ b/packages/neuron-wallet/src/block-sync-renderer/sync/indexer-connector.ts @@ -1,55 +1,18 @@ -import { Subject } from 'rxjs' -import { queue, QueueObject } from 'async' -import { Tip, QueryOptions } from '@ckb-lumos/base' -import { Indexer as CkbIndexer, CellCollector } from '@ckb-lumos/ckb-indexer' +import { Tip } from '@ckb-lumos/base' import logger from '../../utils/logger' import CommonUtils from '../../utils/common' import RpcService from '../../services/rpc-service' import { Address } from '../../models/address' -import AddressMeta from '../../database/address/meta' -import IndexerTxHashCache from '../../database/chain/entities/indexer-tx-hash-cache' -import IndexerCacheService from './indexer-cache-service' -import { BlockTips, LumosCellQuery, Connector } from './connector' +import { Connector } from './connector' import { NetworkType } from '../../models/network' +import IndexerCacheService from './indexer-cache-service' -export default class IndexerConnector extends Connector { - private indexer: CkbIndexer +export default class IndexerConnector extends Connector { private rpcService: RpcService - private addressesByWalletId: Map - private processNextBlockNumberQueue: QueueObject | undefined - private indexerQueryQueue: QueueObject | undefined - - private processingBlockNumber: string | undefined - private pollingIndexer: boolean = false - public readonly blockTipsSubject: Subject = new Subject() - public readonly transactionsSubject = new Subject<{ txHashes: CKBComponents.Hash[]; params: string | undefined }>() constructor(addresses: Address[], nodeUrl: string, indexerUrl: string, nodeType: NetworkType) { - super() - this.indexer = new CkbIndexer(nodeUrl, indexerUrl) + super({ addresses, nodeUrl, indexerUrl }) this.rpcService = new RpcService(nodeUrl, nodeType) - - this.addressesByWalletId = addresses - .map(address => AddressMeta.fromObject(address)) - .reduce((addressesByWalletId, addressMeta) => { - if (!addressesByWalletId.has(addressMeta.walletId)) { - addressesByWalletId.set(addressMeta.walletId, []) - } - - const addressMetas = addressesByWalletId.get(addressMeta.walletId) - addressMetas!.push(addressMeta) - - return addressesByWalletId - }, new Map()) - - this.processNextBlockNumberQueue = queue(async () => this.processTxsInNextBlockNumber(), 1) - this.processNextBlockNumberQueue.error((err: any) => { - logger.error(`Error in processing next block number queue: ${err}`) - }) - - this.indexerQueryQueue = queue(async (query: any) => { - return await this.collectLiveCellsByScript(query) - }) } private async synchronize(indexerTipBlock: Tip) { @@ -58,24 +21,8 @@ export default class IndexerConnector extends Connector { } await this.upsertTxHashes() - const indexerTipNumber = parseInt(indexerTipBlock.blockNumber, 16) - - const nextUnprocessedBlockTip = await IndexerCacheService.nextUnprocessedBlock([...this.addressesByWalletId.keys()]) - if (nextUnprocessedBlockTip) { - this.blockTipsSubject.next({ - cacheTipNumber: parseInt(nextUnprocessedBlockTip.blockNumber), - indexerTipNumber, - }) - if (!this.processingBlockNumber) { - await this.processNextBlockNumber() - } - } else { - this.blockTipsSubject.next({ - cacheTipNumber: indexerTipNumber, - indexerTipNumber, - }) - } + await this.notifyAndSyncNext(indexerTipNumber) } private async initSync() { @@ -99,84 +46,15 @@ export default class IndexerConnector extends Connector { } } - public async getLiveCellsByScript(query: LumosCellQuery) { - return new Promise((resolve, reject) => { - this.indexerQueryQueue!.push(query, (err: any, result: unknown) => { - if (err) { - return reject(err) - } - resolve(result) - }) - }) - } - - private async collectLiveCellsByScript(query: LumosCellQuery) { - const { lock, type, data } = query - if (!lock && !type) { - throw new Error('at least one parameter is required') - } - - const queries: QueryOptions = {} - if (lock) { - queries.lock = lock - } - if (type) { - queries.type = type - } - queries.data = data || 'any' - - const collector = new CellCollector(this.indexer, queries) - - const result = [] - for await (const cell of collector.collect()) { - //somehow the lumos indexer returns an invalid hash type "lock" for hash type "data" - //for now we have to fix it here - const cellOutput = cell.cellOutput - // FIXME - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-expect-error - if (cellOutput.type?.hashType === 'lock') { - console.error('Unexpected hash type "lock" found with the query', JSON.stringify(queries)) - cellOutput.type.hashType = 'data' - } - result.push(cell) - } - return result - } - - private async getTxHashesWithNextUnprocessedBlockNumber(): Promise<[string | undefined, string[]]> { - const txHashCachesByNextBlockNumberAndAddress = await Promise.all( - [...this.addressesByWalletId.entries()].map(async ([walletId, addressMetas]) => { - const indexerCacheService = new IndexerCacheService(walletId, addressMetas, this.rpcService, this.indexer) - return indexerCacheService.nextUnprocessedTxsGroupedByBlockNumber() - }) - ) - const groupedTxHashCaches = txHashCachesByNextBlockNumberAndAddress - .flat() - .sort((a, b) => { - return parseInt(a.blockTimestamp) - parseInt(b.blockTimestamp) - }) - .reduce((grouped, txHashCache) => { - if (!grouped.get(txHashCache.blockNumber.toString())) { - grouped.set(txHashCache.blockNumber.toString(), []) - } - grouped.get(txHashCache.blockNumber.toString())!.push(txHashCache) - - return grouped - }, new Map>()) - - const nextUnprocessedBlockNumber = [...groupedTxHashCaches.keys()].sort((a, b) => parseInt(a) - parseInt(b)).shift() - - if (!nextUnprocessedBlockNumber) { - return [undefined, []] + async processTxsInNextBlockNumber() { + const [nextBlockNumber, txHashesInNextBlock] = await this.getTxHashesWithNextUnprocessedBlockNumber() + if (nextBlockNumber !== undefined && txHashesInNextBlock.length) { + this.processingBlockNumber = nextBlockNumber + this.transactionsSubject.next({ txHashes: txHashesInNextBlock, params: this.processingBlockNumber }) } - - const txHashCachesInNextUnprocessedBlockNumber = groupedTxHashCaches.get(nextUnprocessedBlockNumber) - - return [nextUnprocessedBlockNumber, txHashCachesInNextUnprocessedBlockNumber!.map(({ txHash }) => txHash)] } - private async upsertTxHashes(): Promise { + protected async upsertTxHashes(): Promise { const arrayOfInsertedTxHashes = [] for (const [walletId, addressMetas] of [...this.addressesByWalletId.entries()]) { const indexerCacheService = new IndexerCacheService(walletId, addressMetas, this.rpcService, this.indexer) @@ -186,29 +64,12 @@ export default class IndexerConnector extends Connector { return arrayOfInsertedTxHashes.flat() } - private async processNextBlockNumber() { - this.processNextBlockNumberQueue!.push(null) - await this.processNextBlockNumberQueue!.drain() - } - - private async processTxsInNextBlockNumber() { - const [nextBlockNumber, txHashesInNextBlock] = await this.getTxHashesWithNextUnprocessedBlockNumber() - if (nextBlockNumber !== undefined && txHashesInNextBlock.length) { - this.processingBlockNumber = nextBlockNumber - this.transactionsSubject.next({ txHashes: txHashesInNextBlock, params: this.processingBlockNumber }) - } - } - - public notifyCurrentBlockNumberProcessed(blockNumber: string) { + public async notifyCurrentBlockNumberProcessed(blockNumber: string) { if (blockNumber === this.processingBlockNumber) { - delete this.processingBlockNumber + this.processingBlockNumber = undefined } else { return } this.processNextBlockNumber() } - - public stop(): void { - this.pollingIndexer = false - } } diff --git a/packages/neuron-wallet/src/block-sync-renderer/sync/light-connector.ts b/packages/neuron-wallet/src/block-sync-renderer/sync/light-connector.ts index 46a9ad4d59..9a02faa48d 100644 --- a/packages/neuron-wallet/src/block-sync-renderer/sync/light-connector.ts +++ b/packages/neuron-wallet/src/block-sync-renderer/sync/light-connector.ts @@ -1,54 +1,41 @@ -import { BI } from '@ckb-lumos/bi' -import { Subject } from 'rxjs' -import { queue, QueueObject } from 'async' -import type { HexString, QueryOptions, TransactionWithStatus } from '@ckb-lumos/base' -import { Indexer as CkbIndexer, CellCollector } from '@ckb-lumos/ckb-indexer' +import type { HexString, Script, TransactionWithStatus } from '@ckb-lumos/base' import logger from '../../utils/logger' import { Address } from '../../models/address' import AddressMeta from '../../database/address/meta' import { scheduler } from 'timers/promises' import SyncProgressService from '../../services/sync-progress' -import { BlockTips, LumosCellQuery, Connector, AppendScript } from './connector' +import { Connector, AppendScript } from './connector' import { computeScriptHash as scriptToHash } from '@ckb-lumos/base/lib/utils' import { FetchTransactionReturnType, LightRPC, LightScriptFilter } from '../../utils/ckb-rpc' import Multisig from '../../services/multisig' -import { SyncAddressType } from '../../database/chain/entities/sync-progress' +import SyncProgress, { SyncAddressType } from '../../database/chain/entities/sync-progress' import WalletService from '../../services/wallets' import AssetAccountInfo from '../../models/asset-account-info' import { DepType } from '../../models/chain/cell-dep' import { molecule } from '@ckb-lumos/codec' import { blockchain } from '@ckb-lumos/base' import type { Base } from '@ckb-lumos/rpc/lib/Base' - -interface SyncQueueParam { - script: CKBComponents.Script - scriptType: CKBRPC.ScriptType - blockRange: [HexString, HexString] - cursor?: HexString -} +import { getConnection } from 'typeorm' +import { BI } from '@ckb-lumos/bi' +import IndexerCacheService from './indexer-cache-service' +import { ScriptType } from '@ckb-lumos/ckb-indexer/lib/type' +import { scriptToAddress } from '../../utils/scriptAndAddress' +import NetworksService from '../../services/networks' const unpackGroup = molecule.vector(blockchain.OutPoint) -export default class LightConnector extends Connector { +export default class LightConnector extends Connector { private lightRpc: LightRPC - private indexer: CkbIndexer private addressMetas: AddressMeta[] - private syncQueue: QueueObject = queue(this.syncNextWithScript.bind(this), 1) - private indexerQueryQueue: QueueObject | undefined - private pollingIndexer: boolean = false - private syncInQueue: Map = - new Map() - - public readonly blockTipsSubject: Subject = new Subject() - public readonly transactionsSubject = new Subject<{ txHashes: CKBComponents.Hash[]; params: CKBComponents.Hash }>() constructor(addresses: Address[], nodeUrl: string) { - super() - this.indexer = new CkbIndexer(nodeUrl, nodeUrl) + super({ + addresses, + nodeUrl, + indexerUrl: nodeUrl, + }) this.lightRpc = new LightRPC(nodeUrl) this.addressMetas = addresses.map(address => AddressMeta.fromObject(address)) - this.indexerQueryQueue = queue(this.collectLiveCellsByScript.bind(this)) - // fetch some dep cell this.fetchDepCell() } @@ -98,52 +85,90 @@ export default class LightConnector extends Connector { } private async synchronize() { - if (!this.syncQueue.idle()) { - return + const syncScripts = await this.upsertTxHashes() + await this.updateSyncedBlockOfScripts(syncScripts) + const minSyncBlockNumber = await SyncProgressService.getCurrentWalletMinBlockNumber() + const hasNextBlock = await this.notifyAndSyncNext(minSyncBlockNumber) + if (!hasNextBlock) { + await this.updateBlockStartNumber(minSyncBlockNumber) + } + } + + private async getTransactions({ + script, + blockRange, + scriptType, + }: { + script: Script + blockRange: [HexString, HexString] + scriptType: ScriptType + }) { + const res = [] + let lastCursor: HexString | undefined = undefined + while (lastCursor !== '0x') { + const result = await this.lightRpc.getTransactions( + { script, blockRange, scriptType }, + 'asc', + '0x64', + lastCursor as unknown as HexString + ) + lastCursor = result.lastCursor + res.push(...result.txs) } - await this.subscribeSync() + return res + } + + protected async upsertTxHashes() { const syncScripts = await this.lightRpc.getScripts() const syncStatusMap = await SyncProgressService.getAllSyncStatusToMap() - syncStatusMap.forEach(v => { - if (v.cursor && !this.syncInQueue.has(v.hash)) { - this.syncQueue.push({ - script: { - codeHash: v.codeHash, - hashType: v.hashType, - args: v.args, - }, - blockRange: [BI.from(v.blockStartNumber).toHexString(), BI.from(v.blockEndNumber).toHexString()], - scriptType: v.scriptType, - cursor: v.cursor, - }) - } - }) - syncScripts.forEach(syncScript => { + const insertTxCaches: { + txHash: string + txIndex: string + blockNumber: string + lockHash: string + address: string + walletId: string + }[] = [] + const isMainnet = NetworksService.getInstance().isMainnet() + for (let index = 0; index < syncScripts.length; index++) { + const syncScript = syncScripts[index] const scriptHash = scriptToHash(syncScript.script) const syncStatus = syncStatusMap.get(scriptHash) - if ( - syncStatus && - !this.syncInQueue.has(scriptHash) && - !syncStatus.cursor && - syncStatus.blockEndNumber < parseInt(syncScript.blockNumber) - ) { - this.syncQueue.push({ + if (syncStatus) { + const txs = await this.getTransactions({ script: syncScript.script, - blockRange: [BI.from(syncStatus.blockEndNumber).toHexString(), syncScript.blockNumber], scriptType: syncScript.scriptType, - cursor: undefined, + blockRange: [BI.from(syncStatus.blockEndNumber).toHexString(), syncScript.blockNumber], }) + insertTxCaches.push( + ...txs.map(v => ({ + ...v, + lockHash: scriptHash, + address: scriptToAddress(syncScript.script, isMainnet), + walletId: syncStatus.walletId, + })) + ) } - }) + } + // save txs to indexer cache + await IndexerCacheService.upsertIndexerCache(insertTxCaches) + return syncScripts } - private async subscribeSync() { - const minSyncBlockNumber = await SyncProgressService.getCurrentWalletMinBlockNumber() - const header = await this.lightRpc.getTipHeader() - this.blockTipsSubject.next({ - cacheTipNumber: minSyncBlockNumber, - indexerTipNumber: +header.number, + private async updateSyncedBlockOfScripts(syncScripts: LightScriptFilter[]) { + if (!syncScripts.length) { + return + } + const syncStatusMap = await SyncProgressService.getAllSyncStatusToMap() + const updatedSyncProgress: SyncProgress[] = [] + syncScripts.forEach(v => { + const currentSyncProgress = syncStatusMap.get(scriptToHash(v.script)) + if (currentSyncProgress) { + currentSyncProgress.blockEndNumber = parseInt(v.blockNumber) + updatedSyncProgress.push(currentSyncProgress) + } }) + await getConnection().manager.save(updatedSyncProgress, { chunk: 100 }) } private async initSyncProgress(appendScripts: AppendScript[] = []) { @@ -177,32 +202,29 @@ export default class LightConnector extends Connector { (pre, cur) => ({ ...pre, [cur.id]: cur.startBlockNumber }), {} ) - const otherTypeSyncProgress = await SyncProgressService.getOtherTypeSyncProgress() + const otherTypeSyncBlockNumber = await SyncProgressService.getOtherTypeSyncBlockNumber() const setScriptsParams = [ ...allScripts.map(v => { - let syncedBlockNumber = existSyncscripts[scriptToHash(v.script)]?.blockNumber - const walletStartBlockNumber = walletStartBlockMap[v.walletId] - if ( - walletStartBlockNumber && - (!syncedBlockNumber || BigInt(syncedBlockNumber) < BigInt(walletStartBlockNumber)) - ) { - syncedBlockNumber = walletStartBlockNumber - } + const blockNumber = Math.max( + parseInt(walletStartBlockMap[v.walletId] ?? '0x0'), + walletMinBlockNumber?.[v.walletId] ?? 0, + parseInt(existSyncscripts[scriptToHash(v.script)]?.blockNumber ?? '0x0') + ) return { ...v, - blockNumber: syncedBlockNumber ?? `0x${(walletMinBlockNumber?.[v.walletId] ?? 0).toString(16)}`, + blockNumber: `0x${blockNumber.toString(16)}`, } }), ...appendScripts.map(v => ({ ...v, blockNumber: existSyncscripts[scriptToHash(v.script)]?.blockNumber ?? - `0x${(otherTypeSyncProgress[scriptToHash(v.script)] ?? 0).toString(16)}`, + `0x${(otherTypeSyncBlockNumber[scriptToHash(v.script)] ?? 0).toString(16)}`, })), ] await this.lightRpc.setScripts(setScriptsParams) const walletIds = [...new Set(this.addressMetas.map(v => v.walletId))] - await SyncProgressService.resetSyncProgress([allScripts, appendScripts].flat()) + await SyncProgressService.resetSyncProgress(setScriptsParams) await SyncProgressService.updateSyncProgressFlag(walletIds) await SyncProgressService.removeByHashesAndAddressType( SyncAddressType.Multisig, @@ -219,30 +241,6 @@ export default class LightConnector extends Connector { } } - private async syncNextWithScript({ script, scriptType, blockRange, cursor }: SyncQueueParam) { - const syncProgress = await SyncProgressService.getSyncStatus(script) - if (!syncProgress) { - return - } - const result = await this.lightRpc.getTransactions({ script, blockRange, scriptType }, 'asc', '0x64', cursor!) - if (!result.txs.length) { - await SyncProgressService.updateSyncStatus(syncProgress.hash, { - blockStartNumber: parseInt(blockRange[1]), - blockEndNumber: parseInt(blockRange[1]), - cursor: undefined, - }) - return - } - const txHashes = result.txs.map(v => v.txHash) - await this.fetchPreviousOutputs(txHashes) - this.transactionsSubject.next({ txHashes, params: syncProgress.hash }) - this.syncInQueue.set(syncProgress.hash, { - blockStartNumber: result.lastCursor === '0x' ? parseInt(blockRange[1]) : parseInt(blockRange[0]), - blockEndNumber: parseInt(blockRange[1]), - cursor: result.lastCursor === '0x' ? undefined : result.lastCursor, - }) - } - private async fetchPreviousOutputs(txHashes: string[]) { const transactions = await this.lightRpc .createBatchRequest<'getTransaction', string[], TransactionWithStatus[]>(txHashes.map(v => ['getTransaction', v])) @@ -260,28 +258,12 @@ export default class LightConnector extends Connector { await this.lightRpc.createBatchRequest([...previousTxHashes].map(v => ['fetchTransaction' as keyof Base, v])).exec() } - private async collectLiveCellsByScript(query: LumosCellQuery) { - const { lock, type, data } = query - if (!lock && !type) { - throw new Error('at least one script is required') - } - - const queries: QueryOptions = {} - if (lock) { - queries.lock = lock - } - if (type) { - queries.type = type - } - queries.data = data || 'any' - - const collector = new CellCollector(this.indexer, queries) - - const result = [] - for await (const cell of collector.collect()) { - result.push(cell) - } - return result + private async updateBlockStartNumber(blockNumber: number) { + const scripts = await this.lightRpc.getScripts() + await SyncProgressService.updateBlockNumber( + scripts.map(v => v.script.args), + blockNumber + ) } public async connect() { @@ -295,34 +277,27 @@ export default class LightConnector extends Connector { } } - public stop(): void { - this.pollingIndexer = false - } - - public async getLiveCellsByScript(query: LumosCellQuery) { - return new Promise((resolve, reject) => { - this.indexerQueryQueue!.push(query, (err: any, result: unknown) => { - if (err) { - return reject(err) - } - resolve(result) - }) - }) + async appendScript(scripts: AppendScript[]) { + this.initSyncProgress(scripts) } - public async notifyCurrentBlockNumberProcessed(hash: CKBComponents.Hash) { - const nextSyncParams = this.syncInQueue.get(hash) - if (nextSyncParams) { - try { - await SyncProgressService.updateSyncStatus(hash, nextSyncParams) - } finally { - this.syncInQueue.delete(hash) - } + async processTxsInNextBlockNumber() { + const [nextBlockNumber, txHashesInNextBlock] = await this.getTxHashesWithNextUnprocessedBlockNumber() + if (nextBlockNumber !== undefined && txHashesInNextBlock.length) { + this.processingBlockNumber = nextBlockNumber + await this.fetchPreviousOutputs(txHashesInNextBlock) + this.transactionsSubject.next({ txHashes: txHashesInNextBlock, params: this.processingBlockNumber }) } - await this.subscribeSync() } - async appendScript(scripts: AppendScript[]) { - this.initSyncProgress(scripts) + public async notifyCurrentBlockNumberProcessed(blockNumber: string) { + if (blockNumber === this.processingBlockNumber) { + this.processingBlockNumber = undefined + } else { + return + } + const minCachedBlockNumber = await SyncProgressService.getCurrentWalletMinBlockNumber() + await this.updateBlockStartNumber(Math.min(parseInt(blockNumber), minCachedBlockNumber)) + this.processNextBlockNumber() } } diff --git a/packages/neuron-wallet/src/block-sync-renderer/sync/queue.ts b/packages/neuron-wallet/src/block-sync-renderer/sync/queue.ts index e53df531d4..36944d7dcc 100644 --- a/packages/neuron-wallet/src/block-sync-renderer/sync/queue.ts +++ b/packages/neuron-wallet/src/block-sync-renderer/sync/queue.ts @@ -98,7 +98,7 @@ export default class Queue { } } - this.#indexerConnector!.notifyCurrentBlockNumberProcessed(params) + await this.#indexerConnector!.notifyCurrentBlockNumberProcessed(params) }) this.#checkAndSaveQueue.error((err: any, task: any) => { diff --git a/packages/neuron-wallet/src/database/chain/entities/indexer-tx-hash-cache.ts b/packages/neuron-wallet/src/database/chain/entities/indexer-tx-hash-cache.ts index 0a278e44c4..a2b3ce1530 100644 --- a/packages/neuron-wallet/src/database/chain/entities/indexer-tx-hash-cache.ts +++ b/packages/neuron-wallet/src/database/chain/entities/indexer-tx-hash-cache.ts @@ -37,19 +37,6 @@ export default class IndexerTxHashCache extends BaseEntity { @Index() blockNumber!: number - @Column({ - type: 'character', - length: 32, - }) - @Index() - blockHash!: string - - @Column({ - type: 'varchar', - }) - @Index() - blockTimestamp!: string - @Column() @Index() isProcessed: boolean = false @@ -67,20 +54,10 @@ export default class IndexerTxHashCache extends BaseEntity { }) updatedAt!: Date - static fromObject(obj: { - txHash: string - blockNumber: number - blockHash: string - blockTimestamp: string - lockHash: string - address: string - walletId: string - }) { + static fromObject(obj: { txHash: string; blockNumber: number; lockHash: string; address: string; walletId: string }) { const result = new IndexerTxHashCache() result.txHash = obj.txHash result.blockNumber = obj.blockNumber - result.blockHash = obj.blockHash - result.blockTimestamp = obj.blockTimestamp result.lockHash = obj.lockHash result.address = obj.address result.walletId = obj.walletId diff --git a/packages/neuron-wallet/src/database/chain/entities/sync-progress.ts b/packages/neuron-wallet/src/database/chain/entities/sync-progress.ts index 65d786a503..abacf26067 100644 --- a/packages/neuron-wallet/src/database/chain/entities/sync-progress.ts +++ b/packages/neuron-wallet/src/database/chain/entities/sync-progress.ts @@ -47,6 +47,7 @@ export default class SyncProgress { scriptType: CKBRPC.ScriptType walletId: string addressType?: SyncAddressType + blockNumber: string }) { const res = new SyncProgress() res.hash = scriptToHash(obj.script) @@ -57,6 +58,8 @@ export default class SyncProgress { res.scriptType = obj.scriptType res.delete = false res.addressType = obj.addressType ?? SyncAddressType.Default + res.blockStartNumber = parseInt(obj.blockNumber) + res.blockEndNumber = parseInt(obj.blockNumber) return res } } diff --git a/packages/neuron-wallet/src/database/chain/index.ts b/packages/neuron-wallet/src/database/chain/index.ts index 74e755a62f..bfa5492391 100644 --- a/packages/neuron-wallet/src/database/chain/index.ts +++ b/packages/neuron-wallet/src/database/chain/index.ts @@ -20,7 +20,7 @@ export const clean = async (clearAllLightClientData?: boolean) => { }), clearAllLightClientData ? getConnection().getRepository(SyncProgress).clear() - : SyncProgressService.clearCurrentWalletProgress(), + : SyncProgressService.clearWalletProgress(), ]) MultisigOutputChangedSubject.getSubject().next('reset') diff --git a/packages/neuron-wallet/src/database/chain/migrations/1701234043431-IndexerTxHashCacheRemoveField.ts b/packages/neuron-wallet/src/database/chain/migrations/1701234043431-IndexerTxHashCacheRemoveField.ts new file mode 100644 index 0000000000..1eade1a4cd --- /dev/null +++ b/packages/neuron-wallet/src/database/chain/migrations/1701234043431-IndexerTxHashCacheRemoveField.ts @@ -0,0 +1,13 @@ +import {MigrationInterface, QueryRunner} from "typeorm"; + +export class IndexerTxHashCacheRemoveField1701234043431 implements MigrationInterface { + name = 'IndexerTxHashCacheRemoveField1701234043431' + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.dropColumn('indexer_tx_hash_cache', 'blockHash'); + await queryRunner.dropColumn('indexer_tx_hash_cache', 'blockTimestamp'); + } + + public async down(): Promise {} + +} diff --git a/packages/neuron-wallet/src/database/chain/ormconfig.ts b/packages/neuron-wallet/src/database/chain/ormconfig.ts index 5a2a9440aa..d15a65e907 100644 --- a/packages/neuron-wallet/src/database/chain/ormconfig.ts +++ b/packages/neuron-wallet/src/database/chain/ormconfig.ts @@ -55,6 +55,7 @@ import { AddTypeSyncProgress1681360188494 } from './migrations/1681360188494-Add import { TxLock1684488676083 } from './migrations/1684488676083-TxLock' import { ResetSyncProgressPrimaryKey1690361215400 } from './migrations/1690361215400-ResetSyncProgressPrimaryKey' import { TxLockAddArgs1694746034975 } from './migrations/1694746034975-TxLockAddArgs' +import { IndexerTxHashCacheRemoveField1701234043431 } from './migrations/1701234043431-IndexerTxHashCacheRemoveField' export const CONNECTION_NOT_FOUND_NAME = 'ConnectionNotFoundError' @@ -126,6 +127,7 @@ const connectOptions = async (genesisBlockHash: string): Promise>((pre, cur) => ({ ...pre, [cur.walletId]: cur.blockStartNumber }), {}) } - static async getOtherTypeSyncProgress() { + static async getOtherTypeSyncBlockNumber() { const items = await getConnection().getRepository(SyncProgress).find({ addressType: SyncAddressType.Multisig, }) @@ -126,14 +119,11 @@ export default class SyncProgressService { .getMany() } - static async clearCurrentWalletProgress() { - const currentWallet = WalletService.getInstance().getCurrent() - await getConnection().getRepository(SyncProgress).delete({ walletId: currentWallet?.id }) + static async clearWalletProgress() { await getConnection() .createQueryBuilder() .update(SyncProgress) - .set({ blockEndNumber: 0, cursor: undefined }) - .where({ walletId: Not(Equal(currentWallet?.id)) }) + .set({ blockStartNumber: 0, blockEndNumber: 0 }) .execute() } } diff --git a/packages/neuron-wallet/src/utils/ckb-rpc.ts b/packages/neuron-wallet/src/utils/ckb-rpc.ts index 880a55d125..724ca2fd07 100644 --- a/packages/neuron-wallet/src/utils/ckb-rpc.ts +++ b/packages/neuron-wallet/src/utils/ckb-rpc.ts @@ -1,3 +1,4 @@ +import type { Script } from '@ckb-lumos/base' import { HexString } from '@ckb-lumos/base' import { CKBRPC } from '@ckb-lumos/rpc' import { Method } from '@ckb-lumos/rpc/lib/method' @@ -16,7 +17,7 @@ import CommonUtils from './common' import { NetworkType } from '../models/network' export interface LightScriptFilter { - script: CKBComponents.Script + script: Script blockNumber: CKBComponents.BlockNumber scriptType: CKBRPC.ScriptType } diff --git a/packages/neuron-wallet/tests/block-sync-renderer/connector.test.ts b/packages/neuron-wallet/tests/block-sync-renderer/connector.test.ts new file mode 100644 index 0000000000..acc7b37354 --- /dev/null +++ b/packages/neuron-wallet/tests/block-sync-renderer/connector.test.ts @@ -0,0 +1,411 @@ +import { scriptToAddress } from '../../src/utils/scriptAndAddress' +import { AddressType } from '../../src/models/keys/address' +import { Address, AddressVersion } from '../../src/models/address' +import SystemScriptInfo from '../../src/models/system-script-info' +import { Connector, type LumosCell, type LumosCellQuery } from '../../src/block-sync-renderer/sync/connector' +import AddressMeta from '../../src/database/address/meta' +import IndexerTxHashCache from '../../src/database/chain/entities/indexer-tx-hash-cache' +import { ScriptHashType } from '../../src/models/chain/script' + +const stubbedNextUnprocessedBlockFn = jest.fn() +const stubbedNextUnprocessedTxsGroupedByBlockNumberFn = jest.fn() +const stubbedProcessTxsInNextBlockNumberFn = jest.fn() +const stubbedIndexerConstructor = jest.fn() +const stubbedCellCollectorConstructor = jest.fn() +const stubbedBlockTipsSubscribe = jest.fn() +const stubbedCellCellectFn = jest.fn() + +class TestConnector extends Connector { + async connect() {} + async processTxsInNextBlockNumber(): Promise { + return stubbedProcessTxsInNextBlockNumberFn() + } + + async upsertTxHashes(): Promise {} + + async notifyCurrentBlockNumberProcessed() {} + + getAddressesByWalletId() { + return this.addressesByWalletId + } +} + +const script = SystemScriptInfo.generateSecpScript('0x36c329ed630d6ce750712a477543672adab57f4c') +const address = scriptToAddress(script, false) +const walletId1 = 'walletid1' +const walletId2 = 'walletid2' +const addressObj1: Address = { + address, + blake160: '0x', + walletId: walletId1, + path: '', + addressType: AddressType.Receiving, + addressIndex: 0, + txCount: 0, + liveBalance: '', + sentBalance: '', + pendingBalance: '', + balance: '', + version: AddressVersion.Testnet, +} +const addressObj2: Address = { + address, + blake160: '0x', + walletId: walletId2, + path: '', + addressType: AddressType.Receiving, + addressIndex: 0, + txCount: 0, + liveBalance: '', + sentBalance: '', + pendingBalance: '', + balance: '', + version: AddressVersion.Testnet, +} + +const resetMocks = () => { + stubbedNextUnprocessedBlockFn.mockReset() + stubbedNextUnprocessedTxsGroupedByBlockNumberFn.mockReset() + stubbedProcessTxsInNextBlockNumberFn.mockReset() + stubbedIndexerConstructor.mockReset() + stubbedCellCollectorConstructor.mockReset() + stubbedBlockTipsSubscribe.mockReset() + stubbedCellCellectFn.mockReset() +} + +jest.mock('@ckb-lumos/ckb-indexer', () => { + return { + Indexer: class { + constructor(...params: unknown[]) { + stubbedIndexerConstructor(...params) + } + }, + CellCollector: class { + constructor(...params: unknown[]) { + return stubbedCellCollectorConstructor(...params) + } + + collect(...params: unknown[]) { + return stubbedCellCellectFn(...params) + } + }, + } +}) + +jest.mock('../../src/block-sync-renderer/sync/indexer-cache-service', () => ({ + nextUnprocessedBlock: () => stubbedNextUnprocessedBlockFn(), + nextUnprocessedTxsGroupedByBlockNumber: (walletId: string) => + stubbedNextUnprocessedTxsGroupedByBlockNumberFn(walletId), +})) + +describe('unit tests for IndexerConnector', () => { + const nodeUrl = 'http://nodeurl:8114' + + beforeEach(() => { + resetMocks() + jest.useFakeTimers('legacy') + }) + afterEach(() => { + jest.clearAllTimers() + }) + + describe('#constructor', () => { + const STUB_URI = 'stub_uri' + + it('inits lumos indexer with a node url and indexer folder path', () => { + new TestConnector({ + addresses: [], + nodeUrl, + indexerUrl: STUB_URI, + }) + expect(stubbedIndexerConstructor).toHaveBeenCalledWith(nodeUrl, STUB_URI) + }) + + it('init with addresses', () => { + const connector = new TestConnector({ + addresses: [addressObj1, addressObj2], + nodeUrl, + indexerUrl: STUB_URI, + }) + expect(connector.getAddressesByWalletId().get(walletId1)?.[0]).toStrictEqual(AddressMeta.fromObject(addressObj1)) + expect(connector.getAddressesByWalletId().get(walletId2)?.[0]).toStrictEqual(AddressMeta.fromObject(addressObj2)) + }) + }) + + describe('#getTxHashesWithNextUnprocessedBlockNumber', () => { + const connector = new TestConnector({ + addresses: [addressObj1, addressObj2], + nodeUrl, + indexerUrl: '', + }) + it('no cached tx', async () => { + stubbedNextUnprocessedTxsGroupedByBlockNumberFn.mockResolvedValue([]) + // @ts-ignore private method + const result = await connector.getTxHashesWithNextUnprocessedBlockNumber() + expect(result).toStrictEqual([undefined, []]) + }) + it('get cached tx and sort by block number', async () => { + stubbedNextUnprocessedTxsGroupedByBlockNumberFn.mockImplementation(walletId => + walletId === walletId1 + ? [ + IndexerTxHashCache.fromObject({ + txHash: 'hash1', + blockNumber: 10, + lockHash: script.computeHash(), + address, + walletId, + }), + ] + : [ + IndexerTxHashCache.fromObject({ + txHash: 'hash2', + blockNumber: 2, + lockHash: script.computeHash(), + address, + walletId, + }), + ] + ) + // @ts-ignore private method + const result = await connector.getTxHashesWithNextUnprocessedBlockNumber() + expect(result).toStrictEqual(['2', ['hash2']]) + }) + }) + + describe('#notifyAndSyncNext', () => { + const connector = new TestConnector({ + addresses: [addressObj1, addressObj2], + nodeUrl, + indexerUrl: '', + }) + connector.blockTipsSubject.subscribe(stubbedBlockTipsSubscribe) + + it('exist unprocessed block and no current process block', async () => { + //@ts-ignore private property + connector.processingBlockNumber = undefined + stubbedNextUnprocessedBlockFn.mockResolvedValue('10') + //@ts-ignore private method + await connector.notifyAndSyncNext(100) + expect(stubbedBlockTipsSubscribe).toHaveBeenCalledWith({ + cacheTipNumber: 10, + indexerTipNumber: 100, + }) + expect(stubbedProcessTxsInNextBlockNumberFn).toHaveBeenCalled() + }) + it('exist unprocessed block and has current process block', async () => { + //@ts-ignore private property + connector.processingBlockNumber = '5' + stubbedNextUnprocessedBlockFn.mockResolvedValue('10') + //@ts-ignore private method + await connector.notifyAndSyncNext(100) + expect(stubbedBlockTipsSubscribe).toHaveBeenCalledWith({ + cacheTipNumber: 10, + indexerTipNumber: 100, + }) + expect(stubbedProcessTxsInNextBlockNumberFn).toHaveBeenCalledTimes(0) + }) + it('no unprocessed block', async () => { + //@ts-ignore private property + connector.processingBlockNumber = '5' + stubbedNextUnprocessedBlockFn.mockResolvedValue(undefined) + //@ts-ignore private method + await connector.notifyAndSyncNext(100) + expect(stubbedBlockTipsSubscribe).toHaveBeenCalledWith({ + cacheTipNumber: 100, + indexerTipNumber: 100, + }) + }) + }) + + describe('#getLiveCellsByScript', () => { + let fakeCell1: LumosCell, fakeCell2: LumosCell + let cells: LumosCell[] + + fakeCell1 = { + blockHash: '0x', + outPoint: { + txHash: '0x', + index: '0x0', + }, + cellOutput: { + capacity: '0x0', + lock: { + hashType: 'type', + codeHash: '0xcode', + args: '0x1', + }, + type: { + hashType: 'data', + codeHash: '0xcode', + args: '0x1', + }, + }, + } + fakeCell2 = { + blockHash: '0x', + outPoint: { + txHash: '0x', + index: '0x0', + }, + cellOutput: { + capacity: '0x0', + lock: { + hashType: 'type', + codeHash: '0xcode', + args: '0x2', + }, + type: { + hashType: 'lock', + codeHash: '0xcode', + args: '0x2', + }, + }, + } + const fakeCells = [fakeCell1, fakeCell2] + + const connector = new TestConnector({ + addresses: [addressObj1, addressObj2], + nodeUrl, + indexerUrl: '', + }) + + describe('when success', () => { + const query: LumosCellQuery = { + lock: { + hashType: ScriptHashType.Data, + codeHash: '0xcode', + args: '0x', + }, + type: { + hashType: ScriptHashType.Data, + codeHash: '0xcode', + args: '0x', + }, + data: null, + } + + beforeEach(async () => { + stubbedCellCellectFn.mockReturnValueOnce([ + new Promise(resolve => resolve(JSON.parse(JSON.stringify(fakeCells[0])))), + new Promise(resolve => resolve(JSON.parse(JSON.stringify(fakeCells[1])))), + ]) + + //@ts-ignore + cells = await connector.getLiveCellsByScript(query) + }) + it('transform the query parameter', () => { + expect(stubbedCellCollectorConstructor.mock.calls[0][1]).toEqual({ + lock: { + hashType: query.lock!.hashType, + codeHash: query.lock!.codeHash, + args: query.lock!.args, + }, + type: { + hashType: query.type!.hashType, + codeHash: query.type!.codeHash, + args: query.type!.args, + }, + data: 'any', + }) + }) + it('returns live cells with property value fix', async () => { + fakeCell2.cellOutput.type!.hashType = 'data' + expect(cells).toEqual([fakeCell1, fakeCell2]) + }) + }) + describe('when handling concurrent requests', () => { + const query1: LumosCellQuery = { + lock: { + hashType: ScriptHashType.Data, + codeHash: '0xcode', + args: '0x1', + }, + type: { + hashType: ScriptHashType.Data, + codeHash: '0xcode', + args: '0x1', + }, + data: null, + } + const query2: LumosCellQuery = { + lock: { + hashType: ScriptHashType.Type, + codeHash: '0xcode', + args: '0x2', + }, + type: { + hashType: ScriptHashType.Type, + codeHash: '0xcode', + args: '0x2', + }, + data: null, + } + + const results: unknown[] = [] + beforeEach(async () => { + const stubbedCellCellect1 = jest.fn() + stubbedCellCellect1.mockReturnValueOnce([ + new Promise(resolve => { + //fake the waiting, the other concurrent requests should wait until this is finished + setTimeout(() => { + resolve(JSON.parse(JSON.stringify(fakeCells[0]))) + }, 500) + }), + ]) + + const stubbedCellCellect2 = jest.fn() + stubbedCellCellect2.mockReturnValueOnce([ + new Promise(resolve => resolve(JSON.parse(JSON.stringify(fakeCells[1])))), + ]) + + stubbedCellCollectorConstructor.mockImplementation((_indexer: any, query: any) => { + if (query.lock.args === '0x1') { + return { + collect: stubbedCellCellect1, + } + } + if (query.lock.args === '0x2') { + return { + collect: stubbedCellCellect2, + } + } + }) + + const promises = Promise.all([ + new Promise(resolve => { + connector.getLiveCellsByScript(query1).then(cells => { + results.push(cells) + resolve() + }) + }), + new Promise(resolve => { + connector.getLiveCellsByScript(query2).then(cells => { + results.push(cells) + resolve() + }) + }), + ]) + + jest.advanceTimersByTime(500) + await promises + }) + it('process one by one in order', () => { + expect(results.length).toEqual(2) + expect(results[0]).toEqual([fakeCells[0]]) + }) + }) + describe('when fails', () => { + describe('when both type and lock parameter is not specified', () => { + it('throws error', async () => { + let err + try { + await connector.getLiveCellsByScript({ lock: null, type: null, data: null }) + } catch (error) { + err = error + } + expect(err).toEqual(new Error('at least one parameter is required')) + }) + }) + }) + }) +}) diff --git a/packages/neuron-wallet/tests/block-sync-renderer/indexer-cache-service.intg.test.ts b/packages/neuron-wallet/tests/block-sync-renderer/indexer-cache-service.intg.test.ts index 509c7808f3..21e922ff1f 100644 --- a/packages/neuron-wallet/tests/block-sync-renderer/indexer-cache-service.intg.test.ts +++ b/packages/neuron-wallet/tests/block-sync-renderer/indexer-cache-service.intg.test.ts @@ -364,7 +364,7 @@ describe('indexer cache service', () => { }) describe('with all unprocessed transactions', () => { it('returns the tx hashes for the next block number', async () => { - const txHashes = await indexerCacheService.nextUnprocessedTxsGroupedByBlockNumber() + const txHashes = await IndexerCacheService.nextUnprocessedTxsGroupedByBlockNumber(walletId) expect(txHashes).toHaveLength(1) expect(txHashes![0].blockNumber).toEqual(parseInt(fakeBlock1.number)) }) @@ -383,7 +383,7 @@ describe('indexer cache service', () => { .execute() }) it('returns the tx hashes for the next block number', async () => { - const txHashes = await indexerCacheService.nextUnprocessedTxsGroupedByBlockNumber() + const txHashes = await IndexerCacheService.nextUnprocessedTxsGroupedByBlockNumber(walletId) expect(txHashes).toHaveLength(2) expect(txHashes![0].blockNumber).toEqual(parseInt(fakeBlock2.number)) }) @@ -401,7 +401,7 @@ describe('indexer cache service', () => { .execute() }) it('returns the unprocessed tx hash in the next block number', async () => { - const txHashes = await indexerCacheService.nextUnprocessedTxsGroupedByBlockNumber() + const txHashes = await IndexerCacheService.nextUnprocessedTxsGroupedByBlockNumber(walletId) expect(txHashes).toHaveLength(1) expect(txHashes![0].blockNumber).toEqual(parseInt(fakeBlock2.number)) expect(txHashes![0].txHash).toEqual(fakeTx3.transaction.hash) @@ -440,7 +440,7 @@ describe('indexer cache service', () => { .execute() }) it('returns empty array when no unprocessed transactions', async () => { - const txHashes = await indexerCacheService.nextUnprocessedTxsGroupedByBlockNumber() + const txHashes = await IndexerCacheService.nextUnprocessedTxsGroupedByBlockNumber(walletId) expect(txHashes).toEqual([]) }) }) @@ -465,10 +465,7 @@ describe('indexer cache service', () => { nextUnprocessedBlock = await IndexerCacheService.nextUnprocessedBlock([walletId]) }) it('returns next unprocessed block number', async () => { - expect(nextUnprocessedBlock).toEqual({ - blockNumber: fakeBlock1.number, - blockHash: fakeBlock1.hash, - }) + expect(nextUnprocessedBlock).toEqual(fakeBlock1.number) }) }) describe('check with walletId that does not have hash caches', () => { diff --git a/packages/neuron-wallet/tests/block-sync-renderer/indexer-connector.test.ts b/packages/neuron-wallet/tests/block-sync-renderer/indexer-connector.test.ts index 878f54f2d4..b79147f525 100644 --- a/packages/neuron-wallet/tests/block-sync-renderer/indexer-connector.test.ts +++ b/packages/neuron-wallet/tests/block-sync-renderer/indexer-connector.test.ts @@ -4,9 +4,7 @@ import { AddressType } from '../../src/models/keys/address' import { Address, AddressVersion } from '../../src/models/address' import SystemScriptInfo from '../../src/models/system-script-info' import IndexerConnector from '../../src/block-sync-renderer/sync/indexer-connector' -import type { LumosCell, LumosCellQuery } from '../../src/block-sync-renderer/sync/connector' import { flushPromises } from '../test-utils' -import { ScriptHashType } from '../../src/models/chain/script' const stubbedTipFn = jest.fn() const stubbedGetTransactionFn = jest.fn() @@ -50,6 +48,7 @@ describe('unit tests for IndexerConnector', () => { stubbedIndexerConstructor = jest.fn() stubbedIndexerCacheService = jest.fn() stubbedIndexerCacheService.nextUnprocessedBlock = stubbedNextUnprocessedBlock + stubbedIndexerCacheService.nextUnprocessedTxsGroupedByBlockNumber = stubbedNextUnprocessedTxsGroupedByBlockNumberFn stubbedRPCServiceConstructor = jest.fn() stubbedCellCollectorConstructor = jest.fn() @@ -75,7 +74,6 @@ describe('unit tests for IndexerConnector', () => { jest.doMock('../../src/block-sync-renderer/sync/indexer-cache-service', () => { return stubbedIndexerCacheService.mockImplementation(() => ({ upsertTxHashes: stubbedUpsertTxHashesFn, - nextUnprocessedTxsGroupedByBlockNumber: stubbedNextUnprocessedTxsGroupedByBlockNumberFn, })) }) stubbedIndexerConnector = require('../../src/block-sync-renderer/sync/indexer-connector').default @@ -227,9 +225,9 @@ describe('unit tests for IndexerConnector', () => { describe('when loaded block number is already in order', () => { beforeEach(async () => { when(stubbedNextUnprocessedTxsGroupedByBlockNumberFn) - .calledWith() + .calledWith(addressObj1.walletId) .mockResolvedValueOnce([fakeTxHashCache1]) - .calledWith() + .calledWith(addressObj2.walletId) .mockResolvedValueOnce([fakeTxHashCache2, fakeTxHashCache3]) await connectIndexer(indexerConnector) @@ -246,9 +244,9 @@ describe('unit tests for IndexerConnector', () => { describe('when loaded block number is not in order', () => { beforeEach(async () => { when(stubbedNextUnprocessedTxsGroupedByBlockNumberFn) - .calledWith() + .calledWith(addressObj1.walletId) .mockResolvedValueOnce([fakeTxHashCache2, fakeTxHashCache3]) - .calledWith() + .calledWith(addressObj2.walletId) .mockResolvedValueOnce([fakeTxHashCache1]) await connectIndexer(indexerConnector) @@ -342,7 +340,7 @@ describe('unit tests for IndexerConnector', () => { }) it('throws error', async () => { expect(stubbedLoggerErrorFn).toHaveBeenCalledWith( - 'Error in processing next block number queue: Error: exception' + 'Connector: \tError in processing next block number queue: Error: exception' ) }) }) @@ -387,10 +385,7 @@ describe('unit tests for IndexerConnector', () => { }) describe('when there are unprocessed blocks', () => { beforeEach(async () => { - stubbedNextUnprocessedBlock.mockResolvedValue({ - blockNumber: fakeBlock3.number, - blockHash: fakeBlock3.hash, - }) + stubbedNextUnprocessedBlock.mockResolvedValue(fakeBlock3.number) jest.advanceTimersByTime(5000) await flushPromises() }) @@ -405,190 +400,5 @@ describe('unit tests for IndexerConnector', () => { }) }) }) - describe('#getLiveCellsByScript', () => { - let fakeCell1: LumosCell, fakeCell2: LumosCell - let cells: LumosCell[] - - fakeCell1 = { - blockHash: '0x', - outPoint: { - txHash: '0x', - index: '0x0', - }, - cellOutput: { - capacity: '0x0', - lock: { - hashType: 'type', - codeHash: '0xcode', - args: '0x1', - }, - type: { - hashType: 'data', - codeHash: '0xcode', - args: '0x1', - }, - }, - } - fakeCell2 = { - blockHash: '0x', - outPoint: { - txHash: '0x', - index: '0x0', - }, - cellOutput: { - capacity: '0x0', - lock: { - hashType: 'type', - codeHash: '0xcode', - args: '0x2', - }, - type: { - hashType: 'lock', - codeHash: '0xcode', - args: '0x2', - }, - }, - } - const fakeCells = [fakeCell1, fakeCell2] - - describe('when success', () => { - const query: LumosCellQuery = { - lock: { - hashType: ScriptHashType.Data, - codeHash: '0xcode', - args: '0x', - }, - type: { - hashType: ScriptHashType.Data, - codeHash: '0xcode', - args: '0x', - }, - data: null, - } - - beforeEach(async () => { - stubbedCellCellectFn.mockReturnValueOnce([ - new Promise(resolve => resolve(JSON.parse(JSON.stringify(fakeCells[0])))), - new Promise(resolve => resolve(JSON.parse(JSON.stringify(fakeCells[1])))), - ]) - - //@ts-ignore - cells = await indexerConnector.getLiveCellsByScript(query) - }) - it('transform the query parameter', () => { - expect(stubbedCellCollectorConstructor.mock.calls[0][1]).toEqual({ - lock: { - hashType: query.lock!.hashType, - codeHash: query.lock!.codeHash, - args: query.lock!.args, - }, - type: { - hashType: query.type!.hashType, - codeHash: query.type!.codeHash, - args: query.type!.args, - }, - data: 'any', - }) - }) - it('returns live cells with property value fix', async () => { - fakeCell2.cellOutput.type!.hashType = 'data' - expect(cells).toEqual([fakeCell1, fakeCell2]) - }) - }) - describe('when handling concurrent requests', () => { - const query1: LumosCellQuery = { - lock: { - hashType: ScriptHashType.Data, - codeHash: '0xcode', - args: '0x1', - }, - type: { - hashType: ScriptHashType.Data, - codeHash: '0xcode', - args: '0x1', - }, - data: null, - } - const query2: LumosCellQuery = { - lock: { - hashType: ScriptHashType.Type, - codeHash: '0xcode', - args: '0x2', - }, - type: { - hashType: ScriptHashType.Type, - codeHash: '0xcode', - args: '0x2', - }, - data: null, - } - - const results: unknown[] = [] - beforeEach(async () => { - const stubbedCellCellect1 = jest.fn() - stubbedCellCellect1.mockReturnValueOnce([ - new Promise(resolve => { - //fake the waiting, the other concurrent requests should wait until this is finished - setTimeout(() => { - resolve(JSON.parse(JSON.stringify(fakeCells[0]))) - }, 500) - }), - ]) - - const stubbedCellCellect2 = jest.fn() - stubbedCellCellect2.mockReturnValueOnce([ - new Promise(resolve => resolve(JSON.parse(JSON.stringify(fakeCells[1])))), - ]) - - stubbedCellCollectorConstructor.mockImplementation((_indexer: any, query: any) => { - if (query.lock.args === '0x1') { - return { - collect: stubbedCellCellect1, - } - } - if (query.lock.args === '0x2') { - return { - collect: stubbedCellCellect2, - } - } - }) - - const promises = Promise.all([ - new Promise(resolve => { - indexerConnector.getLiveCellsByScript(query1).then(cells => { - results.push(cells) - resolve() - }) - }), - new Promise(resolve => { - indexerConnector.getLiveCellsByScript(query2).then(cells => { - results.push(cells) - resolve() - }) - }), - ]) - - jest.advanceTimersByTime(500) - await promises - }) - it('process one by one in order', () => { - expect(results.length).toEqual(2) - expect(results[0]).toEqual([fakeCells[0]]) - }) - }) - describe('when fails', () => { - describe('when both type and lock parameter is not specified', () => { - it('throws error', async () => { - let err - try { - await indexerConnector.getLiveCellsByScript({ lock: null, type: null, data: null }) - } catch (error) { - err = error - } - expect(err).toEqual(new Error('at least one parameter is required')) - }) - }) - }) - }) }) }) diff --git a/packages/neuron-wallet/tests/block-sync-renderer/light-connector.test.ts b/packages/neuron-wallet/tests/block-sync-renderer/light-connector.test.ts index ef27cccea5..e02ec508e5 100644 --- a/packages/neuron-wallet/tests/block-sync-renderer/light-connector.test.ts +++ b/packages/neuron-wallet/tests/block-sync-renderer/light-connector.test.ts @@ -1,7 +1,5 @@ -import { computeScriptHash as scriptToHash } from '@ckb-lumos/base/lib/utils' +import type { Script } from '@ckb-lumos/base' import LightConnector from '../../src/block-sync-renderer/sync/light-connector' -import SyncProgress from '../../src/database/chain/entities/sync-progress' -import { BI } from '@ckb-lumos/bi' import AddressMeta from '../../src/database/address/meta' const getSyncStatusMock = jest.fn() @@ -13,6 +11,7 @@ const updateSyncProgressFlagMock = jest.fn() const getWalletMinBlockNumberMock = jest.fn() const removeByHashesAndAddressType = jest.fn() const getOtherTypeSyncProgressMock = jest.fn() +const getOtherTypeSyncBlockNumberMock = jest.fn() const setScriptsMock = jest.fn() const getScriptsMock = jest.fn() @@ -33,6 +32,7 @@ function mockReset() { updateSyncStatusMock.mockReset() getWalletMinBlockNumberMock.mockReset() getOtherTypeSyncProgressMock.mockReset() + getOtherTypeSyncBlockNumberMock.mockReset() setScriptsMock.mockReset() getScriptsMock.mockReset() @@ -56,10 +56,11 @@ jest.mock('../../src/services/sync-progress', () => { static updateSyncStatus: any = (hash: string, update: any) => updateSyncStatusMock(hash, update) static updateSyncProgressFlag: any = (walletIds: string[]) => updateSyncProgressFlagMock(walletIds) static getWalletMinBlockNumber: any = () => getWalletMinBlockNumberMock() - static removeByHashesAndAddressType: any = (type: number, scripts: CKBComponents.Script[]) => + static removeByHashesAndAddressType: any = (type: number, scripts: Script[]) => removeByHashesAndAddressType(type, scripts) static getOtherTypeSyncProgress: any = () => getOtherTypeSyncProgressMock() + static getOtherTypeSyncBlockNumber: any = () => getOtherTypeSyncBlockNumberMock() } }) @@ -94,12 +95,12 @@ jest.mock('timers/promises', () => ({ }, })) -const script: CKBComponents.Script = { +const script: Script = { args: '0x403f0d4e833b2a8d372772a63facaa310dfeef92', codeHash: '0x9bd7e06f3ecf4be0f2fcd2188b23f1b9fcc88e5d4b65a8637b17723bbda3cce8', hashType: 'type', } -const scriptHash = scriptToHash(script) +// const scriptHash = scriptToHash(script) const address = 'ckt1qzda0cr08m85hc8jlnfp3zer7xulejywt49kt2rr0vthywaa50xwsq2q8ux5aqem92xnwfmj5cl6e233phlwlysqhjx5w' describe('test light connector', () => { @@ -112,161 +113,6 @@ describe('test light connector', () => { afterEach(() => { mockReset() }) - describe('test synchronize', () => { - it('syncQueue is not idle', async () => { - const connector = new LightConnector([], '') - // @ts-ignore: private-method - connector.syncQueue.push({}) - // @ts-ignore: private-method - await connector.synchronize() - expect(getScriptsMock).not.toBeCalled() - }) - it('syncQueue is idle', async () => { - getScriptsMock.mockResolvedValue([]) - getAllSyncStatusToMapMock.mockResolvedValue(new Map()) - const connector = new LightConnector([], '') - // @ts-ignore: private-method - connector.subscribeSync = jest.fn() - // @ts-ignore: private-method - await connector.synchronize() - expect(getScriptsMock).toBeCalled() - expect(getAllSyncStatusToMapMock).toBeCalled() - }) - it('some script sync cursor is not empty', async () => { - getScriptsMock.mockResolvedValue([]) - const syncProgress = SyncProgress.fromObject({ - script, - scriptType: 'lock', - walletId: 'walletId1', - }) - syncProgress.blockStartNumber = 0 - syncProgress.blockEndNumber = 1 - syncProgress.cursor = '0x1' - getAllSyncStatusToMapMock.mockResolvedValue(new Map([[scriptHash, syncProgress]])) - const connector = new LightConnector([], '') - // @ts-ignore: private-method - connector.subscribeSync = jest.fn() - // @ts-ignore: private-method - await connector.synchronize() - // @ts-ignore: private-method - const queue = connector.syncQueue.workersList() - expect(queue[0].data).toStrictEqual({ - script: { - codeHash: syncProgress.codeHash, - hashType: syncProgress.hashType, - args: syncProgress.args, - }, - blockRange: [ - BI.from(syncProgress.blockStartNumber).toHexString(), - BI.from(syncProgress.blockEndNumber).toHexString(), - ], - scriptType: syncProgress.scriptType, - cursor: syncProgress.cursor, - }) - }) - it('some script sync cursor is not empty but is in sync queue', async () => { - getScriptsMock.mockResolvedValue([]) - const syncProgress = SyncProgress.fromObject({ - script, - scriptType: 'lock', - walletId: 'walletId1', - }) - syncProgress.blockStartNumber = 0 - syncProgress.blockEndNumber = 1 - syncProgress.cursor = '0x1' - getAllSyncStatusToMapMock.mockResolvedValue(new Map([[scriptHash, syncProgress]])) - const connector = new LightConnector([], '') - // @ts-ignore: private-method - connector.syncInQueue.set(scriptHash, {}) - // @ts-ignore: private-method - connector.subscribeSync = jest.fn() - // @ts-ignore: private-method - connector.syncQueue.pause() - // @ts-ignore: private-method - await connector.synchronize() - // @ts-ignore: private-method - expect(connector.syncQueue.length()).toBe(0) - }) - it('some script sync to new block', async () => { - getScriptsMock.mockResolvedValue([ - { - script, - scriptType: 'lock', - blockNumber: '0xaa', - }, - ]) - const syncProgress = SyncProgress.fromObject({ - script, - scriptType: 'lock', - walletId: 'walletId1', - }) - syncProgress.blockStartNumber = 0 - syncProgress.blockEndNumber = 1 - getAllSyncStatusToMapMock.mockResolvedValue(new Map([[scriptHash, syncProgress]])) - const connector = new LightConnector([], '') - // @ts-ignore: private-method - connector.subscribeSync = jest.fn() - // @ts-ignore: private-method - await connector.synchronize() - // @ts-ignore: private-method - const queue = connector.syncQueue.workersList() - expect(queue[0].data).toStrictEqual({ - script: { - codeHash: syncProgress.codeHash, - hashType: syncProgress.hashType, - args: syncProgress.args, - }, - blockRange: [BI.from(syncProgress.blockEndNumber).toHexString(), BI.from('0xaa').toHexString()], - scriptType: syncProgress.scriptType, - cursor: syncProgress.cursor, - }) - }), - it('some script sync to new block but is in sync queue', async () => { - getScriptsMock.mockResolvedValue([ - { - script, - scriptType: 'lock', - blockNumber: '0xaa', - }, - ]) - const syncProgress = SyncProgress.fromObject({ - script, - scriptType: 'lock', - walletId: 'walletId1', - }) - syncProgress.blockStartNumber = 0 - syncProgress.blockEndNumber = 1 - getAllSyncStatusToMapMock.mockResolvedValue(new Map([[scriptHash, syncProgress]])) - const connector = new LightConnector([], '') - // @ts-ignore: private-method - connector.syncInQueue.set(scriptHash, {}) - // @ts-ignore: private-method - connector.subscribeSync = jest.fn() - // @ts-ignore: private-method - connector.syncQueue.pause() - // @ts-ignore: private-method - await connector.synchronize() - // @ts-ignore: private-method - expect(connector.syncQueue.length()).toBe(0) - }) - }) - - describe('test subscribeSync', () => { - it('run success', async () => { - getCurrentWalletMinBlockNumberMock.mockResolvedValue(100) - getTipHeaderMock.mockResolvedValue({ number: '0xaa' }) - const connector = new LightConnector([], '') - // @ts-ignore: private-method - connector.blockTipsSubject = { next: jest.fn() } - // @ts-ignore: private-method - await connector.subscribeSync() - // @ts-ignore: private-method - expect(connector.blockTipsSubject.next).toBeCalledWith({ - cacheTipNumber: 100, - indexerTipNumber: 170, - }) - }) - }) describe('test initSyncProgress', () => { it('there is not exist addressmata', async () => { @@ -278,6 +124,7 @@ describe('test light connector', () => { it('append multisig script', async () => { getScriptsMock.mockResolvedValue([]) const connect = new LightConnector([], '') + getOtherTypeSyncBlockNumberMock.mockResolvedValueOnce({}) //@ts-ignore await connect.initSyncProgress([{ walletId: 'walletId', script, addressType: 1, scriptType: 'lock' }]) expect(getScriptsMock).toBeCalledTimes(1) @@ -319,9 +166,24 @@ describe('test light connector', () => { }, ]) expect(resetSyncProgressMock).toBeCalledWith([ - { script: addressMeta.generateDefaultLockScript().toSDK(), scriptType: 'lock', walletId: 'walletId' }, - { script: addressMeta.generateACPLockScript().toSDK(), scriptType: 'lock', walletId: 'walletId' }, - { script: addressMeta.generateLegacyACPLockScript().toSDK(), scriptType: 'lock', walletId: 'walletId' }, + { + script: addressMeta.generateDefaultLockScript().toSDK(), + scriptType: 'lock', + walletId: 'walletId', + blockNumber: '0xaa', + }, + { + script: addressMeta.generateACPLockScript().toSDK(), + scriptType: 'lock', + walletId: 'walletId', + blockNumber: '0x0', + }, + { + script: addressMeta.generateLegacyACPLockScript().toSDK(), + scriptType: 'lock', + walletId: 'walletId', + blockNumber: '0x0', + }, ]) expect(updateSyncProgressFlagMock).toBeCalledWith(['walletId']) }) @@ -422,73 +284,6 @@ describe('test light connector', () => { }) }) - describe('test syncNextWithScript', () => { - it('no syncprogress in db', async () => { - getSyncStatusMock.mockResolvedValue(undefined) - const connect = new LightConnector([], '') - //@ts-ignore - await connect.syncNextWithScript({ script, scriptType: 'lock' }) - expect(getTransactionsMock).toBeCalledTimes(0) - }) - it('there is no tx in blockRange ', async () => { - const syncProgress = SyncProgress.fromObject({ script, scriptType: 'lock', walletId: 'walletId' }) - getSyncStatusMock.mockResolvedValue(syncProgress) - getTransactionsMock.mockResolvedValue({ txs: [], lastCursor: '0x' }) - const connect = new LightConnector([], '') - //@ts-ignore - await connect.syncNextWithScript({ script, scriptType: 'lock', blockRange: ['0xaa', '0xbb'] }) - expect(getTransactionsMock).toBeCalledWith( - { script, blockRange: ['0xaa', '0xbb'], scriptType: 'lock' }, - 'asc', - '0x64', - undefined - ) - expect(updateSyncStatusMock).toBeCalledWith(scriptHash, { - blockStartNumber: 187, - blockEndNumber: 187, - cursor: undefined, - }) - }) - it('there are some txs in blockRange but no more', async () => { - const syncProgress = SyncProgress.fromObject({ script, scriptType: 'lock', walletId: 'walletId' }) - getSyncStatusMock.mockResolvedValue(syncProgress) - getTransactionsMock.mockResolvedValue({ txs: [{ txHash: '0xhash1' }], lastCursor: '0x' }) - const connect = new LightConnector([], '') - //@ts-ignore - connect.transactionsSubject = { next: jest.fn() } - //@ts-ignore - await connect.syncNextWithScript({ script, scriptType: 'lock', blockRange: ['0xaa', '0xbb'] }) - expect(connect.transactionsSubject.next).toBeCalledWith({ txHashes: ['0xhash1'], params: scriptHash }) - //@ts-ignore - expect(connect.syncInQueue.has(scriptHash)).toBeTruthy() - //@ts-ignore - expect(connect.syncInQueue.get(scriptHash)).toStrictEqual({ - blockStartNumber: 187, - blockEndNumber: 187, - cursor: undefined, - }) - }) - it('there are some txs in blockRange and more', async () => { - const syncProgress = SyncProgress.fromObject({ script, scriptType: 'lock', walletId: 'walletId' }) - getSyncStatusMock.mockResolvedValue(syncProgress) - getTransactionsMock.mockResolvedValue({ txs: [{ txHash: '0xhash1' }], lastCursor: '0xaa' }) - const connect = new LightConnector([], '') - //@ts-ignore - connect.transactionsSubject = { next: jest.fn() } - //@ts-ignore - await connect.syncNextWithScript({ script, scriptType: 'lock', blockRange: ['0xaa', '0xbb'] }) - expect(connect.transactionsSubject.next).toBeCalledWith({ txHashes: ['0xhash1'], params: scriptHash }) - //@ts-ignore - expect(connect.syncInQueue.has(scriptHash)).toBeTruthy() - //@ts-ignore - expect(connect.syncInQueue.get(scriptHash)).toStrictEqual({ - blockStartNumber: 170, - blockEndNumber: 187, - cursor: '0xaa', - }) - }) - }) - describe('test connect', () => { const mockFn = jest.fn() beforeEach(() => { @@ -523,26 +318,30 @@ describe('test light connector', () => { }) }) - describe('test notifyCurrentBlockNumberProcessed', () => { - it('hash is not in syncInQueue', async () => { - const connect = new LightConnector([], '') - const mockFn = jest.fn() - //@ts-ignore - connect.subscribeSync = mockFn - await connect.notifyCurrentBlockNumberProcessed('0xhash1') - expect(updateSyncStatusMock).toBeCalledTimes(0) - expect(mockFn).toBeCalledTimes(1) + describe('#notifyCurrentBlockNumberProcessed', () => { + const connector = new LightConnector([], '') + const updateBlockStartNumberMock = jest.fn() + beforeAll(() => { + // @ts-ignore private property + connector.updateBlockStartNumber = updateBlockStartNumberMock }) - it('hash is in syncInQueue', async () => { - const connect = new LightConnector([], '') - //@ts-ignore - connect.subscribeSync = jest.fn() - //@ts-ignore - connect.syncInQueue.set('0xhash1', { blockStartNumber: 1, blockEndNumber: 1 }) - await connect.notifyCurrentBlockNumberProcessed('0xhash1') - //@ts-ignore - expect(connect.syncInQueue.has('0xhash1')).toBeFalsy() - expect(updateSyncStatusMock).toBeCalledWith('0xhash1', { blockStartNumber: 1, blockEndNumber: 1 }) + beforeEach(() => { + updateBlockStartNumberMock.mockReset() + }) + it('last process block number finish', async () => { + // @ts-ignore private property + connector.processingBlockNumber = '0xaa' + getCurrentWalletMinBlockNumberMock.mockResolvedValueOnce(100) + await connector.notifyCurrentBlockNumberProcessed('0xaa') + // @ts-ignore private property + expect(connector.processingBlockNumber).toBeUndefined() + expect(updateBlockStartNumberMock).toBeCalledWith(100) + }) + it('not last process block number finish', async () => { + // @ts-ignore private property + connector.processingBlockNumber = undefined + await connector.notifyCurrentBlockNumberProcessed('0xaa') + expect(updateBlockStartNumberMock).toBeCalledTimes(0) }) }) })