Skip to content

Commit

Permalink
use nodeConfig for kyve cli integration, update kyve api, add storage…
Browse files Browse the repository at this point in the history
… retriever
  • Loading branch information
bz888 committed Apr 2, 2024
1 parent 7b827a5 commit 963d724
Show file tree
Hide file tree
Showing 13 changed files with 287 additions and 105 deletions.
30 changes: 30 additions & 0 deletions packages/node/src/configure/NodeConfig.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import { IConfig, NodeConfig } from '@subql/node-core';

export interface ICosmosConfig extends IConfig {
kyve: string;
}

export class CosmosNodeConfig extends NodeConfig<ICosmosConfig> {
/**
* This is a wrapper around the core NodeConfig to get additional properties that are provided through args or node runner options
* NOTE: This isn't injected anywhere so you need to wrap the injected node config
*
* @example
* constructor(
* nodeConfig: NodeConfig,
* ) {
* this.nodeConfig = new EthereumNodeConfig(nodeConfig);
* }
* */
constructor(config: NodeConfig) {
// Rebuild with internal config
super((config as any)._config, (config as any)._isTest);
}

get kyve(): string {
return this._config.kyve;
}
}
22 changes: 11 additions & 11 deletions packages/node/src/indexer/api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
getLogger,
ConnectionPoolService,
ApiService as BaseApiService,
NodeConfig,
} from '@subql/node-core';
import { CosmWasmSafeClient } from '@subql/types-cosmos/interfaces';
import {
Expand All @@ -33,10 +34,10 @@ import {
MsgStoreCode,
MsgUpdateAdmin,
} from 'cosmjs-types/cosmwasm/wasm/v1/tx';
import { CosmosNodeConfig } from '../configure/NodeConfig';
import { SubqueryProject } from '../configure/SubqueryProject';
import * as CosmosUtil from '../utils/cosmos';
import { KyveApi } from '../utils/kyve';
import { yargsOptions } from '../yargs';
import { KyveApi } from '../utils/kyve/kyve';
import { CosmosClientConnection } from './cosmosClient.connection';
import { BlockContent } from './types';

Expand All @@ -48,15 +49,18 @@ export class ApiService
implements OnApplicationShutdown
{
private fetchBlocksBatches = CosmosUtil.fetchBlocksBatches;
private kyveClient: KyveApi;
private kyve: KyveApi;
private nodeConfig: CosmosNodeConfig;
registry: Registry;

constructor(
@Inject('ISubqueryProject') private project: SubqueryProject,
connectionPoolService: ConnectionPoolService<CosmosClientConnection>,
eventEmitter: EventEmitter2,
nodeConfig: NodeConfig,
) {
super(connectionPoolService, eventEmitter);
this.nodeConfig = new CosmosNodeConfig(nodeConfig);
}

private async buildRegistry(): Promise<Registry> {
Expand Down Expand Up @@ -89,10 +93,11 @@ export class ApiService

this.registry = await this.buildRegistry();

const { argv } = yargsOptions;
if (this.nodeConfig.kyve) {
this.kyve = new KyveApi(network.chainId, this.nodeConfig.kyve);
await this.kyve.init();

if (argv.kyve) {
this.kyveClient = new KyveApi(network.chainId);
this.fetchBlocksBatches = this.kyve.fetchBlocksBatches.bind(this.kyve);
}

await this.createConnections(
Expand All @@ -102,7 +107,6 @@ export class ApiService
endpoint,
this.fetchBlocksBatches,
this.registry,
this.kyveApi,
),
(connection: CosmosClientConnection) => {
const api = connection.unsafeApi;
Expand All @@ -113,10 +117,6 @@ export class ApiService
return this;
}

get kyveApi(): KyveApi {
return this.kyveClient;
}

get api(): CosmosClient {
return this.unsafeApi;
}
Expand Down
16 changes: 8 additions & 8 deletions packages/node/src/indexer/cosmosClient.connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
NetworkMetadataPayload,
} from '@subql/node-core';
import { getLogger } from '@subql/node-core/dist';
import { KyveApi } from '../utils/kyve';
import { KyveApi } from '../utils/kyve/kyve';
import { CosmosClient, CosmosSafeClient } from './api.service';
import { HttpClient, WebsocketClient } from './rpc-clients';
import { BlockContent } from './types';
Expand All @@ -26,7 +26,7 @@ const logger = getLogger('cosmos-client-connection');
type FetchFunc = (
api: CosmosClient,
batch: number[],
kyveApi: KyveApi | undefined,
kyve: KyveApi | undefined,
) => Promise<BlockContent[]>;

export class CosmosClientConnection
Expand All @@ -35,7 +35,7 @@ export class CosmosClientConnection
{
private tmClient: Tendermint37Client;
private registry: Registry;
private kyveApi?: KyveApi;
private kyve?: KyveApi;
readonly networkMeta: NetworkMetadataPayload;

constructor(
Expand All @@ -54,7 +54,7 @@ export class CosmosClientConnection
endpoint: string,
fetchBlocksBatches: FetchFunc,
registry: Registry,
kyveApi?: KyveApi,
kyve?: KyveApi,
): Promise<CosmosClientConnection> {
const httpEndpoint: HttpEndpoint = {
url: endpoint,
Expand Down Expand Up @@ -85,8 +85,8 @@ export class CosmosClientConnection

logger.info(`connected to ${endpoint}`);

if (kyveApi) {
connection.setKyveApi(kyveApi);
if (kyve) {
connection.setKyveApi(kyve);
}

return connection;
Expand All @@ -97,7 +97,7 @@ export class CosmosClientConnection
}

private setKyveApi(kyveApi: KyveApi): void {
this.kyveApi = kyveApi;
this.kyve = kyveApi;
}

private setTmClient(tmClient: Tendermint37Client): void {
Expand All @@ -122,7 +122,7 @@ export class CosmosClientConnection
const blocks = await this.fetchBlocksBatches(
this.unsafeApi,
heights,
this.kyveApi,
this.kyve,
);
return blocks;
}
Expand Down
3 changes: 3 additions & 0 deletions packages/node/src/indexer/worker/worker-fetch.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
WorkerConnectionPoolStateManager,
InMemoryCacheService,
WorkerInMemoryCacheService,
NodeConfig,
} from '@subql/node-core';
import { SubqueryProject } from '../../configure/SubqueryProject';
import { ApiService } from '../api.service';
Expand Down Expand Up @@ -38,11 +39,13 @@ import { WorkerUnfinalizedBlocksService } from './worker.unfinalizedBlocks.servi
project: SubqueryProject,
connectionPoolService: ConnectionPoolService<CosmosClientConnection>,
eventEmitter: EventEmitter2,
nodeConfig: NodeConfig,
) => {
const apiService = new ApiService(
project,
connectionPoolService,
eventEmitter,
nodeConfig,
);
await apiService.init();
return apiService;
Expand Down
1 change: 0 additions & 1 deletion packages/node/src/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ export async function bootstrap(): Promise<void> {
process.exit(1);
}

console.log('argv at init', argv);
if (argv.unsafe) {
logger.warn(
'UNSAFE MODE IS ENABLED. This is not recommended for most projects and will not be supported by our hosted service',
Expand Down
3 changes: 3 additions & 0 deletions packages/node/src/subcommands/testing.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
ConnectionPoolStateManager,
DbModule,
InMemoryCacheService,
NodeConfig,
PoiService,
PoiSyncService,
StoreCacheService,
Expand Down Expand Up @@ -50,11 +51,13 @@ import { UnfinalizedBlocksService } from '../indexer/unfinalizedBlocks.service';
project: SubqueryProject,
connectionPoolService: ConnectionPoolService<CosmosClientConnection>,
eventEmitter: EventEmitter2,
nodeConfig: NodeConfig,
) => {
const apiService = new ApiService(
project,
connectionPoolService,
eventEmitter,
nodeConfig,
);
await apiService.init();
return apiService;
Expand Down
56 changes: 8 additions & 48 deletions packages/node/src/utils/cosmos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import { isObjectLike } from 'lodash';
import { isLong } from 'long';
import { CosmosClient } from '../indexer/api.service';
import { BlockContent } from '../indexer/types';
import { KyveApi } from './kyve';
import { KyveApi } from './kyve/kyve';

const logger = getLogger('fetch');

Expand Down Expand Up @@ -193,13 +193,9 @@ async function getBlockByHeight(
export async function fetchCosmosBlocksArray(
api: CosmosClient,
blockArray: number[],
kyve?: KyveApi,
): Promise<[BlockResponse, BlockResultsResponse][]> {
// todo this is where kyve introduction should be.
return Promise.all(
blockArray.map(async (height) =>
kyve ? kyve.getBlockByHeight(height) : getBlockByHeight(api, height),
),
blockArray.map(async (height) => getBlockByHeight(api, height)),
);
}

Expand Down Expand Up @@ -228,7 +224,7 @@ export function wrapTx(
}));
}

function wrapCosmosMsg(
export function wrapCosmosMsg(
block: CosmosBlock,
tx: CosmosTransaction,
idx: number,
Expand Down Expand Up @@ -280,40 +276,6 @@ export function wrapBlockBeginAndEndEvents(
},
);
}
// TODO this should be replacing the current implementation, but then, the rpc request should be source of truth.
export function kyveWrapEvent(
block: CosmosBlock,
txs: CosmosTransaction[],
api: CosmosClient,
idxOffset: number, //use this offset to avoid clash with idx of begin block events
): CosmosEvent[] {
const events: CosmosEvent[] = [];
for (const tx of txs) {
let msgIndex = -1;
for (const event of tx.tx.events) {
if (
event.type === 'message' &&
event.attributes.find((e) => e.key === 'action')
) {
msgIndex += 1;
}

if (msgIndex >= 0) {
const msg = wrapCosmosMsg(block, tx, msgIndex, api);
const cosmosEvent: CosmosEvent = {
idx: idxOffset++,
msg,
tx,
block,
log: undefined,
event,
};
events.push(cosmosEvent);
}
}
}
return events;
}

export function wrapEvent(
block: CosmosBlock,
Expand Down Expand Up @@ -361,17 +323,16 @@ export function wrapEvent(
export async function fetchBlocksBatches(
api: CosmosClient,
blockArray: number[],
kyveApi?: KyveApi,
): Promise<BlockContent[]> {
const blocks = await fetchCosmosBlocksArray(api, blockArray, kyveApi);
const blocks = await fetchCosmosBlocksArray(api, blockArray);
return blocks.map(([blockInfo, blockResults]) => {
try {
assert(
blockResults.results.length === blockInfo.block.txs.length,
`txInfos doesn't match up with block (${blockInfo.block.header.height}) transactions expected ${blockInfo.block.txs.length}, received: ${blockResults.results.length}`,
);

return new LazyBlockContent(blockInfo, blockResults, api, kyveApi);
return new LazyBlockContent(blockInfo, blockResults, api);
} catch (e) {
logger.error(
e,
Expand All @@ -395,7 +356,7 @@ export class LazyBlockContent implements BlockContent {
private _blockInfo: BlockResponse,
private _results: BlockResultsResponse,
private _api: CosmosClient,
private _kyveBlock?: KyveApi,
private _kyve?: KyveApi,
) {}

get block() {
Expand All @@ -422,10 +383,9 @@ export class LazyBlockContent implements BlockContent {
}

get events() {
console.log('using kyve', !!this._kyveBlock);
if (!this._wrappedEvent) {
this._wrappedEvent = this._kyveBlock
? kyveWrapEvent(
this._wrappedEvent = this._kyve
? this._kyve.wrapEvent(
this.block,
this.transactions,
this._api,
Expand Down
Loading

0 comments on commit 963d724

Please sign in to comment.