Skip to content

Commit

Permalink
feature: object values handling
Browse files Browse the repository at this point in the history
Object valued properties are written as JSON strings. An
empty path is changed to be the literal string <empty>, a
InfluxDb measurement name can not be empty.
  • Loading branch information
tkurki committed Sep 15, 2022
1 parent e4a33df commit 0a05b38
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 10 deletions.
12 changes: 9 additions & 3 deletions src/influx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { SKContext } from '@chacal/signalk-ts'
import { HttpError, InfluxDB, Point, QueryApi, WriteApi, WriteOptions } from '@influxdata/influxdb-client'
import { BucketsAPI, OrgsAPI } from '@influxdata/influxdb-client-apis'

import { QueryParams } from './plugin'
import { Logging, QueryParams } from './plugin'
import { S2 } from 's2-geometry'

export interface SKInfluxConfig {
Expand Down Expand Up @@ -51,13 +51,15 @@ interface PathValue {
value: any
}

export const influxPath = (path: string) => (path !== '' ? path : '<empty>')

export class SKInflux {
private influx: InfluxDB
private org: string
private bucket: string
private writeApi: WriteApi
private queryApi: QueryApi
constructor(config: SKInfluxConfig) {
constructor(config: SKInfluxConfig, private logging: Logging) {
const { org, bucket } = config
this.influx = new InfluxDB(config)
this.org = org
Expand All @@ -71,7 +73,7 @@ export class SKInflux {
}

handleValue(context: SKContext, source: string, pathValue: PathValue) {
const point = new Point(pathValue.path).tag('context', context).tag('source', source)
const point = new Point(influxPath(pathValue.path)).tag('context', context).tag('source', source)
if (pathValue.path === 'navigation.position') {
point.floatField('lat', pathValue.value.latitude)
point.floatField('lon', pathValue.value.longitude)
Expand All @@ -86,8 +88,12 @@ export class SKInflux {
break
case 'boolean':
point.booleanField('value', pathValue.value)
break
case 'object':
point.stringField('value', JSON.stringify(pathValue.value))
}
}
this.logging.debug(point)
this.writeApi.writePoint(point)
}

Expand Down
22 changes: 17 additions & 5 deletions src/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { EventEmitter } from 'stream'
import InfluxPluginFactory, { App } from './plugin'
import waitOn from 'wait-on'
import retry from 'async-await-retry'
import { influxPath } from './influx'

const INFLUX_HOST = process.env['INFLUX_HOST'] || '127.0.0.1'

Expand All @@ -17,11 +18,13 @@ describe('Plugin', () => {
const app: App = {
// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unused-vars
debug: function (...args: any): void {
throw new Error('Function not implemented.')
// eslint-disable-next-line no-console
console.log(args)
},
// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unused-vars
error: function (...args: any): void {
throw new Error('Function not implemented.')
// eslint-disable-next-line no-console
console.log(args)
},
signalk: new EventEmitter(),
}
Expand Down Expand Up @@ -69,6 +72,15 @@ describe('Plugin', () => {
rowCount: 2,
},
],
[
{
path: '',
value: {
mmsi: '20123456',
},
rowCount: 1,
},
],
]
TESTVALUES.forEach((values) =>
app.signalk.emit('delta', {
Expand Down Expand Up @@ -96,11 +108,11 @@ describe('Plugin', () => {
context: TESTCONTEXT,
from: ZonedDateTime.parse('2022-08-17T17:00:00Z'),
to: ZonedDateTime.parse('2022-08-17T17:00:00Z'),
paths: [pathValue.path],
paths: [influxPath(pathValue.path)],
resolution: 60,
})
.then((rows) => {
expect(rows.length).to.equal(pathValue.rowCount)
expect(rows.length).to.equal(pathValue.rowCount, `${JSON.stringify(pathValue)}`)
}),
),
)
Expand All @@ -109,7 +121,7 @@ describe('Plugin', () => {
}, new Array<any[]>()),
)
return await retry(testAllValuesFoundInDb, [null], {
retriesMax: 10,
retriesMax: 5,
interval: 50,
})
})
Expand Down
6 changes: 4 additions & 2 deletions src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import { ZonedDateTime } from '@js-joda/core'
// eslint-disable-next-line @typescript-eslint/no-var-requires
const packageInfo = require('../package.json')

export interface App {
export interface Logging {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
debug: (...args: any) => void
// eslint-disable-next-line @typescript-eslint/no-explicit-any
error: (...args: any) => void
}
export interface App extends Logging {
signalk: EventEmitter
}

Expand Down Expand Up @@ -68,7 +70,7 @@ export default function InfluxPluginFactory(app: App): Plugin & InfluxPlugin {
let skInfluxes: SKInflux[]
return {
start: function (config: PluginConfig) {
skInfluxes = config.influxes.map((config: SKInfluxConfig) => new SKInflux(config))
skInfluxes = config.influxes.map((config: SKInfluxConfig) => new SKInflux(config, app))
return Promise.all(skInfluxes.map((skInflux) => skInflux.init())).then(() =>
app.signalk.on('delta', (delta: SKDelta) => {
delta.updates.forEach((update) => {
Expand Down

0 comments on commit 0a05b38

Please sign in to comment.