From ff4354dbbcb13a01f3256db4ccbbde245df5f244 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Podsiad=C5=82y?=
Date: Thu, 23 May 2024 16:59:46 +0200
Subject: [PATCH 1/2] feat: cleanup queue

---
 src/queue.ts                            | 393 ++++--------------------
 src/queues/canonical.ts                 | 136 ++++++++
 src/queues/deployment.ts                |  83 +++++
 src/queues/independent.ts               | 118 +++++++
 src/queues/onChain.ts                   | 114 +++++++
 src/queues/payloads.ts                  |   4 +
 src/utils/queue/aggregates/collector.ts |  10 +-
 src/utils/queue/router/broadcast.ts     |  13 +-
 src/utils/queue/router/routingKey.ts    |  29 +-
 src/utils/queue/setup-worker.ts         |  27 +-
 10 files changed, 580 insertions(+), 347 deletions(-)
 create mode 100644 src/queues/canonical.ts
 create mode 100644 src/queues/deployment.ts
 create mode 100644 src/queues/independent.ts
 create mode 100644 src/queues/onChain.ts
 create mode 100644 src/queues/payloads.ts

diff --git a/src/queue.ts b/src/queue.ts
index 246fd4e..75a23b9 100644
--- a/src/queue.ts
+++ b/src/queue.ts
@@ -1,33 +1,16 @@
 import { Logger } from '@l2beat/backend-tools'
-import { Token } from '@prisma/client'
 import { createPrismaClient } from './db/prisma.js'
 import { connection } from './redis/redis.js'
-import { buildCoingeckoSource } from './sources/coingecko.js'
-import { buildDeploymentSource } from './sources/deployment.js'
-import { buildOrbitSource } from './sources/orbit.js'
-import { buildTokenListSource } from './sources/tokenList.js'
-import { buildWormholeSource } from './sources/wormhole.js'
-import { getNetworksConfig, withExplorer } from './utils/getNetworksConfig.js'
+import { getNetworksConfig } from './utils/getNetworksConfig.js'
 import { eventRouter } from './utils/queue/router/index.js'
 import { setupQueue } from './utils/queue/setup-queue.js'
-import { setupQueueWithProcessor } from './utils/queue/queue-with-processor.js'
-import {
-  wrapDeploymentUpdatedQueue,
-  wrapTokenQueue,
-} from './utils/queue/wrap.js'
-import { setupCollector } from './utils/queue/aggregates/collector.js'
-import { buildArbitrumCanonicalSource } from './sources/arbitrumCanonical.js'
-import { buildOptimismCanonicalSource } from './sources/optimismCanonical.js'
-import { buildAxelarConfigSource } from './sources/axelarConfig.js'
-import { buildAxelarGatewaySource } from './sources/axelarGateway.js'
-import { buildOnChainMetadataSource } from './sources/onChainMetadata.js'
 import { byTokenChainId } from './utils/queue/router/routing-key-rules.js'
 import { env } from './env.js'
 import { startQueueDashboard } from './utils/queue/dashboard.js'
-import { buildZkSyncCanonicalSource } from './sources/zkSyncCanonical.js'
-
-type TokenPayload = { tokenId: Token['id'] }
-type BatchTokenPayload = { tokenIds: Token['id'][] }
+import { setupIndependentQueues } from './queues/independent.js'
+import { setupCanonicalQueues } from './queues/canonical.js'
+import { setupDeploymentQueues } from './queues/deployment.js'
+import { setupOnChainMetadataQueues } from './queues/onChain.js'
 
 const db = createPrismaClient()
 
@@ -43,350 +26,98 @@ const router = eventRouter({
   logger,
 })
 
-const queueWithProcessor = setupQueueWithProcessor({ connection, logger })
 const queue = setupQueue({ connection })
 
-const lists = [
-  {
-    tag: '1INCH',
-    url: 'https://tokens.1inch.eth.link',
-  },
-  {
-    tag: 'AAVE',
-    url: 'http://tokenlist.aave.eth.link',
-  },
-  {
-    tag: 'MYCRYPTO',
-    url: 'https://uniswap.mycryptoapi.com/',
-  },
-  {
-    tag: 'SUPERCHAIN',
-    url: 'https://static.optimism.io/optimism.tokenlist.json',
-  },
-]
-
-// #region Deployment processors
-// Routing inbox where TokenUpdate events are broadcasted from independent sources
-const deploymentRoutingInbox = queue({
-  name: 'DeploymentRoutingInbox',
-})
-
-// Output queue for the deployment processors where the tokenIds are broadcasted if the deployment is updated
-const deploymentUpdatedInbox = queue({
-  name: 'DeploymentUpdatedInbox',
-})
-
-const deploymentUpdatedQueue = wrapDeploymentUpdatedQueue(
-  deploymentUpdatedInbox,
-)
-
-// For each supported network with an explorer, create a deployment processor
-const deploymentProcessors = networksConfig
-  .filter(withExplorer)
-  .map((networkConfig) => {
-    const processor = buildDeploymentSource({
-      logger,
-      db,
-      networkConfig,
-      queue: deploymentUpdatedQueue,
-    })
-
-    const bus = queueWithProcessor({
-      name: `DeploymentProcessor:${networkConfig.name}`,
-      processor: (job) => {
-        return processor(job.data.tokenId)
-      },
-    })
-
-    return {
-      queue: bus.queue,
-      routingKey: networkConfig.chainId,
-    }
-  })
-
-// Route the events from deploymentRoutingInbox to the per-chain deployment processors
-router.routingKey({
-  from: deploymentRoutingInbox,
-  to: deploymentProcessors,
-  extractRoutingKey: byTokenChainId({ db }),
-})
-// #endregion Deployment processors
-
-// #region Canonical sources - Arbitrum
-const arbitrumCanonicalProcessor = queueWithProcessor({
-  name: 'ArbitrumCanonicalProcessor',
-  processor: buildArbitrumCanonicalSource({ logger, db, networksConfig }),
-})
-
-// Handle backpressure from the deployment processor
-const arbitrumCanonicalEventCollector = queue({
-  name: 'ArbitrumCanonicalEventCollector',
-})
-const oneMinuteMs = 60 * 1000
-
-setupCollector({
-  inputQueue: arbitrumCanonicalEventCollector,
-  outputQueue: arbitrumCanonicalProcessor.queue,
-  aggregate: (data) => ({ tokenIds: data.map((d) => d.tokenId) }),
-  bufferSize: 100,
-  flushIntervalMs: oneMinuteMs,
-  connection,
+const deps = {
+  db,
   logger,
-})
-// #endregion Canonical sources - Arbitrum
-
-// #region Canonical sources - Optimism
-const optimismCanonicalProcessor = queueWithProcessor({
-  name: 'OptimismCanonicalProcessor',
-  processor: buildOptimismCanonicalSource({ logger, db, networksConfig }),
-})
-
-// Handle backpressure from the deployment processor
-const optimismCanonicalEventCollector = queue({
-  name: 'OptimismCanonicalEventCollector',
-})
-
-setupCollector({
-  inputQueue: optimismCanonicalEventCollector,
-  outputQueue: optimismCanonicalProcessor.queue,
-  aggregate: (data) => ({ tokenIds: data.map((d) => d.tokenId) }),
-  bufferSize: 100,
-  flushIntervalMs: oneMinuteMs,
   connection,
-  logger,
-})
-// #endregion Canonical sources - Optimism
-
-// #region Canonical sources - ZkSync
-const zkSyncCanonicalProcessor = queueWithProcessor({
-  name: 'ZkSyncCanonicalProcessor',
-  processor: buildZkSyncCanonicalSource({ logger, db, networksConfig }),
-})
+  networksConfig,
+}
 
-// Handle backpressure from the deployment processor
-const zkSyncCanonicalEventCollector = queue({
-  name: 'ZkSyncCanonicalEventCollector',
-})
+const canonical = await setupCanonicalQueues(deps)
+const deployment = await setupDeploymentQueues(deps)
+const independent = await setupIndependentQueues(deps)
+const onChainMetadata = await setupOnChainMetadataQueues(deps)
 
-setupCollector({
-  inputQueue: zkSyncCanonicalEventCollector,
-  outputQueue: zkSyncCanonicalProcessor.queue,
-  aggregate: (data) => ({ tokenIds: data.map((d) => d.tokenId) }),
-  bufferSize: 100,
-  flushIntervalMs: oneMinuteMs,
-  connection,
-  logger,
-})
-// #endregion Canonical sources - ZkSync
-
-// #region Canonical sources update wire up
-router.routingKey({
-  from: deploymentUpdatedInbox,
+// When a token deployment is updated, route the event to the canonical processors, since they depend on the deployment data
+const deploymentToCanonicalRoutingWorker = router.routingKey({
+  from: deployment.update.inbox,
   to: [
     // Ditch the rest
     {
-      queue: arbitrumCanonicalEventCollector,
+      queue: canonical.arbitrum.collector.queue,
       routingKey: 42161,
     },
     {
-      queue: optimismCanonicalEventCollector,
+      queue: canonical.optimism.collector.queue,
       routingKey: 10,
     },
     {
-      queue: zkSyncCanonicalEventCollector,
+      queue: canonical.zkSync.collector.queue,
       routingKey: 324,
     },
+    // Also notify all canonical sources about tokens deployed on Ethereum (chain id 1)
+    {
+      queue: [
+        canonical.arbitrum.collector.queue,
+        canonical.optimism.collector.queue,
+        canonical.zkSync.collector.queue,
+      ],
+      routingKey: 1,
+    },
   ],
   extractRoutingKey: byTokenChainId({ db }),
 })
-// #endregion Canonical sources update wire up
-
-const tokenUpdateInbox = queue({
-  name: 'TokenUpdateInbox',
-})
-
-const tokenUpdateQueue = wrapTokenQueue(tokenUpdateInbox)
-
-// #region On-chain metadata sources
-// Routing inbox where TokenUpdate events are broadcasted from independent sources
-const onChainMetadataRoutingInbox = queue({
-  name: 'OnChainMetadataRoutingInbox',
-})
-
-// For each network, create routing inbox and backpressure (collector) queue
-// so we can batch process the events instead of calling node for each token
-const onChainMetadataBuses = networksConfig
-  .filter(withExplorer)
-  .map((networkConfig) => {
-    // Per-chain events will be collected here
-    const eventCollectorInbox = queue({
-      name: `OnChainMetadataEventCollector:${networkConfig.name}`,
-    })
-
-    // Batch processor for the collected events
-    const batchEventProcessor = queueWithProcessor({
-      name: `OnChainMetadataBatchProcessor:${networkConfig.name}`,
-      processor: (job) =>
-        buildOnChainMetadataSource({
-          logger,
-          db,
-          networkConfig,
-        })(job.data.tokenIds),
-    })
-
-    // Wire up the collector to the processor
-    setupCollector({
-      inputQueue: eventCollectorInbox,
-      outputQueue: batchEventProcessor.queue,
-      aggregate: (data) => ({ tokenIds: data.map((d) => d.tokenId) }),
-      bufferSize: 100,
-      flushIntervalMs: oneMinuteMs,
-      connection,
-      logger,
-    })
-
-    return {
-      queue: eventCollectorInbox,
-      batchQueue: batchEventProcessor.queue,
-      routingKey: networkConfig.chainId,
-    }
-  })
-
-// Route the events from the global inbox to the per-chain event collectors
-router.routingKey({
-  from: onChainMetadataRoutingInbox,
-  to: onChainMetadataBuses.map((bus) => ({
-    queue: bus.queue,
-    routingKey: bus.routingKey,
-  })),
-  extractRoutingKey: byTokenChainId({ db }),
-})
-// #endregion On-chain metadata sources
-// #region Independent sources
-
-const coingeckoSource = buildCoingeckoSource({
-  logger,
-  db,
-  queue: tokenUpdateQueue,
-})
-const axelarConfigSource = buildAxelarConfigSource({
-  logger,
-  db,
-  queue: tokenUpdateQueue,
-})
-const wormholeSource = buildWormholeSource({
-  logger,
-  db,
-  queue: tokenUpdateQueue,
-})
-const orbitSource = buildOrbitSource({ logger, db, queue: tokenUpdateQueue })
-const tokenListSources = lists.map(({ tag, url }) =>
-  queueWithProcessor({
-    name: `TokenListProcessor:${tag}`,
-    processor: buildTokenListSource({
-      tag,
-      url,
-      logger,
-      db,
-      queue: tokenUpdateQueue,
-    }),
-  }),
-)
-
-// const lzV1Sources = networksConfig.filter(withExplorer).map((networkConfig) => {
-//   return {
-//     name: `LayerZeroV1Processor:${networkConfig.name}`,
-//     processor: buildLayerZeroV1Source({
-//       logger,
-//       db,
-//       networkConfig,
-//       queue: tokenUpdateQueue,
-//     }),
-//   }
-// })
-
-// const lzV1Queues = lzV1Sources.map((source) => sourceQueue(source))
-
-const axelarGatewayQueues = networksConfig.map((networkConfig) =>
-  queueWithProcessor({
-    name: `AxelarGatewayProcessor:${networkConfig.name}`,
-    processor: buildAxelarGatewaySource({
-      logger,
-      db,
-      networkConfig,
-      queue: tokenUpdateQueue,
-    }),
-  }),
-)
-
-const coingeckoQueue = queueWithProcessor({
-  name: 'CoingeckoProcessor',
-  processor: coingeckoSource,
-})
-
-const axelarConfigQueue = queueWithProcessor({
-  name: 'AxelarConfigProcessor',
-  processor: axelarConfigSource,
-})
-
-const wormholeQueue = queueWithProcessor({
-  name: 'WormholeProcessor',
-  processor: wormholeSource,
-})
-
-const orbitQueue = queueWithProcessor({
-  name: 'OrbitProcessor',
-  processor: orbitSource,
-})
-
-const independentSources = [
-  coingeckoQueue,
-  ...axelarGatewayQueues,
-  axelarConfigQueue,
-  wormholeQueue,
-  orbitQueue,
-  ...tokenListSources,
-  // ...lzV1Queues,
-]
 
 // Input signal, might be removed
 const refreshInbox = queue({
   name: 'RefreshInbox',
 })
 
+const independentSources = Object.values(independent.sources).flat()
+
 // Input signal, might be removed
-router.broadcast({
+const refreshBroadcastWorker = router.broadcast({
   from: refreshInbox,
   to: independentSources.map((q) => q.queue),
 })
 
-// Broadcast the token update events to the independent sources to dependant sources
-router.broadcast({
-  from: tokenUpdateInbox,
-  to: [deploymentRoutingInbox, onChainMetadataRoutingInbox],
+// Broadcast the token update events from the independent sources to the dependent ones
+const independentBroadcastWorker = router.broadcast({
+  from: independent.inbox,
+  to: [deployment.routing.inbox, onChainMetadata.routing.inbox],
 })
 
-// #endregion Independent sources
-
 // #region BullBoard
 const dashboardLogger = logger.for('QueueDashboard')
 
 if (env.QUEUE_DASHBOARD_PORT) {
   const allQueues = [
-    deploymentRoutingInbox,
-    tokenUpdateInbox,
     refreshInbox,
+
+    // Independent
+    independent.inbox,
     independentSources.map((q) => q.queue),
-    deploymentProcessors.map((p) => p.queue),
-    arbitrumCanonicalEventCollector,
-    optimismCanonicalEventCollector,
-    arbitrumCanonicalProcessor.queue,
-    optimismCanonicalProcessor.queue,
-    deploymentUpdatedInbox,
-    onChainMetadataBuses.map((b) => b.queue),
-    onChainMetadataBuses.map((b) => b.batchQueue),
-    onChainMetadataRoutingInbox,
+
+    // Deployment
+    deployment.routing.inbox,
+    deployment.update.inbox,
+    deployment.buses.map((b) => b.queue),
+
+    // Canonical
+    canonical.arbitrum.collector.queue,
+    canonical.arbitrum.processor.queue,
+    canonical.optimism.collector.queue,
+    canonical.optimism.processor.queue,
+    canonical.zkSync.collector.queue,
+    canonical.zkSync.processor.queue,
+
+    // Onchain
+    onChainMetadata.routing.inbox,
+    onChainMetadata.buses.map((b) => b.collector.queue),
+    onChainMetadata.buses.map((b) => b.processor.queue),
   ].flat()
 
   await startQueueDashboard({
@@ -399,3 +130,15 @@ if (env.QUEUE_DASHBOARD_PORT) {
   dashboardLogger.warn('Queue dashboard is disabled')
 }
 // #endregion BullBoard
+
+// Start all the workers
+await Promise.all([
+  refreshBroadcastWorker.run(),
+  independentBroadcastWorker.run(),
+  deploymentToCanonicalRoutingWorker.run(),
+
+  independent.start(),
+  canonical.start(),
+  deployment.start(),
+  onChainMetadata.start(),
+])
diff --git a/src/queues/canonical.ts b/src/queues/canonical.ts
new file mode 100644
index 0000000..bc0ff69
--- /dev/null
+++ b/src/queues/canonical.ts
@@ -0,0 +1,136 @@
+import { Redis } from 'ioredis'
+import { PrismaClient } from '../db/prisma.js'
+import { Logger } from '@l2beat/backend-tools'
+import { NetworkConfig } from '../utils/getNetworksConfig.js'
+import { setupQueue } from '../utils/queue/setup-queue.js'
+import { setupQueueWithProcessor } from '../utils/queue/queue-with-processor.js'
+import { buildArbitrumCanonicalSource } from '../sources/arbitrumCanonical.js'
+import { setupCollector } from '../utils/queue/aggregates/collector.js'
+import { buildOptimismCanonicalSource } from '../sources/optimismCanonical.js'
+import { buildZkSyncCanonicalSource } from '../sources/zkSyncCanonical.js'
+
+const oneMinuteMs = 60 * 1000
+
+type TokenPayload = { tokenId: string }
+type BatchTokenPayload = { tokenIds: string[] }
+
+export async function setupCanonicalQueues({
+  connection,
+  db,
+  logger,
+  networksConfig,
+}: {
+  connection: Redis
+  db: PrismaClient
+  logger: Logger
+  networksConfig: NetworkConfig[]
+}) {
+  const deps = {
+    connection,
+    logger,
+  }
+
+  const queue = setupQueue(deps)
+  const queueWithProcessor = setupQueueWithProcessor(deps)
+
+  // Arbitrum
+  const arbitrumCanonicalProcessor = queueWithProcessor({
+    name: 'ArbitrumCanonicalBatchProcessor',
+    processor: buildArbitrumCanonicalSource({ logger, db, networksConfig }),
+  })
+  const arbitrumCanonicalEventCollector = queue({
+    name: 'ArbitrumCanonicalEventCollector',
+  })
+
+  // Handle backpressure from the deployment processor (same pattern for each chain below)
+  const arbitrumCollectorWorker = setupCollector({
+    inputQueue: arbitrumCanonicalEventCollector,
+    outputQueue: arbitrumCanonicalProcessor.queue,
+    aggregate: (data) => ({ tokenIds: data.map((d) => d.tokenId) }),
+    bufferSize: 100,
+    flushIntervalMs: oneMinuteMs,
+    connection,
+    logger,
+  })
+
+  // Optimism
+  const optimismCanonicalProcessor = queueWithProcessor({
+    name: 'OptimismCanonicalBatchProcessor',
+    processor: buildOptimismCanonicalSource({ logger, db, networksConfig }),
+  })
+  const optimismCanonicalEventCollector = queue({
+    name: 'OptimismCanonicalEventCollector',
+  })
+
+  const optimismCollectorWorker = setupCollector({
+    inputQueue: optimismCanonicalEventCollector,
+    outputQueue: optimismCanonicalProcessor.queue,
+    aggregate: (data) => ({ tokenIds: data.map((d) => d.tokenId) }),
+    bufferSize: 100,
+    flushIntervalMs: oneMinuteMs,
+    connection,
+    logger,
+  })
+
+  // ZkSync
+  const zkSyncCanonicalProcessor = queueWithProcessor({
+    name: 'ZkSyncCanonicalBatchProcessor',
+    processor: buildZkSyncCanonicalSource({ logger, db, networksConfig }),
+  })
+  const zkSyncCanonicalEventCollector = queue({
+    name: 'ZkSyncCanonicalEventCollector',
+  })
+
+  const zkSyncCollectorWorker = setupCollector({
+    inputQueue: zkSyncCanonicalEventCollector,
+    outputQueue: zkSyncCanonicalProcessor.queue,
+    aggregate: (data) => ({ tokenIds: data.map((d) => d.tokenId) }),
+    bufferSize: 100,
+    flushIntervalMs: oneMinuteMs,
+    connection,
+    logger,
+  })
+
+  function start() {
+    const statusLogger = logger.for('CanonicalQueuesModule')
+    statusLogger.info('Starting')
+
+    const toRun = [
+      arbitrumCollectorWorker,
+      arbitrumCanonicalProcessor.worker,
+      optimismCollectorWorker,
+      optimismCanonicalProcessor.worker,
+      zkSyncCollectorWorker,
+      zkSyncCanonicalProcessor.worker,
+    ]
+
+    toRun.forEach((worker) => worker.run())
+
+    statusLogger.info('Started')
+  }
+
+  return {
+    start,
+    arbitrum: {
+      processor: arbitrumCanonicalProcessor,
+      collector: {
+        queue: arbitrumCanonicalEventCollector,
+        worker: arbitrumCollectorWorker,
+      },
+    },
+    optimism: {
+      processor: optimismCanonicalProcessor,
+      collector: {
+        queue: optimismCanonicalEventCollector,
+        worker: optimismCollectorWorker,
+      },
+    },
+    zkSync: {
+      processor: zkSyncCanonicalProcessor,
+      collector: {
+        queue: zkSyncCanonicalEventCollector,
+        worker: zkSyncCollectorWorker,
+      },
+    },
+  }
+}
diff --git a/src/queues/deployment.ts b/src/queues/deployment.ts
new file mode 100644
index 0000000..d07b91b
--- /dev/null
+++ b/src/queues/deployment.ts
@@ -0,0 +1,83 @@
+import { Logger } from '@l2beat/backend-tools'
+import { PrismaClient } from '../db/prisma.js'
+import { NetworkConfig, withExplorer } from '../utils/getNetworksConfig.js'
+import { Redis } from 'ioredis'
+import { eventRouter } from '../utils/queue/router/index.js'
+import { setupQueueWithProcessor } from '../utils/queue/queue-with-processor.js'
+import { TokenPayload } from './payloads.js'
+import { wrapDeploymentUpdatedQueue } from '../utils/queue/wrap.js'
+import { setupQueue } from '../utils/queue/setup-queue.js'
+import { buildDeploymentSource } from '../sources/deployment.js'
+import { byTokenChainId } from '../utils/queue/router/routing-key-rules.js'
+
+export async function setupDeploymentQueues({
+  connection,
+  db,
+  logger,
+  networksConfig,
+}: {
+  connection: Redis
+  db: PrismaClient
+  logger: Logger
+  networksConfig: NetworkConfig[]
+}) {
+  const deps = {
+    connection,
+    logger,
+  }
+
+  const router = eventRouter(deps)
+  const queue = setupQueue(deps)
+  const queueWithProcessor = setupQueueWithProcessor(deps)
+
+  // Routing inbox where TokenUpdate events are broadcast from independent sources
+  const routingInbox = queue({ name: 'DeploymentRoutingInbox' })
+
+  // Output queue for the deployment processors where the tokenIds are broadcast if the deployment is updated
+  const updatedInbox = queue({ name: 'DeploymentUpdatedInbox' })
+  const updatedQueue = wrapDeploymentUpdatedQueue(updatedInbox)
+
+  // For each supported network with an explorer, create a deployment processor
+  const buses = networksConfig.filter(withExplorer).map((networkConfig) => {
+    const processor = buildDeploymentSource({
+      logger,
+      db,
+      networkConfig,
+      queue: updatedQueue,
+    })
+    const { queue, worker } = queueWithProcessor({
+      name: `DeploymentProcessor:${networkConfig.name}`,
+      processor: (job) => processor(job.data.tokenId),
+    })
+
+    return { queue, worker, routingKey: networkConfig.chainId }
+  })
+
+  // Route the events from the routing inbox to the per-chain deployment processors
+  const routingWorker = router.routingKey({
+    from: routingInbox,
+    to: buses,
+    extractRoutingKey: byTokenChainId({ db }),
+  })
+
+  async function start() {
+    const statusLogger = logger.for('DeploymentQueuesModule')
+
+    buses.forEach(({ worker }) => worker.run())
+    routingWorker.run()
+
+    statusLogger.info('Started')
+  }
+
+  return {
+    start,
+    routing: {
+      inbox: routingInbox,
+      worker: routingWorker,
+    },
+    update: {
+      inbox: updatedInbox,
+    },
+    buses,
+  }
+}
diff --git a/src/queues/independent.ts b/src/queues/independent.ts
new file mode 100644
index 0000000..4ba3d79
--- /dev/null
+++ b/src/queues/independent.ts
@@ -0,0 +1,118 @@
+import { Logger } from '@l2beat/backend-tools'
+import { Redis } from 'ioredis'
+import { PrismaClient } from '../db/prisma.js'
+import { setupQueue } from '../utils/queue/setup-queue.js'
+import { setupQueueWithProcessor } from '../utils/queue/queue-with-processor.js'
+import { TokenPayload } from './payloads.js'
+import { wrapTokenQueue } from '../utils/queue/wrap.js'
+import { buildCoingeckoSource } from '../sources/coingecko.js'
+import { buildAxelarConfigSource } from '../sources/axelarConfig.js'
+import { buildWormholeSource } from '../sources/wormhole.js'
+import { buildOrbitSource } from '../sources/orbit.js'
+import { buildTokenListSource } from '../sources/tokenList.js'
+import { NetworkConfig } from '../utils/getNetworksConfig.js'
+import { buildAxelarGatewaySource } from '../sources/axelarGateway.js'
+
+export async function setupIndependentQueues({
+  db,
+  logger,
+  connection,
+  networksConfig,
+}: {
+  db: PrismaClient
+  logger: Logger
+  connection: Redis
+  networksConfig: NetworkConfig[]
+}) {
+  const deps = {
+    connection,
+    logger,
+  }
+  const queue = setupQueue(deps)
+  const queueWithProcessor = setupQueueWithProcessor(deps)
+
+  const tokenUpdateInbox = queue({ name: 'TokenUpdateInbox' })
+  const tokenUpdateQueue = wrapTokenQueue(tokenUpdateInbox)
+
+  const lists = [
+    { tag: '1INCH', url: 'https://tokens.1inch.eth.link' },
+    { tag: 'AAVE', url: 'http://tokenlist.aave.eth.link' },
+    { tag: 'MYCRYPTO', url: 'https://uniswap.mycryptoapi.com/' },
+    {
+      tag: 'SUPERCHAIN',
+      url: 'https://static.optimism.io/optimism.tokenlist.json',
+    },
+  ]
+
+  const coingecko = queueWithProcessor({
+    name: 'CoingeckoProcessor',
+    processor: buildCoingeckoSource({ logger, db, queue: tokenUpdateQueue }),
+  })
+
+  const axelarConfig = queueWithProcessor({
+    name: 'AxelarConfigProcessor',
+    processor: buildAxelarConfigSource({ logger, db, queue: tokenUpdateQueue }),
+  })
+
+  const wormhole = queueWithProcessor({
+    name: 'WormholeProcessor',
+    processor: buildWormholeSource({ logger, db, queue: tokenUpdateQueue }),
+  })
+
+  const orbit = queueWithProcessor({
+    name: 'OrbitProcessor',
+    processor: buildOrbitSource({ logger, db, queue: tokenUpdateQueue }),
+  })
+
+  const tokenLists = lists.map(({ tag, url }) =>
+    queueWithProcessor({
+      name: `TokenListProcessor:${tag}`,
+      processor: buildTokenListSource({
+        tag,
+        url,
+        logger,
+        db,
+        queue: tokenUpdateQueue,
+      }),
+    }),
+  )
+
+  const axelarGateway = networksConfig.map((networkConfig) =>
+    queueWithProcessor({
+      name: `AxelarGatewayProcessor:${networkConfig.name}`,
+      processor: buildAxelarGatewaySource({
+        logger,
+        db,
+        networkConfig,
+        queue: tokenUpdateQueue,
+      }),
+    }),
+  )
+
+  async function start() {
+    const statusLogger = logger.for('IndependentQueuesModule')
+    statusLogger.info('Starting')
+
+    coingecko.worker.run()
+    axelarConfig.worker.run()
+    wormhole.worker.run()
+    orbit.worker.run()
+    tokenLists.forEach(({ worker }) => worker.run())
+    axelarGateway.forEach(({ worker }) => worker.run())
+
+    statusLogger.info('Started')
+  }
+
+  return {
+    start,
+    sources: {
+      coingecko,
+      axelarConfig,
+      wormhole,
+      orbit,
+      tokenLists,
+      axelarGateway,
+    },
+    inbox: tokenUpdateInbox,
+  }
+}
diff --git a/src/queues/onChain.ts b/src/queues/onChain.ts
new file mode 100644
index 0000000..50294f0
--- /dev/null
+++ b/src/queues/onChain.ts
@@ -0,0 +1,114 @@
+import { Redis } from 'ioredis'
+import { PrismaClient } from '../db/prisma.js'
+import { Logger } from '@l2beat/backend-tools'
+import { NetworkConfig, withExplorer } from '../utils/getNetworksConfig.js'
+import { eventRouter } from '../utils/queue/router/index.js'
+import { setupQueue } from '../utils/queue/setup-queue.js'
+import { setupQueueWithProcessor } from '../utils/queue/queue-with-processor.js'
+import { setupCollector } from '../utils/queue/aggregates/collector.js'
+import { buildOnChainMetadataSource } from '../sources/onChainMetadata.js'
+import { byTokenChainId } from '../utils/queue/router/routing-key-rules.js'
+
+const oneMinuteMs = 60 * 1000
+
+type TokenPayload = { tokenId: string }
+type BatchTokenPayload = { tokenIds: string[] }
+
+export async function setupOnChainMetadataQueues({
+  connection,
+  db,
+  logger,
+  networksConfig,
+}: {
+  connection: Redis
+  db: PrismaClient
+  logger: Logger
+  networksConfig: NetworkConfig[]
+}) {
+  const deps = {
+    connection,
+    logger,
+  }
+
+  const queue = setupQueue(deps)
+  const queueWithProcessor = setupQueueWithProcessor(deps)
+  const router = eventRouter(deps)
+
+  // Routing inbox where TokenUpdate events are broadcast from independent sources
+  const onChainMetadataRoutingInbox = queue({
+    name: 'OnChainMetadataRoutingInbox',
+  })
+
+  // For each network, create a routing inbox and a backpressure (collector) queue
+  // so we can batch-process the events instead of calling the node for each token
+  const onChainMetadataBuses = networksConfig
+    .filter(withExplorer)
+    .map((networkConfig) => {
+      // Per-chain events will be collected here
+      const inbox = queue({
+        name: `OnChainMetadataEventCollector:${networkConfig.name}`,
+      })
+      // Batch processor for the collected events
+      const batchEventProcessor = queueWithProcessor({
+        name: `OnChainMetadataBatchProcessor:${networkConfig.name}`,
+        processor: (job) =>
+          buildOnChainMetadataSource({ logger, db, networkConfig })(
+            job.data.tokenIds,
+          ),
+      })
+
+      // Wire up the collector to the processor
+      const worker = setupCollector({
+        inputQueue: inbox,
+        outputQueue: batchEventProcessor.queue,
+        aggregate: (data) => ({ tokenIds: data.map((d) => d.tokenId) }),
+        bufferSize: 100,
+        flushIntervalMs: oneMinuteMs,
+        connection,
+        logger,
+      })
+
+      return {
+        collector: {
+          queue: inbox,
+          worker,
+        },
+        processor: batchEventProcessor,
+        routingKey: networkConfig.chainId,
+      }
+    })
+
+  // Route the events from the inbox to the per-chain event collectors
+  const routingWorker = router.routingKey({
+    from: onChainMetadataRoutingInbox,
+    to: onChainMetadataBuses.map((bus) => ({
+      queue: bus.collector.queue,
+      routingKey: bus.routingKey,
+    })),
+    extractRoutingKey: byTokenChainId({ db }),
+  })
+
+  async function start() {
+    const statusLogger = logger.for('OnChainMetadataQueuesModule')
+    statusLogger.info('Starting')
+
+    const toRun = [
+      ...onChainMetadataBuses.map((bus) => bus.collector.worker),
+      ...onChainMetadataBuses.map((bus) => bus.processor.worker),
+      routingWorker,
+    ]
+
+    toRun.forEach((worker) => worker.run())
+
+    statusLogger.info('Started')
+  }
+
+  return {
+    start,
+    routing: {
+      inbox: onChainMetadataRoutingInbox,
+      worker: routingWorker,
+    },
+    buses: onChainMetadataBuses,
+  }
+}
diff --git a/src/queues/payloads.ts b/src/queues/payloads.ts
new file mode 100644
index 0000000..72257a6
--- /dev/null
+++ b/src/queues/payloads.ts
@@ -0,0 +1,4 @@
+import { Token } from '@prisma/client'
+
+export type TokenPayload = { tokenId: Token['id'] }
+export type BatchTokenPayload = { tokenIds: Token['id'][] }
diff --git a/src/utils/queue/aggregates/collector.ts b/src/utils/queue/aggregates/collector.ts
index 6bd1070..0898c9e 100644
--- a/src/utils/queue/aggregates/collector.ts
+++ b/src/utils/queue/aggregates/collector.ts
@@ -1,8 +1,9 @@
 import { Logger } from '@l2beat/backend-tools'
-import { Job, Queue, Worker } from 'bullmq'
+import { Job, Queue } from 'bullmq'
 import { Redis } from 'ioredis'
 import { setupWorkerLogging } from '../logging.js'
 import { InferQueueDataType, InferQueueResultType } from '../types.js'
+import { setupWorker } from '../setup-worker.js'
 
 type BufferEntry<T> = {
   payload: T
@@ -113,9 +114,12 @@ export function setupCollector<
     })
   }
 
-  const worker = new Worker(inputQueue.name, processor, {
+  const worker = setupWorker({
+    queue: inputQueue,
     connection,
-    concurrency: bufferSize,
+    processor,
+    logger,
+    workerOptions: { concurrency: bufferSize },
   })
 
   if (logger) {
diff --git a/src/utils/queue/router/broadcast.ts b/src/utils/queue/router/broadcast.ts
index 4b3f30c..58f70e5 100644
--- a/src/utils/queue/router/broadcast.ts
+++ b/src/utils/queue/router/broadcast.ts
@@ -1,7 +1,8 @@
 import { Logger } from '@l2beat/backend-tools'
-import { Queue, Worker } from 'bullmq'
+import { Queue } from 'bullmq'
 import { Redis } from 'ioredis'
 import { InferQueueDataType } from '../types.js'
+import { setupWorker } from '../setup-worker.js'
 
 /**
  * Broadcast events from one queue to multiple queues.
@@ -20,15 +21,15 @@ export function broadcast({
   from: InputQueue
   to: Queue[]
 }) => {
-  const broadcastWorker = new Worker(
-    from.name,
-    async (job) => {
+  const broadcastWorker = setupWorker({
+    connection,
+    queue: from,
+    processor: async (job) => {
       to.forEach((queue) => {
         queue.add(job.name, job.data, job.opts)
       })
     },
-    { connection },
-  )
+  })
 
   logger.info('Broadcast rule set', {
     from: from.name,
diff --git a/src/utils/queue/router/routingKey.ts b/src/utils/queue/router/routingKey.ts
index c585765..e647790 100644
--- a/src/utils/queue/router/routingKey.ts
+++ b/src/utils/queue/router/routingKey.ts
@@ -5,7 +5,14 @@
 import { setupWorker } from '../setup-worker.js'
 import { InferQueueDataType } from '../types.js'
 
 type RoutedQueue<RoutingKey> = {
-  queue: Queue
+  /**
+   * The queue or queues to route the event to.
+   */
+  queue: Queue | Queue[]
+
+  /**
+   * The routing key to use for this queue.
+   */
   routingKey: RoutingKey
 }
@@ -32,8 +39,11 @@ export function routingKey({
   to: RoutedQueue<RoutingKey>[]
   extractRoutingKey: (data: InputEvent) => Promise<RoutingKey>
 }) => {
-  const queueMap = new Map<RoutingKey, Queue>(
-    to.map(({ queue, routingKey }) => [routingKey, queue]),
+  const queueMap = new Map<RoutingKey, Queue[]>(
+    to.map(({ queue, routingKey }) => [
+      routingKey,
+      Array.isArray(queue) ? queue : [queue],
+    ]),
   )
 
   if (queueMap.size !== to.length) {
@@ -51,9 +61,11 @@ export function routingKey({
       connection,
       processor: async (job: Job<InputEvent>) => {
         const routingKey = await extractRoutingKey(job.data)
-        const queue = queueMap.get(routingKey)
-        if (queue) {
-          await queue.add(job.name, job.data, job.opts)
+        const queues = queueMap.get(routingKey)
+        if (queues) {
+          await Promise.all(
+            queues.map((queue) => queue.add(job.name, job.data, job.opts)),
+          )
         } else {
           logger
             .tag(from.name)
@@ -63,7 +75,10 @@ export function routingKey({
   })
 
   const mapToLog = Object.fromEntries(
-    Array.from(queueMap.entries()).map(([key, queue]) => [key, queue.name]),
+    Array.from(queueMap.entries()).map(([key, queues]) => [
+      key,
+      queues.map((queue) => queue.name),
+    ]),
   )
 
   logger.info('Routing key rule set', {
diff --git a/src/utils/queue/setup-worker.ts b/src/utils/queue/setup-worker.ts
index 6246d90..ae5dd55 100644
--- a/src/utils/queue/setup-worker.ts
+++ b/src/utils/queue/setup-worker.ts
@@ -1,9 +1,16 @@
 import { Logger } from '@l2beat/backend-tools'
-import { Processor, Queue, Worker } from 'bullmq'
+import {
+  Processor,
+  Queue,
+  Worker,
+  WorkerOptions as BullWorkerOptions,
+} from 'bullmq'
 import { Redis } from 'ioredis'
 import { setupWorkerLogging } from './logging.js'
 import { InferQueueDataType, InferQueueResultType } from './types.js'
 
+type WorkerOptions = Pick<BullWorkerOptions, 'concurrency'>
+
 export function setupWorker<
   EventQueue extends Queue,
   DataType = InferQueueDataType<EventQueue>,
   ResultType = InferQueueResultType<EventQueue>,
>({
   queue,
   connection,
   processor,
   logger,
+  workerOptions,
 }: {
   queue: EventQueue
   connection: Redis
-  processor: Processor
+  processor: Processor<DataType, ResultType>
   logger?: Logger
+  workerOptions?: WorkerOptions
 }) {
-  const worker = new Worker(queue.name, processor, {
-    connection,
-    concurrency: 1,
-  })
+  const worker = new Worker(
+    queue.name,
+    processor,
+    {
+      connection,
+      concurrency: 1,
+      autorun: false,
+      ...workerOptions,
+    },
+  )
 
   if (logger) {
     setupWorkerLogging({ worker, logger })

From c1b7eab9b88a086103771964ef55c6e2e4ec3fb4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Podsiad=C5=82y?=
Date: Thu, 23 May 2024 18:31:03 +0200
Subject: [PATCH 2/2] chore: remove payloads from file

---
 src/queues/canonical.ts | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/src/queues/canonical.ts b/src/queues/canonical.ts
index bc0ff69..5f9ca5e 100644
--- a/src/queues/canonical.ts
+++ b/src/queues/canonical.ts
@@ -8,12 +8,10 @@ import { buildArbitrumCanonicalSource } from '../sources/arbitrumCanonical.js'
 import { setupCollector } from '../utils/queue/aggregates/collector.js'
 import { buildOptimismCanonicalSource } from '../sources/optimismCanonical.js'
 import { buildZkSyncCanonicalSource } from '../sources/zkSyncCanonical.js'
+import { BatchTokenPayload, TokenPayload } from './payloads.js'
 
 const oneMinuteMs = 60 * 1000
 
-type TokenPayload = { tokenId: string }
-type BatchTokenPayload = { tokenIds: string[] }
-
 export async function setupCanonicalQueues({
   connection,
   db,
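
A minimal usage sketch of the routing change in this series: `routingKey.ts` now accepts `Queue | Queue[]` per routing key, and `setup-worker.ts` creates workers with `autorun: false`, so they must be started explicitly. The queue names and the `declare`d stand-ins below are illustrative only; everything else mirrors the wiring in src/queue.ts.

// sketch.ts - illustrative, not part of the patch
import { Logger } from '@l2beat/backend-tools'
import { Redis } from 'ioredis'
import { PrismaClient } from './db/prisma.js'
import { eventRouter } from './utils/queue/router/index.js'
import { setupQueue } from './utils/queue/setup-queue.js'
import { byTokenChainId } from './utils/queue/router/routing-key-rules.js'

// Stand-ins for the real dependencies assembled in src/queue.ts
declare const connection: Redis
declare const db: PrismaClient
declare const logger: Logger

const router = eventRouter({ connection, logger })
const queue = setupQueue({ connection })

// Hypothetical queues - the names are examples only
const inbox = queue({ name: 'ExampleRoutingInbox' })
const arbitrumCollector = queue({ name: 'ExampleArbitrumCollector' })
const optimismCollector = queue({ name: 'ExampleOptimismCollector' })

// One routing key may now fan out to several queues: events for chain id 1
// reach both collectors, while 42161 and 10 keep a single target each.
const routingWorker = router.routingKey({
  from: inbox,
  to: [
    { queue: arbitrumCollector, routingKey: 42161 },
    { queue: optimismCollector, routingKey: 10 },
    { queue: [arbitrumCollector, optimismCollector], routingKey: 1 },
  ],
  extractRoutingKey: byTokenChainId({ db }),
})

// Workers are created with autorun: false after this series, so nothing
// processes jobs until run() is called explicitly.
await routingWorker.run()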