From 80433dd370b24750942ffe7c88088825344b8c84 Mon Sep 17 00:00:00 2001 From: anmunoz Date: Wed, 3 Mar 2021 00:46:07 +0100 Subject: [PATCH] Update NGSIToPostgres for adding new aattributes --- .../nifi/processors/ngsi/NGSIToCKAN.java | 293 ++++++++++++++++++ .../processors/ngsi/NGSIToPostgreSQL.java | 9 +- .../ngsi/ngsi/backends/PostgreSQLBackend.java | 0 .../ngsi/ngsi/backends/ckan/CkanBackend.java | 149 ++++++++- .../org.apache.nifi.processor.Processor | 3 +- nifi-ngsi-resources/docker-compose.yml | 16 +- 6 files changed, 454 insertions(+), 16 deletions(-) create mode 100644 nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/NGSIToCKAN.java mode change 100644 => 100755 nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/NGSIToPostgreSQL.java mode change 100644 => 100755 nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/ngsi/backends/PostgreSQLBackend.java diff --git a/nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/NGSIToCKAN.java b/nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/NGSIToCKAN.java new file mode 100644 index 00000000..b4a5de7f --- /dev/null +++ b/nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/NGSIToCKAN.java @@ -0,0 +1,293 @@ +package org.apache.nifi.processors.ngsi; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.*; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.pattern.RollbackOnFailure; +import org.apache.nifi.processors.ngsi.ngsi.backends.ckan.CkanBackend; +import org.apache.nifi.processors.ngsi.ngsi.utils.Entity; +import org.apache.nifi.processors.ngsi.ngsi.utils.NGSIEvent; +import org.apache.nifi.processors.ngsi.ngsi.utils.NGSIUtils; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + + +@SupportsBatching +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"CKAN","ckan","sql", "put", "rdbms", "database", "create", "insert", "relational","NGSIv2", "NGSI","FIWARE"}) +@CapabilityDescription("Create a CKAN resource, package and dataset if not exits using the information coming from and NGSI event converted to flow file." + + "After insert all of the vales of the flow file content extraction the entities and attributes") + + +public class NGSIToCKAN extends AbstractProcessor { + protected static final PropertyDescriptor CKAN_HOST = new PropertyDescriptor.Builder() + .name("CKAN Host") + .displayName("CKAN Host") + .description("FQDN/IP address where the CKAN server runs. Default value is localhost") + .required(true) + .defaultValue("localhost") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor CKAN_PORT = new PropertyDescriptor.Builder() + .name("CKAN Port") + .displayName("CKAN Port") + .description("Port where the CKAN server runs. Default value is 80") + .required(true) + .defaultValue("80") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + protected static final PropertyDescriptor CKAN_VIEWER = new PropertyDescriptor.Builder() + .name("CKAN Viewer") + .displayName("CKAN Viewer") + .description("The CKAN resource page can contain one or more visualizations of the resource data or file contents (a table, a bar chart, a map, etc). These are commonly referred to as resource views.") + .required(true) + .defaultValue("recline_grid_view") + .allowableValues("recline_view", "recline_grid_view","recline_graph_view","recline_map_view","text_view","image_view","video_view","audio_view","webpage_view") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor CKAN_API_KEY = new PropertyDescriptor.Builder() + .name("CKAN API Key") + .displayName("CKAN API Key") + .description("The APi Key you are going o use in CKAN") + .required(true) + .defaultValue("XXXXXX") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor ORION_URL = new PropertyDescriptor.Builder() + .name("ORION URL") + .displayName("ORION URL") + .description("To be put as the filestore URL.\n") + .required(true) + .defaultValue(" http://localhost:1026") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor SSL = new PropertyDescriptor.Builder() + .name("SSL") + .displayName("SSL") + .description("ssl for connection") + .required(false) + .defaultValue("false") + .allowableValues("false", "true") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + protected static final PropertyDescriptor DATA_MODEL = new PropertyDescriptor.Builder() + .name("data-model") + .displayName("Data Model") + .description("The Data model for creating the tables when an event have been received you can choose between" + + ":db-by-service-path or db-by-entity for ngsiv2 and db-by-entity or db-by-entity-type for ngsi-ld, default value is db-by-entity") + .required(false) + .allowableValues("db-by-entity-id", "db-by-entity") + .defaultValue("db-by-entity") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor ATTR_PERSISTENCE = new PropertyDescriptor.Builder() + .name("attr-persistence") + .displayName("Attribute Persistence") + .description("The mode of storing the data inside of the table") + .required(false) + .allowableValues("row", "column") + .defaultValue("row") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor NGSI_VERSION = new PropertyDescriptor.Builder() + .name("ngsi-version") + .displayName("NGSI Version") + .description("The version of NGSI of your incomming events. You can choose Between v2 for NGSIv2 and ld for NGSI-LD ") + .required(false) + .allowableValues("v2","ld") + .defaultValue("v2") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor DEFAULT_SERVICE = new PropertyDescriptor.Builder() + .name("default-service") + .displayName("Default Service") + .description("Default Fiware Service for building the database name") + .required(false) + .defaultValue("test") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor DEFAULT_SERVICE_PATH = new PropertyDescriptor.Builder() + .name("default-service-path") + .displayName("Default Service path") + .description("Default Fiware ServicePath for building the table name") + .required(false) + .defaultValue("/path") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor ENABLE_ENCODING= new PropertyDescriptor.Builder() + .name("enable-encoding") + .displayName("Enable Encoding") + .description("true or false, true applies the new encoding, false applies the old encoding.") + .required(false) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + protected static final PropertyDescriptor ENABLE_LOWERCASE= new PropertyDescriptor.Builder() + .name("enable-lowercase") + .displayName("Enable Lowercase") + .description("true or false, true for creating the Schema and Tables name with lowercase.") + .required(false) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + protected static final PropertyDescriptor TRANSACTION_TIMEOUT = new PropertyDescriptor.Builder() + .name("Transaction Timeout") + .description("If the property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute " + + "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship") + .required(false) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("The preferred number of FlowFiles to put to the database in a single transaction") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("10") + .build(); + + protected static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder() + .name("Max Connections") + .description("Maximum number of connections allowed for a Http-based HDFS backend.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("500") + .build(); + + protected static final PropertyDescriptor MAX_CONNECTIONS_PER_ROUTE = new PropertyDescriptor.Builder() + .name("Max Connections per Route") + .description("Maximum number of connections per route allowed for a Http-based HDFS backend.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("100") + .build(); + + protected static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is routed to this relationship after the database is successfully updated") + .build(); + protected static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") + .build(); + protected static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, " + + "such as an invalid query or an integrity constraint violation") + .build(); + + + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(); + properties.add(CKAN_HOST); + properties.add(CKAN_PORT); + properties.add(CKAN_VIEWER); + properties.add(CKAN_API_KEY); + properties.add(ORION_URL); + properties.add(SSL); + properties.add(NGSI_VERSION); + properties.add(DATA_MODEL); + properties.add(ATTR_PERSISTENCE); + properties.add(DEFAULT_SERVICE); + properties.add(DEFAULT_SERVICE_PATH); + properties.add(ENABLE_ENCODING); + properties.add(ENABLE_LOWERCASE); + properties.add(BATCH_SIZE); + properties.add(MAX_CONNECTIONS); + properties.add(MAX_CONNECTIONS_PER_ROUTE); + properties.add(RollbackOnFailure.ROLLBACK_ON_FAILURE); + return properties; + } + + @Override + public Set getRelationships() { + final Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_RETRY); + rels.add(REL_FAILURE); + return rels; + } + + protected void persistFlowFile(final ProcessContext context, final FlowFile flowFile, ProcessSession session) { + + final String[] host = {context.getProperty(CKAN_HOST).getValue()}; + final String port = context.getProperty(CKAN_PORT).getValue(); + final String apiKey = context.getProperty(CKAN_API_KEY).getValue(); + final String ckanViewer = context.getProperty(CKAN_VIEWER).getValue(); + final String orioUrl = context.getProperty(ORION_URL).getValue(); + final boolean ssl = context.getProperty(SSL).asBoolean(); + final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger(); + final int maxConnectionsPerRoute = context.getProperty(MAX_CONNECTIONS_PER_ROUTE).asInteger(); + final boolean enableEncoding = context.getProperty(ENABLE_ENCODING).asBoolean(); + final boolean enableLowercase = context.getProperty(ENABLE_LOWERCASE).asBoolean(); + final CkanBackend ckanBackend = new CkanBackend(apiKey,host,port,orioUrl,ssl,maxConnections,maxConnectionsPerRoute,ckanViewer); + NGSIUtils n = new NGSIUtils(); + final String ngsiVersion=context.getProperty(NGSI_VERSION).getValue(); + final String dataModel=context.getProperty(DATA_MODEL).getValue(); + + final NGSIEvent event=n.getEventFromFlowFile(flowFile,session,ngsiVersion); + final long creationTime = event.getCreationTime(); + final String fiwareService = (event.getFiwareService().compareToIgnoreCase("nd")==0)?context.getProperty(DEFAULT_SERVICE).getValue():event.getFiwareService(); + final String fiwareServicePath = ("ld".equals(context.getProperty(NGSI_VERSION).getValue()))?"":(event.getFiwareServicePath().compareToIgnoreCase("/nd")==0)?context.getProperty(DEFAULT_SERVICE_PATH).getValue():event.getFiwareServicePath(); + try { + final String orgName = ckanBackend.buildOrgName(fiwareService,dataModel,enableEncoding,enableLowercase,ngsiVersion); + ArrayList entities= new ArrayList<>(); + entities = ("ld".equals(context.getProperty(NGSI_VERSION).getValue()))?event.getEntitiesLD():event.getEntities(); + for (Entity entity : event.getEntities()) { + final String pkgName = ckanBackend.buildPkgName(fiwareService,entity,dataModel,enableEncoding,enableLowercase,ngsiVersion); + final String resName = ckanBackend.buildResName(entity,dataModel,enableEncoding,enableLowercase,ngsiVersion); + + } // for + + }catch (Exception e){ + getLogger().error(e.toString()); + } + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ComponentLog logger = getLogger(); + + try { + persistFlowFile(context, flowFile, session); + logger.info("inserted {} into CKAN", new Object[]{flowFile}); + session.getProvenanceReporter().send(flowFile, "report"); + session.transfer(flowFile, REL_SUCCESS); + } catch (Exception e) { + logger.error("Failed to insert {} into CKAN due to {}", new Object[] {flowFile, e}, e); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + } + } + +} + diff --git a/nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/NGSIToPostgreSQL.java b/nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/NGSIToPostgreSQL.java old mode 100644 new mode 100755 index 42e54757..19ecaac7 --- a/nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/NGSIToPostgreSQL.java +++ b/nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/NGSIToPostgreSQL.java @@ -268,12 +268,13 @@ void apply(final ProcessContext context, final ProcessSession session, final Fun JdbcCommon.setParameters(stmt, flowFile.getAttributes()); try { System.out.println(postgres.checkColumnNames(tableName)); - ResultSet rs = conn.createStatement().executeQuery(postgres.checkColumnNames(tableName)); - newColumns = postgres.getNewColumns(rs,listOfFields); conn.createStatement().execute(postgres.createSchema(schemaName)); - conn.createStatement().execute(postgres.addColumns(schemaName,tableName,newColumns)); - conn.createStatement().execute(postgres.createTable(schemaName, tableName,listOfFields)); + ResultSet rs = conn.createStatement().executeQuery(postgres.checkColumnNames(tableName)); + newColumns = postgres.getNewColumns(rs,listOfFields); + if (newColumns.size()>0){ + conn.createStatement().execute(postgres.addColumns(schemaName,tableName,newColumns)); + } System.out.println(schemaName+"."+tableName+" columns -------- : "); } catch (SQLException s) { diff --git a/nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/ngsi/backends/PostgreSQLBackend.java b/nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/ngsi/backends/PostgreSQLBackend.java old mode 100644 new mode 100755 diff --git a/nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/ngsi/backends/ckan/CkanBackend.java b/nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/ngsi/backends/ckan/CkanBackend.java index 87db5f7b..87fd8288 100644 --- a/nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/ngsi/backends/ckan/CkanBackend.java +++ b/nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/ngsi/backends/ckan/CkanBackend.java @@ -9,15 +9,16 @@ import org.apache.nifi.processors.ngsi.ngsi.backends.ckan.model.DataStore; import org.apache.nifi.processors.ngsi.ngsi.backends.http.HttpBackend; import org.apache.nifi.processors.ngsi.ngsi.backends.http.JsonResponse; -import org.apache.nifi.processors.ngsi.ngsi.utils.CommonConstants; -import org.apache.nifi.processors.ngsi.ngsi.utils.NGSIConstants; -import org.json.simple.JSONArray; +import org.apache.nifi.processors.ngsi.ngsi.utils.*; +import org.json.JSONException; import org.json.simple.JSONObject; +import org.json.simple.JSONArray; import java.text.ParseException; import java.util.ArrayList; import java.util.Date; import java.util.Iterator; +import java.util.Locale; public class CkanBackend extends HttpBackend { @@ -659,4 +660,146 @@ private JsonResponse doCKANRequest(String method, String urlPath, String jsonStr return doRequest(method, urlPath, true, headers, new StringEntity(jsonString, "UTF-8")); } // doCKANRequest + + /** + * Builds an organization name given a fiwareService. It throws an exception if the naming conventions are violated. + * @param service + * @return Organization name + * @throws Exception + */ + public String buildOrgName(String service, String dataModel, boolean enableEncoding, boolean enableLowercase, String ngsiVersion) throws Exception { + String orgName=""; + String fiwareService=(enableLowercase)?service.toLowerCase():service; + + if ("v2".equals(ngsiVersion)){ + + } + else if ("ld".equals(ngsiVersion)) { + switch (dataModel) { + case "db-by-entity-id": + //FIXME + //note that if we enable encode() and/or encodeCKAN() in this datamodel we could have problems, although it need to be analyzed in deep + orgName = fiwareService; + break; + case "db-by-entity": + if (enableEncoding) { + orgName = NGSICharsets.encodeCKAN(fiwareService); + } else { + orgName = NGSICharsets.encode(fiwareService, false, true).toLowerCase(Locale.ENGLISH); + } // if else + + if (orgName.length() > NGSIConstants.CKAN_MAX_NAME_LEN) { + throw new Exception("Building organization name '" + orgName + "' and its length is " + + "greater than " + NGSIConstants.CKAN_MAX_NAME_LEN); + } else if (orgName.length() < NGSIConstants.CKAN_MIN_NAME_LEN) { + throw new Exception("Building organization name '" + orgName + "' and its length is " + + "lower than " + NGSIConstants.CKAN_MIN_NAME_LEN); + } // if else if + break; + default: + throw new Exception("Not supported Data Model for CKAN Sink: " + dataModel); + } + } + return orgName; + } // buildOrgName + + /** + * Builds a package name given a fiwareService and a fiwareServicePath. It throws an exception if the naming + * conventions are violated. + * @param fiwareService + * @return Package name + * @throws Exception + */ + public String buildPkgName( String fiwareService, Entity entity, String dataModel, boolean enableEncoding, boolean enableLowercase, String ngsiVersion) throws Exception { + String pkgName=""; + String entityId = (enableLowercase) ? entity.getEntityId().toLowerCase() : entity.getEntityId(); + + if ("v2".equals(ngsiVersion)){ + + } + else if ("ld".equals(ngsiVersion)) { + switch (dataModel) { + case "db-by-entity-id": + //FIXME + //note that if we enable encode() and/or encodeCKAN() in this datamodel we could have problems, although it need to be analyzed in deep + pkgName = entityId; + break; + case "db-by-entity": + if (enableEncoding) { + pkgName = NGSICharsets.encodeCKAN(fiwareService); + + } else { + pkgName = NGSICharsets.encode(fiwareService, false, true).toLowerCase(Locale.ENGLISH); + } // if else + if (pkgName.length() > NGSIConstants.CKAN_MAX_NAME_LEN) { + throw new Exception("Building package name '" + pkgName + "' and its length is " + + "greater than " + NGSIConstants.CKAN_MAX_NAME_LEN); + } else if (pkgName.length() < NGSIConstants.CKAN_MIN_NAME_LEN) { + throw new Exception("Building package name '" + pkgName + "' and its length is " + + "lower than " + NGSIConstants.CKAN_MIN_NAME_LEN); + } // if else if + break; + default: + throw new Exception("Not supported Data Model for CKAN Sink: " + dataModel); + } + } + return pkgName; + } // buildPkgName + + /** + * Builds a resource name given a entity. It throws an exception if the naming conventions are violated. + * @param entity + * @return Resource name + * @throws Exception + */ + public String buildResName(Entity entity, String dataModel, boolean enableEncoding, boolean enableLowercase, String ngsiVersion) throws Exception { + String resName=""; + String entityId = (enableLowercase) ? entity.getEntityId().toLowerCase() : entity.getEntityId(); + String entityType = (enableLowercase) ? entity.getEntityType().toLowerCase() : entity.getEntityType(); + if ("v2".equals(ngsiVersion)){ + + } + else if ("ld".equals(ngsiVersion)) { + switch (dataModel) { + case "db-by-entity-id": + //FIXME + //note that if we enable encode() and/or encodeCKAN() in this datamodel we could have problems, although it need to be analyzed in deep + resName = entityId; + break; + case "db-by-entity": + if (enableEncoding) { + resName = NGSICharsets.encodeCKAN(entityId)+"_"+NGSICharsets.encodeCKAN(entityType); + } else { + resName = NGSICharsets.encode(entityId, false, true).toLowerCase(Locale.ENGLISH)+"_"+NGSICharsets.encode(entityType,false,true); + } // if else + + if (resName.length() > NGSIConstants.CKAN_MAX_NAME_LEN) { + throw new Exception("Building resource name '" + resName + "' and its length is " + + "greater than " + NGSIConstants.CKAN_MAX_NAME_LEN); + } else if (resName.length() < NGSIConstants.CKAN_MIN_NAME_LEN) { + throw new Exception("Building resource name '" + resName + "' and its length is " + + "lower than " + NGSIConstants.CKAN_MIN_NAME_LEN); + } // if else if + break; + default: + throw new Exception("Not supported Data Model for CKAN Sink: " + dataModel); + } + } + return resName; + } // buildResName + + + public boolean isValid(String test) { + try { + new org.json.JSONObject(test); + } catch (JSONException ex) { + try { + new org.json.JSONArray(test); + } catch (JSONException ex1) { + return false; + } + } + return true; + } + } diff --git a/nifi-ngsi-bundle/nifi-ngsi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-ngsi-bundle/nifi-ngsi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 1048d0cb..b2afcd46 100644 --- a/nifi-ngsi-bundle/nifi-ngsi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-ngsi-bundle/nifi-ngsi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -16,4 +16,5 @@ org.apache.nifi.processors.ngsi.NGSIToMySQL org.apache.nifi.processors.ngsi.NGSIToPostgreSQL org.apache.nifi.processors.ngsi.NGSIToMongo org.apache.nifi.processors.ngsi.NGSIToHDFS -org.apache.nifi.processors.ngsi.NGSIToCarto \ No newline at end of file +org.apache.nifi.processors.ngsi.NGSIToCarto +org.apache.nifi.processors.ngsi.NGSIToCKAN \ No newline at end of file diff --git a/nifi-ngsi-resources/docker-compose.yml b/nifi-ngsi-resources/docker-compose.yml index 4f3112b9..e75b7e72 100644 --- a/nifi-ngsi-resources/docker-compose.yml +++ b/nifi-ngsi-resources/docker-compose.yml @@ -1,13 +1,13 @@ version: '3.1' services: - # draco: - # image: ging/fiware-draco:1.3.6 - # container_name: draco - # environment: - # - NIFI_WEB_HTTP_PORT=9090 - # ports: - # - "9090:9090" - # - "5050:5050" + draco: + image: ging/fiware-draco:1.3.6 + container_name: draco + environment: + - NIFI_WEB_HTTP_PORT=9090 + ports: + - "9090:9090" + - "5050:5050" # db-mysql: # image: mysql:5.7.22 # container_name: mysql