diff --git a/KafkaBridge/lib/ngsild.js b/KafkaBridge/lib/ngsild.js index 00a3b50f..7843f064 100644 --- a/KafkaBridge/lib/ngsild.js +++ b/KafkaBridge/lib/ngsild.js @@ -499,6 +499,26 @@ function fiwareApi (conf) { return rest.postBody({ options, body: data }); }; + /** + * Run batch merge operation on the entities + * @param {array[Object]} entities - Array of JSON patches to merge + * @param {array[Object]} headers - additional headers + */ + this.batchMerge = function (entities, { headers }) { + headers = headers || {}; + headers['Content-Type'] = 'application/ld+json'; + + const options = { + hostname: config.ngsildServer.hostname, + protocol: config.ngsildServer.protocol, + port: config.ngsildServer.port, + path: '/ngsi-ld/v1/entityOperations/merge', + headers: headers, + method: 'POST' + }; + return rest.postBody({ options, body: entities }); + }; + /** * Helpers */ diff --git a/KafkaBridge/lib/ngsildUpdates.js b/KafkaBridge/lib/ngsildUpdates.js index 14478254..e3af66e4 100644 --- a/KafkaBridge/lib/ngsildUpdates.js +++ b/KafkaBridge/lib/ngsildUpdates.js @@ -128,20 +128,16 @@ module.exports = function NgsildUpdates (conf) { try { // update the entity - do not create it if (op === 'update') { - // NOTE: The batch update API of Scorpio does not yet support noOverwrite options. For the time being - // the batch processing will be done sequentially - until this is fixed in Scorpio - for (const entity of entities) { // olet i = 0; i < entities.length; i ++) { - // basic health check of entity - if (entity.id === undefined || entity.id == null) { - logger.error('Unhealthy entity - ignoring it:' + JSON.stringify(entity)); - } else { - logger.debug('Updating: ' + JSON.stringify(entities)); - result = await ngsild.updateProperties({ id: entity.id, body: entity, isOverwrite: overwriteOrReplace }, { headers }); - if (result.statusCode !== 204 && result.statusCode !== 207) { - logger.error('Entity cannot update entity:' + JSON.stringify(result.body) + ' and status code ' + result.statusCode); // throw no error, log it and ignore it, repeating would probably not solve it - } + // Only batch merge is run + if (entities === undefined || entities == null) { + logger.error('Unhealthy entities - ignoring it:' + JSON.stringify(entities)); + } else { + logger.debug('Updating: ' + JSON.stringify(entities)); + result = await ngsild.batchMerge(entities, { headers }); + if (result.statusCode !== 204 && result.statusCode !== 207) { + logger.error('Entity cannot run merge:' + JSON.stringify(result.body) + ' and status code ' + result.statusCode); // throw no error, log it and ignore it, repeating would probably not solve it } - }; + } } else if (op === 'upsert') { // in this case, entity will be created if not existing logger.debug('Upserting: ' + JSON.stringify(entities)); diff --git a/KafkaBridge/test/testLibNgsild.js b/KafkaBridge/test/testLibNgsild.js index 779ebf32..3ce45167 100644 --- a/KafkaBridge/test/testLibNgsild.js +++ b/KafkaBridge/test/testLibNgsild.js @@ -971,3 +971,44 @@ describe('Test updateEntities', function () { revert(); }); }); +describe('Test batchMerge', function () { + it('Should use correct options and headers', async function () { + const Logger = function () { + return logger; + }; + const Rest = function () { + return rest; + }; + const headers = { Authorization: 'Bearer token' }; + const expectedOptions = { + hostname: 'hostname', + protocol: 'http:', + port: 1234, + method: 'POST', + path: '/ngsi-ld/v1/entityOperations/merge', + headers: { + 'Content-Type': 'application/ld+json', + Authorization: 'Bearer token' + } + }; + const rest = { + postBody: function (obj) { + assert.deepEqual(obj.options, expectedOptions); + assert.deepEqual(obj.body, entities); + return Promise.resolve('merged'); + } + }; + + const entities = [ + { id: 'id1', type: 'type1', attr1: 'value1' }, + { id: 'id2', type: 'type2', attr2: 'value2' } + ]; + + const revert = ToTest.__set__('Logger', Logger); + ToTest.__set__('Rest', Rest); + const ngsild = new ToTest(config); + const result = await ngsild.batchMerge(entities, { headers }); + result.should.equal('merged'); + revert(); + }); +}); diff --git a/KafkaBridge/test/testLibNgsildUpdates.js b/KafkaBridge/test/testLibNgsildUpdates.js index 0267e1b5..b0e76974 100644 --- a/KafkaBridge/test/testLibNgsildUpdates.js +++ b/KafkaBridge/test/testLibNgsildUpdates.js @@ -30,8 +30,8 @@ const logger = { const addSyncOnAttribute = function () {}; describe('Test libNgsildUpdates', function () { - it('Should post body with correct path and token for nonOverwrite update', async function () { - let updatePropertiesCalled = false; + it('Should post entities with correct path and token for nonOverwrite update using batchMerge', async function () { + let batchMergeCalled = false; const config = { ngsildUpdates: { clientSecretVariable: 'CLIENT_SECRET', @@ -66,11 +66,9 @@ describe('Test libNgsildUpdates', function () { }; const Ngsild = function () { return { - updateProperties: function ({ id, body, isOverwrite }, { headers }) { - updatePropertiesCalled = true; - id.should.equal('id'); - assert.deepEqual(body, { id: 'id', type: 'type' }); - isOverwrite.should.equal(false); + batchMerge: function (entities, { headers }) { + batchMergeCalled = true; + assert.deepEqual(entities, body.entities); assert.deepEqual(headers, expHeaders); return new Promise(function (resolve) { resolve({ @@ -78,7 +76,10 @@ describe('Test libNgsildUpdates', function () { }); }); }, + // Stub updateProperties if needed + updateProperties: function () {}, replaceEntities: function () { + } }; }; @@ -108,11 +109,11 @@ describe('Test libNgsildUpdates', function () { ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute); const ngsildUpdates = new ToTest(config); await ngsildUpdates.ngsildUpdates(body); - updatePropertiesCalled.should.equal(true); + batchMergeCalled.should.equal(true); revert(); }); - it('Should post body and filter out datasetId === "@none"', async function () { - let updatePropertiesCalled = false; + it('Should post entities and filter out datasetId === "@none"', async function () { + let batchMergeCalled = false; const config = { ngsildUpdates: { clientSecretVariable: 'CLIENT_SECRET', @@ -151,19 +152,24 @@ describe('Test libNgsildUpdates', function () { }; const Ngsild = function () { return { - updateProperties: function ({ id, body, isOverwrite }, { headers }) { - updatePropertiesCalled = true; - id.should.equal('id'); - assert.deepEqual(body, { id: 'id', type: 'type', attribute: { value: 'value' } }); - isOverwrite.should.equal(false); + batchMerge: function (entities, { headers }) { + batchMergeCalled = true; + entities.forEach(entity => { + // Check top-level properties + assert.equal(entity.id, 'id'); + assert.equal(entity.type, 'type'); + + // Check attribute properties + assert.isUndefined(entity.attribute.datasetId, 'datasetId should be filtered out'); + assert.property(entity.attribute, 'value'); + assert.equal(entity.attribute.value, 'value'); + }); assert.deepEqual(headers, expHeaders); return new Promise(function (resolve) { resolve({ statusCode: 204 }); }); - }, - replaceEntities: function () { } }; }; @@ -192,11 +198,11 @@ describe('Test libNgsildUpdates', function () { ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute); const ngsildUpdates = new ToTest(config); await ngsildUpdates.ngsildUpdates(body); - updatePropertiesCalled.should.equal(true); + batchMergeCalled.should.equal(true); revert(); }); it('Should post body and filter out datasetId === "@none" from attribute array', async function () { - let updatePropertiesCalled = false; + let batchMergeCalled = false; const config = { ngsildUpdates: { clientSecretVariable: 'CLIENT_SECRET', @@ -241,11 +247,12 @@ describe('Test libNgsildUpdates', function () { }; const Ngsild = function () { return { - updateProperties: function ({ id, body, isOverwrite }, { headers }) { - updatePropertiesCalled = true; - id.should.equal('id'); - assert.deepEqual(body, { id: 'id', type: 'type', attribute: [{ value: 'value' }, { value: 'value2', datasetId: 'http://example.com#source10' }] }); - isOverwrite.should.equal(false); + batchMerge: function (entities, { headers }) { + batchMergeCalled = true; + entities.forEach(entity => { + assert.deepEqual(entity.id, 'id'); + assert.deepEqual(entity, { id: 'id', type: 'type', attribute: [{ value: 'value' }, { value: 'value2', datasetId: 'http://example.com#source10' }] }); + }); assert.deepEqual(headers, expHeaders); return new Promise(function (resolve) { resolve({ @@ -282,11 +289,11 @@ describe('Test libNgsildUpdates', function () { ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute); const ngsildUpdates = new ToTest(config); await ngsildUpdates.ngsildUpdates(body); - updatePropertiesCalled.should.equal(true); + batchMergeCalled.should.equal(true); revert(); }); - it('Should post body and not filter out datasetId !== "@none"', async function () { - let updatePropertiesCalled = false; + it('Should post entities and not filter out datasetId !== "@none"', async function () { + let batchMergeCalled = false; const config = { ngsildUpdates: { clientSecretVariable: 'CLIENT_SECRET', @@ -325,11 +332,9 @@ describe('Test libNgsildUpdates', function () { }; const Ngsild = function () { return { - updateProperties: function ({ id, body, isOverwrite }, { headers }) { - updatePropertiesCalled = true; - id.should.equal('id'); - assert.deepEqual(body, { id: 'id', type: 'type', attribute: { datasetId: 'https://example.com/source1', value: 'value' } }); - isOverwrite.should.equal(false); + batchMerge: function (entities, { headers }) { + batchMergeCalled = true; + assert.deepEqual(entities, body.entities); assert.deepEqual(headers, expHeaders); return new Promise(function (resolve) { resolve({ @@ -366,7 +371,7 @@ describe('Test libNgsildUpdates', function () { ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute); const ngsildUpdates = new ToTest(config); await ngsildUpdates.ngsildUpdates(body); - updatePropertiesCalled.should.equal(true); + batchMergeCalled.should.equal(true); revert(); }); it('Should post body with correct path and token for nonOverwrite upsert', async function () { @@ -446,7 +451,7 @@ describe('Test libNgsildUpdates', function () { revert(); }); it('Should post body with string entity', async function () { - let updatePropertiesCalled = false; + let batchMergeCalled = false; const config = { ngsildUpdates: { clientSecretVariable: 'CLIENT_SECRET', @@ -481,11 +486,12 @@ describe('Test libNgsildUpdates', function () { }; const Ngsild = function () { return { - updateProperties: function ({ id, body, isOverwrite }, { headers }) { - updatePropertiesCalled = true; - id.should.equal('id'); - assert.deepEqual(body, { id: 'id', type: 'type' }); - isOverwrite.should.equal(false); + batchMerge: function (entities, { headers }) { + batchMergeCalled = true; + entities.forEach(entity => { + assert.deepEqual(entity.id, 'id'); + assert.deepEqual(entity, { id: 'id', type: 'type' }); + }); assert.deepEqual(headers, expHeaders); return new Promise(function (resolve) { resolve({ @@ -524,7 +530,7 @@ describe('Test libNgsildUpdates', function () { const ngsildUpdates = new ToTest(config); body.entities = JSON.stringify(body.entities); await ngsildUpdates.ngsildUpdates(body); - updatePropertiesCalled.should.equal(true); + batchMergeCalled.should.equal(true); revert(); }); }); diff --git a/test/bats/test-bridges/test-ngsild-updates-bridge.bats b/test/bats/test-bridges/test-ngsild-updates-bridge.bats index 7ced51b4..8fe4c5cc 100644 --- a/test/bats/test-bridges/test-ngsild-updates-bridge.bats +++ b/test/bats/test-bridges/test-ngsild-updates-bridge.bats @@ -20,6 +20,7 @@ UPDATE_FILTER=/tmp/UPDATE_FILTER UPDATE_FILTER_NO_OVERWRITE=/tmp/UPDATE_FILTER_NO_OVERWRITE UPDATE_2_ENTITIES=/tmp/UPDATE_2_ENTITIES UPDATE_2_ENTITIES2=/tmp/UPDATE_2_ENTITIES2 +UPDATE_FILTER_MERGE=/tmp/UPDATE_FILTER_MERGE KAFKA_BOOTSTRAP=my-cluster-kafka-bootstrap:9092 KAFKACAT_NGSILD_UPDATES_TOPIC=iff.ngsild-updates FILTER_ID=urn:filter-test:12345 @@ -218,6 +219,24 @@ cat << EOF | tr -d '\n' > ${UPDATE_FILTER_NO_OVERWRITE} } EOF +cat << EOF | tr -d '\n' > ${UPDATE_FILTER_MERGE} +{ + "op": "update", + "overwriteOrReplace": "false", + "entities": [ + { + "@context": "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld", + "id": "${FILTER_ID}", + "https://industry-fusion.com/types/v0.9/strength": { + "datasetId": "https://example.com/source1", + "type": "Property", + "value": "10.9" + } + } + ] +} +EOF + get_iso_timestamp(){ echo '"'"$(date +"%Y-%m-%dT%T.%3NZ" -u)"'"' } @@ -242,7 +261,8 @@ timestamp_upsert_2_entities(){ "type": "Property", "datasetId": "https://example.com/source1", "value": "0.9", - "observedAt": ${timestamp} + "observedAt": ${timestamp}, + "unitCode": "unitCode" }, "https://industry-fusion.com/types/v0.9/hasCartridge": { "type": "Relationship", @@ -452,7 +472,8 @@ compare_inserted_entity() { "https://industry-fusion.com/types/v0.9/strength" : { "type" : "Property", "datasetId": "https://example.com/source1", - "value" : "0.9" + "value" : "0.9", + "unitCode": "unitCode" } } EOF @@ -525,24 +546,18 @@ compare_upserted_non_overwritten_entity() { "type" : "${FILTER_TYPE}", "https://industry-fusion.com/types/v0.9/hasCartridge" : { "type" : "Relationship", - "object" : "urn:filterCartridge-test:22345" + "object" : "urn:filterCartridge-test:12345" }, "https://industry-fusion.com/types/v0.9/state" : { "type" : "Property", - "value" : "OFF" + "value" : "OFFF" + }, + "https://industry-fusion.com/types/v0.9/strength" : { + "type" : "Property", + "datasetId": "https://example.com/source1", + "value": "0.9", + "unitCode": "unitCode" }, - "https://industry-fusion.com/types/v0.9/strength" : [ - { - "type" : "Property", - "datasetId": "https://example.com/source1", - "value" : "0.1" - }, - { - "type": "Property", - "value": "0.9" - } - - ], "https://industry-fusion.com/types/v0.9/strength2": { "type": "Property", "datasetId": "https://example.com/source4", @@ -570,7 +585,8 @@ compare_updated_entity() { "https://industry-fusion.com/types/v0.9/strength" : { "type" : "Property", "datasetId": "https://example.com/source1", - "value" : "0.5" + "value" : "0.5", + "unitCode": "unitCode" } } EOF @@ -669,7 +685,31 @@ compare_updated_filter_entity() { "https://industry-fusion.com/types/v0.9/strength" : { "type" : "Property", "datasetId": "https://example.com/source1", - "value" : "1.0" + "value" : "1.0", + "unitCode": "unitCode" + } +} +EOF +} + +compare_merged_filter_entity() { + cat << EOF | jq | diff "$1" - >&3 +{ + "id":"${FILTER_ID}", + "type":"${FILTER_TYPE}", + "https://industry-fusion.com/types/v0.9/hasCartridge":{ + "type":"Relationship", + "object":"urn:filterCartridge-test:12345" + }, + "https://industry-fusion.com/types/v0.9/state":{ + "type":"Property", + "value":"OFFF" + }, + "https://industry-fusion.com/types/v0.9/strength":{ + "type":"Property", + "datasetId":"https://example.com/source1", + "value":"10.9", + "unitCode":"unitCode" } } EOF @@ -753,13 +793,6 @@ teardown(){ @test "verify ngsild-update bridge is upserting and non-overwriting ngsi-ld entitiy" { $SKIP - # This test is not working properlty the entityOperations/upsert?options=update should only update existing - # property but Quarkus - # Currently the test is not changing the object. We leave it in in case in future this API is working correctly - # And will be detected by this. - # Update: In Scorpio Version 3.0.11 the program behaviour changed again but it is still not quite aligned - # with the NGSI-LD documentations, therefore we tweaked the test case to work with the current behaviour. - # When this test fails in the future, we will know that the behaviour has changed again. password=$(get_password) # shellcheck disable=SC2030 token=$(get_token) @@ -796,22 +829,6 @@ teardown(){ [ "$status" -eq 0 ] } -@test "verify ngsild-update bridge is updating with noOverwrite option ngsi-ld entitiy" { - $SKIP - kafkacat -P -t ${KAFKACAT_NGSILD_UPDATES_TOPIC} -b ${KAFKA_BOOTSTRAP} <${UPSERT_FILTER} - echo "# Sent upsert object to ngsi-ld-updates-bridge, wait some time to let it settle" - sleep 2 - password=$(get_password) - token=$(get_token) - kafkacat -P -t ${KAFKACAT_NGSILD_UPDATES_TOPIC} -b ${KAFKA_BOOTSTRAP} <${UPDATE_FILTER_NO_OVERWRITE} - echo "# Sent update object to ngsi-ld-updates-bridge, wait some time to let it settle" - sleep 2 - get_ngsild "${token}" ${FILTER_ID} | jq 'del( ."https://industry-fusion.com/types/v0.9/metadata/kafkaSyncOn" )'| jq 'del(..|.observedAt?)' >${RECEIVED_ENTITY} - delete_ngsild "${token}" ${FILTER_ID} - run compare_updated_no_overwrite_entity ${RECEIVED_ENTITY} - [ "$status" -eq 0 ] -} - @test "verify ngsild-update bridge is upserting 2 entities" { $SKIP password=$(get_password) @@ -899,4 +916,22 @@ teardown(){ delete_ngsild "${token}" ${FILTER_ID} run compare_inserted_entity_timestamped ${RECEIVED_ENTITY} [ "$status" -eq 0 ] +} + +@test "verify deep merge behavior with update operation (non-overwrite)" { + $SKIP + password=$(get_password) + token=$(get_token) + delete_ngsild "${token}" ${FILTER_ID} + sleep 2 + kafkacat -P -t ${KAFKACAT_NGSILD_UPDATES_TOPIC} -b ${KAFKA_BOOTSTRAP} <${UPSERT_FILTER} + echo "# Sent initial upsert object to ngsi-ld-updates-bridge, wait some time to let it settle" + sleep 2 + kafkacat -P -t ${KAFKACAT_NGSILD_UPDATES_TOPIC} -b ${KAFKA_BOOTSTRAP} <${UPDATE_FILTER_MERGE} + echo "# Sent update object to ngsi-ld-updates-bridge, wait some time to let it settle" + sleep 2 + get_ngsild "${token}" ${FILTER_ID} | jq 'del( ."https://industry-fusion.com/types/v0.9/metadata/kafkaSyncOn" )' >${RECEIVED_ENTITY} + sleep 2 + run compare_merged_filter_entity ${RECEIVED_ENTITY} + [ "$status" -eq 0 ] } \ No newline at end of file diff --git a/test/bats/test-scorpio/test-scorpio-mergepatch.bats b/test/bats/test-scorpio/test-scorpio-mergepatch.bats index fcc8f6ec..3428ec39 100644 --- a/test/bats/test-scorpio/test-scorpio-mergepatch.bats +++ b/test/bats/test-scorpio/test-scorpio-mergepatch.bats @@ -10,9 +10,12 @@ USER=realm_user CLIENT_ID=scorpio KEYCLOAK_URL=http://keycloak.local/auth/realms PLASMACUTTER_ID=urn:plasmacutter-test:12345 +PLASMACUTTER_ID_2=urn:plasmacutter-test:123456 CUTTER=/tmp/CUTTER CUTTER_MERGE=/tmp/CUTTER_MERGE CUTTER_QUERY=/tmp/CUTTER_QUERY +CUTTER_BATCH=/tmp/CUTTER_BATCH +CUTTER_MERGE_BATCH=/tmp/CUTTER_MERGE_BATCH # Function definitions get_password() { @@ -27,6 +30,10 @@ create_ngsild() { curl -vv -X POST -H "Authorization: Bearer $1" -d @"$2" http://ngsild.local/ngsi-ld/v1/entities/ -H "Content-Type: application/ld+json" } +create_ngsild_batch() { + curl -vv -X POST -H "Authorization: Bearer $1" -d @"$2" http://ngsild.local/ngsi-ld/v1/entityOperations/upsert -H "Content-Type: application/ld+json" +} + delete_ngsild() { curl -vv -X DELETE -H "Authorization: Bearer $1" http://ngsild.local/ngsi-ld/v1/entities/"$2" -H "Content-Type: application/ld+json" } @@ -35,6 +42,10 @@ merge_patch_ngsild() { curl -vv -X PATCH -H "Authorization: Bearer $1" -d @"$2" http://ngsild.local/ngsi-ld/v1/entities/"$3" -H "Content-Type: application/ld+json" } +merge_patch_ngsild_batch() { + curl -vv -X POST -H "Authorization: Bearer $1" -d @"$2" http://ngsild.local/ngsi-ld/v1/entityOperations/merge -H "Content-Type: application/ld+json" +} + query_ngsild() { curl -s -X GET -H "Authorization: Bearer $1" http://ngsild.local/ngsi-ld/v1/entities/"$2" -H "Accept: application/json" } @@ -55,7 +66,7 @@ cat << EOF > ${CUTTER} "https://industry-fusion.com/types/v0.9/state": { "type": "Property", "value": "ON", - "unit": "Binary" + "unitCode": "Binary" }, "https://industry-fusion.com/types/v0.9/hasFilter": { "type": "Relationship", @@ -68,37 +79,107 @@ EOF cat << EOF > ${CUTTER_MERGE} { "@context": "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld", + "https://industry-fusion.com/types/v0.9/hasFilter": { + "type": "Relationship", + "object": "urn:ngsi-ld:null" + }, "https://industry-fusion.com/types/v0.9/state": { "type": "Property", "value": "OFF" - }, - "https://industry-fusion.com/types/v0.9/hasFilter": { + } +} +EOF + +# Create 2 cutters +cat << EOF > ${CUTTER_BATCH} +[ +{ + "@context": "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld", + "id": "${PLASMACUTTER_ID}", + "type": "https://industry-fusion.com/types/v0.9/plasmacutter_test", + "https://industry-fusion.com/types/v0.9/state": { + "type": "Property", + "value": "ON", + "unitCode": "Binary" + }, + "https://industry-fusion.com/types/v0.9/hasFilter": { "type": "Relationship", - "object": "urn:ngsi-ld:null" - } + "object": "urn:filter-test:12345" + } +}, +{ + "@context": "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld", + "id": "${PLASMACUTTER_ID_2}", + "type": "https://industry-fusion.com/types/v0.9/plasmacutter_test", + "https://industry-fusion.com/types/v0.9/state": { + "type": "Property", + "value": "ONN", + "unitCode": "Binary" + }, + "https://industry-fusion.com/types/v0.9/hasFilter": { + "type": "Relationship", + "object": "urn:filter-test:12346" + } } +] EOF -compare_merge_patch() { - # Filter out observedAt and kafkaSyncOn from the input file - filtered_input=$(jq 'del(.."observedAt", .."https://industry-fusion.com/types/v0.9/metadata/kafkaSyncOn")' "$1") +# Run merge on two cutters +cat << EOF > ${CUTTER_MERGE_BATCH} +[ +{ + "@context": "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld", + "id": "${PLASMACUTTER_ID}", + "https://industry-fusion.com/types/v0.9/state": { + "type": "Property", + "value": "OF" + } +}, +{ + "@context": "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld", + "id": "${PLASMACUTTER_ID_2}", + "https://industry-fusion.com/types/v0.9/state": { + "type": "Property", + "value": "OFF" + }, + "https://industry-fusion.com/types/v0.9/hasFilter": { + "type": "Relationship", + "object": "urn:filter-test:12347" + } +} +] +EOF - # Create the expected output, filtering out the same keys - expected_output=$(cat << EOF | jq 'del(.."observedAt", .."https://industry-fusion.com/types/v0.9/metadata/kafkaSyncOn")' +compare_merge_patch() { + cat << EOF | jq | diff "$1" - >&3 { "id": "${PLASMACUTTER_ID}", "type": "https://industry-fusion.com/types/v0.9/plasmacutter_test", "https://industry-fusion.com/types/v0.9/state": { "type": "Property", "value": "OFF", - "unit": "Binary" + "unitCode": "Binary" } } EOF -) +} - # Perform the diff on the filtered JSON objects - diff <(echo "$filtered_input" | jq -S) <(echo "$expected_output" | jq -S) >&3 +compare_merge_patch_batch_second() { + cat << EOF | jq | diff "$1" - >&3 +{ + "id": "${PLASMACUTTER_ID_2}", + "type": "https://industry-fusion.com/types/v0.9/plasmacutter_test", + "https://industry-fusion.com/types/v0.9/hasFilter": { + "type": "Relationship", + "object": "urn:filter-test:12347" + }, + "https://industry-fusion.com/types/v0.9/state": { + "type": "Property", + "value": "OFF", + "unitCode": "Binary" + } +} +EOF } @test "verify merge patch behaviour" { @@ -110,9 +191,27 @@ EOF sleep 2 merge_patch_ngsild "$token" "$CUTTER_MERGE" "$PLASMACUTTER_ID" sleep 2 - query_ngsild "$token" "$PLASMACUTTER_ID" >${CUTTER_QUERY} + query_ngsild "$token" "$PLASMACUTTER_ID" | jq 'del( ."https://industry-fusion.com/types/v0.9/metadata/kafkaSyncOn" )' >${CUTTER_QUERY} delete_ngsild "$token" "$PLASMACUTTER_ID" run compare_merge_patch ${CUTTER_QUERY} [ "$status" -eq 0 ] } + +@test "verify merge patch behaviour with batch processing" { + $SKIP + password=$(get_password) + token=$(get_token) + delete_ngsild "$token" "$PLASMACUTTER_ID" || echo "Could not delete $PLASMACUTTER_ID. But that is okay." + delete_ngsild "$token" "$PLASMACUTTER_ID_2" || echo "Could not delete $PLASMACUTTER_ID. But that is okay." + create_ngsild_batch "$token" "$CUTTER_BATCH" + sleep 2 + merge_patch_ngsild_batch "$token" "$CUTTER_MERGE_BATCH" + sleep 2 + query_ngsild "$token" "$PLASMACUTTER_ID_2" | jq 'del( ."https://industry-fusion.com/types/v0.9/metadata/kafkaSyncOn" )' >${CUTTER_QUERY} + delete_ngsild "$token" "$PLASMACUTTER_ID" + delete_ngsild "$token" "$PLASMACUTTER_ID_2" + + run compare_merge_patch_batch_second ${CUTTER_QUERY} + [ "$status" -eq 0 ] +} diff --git a/test/build-local-platform.sh b/test/build-local-platform.sh index fdc180fd..73bea040 100644 --- a/test/build-local-platform.sh +++ b/test/build-local-platform.sh @@ -27,7 +27,7 @@ echo Build Scorpio containers if [[ $TEST -eq "true" ]]; then ( cd ../.. && git clone https://github.com/IndustryFusion/ScorpioBroker.git) - ( cd ../../ScorpioBroker && git checkout 64afd0b ) # Checking out specific commit for CI purposes + ( cd ../../ScorpioBroker && git checkout 45e40a4 ) # Checking out specific commit for CI purposes ( cd ../../ScorpioBroker && source /etc/profile.d/maven.sh && mvn clean package -DskipTests -Ddocker -Ddocker-tag=$DOCKER_TAG -Dkafka -Pkafka -Dquarkus.profile=kafka) else ( cd ../.. && git clone https://github.com/IndustryFusion/ScorpioBroker.git )