Skip to content

Commit

Permalink
Merge pull request #546 from vtex/fix/requestMetrics
Browse files Browse the repository at this point in the history
Fix request metrics being suppressed by tracing restrictions (metrics were only recorded for traced requests)
  • Loading branch information
filafb authored Oct 4, 2023
2 parents 566900b + 44170b7 commit 1966d53
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 45 deletions.
70 changes: 70 additions & 0 deletions src/service/metrics/requestMetricsMiddleware.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { finished as onStreamFinished } from 'stream'
import { hrToMillisFloat } from '../../utils'
import {
createConcurrentRequestsInstrument,
createRequestsResponseSizesInstrument,
createRequestsTimingsInstrument,
createTotalAbortedRequestsInstrument,
createTotalRequestsInstrument,
RequestsMetricLabels,
} from '../tracing/metrics/instruments'
import { ServiceContext } from '../worker/runtime/typings'


/**
 * Builds a middleware that records request-level metrics (concurrency,
 * timings, response sizes, totals and aborts) independently of tracing,
 * so blacklisted-for-tracing paths are still measured.
 *
 * All instruments are created once, when the middleware is built, and
 * shared across every request handled by the returned function.
 */
export const addRequestMetricsMiddleware = () => {
  const concurrentRequests = createConcurrentRequestsInstrument()
  const requestTimings = createRequestsTimingsInstrument()
  const totalRequests = createTotalRequestsInstrument()
  const responseSizes = createRequestsResponseSizesInstrument()
  const abortedRequests = createTotalAbortedRequestsInstrument()

  return async function addRequestMetrics(ctx: ServiceContext, next: () => Promise<void>) {
    const startTime = process.hrtime()
    concurrentRequests.inc(1)

    // Count client-side aborts; the label is read at abort time so it
    // reflects whatever handler name has been assigned by then.
    ctx.req.once('aborted', () => {
      abortedRequests.inc({ [RequestsMetricLabels.REQUEST_HANDLER]: ctx.requestHandlerName }, 1)
    })

    // Track whether the response stream already closed before the
    // downstream chain returned, so we don't wait on it forever below.
    let resAlreadyClosed = false
    ctx.res.once('close', () => {
      resAlreadyClosed = true
    })

    try {
      await next()
    } finally {
      // requestHandlerName is set by downstream middleware (e.g. the
      // span-naming middleware), so it is read only after next().
      const handlerName = ctx.requestHandlerName

      const bodyLength = ctx.response.length
      if (bodyLength) {
        responseSizes.observe({ [RequestsMetricLabels.REQUEST_HANDLER]: handlerName }, bodyLength)
      }

      totalRequests.inc(
        {
          [RequestsMetricLabels.REQUEST_HANDLER]: handlerName,
          [RequestsMetricLabels.STATUS_CODE]: ctx.response.status,
        },
        1
      )

      // Timing and the concurrency gauge are finalized only when the
      // response stream is fully flushed (or errored), not when the
      // middleware chain returns.
      const recordCompletion = () => {
        requestTimings.observe(
          { [RequestsMetricLabels.REQUEST_HANDLER]: ctx.requestHandlerName },
          hrToMillisFloat(process.hrtime(startTime))
        )

        concurrentRequests.dec(1)
      }

      if (resAlreadyClosed) {
        recordCompletion()
      } else {
        onStreamFinished(ctx.res, recordCompletion)
      }
    }
  }
}

48 changes: 3 additions & 45 deletions src/service/tracing/tracingMiddlewares.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,19 @@ import { RuntimeLogFields } from '../../tracing/LogFields'
import { CustomHttpTags, OpentracingTags, VTEXIncomingRequestTags } from '../../tracing/Tags'
import { UserLandTracer } from '../../tracing/UserLandTracer'
import { cloneAndSanitizeHeaders } from '../../tracing/utils'
import { hrToMillis, hrToMillisFloat } from '../../utils'
import { hrToMillis } from '../../utils'
import { ServiceContext } from '../worker/runtime/typings'
import {
createConcurrentRequestsInstrument,
createRequestsResponseSizesInstrument,
createRequestsTimingsInstrument,
createTotalAbortedRequestsInstrument,
createTotalRequestsInstrument,
RequestsMetricLabels,
} from './metrics/instruments'

const PATHS_BLACKLISTED_FOR_TRACING = ['/_status', '/healthcheck']

