diff --git a/packages/node/src/configure/NodeConfig.ts b/packages/node/src/configure/NodeConfig.ts index 162d6ab06..25b41aaa3 100644 --- a/packages/node/src/configure/NodeConfig.ts +++ b/packages/node/src/configure/NodeConfig.ts @@ -4,19 +4,19 @@ import { IConfig, NodeConfig } from '@subql/node-core'; export interface ICosmosConfig extends IConfig { - kyve: string; + kyveEndpoint: string; } export class CosmosNodeConfig extends NodeConfig { /** * This is a wrapper around the core NodeConfig to get additional properties that are provided through args or node runner options - * NOTE: This isn't injected anywhere so you need to wrap the injected node config + * NOTE: This isn't injected anywhere, so you need to wrap the injected node config * * @example * constructor( * nodeConfig: NodeConfig, * ) { - * this.nodeConfig = new EthereumNodeConfig(nodeConfig); + * this.nodeConfig = new CosmosNodeConfig(nodeConfig); * } * */ constructor(config: NodeConfig) { @@ -24,7 +24,7 @@ export class CosmosNodeConfig extends NodeConfig { super((config as any)._config, (config as any)._isTest); } - get kyve(): string { - return this._config.kyve; + get kyveEndpoint(): string { + return this._config.kyveEndpoint; } } diff --git a/packages/node/src/indexer/api.service.ts b/packages/node/src/indexer/api.service.ts index aae96f5e7..ccc67ab54 100644 --- a/packages/node/src/indexer/api.service.ts +++ b/packages/node/src/indexer/api.service.ts @@ -38,6 +38,7 @@ import { CosmosNodeConfig } from '../configure/NodeConfig'; import { SubqueryProject } from '../configure/SubqueryProject'; import * as CosmosUtil from '../utils/cosmos'; import { KyveApi } from '../utils/kyve/kyve'; +import { KyveConnection } from '../utils/kyve/kyveConnection'; import { CosmosClientConnection } from './cosmosClient.connection'; import { BlockContent } from './types'; @@ -49,7 +50,6 @@ export class ApiService implements OnApplicationShutdown { private fetchBlocksBatches = CosmosUtil.fetchBlocksBatches; - private kyve: KyveApi; private nodeConfig: CosmosNodeConfig; registry: Registry; @@ -93,27 +93,28 @@ export class ApiService this.registry = await this.buildRegistry(); - if (this.nodeConfig.kyve) { - this.kyve = new KyveApi(network.chainId, this.nodeConfig.kyve); - await this.kyve.init(); - - this.fetchBlocksBatches = this.kyve.fetchBlocksBatches.bind(this.kyve); + if (this.nodeConfig.kyveEndpoint) { + await KyveConnection.create( + network.chainId, + this.nodeConfig.kyveEndpoint, + this.registry, + ); + } else { + await this.createConnections( + network, + (endpoint) => + CosmosClientConnection.create( + endpoint, + this.fetchBlocksBatches, + this.registry, + ), + (connection: CosmosClientConnection) => { + const api = connection.unsafeApi; + return api.getChainId(); + }, + ); } - await this.createConnections( - network, - (endpoint) => - CosmosClientConnection.create( - endpoint, - this.fetchBlocksBatches, - this.registry, - ), - (connection: CosmosClientConnection) => { - const api = connection.unsafeApi; - return api.getChainId(); - }, - ); - return this; } @@ -185,25 +186,6 @@ export class CosmosClient extends CosmWasmClient { return this.tendermintClient.blockResults(height); } - decodeMsg(msg: DecodeObject): T { - try { - const decodedMsg = this.registry.decode(msg); - if ( - [ - '/cosmwasm.wasm.v1.MsgExecuteContract', - '/cosmwasm.wasm.v1.MsgMigrateContract', - '/cosmwasm.wasm.v1.MsgInstantiateContract', - ].includes(msg.typeUrl) - ) { - decodedMsg.msg = JSON.parse(new TextDecoder().decode(decodedMsg.msg)); - } - return decodedMsg; - } catch (e) { - logger.error(e, 'Failed to decode message'); - throw e; - } - } - static handleError(e: Error): Error { const formatted_error: Error = e; try { diff --git a/packages/node/src/indexer/cosmosClient.connection.ts b/packages/node/src/indexer/cosmosClient.connection.ts index 435e626eb..d3c85a870 100644 --- a/packages/node/src/indexer/cosmosClient.connection.ts +++ b/packages/node/src/indexer/cosmosClient.connection.ts @@ -23,10 +23,10 @@ const RETRY_DELAY = 2_500; const logger = getLogger('cosmos-client-connection'); -type FetchFunc = ( - api: CosmosClient, +export type FetchFunc = ( + registry: Registry, batch: number[], - kyve: KyveApi | undefined, + api?: CosmosClient, ) => Promise; export class CosmosClientConnection @@ -54,7 +54,6 @@ export class CosmosClientConnection endpoint: string, fetchBlocksBatches: FetchFunc, registry: Registry, - kyve?: KyveApi, ): Promise { const httpEndpoint: HttpEndpoint = { url: endpoint, @@ -85,10 +84,6 @@ export class CosmosClientConnection logger.info(`connected to ${endpoint}`); - if (kyve) { - connection.setKyveApi(kyve); - } - return connection; } @@ -96,10 +91,6 @@ export class CosmosClientConnection return new CosmosSafeClient(this.tmClient, height); } - private setKyveApi(kyveApi: KyveApi): void { - this.kyve = kyveApi; - } - private setTmClient(tmClient: Tendermint37Client): void { this.tmClient = tmClient; } @@ -120,9 +111,9 @@ export class CosmosClientConnection async fetchBlocks(heights: number[]): Promise { const blocks = await this.fetchBlocksBatches( - this.unsafeApi, + this.registry, heights, - this.kyve, + this.unsafeApi, ); return blocks; } diff --git a/packages/node/src/utils/cosmos.spec.ts b/packages/node/src/utils/cosmos.spec.ts index a12e6ad52..13a9da571 100644 --- a/packages/node/src/utils/cosmos.spec.ts +++ b/packages/node/src/utils/cosmos.spec.ts @@ -26,7 +26,7 @@ import { } from 'cosmjs-types/cosmwasm/wasm/v1/tx'; import { CosmosClient } from '../indexer/api.service'; import { HttpClient } from '../indexer/rpc-clients'; -import { filterMessageData, wrapEvent } from './cosmos'; +import { decodeMsg, filterMessageData, wrapEvent } from './cosmos'; const ENDPOINT = 'https://rpc-archive.junonetwork.io/'; const CHAINID = 'juno-1'; @@ -115,7 +115,7 @@ describe('CosmosUtils', () => { msg: { typeUrl: decodedTx.body.messages[0].typeUrl, get decodedMsg() { - return api.decodeMsg(decodedTx.body.messages[0]); + return decodeMsg(decodedTx.body.messages[0], registry); }, }, }; @@ -159,7 +159,7 @@ describe('CosmosUtils', () => { hash: '', decodedTx: {} as DecodedTxRaw, }; - const events = wrapEvent({} as CosmosBlock, [tx], api, 0); + const events = wrapEvent({} as CosmosBlock, [tx], api.registry, 0); expect(events.length).toEqual(0); }); diff --git a/packages/node/src/utils/cosmos.ts b/packages/node/src/utils/cosmos.ts index 948bd94c3..c51622c22 100644 --- a/packages/node/src/utils/cosmos.ts +++ b/packages/node/src/utils/cosmos.ts @@ -2,9 +2,10 @@ // SPDX-License-Identifier: GPL-3.0 import assert from 'assert'; +import { TextDecoder } from 'util'; import { sha256 } from '@cosmjs/crypto'; import { toHex } from '@cosmjs/encoding'; -import { decodeTxRaw } from '@cosmjs/proto-signing'; +import { DecodeObject, decodeTxRaw, Registry } from '@cosmjs/proto-signing'; import { fromTendermintEvent } from '@cosmjs/stargate'; import { Log, parseRawLog } from '@cosmjs/stargate/build/logs'; import { @@ -32,6 +33,28 @@ import { KyveApi } from './kyve/kyve'; const logger = getLogger('fetch'); +export function decodeMsg( + msg: DecodeObject, + registry: Registry, +): T { + try { + const decodedMsg = registry.decode(msg); + if ( + [ + '/cosmwasm.wasm.v1.MsgExecuteContract', + '/cosmwasm.wasm.v1.MsgMigrateContract', + '/cosmwasm.wasm.v1.MsgInstantiateContract', + ].includes(msg.typeUrl) + ) { + decodedMsg.msg = JSON.parse(new TextDecoder().decode(decodedMsg.msg)); + } + return decodedMsg; + } catch (e) { + logger.error(e, 'Failed to decode message'); + throw e; + } +} + export function filterBlock( data: CosmosBlock, filter?: CosmosBlockFilter, @@ -176,9 +199,9 @@ export function filterEvents( return filteredEvents; } -async function getBlockByHeight( - api: CosmosClient, +async function getBlockByHeightByRpc( height: number, + api: CosmosClient, ): Promise<[BlockResponse, BlockResultsResponse]> { return Promise.all([ api.blockInfo(height).catch((e) => { @@ -190,12 +213,16 @@ async function getBlockByHeight( ]); } -export async function fetchCosmosBlocksArray( - api: CosmosClient, +export async function fetchCosmosBlocksArray( + getBlockByHeight: ( + height: number, + api: T, + ) => Promise<[BlockResponse, BlockResultsResponse]>, blockArray: number[], + api: T, ): Promise<[BlockResponse, BlockResultsResponse][]> { return Promise.all( - blockArray.map(async (height) => getBlockByHeight(api, height)), + blockArray.map(async (height) => getBlockByHeight(height, api)), ); } @@ -228,7 +255,7 @@ export function wrapCosmosMsg( block: CosmosBlock, tx: CosmosTransaction, idx: number, - api: CosmosClient, + registry: Registry, ): CosmosMessage { const rawMessage = tx.decodedTx.body.messages[idx]; return { @@ -239,7 +266,8 @@ export function wrapCosmosMsg( typeUrl: rawMessage.typeUrl, get decodedMsg() { delete this.decodedMsg; - return (this.decodedMsg = api.decodeMsg(rawMessage)); + // TODO, unsure how this will impact the decode + return (this.decodedMsg = decodeMsg(rawMessage, registry)); }, }, }; @@ -248,12 +276,12 @@ export function wrapCosmosMsg( function wrapMsg( block: CosmosBlock, txs: CosmosTransaction[], - api: CosmosClient, + registry: Registry, ): CosmosMessage[] { const msgs: CosmosMessage[] = []; for (const tx of txs) { for (let i = 0; i < tx.decodedTx.body.messages.length; i++) { - msgs.push(wrapCosmosMsg(block, tx, i, api)); + msgs.push(wrapCosmosMsg(block, tx, i, registry)); } } return msgs; @@ -280,7 +308,7 @@ export function wrapBlockBeginAndEndEvents( export function wrapEvent( block: CosmosBlock, txs: CosmosTransaction[], - api: CosmosClient, + registry: Registry, idxOffset: number, //use this offset to avoid clash with idx of begin block events ): CosmosEvent[] { const events: CosmosEvent[] = []; @@ -296,7 +324,7 @@ export function wrapEvent( for (const log of logs) { let msg: CosmosMessage; try { - msg = wrapCosmosMsg(block, tx, log.msg_index, api); + msg = wrapCosmosMsg(block, tx, log.msg_index, registry); } catch (e) { // Example where this can happen https://sei.explorers.guru/transaction/8D4CA68E917E15652E10CB960DE604AEEB1B183D6E94A85E9CD98403F15550B7 logger.warn( @@ -321,10 +349,15 @@ export function wrapEvent( } export async function fetchBlocksBatches( - api: CosmosClient, + registry: Registry, blockArray: number[], + api?: CosmosClient, ): Promise { - const blocks = await fetchCosmosBlocksArray(api, blockArray); + const blocks = await fetchCosmosBlocksArray( + getBlockByHeightByRpc, + blockArray, + api, + ); return blocks.map(([blockInfo, blockResults]) => { try { assert( @@ -332,7 +365,7 @@ export async function fetchBlocksBatches( `txInfos doesn't match up with block (${blockInfo.block.header.height}) transactions expected ${blockInfo.block.txs.length}, received: ${blockResults.results.length}`, ); - return new LazyBlockContent(blockInfo, blockResults, api); + return new LazyBlockContent(blockInfo, blockResults, registry); } catch (e) { logger.error( e, @@ -355,7 +388,7 @@ export class LazyBlockContent implements BlockContent { constructor( private _blockInfo: BlockResponse, private _results: BlockResultsResponse, - private _api: CosmosClient, + private _registry: Registry, private _kyve?: KyveApi, ) {} @@ -377,7 +410,11 @@ export class LazyBlockContent implements BlockContent { get messages() { if (!this._wrappedMessage) { - this._wrappedMessage = wrapMsg(this.block, this.transactions, this._api); + this._wrappedMessage = wrapMsg( + this.block, + this.transactions, + this._registry, + ); } return this._wrappedMessage; } @@ -388,10 +425,15 @@ export class LazyBlockContent implements BlockContent { ? this._kyve.wrapEvent( this.block, this.transactions, - this._api, + this._registry, this._eventIdx, ) - : wrapEvent(this.block, this.transactions, this._api, this._eventIdx); + : wrapEvent( + this.block, + this.transactions, + this._registry, + this._eventIdx, + ); this._eventIdx += this._wrappedEvent.length; } return this._wrappedEvent; diff --git a/packages/node/src/utils/kyve/kyve.spec.ts b/packages/node/src/utils/kyve/kyve.spec.ts index b625d984f..34fdb3824 100644 --- a/packages/node/src/utils/kyve/kyve.spec.ts +++ b/packages/node/src/utils/kyve/kyve.spec.ts @@ -120,9 +120,9 @@ describe('KyveApi', () => { rpcLazyBlockContent = new LazyBlockContent( tendermintBlockInfo, tendermintBlockResult, - api, + registry, ); - kyveLazyBlockContent = new LazyBlockContent(bi, br, api, kyveApi); + kyveLazyBlockContent = new LazyBlockContent(bi, br, registry, kyveApi); }); it('wrapTransaction', () => { // note: kyve log is undefined diff --git a/packages/node/src/utils/kyve/kyve.ts b/packages/node/src/utils/kyve/kyve.ts index 67eef1ae9..bae62dd13 100644 --- a/packages/node/src/utils/kyve/kyve.ts +++ b/packages/node/src/utils/kyve/kyve.ts @@ -3,30 +3,35 @@ import assert from 'assert'; import { JsonRpcSuccessResponse } from '@cosmjs/json-rpc'; +import { Registry } from '@cosmjs/proto-signing'; import { adaptor37 } from '@cosmjs/tendermint-rpc/build/tendermint37/adaptor'; import { BlockResponse, BlockResultsResponse, } from '@cosmjs/tendermint-rpc/build/tendermint37/responses'; // Currently these types are not exported +import { StorageReceipt } from '@kyvejs/protocol'; import { Gzip } from '@kyvejs/protocol/dist/src/reactors/compression/Gzip'; import KyveSDK, { KyveLCDClientType } from '@kyvejs/sdk'; import { SupportedChains } from '@kyvejs/sdk/src/constants'; +import { PoolResponse } from '@kyvejs/types/lcd/kyve/query/v1beta1/pools'; import { getLogger } from '@subql/node-core'; import { CosmosBlock, CosmosEvent, CosmosTransaction, } from '@subql/types-cosmos'; -import { CosmosClient } from '../../indexer/api.service'; +import axios, { AxiosRequestConfig } from 'axios'; import { BlockContent } from '../../indexer/types'; import { LazyBlockContent, wrapCosmosMsg } from '../cosmos'; import { BundleDetails } from './kyveTypes'; -import { StorageRetriever } from './storageRetriever'; const BUNDLE_TIMEOUT = 10000; //ms +const RADIX = 10; -const logger = getLogger('kyve-fetch'); +const parseIntRadix = (value: string) => parseInt(value, RADIX); + +const logger = getLogger('kyve'); interface UnZippedKyveBlockReponse { value: { block: any; block_results: any }; @@ -53,7 +58,6 @@ export class KyveApi { async init(): Promise { this.currentBundleId = 0; await this.setPoolId(); - logger.info('kyve-api init'); } private async getAllPools() { return this.lcdClient.kyve.query.v1beta1.pools(); @@ -61,22 +65,27 @@ export class KyveApi { private async setPoolId(): Promise { const pools = await this.getAllPools(); - const pool = pools.pools.find( - (p) => JSON.parse(p.data.config).network === this.chainId, - ); + + let pool: PoolResponse; + for (const p of pools.pools) { + try { + const config = JSON.parse(p.data.config); + if (config.network === this.chainId) { + pool = p as unknown as PoolResponse; + break; + } + } catch (error) { + throw new Error( + `Error parsing JSON for pool with id ${p.id}:, ${error}`, + ); + } + } + if (!pool) { throw new Error(`${this.chainId} is not available on Kyve network`); } - this.poolId = pool.id; - } - private async retrieveBundleData( - storageId: string, - ): Promise<{ storageId: string; storageData: any }> { - return new StorageRetriever(this.storageUrl).retrieveBundle( - storageId, - BUNDLE_TIMEOUT, - ); + this.poolId = pool.id; } private async unzipStorageData( @@ -84,7 +93,7 @@ export class KyveApi { storageData: any, ): Promise { const g = new Gzip(); - if (parseInt(compressionId) === 0) { + if (parseIntRadix(compressionId) === 0) { throw new Error('No Compression'); } @@ -96,7 +105,7 @@ export class KyveApi { private decodeBlock(block: JsonRpcSuccessResponse): BlockResponse { return this.respAdaptor.decodeBlock({ - id: 10, // todo + id: 1, jsonrpc: '2.0', result: block, }); @@ -106,7 +115,7 @@ export class KyveApi { blockResult: JsonRpcSuccessResponse, ): BlockResultsResponse { return this.respAdaptor.decodeBlockResults({ - id: 10, + id: 1, jsonrpc: '2.0', result: blockResult, }); @@ -123,7 +132,7 @@ export class KyveApi { private async getLatestBundleId(): Promise { return ( - parseInt( + parseIntRadix( ( await this.lcdClient.kyve.query.v1beta1.finalizedBundles({ pool_id: this.poolId, @@ -148,8 +157,8 @@ export class KyveApi { const mid = Math.floor((low + high) / 2); const midBundle = await this.getBundleById(mid); - const fromKey = parseInt(midBundle.from_key); - const toKey = parseInt(midBundle.to_key); + const fromKey = parseIntRadix(midBundle.from_key); + const toKey = parseIntRadix(midBundle.to_key); if (height >= fromKey && height <= toKey) { startBundleId = mid; @@ -173,9 +182,10 @@ export class KyveApi { } private async validateCache(height: number, bundleDetails: BundleDetails) { - if (!this.cachedBundle || parseInt(bundleDetails.to_key) > height) { + if (!this.cachedBundle || parseIntRadix(bundleDetails.to_key) > height) { this.cachedBundle = await this.retrieveBundleData( bundleDetails.storage_id, + BUNDLE_TIMEOUT, ); this.cachedBlocks = await this.unzipStorageData( @@ -190,10 +200,8 @@ export class KyveApi { ): Promise<[BlockResponse, BlockResultsResponse]> { const bundleId = await this.getBundleId(height); const bundleDetails = await this.getBundleById(bundleId); - console.log('fetching from kyve'); await this.validateCache(height, bundleDetails); - console.log('fetched from kyve'); const blockData = this.findBlockByHeight(height); @@ -206,7 +214,7 @@ export class KyveApi { wrapEvent( block: CosmosBlock, txs: CosmosTransaction[], - api: CosmosClient, + registry: Registry, idxOffset: number, //use this offset to avoid clash with idx of begin block events ): CosmosEvent[] { const events: CosmosEvent[] = []; @@ -221,7 +229,7 @@ export class KyveApi { } if (msgIndex >= 0) { - const msg = wrapCosmosMsg(block, tx, msgIndex, api); + const msg = wrapCosmosMsg(block, tx, msgIndex, registry); const cosmosEvent: CosmosEvent = { idx: idxOffset++, msg, @@ -240,14 +248,29 @@ export class KyveApi { private async fetchBlocksArray( blockArray: number[], ): Promise<[BlockResponse, BlockResultsResponse][]> { - logger.info('using kyve blocks'); return Promise.all( blockArray.map(async (height) => this.getBlockByHeight(height)), ); } + private async retrieveBundleData( + storageId: string, + timeout: number, + ): Promise { + const axiosConfig: AxiosRequestConfig = { + method: 'get', + url: `/${storageId}`, + baseURL: this.storageUrl, + responseType: 'arraybuffer', + timeout, + }; + const { data: storageData } = await axios(axiosConfig); + + return { storageId, storageData }; + } + async fetchBlocksBatches( - api: CosmosClient, + registry: Registry, blockArray: number[], ): Promise { const blocks = await this.fetchBlocksArray(blockArray); @@ -258,7 +281,7 @@ export class KyveApi { `txInfos doesn't match up with block (${blockInfo.block.header.height}) transactions expected ${blockInfo.block.txs.length}, received: ${blockResults.results.length}`, ); - return new LazyBlockContent(blockInfo, blockResults, api, this); + return new LazyBlockContent(blockInfo, blockResults, registry, this); } catch (e) { logger.error( e, diff --git a/packages/node/src/utils/kyve/kyveConnection.ts b/packages/node/src/utils/kyve/kyveConnection.ts new file mode 100644 index 000000000..b0247b2a0 --- /dev/null +++ b/packages/node/src/utils/kyve/kyveConnection.ts @@ -0,0 +1,87 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import { Registry } from '@cosmjs/proto-signing'; +import { + ApiConnectionError, + ApiErrorType, + getLogger, + IApiConnectionSpecific, + NetworkMetadataPayload, +} from '@subql/node-core'; +import { FetchFunc } from '../../indexer/cosmosClient.connection'; +import { BlockContent } from '../../indexer/types'; +import { KyveApi } from './kyve'; + +const logger = getLogger('kyve-API'); + +export class KyveConnection + implements IApiConnectionSpecific +{ + readonly networkMeta: NetworkMetadataPayload; // this is not needed + unsafeApi: any; // this isnt needed + private registry: Registry; + + constructor( + private fetchBlocksBatches: FetchFunc, + private chainId: string, + ) // todo do i need registry ? + { + this.networkMeta = { + chain: this.chainId, + specName: undefined, + genesisHash: undefined, + }; + } + + static async create( + chainId: string, + kyveEndpoint: string, + registry: Registry, + ): Promise { + const kyveApi = new KyveApi(chainId, kyveEndpoint); + await kyveApi.init(); + + const connection = new KyveConnection( + kyveApi.fetchBlocksBatches.bind(kyveApi), + chainId, + ); + connection.setRegistry(registry); + + logger.info(`connected to Kyve via ${kyveEndpoint}`); + + return connection; + } + + async fetchBlocks(heights: number[]): Promise { + const blocks = await this.fetchBlocksBatches(this.registry, heights); + return blocks; + } + + handleError = KyveConnection.handleError; + + static handleError(error: Error): ApiConnectionError { + return new ApiConnectionError( + 'KyveError', + error.message, + ApiErrorType.Default, + ); + } + + private setRegistry(registry: Registry): void { + this.registry = registry; + } + + // No safeAPi + safeApi(height: number): any { + return undefined; + } + + async apiConnect(): Promise { + return Promise.resolve(undefined); + } + + async apiDisconnect(): Promise { + return Promise.resolve(undefined); + } +} diff --git a/packages/node/src/utils/kyve/storageRetriever.ts b/packages/node/src/utils/kyve/storageRetriever.ts deleted file mode 100644 index 775a6b136..000000000 --- a/packages/node/src/utils/kyve/storageRetriever.ts +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2020-2023 SubQuery Pte Ltd authors & contributors -// SPDX-License-Identifier: GPL-3.0 - -import { StorageReceipt } from '@kyvejs/protocol'; -import axios, { AxiosRequestConfig } from 'axios'; - -interface IStorageRetriever { - retrieveBundle(storageId: string, timeout: number): Promise; -} - -export class StorageRetriever implements IStorageRetriever { - private readonly storageUrl: string; - constructor(url: string) { - this.storageUrl = url; - } - async retrieveBundle( - storageId: string, - timeout: number, - ): Promise { - const axiosConfig: AxiosRequestConfig = { - method: 'get', - url: `/${storageId}`, - baseURL: this.storageUrl, - responseType: 'arraybuffer', - timeout, - }; - const { data: storageData } = await axios(axiosConfig); - - return { storageId, storageData }; - } -}