diff --git a/servers/cu/src/bootstrap.js b/servers/cu/src/bootstrap.js index 268366960..b2b057053 100644 --- a/servers/cu/src/bootstrap.js +++ b/servers/cu/src/bootstrap.js @@ -141,6 +141,12 @@ export const createApis = async (ctx) => { onCreateWorker: onCreateWorker('primary') }) const primaryWorkQueue = new PQueue({ concurrency: maxPrimaryWorkerThreads }) + primaryWorkQueue.on('add', () => { + pendingEvaluationCounter.inc(1, { type: 'primary' }) + }) + primaryWorkQueue.on('next', () => { + pendingEvaluationCounter.dec(1, { type: 'primary' }) + }) const maxDryRunWorkerTheads = Math.max( 1, @@ -152,6 +158,12 @@ export const createApis = async (ctx) => { maxQueueSize: ctx.WASM_EVALUATION_WORKERS_DRY_RUN_MAX_QUEUE }) const dryRunWorkQueue = new PQueue({ concurrency: maxDryRunWorkerTheads }) + dryRunWorkQueue.on('add', () => { + pendingEvaluationCounter.inc(1, { type: 'dry-run' }) + }) + dryRunWorkQueue.on('next', () => { + pendingEvaluationCounter.dec(1, { type: 'dry-run' }) + }) const arweave = ArweaveClient.createWalletClient() const address = ArweaveClient.addressWith({ WALLET: ctx.WALLET, arweave }) @@ -294,6 +306,12 @@ export const createApis = async (ctx) => { labelNames: ['stream_type', 'message_type', 'process_error'] }) + const pendingEvaluationCounter = MetricsClient.counterWith({})({ + name: 'ao_process_pending_evaluations', + description: 'The total number of pending evaluations on a CU', + labelNames: ['type'] + }) + /** * TODO: Gas can grow to a huge number. We need to make sure this doesn't crash when that happens */ @@ -330,6 +348,7 @@ export const createApis = async (ctx) => { logger }), evaluationCounter, + pendingEvaluationCounter, // gasCounter, saveProcess: AoProcessClient.saveProcessWith({ db, logger }), findEvaluation: AoEvaluationClient.findEvaluationWith({ db, logger }),