Skip to content

Commit

Permalink
Merge pull request #2049 from telefonicaid/fix/sth_sink_aggregation
Browse files Browse the repository at this point in the history
Persist STH aggregation with just one event
  • Loading branch information
fgalan authored May 31, 2021
2 parents 9ef6db1 + 3c3b661 commit d11fe71
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[cygnus-ngsi][STHSink] Fix aggregation of events with different entityId (#2048)
[cygnus-common][Mongo] Check mongo uri format (#2046)
[cygnus-ngsi][NGSIRestHandler] ngsiv2 initial notification not includes al list of subservices in servicePath header when is / (#2042)
[cygnus-ngsi][MongoSink] Check access to element aggregation before cast to Date mongo type (#2038)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,17 @@ public void persistBatch(NGSIBatch batch) throws CygnusBadConfiguration, CygnusP
// Get the events within the current sub-batch
ArrayList<NGSIEvent> events = batch.getNextEvents();

// Get an aggregator for this destination and initialize it
STHAggregator aggregator = new STHAggregator();
aggregator.initialize(events.get(0));

// Iterate on the events within the sub-batch and aggregate them
for (NGSIEvent event : events) {
// Current STH Aggregator does not allow more than one event
// but an STH aggregation is big enough to be persited each time
STHAggregator aggregator = new STHAggregator();
aggregator.initialize(event);
aggregator.aggregate(event);
// Persist the aggregation
aggregator.persist(this.getName());
} // for

// Persist the aggregation
aggregator.persist(this.getName());

// Set the sub-batch as persisted
batch.setNextPersisted(true);
} // for
Expand Down

0 comments on commit d11fe71

Please sign in to comment.