Skip to content

Commit

Permalink
Merged with develop branch
Browse files Browse the repository at this point in the history
  • Loading branch information
xdelox committed Nov 4, 2015
2 parents be9bfe8 + e969b5d commit 7247499
Show file tree
Hide file tree
Showing 93 changed files with 6,709 additions and 1,835 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ target/
*.idea
*.iml
nbactions.xml

maven-deps/

conf/agent_0.conf
19 changes: 18 additions & 1 deletion CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1 +1,18 @@
- [FEATURE] Column-like persistence in OrionMongoSink (#548)
- [HARDENING] Cygnus user creation added to the from the sources installation guide (#558)
- [FEATURE] Add a binary implementation of the HDFS backend (#537)
- [HARDENING] Add docker as an option in the README for installing Cygnus (#552)
- [FEATURE] Add support to batch processing within a transaction in OrionHDFSSink (#554)
- [FEATURE] Add support to batch processing within a transaction in OrionMySQLSink (#568)
- [BUG] Add support to CKAN 2.4 API changes (#546)
- [BUG] Remove final line feed in the HDFS aggregation (#575)
- [FEATURE] Add some interesting fields to HDFS persisted data (servicePath in row-like modes, servicePath, entityId and entityType in column-like modes), including migration script (#544)
- [HARDENING] Document how to choose the hadoop-core dependency for compilation purposes (#559)
- [HARDENING] Add a batching timeout and accumulate the transaction IDs for right tracing (#562)
- [BUG] Ask for CKAN resources when populating a package cache after calling organization_show (#589)
- [FEATURE] Add some interesting fields to MySQL persisted data (servicePath in row-like modes, servicePath, entityId and entityType in column-like modes), including migration script (#501)
- [BUG] Fix the CKAN resource name building, removing references to old default destination (#592)
- [FEATURE] Allow enabling/disabling Hive in OrionHDFSSink (#555)
- [HARDENING] Explain the batching mechanism in the performance document (#564)
- [HARDENING] Fix the documentation regarding the installation from sources (#596)
- [BUG] Add extra fields (servicePath in row-like modes, servicePath, entityId and entityType in column-like modes) to Hive tables (#598)
- [FEATURE] Column-like persistence in OrionMongoSink (#548)
181 changes: 137 additions & 44 deletions README.md

Large diffs are not rendered by default.

96 changes: 82 additions & 14 deletions conf/agent.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
# sink of the same type and sharing the channel in order to improve the performance (this is like having
# multi-threading).
cygnusagent.sources = http-source
cygnusagent.sinks = hdfs-sink mysql-sink ckan-sink
cygnusagent.channels = hdfs-channel mysql-channel ckan-channel
cygnusagent.sinks = hdfs-sink mysql-sink ckan-sink mongo-sink sth-sink kafka-sink
cygnusagent.channels = hdfs-channel mysql-channel ckan-channel mongo-channel sth-channel kafka-channel

#=============================================
# source configuration
# channel name where to write the notification events
cygnusagent.sources.http-source.channels = hdfs-channel mysql-channel ckan-channel
cygnusagent.sources.http-source.channels = hdfs-channel mysql-channel ckan-channel mongo-channel sth-channel kafka-channel
# source class, must not be changed
cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource
# listening port the Flume source will use for receiving incoming notifications
Expand Down Expand Up @@ -64,30 +64,44 @@ cygnusagent.sources.http-source.interceptors.gi.grouping_rules_conf_file = /usr/
cygnusagent.sinks.hdfs-sink.channel = hdfs-channel
# sink class, must not be changed
cygnusagent.sinks.hdfs-sink.type = com.telefonica.iot.cygnus.sinks.OrionHDFSSink
# true if the grouping feature is enabled for this sink, false otherwise
cygnusagent.sinks.hdfs-sink.enable_grouping = false
# rest if the interaction with HDFS will be WebHDFS/HttpFS-based, binary if based on the Hadoop API
cygnusagent.sinks.hdfs-sink.backend_impl = rest
# Comma-separated list of FQDN/IP address regarding the HDFS Namenode endpoints
# If you are using Kerberos authentication, then the usage of FQDNs instead of IP addresses is mandatory
cygnusagent.sinks.hdfs-sink.hdfs_host = x1.y1.z1.w1,x2.y2.z2.w2
# port of the HDFS service listening for persistence operations; 14000 for httpfs, 50070 for webhdfs
cygnusagent.sinks.hdfs-sink.hdfs_port = 14000
# username allowed to write in HDFS
cygnusagent.sinks.hdfs-sink.hdfs_username = hdfs_username
# OAuth2 token
# password for the above username; this is only required for Hive authentication
cygnusagent.sinks.hdfs-sink.hdfs_password = xxxxxxxx
# OAuth2 token for HDFS authentication
cygnusagent.sinks.hdfs-sink.oauth2_token = xxxxxxxx
# how the attributes are stored, either per row either per column (row, column)
cygnusagent.sinks.hdfs-sink.attr_persistence = column
# Hive FQDN/IP address of the Hive server
cygnusagent.sinks.hdfs-sink.hive_host = x.y.z.w
# Hive port for Hive external table provisioning
cygnusagent.sinks.hdfs-sink.hive_port = 10000
# how the attributes are stored, available formats are json-row, json-column, csv-row and csv-column
cygnusagent.sinks.hdfs-sink.file_format = json-column
# number of notifications to be included within a processing batch
cygnusagent.sinks.hdfs-sink.batch_size = 100
# timeout for batch accumulation
cygunsagent.sinks.hdfs-sink.batch_timeout = 30
# Hive enabling
cygnusagent.sinks.hdfs-sink.hive = true
# Hive server version, 1 or 2 (ignored if hive is false)
cygnusagent.sinks.hdfs-sink.hive.server_version = 2
# Hive FQDN/IP address of the Hive server (ignored if hive is false)
cygnusagent.sinks.hdfs-sink.hive.host = x.y.z.w
# Hive port for Hive external table provisioning (ignored if hive is false)
cygnusagent.sinks.hdfs-sink.hive.port = 10000
# Kerberos-based authentication enabling
cygnusagent.sinks.hdfs-sink.krb5_auth = false
# Kerberos username
# Kerberos username (ignored if krb5_auth is false)
cygnusagent.sinks.hdfs-sink.krb5_auth.krb5_user = krb5_username
# Kerberos password
# Kerberos password (ignored if krb5_auth is false)
cygnusagent.sinks.hdfs-sink.krb5_auth.krb5_password = xxxxxxxxxxxxx
# Kerberos login file
# Kerberos login file (ignored if krb5_auth is false)
cygnusagent.sinks.hdfs-sink.krb5_auth.krb5_login_conf_file = /usr/cygnus/conf/krb5_login.conf
# Kerberos configuration file
# Kerberos configuration file (ignored if krb5_auth is false)
cygnusagent.sinks.hdfs-sink.krb5_auth.krb5_conf_file = /usr/cygnus/conf/krb5.conf

# ============================================
Expand All @@ -96,6 +110,8 @@ cygnusagent.sinks.hdfs-sink.krb5_auth.krb5_conf_file = /usr/cygnus/conf/krb5.con
cygnusagent.sinks.ckan-sink.channel = ckan-channel
# sink class, must not be changed
cygnusagent.sinks.ckan-sink.type = com.telefonica.iot.cygnus.sinks.OrionCKANSink
# true if the grouping feature is enabled for this sink, false otherwise
cygnusagent.sinks.ckan-sink.enable_grouping = false
# the CKAN API key to use
cygnusagent.sinks.ckan-sink.api_key = ckanapikey
# the FQDN/IP address for the CKAN API endpoint
Expand All @@ -115,6 +131,8 @@ cygnusagent.sinks.ckan-sink.ssl = false
cygnusagent.sinks.mysql-sink.channel = mysql-channel
# sink class, must not be changed
cygnusagent.sinks.mysql-sink.type = com.telefonica.iot.cygnus.sinks.OrionMySQLSink
# true if the grouping feature is enabled for this sink, false otherwise
cygnusagent.sinks.mysql-sink.enable_grouping = false
# the FQDN/IP address where the MySQL server runs
cygnusagent.sinks.mysql-sink.mysql_host = x.y.z.w
# the port where the MySQL server listes for incomming connections
Expand All @@ -125,13 +143,21 @@ cygnusagent.sinks.mysql-sink.mysql_username = root
cygnusagent.sinks.mysql-sink.mysql_password = xxxxxxxxxxxxx
# how the attributes are stored, either per row either per column (row, column)
cygnusagent.sinks.mysql-sink.attr_persistence = column
# select the table type from table-by-destination and table-by-service-path
cygnusagent.sinks.mysql-sink.table_type = table-by-destination
# number of notifications to be included within a processing batch
cygnusagent.sinks.mysql-sink.batch_size = 100
# timeout for batch accumulation
cygunsagent.sinks.mysql-sink.batch_timeout = 30

# ============================================
# OrionMongoSink configuration
# sink class, must not be changed
cygnusagent.sinks.mongo-sink.type = com.telefonica.iot.cygnus.sinks.OrionMongoSink
# channel name from where to read notification events
cygnusagent.sinks.mongo-sink.channel = mongo-channel
# true if the grouping feature is enabled for this sink, false otherwise
cygnusagent.sinks.mongo-sink.enable_grouping = false
# FQDN/IP:port where the MongoDB server runs (standalone case) or comma-separated list of FQDN/IP:port pairs where the MongoDB replica set members run
cygnusagent.sinks.mongo-sink.mongo_hosts = x1.y1.z1.w1:port1,x2.y2.z2.w2:port2,...
# a valid user in the MongoDB server (or empty if authentication is not enabled in MongoDB)
Expand All @@ -155,6 +181,8 @@ cygnusagent.sinks.mongo-sink.attr_persistence = column
cygnusagent.sinks.sth-sink.type = com.telefonica.iot.cygnus.sinks.OrionSTHSink
# channel name from where to read notification events
cygnusagent.sinks.sth-sink.channel = sth-channel
# true if the grouping feature is enabled for this sink, false otherwise
cygnusagent.sinks.sth-sink.enable_grouping = false
# FQDN/IP:port where the MongoDB server runs (standalone case) or comma-separated list of FQDN/IP:port pairs where the MongoDB replica set members run
cygnusagent.sinks.sth-sink.mongo_hosts = x1.y1.z1.w1:port1,x2.y2.z2.w2:port2,...
# a valid user in the MongoDB server (or empty if authentication is not enabled in MongoDB)
Expand All @@ -168,6 +196,19 @@ cygnusagent.sinks.sth-sink.collection_prefix = sth_
# true is collection names are based on a hash, false for human redable collections
cygnusagent.sinks.sth-sink.should_hash = false

#=============================================
# OrionKafkaSink configuration
# sink class, must not be changed
cygnusagent.sinks.kafka-sink.type = com.telefonica.iot.cygnus.sinks.OrionKafkaSink
# channel name from where to read notification events
cygnusagent.sinks.kafka-sink.channel = kafka-channel
# select the Kafka topic type between topic-by-service, topic-by-service-path and topic-by-destination
cygnusagent.sinks.kafka-sink.topic_type = topic-by-destination
# comma-separated list of Kafka brokers (a broker is defined as host:port)
cygnusagent.sinks.kafka-sink.broker_list = x1.y1.z1.w1:port1,x2.y2.z2.w2:port2,...
# Zookeeper endpoint needed to create Kafka topics, in the form of host:port
cygnusagent.sinks.kafka-sink.zookeeper_endpoint = x.y.z.w:port

#=============================================
# hdfs-channel configuration
# channel type (must not be changed)
Expand All @@ -194,3 +235,30 @@ cygnusagent.channels.mysql-channel.type = memory
cygnusagent.channels.mysql-channel.capacity = 1000
# amount of bytes that can be sent per transaction
cygnusagent.channels.mysql-channel.transactionCapacity = 100

#=============================================
# mongo-channel configuration
# channel type (must not be changed)
cygnusagent.channels.mongo-channel.type = memory
# capacity of the channel
cygnusagent.channels.mongo-channel.capacity = 1000
# amount of bytes that can be sent per transaction
cygnusagent.channels.mongo-channel.transactionCapacity = 100

#=============================================
# sth-channel configuration
# channel type (must not be changed)
cygnusagent.channels.sth-channel.type = memory
# capacity of the channel
cygnusagent.channels.sth-channel.capacity = 1000
# amount of bytes that can be sent per transaction
cygnusagent.channels.sth-channel.transactionCapacity = 100

#=============================================
# kafka-channel configuration
# channel type (must not be changed)
cygnusagent.channels.kafka-channel.type = memory
# capacity of the channel
cygnusagent.channels.kafka-channel.capacity = 1000
# amount of bytes that can be sent per transaction
cygnusagent.channels.mkafka-channel.transactionCapacity = 100
27 changes: 20 additions & 7 deletions doc/design/OrionCKANSink.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Independently of the data generator, NGSI context data is always [transformed](f
4. The context attributes within each context response/entity are iterated, and a new data row is upserted in the datastore related to the resource. The format for this append depends on the configured persistence mode:
* `row`: A data row is upserted for each notified context attribute. This kind of row will always contain 8 fields:
* `recvTimeTs`: UTC timestamp expressed in miliseconds.
* `recvTime`: UTC timestamp in human-redable format ([ISO 6801](http://en.wikipedia.org/wiki/ISO_8601)).
* `recvTime`: UTC timestamp in human-redable format ([ISO 8601](http://en.wikipedia.org/wiki/ISO_8601)).
* `entityId`: Notified entity identifier.
* `entityType`: Notified entity type.
* `attrName`: Notified attribute name.
Expand All @@ -50,12 +50,15 @@ Assuming the following Flume event is created from a notified NGSI context data
flume-event={
headers={
content-type=application/json,
fiware-service=vehicles,
fiware-servicepath=4wheels,
timestamp=1429535775,
transactionId=1429535775-308-0000000000,
ttl=10,
destination=car1_car
timestamp=1429535775,
transactionId=1429535775-308-0000000000,
ttl=10,
notified-service=vehicles,
notified-servicepath=4wheels,
default-destination=car1_car
default-servicepaths=4wheels
grouped-destination=car1_car
grouped-servicepath=4wheels
},
body={
entityId=car1,
Expand Down Expand Up @@ -96,6 +99,14 @@ Assuming `api_key=myapikey` and `attr_persistence=row` as configuration paramete
"type": "timestamp",
"id": "recvTime"
},
{
"id": "entityId",
"type": "text"
},
{
"id": "entityType",
"type": "text"
},
{
"type": "text",
"id": "attrName"
Expand Down Expand Up @@ -204,6 +215,7 @@ NOTE: `curl` is a Unix command allowing for interacting with REST APIs such as t
|---|---|---|---|
| type | yes | N/A | Must be <i>com.telefonica.iot.cygnus.sinks.OrionCKANSink</i> |
| channel | yes | N/A |
| enable_grouping | no | false | <i>true</i> or <i>false</i> |
| ckan_host | no | localhost | FQDN/IP address where the CKAN server runs |
| ckan_port | no | 80 |
| ssl | no | false |
Expand All @@ -218,6 +230,7 @@ A configuration example could be:
...
cygnusagent.sinks.ckan-sink.type = com.telefonica.iot.cygnus.sinks.OrionCKANSink
cygnusagent.sinks.ckan-sink.channel = ckan-channel
cygnusagent.sinks.ckan-sink.enable_grouping = false
cygnusagent.sinks.ckan-sink.ckan_host = 192.168.80.34
cygnusagent.sinks.ckan-sink.ckan_port = 80
cygnusagent.sinks.ckan-sink.ssl = false
Expand Down
Loading

0 comments on commit 7247499

Please sign in to comment.