diff --git a/package.json b/package.json index b0e7ad7..7e7fac6 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,7 @@ }, "license": "Apache-2.0", "devDependencies": { + "@signalk/server-api": "^2.3.0", "@types/chai": "^4.3.3", "@types/express": "^4.17.17", "@types/mocha": "^9.1.1", diff --git a/src/influx.ts b/src/influx.ts index db27317..5a18260 100644 --- a/src/influx.ts +++ b/src/influx.ts @@ -14,6 +14,7 @@ */ import { SKContext } from '@chacal/signalk-ts' +import { Context, Path, PathValue, Source, SourceRef } from '@signalk/server-api' import { HttpError, InfluxDB, Point, QueryApi, WriteApi, WriteOptions } from '@influxdata/influxdb-client' import { BucketsAPI, DbrpsAPI, OrgsAPI } from '@influxdata/influxdb-client-apis' import { InfluxDB as InfluxV1 } from 'influx' @@ -68,17 +69,18 @@ export interface SKInfluxConfig { */ ignoredSources: string[] + /** + * @title Resolution (milliseconds) + * @default: 0 + * @description Time resolution of data written to the database. Zero means write all data, 1000 means write each context-path-source combination once per second. Updates arriving more quicker will not be written. + */ + resolution: number + writeOptions: Partial } type PartialBy = Omit & Partial> -interface PathValue { - path: string - // eslint-disable-next-line @typescript-eslint/no-explicit-any - value: any -} - export const influxPath = (path: string) => (path !== '' ? path : '') enum JsValueType { @@ -109,9 +111,17 @@ export class SKInflux { } = {} private ignoredPaths: string[] private ignoredSources: string[] + private lastWrittenTimestamps: { + [context:Context]: { + [path: Path]: { + [sourceRef:SourceRef]: number + } + } + } = {} + private resolution: number constructor(config: SKInfluxConfig, private logging: Logging, triggerStatusUpdate: () => void) { - const { org, bucket, url, onlySelf, ignoredPaths, ignoredSources } = config + const { org, bucket, url, onlySelf, ignoredPaths, ignoredSources, resolution } = config this.influx = new InfluxDB(config) this.org = org this.bucket = bucket @@ -119,6 +129,7 @@ export class SKInflux { this.onlySelf = onlySelf this.ignoredPaths = ignoredPaths this.ignoredSources = ignoredSources + this.resolution = resolution this.writeApi = this.influx.getWriteApi(org, bucket, 'ms', { // eslint-disable-next-line @typescript-eslint/no-unused-vars writeFailed: (_error, lines, _attempt, _expires) => { @@ -186,17 +197,48 @@ export class SKInflux { } } - handleValue(context: SKContext, isSelf: boolean, source: string, pathValue: PathValue) { - if (this.isIgnored(pathValue.path, source)) { + handleValue(context: Context, isSelf: boolean, sourceRef: SourceRef, pathValue: PathValue, now: number) { + if (this.isIgnored(pathValue.path, sourceRef)) { + return + } + if(!this.shouldStoreNow(context, pathValue.path, sourceRef, now)) { return } if (!this.onlySelf || isSelf) { - const point = toPoint(context, isSelf, source, pathValue, this.logging.debug) + const point = toPoint(context, isSelf, sourceRef, pathValue, this.logging.debug) if (point) { this.writeApi.writePoint(point) + this.updateLastWritten(context, pathValue.path, sourceRef, now) } } } + updateLastWritten(context: Context, path: Path, sourceRef: SourceRef, now: number) { + this.lastWrittenTimestamps[context][path][sourceRef] = now + } + + shouldStoreNow(context: Context, path: Path, sourceRef: SourceRef, now: number) { + const byContext = this.lastWrittenTimestamps[context] ?? (this.lastWrittenTimestamps[context] = {}) + const byPath = byContext[path] ?? (byContext[path] = {}) + return now - (byPath[sourceRef] || 0) > this.resolution + } + + /** + * Delete last written timestamps by context if the newest timestamp from + * that context is older than x times resolution + */ + pruneLastWrittenTimestamps() { + const now = Date.now() + Object.entries(this.lastWrittenTimestamps).forEach(([context, byContext]) => { + const maxTimestamp = Object.values(byContext).reduce((acc, byPath) => { + return Math.max(acc, Object.values(byPath).reduce((acc, bySource) => { + return Math.max(0, bySource) + }, 0)) + }, 0) + if (now - maxTimestamp > 100 * this.resolution) { + delete this.lastWrittenTimestamps[context as Context] + } + }) + } flush() { return this.writeApi.flush() @@ -258,36 +300,38 @@ const toPoint = ( if (isSelf) { point.tag(SELF_TAG_NAME, SELF_TAG_VALUE) } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const value = pathValue.value as any if ( pathValue.path === 'navigation.position' && - typeof pathValue.value === 'object' && - pathValue.value !== null && - pathValue.value.latitude !== null && - pathValue.value.longitude !== null && - !isNaN(pathValue.value.latitude) && - !isNaN(pathValue.value.longitude) + typeof value === 'object' && + value !== null && + value.latitude !== null && + value.longitude !== null && + !isNaN(value.latitude) && + !isNaN(value.longitude) ) { - point.floatField('lat', pathValue.value.latitude) - point.floatField('lon', pathValue.value.longitude) - point.tag('s2_cell_id', posToS2CellId(pathValue.value)) + point.floatField('lat', value.latitude) + point.floatField('lon', value.longitude) + point.tag('s2_cell_id', posToS2CellId(value)) } else { const valueType = typeFor(pathValue) - if (pathValue.value === null) { + if (value === null) { return } try { switch (valueType) { case JsValueType.number: - point.floatField('value', pathValue.value) + point.floatField('value', value) break case JsValueType.string: - point.stringField('value', pathValue.value) + point.stringField('value', value) break case JsValueType.boolean: - point.booleanField('value', pathValue.value) + point.booleanField('value', value) break case JsValueType.object: - point.stringField('value', JSON.stringify(pathValue.value)) + point.stringField('value', JSON.stringify(value)) } } catch (e) { debug(`Error creating point ${pathValue.path}:${pathValue.value} => ${valueType}`) diff --git a/src/plugin.test.ts b/src/plugin.test.ts index 62d8da7..65ebb58 100644 --- a/src/plugin.test.ts +++ b/src/plugin.test.ts @@ -62,6 +62,7 @@ describe('Plugin', () => { }, ignoredPaths: [], ignoredSources: [], + resolution: 0 }, ], }) diff --git a/src/plugin.ts b/src/plugin.ts index 623a82c..efa24d9 100644 --- a/src/plugin.ts +++ b/src/plugin.ts @@ -18,6 +18,7 @@ import { SKDelta } from '@chacal/signalk-ts' import { EventEmitter } from 'stream' import { registerHistoryApiRoute } from './HistoryAPI' import { IRouter } from 'express' +import { Context, PathValue, SourceRef } from '@signalk/server-api' // eslint-disable-next-line @typescript-eslint/no-var-requires const packageInfo = require('../package.json') @@ -71,6 +72,7 @@ export default function InfluxPluginFactory(app: App): Plugin & InfluxPlugin { const selfContext = 'vessels.' + app.selfId let skInfluxes: SKInflux[] + let onStop: (() => void)[] = [] return { start: function (config: PluginConfig) { const updatePluginStatus = () => { @@ -85,15 +87,23 @@ export default function InfluxPluginFactory(app: App): Plugin & InfluxPlugin { } skInfluxes = config.influxes.map((config: SKInfluxConfig) => new SKInflux(config, app, updatePluginStatus)) registerHistoryApiRoute(app, skInfluxes[0], app.selfId, app.debug) + + onStop = [] + skInfluxes.forEach((skInflux) => { + const pruner = setInterval(() => skInflux.pruneLastWrittenTimestamps(), 5 * 60 * 1000) + onStop.push(() => clearInterval(pruner)) + }) + return Promise.all(skInfluxes.map((skInflux) => skInflux.init())).then(() => app.signalk.on('delta', (delta: SKDelta) => { + const now = Date.now() const isSelf = delta.context === selfContext delta.updates && delta.updates.forEach((update) => { update.values && update.values.forEach((pathValue) => { skInfluxes.forEach((skInflux) => - skInflux.handleValue(delta.context, isSelf, update.$source, pathValue), + skInflux.handleValue(delta.context as Context, isSelf, update.$source as SourceRef, pathValue as PathValue, now), ) }) }) @@ -102,7 +112,10 @@ export default function InfluxPluginFactory(app: App): Plugin & InfluxPlugin { }, // eslint-disable-next-line @typescript-eslint/no-empty-function - stop: function () {}, + stop: () => { + onStop.forEach((f) => f()) + onStop = [] + }, flush: () => Promise.all(skInfluxes.map((ski) => ski.flush())), diff --git a/src/query.ts b/src/query.ts index 5e22172..dcbd82d 100644 --- a/src/query.ts +++ b/src/query.ts @@ -21,6 +21,7 @@ const skinflux = new SKInflux( writeOptions: {}, ignoredPaths: [], ignoredSources: [], + resolution: 0 }, logging, () => undefined,