export const addTracingMiddleware = (tracer: Tracer) => {
const concurrentRequests = createConcurrentRequestsInstrument()
const requestTimings = createRequestsTimingsInstrument()
const totalRequests = createTotalRequestsInstrument()
const responseSizes = createRequestsResponseSizesInstrument()
const abortedRequests = createTotalAbortedRequestsInstrument()

return async function addTracing(ctx: ServiceContext, next: () => Promise<void>) {
const start = process.hrtime()
concurrentRequests.inc(1)
const rootSpan = tracer.extract(FORMAT_HTTP_HEADERS, ctx.request.headers) as undefined | SpanContext
ctx.tracing = { tracer, currentSpan: undefined}

if (!shouldTrace(ctx, rootSpan)) {
await next()
concurrentRequests.dec(1)
return
}

Expand All @@ -47,9 +31,6 @@ export const addTracingMiddleware = (tracer: Tracer) => {
const initialSamplingDecision = getTraceInfo(currentSpan).isSampled

ctx.tracing = { currentSpan, tracer }
ctx.req.once('aborted', () =>
abortedRequests.inc({ [RequestsMetricLabels.REQUEST_HANDLER]: (currentSpan as any).operationName as string }, 1)
)

let responseClosed = false
ctx.res.once('close', () => (responseClosed = true))
Expand All @@ -60,22 +41,6 @@ export const addTracingMiddleware = (tracer: Tracer) => {
ErrorReport.create({ originalError: err }).injectOnSpan(currentSpan, ctx.vtex?.logger)
throw err
} finally {
const responseLength = ctx.response.length
if (responseLength) {
responseSizes.observe(
{ [RequestsMetricLabels.REQUEST_HANDLER]: (currentSpan as any).operationName as string },
responseLength
)
}

totalRequests.inc(
{
[RequestsMetricLabels.REQUEST_HANDLER]: (currentSpan as any).operationName as string,
[RequestsMetricLabels.STATUS_CODE]: ctx.response.status,
},
1
)

const traceInfo = getTraceInfo(currentSpan)
if (traceInfo?.isSampled) {
if (!initialSamplingDecision) {
Expand All @@ -98,14 +63,6 @@ export const addTracingMiddleware = (tracer: Tracer) => {
}

const onResFinished = () => {
requestTimings.observe(
{
[RequestsMetricLabels.REQUEST_HANDLER]: (currentSpan as any).operationName as string,
},
hrToMillisFloat(process.hrtime(start))
)

concurrentRequests.dec(1)
currentSpan?.finish()
}

Expand All @@ -120,7 +77,8 @@ export const addTracingMiddleware = (tracer: Tracer) => {

export const nameSpanOperationMiddleware = (operationType: string, operationName: string) => {
return function nameSpanOperation(ctx: ServiceContext, next: () => Promise<void>) {
ctx.tracing?.currentSpan?.setOperationName(`${operationType}:${operationName}`)
ctx.requestHandlerName = `${operationType}:${operationName}`
ctx.tracing?.currentSpan?.setOperationName(ctx.requestHandlerName)
return next()
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/service/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { MetricsAccumulator } from '../../metrics/MetricsAccumulator'
import { getService } from '../loaders'
import { logOnceToDevConsole } from '../logger/console'
import { LogLevel } from '../logger/logger'
import { addRequestMetricsMiddleware } from '../metrics/requestMetricsMiddleware'
import { TracerSingleton } from '../tracing/TracerSingleton'
import { addTracingMiddleware } from '../tracing/tracingMiddlewares'
import { addProcessListeners, logger } from './listeners'
Expand Down Expand Up @@ -221,6 +222,7 @@ export const startWorker = (serviceJSON: ServiceJSON) => {
.use(error)
.use(prometheusLoggerMiddleware())
.use(addTracingMiddleware(tracer))
.use(addRequestMetricsMiddleware())
.use(addMetricsLoggerMiddleware())
.use(concurrentRateLimiter(serviceJSON?.rateLimitPerReplica?.concurrent))
.use(compress())
Expand Down
1 change: 1 addition & 0 deletions src/service/worker/runtime/typings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export interface Context<T extends IOClients> {
timings: Record<string, [number, number]>
metrics: Record<string, [number, number]>
previousTimerStart: [number, number]
requestHandlerName?: string
serverTiming?: ServerTiming
tracing?: TracingContext
}
Expand Down

0 comments on commit 1966d53

Please sign in to comment.