Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VAL-255: Add endpoint for streaming for KAPI #149

Merged
merged 33 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
9c158bc
avro streams
AlexandrMov Sep 6, 2023
0f19704
fix e2e
AlexandrMov Sep 18, 2023
e382803
Merge branch 'feat/VAL-189-sr-modules-support' into feat/val-255-sr-m…
AlexandrMov Sep 25, 2023
e017147
pull new code
AlexandrMov Sep 25, 2023
24cc07d
fix e2e tests
AlexandrMov Sep 26, 2023
40d9f97
Merge branch 'develop' into feat/val-255-sr-modules-operators-keys-st…
AlexandrMov Sep 27, 2023
e3f9f51
Merge branch 'develop' into feat/val-255-sr-modules-operators-keys-st…
AlexandrMov Sep 29, 2023
696eced
Merge branch 'develop' into feat/val-255-sr-modules-operators-keys-st…
AlexandrMov Oct 10, 2023
1660b7e
Merge branch 'develop' into feat/val-255-sr-modules-operators-keys-st…
AlexandrMov Oct 18, 2023
9f02fe4
VAL-369: Use postgres in e2e
AlexandrMov Oct 20, 2023
7e729eb
feat: github service-container for e2e tests
AlexandrMov Oct 23, 2023
da5001b
fix: ci
AlexandrMov Oct 23, 2023
0443692
fix tests
AlexandrMov Oct 23, 2023
f14ee77
revert changes
AlexandrMov Oct 23, 2023
01634ac
fix: get rid of console.log
AlexandrMov Oct 23, 2023
a4a480a
Merge branch 'develop' into feat/val-255-sr-modules-operators-keys-st…
AlexandrMov Nov 7, 2023
a197e9c
cr
AlexandrMov Nov 7, 2023
9d0897a
feat: stream json error handling
eddort Nov 7, 2023
3c531b8
Merge pull request #210 from lidofinance/feat/val-413
eddort Nov 7, 2023
4f90002
Merge branch 'feat/val-255-sr-modules-operators-keys-stream-2' into f…
AlexandrMov Nov 8, 2023
3a05f61
cr 1
AlexandrMov Nov 8, 2023
17ed1ee
experiment
AlexandrMov Nov 8, 2023
1c36f23
VALIDATOR_REGISTRY_ENABLE=false
AlexandrMov Nov 8, 2023
be43c98
rollback
AlexandrMov Nov 8, 2023
8d63546
Merge pull request #194 from lidofinance/feat/VAL-369-use-postgres-in…
Amuhar Nov 8, 2023
cf06652
feat: add timeout to keys and operators stream
eddort Nov 8, 2023
65156a0
Merge pull request #211 from lidofinance/feat/add-timeout-to-stream
eddort Nov 8, 2023
4dd16ab
fix: replace console logs by error
eddort Nov 9, 2023
bf16d5b
feat: better error handling
eddort Nov 12, 2023
2617776
Merge pull request #213 from lidofinance/feat/stream-error-handling
Amuhar Nov 14, 2023
305f638
Merge branch 'develop' into feat/val-255-sr-modules-operators-keys-st…
AlexandrMov Nov 20, 2023
789948c
chore: code review
AlexandrMov Nov 20, 2023
dd6d2d3
chore: code review 2
AlexandrMov Nov 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
/dist
/node_modules

# Env
.env

# Logs
logs
*.log
Expand Down
26 changes: 24 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,22 @@ on: pull_request
jobs:
test:
runs-on: ubuntu-latest

services:
postgres:
image: postgres:16-alpine
env:
POSTGRES_PASSWORD: postgres
POSTGRES_USER: postgres
POSTGRES_DB: node_operator_keys_service_db
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432

