From ac611c4332cc9192083bfa27a4e1b42f1a2e4694 Mon Sep 17 00:00:00 2001 From: yshashix Date: Tue, 16 May 2023 18:31:29 +0200 Subject: [PATCH] TimescaleDB bridge - storing timeseries ngsild entity data in tsdb successfully - Related to epic #428 --- KafkaBridge/Dockerfile | 1 + KafkaBridge/README.md | 21 +- KafkaBridge/config/config.json | 11 +- KafkaBridge/package.json | 2 + KafkaBridge/test/testTimescaledbApp.js | 337 ++++++++++++++++++ KafkaBridge/timescaledb/app.js | 161 +++++++++ .../timescaledb/model/entity_history.js | 63 ++++ KafkaBridge/timescaledb/utils/tsdb-connect.js | 43 +++ helm/charts/kafka-bridges/.helmignore | 1 + .../templates/bridge-configmap.yaml | 9 + .../timescaledb-bridge-deployment.yaml | 70 ++++ .../templates/minimal-postgres-manifest.yaml | 7 + helm/charts/postgres/values.yaml | 10 +- helm/environment/default.yaml | 4 + helm/environment/production.yaml | 4 + helm/values.yaml.gotmpl | 8 + 16 files changed, 749 insertions(+), 3 deletions(-) create mode 100644 KafkaBridge/test/testTimescaledbApp.js create mode 100644 KafkaBridge/timescaledb/app.js create mode 100644 KafkaBridge/timescaledb/model/entity_history.js create mode 100644 KafkaBridge/timescaledb/utils/tsdb-connect.js create mode 100644 helm/charts/kafka-bridges/.helmignore create mode 100644 helm/charts/kafka-bridges/templates/timescaledb-bridge-deployment.yaml diff --git a/KafkaBridge/Dockerfile b/KafkaBridge/Dockerfile index 06482c4c..dd81ee0f 100644 --- a/KafkaBridge/Dockerfile +++ b/KafkaBridge/Dockerfile @@ -2,6 +2,7 @@ FROM node:16 COPY alerta/ /opt/alerta/ COPY debeziumBridge/ /opt/debeziumBridge/ COPY ngsildUpdates/ /opt/ngsildUpdates/ +COPY timescaledb/ /opt/timescaledb/ ADD package.json /opt/package.json ADD package-lock.json package-lock.json COPY lib/ /opt/lib/ diff --git a/KafkaBridge/README.md b/KafkaBridge/README.md index ad19efa3..573e1027 100644 --- a/KafkaBridge/README.md +++ b/KafkaBridge/README.md @@ -3,6 +3,7 @@ 1. 
Alerta bridge, a service which listens at a specific Kafka topic and forwards the data to Alerta. 2. NgsildUpdates bride, a service which listens at a specific Kafka topic and forwards data to the scorpio/Ngsild broker. 3. DebeziumBridge, a service which listens to debezium updates on ngsild entities and forward them to specific topics. +4. Timescaledb Bridge, a service which listens at specific Kafka topic and forwards data to timescaledb(tsdb) running in postgres database ## Alerta Bridge @@ -74,4 +75,22 @@ cat ngsildUpdates.json | tr -d '\n' | kafkacat -P -t iff.alerts -b my-cluster-ka ## Debezium Bridge The debezium bridge maps the updates from the NGSILD entity tabe to StreamingSQL tables. -Concept TBD. \ No newline at end of file +Concept TBD. + +## TimescaledDB Bridge + +This service which listens at specific Kafka topic (iff.ngsild.attributes) and forwards NGSI-LD data to timescaledb(tsdb) running in postgres database. +- Currently it is disabled by default. To enable please remove timescaledb deployment file link from .helmignore of kafkabridge charts + +* entities: Array of valid [NGSI-LD](https://www.etsi.org/deliver/etsi_gs/CIM/001_099/009/01.05.01_60/gs_CIM009v010501p.pdf) entities + +Data stored in the hypertable "entityhistories" of timescaledb database "tsdb" in below format: + +``` +tsdb=# select * from entityhistories; + observedAt | modifiedAt | entityId | attributeId | attributeType | datasetId | nodeType | value | index +----------------------------+----------------------------+--------------------+--------------------------------------------------+-------------------------------------------+---------------------------------------------------------------------+----------+--------------------------------------------------+------- + 2023-07-18 09:40:24.22+00 | 2023-07-18 09:40:24.22+00 | urn:plasmacutter:1 | https://industry-fusion.com/types/v0.9/state | https://uri.etsi.org/ngsi-ld/Property | 
urn:plasmacutter:1\https://industry-fusion.com/types/v0.9/state | @value | https://industry-fusion.com/types/v0.9/state_OFF | 0 + 2023-07-18 09:38:15.559+00 | 2023-07-18 09:38:15.559+00 | urn:plasmacutter:1 | https://industry-fusion.com/types/v0.9/hasFilter | https://uri.etsi.org/ngsi-ld/Relationship | urn:plasmacutter:1\https://industry-fusion.com/types/v0.9/hasFilter | @id | urn:filter:1 | 0 + +``` diff --git a/KafkaBridge/config/config.json b/KafkaBridge/config/config.json index 137432a7..523fb3de 100644 --- a/KafkaBridge/config/config.json +++ b/KafkaBridge/config/config.json @@ -43,5 +43,14 @@ }, "bridgeCommon": { "kafkaSyncOnAttribute": "https://industry-fusion.com/types/v0.9/metadata/kafkaSyncOn" + }, + "timescaledb": { + "topic": "iff.ngsild.attributes", + "PGSSLMODE": "require", + "dbname": "tsdb", + "hostname": "acid-cluster.iff.svc.cluster.local", + "port": "5432", + "username": "ngb", + "password": "POSTGRES_PASSWORD" } -} +} \ No newline at end of file diff --git a/KafkaBridge/package.json b/KafkaBridge/package.json index fd4118e2..a7c964cd 100644 --- a/KafkaBridge/package.json +++ b/KafkaBridge/package.json @@ -12,6 +12,8 @@ "dependencies": { "@comunica/actor-init-sparql-file": "1.22.3", "kafkajs": "^1.16.0", + "pg": "^8.2.1", + "sequelize": "^6.5.0", "keycloak-connect": "^17.0.1", "openid-client": "^5.1.2", "underscore": "^1.13.1", diff --git a/KafkaBridge/test/testTimescaledbApp.js b/KafkaBridge/test/testTimescaledbApp.js new file mode 100644 index 00000000..30f49457 --- /dev/null +++ b/KafkaBridge/test/testTimescaledbApp.js @@ -0,0 +1,337 @@ +/** +* Copyright (c) 2022 Intel Corporation +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +'use strict'; + +const { assert } = require('chai'); +const chai = require('chai'); +global.should = chai.should(); +const rewire = require('rewire'); +const sinon = require('sinon'); +const toTest = rewire('../timescaledb/app.js'); + +const logger = { + debug: function () {}, + info: function () {}, + error: function () {} +}; + +const Logger = function () { + return logger; +}; + +describe('Test timescaledb processMessage', function () { + it('Should send a Property with @value nodetype', async function () { + const entityHistoryTable = { + create: async function (datapoint) { + assert.equal(datapoint.observedAt, '2023-07-14T14:29:13.110Z'); + assert.equal(datapoint.modifiedAt, '2023-07-14T14:29:13.110Z'); + assert.equal(datapoint.entityId, 'entityId'); + assert.equal(datapoint.attributeId, 'name'); + assert.equal(datapoint.nodeType, '@value'); + assert.equal(datapoint.index, 0); + assert.equal(datapoint.datasetId, 'id'); + assert.equal(datapoint.attributeType, 'https://uri.etsi.org/ngsi-ld/Property'); + assert.equal(datapoint.value, 123); + return Promise.resolve(datapoint); + } + }; + + const kafkamessage = { + topic: 'topic', + partition: 'partition', + message: { + value: JSON.stringify({ + id: 'id', + entityId: 'entityId', + name: 'name', + type: 'https://uri.etsi.org/ngsi-ld/Property', + 'https://uri.etsi.org/ngsi-ld/hasValue': 123, + nodeType: '@value', + index: 0 + }), + timestamp: '1689344953110' + } + }; + toTest.__set__('entityHistoryTable', entityHistoryTable); + toTest.__set__('Logger', Logger); + const processMessage = 
toTest.__get__('processMessage'); + await processMessage(kafkamessage); + }); + + it('Should send a Relationship', async function () { + const entityHistoryTable = { + create: async function (datapoint) { + assert.equal(datapoint.observedAt, '2023-07-14T14:29:13.110Z'); + assert.equal(datapoint.modifiedAt, '2023-07-14T14:29:13.110Z'); + assert.equal(datapoint.entityId, 'entityId'); + assert.equal(datapoint.attributeId, 'relationship'); + assert.equal(datapoint.nodeType, '@id'); + assert.equal(datapoint.index, 0); + assert.equal(datapoint.datasetId, 'id'); + assert.equal(datapoint.attributeType, 'https://uri.etsi.org/ngsi-ld/Relationship'); + assert.equal(datapoint.value, 'object'); + return Promise.resolve(datapoint); + } + }; + + const kafkamessage = { + topic: 'topic', + partition: 'partition', + message: { + value: JSON.stringify({ + id: 'id', + entityId: 'entityId', + name: 'relationship', + type: 'https://uri.etsi.org/ngsi-ld/Relationship', + 'https://uri.etsi.org/ngsi-ld/hasObject': 'object', + nodeType: '@id', + index: 0 + }), + timestamp: '1689344953110' + } + }; + toTest.__set__('entityHistoryTable', entityHistoryTable); + toTest.__set__('Logger', Logger); + const processMessage = toTest.__get__('processMessage'); + await processMessage(kafkamessage); + }); + + it('Should send a Property with @value nodetype and valueType string', async function () { + const entityHistoryTable = { + create: async function (datapoint) { + assert.equal(datapoint.observedAt, '2023-07-14T14:29:13.110Z'); + assert.equal(datapoint.modifiedAt, '2023-07-14T14:29:13.110Z'); + assert.equal(datapoint.entityId, 'entityId'); + assert.equal(datapoint.attributeId, 'property'); + assert.equal(datapoint.nodeType, '@value'); + assert.equal(datapoint.index, 0); + assert.equal(datapoint.datasetId, 'id'); + assert.equal(datapoint.attributeType, 'https://uri.etsi.org/ngsi-ld/Property'); + assert.equal(datapoint.value, 'value'); + assert.equal(datapoint.valueType, 
'http://www.w3.org/2001/XMLSchema#string'); + return Promise.resolve(datapoint); + } + }; + + const kafkamessage = { + topic: 'topic', + partition: 'partition', + message: { + value: JSON.stringify({ + id: 'id', + entityId: 'entityId', + name: 'property', + type: 'https://uri.etsi.org/ngsi-ld/Property', + 'https://uri.etsi.org/ngsi-ld/hasValue': 'value', + nodeType: '@value', + valueType: 'http://www.w3.org/2001/XMLSchema#string', + index: 0 + }), + timestamp: '1689344953110' + } + }; + toTest.__set__('entityHistoryTable', entityHistoryTable); + toTest.__set__('Logger', Logger); + const processMessage = toTest.__get__('processMessage'); + await processMessage(kafkamessage); + }); + + it('Should check correct object type- property/relationship with @value nodetype and valueType string', async function () { + const entityHistoryTable = { + create: async function (datapoint) { + assert.equal(datapoint.observedAt, '2023-07-14T14:29:13.110Z'); + assert.equal(datapoint.modifiedAt, '2023-07-14T14:29:13.110Z'); + assert.equal(datapoint.entityId, 'entityId'); + assert.equal(datapoint.attributeId, 'property'); + assert.equal(datapoint.nodeType, '@value'); + assert.equal(datapoint.index, 0); + assert.equal(datapoint.datasetId, 'id'); + assert.notEqual(datapoint.attributeType, 'https://uri.etsi.org/ngsi-ld/Property'); + assert.equal(datapoint.value, 'value'); + assert.equal(datapoint.valueType, 'http://www.w3.org/2001/XMLSchema#string'); + return Promise.resolve(datapoint); + } + }; + + const kafkamessage = { + topic: 'topic', + partition: 'partition', + message: { + value: JSON.stringify({ + id: 'id', + entityId: 'entityId', + name: 'property', + type: 'https://uri.etsi.org/ngsi-ld/', + 'https://uri.etsi.org/ngsi-ld/hasValue': 'value', + nodeType: '@value', + valueType: 'http://www.w3.org/2001/XMLSchema#string', + index: 0 + }), + timestamp: '1689344953110' + } + }; + toTest.__set__('entityHistoryTable', entityHistoryTable); + toTest.__set__('Logger', Logger); + const 
processMessage = toTest.__get__('processMessage'); + await processMessage(kafkamessage); + }); + + it('Should send a Property with wrong timestamp and attributeType', async function () { + const entityHistoryTable = { + create: async function (datapoint) { + assert.notEqual(datapoint.observedAt, '2023-07-14T14:29:13.110Z'); + assert.notEqual(datapoint.modifiedAt, '2023-07-14T14:29:13.110Z'); + assert.equal(datapoint.entityId, 'entityId'); + assert.equal(datapoint.attributeId, 'property'); + assert.equal(datapoint.nodeType, '@value'); + assert.equal(datapoint.index, 0); + assert.equal(datapoint.datasetId, 'id'); + assert.notEqual(datapoint.attributeType, 'https://uri.etsi.org/ngsi-ld/Relationship'); + assert.equal(datapoint.value, 'value'); + return Promise.resolve(datapoint); + } + }; + + const kafkamessage = { + topic: 'topic', + partition: 'partition', + message: { + value: JSON.stringify({ + id: 'id', + entityId: 'entityId', + name: 'property', + type: 'https://uri.etsi.org/ngsi-ld/Property', + 'https://uri.etsi.org/ngsi-ld/hasValue': 'value', + nodeType: '@value', + index: 0 + }), + timestamp: '168934495320' + } + }; + toTest.__set__('entityHistoryTable', entityHistoryTable); + toTest.__set__('Logger', Logger); + const processMessage = toTest.__get__('processMessage'); + await processMessage(kafkamessage); + }); +}); + +describe('Test startListener', function () { + const historyTableName = 'entityhistories'; + const htChecksqlquery = 'SELECT * FROM timescaledb_information.hypertables WHERE hypertable_name = \'' + historyTableName + '\';'; + const htCreateSqlquery = 'SELECT create_hypertable(\'' + historyTableName + '\', \'observedAt\', migrate_data => true);'; + + const SequelizeClass = class Sequelize { + authenticate () { + return Promise.resolve(); + } + + sync () { + return Promise.resolve(); + } + + query (sqlquery) { + assert.oneOf(sqlquery, [htChecksqlquery, htCreateSqlquery], 'Wrong query message for timescaledb table.'); + return 
Promise.resolve(sqlquery); + } + }; + + const consumer = { + run: function (run) { + return new Promise(function (resolve, reject) { + resolve(); + }); + }, + connect: function () {}, + subscribe: function (obj) { + obj.topic.should.equal('topic'); + obj.fromBeginning.should.equal(false); + }, + disconnect: function () { + } + }; + const fs = { + writeFileSync: function (file, message) { + assert.oneOf(file, ['/tmp/ready', '/tmp/healthy'], 'Wrong file for filesync'); + assert.oneOf(message, ['ready', 'healthy'], 'Wrong filesync message.'); + } + }; + const config = { + timescaledb: { + topic: 'topic' + } + }; + const process = { + on: async function (type, f) { + expect(type).to.satisfy(function (type) { + if (type === 'unhandledRejection' || type === 'uncaughtException') { + return true; + } + }); + await f('Test Error'); + }, + exit: function (value) { + }, + once: async function (type, f) { + await f('Test Error'); + } + }; + + it('Should check database reject handling', async function () { + const errorMsg = 'error on processing'; + const SequelizeClassNoQuery = class Sequelize { + authenticate () { + return Promise.reject(errorMsg); + } + + sync () { + return Promise.reject(errorMsg); + } + + query (sqlquery) { + return Promise.reject(errorMsg); + } + }; + + const sequelizeObj = new SequelizeClassNoQuery(); + const revert = toTest.__set__('consumer', consumer); + toTest.__set__('sequelize', sequelizeObj); + toTest.__set__('fs', fs); + toTest.__set__('config', config); + toTest.__set__('process', process); + const startListener = toTest.__get__('startListener'); + await startListener(); + revert(); + }); + + it('Setup Kafka listener, readiness and health status', async function () { + const sequelizeObj = new SequelizeClass(); + const consumerDisconnectSpy = sinon.spy(consumer, 'disconnect'); + const consumerConnectSpy = sinon.spy(consumer, 'connect'); + const processOnceSpy = sinon.spy(process, 'once'); + // const processExitSpy = sinon.spy(process, 'exit'); + 
const revert = toTest.__set__('consumer', consumer); + toTest.__set__('sequelize', sequelizeObj); + toTest.__set__('fs', fs); + toTest.__set__('config', config); + toTest.__set__('process', process); + const startListener = toTest.__get__('startListener'); + await startListener(); + consumerDisconnectSpy.callCount.should.equal(3); + assert(consumerConnectSpy.calledOnce); + assert(processOnceSpy.calledThrice); + revert(); + }); +}); diff --git a/KafkaBridge/timescaledb/app.js b/KafkaBridge/timescaledb/app.js new file mode 100644 index 00000000..af3db04b --- /dev/null +++ b/KafkaBridge/timescaledb/app.js @@ -0,0 +1,161 @@ +/** +* Copyright (c) 2023 Intel Corporation +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +'use strict'; + +const GROUPID = 'timescaledbkafkabridge'; +const CLIENTID = 'timescaledbkafkaclient'; +const { Kafka } = require('kafkajs'); +const fs = require('fs'); +const config = require('../config/config.json'); +const sequelize = require('./utils/tsdb-connect'); // Import sequelize object, Database connection pool managed by Sequelize. 
+const { QueryTypes } = require('sequelize'); +const Logger = require('../lib/logger.js'); +const entityHistoryTable = require('./model/entity_history.js'); // Import entityHistory model/table defined +const historyTableName = 'entityhistories'; +const runningAsMain = require.main === module; +const logger = new Logger(config); + +const kafka = new Kafka({ + clientId: CLIENTID, + brokers: config.kafka.brokers +}); +const consumer = kafka.consumer({ groupId: GROUPID, allowAutoTopicCreation: false }); +const processMessage = async function ({ topic, partition, message }) { + const body = JSON.parse(message.value); + if (body.type === undefined) { + return; + } + const datapoint = {}; + const kafkaTimestamp = Number(message.timestamp); + const epochDate = new Date(kafkaTimestamp); + const utcTime = epochDate.toISOString(); + + // Creating datapoint which will be inserted to tsdb + datapoint.observedAt = utcTime; + datapoint.modifiedAt = utcTime; + datapoint.entityId = body.entityId; + datapoint.attributeId = body.name; + datapoint.nodeType = body.nodeType; + datapoint.index = body.index; + datapoint.datasetId = body.id; + + if (body.type === 'https://uri.etsi.org/ngsi-ld/Property') { + let value = body['https://uri.etsi.org/ngsi-ld/hasValue']; + if (!isNaN(value)) { + value = Number(value); + } + datapoint.attributeType = body.type; + datapoint.value = value; + if (body.valueType !== undefined && body.valueType !== null) { + datapoint.valueType = body.valueType; + } + } else if (body.type === 'https://uri.etsi.org/ngsi-ld/Relationship') { + datapoint.attributeType = body.type; + datapoint.value = body['https://uri.etsi.org/ngsi-ld/hasObject']; + } else { + logger.error('Could not send Datapoints: Neither Property nor Relationship'); + return; + } + + entityHistoryTable.create(datapoint) + .then(data => { + logger.debug('Datapoint succefully stored in tsdb table'); + }) + .catch((err) => logger.error('Error in storing datapoint in tsdb: ' + err)); +}; + +const 
startListener = async function () { + let hypertableStatus = false; + sequelize.authenticate().then(() => { + logger.info('TSDB connection has been established.'); + }) + .catch(error => { + logger.error('Unable to connect to TSDB:', error); + process.exit(1); + }); + + // await sequelize.sync({ force: true }) + // await sequelize.sync({ alter: true }); + await sequelize.sync({ force: true }).then(() => { + logger.info('Table created in tsdb: ' + historyTableName); + }) + .catch(error => { + logger.error('Unable to create table in tsdb:', error); + }); + + const htChecksqlquery = 'SELECT * FROM timescaledb_information.hypertables WHERE hypertable_name = \'' + historyTableName + '\';'; + await sequelize.query(htChecksqlquery, { type: QueryTypes.SELECT }).then((hypertableInfo) => { + if (hypertableInfo.length) { + hypertableStatus = true; + } + }) + .catch(error => { + logger.warn('Hypertable is not created, please create', error); + }); + + if (!hypertableStatus) { + const htCreateSqlquery = 'SELECT create_hypertable(\'' + historyTableName + '\', \'observedAt\', migrate_data => true);'; + await sequelize.query(htCreateSqlquery, { type: QueryTypes.SELECT }).then((hyperTableCreate) => { + logger.debug('Return of hypertable create sql query: ' + JSON.stringify(hyperTableCreate)); + }) + .catch(error => { + logger.error('Unable to create hypertable', error); + }); + }; + + // Kafka topic subscription + await consumer.connect(); + await consumer.subscribe({ topic: config.timescaledb.topic, fromBeginning: false }); + await consumer.run({ + eachMessage: async ({ topic, partition, message }) => processMessage({ topic, partition, message }) + }).catch(e => console.error(`[example/consumer] ${e.message}`, e)); + + const errorTypes = ['unhandledRejection', 'uncaughtException']; + const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']; + + errorTypes.map(type => + process.on(type, async e => { + try { + console.log(`process.on ${type}`); + console.error(e); + await 
consumer.disconnect();
+ process.exit(0);
+ } catch (_) {
+ process.exit(1);
+ }
+ }));
+
+ signalTraps.map(type =>
+ process.once(type, async () => {
+ try {
+ await consumer.disconnect();
+ } finally {
+ process.kill(process.pid, type);
+ }
+ }));
+
+ try {
+ fs.writeFileSync('/tmp/ready', 'ready');
+ fs.writeFileSync('/tmp/healthy', 'healthy');
+ } catch (err) {
+ logger.error(err);
+ }
+};
+
+if (runningAsMain) {
+ logger.info('Now starting Kafka listener');
+ startListener();
+} diff --git a/KafkaBridge/timescaledb/model/entity_history.js b/KafkaBridge/timescaledb/model/entity_history.js new file mode 100644 index 00000000..e8a1be96 --- /dev/null +++ b/KafkaBridge/timescaledb/model/entity_history.js @@ -0,0 +1,63 @@ +/** +* Copyright (c) 2023 Intel Corporation +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; +// Include Sequelize module. 
+const { Sequelize } = require('sequelize');
+// Sequelize tsdb connect instance
+const sequelize = require('../utils/tsdb-connect');
+
+
+// CREATE table/model in tsdb to enter SpB NGSI_LD data
+// Define method takes two arguments
+// 1st - name of table, 2nd - columns inside the table
+const entityHistoryTable = sequelize.define( "entityhistories", {
+
+ // Column-1, observedAt is an object with kafka timestamp to date Data_Type UTC timestamp
+ observedAt:{ type:Sequelize.DATE, allowNull:false, primaryKey:true },
+
+ // Same as observedAt for now-> Later we modify
+ modifiedAt: { type:Sequelize.DATE, allowNull:false },
+
+ // Column-2, entityId
+ entityId: { type: Sequelize.TEXT, allowNull:false },
+
+ // Column-3, attributeId-> full name as URI
+ attributeId: { type: Sequelize.TEXT, allowNull:false },
+
+ // Column-4, attributeType-> Relationship or properties
+ attributeType: { type: Sequelize.TEXT, allowNull:false },
+
+ // Column-5, datasetId-> entityid+name(Must be URI)
+ datasetId: { type: Sequelize.TEXT, allowNull:false, primaryKey:true },
+
+ nodeType: { type: Sequelize.TEXT, allowNull:false },
+
+ value: { type: Sequelize.TEXT, allowNull:false },
+
+ // In future can be used for literals value types
+ valueType: { type: Sequelize.TEXT, allowNull:true },
+
+ // Useful when we have array
+ index: { type: Sequelize.INTEGER, allowNull:false },
+
+ }, {
+ // disabled for a model auto timestamping with createdAt and modifiedAt as we take value from Kafka
+ timestamps: false
+
+})
+
+module.exports = entityHistoryTable ; \ No newline at end of file diff --git a/KafkaBridge/timescaledb/utils/tsdb-connect.js b/KafkaBridge/timescaledb/utils/tsdb-connect.js new file mode 100644 index 00000000..886af1b6 --- /dev/null +++ b/KafkaBridge/timescaledb/utils/tsdb-connect.js @@ -0,0 +1,43 @@ +/** +* Copyright (c) 2023 Intel Corporation +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +'use strict'; + +const { Sequelize } = require('sequelize'); +const config = require('../../config/config.json'); + +var sequelize; +const postgPassword = process.env.POSTGRES_PASSWORD || config.timescaledb.password; +const postgHostname = process.env.POSTGRES_SERVICE || config.timescaledb.hostname; +if (config.timescaledb.PGSSLMODE === "require") { + sequelize = new Sequelize(config.timescaledb.dbname, config.timescaledb.username, postgPassword, { + host: postgHostname, + port: config.timescaledb.port, + dialect: 'postgres', + dialectOptions: { + ssl: { + rejectUnauthorized: false + } + }, + }); +} else { + sequelize = new Sequelize(config.timescaledb.dbname, config.timescaledb.username, postgPassword, { + host: postgHostname, + port: config.timescaledb.port, + dialect: 'postgres' + }); +} + +module.exports = sequelize; \ No newline at end of file diff --git a/helm/charts/kafka-bridges/.helmignore b/helm/charts/kafka-bridges/.helmignore new file mode 100644 index 00000000..5ac27d3b --- /dev/null +++ b/helm/charts/kafka-bridges/.helmignore @@ -0,0 +1 @@ +./templates/timescaledb-bridge-deployment.yaml \ No newline at end of file diff --git a/helm/charts/kafka-bridges/templates/bridge-configmap.yaml b/helm/charts/kafka-bridges/templates/bridge-configmap.yaml index 5ccd99f2..9bce4402 100644 --- a/helm/charts/kafka-bridges/templates/bridge-configmap.yaml +++ b/helm/charts/kafka-bridges/templates/bridge-configmap.yaml @@ -54,6 +54,15 @@ data: }, "bridgeCommon": { "kafkaSyncOnAttribute": "https://industry-fusion.com/types/v0.9/metadata/kafkaSyncOn" + }, + 
"timescaledb": { + "topic": "iff.ngsild.attributes", + "PGSSLMODE": "require", + "dbname": "tsdb", + "hostname": "acid-cluster", + "port": "5432", + "username": "ngb", + "password": "POSTGRES_PASSWORD" } } knowledge.ttl: | diff --git a/helm/charts/kafka-bridges/templates/timescaledb-bridge-deployment.yaml b/helm/charts/kafka-bridges/templates/timescaledb-bridge-deployment.yaml new file mode 100644 index 00000000..7e34608a --- /dev/null +++ b/helm/charts/kafka-bridges/templates/timescaledb-bridge-deployment.yaml @@ -0,0 +1,70 @@ +--- +{{- $secret := (lookup "v1" "Secret" .Release.Namespace "keycloak-client-secret-timescaledb") -}} +# yamllint disable rule:line-length +# yamllint disable rule:braces +apiVersion: apps/v1 +kind: Deployment +metadata: + name: timescaledb-bridge + namespace: {{ .Release.Namespace }} + labels: + app: timescaledb-bridge +spec: + replicas: {{ .Values.kafkaBridge.timescaledb.replicaCount }} + selector: + matchLabels: + app: timescaledb-bridge + template: + metadata: + labels: + app: timescaledb-bridge + annotations: + checksum/config: {{ include (print $.Template.BasePath "/bridge-configmap.yaml") . 
| sha256sum }} + {{- if $secret }} + checksum/credentials: {{ printf "%s" $secret | toString | sha256sum }} + {{- end }} + spec: + containers: + - name: timescaledb-bridge + image: '{{ .Values.mainRepo }}/kafka-bridge:{{ .Values.mainVersion }}' + command: ["node"] + args: ["/opt/timescaledb/app.js"] + imagePullPolicy: IfNotPresent + env: + - name: POSTGRES_PASSWORD + valueFrom: + secretKeyRef: + name: {{ .Values.db.dbUser -}}.{{- .Values.clusterSvcName -}}.{{- .Values.db.secretPostfix }} + key: password + - name: POSTGRES_SERVICE + value: {{ .Values.clusterSvcName }} + livenessProbe: + exec: + command: + - cat + - /tmp/healthy + initialDelaySeconds: 300 + readinessProbe: + exec: + command: + - cat + - /tmp/ready + initialDelaySeconds: 5 + volumeMounts: + - name: config + mountPath: /opt/config + readOnly: true + resources: + volumes: + - name: config + configMap: + # Provide the name of the ConfigMap you want to mount. + name: bridge-configmap + # An array of keys from the ConfigMap to create as files + items: + - key: "config.json" + path: "config.json" + - key: "knowledge.ttl" + path: "knowledge.ttl" + imagePullSecrets: + - name: {{ .Values.pullSecretCredentials }} \ No newline at end of file diff --git a/helm/charts/postgres/templates/minimal-postgres-manifest.yaml b/helm/charts/postgres/templates/minimal-postgres-manifest.yaml index 59e279a1..a1ac3941 100644 --- a/helm/charts/postgres/templates/minimal-postgres-manifest.yaml +++ b/helm/charts/postgres/templates/minimal-postgres-manifest.yaml @@ -43,6 +43,13 @@ spec: {{ toYaml .Values.dbUsers | indent 4 }} databases: {{ toYaml .Values.databases | indent 4 }} + preparedDatabases: +{{ toYaml .Values.preparedDatabases | indent 4 }}: + extensions: + timescaledb: public + schemas: + public: + defaultRoles: false postgresql: version: "14" parameters: diff --git a/helm/charts/postgres/values.yaml b/helm/charts/postgres/values.yaml index 391c526a..311277e3 100644 --- a/helm/charts/postgres/values.yaml +++ 
b/helm/charts/postgres/values.yaml @@ -54,10 +54,18 @@ dbUsers: - createdb - replication - bypassrls + pgrest: + - nosuperuser + - noreplication + - nocreaterole + - nocreatedb ## Databases; corresponding DB owner must be specified above ## databases: ngb: ngb # dbname: dbowner keycloakdb: ngb # keycloak - monitoring: ngb # alerta \ No newline at end of file + monitoring: ngb # alerta + tsdb: ngb # timescaledb ngsild timeseries data + +preparedDatabases: tsdb \ No newline at end of file diff --git a/helm/environment/default.yaml b/helm/environment/default.yaml index 9c7eef63..2baa4db2 100644 --- a/helm/environment/default.yaml +++ b/helm/environment/default.yaml @@ -91,6 +91,10 @@ kafkaBridge: replicaCount: 1 tokenRefreshInterval: 200 listenTopicRetention: 3600000 + timescaledb: + replicaCount: 1 + tokenRefreshInterval: 200 + listenTopicRetention: 3600000 velero: backupBucket: velero-backup diff --git a/helm/environment/production.yaml b/helm/environment/production.yaml index 842001d2..d81c0157 100644 --- a/helm/environment/production.yaml +++ b/helm/environment/production.yaml @@ -93,6 +93,10 @@ kafkaBridge: replicaCount: 1 tokenRefreshInterval: 200 listenTopicRetention: 28800000 + timescaledb: + replicaCount: 1 + tokenRefreshInterval: 200 + listenTopicRetention: 28800000 velero: backupBucket: velero-backup diff --git a/helm/values.yaml.gotmpl b/helm/values.yaml.gotmpl index 5acbe4d2..2001d0dd 100644 --- a/helm/values.yaml.gotmpl +++ b/helm/values.yaml.gotmpl @@ -172,6 +172,14 @@ kafkaBridge: listenTopicRetention: {{- if hasKey .StateValues.kafkaBridge.ngsildUpdates "listenTopicRetention" }} {{ .StateValues.kafkaBridge.ngsildUpdates.listenTopicRetention | quote}} {{- else }} "3600000" {{- end }} + + timescaledb: + replicaCount: 1 + tokenRefreshInterval: 200 + listenTopic: "iff.ngsild.attributes" + listenTopicRetention: {{- if hasKey .StateValues.kafkaBridge.timescaledb "listenTopicRetention" }} {{ .StateValues.kafkaBridge.timescaledb.listenTopicRetention | 
quote}} + {{- else }} "3600000" + {{- end }} keycloak: adminName: admin