From 572b5155eaea31ed85a5c97e752a4497e406d582 Mon Sep 17 00:00:00 2001 From: Nicko Guyer Date: Thu, 29 Feb 2024 10:44:16 -0500 Subject: [PATCH] Use separate eventstream per namespace (#135) Use separate eventstream per namespace Signed-off-by: Nicko Guyer --- .env | 2 +- .github/workflows/docker_main.yml | 10 +- .github/workflows/docker_release.yml | 4 +- .github/workflows/test.yml | 8 +- .vscode/launch.json | 33 +++ .vscode/settings.json | 9 + Dockerfile | 16 +- src/app.module.ts | 2 +- src/event-stream/event-stream.interfaces.ts | 2 +- src/event-stream/event-stream.module.ts | 2 +- src/event-stream/event-stream.service.spec.ts | 2 +- src/event-stream/event-stream.service.ts | 78 +++--- .../eventstream-proxy.base.ts | 198 +++++++++------ .../eventstream-proxy.gateway.spec.ts | 2 +- .../eventstream-proxy.gateway.ts | 2 +- .../eventstream-proxy.interfaces.ts | 9 +- .../eventstream-proxy.module.ts | 2 +- src/health/health.controller.spec.ts | 2 +- src/health/health.controller.ts | 11 +- src/health/health.module.ts | 2 +- src/main.ts | 7 +- src/request-context/request-id.middleware.ts | 2 +- src/request-logging.interceptor.spec.ts | 2 +- src/request-logging.interceptor.ts | 2 +- src/tokens/abimapper.service.ts | 4 +- src/tokens/blockchain.service.ts | 2 +- src/tokens/erc1155.ts | 2 +- src/tokens/erc165.ts | 2 +- src/tokens/tokens.controller.spec.ts | 2 +- src/tokens/tokens.controller.ts | 9 +- src/tokens/tokens.interfaces.ts | 25 +- src/tokens/tokens.listener.ts | 12 +- src/tokens/tokens.module.ts | 2 +- src/tokens/tokens.service.spec.ts | 46 +--- src/tokens/tokens.service.ts | 138 ++++------- src/tokens/tokens.util.spec.ts | 2 +- src/tokens/tokens.util.ts | 33 ++- src/utils.ts | 4 + src/websocket-events/websocket-events.base.ts | 22 +- test/app.e2e-context.ts | 13 + test/app.e2e-spec.ts | 2 +- test/suites/api.ts | 6 + test/suites/websocket.ts | 230 +++++++++++++----- tsconfig.json | 4 +- 44 files changed, 601 insertions(+), 368 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 .vscode/settings.json diff --git a/.env b/.env index d1611f7..6b6012f 100644 --- a/.env +++ b/.env @@ -1,5 +1,5 @@ PORT=3000 ETHCONNECT_URL=http://127.0.0.1:5102 +ETHCONNECT_INSTANCE=/contracts/erc1155 ETHCONNECT_TOPIC=token CONTRACT_ADDRESS= -AUTO_INIT=true diff --git a/.github/workflows/docker_main.yml b/.github/workflows/docker_main.yml index e50be84..d67bd62 100644 --- a/.github/workflows/docker_main.yml +++ b/.github/workflows/docker_main.yml @@ -9,7 +9,7 @@ jobs: docker: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set build tag id: build_tag_generator @@ -25,16 +25,16 @@ jobs: --label build_date=$(date -u +"%Y-%m-%dT%H:%M:%SZ") \ --label tag=${{ steps.build_tag_generator.outputs.BUILD_TAG }} \ --tag ghcr.io/hyperledger/firefly-tokens-erc1155:${{ steps.build_tag_generator.outputs.BUILD_TAG }} . - + - name: Tag release run: docker tag ghcr.io/hyperledger/firefly-tokens-erc1155:${{ steps.build_tag_generator.outputs.BUILD_TAG }} ghcr.io/hyperledger/firefly-tokens-erc1155:head - + - name: Push docker image run: | echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin docker push ghcr.io/hyperledger/firefly-tokens-erc1155:${{ steps.build_tag_generator.outputs.BUILD_TAG }} - + - name: Push head tag run: | echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin - docker push ghcr.io/hyperledger/firefly-tokens-erc1155:head \ No newline at end of file + docker push ghcr.io/hyperledger/firefly-tokens-erc1155:head diff --git a/.github/workflows/docker_release.yml b/.github/workflows/docker_release.yml index 038285e..bf9619e 100644 --- a/.github/workflows/docker_release.yml +++ b/.github/workflows/docker_release.yml @@ -8,7 +8,7 @@ jobs: docker: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 with: fetch-depth: 0 @@ -30,7 +30,7 @@ jobs: run: | echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin docker push ghcr.io/hyperledger/firefly-tokens-erc1155:${GITHUB_REF##*/} - + - name: Push head tag run: | echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index bbe9bc0..65cb078 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -10,12 +10,12 @@ jobs: test: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Use Node.js uses: actions/setup-node@v3 with: - node-version: '16.x' + node-version: '20.9.0' - run: npm ci - run: npm run test @@ -25,12 +25,12 @@ jobs: run: working-directory: ./samples/solidity steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Use Node.js uses: actions/setup-node@v3 with: - node-version: '16.x' + node-version: '20.9.0' - run: npm ci - run: npm run compile - run: npm run test diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..1377495 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,33 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Run Tests", + "runtimeExecutable": "npm", + "args": ["run", "test"], + "request": "launch", + "type": "node", + "outputCapture": "std" + }, + { + "name": "Run E2E Tests", + "runtimeExecutable": "npm", + "args": ["run", "test:e2e"], + "request": "launch", + "type": "node", + "outputCapture": "std" + }, + { + "type": "node", + "request": "launch", + "name": "Launch Program", + "skipFiles": ["/**"], + "program": "${file}", + "preLaunchTask": "tsc: build - tsconfig.json", + "outFiles": ["${workspaceFolder}/dist/**/*.js"] + } + ] +} diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..b273c98 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,9 @@ +{ + "solidity.compileUsingRemoteVersion": "v0.6.12+commit.27d51765", + "editor.codeActionsOnSave": { + "source.fixAll.eslint": "explicit" + }, + "eslint.validate": ["javascript"], + "solidity.defaultCompiler": "remote", + "cSpell.words": ["eventstream", "fftm"] +} diff --git a/Dockerfile b/Dockerfile index a613133..67b1d5c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM node:16-alpine3.15 as solidity-builder +FROM node:20-alpine3.17 as solidity-build RUN apk add python3 alpine-sdk USER node WORKDIR /home/node @@ -7,22 +7,24 @@ RUN npm install ADD --chown=node:node ./samples/solidity . RUN npx hardhat compile -FROM node:16-alpine3.15 as builder +FROM node:20-alpine3.17 as build WORKDIR /root ADD package*.json ./ RUN npm install ADD . . RUN npm run build -FROM node:16-alpine3.15 +FROM node:20-alpine3.17 RUN apk add curl +# We also need to keep copying it to the old location to maintain compatibility with the FireFly CLI +COPY --from=solidity-build --chown=1001:0 /home/node/artifacts/contracts/ERC1155MixedFungible.sol/ERC1155MixedFungible.json /root/contracts/ WORKDIR /app ADD package*.json ./ RUN npm install --production -COPY --from=solidity-builder /home/node/contracts contracts/source -COPY --from=solidity-builder /home/node/artifacts/contracts/ERC1155MixedFungible.sol contracts -COPY --from=builder /root/dist dist -COPY --from=builder /root/.env /app/.env +COPY --from=solidity-build /home/node/contracts contracts/source +COPY --from=solidity-build /home/node/artifacts/contracts/ERC1155MixedFungible.sol contracts +COPY --from=build /root/dist dist +COPY --from=build /root/.env /app/.env RUN chgrp -R 0 /app/ \ && chmod -R g+rwX /app/ USER 1001 diff --git a/src/app.module.ts b/src/app.module.ts index de35ace..a0fe6ec 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/event-stream/event-stream.interfaces.ts b/src/event-stream/event-stream.interfaces.ts index 9fb2cb6..34a40e1 100644 --- a/src/event-stream/event-stream.interfaces.ts +++ b/src/event-stream/event-stream.interfaces.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/event-stream/event-stream.module.ts b/src/event-stream/event-stream.module.ts index 9087296..e677c8d 100644 --- a/src/event-stream/event-stream.module.ts +++ b/src/event-stream/event-stream.module.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/event-stream/event-stream.service.spec.ts b/src/event-stream/event-stream.service.spec.ts index 5266f16..ffe98dc 100644 --- a/src/event-stream/event-stream.service.spec.ts +++ b/src/event-stream/event-stream.service.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/event-stream/event-stream.service.ts b/src/event-stream/event-stream.service.ts index 42ec157..5ad0196 100644 --- a/src/event-stream/event-stream.service.ts +++ b/src/event-stream/event-stream.service.ts @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -18,11 +18,11 @@ import { HttpService } from '@nestjs/axios'; import { Injectable, Logger } from '@nestjs/common'; import { AxiosRequestConfig } from 'axios'; import { lastValueFrom } from 'rxjs'; -import * as WebSocket from 'ws'; -import { IAbiMethod } from '../tokens/tokens.interfaces'; -import { getHttpRequestOptions, getWebsocketOptions } from '../utils'; -import { Context } from '../request-context/request-context.decorator'; +import WebSocket from 'ws'; import { FFRequestIDHeader } from '../request-context/constants'; +import { Context, newContext } from '../request-context/request-context.decorator'; +import { IAbiMethod } from '../tokens/tokens.interfaces'; +import { eventStreamName, getHttpRequestOptions, getWebsocketOptions } from '../utils'; import { Event, EventBatch, @@ -46,6 +46,7 @@ export class EventStreamSocket { constructor( private url: string, private topic: string, + private namespace: string, private username: string, private password: string, private handleEvents: (events: EventBatch) => void, @@ -67,7 +68,7 @@ export class EventStreamSocket { } else { this.logger.log('Event stream websocket connected'); } - this.produce({ type: 'listen', topic: this.topic }); + this.produce({ type: 'listen', topic: `${eventStreamName(this.topic, this.namespace)}` }); this.produce({ type: 'listenreplies' }); this.ping(); }) @@ -83,6 +84,7 @@ export class EventStreamSocket { } }) .on('message', (message: string) => { + this.logger.verbose(`WS => ${message}`); this.handleMessage(JSON.parse(message)); }) .on('pong', () => { @@ -109,7 +111,19 @@ export class EventStreamSocket { } ack(batchNumber: number | undefined) { - this.produce({ type: 'ack', topic: this.topic, batchNumber }); + this.produce({ + type: 'ack', + topic: `${eventStreamName(this.topic, this.namespace)}`, + batchNumber, + }); + } + + nack(batchNumber: number | undefined) { + this.produce({ + type: 'nack', + topic: `${eventStreamName(this.topic, this.namespace)}`, + batchNumber, + }); } close() { @@ -176,11 +190,12 @@ export class EventStreamService { return config; } - async getStreams(): Promise { + async getStreams(ctx: Context): Promise { const response = await lastValueFrom( - this.http.get(new URL('/eventstreams', this.baseUrl).href, { - ...getHttpRequestOptions(this.username, this.password), - }), + this.http.get( + new URL('/eventstreams', this.baseUrl).href, + this.requestOptions(ctx), + ), ); return response.data; } @@ -192,13 +207,14 @@ export class EventStreamService { batchSize: 50, batchTimeoutMS: 500, type: 'websocket', - websocket: { topic }, + websocket: { topic: name }, blockedReryDelaySec: 30, // intentional due to spelling error in ethconnect inputs: true, timestamps: true, }; - const existingStreams = await this.getStreams(); + const existingStreams = await this.getStreams(ctx); + const stream = existingStreams.find(s => s.name === streamDetails.name); if (stream) { const patchedStreamRes = await lastValueFrom( @@ -207,12 +223,10 @@ export class EventStreamService { { ...streamDetails, }, - { - ...this.requestOptions(ctx), - }, + this.requestOptions(ctx), ), ); - this.logger.log(`Event stream for ${topic}: ${stream.id}`); + this.logger.log(`Event stream for ${name}: ${stream.id}`); return patchedStreamRes.data; } const newStreamRes = await lastValueFrom( @@ -221,28 +235,25 @@ export class EventStreamService { { ...streamDetails, }, - { - ...this.requestOptions(ctx), - }, + this.requestOptions(ctx), ), ); - this.logger.log(`Event stream for ${topic}: ${newStreamRes.data.id}`); + this.logger.log(`Event stream for ${name}: ${newStreamRes.data.id}`); return newStreamRes.data; } async deleteStream(ctx: Context, id: string) { await lastValueFrom( - this.http.delete(new URL(`/eventstreams/${id}`, this.baseUrl).href, { - ...this.requestOptions(ctx), - }), + this.http.delete(new URL(`/eventstreams/${id}`, this.baseUrl).href, this.requestOptions(ctx)), ); } async getSubscriptions(ctx: Context): Promise { const response = await lastValueFrom( - this.http.get(new URL('/subscriptions', this.baseUrl).href, { - ...this.requestOptions(ctx), - }), + this.http.get( + new URL('/subscriptions', this.baseUrl).href, + this.requestOptions(ctx), + ), ); return response.data; } @@ -275,7 +286,7 @@ export class EventStreamService { ): Promise { const response = await lastValueFrom( this.http.post( - new URL(`/subscriptions`, instancePath).href, + `${instancePath}/subscriptions`, { name, stream: streamId, @@ -284,9 +295,7 @@ export class EventStreamService { address, methods, }, - { - ...this.requestOptions(ctx), - }, + this.requestOptions(ctx), ), ); this.logger.log(`Created subscription ${name}: ${response.data.id}`); @@ -337,15 +346,20 @@ export class EventStreamService { return true; } - connect( + async connect( url: string, topic: string, + namespace: string, handleEvents: (events: EventBatch) => void, handleReceipt: (receipt: EventStreamReply) => void, ) { + const name = eventStreamName(topic, namespace); + await this.createOrUpdateStream(newContext(), name, topic); + return new EventStreamSocket( url, topic, + namespace, this.username, this.password, handleEvents, diff --git a/src/eventstream-proxy/eventstream-proxy.base.ts b/src/eventstream-proxy/eventstream-proxy.base.ts index c021ef4..45a400c 100644 --- a/src/eventstream-proxy/eventstream-proxy.base.ts +++ b/src/eventstream-proxy/eventstream-proxy.base.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -15,15 +15,17 @@ // limitations under the License. import { Logger } from '@nestjs/common'; -import { MessageBody, SubscribeMessage } from '@nestjs/websockets'; import { v4 as uuidv4 } from 'uuid'; +import { Context, newContext } from '../request-context/request-context.decorator'; import { EventBatch, EventStreamReply } from '../event-stream/event-stream.interfaces'; import { EventStreamService, EventStreamSocket } from '../event-stream/event-stream.service'; -import { Context, newContext } from '../request-context/request-context.decorator'; import { + WebSocketAck, + WebSocketActionBase, WebSocketEventsBase, WebSocketEx, WebSocketMessage, + WebSocketStart, } from '../websocket-events/websocket-events.base'; import { AckMessageData, @@ -40,20 +42,21 @@ import { * @WebSocketGateway({ path: '/api/stream' }) */ export abstract class EventStreamProxyBase extends WebSocketEventsBase { - socket?: EventStreamSocket; + namespaceClients: Map> = new Map(); + namespaceEventStreamSocket: Map = new Map(); url?: string; topic?: string; private connectListeners: ConnectionListener[] = []; private eventListeners: EventListener[] = []; - private awaitingAck: WebSocketMessageWithId[] = []; - private currentClient: WebSocketEx | undefined; + // Map of client IDs to all the messages for which we are awaiting an ack + private awaitingAck: Map = new Map(); private subscriptionNames = new Map(); private queue = Promise.resolve(); constructor( protected readonly logger: Logger, - protected eventstream: EventStreamService, + protected eventStreamService: EventStreamService, requireAuth = false, ) { super(logger, requireAuth); @@ -66,55 +69,97 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { handleConnection(client: WebSocketEx) { super.handleConnection(client); - if (this.server.clients.size === 1) { - this.logger.log(`Initializing event stream proxy`); - Promise.all(this.connectListeners.map(l => l.onConnect())) - .then(() => { - this.setCurrentClient(client); - this.startListening(); - }) - .catch(err => { - this.logger.error(`Error initializing event stream proxy: ${err}`); - }); + + if (!this.awaitingAck.get(client.id)) { + this.awaitingAck.set(client.id, []); } + + client.on('message', async (message: string) => { + const action = JSON.parse(message) as WebSocketActionBase; + switch (action.type) { + case 'start': { + const startAction = action as WebSocketStart; + this.startListening(client, startAction.namespace); + break; + } + case 'ack': { + const ackAction = action as WebSocketAck; + this.handleAck(client, ackAction); + } + } + }); } private queueTask(task: () => void) { this.queue = this.queue.finally(task); } - private startListening() { + private async startListening(client: WebSocketEx, namespace: string) { if (this.url === undefined || this.topic === undefined) { return; } - this.socket = this.eventstream.connect( - this.url, - this.topic, - events => { - this.queueTask(() => this.processEvents(events)); - }, - receipt => { - this.broadcast('receipt', receipt); - }, - ); + try { + if (!this.namespaceEventStreamSocket.has(namespace)) { + const eventStreamSocket = await this.eventStreamService.connect( + this.url, + this.topic, + namespace, + events => { + this.queueTask(() => this.processEvents(events, namespace)); + }, + receipt => { + this.broadcast('receipt', receipt); + }, + ); + this.namespaceEventStreamSocket.set(namespace, eventStreamSocket); + } + let clientSet = this.namespaceClients.get(namespace); + if (!clientSet) { + clientSet = new Set(); + } + clientSet.add(client); + this.namespaceClients.set(namespace, clientSet); + + // ack the start command + client.send( + JSON.stringify({ + event: 'started', + data: { + namespace: namespace, + }, + }), + ); + this.logger.debug(`Started namespace '${namespace}'`); + } catch (e) { + this.logger.error(`Error connecting to event stream websocket: ${e.message}`); + } } handleDisconnect(client: WebSocketEx) { super.handleDisconnect(client); - if (this.server.clients.size === 0) { - this.stopListening(); - } else if (client.id === this.currentClient?.id) { - for (const newClient of this.server.clients) { - this.setCurrentClient(newClient as WebSocketEx); - break; - } - } - } - private stopListening() { - this.socket?.close(); - this.socket = undefined; - this.currentClient = undefined; + // Iterate over all the namespaces this client was subscribed to + this.namespaceClients.forEach((clientSet, namespace) => { + clientSet.delete(client); + + // Nack any messages that are inflight for that namespace + const nackedMessageIds: Set = new Set(); + this.awaitingAck + ?.get(client.id) + ?.filter(msg => msg.namespace === namespace) + .map(msg => { + this.namespaceEventStreamSocket.get(namespace)?.nack(msg.batchNumber); + nackedMessageIds.add(msg.id); + }); + this.awaitingAck.delete(client.id); + + // If all clients for this namespace have disconnected, also close the connection to EVMConnect + if (clientSet.size == 0) { + this.namespaceEventStreamSocket.get(namespace)?.close(); + this.namespaceEventStreamSocket.delete(namespace); + this.namespaceClients.delete(namespace); + } + }); } addConnectionListener(listener: ConnectionListener) { @@ -125,7 +170,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { this.eventListeners.push(listener); } - private async processEvents(batch: EventBatch) { + private async processEvents(batch: EventBatch, namespace: string) { const messages: WebSocketMessage[] = []; for (const event of batch.events) { this.logger.log(`Proxying event: ${JSON.stringify(event)}`); @@ -148,6 +193,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { } } const message: WebSocketMessageWithId = { + namespace: namespace, id: uuidv4(), event: 'batch', data: { @@ -155,8 +201,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { }, batchNumber: batch.batchNumber, }; - this.awaitingAck.push(message); - this.currentClient?.send(JSON.stringify(message)); + this.send(namespace, message); } private async getSubscriptionName(ctx: Context, subId: string) { @@ -166,7 +211,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { } try { - const sub = await this.eventstream.getSubscription(ctx, subId); + const sub = await this.eventStreamService.getSubscription(ctx, subId); if (sub !== undefined) { this.subscriptionNames.set(subId, sub.name); return sub.name; @@ -177,34 +222,51 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { return undefined; } - private setCurrentClient(client: WebSocketEx) { - this.currentClient = client; - for (const message of this.awaitingAck) { - this.currentClient.send(JSON.stringify(message)); - } - } - - @SubscribeMessage('ack') - handleAck(@MessageBody() data: AckMessageData) { + handleAck(client: WebSocketEx, data: AckMessageData) { if (data.id === undefined) { this.logger.error('Received malformed ack'); return; } - const inflight = this.awaitingAck.find(msg => msg.id === data.id); - this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`); - if (this.socket !== undefined && inflight !== undefined) { - this.awaitingAck = this.awaitingAck.filter(msg => msg.id !== data.id); - if ( - // If nothing is left awaiting an ack - then we clearly need to ack - this.awaitingAck.length === 0 || - // Or if we have a batch number associated with this ID, then we can only ack if there - // are no other messages in-flight with the same batch number. - (inflight.batchNumber !== undefined && - !this.awaitingAck.find(msg => msg.batchNumber === inflight.batchNumber)) - ) { - this.logger.log(`In-flight batch complete (batchNumber=${inflight.batchNumber})`); - this.socket.ack(inflight.batchNumber); + let awaitingAck = this.awaitingAck.get(client.id); + + if (awaitingAck) { + const inflight = awaitingAck.find(msg => msg.id === data.id); + this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`); + if (this.namespaceEventStreamSocket !== undefined && inflight !== undefined) { + // Remove the acked message id from the queue + awaitingAck = awaitingAck.filter(msg => msg.id !== data.id); + this.awaitingAck.set(client.id, awaitingAck); + if ( + // If nothing is left awaiting an ack - then we clearly need to ack + awaitingAck.length === 0 || + // Or if we have a batch number associated with this ID, then we can only ack if there + // are no other messages in-flight with the same batch number. + (inflight.batchNumber !== undefined && + !awaitingAck.filter(msg => msg.batchNumber === inflight.batchNumber)) + ) { + this.logger.log(`In-flight batch complete (batchNumber=${inflight.batchNumber})`); + this.namespaceEventStreamSocket.get(inflight.namespace)?.ack(inflight.batchNumber); + } + } + } else { + this.logger.warn(`Received unrecognized ack from client ${client.id} for message ${data.id}`); + } + } + + send(namespace, payload: WebSocketMessageWithId) { + const clients = this.namespaceClients.get(namespace); + if (clients) { + // Randomly select a connected client for this namespace to distribute load + const selected = Math.floor(Math.random() * clients.size); + let i = 0; + for (const client of clients.keys()) { + if (i++ == selected) { + this.awaitingAck.get(client.id)?.push(payload); + this.logger.verbose(`WS <= ${payload}`); + client.send(JSON.stringify(payload)); + return; + } } } } diff --git a/src/eventstream-proxy/eventstream-proxy.gateway.spec.ts b/src/eventstream-proxy/eventstream-proxy.gateway.spec.ts index becfe11..a650b4a 100644 --- a/src/eventstream-proxy/eventstream-proxy.gateway.spec.ts +++ b/src/eventstream-proxy/eventstream-proxy.gateway.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/eventstream-proxy/eventstream-proxy.gateway.ts b/src/eventstream-proxy/eventstream-proxy.gateway.ts index 90e1a2b..c4e302d 100644 --- a/src/eventstream-proxy/eventstream-proxy.gateway.ts +++ b/src/eventstream-proxy/eventstream-proxy.gateway.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/eventstream-proxy/eventstream-proxy.interfaces.ts b/src/eventstream-proxy/eventstream-proxy.interfaces.ts index b33ca69..dbe6e2b 100644 --- a/src/eventstream-proxy/eventstream-proxy.interfaces.ts +++ b/src/eventstream-proxy/eventstream-proxy.interfaces.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -26,10 +26,15 @@ export interface ConnectionListener { } export interface EventListener { - onEvent: (subName: string, event: Event, process: EventProcessor) => void | Promise; + onEvent: ( + subName: string, + event: Event, + process: EventProcessor, + ) => undefined | Promise; } export interface WebSocketMessageWithId extends WebSocketMessage { + namespace: string; id: string; batchNumber: number | undefined; } diff --git a/src/eventstream-proxy/eventstream-proxy.module.ts b/src/eventstream-proxy/eventstream-proxy.module.ts index 7a96d27..158a5e5 100644 --- a/src/eventstream-proxy/eventstream-proxy.module.ts +++ b/src/eventstream-proxy/eventstream-proxy.module.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/health/health.controller.spec.ts b/src/health/health.controller.spec.ts index 0277994..e732d2e 100644 --- a/src/health/health.controller.spec.ts +++ b/src/health/health.controller.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/health/health.controller.ts b/src/health/health.controller.ts index 16aa8cd..a44523c 100644 --- a/src/health/health.controller.ts +++ b/src/health/health.controller.ts @@ -38,11 +38,12 @@ export class HealthController { readiness() { return this.health.check([ () => - this.http.pingCheck( - 'ethconnect', - `${this.blockchain.baseUrl}/status`, - getHttpRequestOptions(this.blockchain.username, this.blockchain.password), - ), + this.http.pingCheck('ethconnect', `${this.blockchain.baseUrl}/status`, { + auth: { + username: this.blockchain.username, + password: this.blockchain.password, + }, + }), ]); } } diff --git a/src/health/health.module.ts b/src/health/health.module.ts index 299008e..ba2d9e5 100644 --- a/src/health/health.module.ts +++ b/src/health/health.module.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/main.ts b/src/main.ts index 95ff1c3..9a1670f 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -77,7 +77,6 @@ async function bootstrap() { const ethConnectUrl = config.get('ETHCONNECT_URL', ''); const instancePath = config.get('ETHCONNECT_INSTANCE', ''); const topic = config.get('ETHCONNECT_TOPIC', 'token'); - const autoInit = config.get('AUTO_INIT', 'true'); const username = config.get('ETHCONNECT_USERNAME', ''); const password = config.get('ETHCONNECT_PASSWORD', ''); const contractAddress = config.get('CONTRACT_ADDRESS', ''); @@ -103,10 +102,6 @@ async function bootstrap() { .get(BlockchainConnectorService) .configure(ethConnectUrl, username, password, passthroughHeaders, blockchainRetryCfg); - if (autoInit.toLowerCase() !== 'false') { - await app.get(TokensService).init(newContext()); - } - const port = config.get('PORT', 3000); console.log(`Listening on port ${port}`); await app.listen(port); diff --git a/src/request-context/request-id.middleware.ts b/src/request-context/request-id.middleware.ts index 7ad509b..a614f68 100644 --- a/src/request-context/request-id.middleware.ts +++ b/src/request-context/request-id.middleware.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/request-logging.interceptor.spec.ts b/src/request-logging.interceptor.spec.ts index 347ce32..31a8c6a 100644 --- a/src/request-logging.interceptor.spec.ts +++ b/src/request-logging.interceptor.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/request-logging.interceptor.ts b/src/request-logging.interceptor.ts index d9c18e0..e790d5e 100644 --- a/src/request-logging.interceptor.ts +++ b/src/request-logging.interceptor.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/abimapper.service.ts b/src/tokens/abimapper.service.ts index 38afd56..aa2013f 100644 --- a/src/tokens/abimapper.service.ts +++ b/src/tokens/abimapper.service.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -15,7 +15,7 @@ // limitations under the License. import { BadRequestException, Injectable, Logger } from '@nestjs/common'; -import * as LRUCache from 'lru-cache'; +import LRUCache from 'lru-cache'; import { abi as ERC1155MixedFungibleAbiV2 } from '../abi/ERC1155MixedFungible.json'; import { abi as ERC1155MixedFungibleAbiV1 } from '../abi/ERC1155MixedFungibleV1.json'; import { abi as ERC1155MixedFungibleAbiOld } from '../abi/ERC1155MixedFungibleOld.json'; diff --git a/src/tokens/blockchain.service.ts b/src/tokens/blockchain.service.ts index 8b36aeb..61b480e 100644 --- a/src/tokens/blockchain.service.ts +++ b/src/tokens/blockchain.service.ts @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/erc1155.ts b/src/tokens/erc1155.ts index 0efd52c..ed401ae 100644 --- a/src/tokens/erc1155.ts +++ b/src/tokens/erc1155.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/erc165.ts b/src/tokens/erc165.ts index 62f4915..086d056 100644 --- a/src/tokens/erc165.ts +++ b/src/tokens/erc165.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/tokens.controller.spec.ts b/src/tokens/tokens.controller.spec.ts index 0874654..df698c7 100644 --- a/src/tokens/tokens.controller.spec.ts +++ b/src/tokens/tokens.controller.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/tokens.controller.ts b/src/tokens/tokens.controller.ts index 404b850..af80fe4 100644 --- a/src/tokens/tokens.controller.ts +++ b/src/tokens/tokens.controller.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -48,13 +48,16 @@ import { TokensService } from './tokens.service'; @Controller() export class TokensController { - constructor(private service: TokensService, private blockchain: BlockchainConnectorService) {} + constructor( + private service: TokensService, + private blockchain: BlockchainConnectorService, + ) {} @Post('init') @HttpCode(204) @ApiOperation({ summary: 'Perform one-time initialization (if not auto-initialized)' }) async init(@RequestContext() ctx: Context) { - await this.service.init(ctx); + // Do nothing. Endpoint retained for backwards compatibility with older tooling. } @Post('createpool') diff --git a/src/tokens/tokens.interfaces.ts b/src/tokens/tokens.interfaces.ts index 1671c86..749dc55 100644 --- a/src/tokens/tokens.interfaces.ts +++ b/src/tokens/tokens.interfaces.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -139,6 +139,10 @@ export class TokenPoolConfig { } export class TokenPool { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty({ enum: TokenType }) @IsDefined() type: TokenType; @@ -205,6 +209,10 @@ export class BlockchainEvent { } export class TokenPoolActivate { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() @IsNotEmpty() poolLocator: string; @@ -219,6 +227,10 @@ export class TokenPoolActivate { } export class TokenPoolDeactivate { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() @IsNotEmpty() poolLocator: string; @@ -229,6 +241,10 @@ export class TokenPoolDeactivate { } export class TokenBalanceQuery { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() @IsNotEmpty() poolLocator: string; @@ -367,6 +383,10 @@ export class TokenApproval { // Websocket notifications class tokenEventBase { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() poolLocator: string; @@ -393,6 +413,9 @@ export class TokenPoolEventInfo { } export class TokenPoolEvent extends tokenEventBase { + @ApiProperty() + alternateLocators: string[]; + @ApiProperty() type: TokenType; diff --git a/src/tokens/tokens.listener.ts b/src/tokens/tokens.listener.ts index 4d83815..af18610 100644 --- a/src/tokens/tokens.listener.ts +++ b/src/tokens/tokens.listener.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -38,10 +38,12 @@ import { decodeHex, encodeHexIDForURI, packPoolLocator, + packOldPoolLocator, poolContainsId, unpackPoolLocator, unpackSubscriptionName, unpackTypeId, + unpackOldTypeId, } from './tokens.util'; import { BASE_SUBSCRIPTION_NAME } from './tokens.service'; import { BlockchainConnectorService } from './blockchain.service'; @@ -118,6 +120,7 @@ export class TokenListener implements EventListener { subName: string, event: TokenPoolCreationEvent, ): WebSocketMessage | undefined { + let oldLocator: string | undefined = undefined; const { data: output } = event; const unpackedSub = unpackSubscriptionName(subName); const decodedData = decodeHex(output.data ?? ''); @@ -133,6 +136,7 @@ export class TokenListener implements EventListener { if ('type_id' in output) { // Older creation event - must interpret the "type_id" parameter const unpackedId = unpackTypeId(output.type_id); + const oldUnpackedId = unpackOldTypeId(output.type_id); poolLocator = { address: event.address.toLowerCase(), isFungible: unpackedId.isFungible, @@ -147,6 +151,11 @@ export class TokenListener implements EventListener { unpackedId.endId, event.blockNumber, ); + oldLocator = packOldPoolLocator( + event.address.toLowerCase(), + oldUnpackedId.poolId, + event.blockNumber, + ); } else { // Newer creation event - all needed params are in the event poolLocator = { @@ -186,6 +195,7 @@ export class TokenListener implements EventListener { interfaceFormat: InterfaceFormat.ABI, poolData: unpackedSub.poolData, poolLocator: packedPoolLocator, + alternateLocators: oldLocator ? [oldLocator] : [], type: poolLocator.isFungible ? TokenType.FUNGIBLE : TokenType.NONFUNGIBLE, signer: output.operator, data: decodedData, diff --git a/src/tokens/tokens.module.ts b/src/tokens/tokens.module.ts index a28a406..aa2d616 100644 --- a/src/tokens/tokens.module.ts +++ b/src/tokens/tokens.module.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/tokens.service.spec.ts b/src/tokens/tokens.service.spec.ts index 2ed8739..7db6ec2 100644 --- a/src/tokens/tokens.service.spec.ts +++ b/src/tokens/tokens.service.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -113,50 +113,6 @@ describe('TokensService', () => { expect(service).toBeDefined(); }); - describe('Subscription migration', () => { - it('should not migrate if no subscriptions exists', async () => { - service.topic = 'tokens'; - service.instancePath = '0x123'; - eventStream.getStreams.mockReturnValueOnce([{ name: 'tokens:0x123' }]); - eventStream.getSubscriptions.mockReturnValueOnce([]); - expect(await service.migrationCheck(newContext())).toBe(false); - }); - - it('should not migrate if correct base subscription exists', async () => { - service.topic = 'tokens'; - service.instancePath = '0x123'; - eventStream.getStreams.mockReturnValueOnce([{ name: 'tokens:0x123' }]); - eventStream.getSubscriptions.mockReturnValueOnce([ - { name: 'fft:0x123:base:TokenPoolCreation' }, - ]); - expect(await service.migrationCheck(newContext())).toBe(false); - }); - - it('should migrate if any event subscriptions are missing', async () => { - service.topic = 'tokens'; - service.instancePath = '0x123'; - eventStream.getStreams.mockReturnValueOnce([{ name: 'tokens:0x123' }]); - eventStream.getSubscriptions.mockReturnValueOnce([ - { name: 'fft:0x123:p1:TokenPoolCreation' }, - ]); - expect(await service.migrationCheck(newContext())).toBe(true); - }); - - it('should not migrate if all event subscriptions exist', async () => { - service.topic = 'tokens'; - service.instancePath = '0x123'; - eventStream.getStreams.mockReturnValueOnce([{ name: 'tokens:0x123' }]); - eventStream.getSubscriptions.mockReturnValueOnce([ - { name: 'fft:0x123:p1:TokenPoolCreation:ns1' }, - { name: 'fft:0x123:p1:TokenPoolCreationV2:ns1' }, - { name: 'fft:0x123:p1:TransferSingle:ns1' }, - { name: 'fft:0x123:p1:TransferBatch:ns1' }, - { name: 'fft:0x123:p1:ApprovalForAll:ns1' }, - ]); - expect(await service.migrationCheck(newContext())).toBe(false); - }); - }); - describe('Query token URI', () => { it('should get the token URI', async () => { const ctx = newContext(); diff --git a/src/tokens/tokens.service.ts b/src/tokens/tokens.service.ts index 69343fd..d558315 100644 --- a/src/tokens/tokens.service.ts +++ b/src/tokens/tokens.service.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -24,7 +24,7 @@ import { import { EventStreamService } from '../event-stream/event-stream.service'; import { EventStream, EventStreamSubscription } from '../event-stream/event-stream.interfaces'; import { EventStreamProxyGateway } from '../eventstream-proxy/eventstream-proxy.gateway'; -import { Context, newContext } from '../request-context/request-context.decorator'; +import { Context } from '../request-context/request-context.decorator'; import { AsyncResponse, CheckInterfaceRequest, @@ -49,7 +49,6 @@ import { packSubscriptionName, computeTokenId, unpackPoolLocator, - unpackSubscriptionName, packPoolLocator, } from './tokens.util'; import { TOKEN_STANDARD, TokenListener } from './tokens.listener'; @@ -63,6 +62,7 @@ import { TransferBatch, TransferSingle, } from './erc1155'; +import { eventStreamName } from '../utils'; export const BASE_SUBSCRIPTION_NAME = 'base'; @@ -80,7 +80,7 @@ export class TokensService { baseUrl: string; instancePath: string; topic: string; - stream: EventStream | undefined; + streamCache: Map = new Map(); constructor( private eventstream: EventStreamService, @@ -94,34 +94,20 @@ export class TokensService { this.instancePath = instancePath; this.topic = topic; this.contractAddress = contractAddress.toLowerCase(); - this.proxy.addConnectionListener(this); this.proxy.addEventListener(new TokenListener(this.blockchain)); - } - - async onConnect() { const wsUrl = new URL('/ws', this.baseUrl.replace('http', 'ws')).href; - const stream = await this.getStream(newContext()); - this.proxy.configure(wsUrl, stream.name); - } - - /** - * One-time initialization of event stream and base subscription. - */ - async init(ctx: Context) { - const defaultContract = await this.getContractAddress(ctx); - if (defaultContract) { - await this.createPoolSubscription(ctx, defaultContract, BASE_SUBSCRIPTION_NAME); - } + this.proxy.configure(wsUrl, this.topic); } private async createPoolSubscription( ctx: Context, + namespace: string, address: string, poolLocator: string, blockNumber?: string, poolData?: string, ) { - const stream = await this.getStream(ctx); + const stream = await this.getStream(ctx, namespace); const eventABIV1 = this.mapper.getCreateEventV1(); const eventABIV2 = this.mapper.getCreateEventV2(); const methodABI = this.mapper.getCreateMethod(); @@ -173,16 +159,20 @@ export class TokensService { return this.contractAddress; } - private async getStream(ctx: Context) { - const stream = this.stream; + private async getStream(ctx: Context, namespace: string) { + let stream = this.streamCache.get(namespace); if (stream !== undefined) { return stream; } - await this.migrationCheck(ctx); // note: may update this.stream - const name = this.stream?.name ?? packStreamName(this.topic, this.contractAddress); - this.logger.log('Using event stream with name ' + name); - this.stream = await this.eventstream.createOrUpdateStream(ctx, name, name); - return this.stream; + await this.migrationCheck(ctx); + this.logger.log('Creating stream with name ' + eventStreamName(this.topic, namespace)); + stream = await this.eventstream.createOrUpdateStream( + ctx, + `${eventStreamName(this.topic, namespace)}`, + this.topic, + ); + this.streamCache.set(namespace, stream); + return stream; } /** @@ -195,72 +185,22 @@ export class TokensService { async migrationCheck(ctx: Context) { const currentName = packStreamName(this.topic, this.contractAddress); const oldName1 = packStreamName(this.topic, this.instancePath); - const oldName2 = this.topic; - - const streams = await this.eventstream.getStreams(); - let existingStream = streams.find(s => s.name === currentName); - if (existingStream === undefined) { - // Look for the old stream names - existingStream = streams.find(s => s.name === oldName1); - if (existingStream === undefined) { - existingStream = streams.find(s => s.name === oldName2); - if (existingStream === undefined) { - return false; - } - } - this.logger.warn( - `Old event stream found with name ${existingStream.name}. ` + - `The connector will continue to use this stream, but it is recommended ` + - `to create a new stream with the name ${currentName}.`, - ); - } - this.stream = existingStream; - const streamId = existingStream.id; - - const allSubscriptions = await this.eventstream.getSubscriptions(ctx); - const subscriptions = allSubscriptions.filter(s => s.stream === streamId); - if (subscriptions.length === 0) { - return false; - } + const oldName2 = packStreamName(this.topic, ''); + const oldName3 = this.topic; - const foundEvents = new Map(); - for (const sub of subscriptions) { - const parts = unpackSubscriptionName(sub.name); - if (parts.poolLocator === BASE_SUBSCRIPTION_NAME) { - continue; - } - if (parts.poolLocator === undefined || parts.event === undefined) { - this.logger.warn( - `Non-parseable subscription name '${sub.name}' found in event stream '${existingStream.name}'.` + - `It is recommended to delete all subscriptions and activate all pools again.`, - ); - return true; - } - const key = packSubscriptionName(parts.address, parts.poolLocator, '', parts.poolData); - const existing = foundEvents.get(key); - if (existing !== undefined) { - existing.push(parts.event); - } else { - foundEvents.set(key, [parts.event]); - } - } - - // Expect to have found subscriptions for each of the events. - for (const [key, events] of foundEvents) { - const parts = unpackSubscriptionName(key); - if ( - ALL_SUBSCRIBED_EVENTS.length !== events.length || - !ALL_SUBSCRIBED_EVENTS.every(event => events.includes(event)) - ) { - this.logger.warn( - `Event stream subscriptions for pool ${parts.poolLocator} do not include all expected events ` + - `(${ALL_SUBSCRIBED_EVENTS}). Events may not be properly delivered to this pool. ` + - `It is recommended to delete its subscriptions and activate the pool again.`, - ); - return true; - } + const existingStreams = await this.eventstream.getStreams(ctx); + // Check to see if there is a deprecated stream that we should remove + this.logger.debug( + `Checking for deprecated event steams named '${currentName}' or '${oldName1}' or '${oldName2}' or '${oldName3}'`, + ); + const deprecatedStreams = existingStreams.filter( + s => + s.name === currentName || s.name === oldName1 || s.name === oldName2 || s.name === oldName3, + ); + for (const deprecatedStream of deprecatedStreams) { + this.logger.log(`Purging deprecated eventstream '${deprecatedStream.id}'`); + await this.eventstream.deleteStream(ctx, deprecatedStream.id); } - return false; } async createPool(ctx: Context, dto: TokenPool): Promise { @@ -275,6 +215,7 @@ export class TokensService { } await this.createPoolSubscription( ctx, + dto.namespace, dto.config.address, BASE_SUBSCRIPTION_NAME, dto.config.blockNumber, @@ -284,6 +225,14 @@ export class TokensService { const defaultContract = await this.getContractAddress(ctx); if (defaultContract !== undefined) { + await this.createPoolSubscription( + ctx, + dto.namespace, + defaultContract, + BASE_SUBSCRIPTION_NAME, + dto.config?.blockNumber, + ); + return this.createWithAddress(ctx, defaultContract, dto); } @@ -319,7 +268,7 @@ export class TokensService { } async activatePool(ctx: Context, dto: TokenPoolActivate) { - const stream = await this.getStream(ctx); + const stream = await this.getStream(ctx, dto.namespace); const poolLocator = unpackPoolLocator(dto.poolLocator); const address = poolLocator.address ?? (await this.getContractAddress(ctx)); if (!address) { @@ -332,6 +281,7 @@ export class TokensService { const promises: Promise[] = [ this.createPoolSubscription( ctx, + dto.namespace, address, dto.poolLocator, poolLocator.blockNumber, @@ -407,7 +357,7 @@ export class TokensService { packSubscriptionName(this.instancePath, dto.poolLocator, ApprovalForAll.name, dto.poolData), ]; - const stream = await this.getStream(ctx); + const stream = await this.getStream(ctx, dto.namespace); const results = await Promise.all( subscriptionNames.map(name => this.eventstream.deleteSubscriptionByName(ctx, stream.id, name), diff --git a/src/tokens/tokens.util.spec.ts b/src/tokens/tokens.util.spec.ts index a6e5013..7f2587e 100644 --- a/src/tokens/tokens.util.spec.ts +++ b/src/tokens/tokens.util.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/tokens.util.ts b/src/tokens/tokens.util.ts index 24fddfe..1a84337 100644 --- a/src/tokens/tokens.util.ts +++ b/src/tokens/tokens.util.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -78,6 +78,19 @@ export function unpackTypeId(id: string): TokenLocator { }; } +/** + * Given a token ID from the underlying contract, split it into its meaningful parts. + */ +export function unpackOldTypeId(id: string) { + const val = BigInt(id); + const isFungible = val >> BigInt(255) === BigInt(0); + return { + isFungible: isFungible, + poolId: (isFungible ? 'F' : 'N') + (BigInt.asUintN(255, val) >> BigInt(128)), + tokenIndex: isFungible ? undefined : BigInt.asUintN(128, val).toString(), + }; +} + /** * Given individual pool parameters, create a packed string to be used as a pool locator. * @@ -103,6 +116,24 @@ export function packPoolLocator( return encoded.toString(); } +/** + * Given a pool ID (in format 'F1') and optional block number, create a packed + * string to be used as a pool locator. + * + * This should only be called once when the pool is first created! You should + * never re-pack a locator during event or request processing (always send + * back the one provided as input or unpacked from the subscription). + */ +export function packOldPoolLocator(address: string, poolId: string, blockNumber?: string) { + const encoded = new URLSearchParams(); + encoded.set('address', address); + encoded.set('id', poolId); + if (blockNumber !== undefined) { + encoded.set('block', blockNumber); + } + return encoded.toString(); +} + /** * Unpack a pool locator string into its meaningful parts. * Fall back to various ways that pool locators have been encoded historically. diff --git a/src/utils.ts b/src/utils.ts index 2aa5db6..fbcadbc 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -69,3 +69,7 @@ export const getNestOptions = (): NestApplicationOptions => { } return options; }; + +export const eventStreamName = (topic: string, namespace: string) => { + return `${topic}/${namespace}`; +}; diff --git a/src/websocket-events/websocket-events.base.ts b/src/websocket-events/websocket-events.base.ts index 9c88f1b..ebc5bfe 100644 --- a/src/websocket-events/websocket-events.base.ts +++ b/src/websocket-events/websocket-events.base.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -50,6 +50,18 @@ export interface WebSocketMessage { data: any; } +export interface WebSocketActionBase { + type: 'start' | 'ack' | 'nack' | 'protocol_error'; +} + +export interface WebSocketStart extends WebSocketActionBase { + namespace: string; +} + +export interface WebSocketAck extends WebSocketActionBase { + id: string; +} + /** * Base class for websocket gateways. * @@ -61,7 +73,10 @@ export abstract class WebSocketEventsBase { @WebSocketServer() server: Server; - constructor(protected readonly logger: Logger, private requireAuth = false) {} + constructor( + protected readonly logger: Logger, + private requireAuth = false, + ) {} afterInit(server: Server) { const interval = setInterval(() => this.ping(), PING_INTERVAL); @@ -86,6 +101,9 @@ export abstract class WebSocketEventsBase client.on('error', err => { this.logger.log(`WebSocket ${client.id}: error: ${err}`); }); + client.on('message', msg => { + this.logger.verbose(`WS => ${msg}`); + }); } handleDisconnect(client: WebSocketEx) { diff --git a/test/app.e2e-context.ts b/test/app.e2e-context.ts index 383d93f..0fd6c03 100644 --- a/test/app.e2e-context.ts +++ b/test/app.e2e-context.ts @@ -28,16 +28,28 @@ export class TestContext { }; eventHandler: (events: EventBatch) => void; receiptHandler: (receipt: EventStreamReply) => void; + connected: Promise; + private resolveConnected: () => void; + private rejectConnected: () => void; + + resetConnectedPromise() { + this.connected = new Promise((resolve, reject) => { + this.resolveConnected = resolve; + this.rejectConnected = reject; + }); + } eventstream = { connect: ( url: string, topic: string, + namespace: string, handleEvents: (events: EventBatch) => void, handleReceipt: (receipt: EventStreamReply) => void, ) => { this.eventHandler = handleEvents; this.receiptHandler = handleReceipt; + this.resolveConnected(); }, getStreams: jest.fn(), @@ -86,6 +98,7 @@ export class TestContext { (this.app.getHttpServer() as Server).listen(); this.server = request(this.app.getHttpServer()); + this.resetConnectedPromise(); } async end() { diff --git a/test/app.e2e-spec.ts b/test/app.e2e-spec.ts index 5d6f0d6..7983201 100644 --- a/test/app.e2e-spec.ts +++ b/test/app.e2e-spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/test/suites/api.ts b/test/suites/api.ts index 6e6d770..3e8ae75 100644 --- a/test/suites/api.ts +++ b/test/suites/api.ts @@ -35,6 +35,7 @@ const CTX = { export default (context: TestContext) => { it('Create fungible pool', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId, data: 'tx1', @@ -68,6 +69,7 @@ export default (context: TestContext) => { it('Create non-fungible pool', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, signer: IDENTITY, requestId, @@ -100,6 +102,7 @@ export default (context: TestContext) => { it('Create non-fungible pool - non-default address', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, signer: IDENTITY, requestId, @@ -147,6 +150,7 @@ export default (context: TestContext) => { it('Create pool - unrecognized fields', async () => { const request = { + namespace: 'ns1', type: TokenType.FUNGIBLE, signer: IDENTITY, isBestPool: true, // will be stripped but will not cause an error @@ -164,6 +168,7 @@ export default (context: TestContext) => { it('Create pool - existing contract', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, requestId, data: 'tx1', @@ -367,6 +372,7 @@ export default (context: TestContext) => { it('Query balance', async () => { const request: TokenBalanceQuery = { + namespace: 'ns1', account: '1', poolLocator: 'F1', tokenIndex: '0', diff --git a/test/suites/websocket.ts b/test/suites/websocket.ts index 26e581f..c093135 100644 --- a/test/suites/websocket.ts +++ b/test/suites/websocket.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -57,7 +57,12 @@ export default (context: TestContext) => { return context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [ @@ -78,6 +83,10 @@ export default (context: TestContext) => { ], }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -89,6 +98,7 @@ export default (context: TestContext) => { interfaceFormat: 'abi', poolData: 'default', poolLocator: 'F1', + alternateLocators: [], type: 'fungible', signer: 'bob', data: '', @@ -119,7 +129,8 @@ export default (context: TestContext) => { }, }); return true; - }); + }) + .close(); }); it('Token pool event from base subscription', () => { @@ -129,7 +140,12 @@ export default (context: TestContext) => { return context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [ @@ -150,6 +166,10 @@ export default (context: TestContext) => { ], }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -161,6 +181,7 @@ export default (context: TestContext) => { interfaceFormat: 'abi', poolLocator: 'address=0x00001&type=fungible&startId=0x100000000000000000000000000000000&endId=0x100000000000000000000000000000000&block=1', + alternateLocators: ['address=0x00001&id=F1&block=1'], type: 'fungible', signer: 'bob', data: '', @@ -191,7 +212,8 @@ export default (context: TestContext) => { }, }); return true; - }); + }) + .close(); }); it('Token pool event with old signature', () => { @@ -201,7 +223,12 @@ export default (context: TestContext) => { return context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [ @@ -222,6 +249,10 @@ export default (context: TestContext) => { ], }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -234,6 +265,7 @@ export default (context: TestContext) => { poolData: 'default', poolLocator: 'address=0x00001&type=fungible&startId=0x100000000000000000000000000000000&endId=0x100000000000000000000000000000000&block=1', + alternateLocators: ['address=0x00001&id=F1&block=1'], type: 'fungible', signer: 'bob', data: '', @@ -264,7 +296,8 @@ export default (context: TestContext) => { }, }); return true; - }); + }) + .close(); }); it('Token mint event', async () => { @@ -274,7 +307,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [ @@ -308,6 +346,10 @@ export default (context: TestContext) => { ], }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -353,7 +395,8 @@ export default (context: TestContext) => { }, }); return true; - }); + }) + .close(); }); it('Token mint event with old pool ID', async () => { @@ -370,7 +413,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [ @@ -404,6 +452,10 @@ export default (context: TestContext) => { ], }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -451,7 +503,8 @@ export default (context: TestContext) => { }, }); return true; - }); + }) + .close(); expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith( @@ -482,7 +535,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [ @@ -515,6 +573,10 @@ export default (context: TestContext) => { ], }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -561,7 +623,8 @@ export default (context: TestContext) => { }, }); return true; - }); + }) + .close(); expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith( @@ -592,7 +655,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [ @@ -616,6 +684,10 @@ export default (context: TestContext) => { ], }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -658,7 +730,8 @@ export default (context: TestContext) => { }, }); return true; - }); + }) + .close(); expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith( @@ -682,7 +755,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [ @@ -704,6 +782,10 @@ export default (context: TestContext) => { ], }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -743,7 +825,8 @@ export default (context: TestContext) => { }, }); return true; - }); + }) + .close(); }); it('Token transfer event from wrong pool', () => { @@ -754,7 +837,12 @@ export default (context: TestContext) => { return context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [ @@ -791,6 +879,10 @@ export default (context: TestContext) => { ], }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { // Only the second transfer should have been processed expect(message.event).toEqual('batch'); @@ -799,7 +891,8 @@ export default (context: TestContext) => { expect(message.data.events[0].data.poolLocator).toEqual('id=N1&block=1'); expect(message.data.events[0].data.blockchain.info.blockNumber).toEqual('2'); return true; - }); + }) + .close(); }); it('Token batch transfer', async () => { @@ -816,7 +909,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [ @@ -843,6 +941,10 @@ export default (context: TestContext) => { ], }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -922,7 +1024,8 @@ export default (context: TestContext) => { }, }); return true; - }); + }) + .close(); expect(context.http.post).toHaveBeenCalledTimes(2); expect(context.http.post).toHaveBeenCalledWith( @@ -942,7 +1045,12 @@ export default (context: TestContext) => { it('Success receipt', () => { return context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.receiptHandler).toBeDefined(); context.receiptHandler({ headers: { @@ -962,13 +1070,19 @@ export default (context: TestContext) => { }, }); return true; - }); + }) + .close(); }); it('Error receipt', () => { return context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.receiptHandler).toBeDefined(); context.receiptHandler({ headers: { @@ -990,7 +1104,8 @@ export default (context: TestContext) => { }, }); return true; - }); + }) + .close(); }); it('Disconnect and reconnect', async () => { @@ -1016,10 +1131,19 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [tokenPoolMessage] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.event).toEqual('batch'); expect(message.data.events).toHaveLength(1); @@ -1028,43 +1152,22 @@ export default (context: TestContext) => { }) .close(); - await context.server.ws('/api/ws').expectJson(message => { - expect(message.event).toEqual('batch'); - expect(message.data.events).toHaveLength(1); - expect(message.data.events[0].event).toEqual('token-pool'); - return true; - }); - }); + context.resetConnectedPromise(); - it('Client switchover', async () => { - const tokenPoolMessage: TokenPoolCreationEvent = { - subId: 'sb-123', - signature: tokenCreateEventSignature, - address: '0x00001', - blockNumber: '1', - transactionIndex: '0x0', - transactionHash: '0x123', - logIndex: '1', - timestamp: '2020-01-01 00:00:00Z', - data: { - operator: 'bob', - type_id: '340282366920938463463374607431768211456', - data: '0x6e73006e616d65006964', - }, - }; - - context.eventstream.getSubscription.mockReturnValueOnce({ - name: packSubscriptionName('0x123', 'id=F1&block=1', '', 'default'), - }); - - const ws1 = context.server.ws('/api/ws'); - const ws2 = context.server.ws('/api/ws'); - - await ws1 - .exec(() => { - expect(context.eventHandler).toBeDefined(); + await context.server + .ws('/api/ws') + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; context.eventHandler({ events: [tokenPoolMessage] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.event).toEqual('batch'); expect(message.data.events).toHaveLength(1); @@ -1072,12 +1175,5 @@ export default (context: TestContext) => { return true; }) .close(); - - await ws2.expectJson(message => { - expect(message.event).toEqual('batch'); - expect(message.data.events).toHaveLength(1); - expect(message.data.events[0].event).toEqual('token-pool'); - return true; - }); }); }; diff --git a/tsconfig.json b/tsconfig.json index e584d56..d0ac7c7 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -12,6 +12,8 @@ "baseUrl": "./", "incremental": true, "resolveJsonModule": true, - "strictNullChecks": true + "strictNullChecks": true, + "skipLibCheck": true, + "esModuleInterop": true } }