steps:
- name: Checkout repo
uses: actions/[email protected]
Expand All @@ -19,9 +35,15 @@ jobs:
run: yarn lint
- name: Run tests
run: yarn test
- name: Run tests
run: yarn test:e2e
- name: Run E2E tests
run: yarn test:e2e:docker
env:
PROVIDERS_URLS: ${{ secrets.PROVIDERS_URLS }}
CL_API_URLS: "https://e2e-test.lido.fi,"
CHAIN_ID: ${{ secrets.CHAIN_ID }}
CHRONIX_PROVIDER_MAINNET_URL: ${{ secrets.CHRONIX_PROVIDER_MAINNET_URL }}
DB_NAME: node_operator_keys_service_db
DB_PORT: 5432
DB_HOST: localhost
DB_USER: postgres
DB_PASSWORD: postgres
55 changes: 55 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "Debug Nest Framework",
"runtimeExecutable": "yarn",
"runtimeArgs": [
"start:debug",
],
"cwd": "${workspaceFolder}/src",
"autoAttachChildProcesses": true,
"restart": true,
"sourceMaps": true,
"stopOnEntry": false,
"console": "integratedTerminal",
},
{
"type": "node",
"request": "launch",
"name": "Debug Jest e2e",
"runtimeExecutable": "yarn",
"runtimeArgs": [
"test:e2e:docker:debug",
],
"cwd": "${workspaceFolder}/src",
"autoAttachChildProcesses": true,
"restart": true,
"sourceMaps": true,
"stopOnEntry": false,
"console": "integratedTerminal",
},
{
"name": "Jest file",
"type": "node",
"request": "launch",
"runtimeExecutable": "${workspaceRoot}/node_modules/.bin/jest",
"args": [
"${fileBasenameNoExtension}",
"--runInBand",
"--watch",
"--coverage=false",
"--no-cache"
],
"cwd": "${workspaceRoot}",
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen",
"sourceMaps": true,
"windows": {
"program": "${workspaceFolder}/node_modules/jest/bin/jest"
}
}
]
}
15 changes: 15 additions & 0 deletions Dockerfile.e2e
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
FROM node:18.14.2-alpine3.16 as building

RUN apk add --no-cache git=2.36.6-r0

WORKDIR /app

COPY package.json yarn.lock chronix.config.ts .env ./
COPY jest* ./
COPY ./tsconfig*.json ./

RUN yarn install --frozen-lockfile --non-interactive && yarn cache clean
COPY ./src ./src
RUN yarn typechain

CMD ["yarn", "test:e2e:docker"]
27 changes: 27 additions & 0 deletions docker-compose.e2e.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
version: '3.7'

services:
e2e_pgdb:
container_name: e2e_pgdb
image: postgres:14-alpine
restart: unless-stopped
environment:
- POSTGRES_DB=${DB_NAME}
- POSTGRES_USER=${DB_USER}
- POSTGRES_PASSWORD=${DB_PASSWORD}
expose:
- 5432:5432
volumes:
- ./.volumes/pgdata-${CHAIN_ID}/:/var/lib/postgresql/data

e2e_keys_api:
container_name: e2e_keys_api
build:
context: ./
dockerfile: Dockerfile.e2e
environment:
- NODE_ENV=production
- DB_PORT=5432
- DB_HOST=e2e_pgdb
depends_on:
- e2e_pgdb
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
"test:watch": "jest --watch",
"test:cov": "jest --coverage",
"test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand",
"test:e2e": "jest --config jest-e2e.json && chronix compile && chronix test",
"test:e2e": "docker-compose --env-file=./.env -f docker-compose.e2e.yml up --build --abort-on-container-exit",
"test:e2e:docker": "mikro-orm schema:drop -r && mikro-orm migration:up && jest -i --config jest-e2e.json && chronix compile && chronix test",
"test:e2e:docker:debug": "mikro-orm schema:drop -r && mikro-orm migration:up && node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest -i --config jest-e2e.json && chronix compile && chronix test",
"typechain": "typechain --target=ethers-v5 --out-dir ./src/generated ./src/staking-router-modules/contracts/abi/*.json",
"chronix:compile": "chronix compile",
"chronix:test": "chronix test",
Expand Down Expand Up @@ -73,6 +75,7 @@
"ethers": "^5.5.4",
"fastify-swagger": "^4.13.1",
"jsonstream": "^1.0.3",
"pg-query-stream": "^4.5.3",
"prom-client": "^14.0.1",
"reflect-metadata": "^0.1.13",
"rimraf": "^3.0.2",
Expand Down
29 changes: 29 additions & 0 deletions src/app/database-testing.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Module } from '@nestjs/common';
import { MikroOrmModule } from '@mikro-orm/nestjs';
import config from 'mikro-orm.config';
import { ConfigModule, ConfigService } from 'common/config';

