Skip to content

Commit

Permalink
matching: tweak logging & metrics 💥
Browse files Browse the repository at this point in the history
This is a breaking change because it
- changes log names
- renames metrics
  • Loading branch information
derhuerst committed Aug 29, 2024
1 parent 074f30a commit 9ea0060
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 17 deletions.
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ metricsServer.start()
}, abortWithError)

await runGtfsMatching({
logger,
natsClient,
natsJetstreamClient,
natsJetstreamManager,
Expand Down
49 changes: 32 additions & 17 deletions lib/match.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ const abortWithError = (err) => {

const runGtfsMatching = async (cfg, opt = {}) => {
const {
logger: serviceLogger,
natsClient,
natsJetstreamClient,
natsJetstreamManager,
} = cfg
ok(serviceLogger)
ok(natsClient)
ok(natsJetstreamClient)
ok(natsJetstreamManager)
Expand All @@ -84,20 +86,25 @@ const runGtfsMatching = async (cfg, opt = {}) => {
...opt,
}

const matchedTotal = new Counter({
name: 'matched_total',
help: 'nr. of successfully matched movements/trips',
const successesTotal = new Counter({
name: 'matching_successes_total',
help: 'number of successfully matched movements/trips',
registers: [register],
labelNames: [
'cached',
],
})
const failuresTotal = new Counter({
name: 'matching_failures_total',
help: 'nr. of matching failures',
help: 'number of matching failures',
registers: [register],
labelNames: [
],
labelNames: [],
})
const errorsTotal = new Counter({
name: 'matching_errors_total',
help: 'number of errors that have occured while matching',
registers: [register],
labelNames: [],
})
const matchingTimeSeconds = new Summary({
name: 'matching_time_seconds',
Expand All @@ -116,7 +123,7 @@ const runGtfsMatching = async (cfg, opt = {}) => {
logger,
})

const matchVdvAusIstFahrtAndPublishAsGtfsRtTripUpdate = async (vdvAusIstFahrt) => {
const matchVdvAusIstFahrtAndPublishAsGtfsRtTripUpdate = async (vdvAusIstFahrt, msg) => {
try {
const {
item: gtfsRtTripUpdate,
Expand All @@ -127,17 +134,24 @@ const runGtfsMatching = async (cfg, opt = {}) => {

const topic = getNatsTopicFromGtfsRtTripUpdate(gtfsRtTripUpdate)

logger.debug({
logger.trace({
topic,
isMatched,
isCached,
matchingTime,
gtfsRtTripUpdate,
// todo: log just a slice?
vdvAusIstFahrt,
natsMsgSeq: msg.seq,
}, 'publishing GTFS-RT TripUpdate')
natsJetstreamClient.publish(topic, natsJson.encode(gtfsRtTripUpdate))

if (isMatched) {
matchedTotal.inc({
successesTotal.inc({
isCached: isCached ? '1' : '0',
})
} else {
failuresTotal.inc()
}
matchingTimeSeconds.observe({
matched: isMatched ? '1' : '0',
Expand All @@ -150,31 +164,32 @@ const runGtfsMatching = async (cfg, opt = {}) => {
logger.warn({
err,
vdvAusIstFahrt,
natsMsgSeq: msg.seq,
}, `failed to match trip: ${err.message || (err + '')}`)
failuresTotal.inc()
errorsTotal.inc()
}
}

const processAusIstFahrtMsg = async (msg) => {
const {subject, seq, data} = msg
logger.trace({
serviceLogger.debug({
subject,
seq,
dataSlice: data.slice(0, 100).toString('utf8'),
}, 'processing AUS IstFahrt msg')
try {
const ausIstFahrt = msg.json(data)
// todo: validate against schema, error-log and abort if invalid
await matchVdvAusIstFahrtAndPublishAsGtfsRtTripUpdate(ausIstFahrt)
await matchVdvAusIstFahrtAndPublishAsGtfsRtTripUpdate(ausIstFahrt, msg)

logger.trace({
serviceLogger.trace({
subject,
seq,
}, 'successfully processed AUS IstFahrt msg')
msg.ack()
} catch (err) {
// We catch all non-programmer errors in order not to abort the message processing (see below).
logger.warn({
serviceLogger.warn({
err,
subject,
seq,
Expand All @@ -198,7 +213,7 @@ const runGtfsMatching = async (cfg, opt = {}) => {
],
// todo: limits?
})
logger.debug({
serviceLogger.debug({
streamInfo,
}, 'created/re-used NATS JetStream stream for AUS IstFahrts')
}
Expand All @@ -211,7 +226,7 @@ const runGtfsMatching = async (cfg, opt = {}) => {
],
// todo: limits?
})
logger.debug({
serviceLogger.debug({
streamInfo,
}, 'created/re-used NATS JetStream stream for GTFS-RT data')
}
Expand All @@ -228,7 +243,7 @@ const runGtfsMatching = async (cfg, opt = {}) => {

// todo: add trip ID to topic, consume with `DeliverLastPerSubject`? – would not work for partial IstFahrts
})
logger.debug({
serviceLogger.debug({
consumerInfo,
}, 'created/re-used NATS JetStream consumer')

Expand Down

0 comments on commit 9ea0060

Please sign in to comment.