diff --git a/index.js b/index.js index f1f44a8..58751a3 100644 --- a/index.js +++ b/index.js @@ -28,6 +28,7 @@ metricsServer.start() }, abortWithError) await runGtfsMatching({ + logger, natsClient, natsJetstreamClient, natsJetstreamManager, diff --git a/lib/match.js b/lib/match.js index e2fdc48..7f2ec75 100644 --- a/lib/match.js +++ b/lib/match.js @@ -55,10 +55,12 @@ const abortWithError = (err) => { const runGtfsMatching = async (cfg, opt = {}) => { const { + logger: serviceLogger, natsClient, natsJetstreamClient, natsJetstreamManager, } = cfg + ok(serviceLogger) ok(natsClient) ok(natsJetstreamClient) ok(natsJetstreamManager) @@ -84,9 +86,9 @@ const runGtfsMatching = async (cfg, opt = {}) => { ...opt, } - const matchedTotal = new Counter({ - name: 'matched_total', - help: 'nr. of successfully matched movements/trips', + const successesTotal = new Counter({ + name: 'matching_successes_total', + help: 'number of successfully matched movements/trips', registers: [register], labelNames: [ 'cached', @@ -94,10 +96,15 @@ const runGtfsMatching = async (cfg, opt = {}) => { }) const failuresTotal = new Counter({ name: 'matching_failures_total', - help: 'nr. of matching failures', + help: 'number of matching failures', registers: [register], - labelNames: [ - ], + labelNames: [], + }) + const errorsTotal = new Counter({ + name: 'matching_errors_total', + help: 'number of errors that have occured while matching', + registers: [register], + labelNames: [], }) const matchingTimeSeconds = new Summary({ name: 'matching_time_seconds', @@ -116,7 +123,7 @@ const runGtfsMatching = async (cfg, opt = {}) => { logger, }) - const matchVdvAusIstFahrtAndPublishAsGtfsRtTripUpdate = async (vdvAusIstFahrt) => { + const matchVdvAusIstFahrtAndPublishAsGtfsRtTripUpdate = async (vdvAusIstFahrt, msg) => { try { const { item: gtfsRtTripUpdate, @@ -127,17 +134,24 @@ const runGtfsMatching = async (cfg, opt = {}) => { const topic = getNatsTopicFromGtfsRtTripUpdate(gtfsRtTripUpdate) - logger.debug({ + logger.trace({ topic, + isMatched, + isCached, + matchingTime, gtfsRtTripUpdate, + // todo: log just a slice? vdvAusIstFahrt, + natsMsgSeq: msg.seq, }, 'publishing GTFS-RT TripUpdate') natsJetstreamClient.publish(topic, natsJson.encode(gtfsRtTripUpdate)) if (isMatched) { - matchedTotal.inc({ + successesTotal.inc({ isCached: isCached ? '1' : '0', }) + } else { + failuresTotal.inc() } matchingTimeSeconds.observe({ matched: isMatched ? '1' : '0', @@ -150,14 +164,15 @@ const runGtfsMatching = async (cfg, opt = {}) => { logger.warn({ err, vdvAusIstFahrt, + natsMsgSeq: msg.seq, }, `failed to match trip: ${err.message || (err + '')}`) - failuresTotal.inc() + errorsTotal.inc() } } const processAusIstFahrtMsg = async (msg) => { const {subject, seq, data} = msg - logger.trace({ + serviceLogger.debug({ subject, seq, dataSlice: data.slice(0, 100).toString('utf8'), @@ -165,16 +180,16 @@ const runGtfsMatching = async (cfg, opt = {}) => { try { const ausIstFahrt = msg.json(data) // todo: validate against schema, error-log and abort if invalid - await matchVdvAusIstFahrtAndPublishAsGtfsRtTripUpdate(ausIstFahrt) + await matchVdvAusIstFahrtAndPublishAsGtfsRtTripUpdate(ausIstFahrt, msg) - logger.trace({ + serviceLogger.trace({ subject, seq, }, 'successfully processed AUS IstFahrt msg') msg.ack() } catch (err) { // We catch all non-programmer errors in order not to abort the message processing (see below). - logger.warn({ + serviceLogger.warn({ err, subject, seq, @@ -198,7 +213,7 @@ const runGtfsMatching = async (cfg, opt = {}) => { ], // todo: limits? }) - logger.debug({ + serviceLogger.debug({ streamInfo, }, 'created/re-used NATS JetStream stream for AUS IstFahrts') } @@ -211,7 +226,7 @@ const runGtfsMatching = async (cfg, opt = {}) => { ], // todo: limits? }) - logger.debug({ + serviceLogger.debug({ streamInfo, }, 'created/re-used NATS JetStream stream for GTFS-RT data') } @@ -228,7 +243,7 @@ const runGtfsMatching = async (cfg, opt = {}) => { // todo: add trip ID to topic, consume with `DeliverLastPerSubject`? – would not work for partial IstFahrts }) - logger.debug({ + serviceLogger.debug({ consumerInfo, }, 'created/re-used NATS JetStream consumer')