@Module({
imports: [
ConfigModule,
MikroOrmModule.forRootAsync({
async useFactory(configService: ConfigService) {
return {
...config,
dbName: configService.get('DB_NAME'),
host: configService.get('DB_HOST'),
port: configService.get('DB_PORT'),
user: configService.get('DB_USER'),
password: configService.get('DB_PASSWORD'),
autoLoadEntities: false,
cache: { enabled: false },
debug: false,
registerRequestContext: true,
allowGlobalContext: true,
};
},
inject: [ConfigService],
}),
],
})
export class DatabaseTestingModule {}
1 change: 1 addition & 0 deletions src/app/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './app.constants';
export * from './app.module';
export * from './app.service';
export * from './database-testing.module';
18 changes: 18 additions & 0 deletions src/common/registry/storage/key.storage.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { QueryOrder } from '@mikro-orm/core';
import { FilterQuery, FindOptions } from '@mikro-orm/core';
import { Injectable } from '@nestjs/common';
import { addTimeoutToStream } from '../utils/stream.utils';
import { RegistryKey } from './key.entity';
import { RegistryKeyRepository } from './key.repository';

Expand All @@ -16,6 +17,23 @@ export class RegistryKeyStorageService {
return await this.repository.find(where, options);
}

findStream(where: FilterQuery<RegistryKey>, fields?: string[]): AsyncIterable<RegistryKey> {
const knex = this.repository.getKnex();
const stream = knex
.select(fields || '*')
.from<RegistryKey>('registry_key')
.where(where)
AlexandrMov marked this conversation as resolved.
Show resolved Hide resolved
.orderBy([
{ column: 'operatorIndex', order: 'asc' },
{ column: 'index', order: 'asc' },
])
.stream();

addTimeoutToStream(stream, 60_000, 'A timeout occurred loading keys from the database');

return stream;
}

