Skip to content

Commit

Permalink
Implement DatasetID Handling to Debezium Bridge
Browse files Browse the repository at this point in the history
It manually adds datasetid if it is not present already and if
there are multiple attributes with the same datasetid, the last one
will overwrite the others. The attributes are then sorted
lexicologically according to the datasetids and indexed.
Test cases are readjusted appropriately to reflect changes.

Signed-off-by: Meric Feyzullahoglu <[email protected]>
  • Loading branch information
MericFeyz authored and wagmarcel committed Apr 12, 2024
1 parent 7151374 commit f943b50
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 18 deletions.
37 changes: 32 additions & 5 deletions KafkaBridge/lib/debeziumBridge.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ module.exports = function DebeziumBridge (conf) {
const { insertedAttrs, updatedAttrs, deletedAttrs } = this.diffAttributes(beforeAttrs, afterAttrs);
const isKafkaUpdate = (updatedAttrs[syncOnAttribute] !== undefined ||
insertedAttrs[syncOnAttribute] !== undefined) && body.before != null; // There is one important exception:
// When body.before is null, it means that the whole entity is created.
// This is unlikely happening over default Kafka update. Even if it is a Kafka update
// the next iteration will be an update with body.before != null so the loop will be detected.
// When body.before is null, it means that the whole entity is created.
// This is unlikely happening over default Kafka update. Even if it is a Kafka update
// the next iteration will be an update with body.before != null so the loop will be detected.
// when syncOnAttribute is used, it means that update
// did not come through API but through Kafka channel
// so do not forward to avoid 'infinity' loop
Expand All @@ -58,7 +58,7 @@ module.exports = function DebeziumBridge (conf) {
delete afterEntity[syncOnAttribute];
// isAttributesChanged: When there are changes in any of the attrs (not coming over Kafka)
const isAttributesChanged = !isKafkaUpdate && (Object.keys(updatedAttrs).length > 0 ||
Object.keys(deletedAttrs).length > 0 || Object.keys(insertedAttrs).length > 0);
Object.keys(deletedAttrs).length > 0 || Object.keys(insertedAttrs).length > 0);
// deletedEntity needs to remember type so that it can be deleted for
// all subtypes. However, type must be removed lated since it is not part of
// primary key. Deletion means to set type to null
Expand Down Expand Up @@ -129,6 +129,12 @@ module.exports = function DebeziumBridge (conf) {
obj.id = refId;
obj.entityId = id;
obj.name = key;
if ('https://uri.etsi.org/ngsi-ld/datasetId' in refObj) {
obj['https://uri.etsi.org/ngsi-ld/datasetId'] = refObj['https://uri.etsi.org/ngsi-ld/datasetId'][0]['@id'];
} else {
obj['https://uri.etsi.org/ngsi-ld/datasetId'] = '@none';
}

// extract timestamp
// timestamp is normally observedAt but it is non-mandatory
// If observedAt is missing (e.g. because it was entered over REST API)
Expand Down Expand Up @@ -165,9 +171,30 @@ module.exports = function DebeziumBridge (conf) {
} else {
return;
}
obj.index = index;
baAttrs[key].push(obj);
});
// Check if datasetId is doubled
let counter = 0;
const copyArr = [];
baAttrs[key].forEach((obj, idx) => {
let check = false;
const curDatId = obj['https://uri.etsi.org/ngsi-ld/datasetId'];
for (let i = idx + 1; i < baAttrs[key].length; i++) {
if (curDatId === baAttrs[key][i]['https://uri.etsi.org/ngsi-ld/datasetId']) {
check = true;
copyArr[counter] = baAttrs[key][i];
}
}
if (check === false) {
copyArr[counter] = baAttrs[key][idx];
counter++;
}
});
copyArr.sort((a, b) => a[['https://uri.etsi.org/ngsi-ld/datasetId']].localeCompare(b['https://uri.etsi.org/ngsi-ld/datasetId']));
copyArr.forEach((obj, idx) => { // Index it according to their sorting
obj.index = idx;
});
baAttrs[key] = copyArr;
});
return { entity: resEntity, attributes: baAttrs };
};
Expand Down
5 changes: 5 additions & 0 deletions KafkaBridge/test/testLibDebeziumBridge.js
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ describe('Test parseBeforeAfterEntity', function () {
nodeType: '@id',
name: 'https://example/hasRel',
type: 'https://uri.etsi.org/ngsi-ld/Relationship',
'https://uri.etsi.org/ngsi-ld/datasetId': '@none',
'https://uri.etsi.org/ngsi-ld/hasObject': 'urn:object:1',
index: 0
}],
Expand All @@ -292,6 +293,7 @@ describe('Test parseBeforeAfterEntity', function () {
nodeType: '@value',
name: 'https://example/prop',
type: 'https://uri.etsi.org/ngsi-ld/Property',
'https://uri.etsi.org/ngsi-ld/datasetId': '@none',
'https://uri.etsi.org/ngsi-ld/hasValue': 'value',
'https://uri.etsi.org/ngsi-ld/observedAt': [{
'@type': 'https://uri.etsi.org/ngsi-ld/DateTime',
Expand Down Expand Up @@ -362,6 +364,7 @@ describe('Test parseBeforeAfterEntity', function () {
nodeType: '@id',
name: 'https://example/hasRel',
type: 'https://uri.etsi.org/ngsi-ld/Relationship',
'https://uri.etsi.org/ngsi-ld/datasetId': '@none',
'https://uri.etsi.org/ngsi-ld/observedAt': [{
'@type': 'https://uri.etsi.org/ngsi-ld/DateTime',
'@value': '2022-02-19T20:32:26.123656Z'
Expand All @@ -376,6 +379,7 @@ describe('Test parseBeforeAfterEntity', function () {
name: 'https://example/prop',
type: 'https://uri.etsi.org/ngsi-ld/Property',
'https://uri.etsi.org/ngsi-ld/hasValue': 'value',
'https://uri.etsi.org/ngsi-ld/datasetId': '@none',
'https://uri.etsi.org/ngsi-ld/observedAt': [{
'@type': 'https://uri.etsi.org/ngsi-ld/DateTime',
'@value': '2022-02-19T20:31:26.123656Z'
Expand Down Expand Up @@ -432,6 +436,7 @@ describe('Test parseBeforeAfterEntity', function () {
nodeType: '@value',
name: 'https://example/prop',
type: 'https://uri.etsi.org/ngsi-ld/Property',
'https://uri.etsi.org/ngsi-ld/datasetId': '@none',
'https://uri.etsi.org/ngsi-ld/hasValue': 'value',
'https://uri.etsi.org/ngsi-ld/observedAt': [{
'@type': 'https://uri.etsi.org/ngsi-ld/DateTime',
Expand Down
40 changes: 27 additions & 13 deletions test/bats/test-bridges/test-debezium-bridge.bats
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ cat << EOF > ${CUTTER}
"https://industry-fusion.com/types/v0.9/hasWorkpiece": [
{
"type": "Relationship",
"object": "urn:workpiece-test:12345"
"object": "urn:workpiece-test:12345",
"datasetId": "urn:workpiece-test:12345-ZZZ"
},
{
"type": "Relationship",
Expand All @@ -59,7 +60,8 @@ cat << EOF > ${CUTTER}
"type": "Property",
"value": {
"https://industry-fusion.com/types/v0.9/my": "json1"
}
},
"datasetId": "urn:json-value-test:json1"
},
{
"type": "Property",
Expand All @@ -72,7 +74,8 @@ cat << EOF > ${CUTTER}
"https://industry-fusion.com/types/v0.9/multiState": [
{
"type": "Property",
"value": "OFF"
"value": "OFF",
"datasetId": "urn:multistate-test:off"
},
{
"type": "Property",
Expand All @@ -99,6 +102,7 @@ cat << EOF > ${CUTTER_TIMESTAMPED}
{
"type": "Property",
"value": "ON",
"datasetId": "urn:state-test:on",
"observedAt": "2023-01-08T01:10:35.555Z"
}
],
Expand Down Expand Up @@ -132,50 +136,60 @@ compare_create_attributes() {
{"id":"${PLASMACUTTER_ID}\\\https://industry-fusion.com/types/v0.9/hasFilter",\
"entityId":"${PLASMACUTTER_ID}",\
"name":"https://industry-fusion.com/types/v0.9/hasFilter",\
"https://uri.etsi.org/ngsi-ld/datasetId":"@none",\
"type":"https://uri.etsi.org/ngsi-ld/Relationship",\
"https://uri.etsi.org/ngsi-ld/hasObject":"urn:filter-test:12345","nodeType":"@id","index":0}
{"id":"${PLASMACUTTER_ID}\\\https://industry-fusion.com/types/v0.9/hasWorkpiece",\
"entityId":"${PLASMACUTTER_ID}",\
"name":"https://industry-fusion.com/types/v0.9/hasWorkpiece",\
"https://uri.etsi.org/ngsi-ld/datasetId":"@none",\
"type":"https://uri.etsi.org/ngsi-ld/Relationship",\
"https://uri.etsi.org/ngsi-ld/hasObject":"urn:workpiece-test:12345",\
"https://uri.etsi.org/ngsi-ld/hasObject":"urn:workpiece-test:23456",\
"nodeType":"@id","index":0}
{"id":"${PLASMACUTTER_ID}\\\https://industry-fusion.com/types/v0.9/hasWorkpiece",\
"entityId":"${PLASMACUTTER_ID}",\
"name":"https://industry-fusion.com/types/v0.9/hasWorkpiece",\
"https://uri.etsi.org/ngsi-ld/datasetId":"urn:workpiece-test:12345-ZZZ",\
"type":"https://uri.etsi.org/ngsi-ld/Relationship",\
"https://uri.etsi.org/ngsi-ld/hasObject":"urn:workpiece-test:23456",\
"https://uri.etsi.org/ngsi-ld/hasObject":"urn:workpiece-test:12345",\
"nodeType":"@id","index":1}
{"id":"${PLASMACUTTER_ID}\\\https://industry-fusion.com/types/v0.9/jsonValueArray",\
"entityId":"${PLASMACUTTER_ID}",\
"name":"https://industry-fusion.com/types/v0.9/jsonValueArray",\
"https://uri.etsi.org/ngsi-ld/datasetId":"@none",\
"type":"https://uri.etsi.org/ngsi-ld/Property",\
"https://uri.etsi.org/ngsi-ld/hasValue":"{\"https://industry-fusion.com/types/v0.9/my\":[{\"@value\":\"json1\"}]}",\
"nodeType":"@json","index":0}
"https://uri.etsi.org/ngsi-ld/hasValue":"{\"@type\":[\"https://industry-fusion.com/types/v0.9/myJsonType\"],\"https://industry-fusion.com/types/v0.9/my\":[{\"@value\":\"json2\"}]}",\
"nodeType":"@json","valueType":["https://industry-fusion.com/types/v0.9/myJsonType"],"index":0}
{"id":"${PLASMACUTTER_ID}\\\https://industry-fusion.com/types/v0.9/jsonValueArray",\
"entityId":"${PLASMACUTTER_ID}",\
"name":"https://industry-fusion.com/types/v0.9/jsonValueArray",\
"https://uri.etsi.org/ngsi-ld/datasetId":"urn:json-value-test:json1",\
"type":"https://uri.etsi.org/ngsi-ld/Property",\
"https://uri.etsi.org/ngsi-ld/hasValue":"{\"@type\":[\"https://industry-fusion.com/types/v0.9/myJsonType\"],\"https://industry-fusion.com/types/v0.9/my\":[{\"@value\":\"json2\"}]}",\
"nodeType":"@json","valueType":["https://industry-fusion.com/types/v0.9/myJsonType"],"index":1}
"https://uri.etsi.org/ngsi-ld/hasValue":"{\"https://industry-fusion.com/types/v0.9/my\":[{\"@value\":\"json1\"}]}",\
"nodeType":"@json","index":1}
{"id":"${PLASMACUTTER_ID}\\\https://industry-fusion.com/types/v0.9/jsonValue",\
"entityId":"${PLASMACUTTER_ID}",\
"name":"https://industry-fusion.com/types/v0.9/jsonValue",\
"https://uri.etsi.org/ngsi-ld/datasetId":"@none",\
"type":"https://uri.etsi.org/ngsi-ld/Property",\
"https://uri.etsi.org/ngsi-ld/hasValue":"{\"@type\":[\"https://industry-fusion.com/types/v0.9/myJsonType\"],\"https://industry-fusion.com/types/v0.9/my\":[{\"@value\":\"json\"}]}",\
"nodeType":"@json","valueType":["https://industry-fusion.com/types/v0.9/myJsonType"],"index":0}
{"id":"${PLASMACUTTER_ID}\\\https://industry-fusion.com/types/v0.9/multiState",\
"entityId":"${PLASMACUTTER_ID}",\
"name":"https://industry-fusion.com/types/v0.9/multiState","type":"https://uri.etsi.org/ngsi-ld/Property",\
"https://uri.etsi.org/ngsi-ld/hasValue":"OFF","nodeType":"@value","index":0}
"name":"https://industry-fusion.com/types/v0.9/multiState",\
"https://uri.etsi.org/ngsi-ld/datasetId":"@none",\
"type":"https://uri.etsi.org/ngsi-ld/Property","https://uri.etsi.org/ngsi-ld/hasValue":"ON",\
"nodeType":"@value","valueType":"https://industry-fusion.com/types/v0.9/multiStateType","index":0}
{"id":"${PLASMACUTTER_ID}\\\https://industry-fusion.com/types/v0.9/multiState",\
"entityId":"${PLASMACUTTER_ID}",\
"name":"https://industry-fusion.com/types/v0.9/multiState",\
"type":"https://uri.etsi.org/ngsi-ld/Property","https://uri.etsi.org/ngsi-ld/hasValue":"ON",\
"nodeType":"@value","valueType":"https://industry-fusion.com/types/v0.9/multiStateType","index":1}
"https://uri.etsi.org/ngsi-ld/datasetId":"urn:multistate-test:off",\
"type":"https://uri.etsi.org/ngsi-ld/Property",\
"https://uri.etsi.org/ngsi-ld/hasValue":"OFF","nodeType":"@value","index":1}
{"id":"${PLASMACUTTER_ID}\\\https://industry-fusion.com/types/v0.9/state",\
"entityId":"${PLASMACUTTER_ID}",\
"name":"https://industry-fusion.com/types/v0.9/state",\
"https://uri.etsi.org/ngsi-ld/datasetId":"@none",\
"type":"https://uri.etsi.org/ngsi-ld/Property",\
"https://uri.etsi.org/ngsi-ld/hasValue":"OFF","nodeType":"@value","index":0}
EOF
Expand Down

0 comments on commit f943b50

Please sign in to comment.