Skip to content

Commit

Permalink
Merge pull request #251 from lidofinance/feature/val-671-mev-m-thread…
Browse files Browse the repository at this point in the history
…-times-out-while-receiving-keys-from-kapi

feat(val-671): define STREAM_TIMEOUT as env param
  • Loading branch information
AlexandrMov authored Feb 16, 2024
2 parents 1fea915 + 87a2729 commit 3a9f4fd
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 12 deletions.
4 changes: 4 additions & 0 deletions sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,7 @@ CL_API_URLS=https://quiknode.pro/<token>
# for some applications we dont need validators registry library
# value below is default
VALIDATOR_REGISTRY_ENABLE=true

# When streaming lasts more than STREAM_TIMEOUT seconds, the stream will be destroyed
# This prevents the retrieval of outdated data.
STREAM_TIMEOUT=60000
5 changes: 5 additions & 0 deletions src/common/config/env.validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ export class EnvironmentVariables {
@IsPositive()
@Transform(({ value }) => parseInt(value, 10))
KEYS_FETCH_BATCH_SIZE = 1100;

@IsOptional()
@IsPositive()
@Transform(({ value }) => parseInt(value, 10))
STREAM_TIMEOUT = 60_000;
}

export function validate(config: Record<string, unknown>) {
Expand Down
2 changes: 1 addition & 1 deletion src/common/registry/storage/constants.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export const KEY_LEN = 98;
export const DEPOSIT_SIGNATURE_LEN = 194;
export const ADDRESS_LEN = 42;
export const STREAM_TIMEOUT = 60_000;
export const DEFAULT_STREAM_TIMEOUT = 60_000;
export const STREAM_KEYS_TIMEOUT_MESSAGE = 'A timeout occurred loading keys from the database';
export const STREAM_OPERATORS_TIMEOUT_MESSAGE = 'A timeout occurred loading operators from the database';
8 changes: 5 additions & 3 deletions src/common/registry/storage/key.storage.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { QueryOrder } from '@mikro-orm/core';
import { FilterQuery, FindOptions } from '@mikro-orm/core';
import { Injectable } from '@nestjs/common';
import { ConfigService } from 'common/config';
import { addTimeoutToStream } from '../utils/stream.utils';
import { RegistryKey } from './key.entity';
import { RegistryKeyRepository } from './key.repository';
import { STREAM_KEYS_TIMEOUT_MESSAGE, STREAM_TIMEOUT } from './constants';
import { DEFAULT_STREAM_TIMEOUT, STREAM_KEYS_TIMEOUT_MESSAGE } from './constants';

@Injectable()
export class RegistryKeyStorageService {
constructor(private readonly repository: RegistryKeyRepository) {}
constructor(private readonly repository: RegistryKeyRepository, private readonly configService: ConfigService) {}

/** find keys */
async find<P extends string = never>(
Expand All @@ -27,7 +28,8 @@ export class RegistryKeyStorageService {
const knex = qb.getKnexQuery();
const stream = knex.stream();

addTimeoutToStream(stream, STREAM_TIMEOUT, STREAM_KEYS_TIMEOUT_MESSAGE);
const streamTimeout = this.configService.get('STREAM_TIMEOUT') ?? DEFAULT_STREAM_TIMEOUT;
addTimeoutToStream(stream, streamTimeout, STREAM_KEYS_TIMEOUT_MESSAGE);

return stream;
}
Expand Down
8 changes: 5 additions & 3 deletions src/common/registry/storage/operator.storage.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { QueryOrder } from '@mikro-orm/core';
import { FilterQuery, FindOptions } from '@mikro-orm/core';
import { Injectable } from '@nestjs/common';
import { ConfigService } from 'common/config';
import { addTimeoutToStream } from '../utils/stream.utils';
import { RegistryOperator } from './operator.entity';
import { RegistryOperatorRepository } from './operator.repository';
import { STREAM_OPERATORS_TIMEOUT_MESSAGE, STREAM_TIMEOUT } from './constants';
import { DEFAULT_STREAM_TIMEOUT, STREAM_OPERATORS_TIMEOUT_MESSAGE } from './constants';

@Injectable()
export class RegistryOperatorStorageService {
constructor(private readonly repository: RegistryOperatorRepository) {}
constructor(private readonly repository: RegistryOperatorRepository, private readonly configService: ConfigService) {}

/** find operators */
async find<P extends string = never>(
Expand All @@ -28,7 +29,8 @@ export class RegistryOperatorStorageService {
const knex = qb.getKnexQuery();
const stream = knex.stream();

addTimeoutToStream(stream, STREAM_TIMEOUT, STREAM_OPERATORS_TIMEOUT_MESSAGE);
const streamTimeout = this.configService.get('STREAM_TIMEOUT') ?? DEFAULT_STREAM_TIMEOUT;
addTimeoutToStream(stream, streamTimeout, STREAM_OPERATORS_TIMEOUT_MESSAGE);

return stream;
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/registry/storage/registry-storage.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import { RegistryKeyStorageService } from './key.storage';
import { MikroOrmModule } from '@mikro-orm/nestjs';
import { RegistryKey } from './key.entity';
import { RegistryOperator } from './operator.entity';
import { ConfigModule } from 'common/config';

@Module({
imports: [
MikroOrmModule.forFeature({
entities: [RegistryKey, RegistryOperator],
}),
ConfigModule,
],
providers: [RegistryStorageService, RegistryOperatorStorageService, RegistryKeyStorageService],
exports: [RegistryStorageService, RegistryOperatorStorageService, RegistryKeyStorageService],
Expand Down
16 changes: 14 additions & 2 deletions src/common/registry/test/storage/key.storage.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import { key } from '../fixtures/key.fixture';
import { RegistryKeyStorageService, RegistryKey, RegistryKeyRepository } from '../../';
import { REGISTRY_CONTRACT_ADDRESSES } from '@lido-nestjs/contracts';
import * as streamUtils from '../../utils/stream.utils';
import { STREAM_KEYS_TIMEOUT_MESSAGE, STREAM_TIMEOUT } from '../../../registry/storage/constants';
import { STREAM_KEYS_TIMEOUT_MESSAGE, DEFAULT_STREAM_TIMEOUT } from '../../../registry/storage/constants';
import { ConfigModule, ConfigService } from 'common/config';

describe('Keys', () => {
const CHAIN_ID = process.env.CHAIN_ID || 1;
Expand Down Expand Up @@ -53,13 +54,24 @@ describe('Keys', () => {
};

let storageService: RegistryKeyStorageService;
let configService: ConfigService;

beforeEach(async () => {
const moduleRef = await Test.createTestingModule({
imports: [ConfigModule],
providers: [RegistryKeyStorageService, { provide: RegistryKeyRepository, useValue: mockRegistryKeyRepository }],
}).compile();

storageService = moduleRef.get(RegistryKeyStorageService);
configService = moduleRef.get(ConfigService);

jest.spyOn(configService, 'get').mockImplementation((path) => {
if (path === 'STREAM_TIMEOUT') {
return DEFAULT_STREAM_TIMEOUT;
}

return configService.get(path);
});
});

beforeEach(() => {
Expand Down Expand Up @@ -87,7 +99,7 @@ describe('Keys', () => {
expect(mockedCreateQueryBuilder.orderBy).toBeCalledWith({ index: 'asc', operator_index: 'asc' });
expect(mockedCreateQueryBuilder.getKnexQuery).toBeCalledTimes(1);
expect(streamValue).toBeCalledTimes(1);
expect(addTimeoutToStream).toBeCalledWith(stream, STREAM_TIMEOUT, STREAM_KEYS_TIMEOUT_MESSAGE);
expect(addTimeoutToStream).toBeCalledWith(stream, DEFAULT_STREAM_TIMEOUT, STREAM_KEYS_TIMEOUT_MESSAGE);
});

test('findAll', async () => {
Expand Down
16 changes: 14 additions & 2 deletions src/common/registry/test/storage/operator.storage.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import { operator } from '../fixtures/operator.fixture';
import { RegistryOperatorStorageService, RegistryOperator, RegistryOperatorRepository } from '../../';
import { REGISTRY_CONTRACT_ADDRESSES } from '@lido-nestjs/contracts';
import * as streamUtils from '../../utils/stream.utils';
import { STREAM_OPERATORS_TIMEOUT_MESSAGE, STREAM_TIMEOUT } from '../../../registry/storage/constants';
import { DEFAULT_STREAM_TIMEOUT, STREAM_OPERATORS_TIMEOUT_MESSAGE } from '../../../registry/storage/constants';
import { ConfigModule, ConfigService } from 'common/config';

describe('Operators', () => {
const CHAIN_ID = process.env.CHAIN_ID || 1;
Expand Down Expand Up @@ -53,9 +54,11 @@ describe('Operators', () => {
};

let storageService: RegistryOperatorStorageService;
let configService: ConfigService;

beforeEach(async () => {
const moduleRef = await Test.createTestingModule({
imports: [ConfigModule],
providers: [
RegistryOperatorStorageService,
{
Expand All @@ -66,6 +69,15 @@ describe('Operators', () => {
}).compile();

storageService = moduleRef.get(RegistryOperatorStorageService);
configService = moduleRef.get(ConfigService);

jest.spyOn(configService, 'get').mockImplementation((path) => {
if (path === 'STREAM_TIMEOUT') {
return DEFAULT_STREAM_TIMEOUT;
}

return configService.get(path);
});
});

beforeEach(() => {
Expand Down Expand Up @@ -93,7 +105,7 @@ describe('Operators', () => {
expect(mockedCreateQueryBuilder.orderBy).toBeCalledWith({ index: 'asc', module_address: 'asc' });
expect(mockedCreateQueryBuilder.getKnexQuery).toBeCalledTimes(1);
expect(streamValue).toBeCalledTimes(1);
expect(addTimeoutToStream).toBeCalledWith(stream, STREAM_TIMEOUT, STREAM_OPERATORS_TIMEOUT_MESSAGE);
expect(addTimeoutToStream).toBeCalledWith(stream, DEFAULT_STREAM_TIMEOUT, STREAM_OPERATORS_TIMEOUT_MESSAGE);
});

test('findAll', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,15 @@ export class SRModulesOperatorsKeysController {
{ isolationLevel: IsolationLevel.REPEATABLE_READ },
);
} catch (error) {
this.logger.error('modules-operators-keys error', error);
jsonStream.destroy();

if (error instanceof Error) {
const message = error.message;
const stack = error.stack;
this.logger.error(`modules-operators-keys error: ${message}`, stack);
} else {
this.logger.error('modules-operators-keys unknown error');
}
}
}
}

0 comments on commit 3a9f4fd

Please sign in to comment.