From 2d1d76b35967fa0c8ca0f8e144940c2e63e2afd3 Mon Sep 17 00:00:00 2001 From: bz888 Date: Mon, 22 Apr 2024 12:44:14 +0800 Subject: [PATCH] refactor cacheBundle to be object --- packages/node/src/utils/kyve/kyve.spec.ts | 61 ++++++++++--------- packages/node/src/utils/kyve/kyve.ts | 74 +++++++++++++---------- 2 files changed, 76 insertions(+), 59 deletions(-) diff --git a/packages/node/src/utils/kyve/kyve.spec.ts b/packages/node/src/utils/kyve/kyve.spec.ts index 9cc5f3c62..9e34ca7bf 100644 --- a/packages/node/src/utils/kyve/kyve.spec.ts +++ b/packages/node/src/utils/kyve/kyve.spec.ts @@ -111,7 +111,10 @@ describe('KyveApi', () => { readerSpy.mockRestore(); // reset cache - (kyveApi as any).cachedBundleDetails = []; + ((kyveApi as any).cachedBundleDetails as Record< + string, + Promise + >) = {}; }); afterAll(async () => { await promisify(rimraf)(tmpPath); @@ -250,7 +253,11 @@ describe('KyveApi', () => { expect(cachedBundles.length).toBe(3); expect(blocks.length).toBe(5); - for (const bundle of (kyveApi as any).cachedBundleDetails) { + const bundles = (await Promise.all( + Object.values((kyveApi as any).cachedBundleDetails), + )) as BundleDetails[]; + + for (const bundle of bundles) { const stats = await fs.promises.stat( (kyveApi as any).getBundleFilePath(bundle.id), ); @@ -277,14 +284,11 @@ describe('KyveApi', () => { expect(files).not.toContain('bundle_2_0.json'); }); it('Should increment bundleId when height exceeds cache', async () => { - const bundle = await (kyveApi as any).getBundleById(0); - (kyveApi as any).cachedBundleDetails.push(bundle); + (kyveApi as any).addToCachedBundle(0, (kyveApi as any).getBundleById(0)); jest.spyOn(kyveApi as any, 'getBundleData').mockResolvedValueOnce('{}'); await (kyveApi as any).updateCurrentBundleAndDetails(160); - expect( - (kyveApi as any).cachedBundleDetails.find((b) => b.id === '1'), - ).toBeDefined(); + expect((kyveApi as any).cachedBundleDetails['1']).toBeDefined(); }); it('compare block info', async () => { const height = 3901476; @@ -302,8 +306,9 @@ describe('KyveApi', () => { expect(poolId).toBe('2'); }); it('remove bundle.json if bundle fetch fails', async () => { - const bundleDetail = await (kyveApi as any).getBundleById(8); - (kyveApi as any).cachedBundleDetails = [bundleDetail]; + (kyveApi as any).cachedBundleDetails = { + '8': (kyveApi as any).getBundleById(8), + }; jest.spyOn(axios, 'isAxiosError').mockImplementationOnce(() => true); @@ -313,7 +318,8 @@ describe('KyveApi', () => { }); }); - await expect((kyveApi as any).getBundleData(bundleDetail)).rejects.toBe( + const bundleDetails = await (kyveApi as any).cachedBundleDetails['8']; + await expect((kyveApi as any).getBundleData(bundleDetails)).rejects.toBe( 'Failed to fetch', ); @@ -321,33 +327,32 @@ describe('KyveApi', () => { expect(files.length).toBe(0); }); it('ensure to remove logic', async () => { - const cachedBundleDetails = [ - { id: '0', from_key: '1', to_key: '150' }, - { id: '1', from_key: '151', to_key: '300' }, - { id: '2', from_key: '301', to_key: '500' }, - { id: '3', from_key: '501', to_key: '800' }, - ] as BundleDetails[]; - (kyveApi as any).cachedBundleDetails = cachedBundleDetails; + const mockCachedBundles: Record> = { + '0': (kyveApi as any).getBundleById(0), + '1': (kyveApi as any).getBundleById(1), + '2': (kyveApi as any).getBundleById(2), + '3': (kyveApi as any).getBundleById(3), + '4': (kyveApi as any).getBundleById(4), + }; + + (kyveApi as any).cachedBundleDetails = mockCachedBundles; const height = 650; const bufferSize = 300; const toRemoveBundles = await (kyveApi as any).getToRemoveBundles( - cachedBundleDetails, + mockCachedBundles, height, bufferSize, ); - expect(toRemoveBundles.sort()).toEqual( - [ - { id: '0', from_key: '1', to_key: '150' }, - { id: '1', from_key: '151', to_key: '300' }, - ].sort(), - ); + expect(toRemoveBundles.sort().map((b) => b.id)).toEqual(['0', '1'].sort()); }); it('Able to poll with simulated workers', async () => { - const bundleDetail = await (kyveApi as any).getBundleById(130265); - (kyveApi as any).cachedBundleDetails = [bundleDetail]; + const mockCacheDetails = { + '130265': (kyveApi as any).getBundleById(130265), + }; + (kyveApi as any).cachedBundleDetails = mockCacheDetails; const workerKyveApi = await KyveApi.create( 'archway-1', @@ -357,7 +362,7 @@ describe('KyveApi', () => { tmpPath, ); - (workerKyveApi as any).cachedBundleDetails = [bundleDetail]; + (workerKyveApi as any).cachedBundleDetails = mockCacheDetails; jest .spyOn(workerKyveApi, 'downloadAndProcessBundle') @@ -385,7 +390,7 @@ describe('KyveApi', () => { expect(pollSpy).toHaveBeenCalledTimes(1); const r = await kyveApi.readFromFile( - (kyveApi as any).getBundleFilePath(bundleDetail.id), + (kyveApi as any).getBundleFilePath('130265'), ); expect(r).toEqual(JSON.stringify(block_3856726)); diff --git a/packages/node/src/utils/kyve/kyve.ts b/packages/node/src/utils/kyve/kyve.ts index 7be9635b9..3ea5a36a6 100644 --- a/packages/node/src/utils/kyve/kyve.ts +++ b/packages/node/src/utils/kyve/kyve.ts @@ -46,7 +46,7 @@ interface KyveBundleData { } export class KyveApi { - private cachedBundleDetails: BundleDetails[] = []; + private cachedBundleDetails: Record> = {}; private constructor( private readonly storageUrl: string, @@ -149,9 +149,11 @@ export class KyveApi { private async getBundleId(height: number): Promise { const latestBundleId = await this.getLatestBundleId(); + const lowestCacheHeight = Object.keys(this.cachedBundleDetails); + let low = - this.cachedBundleDetails.length > 0 - ? Math.min(...this.cachedBundleDetails.map((b) => parseDecimal(b.id))) + lowestCacheHeight.length > 0 + ? Math.min(...lowestCacheHeight.map((id) => parseDecimal(id))) : -1; let high = latestBundleId; let startBundleId = -1; // Initialize to an invalid ID initially @@ -186,14 +188,23 @@ export class KyveApi { ); } - private addToCachedBundle(bundle: BundleDetails): void { - if (!this.cachedBundleDetails.find((b) => b.id === bundle.id)) { - this.cachedBundleDetails.push(bundle); + private addToCachedBundle( + bundleId: string, + bundlePromise: Promise, + ): void { + // eslint-disable-next-line @typescript-eslint/no-misused-promises + if (!this.cachedBundleDetails[bundleId]) { + this.cachedBundleDetails[bundleId] = bundlePromise; } } - private getBundleFromCache(height: number): BundleDetails | undefined { - return this.cachedBundleDetails.find( + private async getBundleFromCache( + height: number, + ): Promise | undefined { + const bundles: BundleDetails[] = await Promise.all( + Object.values(this.cachedBundleDetails), + ); + return bundles.find( (b) => parseDecimal(b.from_key) <= height && height <= parseDecimal(b.to_key), ); @@ -202,16 +213,16 @@ export class KyveApi { private async updateCurrentBundleAndDetails( height: number, ): Promise { - let bundle = this.getBundleFromCache(height); + let bundle = await this.getBundleFromCache(height); if (!bundle) { + const bundleIds = Object.keys(this.cachedBundleDetails); const bundleId = - this.cachedBundleDetails.length !== 0 - ? Math.max( - ...this.cachedBundleDetails.map((b) => parseDecimal(b.id)), - ) + 1 + bundleIds.length !== 0 + ? Math.max(...bundleIds.map((key) => parseDecimal(key))) + 1 : await this.getBundleId(height); - bundle = await this.getBundleById(bundleId); - this.addToCachedBundle(bundle); + this.addToCachedBundle(bundleId.toString(), this.getBundleById(bundleId)); + + bundle = await this.cachedBundleDetails[bundleId]; } return JSON.parse(await this.getBundleData(bundle)); } @@ -310,32 +321,33 @@ export class KyveApi { } private async getToRemoveBundles( - cachedBundles: BundleDetails[], + cachedBundles: Record>, height: number, bufferSize: number, ): Promise { - if (!cachedBundles.length) { + if (!Object.keys(cachedBundles).length) { return this.getExistingBundlesFromCacheDirectory(this.tmpCacheDir); } - const currentBundle = this.getBundleFromCache(height); + const currentBundle = await this.getBundleFromCache(height); + if (!currentBundle) return []; + + const bundles = await Promise.all(Object.values(cachedBundles)); - return currentBundle - ? cachedBundles.filter((b) => { - const isNotCurrentBundleAndLower = - currentBundle.id !== b.id && - parseDecimal(currentBundle.id) > parseDecimal(b.id); - const isOutsiderBuffer = - height < parseDecimal(b.from_key) - bufferSize || - height > parseDecimal(b.to_key) + bufferSize; + return bundles.filter((b) => { + const isNotCurrentBundleAndLower = + currentBundle.id !== b.id && + parseDecimal(currentBundle.id) > parseDecimal(b.id); + const isOutsiderBuffer = + height < parseDecimal(b.from_key) - bufferSize || + height > parseDecimal(b.to_key) + bufferSize; - return isNotCurrentBundleAndLower && isOutsiderBuffer; - }) - : []; + return isNotCurrentBundleAndLower && isOutsiderBuffer; + }); } async clearFileCache( - cachedBundles: BundleDetails[], + cachedBundles: Record>, height: number, bufferSize: number, ): Promise { @@ -349,7 +361,7 @@ export class KyveApi { const bundlePath = this.getBundleFilePath(bundle.id); try { await fs.promises.unlink(bundlePath); - remove(this.cachedBundleDetails, (b) => b.id === bundle.id); + delete this.cachedBundleDetails[bundle.id]; } catch (e) { if (e.code !== 'ENOENT') { // if it does not exist, should be removed