Skip to content

Commit

Permalink
update bundle regex, tidy up based on review
Browse files Browse the repository at this point in the history
  • Loading branch information
bz888 committed Apr 21, 2024
1 parent 14b2b5a commit 6fbf6b0
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 32 deletions.
34 changes: 8 additions & 26 deletions packages/node/src/configure/SubqueryProject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@
// SPDX-License-Identifier: GPL-3.0

import assert from 'assert';
import fs from 'fs';
import os from 'os';
import path from 'path';
import { Injectable } from '@nestjs/common';
import { LocalReader, validateSemver } from '@subql/common';
import { validateSemver } from '@subql/common';
import {
CosmosProjectNetworkConfig,
parseCosmosProjectManifest,
Expand All @@ -31,7 +28,8 @@ import {
import { buildSchemaFromString } from '@subql/utils';
import Cron from 'cron-converter';
import { GraphQLSchema } from 'graphql';
import { isTmpDir, processNetworkConfig } from '../utils/project';
import { KyveApi } from '../utils/kyve/kyve';
import { processNetworkConfig } from '../utils/project';

const { version: packageVersion } = require('../../package.json');

Expand Down Expand Up @@ -114,26 +112,6 @@ export class SubqueryProject implements ISubqueryProject {
}
}

async function getFileCacheDir(
reader: Reader,
projectRoot: string,
chainId: string,
): Promise<string> {
if (isTmpDir(projectRoot)) return projectRoot;
if (reader instanceof LocalReader) {
const tmpDir = path.join(os.tmpdir(), `kyveTmpFileCache_${chainId}`);
try {
await fs.promises.mkdir(tmpDir);
} catch (e) {
if (e.code === 'EEXIST') {
return tmpDir;
}
}
return tmpDir;
}
return projectRoot;
}

type SUPPORT_MANIFEST = ProjectManifestV1_0_0Impl;

async function loadProjectFromManifestBase(
Expand Down Expand Up @@ -192,7 +170,11 @@ async function loadProjectFromManifestBase(
),
);

const fileCacheDir = await getFileCacheDir(reader, root, network.chainId);
const fileCacheDir = await KyveApi.getFileCacheDir(
reader,
root,
network.chainId,
);

return new SubqueryProject(
reader.root ? reader.root : path, //TODO, need to method to get project_id
Expand Down
16 changes: 16 additions & 0 deletions packages/node/src/utils/kyve/kyve.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,22 @@ describe('KyveApi', () => {

expect(removeFiles.map((r) => r.id).sort()).toEqual(['0', '1'].sort());
});
it('ensure file bundle id regex is correct', () => {
const files = [
'bundle_2_0.json',
'bundle_2_1.json',
'bundle_2_2.json',
'bundle_2_3.json',
'bundle_5_0.json',
'bundle_4_0.json',
'bundle_3_0.json',
'bundle_1_0.json',
];

expect(files.filter((f) => (kyveApi as any).isBundleFile(f)).length).toBe(
4,
);
});
describe('able to wrap kyveBlock', () => {
let rpcLazyBlockContent: LazyBlockContent;
let kyveLazyBlockContent: LazyBlockContent;
Expand Down
40 changes: 34 additions & 6 deletions packages/node/src/utils/kyve/kyve.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import assert from 'assert';
import fs from 'fs';
import os from 'os';
import path from 'path';
import * as zlib from 'zlib';
import { JsonRpcSuccessResponse } from '@cosmjs/json-rpc';
Expand All @@ -16,7 +17,9 @@ import {
import KyveSDK, { KyveLCDClientType } from '@kyvejs/sdk';
import { SupportedChains } from '@kyvejs/sdk/src/constants'; // Currently these types are not exported
import { QueryPoolsResponse } from '@kyvejs/types/lcd/kyve/query/v1beta1/pools';
import { LocalReader } from '@subql/common';
import { delay, getLogger, IBlock } from '@subql/node-core';
import { Reader } from '@subql/types-core';
import axios, { AxiosResponse } from 'axios';
import { remove } from 'lodash';
import { BlockContent } from '../../indexer/types';
Expand All @@ -27,6 +30,8 @@ import { BundleDetails } from './kyveTypes';
const BUNDLE_TIMEOUT = 10000; //ms
const POLL_TIMER = 3; // sec
const MAX_COMPRESSION_BYTE_SIZE = 2 * 10 ** 9;
const BUNDLE_FILE_ID_REG = (poolId: string) =>
new RegExp(`^bundle_${poolId}_\\d+\\.json$`);

const parseDecimal = (value: string) => parseInt(value, 10);

Expand Down Expand Up @@ -112,7 +117,7 @@ export class KyveApi {
}

private getBundleFilePath(id: string): string {
return path.join(this.tmpCacheDir, `bundle_${id}.json`);
return path.join(this.tmpCacheDir, `bundle_${this.poolId}_${id}.json`);
}

private async getBundleById(bundleId: number): Promise<BundleDetails> {
Expand Down Expand Up @@ -276,18 +281,18 @@ export class KyveApi {
}

private isBundleFile(filename: string): boolean {
return /^bundle_\d+\.json$/.test(filename);
return BUNDLE_FILE_ID_REG(this.poolId).test(filename);
}

private async getExisitngBundlesFromCacheDirectory(
private async getExistingBundlesFromCacheDirectory(
tmpDir: string,
): Promise<BundleDetails[]> {
const bundles: BundleDetails[] = [];
const files = await fs.promises.readdir(tmpDir);

for (const file of files) {
if (this.isBundleFile(file)) {
const id = parseDecimal(file.match(/^bundle_(\d+)\.json$/)[1]);
const id = parseDecimal(file.match(BUNDLE_FILE_ID_REG(this.poolId))[1]);
bundles.push(await this.getBundleById(id));
}
}
Expand All @@ -301,7 +306,7 @@ export class KyveApi {
bufferSize: number,
): Promise<BundleDetails[]> {
if (!cachedBundles.length) {
return this.getExisitngBundlesFromCacheDirectory(this.tmpCacheDir);
return this.getExistingBundlesFromCacheDirectory(this.tmpCacheDir);
}

const currentBundle = this.getBundleFromCache(height);
Expand Down Expand Up @@ -362,7 +367,9 @@ export class KyveApi {
try {
kyveBlockResult.results.forEach((b) => {
// log is readonly hence needing to cast it
(b.log as any) = JSON.stringify(this.reconstructLogs(kyveBlockResult));
(b.log as string) = JSON.stringify(
this.reconstructLogs(kyveBlockResult),
);
});
} catch (e) {
throw new Error(`Failed to inject kyveBlock, ${e}`);
Expand Down Expand Up @@ -459,4 +466,25 @@ export class KyveApi {
}
});
}

static async getFileCacheDir(
reader: Reader,
projectRoot: string,
chainId: string,
): Promise<string> {
if (isTmpDir(projectRoot)) return projectRoot;
if (reader instanceof LocalReader) {
const tmpDir = path.join(os.tmpdir(), `kyveTmpFileCache_${chainId}`);
try {
await fs.promises.mkdir(tmpDir);
} catch (e) {
if (e.code === 'EEXIST') {
return tmpDir;
}
throw e;
}
return tmpDir;
}
return projectRoot;
}
}

0 comments on commit 6fbf6b0

Please sign in to comment.