Skip to content

Commit

Permalink
Merge pull request #599 from telefonicaid/bug/598_extra_fields_not_in…
Browse files Browse the repository at this point in the history
…_hive

ADD the extra fields to the Hive table creation commands
  • Loading branch information
Fermín Galán Márquez committed Nov 4, 2015
2 parents e32bffa + 7fc1424 commit e969b5d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@
- [FEATURE] Allow enabling/disabling Hive in OrionHDFSSink (#555)
- [HARDENING] Explain the batching mechanism in the performance document (#564)
- [HARDENING] Fix the documentation regarding the installation from sources (#596)
- [BUG] Add extra fields (servicePath in row-like modes, servicePath, entityId and entityType in column-like modes) to Hive tables (#598)
44 changes: 26 additions & 18 deletions src/main/java/com/telefonica/iot/cygnus/sinks/OrionHDFSSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -509,13 +509,14 @@ private class JSONRowAggregator extends HDFSAggregator {
@Override
public void initialize(CygnusEvent cygnusEvent) throws Exception {
super.initialize(cygnusEvent);
hiveFields = Constants.RECV_TIME_TS + " bigint, "
+ Constants.RECV_TIME + " string, "
+ Constants.ENTITY_ID + " string, "
+ Constants.ENTITY_TYPE + " string, "
+ Constants.ATTR_NAME + " string, "
+ Constants.ATTR_TYPE + " string, "
+ Constants.ATTR_VALUE + " string, "
hiveFields = Constants.RECV_TIME_TS + " bigint,"
+ Constants.RECV_TIME + " string,"
+ Constants.HEADER_NOTIFIED_SERVICE_PATH.replaceAll("-", "") + " string,"
+ Constants.ENTITY_ID + " string,"
+ Constants.ENTITY_TYPE + " string,"
+ Constants.ATTR_NAME + " string,"
+ Constants.ATTR_TYPE + " string,"
+ Constants.ATTR_VALUE + " string,"
+ Constants.ATTR_MD + " array<struct<name:string,type:string,value:string>>";
} // initialize

Expand Down Expand Up @@ -553,7 +554,7 @@ public void aggregate(CygnusEvent cygnusEvent) throws Exception {
String line = "{"
+ "\"" + Constants.RECV_TIME_TS + "\":\"" + recvTimeTs / 1000 + "\","
+ "\"" + Constants.RECV_TIME + "\":\"" + recvTime + "\","
+ "\"" + Constants.HEADER_NOTIFIED_SERVICE_PATH + "\":\"" + servicePath + "\","
+ "\"" + Constants.HEADER_NOTIFIED_SERVICE_PATH.replaceAll("-", "") + "\":\"" + servicePath + "\","
+ "\"" + Constants.ENTITY_ID + "\":\"" + entityId + "\","
+ "\"" + Constants.ENTITY_TYPE + "\":\"" + entityType + "\","
+ "\"" + Constants.ATTR_NAME + "\":\"" + attrName + "\","
Expand Down Expand Up @@ -582,7 +583,10 @@ public void initialize(CygnusEvent cygnusEvent) throws Exception {
super.initialize(cygnusEvent);

// particular initialization
hiveFields = Constants.RECV_TIME + " string";
hiveFields = Constants.RECV_TIME + " string,"
+ Constants.HEADER_NOTIFIED_SERVICE_PATH.replaceAll("-", "") + " string,"
+ Constants.ENTITY_ID + " string,"
+ Constants.ENTITY_TYPE + " string";

// iterate on all this context element attributes, if there are attributes
ArrayList<ContextAttribute> contextAttributes = cygnusEvent.getContextElement().getAttributes();
Expand Down Expand Up @@ -621,7 +625,7 @@ public void aggregate(CygnusEvent cygnusEvent) throws Exception {
} // if

String line = "{\"" + Constants.RECV_TIME + "\":\"" + recvTime + "\","
+ "\"" + Constants.HEADER_NOTIFIED_SERVICE_PATH + "\":\"" + servicePath + "\","
+ "\"" + Constants.HEADER_NOTIFIED_SERVICE_PATH.replaceAll("-", "") + "\":\"" + servicePath + "\","
+ "\"" + Constants.ENTITY_ID + "\":\"" + entityId + "\","
+ "\"" + Constants.ENTITY_TYPE + "\":\"" + entityType + "\"";

Expand Down Expand Up @@ -655,13 +659,14 @@ private class CSVRowAggregator extends HDFSAggregator {
@Override
public void initialize(CygnusEvent cygnusEvent) throws Exception {
super.initialize(cygnusEvent);
hiveFields = Constants.RECV_TIME_TS + " bigint, "
+ Constants.RECV_TIME + " string, "
+ Constants.ENTITY_ID + " string, "
+ Constants.ENTITY_TYPE + " string, "
+ Constants.ATTR_NAME + " string, "
+ Constants.ATTR_TYPE + " string, "
+ Constants.ATTR_VALUE + " string, "
hiveFields = Constants.RECV_TIME_TS + " bigint,"
+ Constants.RECV_TIME + " string,"
+ Constants.HEADER_NOTIFIED_SERVICE_PATH.replaceAll("-", "") + " string,"
+ Constants.ENTITY_ID + " string,"
+ Constants.ENTITY_TYPE + " string,"
+ Constants.ATTR_NAME + " string,"
+ Constants.ATTR_TYPE + " string,"
+ Constants.ATTR_VALUE + " string,"
+ Constants.ATTR_MD_FILE + " string";
} // initialize

Expand Down Expand Up @@ -773,7 +778,10 @@ public void initialize(CygnusEvent cygnusEvent) throws Exception {
super.initialize(cygnusEvent);

// particular initialization
hiveFields = Constants.RECV_TIME + " string";
hiveFields = Constants.RECV_TIME + " string,"
+ Constants.HEADER_NOTIFIED_SERVICE_PATH.replaceAll("-", "") + " string,"
+ Constants.ENTITY_ID + " string,"
+ Constants.ENTITY_TYPE + " string";

// iterate on all this context element attributes; it is supposed all the entity's attributes are notified
ArrayList<ContextAttribute> contextAttributes = cygnusEvent.getContextElement().getAttributes();
Expand Down

0 comments on commit e969b5d

Please sign in to comment.