Skip to content

Commit

Permalink
Merge pull request #252 from lidofinance/fix/val-668
Browse files Browse the repository at this point in the history
Read stream without streamify
  • Loading branch information
Amuhar authored Feb 16, 2024
2 parents 3a9f4fd + 868f41c commit 83f557f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { pipeline } from 'node:stream/promises';
import { IsolationLevel } from '@mikro-orm/core';
import {
Controller,
Expand All @@ -20,7 +19,6 @@ import { TooEarlyResponse } from '../common/entities/http-exceptions';
import { EntityManager } from '@mikro-orm/knex';
import * as JSONStream from 'jsonstream';
import type { FastifyReply } from 'fastify';
import { streamify } from 'common/streams';
import { ModuleIdPipe } from 'http/common/pipeline/module-id-pipe';
import { LOGGER_PROVIDER } from '@lido-nestjs/logger';

Expand Down Expand Up @@ -122,21 +120,29 @@ export class SRModulesOperatorsKeysController {

reply.type('application/json').send(jsonStream);

try {
await this.entityManager.transactional(
() => pipeline([streamify(this.srModulesOperatorsKeys.getModulesOperatorsKeysGenerator()), jsonStream]),
{ isolationLevel: IsolationLevel.REPEATABLE_READ },
);
} catch (error) {
jsonStream.destroy();
const generator = await this.srModulesOperatorsKeys.getModulesOperatorsKeysGenerator();

if (error instanceof Error) {
const message = error.message;
const stack = error.stack;
this.logger.error(`modules-operators-keys error: ${message}`, stack);
} else {
this.logger.error('modules-operators-keys unknown error');
}
}
await this.entityManager.transactional(
async () => {
try {
for await (const value of generator) {
jsonStream.write(value);
}

jsonStream.end();
} catch (error) {
if (error instanceof Error) {
const message = error.message;
const stack = error.stack;
this.logger.error(`modules-operators-keys error: ${message}`, stack);
} else {
this.logger.error('modules-operators-keys unknown error');
}

jsonStream.destroy();
}
},
{ isolationLevel: IsolationLevel.REPEATABLE_READ },
);
}
}
5 changes: 5 additions & 0 deletions src/jobs/keys-update/keys-update.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ export class KeysUpdateService {
// read from database last execution layer data
const prevElMeta = await this.elMetaStorage.get();

this.logger.log('Fetched current execution meta and meta from database');

// handle the situation when the node has fallen behind the service state
if (prevElMeta && prevElMeta?.blockNumber > currElMeta.number) {
this.logger.warn('Previous data is newer than current data', prevElMeta);
Expand All @@ -133,6 +135,9 @@ export class KeysUpdateService {

// Get modules from storage
const storageModules = await this.srModulesStorage.findAll();

this.logger.log('Fetched modules from database');

// Get staking modules from SR contract
const contractModules = await this.stakingRouterFetchService.getStakingModules({ blockHash: currElMeta.hash });

Expand Down

0 comments on commit 83f557f

Please sign in to comment.