diff --git a/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/integration/ITTestDataBridgeAASPollingConsumer.java b/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/integration/ITTestDataBridgeAASPollingConsumer.java index f1578731..af44205d 100644 --- a/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/integration/ITTestDataBridgeAASPollingConsumer.java +++ b/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/integration/ITTestDataBridgeAASPollingConsumer.java @@ -26,8 +26,6 @@ import java.util.UUID; -import org.eclipse.basyx.aas.aggregator.api.IAASAggregator; -import org.eclipse.basyx.aas.aggregator.proxy.AASAggregatorProxy; import org.eclipse.digitaltwin.basyx.databridge.executable.regression.DataBridgeSuiteAASPollingConsumer; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; @@ -41,18 +39,13 @@ public class ITTestDataBridgeAASPollingConsumer extends DataBridgeSuiteAASPollingConsumer { private static String BROKER_HOST = "broker.mqttdashboard.com"; - private static String HOST = "localhost"; @Override protected MqttClient getMqttClient() throws MqttException { String publisherId = UUID.randomUUID().toString(); - return new MqttClient("tcp://" + BROKER_HOST+ ":1883", publisherId); - } - - @Override - protected IAASAggregator getAASAggregatorProxy() { - return new AASAggregatorProxy("http://" + HOST + ":4001"); + return new MqttClient("tcp://" + BROKER_HOST+ ":1883", publisherId, new MemoryPersistence()); } + } diff --git a/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/regression/DataBridgeSuiteAASPollingConsumer.java b/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/regression/DataBridgeSuiteAASPollingConsumer.java index 0b4364ca..f7e17d6a 100644 --- a/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/regression/DataBridgeSuiteAASPollingConsumer.java +++ b/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/regression/DataBridgeSuiteAASPollingConsumer.java @@ -30,7 +30,8 @@ import java.io.IOException; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; -import org.eclipse.basyx.aas.aggregator.api.IAASAggregator; +import java.util.concurrent.TimeUnit; + import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; @@ -53,7 +54,6 @@ public abstract class DataBridgeSuiteAASPollingConsumer { protected abstract MqttClient getMqttClient() throws MqttException; - protected abstract IAASAggregator getAASAggregatorProxy(); private static Logger logger = LoggerFactory.getLogger(DataBridgeSuiteAASPollingConsumer.class); private static String user_name = "test1"; private static String password = "1234567"; @@ -81,7 +81,7 @@ private void assertPropertyValue(String expectedValue, String topic) throws Mqtt fetchExpectedValue(topic); - assertEquals(receivedMessage, expectedValue); + assertEquals(expectedValue, receivedMessage); } @@ -100,7 +100,6 @@ public void connectionLost(Throwable cause) { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { - receivedMessage = new String(message.getPayload(), StandardCharsets.UTF_8); } @@ -109,7 +108,6 @@ public void deliveryComplete(IMqttDeliveryToken token) { } }); - mqttClient.subscribe(currentTopic); waitForPropagation(); mqttClient.disconnect(); @@ -125,8 +123,8 @@ private MqttClient mqttConnectionInitiate() throws MqttException { MqttClient mqttClient = getMqttClient(); MqttConnectOptions connOpts = setUpMqttConnection(user_name, password); - connOpts.setCleanSession(true); mqttClient.connect(connOpts); + connOpts.setCleanSession(true); return mqttClient; } @@ -138,7 +136,7 @@ private static MqttConnectOptions setUpMqttConnection(String username, String pa } private static void waitForPropagation() throws InterruptedException { - Thread.sleep(6000); + TimeUnit.SECONDS.sleep(10); } private String wrapStringValue(String value) { diff --git a/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/regression/TestDataBridgeAASPollingConsumer.java b/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/regression/TestDataBridgeAASPollingConsumer.java index e73aa177..ff535b02 100644 --- a/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/regression/TestDataBridgeAASPollingConsumer.java +++ b/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/regression/TestDataBridgeAASPollingConsumer.java @@ -27,8 +27,7 @@ import java.io.IOException; import java.util.UUID; -import org.eclipse.basyx.aas.aggregator.api.IAASAggregator; -import org.eclipse.basyx.aas.aggregator.proxy.AASAggregatorProxy; + import org.eclipse.basyx.components.aas.AASServerComponent; import org.eclipse.basyx.components.aas.configuration.AASServerBackend; import org.eclipse.basyx.components.aas.configuration.BaSyxAASServerConfiguration; @@ -52,7 +51,6 @@ */ public class TestDataBridgeAASPollingConsumer extends DataBridgeSuiteAASPollingConsumer { - private static final String AAS_AGGREGATOR_URL = "http://localhost:4001"; private static AASServerComponent aasServer; private static String BROKER_URL = "tcp://broker.mqttdashboard.com:1883"; private static Server mqttBroker; @@ -75,11 +73,6 @@ protected MqttClient getMqttClient() throws MqttException { return new MqttClient(BROKER_URL, publisherId, new MemoryPersistence()); } - @Override - protected IAASAggregator getAASAggregatorProxy() { - return new AASAggregatorProxy(AAS_AGGREGATOR_URL); - } - private static void startMqttBroker() throws IOException { mqttBroker = new Server(); diff --git a/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/regression/TestDataBridgeMqtt.java b/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/regression/TestDataBridgeMqtt.java index 58682946..5179a7c1 100644 --- a/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/regression/TestDataBridgeMqtt.java +++ b/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/regression/TestDataBridgeMqtt.java @@ -71,7 +71,7 @@ public static void setUp() throws IOException { startUpdaterComponent(); } - private static void startMqttBroker() throws IOException { + protected static void startMqttBroker() throws IOException { mqttBroker = new Server(); IResourceLoader classpathLoader = new ClasspathResourceLoader(); @@ -80,7 +80,7 @@ private static void startMqttBroker() throws IOException { mqttBroker.startServer(classPathConfig); } - private static void configureAndStartAASServer() { + protected static void configureAndStartAASServer() { BaSyxContextConfiguration aasContextConfig = new BaSyxContextConfiguration(4001, ""); BaSyxAASServerConfiguration aasConfig = new BaSyxAASServerConfiguration(AASServerBackend.INMEMORY, "aasx/updatertest.aasx"); diff --git a/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/regression/TestDatabeidgeMultipleDataSinks.java b/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/regression/TestDatabeidgeMultipleDataSinks.java new file mode 100644 index 00000000..d736d826 --- /dev/null +++ b/databridge.component/src/test/java/org/eclipse/digitaltwin/basyx/databridge/executable/regression/TestDatabeidgeMultipleDataSinks.java @@ -0,0 +1,47 @@ +/******************************************************************************* + * Copyright (C) 2024 the Eclipse BaSyx Authors + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + * SPDX-License-Identifier: MIT + ******************************************************************************/ +package org.eclipse.digitaltwin.basyx.databridge.executable.regression; + +import java.io.IOException; + +import org.eclipse.digitaltwin.basyx.databridge.component.DataBridgeExecutable; +import org.junit.BeforeClass; + +/** + * @author jungjan + */ +public class TestDatabeidgeMultipleDataSinks extends TestDataBridgeMqtt { + + @BeforeClass + public static void setUp() throws IOException { + TestDataBridgeMqtt.configureAndStartAASServer(); + TestDataBridgeMqtt.startMqttBroker(); + startUpdaterComponent(); + } + + protected static void startUpdaterComponent() { + DataBridgeExecutable.main(new String[] {"src/test/resources/mqtt/multisink-databridge"}); + } +} diff --git a/databridge.component/src/test/resources/mqtt/multisink-databridge/aas.properties b/databridge.component/src/test/resources/mqtt/multisink-databridge/aas.properties new file mode 100644 index 00000000..ed7e8f74 --- /dev/null +++ b/databridge.component/src/test/resources/mqtt/multisink-databridge/aas.properties @@ -0,0 +1,64 @@ +# ############################# +# AAS Server configuration file +# ############################# + +# ############################# +# Backend +# ############################# +# Specifies the backend that loads the AAS and Submodels + +# InMemory - does not persist AAS or submodels +aas.backend=InMemory + +# MongoDB - persists data within a MongoDB +# See connection configuration in mongodb.properties +# aas.backend=MongoDB + +# ############################# +# Source +# ############################# +# Possible to load an AAS Environment from a file + +aas.source=/usr/share/config/updatertest.aasx + +# Other examples (Currently supported: *.xml, *.json and *.aasx): +# aas.source=aasx/myAAS.aasx +# aas.source=aasx/myAAS.xml +# aas.source=aasx/myAAS.json +# Or when encapsulated in the docker volume for this container: +# aas.source=/usr/share/config/myAAS.aasx + +# ############################# +# MQTT +# ############################# +# Possible to enable MQTT events + +aas.events=NONE +# aas.events=MQTT + +# ############################# +# AASX Upload +# ############################# +# Possible to enable AASX Upload + +aas.aasxUpload=Disabled +# aas.aasxUpload=Enabled + + +# ############################# +# Registry +# ############################# +# If specified, can directly registers the AAS that has been loaded from the source file + +# Path specifies the registry endpoint +# registry.path=http://localhost:4000/registry/ + +# Hostpath specifies the endpoint of the deployed AAS component +# If hostpath is empty, the registered AAS endpoint is derived from the context properties +# registry.hostpath= + +# If one or more submodels are specified here, only the submodels will be registered at the +# registry. This can be used for distributed submodel deployments +# In case of an empty or no list, this does not have an effect. By default, all submodels +# are registered at a given registry. +# registry.submodels=["smId1","smId2"] diff --git a/databridge.component/src/test/resources/mqtt/multisink-databridge/aasserver.json b/databridge.component/src/test/resources/mqtt/multisink-databridge/aasserver.json new file mode 100644 index 00000000..c00c6398 --- /dev/null +++ b/databridge.component/src/test/resources/mqtt/multisink-databridge/aasserver.json @@ -0,0 +1,12 @@ +[ + { + "uniqueId": "ConnectedSubmodel/ConnectedPropertyA", + "submodelEndpoint": "http://localhost:4001/shells/TestUpdatedDeviceAAS/aas/submodels/ConnectedSubmodel/submodel", + "idShortPath": "ConnectedPropertyA" + }, + { + "uniqueId": "ConnectedSubmodel/ConnectedPropertyB", + "submodelEndpoint": "http://localhost:4001/shells/TestUpdatedDeviceAAS/aas/submodels/ConnectedSubmodel/submodel", + "idShortPath": "ConnectedPropertyB" + } +] diff --git a/databridge.component/src/test/resources/mqtt/multisink-databridge/context.properties b/databridge.component/src/test/resources/mqtt/multisink-databridge/context.properties new file mode 100644 index 00000000..b4d04d54 --- /dev/null +++ b/databridge.component/src/test/resources/mqtt/multisink-databridge/context.properties @@ -0,0 +1,24 @@ +# ############################### +# HTTP Context configuration file +# ############################### + +# ############################### +# Context Path +# ############################### +# Specifies the subpath in the url for this server context + +contextPath= + +# ############################### +# Hostname +# ############################### +# Specifies the hostname for this server context + +contextHostname=localhost + +# ############################### +# Port +# ############################### +# Specifies the port for this server context + +contextPort=4001 diff --git a/databridge.component/src/test/resources/mqtt/multisink-databridge/jsonataA.json b/databridge.component/src/test/resources/mqtt/multisink-databridge/jsonataA.json new file mode 100644 index 00000000..8ae6a6db --- /dev/null +++ b/databridge.component/src/test/resources/mqtt/multisink-databridge/jsonataA.json @@ -0,0 +1 @@ +$sum(Account.Order.Product.(Price * Quantity)) diff --git a/databridge.component/src/test/resources/mqtt/multisink-databridge/jsonataB.json b/databridge.component/src/test/resources/mqtt/multisink-databridge/jsonataB.json new file mode 100644 index 00000000..cb02082d --- /dev/null +++ b/databridge.component/src/test/resources/mqtt/multisink-databridge/jsonataB.json @@ -0,0 +1 @@ +Account.Order[0].Product[0].ProductID diff --git a/databridge.component/src/test/resources/mqtt/multisink-databridge/jsonatatransformer.json b/databridge.component/src/test/resources/mqtt/multisink-databridge/jsonatatransformer.json new file mode 100644 index 00000000..e233ac24 --- /dev/null +++ b/databridge.component/src/test/resources/mqtt/multisink-databridge/jsonatatransformer.json @@ -0,0 +1,14 @@ +[ + { + "uniqueId": "jsonataA", + "queryPath": "jsonataA.json", + "inputType": "JsonString", + "outputType": "JsonString" + }, + { + "uniqueId": "jsonataB", + "queryPath": "jsonataB.json", + "inputType": "JsonString", + "outputType": "JsonString" + } +] diff --git a/databridge.component/src/test/resources/mqtt/multisink-databridge/mqttconsumer.json b/databridge.component/src/test/resources/mqtt/multisink-databridge/mqttconsumer.json new file mode 100644 index 00000000..2029cd5f --- /dev/null +++ b/databridge.component/src/test/resources/mqtt/multisink-databridge/mqttconsumer.json @@ -0,0 +1,8 @@ +[ + { + "uniqueId": "property1", + "serverUrl": "127.0.0.1", + "serverPort": 1884, + "topic": "PropertyB" + } +] diff --git a/databridge.component/src/test/resources/mqtt/multisink-databridge/routes.json b/databridge.component/src/test/resources/mqtt/multisink-databridge/routes.json new file mode 100644 index 00000000..7bc75129 --- /dev/null +++ b/databridge.component/src/test/resources/mqtt/multisink-databridge/routes.json @@ -0,0 +1,13 @@ +[ + { + "datasource": "property1", + "transformers": ["jsonataA", "jsonataB"], + "datasinks": ["ConnectedSubmodel/ConnectedPropertyA", "ConnectedSubmodel/ConnectedPropertyB"], + "datasinkMappingConfiguration": + { + "ConnectedSubmodel/ConnectedPropertyA": ["jsonataA"], + "ConnectedSubmodel/ConnectedPropertyB": ["jsonataB"] + }, + "trigger": "event" + } +] diff --git a/databridge.component/src/test/resources/mqtt/multisink-databridge/updatertest.aasx b/databridge.component/src/test/resources/mqtt/multisink-databridge/updatertest.aasx new file mode 100644 index 00000000..1297badb Binary files /dev/null and b/databridge.component/src/test/resources/mqtt/multisink-databridge/updatertest.aasx differ diff --git a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/loader/FileConfigurationLoader.java b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/loader/FileConfigurationLoader.java index 2b340b86..26d02d5e 100644 --- a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/loader/FileConfigurationLoader.java +++ b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/loader/FileConfigurationLoader.java @@ -95,6 +95,7 @@ public Object loadConfiguration() { * file path and the resource loader * @return */ + @SuppressWarnings("resource") private InputStreamReader getJsonReader() { InputStream stream = null; diff --git a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/core/AbstractRouteCreator.java b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/core/AbstractRouteCreator.java index b9cf2c36..146863d2 100644 --- a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/core/AbstractRouteCreator.java +++ b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/core/AbstractRouteCreator.java @@ -23,9 +23,10 @@ * SPDX-License-Identifier: MIT ******************************************************************************/ - package org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core; +import java.util.Map; + import org.apache.camel.builder.RouteBuilder; public abstract class AbstractRouteCreator implements IRouteCreator { @@ -54,10 +55,17 @@ public void addRouteToRouteBuilder(RouteConfiguration routeConfig) { String dataSourceEndpoint = RouteCreatorHelper.getDataSourceEndpoint(routesConfiguration, routeConfig.getDatasource()); String[] dataSinkEndpoints = RouteCreatorHelper.getDataSinkEndpoints(routesConfiguration, routeConfig.getDatasinks()); String[] dataTransformerEndpoints = RouteCreatorHelper.getDataTransformerEndpoints(routesConfiguration, routeConfig.getTransformers()); + Map datasinkMapping = RouteCreatorHelper.getDataSinkMapping(routesConfiguration, routeConfig.getDatasinkMappingConfiguration()); String routeId = routeConfig.getRouteId(); - configureRoute(routeConfig, dataSourceEndpoint, dataSinkEndpoints, dataTransformerEndpoints, routeId); + if (datasinkMapping == null || datasinkMapping.isEmpty()) { + configureRoute(routeConfig, dataSourceEndpoint, dataSinkEndpoints, dataTransformerEndpoints, routeId); + } else { + configureRoute(routeConfig, dataSourceEndpoint, dataSinkEndpoints, dataTransformerEndpoints, datasinkMapping, routeId); + } } protected abstract void configureRoute(RouteConfiguration routeConfig, String dataSourceEndpoint, String[] dataSinkEndpoints, String[] dataTransformerEndpoints, String routeId); + + protected abstract void configureRoute(RouteConfiguration routeConfig, String dataSourceEndpoint, String[] dataSinkEndpoints, String[] dataTransformerEndpoints, Map DataSinkMapping, String routeId); } diff --git a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/core/RouteConfiguration.java b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/core/RouteConfiguration.java index c38e2ad6..403edd36 100644 --- a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/core/RouteConfiguration.java +++ b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/core/RouteConfiguration.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (C) 2021 the Eclipse BaSyx Authors + * Copyright (C) 2024 the Eclipse BaSyx Authors * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the @@ -29,12 +29,16 @@ import java.util.List; import java.util.Map; +/** + * @author DataBridge authors, jungjan + */ public class RouteConfiguration { private String trigger; private String routeId; private String datasource; private List transformers = new ArrayList<>(); private List datasinks = new ArrayList<>(); + private Map datasinkMappingConfiguration; private Map triggerData = new HashMap<>(); @@ -42,11 +46,46 @@ public RouteConfiguration() { } /** + * Constructs a new RouteConfiguration object with a mapping configuration to + * map distinct transformators to multiple datasinks. + * * @param trigger + * the trigger for the route configuration * @param routeId + * the ID of the route * @param datasource + * the datasource associated with the route * @param transformers + * the list of transformers to be applied in the route * @param datasinks + * the list of datasinks to which data should be routed + * @param datasinkMappingConfiguration + * the mapping configuration for datasinks, mapping each datasink to + * its corresponding configuration + */ + public RouteConfiguration(String trigger, String datasource, List transformers, List datasinks, Map datasinkMappingConfiguration) { + this.trigger = trigger; + this.datasource = datasource; + this.transformers = transformers; + this.datasinks = datasinks; + this.datasinkMappingConfiguration = datasinkMappingConfiguration; + } + + /** + * Constructs a new RouteConfiguration object without a mapping configuration to + * map distinct transformators to multiple datasinks. I.e., all transformators + * would be equally applied to all data sinks. + * + * @param trigger + * the trigger for the route configuration + * @param routeId + * the ID of the route (optional, can be null) + * @param datasource + * the datasource associated with the route + * @param transformers + * the list of transformers to be applied in the route + * @param datasinks + * the list of datasinks to which data should be routed */ public RouteConfiguration(String trigger, String datasource, List transformers, List datasinks) { this.trigger = trigger; @@ -56,7 +95,7 @@ public RouteConfiguration(String trigger, String datasource, List transf } public RouteConfiguration(RouteConfiguration configuration) { - this(configuration.getRouteTrigger(), configuration.getDatasource(), configuration.getTransformers(), configuration.getDatasinks()); + this(configuration.getRouteTrigger(), configuration.getDatasource(), configuration.getTransformers(), configuration.getDatasinks(), configuration.getDatasinkMappingConfiguration()); setRouteId(configuration.getRouteId()); this.triggerData = configuration.triggerData; } @@ -97,4 +136,12 @@ public void setDatasource(String datasource) { this.datasource = datasource; } + public Map getDatasinkMappingConfiguration() { + return datasinkMappingConfiguration; + } + + public void setDatasinkMappingConfiguration(Map datasinkMappingConfiguration) { + this.datasinkMappingConfiguration = datasinkMappingConfiguration; + } + } diff --git a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/core/RouteCreatorHelper.java b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/core/RouteCreatorHelper.java index 761b58f9..113cf8ca 100644 --- a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/core/RouteCreatorHelper.java +++ b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/core/RouteCreatorHelper.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (C) 2021 the Eclipse BaSyx Authors + * Copyright (C) 2024 the Eclipse BaSyx Authors * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the @@ -24,35 +24,57 @@ ******************************************************************************/ package org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core; -import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +/** + * @author DataBridge authors, jungjan + */ public class RouteCreatorHelper { private RouteCreatorHelper() { } public static String getDataSourceEndpoint(RoutesConfiguration routesConfiguration, String dataSourceId) { - return routesConfiguration.getDatasources().get(dataSourceId).getConnectionURI(); + return routesConfiguration.getDatasources() + .get(dataSourceId) + .getConnectionURI(); } public static String getDataSinkEndpoint(RoutesConfiguration routesConfiguration, String dataSinkId) { - return routesConfiguration.getDatasinks().get(dataSinkId).getConnectionURI(); + return routesConfiguration.getDatasinks() + .get(dataSinkId) + .getConnectionURI(); } public static String[] getDataSinkEndpoints(RoutesConfiguration routesConfiguration, List dataSinkIdList) { - List endpoints = new ArrayList<>(); - for (String dataSinkId : dataSinkIdList) { - endpoints.add(routesConfiguration.getDatasinks().get(dataSinkId).getConnectionURI()); - } + return dataSinkIdList.stream() + .map(routesConfiguration.getDatasinks()::get) + .map(dataSinkConfiguration -> dataSinkConfiguration.getConnectionURI()) + .toArray(String[]::new); + + } - return endpoints.toArray(new String[0]); + public static String[] getDataTransformerEndpoints(RoutesConfiguration routesConfiguration, List transformerIdLists) { + return transformerIdLists.stream() + .map(routesConfiguration.getTransformers()::get) + .map(dataTransformerConfiguration -> dataTransformerConfiguration.getConnectionURI()) + .toArray(String[]::new); } - public static String[] getDataTransformerEndpoints(RoutesConfiguration routesConfiguration, List transformerIdList) { - List endpoints = new ArrayList<>(); - for (String transformerId : transformerIdList) { - endpoints.add(routesConfiguration.getTransformers().get(transformerId).getConnectionURI()); + public static Map getDataSinkMapping(RoutesConfiguration routesConfiguration, Map datasinkMappingConfiguration) { + if (datasinkMappingConfiguration == null || datasinkMappingConfiguration.isEmpty()) { + return null; } - return endpoints.toArray(new String[0]); + Set dataSinkIds = datasinkMappingConfiguration.keySet(); + Map resolvedDataSinkEndpoints = dataSinkIds.stream() + .collect(Collectors.toMap(dataSinkId -> dataSinkId, dataSinkId -> routesConfiguration.getDatasinks() + .get(dataSinkId) + .getConnectionURI())); + + return dataSinkIds.stream() + .collect(Collectors.toMap(resolvedDataSinkEndpoints::get, dataSinkId -> getDataTransformerEndpoints(routesConfiguration, Arrays.asList(datasinkMappingConfiguration.get(dataSinkId))))); } } diff --git a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/event/EventRouteConfiguration.java b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/event/EventRouteConfiguration.java index 4916b4ec..3ad28c75 100644 --- a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/event/EventRouteConfiguration.java +++ b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/event/EventRouteConfiguration.java @@ -25,11 +25,12 @@ package org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.event; import java.util.List; +import java.util.Map; import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.RouteConfiguration; /** - * A connection of a single route (source, transformer(s), sink(s)) + * A connection of a single route (source, transformer(s), sink(s), sinkmapper(s) * * @author haque, fischer * @@ -40,6 +41,10 @@ public class EventRouteConfiguration extends RouteConfiguration { public EventRouteConfiguration(String datasource, List transformers, List datasinks) { super(ROUTE_TRIGGER, datasource, transformers, datasinks); } + + public EventRouteConfiguration(String datasource, List transformers, List datasinks, Map datasinkMapping) { + super(ROUTE_TRIGGER, datasource, transformers, datasinks, datasinkMapping); + } public EventRouteConfiguration(RouteConfiguration configuration) { super(configuration); diff --git a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/event/EventRouteCreator.java b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/event/EventRouteCreator.java index 8af93954..1d6d2df2 100644 --- a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/event/EventRouteCreator.java +++ b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/event/EventRouteCreator.java @@ -24,27 +24,60 @@ ******************************************************************************/ package org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.event; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; +import java.util.stream.Collectors; + import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.model.MulticastDefinition; import org.apache.camel.model.RouteDefinition; import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.AbstractRouteCreator; import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.RouteConfiguration; import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.RoutesConfiguration; public class EventRouteCreator extends AbstractRouteCreator { - public EventRouteCreator(RouteBuilder routeBuilder, RoutesConfiguration routesConfiguration) { super(routeBuilder, routesConfiguration); } @Override protected void configureRoute(RouteConfiguration routeConfiguration, String dataSourceEndpoint, String[] dataSinkEndpoints, String[] dataTransformerEndpoints, String routeId) { - RouteDefinition routeDefinition = getRouteBuilder().from(dataSourceEndpoint).routeId(routeId).to("log:" + routeId); + RouteDefinition routeDefinition = startRouteDefinition(dataSourceEndpoint, routeId); if (!(dataTransformerEndpoints == null || dataTransformerEndpoints.length == 0)) { - routeDefinition.to(dataTransformerEndpoints).to("log:" + routeId); + routeDefinition.to(dataTransformerEndpoints) + .to("log:" + routeId); } - routeDefinition.to(dataSinkEndpoints[0]).to("log:" + routeId); + routeDefinition.to(dataSinkEndpoints) + .to("log:" + routeId); + } + + @Override + protected void configureRoute(RouteConfiguration routeConfig, String dataSourceEndpoint, String[] dataSinkEndpoints, String[] dataTransformerEndpoints, Map dataSinkMapping, String routeId) { + MulticastDefinition routeDefinition = startRouteDefinition(dataSourceEndpoint, routeId).multicast(); + dataSinkMapping.forEach((dataSink, dataTransformers) -> routeDefinition.pipeline() + .to(dataTransformers) + .to(dataSink) + .to("log:" + routeId)); + + getUnmappedEndpoints(dataSinkEndpoints, dataSinkMapping).forEach(dataSink -> routeDefinition.to(dataSink) + .to("log: " + routeId)); + + routeDefinition.end(); } -} \ No newline at end of file + private List getUnmappedEndpoints(String[] dataSinkEndpoints, Map dataSinkMapping) { + return Arrays.stream(dataSinkEndpoints) + .filter(Predicate.not(dataSinkMapping::containsKey)) + .collect(Collectors.toList()); + } + + private RouteDefinition startRouteDefinition(String dataSourceEndpoint, String routeId) { + return getRouteBuilder().from(dataSourceEndpoint) + .routeId(routeId) + .to("log:" + routeId); + } +} diff --git a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/request/RequestRouteConfiguration.java b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/request/RequestRouteConfiguration.java index cd687ce2..e0322586 100644 --- a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/request/RequestRouteConfiguration.java +++ b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/request/RequestRouteConfiguration.java @@ -25,6 +25,7 @@ package org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.request; import java.util.List; +import java.util.Map; import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.RouteConfiguration; import org.springframework.http.HttpMethod; @@ -53,6 +54,10 @@ public RequestRouteConfiguration(String datasource, List transformers, L super(ROUTE_TRIGGER, datasource, transformers, datasinks); } + public RequestRouteConfiguration(String datasource, List transformers, List datasinks, Map datasinkMapping) { + super(ROUTE_TRIGGER, datasource, transformers, datasinks, datasinkMapping); + } + public RequestRouteConfiguration(RouteConfiguration configuration) { super(configuration); host = (String) getTriggerData().get(HOST); @@ -85,7 +90,6 @@ public void setPort(String port) { } public String getRequestEndpointURI() { - return REQUEST_COMPONENT + ":" + REQUEST_PROTOCOL + "://" + getHost() + ":" + getPort() + getPath() + "?" - + HTTP_METHOD_RESTRICT_PARAMETER; + return REQUEST_COMPONENT + ":" + REQUEST_PROTOCOL + "://" + getHost() + ":" + getPort() + getPath() + "?" + HTTP_METHOD_RESTRICT_PARAMETER; } } diff --git a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/request/RequestRouteCreator.java b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/request/RequestRouteCreator.java index 75c35711..5a176f07 100644 --- a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/request/RequestRouteCreator.java +++ b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/request/RequestRouteCreator.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (C) 2023 the Eclipse BaSyx Authors + * Copyright (C) 2024 the Eclipse BaSyx Authors * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the @@ -24,7 +24,14 @@ ******************************************************************************/ package org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.request; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; +import java.util.stream.Collectors; + import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.model.MulticastDefinition; import org.apache.camel.model.RouteDefinition; import org.eclipse.digitaltwin.basyx.databridge.core.configuration.delegator.handler.ResponseOkCodeHandler; import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.AbstractRouteCreator; @@ -34,7 +41,7 @@ /** * Configures and creates the request route * - * @author danish + * @author danish, jungjan * */ public class RequestRouteCreator extends AbstractRouteCreator { @@ -45,21 +52,43 @@ public RequestRouteCreator(RouteBuilder routeBuilder, RoutesConfiguration routes } @Override - protected void configureRoute(RouteConfiguration routeConfig, String dataSourceEndpoint, String[] dataSinkEndpoints, - String[] dataTransformerEndpoints, String routeId) { - String delegatorEndpoint = ((RequestRouteConfiguration) routeConfig).getRequestEndpointURI(); - - RouteDefinition routeDefinition = createRoute(dataSourceEndpoint, routeId, delegatorEndpoint); + protected void configureRoute(RouteConfiguration routeConfig, String dataSourceEndpoint, String[] dataSinkEndpoints, String[] dataTransformerEndpoints, String routeId) { + RouteDefinition routeDefinition = startRouteDefinition((RequestRouteConfiguration) routeConfig, dataSourceEndpoint, routeId); if (!(dataTransformerEndpoints == null || dataTransformerEndpoints.length == 0)) { - routeDefinition.to(dataTransformerEndpoints).log("Transformer : " + routeId); + routeDefinition.to(dataTransformerEndpoints) + .log("Transformer : " + routeId); } + routeDefinition.to(dataSinkEndpoints) + .bean(new ResponseOkCodeHandler()); + } + + @Override + protected void configureRoute(RouteConfiguration routeConfig, String dataSourceEndpoint, String[] dataSinkEndpoints, String[] dataTransformerEndpoints, Map dataSinkMapping, String routeId) { + MulticastDefinition routeDefinition = startRouteDefinition((RequestRouteConfiguration) routeConfig, dataSourceEndpoint, routeId).multicast(); + dataSinkMapping.forEach((dataSink, dataTransformers) -> routeDefinition.pipeline() + .to(dataTransformers) + .to(dataSink) + .to("log:" + routeId)); + + getUnmappedEndpoints(dataSinkEndpoints, dataSinkMapping).forEach(dataSink -> routeDefinition.to(dataSink) + .to("log: " + routeId)); + + routeDefinition.end() + .bean(new ResponseOkCodeHandler()); + } - routeDefinition.bean(new ResponseOkCodeHandler()); + private List getUnmappedEndpoints(String[] dataSinkEndpoints, Map dataSinkMapping) { + return Arrays.stream(dataSinkEndpoints) + .filter(Predicate.not(dataSinkMapping::containsKey)) + .collect(Collectors.toList()); } - private RouteDefinition createRoute(String dataSourceEndpoint, String routeId, String delegatorEndpoint) { - return getRouteBuilder().from(delegatorEndpoint).routeId(routeId).pollEnrich(dataSourceEndpoint, TIMEOUT) + private RouteDefinition startRouteDefinition(RequestRouteConfiguration requestRouteConfig, String dataSourceEndpoint, String routeId) { + String delegatorEndpoint = requestRouteConfig.getRequestEndpointURI(); + return getRouteBuilder().from(delegatorEndpoint) + .routeId(routeId) + .pollEnrich(dataSourceEndpoint, TIMEOUT) .log("Source : " + routeId); } -} \ No newline at end of file +} diff --git a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/timer/TimerRouteConfiguration.java b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/timer/TimerRouteConfiguration.java index d50e64ee..fcdce3fd 100644 --- a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/timer/TimerRouteConfiguration.java +++ b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/timer/TimerRouteConfiguration.java @@ -25,6 +25,7 @@ package org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.timer; import java.util.List; +import java.util.Map; import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.RouteConfiguration; @@ -44,6 +45,10 @@ public TimerRouteConfiguration(String datasource, List transformers, Lis super(ROUTE_TRIGGER, datasource, transformers, datasinks); } + public TimerRouteConfiguration(String datasource, List transformers, List datasinks, Map datasinkMapping) { + super(ROUTE_TRIGGER, datasource, transformers, datasinks, datasinkMapping); + } + public TimerRouteConfiguration(RouteConfiguration configuration) { super(configuration); timerName = (String) getTriggerData().get(TIMER_NAME); diff --git a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/timer/TimerRouteCreator.java b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/timer/TimerRouteCreator.java index a4317072..c252c6ff 100644 --- a/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/timer/TimerRouteCreator.java +++ b/databridge.core/src/main/java/org/eclipse/digitaltwin/basyx/databridge/core/configuration/route/timer/TimerRouteCreator.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (C) 2021 the Eclipse BaSyx Authors + * Copyright (C) 2024 the Eclipse BaSyx Authors * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the @@ -24,13 +24,22 @@ ******************************************************************************/ package org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.timer; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; +import java.util.stream.Collectors; + import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.model.MulticastDefinition; import org.apache.camel.model.RouteDefinition; import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.AbstractRouteCreator; import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.RouteConfiguration; import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.RouteCreatorHelper; import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.RoutesConfiguration; - +/** + * @author DataBridge authors, jungjan + */ public class TimerRouteCreator extends AbstractRouteCreator { private static final Long TIMEOUT = 5000L; @@ -40,15 +49,42 @@ public TimerRouteCreator(RouteBuilder routeBuilder, RoutesConfiguration routesCo @Override protected void configureRoute(RouteConfiguration routeConfig, String dataSourceEndpoint, String[] dataSinkEndpoints, String[] dataTransformerEndpoints, String routeId) { - TimerRouteConfiguration timerConfig = (TimerRouteConfiguration) routeConfig; - String timerEndpoint = RouteCreatorHelper.getDataSourceEndpoint(getRoutesConfiguration(), timerConfig.getTimerName()); - RouteDefinition routeDefinition = getRouteBuilder().from(timerEndpoint).pollEnrich(dataSourceEndpoint, TIMEOUT).routeId(routeId).to("log:" + routeId); + RouteDefinition routeDefinition = startRouteDefinition((TimerRouteConfiguration) routeConfig, dataSourceEndpoint, routeId); if (!(dataTransformerEndpoints == null || dataTransformerEndpoints.length == 0)) { - routeDefinition.to(dataTransformerEndpoints).to("log:" + routeId); + routeDefinition.to(dataTransformerEndpoints) + .to("log:" + routeId); } - routeDefinition.to(dataSinkEndpoints[0]).to("log:" + routeId); + routeDefinition.to(dataSinkEndpoints) + .to("log:" + routeId); + } + + @Override + protected void configureRoute(RouteConfiguration routeConfig, String dataSourceEndpoint, String[] dataSinkEndpoints, String[] dataTransformerEndpoints, Map dataSinkMapping, String routeId) { + MulticastDefinition routeDefinition = startRouteDefinition((TimerRouteConfiguration) routeConfig, dataSourceEndpoint, routeId).multicast(); + dataSinkMapping.forEach((dataSink, dataTransformers) -> routeDefinition.pipeline() + .to(dataTransformers) + .to(dataSink) + .to("log:" + routeId)); + + getUnmappedEndpoints(dataSinkEndpoints, dataSinkMapping).forEach(dataSink -> routeDefinition.to(dataSink) + .to("log: " + routeId)); + + routeDefinition.end(); } -} \ No newline at end of file + private List getUnmappedEndpoints(String[] dataSinkEndpoints, Map dataSinkMapping) { + return Arrays.stream(dataSinkEndpoints) + .filter(Predicate.not(dataSinkMapping::containsKey)) + .collect(Collectors.toList()); + } + + private RouteDefinition startRouteDefinition(TimerRouteConfiguration timerRouteConfig, String dataSourceEndpoint, String routeId) { + String timerEndpoint = RouteCreatorHelper.getDataSourceEndpoint(getRoutesConfiguration(), timerRouteConfig.getTimerName()); + return getRouteBuilder().from(timerEndpoint) + .pollEnrich(dataSourceEndpoint, TIMEOUT) + .routeId(routeId) + .to("log:" + routeId); + } +} diff --git a/databridge.examples/databridge.examples.aas-jsonata-mqtt/pom.xml b/databridge.examples/databridge.examples.aas-jsonata-mqtt/pom.xml index 650b6605..5469217c 100644 --- a/databridge.examples/databridge.examples.aas-jsonata-mqtt/pom.xml +++ b/databridge.examples/databridge.examples.aas-jsonata-mqtt/pom.xml @@ -38,7 +38,7 @@ org.slf4j slf4j-simple 2.0.11 - test + test @@ -51,28 +51,28 @@ org.eclipse.digitaltwin.basyx databridge.camel-aas 0.0.1-SNAPSHOT - test + test com.fasterxml.jackson.datatype jackson-datatype-jsr310 2.15.0 - test + test org.eclipse.digitaltwin.basyx databridge.camel-timer 0.0.1-SNAPSHOT - test + test org.eclipse.digitaltwin.basyx databridge.camel-paho 0.0.1-SNAPSHOT - test + test @@ -149,7 +149,15 @@ org.eclipse.basyx basyx.components.AASServer 1.2.0 - test + test + + + + + org.awaitility + awaitility + 4.2.0 + test diff --git a/databridge.examples/databridge.examples.aas-jsonata-mqtt/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/aasjsonatamqtt/test/TestAASUpdater.java b/databridge.examples/databridge.examples.aas-jsonata-mqtt/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/aasjsonatamqtt/test/TestAASUpdater.java index e22724b9..46bc4736 100644 --- a/databridge.examples/databridge.examples.aas-jsonata-mqtt/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/aasjsonatamqtt/test/TestAASUpdater.java +++ b/databridge.examples/databridge.examples.aas-jsonata-mqtt/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/aasjsonatamqtt/test/TestAASUpdater.java @@ -32,8 +32,11 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.UUID; +import java.util.concurrent.TimeUnit; + import static org.junit.Assert.assertEquals; +import org.awaitility.Awaitility; import org.eclipse.basyx.aas.registration.memory.InMemoryRegistry; import org.eclipse.basyx.components.aas.AASServerComponent; import org.eclipse.basyx.components.aas.configuration.AASServerBackend; @@ -75,16 +78,22 @@ * */ public class TestAASUpdater { + private final static Logger logger = LoggerFactory.getLogger(TestAASUpdater.class); + + private final static InMemoryRegistry REGISTRY = new InMemoryRegistry(); + private final static String MQTT_BROKER_URL = "tcp://broker.mqttdashboard.com:1883"; + private final static String USER_NAME = "test1"; + private final static String PASSWORD = "1234567"; + private final static String CLIENT_ID = UUID.randomUUID().toString(); - private static Logger logger = LoggerFactory.getLogger(TestAASUpdater.class); + private final static String TOPIC_PRESSURE = "aas/pressure"; + private final static String TOPIC_ROTATION = "aas/rotation"; + private final static String TOPIC_PRESSURE_ROTATION = "aas/pressure_rotation"; + + private static AASServerComponent aasServer; private static BaSyxContextConfiguration aasContextConfig; - private static InMemoryRegistry registry = new InMemoryRegistry(); private static DataBridgeComponent updater; - private static String mqtt_broker_url = "tcp://broker.mqttdashboard.com:1883"; - private static String user_name = "test1"; - private static String password = "1234567"; - private static String client_id = UUID.randomUUID().toString(); private static String receivedMessage; private static Server mqttBroker; @@ -103,54 +112,35 @@ public static void setUp() throws Exception { @AfterClass public static void tearDown() { updater.stopComponent(); - + mqttBroker.stopServer(); aasServer.stopComponent(); } @Test public void getUpdatedPropertyValueA() throws MqttException, MqttSecurityException, MqttPersistenceException, InterruptedException, JsonProcessingException { - String topic = "aas/pressure"; + String topic = TOPIC_PRESSURE; String expectedValue = wrapStringValue("103.5585973"); - - assertPropertyValue(expectedValue, topic); + awaitAndAssertMqttPropagation(expectedValue, topic); } @Test public void getUpdatedPropertyValueB() throws MqttException, MqttSecurityException, MqttPersistenceException, InterruptedException, JsonProcessingException { - String topic = "aas/rotation"; + String topic = TOPIC_ROTATION; String expectedValue = wrapStringValue("379.5784558"); - - assertPropertyValue(expectedValue, topic); + awaitAndAssertMqttPropagation(expectedValue, topic); } @Test public void getAllProperties() throws MqttException, MqttSecurityException, MqttPersistenceException, InterruptedException, IOException, URISyntaxException { - String topic = "aas/pressure_rotation"; + String topic = TOPIC_PRESSURE_ROTATION; String expectedValue = getExpectedValueFromFile(); - - assertAllProperties(expectedValue, topic); - } - - private void assertPropertyValue(String expectedValue, String topic) throws MqttSecurityException, MqttPersistenceException, MqttException, InterruptedException { - - fetchExpectedValue(topic); - - assertEquals(receivedMessage, expectedValue); - } - - private void assertAllProperties(String expectedValue, String topic) throws MqttSecurityException, MqttPersistenceException, MqttException, InterruptedException, JsonMappingException, JsonProcessingException { - - fetchExpectedValue(topic); - - ObjectMapper mapper = new ObjectMapper(); - - assertEquals(mapper.readTree(receivedMessage), mapper.readTree(expectedValue)); + awaitAndAssertMqttPropagation(expectedValue, topic); } - private static void fetchExpectedValue(String currentTopic) throws MqttException, MqttSecurityException, MqttPersistenceException, InterruptedException { + private void awaitAndAssertMqttPropagation(String expectedValue, String currentTopic) throws MqttException, MqttSecurityException, MqttPersistenceException, InterruptedException { try { @@ -174,9 +164,8 @@ public void deliveryComplete(IMqttDeliveryToken token) { } }); - mqttClient.subscribe(currentTopic); - waitForPropagation(); + Awaitility.await().with().pollInterval(1, TimeUnit.SECONDS).atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertMessage(expectedValue, receivedMessage)); mqttClient.disconnect(); mqttClient.close(); @@ -185,6 +174,25 @@ public void deliveryComplete(IMqttDeliveryToken token) { } } + private void assertMessage(String expectedValue, String receivedMessage) { + if (!isAllProperties(expectedValue)) { + assertEquals(expectedValue, receivedMessage); + } + + ObjectMapper mapper = new ObjectMapper(); + try { + assertEquals(mapper.readTree(expectedValue), mapper.readTree(receivedMessage)); + } catch (JsonMappingException e) { + e.printStackTrace(); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + } + + private boolean isAllProperties(String expectedValue) { + return expectedValue.startsWith("["); + } + private static void configureAasServer() throws InterruptedException { aasContextConfig = new BaSyxContextConfiguration(4001, ""); @@ -192,7 +200,7 @@ private static void configureAasServer() throws InterruptedException { BaSyxAASServerConfiguration aasConfig = new BaSyxAASServerConfiguration(AASServerBackend.INMEMORY, "aasx/telemeteryTest.aasx"); aasServer = new AASServerComponent(aasContextConfig, aasConfig); - aasServer.setRegistry(registry); + aasServer.setRegistry(REGISTRY); } private static void startAasServer() { @@ -221,18 +229,15 @@ private static void configureAndStartUpdaterComponent() throws Exception { configuration.addTransformers(jsonataConfigFactory.create()); updater = new DataBridgeComponent(configuration); + TimeUnit.SECONDS.sleep(5); // FIXME: Failed to start route routeN because of null updater.startComponent(); } - - private static void waitForPropagation() throws InterruptedException { - Thread.sleep(6000); - } private static MqttClient mqttConnectionInitiate() throws MqttException { - MqttClient mqttClient = new MqttClient(mqtt_broker_url, client_id, new MemoryPersistence()); + MqttClient mqttClient = new MqttClient(MQTT_BROKER_URL, CLIENT_ID, new MemoryPersistence()); - MqttConnectOptions connOpts = setUpMqttConnection(user_name, password); + MqttConnectOptions connOpts = setUpMqttConnection(USER_NAME, PASSWORD); mqttClient.connect(connOpts); connOpts.setCleanSession(true); return mqttClient; diff --git a/databridge.examples/databridge.examples.dot-aas-v3-api/pom.xml b/databridge.examples/databridge.examples.dot-aas-v3-api/pom.xml index 0e0828ad..172da791 100644 --- a/databridge.examples/databridge.examples.dot-aas-v3-api/pom.xml +++ b/databridge.examples/databridge.examples.dot-aas-v3-api/pom.xml @@ -66,29 +66,6 @@ databridge.camel-jsonata 0.0.1-SNAPSHOT - - - - org.eclipse.basyx - basyx.sdk - 1.2.0 - - - - - org.eclipse.basyx - basyx.sdk - 1.4.0 - tests - test - - - - - org.eclipse.basyx - basyx.components.lib - 1.4.0 - @@ -128,4 +105,4 @@ test - \ No newline at end of file + diff --git a/databridge.examples/databridge.examples.dot-aas-v3-api/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/dotaasv3api/test/TestAASUpdater.java b/databridge.examples/databridge.examples.dot-aas-v3-api/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/dotaasv3api/test/TestAASUpdater.java index 681838e9..20dbd3d5 100644 --- a/databridge.examples/databridge.examples.dot-aas-v3-api/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/dotaasv3api/test/TestAASUpdater.java +++ b/databridge.examples/databridge.examples.dot-aas-v3-api/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/dotaasv3api/test/TestAASUpdater.java @@ -59,55 +59,55 @@ import io.moquette.broker.config.ResourceLoaderConfig; public class TestAASUpdater { - + private static final String PROPERTY_INTEGER_VALUE = "\"0.75\""; private static final String PROPERTY_INTEGER_VALUE_PATH = "/submodels/c3VibW9kZWxJZA==/submodel-elements/DotAASV3ConformantApiSMC.DotAASV3ConformantApiProperty/$value"; private static final String PROPERTY_STRING_VALUE = "\"Bowler Hat\""; private static final String PROPERTY_STRING_VALUE_PATH = "/submodels/c3VibW9kZWxJZA==/submodel-elements/DotAASV3ConformantApiSMC.DotAASV3ConformantApiStringProperty/$value"; private static Logger logger = LoggerFactory.getLogger(TestAASUpdater.class); - + private static DataBridgeComponent updater; protected static Server mqttBroker; - + private static ClientAndServer mockServer; @BeforeClass public static void setUp() throws IOException { configureAndStartMockserver(); - + configureAndStartMqttBroker(); - + configureAndStartUpdaterComponent(); } - + @AfterClass public static void tearDown() { updater.stopComponent(); mockServer.close(); } - + @Test public void getDotAASV3ConformantPropertyIntegerValue() throws MqttSecurityException, MqttPersistenceException, MqttException, InterruptedException { publishNewDatapoint("DotAASV3ConformantProperty"); - + waitForPropagation(); - + verifyPropertyValueUpdateRequestIntegerValue(); } - + @Test public void getDotAASV3ConformantPropertyStringValue() throws MqttSecurityException, MqttPersistenceException, MqttException, InterruptedException { publishNewDatapoint("DotAASV3ConformantPropertyStringValue"); - + waitForPropagation(); - + verifyPropertyValueUpdateRequestStringValue(); } private static void configureAndStartMockserver() { mockServer = ClientAndServer.startClientAndServer(4001); - + createExpectationForPatchRequestForIntegerValue(); createExpectationForPatchRequestForStringValue(); @@ -139,7 +139,7 @@ private void waitForPropagation() throws InterruptedException { private void publishNewDatapoint(String topic) throws MqttException, MqttSecurityException, MqttPersistenceException { logger.info("Publishing event to {}", topic); - + String json = "{\"Account\":{\"Account Name\":\"Firefly\",\"Order\":[{\"OrderID\":\"order103\",\"Product\":[{\"Product Name\":\"Bowler Hat\",\"ProductID\":858383,\"SKU\":\"0406654608\",\"Description\":{\"Colour\":\"Purple\",\"Width\":300,\"Height\":200,\"Depth\":210,\"Weight\":0.75},\"Price\":34.45,\"Quantity\":2},{\"Product Name\":\"Trilby hat\",\"ProductID\":858236,\"SKU\":\"0406634348\",\"Description\":{\"Colour\":\"Orange\",\"Width\":300,\"Height\":200,\"Depth\":210,\"Weight\":0.6},\"Price\":21.67,\"Quantity\":1}]},{\"OrderID\":\"order104\",\"Product\":[{\"Product Name\":\"Bowler Hat\",\"ProductID\":858383,\"SKU\":\"040657863\",\"Description\":{\"Colour\":\"Purple\",\"Width\":300,\"Height\":200,\"Depth\":210,\"Weight\":0.75},\"Price\":34.45,\"Quantity\":4},{\"ProductID\":345664,\"SKU\":\"0406654603\",\"Product Name\":\"Cloak\",\"Description\":{\"Colour\":\"Black\",\"Width\":30,\"Height\":20,\"Depth\":210,\"Weight\":2},\"Price\":107.99,\"Quantity\":1}]}]}}"; MqttClient mqttClient = new MqttClient("tcp://localhost:1884", "testClient", new MemoryPersistence()); mqttClient.connect(); @@ -154,36 +154,40 @@ private static void configureAndStartMqttBroker() throws IOException { final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader); mqttBroker.startServer(classPathConfig); } - + @SuppressWarnings("resource") private static void createExpectationForPatchRequestForIntegerValue() { - new MockServerClient("localhost", 4001) - .when(request().withMethod("PATCH").withPath(PROPERTY_INTEGER_VALUE_PATH) - .withBody(PROPERTY_INTEGER_VALUE).withHeader("Content-Type", "application/json")) + new MockServerClient("localhost", 4001).when(request().withMethod("PATCH") + .withPath(PROPERTY_INTEGER_VALUE_PATH) + .withBody(PROPERTY_INTEGER_VALUE) + .withHeader("Content-Type", "application/json")) .respond(response().withStatusCode(HttpStatus.SC_CREATED) .withHeaders(new Header("Content-Type", "application/json; charset=utf-8"))); } - + @SuppressWarnings("resource") private static void createExpectationForPatchRequestForStringValue() { - new MockServerClient("localhost", 4001) - .when(request().withMethod("PATCH").withPath(PROPERTY_STRING_VALUE_PATH) - .withBody(PROPERTY_STRING_VALUE).withHeader("Content-Type", "application/json")) - .respond(response().withStatusCode(HttpStatus.SC_CREATED) - .withHeaders(new Header("Content-Type", "application/json; charset=utf-8"))); + new MockServerClient("localhost", 4001).when(request().withMethod("PATCH") + .withPath(PROPERTY_STRING_VALUE_PATH) + .withBody(PROPERTY_STRING_VALUE) + .withHeader("Content-Type", "application/json")) + .respond(response().withStatusCode(HttpStatus.SC_CREATED) + .withHeaders(new Header("Content-Type", "application/json; charset=utf-8"))); } @SuppressWarnings("resource") private void verifyPropertyValueUpdateRequestIntegerValue() { new MockServerClient("localhost", 4001).verify(request().withMethod("PATCH") - .withPath(PROPERTY_INTEGER_VALUE_PATH).withHeader("Content-Type", "application/json") + .withPath(PROPERTY_INTEGER_VALUE_PATH) + .withHeader("Content-Type", "application/json") .withBody(PROPERTY_INTEGER_VALUE), VerificationTimes.exactly(1)); } - + @SuppressWarnings("resource") private void verifyPropertyValueUpdateRequestStringValue() { new MockServerClient("localhost", 4001).verify(request().withMethod("PATCH") - .withPath(PROPERTY_STRING_VALUE_PATH).withHeader("Content-Type", "application/json") + .withPath(PROPERTY_STRING_VALUE_PATH) + .withHeader("Content-Type", "application/json") .withBody(PROPERTY_STRING_VALUE), VerificationTimes.exactly(1)); } } diff --git a/databridge.examples/databridge.examples.dot-aas-v3-api/src/test/resources/config/moquette.conf b/databridge.examples/databridge.examples.dot-aas-v3-api/src/test/resources/config/moquette.conf new file mode 100644 index 00000000..282ec025 --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-api/src/test/resources/config/moquette.conf @@ -0,0 +1,6 @@ +# Moquette Java Broker configuration file for testing + +# Do not use the default 1883 port +port 1884 +host 0.0.0.0 +allow_anonymous true diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multicast/pom.xml b/databridge.examples/databridge.examples.dot-aas-v3-multicast/pom.xml new file mode 100644 index 00000000..0426ee12 --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multicast/pom.xml @@ -0,0 +1,105 @@ + + 4.0.0 + + org.eclipse.digitaltwin.basyx + databridge.examples + ${revision} + + databridge.examples.dot-aas-v3-multicast + An integration example demonstrating the usage with one data source and multiple data sinks with transformer mapping for each datasink + + + + 11 + 11 + UTF-8 + UTF-8 + + + + + + + + + org.apache.maven.plugins + maven-source-plugin + 3.2.1 + + + + + + + + org.slf4j + slf4j-simple + 2.0.11 + + + + org.eclipse.digitaltwin.basyx + databridge.core + ${revision} + + + + org.eclipse.digitaltwin.basyx + databridge.camel-paho + 0.0.1-SNAPSHOT + + + + org.eclipse.digitaltwin.basyx + databridge.camel-aas + 0.0.1-SNAPSHOT + + + + org.eclipse.digitaltwin.basyx + databridge.camel-jsonata + 0.0.1-SNAPSHOT + + + + + junit + junit + 4.13.2 + test + + + + + io.moquette + moquette-broker + 0.16 + test + + + org.slf4j + slf4j-log4j12 + + + + + + + org.mock-server + mockserver-netty + 5.15.0 + test + + + + + org.mock-server + mockserver-client-java + 5.15.0 + test + + + diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/aasserver.json b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/aasserver.json new file mode 100644 index 00000000..10fb0499 --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/aasserver.json @@ -0,0 +1,32 @@ +[ + { + "uniqueId": "process/processId", + "submodelEndpoint": "http://localhost:4001/submodels/cHJvY2Vzcw==", + "idShortPath": "processId", + "api": "DotAAS-V3" + }, + { + "uniqueId": "process/processResult", + "submodelEndpoint": "http://localhost:4001/submodels/cHJvY2Vzcw==", + "idShortPath": "processResult", + "api": "DotAAS-V3" + }, + { + "uniqueId": "process/processData", + "submodelEndpoint": "http://localhost:4001/submodels/cHJvY2Vzcw==", + "idShortPath": "processData", + "api": "DotAAS-V3" + }, + { + "uniqueId": "process/processData_raw", + "submodelEndpoint": "http://localhost:4001/submodels/cHJvY2Vzcw==", + "idShortPath": "processData_raw", + "api": "DotAAS-V3" + }, + { + "uniqueId": "process/processDuration", + "submodelEndpoint": "http://localhost:4001/submodels/cHJvY2Vzcw==", + "idShortPath": "processDuration", + "api": "DotAAS-V3" + } +] diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/jsonatatransformer.json b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/jsonatatransformer.json new file mode 100644 index 00000000..c343e80d --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/jsonatatransformer.json @@ -0,0 +1,26 @@ +[ + { + "uniqueId": "processId", + "queryPath": "processId.jsonata", + "inputType": "JsonString", + "outputType": "JsonString" + }, + { + "uniqueId": "processResult", + "queryPath": "processResult.jsonata", + "inputType": "JsonString", + "outputType": "JsonString" + }, + { + "uniqueId": "processData", + "queryPath": "processData.jsonata", + "inputType": "JsonString", + "outputType": "JsonString" + }, + { + "uniqueId": "processDuration", + "queryPath": "processDuration.jsonata", + "inputType": "JsonString", + "outputType": "JsonString" + } +] diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/logback.xml b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/logback.xml new file mode 100644 index 00000000..7aa8cb8e --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/logback.xml @@ -0,0 +1,26 @@ + + + + + + + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n + + + + + + + + diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/mqttconsumer.json b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/mqttconsumer.json new file mode 100644 index 00000000..ecc8a000 --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/mqttconsumer.json @@ -0,0 +1,8 @@ +[ + { + "uniqueId": "mqttSource", + "serverUrl": "localhost", + "serverPort": 1884, + "topic": "process-update" + } +] diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/processData.jsonata b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/processData.jsonata new file mode 100644 index 00000000..e0cbe603 --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/processData.jsonata @@ -0,0 +1 @@ +{"ProcessData": Process.ProcessData} diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/processDuration.jsonata b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/processDuration.jsonata new file mode 100644 index 00000000..346c4eea --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/processDuration.jsonata @@ -0,0 +1,6 @@ +( + $diff := $toMillis(ProcessData.EndPorcess) - $toMillis(ProcessData.StartProcess); + $minutes := $floor($diff / 60000); + $seconds := $floor(($diff % 60000) / 1000); + $string($minutes) & "M" & $string($seconds) & "S" +) diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/processId.jsonata b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/processId.jsonata new file mode 100644 index 00000000..5a62f807 --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/processId.jsonata @@ -0,0 +1 @@ +Process.ProcessID diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/processResult.jsonata b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/processResult.jsonata new file mode 100644 index 00000000..1271ee4d --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/processResult.jsonata @@ -0,0 +1 @@ +Process.Result diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/routes.json b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/routes.json new file mode 100644 index 00000000..d36589c8 --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/main/resources/routes.json @@ -0,0 +1,28 @@ +[ + { + "datasource": "mqttSource", + "transformers": + [ + "processId", + "processResult", + "processData", + "processDuration" + ], + "datasinks": + [ + "process/processId", + "process/processResult", + "process/processData", + "process/processData_raw", + "process/processDuration" + ], + "datasinkMappingConfiguration": + { + "process/processId": ["processId"], + "process/processResult": ["processResult"], + "process/processData": ["processData"], + "process/processDuration": ["processData", "processDuration"] + }, + "trigger": "event" + } +] diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/test/java/org/eclipse/digitaltwin/basyx/databridge/dotaasv3multicast/test/TestAASUpdater.java b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/test/java/org/eclipse/digitaltwin/basyx/databridge/dotaasv3multicast/test/TestAASUpdater.java new file mode 100644 index 00000000..1077247e --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/test/java/org/eclipse/digitaltwin/basyx/databridge/dotaasv3multicast/test/TestAASUpdater.java @@ -0,0 +1,200 @@ +/******************************************************************************* + * Copyright (C) 2024 the Eclipse BaSyx Authors + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + * SPDX-License-Identifier: MIT + ******************************************************************************/ +package org.eclipse.digitaltwin.basyx.databridge.dotaasv3multicast.test; + +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; + +import java.io.IOException; + +import org.apache.http.HttpStatus; +import org.eclipse.digitaltwin.basyx.databridge.aas.configuration.factory.AASProducerDefaultConfigurationFactory; +import org.eclipse.digitaltwin.basyx.databridge.core.component.DataBridgeComponent; +import org.eclipse.digitaltwin.basyx.databridge.core.configuration.factory.RoutesConfigurationFactory; +import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.RoutesConfiguration; +import org.eclipse.digitaltwin.basyx.databridge.jsonata.configuration.factory.JsonataDefaultConfigurationFactory; +import org.eclipse.digitaltwin.basyx.databridge.paho.configuration.factory.MqttDefaultConfigurationFactory; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.MqttPersistenceException; +import org.eclipse.paho.client.mqttv3.MqttSecurityException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockserver.client.MockServerClient; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.model.Header; +import org.mockserver.verify.VerificationTimes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.moquette.broker.Server; +import io.moquette.broker.config.ClasspathResourceLoader; +import io.moquette.broker.config.IConfig; +import io.moquette.broker.config.IResourceLoader; +import io.moquette.broker.config.ResourceLoaderConfig; + +/** + * @author jungjan + */ +public class TestAASUpdater { + private static final String INPUT_DATA = "{\"Process\": {\"ProcessID\": \"00001\",\"Result\": true,\"ProcessData\": {\"Volume_NV\": 410.5,\"Volume_AV\": 407.7,\"Charge\": \"0000000001\",\"Pressure\": \"43.31\",\"Speed\": \"0.158790365\",\"Temperature\": \"42\",\"Result\": \"1\",\"StartProcess\": \"2022-06-22T08:21:33.4300238Z\",\"Volume\": \"39.7\",\"Weight\": \"49.69\",\"EndPorcess\": \"2022-06-22T08:22:56.4085953Z\"}}}"; + + private static final String PROPERTY_PROCESS_ID_VALUE_PATH = "/submodels/cHJvY2Vzcw==/submodel-elements/processId/$value"; + private static final String PROPERTY_PROCESS_ID_VALUE = "\"00001\""; + private static final String PROPERTY_PROCESS_RESULT_VALUE_PATH = "/submodels/cHJvY2Vzcw==/submodel-elements/processResult/$value"; + private static final String PROPERTY_PROCESS_RESULT_VALUE = "\"true\""; + private static final String PROPERTY_PROCESS_DATA_VALUE_PATH = "/submodels/cHJvY2Vzcw==/submodel-elements/processData/$value"; + private static final String PROPERTY_PROCESS_DATA_VALUE = "\"{\"ProcessData\":{\"Volume_NV\":410.5,\"Volume_AV\":407.7,\"Charge\":\"0000000001\",\"Pressure\":\"43.31\",\"Speed\":\"0.158790365\",\"Temperature\":\"42\",\"Result\":\"1\",\"StartProcess\":\"2022-06-22T08:21:33.4300238Z\",\"Volume\":\"39.7\",\"Weight\":\"49.69\",\"EndPorcess\":\"2022-06-22T08:22:56.4085953Z\"}}\""; + private static final String PROPERTY_PROCESS_DATA_RAW_VALUE_PATH = "/submodels/cHJvY2Vzcw==/submodel-elements/processData_raw/$value"; + private static final String PROPERTY_PROCESS_DATA_RAW_VALUE = "\"" + INPUT_DATA + "\""; + private static final String PROPERTY_PROCESS_DURATION_VALUE_PATH = "/submodels/cHJvY2Vzcw==/submodel-elements/processDuration/$value"; + private static final String PROPERTY_PROCESS_DURATION_VALUE = "\"1M22S\""; + + private static Logger logger = LoggerFactory.getLogger(TestAASUpdater.class); + + private static DataBridgeComponent updater; + protected static Server mqttBroker; + + private static ClientAndServer mockServer; + + @BeforeClass + public static void setUp() throws IOException { + configureAndStartMockserver(); + + configureAndStartMqttBroker(); + + configureAndStartUpdaterComponent(); + try { + publishNewDatapoint("process-update"); + waitForPropagation(); + } catch (MqttException | InterruptedException e) { + // ignore + } + } + + @AfterClass + public static void tearDown() { + updater.stopComponent(); + mockServer.close(); + } + + @Test + public void singleMappedTransformer1() throws MqttSecurityException, MqttPersistenceException, MqttException, InterruptedException { + verifyCall(PROPERTY_PROCESS_ID_VALUE_PATH, PROPERTY_PROCESS_ID_VALUE); + } + + @Test + public void singleMappedTransformer2() throws MqttSecurityException, MqttPersistenceException, MqttException, InterruptedException { + verifyCall(PROPERTY_PROCESS_RESULT_VALUE_PATH, PROPERTY_PROCESS_RESULT_VALUE); + } + + @Test + public void jsonResultMappedTransformer() throws MqttSecurityException, MqttPersistenceException, MqttException, InterruptedException { + verifyCall(PROPERTY_PROCESS_DATA_VALUE_PATH, PROPERTY_PROCESS_DATA_VALUE); + + } + + @Test + public void noMappedTransformer() throws MqttSecurityException, MqttPersistenceException, MqttException, InterruptedException { + verifyCall(PROPERTY_PROCESS_DATA_RAW_VALUE_PATH, PROPERTY_PROCESS_DATA_RAW_VALUE); + } + + @Test + public void multipleMappedTransformers() throws MqttSecurityException, MqttPersistenceException, MqttException, InterruptedException { + verifyCall(PROPERTY_PROCESS_DURATION_VALUE_PATH, PROPERTY_PROCESS_DURATION_VALUE); + } + + private static void configureAndStartMockserver() { + mockServer = ClientAndServer.startClientAndServer(4001); + + createExpectationForRequest(PROPERTY_PROCESS_ID_VALUE_PATH, PROPERTY_PROCESS_ID_VALUE); + createExpectationForRequest(PROPERTY_PROCESS_RESULT_VALUE_PATH, PROPERTY_PROCESS_RESULT_VALUE); + createExpectationForRequest(PROPERTY_PROCESS_DATA_VALUE_PATH, PROPERTY_PROCESS_DATA_VALUE); + createExpectationForRequest(PROPERTY_PROCESS_DATA_RAW_VALUE_PATH, PROPERTY_PROCESS_DATA_RAW_VALUE); + createExpectationForRequest(PROPERTY_PROCESS_DURATION_VALUE_PATH, PROPERTY_PROCESS_DURATION_VALUE); + } + + private static void configureAndStartUpdaterComponent() { + ClassLoader loader = TestAASUpdater.class.getClassLoader(); + RoutesConfiguration configuration = new RoutesConfiguration(); + + RoutesConfigurationFactory routesFactory = new RoutesConfigurationFactory(loader); + configuration.addRoutes(routesFactory.create()); + + MqttDefaultConfigurationFactory mqttConfigFactory = new MqttDefaultConfigurationFactory(loader); + configuration.addDatasources(mqttConfigFactory.create()); + + AASProducerDefaultConfigurationFactory aasConfigFactory = new AASProducerDefaultConfigurationFactory(loader); + configuration.addDatasinks(aasConfigFactory.create()); + + JsonataDefaultConfigurationFactory jsonataConfigFactory = new JsonataDefaultConfigurationFactory(loader); + configuration.addTransformers(jsonataConfigFactory.create()); + + updater = new DataBridgeComponent(configuration); + updater.startComponent(); + } + + private static void waitForPropagation() throws InterruptedException { + Thread.sleep(5000); + } + + private static void publishNewDatapoint(String topic) throws MqttException, MqttSecurityException, MqttPersistenceException { + logger.info("Publishing event:\n{}\nto topic: {}", INPUT_DATA, topic); + + MqttClient mqttClient = new MqttClient("tcp://localhost:1884", "testClient", new MemoryPersistence()); + mqttClient.connect(); + mqttClient.publish(topic, new MqttMessage(INPUT_DATA.getBytes())); + mqttClient.disconnect(); + mqttClient.close(); + } + + private static void configureAndStartMqttBroker() throws IOException { + mqttBroker = new Server(); + IResourceLoader classpathLoader = new ClasspathResourceLoader(); + final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader); + mqttBroker.startServer(classPathConfig); + } + + @SuppressWarnings("resource") + private static void createExpectationForRequest(String path, String value) { + new MockServerClient("localhost", 4001).when(request().withMethod("PATCH") + .withPath(path) + .withBody(value) + .withHeader("Content-Type", "application/json")) + .respond(response().withStatusCode(HttpStatus.SC_CREATED) + .withHeaders(new Header("Content-Type", "application/json; charset=utf-8"))); + } + + @SuppressWarnings("resource") + private void verifyCall(String path, String value) { + new MockServerClient("localhost", 4001).verify(request().withMethod("PATCH") + .withPath(path) + .withHeader("Content-Type", "application/json") + .withBody(value), VerificationTimes.exactly(1)); + } +} diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/test/resources/config/moquette.conf b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/test/resources/config/moquette.conf new file mode 100644 index 00000000..282ec025 --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multicast/src/test/resources/config/moquette.conf @@ -0,0 +1,6 @@ +# Moquette Java Broker configuration file for testing + +# Do not use the default 1883 port +port 1884 +host 0.0.0.0 +allow_anonymous true diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multiroute/pom.xml b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/pom.xml new file mode 100644 index 00000000..607ad19c --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/pom.xml @@ -0,0 +1,104 @@ + + 4.0.0 + + org.eclipse.digitaltwin.basyx + databridge.examples + ${revision} + + databridge.examples.dot-aas-v3-multiroute + An integration example demonstrating the usage with one data source and multiple data sinks where the data for each sink is transformed equally + + + 11 + 11 + UTF-8 + UTF-8 + + + + + + + + + org.apache.maven.plugins + maven-source-plugin + 3.2.1 + + + + + + + + org.slf4j + slf4j-simple + 2.0.11 + + + + org.eclipse.digitaltwin.basyx + databridge.core + ${revision} + + + + org.eclipse.digitaltwin.basyx + databridge.camel-paho + 0.0.1-SNAPSHOT + + + + org.eclipse.digitaltwin.basyx + databridge.camel-aas + 0.0.1-SNAPSHOT + + + + org.eclipse.digitaltwin.basyx + databridge.camel-jsonata + 0.0.1-SNAPSHOT + + + + + junit + junit + 4.13.2 + test + + + + + io.moquette + moquette-broker + 0.16 + test + + + org.slf4j + slf4j-log4j12 + + + + + + + org.mock-server + mockserver-netty + 5.15.0 + test + + + + + org.mock-server + mockserver-client-java + 5.15.0 + test + + + diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/aasserver.json b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/aasserver.json new file mode 100644 index 00000000..c0459210 --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/aasserver.json @@ -0,0 +1,14 @@ +[ + { + "uniqueId": "one/duration", + "submodelEndpoint": "http://localhost:4001/submodels/b25l", + "idShortPath": "duration", + "api": "DotAAS-V3" + }, + { + "uniqueId": "other/duration", + "submodelEndpoint": "http://localhost:4001/submodels/b3RoZXI=", + "idShortPath": "duration", + "api": "DotAAS-V3" + } +] diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/jsonatatransformer.json b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/jsonatatransformer.json new file mode 100644 index 00000000..82d5d348 --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/jsonatatransformer.json @@ -0,0 +1,14 @@ +[ + { + "uniqueId": "processData", + "queryPath": "processData.jsonata", + "inputType": "JsonString", + "outputType": "JsonString" + }, + { + "uniqueId": "processDuration", + "queryPath": "processDuration.jsonata", + "inputType": "JsonString", + "outputType": "JsonString" + } +] diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/logback.xml b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/logback.xml new file mode 100644 index 00000000..7aa8cb8e --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/logback.xml @@ -0,0 +1,26 @@ + + + + + + + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n + + + + + + + + diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/mqttconsumer.json b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/mqttconsumer.json new file mode 100644 index 00000000..ecc8a000 --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/mqttconsumer.json @@ -0,0 +1,8 @@ +[ + { + "uniqueId": "mqttSource", + "serverUrl": "localhost", + "serverPort": 1884, + "topic": "process-update" + } +] diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/processData.jsonata b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/processData.jsonata new file mode 100644 index 00000000..e0cbe603 --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/processData.jsonata @@ -0,0 +1 @@ +{"ProcessData": Process.ProcessData} diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/processDuration.jsonata b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/processDuration.jsonata new file mode 100644 index 00000000..346c4eea --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/processDuration.jsonata @@ -0,0 +1,6 @@ +( + $diff := $toMillis(ProcessData.EndPorcess) - $toMillis(ProcessData.StartProcess); + $minutes := $floor($diff / 60000); + $seconds := $floor(($diff % 60000) / 1000); + $string($minutes) & "M" & $string($seconds) & "S" +) diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/routes.json b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/routes.json new file mode 100644 index 00000000..462d3cd7 --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/main/resources/routes.json @@ -0,0 +1,16 @@ +[ + { + "datasource": "mqttSource", + "transformers": + [ + "processData", + "processDuration" + ], + "datasinks": + [ + "one/duration", + "other/duration" + ], + "trigger": "event" + } +] diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/test/java/org/eclipse/digitaltwin/basyx/databridge/dotaasv3multicast/test/TestAASUpdater.java b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/test/java/org/eclipse/digitaltwin/basyx/databridge/dotaasv3multicast/test/TestAASUpdater.java new file mode 100644 index 00000000..a10b44ab --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/test/java/org/eclipse/digitaltwin/basyx/databridge/dotaasv3multicast/test/TestAASUpdater.java @@ -0,0 +1,174 @@ +/******************************************************************************* + * Copyright (C) 2024 the Eclipse BaSyx Authors + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + * SPDX-License-Identifier: MIT + ******************************************************************************/ +package org.eclipse.digitaltwin.basyx.databridge.dotaasv3multicast.test; + +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; + +import java.io.IOException; + +import org.apache.http.HttpStatus; +import org.eclipse.digitaltwin.basyx.databridge.aas.configuration.factory.AASProducerDefaultConfigurationFactory; +import org.eclipse.digitaltwin.basyx.databridge.core.component.DataBridgeComponent; +import org.eclipse.digitaltwin.basyx.databridge.core.configuration.factory.RoutesConfigurationFactory; +import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.RoutesConfiguration; +import org.eclipse.digitaltwin.basyx.databridge.jsonata.configuration.factory.JsonataDefaultConfigurationFactory; +import org.eclipse.digitaltwin.basyx.databridge.paho.configuration.factory.MqttDefaultConfigurationFactory; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.MqttPersistenceException; +import org.eclipse.paho.client.mqttv3.MqttSecurityException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockserver.client.MockServerClient; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.model.Header; +import org.mockserver.verify.VerificationTimes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.moquette.broker.Server; +import io.moquette.broker.config.ClasspathResourceLoader; +import io.moquette.broker.config.IConfig; +import io.moquette.broker.config.IResourceLoader; +import io.moquette.broker.config.ResourceLoaderConfig; + +/** + * @author jungjan + */ +public class TestAASUpdater { + private static final String INPUT_DATA = "{\"Process\": {\"ProcessID\": \"00001\",\"Result\": true,\"ProcessData\": {\"Volume_NV\": 410.5,\"Volume_AV\": 407.7,\"Charge\": \"0000000001\",\"Pressure\": \"43.31\",\"Speed\": \"0.158790365\",\"Temperature\": \"42\",\"Result\": \"1\",\"StartProcess\": \"2022-06-22T08:21:33.4300238Z\",\"Volume\": \"39.7\",\"Weight\": \"49.69\",\"EndPorcess\": \"2022-06-22T08:22:56.4085953Z\"}}}"; + + private static final String SUBMDOEL_ONE_PATH = "/submodels/b25l/submodel-elements/duration/$value"; + private static final String SUBMODEL_OTHER_PATH = "/submodels/b3RoZXI=/submodel-elements/duration/$value"; + private static final String PROPERTY_PROCESS_DURATION_VALUE = "\"1M22S\""; + + private static Logger logger = LoggerFactory.getLogger(TestAASUpdater.class); + + private static DataBridgeComponent updater; + protected static Server mqttBroker; + + private static ClientAndServer mockServer; + + @BeforeClass + public static void setUp() throws IOException { + configureAndStartMockserver(); + + configureAndStartMqttBroker(); + + configureAndStartUpdaterComponent(); + try { + publishNewDatapoint("process-update"); + waitForPropagation(); + } catch (MqttException | InterruptedException e) { + // ignore + } + } + + @AfterClass + public static void tearDown() { + updater.stopComponent(); + mockServer.close(); + } + + @Test + public void singleMappedTransformer1() throws MqttSecurityException, MqttPersistenceException, MqttException, InterruptedException { + verifyCall(SUBMDOEL_ONE_PATH, PROPERTY_PROCESS_DURATION_VALUE); + } + + @Test + public void singleMappedTransformer2() throws MqttSecurityException, MqttPersistenceException, MqttException, InterruptedException { + verifyCall(SUBMODEL_OTHER_PATH, PROPERTY_PROCESS_DURATION_VALUE); + } + + private static void configureAndStartMockserver() { + mockServer = ClientAndServer.startClientAndServer(4001); + + createExpectationForRequest(SUBMDOEL_ONE_PATH, PROPERTY_PROCESS_DURATION_VALUE); + createExpectationForRequest(SUBMODEL_OTHER_PATH, PROPERTY_PROCESS_DURATION_VALUE); + } + + private static void configureAndStartUpdaterComponent() { + ClassLoader loader = TestAASUpdater.class.getClassLoader(); + RoutesConfiguration configuration = new RoutesConfiguration(); + + RoutesConfigurationFactory routesFactory = new RoutesConfigurationFactory(loader); + configuration.addRoutes(routesFactory.create()); + + MqttDefaultConfigurationFactory mqttConfigFactory = new MqttDefaultConfigurationFactory(loader); + configuration.addDatasources(mqttConfigFactory.create()); + + AASProducerDefaultConfigurationFactory aasConfigFactory = new AASProducerDefaultConfigurationFactory(loader); + configuration.addDatasinks(aasConfigFactory.create()); + + JsonataDefaultConfigurationFactory jsonataConfigFactory = new JsonataDefaultConfigurationFactory(loader); + configuration.addTransformers(jsonataConfigFactory.create()); + + updater = new DataBridgeComponent(configuration); + updater.startComponent(); + } + + private static void waitForPropagation() throws InterruptedException { + Thread.sleep(5000); + } + + private static void publishNewDatapoint(String topic) throws MqttException, MqttSecurityException, MqttPersistenceException { + logger.info("Publishing event:\n{}\nto topic: {}", INPUT_DATA, topic); + + MqttClient mqttClient = new MqttClient("tcp://localhost:1884", "testClient", new MemoryPersistence()); + mqttClient.connect(); + mqttClient.publish(topic, new MqttMessage(INPUT_DATA.getBytes())); + mqttClient.disconnect(); + mqttClient.close(); + } + + private static void configureAndStartMqttBroker() throws IOException { + mqttBroker = new Server(); + IResourceLoader classpathLoader = new ClasspathResourceLoader(); + final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader); + mqttBroker.startServer(classPathConfig); + } + + @SuppressWarnings("resource") + private static void createExpectationForRequest(String path, String value) { + new MockServerClient("localhost", 4001).when(request().withMethod("PATCH") + .withPath(path) + .withBody(value) + .withHeader("Content-Type", "application/json")) + .respond(response().withStatusCode(HttpStatus.SC_CREATED) + .withHeaders(new Header("Content-Type", "application/json; charset=utf-8"))); + } + + @SuppressWarnings("resource") + private void verifyCall(String path, String value) { + new MockServerClient("localhost", 4001).verify(request().withMethod("PATCH") + .withPath(path) + .withHeader("Content-Type", "application/json") + .withBody(value), VerificationTimes.exactly(1)); + } +} diff --git a/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/test/resources/config/moquette.conf b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/test/resources/config/moquette.conf new file mode 100644 index 00000000..282ec025 --- /dev/null +++ b/databridge.examples/databridge.examples.dot-aas-v3-multiroute/src/test/resources/config/moquette.conf @@ -0,0 +1,6 @@ +# Moquette Java Broker configuration file for testing + +# Do not use the default 1883 port +port 1884 +host 0.0.0.0 +allow_anonymous true diff --git a/databridge.examples/databridge.examples.httppolling-jsonata-delegator/src/main/resources/routes.json b/databridge.examples/databridge.examples.httppolling-jsonata-delegator/src/main/resources/routes.json index 0700a66d..5a642547 100644 --- a/databridge.examples/databridge.examples.httppolling-jsonata-delegator/src/main/resources/routes.json +++ b/databridge.examples/databridge.examples.httppolling-jsonata-delegator/src/main/resources/routes.json @@ -1,9 +1,7 @@ [ { "datasource": "httpsource", - "transformers": [ - "jsonataA" - ], + "transformers": ["jsonataA"], "trigger": "request", "triggerData": { "host": "localhost", diff --git a/databridge.examples/databridge.examples.httppolling-jsonata-delegator/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/httppollingjsonatadelegator/test/TestAASUpdater.java b/databridge.examples/databridge.examples.httppolling-jsonata-delegator/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/httppollingjsonatadelegator/test/TestAASUpdater.java index 9f323771..e3b789c4 100644 --- a/databridge.examples/databridge.examples.httppolling-jsonata-delegator/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/httppollingjsonatadelegator/test/TestAASUpdater.java +++ b/databridge.examples/databridge.examples.httppolling-jsonata-delegator/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/httppollingjsonatadelegator/test/TestAASUpdater.java @@ -27,13 +27,12 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; -import java.nio.charset.StandardCharsets; - import org.apache.http.client.ClientProtocolException; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; import org.eclipse.digitaltwin.basyx.databridge.core.component.DataBridgeComponent; import org.eclipse.digitaltwin.basyx.databridge.core.configuration.factory.RoutesConfigurationFactory; import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.RoutesConfiguration; @@ -67,7 +66,6 @@ public static void setUp() throws IOException { @Test public void transformedResponseIsReturned() throws ClientProtocolException, IOException { String actualValue = getContentFromDelegatedEndpoint(); - assertEquals(EXPECTED_VALUE, actualValue); } @@ -102,12 +100,12 @@ private static RoutesConfiguration addConfigurations() { return configuration; } - private String getContentFromDelegatedEndpoint() throws IOException, ClientProtocolException { + private String getContentFromDelegatedEndpoint() throws IOException { CloseableHttpClient client = HttpClients.createDefault(); HttpGet request = new HttpGet("http://localhost:8090/valueA"); CloseableHttpResponse resp = client.execute(request); - String content = new String(resp.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8); + String content = EntityUtils.toString(resp.getEntity()); client.close(); return content; } diff --git a/databridge.examples/databridge.examples.kafka-jsonata-aas/pom.xml b/databridge.examples/databridge.examples.kafka-jsonata-aas/pom.xml index c07e51c7..f1bc367d 100644 --- a/databridge.examples/databridge.examples.kafka-jsonata-aas/pom.xml +++ b/databridge.examples/databridge.examples.kafka-jsonata-aas/pom.xml @@ -105,11 +105,19 @@ test + + org.junit.vintage + junit-vintage-engine + 5.10.3 + test + + org.apache.kafka kafka-clients 3.4.0 + test @@ -137,4 +145,4 @@ - \ No newline at end of file + diff --git a/databridge.examples/databridge.examples.kafka-jsonata-aas/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/kafkajsonataaas/test/TestAASUpdater.java b/databridge.examples/databridge.examples.kafka-jsonata-aas/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/kafkajsonataaas/test/TestAASUpdater.java index c5394d74..96adab20 100644 --- a/databridge.examples/databridge.examples.kafka-jsonata-aas/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/kafkajsonataaas/test/TestAASUpdater.java +++ b/databridge.examples/databridge.examples.kafka-jsonata-aas/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/kafkajsonataaas/test/TestAASUpdater.java @@ -137,7 +137,6 @@ private Object retrievePropertyValue(String propertyIdShort) { ConnectedAssetAdministrationShell aas = getAAS(deviceAASId); ISubmodelElement updatedProp = getSubmodelElement(aas, "ConnectedSubmodel", propertyIdShort); - return updatedProp.getValue(); } diff --git a/databridge.examples/databridge.examples.kafka-jsonata-aas/src/test/resources/kafkaconfig.properties b/databridge.examples/databridge.examples.kafka-jsonata-aas/src/test/resources/kafkaconfig.properties index 5c6086b6..5c7848ba 100644 --- a/databridge.examples/databridge.examples.kafka-jsonata-aas/src/test/resources/kafkaconfig.properties +++ b/databridge.examples/databridge.examples.kafka-jsonata-aas/src/test/resources/kafkaconfig.properties @@ -3,4 +3,4 @@ port = 9092 broker.id = 0 log.dirs = templogs/ offsets.topic.replication.factor = 1 -advertised.host.name = 127.0.0.1 \ No newline at end of file +advertised.host.name = 127.0.0.1 diff --git a/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/pom.xml b/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/pom.xml index 668adf2b..9d7fc0aa 100644 --- a/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/pom.xml +++ b/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/pom.xml @@ -12,7 +12,6 @@ Kafka to Jsonata multiple transformation in pipeline to AAS server multiple - jar @@ -67,7 +66,7 @@ 0.0.1-SNAPSHOT - + org.eclipse.basyx @@ -104,12 +103,44 @@ 4.13.2 test - + + + org.junit.vintage + junit-vintage-engine + 5.10.3 + test + + - - org.apache.kafka - kafka-clients - 3.4.0 - + + org.apache.kafka + kafka-clients + 3.4.0 + test + + + + + org.apache.kafka + kafka_2.13 + 3.4.0 + test + + + + + org.apache.curator + curator-test + 5.5.0 + test + + + + + org.awaitility + awaitility + 4.2.0 + test + - \ No newline at end of file + diff --git a/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/main/resources/aasserver.json b/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/main/resources/aasserver.json index a2193d0e..643b1319 100644 --- a/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/main/resources/aasserver.json +++ b/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/main/resources/aasserver.json @@ -3,10 +3,5 @@ "uniqueId": "ConnectedSubmodel/ConnectedPropertyA", "submodelEndpoint": "http://localhost:4001/shells/TestUpdatedDeviceAAS/aas/submodels/ConnectedSubmodel/submodel", "idShortPath": "ConnectedPropertyA" - }, - { - "uniqueId": "ConnectedSubmodel/ConnectedPropertyB", - "submodelEndpoint": "http://localhost:4001/shells/TestUpdatedDeviceAAS/aas/submodels/ConnectedSubmodel/submodel", - "idShortPath": "ConnectedPropertyB" } -] \ No newline at end of file +] diff --git a/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/main/resources/kafkaconsumer.json b/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/main/resources/kafkaconsumer.json index bbc0282e..d7a79003 100644 --- a/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/main/resources/kafkaconsumer.json +++ b/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/main/resources/kafkaconsumer.json @@ -7,16 +7,6 @@ "maxPollRecords": 5000, "groupId": "basyx-updater", "consumersCount": 1, - "seekTo": "latest" - }, - { - "uniqueId": "property2", - "serverUrl": "localhost", - "serverPort": 9092, - "topic": "second-topic", - "maxPollRecords": 5000, - "groupId": "basyx-updater", - "consumersCount": 1, - "seekTo": "latest" + "seekTo": "BEGINNING" } -] \ No newline at end of file +] diff --git a/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/main/resources/routes.json b/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/main/resources/routes.json index 7cfb2cfb..8698b9e6 100644 --- a/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/main/resources/routes.json +++ b/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/main/resources/routes.json @@ -2,7 +2,7 @@ { "datasource": "property1", "transformers": ["jsonataA", "jsonataB"], - "datasinks": ["ConnectedSubmodel/ConnectedPropertyA", "ConnectedSubmodel/ConnectedPropertyB"], + "datasinks": ["ConnectedSubmodel/ConnectedPropertyA"], "trigger": "event" } -] \ No newline at end of file +] diff --git a/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/kafkajsonatamultipleaas/test/TestAASUpdater.java b/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/kafkajsonatamultipleaas/test/TestAASUpdater.java index a576360f..5d725299 100644 --- a/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/kafkajsonatamultipleaas/test/TestAASUpdater.java +++ b/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/test/java/org/eclipse/digitaltwin/basyx/databridge/examples/kafkajsonatamultipleaas/test/TestAASUpdater.java @@ -26,13 +26,23 @@ import static org.junit.Assert.assertEquals; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; +import org.apache.curator.test.TestingServer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Time; +import org.awaitility.Awaitility; import org.eclipse.basyx.aas.manager.ConnectedAssetAdministrationShellManager; import org.eclipse.basyx.aas.metamodel.connected.ConnectedAssetAdministrationShell; import org.eclipse.basyx.aas.metamodel.map.descriptor.CustomId; @@ -50,98 +60,188 @@ import org.eclipse.digitaltwin.basyx.databridge.core.configuration.route.core.RoutesConfiguration; import org.eclipse.digitaltwin.basyx.databridge.jsonata.configuration.factory.JsonataDefaultConfigurationFactory; import org.eclipse.digitaltwin.basyx.databridge.kafka.configuration.factory.KafkaDefaultConfigurationFactory; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttPersistenceException; -import org.eclipse.paho.client.mqttv3.MqttSecurityException; +import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import scala.Option; public class TestAASUpdater { + private static Logger logger = LoggerFactory.getLogger(TestAASUpdater.class); private static AASServerComponent aasServer; private static DataBridgeComponent updater; private static InMemoryRegistry registry; + + private static KafkaServer kafkaServer; + private static String kafkaTmpLogsDirPath; + private static TestingServer zookeeper; + protected static IIdentifier deviceAAS = new CustomId("TestUpdatedDeviceAAS"); private static BaSyxContextConfiguration aasContextConfig; @BeforeClass - public static void setUp() throws IOException { - registry = new InMemoryRegistry(); + public static void setUp() throws Exception { + configureAndStartKafkaServer(); + configureAndStartAASServer(); + configureAndStartUpdaterComponent(); + + } + private static void configureAndStartAASServer() { aasContextConfig = new BaSyxContextConfiguration(4001, ""); BaSyxAASServerConfiguration aasConfig = new BaSyxAASServerConfiguration(AASServerBackend.INMEMORY, "aasx/updatertest.aasx"); aasServer = new AASServerComponent(aasContextConfig, aasConfig); - aasServer.setRegistry(registry); + aasServer.setRegistry(registry = new InMemoryRegistry()); + aasServer.startComponent(); + } + + @AfterClass + public static void tearDown() throws IOException { + updater.stopComponent(); + kafkaServer.shutdown(); + kafkaServer.awaitShutdown(); + zookeeper.close(); + aasServer.stopComponent(); + clearLogs(); } - @Ignore @Test public void test() throws Exception { - aasServer.startComponent(); - System.out.println("AAS STARTED"); - Thread.sleep(1000); - System.out.println("START UPDATER"); - ClassLoader loader = this.getClass().getClassLoader(); + publishNewDatapoint(); + awaitAndCheckPropertyValue("198.56", "ConnectedPropertyA"); + } + + private static void configureAndStartUpdaterComponent() { + ClassLoader loader = TestAASUpdater.class.getClassLoader(); RoutesConfiguration configuration = new RoutesConfiguration(); - // Extend configutation for connections + addRoutes(loader, configuration); + addKafkaSource(loader, configuration); + addDatasinks(loader, configuration); + addDataTransformers(loader, configuration); + + updater = new DataBridgeComponent(configuration); + updater.startComponent(); + } + + private static void addRoutes(ClassLoader loader, RoutesConfiguration configuration) { RoutesConfigurationFactory routesFactory = new RoutesConfigurationFactory(loader); configuration.addRoutes(routesFactory.create()); - - // Extend configutation for Kafka Source + } + + private static void addKafkaSource(ClassLoader loader, RoutesConfiguration configuration) { KafkaDefaultConfigurationFactory kafkaConfigFactory = new KafkaDefaultConfigurationFactory(loader); configuration.addDatasources(kafkaConfigFactory.create()); - - // Extend configuration for AAS + } + + private static void addDatasinks(ClassLoader loader, RoutesConfiguration configuration) { AASProducerDefaultConfigurationFactory aasConfigFactory = new AASProducerDefaultConfigurationFactory(loader); configuration.addDatasinks(aasConfigFactory.create()); + } - // Extend configuration for Jsonata + private static void addDataTransformers(ClassLoader loader, RoutesConfiguration configuration) { JsonataDefaultConfigurationFactory jsonataConfigFactory = new JsonataDefaultConfigurationFactory(loader); configuration.addTransformers(jsonataConfigFactory.create()); - - updater = new DataBridgeComponent(configuration); - updater.startComponent(); - System.out.println("UPDATER STARTED"); - publishNewDatapoint(); - Thread.sleep(5000); - checkIfPropertyIsUpdated(); - updater.stopComponent(); - aasServer.stopComponent(); } + + private void awaitAndCheckPropertyValue(String expectedValue, String propertyIdShort) { + Awaitility.await().with().pollInterval(2, TimeUnit.SECONDS).atMost(14, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(expectedValue, retrievePropertyValue(propertyIdShort))); + } + + private Object retrievePropertyValue(String propertyIdShort) { + ConnectedAssetAdministrationShell aas = getAAS(deviceAAS); - private void checkIfPropertyIsUpdated() throws InterruptedException { + ISubmodelElement updatedProp = getSubmodelElement(aas, "ConnectedSubmodel", propertyIdShort); + return updatedProp.getValue(); + } + + private ConnectedAssetAdministrationShell getAAS(IIdentifier identifier) { ConnectedAssetAdministrationShellManager manager = new ConnectedAssetAdministrationShellManager(registry); - ConnectedAssetAdministrationShell aas = manager.retrieveAAS(deviceAAS); - ISubmodel sm = aas.getSubmodels().get("ConnectedSubmodel"); - ISubmodelElement updatedProp = sm.getSubmodelElement("ConnectedPropertyA"); - Object propValue = updatedProp.getValue(); - System.out.println("UpdatedPROP" + propValue); - assertEquals("198.56", propValue); + + return manager.retrieveAAS(identifier); + } + private ISubmodelElement getSubmodelElement(ConnectedAssetAdministrationShell aas, String submodelId, + String submodelElementId) { + ISubmodel sm = aas.getSubmodels().get(submodelId); + + return sm.getSubmodelElement(submodelElementId); + } + + private static void configureAndStartKafkaServer() throws Exception { + startZookeeper(); + startKafkaServer(); } - private void publishNewDatapoint() throws MqttException, MqttSecurityException, MqttPersistenceException { - String json = "{\"Account\":{\"Account Name\":\"Firefly\",\"Order\":[{\"OrderID\":\"order103\",\"Product\":[{\"Product Name\":\"Bowler Hat\",\"ProductID\":858383,\"SKU\":\"0406654608\",\"Description\":{\"Colour\":\"Purple\",\"Width\":300,\"Height\":200,\"Depth\":210,\"Weight\":0.75},\"Price\":34.45,\"Quantity\":2},{\"Product Name\":\"Trilby hat\",\"ProductID\":858236,\"SKU\":\"0406634348\",\"Description\":{\"Colour\":\"Orange\",\"Width\":300,\"Height\":200,\"Depth\":210,\"Weight\":0.6},\"Price\":21.67,\"Quantity\":1}]},{\"OrderID\":\"order104\",\"Product\":[{\"Product Name\":\"Bowler Hat\",\"ProductID\":858383,\"SKU\":\"040657863\",\"Description\":{\"Colour\":\"Purple\",\"Width\":300,\"Height\":200,\"Depth\":210,\"Weight\":0.75},\"Price\":34.45,\"Quantity\":4},{\"ProductID\":345664,\"SKU\":\"0406654603\",\"Product Name\":\"Cloak\",\"Description\":{\"Colour\":\"Black\",\"Width\":30,\"Height\":20,\"Depth\":210,\"Weight\":2},\"Price\":107.99,\"Quantity\":1}]}]}}"; + private static void startKafkaServer() throws IOException { + KafkaConfig kafkaConfig = new KafkaConfig(loadKafkaConfigProperties()); + + kafkaTmpLogsDirPath = kafkaConfig.getString("log.dirs"); + + createKafkaLogDirectoryIfNotExists(Paths.get(kafkaTmpLogsDirPath)); + + Option threadNamePrefix = Option.apply("kafka-server"); + + kafkaServer = new KafkaServer(kafkaConfig, Time.SYSTEM, threadNamePrefix, true); + kafkaServer.startup(); + + logger.info("Kafka server started"); + } + + private static Properties loadKafkaConfigProperties() { + Properties props = new Properties(); + try (FileInputStream configFile = new FileInputStream("src/test/resources/kafkaconfig.properties")) { + props.load(configFile); + return props; + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException("Unable to load kafka config from properties file"); + } + } + + private static void createKafkaLogDirectoryIfNotExists(Path kafkaTempDirPath) throws IOException { + if (!Files.exists(kafkaTempDirPath)) + Files.createDirectory(kafkaTempDirPath); + } + + private static void startZookeeper() throws Exception { + zookeeper = new TestingServer(2181, true); + + logger.info("Zookeeper server started: " + zookeeper.getConnectString()); + } + private void publishNewDatapoint() { + String json = "{\"Account\":{\"Account Name\":\"Firefly\",\"Order\":[{\"OrderID\":\"order103\",\"Product\":[{\"Product Name\":\"Bowler Hat\",\"ProductID\":858383,\"SKU\":\"0406654608\",\"Description\":{\"Colour\":\"Purple\",\"Width\":300,\"Height\":200,\"Depth\":210,\"Weight\":0.75},\"Price\":34.45,\"Quantity\":2},{\"Product Name\":\"Trilby hat\",\"ProductID\":858236,\"SKU\":\"0406634348\",\"Description\":{\"Colour\":\"Orange\",\"Width\":300,\"Height\":200,\"Depth\":210,\"Weight\":0.6},\"Price\":21.67,\"Quantity\":1}]},{\"OrderID\":\"order104\",\"Product\":[{\"Product Name\":\"Bowler Hat\",\"ProductID\":858383,\"SKU\":\"040657863\",\"Description\":{\"Colour\":\"Purple\",\"Width\":300,\"Height\":200,\"Depth\":210,\"Weight\":0.75},\"Price\":34.45,\"Quantity\":4},{\"ProductID\":345664,\"SKU\":\"0406654603\",\"Product Name\":\"Cloak\",\"Description\":{\"Colour\":\"Black\",\"Width\":30,\"Height\":20,\"Depth\":210,\"Weight\":2},\"Price\":107.99,\"Quantity\":1}]}]}}"; String bootstrapServer = "127.0.0.1:9092"; - // create producer properties - Properties properties = new Properties(); - properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + Properties properties = createProducerProperties(bootstrapServer); - // create the producer KafkaProducer producer = new KafkaProducer(properties); - // create a producer record ProducerRecord producerRecord = new ProducerRecord("first-topic", json); - // send data producer.send(producerRecord); producer.flush(); producer.close(); } + + private Properties createProducerProperties(String bootstrapServer) { + Properties properties = new Properties(); + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + return properties; + } + + private static void clearLogs() throws IOException { + Path tempLogDirPath = Paths.get(kafkaTmpLogsDirPath); + + if (Files.exists(tempLogDirPath)) + FileUtils.deleteDirectory(new File(kafkaTmpLogsDirPath)); + } } diff --git a/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/test/resources/kafkaconfig.properties b/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/test/resources/kafkaconfig.properties new file mode 100644 index 00000000..5c7848ba --- /dev/null +++ b/databridge.examples/databridge.examples.kafka-jsonatamultiple-aas/src/test/resources/kafkaconfig.properties @@ -0,0 +1,6 @@ +zookeeper.connect = 127.0.0.1:2181 +port = 9092 +broker.id = 0 +log.dirs = templogs/ +offsets.topic.replication.factor = 1 +advertised.host.name = 127.0.0.1 diff --git a/databridge.examples/pom.xml b/databridge.examples/pom.xml index faa5e0c5..c096ae24 100644 --- a/databridge.examples/pom.xml +++ b/databridge.examples/pom.xml @@ -36,6 +36,8 @@ databridge.examples.plc4x-jsonata-aas databridge.examples.dot-aas-v3-api databridge.examples.mqtt-aas_range_and_mlp + databridge.examples.dot-aas-v3-multicast + databridge.examples.dot-aas-v3-multiroute databridge.examples.sql-jsonata-aas pom