Skip to content

Commit

Permalink
feature: add resolution parameter
Browse files Browse the repository at this point in the history
Add configuration parameter resolution that specifies how often data is
written to the database.

Fixes #39.
  • Loading branch information
tkurki committed Dec 25, 2023
1 parent 25ea85a commit e76dd62
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 27 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
},
"license": "Apache-2.0",
"devDependencies": {
"@signalk/server-api": "^2.3.0",
"@types/chai": "^4.3.3",
"@types/express": "^4.17.17",
"@types/mocha": "^9.1.1",
Expand Down
97 changes: 72 additions & 25 deletions src/influx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/

import { SKContext } from '@chacal/signalk-ts'
import { Context, Path, PathValue, SourceRef } from '@signalk/server-api'
import { HttpError, InfluxDB, Point, QueryApi, WriteApi, WriteOptions } from '@influxdata/influxdb-client'
import { BucketsAPI, DbrpsAPI, OrgsAPI } from '@influxdata/influxdb-client-apis'
import { InfluxDB as InfluxV1 } from 'influx'
Expand Down Expand Up @@ -75,17 +76,18 @@ export interface SKInfluxConfig {
*/
useSKTimestamp: boolean

/**
* @title Resolution (milliseconds)
* @default: 0
* @description Time resolution of data written to the database. Zero means write all data, 1000 means write each context-path-source combination once per second. Updates arriving more quicker will not be written.
*/
resolution: number

writeOptions: Partial<WriteOptions>
}

type PartialBy<T, K extends keyof T> = Omit<T, K> & Partial<Pick<T, K>>

interface PathValue {
path: string
// eslint-disable-next-line @typescript-eslint/no-explicit-any
value: any
}

export const influxPath = (path: string) => (path !== '' ? path : '<empty>')

enum JsValueType {
Expand Down Expand Up @@ -118,8 +120,17 @@ export class SKInflux {
private ignoredSources: string[]
private useSKTimestamp: boolean

private lastWrittenTimestamps: {
[context: Context]: {
[path: Path]: {
[sourceRef: SourceRef]: number
}
}
} = {}
private resolution: number

constructor(config: SKInfluxConfig, private logging: Logging, triggerStatusUpdate: () => void) {
const { org, bucket, url, onlySelf, ignoredPaths, ignoredSources, useSKTimestamp } = config
const { org, bucket, url, onlySelf, ignoredPaths, ignoredSources, resolution, useSKTimestamp } = config
this.influx = new InfluxDB(config)
this.org = org
this.bucket = bucket
Expand All @@ -128,6 +139,7 @@ export class SKInflux {
this.ignoredPaths = ignoredPaths
this.ignoredSources = ignoredSources
this.useSKTimestamp = useSKTimestamp
this.resolution = resolution
this.writeApi = this.influx.getWriteApi(org, bucket, 'ms', {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
writeFailed: (_error, lines, _attempt, _expires) => {
Expand Down Expand Up @@ -195,19 +207,52 @@ export class SKInflux {
}
}

handleValue(context: SKContext, isSelf: boolean, source: string, _timestamp: Date, pathValue: PathValue) {
handleValue(context: Context, isSelf: boolean, sourceRef: SourceRef, _timestamp: Date, pathValue: PathValue, now: number) {
const timestamp = this.useSKTimestamp ? _timestamp : new Date()
if (this.isIgnored(pathValue.path, source)) {
if (this.isIgnored(pathValue.path, sourceRef)) {
return
}
if (!this.shouldStoreNow(context, pathValue.path, sourceRef, now)) {
return
}
if (!this.onlySelf || isSelf) {
const point = toPoint(context, isSelf, source, timestamp, pathValue, this.logging.debug)
console.log(point)
const point = toPoint(context, isSelf, sourceRef, timestamp, pathValue, this.logging.debug)
if (point) {
this.writeApi.writePoint(point)
this.updateLastWritten(context, pathValue.path, sourceRef, now)
}
}
}
updateLastWritten(context: Context, path: Path, sourceRef: SourceRef, now: number) {
this.lastWrittenTimestamps[context][path][sourceRef] = now
}

shouldStoreNow(context: Context, path: Path, sourceRef: SourceRef, now: number) {
const byContext = this.lastWrittenTimestamps[context] ?? (this.lastWrittenTimestamps[context] = {})
const byPath = byContext[path] ?? (byContext[path] = {})
return now - (byPath[sourceRef] || 0) > (this.resolution ?? 0)
}

/**
* Delete last written timestamps by context if the newest timestamp from
* that context is older than x times resolution
*/
pruneLastWrittenTimestamps() {
const now = Date.now()
Object.entries(this.lastWrittenTimestamps).forEach(([context, byContext]) => {
const maxTimestamp = Object.values(byContext).reduce<number>((acc, byPath) => {
return Math.max(
acc,
Object.values(byPath).reduce<number>((acc, bySource) => {
return Math.max(0, bySource)
}, 0),
)
}, 0)
if (now - maxTimestamp > 100 * this.resolution) {
delete this.lastWrittenTimestamps[context as Context]
}
})
}

flush() {
return this.writeApi.flush()
Expand Down Expand Up @@ -270,36 +315,38 @@ const toPoint = (
if (isSelf) {
point.tag(SELF_TAG_NAME, SELF_TAG_VALUE)
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const value = pathValue.value as any
if (
pathValue.path === 'navigation.position' &&
typeof pathValue.value === 'object' &&
pathValue.value !== null &&
pathValue.value.latitude !== null &&
pathValue.value.longitude !== null &&
!isNaN(pathValue.value.latitude) &&
!isNaN(pathValue.value.longitude)
typeof value === 'object' &&
value !== null &&
value.latitude !== null &&
value.longitude !== null &&
!isNaN(value.latitude) &&
!isNaN(value.longitude)
) {
point.floatField('lat', pathValue.value.latitude)
point.floatField('lon', pathValue.value.longitude)
point.tag('s2_cell_id', posToS2CellId(pathValue.value))
point.floatField('lat', value.latitude)
point.floatField('lon', value.longitude)
point.tag('s2_cell_id', posToS2CellId(value))
} else {
const valueType = typeFor(pathValue)
if (pathValue.value === null) {
if (value === null) {
return
}
try {
switch (valueType) {
case JsValueType.number:
point.floatField('value', pathValue.value)
point.floatField('value', value)
break
case JsValueType.string:
point.stringField('value', pathValue.value)
point.stringField('value', value)
break
case JsValueType.boolean:
point.booleanField('value', pathValue.value)
point.booleanField('value', value)
break
case JsValueType.object:
point.stringField('value', JSON.stringify(pathValue.value))
point.stringField('value', JSON.stringify(value))
}
} catch (e) {
debug(`Error creating point ${pathValue.path}:${pathValue.value} => ${valueType}`)
Expand Down
1 change: 1 addition & 0 deletions src/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ describe('Plugin', () => {
ignoredPaths: [],
ignoredSources: [],
useSKTimestamp: false,
resolution: 0,
},
],
})
Expand Down
24 changes: 22 additions & 2 deletions src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { SKDelta } from '@chacal/signalk-ts'
import { EventEmitter } from 'stream'
import { registerHistoryApiRoute } from './HistoryAPI'
import { IRouter } from 'express'
import { Context, PathValue, SourceRef } from '@signalk/server-api'

// eslint-disable-next-line @typescript-eslint/no-var-requires
const packageInfo = require('../package.json')
Expand Down Expand Up @@ -71,6 +72,7 @@ export default function InfluxPluginFactory(app: App): Plugin & InfluxPlugin {
const selfContext = 'vessels.' + app.selfId

let skInfluxes: SKInflux[]
let onStop: (() => void)[] = []
return {
start: function (config: PluginConfig) {
const updatePluginStatus = () => {
Expand All @@ -85,15 +87,30 @@ export default function InfluxPluginFactory(app: App): Plugin & InfluxPlugin {
}
skInfluxes = config.influxes.map((config: SKInfluxConfig) => new SKInflux(config, app, updatePluginStatus))
registerHistoryApiRoute(app, skInfluxes[0], app.selfId, app.debug)

onStop = []
skInfluxes.forEach((skInflux) => {
const pruner = setInterval(() => skInflux.pruneLastWrittenTimestamps(), 5 * 60 * 1000)
onStop.push(() => clearInterval(pruner))
})

return Promise.all(skInfluxes.map((skInflux) => skInflux.init())).then(() =>
app.signalk.on('delta', (delta: SKDelta) => {
const now = Date.now()
const isSelf = delta.context === selfContext
delta.updates &&
delta.updates.forEach((update) => {
update.values &&
update.values.forEach((pathValue) => {
skInfluxes.forEach((skInflux) =>
skInflux.handleValue(delta.context, isSelf, update.$source, update.timestamp, pathValue),
skInflux.handleValue(
delta.context as Context,
isSelf,
update.$source as SourceRef,
update.timestamp,
pathValue as PathValue,
now,
),
)
})
})
Expand All @@ -102,7 +119,10 @@ export default function InfluxPluginFactory(app: App): Plugin & InfluxPlugin {
},

// eslint-disable-next-line @typescript-eslint/no-empty-function
stop: function () {},
stop: () => {
onStop.forEach((f) => f())
onStop = []
},

flush: () => Promise.all(skInfluxes.map((ski) => ski.flush())),

Expand Down
1 change: 1 addition & 0 deletions src/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const skinflux = new SKInflux(
ignoredPaths: [],
ignoredSources: [],
useSKTimestamp: false,
resolution: 0,
},
logging,
() => undefined,
Expand Down

0 comments on commit e76dd62

Please sign in to comment.