Skip to content

Commit

Permalink
Add datasetId in ngsild-bridge, core-services and shacl2flink
Browse files Browse the repository at this point in the history
DatasetId is the way to create "arrays" in semantic/graph data. This problem is currently solved with the "index" field, but that approach
is not compatible with the Semantic Web approach. Therefore, the datasetId is added (the index field is to be removed in a later PR).
Unfortunately, the index/datasetId is spread across many components in the PDT. It has already been introduced for the debezium bridge in the previous PR.
Therefore, this PR provides updates for the rest:

* Core SQL services tables and data forwarding
* Shacl2SQL build tools
* SQL Gateway
* Ngsild-kafka-bridge
* Respective e2e tests and unit tests

Related EPIC: #429
Related Userstory: #431
Related Task: #512

Signed-off-by: marcel <[email protected]>
  • Loading branch information
wagmarcel authored and abhijith-hr committed Apr 16, 2024
1 parent f61b477 commit d4f3e4f
Show file tree
Hide file tree
Showing 16 changed files with 586 additions and 85 deletions.
50 changes: 29 additions & 21 deletions FlinkSqlGateway/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
'use strict';
const express = require('express');
const { exec } = require('child_process');
const { spawn } = require('child_process');
const uuid = require('uuid');
const fs = require('fs');
const path = require('path');
Expand All @@ -42,33 +42,41 @@ function appget (_, response) {
};

/**
 * Submit a Flink SQL job by spawning the submission command and watching its output.
 *
 * command: full command string; executable and arguments separated by single spaces
 *          (assumes no argument contains a space — TODO confirm against createCommand)
 * response: Express response; gets 200 with '{ "jobid": ... }' on success, 500 otherwise
 * Returns a Promise that resolves on successful submission and rejects on failure.
 */
function submitJob (command, response) {
  const args = command.split(' ');
  let jobId = null;
  return new Promise((resolve, reject) => {
    const res = spawn(args[0], args.slice(1));
    res.stdout.on('data', (data) => {
      logger.info('Stdout messages from application: ' + data);
      // find Job ID in stdout, e.g.
      // JobID=[e1ebb6b314c82b27cf81cbc812300d97]
      const regexp = /JobID=\[([0-9a-f]*)\]/i;
      try {
        const found = data.toString().match(regexp);
        // match() returns null when no match; keep the last id seen
        if (found !== null && found !== undefined) {
          jobId = found[1];
          logger.debug(`JobId matched: ${jobId}`);
        }
      } catch (e) {
        logger.debug('No JobId found');
      }
    });
    res.stderr.on('data', (data) => {
      logger.error('Stderr messages from application: ' + data);
    });
    res.on('close', (code) => {
      logger.info(`child process exited with code ${code}`);
      // require BOTH a clean exit and a matched JobID; a clean exit without an
      // id would otherwise be reported to the caller as '{ "jobid": "null" }'
      if (code === 0 && jobId !== null) {
        logger.info('jobId found: ' + jobId);
        response.status(200).send('{ "jobid": "' + jobId + '" }');
        resolve();
      } else {
        response.status(500);
        response.send('Not successfully submitted. No JOB ID found in server reply.');
        reject(new Error('Could not find JOB ID'));
      }
    });
  });
}

