Skip to content

Commit

Permalink
refactor cacheBundle to be object
Browse files Browse the repository at this point in the history
  • Loading branch information
bz888 committed Apr 22, 2024
1 parent da5cb94 commit 2d1d76b
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 59 deletions.
61 changes: 33 additions & 28 deletions packages/node/src/utils/kyve/kyve.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ describe('KyveApi', () => {
readerSpy.mockRestore();

// reset cache
(kyveApi as any).cachedBundleDetails = [];
((kyveApi as any).cachedBundleDetails as Record<
string,
Promise<BundleDetails>
>) = {};
});
afterAll(async () => {
await promisify(rimraf)(tmpPath);
Expand Down Expand Up @@ -250,7 +253,11 @@ describe('KyveApi', () => {
expect(cachedBundles.length).toBe(3);
expect(blocks.length).toBe(5);

for (const bundle of (kyveApi as any).cachedBundleDetails) {
const bundles = (await Promise.all(
Object.values((kyveApi as any).cachedBundleDetails),
)) as BundleDetails[];

for (const bundle of bundles) {
const stats = await fs.promises.stat(
(kyveApi as any).getBundleFilePath(bundle.id),
);
Expand All @@ -277,14 +284,11 @@ describe('KyveApi', () => {
expect(files).not.toContain('bundle_2_0.json');
});
it('Should increment bundleId when height exceeds cache', async () => {
const bundle = await (kyveApi as any).getBundleById(0);
(kyveApi as any).cachedBundleDetails.push(bundle);
(kyveApi as any).addToCachedBundle(0, (kyveApi as any).getBundleById(0));
jest.spyOn(kyveApi as any, 'getBundleData').mockResolvedValueOnce('{}');
await (kyveApi as any).updateCurrentBundleAndDetails(160);

expect(
(kyveApi as any).cachedBundleDetails.find((b) => b.id === '1'),
).toBeDefined();
expect((kyveApi as any).cachedBundleDetails['1']).toBeDefined();
});
it('compare block info', async () => {
const height = 3901476;
Expand All @@ -302,8 +306,9 @@ describe('KyveApi', () => {
expect(poolId).toBe('2');
});
it('remove bundle.json if bundle fetch fails', async () => {
const bundleDetail = await (kyveApi as any).getBundleById(8);
(kyveApi as any).cachedBundleDetails = [bundleDetail];
(kyveApi as any).cachedBundleDetails = {
'8': (kyveApi as any).getBundleById(8),
};

jest.spyOn(axios, 'isAxiosError').mockImplementationOnce(() => true);

Expand All @@ -313,41 +318,41 @@ describe('KyveApi', () => {
});
});

await expect((kyveApi as any).getBundleData(bundleDetail)).rejects.toBe(
const bundleDetails = await (kyveApi as any).cachedBundleDetails['8'];
await expect((kyveApi as any).getBundleData(bundleDetails)).rejects.toBe(
'Failed to fetch',
);

const files = await fs.promises.readdir(tmpPath);
expect(files.length).toBe(0);
});
it('ensure to remove logic', async () => {
const cachedBundleDetails = [
{ id: '0', from_key: '1', to_key: '150' },
{ id: '1', from_key: '151', to_key: '300' },
{ id: '2', from_key: '301', to_key: '500' },
{ id: '3', from_key: '501', to_key: '800' },
] as BundleDetails[];
(kyveApi as any).cachedBundleDetails = cachedBundleDetails;
const mockCachedBundles: Record<string, Promise<BundleDetails>> = {
'0': (kyveApi as any).getBundleById(0),
'1': (kyveApi as any).getBundleById(1),
'2': (kyveApi as any).getBundleById(2),
'3': (kyveApi as any).getBundleById(3),
'4': (kyveApi as any).getBundleById(4),
};

(kyveApi as any).cachedBundleDetails = mockCachedBundles;

const height = 650;
const bufferSize = 300;

const toRemoveBundles = await (kyveApi as any).getToRemoveBundles(
cachedBundleDetails,
mockCachedBundles,
height,
bufferSize,
);

expect(toRemoveBundles.sort()).toEqual(
[
{ id: '0', from_key: '1', to_key: '150' },
{ id: '1', from_key: '151', to_key: '300' },
].sort(),
);
expect(toRemoveBundles.sort().map((b) => b.id)).toEqual(['0', '1'].sort());
});
it('Able to poll with simulated workers', async () => {
const bundleDetail = await (kyveApi as any).getBundleById(130265);
(kyveApi as any).cachedBundleDetails = [bundleDetail];
const mockCacheDetails = {
'130265': (kyveApi as any).getBundleById(130265),
};
(kyveApi as any).cachedBundleDetails = mockCacheDetails;

const workerKyveApi = await KyveApi.create(
'archway-1',
Expand All @@ -357,7 +362,7 @@ describe('KyveApi', () => {
tmpPath,
);

(workerKyveApi as any).cachedBundleDetails = [bundleDetail];
(workerKyveApi as any).cachedBundleDetails = mockCacheDetails;

jest
.spyOn(workerKyveApi, 'downloadAndProcessBundle')
Expand Down Expand Up @@ -385,7 +390,7 @@ describe('KyveApi', () => {
expect(pollSpy).toHaveBeenCalledTimes(1);

const r = await kyveApi.readFromFile(
(kyveApi as any).getBundleFilePath(bundleDetail.id),
(kyveApi as any).getBundleFilePath('130265'),
);

expect(r).toEqual(JSON.stringify(block_3856726));
Expand Down
74 changes: 43 additions & 31 deletions packages/node/src/utils/kyve/kyve.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ interface KyveBundleData {
}

export class KyveApi {
private cachedBundleDetails: BundleDetails[] = [];
private cachedBundleDetails: Record<string, Promise<BundleDetails>> = {};

private constructor(
private readonly storageUrl: string,
Expand Down Expand Up @@ -149,9 +149,11 @@ export class KyveApi {
private async getBundleId(height: number): Promise<number> {
const latestBundleId = await this.getLatestBundleId();

const lowestCacheHeight = Object.keys(this.cachedBundleDetails);

let low =
this.cachedBundleDetails.length > 0
? Math.min(...this.cachedBundleDetails.map((b) => parseDecimal(b.id)))
lowestCacheHeight.length > 0
? Math.min(...lowestCacheHeight.map((id) => parseDecimal(id)))
: -1;
let high = latestBundleId;
let startBundleId = -1; // Initialize to an invalid ID initially
Expand Down Expand Up @@ -186,14 +188,23 @@ export class KyveApi {
);
}

private addToCachedBundle(bundle: BundleDetails): void {
if (!this.cachedBundleDetails.find((b) => b.id === bundle.id)) {
this.cachedBundleDetails.push(bundle);
private addToCachedBundle(
bundleId: string,
bundlePromise: Promise<BundleDetails>,
): void {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
if (!this.cachedBundleDetails[bundleId]) {
this.cachedBundleDetails[bundleId] = bundlePromise;
}
}

private getBundleFromCache(height: number): BundleDetails | undefined {
return this.cachedBundleDetails.find(
private async getBundleFromCache(
height: number,
): Promise<BundleDetails> | undefined {
const bundles: BundleDetails[] = await Promise.all(
Object.values(this.cachedBundleDetails),
);
return bundles.find(
(b) =>
parseDecimal(b.from_key) <= height && height <= parseDecimal(b.to_key),
);
Expand All @@ -202,16 +213,16 @@ export class KyveApi {
private async updateCurrentBundleAndDetails(
height: number,
): Promise<KyveBundleData[]> {
let bundle = this.getBundleFromCache(height);
let bundle = await this.getBundleFromCache(height);
if (!bundle) {
const bundleIds = Object.keys(this.cachedBundleDetails);
const bundleId =
this.cachedBundleDetails.length !== 0
? Math.max(
...this.cachedBundleDetails.map((b) => parseDecimal(b.id)),
) + 1
bundleIds.length !== 0
? Math.max(...bundleIds.map((key) => parseDecimal(key))) + 1
: await this.getBundleId(height);
bundle = await this.getBundleById(bundleId);
this.addToCachedBundle(bundle);
this.addToCachedBundle(bundleId.toString(), this.getBundleById(bundleId));

bundle = await this.cachedBundleDetails[bundleId];
}
return JSON.parse(await this.getBundleData(bundle));
}
Expand Down Expand Up @@ -310,32 +321,33 @@ export class KyveApi {
}

private async getToRemoveBundles(
cachedBundles: BundleDetails[],
cachedBundles: Record<string, Promise<BundleDetails>>,
height: number,
bufferSize: number,
): Promise<BundleDetails[]> {
if (!cachedBundles.length) {
if (!Object.keys(cachedBundles).length) {
return this.getExistingBundlesFromCacheDirectory(this.tmpCacheDir);
}

const currentBundle = this.getBundleFromCache(height);
const currentBundle = await this.getBundleFromCache(height);
if (!currentBundle) return [];

const bundles = await Promise.all(Object.values(cachedBundles));

return currentBundle
? cachedBundles.filter((b) => {
const isNotCurrentBundleAndLower =
currentBundle.id !== b.id &&
parseDecimal(currentBundle.id) > parseDecimal(b.id);
const isOutsiderBuffer =
height < parseDecimal(b.from_key) - bufferSize ||
height > parseDecimal(b.to_key) + bufferSize;
return bundles.filter((b) => {
const isNotCurrentBundleAndLower =
currentBundle.id !== b.id &&
parseDecimal(currentBundle.id) > parseDecimal(b.id);
const isOutsiderBuffer =
height < parseDecimal(b.from_key) - bufferSize ||
height > parseDecimal(b.to_key) + bufferSize;

return isNotCurrentBundleAndLower && isOutsiderBuffer;
})
: [];
return isNotCurrentBundleAndLower && isOutsiderBuffer;
});
}

async clearFileCache(
cachedBundles: BundleDetails[],
cachedBundles: Record<string, Promise<BundleDetails>>,
height: number,
bufferSize: number,
): Promise<void> {
Expand All @@ -349,7 +361,7 @@ export class KyveApi {
const bundlePath = this.getBundleFilePath(bundle.id);
try {
await fs.promises.unlink(bundlePath);
remove(this.cachedBundleDetails, (b) => b.id === bundle.id);
delete this.cachedBundleDetails[bundle.id];
} catch (e) {
if (e.code !== 'ENOENT') {
// if it does not exist, should be removed
Expand Down

0 comments on commit 2d1d76b

Please sign in to comment.