Skip to content

Commit

Permalink
feat: use SSZ state representation from node
Browse files Browse the repository at this point in the history
  • Loading branch information
vgorkavenko committed Feb 8, 2024
1 parent fb8d12f commit 7805685
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 37 deletions.
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"dependencies": {
"@huanshiwushuang/lowdb": "^6.0.2",
"@lido-nestjs/logger": "^1.3.2",
"@lodestar/types": "^1.15.0",
"@nestjs/common": "^10.0.0",
"@nestjs/config": "^3.1.1",
"@nestjs/core": "^10.0.0",
Expand All @@ -45,8 +46,8 @@
"@nestjs/cli": "^10.0.0",
"@nestjs/schematics": "^10.0.0",
"@nestjs/testing": "^10.0.0",
"@swc/cli": "^0.1.63",
"@swc/core": "^1.3.104",
"@swc/cli": "^0.3.9",
"@swc/core": "^1.4.0",
"@swc/jest": "^0.2.30",
"@types/express": "^4.17.17",
"@types/jest": "^29.5.2",
Expand Down
2 changes: 2 additions & 0 deletions src/common/handlers/handlers.module.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { Module } from '@nestjs/common';

import { HandlersService } from './handlers.service';
import { ProvidersModule } from '../providers/providers.module';

@Module({
imports: [ProvidersModule],
providers: [HandlersService],
exports: [HandlersService],
})
Expand Down
18 changes: 15 additions & 3 deletions src/common/handlers/handlers.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { LOGGER_PROVIDER } from '@lido-nestjs/logger';
import { Inject, Injectable, LoggerService } from '@nestjs/common';

import { Consensus } from '../providers/consensus/consensus';
import { BlockInfoResponse, RootHex } from '../providers/consensus/response.interface';

export interface KeyInfo {
Expand All @@ -14,24 +15,35 @@ type KeyInfoFn = (valIndex: number) => KeyInfo | undefined;

@Injectable()
export class HandlersService {
constructor(@Inject(LOGGER_PROVIDER) protected readonly logger: LoggerService) {}
constructor(
@Inject(LOGGER_PROVIDER) protected readonly logger: LoggerService,
protected readonly consensus: Consensus,
) {}

public async prove(blockRoot: RootHex, blockInfo: BlockInfoResponse, keyInfoFn: KeyInfoFn): Promise<void> {
const slashings = await this.getUnprovenSlashings(blockRoot, blockInfo, keyInfoFn);
const withdrawals = await this.getUnprovenWithdrawals(blockRoot, blockInfo, keyInfoFn);
if (!slashings.length && !withdrawals.length) return;
const payload = await this.buildProvePayload(slashings, withdrawals);
const payload = await this.buildProvePayload(blockInfo, slashings, withdrawals);
// TODO: ask before sending if CLI or daemon in watch mode
await this.sendProves(payload);
this.logger.log(`🏁 Proves sent. Root [${blockRoot}]`);
}

private async buildProvePayload(slashings: string[], withdrawals: string[]): Promise<any> {
private async buildProvePayload(
blockInfo: BlockInfoResponse,
slashings: string[],
withdrawals: string[],
): Promise<any> {
// TODO: implement
// this.consensus.getState(...)
if (slashings.length || withdrawals.length) {
this.logger.warn(`📦 Prove payload: slashings [${slashings}], withdrawals [${withdrawals}]`);
}
// const { ssz } = await eval('import("@lodestar/types")');
// const stateSSZ = await this.consensus.getStateSSZ(header.header.message.slot);
// const stateView = ssz.deneb.BeaconState.deserializeToView(stateSSZ);
// const validatorsInfo = stateView.validators.type.elementType.toJson(stateView.validators.get(1337));
return {};
}
private async sendProves(payload: any): Promise<void> {
Expand Down
2 changes: 2 additions & 0 deletions src/common/providers/base/rest-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export interface RequestOptions {
streamed?: boolean;
requestPolicy?: RequestPolicy;
signal?: AbortSignal;
headers?: Record<string, string>;
}

export abstract class BaseRestProvider {
Expand Down Expand Up @@ -55,6 +56,7 @@ export abstract class BaseRestProvider {
method: 'GET',
headersTimeout: (options.requestPolicy as RequestPolicy).timeout,
signal: options.signal,
headers: options.headers,
});
if (statusCode !== 200) {
const hostname = new URL(base).hostname;
Expand Down
21 changes: 19 additions & 2 deletions src/common/providers/consensus/consensus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ export class Consensus extends BaseRestProvider implements OnApplicationBootstra
}

public async getState(stateId: StateId, signal?: AbortSignal): Promise<any> {
const resp: { body: BodyReadable; headers: IncomingHttpHeaders } = await this.baseGet(
const { body } = await this.baseGet<{ body: BodyReadable; headers: IncomingHttpHeaders }>(
this.mainUrl,
this.endpoints.state(stateId),
{
Expand All @@ -115,9 +115,26 @@ export class Consensus extends BaseRestProvider implements OnApplicationBootstra
// TODO: Enable for CLI only
//this.progress.show(`State [${stateId}]`, resp);
// Data processing
const pipeline = chain([resp.body, parser()]);
const pipeline = chain([body, parser()]);
return await new Promise((resolve) => {
connectTo(pipeline).on('done', (asm) => resolve(asm.current));
});
}

public async getStateSSZ(stateId: StateId, signal?: AbortSignal): Promise<any> {
const { body } = await this.baseGet<{ body: BodyReadable; headers: IncomingHttpHeaders }>(
this.mainUrl,
this.endpoints.state(stateId),
{
streamed: true,
signal,
headers: { accept: 'application/octet-stream' },
},
);
// Progress bar
// TODO: Enable for CLI only
//this.progress.show(`State [${stateId}]`, resp);
// Data processing
return new Uint8Array(await body.arrayBuffer());
}
}
67 changes: 42 additions & 25 deletions src/daemon/services/keys-indexer.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { BooleanType, ByteVectorType, ContainerNodeStructType, UintNumberType } from '@chainsafe/ssz';
import { ListCompositeTreeView } from '@chainsafe/ssz/lib/view/listComposite';
import { Low } from '@huanshiwushuang/lowdb';
import { JSONFile } from '@huanshiwushuang/lowdb/node';
import { LOGGER_PROVIDER } from '@lido-nestjs/logger';
Expand All @@ -6,12 +8,7 @@ import { Inject, Injectable, LoggerService, OnApplicationBootstrap } from '@nest
import { ConfigService } from '../../common/config/config.service';
import { KeyInfo } from '../../common/handlers/handlers.service';
import { Consensus } from '../../common/providers/consensus/consensus';
import {
BlockHeaderResponse,
RootHex,
Slot,
StateValidatorResponse,
} from '../../common/providers/consensus/response.interface';
import { BlockHeaderResponse, RootHex, Slot } from '../../common/providers/consensus/response.interface';
import { Keysapi } from '../../common/providers/keysapi/keysapi';

type Info = {
Expand All @@ -25,6 +22,21 @@ type Storage = {
[valIndex: number]: KeyInfo;
};

let types: typeof import('@lodestar/types');

type Validators = ListCompositeTreeView<
ContainerNodeStructType<{
pubkey: ByteVectorType;
withdrawalCredentials: ByteVectorType;
effectiveBalance: UintNumberType;
slashed: BooleanType;
activationEligibilityEpoch: UintNumberType;
activationEpoch: UintNumberType;
exitEpoch: UintNumberType;
withdrawableEpoch: UintNumberType;
}>
>;

@Injectable()
export class KeysIndexer implements OnApplicationBootstrap {
private startedAt: number = 0;
Expand All @@ -40,6 +52,8 @@ export class KeysIndexer implements OnApplicationBootstrap {
) {}

public async onApplicationBootstrap(): Promise<void> {
// ugly hack to import ESModule to CommonJS project
types = await eval(`import('@lodestar/types')`);
await this.initOrReadServiceData();
}

Expand Down Expand Up @@ -74,60 +88,63 @@ export class KeysIndexer implements OnApplicationBootstrap {

private async baseRun(stateRoot: RootHex, finalizedSlot: Slot): Promise<void> {
this.logger.log(`Get validators. State root [${stateRoot}]`);
const validators = await this.consensus.getValidators(stateRoot);
this.logger.log(`Total validators count: ${validators.length}`);
const stateSSZ = await this.consensus.getStateSSZ(stateRoot);
const stateView = types.ssz.deneb.BeaconState.deserializeToView(stateSSZ);
this.logger.log(`Total validators count: ${stateView.validators.length}`);
// TODO: do we need to store already full withdrawn keys ?
this.info.data.lastValidatorsCount == 0
? await this.initStorage(validators, finalizedSlot)
: await this.updateStorage(validators, finalizedSlot);
? await this.initStorage(stateView.validators, finalizedSlot)
: await this.updateStorage(stateView.validators, finalizedSlot);
this.logger.log(`CSM validators count: ${Object.keys(this.storage.data).length}`);
this.info.data.storageStateSlot = finalizedSlot;
this.info.data.lastValidatorsCount = validators.length;
this.info.data.lastValidatorsCount = stateView.validators.length;
await this.info.write();
}

private async initStorage(validators: StateValidatorResponse[], finalizedSlot: Slot): Promise<void> {
private async initStorage(validators: Validators, finalizedSlot: Slot): Promise<void> {
this.logger.log(`Init keys data`);
const csmKeys = await this.keysapi.getModuleKeys(this.info.data.moduleId);
this.keysapi.healthCheck(this.consensus.slotToTimestamp(finalizedSlot), csmKeys.meta);
const keysMap = new Map<string, { operatorIndex: number; index: number }>();
csmKeys.data.keys.forEach((k: any) => keysMap.set(k.key, { ...k }));
for (const v of validators) {
const keyInfo = keysMap.get(v.validator.pubkey);
for (const [i, v] of validators.getAllReadonlyValues().entries()) {
const keyInfo = keysMap.get('0x'.concat(Buffer.from(v.pubkey).toString('hex')));
if (!keyInfo) continue;
this.storage.data[Number(v.index)] = {
this.storage.data[i] = {
operatorId: keyInfo.operatorIndex,
keyIndex: keyInfo.index,
pubKey: v.validator.pubkey,
pubKey: v.pubkey.toString(),
// TODO: bigint?
withdrawableEpoch: Number(v.validator.withdrawable_epoch),
withdrawableEpoch: v.withdrawableEpoch,
};
}
await this.storage.write();
}

private async updateStorage(vals: StateValidatorResponse[], finalizedSlot: Slot): Promise<void> {
private async updateStorage(validators: Validators, finalizedSlot: Slot): Promise<void> {
// TODO: should we think about re-using validator indexes?
// TODO: should we think about changing WC for existing old vaidators ?
if (vals.length - this.info.data.lastValidatorsCount == 0) {
if (validators.length - this.info.data.lastValidatorsCount == 0) {
this.logger.log(`No new validators in the state`);
return;
}
vals = vals.slice(this.info.data.lastValidatorsCount);
const valKeys = vals.map((v: StateValidatorResponse) => v.validator.pubkey);
// TODO: can be better
const vals = validators.getAllReadonlyValues().slice(this.info.data.lastValidatorsCount);
const valKeys = vals.map((v) => '0x'.concat(Buffer.from(v.pubkey).toString('hex')));
this.logger.log(`New appeared validators count: ${vals.length}`);
const csmKeys = await this.keysapi.findModuleKeys(this.info.data.moduleId, valKeys);
this.keysapi.healthCheck(this.consensus.slotToTimestamp(finalizedSlot), csmKeys.meta);
this.logger.log(`New appeared CSM validators count: ${csmKeys.data.keys.length}`);
for (const csmKey of csmKeys.data.keys) {
for (const newVal of vals) {
if (newVal.validator.pubkey != csmKey.key) continue;
this.storage.data[Number(newVal.index)] = {
for (const [i, v] of vals.entries()) {
if (valKeys[i] != csmKey.key) continue;
const index = i + this.info.data.lastValidatorsCount;
this.storage.data[index] = {
operatorId: csmKey.operatorIndex,
keyIndex: csmKey.index,
pubKey: csmKey.key,
// TODO: bigint?
withdrawableEpoch: Number(newVal.validator.withdrawable_epoch),
withdrawableEpoch: v.withdrawableEpoch,
};
}
}
Expand Down
Loading

0 comments on commit 7805685

Please sign in to comment.