From 79787710e4fefcac6c127bfb8907650e11b3aa43 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Tue, 4 Feb 2025 12:57:23 +0100 Subject: [PATCH] Message processing metrics manager (#1050) --- package-lock.json | 14 ++++++ package.json | 1 + src/infrastructure/commonDiConfig.ts | 11 ++++ .../MessageProcessingMetricsManager.ts | 39 +++++++++++++++ .../users/consumers/PermissionConsumer.ts | 1 + .../consumers/PermissionsConsumer.spec.ts | 50 ++++++++++++++++++- 6 files changed, 114 insertions(+), 2 deletions(-) create mode 100644 src/infrastructure/metrics/MessageProcessingMetricsManager.ts diff --git a/package-lock.json b/package-lock.json index cbac805c..8c9c0acb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -28,6 +28,7 @@ "@lokalise/zod-extras": "^2.1.0", "@message-queue-toolkit/amqp": "^18.0.0", "@message-queue-toolkit/core": "^19.0.0", + "@message-queue-toolkit/metrics": "^2.0.0", "@message-queue-toolkit/schemas": "^4.0.0", "@message-queue-toolkit/sns": "^20.0.0", "@message-queue-toolkit/sqs": "^19.0.0", @@ -4524,6 +4525,19 @@ "zod": "^3.23.8" } }, + "node_modules/@message-queue-toolkit/metrics": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/@message-queue-toolkit/metrics/-/metrics-2.0.0.tgz", + "integrity": "sha512-PajAmdx/74l4hBTWCpRDqya7JM4Fx+5xHyMX27bpDloQEeOw4/YOftRuNDVxeVVbP17f1KiF3QIhLZGS7vhr2g==", + "license": "MIT", + "dependencies": { + "@lokalise/node-core": "^13.3.0" + }, + "peerDependencies": { + "@message-queue-toolkit/core": ">=19.0.0", + "prom-client": ">=15.0.0" + } + }, "node_modules/@message-queue-toolkit/schemas": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/@message-queue-toolkit/schemas/-/schemas-4.0.0.tgz", diff --git a/package.json b/package.json index 56dd0c30..96bc44b4 100644 --- a/package.json +++ b/package.json @@ -57,6 +57,7 @@ "@message-queue-toolkit/schemas": "^4.0.0", "@message-queue-toolkit/sns": "^20.0.0", "@message-queue-toolkit/sqs": "^19.0.0", + "@message-queue-toolkit/metrics": "^2.0.0", "@scalar/fastify-api-reference": "^1.25.108", "@supercharge/promise-pool": "^3.2.0", "amqplib": "^0.10.5", diff --git a/src/infrastructure/commonDiConfig.ts b/src/infrastructure/commonDiConfig.ts index e9cd0d05..b3e2ed51 100644 --- a/src/infrastructure/commonDiConfig.ts +++ b/src/infrastructure/commonDiConfig.ts @@ -47,6 +47,7 @@ import { STALENESS_THRESHOLD_IN_MSECS, type SupportedHealthchecks, } from './healthchecks/healthchecks.js' +import { MessageProcessingMetricsManager } from './metrics/MessageProcessingMetricsManager.js' import { SINGLETON_CONFIG } from './parentDiConfig.js' import type { ExternalDependencies } from './parentDiConfig.js' @@ -276,6 +277,14 @@ export function resolveCommonDiConfig( healthchecks: asFunction((dependencies: CommonDependencies) => { return [dependencies.redisHealthcheck, dependencies.dbHealthcheck] }, SINGLETON_CONFIG), + + messageProcessingMetricsManager: asFunction( + () => + dependencies.app?.metrics + ? new MessageProcessingMetricsManager(dependencies.app.metrics) + : undefined, + SINGLETON_CONFIG, + ), } } @@ -308,4 +317,6 @@ export type CommonDependencies = { dbHealthcheck: DbHealthcheck healthcheckStore: HealthcheckResultsStore healthchecks: readonly Healthcheck[] + + messageProcessingMetricsManager?: MessageProcessingMetricsManager } diff --git a/src/infrastructure/metrics/MessageProcessingMetricsManager.ts b/src/infrastructure/metrics/MessageProcessingMetricsManager.ts new file mode 100644 index 00000000..84a2f0e0 --- /dev/null +++ b/src/infrastructure/metrics/MessageProcessingMetricsManager.ts @@ -0,0 +1,39 @@ +import type { ConsumerBaseMessageType, ProcessedMessageMetadata } from '@message-queue-toolkit/core' +import { + MessageLifetimeMetric, + MessageProcessingMultiMetrics, + MessageProcessingTimeMetric, +} from '@message-queue-toolkit/metrics' +import type { IFastifyMetrics } from 'fastify-metrics' + +export class MessageProcessingMetricsManager extends MessageProcessingMultiMetrics { + constructor(appMetrics: IFastifyMetrics) { + const buckets = [100, 200, 300, 500, 1000, 3000, 10000, 50000, 100000] + const messageVersionResolver = ( + messageMetadata: ProcessedMessageMetadata, + ) => { + return messageMetadata.message?.metadata.schemaVersion + } + + super([ + new MessageProcessingTimeMetric( + { + name: 'message_processing_milliseconds', + helpDescription: 'Message processing time in milliseconds', + buckets, + messageVersion: messageVersionResolver, + }, + appMetrics.client, + ), + new MessageLifetimeMetric( + { + name: 'message_lifetime_milliseconds', + helpDescription: 'Message lifetime in milliseconds', + buckets, + messageVersion: messageVersionResolver, + }, + appMetrics.client, + ), + ]) + } +} diff --git a/src/modules/users/consumers/PermissionConsumer.ts b/src/modules/users/consumers/PermissionConsumer.ts index 8f779e83..6d282f5d 100644 --- a/src/modules/users/consumers/PermissionConsumer.ts +++ b/src/modules/users/consumers/PermissionConsumer.ts @@ -41,6 +41,7 @@ export class PermissionConsumer extends AbstractAmqpTopicConsumer< errorReporter: dependencies.errorReporter, logger: dependencies.logger, transactionObservabilityManager: dependencies.transactionObservabilityManager, + messageMetricsManager: dependencies.messageProcessingMetricsManager, }, { creationConfig: { diff --git a/src/modules/users/consumers/PermissionsConsumer.spec.ts b/src/modules/users/consumers/PermissionsConsumer.spec.ts index e0c56446..27c4bf7b 100644 --- a/src/modules/users/consumers/PermissionsConsumer.spec.ts +++ b/src/modules/users/consumers/PermissionsConsumer.spec.ts @@ -1,10 +1,10 @@ import type { Cradle } from '@fastify/awilix' -import { waitAndRetry } from '@message-queue-toolkit/core' +import { type MessagePublishType, waitAndRetry } from '@message-queue-toolkit/core' import type { Channel } from 'amqplib' import type { AwilixContainer } from 'awilix' import { asClass } from 'awilix' -import { afterEach, beforeEach, describe, expect, it } from 'vitest' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { DB_MODEL, cleanTables } from '../../../../test/DbCleaner.js' import { FakeConsumerErrorResolver } from '../../../../test/fakes/FakeConsumerErrorResolver.js' import { createRequestContext } from '../../../../test/requestUtils.js' @@ -19,6 +19,7 @@ import { user as userTable } from '../../../db/schema/user.js' import type { PublisherManager } from '../../../infrastructure/commonDiConfig.js' import { buildQueueMessage } from '../../../utils/queueUtils.js' import { PermissionConsumer } from './PermissionConsumer.js' +import type { PermissionsMessages } from './permissionsMessageSchemas.js' const userIds = [generateUuid7(), generateUuid7(), generateUuid7()] const perms: [string, ...string[]] = ['perm1', 'perm2'] @@ -62,6 +63,7 @@ describe('PermissionsConsumer', () => { app = await getApp( { queuesEnabled: [PermissionConsumer.QUEUE_NAME], + monitoringEnabled: true, }, { consumerErrorResolver: asClass(FakeConsumerErrorResolver, SINGLETON_CONFIG), @@ -234,5 +236,49 @@ describe('PermissionsConsumer', () => { // We fail first when doing real reading, and second one when trying to extract an id expect(fakeResolver.handleErrorCallsCount).toBe(2) }) + + it('Registers message processing metrics', async () => { + // Given + const { permissionsService, drizzle } = diContainer.cradle + await createUsers(drizzle, userIds) + + expect(diContainer.cradle.messageProcessingMetricsManager).toBeDefined() + const metricsSpy = vi.spyOn( + diContainer.cradle.messageProcessingMetricsManager!, + 'registerProcessedMessage', + ) + + // When + const messageId = 'testId' + const message = { + id: messageId, + payload: { + userIds, + permissions: perms, + }, + type: 'permissions.added', + } satisfies MessagePublishType + publisher.publishSync('permissions', message) + + // Then + await consumer.handlerSpy.waitForMessageWithId(messageId, 'consumed') + + const usersPermissions = await resolvePermissions(permissionsService, userIds) + + expect(usersPermissions).toBeDefined() + expect(usersPermissions).not.toBeNull() + expect(usersPermissions![0]).toHaveLength(2) + + expect(metricsSpy).toHaveBeenCalledWith({ + messageId: messageId, + messageType: 'permissions.added', + processingResult: 'consumed', + message: expect.objectContaining(message), + queueName: PermissionConsumer.QUEUE_NAME, + messageTimestamp: expect.any(Number), + messageProcessingStartTimestamp: expect.any(Number), + messageProcessingEndTimestamp: expect.any(Number), + }) + }) }) })