Skip to content

Commit

Permalink
Message processing metrics manager (#1050)
Browse files Browse the repository at this point in the history
  • Loading branch information
kjamrog authored Feb 4, 2025
1 parent 37a9a49 commit 7978771
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 2 deletions.
14 changes: 14 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"@message-queue-toolkit/schemas": "^4.0.0",
"@message-queue-toolkit/sns": "^20.0.0",
"@message-queue-toolkit/sqs": "^19.0.0",
"@message-queue-toolkit/metrics": "^2.0.0",
"@scalar/fastify-api-reference": "^1.25.108",
"@supercharge/promise-pool": "^3.2.0",
"amqplib": "^0.10.5",
Expand Down
11 changes: 11 additions & 0 deletions src/infrastructure/commonDiConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import {
STALENESS_THRESHOLD_IN_MSECS,
type SupportedHealthchecks,
} from './healthchecks/healthchecks.js'
import { MessageProcessingMetricsManager } from './metrics/MessageProcessingMetricsManager.js'
import { SINGLETON_CONFIG } from './parentDiConfig.js'
import type { ExternalDependencies } from './parentDiConfig.js'

Expand Down Expand Up @@ -276,6 +277,14 @@ export function resolveCommonDiConfig(
healthchecks: asFunction((dependencies: CommonDependencies) => {
return [dependencies.redisHealthcheck, dependencies.dbHealthcheck]
}, SINGLETON_CONFIG),

messageProcessingMetricsManager: asFunction(
() =>
dependencies.app?.metrics
? new MessageProcessingMetricsManager(dependencies.app.metrics)
: undefined,
SINGLETON_CONFIG,
),
}
}

Expand Down Expand Up @@ -308,4 +317,6 @@ export type CommonDependencies = {
dbHealthcheck: DbHealthcheck
healthcheckStore: HealthcheckResultsStore<SupportedHealthchecks>
healthchecks: readonly Healthcheck[]

messageProcessingMetricsManager?: MessageProcessingMetricsManager
}
39 changes: 39 additions & 0 deletions src/infrastructure/metrics/MessageProcessingMetricsManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import type { ConsumerBaseMessageType, ProcessedMessageMetadata } from '@message-queue-toolkit/core'
import {
MessageLifetimeMetric,
MessageProcessingMultiMetrics,
MessageProcessingTimeMetric,
} from '@message-queue-toolkit/metrics'
import type { IFastifyMetrics } from 'fastify-metrics'

export class MessageProcessingMetricsManager extends MessageProcessingMultiMetrics<ConsumerBaseMessageType> {
constructor(appMetrics: IFastifyMetrics) {
const buckets = [100, 200, 300, 500, 1000, 3000, 10000, 50000, 100000]
const messageVersionResolver = (
messageMetadata: ProcessedMessageMetadata<ConsumerBaseMessageType>,
) => {
return messageMetadata.message?.metadata.schemaVersion
}

super([
new MessageProcessingTimeMetric(
{
name: 'message_processing_milliseconds',
helpDescription: 'Message processing time in milliseconds',
buckets,
messageVersion: messageVersionResolver,
},
appMetrics.client,
),
new MessageLifetimeMetric(
{
name: 'message_lifetime_milliseconds',
helpDescription: 'Message lifetime in milliseconds',
buckets,
messageVersion: messageVersionResolver,
},
appMetrics.client,
),
])
}
}
1 change: 1 addition & 0 deletions src/modules/users/consumers/PermissionConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export class PermissionConsumer extends AbstractAmqpTopicConsumer<
errorReporter: dependencies.errorReporter,
logger: dependencies.logger,
transactionObservabilityManager: dependencies.transactionObservabilityManager,
messageMetricsManager: dependencies.messageProcessingMetricsManager,
},
{
creationConfig: {
Expand Down
50 changes: 48 additions & 2 deletions src/modules/users/consumers/PermissionsConsumer.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import type { Cradle } from '@fastify/awilix'
import { waitAndRetry } from '@message-queue-toolkit/core'
import { type MessagePublishType, waitAndRetry } from '@message-queue-toolkit/core'
import type { Channel } from 'amqplib'
import type { AwilixContainer } from 'awilix'
import { asClass } from 'awilix'

import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { DB_MODEL, cleanTables } from '../../../../test/DbCleaner.js'
import { FakeConsumerErrorResolver } from '../../../../test/fakes/FakeConsumerErrorResolver.js'
import { createRequestContext } from '../../../../test/requestUtils.js'
Expand All @@ -19,6 +19,7 @@ import { user as userTable } from '../../../db/schema/user.js'
import type { PublisherManager } from '../../../infrastructure/commonDiConfig.js'
import { buildQueueMessage } from '../../../utils/queueUtils.js'
import { PermissionConsumer } from './PermissionConsumer.js'
import type { PermissionsMessages } from './permissionsMessageSchemas.js'

const userIds = [generateUuid7(), generateUuid7(), generateUuid7()]
const perms: [string, ...string[]] = ['perm1', 'perm2']
Expand Down Expand Up @@ -62,6 +63,7 @@ describe('PermissionsConsumer', () => {
app = await getApp(
{
queuesEnabled: [PermissionConsumer.QUEUE_NAME],
monitoringEnabled: true,
},
{
consumerErrorResolver: asClass(FakeConsumerErrorResolver, SINGLETON_CONFIG),
Expand Down Expand Up @@ -234,5 +236,49 @@ describe('PermissionsConsumer', () => {
// We fail first when doing real reading, and second one when trying to extract an id
expect(fakeResolver.handleErrorCallsCount).toBe(2)
})

it('Registers message processing metrics', async () => {
// Given
const { permissionsService, drizzle } = diContainer.cradle
await createUsers(drizzle, userIds)

expect(diContainer.cradle.messageProcessingMetricsManager).toBeDefined()
const metricsSpy = vi.spyOn(
diContainer.cradle.messageProcessingMetricsManager!,
'registerProcessedMessage',
)

// When
const messageId = 'testId'
const message = {
id: messageId,
payload: {
userIds,
permissions: perms,
},
type: 'permissions.added',
} satisfies MessagePublishType<typeof PermissionsMessages.added>
publisher.publishSync('permissions', message)

// Then
await consumer.handlerSpy.waitForMessageWithId(messageId, 'consumed')

const usersPermissions = await resolvePermissions(permissionsService, userIds)

expect(usersPermissions).toBeDefined()
expect(usersPermissions).not.toBeNull()
expect(usersPermissions![0]).toHaveLength(2)

expect(metricsSpy).toHaveBeenCalledWith({
messageId: messageId,
messageType: 'permissions.added',
processingResult: 'consumed',
message: expect.objectContaining(message),
queueName: PermissionConsumer.QUEUE_NAME,
messageTimestamp: expect.any(Number),
messageProcessingStartTimestamp: expect.any(Number),
messageProcessingEndTimestamp: expect.any(Number),
})
})
})
})

0 comments on commit 7978771

Please sign in to comment.