From fb74c2516f9ecef905c7a52036e8108ba9fd03a1 Mon Sep 17 00:00:00 2001 From: mainnet-pat Date: Sun, 29 Dec 2024 09:08:16 +0000 Subject: [PATCH] Support ESM targets, add BCH support --- bch/bch-processor/README.md | 3 + bch/bch-processor/package.json | 41 + bch/bch-processor/src/ds-archive/client.ts | 90 +++ bch/bch-processor/src/ds-archive/schema.ts | 22 + bch/bch-processor/src/ds-rpc/client.ts | 269 +++++++ bch/bch-processor/src/ds-rpc/filter.ts | 55 ++ bch/bch-processor/src/ds-rpc/mapping.ts | 60 ++ bch/bch-processor/src/ds-rpc/request.ts | 50 ++ bch/bch-processor/src/ds-rpc/rpc-data.ts | 181 +++++ bch/bch-processor/src/ds-rpc/rpc.ts | 724 ++++++++++++++++++ bch/bch-processor/src/ds-rpc/schema.ts | 36 + bch/bch-processor/src/ds-rpc/util.ts | 57 ++ bch/bch-processor/src/index.ts | 4 + bch/bch-processor/src/interfaces/base.ts | 5 + bch/bch-processor/src/interfaces/bch.ts | 26 + bch/bch-processor/src/interfaces/chain.ts | 14 + .../src/interfaces/data-request.ts | 14 + bch/bch-processor/src/interfaces/data.ts | 112 +++ bch/bch-processor/src/mapping/entities.ts | 67 ++ bch/bch-processor/src/mapping/relations.ts | 12 + bch/bch-processor/src/mapping/schema.ts | 100 +++ bch/bch-processor/src/mapping/selection.ts | 42 + bch/bch-processor/src/processor.ts | 620 +++++++++++++++ bch/bch-processor/tsconfig.json | 22 + common/config/rush/pnpm-lock.yaml | 119 ++- .../openreader/src/dialect/opencrud/where.ts | 2 +- graphql/openreader/src/model.tools.ts | 4 +- rush.json | 6 + typeorm/typeorm-codegen/src/codegen.ts | 22 +- typeorm/typeorm-codegen/src/main.ts | 2 +- typeorm/typeorm-config/src/config.ts | 2 +- typeorm/typeorm-migration/src/create.ts | 5 +- typeorm/typeorm-migration/src/generate.ts | 4 +- typeorm/typeorm-store/src/database.ts | 38 +- typeorm/typeorm-store/src/hot.ts | 11 +- .../typeorm-store/src/test/database.test.ts | 178 +++++ .../src/database.ts | 5 + .../src/datasource.ts | 8 + .../src/runner.ts | 65 +- 39 files changed, 3052 insertions(+), 45 deletions(-) create mode 100644 bch/bch-processor/README.md create mode 100644 bch/bch-processor/package.json create mode 100644 bch/bch-processor/src/ds-archive/client.ts create mode 100644 bch/bch-processor/src/ds-archive/schema.ts create mode 100644 bch/bch-processor/src/ds-rpc/client.ts create mode 100644 bch/bch-processor/src/ds-rpc/filter.ts create mode 100644 bch/bch-processor/src/ds-rpc/mapping.ts create mode 100644 bch/bch-processor/src/ds-rpc/request.ts create mode 100644 bch/bch-processor/src/ds-rpc/rpc-data.ts create mode 100644 bch/bch-processor/src/ds-rpc/rpc.ts create mode 100644 bch/bch-processor/src/ds-rpc/schema.ts create mode 100644 bch/bch-processor/src/ds-rpc/util.ts create mode 100644 bch/bch-processor/src/index.ts create mode 100644 bch/bch-processor/src/interfaces/base.ts create mode 100644 bch/bch-processor/src/interfaces/bch.ts create mode 100644 bch/bch-processor/src/interfaces/chain.ts create mode 100644 bch/bch-processor/src/interfaces/data-request.ts create mode 100644 bch/bch-processor/src/interfaces/data.ts create mode 100644 bch/bch-processor/src/mapping/entities.ts create mode 100644 bch/bch-processor/src/mapping/relations.ts create mode 100644 bch/bch-processor/src/mapping/schema.ts create mode 100644 bch/bch-processor/src/mapping/selection.ts create mode 100644 bch/bch-processor/src/processor.ts create mode 100644 bch/bch-processor/tsconfig.json diff --git a/bch/bch-processor/README.md b/bch/bch-processor/README.md new file mode 100644 index 000000000..b6ae3a1fa --- /dev/null +++ b/bch/bch-processor/README.md @@ -0,0 +1,3 @@ +# @subsquid/bch-processor + +Data fetcher and mappings executor for BitcoinCash chain. diff --git a/bch/bch-processor/package.json b/bch/bch-processor/package.json new file mode 100644 index 000000000..15e2f61af --- /dev/null +++ b/bch/bch-processor/package.json @@ -0,0 +1,41 @@ +{ + "name": "@subsquid/bch-processor", + "type": "module", + "version": "1.0.0", + "description": "Data fetcher and mappings executor for BitcoinCash chain", + "license": "GPL-3.0-or-later", + "repository": "git@github.com:subsquid/squid.git", + "publishConfig": { + "access": "public", + "registry": "https://registry.npmjs.org/" + }, + "files": [ + "lib", + "src" + ], + "main": "lib/index.js", + "dependencies": { + "@bitauth/libauth": "^3.0.0", + "@subsquid/http-client": "^1.6.0", + "@subsquid/logger": "^1.3.3", + "@subsquid/rpc-client": "^4.11.0", + "@subsquid/util-internal": "^3.2.0", + "@subsquid/util-internal-archive-client": "^0.1.2", + "@subsquid/util-internal-hex": "^1.2.2", + "@subsquid/util-internal-ingest-tools": "^1.1.4", + "@subsquid/util-internal-processor-tools": "^4.1.1", + "@subsquid/util-internal-range": "^0.3.0", + "@subsquid/util-internal-validation": "^0.7.0", + "@subsquid/util-timeout": "^2.3.2", + "bitcoin-minimal": "^1.1.7", + "lru-cache": "^11.0.2", + "p2p-cash": "^1.1.12" + }, + "devDependencies": { + "@types/node": "^18.18.14", + "typescript": "~5.5.4" + }, + "scripts": { + "build": "rm -rf lib && tsc" + } +} \ No newline at end of file diff --git a/bch/bch-processor/src/ds-archive/client.ts b/bch/bch-processor/src/ds-archive/client.ts new file mode 100644 index 000000000..34d6f5fa3 --- /dev/null +++ b/bch/bch-processor/src/ds-archive/client.ts @@ -0,0 +1,90 @@ +import {addErrorContext} from '@subsquid/util-internal' +import {ArchiveClient} from '@subsquid/util-internal-archive-client' +import {archiveIngest} from '@subsquid/util-internal-ingest-tools' +import {Batch, DataSource} from '@subsquid/util-internal-processor-tools' +import {getRequestAt, RangeRequest} from '@subsquid/util-internal-range' +import {cast} from '@subsquid/util-internal-validation' +import assert from 'assert' +import {Bytes32} from '../interfaces/base.js' +import {FieldSelection} from '../interfaces/data.js' +import {DataRequest} from '../interfaces/data-request.js' +import { + Block, + BlockHeader, + Transaction +} from '../mapping/entities.js' +import {setUpRelations} from '../mapping/relations.js' +import {getBlockValidator} from './schema.js' + + +const NO_FIELDS = {} + + +export class BchArchive implements DataSource { + constructor(private client: ArchiveClient) {} + + getFinalizedHeight(): Promise { + return this.client.getHeight() + } + + async getBlockHash(height: number): Promise { + let blocks = await this.client.query({ + fromBlock: height, + toBlock: height, + includeAllBlocks: true + }) + assert(blocks.length == 1) + return blocks[0].header.hash + } + + async *getFinalizedBlocks(requests: RangeRequest[], stopOnHead?: boolean | undefined): AsyncIterable> { + for await (let batch of archiveIngest({ + requests, + client: this.client, + stopOnHead + })) { + let fields = getRequestAt(requests, batch.blocks[0].header.number)?.fields || NO_FIELDS + + let blocks = batch.blocks.map(b => { + try { + return this.mapBlock(b, fields) + } catch(err: any) { + throw addErrorContext(err, { + blockHeight: b.header.number, + blockHash: b.header.hash + }) + } + }) + + yield {blocks, isHead: batch.isHead, mempoolTransactions: []} + } + } + + private mapBlock(rawBlock: unknown, fields: FieldSelection): Block { + let validator = getBlockValidator(fields) + + let src = cast(validator, rawBlock) + + let {height, hash, parentHash, ...hdr} = src.header + if (hdr.timestamp) { + hdr.timestamp = hdr.timestamp * 1000 // convert to ms + } + + let header = new BlockHeader(height, hash, parentHash) + Object.assign(header, hdr) + + let block = new Block(header) + + if (src.transactions) { + for (let {...props} of src.transactions) { + let tx = new Transaction(header, 0) + Object.assign(tx, props) + block.transactions.push(tx) + } + } + + setUpRelations(block) + + return block + } +} diff --git a/bch/bch-processor/src/ds-archive/schema.ts b/bch/bch-processor/src/ds-archive/schema.ts new file mode 100644 index 000000000..0a34790b8 --- /dev/null +++ b/bch/bch-processor/src/ds-archive/schema.ts @@ -0,0 +1,22 @@ +import {weakMemo} from '@subsquid/util-internal' +import {array, BYTES, object, option} from '@subsquid/util-internal-validation' +import {FieldSelection} from '../interfaces/data.js' +import { + getBlockHeaderProps, + getTxProps, +} from '../mapping/schema.js' + + +export const getBlockValidator = weakMemo((fields: FieldSelection) => { + let BlockHeader = object(getBlockHeaderProps(fields.block, true)) + + let Transaction = object({ + hash: fields.transaction?.hash ? BYTES : undefined, + ...getTxProps(fields.transaction, true), + }) + + return object({ + header: BlockHeader, + transactions: option(array(Transaction)), + }) +}) diff --git a/bch/bch-processor/src/ds-rpc/client.ts b/bch/bch-processor/src/ds-rpc/client.ts new file mode 100644 index 000000000..1284e9687 --- /dev/null +++ b/bch/bch-processor/src/ds-rpc/client.ts @@ -0,0 +1,269 @@ +import {Logger, LogLevel} from '@subsquid/logger' +import {RpcClient} from '@subsquid/rpc-client' +import {AsyncQueue, ensureError, last, maybeLast, Throttler, wait} from '@subsquid/util-internal' +import { + BlockConsistencyError, + BlockHeader as Head, + BlockRef, + coldIngest, + HashAndHeight, + HotProcessor, + isDataConsistencyError +} from '@subsquid/util-internal-ingest-tools' +import {Batch, HotDatabaseState, HotDataSource, HotUpdate} from '@subsquid/util-internal-processor-tools' +import { + getRequestAt, + mapRangeRequestList, + rangeEnd, + RangeRequest, + RangeRequestList, + splitRange, + splitRangeByRequest, + SplitRequest +} from '@subsquid/util-internal-range' +import {cast, NAT, object} from '@subsquid/util-internal-validation' +import {addTimeout, TimeoutError} from '@subsquid/util-timeout' +import assert from 'assert' +import {Bytes32} from '../interfaces/base.js' +import {DataRequest} from '../interfaces/data-request.js' +import {Block} from '../mapping/entities.js' +import {mapBlock} from './mapping.js' +import {MappingRequest, toMappingRequest} from './request.js' +import {Rpc, RpcValidationFlags} from './rpc.js' +import { HEX } from './rpc-data.js' + + +const NO_REQUEST = toMappingRequest() + + +export interface BchRpcDataSourceOptions { + rpc: RpcClient + p2pEndpoint?: string // endpoint in format "ip:port" + finalityConfirmation: number + newHeadTimeout?: number + headPollInterval?: number + log?: Logger + validationFlags?: RpcValidationFlags +} + +export class BchRpcDataSource implements HotDataSource { + public rpc: Rpc + private finalityConfirmation: number + private headPollInterval: number + private newHeadTimeout: number + + private log?: Logger + + constructor(options: BchRpcDataSourceOptions) { + this.log = options.log + this.rpc = new Rpc(options.rpc, this.log, options.validationFlags, 0, {p2pEndpoint: options.p2pEndpoint }) + this.finalityConfirmation = options.finalityConfirmation + this.headPollInterval = options.headPollInterval || 5_000 + this.newHeadTimeout = options.newHeadTimeout || 0 + } + + async getFinalizedHeight(): Promise { + let height = await this.rpc.getHeight() + return Math.max(0, height - this.finalityConfirmation) + } + + getBlockHash(height: number): Promise { + return this.rpc.getBlockHash(height) + } + + getFinalizedBlocks( + requests: RangeRequest[], + stopOnHead?: boolean + ): AsyncIterable> { + return coldIngest({ + getFinalizedHeight: () => this.getFinalizedHeight(), + getSplit: req => this._getColdSplit(req), + requests: mapRangeRequestList(requests, req => this.toMappingRequest(req)), + splitSize: 10, + concurrency: Math.min(5, this.rpc.client.getConcurrency()), + stopOnHead, + headPollInterval: this.headPollInterval + }) + } + + private async _getColdSplit(req: SplitRequest): Promise { + let rpc = this.rpc.withPriority(req.range.from) + let blocks = await rpc.getColdSplit(req).catch(err => { + if (isDataConsistencyError(err)) { + err.message += '. Perhaps finality confirmation was not large enough' + } + throw err + }) + await rpc.cleanupRpc() + return blocks.map(b => mapBlock(b, req.request)) + } + + private toMappingRequest(req?: DataRequest): MappingRequest { + let r = toMappingRequest(req) + return r + } + + async processHotBlocks( + requests: RangeRequestList, + state: HotDatabaseState, + cb: (upd: HotUpdate) => Promise + ): Promise { + if (requests.length == 0) return + + let mappingRequests = mapRangeRequestList(requests, req => this.toMappingRequest(req)) + + let self = this + + let proc = new HotProcessor(state, { + process: cb, + getBlock: async ref => { + let req = getRequestAt(mappingRequests, ref.height) || NO_REQUEST + let block = await this.rpc.getColdBlock(ref.hash, req, proc.getFinalizedHeight()) + return mapBlock(block, req) + }, + async *getBlockRange(from: number, to: BlockRef): AsyncIterable { + assert(to.height != null) + if (from > to.height) { + from = to.height + } + for (let split of splitRangeByRequest(mappingRequests, {from, to: to.height})) { + let request = split.request || NO_REQUEST + for (let range of splitRange(10, split.range)) { + let rpcBlocks = await self.rpc.getHotSplit({ + range, + request, + finalizedHeight: proc.getFinalizedHeight() + }) + let blocks = rpcBlocks.map(b => mapBlock(b, request)) + let lastBlock = maybeLast(blocks)?.header.height ?? range.from - 1 + yield blocks + if (lastBlock < range.to) { + throw new BlockConsistencyError({height: lastBlock + 1}) + } + } + } + }, + getHeader(block) { + return block.header + } + }) + + let isEnd = () => proc.getFinalizedHeight() >= rangeEnd(last(requests).range) + + let navigate = (head: {height: number, hash?: Bytes32}): Promise => { + return proc.goto({ + best: head, + finalized: { + height: Math.max(head.height - this.finalityConfirmation, 0) + } + }) + } + + if (this.rpc.client.supportsNotifications()) { + return this.subscription(navigate, isEnd) + } else { + return this.polling(navigate, isEnd) + } + } + + async processMempool( + requests: RangeRequestList, + state: HotDatabaseState, + cb: (upd: HotUpdate) => Promise + ): Promise<() => Promise> { + return await this.rpc.watchMempool(requests as any, state, cb as any); + } + + private async polling(cb: (head: {height: number}) => Promise, isEnd: () => boolean): Promise { + let prev = -1 + let height = new Throttler(() => this.rpc.getHeight(), this.headPollInterval) + while (!isEnd()) { + let next = await height.call() + if (next <= prev) continue + prev = next + for (let i = 0; i < 100; i++) { + try { + await cb({height: next}) + break + } catch(err: any) { + if (isDataConsistencyError(err)) { + this.log?.write( + i > 0 ? LogLevel.WARN : LogLevel.DEBUG, + err.message + ) + await wait(100) + } else { + throw err + } + } + } + } + } + + private async subscription(cb: (head: HashAndHeight) => Promise, isEnd: () => boolean): Promise { + let lastHead: HashAndHeight = {height: -1, hash: '0x'} + let heads = await this.subscribeNewHeads() + try { + while (!isEnd()) { + let next = await addTimeout(heads.take(), this.newHeadTimeout).catch(ensureError) + assert(next) + if (next instanceof TimeoutError) { + this.log?.warn(`resetting RPC connection, because we haven't seen a new head for ${this.newHeadTimeout} ms`) + this.rpc.client.reset() + } else if (next instanceof Error) { + throw next + } else if (next.height >= lastHead.height) { + lastHead = next + for (let i = 0; i < 3; i++) { + try { + await cb(next) + break + } catch(err: any) { + if (isDataConsistencyError(err)) { + this.log?.write( + i > 0 ? LogLevel.WARN : LogLevel.DEBUG, + err.message + ) + await wait(100) + if (heads.peek()) break + } else { + throw err + } + } + } + } + } + } finally { + heads.close() + } + } + + private async subscribeNewHeads(): Promise> { + let queue = new AsyncQueue(1) + + const unsubscribe = await this.rpc.watchNewBlocks(async (head) => { + try { + let {height, hash, parentHash} = cast(NewHeadMessage, head) + queue.forcePut({ + height, + hash, + parentHash + }) + } catch(err: any) { + queue.forcePut(ensureError(err)) + queue.close() + } + }) + + queue.addCloseListener(async () => await unsubscribe()) + + return queue + } +} + + +const NewHeadMessage = object({ + height: NAT, + hash: HEX, + parentHash: HEX +}) diff --git a/bch/bch-processor/src/ds-rpc/filter.ts b/bch/bch-processor/src/ds-rpc/filter.ts new file mode 100644 index 000000000..b1e9abe2a --- /dev/null +++ b/bch/bch-processor/src/ds-rpc/filter.ts @@ -0,0 +1,55 @@ +import {weakMemo} from '@subsquid/util-internal' +import {EntityFilter, FilterBuilder} from '@subsquid/util-internal-processor-tools' +import {Block, Transaction} from '../mapping/entities.js' +import {DataRequest} from '../interfaces/data-request.js' + + +function buildTransactionFilter(dataRequest: DataRequest): EntityFilter { + let items = new EntityFilter() + for (let req of dataRequest.transactions || []) { + let { + // address, tokenId + ...relations} = req + let filter = new FilterBuilder() + // filter.propIn('address', address) + // filter.propIn('tokenId', tokenId) + items.add(filter, relations) + } + return items +} + + +const getItemFilter = weakMemo((dataRequest: DataRequest) => { + return { + transactions: buildTransactionFilter(dataRequest), + } +}) + + +class IncludeSet { + transactions = new Set() + addTransaction(tx?: Transaction): void { + if (tx) this.transactions.add(tx) + } +} + + +export function filterBlock(block: Block, dataRequest: DataRequest): void { + let items = getItemFilter(dataRequest) + + let include = new IncludeSet() + + if (items.transactions.present()) { + for (let tx of block.transactions) { + let rel = items.transactions.match(tx) + if (rel == null) continue + include.addTransaction(tx) + } + } + + block.transactions = block.transactions.filter(tx => { + if (!include.transactions.has(tx)) return false + return true + }) +} diff --git a/bch/bch-processor/src/ds-rpc/mapping.ts b/bch/bch-processor/src/ds-rpc/mapping.ts new file mode 100644 index 000000000..48769dd09 --- /dev/null +++ b/bch/bch-processor/src/ds-rpc/mapping.ts @@ -0,0 +1,60 @@ +import {addErrorContext} from '@subsquid/util-internal' +import {cast} from '@subsquid/util-internal-validation' +import { + Block, + BlockHeader, + Transaction +} from '../mapping/entities.js' +import {setUpRelations} from '../mapping/relations.js' +import {filterBlock} from './filter.js' +import {MappingRequest} from './request.js' +import {Block as RpcBlock} from './rpc-data.js' +import {getBlockValidator} from './schema.js' + + +export function mapBlock(rpcBlock: RpcBlock, req: MappingRequest): Block { + try { + return tryMapBlock(rpcBlock, req) + } catch(err: any) { + throw addErrorContext(err, { + blockHash: rpcBlock.hash, + blockHeight: rpcBlock.height + }) + } +} + + +function tryMapBlock(rpcBlock: RpcBlock, req: MappingRequest): Block { + let src = cast(getBlockValidator(req), rpcBlock) + + let {height, hash, parentHash, transactions, ...headerProps} = src.block + if (headerProps.timestamp) { + headerProps.timestamp = headerProps.timestamp * 1000 // convert to ms + } + + let header = new BlockHeader(height, hash, parentHash) + Object.assign(header, headerProps) + + let block = new Block(header) + + if (req.transactionList) { + for (let i = 0; i < transactions.length; i++) { + let stx = transactions[i] + let tx = new Transaction(header, i) + if (typeof stx == 'string') { + if (req.fields.transaction?.hash) { + tx.hash = stx + } + } else { + let {...props} = stx + Object.assign(tx, props) + } + block.transactions.push(tx) + } + } + + setUpRelations(block) + filterBlock(block, req.dataRequest) + + return block +} diff --git a/bch/bch-processor/src/ds-rpc/request.ts b/bch/bch-processor/src/ds-rpc/request.ts new file mode 100644 index 000000000..12bdcbc69 --- /dev/null +++ b/bch/bch-processor/src/ds-rpc/request.ts @@ -0,0 +1,50 @@ +import {FieldSelection} from '../interfaces/data.js' +import {DataRequest} from '../interfaces/data-request.js' +import {BchTransaction} from '../interfaces/bch.js' +import {DataRequest as RpcDataRequest} from './rpc-data.js' + + +export interface MappingRequest extends RpcDataRequest { + fields: FieldSelection + transactionList: boolean + dataRequest: DataRequest +} + + +export function toMappingRequest(req?: DataRequest): MappingRequest { + let txs = transactionsRequested(req) + return { + fields: req?.fields || {}, + transactionList: txs, + transactions: !!req?.transactions?.length || txs && isRequested(TX_FIELDS, req?.fields?.transaction), + dataRequest: req || {} + } +} + + +function transactionsRequested(req?: DataRequest): boolean { + if (req == null) return false + if (req.transactions?.length) return true + return false +} + + +const TX_FIELDS: {[K in Exclude]: true} = { + inputs: true, + locktime: true, + outputs: true, + size: true, + sourceOutputs: true, + transactionIndex: true, + version: true, + fee: true, +} + + +function isRequested(set: Record, selection?: Record): boolean { + if (selection == null) return false + for (let key in selection) { + if (set[key] && selection[key]) return true + } + return false +} diff --git a/bch/bch-processor/src/ds-rpc/rpc-data.ts b/bch/bch-processor/src/ds-rpc/rpc-data.ts new file mode 100644 index 000000000..18750586b --- /dev/null +++ b/bch/bch-processor/src/ds-rpc/rpc-data.ts @@ -0,0 +1,181 @@ +import { + array, + GetSrcType, + NAT, + object, + option, + STRING, + ValidationFailure, + Validator} from '@subsquid/util-internal-validation' +import {Bytes, Bytes32} from '../interfaces/base.js' + +export class ValidationFailureEx extends ValidationFailure { + toString(): string { + let msg = this.message + if (msg.includes('{value}')) { + msg = msg.replace('{value}', JSON.stringify(this.value)) + } + if (this.path.length) { + msg = `invalid value at ${this.getPathString()}: ${msg}` + } + return msg + } +} + +/** + * Hex encoded binary string or natural number without 0x prefix + */ +type Hex = string + + +function isHex(value: unknown): value is Hex { + return typeof value == 'string' && /^[0-9a-fA-F]*$/.test(value) +} + +/** + * Hex encoded binary string without 0x prefix + */ +export const HEX: Validator = { + cast(value: unknown): Hex | ValidationFailureEx { + return this.validate(value) || (value as Hex).toLowerCase() + }, + validate(value: unknown): ValidationFailureEx | undefined { + if (isHex(value)) return + return new ValidationFailureEx(value, `{value} is not a hex encoded binary string`) + }, + phantom(): Hex { + return '' + } +} + +function isBigint(value: unknown): value is bigint { + return typeof value == 'bigint'; +} + +/** + * Hex encoded binary string without 0x prefix + */ +export const BIGINT: Validator = { + cast(value: unknown): bigint | ValidationFailureEx { + return this.validate(value) || (value as bigint) + }, + validate(value: unknown): ValidationFailureEx | undefined { + if (isBigint(value)) return + return new ValidationFailureEx(value, `{value} is not a bigint`) + }, + phantom(): bigint { + return 0n + } +} + +export interface RpcBlock { + hash: string, + confirmations: number, + size: number, + height: number, + version: number, + tx: string[], + time: number, + nonce: number, + difficulty: number, + nTx: number, + previousblockhash: string, +} + +export interface DataRequest { + transactions?: boolean + sourceOutputs?: boolean + fee?: boolean +} + + +export interface Block { + height: number + hash: Bytes32 + block: GetBlock + _isInvalid?: boolean + _errorMessage?: string +} + + +const Transaction = object({ + blockNumber: NAT, + blockHash: HEX, + transactionIndex: NAT, + size: NAT, + hash: HEX, + sourceOutputs: option(array(object({ + lockingBytecode: HEX, + token: option(object({ + amount: BIGINT, + category: HEX, + nft: option(object({ + capability: STRING, + commitment: HEX, + })), + })), + valueSatoshis: BIGINT, + address: STRING, + }))), + inputs: array(object({ + outpointIndex: NAT, + outpointTransactionHash: HEX, + sequenceNumber: NAT, + unlockingBytecode: HEX, + })), + outputs: array(object({ + lockingBytecode: HEX, + token: option(object({ + amount: BIGINT, + category: HEX, + nft: option(object({ + capability: STRING, + commitment: HEX, + })), + })), + valueSatoshis: BIGINT, + address: STRING, + })), + locktime: NAT, + version: NAT, + fee: option(NAT), +}) + + +export type Transaction = GetSrcType + + +export const GetBlockWithTransactions = object({ + height: NAT, + hash: HEX, + parentHash: HEX, + transactions: array(Transaction), + difficulty: NAT, + size: NAT, + timestamp: NAT, + nonce: NAT, +}) + + +export const GetBlockNoTransactions = object({ + height: NAT, + hash: HEX, + parentHash: HEX, + transactions: array(HEX), + difficulty: NAT, + size: NAT, + timestamp: NAT, + nonce: NAT, +}) + + +export interface GetBlock { + height: number + hash: Bytes32 + parentHash: Bytes32 + transactions: Bytes[] | Transaction[] + difficulty: number + size: number + timestamp: number + nonce: number +} diff --git a/bch/bch-processor/src/ds-rpc/rpc.ts b/bch/bch-processor/src/ds-rpc/rpc.ts new file mode 100644 index 000000000..e2324eb7e --- /dev/null +++ b/bch/bch-processor/src/ds-rpc/rpc.ts @@ -0,0 +1,724 @@ +import {createLogger, Logger} from '@subsquid/logger' +import {CallOptions, RpcClient, RpcError} from '@subsquid/rpc-client' +import {assertIsValid, BlockConsistencyError, BlockHeader, trimInvalid} from '@subsquid/util-internal-ingest-tools' +import {RangeRequestList, rangeToArray, SplitRequest} from '@subsquid/util-internal-range' +import {DataValidationError, GetSrcType, nullable, Validator} from '@subsquid/util-internal-validation' +import {Bytes, Bytes32} from '../interfaces/base.js' +import { + Block, + DataRequest, + GetBlock, + GetBlockNoTransactions, + GetBlockWithTransactions, + RpcBlock, + Transaction, +} from './rpc-data.js' +import { assertSuccess, binToHex, CashAddressNetworkPrefix, CashAddressType, decodeTransaction, encodeCashAddress, hash160, hash256, hexToBin, Input, lockingBytecodeToAddressContents, Output, TransactionCommon } from '@bitauth/libauth' +import { LRUCache } from 'lru-cache' +import { Peer } from 'p2p-cash' +import { Block as P2pBlock } from 'bitcoin-minimal' +import { HotDatabaseState, HotUpdate } from '@subsquid/util-internal-processor-tools' +import { Graph } from './util.js' + +const ZERO_HASH = "00".repeat(32) + +function getResultValidator(validator: V, transformer?: (input: any) => any): (result: unknown) => GetSrcType { + return function(result: unknown) { + if (transformer) { + result = transformer(result) + } + let err = validator.validate(result) + if (err) { + throw new DataValidationError(`server returned unexpected result: ${err.toString()}`) + } else { + return result as any + } + } +} + + +export interface RpcValidationFlags { +} + +type TransactionBCH = TransactionCommon, Output> + +export type TransactionBCHWithAddress = TransactionBCH & { + outputs: (TransactionBCH['outputs'][0] & { + address: string + })[] +} + +const transactionCache = new LRUCache({ + max: 1000, +}) + +const addressCache = new LRUCache({ + max: 1000, +}) + +// A composite class to get data from ElectrumX server (tip, historical transactions) and from p2p network layer (blocks, mempool) +// A variant of this class could be implemented to fetch data from a BCH node's RPC instead of ElectrumX +export class Rpc { + private props: RpcProps + private p2pEndpoint: string + private p2p!: Peer + private mempoolWatchCancel?: () => Promise + private newBlocksWatchCancel?: () => Promise + + constructor( + public readonly client: RpcClient, + private log?: Logger, + private validation: RpcValidationFlags = {}, + private priority: number = 0, + props?: RpcProps, + ) { + this.props = props ?? {} + this.p2pEndpoint = this.props.p2pEndpoint ?? '3.142.98.179:8333' + + this.log = createLogger('sqd:processor:rpc', {rpcUrl: this.client.url, p2pEndpoint: this.p2pEndpoint}); + } + + private setupP2P() { + const [ip, port] = this.p2pEndpoint.split(':') + const peer = new Peer({ + ticker: "BCH", + node: ip, + port: Number(port), + validate: false, + magic: Buffer.from("e3e1f3e8", "hex"), + userAgent: "/subsquid/", + version: 70012, + listenRelay: true, + timeoutConnect: 60 * 1000, + reconnectTimeout: 100, + autoReconnectWait: 100, + autoReconnect: true, + DEBUG_LOG: false, + logger: this.log, + }) + + peer.on('connected', () => { + this.log?.debug( + `P2p peer: connected to node` + ) + }) + + peer.connect().catch(() => {}); + return peer + } + public async cleanupRpc() { + await this.mempoolWatchCancel?.() + this.p2p.disconnect(false) + } + + private async p2pReady() { + this.p2p = this.setupP2P() + if (!this.p2p.connected) { + await new Promise(async (resolve) => { + while (!this.p2p.connected) { + await new Promise(resolve => setTimeout(resolve, 5000)) + } + resolve() + }) + } + } + + withPriority(priority: number): Rpc { + return new Rpc(this.client, this.log, this.validation, priority, this.props) + } + + async call(method: string, params?: any[], options?: CallOptions): Promise { + if (method == 'blockchain.block.get') { + await this.p2pReady() + + if (typeof params?.[0] === "number") { + const result = await this.getBlockByHeightInternal(params[0], params[1]) as T + return options?.validateResult ? options.validateResult(result, undefined as any) : result + } else if (typeof params?.[0] === "string") { + const result = await this.getBlockByHashInternal(params[0], params[1], params[2]) as T + return options?.validateResult ? options.validateResult(result, undefined as any) : result + } + } + + return this.client.call(method, params, {priority: this.priority, ...options}) + } + + async batchCall(batch: {method: string, params?: any[]}[], options?: CallOptions): Promise { + const indices: number[] = [] + + const [blockBatchHeights, blockBatchHashes, otherBatches] = batch.reduce((acc, b) => { + if (b.method === 'blockchain.block.get' && typeof b.params?.[0] === "number") { + indices.push(0) + acc[0].push(b) + } else if (b.method === 'blockchain.block.get' && typeof b.params?.[0] === "string") { + indices.push(1) + acc[1].push(b) + } else { + indices.push(2) + acc[2].push(b) + } + return acc + }, [[], [], []] as [{method: string, params?: any[]}[], {method: string, params?: any[]}[], {method: string, params?: any[]}[]]) + + if (blockBatchHashes.length || blockBatchHeights.length) { + await this.p2pReady() + } + + // allow rpc client to be blocked by priority + const otherResults = await this.client.batchCall(otherBatches, {priority: this.priority, ...options}) + + const blockResultHeights = await this.getBlockByHeightInternalBatch(blockBatchHeights.map(b => b.params![0] as number), blockBatchHeights.map(b => b.params![1] as number)) as T[] + const blockResultHashes = await this.getBlockByHashInternalBatch(blockBatchHashes.map(b => b.params![0] as string), blockBatchHashes.map(b => b.params![1] as number), blockBatchHeights.map(b => b.params![0] as number)) as T[] + + const blockResultHeightsMaybeValidated = options?.validateResult ? blockResultHeights.map((r) => options.validateResult!(r, undefined as any)) : blockResultHeights + const blockResultHashesMaybeValidated = options?.validateResult ? blockResultHashes.map((r) => options.validateResult!(r, undefined as any)) : blockResultHashes + + // restore the original order + return indices.map((i) => { + if (i === 0) { + return blockResultHeightsMaybeValidated.shift()! + } else if (i === 1) { + return blockResultHashesMaybeValidated.shift()! + } else { + return otherResults.shift()! + } + }) + } + + async watchMempool( + requests: RangeRequestList, + state: HotDatabaseState, + cb: (upd: HotUpdate) => Promise + ): Promise<() => Promise> { + await this.p2pReady() + + await this.mempoolWatchCancel?.() + + const process = async (rawMempoolHashes: Buffer[]) => { + const rawMempool = await this.p2p.getRawTransactions(rawMempoolHashes) + const transactions = rawMempool.map((tx, index) => transformTransaction(tx as unknown as Uint8Array, -1, { + hash: ZERO_HASH, + height: -1, + } as RpcBlock, rawMempoolHashes[index].toString('hex'))) + + const graph = new Graph() + for (const tx of transactions) { + for (const input of tx.inputs) { + graph.addEdge(input.outpointTransactionHash, tx.hash) + } + } + + const sortedTxHashes = graph.topologicalSort() + const orderedTransactions: typeof transactions = sortedTxHashes.map(txHash => transactions.find(tx => tx.hash === txHash)).filter(tx => tx !== undefined) + + if (requests.some(req => req.request.sourceOutputs || (req.request as any).fields?.transaction?.sourceOutputs)) { + await this.addSourceOutputs([{ + height: -1, hash: ZERO_HASH, + block: {transactions: orderedTransactions}, + }] as Block[]) + + if (requests.some(req => req.request.fee || (req.request as any).fields?.transaction?.fee)) { + await this.addFees([{ + height: -1, hash: ZERO_HASH, + block: {transactions: orderedTransactions}, + }] as Block[]) + } + } + + await cb({ + blocks: [], + baseHead: {hash: ZERO_HASH, height: -1}, + finalizedHead: {hash: ZERO_HASH, height: -1}, + mempoolTransactions: orderedTransactions + }) + } + + const rawMempoolHashes = await this.p2p.getMempool() + await process(rawMempoolHashes) + + const watchCancel = this.p2p.watchMempoolTransactionHashes(async (txHash: Buffer) => { + const rawMempoolHashes = [txHash] + await process(rawMempoolHashes) + }) + + this.mempoolWatchCancel = async () => { + watchCancel() + this.mempoolWatchCancel = undefined + } + + return this.mempoolWatchCancel + } + + async watchNewBlocks( + cb: (head: BlockHeader) => Promise + ): Promise<() => Promise> { + await this.p2pReady() + + await this.newBlocksWatchCancel?.() + + const watchCancel = this.p2p.watchNewBlocks(async (blockHash: Buffer) => { + const block = await this.p2p.getBlock(blockHash) + cb({ + hash: block.getHash().toString("hex"), + height: block.getHeight(), + parentHash: block.header!.prevHash.toString("hex"), + }) + }) + + this.mempoolWatchCancel = async () => { + watchCancel() + this.mempoolWatchCancel = undefined + } + + return this.mempoolWatchCancel + } + + getBlockByNumber(height: number, withTransactions: boolean): Promise { + return this.call('blockchain.block.get', [ + height, + withTransactions ? 1.5 : 1 + ], { + validateResult: getResultValidator( + withTransactions ? nullable(GetBlockWithTransactions) : nullable(GetBlockNoTransactions), + transformBlock(withTransactions), + ) + }) + } + + getBlockByHash(hash: Bytes, withTransactions: boolean): Promise { + return this.call('blockchain.block.get', [hash, withTransactions ? 1.5 : 1], { + validateResult: getResultValidator( + withTransactions ? nullable(GetBlockWithTransactions) : nullable(GetBlockNoTransactions), + transformBlock(withTransactions), + ) + }) + } + + getRawTransaction(hash: Bytes): Promise { + return this.call('blockchain.transaction.get', [hash, false]) + } + + async getTransaction(hash: Bytes): Promise { + if (transactionCache.has(hash)) { + return transactionCache.get(hash)! + } + + const tx: TransactionBCHWithAddress = fromLibauthTransaction(assertSuccess(decodeTransaction(hexToBin(await this.getRawTransaction(hash))))) + + transactionCache.set(hash, tx) + return tx; + } + + async getBlockHash(height: number): Promise { + let block = await this.getBlockByNumber(height, false) + return block?.hash + } + + async getHeight(): Promise { + let { height } = await this.call<{height: number}>('blockchain.headers.get_tip') + return height + } + + async getColdBlock(blockHash: Bytes32, req?: DataRequest, finalizedHeight?: number): Promise { + let block = await this.getBlockByHash(blockHash, req?.transactions || false).then(toBlock) + if (block == null) throw new BlockConsistencyError({hash: blockHash}) + if (req) { + await this.addRequestedData([block], req, finalizedHeight) + } + if (block._isInvalid) throw new BlockConsistencyError(block, block._errorMessage) + return block + } + + async getColdSplit(req: SplitRequest): Promise { + let blocks = await this.getColdBlockBatch( + rangeToArray(req.range), + req.request.transactions ?? false, + 1 + ) + return this.addColdRequestedData(blocks, req.request, 1) + } + + private async addColdRequestedData(blocks: Block[], req: DataRequest, depth: number): Promise { + let result = blocks.map(b => ({...b})) + + await this.addRequestedData(result, req) + + if (depth > 9) { + assertIsValid(result) + return result + } + + let missing: number[] = [] + for (let i = 0; i < result.length; i++) { + if (result[i]._isInvalid) { + missing.push(i) + } + } + + if (missing.length == 0) return result + + let missed = await this.addColdRequestedData( + missing.map(i => blocks[i]), + req, + depth + 1 + ) + + for (let i = 0; i < missing.length; i++) { + result[missing[i]] = missed[i] + } + + return result + } + + private async getColdBlockBatch(numbers: number[], withTransactions: boolean, depth: number): Promise { + let result = await this.getBlockBatch(numbers, withTransactions) + let missing: number[] = [] + for (let i = 0; i < result.length; i++) { + if (result[i] == null) { + missing.push(i) + } + } + + if (missing.length == 0) return result as Block[] + + if (depth > 9) throw new BlockConsistencyError({ + height: numbers[missing[0]] + }, `failed to get finalized block after ${depth} attempts`) + + let missed = await this.getColdBlockBatch( + missing.map(i => numbers[i]), + withTransactions, + depth + 1 + ) + + for (let i = 0; i < missing.length; i++) { + result[missing[i]] = missed[i] + } + + return result as Block[] + } + + async getHotSplit(req: SplitRequest & {finalizedHeight: number}): Promise { + let blocks = await this.getBlockBatch(rangeToArray(req.range), req.request.transactions ?? false) + + let chain: Block[] = [] + + for (let i = 0; i < blocks.length; i++) { + let block = blocks[i] + if (block == null) break + if (i > 0 && chain[i - 1].hash !== block.block.parentHash) break + chain.push(block) + } + + await this.addRequestedData(chain, req.request, req.finalizedHeight) + + return trimInvalid(chain) + } + + private async getBlockBatch(numbers: number[], withTransactions: boolean): Promise<(Block | undefined)[]> { + let call = numbers.map(height => { + return { + method: 'blockchain.block.get', + params: [height, withTransactions ? 1.5 : 1] + } + }) + let blocks = await this.batchCall(call, { + validateResult: getResultValidator( + withTransactions ? nullable(GetBlockWithTransactions) : nullable(GetBlockNoTransactions), + transformBlock(withTransactions), + ), + validateError: info => { + // Avalanche + if (/cannot query unfinalized data/i.test(info.message)) return null + throw new RpcError(info) + } + }) + return blocks.map(toBlock) + } + + private async addRequestedData(blocks: Block[], req: DataRequest, finalizedHeight?: number): Promise { + if (blocks.length == 0) return + let subtasks: any[] = [] + + if (req.sourceOutputs || (req as any).fields?.transaction?.sourceOutputs) { + subtasks.push(this.addSourceOutputs(blocks)) + if (req.fee || (req as any).fields?.transaction?.fee) { + subtasks.at(-1).then(() => this.addFees(blocks)) + } + } + + await Promise.all(subtasks) + } + + private async addSourceOutputs(blocks: Block[]): Promise { + if (blocks.length == 0) return + if (blocks.some((block => block.block.transactions.some(tx => typeof tx === "string")))) { + return + } + + // get all unique prevout txIds excluding the coinbase tx + const txIds = [...new Set( + blocks.map(block => block.block.transactions.map(tx => (tx as Transaction).inputs.map(input => input.outpointTransactionHash))).flat(3) + )].filter(txId => txId !== ZERO_HASH && !transactionCache.has(txId, {updateAgeOnHas: true})) + + console.log(`addSourceOutputs for blocks: ${blocks.map(b => b.height).join(', ')}. txIds: ${txIds.length}`) + console.time(`addSourceOutputs for blocks: ${blocks.map(b => b.height).join(', ')}. txIds: ${txIds.length}`) + + if (true) { + // batched to reduce network overhead + const txIdsToFetch = txIds.filter(txId => !transactionCache.has(txId)) + const rawTxs = await this.batchCall(txIdsToFetch.map(txId => ({ + method: 'blockchain.transaction.get', + params: [txId, false] + }))) + + rawTxs.map((rawTx, index) => { + const tx: TransactionBCHWithAddress = fromLibauthTransaction(assertSuccess(decodeTransaction(hexToBin(rawTx)))) + + transactionCache.set(txIdsToFetch[index], tx) + }) + } else { + // sequential to avoid batch limiting + await Promise.all(txIds.map(async (txId) => { + if (transactionCache.has(txId)) { + return + } + + const tx: TransactionBCHWithAddress = fromLibauthTransaction(assertSuccess(decodeTransaction(hexToBin(await this.getRawTransaction(txId))))) + + transactionCache.set(txId, tx) + })) + } + for (const block of blocks) { + for (const transaction of block.block.transactions) { + const tx = transaction as Transaction + + if (tx.transactionIndex === 0) { + tx.sourceOutputs = undefined + } else { + const txIdsToFetch = tx.inputs.filter(input => !transactionCache.has(input.outpointTransactionHash)).map(input => input.outpointTransactionHash) + const rawTxs = await this.batchCall(txIdsToFetch.map(txId => ({ + method: 'blockchain.transaction.get', + params: [txId, false] + }))) + + rawTxs.map((rawTx, index) => { + const tx: TransactionBCHWithAddress = fromLibauthTransaction(assertSuccess(decodeTransaction(hexToBin(rawTx)))) + + transactionCache.set(txIdsToFetch[index], tx) + }) + + tx.sourceOutputs = await Promise.all(tx.inputs.map(async (input) => { + const txId = input.outpointTransactionHash + let cachedTx = transactionCache.get(txId); + if (!cachedTx) { + console.error(txId, "not found in cache"); + // const tx = assertSuccess(decodeTransaction(hexToBin(await this.getRawTransaction(txId)))) as TransactionBCHWithAddress; + // transactionCache.set(txId, tx); + // cachedTx = tx; + } + + return cachedTx!.outputs[input.outpointIndex] + })) + } + } + } + console.timeEnd(`addSourceOutputs for blocks: ${blocks.map(b => b.height).join(', ')}. txIds: ${txIds.length}`) + } + + private async addFees(blocks: Block[]): Promise { + if (blocks.length == 0) return + if (blocks.some((block => block.block.transactions.some(tx => typeof tx === "string")))) { + return + } + this.log?.debug(`addFees for blocks: ${blocks.map(b => b.height).join(', ')}`) + + for (const block of blocks) { + for (const transaction of block.block.transactions) { + const tx = transaction as Transaction + + if (tx.transactionIndex === 0) { + tx.fee = 0 + } else { + const sumInputs = tx.sourceOutputs!.reduce((acc, input) => acc + input.valueSatoshis, 0n) + const sumOutputs = tx.outputs.reduce((acc, output) => acc + output.valueSatoshis, 0n) + + tx.fee = Number(sumInputs - sumOutputs) + } + } + } + } + + private async getBlockByHeightInternal(blockHeight: number, verbosity: number): Promise { + const { height, hex } = await this.call("blockchain.header.get", [blockHeight]) + const hash = binToHex(hash256(hexToBin(hex)).reverse()) + + return await this.getBlockByHashInternal(hash, verbosity, height) + } + + private async getBlockByHeightInternalBatch(blockHeights: number[], verbosities: number[]): Promise<(RpcBlock | string | null)[]> { + if (blockHeights.length === 0) return [] + + const batch = blockHeights.map((blockHeight) => { + return { + method: 'blockchain.header.get', + params: [blockHeight] + } + }) + const result = await this.batchCall<{ height: number, hex: string }>(batch) + + return await this.getBlockByHashInternalBatch( + result.map(({hex}) => binToHex(hash256(hexToBin(hex)).reverse())), + verbosities, + result.map(({height}) => height) + ) + } + + private async getBlockHeightByHashInternal(blockHash: string): Promise { + const { height } = await this.call("blockchain.header.get", [blockHash]) + return height + } + + private async mapToRpcBlock(blockHash: string, block: P2pBlock, verbosity: number, height?: number): Promise { + if (!verbosity || Number(verbosity) === 0) { + const result = block.toBuffer().toString("hex") + return result + } else if (Number(verbosity === 1) || Number(verbosity === 1.5)) { + height = height ?? (() => { + try { + return block.getHeight() + } catch (e) { + return undefined + } + })() ?? await this.getBlockHeightByHashInternal(blockHash) + const result = { + hash: block.getHash().toString("hex"), + confirmations: -1, + size: block.size, + height: height, + version: block.header!.version.slice().readUint32BE(0), + versionHex: block.header!.version.toString("hex"), + merkleroot: block.header!.merkleRoot.toString("hex"), + tx: [] as string[], + time: block.header!.time, + mediantime: 0, + nonce: block.header!.nonce, + bits: block.header!.bits.toString("hex"), + difficulty: 0, + nTx: block.txCount, + previousblockhash: block.header!.prevHash.toString("hex"), + nextblockhash: "", + } + + const rawTransactions = block.getRawTransactions() + if (Number(verbosity === 1)) { + result.tx = rawTransactions.map(tx => binToHex(hash256(tx as unknown as Uint8Array).reverse())) + } else { + result.tx = rawTransactions.map(tx => tx.toString("hex")) + } + return result as RpcBlock + } + + return null + } + + private async getBlockByHashInternalBatch(blockHashes: string[], verbosities: number[], heights?: number[]): Promise<(RpcBlock | string | null)[]> { + if (blockHashes.length === 0) return [] + const blocks = await this.p2p.getBlocks(blockHashes.map(blockHash => Buffer.from(blockHash, "hex"))) + + return await Promise.all(blocks.map((block, index) => this.mapToRpcBlock(blockHashes[index], block, verbosities[index], heights?.[index]))) + } + + private async getBlockByHashInternal(blockHash: string, verbosity: number, height?: number): Promise { + const block = await this.p2p.getBlock(Buffer.from(blockHash, "hex")) + + return await this.mapToRpcBlock(blockHash, block, verbosity, height) + } +} + + +interface RpcProps { + p2pEndpoint?: string; // endpoint in format "ip:port" +} + +function toBlock(getBlock: GetBlock): Block +function toBlock(getBlock?: null): undefined +function toBlock(getBlock?: GetBlock | null): Block | undefined +function toBlock(getBlock?: GetBlock | null): Block | undefined { + if (getBlock == null) return + return { + height: getBlock.height, + hash: getBlock.hash, + block: getBlock + } +} + +const transformBlock = (withTransactions: boolean) => (rpcBlock: RpcBlock | null): GetBlock | null => { + if (rpcBlock === null) return null + return { + height: rpcBlock.height, + hash: rpcBlock.hash, + parentHash: rpcBlock.previousblockhash, + transactions: (withTransactions ? rpcBlock.tx.map((txHex, index) => transformTransaction(txHex, index, rpcBlock)) : rpcBlock.tx) as Transaction[] | string[], + difficulty: rpcBlock.difficulty, + size: rpcBlock.size, + timestamp: rpcBlock.time, + nonce: rpcBlock.nonce, + } +} + +const fromLibauthTransaction = (tx: TransactionCommon): TransactionBCHWithAddress => { + return { + ...tx, + inputs: tx.inputs.map(input => ({ + ...input, + outpointTransactionHash: binToHex(input.outpointTransactionHash), + unlockingBytecode: binToHex(input.unlockingBytecode), + })), + outputs: tx.outputs.map(output => ({ + ...output, + address: getAddress(output.lockingBytecode), + lockingBytecode: binToHex(output.lockingBytecode), + token: output.token ? { + category: binToHex(output.token.category), + amount: output.token.amount, + nft: output.token.nft ? { + commitment: binToHex(output.token.nft.commitment), + capability: output.token.nft.capability, + } : undefined, + } : undefined, + })) + } +} + +const getAddress = (lockingBytecode: Uint8Array): string => { + if (addressCache.has(lockingBytecode)) { + return addressCache.get(lockingBytecode)! + } + + const contents = lockingBytecodeToAddressContents(lockingBytecode) + + const encodeResult = encodeCashAddress({ + prefix: process.env.BCH_PREFIX as CashAddressNetworkPrefix, + type: contents.type.toLowerCase() as CashAddressType, + payload: contents.type === 'P2PK' ? hash160(contents.payload) : contents.payload, + throwErrors: false + }) + return typeof encodeResult === "string" ? binToHex(contents.payload) : encodeResult.address +} + +const transformTransaction = (txHexOrBin: string | Uint8Array, txIndex: number, rpcBlock: RpcBlock, txHash?: string): Transaction => { + const rawTx = typeof txHexOrBin === "string" ? hexToBin(txHexOrBin) : txHexOrBin + const tx = fromLibauthTransaction(assertSuccess(decodeTransaction(rawTx))) + txHash ??= binToHex(hash256(rawTx).reverse()) + transactionCache.set(txHash, tx) + + return { + hash: txHash, + blockHash: rpcBlock.hash, + blockNumber: rpcBlock.height, + transactionIndex: txIndex, + size: rawTx.length, + ...tx + } +} diff --git a/bch/bch-processor/src/ds-rpc/schema.ts b/bch/bch-processor/src/ds-rpc/schema.ts new file mode 100644 index 000000000..07721988d --- /dev/null +++ b/bch/bch-processor/src/ds-rpc/schema.ts @@ -0,0 +1,36 @@ +import {weakMemo} from '@subsquid/util-internal' +import { + array, + NAT, + object, +} from '@subsquid/util-internal-validation' +import { + getBlockHeaderProps, + getTxProps} from '../mapping/schema.js' +import {MappingRequest} from './request.js' +import { + HEX, +} from './rpc-data.js' + +// Here we must be careful to include all fields, +// that can potentially be used in item filters +// (no matter what field selection is telling us to omit) +export const getBlockValidator = weakMemo((req: MappingRequest) => { + let Transaction = req.transactions + ? object({ + ...getTxProps(req.fields.transaction, false), + hash: HEX, + }) + : HEX + + let GetBlock = object({ + ...getBlockHeaderProps(req.fields.block, false), + transactions: array(Transaction) + }) + + return object({ + height: NAT, + hash: HEX, + block: GetBlock, + }) +}) diff --git a/bch/bch-processor/src/ds-rpc/util.ts b/bch/bch-processor/src/ds-rpc/util.ts new file mode 100644 index 000000000..1284183fc --- /dev/null +++ b/bch/bch-processor/src/ds-rpc/util.ts @@ -0,0 +1,57 @@ +import assert from 'assert' +import {Bytes32, Qty} from '../interfaces/base.js' + + +export function qty2Int(qty: Qty): number { + let i = parseInt(qty, 16) + assert(Number.isSafeInteger(i)) + return i +} + + +export function toQty(i: number): Qty { + return '0x' + i.toString(16) +} + + +export function getTxHash(tx: Bytes32 | {hash: Bytes32}): Bytes32 { + if (typeof tx == 'string') { + return tx + } else { + return tx.hash + } +} + +export class Graph { + public graph: Map; + + constructor() { + this.graph = new Map(); + } + addEdge(u: string, v: string) { + if (!this.graph.has(u)) { + this.graph.set(u, []); + } + this.graph.get(u)!.push(v); + } + topologicalSortUtil(v: string, visited: Set, stack: Array) { + visited.add(v); + const neighbors = this.graph.get(v) || []; + for (const neighbor of neighbors) { + if (!visited.has(neighbor)) { + this.topologicalSortUtil(neighbor, visited, stack); + } + } + stack.push(v); + } + topologicalSort() { + const visited = new Set(); + const stack: string[] = []; + for (const vertex of this.graph.keys()) { + if (!visited.has(vertex)) { + this.topologicalSortUtil(vertex, visited, stack); + } + } + return stack.reverse(); + } + } \ No newline at end of file diff --git a/bch/bch-processor/src/index.ts b/bch/bch-processor/src/index.ts new file mode 100644 index 000000000..233866322 --- /dev/null +++ b/bch/bch-processor/src/index.ts @@ -0,0 +1,4 @@ +export {assertNotNull} from '@subsquid/util-internal' +export {toHex, decodeHex} from '@subsquid/util-internal-hex' +export * from './processor.js' +export * from './interfaces/data.js' diff --git a/bch/bch-processor/src/interfaces/base.ts b/bch/bch-processor/src/interfaces/base.ts new file mode 100644 index 000000000..a50143fb8 --- /dev/null +++ b/bch/bch-processor/src/interfaces/base.ts @@ -0,0 +1,5 @@ +export type Bytes = string +export type Bytes8 = string +export type Bytes20 = string +export type Bytes32 = string +export type Qty = string diff --git a/bch/bch-processor/src/interfaces/bch.ts b/bch/bch-processor/src/interfaces/bch.ts new file mode 100644 index 000000000..944d026e8 --- /dev/null +++ b/bch/bch-processor/src/interfaces/bch.ts @@ -0,0 +1,26 @@ +import { Input, Output, TransactionCommon } from '@bitauth/libauth' +import {Bytes32} from './base.js' + + +export interface BchBlockHeader { + height: number + hash: Bytes32 + parentHash: Bytes32 + nonce: number + difficulty: number + size: number + timestamp: number +} + +type TransactionBCH = TransactionCommon, Output> + +export type OutputWithAddress = TransactionBCH["outputs"][0] & { address: string } + +export interface BchTransaction extends TransactionBCH { + hash: string + transactionIndex: number + size: number + outputs: OutputWithAddress[] + sourceOutputs: OutputWithAddress[] + fee: number +} diff --git a/bch/bch-processor/src/interfaces/chain.ts b/bch/bch-processor/src/interfaces/chain.ts new file mode 100644 index 000000000..f563ce6f8 --- /dev/null +++ b/bch/bch-processor/src/interfaces/chain.ts @@ -0,0 +1,14 @@ +import { TransactionBCHWithAddress } from "../ds-rpc/rpc.js" +import { Bytes } from "./base.js" + +export interface RpcClient { + call(method: string, params?: unknown[]): Promise + batchCall(batch: {method: string, params?: unknown[]}[]): Promise + getRawTransaction(hash: Bytes): Promise + getTransaction(hash: Bytes): Promise +} + + +export interface Chain { + readonly client: RpcClient +} diff --git a/bch/bch-processor/src/interfaces/data-request.ts b/bch/bch-processor/src/interfaces/data-request.ts new file mode 100644 index 000000000..4f9b8b4e3 --- /dev/null +++ b/bch/bch-processor/src/interfaces/data-request.ts @@ -0,0 +1,14 @@ +import {FieldSelection} from './data.js' + +export interface DataRequest { + fields?: FieldSelection + includeAllBlocks?: boolean + transactions?: TransactionRequest[] + sourceOutputs?: boolean + fee?: boolean +} + +export interface TransactionRequest { + // address?: string + // token?: string +} diff --git a/bch/bch-processor/src/interfaces/data.ts b/bch/bch-processor/src/interfaces/data.ts new file mode 100644 index 000000000..3da294c23 --- /dev/null +++ b/bch/bch-processor/src/interfaces/data.ts @@ -0,0 +1,112 @@ +import { + BchBlockHeader, + BchTransaction +} from './bch.js' + + +type Simplify = { + [K in keyof T]: T[K] +} & {} + + +type Selector = { + [P in Exclude]?: boolean +} + +export type BlockRequiredFields = 'height' | 'hash' | 'parentHash' +export type TransactionRequiredFields = 'transactionIndex' + + +export interface FieldSelection { + block?: Selector + transaction?: Selector +} + + +export const DEFAULT_FIELDS = { + block: { + timestamp: true, + size: true, + }, + transaction: { + hash: true, + size: true, + inputs: true, + outputs: true, + version: true, + locktime: true, + }, +} as const + + +type DefaultFields = typeof DEFAULT_FIELDS + + +type ExcludeUndefined = { + [K in keyof T as undefined extends T[K] ? never : K]: T[K] +} & {} + + +type MergeDefault = Simplify< + undefined extends T ? D : Omit> & ExcludeUndefined +> + + +type TrueFields = keyof { + [K in keyof F as true extends F[K] ? K : never]: true +} + + +type GetFields + = TrueFields> + + +type Select = T extends any ? Simplify>> : never + + +export type BlockHeader = Simplify< + {id: string} & + Pick & + Select> +> + + +export type Transaction = Simplify< + {id: string} & + Pick & + Select> & + { + block: BlockHeader + } +> + +type RemovePrefix + = T extends `${Prefix}${infer S}` + ? Uncapitalize + : never + + + + +type RemoveEmptyObjects = { + [K in keyof T as {} extends T[K] ? never : K]: T[K] +} + + + + +export type BlockData = { + header: BlockHeader + transactions: Transaction[] +} + + +export type AllFields = { + block: Trues + transaction: Trues +} + + +type Trues = { + [K in keyof Exclude]-?: true +} diff --git a/bch/bch-processor/src/mapping/entities.ts b/bch/bch-processor/src/mapping/entities.ts new file mode 100644 index 000000000..066242c13 --- /dev/null +++ b/bch/bch-processor/src/mapping/entities.ts @@ -0,0 +1,67 @@ +import {formatId} from '@subsquid/util-internal-processor-tools' +import {Bytes20, Bytes32, Bytes8} from '../interfaces/base.js' +import { + OutputWithAddress +} from '../interfaces/bch.js' +import { TransactionBCH } from '@bitauth/libauth' + + +export class Block { + header: BlockHeader + transactions: Transaction[] = [] + + constructor(header: BlockHeader) { + this.header = header + } +} + + +export class BlockHeader { + id: string + height: number + hash: Bytes32 + parentHash: Bytes32 + nonce?: Bytes8 + difficulty?: bigint + size?: bigint + timestamp?: number + + constructor( + height: number, + hash: Bytes20, + parentHash: Bytes20 + ) { + this.id = formatId({height, hash}) + this.height = height + this.hash = hash + this.parentHash = parentHash + } +} + + +export class Transaction { + id: string + transactionIndex: number + hash?: string + size?: number + sourceOutputs?: OutputWithAddress[] + inputs?: TransactionBCH["inputs"] + locktime?: number + outputs?: OutputWithAddress[] + version?: number + + #block: BlockHeader + + constructor( + block: BlockHeader, + transactionIndex: number + ) { + this.id = formatId(block, transactionIndex) + this.transactionIndex = transactionIndex + this.#block = block + } + + get block(): BlockHeader { + return this.#block + } +} diff --git a/bch/bch-processor/src/mapping/relations.ts b/bch/bch-processor/src/mapping/relations.ts new file mode 100644 index 000000000..c3c4e62fe --- /dev/null +++ b/bch/bch-processor/src/mapping/relations.ts @@ -0,0 +1,12 @@ +import {maybeLast} from '@subsquid/util-internal' +import {Block, Transaction} from './entities.js' + + +export function setUpRelations(block: Block): void { + block.transactions.sort((a, b) => a.transactionIndex - b.transactionIndex) + + let txs: (Transaction | undefined)[] = new Array((maybeLast(block.transactions)?.transactionIndex ?? -1) + 1) + for (let tx of block.transactions) { + txs[tx.transactionIndex] = tx + } +} diff --git a/bch/bch-processor/src/mapping/schema.ts b/bch/bch-processor/src/mapping/schema.ts new file mode 100644 index 000000000..445f91d33 --- /dev/null +++ b/bch/bch-processor/src/mapping/schema.ts @@ -0,0 +1,100 @@ +import { BIGINT, HEX } from '../ds-rpc/rpc-data.js' +import {FieldSelection} from '../interfaces/data.js' +import { + array, + NAT, + object, + option, + SMALL_QTY, + STRING, + withSentinel +} from '@subsquid/util-internal-validation' + + +export function getBlockHeaderProps(fields: FieldSelection['block'], forArchive: boolean) { + let natural = forArchive ? NAT : SMALL_QTY + return { + height: NAT, + hash: HEX, + parentHash: HEX, + ...project(fields, { + nonce: withSentinel('BlockHeader.nonce', 0, NAT), + difficulty: withSentinel('BlockHeader.difficulty', 0, NAT), + size: withSentinel('BlockHeader.size', 0, NAT), + timestamp: withSentinel('BlockHeader.timestamp', 0, NAT), + }) + } +} + + +export function getTxProps(fields: FieldSelection['transaction'], forArchive: boolean) { + // let natural = forArchive ? NAT : SMALL_QTY + return { + transactionIndex: NAT, + ...project(fields, { + hash: HEX, + size: NAT, + sourceOutputs: option(array(object({ + lockingBytecode: HEX, + token: option(object({ + amount: BIGINT, + category: HEX, + nft: option(object({ + capability: STRING, + commitment: HEX, + })), + })), + valueSatoshis: BIGINT, + address: STRING, + }))), + inputs: array(object({ + outpointIndex: NAT, + outpointTransactionHash: HEX, + sequenceNumber: NAT, + unlockingBytecode: HEX, + })), + outputs: array(object({ + lockingBytecode: HEX, + token: option(object({ + amount: BIGINT, + category: HEX, + nft: option(object({ + capability: STRING, + commitment: HEX, + })), + })), + valueSatoshis: BIGINT, + address: STRING, + })), + locktime: NAT, + version: NAT, + }) + } +} + + +export function project( + fields: F | undefined, + obj: T +): Partial { + if (fields == null) return {} + let result: Partial = {} + let key: keyof T + for (key in obj) { + if (fields[key]) { + result[key] = obj[key] + } + } + return result +} + + +export function isEmpty(obj: object): boolean { + for (let _ in obj) { + return false + } + return true +} + + +export function assertAssignable(): void {} diff --git a/bch/bch-processor/src/mapping/selection.ts b/bch/bch-processor/src/mapping/selection.ts new file mode 100644 index 000000000..37988228b --- /dev/null +++ b/bch/bch-processor/src/mapping/selection.ts @@ -0,0 +1,42 @@ +import {FieldSelection} from '../interfaces/data.js' +import {object, option, BOOLEAN} from '@subsquid/util-internal-validation' + + +type GetFieldSelectionSchema = {[K in keyof T]-?: typeof FIELD} + + +const FIELD = option(BOOLEAN) + + +export function getBlockHeaderSelectionValidator() { + let fields: GetFieldSelectionSchema = { + nonce: FIELD, + difficulty: FIELD, + size: FIELD, + timestamp: FIELD, + } + return object(fields) +} + + +export function getTxSelectionValidator() { + let fields: GetFieldSelectionSchema = { + hash: FIELD, + inputs: FIELD, + locktime: FIELD, + outputs: FIELD, + version: FIELD, + size: FIELD, + sourceOutputs: FIELD, + fee: FIELD, + } + return object(fields) +} + + +export function getFieldSelectionValidator() { + return object({ + block: option(getBlockHeaderSelectionValidator()), + transaction: option(getTxSelectionValidator()), + }) +} \ No newline at end of file diff --git a/bch/bch-processor/src/processor.ts b/bch/bch-processor/src/processor.ts new file mode 100644 index 000000000..65901e75f --- /dev/null +++ b/bch/bch-processor/src/processor.ts @@ -0,0 +1,620 @@ +import {HttpAgent, HttpClient} from '@subsquid/http-client' +import {createLogger, Logger} from '@subsquid/logger' +import {RpcClient} from '@subsquid/rpc-client' +import {assertNotNull, def, runProgram} from '@subsquid/util-internal' +import {ArchiveClient} from '@subsquid/util-internal-archive-client' +import {Database, getOrGenerateSquidId, PrometheusServer, Runner} from '@subsquid/util-internal-processor-tools' +import {applyRangeBound, mergeRangeRequests, Range, RangeRequest} from '@subsquid/util-internal-range' +import {cast} from '@subsquid/util-internal-validation' +import assert from 'assert' +import {BchArchive} from './ds-archive/client.js' +import {BchRpcDataSource} from './ds-rpc/client.js' +import {Chain} from './interfaces/chain.js' +import {BlockData, DEFAULT_FIELDS, FieldSelection, Transaction} from './interfaces/data.js' +import {DataRequest, + TransactionRequest} from './interfaces/data-request.js' +import {getFieldSelectionValidator} from './mapping/selection.js' +import {RpcValidationFlags} from './ds-rpc/rpc.js' + + +export interface RpcEndpointSettings { + /** + * RPC endpoint URL (either http(s) or ws(s)) + */ + url: string + /** + * Maximum number of ongoing concurrent requests + */ + capacity?: number + /** + * Maximum number of requests per second + */ + rateLimit?: number + /** + * Request timeout in `ms` + */ + requestTimeout?: number + /** + * Maximum number of retry attempts. + * + * By default, retries all "retryable" errors indefinitely. + */ + retryAttempts?: number + /** + * Maximum number of requests in a single batch call + */ + maxBatchCallSize?: number + /** + * HTTP headers + */ + headers?: Record +} + + +export interface RpcDataIngestionSettings { + /** + * Poll interval for new blocks in `ms` + * + * Poll mechanism is used to get new blocks via HTTP connection. + */ + headPollInterval?: number + /** + * When websocket subscription is used to get new blocks, + * this setting specifies timeout in `ms` after which connection + * will be reset and subscription re-initiated if no new block where received. + */ + newHeadTimeout?: number + /** + * Disable RPC data ingestion entirely + */ + disabled?: boolean + + /** + * Flags to switch off the data consistency checks + */ + validationFlags?: RpcValidationFlags +} + + +export interface GatewaySettings { + /** + * Subsquid Network Gateway url + */ + url: string + /** + * Request timeout in ms + */ + requestTimeout?: number +} + + +/** + * @deprecated + */ +export type ArchiveSettings = GatewaySettings + + +/** + * @deprecated + */ +export type DataSource = ArchiveDataSource | ChainDataSource + + + +interface ArchiveDataSource { + /** + * Subsquid evm archive endpoint URL + */ + archive: string | GatewaySettings + /** + * Chain node RPC endpoint URL + */ + chain?: string | RpcEndpointSettings +} + + +interface ChainDataSource { + archive?: undefined + /** + * Chain node RPC endpoint URL + */ + chain: string | RpcEndpointSettings +} + + +interface BlockRange { + /** + * Block range + */ + range?: Range +} + + +/** + * API and data that is passed to the data handler + */ +export interface DataHandlerContext { + /** + * @internal + */ + _chain: Chain + /** + * An instance of a structured logger. + */ + log: Logger + /** + * Storage interface provided by the database + */ + store: Store + /** + * List of blocks to map and process + */ + blocks: BlockData[] + /** + * List of mempool transactions to process + */ + mempoolTransactions: Transaction[] + /** + * Signals, that the processor reached the head of a chain. + * + * The head block is always included in `.blocks`. + */ + isHead: boolean +} + + +export type BchBatchProcessorFields = T extends BchBatchProcessor ? F : never + + +/** + * Provides methods to configure and launch data processing. + */ +export class BchBatchProcessor { + private requests: RangeRequest[] = [] + private blockRange?: Range + private fields?: FieldSelection + private finalityConfirmation?: number + private archive?: GatewaySettings + private rpcIngestSettings?: RpcDataIngestionSettings + private rpcEndpoint?: RpcEndpointSettings + private p2pEndpoint?: string // endpoint in format "ip:port" + private running = false + + private hotDataSource!: BchRpcDataSource + + /** + * @deprecated Use {@link .setGateway()} + */ + setArchive(url: string | GatewaySettings): this { + return this.setGateway(url) + } + + /** + * Set Subsquid Network Gateway endpoint (ex Archive). + * + * Subsquid Network allows to get data from finalized blocks up to + * infinite times faster and more efficient than via regular RPC. + * + * @example + * processor.setGateway('https://v2.archive.subsquid.io/network/ethereum-mainnet') + */ + setGateway(url: string | GatewaySettings): this { + this.assertNotRunning() + if (typeof url == 'string') { + this.archive = {url} + } else { + this.archive = url + } + return this + } + + /** + * Set chain RPC endpoint + * + * @example + * // just pass a URL + * processor.setRpcEndpoint('https://eth-mainnet.public.blastapi.io') + * + * // adjust some connection options + * processor.setRpcEndpoint({ + * url: 'https://eth-mainnet.public.blastapi.io', + * rateLimit: 10 + * }) + */ + setRpcEndpoint(url: string | RpcEndpointSettings | undefined): this { + this.assertNotRunning() + if (typeof url == 'string') { + this.rpcEndpoint = {url} + } else { + this.rpcEndpoint = url + } + return this + } + + /** + * Optionaly set chain p2p endpoint, which is used to fetch blocks, monitor mempool, etc. Will default to a hardcoded one + * + * @example + * // just pass a an 'ip:port' string, see https://gitlab.com/bitcoin-cash-node/bitcoin-cash-node/-/blob/master/src/chainparamsseeds.h + * processor.setP2pEndpoint('3.142.98.179:8333') + * + */ + setP2pEndpoint(url: string): this { + this.assertNotRunning() + this.p2pEndpoint = url + return this + } + + /** + * Sets blockchain data source. + * + * @example + * processor.setDataSource({ + * archive: 'https://v2.archive.subsquid.io/network/ethereum-mainnet', + * chain: 'https://eth-mainnet.public.blastapi.io' + * }) + * + * @deprecated Use separate {@link .setGateway()} and {@link .setRpcEndpoint()} methods + * to specify data sources. + */ + setDataSource(src: DataSource): this { + this.assertNotRunning() + if (src.archive) { + this.setGateway(src.archive) + } else { + this.archive = undefined + } + if (src.chain) { + this.setRpcEndpoint(src.chain) + } else { + this.rpcEndpoint = undefined + } + return this + } + + /** + * Set up RPC data ingestion settings + */ + setRpcDataIngestionSettings(settings: RpcDataIngestionSettings): this { + this.assertNotRunning() + this.rpcIngestSettings = settings + return this + } + + /** + * @deprecated Use {@link .setRpcDataIngestionSettings()} instead + */ + setChainPollInterval(ms: number): this { + assert(ms >= 0) + this.assertNotRunning() + this.rpcIngestSettings = {...this.rpcIngestSettings, headPollInterval: ms} + return this + } + + /** + * Never use RPC endpoint for data ingestion. + * + * @deprecated This is the same as `.setRpcDataIngestionSettings({disabled: true})` + */ + useArchiveOnly(yes?: boolean): this { + this.assertNotRunning() + this.rpcIngestSettings = {...this.rpcIngestSettings, disabled: yes !== false} + return this + } + + /** + * Distance from the head block behind which all blocks are considered to be finalized. + */ + setFinalityConfirmation(nBlocks: number): this { + this.assertNotRunning() + this.finalityConfirmation = nBlocks + return this + } + + /** + * Configure a set of fetched fields + */ + setFields(fields: T): BchBatchProcessor { + this.assertNotRunning() + let validator = getFieldSelectionValidator() + this.fields = cast(validator, fields) + + if (this.fields?.transaction?.sourceOutputs) { + this.add({sourceOutputs: true}, {from: this.requests.sort((a, b) => a.range.from - b.range.from)[0].range.from}) + } + + if (this.fields?.transaction?.fee) { + if (!this.fields?.transaction?.sourceOutputs) { + throw new Error('Fee calculation requires sourceOutputs') + } + this.add({fee: true}, {from: this.requests.sort((a, b) => a.range.from - b.range.from)[0].range.from}) + } + + return this as any + } + + private add(request: DataRequest, range?: Range): void { + this.requests.push({ + range: range || {from: 0}, + request + }) + } + + /** + * By default, the processor will fetch only blocks + * which contain requested items. This method + * modifies such behaviour to fetch all chain blocks. + * + * Optionally a range of blocks can be specified + * for which the setting should be effective. + */ + includeAllBlocks(range?: Range): this { + this.assertNotRunning() + this.add({includeAllBlocks: true}, range) + return this + } + + addTransaction(options: TransactionRequest & BlockRange): this { + this.assertNotRunning() + this.add({ + transactions: [mapRequest(options)] + }, options.range) + return this + } + + /** + * Limits the range of blocks to be processed. + * + * When the upper bound is specified, + * the processor will terminate with exit code 0 once it reaches it. + */ + setBlockRange(range?: Range): this { + this.assertNotRunning() + this.blockRange = range + return this + } + + /** + * Sets the port for a built-in prometheus metrics server. + * + * By default, the value of `PROMETHEUS_PORT` environment + * variable is used. When it is not set, + * the processor will pick up an ephemeral port. + */ + setPrometheusPort(port: number | string): this { + this.assertNotRunning() + this.getPrometheusServer().setPort(port) + return this + } + + private assertNotRunning(): void { + if (this.running) { + throw new Error('Settings modifications are not allowed after start of processing') + } + } + + @def + private getLogger(): Logger { + return createLogger('sqd:processor') + } + + @def + private getSquidId(): string { + return getOrGenerateSquidId() + } + + @def + private getPrometheusServer(): PrometheusServer { + return new PrometheusServer() + } + + @def + private getChainRpcClient(): RpcClient { + if (this.rpcEndpoint == null) { + throw new Error(`use .setRpcEndpoint() to specify chain RPC endpoint`) + } + let client = new RpcClient({ + url: this.rpcEndpoint.url, + headers: this.rpcEndpoint.headers, + maxBatchCallSize: this.rpcEndpoint.maxBatchCallSize ?? 25, + requestTimeout: this.rpcEndpoint.requestTimeout ?? 10_000, + capacity: this.rpcEndpoint.capacity ?? 2, + rateLimit: this.rpcEndpoint.rateLimit, + retryAttempts: this.rpcEndpoint.retryAttempts ?? Number.MAX_SAFE_INTEGER, + log: this.getLogger().child('rpc', {rpcUrl: this.rpcEndpoint.url}) + }) + this.getPrometheusServer().addChainRpcMetrics(() => client.getMetrics()) + return client + } + + @def + private getChain(): Chain { + let self = this + return { + get client() { + return self.getHotDataSource().rpc + } + } + } + + @def + private getHotDataSource(): BchRpcDataSource { + if (this.finalityConfirmation == null) { + throw new Error(`use .setFinalityConfirmation() to specify number of children required to confirm block's finality`) + } + + if (!this.hotDataSource) { + this.hotDataSource = new BchRpcDataSource({ + rpc: this.getChainRpcClient(), + p2pEndpoint: this.p2pEndpoint, + finalityConfirmation: this.finalityConfirmation, + headPollInterval: this.rpcIngestSettings?.headPollInterval, + newHeadTimeout: this.rpcIngestSettings?.newHeadTimeout, + validationFlags: this.rpcIngestSettings?.validationFlags, + log: this.getLogger().child('rpc', {rpcUrl: this.getChainRpcClient().url}) + }) + } + + return this.hotDataSource + } + + @def + private getArchiveDataSource(): BchArchive { + let archive = assertNotNull(this.archive) + + let log = this.getLogger().child('archive') + + let http = new HttpClient({ + headers: { + 'x-squid-id': this.getSquidId() + }, + agent: new HttpAgent({ + keepAlive: true + }), + log + }) + + return new BchArchive( + new ArchiveClient({ + http, + url: archive.url, + queryTimeout: archive.requestTimeout, + log + }) + ) + } + + @def + private getBatchRequests(): RangeRequest[] { + let requests = mergeRangeRequests(this.requests, function merge(a: DataRequest, b: DataRequest) { + let res: DataRequest = {} + if (a.includeAllBlocks || b.includeAllBlocks) { + res.includeAllBlocks = true + } + res.transactions = concatRequestLists(a.transactions, b.transactions) + // res.logs = concatRequestLists(a.logs, b.logs) + // res.traces = concatRequestLists(a.traces, b.traces) + // res.stateDiffs = concatRequestLists(a.stateDiffs, b.stateDiffs) + return res + }) + + let fields = addDefaultFields(this.fields) + for (let req of requests) { + req.request.fields = fields + } + + return applyRangeBound(requests, this.blockRange) + } + + /** + * Run data processing. + * + * This method assumes full control over the current OS process as + * it terminates the entire program in case of error or + * at the end of data processing. + * + * @param database - database is responsible for providing storage to data handlers + * and persisting mapping progress and status. + * + * @param handler - The data handler, see {@link DataHandlerContext} for an API available to the handler. + */ + run(database: Database, handler: (ctx: DataHandlerContext) => Promise): void { + this.assertNotRunning() + this.running = true + let log = this.getLogger() + + runProgram(async () => { + let chain = this.getChain() + let mappingLog = log.child('mapping') + + if (this.archive == null && this.rpcEndpoint == null) { + throw new Error( + 'No data source where specified. ' + + 'Use .setArchive() to specify Subsquid Archive and/or .setRpcEndpoint() to specify RPC endpoint.' + ) + } + + if (this.archive == null && this.rpcIngestSettings?.disabled) { + throw new Error('Subsquid Archive is required when RPC data ingestion is disabled') + } + + return new Runner({ + database, + requests: this.getBatchRequests(), + archive: this.archive ? this.getArchiveDataSource() : undefined, + hotDataSource: this.rpcEndpoint && !this.rpcIngestSettings?.disabled + ? this.getHotDataSource() + : undefined, + allBlocksAreFinal: this.finalityConfirmation === 0, + prometheus: this.getPrometheusServer(), + log, + watchMempool: true, + process(store, batch) { + return handler({ + _chain: chain, + log: mappingLog, + store, + blocks: batch.blocks as any, + mempoolTransactions: batch.mempoolTransactions ?? [], + isHead: batch.isHead + }) + } + }).run() + }, err => log.fatal(err)) + } +} + + +function mapRequest(options: T): Omit { + let {range, ...req} = options + for (let key in req) { + let val = (req as any)[key] + if (Array.isArray(val)) { + (req as any)[key] = val.map(s => s.toLowerCase()) + } + } + return req +} + + +function concatRequestLists(a?: T[], b?: T[]): T[] | undefined { + let result: T[] = [] + if (a) { + result.push(...a) + } + if (b) { + result.push(...b) + } + return result.length == 0 ? undefined : result +} + + +function addDefaultFields(fields?: FieldSelection): FieldSelection { + return { + block: mergeDefaultFields(DEFAULT_FIELDS.block, fields?.block), + transaction: mergeDefaultFields(DEFAULT_FIELDS.transaction, fields?.transaction), + } +} + + +type Selector = { + [P in Props]?: boolean +} + + +function mergeDefaultFields( + defaults: Selector, + selection?: Selector +): Selector { + let result: Selector = {...defaults} + for (let key in selection) { + if (selection[key] != null) { + if (selection[key]) { + result[key] = true + } else { + delete result[key] + } + } + } + return result +} diff --git a/bch/bch-processor/tsconfig.json b/bch/bch-processor/tsconfig.json new file mode 100644 index 000000000..6f21f9ec7 --- /dev/null +++ b/bch/bch-processor/tsconfig.json @@ -0,0 +1,22 @@ +{ + "compilerOptions": { + "module": "ESNext", + "target": "ESNext", + "moduleResolution": "node", + "outDir": "lib", + "rootDir": "src", + "allowJs": true, + "strict": true, + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "esModuleInterop": true, + "experimentalDecorators": true, + "emitDecoratorMetadata": true, + "skipLibCheck": true + }, + "include": ["src"], + "exclude": [ + "node_modules" + ] +} diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index 901ed1a9b..233f389d7 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -14,6 +14,9 @@ dependencies: '@aws-sdk/client-s3': specifier: ^3.462.0 version: 3.462.0 + '@bitauth/libauth': + specifier: ^3.0.0 + version: 3.0.0 '@coral-xyz/anchor': specifier: ^0.29.0 version: 0.29.0 @@ -44,6 +47,9 @@ dependencies: '@rush-temp/batch-processor': specifier: file:./projects/batch-processor.tgz version: file:projects/batch-processor.tgz + '@rush-temp/bch-processor': + specifier: file:./projects/bch-processor.tgz + version: file:projects/bch-processor.tgz '@rush-temp/big-decimal': specifier: file:./projects/big-decimal.tgz version: file:projects/big-decimal.tgz @@ -398,6 +404,9 @@ dependencies: big.js: specifier: ~6.2.1 version: 6.2.1 + bitcoin-minimal: + specifier: ^1.1.7 + version: 1.1.7 blake2b: specifier: ^2.1.4 version: 2.1.4 @@ -455,12 +464,18 @@ dependencies: latest-version: specifier: ^5.1.0 version: 5.1.0 + lru-cache: + specifier: ^11.0.2 + version: 11.0.2 mocha: specifier: ^10.7.0 version: 10.7.3 node-fetch: specifier: ^3.3.2 version: 3.3.2 + p2p-cash: + specifier: ^1.1.12 + version: 1.1.12 pg: specifier: ^8.11.3 version: 8.11.3 @@ -1278,6 +1293,11 @@ packages: regenerator-runtime: 0.14.0 dev: false + /@bitauth/libauth@3.0.0: + resolution: {integrity: sha512-3yoL31XpnhAnf5nDVMFk4xPqebxDwXrgYAwpa31ARJnV5A/eXWlpNYvCd6FTZPFM4VvKfjCBi+jRCrw1hOZ0Jg==} + engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + dev: false + /@coral-xyz/anchor-errors@0.30.1: resolution: {integrity: sha512-9Mkradf5yS5xiLWrl9WrpjqOrAV+/W2RQHDlbnAZBivoGpOs1ECjoDCkVk4aRG8ZdiFiB8zQEVlxf+8fKkmSfQ==} engines: {node: '>=10'} @@ -2657,6 +2677,15 @@ packages: '@subsquid/util-internal-binary-heap': 1.0.0 dev: false + /@subsquid/util-internal-validation@0.6.0: + resolution: {integrity: sha512-OjrtBS9oJQApNa/ar9IMB0l2+IIydxLKIlxpJsyHgI0buK+aWofDq1aPaPh3XtCKrHzLDkrM9KAqkt8fQirifQ==} + peerDependencies: + '@subsquid/logger': ^1.3.3 + peerDependenciesMeta: + '@subsquid/logger': + optional: true + dev: false + /@subsquid/util-internal@2.5.2: resolution: {integrity: sha512-N7lfZdWEkM35jG5wdGYx25TJKGGLMOx9VInSeRhW9T/3BEmHAuSWI2mIIYnZ8w5L041V8HGo61ijWF6qsXvZjg==} dev: false @@ -3288,6 +3317,14 @@ packages: resolution: {integrity: sha512-jgsaNduz+ndvGyFt3uSuWqvy4lCnIJiovtouQN5JZHOKCS2QuhEdbcQHFhVksz2N2U9hXJo8odG7ETyWlEeuDw==} dev: false + /async-mutex@0.4.1: + resolution: {integrity: sha512-WfoBo4E/TbCX1G95XTjbWTE3X2XLG0m1Xbv2cwOtuPdyH9CZvnaA5nCt1ucjaKEgW2A5IF71hxrRhr83Je5xjA==} + requiresBuild: true + dependencies: + tslib: 2.6.2 + dev: false + optional: true + /async-retry@1.3.3: resolution: {integrity: sha512-wfr/jstw9xNi/0teMHrRW7dsz3Lt5ARhYNZ2ewpadnhaIp5mbALhOAP+EAdsC7t4Z6wqsDVv9+W6gm1Dk9mEyw==} dependencies: @@ -3362,6 +3399,11 @@ packages: resolution: {integrity: sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw==} dev: false + /bitcoin-minimal@1.1.7: + resolution: {integrity: sha512-Rs3MOO0rApYhWWl+M2QT+jCEAsz47h2+iRrUibToM/xFQrlhgfC1k/ufxzvXwlcBMTO2krOcn0+4pmiFCGeGFQ==} + engines: {node: '>=12.0.0'} + dev: false + /blake2b-wasm@2.4.0: resolution: {integrity: sha512-S1kwmW2ZhZFFFOghcx73+ZajEfKBqhP82JMssxtLVMxlaPea1p9uoLiUZ5WYyHn0KddwbLc+0vh4wR0KBNoT5w==} dependencies: @@ -4825,6 +4867,16 @@ packages: ws: 7.5.10 dev: false + /isomorphic-ws@5.0.0(ws@8.17.1): + resolution: {integrity: sha512-muId7Zzn9ywDsyXgTIafTry2sV3nySZeUDe6YedVd1Hvuuep5AsIlqK+XefWpYTyJG5e503F2xIuT2lcU6rCSw==} + requiresBuild: true + peerDependencies: + ws: '*' + dependencies: + ws: 8.17.1(bufferutil@4.0.7)(utf-8-validate@5.0.10) + dev: false + optional: true + /isows@1.0.4(ws@8.17.1): resolution: {integrity: sha512-hEzjY+x9u9hPmBom9IIAqdJCwNLax+xrPb51vEPpERoFlIxgmZcHzsT5jKG06nvInKOBGvReAVz80Umed5CczQ==} peerDependencies: @@ -5076,6 +5128,11 @@ packages: engines: {node: 14 || >=16.14} dev: false + /lru-cache@11.0.2: + resolution: {integrity: sha512-123qHRfJBmo2jXDbo/a5YOQrJoHF/GNQTLzQ5+IdK5pWpceK17yRc6ozlWd25FxvGKQbIUs91fDFkXmDHTKcyA==} + engines: {node: 20 || >=22} + dev: false + /lru-cache@6.0.0: resolution: {integrity: sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==} engines: {node: '>=10'} @@ -5407,6 +5464,19 @@ packages: p-limit: 3.1.0 dev: false + /p2p-cash@1.1.12: + resolution: {integrity: sha512-26Fngsyp+SEtd7zU/Zv40Om2jvOr3qQI9xmmlv94y7DMYsKggOjBiDsV8fWqAgmC5piarsObr+kC1/mKNiZZwg==} + dependencies: + bitcoin-minimal: 1.1.7 + optionalDependencies: + async-mutex: 0.4.1 + isomorphic-ws: 5.0.0(ws@8.17.1) + ws: 8.17.1(bufferutil@4.0.7)(utf-8-validate@5.0.10) + transitivePeerDependencies: + - bufferutil + - utf-8-validate + dev: false + /package-json@6.5.0: resolution: {integrity: sha512-k3bdm2n25tkyxcjSKzB5x8kfVxlMdgsbPr0GkZcwHsLpba6cBjqCt1KlcChKEvxHIcTB1FVMuwoijZ26xex5MQ==} engines: {node: '>=8'} @@ -6943,6 +7013,23 @@ packages: typescript: 5.5.4 dev: false + file:projects/bch-processor.tgz: + resolution: {integrity: sha512-onnymQyesO8Zu/ABtv7R0GrkvhhIPwGsa2OLwX8LNfAIMfZJUQRU60HZC7lI/2kWRjJiWSq4TZYgfotcAzQ0DQ==, tarball: file:projects/bch-processor.tgz} + name: '@rush-temp/bch-processor' + version: 0.0.0 + dependencies: + '@bitauth/libauth': 3.0.0 + '@subsquid/util-internal-validation': 0.6.0 + '@types/node': 18.19.0 + bitcoin-minimal: 1.1.7 + lru-cache: 11.0.2 + p2p-cash: 1.1.12 + typescript: 5.5.4 + transitivePeerDependencies: + - bufferutil + - utf-8-validate + dev: false + file:projects/big-decimal.tgz: resolution: {integrity: sha512-/Er4O4+BTjIgMcLL/oy3KUDz5o/97kqUHu1HGrIvixflTucO3M2Dm/GnwE1d81EtsEaCR57yWvZ7u07HLEU3og==, tarball: file:projects/big-decimal.tgz} name: '@rush-temp/big-decimal' @@ -6991,7 +7078,7 @@ packages: dev: false file:projects/data-test.tgz: - resolution: {integrity: sha512-vfocRZTAM/R/+T+dR+5OwAzOim5k+JWu+uWq5NqTINBuin6gQ4H6ufbTXjXeAvD3OvAyaUqS2TPYENntSFAHdA==, tarball: file:projects/data-test.tgz} + resolution: {integrity: sha512-s+hwonZVmCEShGErXBavkrZKxMnQxPGBIsraWM+8LK74aGUAD1nYqshUh9jh05RlwavSWEilNTo+BbI81fFriw==, tarball: file:projects/data-test.tgz} name: '@rush-temp/data-test' version: 0.0.0 dependencies: @@ -7100,7 +7187,7 @@ packages: dev: false file:projects/evm-processor.tgz: - resolution: {integrity: sha512-JEYc/4KCDHotB8lMj+SjVPzNPOPpzsWXWwreQy+hAjZQes/fWnMg4FGQ71NFQOYwZ/zL0k+imKUpvvcTQ69ALw==, tarball: file:projects/evm-processor.tgz} + resolution: {integrity: sha512-1s05mLWlR1INC4Qe5exVe+klgXztqZlwPGs/a4V0tUYjdiaxz3IENfOhuyWHWGRYf0CIGFf0KQ0FigGLumYY+A==, tarball: file:projects/evm-processor.tgz} name: '@rush-temp/evm-processor' version: 0.0.0 dependencies: @@ -7109,7 +7196,7 @@ packages: dev: false file:projects/evm-typegen.tgz: - resolution: {integrity: sha512-zze/6ha5YyYINCNglygQHh63jV8/nsVqPbKpZeeznnnEldNG2n6/WMajR0vdAnGDRTMR+nmP9/TveOPbDTX+mg==, tarball: file:projects/evm-typegen.tgz} + resolution: {integrity: sha512-3DiW6rLdLmAUqUB+P1iYHfRHi9123lAVabeIrJB0klQj16q70mvel+mrY9hCk1nUvxiEVEnPCueay3Ldf2ZQTg==, tarball: file:projects/evm-typegen.tgz} name: '@rush-temp/evm-typegen' version: 0.0.0 dependencies: @@ -7140,7 +7227,7 @@ packages: dev: false file:projects/fuel-data.tgz: - resolution: {integrity: sha512-6XW1Dvmzit570V/bEwNjiD5jJ9ZHbvJ3gyNoS6HE2qoR+C2/NPqPjMcev6xhOd/XS2/raaw6y8c6ywT3lTWTUw==, tarball: file:projects/fuel-data.tgz} + resolution: {integrity: sha512-LZwxHIPqKJw/590kj/j7tba2SzlHejChA/x2JTnSD9HW1ijOrrmit8TYH9zPf4mcW4gZTM9hcZFFtjIOfPLsCw==, tarball: file:projects/fuel-data.tgz} name: '@rush-temp/fuel-data' version: 0.0.0 dependencies: @@ -7149,7 +7236,7 @@ packages: dev: false file:projects/fuel-dump.tgz: - resolution: {integrity: sha512-P93UXjAjSIwTD/ZyemYajxts+j65qTByNGxAApyDXZOWywHdt0K3EPnDyLDz4/gm8LG6OtalcJfOhyPU/iKk2A==, tarball: file:projects/fuel-dump.tgz} + resolution: {integrity: sha512-ImKKGnItSKiCytmEqLaz148rhsH0b2kxYWkAMAR+JN3s89/rVDLKcFrqOh6CJHinIeZ1/zcT4ZoFzVXyYIS9mQ==, tarball: file:projects/fuel-dump.tgz} name: '@rush-temp/fuel-dump' version: 0.0.0 dependencies: @@ -7216,7 +7303,7 @@ packages: dev: false file:projects/fuel-stream.tgz: - resolution: {integrity: sha512-iEuYdfArMof7F87gKvpnuGx4j/PzsgkMrruS29W2UTx+7FhwkUnujQxqLQx2E2++SuKSb18Ga+eI+GP0lsl07g==, tarball: file:projects/fuel-stream.tgz} + resolution: {integrity: sha512-6J2VBe48XWIqSZ3M8lZtBrsHZr+p8zRTofEkolzhhT4kighSSCG/HVPp3tp1ZXc5rPGpQGT/JZeCArOsGo3EhQ==, tarball: file:projects/fuel-stream.tgz} name: '@rush-temp/fuel-stream' version: 0.0.0 dependencies: @@ -7225,7 +7312,7 @@ packages: dev: false file:projects/gql-test-client.tgz(graphql@15.8.0): - resolution: {integrity: sha512-JGE+gV8EgQ8u90IfJW06yab+oC1gw+HsJdwvSj6LdzY6dbtlIDXNRFeJMV988eLN8IxLSdDhd98h4YYt6n3suw==, tarball: file:projects/gql-test-client.tgz} + resolution: {integrity: sha512-bivyHNQZ2H4YrNdCo4dwEc53cIe+iXpU2UF6vMjBwbgHqiIbrhGpMbtzgKoZLdNAkbqchh0E543pkDQy318iow==, tarball: file:projects/gql-test-client.tgz} id: file:projects/gql-test-client.tgz name: '@rush-temp/gql-test-client' version: 0.0.0 @@ -7404,7 +7491,7 @@ packages: dev: false file:projects/rpc-client.tgz: - resolution: {integrity: sha512-B54boDboO5+HelVasKnnBZs/VahkkLvB9ETLN4nlILcTBwv2SZP0UbC4VYNaSRm5n1PtYb52954pqKlNkEecmA==, tarball: file:projects/rpc-client.tgz} + resolution: {integrity: sha512-T49gc/C/cmRUB956khwoKu41Izdk4BPYLlFBZAWkPn0xDje0kbmmjb76WyueHTu7mrjvSdjVNqrIkg/RYxGV8Q==, tarball: file:projects/rpc-client.tgz} name: '@rush-temp/rpc-client' version: 0.0.0 dependencies: @@ -7580,7 +7667,7 @@ packages: dev: false file:projects/solana-stream.tgz: - resolution: {integrity: sha512-xt4F3+TWZqYkHFwWyxO9FFeCi1ZwfDjgt3AkM9qPwhnxBfyawaGdEeTUwfJtPJsNrxaroMLXmLRFmKp65iXO8w==, tarball: file:projects/solana-stream.tgz} + resolution: {integrity: sha512-krZzJP5QBey2MWE6BMGBn4fRnV4P1weoVts3iAyfNNYs+ukKIZLn76qFrzJ8yIpYlZUF4tq49CK0v0jnZ+XHEA==, tarball: file:projects/solana-stream.tgz} name: '@rush-temp/solana-stream' version: 0.0.0 dependencies: @@ -7590,7 +7677,7 @@ packages: dev: false file:projects/solana-typegen.tgz: - resolution: {integrity: sha512-Qva321wiPxpOzXBUf6Qj5vlQ2eZILvwFdBCfchCmxQY7EqzLILY+FjMfNcj/VzaA3tTnfp8adYgO1YfoKHdCMw==, tarball: file:projects/solana-typegen.tgz} + resolution: {integrity: sha512-P2R//97dE6hHXv65zV5e3SuD+ArfjY1jxt9Ct2j5ckibvec646B47UHZsBRTlu6u9tVJSJIFth4OvWa7CogkYw==, tarball: file:projects/solana-typegen.tgz} name: '@rush-temp/solana-typegen' version: 0.0.0 dependencies: @@ -7687,7 +7774,7 @@ packages: dev: false file:projects/starknet-stream.tgz: - resolution: {integrity: sha512-WSp/9Dd2y5+GeN0F01Az1HS5BtnOAjlDQ6o1qwlUptfoYMB6DBf4udXGAA7i2Sj3AijE+t3pAxoRXkX82Y3GmA==, tarball: file:projects/starknet-stream.tgz} + resolution: {integrity: sha512-xOStr+km+Cj7Q9yY8YNp0ledJQhcYDzparQYEVgg55Na1fPHHJls2qvHiGb5mqofRjVfQlYsmWkD1QeyXpojUg==, tarball: file:projects/starknet-stream.tgz} name: '@rush-temp/starknet-stream' version: 0.0.0 dependencies: @@ -7760,7 +7847,7 @@ packages: dev: false file:projects/substrate-processor.tgz: - resolution: {integrity: sha512-bv9yCssIgRcDJfSIiJqw97p9t03FHIBMk5Qvfs3cQuN9MOOCB4retTUPhzT+yhgtD1czIriXIcAgmGBgcEeOHw==, tarball: file:projects/substrate-processor.tgz} + resolution: {integrity: sha512-nvjuzvcpbwWO7Bg5mfDrBT+r5HlkldIxjHBStsgPT3OZXKAMN+hSCkGhdwF5urVNLNcuokPT5QexZq0grc3lDw==, tarball: file:projects/substrate-processor.tgz} name: '@rush-temp/substrate-processor' version: 0.0.0 dependencies: @@ -7782,7 +7869,7 @@ packages: dev: false file:projects/substrate-typegen.tgz: - resolution: {integrity: sha512-HDrlRIPga2gKlNtr8I+7jJPb4UMygW8+7BEIBJsIOVjpk/jPBQX5EjWOg6bmuiL6PtODQTN4gDutgrmptMOLZg==, tarball: file:projects/substrate-typegen.tgz} + resolution: {integrity: sha512-hYFLl7+BgkKy15M2J31XXmpXDo67jmhmh1/j/C0uJbDPwiTKkLleFaDM9bPBgynB+WF3AmGv9XUkvCGvLawzbQ==, tarball: file:projects/substrate-typegen.tgz} name: '@rush-temp/substrate-typegen' version: 0.0.0 dependencies: @@ -7792,7 +7879,7 @@ packages: dev: false file:projects/tron-data.tgz: - resolution: {integrity: sha512-TSoknC0M1/aV4Jg070uz8i/V4YBRBcXkZzrSZwc1SaTQUOYVtQh8UpBdQyJLcRlVTi+K6SZtUM99wsE/VJgB3Q==, tarball: file:projects/tron-data.tgz} + resolution: {integrity: sha512-AnZ2mjIhDmxqQ6je661F2AzDRvUJHoZTkdqa0lWo5QScjjvvdEOGv42VAGXqPuYe4n4gY4Dj+K/eFzfeDCAQ+Q==, tarball: file:projects/tron-data.tgz} name: '@rush-temp/tron-data' version: 0.0.0 dependencies: @@ -7840,7 +7927,7 @@ packages: dev: false file:projects/tron-processor.tgz: - resolution: {integrity: sha512-jxOST5hrGQAEkbehlZuOZPDQ47SX+nQ1u0vp0UAmaj5XcgxoRm3u9oX807X2YOWxAfgrjX7fBWeIassnFewLVg==, tarball: file:projects/tron-processor.tgz} + resolution: {integrity: sha512-hyWAMCO/NbpJPmnw4Jo+CpML5m6wcS1j3gXe+IL7nZYUAjZzvNEa4NCtsaKaVRSNR1qr8kNz1++56Kt2VvQQ1A==, tarball: file:projects/tron-processor.tgz} name: '@rush-temp/tron-processor' version: 0.0.0 dependencies: @@ -7995,7 +8082,7 @@ packages: dev: false file:projects/util-internal-archive-client.tgz: - resolution: {integrity: sha512-cq/2eIibQ3DuQCV1bCylf0IPexvdRTf27x7BhOjR4lkGMS6DeqlF0qgVfct9WY4TCHljIey09EMnRwTJzn+7uQ==, tarball: file:projects/util-internal-archive-client.tgz} + resolution: {integrity: sha512-aTWT++wNI2gux8EwA+nSZgiaU6tM7Rh+DVnumLCpJF3z44ujTs0ipFTbnktt0pVOtkbN7G7oyTrZUXUMQgjthA==, tarball: file:projects/util-internal-archive-client.tgz} name: '@rush-temp/util-internal-archive-client' version: 0.0.0 dependencies: diff --git a/graphql/openreader/src/dialect/opencrud/where.ts b/graphql/openreader/src/dialect/opencrud/where.ts index 248efb25d..b316e84ed 100644 --- a/graphql/openreader/src/dialect/opencrud/where.ts +++ b/graphql/openreader/src/dialect/opencrud/where.ts @@ -125,5 +125,5 @@ const WHERE_KEY_REGEX = (() => { "in", "not_in", ] - return new RegExp(`^([^_]*)_(${ops.join('|')})$`) + return new RegExp(`^(.*)_(${ops.join('|')})$`) })() \ No newline at end of file diff --git a/graphql/openreader/src/model.tools.ts b/graphql/openreader/src/model.tools.ts index 3428fca4f..28e549881 100644 --- a/graphql/openreader/src/model.tools.ts +++ b/graphql/openreader/src/model.tools.ts @@ -137,8 +137,8 @@ export function validateModel(model: Model) { } -const TYPE_NAME_REGEX = /^[A-Z][a-zA-Z0-9]*$/ -const PROP_NAME_REGEX = /^[a-z][a-zA-Z0-9]*$/ +const TYPE_NAME_REGEX = /^[A-Z][a-zA-Z0-9_]*$/ +const PROP_NAME_REGEX = /^[a-z][a-zA-Z0-9_]*$/ export function validateNames(model: Model) { diff --git a/rush.json b/rush.json index d68c165df..8df7fb1ab 100644 --- a/rush.json +++ b/rush.json @@ -419,6 +419,12 @@ // */ // // "versionPolicyName": "" // }, + { + "packageName": "@subsquid/bch-processor", + "projectFolder": "bch/bch-processor", + "shouldPublish": true, + "versionPolicyName": "npm" + }, { "packageName": "@subsquid/evm-processor", "projectFolder": "evm/evm-processor", diff --git a/typeorm/typeorm-codegen/src/codegen.ts b/typeorm/typeorm-codegen/src/codegen.ts index e3e177870..42903cf5c 100644 --- a/typeorm/typeorm-codegen/src/codegen.ts +++ b/typeorm/typeorm-codegen/src/codegen.ts @@ -32,7 +32,7 @@ export function generateOrmModels(model: Model, dir: OutDir): void { dir.add('marshal.ts', path.resolve(__dirname, '../src/marshal.ts')) function generateEntity(name: string, entity: Entity): void { - index.line(`export * from "./${toCamelCase(name)}.model"`) + index.line(`export * from "./${toCamelCase(name)}.model.js"`) const out = dir.file(`${toCamelCase(name)}.model.ts`) const imports = new ImportRegistry(name) imports.useTypeormStore('Entity', 'Column', 'PrimaryColumn') @@ -80,7 +80,7 @@ export function generateOrmModels(model: Model, dir: OutDir): void { imports.useTypeormStore('OneToOne', 'Index', 'JoinColumn') out.line(`@Index_({unique: true})`) out.line( - `@OneToOne_(() => ${prop.type.entity}, {nullable: true})` + `@OneToOne_('${prop.type.entity}', {nullable: true})` ) out.line(`@JoinColumn_()`) } else { @@ -90,20 +90,20 @@ export function generateOrmModels(model: Model, dir: OutDir): void { } // Make foreign entity references always nullable out.line( - `@ManyToOne_(() => ${prop.type.entity}, {nullable: true})` + `@ManyToOne_('${prop.type.entity}', {nullable: true})` ) } break case 'lookup': imports.useTypeormStore('OneToOne') out.line( - `@OneToOne_(() => ${prop.type.entity}, e => e.${prop.type.field})` + `@OneToOne_('${prop.type.entity}', '${prop.type.field}')` ) break case 'list-lookup': imports.useTypeormStore('OneToMany') out.line( - `@OneToMany_(() => ${prop.type.entity}, e => e.${prop.type.field})` + `@OneToMany_('${prop.type.entity}', '${prop.type.field}')` ) break case 'object': @@ -196,7 +196,7 @@ export function generateOrmModels(model: Model, dir: OutDir): void { } function generateObject(name: string, object: JsonObject): void { - index.line(`export * from "./_${toCamelCase(name)}"`) + index.line(`export * from "./_${toCamelCase(name)}.js"`) const out = dir.file(`_${toCamelCase(name)}.ts`) const imports = new ImportRegistry(name) imports.useMarshal() @@ -352,7 +352,7 @@ export function generateOrmModels(model: Model, dir: OutDir): void { } function generateUnion(name: string, union: Union): void { - index.line(`export * from "./_${toCamelCase(name)}"`) + index.line(`export * from "./_${toCamelCase(name)}.js"`) const out = dir.file(`_${toCamelCase(name)}.ts`) const imports = new ImportRegistry(name) out.lazy(() => imports.render(model, out)) @@ -374,7 +374,7 @@ export function generateOrmModels(model: Model, dir: OutDir): void { } function generateEnum(name: string, e: Enum): void { - index.line(`export * from "./_${toCamelCase(name)}"`) + index.line(`export * from "./_${toCamelCase(name)}.js"`) const out = dir.file(`_${toCamelCase(name)}.ts`) out.block(`export enum ${name}`, () => { for (const val in e.values) { @@ -552,13 +552,13 @@ class ImportRegistry { out.line(`import {${importList.join(', ')}} from "@subsquid/typeorm-store"`) } if (this.marshal) { - out.line(`import * as marshal from "./marshal"`) + out.line(`import * as marshal from "./marshal.js"`) } for (const name of this.model) { switch(model[name].kind) { case 'entity': out.line( - `import {${name}} from "./${toCamelCase(name)}.model"` + `import {type ${name}} from "./${toCamelCase(name)}.model.js"` ) break default: { @@ -567,7 +567,7 @@ class ImportRegistry { names.push('fromJson' + name) } out.line( - `import {${names.join(', ')}} from "./_${toCamelCase(name)}"` + `import {type ${names.join(', ')}} from "./_${toCamelCase(name)}.js"` ) } } diff --git a/typeorm/typeorm-codegen/src/main.ts b/typeorm/typeorm-codegen/src/main.ts index 8b09d7818..1193902d6 100644 --- a/typeorm/typeorm-codegen/src/main.ts +++ b/typeorm/typeorm-codegen/src/main.ts @@ -28,7 +28,7 @@ and db migrations (if any) at db/migrations. generateOrmModels(model, generatedOrm) if (!fs.existsSync(orm.path('index.ts'))) { let index = orm.file('index.ts') - index.line(`export * from "./generated"`) + index.line(`export * from "./generated/index.js"`) index.write() } diff --git a/typeorm/typeorm-config/src/config.ts b/typeorm/typeorm-config/src/config.ts index 3c5418ba7..52fcff817 100644 --- a/typeorm/typeorm-config/src/config.ts +++ b/typeorm/typeorm-config/src/config.ts @@ -24,7 +24,7 @@ export function createOrmConfig(options?: OrmOptions): OrmConfig { let migrationsDir = path.join(dir, MIGRATIONS_DIR) let locations = { entities: [model], - migrations: [migrationsDir + '/*.js'] + migrations: [migrationsDir + '/*.?(?)js'] } log.debug(locations, 'typeorm locations') return { diff --git a/typeorm/typeorm-migration/src/create.ts b/typeorm/typeorm-migration/src/create.ts index 2336d05cf..e8b4bd751 100644 --- a/typeorm/typeorm-migration/src/create.ts +++ b/typeorm/typeorm-migration/src/create.ts @@ -3,16 +3,17 @@ import {runProgram} from "@subsquid/util-internal" import {OutDir} from "@subsquid/util-internal-code-printer" import {program} from "commander" - runProgram(async () => { program.description('Create template file for a new migration') program.option('--name', 'name suffix for new migration', 'Data') + program.option('--esm', 'generate esm module', false) let {name} = program.parse().opts() as {name: string} + let {esm} = program.parse().opts() as {esm: boolean} let dir = new OutDir(MIGRATIONS_DIR) let timestamp = Date.now() - let out = dir.file(`${timestamp}-${name}.js`) + let out = dir.file(`${timestamp}-${name}.${!esm ? 'js' : 'cjs'}`) out.block(`module.exports = class ${name}${timestamp}`, () => { out.line(`name = '${name}${timestamp}'`) out.line() diff --git a/typeorm/typeorm-migration/src/generate.ts b/typeorm/typeorm-migration/src/generate.ts index a2d8263bf..27eabe8f4 100644 --- a/typeorm/typeorm-migration/src/generate.ts +++ b/typeorm/typeorm-migration/src/generate.ts @@ -12,8 +12,10 @@ import {SqlInMemory} from "typeorm/driver/SqlInMemory" runProgram(async () => { program.description('Analyze the current database state and generate migration to match the target schema') program.option('-n, --name ', 'name suffix for new migration', 'Data') + program.option('--esm', 'generate esm module', false) let {name} = program.parse().opts() as {name: string} + let {esm} = program.parse().opts() as {esm: boolean} dotenv.config() @@ -43,7 +45,7 @@ runProgram(async () => { let dir = new OutDir(MIGRATIONS_DIR) let timestamp = Date.now() - let out = dir.file(`${timestamp}-${name}.js`) + let out = dir.file(`${timestamp}-${name}.${!esm ? 'js' : 'cjs'}`) out.block(`module.exports = class ${name}${timestamp}`, () => { out.line(`name = '${name}${timestamp}'`) out.line() diff --git a/typeorm/typeorm-store/src/database.ts b/typeorm/typeorm-store/src/database.ts index 9cfe5ff82..95db77046 100644 --- a/typeorm/typeorm-store/src/database.ts +++ b/typeorm/typeorm-store/src/database.ts @@ -17,6 +17,7 @@ export interface TypeormDatabaseOptions { projectDir?: string } +export const mempoolHeight = -2; export class TypeormDatabase { private statusSchema: string @@ -95,7 +96,7 @@ export class TypeormDatabase { } let top: HashAndHeight[] = await em.query( - `SELECT height, hash FROM ${schema}.hot_block ORDER BY height` + `SELECT height, hash FROM ${schema}.hot_block where height >= -1 ORDER BY height` ) return assertStateInvariants({...status[0], top}) @@ -111,7 +112,7 @@ export class TypeormDatabase { assert(status.length == 1) let top: HashAndHeight[] = await em.query( - `SELECT hash, height FROM ${schema}.hot_block ORDER BY height` + `SELECT hash, height FROM ${schema}.hot_block where height >= -1 ORDER BY height` ) return assertStateInvariants({...status[0], top}) @@ -122,6 +123,12 @@ export class TypeormDatabase { let state = await this.getState(em) let {prevHead: prev, nextHead: next} = info + const topHeight = prev.height + const newTopHeight = next.height + if (newTopHeight !== topHeight) { + await this.clearMempool() + } + assert(state.hash === info.prevHead.hash, RACE_MSG) assert(state.height === prev.height) assert(prev.height < next.height) @@ -151,6 +158,12 @@ export class TypeormDatabase { let state = await this.getState(em) let chain = [state, ...state.top] + const topHeight = maybeLast(chain)?.height ?? -1 + const newTopHeight = maybeLast(info.newBlocks)?.height ?? -1 + if (newTopHeight !== topHeight) { + await this.clearMempool() + } + assertChainContinuity(info.baseHead, info.newBlocks) assert(info.finalizedHead.height <= (maybeLast(info.newBlocks) ?? info.baseHead).height) @@ -194,6 +207,25 @@ export class TypeormDatabase { }) } + transactMempool(cb: (store: Store) => Promise): Promise { + return this.submit(async em => { + await this.insertHotBlock(em, { + height: mempoolHeight, + hash: '0x' + }) + + await this.performUpdates( + store => cb(store), + em, + new ChangeTracker(em, this.statusSchema, mempoolHeight) + ) + }) + } + + clearMempool(): Promise { + return this.submit(em => rollbackBlock(this.statusSchema, em, mempoolHeight)) + } + private deleteHotBlocks(em: EntityManager, finalizedHeight: number): Promise { return em.query( `DELETE FROM ${this.escapedSchema()}.hot_block WHERE height <= $1`, @@ -203,7 +235,7 @@ export class TypeormDatabase { private insertHotBlock(em: EntityManager, block: HashAndHeight): Promise { return em.query( - `INSERT INTO ${this.escapedSchema()}.hot_block (height, hash) VALUES ($1, $2)`, + `INSERT INTO ${this.escapedSchema()}.hot_block (height, hash) VALUES ($1, $2) ON CONFLICT (height) DO NOTHING;`, [block.height, block.hash] ) } diff --git a/typeorm/typeorm-store/src/hot.ts b/typeorm/typeorm-store/src/hot.ts index 33030331b..74c118013 100644 --- a/typeorm/typeorm-store/src/hot.ts +++ b/typeorm/typeorm-store/src/hot.ts @@ -2,6 +2,7 @@ import {assertNotNull} from '@subsquid/util-internal' import type {EntityManager, EntityMetadata} from 'typeorm' import {ColumnMetadata} from 'typeorm/metadata/ColumnMetadata' import {Entity, EntityClass} from './store' +import { mempoolHeight } from './database' export interface RowRef { @@ -133,15 +134,21 @@ export class ChangeTracker { return entities } - private writeChangeRows(changes: ChangeRecord[]): Promise { + private async writeChangeRows(changes: ChangeRecord[]): Promise { let height = new Array(changes.length) + let currentMempoolIndex = 0; + if (this.blockHeight <= mempoolHeight) { + const result: {max:number}[] = await this.em.query(`SELECT MAX(index) from ${this.statusSchema}.hot_change_log WHERE block_height <= $1`, [mempoolHeight]) + currentMempoolIndex = result[0]?.max ?? 0; + } let index = new Array(changes.length) let change = new Array(changes.length) height.fill(this.blockHeight) for (let i = 0; i < changes.length; i++) { - index[i] = this.index++ + this.index++ + index[i] = height[i] <= mempoolHeight ? this.index + currentMempoolIndex : this.index change[i] = JSON.stringify(changes[i]) } diff --git a/typeorm/typeorm-store/src/test/database.test.ts b/typeorm/typeorm-store/src/test/database.test.ts index 48ffbf1c4..9d0fef7ef 100644 --- a/typeorm/typeorm-store/src/test/database.test.ts +++ b/typeorm/typeorm-store/src/test/database.test.ts @@ -87,6 +87,97 @@ describe('TypeormDatabase', function() { }) }) + it('.transact() with transactMempool() flow', async function() { + await db.connect() + + let em = await getEntityManager() + + await db.transact({ + prevHead: {height: -1, hash: '0x'}, + nextHead: {height: 10, hash: '0x10'} + }, async store => { + await store.insert(new Data({ + id: '1', + text: 'hello', + integer: 10 + })) + }) + + await db.transactMempool(async (store) => { + await store.insert(new Data({ + id: '2', + text: 'world', + integer: 20 + })) + + await store.insert(new Data({ + id: '3', + text: 'foobar', + integer: 30 + })) + }) + + { + let records = await em.find(Data, { + order: {id: 'asc'} + }) + + expect(records).toMatchObject([ + { + id: '1', + text: 'hello', + integer: 10 + }, + { + id: '2', + text: 'world', + integer: 20 + }, + { + id: '3', + text: 'foobar', + integer: 30 + } + ]) + } + + await db.transact({ + prevHead: {height: 10, hash: '0x10'}, + nextHead: {height: 20, hash: '0x20'} + }, async store => { + await store.insert(new Data({ + id: '2', + text: 'world', + integer: 20 + })) + }) + + let records = await em.find(Data, { + order: {id: 'asc'} + }) + + expect(records).toMatchObject([ + { + id: '1', + text: 'hello', + integer: 10 + }, + { + id: '2', + text: 'world', + integer: 20 + } + ]) + + await db.disconnect() + + expect(await db.connect()).toMatchObject({ + height: 20, + hash: '0x20', + top: [] + }) + }) + it('.transactHot() flow', async function() { let em = await getEntityManager() @@ -233,4 +324,91 @@ describe('TypeormDatabase', function() { ] }) }) + + it('.transactHot() with transactMempool() flow', async function() { + let em = await getEntityManager() + + await db.connect() + + await db.transactHot({ + baseHead: {height: -1, hash: '0x'}, + newBlocks: [ + {height: 0, hash: '0'}, + ], + finalizedHead: {height: 0, hash: '0'} + }, async () => {}) + + let a1 = new Data({ + id: '1', + text: 'a1', + textArray: ['a1', 'A1'], + integer: 1, + integerArray: [1, 10], + bigInteger: 1000000000000000000000000000000000000000000000000000000000n, + dateTime: new Date(1000000000000), + bytes: Buffer.from([100, 100, 100]), + json: [1, {foo: 'bar'}] + }) + + let a2 = new Data({ + id: '2', + text: 'a2', + textArray: ['a2', 'A2'], + integer: 2, + integerArray: [2, 20], + bigInteger: 2000000000000000000000000000000000000000000000000000000000n, + dateTime: new Date(2000000000000), + bytes: Buffer.from([200, 200, 200]), + json: [2, {foo: 'baz'}] + }) + + let a3 = new Data({ + id: '3', + text: 'a3', + textArray: ['a3', 'A30'], + integer: 30, + integerArray: [30, 300], + bigInteger: 3000000000000000000000000000000000000000000000000000000000n, + dateTime: new Date(3000000000000), + bytes: Buffer.from([3, 3, 3]), + json: [3, {foo: 'qux'}] + }) + + await db.transactHot({ + baseHead: {height: 0, hash: '0'}, + finalizedHead: {height: 0, hash: '0'}, + newBlocks: [ + {height: 1, hash: 'a-1'}, + ] + }, async (store) => { + await store.insert(a1) + }) + + expect(await em.find(Data, {order: {id: 'asc'}})).toEqual([ + a1 + ]) + + await db.transactMempool(async (store) => { + await store.insert(a2) + await store.insert(a3) + }) + + expect(await em.find(Data, {order: {id: 'asc'}})).toEqual([ + a1, a2, a3 + ]) + + await db.transactHot({ + finalizedHead: {height: 0, hash: '0'}, + baseHead: {height: 1, hash: 'a-1'}, + newBlocks: [ + {height: 2, hash: 'a-2'} + ] + }, async (store, block) => { + await store.insert(a2) + }) + + expect(await em.find(Data, {order: {id: 'asc'}})).toEqual([ + a1, a2 + ]) + }) }) diff --git a/util/util-internal-processor-tools/src/database.ts b/util/util-internal-processor-tools/src/database.ts index cbec050a0..ad3abff05 100644 --- a/util/util-internal-processor-tools/src/database.ts +++ b/util/util-internal-processor-tools/src/database.ts @@ -16,6 +16,8 @@ export interface FinalDatabase { supportsHotBlocks?: false connect(): Promise transact(info: FinalTxInfo, cb: (store: S) => Promise): Promise + transactMempool(cb: (store: S) => Promise): Promise + clearMempool(): Promise } @@ -39,6 +41,9 @@ export interface HotDatabase { info: HotTxInfo, cb: (store: S, blockSliceStart: number, blockSliceEnd: number) => Promise ): Promise + + transactMempool(cb: (store: S) => Promise): Promise + clearMempool(): Promise } diff --git a/util/util-internal-processor-tools/src/datasource.ts b/util/util-internal-processor-tools/src/datasource.ts index c7bba419c..048a0b247 100644 --- a/util/util-internal-processor-tools/src/datasource.ts +++ b/util/util-internal-processor-tools/src/datasource.ts @@ -17,6 +17,7 @@ export interface Block { export interface Batch { blocks: B[] isHead: boolean + mempoolTransactions?: any[] } @@ -24,6 +25,7 @@ export interface HotUpdate { blocks: B[] baseHead: HashAndHeight finalizedHead: HashAndHeight + mempoolTransactions?: any[] } @@ -40,4 +42,10 @@ export interface HotDataSource extends DataSource { state: HotDatabaseState, cb: (upd: HotUpdate) => Promise ): Promise + + processMempool?( + requests: RangeRequestList, + state: HotDatabaseState, + cb: (upd: HotUpdate) => Promise + ): Promise<() => Promise> } diff --git a/util/util-internal-processor-tools/src/runner.ts b/util/util-internal-processor-tools/src/runner.ts index b2f9b573f..da688790d 100644 --- a/util/util-internal-processor-tools/src/runner.ts +++ b/util/util-internal-processor-tools/src/runner.ts @@ -18,6 +18,7 @@ export interface RunnerConfig { database: Database log: Logger prometheus: PrometheusServer + watchMempool?: boolean } @@ -25,6 +26,8 @@ export class Runner { private metrics: RunnerMetrics private statusReportTimer?: any private hasStatusNews = false + private startedWatchingMempool = false + private cancelMempoolWatch?: () => Promise constructor(private config: RunnerConfig) { this.metrics = new RunnerMetrics(this.config.requests) @@ -38,6 +41,8 @@ export class Runner { let state = await this.getDatabaseState() if (state.height >= 0) { log.info(`last processed final block was ${state.height}`) + log.debug(`clearing mempool state`) + await this.config.database.clearMempool() } if (this.getLeftRequests(state).length == 0) { @@ -83,6 +88,8 @@ export class Runner { this.chainHeightUpdateLoop(hot) ) if (this.getLeftRequests(state).length == 0) return + } else { + await this.processMempool(state) } if (chainFinalizedHeight > state.height + state.top.length) { @@ -110,9 +117,9 @@ export class Runner { state = nextState } - return this.processHotBlocks(state).finally( + return this.processHotBlocks(state).finally(async () => { this.chainHeightUpdateLoop(hot) - ) + }) } private async assertWeAreOnTheSameChain(src: DataSource, state: HashAndHeight): Promise { @@ -149,7 +156,8 @@ export class Runner { if (prevBatch) { batch = { blocks: prevBatch.blocks.concat(batch.blocks), - isHead: batch.isHead + isHead: batch.isHead, + mempoolTransactions: [], } } if (last(batch.blocks).header.height < minimumCommitHeight) { @@ -168,6 +176,7 @@ export class Runner { } private async handleFinalizedBlocks(state: HotDatabaseState, batch: Batch): Promise { + await this.cancelMempoolWatch?.() let lastBlock = last(batch.blocks) assert(state.height < lastBlock.header.height) @@ -188,6 +197,11 @@ export class Runner { }) }) + const isHead = nextState.height === this.metrics.getChainHeight() + if (isHead && !this.startedWatchingMempool && this.config.watchMempool) { + await this.processMempool(state) + } + return nextState } @@ -200,6 +214,8 @@ export class Runner { this.getLeftRequests(state), state, async upd => { + await this.cancelMempoolWatch?.() + let newHead = maybeLast(upd.blocks)?.header || upd.baseHead if (upd.baseHead.hash !== lastHead.hash) { @@ -219,7 +235,8 @@ export class Runner { return db.transactHot2(info, (store, blockSliceStart, blockSliceEnd) => { return this.config.process(store, { blocks: upd.blocks.slice(blockSliceStart, blockSliceEnd), - isHead: blockSliceEnd === upd.blocks.length + isHead: blockSliceEnd === upd.blocks.length, + mempoolTransactions: [] }) }) } else { @@ -232,15 +249,53 @@ export class Runner { return this.config.process(store, { blocks: [block], - isHead: newHead.height === ref.height + isHead: newHead.height === ref.height, + mempoolTransactions: [] }) }) } }) lastHead = newHead + const isHead = newHead.height === this.metrics.getChainHeight() + if (isHead && !this.startedWatchingMempool && this.config.watchMempool) { + await this.processMempool(state) + } + } + ) + } + + private async processMempool(state: HotDatabaseState): Promise { + assert(this.startedWatchingMempool === false) + this.startedWatchingMempool = true + + assert(this.config.database.supportsHotBlocks) + assert(this.config.watchMempool) + let db = this.config.database + let ds = assertNotNull(this.config.hotDataSource) + + this.log.debug("started watching mempool") + const cancel = await ds.processMempool?.( + this.getLeftRequests(state), + state, + async upd => { + return db.transactMempool((store) => { + return this.config.process(store, { + blocks: [], + isHead: true, + mempoolTransactions: upd.mempoolTransactions, + }) + }) } ) + + this.cancelMempoolWatch = async () => { + this.log.debug(`stopped watching mempool`) + this.startedWatchingMempool = false + this.cancelMempoolWatch = undefined + await cancel?.() + await this.config.database.clearMempool() + } } private chainHeightUpdateLoop(src: DataSource): () => void {