const createCommand = function (dirname) {
Expand Down
9 changes: 7 additions & 2 deletions FlinkSqlGateway/submitjob/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import pathlib
import os
import sys


JARDIR = '/opt/gateway/jars'
Expand Down Expand Up @@ -57,5 +58,9 @@
for statement in d["sqlstatementset"]:
statement_set.add_insert_sql(statement)

jobresult = statement_set.execute()
print(f'JobID=[{jobresult.get_job_client().get_job_id()}]')
try:
jobresult = statement_set.execute()
print(f'JobID=[{jobresult.get_job_client().get_job_id()}]')
except Exception as e:
print(f'Error executing statement set: {e}')
sys.exit(1)
74 changes: 59 additions & 15 deletions FlinkSqlGateway/test/testGateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ describe('Test statement path', function () {
const request = {
body: body
};
const exec = function (command, output) {
command.should.equal(flinkSqlCommand + fsWriteFilename + ' --pyExecutable /usr/local/bin/python3 --pyFiles testfile');
const spawn = function (command, args, output) {
const fullcommand = flinkSqlCommand + fsWriteFilename + ' --pyExecutable /usr/local/bin/python3 --pyFiles testfile';
command.should.equal(fullcommand.split(' ')[0]);
args.should.equal(fullcommand.split(' ').slice(1));
};
const fs = {
writeFileSync: function (filename, data) {
Expand Down Expand Up @@ -101,7 +103,7 @@ describe('Test statement path', function () {

const revert = toTest.__set__({
logger: logger,
exec: exec,
spawn: spawn,
fs: fs,
getLocalPythonUdfs: getLocalPythonUdfs,
process: process
Expand Down Expand Up @@ -175,7 +177,7 @@ describe('Test apppost', function () {
statement: 'select *;'
}
};
const exec = function (command, output) {
const spawn = function (command, output) {
output(error, null, null);
};

Expand All @@ -185,7 +187,7 @@ describe('Test apppost', function () {

const revert = toTest.__set__({
logger: logger,
exec: exec,
spawn: spawn,
uuid: uuid,
fs: fs,
getLocalPythonUdfs: getLocalPythonUdfs
Expand Down Expand Up @@ -239,7 +241,7 @@ describe('Test apppost', function () {

const revert = toTest.__set__({
logger: logger,
exec: exec,
spawn: exec,
uuid: uuid,
fs: fs,
getLocalPythonUdfs: getLocalPythonUdfs
Expand Down Expand Up @@ -287,7 +289,7 @@ describe('Test apppost', function () {

const revert = toTest.__set__({
logger: logger,
exec: exec,
spawn: exec,
uuid: uuid,
fs: fs,
getLocalPythonUdfs: getLocalPythonUdfs
Expand Down Expand Up @@ -401,22 +403,36 @@ describe('Test submitJob', function () {
it('Submit without error', function (done) {
const command = 'command';
const exec = function (command, fn) {
fn('error', null, null);
return {
stdout: {
on: function (data, fn) {
fn(Buffer.from('JobID=[1]', 'utf8'));
}
},
stderr: {
on: function (data, fn) {
fn('');
}
},
on: function (ev, fn) {
fn(0);
}
};
};
const response = {
status: function (val) {
val.should.equal(500);
val.should.equal(200);
return response;
},
send: function (val) {
val.should.equal('Error while submitting sql job: error');
val.should.deep.equal('{ "jobid": "1" }');
revert();
done();
}
};
const revert = toTest.__set__({
logger: logger,
exec: exec
spawn: exec
});
const submitJob = toTest.__get__('submitJob');
submitJob(command, response);
Expand All @@ -425,7 +441,21 @@ describe('Test submitJob', function () {
it('Submit without jobId', function (done) {
const command = 'command';
const exec = function (command, fn) {
fn(null, 'nojobid', null);
return {
stdout: {
on: function (data, fn) {
fn(Buffer.from('JobID=', 'utf8'));
}
},
stderr: {
on: function (data, fn) {
fn('stderr');
}
},
on: function (ev, fn) {
fn(1);
}
};
};
const response = {
status: function (val) {
Expand All @@ -440,7 +470,7 @@ describe('Test submitJob', function () {
};
const revert = toTest.__set__({
logger: logger,
exec: exec
spawn: exec
});
const submitJob = toTest.__get__('submitJob');
submitJob(command, response);
Expand All @@ -449,7 +479,21 @@ describe('Test submitJob', function () {
it('Submit with jobId', function (done) {
const command = 'command';
const exec = function (command, fn) {
fn(null, 'JobID=[1234]', null);
return {
stdout: {
on: function (data, fn) {
fn(Buffer.from('JobID=[1234]', 'utf8'));
}
},
stderr: {
on: function (data, fn) {
fn('');
}
},
on: function (ev, fn) {
fn(0);
}
};
};
const response = {
status: function (val) {
Expand All @@ -464,7 +508,7 @@ describe('Test submitJob', function () {
};
const revert = toTest.__set__({
logger: logger,
exec: exec
spawn: exec
});
const submitJob = toTest.__get__('submitJob');
submitJob(command, response);
Expand Down
33 changes: 30 additions & 3 deletions KafkaBridge/lib/ngsildUpdates.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,29 @@ const addSyncOnAttribute = function (entities, syncOnAttribute, timestamp) {
});
};

/**
* Remove "@none" dataset ids
*
* entities: NGSILD entities
*/
/**
 * Remove "@none" dataset ids
 *
 * NGSI-LD uses datasetId === '@none' to mark the default attribute instance;
 * downstream the attribute must be forwarded without an explicit datasetId in
 * that case. Mutates the given entities in place; returns nothing.
 *
 * entities: array of NGSILD entities — objects whose values are a single
 *           attribute object or an array of attribute objects
 */
const removeDefaultDatasetID = function (entities) {
  entities.forEach(entity => {
    Object.keys(entity).forEach(key => {
      let attributes = entity[key];
      if (!Array.isArray(attributes)) {
        attributes = [attributes];
      }
      attributes.forEach(attribute => {
        // null guard: typeof null === 'object', and the 'in' operator below
        // would throw a TypeError on a null attribute value
        if (attribute !== null && typeof attribute === 'object') {
          if ('datasetId' in attribute && attribute.datasetId === '@none') {
            delete attribute.datasetId;
          }
        }
      });
    });
  });
};

module.exports = function NgsildUpdates (conf) {
const config = conf;
const ngsild = new NgsiLd(config);
Expand Down Expand Up @@ -93,6 +116,8 @@ module.exports = function NgsildUpdates (conf) {
} else {
entities = body.entities;
}
removeDefaultDatasetID(entities);

const overwriteOrReplace = getFlag(body.overwriteOrReplace);
const noForward = getFlag(body.noForward);
let result;
Expand All @@ -110,17 +135,19 @@ module.exports = function NgsildUpdates (conf) {
if (entity.id === undefined || entity.id == null) {
logger.error('Unhealthy entity - ignoring it:' + JSON.stringify(entity));
} else {
logger.debug('Updating: ' + JSON.stringify(entities));
result = await ngsild.updateProperties({ id: entity.id, body: entity, isOverwrite: overwriteOrReplace }, { headers });
if (result.statusCode !== 204 && result.statusCode !== 207) {
logger.error('Entity cannot update entity:' + JSON.stringify(result.body)); // throw no error, log it and ignore it, repeating would probably not solve it
logger.error('Entity cannot update entity:' + JSON.stringify(result.body) + ' and status code ' + result.statusCode); // throw no error, log it and ignore it, repeating would probably not solve it
}
}
};
} else if (op === 'upsert') {
// in this case, entity will be created if not existing
logger.debug('Upserting: ' + JSON.stringify(entities));
result = await ngsild.replaceEntities(entities, overwriteOrReplace, { headers });
if (result.statusCode !== 204) {
logger.error('Cannot upsert entity:' + JSON.stringify(result.body)); // throw no error, log it and igonore it, repeating would probalby not solve it
if (result.statusCode !== 204 && result.statusCode !== 201) {
logger.error('Cannot upsert entity:' + JSON.stringify(result.body) + ' and status code ' + result.statusCode); // throw no error, log it and igonore it, repeating would probalby not solve it
}
}
} catch (e) {
Expand Down
Loading

0 comments on commit d4f3e4f

Please sign in to comment.