From 7ccc7b7d1f613193ecb83e74185474d4bd167930 Mon Sep 17 00:00:00 2001 From: Nyorok <38190289+nyorok@users.noreply.github.com> Date: Sat, 16 Nov 2024 23:53:29 -0300 Subject: [PATCH] Queue Ogmios Requests --- package-lock.json | 54 ++++++++++++++++++++++++++++++++++++++ package.json | 1 + src/IndexerApplication.ts | 15 +++++------ src/utils.ts | 55 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 117 insertions(+), 8 deletions(-) diff --git a/package-lock.json b/package-lock.json index 031a467..28729f6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18,6 +18,7 @@ "lucid-cardano": "^0.10.7", "mysql2": "^3.6.0", "node-cache": "^5.1.2", + "p-queue": "^8.0.1", "pg": "^8.11.5", "queue-promise": "^2.2.1", "reflect-metadata": "^0.1.13", @@ -4181,6 +4182,12 @@ "node": ">= 0.6" } }, + "node_modules/eventemitter3": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", + "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==", + "license": "MIT" + }, "node_modules/execa": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/execa/-/execa-5.1.1.tgz", @@ -6242,6 +6249,34 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-queue": { + "version": "8.0.1", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-8.0.1.tgz", + "integrity": "sha512-NXzu9aQJTAzbBqOt2hwsR63ea7yvxJc0PwN/zobNAudYfb1B7R08SzB4TsLeSbUCuG467NhnoT0oO6w1qRO+BA==", + "license": "MIT", + "dependencies": { + "eventemitter3": "^5.0.1", + "p-timeout": "^6.1.2" + }, + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/p-timeout": { + "version": "6.1.3", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-6.1.3.tgz", + "integrity": "sha512-UJUyfKbwvr/uZSV6btANfb+0t/mOhKV/KXcCUTp8FcQI+v/0d+wXqH4htrW0E4rR6WiEO/EPvUFiV9D5OI4vlw==", + "license": "MIT", + "engines": { + "node": ">=14.16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/p-try": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/p-try/-/p-try-2.2.0.tgz", @@ -11271,6 +11306,11 @@ "resolved": "https://registry.npmjs.org/etag/-/etag-1.8.1.tgz", "integrity": "sha512-aIL5Fx7mawVa300al2BnEE4iNvo1qETxLrPI/o05L7z6go7fCw1J6EQmbK4FmJ2AS7kgVF/KEZWufBfdClMcPg==" }, + "eventemitter3": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", + "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==" + }, "execa": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/execa/-/execa-5.1.1.tgz", @@ -12774,6 +12814,20 @@ } } }, + "p-queue": { + "version": "8.0.1", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-8.0.1.tgz", + "integrity": "sha512-NXzu9aQJTAzbBqOt2hwsR63ea7yvxJc0PwN/zobNAudYfb1B7R08SzB4TsLeSbUCuG467NhnoT0oO6w1qRO+BA==", + "requires": { + "eventemitter3": "^5.0.1", + "p-timeout": "^6.1.2" + } + }, + "p-timeout": { + "version": "6.1.3", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-6.1.3.tgz", + "integrity": "sha512-UJUyfKbwvr/uZSV6btANfb+0t/mOhKV/KXcCUTp8FcQI+v/0d+wXqH4htrW0E4rR6WiEO/EPvUFiV9D5OI4vlw==" + }, "p-try": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/p-try/-/p-try-2.2.0.tgz", diff --git a/package.json b/package.json index 0caf39f..d3213f4 100644 --- a/package.json +++ b/package.json @@ -53,6 +53,7 @@ "lucid-cardano": "^0.10.7", "mysql2": "^3.6.0", "node-cache": "^5.1.2", + "p-queue": "^8.0.1", "pg": "^8.11.5", "queue-promise": "^2.2.1", "reflect-metadata": "^0.1.13", diff --git a/src/IndexerApplication.ts b/src/IndexerApplication.ts index bd8917e..426121b 100644 --- a/src/IndexerApplication.ts +++ b/src/IndexerApplication.ts @@ -29,6 +29,7 @@ import { VyFiAnalyzer } from './dex/VyFiAnalyzer'; import { ChainSynchronization } from '@cardano-ogmios/client'; import { MinswapV2Analyzer } from './dex/MinswapV2Analyzer'; import { SundaeSwapV3Analyzer } from './dex/SundaeSwapV3Analyzer'; +import { QueueProcessor } from './utils'; export class IndexerApplication { @@ -130,6 +131,8 @@ export class IndexerApplication { }); } + blockQueue = new QueueProcessor(5000, (block) => this.rollForward(block), (block) => this.rollBackward(block)); + /** * Boot Ogmios connection. */ @@ -162,8 +165,8 @@ export class IndexerApplication { this._chainSyncClient = await createChainSynchronizationClient( this._ogmiosContext, { - rollForward: this.rollForward.bind(this), - rollBackward: this.rollBackward.bind(this), + rollForward: (response, nextBlock) => this.blockQueue.enqueue(response, nextBlock), + rollBackward: (response, nextBlock) => this.blockQueue.enqueue(response, nextBlock), }, { sequential: true, @@ -198,7 +201,7 @@ export class IndexerApplication { * @param update - New block update. * @param requestNext - Callback to request next block. */ - private async rollForward(update: { block: Block, tip: TipOrOrigin }, requestNext: () => void): Promise { + private async rollForward(update: { block: Block, tip: TipOrOrigin }): Promise { if (update.block.type === 'praos') { const block: BlockPraos = update.block; @@ -216,8 +219,6 @@ export class IndexerApplication { logInfo(`====== Finished with block at slot ${block.slot} ======`); } - - requestNext(); } /** @@ -225,7 +226,7 @@ export class IndexerApplication { * @param update - Point in which to revert to. * @param requestNext - Callback to request next block. */ - private async rollBackward(update: { point: PointOrOrigin }, requestNext: () => void): Promise { + private async rollBackward(update: { point: PointOrOrigin }): Promise { if (typeof update.point === 'object' && 'slot' in update.point) { logInfo(`Rollback occurred to slot ${update.point.slot}`); @@ -235,8 +236,6 @@ export class IndexerApplication { this._indexers.map((indexer: BaseIndexer) => indexer.onRollBackward(point.id, point.slot)), ); } - - requestNext(); } } diff --git a/src/utils.ts b/src/utils.ts index 41cf9ba..272a34a 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -11,11 +11,17 @@ import { Lucid, Utils } from 'lucid-cardano'; import { Asset, Token } from './db/entities/Asset'; import { LiquidityPool } from './db/entities/LiquidityPool'; import { + Block, BlockPraos, Transaction as OgmiosTransaction, + Origin, + PointOrOrigin, + Tip, TransactionOutput, TransactionOutputReference } from '@cardano-ogmios/schema'; +import PQueue from 'p-queue'; +import { logError, logInfo } from './logger'; export const lucidUtils: Utils = new Utils(new Lucid()); @@ -149,3 +155,52 @@ export function formatTransaction(block: BlockPraos | null, transaction: OgmiosT scriptHashes: Object.keys(transaction.scripts ?? {}), }; } + +type ForwardBlock = { block: Block, tip: Tip | Origin}; +type BackwardBlock = { point: PointOrOrigin }; + +export class QueueProcessor { + private queue: PQueue; // Use p-queue for queue management + private maxSize: number; + + constructor( + maxSize: number, + private rollForward: (update: ForwardBlock) => Promise, + private rollBackward: (update: BackwardBlock) => Promise + ) { + this.maxSize = maxSize; + this.queue = new PQueue({ + concurrency: 1, // Ensure sequential processing + autoStart: true, // Automatically start processing tasks + }); + } + + async enqueue(response: ForwardBlock | BackwardBlock, requestNext: () => void): Promise { + try { + // Add the task to the queue + this.queue.add(async () => { + if ('block' in response) { + await this.rollForward(response); + } else { + await this.rollBackward(response); + } + }); + + // Call next block when there's room + await this.queue.onSizeLessThan(this.maxSize) + requestNext(); + } catch (error) { + logError(`BlockQueue.enqueue error: ${error}`); + } + } + + async stopProcessing(): Promise { + logInfo('Stopping Queue Processor...'); + this.queue.pause(); // Pause the queue + await this.queue.onIdle(); // Wait until all tasks are finished + } + + queueSize(): number { + return this.queue.size; + } +} \ No newline at end of file