Skip to content

Commit

Permalink
fix: Fix light client sync miss some tx. (#2992)
Browse files Browse the repository at this point in the history
* refactor: Rename connector to synchronizer

* fix: fix light sync logic

1. Use `partial` and `delete` for set_scripts
2. Sync the next block number after all addresses are synced to the block number
3. Rename `blockStartNumber` to `localSavedBlockNumber`, rename `blockEndNumber` to `syncedBlockNumber`
  • Loading branch information
yanguoyu authored Jan 10, 2024
1 parent 1eea695 commit 5f9f767
Show file tree
Hide file tree
Showing 23 changed files with 258 additions and 197 deletions.
2 changes: 1 addition & 1 deletion packages/neuron-ui/src/components/MultisigAddress/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ export const useSubscription = ({
const tmp: Record<string, number> = {}
res.result.forEach(v => {
if (hashToPayload[v.hash]) {
tmp[hashToPayload[v.hash]] = v.blockStartNumber
tmp[hashToPayload[v.hash]] = v.localSavedBlockNumber
}
})
setMultisigSyncProgress(tmp)
Expand Down
2 changes: 1 addition & 1 deletion packages/neuron-ui/src/services/remote/multisig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ export const generateMultisigSendAllTx = remoteApi<{
multisigConfig: MultisigConfig
}>('generate-multisig-send-all-tx')
export const loadMultisigTxJson = remoteApi<string, OfflineSignJSON>('load-multisig-tx-json')
export const getMultisigSyncProgress = remoteApi<string[], { hash: string; blockStartNumber: number }[]>(
export const getMultisigSyncProgress = remoteApi<string[], { hash: string; localSavedBlockNumber: number }[]>(
'get-sync-progress-by-addresses'
)
2 changes: 1 addition & 1 deletion packages/neuron-wallet/src/block-sync-renderer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import DataUpdateSubject from '../models/subjects/data-update'
import AddressCreatedSubject from '../models/subjects/address-created-subject'
import WalletDeletedSubject from '../models/subjects/wallet-deleted-subject'
import TxDbChangedSubject from '../models/subjects/tx-db-changed-subject'
import { LumosCellQuery, LumosCell } from './sync/connector'
import { LumosCellQuery, LumosCell } from './sync/synchronizer'
import { WorkerMessage, StartParams, QueryIndexerParams } from './task'
import logger from '../utils/logger'
import CommonUtils from '../utils/common'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import logger from '../../utils/logger'
import CommonUtils from '../../utils/common'
import RpcService from '../../services/rpc-service'
import { Address } from '../../models/address'
import { Connector } from './connector'
import { Synchronizer } from './synchronizer'
import { NetworkType } from '../../models/network'
import IndexerCacheService from './indexer-cache-service'

export default class IndexerConnector extends Connector {
export default class FullSynchronizer extends Synchronizer {
private rpcService: RpcService

constructor(addresses: Address[], nodeUrl: string, indexerUrl: string, nodeType: NetworkType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Address } from '../../models/address'
import AddressMeta from '../../database/address/meta'
import { scheduler } from 'timers/promises'
import SyncProgressService from '../../services/sync-progress'
import { Connector, AppendScript } from './connector'
import { Synchronizer, AppendScript } from './synchronizer'
import { computeScriptHash as scriptToHash } from '@ckb-lumos/base/lib/utils'
import { FetchTransactionReturnType, LightRPC, LightScriptFilter } from '../../utils/ckb-rpc'
import Multisig from '../../services/multisig'
Expand All @@ -24,7 +24,7 @@ import NetworksService from '../../services/networks'

const unpackGroup = molecule.vector(blockchain.OutPoint)

export default class LightConnector extends Connector {
export default class LightSynchronizer extends Synchronizer {
private lightRpc: LightRPC
private addressMetas: AddressMeta[]

Expand Down Expand Up @@ -87,7 +87,7 @@ export default class LightConnector extends Connector {
private async synchronize() {
const syncScripts = await this.upsertTxHashes()
await this.updateSyncedBlockOfScripts(syncScripts)
const minSyncBlockNumber = await SyncProgressService.getCurrentWalletMinBlockNumber()
const minSyncBlockNumber = await SyncProgressService.getCurrentWalletMinSyncedBlockNumber()
const hasNextBlock = await this.notifyAndSyncNext(minSyncBlockNumber)
if (!hasNextBlock) {
await this.updateBlockStartNumber(minSyncBlockNumber)
Expand Down Expand Up @@ -138,7 +138,7 @@ export default class LightConnector extends Connector {
const txs = await this.getTransactions({
script: syncScript.script,
scriptType: syncScript.scriptType,
blockRange: [BI.from(syncStatus.blockEndNumber).toHexString(), syncScript.blockNumber],
blockRange: [BI.from(syncStatus.syncedBlockNumber).toHexString(), syncScript.blockNumber],
})
insertTxCaches.push(
...txs.map(v => ({
Expand All @@ -164,7 +164,7 @@ export default class LightConnector extends Connector {
syncScripts.forEach(v => {
const currentSyncProgress = syncStatusMap.get(scriptToHash(v.script))
if (currentSyncProgress) {
currentSyncProgress.blockEndNumber = parseInt(v.blockNumber)
currentSyncProgress.syncedBlockNumber = parseInt(v.blockNumber)
updatedSyncProgress.push(currentSyncProgress)
}
})
Expand Down Expand Up @@ -196,35 +196,42 @@ export default class LightConnector extends Connector {
}))
})
.flat()
const walletMinBlockNumber = await SyncProgressService.getWalletMinBlockNumber()
const walletMinBlockNumber = await SyncProgressService.getWalletMinLocalSavedBlockNumber()
const wallets = await WalletService.getInstance().getAll()
const walletStartBlockMap = wallets.reduce<Record<string, string | undefined>>(
(pre, cur) => ({ ...pre, [cur.id]: cur.startBlockNumber }),
{}
)
const otherTypeSyncBlockNumber = await SyncProgressService.getOtherTypeSyncBlockNumber()
const setScriptsParams = [
...allScripts.map(v => {
const blockNumber = Math.max(
parseInt(walletStartBlockMap[v.walletId] ?? '0x0'),
walletMinBlockNumber?.[v.walletId] ?? 0,
parseInt(existSyncscripts[scriptToHash(v.script)]?.blockNumber ?? '0x0')
)
return {
const addScripts = [
...allScripts
.filter(v => !existSyncscripts[scriptToHash(v.script)])
.map(v => {
const blockNumber = Math.max(
parseInt(walletStartBlockMap[v.walletId] ?? '0x0'),
walletMinBlockNumber?.[v.walletId] ?? 0
)
return {
...v,
blockNumber: `0x${blockNumber.toString(16)}`,
}
}),
...appendScripts
.filter(v => !existSyncscripts[scriptToHash(v.script)])
.map(v => ({
...v,
blockNumber: `0x${blockNumber.toString(16)}`,
}
}),
...appendScripts.map(v => ({
...v,
blockNumber:
existSyncscripts[scriptToHash(v.script)]?.blockNumber ??
`0x${(otherTypeSyncBlockNumber[scriptToHash(v.script)] ?? 0).toString(16)}`,
})),
blockNumber: `0x${(otherTypeSyncBlockNumber[scriptToHash(v.script)] ?? 0).toString(16)}`,
})),
]
await this.lightRpc.setScripts(setScriptsParams)
await this.lightRpc.setScripts(addScripts, 'partial')
const allScriptHashes = new Set([
...allScripts.map(v => scriptToHash(v.script)),
...appendScripts.map(v => scriptToHash(v.script)),
])
const deleteScript = syncScripts.filter(v => !allScriptHashes.has(scriptToHash(v.script)))
await this.lightRpc.setScripts(deleteScript, 'delete')
const walletIds = [...new Set(this.addressMetas.map(v => v.walletId))]
await SyncProgressService.resetSyncProgress(setScriptsParams)
await SyncProgressService.resetSyncProgress(addScripts)
await SyncProgressService.updateSyncProgressFlag(walletIds)
await SyncProgressService.removeByHashesAndAddressType(
SyncAddressType.Multisig,
Expand Down Expand Up @@ -281,9 +288,25 @@ export default class LightConnector extends Connector {
this.initSyncProgress(scripts)
}

private async checkTxExist(txHashes: string[]) {
const transactions = await this.lightRpc
.createBatchRequest<'getTransaction', string[], TransactionWithStatus[]>(txHashes.map(v => ['getTransaction', v]))
.exec()
return transactions.every(v => !!v.transaction)
}

async processTxsInNextBlockNumber() {
const [nextBlockNumber, txHashesInNextBlock] = await this.getTxHashesWithNextUnprocessedBlockNumber()
if (nextBlockNumber !== undefined && txHashesInNextBlock.length) {
const minSyncBlockNumber = await SyncProgressService.getCurrentWalletMinSyncedBlockNumber()
if (
nextBlockNumber !== undefined &&
txHashesInNextBlock.length &&
// For light client, if tx hash has been called with fetch_transaction, the tx can not return by get_transactions
// So before derived address synced to bigger than next synced block number, do not sync the next block number
minSyncBlockNumber >= parseInt(nextBlockNumber) &&
// check whether the tx is sync from light client, after split the light client and full DB file, this check will remove
(await this.checkTxExist(txHashesInNextBlock))
) {
this.processingBlockNumber = nextBlockNumber
await this.fetchPreviousOutputs(txHashesInNextBlock)
this.transactionsSubject.next({ txHashes: txHashesInNextBlock, params: this.processingBlockNumber })
Expand All @@ -296,7 +319,7 @@ export default class LightConnector extends Connector {
} else {
return
}
const minCachedBlockNumber = await SyncProgressService.getCurrentWalletMinBlockNumber()
const minCachedBlockNumber = await SyncProgressService.getCurrentWalletMinSyncedBlockNumber()
await this.updateBlockStartNumber(Math.min(parseInt(blockNumber), minCachedBlockNumber))
this.processNextBlockNumber()
}
Expand Down
14 changes: 7 additions & 7 deletions packages/neuron-wallet/src/block-sync-renderer/sync/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import AddressParser from '../../models/address-parser'
import Multisig from '../../models/multisig'
import BlockHeader from '../../models/chain/block-header'
import TxAddressFinder from './tx-address-finder'
import IndexerConnector from './indexer-connector'
import FullSynchronizer from './full-synchronizer'
import IndexerCacheService from './indexer-cache-service'
import logger from '../../utils/logger'
import CommonUtils from '../../utils/common'
import { ShouldInChildProcess } from '../../exceptions'
import { AppendScript, BlockTips, Connector } from './connector'
import LightConnector from './light-connector'
import { AppendScript, BlockTips, Synchronizer } from './synchronizer'
import LightSynchronizer from './light-synchronizer'
import { generateRPC } from '../../utils/ckb-rpc'
import { BUNDLED_LIGHT_CKB_URL } from '../../utils/const'
import { NetworkType } from '../../models/network'
Expand All @@ -30,7 +30,7 @@ export default class Queue {
#indexerUrl: string
#addresses: AddressInterface[]
#rpcService: RpcService
#indexerConnector: Connector | undefined
#indexerConnector: Synchronizer | undefined
#checkAndSaveQueue: QueueObject<{ txHashes: CKBComponents.Hash[]; params: unknown }> | undefined
#lockArgsSet: Set<string> = new Set()

Expand Down Expand Up @@ -67,9 +67,9 @@ export default class Queue {
logger.info('Queue:\tstart')
try {
if (this.#url === BUNDLED_LIGHT_CKB_URL) {
this.#indexerConnector = new LightConnector(this.#addresses, this.#url)
this.#indexerConnector = new LightSynchronizer(this.#addresses, this.#url)
} else {
this.#indexerConnector = new IndexerConnector(this.#addresses, this.#url, this.#indexerUrl, this.#nodeType)
this.#indexerConnector = new FullSynchronizer(this.#addresses, this.#url, this.#indexerUrl, this.#nodeType)
}
await this.#indexerConnector!.connect()
} catch (error) {
Expand Down Expand Up @@ -110,7 +110,7 @@ export default class Queue {
})
}

getIndexerConnector = (): Connector => this.#indexerConnector!
getIndexerConnector = (): Synchronizer => this.#indexerConnector!

stop = () => this.#indexerConnector!.stop()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export interface AppendScript {
scriptType: CKBRPC.ScriptType
}

export abstract class Connector {
export abstract class Synchronizer {
public readonly blockTipsSubject: Subject<BlockTips> = new Subject<BlockTips>()
public readonly transactionsSubject = new Subject<{ txHashes: CKBComponents.Hash[]; params: string }>()
protected indexer: CkbIndexer
Expand Down
2 changes: 1 addition & 1 deletion packages/neuron-wallet/src/block-sync-renderer/task.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { LumosCellQuery } from './sync/connector'
import type { LumosCellQuery } from './sync/synchronizer'
import initConnection from '../database/chain/ormconfig'
import { register as registerTxStatusListener } from './tx-status-listener'
import SyncQueue from './sync/queue'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ export default class SyncProgress {
walletId!: string

@Column()
blockStartNumber: number = 0
lightStartBlockNumber: number = 0

@Column()
blockEndNumber: number = 0
localSavedBlockNumber: number = 0

@Column()
syncedBlockNumber: number = 0

@Column({ type: 'varchar' })
cursor?: HexString
Expand All @@ -58,8 +61,9 @@ export default class SyncProgress {
res.scriptType = obj.scriptType
res.delete = false
res.addressType = obj.addressType ?? SyncAddressType.Default
res.blockStartNumber = parseInt(obj.blockNumber)
res.blockEndNumber = parseInt(obj.blockNumber)
res.lightStartBlockNumber = parseInt(obj.blockNumber)
res.localSavedBlockNumber = parseInt(obj.blockNumber)
res.syncedBlockNumber = parseInt(obj.blockNumber)
return res
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import {MigrationInterface, QueryRunner} from "typeorm";
import SyncProgress from "../entities/sync-progress";

const chunk = 100
export class ResetSyncProgressPrimaryKey1690361215400 implements MigrationInterface {
name = 'ResetSyncProgressPrimaryKey1690361215400'

public async up(queryRunner: QueryRunner): Promise<void> {
const syncProgresses = await queryRunner.manager.find(SyncProgress)
const syncProgresses = await queryRunner.manager.query('select * from sync_progress')
await queryRunner.query(`DROP TABLE "sync_progress"`)
await queryRunner.query(`CREATE TABLE "sync_progress" ("hash" varchar NOT NULL, "args" varchar NOT NULL, "codeHash" varchar NOT NULL, "hashType" varchar NOT NULL, "scriptType" varchar NOT NULL, "walletId" varchar NOT NULL, "blockStartNumber" integer NOT NULL, "blockEndNumber" integer, "cursor" varchar, "delete" boolean, "addressType" integer, PRIMARY KEY ("hash", "walletId"))`)
for (let index = 0; index < syncProgresses.length; index += 500) {
await queryRunner.manager.save(syncProgresses.slice(index, index + 500))
for (let index = 0; index < syncProgresses.length; index += chunk) {
await queryRunner.manager.query(`INSERT INTO sync_progress VALUES ${syncProgresses.slice(index, index + chunk).reduce((pre: string, cur: any) => `${pre ? `${pre},` : ''}("${cur.hash}","${cur.args}","${cur.codeHash}","${cur.hashType}","${cur.scriptType}","${cur.walletId}",${cur.blockStartNumber},${cur.blockEndNumber},${cur.cursor ? `"${cur.cursor}"` : 'NULL'},${cur.delete},${cur.addressType})`, '')};`)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import {MigrationInterface, QueryRunner, TableColumn} from "typeorm";

export class RenameSyncProgress1702781527414 implements MigrationInterface {
name = 'RenameSyncProgress1702781527414'

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.renameColumn('sync_progress', 'blockStartNumber', 'localSavedBlockNumber')
await queryRunner.renameColumn('sync_progress', 'blockEndNumber', 'syncedBlockNumber')
await queryRunner.addColumn('sync_progress', new TableColumn({
name: 'lightStartBlockNumber',
type: 'integer',
default: 0
}))
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.renameColumn('sync_progress', 'localSavedBlockNumber', 'blockStartNumber')
await queryRunner.renameColumn('sync_progress', 'syncedBlockNumber', 'blockEndNumber')
}

}
2 changes: 2 additions & 0 deletions packages/neuron-wallet/src/database/chain/ormconfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import { ResetSyncProgressPrimaryKey1690361215400 } from './migrations/169036121
import { TxLockAddArgs1694746034975 } from './migrations/1694746034975-TxLockAddArgs'
import { IndexerTxHashCacheRemoveField1701234043431 } from './migrations/1701234043431-IndexerTxHashCacheRemoveField'
import { CreateCellLocalInfo1701234043432 } from './migrations/1701234043432-CreateCellLocalInfo'
import { RenameSyncProgress1702781527414 } from './migrations/1702781527414-RenameSyncProgress'

export const CONNECTION_NOT_FOUND_NAME = 'ConnectionNotFoundError'

Expand Down Expand Up @@ -132,6 +133,7 @@ const connectOptions = async (genesisBlockHash: string): Promise<SqliteConnectio
TxLockAddArgs1694746034975,
IndexerTxHashCacheRemoveField1701234043431,
CreateCellLocalInfo1701234043432,
RenameSyncProgress1702781527414,
],
logger: 'simple-console',
logging,
Expand Down
2 changes: 1 addition & 1 deletion packages/neuron-wallet/src/models/chain/live-cell.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Script, { ScriptHashType } from './script'
import OutPoint from './out-point'
import { LumosCell } from '../../block-sync-renderer/sync/connector'
import { LumosCell } from '../../block-sync-renderer/sync/synchronizer'

const LUMOS_HASH_TYPE_MAP: Record<string, ScriptHashType> = {
type: ScriptHashType.Type,
Expand Down
2 changes: 1 addition & 1 deletion packages/neuron-wallet/src/services/live-cell-service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Script from '../models/chain/script'
import LiveCell from '../models/chain/live-cell'
import { queryIndexer } from '../block-sync-renderer/index'
import { LumosCell, LumosCellQuery } from '../block-sync-renderer/sync/connector'
import { LumosCell, LumosCellQuery } from '../block-sync-renderer/sync/synchronizer'

export default class LiveCellService {
private static instance: LiveCellService
Expand Down
2 changes: 1 addition & 1 deletion packages/neuron-wallet/src/services/multisig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ export default class MultisigService {
.where({ hash: In(multisigScriptHashList) })
.getMany()
const syncBlockNumbersMap: Record<string, number> = syncBlockNumbers.reduce(
(pre, cur) => ({ ...pre, [cur.hash]: cur.blockStartNumber }),
(pre, cur) => ({ ...pre, [cur.hash]: cur.localSavedBlockNumber }),
{}
)
await getConnection()
Expand Down
Loading

2 comments on commit 5f9f767

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Packaging for test is done in 7469070761

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Packaging for test is done in 7469071695

Please sign in to comment.