From 6ac404b855b7a48133d954ec44dd86b5a48be297 Mon Sep 17 00:00:00 2001 From: vgorkavenko Date: Mon, 5 Feb 2024 12:49:37 +0400 Subject: [PATCH] feat: daemon --- src/common/handlers/handlers.service.ts | 9 +- src/daemon/daemon.module.ts | 15 +- src/daemon/daemon.service.ts | 46 +++++- src/daemon/services/keys-indexer.ts | 210 ++++++++++++++++++++++++ src/daemon/services/roots-processor.ts | 33 ++++ src/daemon/services/roots-provider.ts | 54 ++++++ src/daemon/services/roots-stack.ts | 62 +++++++ src/daemon/utils/sleep.ts | 1 + 8 files changed, 414 insertions(+), 16 deletions(-) create mode 100644 src/daemon/services/keys-indexer.ts create mode 100644 src/daemon/services/roots-processor.ts create mode 100644 src/daemon/services/roots-provider.ts create mode 100644 src/daemon/services/roots-stack.ts create mode 100644 src/daemon/utils/sleep.ts diff --git a/src/common/handlers/handlers.service.ts b/src/common/handlers/handlers.service.ts index 5eed365..6340bfb 100644 --- a/src/common/handlers/handlers.service.ts +++ b/src/common/handlers/handlers.service.ts @@ -23,15 +23,22 @@ export class HandlersService { const payload = await this.buildProvePayload(slashings, withdrawals); // TODO: ask before sending if CLI or daemon in watch mode await this.sendProves(payload); + this.logger.log(`🏁 Proves sent. Root [${blockRoot}]`); } private async buildProvePayload(slashings: string[], withdrawals: string[]): Promise { // TODO: implement // this.consensus.getState(...) + if (slashings.length || withdrawals.length) { + this.logger.warn(`📦 Prove payload: slashings [${slashings}], withdrawals [${withdrawals}]`); + } return {}; } private async sendProves(payload: any): Promise { // TODO: implement + if (payload) { + this.logger.warn(`📡 Sending proves`); + } } private async getUnprovenSlashings( @@ -114,7 +121,7 @@ export class HandlersService { ): string[] { const fullWithdrawals = []; const blockEpoch = Number(blockInfo.message.slot) / 32; - const withdrawals = blockInfo.message.body.execution_payload.withdrawals; + const withdrawals = blockInfo.message.body.execution_payload?.withdrawals ?? []; for (const withdrawal of withdrawals) { const keyInfo = keyInfoFn(Number(withdrawal.validator_index)); if (keyInfo && blockEpoch >= keyInfo.withdrawableEpoch) { diff --git a/src/daemon/daemon.module.ts b/src/daemon/daemon.module.ts index 1773959..eddb16a 100644 --- a/src/daemon/daemon.module.ts +++ b/src/daemon/daemon.module.ts @@ -1,6 +1,10 @@ import { Module } from '@nestjs/common'; import { DaemonService } from './daemon.service'; +import { KeysIndexer } from './services/keys-indexer'; +import { RootsProcessor } from './services/roots-processor'; +import { RootsProvider } from './services/roots-provider'; +import { RootsStack } from './services/roots-stack'; import { ConfigModule } from '../common/config/config.module'; import { HandlersModule } from '../common/handlers/handlers.module'; import { LoggerModule } from '../common/logger/logger.module'; @@ -8,13 +12,8 @@ import { PrometheusModule } from '../common/prometheus/prometheus.module'; import { ProvidersModule } from '../common/providers/providers.module'; @Module({ - imports: [ - LoggerModule, - ConfigModule, - PrometheusModule, - ProvidersModule, - HandlersModule, - ], - providers: [DaemonService], + imports: [LoggerModule, ConfigModule, PrometheusModule, ProvidersModule, HandlersModule], + providers: [DaemonService, KeysIndexer, RootsProvider, RootsProcessor, RootsStack], + exports: [DaemonService], }) export class DaemonModule {} diff --git a/src/daemon/daemon.service.ts b/src/daemon/daemon.service.ts index 59494d5..c999dc2 100644 --- a/src/daemon/daemon.service.ts +++ b/src/daemon/daemon.service.ts @@ -1,18 +1,50 @@ import { LOGGER_PROVIDER } from '@lido-nestjs/logger'; -import { - Inject, - Injectable, - LoggerService, - OnApplicationBootstrap, -} from '@nestjs/common'; +import { Inject, Injectable, LoggerService, OnApplicationBootstrap } from '@nestjs/common'; + +import { KeysIndexer } from './services/keys-indexer'; +import { RootsProcessor } from './services/roots-processor'; +import { RootsProvider } from './services/roots-provider'; +import sleep from './utils/sleep'; +import { ConfigService } from '../common/config/config.service'; +import { Consensus } from '../common/providers/consensus/consensus'; @Injectable() export class DaemonService implements OnApplicationBootstrap { constructor( @Inject(LOGGER_PROVIDER) protected readonly logger: LoggerService, + protected readonly config: ConfigService, + protected readonly consensus: Consensus, + protected readonly keysIndexer: KeysIndexer, + protected readonly rootsProvider: RootsProvider, + protected readonly rootsProcessor: RootsProcessor, ) {} async onApplicationBootstrap() { - this.logger.log('Working mode: DAEMON'); + this.loop().then(); + } + + private async loop() { + while (true) { + try { + await this.baseRun(); + } catch (e) { + this.logger.error(e); + await sleep(1000); + } + } + } + + private async baseRun() { + this.logger.log('🗿 Get finalized header'); + const header = await this.consensus.getBeaconHeader('finalized'); + this.logger.log(`💎 Finalized slot [${header.header.message.slot}]. Root [${header.root}]`); + await this.keysIndexer.run(header); + const nextRoot = await this.rootsProvider.getNext(header); + if (nextRoot) { + await this.rootsProcessor.process(nextRoot); + } else { + this.logger.log(`💤 Wait for the next finalized root`); + await sleep(12000); + } } } diff --git a/src/daemon/services/keys-indexer.ts b/src/daemon/services/keys-indexer.ts new file mode 100644 index 0000000..c1f1458 --- /dev/null +++ b/src/daemon/services/keys-indexer.ts @@ -0,0 +1,210 @@ +import { Low } from '@huanshiwushuang/lowdb'; +import { JSONFile } from '@huanshiwushuang/lowdb/node'; +import { LOGGER_PROVIDER } from '@lido-nestjs/logger'; +import { Inject, Injectable, LoggerService, OnApplicationBootstrap } from '@nestjs/common'; + +import { ConfigService } from '../../common/config/config.service'; +import { KeyInfo } from '../../common/handlers/handlers.service'; +import { Consensus } from '../../common/providers/consensus/consensus'; +import { + BlockHeaderResponse, + RootHex, + Slot, + StateValidatorResponse, +} from '../../common/providers/consensus/response.interface'; +import { Keysapi } from '../../common/providers/keysapi/keysapi'; + +type Info = { + moduleAddress: string; + moduleId: number; + storageStateSlot: number; + lastValidatorsCount: number; +}; + +type Storage = { + [valIndex: number]: KeyInfo; +}; + +@Injectable() +export class KeysIndexer implements OnApplicationBootstrap { + private startedAt: number = 0; + + private info: Low; + private storage: Low; + + constructor( + @Inject(LOGGER_PROVIDER) protected readonly logger: LoggerService, + protected readonly config: ConfigService, + protected readonly consensus: Consensus, + protected readonly keysapi: Keysapi, + ) {} + + public async onApplicationBootstrap(): Promise { + await this.initOrReadServiceData(); + } + + public getKey = (valIndex: number): KeyInfo | undefined => { + return this.storage.data[valIndex]; + }; + + public async run(finalizedHeader: BlockHeaderResponse): Promise { + // At one time only one task should be running + if (this.startedAt > 0) { + this.logger.warn(`🔑 Keys indexer has been running for ${Date.now() - this.startedAt}ms`); + return; + } + const finalizedSlot = Number(finalizedHeader.header.message.slot); + if (this.isNotTimeToRun(finalizedSlot)) { + this.logger.log('No need to run keys indexer'); + return; + } + this.logger.log(`🔑 Keys indexer is running`); + const finalizedStateRoot = finalizedHeader.header.message.state_root; + if (this.info.data.storageStateSlot == 0) { + await this.baseRun(finalizedStateRoot, finalizedSlot); + return; + } + // We shouldn't wait for task to finish + // to avoid block processing if indexing fails or stuck + this.startedAt = Date.now(); + this.baseRun(finalizedStateRoot, finalizedSlot) + .catch((e) => this.logger.error(e)) + .finally(() => (this.startedAt = 0)); + } + + private async baseRun(stateRoot: RootHex, finalizedSlot: Slot): Promise { + this.logger.log(`Get validators. State root [${stateRoot}]`); + const validators = await this.consensus.getValidators(stateRoot); + this.logger.log(`Total validators count: ${validators.length}`); + // TODO: do we need to store already full withdrawn keys ? + this.info.data.lastValidatorsCount == 0 + ? await this.initStorage(validators, finalizedSlot) + : await this.updateStorage(validators, finalizedSlot); + this.logger.log(`CSM validators count: ${Object.keys(this.storage.data).length}`); + this.info.data.storageStateSlot = finalizedSlot; + this.info.data.lastValidatorsCount = validators.length; + await this.info.write(); + } + + private async initStorage(validators: StateValidatorResponse[], finalizedSlot: Slot): Promise { + this.logger.log(`Init keys data`); + const csmKeys = await this.keysapi.getModuleKeys(this.info.data.moduleId); + this.keysapi.healthCheck(this.consensus.slotToTimestamp(finalizedSlot), csmKeys.meta); + const keysMap = new Map(); + csmKeys.data.keys.forEach((k: any) => keysMap.set(k.key, { ...k })); + for (const v of validators) { + const keyInfo = keysMap.get(v.validator.pubkey); + if (!keyInfo) continue; + this.storage.data[Number(v.index)] = { + operatorId: keyInfo.operatorIndex, + keyIndex: keyInfo.index, + pubKey: v.validator.pubkey, + // TODO: bigint? + withdrawableEpoch: Number(v.validator.withdrawable_epoch), + }; + } + await this.storage.write(); + } + + private async updateStorage(vals: StateValidatorResponse[], finalizedSlot: Slot): Promise { + // TODO: should we think about re-using validator indexes? + // TODO: should we think about changing WC for existing old vaidators ? + if (vals.length - this.info.data.lastValidatorsCount == 0) { + this.logger.log(`No new validators in the state`); + return; + } + vals = vals.slice(this.info.data.lastValidatorsCount); + const valKeys = vals.map((v: StateValidatorResponse) => v.validator.pubkey); + this.logger.log(`New appeared validators count: ${vals.length}`); + const csmKeys = await this.keysapi.findModuleKeys(this.info.data.moduleId, valKeys); + this.keysapi.healthCheck(this.consensus.slotToTimestamp(finalizedSlot), csmKeys.meta); + this.logger.log(`New appeared CSM validators count: ${csmKeys.data.keys.length}`); + for (const csmKey of csmKeys.data.keys) { + for (const newVal of vals) { + if (newVal.validator.pubkey != csmKey.key) continue; + this.storage.data[Number(newVal.index)] = { + operatorId: csmKey.operatorIndex, + keyIndex: csmKey.index, + pubKey: csmKey.key, + // TODO: bigint? + withdrawableEpoch: Number(newVal.validator.withdrawable_epoch), + }; + } + } + await this.storage.write(); + } + + public isNotTimeToRun(finalizedSlot: Slot): boolean { + const storageTimestamp = this.consensus.slotToTimestamp(this.info.data.storageStateSlot) * 1000; + return ( + this.info.data.storageStateSlot == finalizedSlot || + this.config.get('KEYS_INDEXER_RUNNING_PERIOD') >= Date.now() - storageTimestamp + ); + } + + public eligibleForAnyDuty(slotNumber: Slot): boolean { + return this.eligibleForSlashings(slotNumber) || this.eligibleForFullWithdrawals(slotNumber); + } + + public eligibleForEveryDuty(slotNumber: Slot): boolean { + const eligibleForSlashings = this.eligibleForSlashings(slotNumber); + const eligibleForFullWithdrawals = this.eligibleForFullWithdrawals(slotNumber); + if (!eligibleForSlashings) + this.logger.warn( + '🚨 Current keys indexer data might not be ready to detect slashing. ' + + 'The root will be processed later again', + ); + if (!eligibleForFullWithdrawals) + this.logger.warn( + '⚠️ Current keys indexer data might not be ready to detect full withdrawal. ' + + 'The root will be processed later again', + ); + return eligibleForSlashings && eligibleForFullWithdrawals; + } + + private eligibleForSlashings(slotNumber: Slot): boolean { + // We are ok with oudated indexer for detection slasing + // because of a bunch of delays between deposit and validator appearing + // TODO: get constants from node + const ETH1_FOLLOW_DISTANCE = 2048; // ~8 hours + const EPOCHS_PER_ETH1_VOTING_PERIOD = 64; // ~6.8 hours + const safeDelay = ETH1_FOLLOW_DISTANCE + EPOCHS_PER_ETH1_VOTING_PERIOD * 32; + if (this.info.data.storageStateSlot >= slotNumber) return true; + return slotNumber - this.info.data.storageStateSlot <= safeDelay; // ~14.8 hours + } + + private eligibleForFullWithdrawals(slotNumber: Slot): boolean { + // We are ok with oudated indexer for detection withdrawal + // because of MIN_VALIDATOR_WITHDRAWABILITY_DELAY + // TODO: get constants from node + const MIN_VALIDATOR_WITHDRAWABILITY_DELAY = 256; + const safeDelay = MIN_VALIDATOR_WITHDRAWABILITY_DELAY * 32; + if (this.info.data.storageStateSlot >= slotNumber) return true; + return slotNumber - this.info.data.storageStateSlot <= safeDelay; // ~27 hours + } + + private async initOrReadServiceData() { + const defaultInfo: Info = { + moduleAddress: this.config.get('LIDO_STAKING_MODULE_ADDRESS'), + moduleId: 0, + storageStateSlot: 0, + lastValidatorsCount: 0, + }; + this.info = new Low(new JSONFile('.keys-indexer-info.json'), defaultInfo); + this.storage = new Low(new JSONFile('.keys-indexer-storage.json'), {}); + await this.info.read(); + await this.storage.read(); + + if (this.info.data.moduleId == 0) { + const modules = (await this.keysapi.getModules()).data; + const module = modules.find( + (m: any) => m.stakingModuleAddress.toLowerCase() === this.info.data.moduleAddress.toLowerCase(), + ); + if (!module) { + throw new Error(`Module with address ${this.info.data.moduleAddress} not found`); + } + this.info.data.moduleId = module.id; + await this.info.write(); + } + } +} diff --git a/src/daemon/services/roots-processor.ts b/src/daemon/services/roots-processor.ts new file mode 100644 index 0000000..5ca18fd --- /dev/null +++ b/src/daemon/services/roots-processor.ts @@ -0,0 +1,33 @@ +import { LOGGER_PROVIDER } from '@lido-nestjs/logger'; +import { Inject, Injectable, LoggerService } from '@nestjs/common'; + +import { KeysIndexer } from './keys-indexer'; +import { RootsStack } from './roots-stack'; +import { HandlersService } from '../../common/handlers/handlers.service'; +import { Consensus } from '../../common/providers/consensus/consensus'; +import { RootHex } from '../../common/providers/consensus/response.interface'; + +@Injectable() +export class RootsProcessor { + constructor( + @Inject(LOGGER_PROVIDER) protected readonly logger: LoggerService, + protected readonly consensus: Consensus, + protected readonly keysIndexer: KeysIndexer, + protected readonly rootsStack: RootsStack, + protected readonly handlers: HandlersService, + ) {} + + public async process(blockRoot: RootHex): Promise { + this.logger.log(`🛃 Root in processing [${blockRoot}]`); + const blockInfo = await this.consensus.getBlockInfo(blockRoot); + const rootSlot = { + blockRoot, + slotNumber: Number(blockInfo.message.slot), + }; + const indexerIsOK = this.keysIndexer.eligibleForEveryDuty(rootSlot.slotNumber); + if (!indexerIsOK) await this.rootsStack.push(rootSlot); // only new will be pushed + await this.handlers.prove(blockRoot, blockInfo, this.keysIndexer.getKey); + if (indexerIsOK) await this.rootsStack.purge(blockRoot); + await this.rootsStack.setLastProcessed(rootSlot); + } +} diff --git a/src/daemon/services/roots-provider.ts b/src/daemon/services/roots-provider.ts new file mode 100644 index 0000000..965b182 --- /dev/null +++ b/src/daemon/services/roots-provider.ts @@ -0,0 +1,54 @@ +import { LOGGER_PROVIDER } from '@lido-nestjs/logger'; +import { Inject, Injectable, LoggerService } from '@nestjs/common'; + +import { RootSlot, RootsStack } from './roots-stack'; +import { ConfigService } from '../../common/config/config.service'; +import { Consensus } from '../../common/providers/consensus/consensus'; +import { BlockHeaderResponse, RootHex } from '../../common/providers/consensus/response.interface'; + +@Injectable() +export class RootsProvider { + constructor( + @Inject(LOGGER_PROVIDER) protected readonly logger: LoggerService, + protected readonly config: ConfigService, + protected readonly consensus: Consensus, + protected readonly rootsStack: RootsStack, + ) {} + + public async getNext(finalizedHeader: BlockHeaderResponse): Promise { + const stacked = this.getStacked(); + if (stacked) return stacked; + const lastProcessed = this.rootsStack.getLastProcessed(); + if (!lastProcessed) return this.getKnown(finalizedHeader); + return await this.getChild(lastProcessed, finalizedHeader); + } + + private getStacked(): RootHex | undefined { + const stacked = this.rootsStack.getNextEligible(); + if (!stacked) return; + this.logger.warn(`⏭️ Next root to process [${stacked.blockRoot}]. Taken from 📚 stack of unprocessed roots`); + return stacked.blockRoot; + } + + private getKnown(finalizedHeader: BlockHeaderResponse): RootHex | undefined { + const configured = this.config.get('START_ROOT'); + if (configured) { + this.logger.log(`No processed roots. Start from ⚙️ configured root [${configured}]`); + return configured; + } + this.logger.log(`No processed roots. Start from 💎 last finalized root [${finalizedHeader.root}]`); + return finalizedHeader.root; + } + + private async getChild(lastProcessed: RootSlot, finalizedHeader: BlockHeaderResponse): Promise { + this.logger.log(`⏮️ Last processed root [${lastProcessed.blockRoot}]`); + if (lastProcessed.blockRoot == finalizedHeader.root) return; + const diff = Number(finalizedHeader.header.message.slot) - lastProcessed.slotNumber; + this.logger.warn(`Diff between last processed and finalized is ${diff} slots`); + const childHeader = await this.consensus.getBeaconHeadersByParentRoot(lastProcessed.blockRoot); + if (!childHeader || !childHeader.finalized) return; + const child = childHeader.data[0].root; + this.logger.log(`⏭️ Next root to process [${child}]. Child of last processed`); + return child; + } +} diff --git a/src/daemon/services/roots-stack.ts b/src/daemon/services/roots-stack.ts new file mode 100644 index 0000000..d430ba6 --- /dev/null +++ b/src/daemon/services/roots-stack.ts @@ -0,0 +1,62 @@ +import { Low } from '@huanshiwushuang/lowdb'; +import { JSONFile } from '@huanshiwushuang/lowdb/node'; +import { Injectable, OnApplicationBootstrap } from '@nestjs/common'; + +import { KeysIndexer } from './keys-indexer'; +import { RootHex } from '../../common/providers/consensus/response.interface'; + +export type RootSlot = { blockRoot: string; slotNumber: number }; + +type Info = { + lastProcessedRootSlot: RootSlot | undefined; +}; + +type Storage = RootSlot[]; + +@Injectable() +export class RootsStack implements OnApplicationBootstrap { + private info: Low; + private storage: Low; + + constructor(protected readonly keysIndexer: KeysIndexer) {} + + async onApplicationBootstrap(): Promise { + await this.initOrReadServiceData(); + } + + public getNextEligible(): RootSlot | undefined { + return this.storage.data.find((s) => this.keysIndexer.eligibleForAnyDuty(s.slotNumber)); + } + + public async push(rs: RootSlot): Promise { + const idx = this.storage.data.findIndex((i) => rs.blockRoot == i.blockRoot); + if (idx !== -1) return; + this.storage.data.push(rs); + await this.storage.write(); + } + + public async purge(blockRoot: RootHex): Promise { + const idx = this.storage.data.findIndex((i) => blockRoot == i.blockRoot); + if (idx == -1) return; + this.storage.data.splice(idx, 1); + await this.storage.write(); + } + + public getLastProcessed(): RootSlot | undefined { + return this.info.data.lastProcessedRootSlot; + } + + public async setLastProcessed(item: RootSlot): Promise { + this.info.data.lastProcessedRootSlot = item; + await this.info.write(); + } + + private async initOrReadServiceData() { + this.info = new Low(new JSONFile('.roots-stack-info.json'), { + lastProcessedRootSlot: undefined, + }); + this.storage = new Low(new JSONFile('.roots-stack-storage.json'), []); + await this.info.read(); + await this.storage.read(); + } +} diff --git a/src/daemon/utils/sleep.ts b/src/daemon/utils/sleep.ts new file mode 100644 index 0000000..fc4f3a2 --- /dev/null +++ b/src/daemon/utils/sleep.ts @@ -0,0 +1 @@ +export default async (ms: number): Promise => await new Promise((resolve) => setTimeout(resolve, ms));