Skip to content

Commit

Permalink
add kyveConnection class, refactor cosmos wrappers to use registry ov…
Browse files Browse the repository at this point in the history
…er api. update based on review
  • Loading branch information
bz888 committed Apr 4, 2024
1 parent 963d724 commit fc34a0c
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 143 deletions.
10 changes: 5 additions & 5 deletions packages/node/src/configure/NodeConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,27 @@
import { IConfig, NodeConfig } from '@subql/node-core';

export interface ICosmosConfig extends IConfig {
kyve: string;
kyveEndpoint: string;
}

export class CosmosNodeConfig extends NodeConfig<ICosmosConfig> {
/**
* This is a wrapper around the core NodeConfig to get additional properties that are provided through args or node runner options
* NOTE: This isn't injected anywhere so you need to wrap the injected node config
* NOTE: This isn't injected anywhere, so you need to wrap the injected node config
*
* @example
* constructor(
* nodeConfig: NodeConfig,
* ) {
* this.nodeConfig = new EthereumNodeConfig(nodeConfig);
* this.nodeConfig = new CosmosNodeConfig(nodeConfig);
* }
* */
constructor(config: NodeConfig) {
// Rebuild with internal config
super((config as any)._config, (config as any)._isTest);
}

get kyve(): string {
return this._config.kyve;
get kyveEndpoint(): string {
return this._config.kyveEndpoint;
}
}
60 changes: 21 additions & 39 deletions packages/node/src/indexer/api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import { CosmosNodeConfig } from '../configure/NodeConfig';
import { SubqueryProject } from '../configure/SubqueryProject';
import * as CosmosUtil from '../utils/cosmos';
import { KyveApi } from '../utils/kyve/kyve';
import { KyveConnection } from '../utils/kyve/kyveConnection';
import { CosmosClientConnection } from './cosmosClient.connection';
import { BlockContent } from './types';

Expand All @@ -49,7 +50,6 @@ export class ApiService
implements OnApplicationShutdown
{
private fetchBlocksBatches = CosmosUtil.fetchBlocksBatches;
private kyve: KyveApi;
private nodeConfig: CosmosNodeConfig;
registry: Registry;

Expand Down Expand Up @@ -93,27 +93,28 @@ export class ApiService

this.registry = await this.buildRegistry();

if (this.nodeConfig.kyve) {
this.kyve = new KyveApi(network.chainId, this.nodeConfig.kyve);
await this.kyve.init();

this.fetchBlocksBatches = this.kyve.fetchBlocksBatches.bind(this.kyve);
if (this.nodeConfig.kyveEndpoint) {
await KyveConnection.create(
network.chainId,
this.nodeConfig.kyveEndpoint,
this.registry,
);
} else {
await this.createConnections(
network,
(endpoint) =>
CosmosClientConnection.create(
endpoint,
this.fetchBlocksBatches,
this.registry,
),
(connection: CosmosClientConnection) => {
const api = connection.unsafeApi;
return api.getChainId();
},
);
}

await this.createConnections(
network,
(endpoint) =>
CosmosClientConnection.create(
endpoint,
this.fetchBlocksBatches,
this.registry,
),
(connection: CosmosClientConnection) => {
const api = connection.unsafeApi;
return api.getChainId();
},
);

return this;
}

Expand Down Expand Up @@ -185,25 +186,6 @@ export class CosmosClient extends CosmWasmClient {
return this.tendermintClient.blockResults(height);
}

decodeMsg<T = unknown>(msg: DecodeObject): T {
try {
const decodedMsg = this.registry.decode(msg);
if (
[
'/cosmwasm.wasm.v1.MsgExecuteContract',
'/cosmwasm.wasm.v1.MsgMigrateContract',
'/cosmwasm.wasm.v1.MsgInstantiateContract',
].includes(msg.typeUrl)
) {
decodedMsg.msg = JSON.parse(new TextDecoder().decode(decodedMsg.msg));
}
return decodedMsg;
} catch (e) {
logger.error(e, 'Failed to decode message');
throw e;
}
}

static handleError(e: Error): Error {
const formatted_error: Error = e;
try {
Expand Down
19 changes: 5 additions & 14 deletions packages/node/src/indexer/cosmosClient.connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ const RETRY_DELAY = 2_500;

const logger = getLogger('cosmos-client-connection');

type FetchFunc = (
api: CosmosClient,
export type FetchFunc = (
registry: Registry,
batch: number[],
kyve: KyveApi | undefined,
api?: CosmosClient,
) => Promise<BlockContent[]>;

export class CosmosClientConnection
Expand Down Expand Up @@ -54,7 +54,6 @@ export class CosmosClientConnection
endpoint: string,
fetchBlocksBatches: FetchFunc,
registry: Registry,
kyve?: KyveApi,
): Promise<CosmosClientConnection> {
const httpEndpoint: HttpEndpoint = {
url: endpoint,
Expand Down Expand Up @@ -85,21 +84,13 @@ export class CosmosClientConnection

logger.info(`connected to ${endpoint}`);

if (kyve) {
connection.setKyveApi(kyve);
}

return connection;
}

safeApi(height: number): CosmosSafeClient {
return new CosmosSafeClient(this.tmClient, height);
}

private setKyveApi(kyveApi: KyveApi): void {
this.kyve = kyveApi;
}

private setTmClient(tmClient: Tendermint37Client): void {
this.tmClient = tmClient;
}
Expand All @@ -120,9 +111,9 @@ export class CosmosClientConnection

async fetchBlocks(heights: number[]): Promise<BlockContent[]> {
const blocks = await this.fetchBlocksBatches(
this.unsafeApi,
this.registry,
heights,
this.kyve,
this.unsafeApi,
);
return blocks;
}
Expand Down
6 changes: 3 additions & 3 deletions packages/node/src/utils/cosmos.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
} from 'cosmjs-types/cosmwasm/wasm/v1/tx';
import { CosmosClient } from '../indexer/api.service';
import { HttpClient } from '../indexer/rpc-clients';
import { filterMessageData, wrapEvent } from './cosmos';
import { decodeMsg, filterMessageData, wrapEvent } from './cosmos';

const ENDPOINT = 'https://rpc-archive.junonetwork.io/';
const CHAINID = 'juno-1';
Expand Down Expand Up @@ -115,7 +115,7 @@ describe('CosmosUtils', () => {
msg: {
typeUrl: decodedTx.body.messages[0].typeUrl,
get decodedMsg() {
return api.decodeMsg<any>(decodedTx.body.messages[0]);
return decodeMsg<any>(decodedTx.body.messages[0], registry);
},
},
};
Expand Down Expand Up @@ -159,7 +159,7 @@ describe('CosmosUtils', () => {
hash: '',
decodedTx: {} as DecodedTxRaw,
};
const events = wrapEvent({} as CosmosBlock, [tx], api, 0);
const events = wrapEvent({} as CosmosBlock, [tx], api.registry, 0);
expect(events.length).toEqual(0);
});

Expand Down
80 changes: 61 additions & 19 deletions packages/node/src/utils/cosmos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
// SPDX-License-Identifier: GPL-3.0

import assert from 'assert';
import { TextDecoder } from 'util';
import { sha256 } from '@cosmjs/crypto';
import { toHex } from '@cosmjs/encoding';
import { decodeTxRaw } from '@cosmjs/proto-signing';
import { DecodeObject, decodeTxRaw, Registry } from '@cosmjs/proto-signing';
import { fromTendermintEvent } from '@cosmjs/stargate';
import { Log, parseRawLog } from '@cosmjs/stargate/build/logs';
import {
Expand Down Expand Up @@ -32,6 +33,28 @@ import { KyveApi } from './kyve/kyve';

const logger = getLogger('fetch');

export function decodeMsg<T = unknown>(
msg: DecodeObject,
registry: Registry,
): T {
try {
const decodedMsg = registry.decode(msg);
if (
[
'/cosmwasm.wasm.v1.MsgExecuteContract',
'/cosmwasm.wasm.v1.MsgMigrateContract',
'/cosmwasm.wasm.v1.MsgInstantiateContract',
].includes(msg.typeUrl)
) {
decodedMsg.msg = JSON.parse(new TextDecoder().decode(decodedMsg.msg));
}
return decodedMsg;
} catch (e) {
logger.error(e, 'Failed to decode message');
throw e;
}
}

export function filterBlock(
data: CosmosBlock,
filter?: CosmosBlockFilter,
Expand Down Expand Up @@ -176,9 +199,9 @@ export function filterEvents(
return filteredEvents;
}

async function getBlockByHeight(
api: CosmosClient,
async function getBlockByHeightByRpc(
height: number,
api: CosmosClient,
): Promise<[BlockResponse, BlockResultsResponse]> {
return Promise.all([
api.blockInfo(height).catch((e) => {
Expand All @@ -190,12 +213,16 @@ async function getBlockByHeight(
]);
}

export async function fetchCosmosBlocksArray(
api: CosmosClient,
export async function fetchCosmosBlocksArray<T>(
getBlockByHeight: (
height: number,
api: T,
) => Promise<[BlockResponse, BlockResultsResponse]>,
blockArray: number[],
api: T,
): Promise<[BlockResponse, BlockResultsResponse][]> {
return Promise.all(
blockArray.map(async (height) => getBlockByHeight(api, height)),
blockArray.map(async (height) => getBlockByHeight(height, api)),
);
}

Expand Down Expand Up @@ -228,7 +255,7 @@ export function wrapCosmosMsg(
block: CosmosBlock,
tx: CosmosTransaction,
idx: number,
api: CosmosClient,
registry: Registry,
): CosmosMessage {
const rawMessage = tx.decodedTx.body.messages[idx];
return {
Expand All @@ -239,7 +266,8 @@ export function wrapCosmosMsg(
typeUrl: rawMessage.typeUrl,
get decodedMsg() {
delete this.decodedMsg;
return (this.decodedMsg = api.decodeMsg(rawMessage));
// TODO, unsure how this will impact the decode
return (this.decodedMsg = decodeMsg(rawMessage, registry));
},
},
};
Expand All @@ -248,12 +276,12 @@ export function wrapCosmosMsg(
function wrapMsg(
block: CosmosBlock,
txs: CosmosTransaction[],
api: CosmosClient,
registry: Registry,
): CosmosMessage[] {
const msgs: CosmosMessage[] = [];
for (const tx of txs) {
for (let i = 0; i < tx.decodedTx.body.messages.length; i++) {
msgs.push(wrapCosmosMsg(block, tx, i, api));
msgs.push(wrapCosmosMsg(block, tx, i, registry));
}
}
return msgs;
Expand All @@ -280,7 +308,7 @@ export function wrapBlockBeginAndEndEvents(
export function wrapEvent(
block: CosmosBlock,
txs: CosmosTransaction[],
api: CosmosClient,
registry: Registry,
idxOffset: number, //use this offset to avoid clash with idx of begin block events
): CosmosEvent[] {
const events: CosmosEvent[] = [];
Expand All @@ -296,7 +324,7 @@ export function wrapEvent(
for (const log of logs) {
let msg: CosmosMessage;
try {
msg = wrapCosmosMsg(block, tx, log.msg_index, api);
msg = wrapCosmosMsg(block, tx, log.msg_index, registry);
} catch (e) {
// Example where this can happen https://sei.explorers.guru/transaction/8D4CA68E917E15652E10CB960DE604AEEB1B183D6E94A85E9CD98403F15550B7
logger.warn(
Expand All @@ -321,18 +349,23 @@ export function wrapEvent(
}

export async function fetchBlocksBatches(
api: CosmosClient,
registry: Registry,
blockArray: number[],
api?: CosmosClient,
): Promise<BlockContent[]> {
const blocks = await fetchCosmosBlocksArray(api, blockArray);
const blocks = await fetchCosmosBlocksArray(
getBlockByHeightByRpc,
blockArray,
api,
);
return blocks.map(([blockInfo, blockResults]) => {
try {
assert(
blockResults.results.length === blockInfo.block.txs.length,
`txInfos doesn't match up with block (${blockInfo.block.header.height}) transactions expected ${blockInfo.block.txs.length}, received: ${blockResults.results.length}`,
);

return new LazyBlockContent(blockInfo, blockResults, api);
return new LazyBlockContent(blockInfo, blockResults, registry);
} catch (e) {
logger.error(
e,
Expand All @@ -355,7 +388,7 @@ export class LazyBlockContent implements BlockContent {
constructor(
private _blockInfo: BlockResponse,
private _results: BlockResultsResponse,
private _api: CosmosClient,
private _registry: Registry,
private _kyve?: KyveApi,
) {}

Expand All @@ -377,7 +410,11 @@ export class LazyBlockContent implements BlockContent {

get messages() {
if (!this._wrappedMessage) {
this._wrappedMessage = wrapMsg(this.block, this.transactions, this._api);
this._wrappedMessage = wrapMsg(
this.block,
this.transactions,
this._registry,
);
}
return this._wrappedMessage;
}
Expand All @@ -388,10 +425,15 @@ export class LazyBlockContent implements BlockContent {
? this._kyve.wrapEvent(
this.block,
this.transactions,
this._api,
this._registry,
this._eventIdx,
)
: wrapEvent(this.block, this.transactions, this._api, this._eventIdx);
: wrapEvent(
this.block,
this.transactions,
this._registry,
this._eventIdx,
);
this._eventIdx += this._wrappedEvent.length;
}
return this._wrappedEvent;
Expand Down
Loading

0 comments on commit fc34a0c

Please sign in to comment.