Skip to content
This repository was archived by the owner on Sep 10, 2024. It is now read-only.

Clean-up queue code a bit #38

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
393 changes: 68 additions & 325 deletions src/queue.ts

Large diffs are not rendered by default.

134 changes: 134 additions & 0 deletions src/queues/canonical.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import { Redis } from 'ioredis'
import { PrismaClient } from '../db/prisma.js'
import { Logger } from '@l2beat/backend-tools'
import { NetworkConfig } from '../utils/getNetworksConfig.js'
import { setupQueue } from '../utils/queue/setup-queue.js'
import { setupQueueWithProcessor } from '../utils/queue/queue-with-processor.js'
import { buildArbitrumCanonicalSource } from '../sources/arbitrumCanonical.js'
import { setupCollector } from '../utils/queue/aggregates/collector.js'
import { buildOptimismCanonicalSource } from '../sources/optimismCanonical.js'
import { buildZkSyncCanonicalSource } from '../sources/zkSyncCanonical.js'
import { BatchTokenPayload, TokenPayload } from './payloads.js'

const oneMinuteMs = 60 * 1000

export async function setupCanonicalQueues({
connection,
db,
logger,
networksConfig,
}: {
connection: Redis
db: PrismaClient
logger: Logger
networksConfig: NetworkConfig[]
}) {
const deps = {
connection,
logger,
}

const queue = setupQueue(deps)
const queueWithProcessor = setupQueueWithProcessor(deps)

// Arbitrum
const arbitrumCanonicalProcessor = queueWithProcessor<BatchTokenPayload>({
name: 'ArbitrumCanonicalBatchProcessor',
processor: buildArbitrumCanonicalSource({ logger, db, networksConfig }),
})
const arbitrumCanonicalEventCollector = queue<TokenPayload>({
name: 'ArbitrumCanonicalEventCollector',
})

// Handle backpressure from the deployment processor (for each below)
const arbitrumCollectorWorker = setupCollector({
inputQueue: arbitrumCanonicalEventCollector,
outputQueue: arbitrumCanonicalProcessor.queue,
aggregate: (data) => ({ tokenIds: data.map((d) => d.tokenId) }),
bufferSize: 100,
flushIntervalMs: oneMinuteMs,
connection,
logger,
})

// Optimism
const optimismCanonicalProcessor = queueWithProcessor<BatchTokenPayload>({
name: 'OptimismCanonicalBatchProcessor',
processor: buildOptimismCanonicalSource({ logger, db, networksConfig }),
})
const optimismCanonicalEventCollector = queue<TokenPayload>({
name: 'OptimismCanonicalEventCollector',
})

const optimismCollectorWorker = setupCollector({
inputQueue: optimismCanonicalEventCollector,
outputQueue: optimismCanonicalProcessor.queue,
aggregate: (data) => ({ tokenIds: data.map((d) => d.tokenId) }),
bufferSize: 100,
flushIntervalMs: oneMinuteMs,
connection,
logger,
})

// ZkSync
const zkSyncCanonicalProcessor = queueWithProcessor<BatchTokenPayload>({
name: 'ZkSyncCanonicalBatchProcessor',
processor: buildZkSyncCanonicalSource({ logger, db, networksConfig }),
})
const zkSyncCanonicalEventCollector = queue<TokenPayload>({
name: 'ZkSyncCanonicalEventCollector',
})

const zkSyncCollectorWorker = setupCollector({
inputQueue: zkSyncCanonicalEventCollector,
outputQueue: zkSyncCanonicalProcessor.queue,
aggregate: (data) => ({ tokenIds: data.map((d) => d.tokenId) }),
bufferSize: 100,
flushIntervalMs: oneMinuteMs,
connection,
logger,
})

function start() {
const statusLogger = logger.for('CanonicalQueuesModule')
statusLogger.info('Starting')

const toRun = [
arbitrumCollectorWorker,
arbitrumCanonicalProcessor.worker,
optimismCollectorWorker,
optimismCanonicalProcessor.worker,
zkSyncCollectorWorker,
zkSyncCanonicalProcessor.worker,
]

toRun.forEach((worker) => worker.run())

statusLogger.info('Started')
}

return {
start,
arbitrum: {
processor: arbitrumCanonicalProcessor,
collector: {
queue: arbitrumCanonicalEventCollector,
worker: arbitrumCollectorWorker,
},
},
optimism: {
processor: optimismCanonicalProcessor,
collector: {
queue: optimismCanonicalEventCollector,
worker: optimismCollectorWorker,
},
},
zkSync: {
processor: zkSyncCanonicalProcessor,
collector: {
queue: zkSyncCanonicalEventCollector,
worker: zkSyncCollectorWorker,
},
},
}
}
83 changes: 83 additions & 0 deletions src/queues/deployment.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { Logger } from '@l2beat/backend-tools'
import { PrismaClient } from '../db/prisma.js'
import { NetworkConfig, withExplorer } from '../utils/getNetworksConfig.js'
import { Redis } from 'ioredis'
import { eventRouter } from '../utils/queue/router/index.js'
import { setupQueueWithProcessor } from '../utils/queue/queue-with-processor.js'
import { TokenPayload } from './payloads.js'
import { wrapDeploymentUpdatedQueue } from '../utils/queue/wrap.js'
import { setupQueue } from '../utils/queue/setup-queue.js'
import { buildDeploymentSource } from '../sources/deployment.js'
import { byTokenChainId } from '../utils/queue/router/routing-key-rules.js'