/** find all keys */
async findAll(moduleAddress: string): Promise<RegistryKey[]> {
return await this.repository.find(
Expand Down
18 changes: 18 additions & 0 deletions src/common/registry/storage/operator.storage.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { QueryOrder } from '@mikro-orm/core';
import { FilterQuery, FindOptions } from '@mikro-orm/core';
import { Injectable } from '@nestjs/common';
import { addTimeoutToStream } from '../utils/stream.utils';
import { RegistryOperator } from './operator.entity';
import { RegistryOperatorRepository } from './operator.repository';

Expand All @@ -16,6 +17,23 @@ export class RegistryOperatorStorageService {
return await this.repository.find(where, options);
}

findStream(where: FilterQuery<RegistryOperator>, fields?: string[]): AsyncIterable<RegistryOperator> {
const knex = this.repository.getKnex();
const stream = knex
.select(fields || '*')
.from<RegistryOperator>('registry_operator')
.where(where)
AlexandrMov marked this conversation as resolved.
Show resolved Hide resolved
.orderBy([
{ column: 'operatorIndex', order: 'asc' },
{ column: 'index', order: 'asc' },
])
.stream();

addTimeoutToStream(stream, 60_000, 'A timeout occurred loading operators from the database');

return stream;
}

/** find all operators */
async findAll(moduleAddress: string): Promise<RegistryOperator[]> {
return await this.repository.find(
Expand Down
1 change: 0 additions & 1 deletion src/common/registry/test/fetch/operator.fetch.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ describe('Operators', () => {
// operatorFields(operator);
return iface.encodeFunctionResult('getNodeOperator', operatorFields(operator));
});
console.log('aaaaa', expected);
const result = await fetchService.fetch(address);

expect(result).toEqual([expected]);
Expand Down
30 changes: 30 additions & 0 deletions src/common/registry/utils/stream.utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { PassThrough } from 'stream';

/**
* This util adds a timeout for streaming,
* preventing errors related to stream hangs
* this can happen when working with knex
* @param stream nodejs stream
* @param ms timeout in ms
* @param errorMessage error text to be displayed when the stream is closed
*/
export const addTimeoutToStream = (stream: PassThrough, ms: number, errorMessage: string) => {
let timeout = setTimeout(() => {
stream.destroy(new Error(errorMessage));
}, ms);

const debounce = () => {
clearTimeout(timeout);
timeout = setTimeout(() => {
stream.destroy(new Error(errorMessage));
}, ms);
};

// we should stop streaming if more than "ms" has elapsed since the last receipt of data.
const debounceTimeoutHandler = stream.on('data', debounce);

stream.once('close', () => {
clearTimeout(timeout);
debounceTimeoutHandler.off('data', debounce);
});
};
1 change: 1 addition & 0 deletions src/common/streams/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './streamify';
30 changes: 30 additions & 0 deletions src/common/streams/streamify.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { Readable, ReadableOptions } from 'stream';

class GeneratorToStream<GeneratorResultType> extends Readable {
constructor(options: ReadableOptions, protected readonly generator: AsyncGenerator<GeneratorResultType>) {
super(options);
}

_read() {
try {
this.generator
.next()
.then((result) => {
if (!result.done) {
this.push(result.value);
} else {
this.push(null);
}
})
.catch((e) => {
this.emit('error', e);
});
} catch (e) {
this.emit('error', e);
}
}
}

export function streamify<GeneratorResultType>(generator: AsyncGenerator<GeneratorResultType>) {
return new GeneratorToStream<GeneratorResultType>({ objectMode: true }, generator);
}
19 changes: 16 additions & 3 deletions src/http/keys/keys.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import {
HttpStatus,
NotFoundException,
Res,
LoggerService,
Inject,
} from '@nestjs/common';
import { LOGGER_PROVIDER } from '@lido-nestjs/logger';
import type { FastifyReply } from 'fastify';
import { ApiNotFoundResponse, ApiOperation, ApiParam, ApiResponse, ApiTags } from '@nestjs/swagger';
import { KeysService } from './keys.service';
Expand All @@ -25,7 +28,11 @@ import { IsolationLevel } from '@mikro-orm/core';
@Controller('keys')
@ApiTags('keys')
export class KeysController {
constructor(protected readonly keysService: KeysService, protected readonly entityManager: EntityManager) {}
constructor(
@Inject(LOGGER_PROVIDER) protected logger: LoggerService,
protected readonly keysService: KeysService,
protected readonly entityManager: EntityManager,
) {}

@Version('1')
@Get()
Expand All @@ -48,16 +55,22 @@ export class KeysController {

const jsonStream = JSONStream.stringify('{ "meta": ' + JSON.stringify(meta) + ', "data": [', ',', ']}');
reply.type('application/json').send(jsonStream);
// TODO: is it necessary to check the error? or 'finally' is ok?

try {
for (const keysGenerator of keysGenerators) {
for await (const key of keysGenerator) {
const keyReponse = new Key(key);
jsonStream.write(keyReponse);
}
}
} finally {

jsonStream.end();
} catch (streamError) {
// Handle the error during streaming.
this.logger.log('keys streaming error', streamError);
// destroy method closes the stream without ']' and corrupt the result
// https://github.com/dominictarr/through/blob/master/index.js#L78
jsonStream.destroy();
}
},
{ isolationLevel: IsolationLevel.REPEATABLE_READ },
Expand Down
Loading
Loading