From 3096624e675ece108e5d05e163d7af4e7d5133b5 Mon Sep 17 00:00:00 2001 From: Meric Feyzullahoglu Date: Fri, 13 Dec 2024 15:23:55 +0100 Subject: [PATCH] Implement merge behavior to the ngsild bridge and update Scorpio Version Update operation of ngsild bridge now exclusively uses batch processed merge patch operation. Tests are adapted to reflect behavioral change. New tests are added to verify new behavior. Scorpio version upgraded to v5.0.5. Signed-off-by: Meric Feyzullahoglu --- KafkaBridge/lib/ngsild.js | 20 +++ KafkaBridge/lib/ngsildUpdates.js | 22 ++- KafkaBridge/test/testLibNgsild.js | 41 ++++++ KafkaBridge/test/testLibNgsildUpdates.js | 86 ++++++------ .../test-ngsild-updates-bridge.bats | 117 ++++++++++------ .../test-scorpio/test-scorpio-mergepatch.bats | 129 ++++++++++++++++-- test/build-local-platform.sh | 2 +- 7 files changed, 307 insertions(+), 110 deletions(-) diff --git a/KafkaBridge/lib/ngsild.js b/KafkaBridge/lib/ngsild.js index 00a3b50f..7843f064 100644 --- a/KafkaBridge/lib/ngsild.js +++ b/KafkaBridge/lib/ngsild.js @@ -499,6 +499,26 @@ function fiwareApi (conf) { return rest.postBody({ options, body: data }); }; + /** + * Run batch merge operation on the entities + * @param {array[Object]} entities - Array of JSON patches to merge + * @param {array[Object]} headers - additional headers + */ + this.batchMerge = function (entities, { headers }) { + headers = headers || {}; + headers['Content-Type'] = 'application/ld+json'; + + const options = { + hostname: config.ngsildServer.hostname, + protocol: config.ngsildServer.protocol, + port: config.ngsildServer.port, + path: '/ngsi-ld/v1/entityOperations/merge', + headers: headers, + method: 'POST' + }; + return rest.postBody({ options, body: entities }); + }; + /** * Helpers */ diff --git a/KafkaBridge/lib/ngsildUpdates.js b/KafkaBridge/lib/ngsildUpdates.js index 14478254..e3af66e4 100644 --- a/KafkaBridge/lib/ngsildUpdates.js +++ b/KafkaBridge/lib/ngsildUpdates.js @@ -128,20 +128,16 @@ module.exports = function NgsildUpdates (conf) { try { // update the entity - do not create it if (op === 'update') { - // NOTE: The batch update API of Scorpio does not yet support noOverwrite options. For the time being - // the batch processing will be done sequentially - until this is fixed in Scorpio - for (const entity of entities) { // olet i = 0; i < entities.length; i ++) { - // basic health check of entity - if (entity.id === undefined || entity.id == null) { - logger.error('Unhealthy entity - ignoring it:' + JSON.stringify(entity)); - } else { - logger.debug('Updating: ' + JSON.stringify(entities)); - result = await ngsild.updateProperties({ id: entity.id, body: entity, isOverwrite: overwriteOrReplace }, { headers }); - if (result.statusCode !== 204 && result.statusCode !== 207) { - logger.error('Entity cannot update entity:' + JSON.stringify(result.body) + ' and status code ' + result.statusCode); // throw no error, log it and ignore it, repeating would probably not solve it - } + // Only batch merge is run + if (entities === undefined || entities == null) { + logger.error('Unhealthy entities - ignoring it:' + JSON.stringify(entities)); + } else { + logger.debug('Updating: ' + JSON.stringify(entities)); + result = await ngsild.batchMerge(entities, { headers }); + if (result.statusCode !== 204 && result.statusCode !== 207) { + logger.error('Entity cannot run merge:' + JSON.stringify(result.body) + ' and status code ' + result.statusCode); // throw no error, log it and ignore it, repeating would probably not solve it } - }; + } } else if (op === 'upsert') { // in this case, entity will be created if not existing logger.debug('Upserting: ' + JSON.stringify(entities)); diff --git a/KafkaBridge/test/testLibNgsild.js b/KafkaBridge/test/testLibNgsild.js index 779ebf32..3ce45167 100644 --- a/KafkaBridge/test/testLibNgsild.js +++ b/KafkaBridge/test/testLibNgsild.js @@ -971,3 +971,44 @@ describe('Test updateEntities', function () { revert(); }); }); +describe('Test batchMerge', function () { + it('Should use correct options and headers', async function () { + const Logger = function () { + return logger; + }; + const Rest = function () { + return rest; + }; + const headers = { Authorization: 'Bearer token' }; + const expectedOptions = { + hostname: 'hostname', + protocol: 'http:', + port: 1234, + method: 'POST', + path: '/ngsi-ld/v1/entityOperations/merge', + headers: { + 'Content-Type': 'application/ld+json', + Authorization: 'Bearer token' + } + }; + const rest = { + postBody: function (obj) { + assert.deepEqual(obj.options, expectedOptions); + assert.deepEqual(obj.body, entities); + return Promise.resolve('merged'); + } + }; + + const entities = [ + { id: 'id1', type: 'type1', attr1: 'value1' }, + { id: 'id2', type: 'type2', attr2: 'value2' } + ]; + + const revert = ToTest.__set__('Logger', Logger); + ToTest.__set__('Rest', Rest); + const ngsild = new ToTest(config); + const result = await ngsild.batchMerge(entities, { headers }); + result.should.equal('merged'); + revert(); + }); +}); diff --git a/KafkaBridge/test/testLibNgsildUpdates.js b/KafkaBridge/test/testLibNgsildUpdates.js index 0267e1b5..b0e76974 100644 --- a/KafkaBridge/test/testLibNgsildUpdates.js +++ b/KafkaBridge/test/testLibNgsildUpdates.js @@ -30,8 +30,8 @@ const logger = { const addSyncOnAttribute = function () {}; describe('Test libNgsildUpdates', function () { - it('Should post body with correct path and token for nonOverwrite update', async function () { - let updatePropertiesCalled = false; + it('Should post entities with correct path and token for nonOverwrite update using batchMerge', async function () { + let batchMergeCalled = false; const config = { ngsildUpdates: { clientSecretVariable: 'CLIENT_SECRET', @@ -66,11 +66,9 @@ describe('Test libNgsildUpdates', function () { }; const Ngsild = function () { return { - updateProperties: function ({ id, body, isOverwrite }, { headers }) { - updatePropertiesCalled = true; - id.should.equal('id'); - assert.deepEqual(body, { id: 'id', type: 'type' }); - isOverwrite.should.equal(false); + batchMerge: function (entities, { headers }) { + batchMergeCalled = true; + assert.deepEqual(entities, body.entities); assert.deepEqual(headers, expHeaders); return new Promise(function (resolve) { resolve({ @@ -78,7 +76,10 @@ describe('Test libNgsildUpdates', function () { }); }); }, + // Stub updateProperties if needed + updateProperties: function () {}, replaceEntities: function () { + } }; }; @@ -108,11 +109,11 @@ describe('Test libNgsildUpdates', function () { ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute); const ngsildUpdates = new ToTest(config); await ngsildUpdates.ngsildUpdates(body); - updatePropertiesCalled.should.equal(true); + batchMergeCalled.should.equal(true); revert(); }); - it('Should post body and filter out datasetId === "@none"', async function () { - let updatePropertiesCalled = false; + it('Should post entities and filter out datasetId === "@none"', async function () { + let batchMergeCalled = false; const config = { ngsildUpdates: { clientSecretVariable: 'CLIENT_SECRET', @@ -151,19 +152,24 @@ describe('Test libNgsildUpdates', function () { }; const Ngsild = function () { return { - updateProperties: function ({ id, body, isOverwrite }, { headers }) { - updatePropertiesCalled = true; - id.should.equal('id'); - assert.deepEqual(body, { id: 'id', type: 'type', attribute: { value: 'value' } }); - isOverwrite.should.equal(false); + batchMerge: function (entities, { headers }) { + batchMergeCalled = true; + entities.forEach(entity => { + // Check top-level properties + assert.equal(entity.id, 'id'); + assert.equal(entity.type, 'type'); + + // Check attribute properties + assert.isUndefined(entity.attribute.datasetId, 'datasetId should be filtered out'); + assert.property(entity.attribute, 'value'); + assert.equal(entity.attribute.value, 'value'); + }); assert.deepEqual(headers, expHeaders); return new Promise(function (resolve) { resolve({ statusCode: 204 }); }); - }, - replaceEntities: function () { } }; }; @@ -192,11 +198,11 @@ describe('Test libNgsildUpdates', function () { ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute); const ngsildUpdates = new ToTest(config); await ngsildUpdates.ngsildUpdates(body); - updatePropertiesCalled.should.equal(true); + batchMergeCalled.should.equal(true); revert(); }); it('Should post body and filter out datasetId === "@none" from attribute array', async function () { - let updatePropertiesCalled = false; + let batchMergeCalled = false; const config = { ngsildUpdates: { clientSecretVariable: 'CLIENT_SECRET', @@ -241,11 +247,12 @@ describe('Test libNgsildUpdates', function () { }; const Ngsild = function () { return { - updateProperties: function ({ id, body, isOverwrite }, { headers }) { - updatePropertiesCalled = true; - id.should.equal('id'); - assert.deepEqual(body, { id: 'id', type: 'type', attribute: [{ value: 'value' }, { value: 'value2', datasetId: 'http://example.com#source10' }] }); - isOverwrite.should.equal(false); + batchMerge: function (entities, { headers }) { + batchMergeCalled = true; + entities.forEach(entity => { + assert.deepEqual(entity.id, 'id'); + assert.deepEqual(entity, { id: 'id', type: 'type', attribute: [{ value: 'value' }, { value: 'value2', datasetId: 'http://example.com#source10' }] }); + }); assert.deepEqual(headers, expHeaders); return new Promise(function (resolve) { resolve({ @@ -282,11 +289,11 @@ describe('Test libNgsildUpdates', function () { ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute); const ngsildUpdates = new ToTest(config); await ngsildUpdates.ngsildUpdates(body); - updatePropertiesCalled.should.equal(true); + batchMergeCalled.should.equal(true); revert(); }); - it('Should post body and not filter out datasetId !== "@none"', async function () { - let updatePropertiesCalled = false; + it('Should post entities and not filter out datasetId !== "@none"', async function () { + let batchMergeCalled = false; const config = { ngsildUpdates: { clientSecretVariable: 'CLIENT_SECRET', @@ -325,11 +332,9 @@ describe('Test libNgsildUpdates', function () { }; const Ngsild = function () { return { - updateProperties: function ({ id, body, isOverwrite }, { headers }) { - updatePropertiesCalled = true; - id.should.equal('id'); - assert.deepEqual(body, { id: 'id', type: 'type', attribute: { datasetId: 'https://example.com/source1', value: 'value' } }); - isOverwrite.should.equal(false); + batchMerge: function (entities, { headers }) { + batchMergeCalled = true; + assert.deepEqual(entities, body.entities); assert.deepEqual(headers, expHeaders); return new Promise(function (resolve) { resolve({ @@ -366,7 +371,7 @@ describe('Test libNgsildUpdates', function () { ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute); const ngsildUpdates = new ToTest(config); await ngsildUpdates.ngsildUpdates(body); - updatePropertiesCalled.should.equal(true); + batchMergeCalled.should.equal(true); revert(); }); it('Should post body with correct path and token for nonOverwrite upsert', async function () { @@ -446,7 +451,7 @@ describe('Test libNgsildUpdates', function () { revert(); }); it('Should post body with string entity', async function () { - let updatePropertiesCalled = false; + let batchMergeCalled = false; const config = { ngsildUpdates: { clientSecretVariable: 'CLIENT_SECRET', @@ -481,11 +486,12 @@ describe('Test libNgsildUpdates', function () { }; const Ngsild = function () { return { - updateProperties: function ({ id, body, isOverwrite }, { headers }) { - updatePropertiesCalled = true; - id.should.equal('id'); - assert.deepEqual(body, { id: 'id', type: 'type' }); - isOverwrite.should.equal(false); + batchMerge: function (entities, { headers }) { + batchMergeCalled = true; + entities.forEach(entity => { + assert.deepEqual(entity.id, 'id'); + assert.deepEqual(entity, { id: 'id', type: 'type' }); + }); assert.deepEqual(headers, expHeaders); return new Promise(function (resolve) { resolve({ @@ -524,7 +530,7 @@ describe('Test libNgsildUpdates', function () { const ngsildUpdates = new ToTest(config); body.entities = JSON.stringify(body.entities); await ngsildUpdates.ngsildUpdates(body); - updatePropertiesCalled.should.equal(true); + batchMergeCalled.should.equal(true); revert(); }); }); diff --git a/test/bats/test-bridges/test-ngsild-updates-bridge.bats b/test/bats/test-bridges/test-ngsild-updates-bridge.bats index 7ced51b4..8fe4c5cc 100644 --- a/test/bats/test-bridges/test-ngsild-updates-bridge.bats +++ b/test/bats/test-bridges/test-ngsild-updates-bridge.bats @@ -20,6 +20,7 @@ UPDATE_FILTER=/tmp/UPDATE_FILTER UPDATE_FILTER_NO_OVERWRITE=/tmp/UPDATE_FILTER_NO_OVERWRITE UPDATE_2_ENTITIES=/tmp/UPDATE_2_ENTITIES UPDATE_2_ENTITIES2=/tmp/UPDATE_2_ENTITIES2 +UPDATE_FILTER_MERGE=/tmp/UPDATE_FILTER_MERGE KAFKA_BOOTSTRAP=my-cluster-kafka-bootstrap:9092 KAFKACAT_NGSILD_UPDATES_TOPIC=iff.ngsild-updates FILTER_ID=urn:filter-test:12345 @@ -218,6 +219,24 @@ cat << EOF | tr -d '\n' > ${UPDATE_FILTER_NO_OVERWRITE} } EOF +cat << EOF | tr -d '\n' > ${UPDATE_FILTER_MERGE} +{ + "op": "update", + "overwriteOrReplace": "false", + "entities": [ + { + "@context": "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld", + "id": "${FILTER_ID}", + "https://industry-fusion.com/types/v0.9/strength": { + "datasetId": "https://example.com/source1", + "type": "Property", + "value": "10.9" + } + } + ] +} +EOF + get_iso_timestamp(){ echo '"'"$(date +"%Y-%m-%dT%T.%3NZ" -u)"'"' } @@ -242,7 +261,8 @@ timestamp_upsert_2_entities(){ "type": "Property", "datasetId": "https://example.com/source1", "value": "0.9", - "observedAt": ${timestamp} + "observedAt": ${timestamp}, + "unitCode": "unitCode" }, "https://industry-fusion.com/types/v0.9/hasCartridge": { "type": "Relationship", @@ -452,7 +472,8 @@ compare_inserted_entity() { "https://industry-fusion.com/types/v0.9/strength" : { "type" : "Property", "datasetId": "https://example.com/source1", - "value" : "0.9" + "value" : "0.9", + "unitCode": "unitCode" } } EOF @@ -525,24 +546,18 @@ compare_upserted_non_overwritten_entity() { "type" : "${FILTER_TYPE}", "https://industry-fusion.com/types/v0.9/hasCartridge" : { "type" : "Relationship", - "object" : "urn:filterCartridge-test:22345" + "object" : "urn:filterCartridge-test:12345" }, "https://industry-fusion.com/types/v0.9/state" : { "type" : "Property", - "value" : "OFF" + "value" : "OFFF" + }, + "https://industry-fusion.com/types/v0.9/strength" : { + "type" : "Property", + "datasetId": "https://example.com/source1", + "value": "0.9", + "unitCode": "unitCode" }, - "https://industry-fusion.com/types/v0.9/strength" : [ - { - "type" : "Property", - "datasetId": "https://example.com/source1", - "value" : "0.1" - }, - { - "type": "Property", - "value": "0.9" - } - - ], "https://industry-fusion.com/types/v0.9/strength2": { "type": "Property", "datasetId": "https://example.com/source4", @@ -570,7 +585,8 @@ compare_updated_entity() { "https://industry-fusion.com/types/v0.9/strength" : { "type" : "Property", "datasetId": "https://example.com/source1", - "value" : "0.5" + "value" : "0.5", + "unitCode": "unitCode" } } EOF @@ -669,7 +685,31 @@ compare_updated_filter_entity() { "https://industry-fusion.com/types/v0.9/strength" : { "type" : "Property", "datasetId": "https://example.com/source1", - "value" : "1.0" + "value" : "1.0", + "unitCode": "unitCode" + } +} +EOF +} + +compare_merged_filter_entity() { + cat << EOF | jq | diff "$1" - >&3 +{ + "id":"${FILTER_ID}", + "type":"${FILTER_TYPE}", + "https://industry-fusion.com/types/v0.9/hasCartridge":{ + "type":"Relationship", + "object":"urn:filterCartridge-test:12345" + }, + "https://industry-fusion.com/types/v0.9/state":{ + "type":"Property", + "value":"OFFF" + }, + "https://industry-fusion.com/types/v0.9/strength":{ + "type":"Property", + "datasetId":"https://example.com/source1", + "value":"10.9", + "unitCode":"unitCode" } } EOF @@ -753,13 +793,6 @@ teardown(){ @test "verify ngsild-update bridge is upserting and non-overwriting ngsi-ld entitiy" { $SKIP - # This test is not working properlty the entityOperations/upsert?options=update should only update existing - # property but Quarkus - # Currently the test is not changing the object. We leave it in in case in future this API is working correctly - # And will be detected by this. - # Update: In Scorpio Version 3.0.11 the program behaviour changed again but it is still not quite aligned - # with the NGSI-LD documentations, therefore we tweaked the test case to work with the current behaviour. - # When this test fails in the future, we will know that the behaviour has changed again. password=$(get_password) # shellcheck disable=SC2030 token=$(get_token) @@ -796,22 +829,6 @@ teardown(){ [ "$status" -eq 0 ] } -@test "verify ngsild-update bridge is updating with noOverwrite option ngsi-ld entitiy" { - $SKIP - kafkacat -P -t ${KAFKACAT_NGSILD_UPDATES_TOPIC} -b ${KAFKA_BOOTSTRAP} <${UPSERT_FILTER} - echo "# Sent upsert object to ngsi-ld-updates-bridge, wait some time to let it settle" - sleep 2 - password=$(get_password) - token=$(get_token) - kafkacat -P -t ${KAFKACAT_NGSILD_UPDATES_TOPIC} -b ${KAFKA_BOOTSTRAP} <${UPDATE_FILTER_NO_OVERWRITE} - echo "# Sent update object to ngsi-ld-updates-bridge, wait some time to let it settle" - sleep 2 - get_ngsild "${token}" ${FILTER_ID} | jq 'del( ."https://industry-fusion.com/types/v0.9/metadata/kafkaSyncOn" )'| jq 'del(..|.observedAt?)' >${RECEIVED_ENTITY} - delete_ngsild "${token}" ${FILTER_ID} - run compare_updated_no_overwrite_entity ${RECEIVED_ENTITY} - [ "$status" -eq 0 ] -} - @test "verify ngsild-update bridge is upserting 2 entities" { $SKIP password=$(get_password) @@ -899,4 +916,22 @@ teardown(){ delete_ngsild "${token}" ${FILTER_ID} run compare_inserted_entity_timestamped ${RECEIVED_ENTITY} [ "$status" -eq 0 ] +} + +@test "verify deep merge behavior with update operation (non-overwrite)" { + $SKIP + password=$(get_password) + token=$(get_token) + delete_ngsild "${token}" ${FILTER_ID} + sleep 2 + kafkacat -P -t ${KAFKACAT_NGSILD_UPDATES_TOPIC} -b ${KAFKA_BOOTSTRAP} <${UPSERT_FILTER} + echo "# Sent initial upsert object to ngsi-ld-updates-bridge, wait some time to let it settle" + sleep 2 + kafkacat -P -t ${KAFKACAT_NGSILD_UPDATES_TOPIC} -b ${KAFKA_BOOTSTRAP} <${UPDATE_FILTER_MERGE} + echo "# Sent update object to ngsi-ld-updates-bridge, wait some time to let it settle" + sleep 2 + get_ngsild "${token}" ${FILTER_ID} | jq 'del( ."https://industry-fusion.com/types/v0.9/metadata/kafkaSyncOn" )' >${RECEIVED_ENTITY} + sleep 2 + run compare_merged_filter_entity ${RECEIVED_ENTITY} + [ "$status" -eq 0 ] } \ No newline at end of file diff --git a/test/bats/test-scorpio/test-scorpio-mergepatch.bats b/test/bats/test-scorpio/test-scorpio-mergepatch.bats index fcc8f6ec..3428ec39 100644 --- a/test/bats/test-scorpio/test-scorpio-mergepatch.bats +++ b/test/bats/test-scorpio/test-scorpio-mergepatch.bats @@ -10,9 +10,12 @@ USER=realm_user CLIENT_ID=scorpio KEYCLOAK_URL=http://keycloak.local/auth/realms PLASMACUTTER_ID=urn:plasmacutter-test:12345 +PLASMACUTTER_ID_2=urn:plasmacutter-test:123456 CUTTER=/tmp/CUTTER CUTTER_MERGE=/tmp/CUTTER_MERGE CUTTER_QUERY=/tmp/CUTTER_QUERY +CUTTER_BATCH=/tmp/CUTTER_BATCH +CUTTER_MERGE_BATCH=/tmp/CUTTER_MERGE_BATCH # Function definitions get_password() { @@ -27,6 +30,10 @@ create_ngsild() { curl -vv -X POST -H "Authorization: Bearer $1" -d @"$2" http://ngsild.local/ngsi-ld/v1/entities/ -H "Content-Type: application/ld+json" } +create_ngsild_batch() { + curl -vv -X POST -H "Authorization: Bearer $1" -d @"$2" http://ngsild.local/ngsi-ld/v1/entityOperations/upsert -H "Content-Type: application/ld+json" +} + delete_ngsild() { curl -vv -X DELETE -H "Authorization: Bearer $1" http://ngsild.local/ngsi-ld/v1/entities/"$2" -H "Content-Type: application/ld+json" } @@ -35,6 +42,10 @@ merge_patch_ngsild() { curl -vv -X PATCH -H "Authorization: Bearer $1" -d @"$2" http://ngsild.local/ngsi-ld/v1/entities/"$3" -H "Content-Type: application/ld+json" } +merge_patch_ngsild_batch() { + curl -vv -X POST -H "Authorization: Bearer $1" -d @"$2" http://ngsild.local/ngsi-ld/v1/entityOperations/merge -H "Content-Type: application/ld+json" +} + query_ngsild() { curl -s -X GET -H "Authorization: Bearer $1" http://ngsild.local/ngsi-ld/v1/entities/"$2" -H "Accept: application/json" } @@ -55,7 +66,7 @@ cat << EOF > ${CUTTER} "https://industry-fusion.com/types/v0.9/state": { "type": "Property", "value": "ON", - "unit": "Binary" + "unitCode": "Binary" }, "https://industry-fusion.com/types/v0.9/hasFilter": { "type": "Relationship", @@ -68,37 +79,107 @@ EOF cat << EOF > ${CUTTER_MERGE} { "@context": "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld", + "https://industry-fusion.com/types/v0.9/hasFilter": { + "type": "Relationship", + "object": "urn:ngsi-ld:null" + }, "https://industry-fusion.com/types/v0.9/state": { "type": "Property", "value": "OFF" - }, - "https://industry-fusion.com/types/v0.9/hasFilter": { + } +} +EOF + +# Create 2 cutters +cat << EOF > ${CUTTER_BATCH} +[ +{ + "@context": "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld", + "id": "${PLASMACUTTER_ID}", + "type": "https://industry-fusion.com/types/v0.9/plasmacutter_test", + "https://industry-fusion.com/types/v0.9/state": { + "type": "Property", + "value": "ON", + "unitCode": "Binary" + }, + "https://industry-fusion.com/types/v0.9/hasFilter": { "type": "Relationship", - "object": "urn:ngsi-ld:null" - } + "object": "urn:filter-test:12345" + } +}, +{ + "@context": "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld", + "id": "${PLASMACUTTER_ID_2}", + "type": "https://industry-fusion.com/types/v0.9/plasmacutter_test", + "https://industry-fusion.com/types/v0.9/state": { + "type": "Property", + "value": "ONN", + "unitCode": "Binary" + }, + "https://industry-fusion.com/types/v0.9/hasFilter": { + "type": "Relationship", + "object": "urn:filter-test:12346" + } } +] EOF -compare_merge_patch() { - # Filter out observedAt and kafkaSyncOn from the input file - filtered_input=$(jq 'del(.."observedAt", .."https://industry-fusion.com/types/v0.9/metadata/kafkaSyncOn")' "$1") +# Run merge on two cutters +cat << EOF > ${CUTTER_MERGE_BATCH} +[ +{ + "@context": "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld", + "id": "${PLASMACUTTER_ID}", + "https://industry-fusion.com/types/v0.9/state": { + "type": "Property", + "value": "OF" + } +}, +{ + "@context": "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld", + "id": "${PLASMACUTTER_ID_2}", + "https://industry-fusion.com/types/v0.9/state": { + "type": "Property", + "value": "OFF" + }, + "https://industry-fusion.com/types/v0.9/hasFilter": { + "type": "Relationship", + "object": "urn:filter-test:12347" + } +} +] +EOF - # Create the expected output, filtering out the same keys - expected_output=$(cat << EOF | jq 'del(.."observedAt", .."https://industry-fusion.com/types/v0.9/metadata/kafkaSyncOn")' +compare_merge_patch() { + cat << EOF | jq | diff "$1" - >&3 { "id": "${PLASMACUTTER_ID}", "type": "https://industry-fusion.com/types/v0.9/plasmacutter_test", "https://industry-fusion.com/types/v0.9/state": { "type": "Property", "value": "OFF", - "unit": "Binary" + "unitCode": "Binary" } } EOF -) +} - # Perform the diff on the filtered JSON objects - diff <(echo "$filtered_input" | jq -S) <(echo "$expected_output" | jq -S) >&3 +compare_merge_patch_batch_second() { + cat << EOF | jq | diff "$1" - >&3 +{ + "id": "${PLASMACUTTER_ID_2}", + "type": "https://industry-fusion.com/types/v0.9/plasmacutter_test", + "https://industry-fusion.com/types/v0.9/hasFilter": { + "type": "Relationship", + "object": "urn:filter-test:12347" + }, + "https://industry-fusion.com/types/v0.9/state": { + "type": "Property", + "value": "OFF", + "unitCode": "Binary" + } +} +EOF } @test "verify merge patch behaviour" { @@ -110,9 +191,27 @@ EOF sleep 2 merge_patch_ngsild "$token" "$CUTTER_MERGE" "$PLASMACUTTER_ID" sleep 2 - query_ngsild "$token" "$PLASMACUTTER_ID" >${CUTTER_QUERY} + query_ngsild "$token" "$PLASMACUTTER_ID" | jq 'del( ."https://industry-fusion.com/types/v0.9/metadata/kafkaSyncOn" )' >${CUTTER_QUERY} delete_ngsild "$token" "$PLASMACUTTER_ID" run compare_merge_patch ${CUTTER_QUERY} [ "$status" -eq 0 ] } + +@test "verify merge patch behaviour with batch processing" { + $SKIP + password=$(get_password) + token=$(get_token) + delete_ngsild "$token" "$PLASMACUTTER_ID" || echo "Could not delete $PLASMACUTTER_ID. But that is okay." + delete_ngsild "$token" "$PLASMACUTTER_ID_2" || echo "Could not delete $PLASMACUTTER_ID. But that is okay." + create_ngsild_batch "$token" "$CUTTER_BATCH" + sleep 2 + merge_patch_ngsild_batch "$token" "$CUTTER_MERGE_BATCH" + sleep 2 + query_ngsild "$token" "$PLASMACUTTER_ID_2" | jq 'del( ."https://industry-fusion.com/types/v0.9/metadata/kafkaSyncOn" )' >${CUTTER_QUERY} + delete_ngsild "$token" "$PLASMACUTTER_ID" + delete_ngsild "$token" "$PLASMACUTTER_ID_2" + + run compare_merge_patch_batch_second ${CUTTER_QUERY} + [ "$status" -eq 0 ] +} diff --git a/test/build-local-platform.sh b/test/build-local-platform.sh index fdc180fd..73bea040 100644 --- a/test/build-local-platform.sh +++ b/test/build-local-platform.sh @@ -27,7 +27,7 @@ echo Build Scorpio containers if [[ $TEST -eq "true" ]]; then ( cd ../.. && git clone https://github.com/IndustryFusion/ScorpioBroker.git) - ( cd ../../ScorpioBroker && git checkout 64afd0b ) # Checking out specific commit for CI purposes + ( cd ../../ScorpioBroker && git checkout 45e40a4 ) # Checking out specific commit for CI purposes ( cd ../../ScorpioBroker && source /etc/profile.d/maven.sh && mvn clean package -DskipTests -Ddocker -Ddocker-tag=$DOCKER_TAG -Dkafka -Pkafka -Dquarkus.profile=kafka) else ( cd ../.. && git clone https://github.com/IndustryFusion/ScorpioBroker.git )