/**
 * Wires up the deployment queues: a routing inbox receives TokenUpdate
 * events from the independent sources and a router forwards each event,
 * keyed by the token's chain id, to a per-network deployment processor.
 * Processors publish updated token ids to the shared updated-inbox.
 *
 * Returns a `start` function plus handles to the routing inbox/worker,
 * the updated inbox, and the per-network buses.
 */
export async function setupDeploymentQueues({
  connection,
  db,
  logger,
  networksConfig,
}: {
  connection: Redis
  db: PrismaClient
  logger: Logger
  networksConfig: NetworkConfig[]
}) {
  const deps = {
    connection,
    logger,
  }

  const router = eventRouter(deps)
  const queue = setupQueue(deps)
  const queueWithProcessor = setupQueueWithProcessor(deps)

  // Routing inbox where TokenUpdate events are broadcasted from independent sources
  const routingInbox = queue<TokenPayload>({ name: 'DeploymentRoutingInbox' })

  // Output queue for the deployment processors where the tokenIds are broadcasted if the deployment is updated
  const updatedInbox = queue<TokenPayload>({ name: 'DeploymentUpdatedInbox' })
  const updatedQueue = wrapDeploymentUpdatedQueue(updatedInbox)

  // For each supported network with an explorer, create a deployment processor
  const buses = networksConfig.filter(withExplorer).map((networkConfig) => {
    const processor = buildDeploymentSource({
      logger,
      db,
      networkConfig,
      queue: updatedQueue,
    })
    // Named `processorQueue` so it does not shadow the `queue` factory above.
    const { queue: processorQueue, worker } = queueWithProcessor<TokenPayload>({
      name: `DeploymentProcessor:${networkConfig.name}`,
      processor: (job) => processor(job.data.tokenId),
    })

    return { queue: processorQueue, worker, routingKey: networkConfig.chainId }
  })

  // Route the events from routing inbox to the per-chain deployment processors
  const routingWorker = router.routingKey({
    from: routingInbox,
    to: buses,
    extractRoutingKey: byTokenChainId({ db }),
  })

  // Starts the per-network processors and the routing worker.
  async function start() {
    const statusLogger = logger.for('DeploymentQueuesModule')
    // Log both phases, consistent with the canonical/independent modules.
    statusLogger.info('Starting')

    buses.forEach(({ worker }) => worker.run())
    routingWorker.run()

    statusLogger.info('Started')
  }

  return {
    start,
    routing: {
      inbox: routingInbox,
      worker: routingWorker,
    },
    update: {
      inbox: updatedInbox,
    },
    buses,
  }
}
118 changes: 118 additions & 0 deletions src/queues/independent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import { Logger } from '@l2beat/backend-tools'
import { Redis } from 'ioredis'
import { PrismaClient } from '../db/prisma.js'
import { setupQueue } from '../utils/queue/setup-queue.js'
import { setupQueueWithProcessor } from '../utils/queue/queue-with-processor.js'
import { TokenPayload } from './payloads.js'
import { wrapTokenQueue } from '../utils/queue/wrap.js'
import { buildCoingeckoSource } from '../sources/coingecko.js'
import { buildAxelarConfigSource } from '../sources/axelarConfig.js'
import { buildWormholeSource } from '../sources/wormhole.js'
import { buildOrbitSource } from '../sources/orbit.js'
import { buildTokenListSource } from '../sources/tokenList.js'
import { NetworkConfig } from '../utils/getNetworksConfig.js'
import { buildAxelarGatewaySource } from '../sources/axelarGateway.js'

