From 0a05b389bd78c7678d0487fa941ec59a5d113dfb Mon Sep 17 00:00:00 2001 From: Teppo Kurki Date: Thu, 15 Sep 2022 22:46:04 +0300 Subject: [PATCH] feature: object values handling Object valued properties are written as JSON strings. An empty path is changed to be the literal string , a InfluxDb measurement name can not be empty. --- src/influx.ts | 12 +++++++++--- src/plugin.test.ts | 22 +++++++++++++++++----- src/plugin.ts | 6 ++++-- 3 files changed, 30 insertions(+), 10 deletions(-) diff --git a/src/influx.ts b/src/influx.ts index a07f9bb..20a2978 100644 --- a/src/influx.ts +++ b/src/influx.ts @@ -17,7 +17,7 @@ import { SKContext } from '@chacal/signalk-ts' import { HttpError, InfluxDB, Point, QueryApi, WriteApi, WriteOptions } from '@influxdata/influxdb-client' import { BucketsAPI, OrgsAPI } from '@influxdata/influxdb-client-apis' -import { QueryParams } from './plugin' +import { Logging, QueryParams } from './plugin' import { S2 } from 's2-geometry' export interface SKInfluxConfig { @@ -51,13 +51,15 @@ interface PathValue { value: any } +export const influxPath = (path: string) => (path !== '' ? path : '') + export class SKInflux { private influx: InfluxDB private org: string private bucket: string private writeApi: WriteApi private queryApi: QueryApi - constructor(config: SKInfluxConfig) { + constructor(config: SKInfluxConfig, private logging: Logging) { const { org, bucket } = config this.influx = new InfluxDB(config) this.org = org @@ -71,7 +73,7 @@ export class SKInflux { } handleValue(context: SKContext, source: string, pathValue: PathValue) { - const point = new Point(pathValue.path).tag('context', context).tag('source', source) + const point = new Point(influxPath(pathValue.path)).tag('context', context).tag('source', source) if (pathValue.path === 'navigation.position') { point.floatField('lat', pathValue.value.latitude) point.floatField('lon', pathValue.value.longitude) @@ -86,8 +88,12 @@ export class SKInflux { break case 'boolean': point.booleanField('value', pathValue.value) + break + case 'object': + point.stringField('value', JSON.stringify(pathValue.value)) } } + this.logging.debug(point) this.writeApi.writePoint(point) } diff --git a/src/plugin.test.ts b/src/plugin.test.ts index 44e7d7a..b64e609 100644 --- a/src/plugin.test.ts +++ b/src/plugin.test.ts @@ -4,6 +4,7 @@ import { EventEmitter } from 'stream' import InfluxPluginFactory, { App } from './plugin' import waitOn from 'wait-on' import retry from 'async-await-retry' +import { influxPath } from './influx' const INFLUX_HOST = process.env['INFLUX_HOST'] || '127.0.0.1' @@ -17,11 +18,13 @@ describe('Plugin', () => { const app: App = { // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unused-vars debug: function (...args: any): void { - throw new Error('Function not implemented.') + // eslint-disable-next-line no-console + console.log(args) }, // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unused-vars error: function (...args: any): void { - throw new Error('Function not implemented.') + // eslint-disable-next-line no-console + console.log(args) }, signalk: new EventEmitter(), } @@ -69,6 +72,15 @@ describe('Plugin', () => { rowCount: 2, }, ], + [ + { + path: '', + value: { + mmsi: '20123456', + }, + rowCount: 1, + }, + ], ] TESTVALUES.forEach((values) => app.signalk.emit('delta', { @@ -96,11 +108,11 @@ describe('Plugin', () => { context: TESTCONTEXT, from: ZonedDateTime.parse('2022-08-17T17:00:00Z'), to: ZonedDateTime.parse('2022-08-17T17:00:00Z'), - paths: [pathValue.path], + paths: [influxPath(pathValue.path)], resolution: 60, }) .then((rows) => { - expect(rows.length).to.equal(pathValue.rowCount) + expect(rows.length).to.equal(pathValue.rowCount, `${JSON.stringify(pathValue)}`) }), ), ) @@ -109,7 +121,7 @@ describe('Plugin', () => { }, new Array()), ) return await retry(testAllValuesFoundInDb, [null], { - retriesMax: 10, + retriesMax: 5, interval: 50, }) }) diff --git a/src/plugin.ts b/src/plugin.ts index 651c442..4cba04e 100644 --- a/src/plugin.ts +++ b/src/plugin.ts @@ -21,11 +21,13 @@ import { ZonedDateTime } from '@js-joda/core' // eslint-disable-next-line @typescript-eslint/no-var-requires const packageInfo = require('../package.json') -export interface App { +export interface Logging { // eslint-disable-next-line @typescript-eslint/no-explicit-any debug: (...args: any) => void // eslint-disable-next-line @typescript-eslint/no-explicit-any error: (...args: any) => void +} +export interface App extends Logging { signalk: EventEmitter } @@ -68,7 +70,7 @@ export default function InfluxPluginFactory(app: App): Plugin & InfluxPlugin { let skInfluxes: SKInflux[] return { start: function (config: PluginConfig) { - skInfluxes = config.influxes.map((config: SKInfluxConfig) => new SKInflux(config)) + skInfluxes = config.influxes.map((config: SKInfluxConfig) => new SKInflux(config, app)) return Promise.all(skInfluxes.map((skInflux) => skInflux.init())).then(() => app.signalk.on('delta', (delta: SKDelta) => { delta.updates.forEach((update) => {