Skip to content

Commit

Permalink
feat: daemon
Browse files Browse the repository at this point in the history
  • Loading branch information
vgorkavenko committed Feb 5, 2024
1 parent dabea50 commit 407655c
Show file tree
Hide file tree
Showing 8 changed files with 414 additions and 16 deletions.
9 changes: 8 additions & 1 deletion src/common/handlers/handlers.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,22 @@ export class HandlersService {
const payload = await this.buildProvePayload(slashings, withdrawals);
// TODO: ask before sending if CLI or daemon in watch mode
await this.sendProves(payload);
this.logger.log(`🏁 Proves sent. Root [${blockRoot}]`);
}

private async buildProvePayload(slashings: string[], withdrawals: string[]): Promise<any> {
// TODO: implement
// this.consensus.getState(...)
if (slashings.length || withdrawals.length) {
this.logger.warn(`📦 Prove payload: slashings [${slashings}], withdrawals [${withdrawals}]`);
}
return {};
}
private async sendProves(payload: any): Promise<void> {
// TODO: implement
if (payload) {
this.logger.warn(`📡 Sending proves`);
}
}

private async getUnprovenSlashings(
Expand Down Expand Up @@ -114,7 +121,7 @@ export class HandlersService {
): string[] {
const fullWithdrawals = [];
const blockEpoch = Number(blockInfo.message.slot) / 32;
const withdrawals = blockInfo.message.body.execution_payload.withdrawals;
const withdrawals = blockInfo.message.body.execution_payload?.withdrawals ?? [];
for (const withdrawal of withdrawals) {
const keyInfo = keyInfoFn(Number(withdrawal.validator_index));
if (keyInfo && blockEpoch >= keyInfo.withdrawableEpoch) {
Expand Down
15 changes: 7 additions & 8 deletions src/daemon/daemon.module.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
import { Module } from '@nestjs/common';

import { DaemonService } from './daemon.service';
import { KeysIndexer } from './services/keys-indexer';
import { RootsProcessor } from './services/roots-processor';
import { RootsProvider } from './services/roots-provider';
import { RootsStack } from './services/roots-stack';
import { ConfigModule } from '../common/config/config.module';
import { HandlersModule } from '../common/handlers/handlers.module';
import { LoggerModule } from '../common/logger/logger.module';
import { PrometheusModule } from '../common/prometheus/prometheus.module';
import { ProvidersModule } from '../common/providers/providers.module';

@Module({
imports: [
LoggerModule,
ConfigModule,
PrometheusModule,
ProvidersModule,
HandlersModule,
],
providers: [DaemonService],
imports: [LoggerModule, ConfigModule, PrometheusModule, ProvidersModule, HandlersModule],
providers: [DaemonService, KeysIndexer, RootsProvider, RootsProcessor, RootsStack],
exports: [DaemonService],
})
export class DaemonModule {}
46 changes: 39 additions & 7 deletions src/daemon/daemon.service.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,50 @@
import { LOGGER_PROVIDER } from '@lido-nestjs/logger';
import {
Inject,
Injectable,
LoggerService,
OnApplicationBootstrap,
} from '@nestjs/common';
import { Inject, Injectable, LoggerService, OnApplicationBootstrap } from '@nestjs/common';

import { KeysIndexer } from './services/keys-indexer';
import { RootsProcessor } from './services/roots-processor';
import { RootsProvider } from './services/roots-provider';
import sleep from './utils/sleep';
import { ConfigService } from '../common/config/config.service';
import { Consensus } from '../common/providers/consensus/consensus';

@Injectable()
export class DaemonService implements OnApplicationBootstrap {
constructor(
@Inject(LOGGER_PROVIDER) protected readonly logger: LoggerService,
protected readonly config: ConfigService,
protected readonly consensus: Consensus,
protected readonly keysIndexer: KeysIndexer,
protected readonly rootsProvider: RootsProvider,
protected readonly rootsProcessor: RootsProcessor,
) {}

async onApplicationBootstrap() {
this.logger.log('Working mode: DAEMON');
this.loop().then();
}

private async loop() {
while (true) {
try {
await this.baseRun();
} catch (e) {
this.logger.error(e);
await sleep(1000);
}
}
}

private async baseRun() {
this.logger.log('🗿 Get finalized header');
const header = await this.consensus.getBeaconHeader('finalized');
this.logger.log(`💎 Finalized slot [${header.header.message.slot}]. Root [${header.root}]`);
await this.keysIndexer.run(header);
const nextRoot = await this.rootsProvider.getNext(header);
if (nextRoot) {
await this.rootsProcessor.process(nextRoot);
} else {
this.logger.log(`💤 Wait for the next finalized root`);
await sleep(12000);
}
}
}
210 changes: 210 additions & 0 deletions src/daemon/services/keys-indexer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import { Low } from '@huanshiwushuang/lowdb';
import { JSONFile } from '@huanshiwushuang/lowdb/node';
import { LOGGER_PROVIDER } from '@lido-nestjs/logger';
import { Inject, Injectable, LoggerService, OnApplicationBootstrap } from '@nestjs/common';

import { ConfigService } from '../../common/config/config.service';
import { KeyInfo } from '../../common/handlers/handlers.service';
import { Consensus } from '../../common/providers/consensus/consensus';
import {
BlockHeaderResponse,
RootHex,
Slot,
StateValidatorResponse,
} from '../../common/providers/consensus/response.interface';
import { Keysapi } from '../../common/providers/keysapi/keysapi';

type Info = {
moduleAddress: string;
moduleId: number;
storageStateSlot: number;
lastValidatorsCount: number;
};

type Storage = {
[valIndex: number]: KeyInfo;
};

@Injectable()
export class KeysIndexer implements OnApplicationBootstrap {
private startedAt: number = 0;

private info: Low<Info>;
private storage: Low<Storage>;

constructor(
@Inject(LOGGER_PROVIDER) protected readonly logger: LoggerService,
protected readonly config: ConfigService,
protected readonly consensus: Consensus,
protected readonly keysapi: Keysapi,
) {}

public async onApplicationBootstrap(): Promise<void> {
await this.initOrReadServiceData();
}

public getKey = (valIndex: number): KeyInfo | undefined => {
return this.storage.data[valIndex];
};

public async run(finalizedHeader: BlockHeaderResponse): Promise<unknown> {
// At one time only one task should be running
if (this.startedAt > 0) {
this.logger.warn(`🔑 Keys indexer has been running for ${Date.now() - this.startedAt}ms`);
return;
}
const finalizedSlot = Number(finalizedHeader.header.message.slot);
if (this.isNotTimeToRun(finalizedSlot)) {
this.logger.log('No need to run keys indexer');
return;
}
this.logger.log(`🔑 Keys indexer is running`);
const finalizedStateRoot = finalizedHeader.header.message.state_root;
if (this.info.data.storageStateSlot == 0) {
await this.baseRun(finalizedStateRoot, finalizedSlot);
return;
}
// We shouldn't wait for task to finish
// to avoid block processing if indexing fails or stuck
this.startedAt = Date.now();
this.baseRun(finalizedStateRoot, finalizedSlot)
.catch((e) => this.logger.error(e))
.finally(() => (this.startedAt = 0));
}

private async baseRun(stateRoot: RootHex, finalizedSlot: Slot): Promise<void> {
this.logger.log(`Get validators. State root [${stateRoot}]`);
const validators = await this.consensus.getValidators(stateRoot);
this.logger.log(`Total validators count: ${validators.length}`);
// TODO: do we need to store already full withdrawn keys ?
this.info.data.lastValidatorsCount == 0
? await this.initStorage(validators, finalizedSlot)
: await this.updateStorage(validators, finalizedSlot);
this.logger.log(`CSM validators count: ${Object.keys(this.storage.data).length}`);
this.info.data.storageStateSlot = finalizedSlot;
this.info.data.lastValidatorsCount = validators.length;
await this.info.write();
}

private async initStorage(validators: StateValidatorResponse[], finalizedSlot: Slot): Promise<void> {
this.logger.log(`Init keys data`);
const csmKeys = await this.keysapi.getModuleKeys(this.info.data.moduleId);
this.keysapi.healthCheck(this.consensus.slotToTimestamp(finalizedSlot), csmKeys.meta);
const keysMap = new Map<string, { operatorIndex: number; index: number }>();
csmKeys.data.keys.forEach((k: any) => keysMap.set(k.key, { ...k }));
for (const v of validators) {
const keyInfo = keysMap.get(v.validator.pubkey);
if (!keyInfo) continue;
this.storage.data[Number(v.index)] = {
operatorId: keyInfo.operatorIndex,
keyIndex: keyInfo.index,
pubKey: v.validator.pubkey,
// TODO: bigint?
withdrawableEpoch: Number(v.validator.withdrawable_epoch),
};
}
await this.storage.write();
}

private async updateStorage(vals: StateValidatorResponse[], finalizedSlot: Slot): Promise<void> {
// TODO: should we think about re-using validator indexes?
// TODO: should we think about changing WC for existing old vaidators ?
if (vals.length - this.info.data.lastValidatorsCount == 0) {
this.logger.log(`No new validators in the state`);
return;
}
vals = vals.slice(this.info.data.lastValidatorsCount);
const valKeys = vals.map((v: StateValidatorResponse) => v.validator.pubkey);
this.logger.log(`New appeared validators count: ${vals.length}`);
const csmKeys = await this.keysapi.findModuleKeys(this.info.data.moduleId, valKeys);
this.keysapi.healthCheck(this.consensus.slotToTimestamp(finalizedSlot), csmKeys.meta);
this.logger.log(`New appeared CSM validators count: ${csmKeys.data.keys.length}`);
for (const csmKey of csmKeys.data.keys) {
for (const newVal of vals) {
if (newVal.validator.pubkey != csmKey.key) continue;
this.storage.data[Number(newVal.index)] = {
operatorId: csmKey.operatorIndex,
keyIndex: csmKey.index,
pubKey: csmKey.key,
// TODO: bigint?
withdrawableEpoch: Number(newVal.validator.withdrawable_epoch),
};
}
}
await this.storage.write();
}

public isNotTimeToRun(finalizedSlot: Slot): boolean {
const storageTimestamp = this.consensus.slotToTimestamp(this.info.data.storageStateSlot) * 1000;
return (
this.info.data.storageStateSlot == finalizedSlot ||
this.config.get('KEYS_INDEXER_RUNNING_PERIOD') >= Date.now() - storageTimestamp
);
}

public eligibleForAnyDuty(slotNumber: Slot): boolean {
return this.eligibleForSlashings(slotNumber) || this.eligibleForFullWithdrawals(slotNumber);
}

public eligibleForEveryDuty(slotNumber: Slot): boolean {
const eligibleForSlashings = this.eligibleForSlashings(slotNumber);
const eligibleForFullWithdrawals = this.eligibleForFullWithdrawals(slotNumber);
if (!eligibleForSlashings)
this.logger.warn(
'🚨 Current keys indexer data might not be ready to detect slashing. ' +
'The root will be processed later again',
);
if (!eligibleForFullWithdrawals)
this.logger.warn(
'⚠️ Current keys indexer data might not be ready to detect full withdrawal. ' +
'The root will be processed later again',
);
return eligibleForSlashings && eligibleForFullWithdrawals;
}

private eligibleForSlashings(slotNumber: Slot): boolean {
// We are ok with oudated indexer for detection slasing
// because of a bunch of delays between deposit and validator appearing
// TODO: get constants from node
const ETH1_FOLLOW_DISTANCE = 2048; // ~8 hours
const EPOCHS_PER_ETH1_VOTING_PERIOD = 64; // ~6.8 hours
const safeDelay = ETH1_FOLLOW_DISTANCE + EPOCHS_PER_ETH1_VOTING_PERIOD * 32;
if (this.info.data.storageStateSlot >= slotNumber) return true;
return slotNumber - this.info.data.storageStateSlot <= safeDelay; // ~14.8 hours
}

private eligibleForFullWithdrawals(slotNumber: Slot): boolean {
// We are ok with oudated indexer for detection withdrawal
// because of MIN_VALIDATOR_WITHDRAWABILITY_DELAY
// TODO: get constants from node
const MIN_VALIDATOR_WITHDRAWABILITY_DELAY = 256;
const safeDelay = MIN_VALIDATOR_WITHDRAWABILITY_DELAY * 32;
if (this.info.data.storageStateSlot >= slotNumber) return true;
return slotNumber - this.info.data.storageStateSlot <= safeDelay; // ~27 hours
}

private async initOrReadServiceData() {
const defaultInfo: Info = {
moduleAddress: this.config.get('LIDO_STAKING_MODULE_ADDRESS'),
moduleId: 0,
storageStateSlot: 0,
lastValidatorsCount: 0,
};
this.info = new Low<Info>(new JSONFile<Info>('.keys-indexer-info.json'), defaultInfo);
this.storage = new Low<Storage>(new JSONFile<Storage>('.keys-indexer-storage.json'), {});
await this.info.read();
await this.storage.read();

if (this.info.data.moduleId == 0) {
const modules = (await this.keysapi.getModules()).data;
const module = modules.find(
(m: any) => m.stakingModuleAddress.toLowerCase() === this.info.data.moduleAddress.toLowerCase(),
);
if (!module) {
throw new Error(`Module with address ${this.info.data.moduleAddress} not found`);
}
this.info.data.moduleId = module.id;
await this.info.write();
}
}
}
33 changes: 33 additions & 0 deletions src/daemon/services/roots-processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { LOGGER_PROVIDER } from '@lido-nestjs/logger';
import { Inject, Injectable, LoggerService } from '@nestjs/common';

import { KeysIndexer } from './keys-indexer';
import { RootsStack } from './roots-stack';
import { HandlersService } from '../../common/handlers/handlers.service';
import { Consensus } from '../../common/providers/consensus/consensus';
import { RootHex } from '../../common/providers/consensus/response.interface';

@Injectable()
export class RootsProcessor {
constructor(
@Inject(LOGGER_PROVIDER) protected readonly logger: LoggerService,
protected readonly consensus: Consensus,
protected readonly keysIndexer: KeysIndexer,
protected readonly rootsStack: RootsStack,
protected readonly handlers: HandlersService,
) {}

public async process(blockRoot: RootHex): Promise<any> {
this.logger.log(`🛃 Root in processing [${blockRoot}]`);
const blockInfo = await this.consensus.getBlockInfo(blockRoot);
const rootSlot = {
blockRoot,
slotNumber: Number(blockInfo.message.slot),
};
const indexerIsOK = this.keysIndexer.eligibleForEveryDuty(rootSlot.slotNumber);
if (!indexerIsOK) await this.rootsStack.push(rootSlot); // only new will be pushed
await this.handlers.prove(blockRoot, blockInfo, this.keysIndexer.getKey);
if (indexerIsOK) await this.rootsStack.purge(blockRoot);
await this.rootsStack.setLastProcessed(rootSlot);
}
}
Loading

0 comments on commit 407655c

Please sign in to comment.