export async function setupIndependentQueues({
db,
logger,
connection,
networksConfig,
}: {
db: PrismaClient
logger: Logger
connection: Redis
networksConfig: NetworkConfig[]
}) {
const deps = {
connection,
logger,
}
const queue = setupQueue(deps)
const queueWithProcessor = setupQueueWithProcessor(deps)

const tokenUpdateInbox = queue<TokenPayload>({ name: 'TokenUpdateInbox' })
const tokenUpdateQueue = wrapTokenQueue(tokenUpdateInbox)

const lists = [
{ tag: '1INCH', url: 'https://tokens.1inch.eth.link' },
{ tag: 'AAVE', url: 'http://tokenlist.aave.eth.link' },
{ tag: 'MYCRYPTO', url: 'https://uniswap.mycryptoapi.com/' },
{
tag: 'SUPERCHAIN',
url: 'https://static.optimism.io/optimism.tokenlist.json',
},
]

const coingecko = queueWithProcessor({
name: 'CoingeckoProcessor',
processor: buildCoingeckoSource({ logger, db, queue: tokenUpdateQueue }),
})

const axelarConfig = queueWithProcessor({
name: 'AxelarConfigProcessor',
processor: buildAxelarConfigSource({ logger, db, queue: tokenUpdateQueue }),
})

const wormhole = queueWithProcessor({
name: 'WormholeProcessor',
processor: buildWormholeSource({ logger, db, queue: tokenUpdateQueue }),
})

const orbit = queueWithProcessor({
name: 'OrbitProcessor',
processor: buildOrbitSource({ logger, db, queue: tokenUpdateQueue }),
})

const tokenLists = lists.map(({ tag, url }) =>
queueWithProcessor({
name: `TokenListProcessor:${tag}`,
processor: buildTokenListSource({
tag,
url,
logger,
db,
queue: tokenUpdateQueue,
}),
}),
)

const axelarGateway = networksConfig.map((networkConfig) =>
queueWithProcessor({
name: `AxelarGatewayProcessor:${networkConfig.name}`,
processor: buildAxelarGatewaySource({
logger,
db,
networkConfig,
queue: tokenUpdateQueue,
}),
}),
)

async function start() {
const statusLogger = logger.for('IndependentQueuesModule')
statusLogger.info('Starting')

coingecko.worker.run()
axelarConfig.worker.run()
wormhole.worker.run()
orbit.worker.run()
tokenLists.forEach(({ worker }) => worker.run())
axelarGateway.forEach(({ worker }) => worker.run())

statusLogger.info('Started')
}

return {
start,
sources: {
coingecko,
axelarConfig,
wormhole,
orbit,
tokenLists,
axelarGateway,
},
inbox: tokenUpdateInbox,
}
}
Loading