diff --git a/package.json b/package.json index 88f60c20..c6971965 100644 --- a/package.json +++ b/package.json @@ -47,6 +47,7 @@ "class-transformer": "^0.5.1", "class-validator": "^0.14.0", "ethers": "^5.4.7", + "glob": "^7.1.2", "kafkajs": "^1.15.0", "nest-winston": "^1.6.1", "node-abort-controller": "^3.0.1", @@ -106,4 +107,4 @@ "coverageDirectory": "../coverage", "testEnvironment": "node" } -} +} \ No newline at end of file diff --git a/src/cache/cache.constants.ts b/src/cache/cache.constants.ts index 7f9846e3..7a040afd 100644 --- a/src/cache/cache.constants.ts +++ b/src/cache/cache.constants.ts @@ -2,3 +2,4 @@ export const CACHE_DIR = 'cache'; export const CACHE_FILE_NAME = 'cacheFileName'; export const CACHE_DEFAULT_VALUE = 'cacheDefaultValue'; +export const CACHE_BATCH_SIZE = 'cacheBatchSize'; diff --git a/src/cache/cache.module.ts b/src/cache/cache.module.ts index 0e6bad66..569a4d2a 100644 --- a/src/cache/cache.module.ts +++ b/src/cache/cache.module.ts @@ -1,12 +1,16 @@ import { DynamicModule, Module } from '@nestjs/common'; -import { CACHE_DEFAULT_VALUE, CACHE_FILE_NAME } from 'cache'; +import { CACHE_BATCH_SIZE, CACHE_DEFAULT_VALUE, CACHE_FILE_NAME } from 'cache'; import { ProviderModule } from 'provider'; import { CACHE_DIR } from './cache.constants'; import { CacheService } from './cache.service'; @Module({}) export class CacheModule { - static register(fileName: string, defaultValue: unknown): DynamicModule { + static register( + fileName: string, + batchSize: number, + defaultValue: unknown, + ): DynamicModule { return { module: CacheModule, imports: [ProviderModule], @@ -20,6 +24,10 @@ export class CacheModule { provide: CACHE_FILE_NAME, useValue: fileName, }, + { + provide: CACHE_BATCH_SIZE, + useValue: batchSize, + }, { provide: CACHE_DEFAULT_VALUE, useValue: defaultValue, diff --git a/src/cache/cache.service.spec.ts b/src/cache/cache.service.spec.ts index a36f3ff5..13518329 100644 --- a/src/cache/cache.service.spec.ts +++ b/src/cache/cache.service.spec.ts @@ -6,16 +6,24 @@ import { CacheModule } from 'cache'; import { CacheService } from './cache.service'; describe('CacheService', () => { - const defaultCacheValue = {}; + const defaultCacheValue = { + headers: {}, + data: [] as any[], + }; + + const batchSize = 10; + + type C = typeof defaultCacheValue; + const cacheFile = 'test.json'; - let cacheService: CacheService; + let cacheService: CacheService; beforeEach(async () => { const moduleRef = await Test.createTestingModule({ imports: [ ConfigModule.forRoot(), MockProviderModule.forRoot(), - CacheModule.register(cacheFile, defaultCacheValue), + CacheModule.register(cacheFile, batchSize, defaultCacheValue), LoggerModule, ], }).compile(); @@ -32,11 +40,11 @@ describe('CacheService', () => { describe('getCache, setCache', () => { it('should return default cache', async () => { const result = await cacheService.getCache(); - expect(result).toBe(defaultCacheValue); + expect(result).toEqual(defaultCacheValue); }); it('should return saved cache', async () => { - const expected = { foo: 'bar' }; + const expected = { headers: {}, data: [{ foo: 'bar' }] }; await cacheService.setCache(expected); const result = await cacheService.getCache(); diff --git a/src/cache/cache.service.ts b/src/cache/cache.service.ts index 6eee72f2..4d8a86a0 100644 --- a/src/cache/cache.service.ts +++ b/src/cache/cache.service.ts @@ -1,19 +1,29 @@ import { Inject, Injectable } from '@nestjs/common'; import { readFile, writeFile, unlink, mkdir } from 'fs/promises'; import { join } from 'path'; +import { promisify } from 'util'; +import * as gl from 'glob'; import { CACHE_DIR, CACHE_DEFAULT_VALUE, CACHE_FILE_NAME, + CACHE_BATCH_SIZE, } from './cache.constants'; import { ProviderService } from 'provider'; +const glob = promisify(gl.glob); + @Injectable() -export class CacheService { +export class CacheService< + H extends unknown, + D extends unknown, + T extends { headers: H; data: D[] } = { headers: H; data: D[] }, +> { constructor( private providerService: ProviderService, @Inject(CACHE_DIR) private cacheDir: string, @Inject(CACHE_FILE_NAME) private cacheFile: string, + @Inject(CACHE_BATCH_SIZE) private cacheBatchSize: number, @Inject(CACHE_DEFAULT_VALUE) private cacheDefaultValue: T, ) {} @@ -21,7 +31,7 @@ export class CacheService { public async getCache(): Promise { if (!this.cache) { - this.cache = await this.getCacheFromFile(); + this.cache = await this.getCacheFromFiles(); } return this.cache; @@ -29,12 +39,12 @@ export class CacheService { public async setCache(cache: T): Promise { this.cache = cache; - return await this.saveCacheToFile(); + return await this.saveCacheToFiles(); } public async deleteCache(): Promise { this.cache = null; - return await this.deleteCacheFile(); + return await this.deleteCacheFiles(); } private async getCacheDirPath(): Promise { @@ -44,34 +54,75 @@ export class CacheService { return join(this.cacheDir, networkDir); } - private async getCacheFilePath(): Promise { - const dir = await this.getCacheDirPath(); - return join(dir, this.cacheFile); + private getCacheFileName(batchIndex: number): string { + return `${batchIndex}.${this.cacheFile}`; + } + + private async getCacheFilePaths(): Promise { + const dirPath = await this.getCacheDirPath(); + const result = await glob(`*([0-9]).${this.cacheFile}`, { cwd: dirPath }); + + return result + .sort((a, b) => parseInt(a) - parseInt(b)) + .map((filePath) => join(dirPath, filePath)); } - private async getCacheFromFile(): Promise { + private async getCacheFromFiles(): Promise { try { - const filePath = await this.getCacheFilePath(); - const content = await readFile(filePath); - return JSON.parse(String(content)); + const filePaths = await this.getCacheFilePaths(); + + let headers = this.cacheDefaultValue.headers as H; + let data = [] as D[]; + + await Promise.all( + filePaths.map(async (filePath) => { + const content = await readFile(filePath); + const parsed = JSON.parse(String(content)); + + if ( + JSON.stringify(headers) !== JSON.stringify(parsed.headers) && + headers !== this.cacheDefaultValue.headers + ) { + throw new Error('Headers are not equal'); + } + + headers = parsed.headers; + data = data.concat(parsed.data); + }), + ); + + return { headers, data } as T; } catch (error) { return this.cacheDefaultValue; } } - private async saveCacheToFile(): Promise { + private async saveCacheToFiles(): Promise { + if (!this.cache) throw new Error('Cache is not set'); + + const { headers, data } = this.cache; + const dirPath = await this.getCacheDirPath(); - const filePath = await this.getCacheFilePath(); await mkdir(dirPath, { recursive: true }); - return await writeFile(filePath, JSON.stringify(this.cache)); + await this.deleteCacheFiles(); + + const totalBatches = Math.ceil(data.length / this.cacheBatchSize); + + for (let batchIndex = 0; batchIndex < totalBatches; batchIndex++) { + const from = batchIndex * this.cacheBatchSize; + const to = (batchIndex + 1) * this.cacheBatchSize; + const batchedData = data.slice(from, to); + + const filePath = join(dirPath, this.getCacheFileName(batchIndex)); + await writeFile(filePath, JSON.stringify({ headers, data: batchedData })); + } } - private async deleteCacheFile(): Promise { + private async deleteCacheFiles(): Promise { try { - const filePath = await this.getCacheFilePath(); - - return await unlink(filePath); + const filePaths = await this.getCacheFilePaths(); + await Promise.all(filePaths.map(async (filePath) => unlink(filePath))); } catch (error) {} } } diff --git a/src/contracts/deposit/deposit.constants.ts b/src/contracts/deposit/deposit.constants.ts index 8ed2ce53..e958bacb 100644 --- a/src/contracts/deposit/deposit.constants.ts +++ b/src/contracts/deposit/deposit.constants.ts @@ -1,5 +1,4 @@ import { CHAINS } from '@lido-sdk/constants'; -import { DepositEventGroup } from './interfaces'; export const DEPLOYMENT_BLOCK_NETWORK: { [key in CHAINS]?: number; @@ -21,10 +20,13 @@ export const DEPOSIT_EVENTS_STEP = 10_000; export const DEPOSIT_EVENTS_CACHE_UPDATE_BLOCK_RATE = 10; export const DEPOSIT_CACHE_FILE_NAME = 'deposit.events.json'; +export const DEPOSIT_CACHE_BATCH_SIZE = 100_000; -export const DEPOSIT_CACHE_DEFAULT: DepositEventGroup = Object.freeze({ - version: '-1', - startBlock: 0, - endBlock: 0, - events: [], +export const DEPOSIT_CACHE_DEFAULT = Object.freeze({ + headers: { + version: '-1', + startBlock: 0, + endBlock: 0, + }, + data: [], }); diff --git a/src/contracts/deposit/deposit.module.ts b/src/contracts/deposit/deposit.module.ts index 3a119e0b..2c730b81 100644 --- a/src/contracts/deposit/deposit.module.ts +++ b/src/contracts/deposit/deposit.module.ts @@ -4,6 +4,7 @@ import { CacheModule } from 'cache'; import { BlsModule } from 'bls'; import { DepositService } from './deposit.service'; import { + DEPOSIT_CACHE_BATCH_SIZE, DEPOSIT_CACHE_DEFAULT, DEPOSIT_CACHE_FILE_NAME, } from './deposit.constants'; @@ -12,7 +13,11 @@ import { imports: [ BlsModule, SecurityModule, - CacheModule.register(DEPOSIT_CACHE_FILE_NAME, DEPOSIT_CACHE_DEFAULT), + CacheModule.register( + DEPOSIT_CACHE_FILE_NAME, + DEPOSIT_CACHE_BATCH_SIZE, + DEPOSIT_CACHE_DEFAULT, + ), ], providers: [DepositService], exports: [DepositService], diff --git a/src/contracts/deposit/deposit.service.spec.ts b/src/contracts/deposit/deposit.service.spec.ts index 0d5987ac..9064b10f 100644 --- a/src/contracts/deposit/deposit.service.spec.ts +++ b/src/contracts/deposit/deposit.service.spec.ts @@ -15,7 +15,10 @@ import { } from 'provider'; import { DepositAbi__factory } from 'generated'; import { RepositoryModule, RepositoryService } from 'contracts/repository'; -import { DepositEventGroup } from './interfaces'; +import { + VerifiedDepositEventsCacheHeaders, + VerifiedDepositEvent, +} from './interfaces'; import { DepositModule } from './deposit.module'; import { DepositService } from './deposit.service'; import { PrometheusModule } from 'common/prometheus'; @@ -31,7 +34,10 @@ const mockSleep = sleep as jest.MockedFunction; describe('DepositService', () => { let providerService: ProviderService; - let cacheService: CacheService; + let cacheService: CacheService< + VerifiedDepositEventsCacheHeaders, + VerifiedDepositEvent + >; let depositService: DepositService; let loggerService: LoggerService; let repositoryService: RepositoryService; @@ -110,9 +116,12 @@ describe('DepositService', () => { it('should return events from cache', async () => { const cache = { - events: [{} as any], - startBlock: deploymentBlock, - endBlock: deploymentBlock + 100, + data: [{} as any], + headers: { + startBlock: deploymentBlock, + endBlock: deploymentBlock + 100, + version: '1', + }, }; const mockCache = jest @@ -127,9 +136,12 @@ describe('DepositService', () => { it('should return deploymentBlock if cache is empty', async () => { const cache = { - events: [{} as any], - startBlock: 0, - endBlock: 0, + data: [{} as any], + headers: { + startBlock: 0, + endBlock: 0, + version: '1', + }, }; const mockCache = jest @@ -139,8 +151,8 @@ describe('DepositService', () => { const result = await depositService.getCachedEvents(); expect(mockCache).toBeCalledTimes(1); - expect(result.startBlock).toBe(deploymentBlock); - expect(result.endBlock).toBe(deploymentBlock); + expect(result.headers.startBlock).toBe(deploymentBlock); + expect(result.headers.endBlock).toBe(deploymentBlock); }); }); @@ -157,7 +169,7 @@ describe('DepositService', () => { expect(mockSetCache).toBeCalledTimes(1); expect(mockSetCache).toBeCalledWith({ ...eventGroup, - version: APP_VERSION, + headers: { version: APP_VERSION }, }); }); }); @@ -293,19 +305,21 @@ describe('DepositService', () => { describe('updateEventsCache', () => { const cachedPubkeys = ['0x1234', '0x5678']; - const cachedEvents = { - startBlock: 0, - endBlock: 2, - version: '1', - events: cachedPubkeys.map((pubkey) => ({ pubkey } as any)), + const cache = { + headers: { + startBlock: 0, + endBlock: 2, + version: '1', + }, + data: cachedPubkeys.map((pubkey) => ({ pubkey } as any)), }; const currentBlock = 1000; - const firstNotCachedBlock = cachedEvents.endBlock + 1; + const firstNotCachedBlock = cache.headers.endBlock + 1; beforeEach(async () => { jest .spyOn(depositService, 'getCachedEvents') - .mockImplementation(async () => cachedEvents); + .mockImplementation(async () => ({ ...cache })); jest .spyOn(providerService, 'getBlockNumber') @@ -350,9 +364,11 @@ describe('DepositService', () => { expect(mockSetCachedEvents).toBeCalledTimes(1); const { calls: cacheCalls } = mockSetCachedEvents.mock; - expect(cacheCalls[0][0].startBlock).toBe(cachedEvents.startBlock); - expect(cacheCalls[0][0].endBlock).toBeLessThan(currentBlock); - expect(cacheCalls[0][0].events).toEqual(cachedEvents.events); + expect(cacheCalls[0][0].headers.startBlock).toBe( + cache.headers.startBlock, + ); + expect(cacheCalls[0][0].headers.endBlock).toBeLessThan(currentBlock); + expect(cacheCalls[0][0].data).toEqual(cache.data); }); }); @@ -360,18 +376,21 @@ describe('DepositService', () => { const cachedPubkeys = ['0x1234', '0x5678']; const freshPubkeys = ['0x4321', '0x8765']; const cachedEvents = { - startBlock: 0, - endBlock: 2, - events: cachedPubkeys.map((pubkey) => ({ pubkey } as any)), + headers: { + startBlock: 0, + endBlock: 2, + version: '1', + }, + data: cachedPubkeys.map((pubkey) => ({ pubkey } as any)), }; const currentBlock = 10; const currentBlockHash = '0x12'; - const firstNotCachedBlock = cachedEvents.endBlock + 1; + const firstNotCachedBlock = cachedEvents.headers.endBlock + 1; beforeEach(async () => { jest .spyOn(depositService, 'getCachedEvents') - .mockImplementation(async () => ({ ...cachedEvents, version: '1' })); + .mockImplementation(async () => ({ ...cachedEvents })); jest .spyOn(providerService, 'getBlockNumber') @@ -391,7 +410,11 @@ describe('DepositService', () => { currentBlock, currentBlockHash, ); - expect(result).toEqual({ ...cachedEvents, endBlock: currentBlock }); + expect(result).toEqual({ + events: cachedEvents.data, + startBlock: cachedEvents.headers.startBlock, + endBlock: currentBlock, + }); expect(mockFetchEventsFallOver).toBeCalledTimes(1); expect(mockFetchEventsFallOver).toBeCalledWith( @@ -414,7 +437,7 @@ describe('DepositService', () => { currentBlockHash, ); expect(result).toEqual({ - startBlock: cachedEvents.startBlock, + startBlock: cachedEvents.headers.startBlock, endBlock: currentBlock, events: cachedPubkeys .concat(freshPubkeys) diff --git a/src/contracts/deposit/deposit.service.ts b/src/contracts/deposit/deposit.service.ts index ba38bcfb..d6402b52 100644 --- a/src/contracts/deposit/deposit.service.ts +++ b/src/contracts/deposit/deposit.service.ts @@ -11,7 +11,9 @@ import { } from './deposit.constants'; import { DepositEvent, + VerifiedDepositEvent, VerifiedDepositEventsCache, + VerifiedDepositEventsCacheHeaders, VerifiedDepositEventGroup, } from './interfaces'; import { OneAtTime } from 'common/decorators'; @@ -27,7 +29,10 @@ export class DepositService { @Inject(WINSTON_MODULE_NEST_PROVIDER) private logger: LoggerService, private providerService: ProviderService, private repositoryService: RepositoryService, - private cacheService: CacheService, + private cacheService: CacheService< + VerifiedDepositEventsCacheHeaders, + VerifiedDepositEvent + >, private blsService: BlsService, ) {} @@ -78,10 +83,10 @@ export class DepositService { public validateCacheVersion( cachedEvents: VerifiedDepositEventsCache, ): boolean { - const isSameVersion = cachedEvents.version === APP_VERSION; + const isSameVersion = cachedEvents.headers.version === APP_VERSION; const versions = { - cachedVersion: cachedEvents.version, + cachedVersion: cachedEvents.headers.version, currentVersion: APP_VERSION, }; @@ -112,11 +117,11 @@ export class DepositService { cachedEvents: VerifiedDepositEventsCache, currentBlock: number, ): boolean { - const isCacheValid = currentBlock >= cachedEvents.endBlock; + const isCacheValid = currentBlock >= cachedEvents.headers.endBlock; const blocks = { - cachedStartBlock: cachedEvents.startBlock, - cachedEndBlock: cachedEvents.endBlock, + cachedStartBlock: cachedEvents.headers.startBlock, + cachedEndBlock: cachedEvents.headers.endBlock, currentBlock, }; @@ -159,13 +164,16 @@ export class DepositService { * @returns event group */ public async getCachedEvents(): Promise { - const cachedEventGroup = await this.cacheService.getCache(); + const { headers, ...rest } = await this.cacheService.getCache(); const deploymentBlock = await this.getDeploymentBlockByNetwork(); return { - ...cachedEventGroup, - startBlock: Math.max(cachedEventGroup.startBlock, deploymentBlock), - endBlock: Math.max(cachedEventGroup.endBlock, deploymentBlock), + headers: { + ...headers, + startBlock: Math.max(headers.startBlock, deploymentBlock), + endBlock: Math.max(headers.endBlock, deploymentBlock), + }, + ...rest, }; } @@ -173,11 +181,14 @@ export class DepositService { * Saves deposited events to cache */ public async setCachedEvents( - eventGroup: VerifiedDepositEventGroup, + cachedEvents: VerifiedDepositEventsCache, ): Promise { return await this.cacheService.setCache({ - ...eventGroup, - version: APP_VERSION, + ...cachedEvents, + headers: { + ...cachedEvents.headers, + version: APP_VERSION, + }, }); } @@ -241,8 +252,11 @@ export class DepositService { this.getCachedEvents(), ]); - const eventGroup = { ...initialCache }; - const firstNotCachedBlock = initialCache.endBlock + 1; + const updatedCachedEvents = { + headers: { ...initialCache.headers }, + data: [...initialCache.data], + }; + const firstNotCachedBlock = initialCache.headers.endBlock + 1; const toBlock = currentBlock - DEPOSIT_EVENTS_CACHE_LAG_BLOCKS; for ( @@ -258,21 +272,23 @@ export class DepositService { chunkToBlock, ); - eventGroup.endBlock = chunkEventGroup.endBlock; - eventGroup.events = eventGroup.events.concat(chunkEventGroup.events); + updatedCachedEvents.headers.endBlock = chunkEventGroup.endBlock; + updatedCachedEvents.data = updatedCachedEvents.data.concat( + chunkEventGroup.events, + ); this.logger.log('Historical events are fetched', { toBlock, startBlock: chunkStartBlock, endBlock: chunkToBlock, - events: eventGroup.events.length, + events: updatedCachedEvents.data.length, }); - await this.setCachedEvents(eventGroup); + await this.setCachedEvents(updatedCachedEvents); } - const totalEvents = eventGroup.events.length; - const newEvents = totalEvents - initialCache.events.length; + const totalEvents = updatedCachedEvents.data.length; + const newEvents = totalEvents - initialCache.data.length; const fetchTimeEnd = performance.now(); const fetchTime = Math.ceil(fetchTimeEnd - fetchTimeStart) / 1000; @@ -299,7 +315,7 @@ export class DepositService { const isCacheValid = this.validateCacheBlock(cachedEvents, blockNumber); if (!isCacheValid) process.exit(1); - const firstNotCachedBlock = cachedEvents.endBlock + 1; + const firstNotCachedBlock = cachedEvents.headers.endBlock + 1; const freshEventGroup = await this.fetchEventsFallOver( firstNotCachedBlock, endBlock, @@ -318,11 +334,11 @@ export class DepositService { lastEventBlockHash, }); - const mergedEvents = cachedEvents.events.concat(freshEvents); + const mergedEvents = cachedEvents.data.concat(freshEvents); return { events: mergedEvents, - startBlock: cachedEvents.startBlock, + startBlock: cachedEvents.headers.startBlock, endBlock, }; } diff --git a/src/contracts/deposit/interfaces/cache.interface.ts b/src/contracts/deposit/interfaces/cache.interface.ts index 435d4022..102daefb 100644 --- a/src/contracts/deposit/interfaces/cache.interface.ts +++ b/src/contracts/deposit/interfaces/cache.interface.ts @@ -1,5 +1,12 @@ -import { VerifiedDepositEventGroup } from './event.interface'; +import { VerifiedDepositEvent } from './event.interface'; -export interface VerifiedDepositEventsCache extends VerifiedDepositEventGroup { +export interface VerifiedDepositEventsCacheHeaders { + startBlock: number; + endBlock: number; version: string; } + +export interface VerifiedDepositEventsCache { + headers: VerifiedDepositEventsCacheHeaders; + data: VerifiedDepositEvent[]; +} diff --git a/test/manifest.e2e-spec.ts b/test/manifest.e2e-spec.ts index 0745bea0..b60b2c9d 100644 --- a/test/manifest.e2e-spec.ts +++ b/test/manifest.e2e-spec.ts @@ -213,7 +213,7 @@ describe('ganache e2e tests', () => { mockKeysApi([goodSig], currentBlock, keysApiService); await depositService.setCachedEvents({ - events: [ + data: [ { valid: true, pubkey: toHexString(pk), @@ -225,8 +225,11 @@ describe('ganache e2e tests', () => { blockNumber: forkBlock.number, }, ], - startBlock: currentBlock.number, - endBlock: currentBlock.number, + headers: { + startBlock: currentBlock.number, + endBlock: currentBlock.number, + version: '1', + }, }); // Check if the service is ok and ready to go @@ -311,9 +314,12 @@ describe('ganache e2e tests', () => { mockKeysApi([goodSig], currentBlock, keysApiService); await depositService.setCachedEvents({ - events: [], - startBlock: currentBlock.number, - endBlock: currentBlock.number, + data: [], + headers: { + startBlock: currentBlock.number, + endBlock: currentBlock.number, + version: '1', + }, }); // Check if the service is ok and ready to go @@ -390,9 +396,12 @@ describe('ganache e2e tests', () => { mockKeysApi([goodSig], currentBlock, keysApiService); await depositService.setCachedEvents({ - events: [], - startBlock: currentBlock.number, - endBlock: currentBlock.number, + data: [], + headers: { + startBlock: currentBlock.number, + endBlock: currentBlock.number, + version: '1', + }, }); // Check if the service is ok and ready to go @@ -482,9 +491,12 @@ describe('ganache e2e tests', () => { const goodDepositDataRoot = DepositData.hashTreeRoot(goodDepositData); await depositService.setCachedEvents({ - events: [], - startBlock: currentBlock.number, - endBlock: currentBlock.number, + data: [], + headers: { + startBlock: currentBlock.number, + endBlock: currentBlock.number, + version: '1', + }, }); // Check if the service is ok and ready to go @@ -563,9 +575,12 @@ describe('ganache e2e tests', () => { const goodDepositDataRoot = DepositData.hashTreeRoot(goodDepositData); await depositService.setCachedEvents({ - events: [], - startBlock: currentBlock.number, - endBlock: currentBlock.number, + data: [], + headers: { + startBlock: currentBlock.number, + endBlock: currentBlock.number, + version: '1', + }, }); // Check if the service is ok and ready to go