Skip to content

Commit

Permalink
Merge pull request #1416 from sabrine2020/master
Browse files Browse the repository at this point in the history
fature/1156_add_metadata_storing_in_mongosink
  • Loading branch information
Francisco Romero Bueno authored Mar 28, 2017
2 parents 07b3820 + a5b887f commit 13c693d
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
- [cygnus-twitter][bug] Fix wrong data in spec file (#1407)
- [cygnus-ngsi][bug] Fix NGSIMySQLSink when persisting bulks of rows with different number of columns (#1409)
- [cygnus-ngsi][feature] Add metadata storing in NGSIMongoSink (#1156)
2 changes: 2 additions & 0 deletions cygnus-ngsi/conf/agent_ngsi.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ cygnus-ngsi.sinks.mongo-sink.channel = mongo-channel
#cygnus-ngsi.sinks.mongo-sink.data_model = dm-by-entity
# how the attributes are stored, either per row either per column (row, column)
#cygnus-ngsi.sinks.mongo-sink.attr_persistence = row
# true if the metadata will be stored, false by default
#cygnus-ngsi.sinks.mongo-sink.attr_metadata_store = false
# number of notifications to be included within a processing batch
#cygnus-ngsi.sinks.mongo-sink.batch_size = 100
# timeout for batch accumulation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package com.telefonica.iot.cygnus.sinks;

import com.mongodb.DBObject;
import com.mongodb.util.JSON;
import com.telefonica.iot.cygnus.containers.NotifyContextRequest.ContextAttribute;
import com.telefonica.iot.cygnus.containers.NotifyContextRequest.ContextElement;
import com.telefonica.iot.cygnus.errors.CygnusBadConfiguration;
Expand Down Expand Up @@ -44,7 +46,7 @@ public class NGSIMongoSink extends NGSIMongoBaseSink {
private long maxDocuments;

private boolean rowAttrPersistence;

private String attrMetadataStore;
/**
* Constructor.
*/
Expand Down Expand Up @@ -83,6 +85,17 @@ public void configure(Context context) {
+ attrPersistenceStr + ") must be 'row' or 'column'");
} // if else

attrMetadataStore = context.getString("attr_metadata_store", "false");

if (attrMetadataStore.equals("true") || attrMetadataStore.equals("false")) {
LOGGER.debug("[" + this.getName() + "] Reading configuration (attr_metadata_store="
+ attrMetadataStore + ")");
} else {
invalidConfiguration = true;
LOGGER.debug("[" + this.getName() + "] Invalid configuration (attr_metadata_store="
+ attrMetadataStore + ") must be 'true' or 'false'");
} // if else

super.configure(context);
} // configure

Expand Down Expand Up @@ -235,11 +248,50 @@ public void aggregate(NGSIEvent cygnusEvent) {

LOGGER.debug("[" + getName() + "] Processing context attribute (name=" + attrName + ", type="
+ attrType + ")");
Document doc = createDoc(recvTimeTs, entityId, entityType, attrName, attrType, attrValue);

Document doc;

if(attrMetadataStore.equals("true")){
doc = createDocWithMetadata(recvTimeTs, entityId, entityType, attrName, attrType, attrValue, attrMetadata);
} else {
doc = createDoc(recvTimeTs, entityId, entityType, attrName, attrType, attrValue);
} // if else

aggregation.add(doc);
} // for
} // aggregate

private Document createDocWithMetadata(Long recvTimeTs, String entityId, String entityType, String attrName,
String attrType, String attrValue, String attrMetadata) {
Document doc = new Document("recvTime", new Date(recvTimeTs));

switch (dataModel) {
case DMBYSERVICEPATH:
doc.append("entityId", entityId)
.append("entityType", entityType)
.append("attrName", attrName)
.append("attrType", attrType)
.append("attrValue", attrValue)
.append("attrMetadata", (DBObject)JSON.parse(attrMetadata));
break;
case DMBYENTITY:
doc.append("attrName", attrName)
.append("attrType", attrType)
.append("attrValue", attrValue)
.append("attrMetadata", (DBObject)JSON.parse(attrMetadata));
break;
case DMBYATTRIBUTE:
doc.append("attrType", attrType)
.append("attrValue", attrValue)
.append("attrMetadata", (DBObject)JSON.parse(attrMetadata));
break;
default:
return null; // this will never be reached
} // switch

return doc;
} // createDocWithMetadata

private Document createDoc(long recvTimeTs, String entityId, String entityType, String attrName,
String attrType, String attrValue) {
Document doc = new Document("recvTime", new Date(recvTimeTs));
Expand Down Expand Up @@ -367,7 +419,7 @@ private void persistAggregation(MongoDBAggregator aggregator) throws CygnusPersi
backend.insertContextDataRaw(dbName, collectionName, aggregation);
} catch (Exception e) {
throw new CygnusPersistenceError("-, " + e.getMessage());
}
} // try catch
} // persistAggregation

} // NGSIMongoSink
4 changes: 4 additions & 0 deletions doc/cygnus-ngsi/flume_extensions_catalogue/ngsi_mongo_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,20 @@ Regarding the specific data stored within the above collections, if `attr_persis
* `attrName`: Notified attribute name.
* `attrType`: Notified attribute type.
* `attrValue`: In its simplest form, this value is just a string, but since Orion 0.11.0 it can be Json object or Json array.
* `attrMetadata`: will be stored only if it was configured to (attr_metadata_store set to true in the configuration file ngsi_agent.conf). It is a Json object.
* Data model by entity:
* `recvTimeTs`: UTC timestamp expressed in miliseconds.
* `recvTime`: UTC timestamp in human-readable format ([ISO 8601](http://en.wikipedia.org/wiki/ISO_8601)).
* `attrName`: Notified attribute name.
* `attrType`: Notified attribute type.
* `attrValue`: In its simplest form, this value is just a string, but since Orion 0.11.0 it can be Json object or Json array.
* `attrMetadata`: will be stored only if it was configured to (attr_metadata_store set to true in the configuration file ngsi_agent.conf). It is a Json object.
* Data model by attribute:
* `recvTimeTs`: UTC timestamp expressed in miliseconds.
* `recvTime`: UTC timestamp in human-readable format ([ISO 8601](http://en.wikipedia.org/wiki/ISO_8601)).
* `attrType`: Notified attribute type.
* `attrValue`: In its simplest form, this value is just a string, but since Orion 0.11.0 it can be Json object or Json array.
* `attrMetadata`: will be stored only if it was configured to (attr_metadata_store set to true in the configuration file ngsi_agent.conf). It is a Json object.

[Top](#top)

Expand Down Expand Up @@ -298,6 +301,7 @@ If `data_model=dm-by-entity` and `attr_persistence=column` then `NGSIMongoSink`
| enable\_lowercase | no | false | <i>true</i> or <i>false</i>. |
| data\_model | no | dm-by-entity | <i>dm-by-service-path</i>, <i>dm-by-entity</i> or <dm-by-attribute</i>. <i>dm-by-service</i> is not currently supported. |
| attr\_persistence | no | row | <i>row</i> or <i>column</i>. |
| attr\_metadata\_store | no | false | <i>true</i> or <i>false</i>. |
| mongo\_hosts | no | localhost:27017 | FQDN/IP:port where the MongoDB server runs (standalone case) or comma-separated list of FQDN/IP:port pairs where the MongoDB replica set members run. |
| mongo\_username | no | <i>empty</i> | If empty, no authentication is done. |
| mongo\_password | no | <i>empty</i> | If empty, no authentication is done. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ cygnus-ngsi.sinks.mongo-sink.channel = mongo-channel
#cygnus-ngsi.sinks.mongo-sink.data_model = dm-by-entity
# how the attributes are stored, either per row either per column (row, column)
#cygnus-ngsi.sinks.mongo-sink.attr_persistence = column
# true if the attribute metadata will be stored, false by default
#cygnus-ngsi.sinks.mongo-sink.attr_metadata_store = false
# number of notifications to be included within a processing batch
#cygnus-ngsi.sinks.mongo-sink.batch_size = 100
# timeout for batch accumulation
Expand Down

0 comments on commit 13c693d

Please sign in to comment.