diff --git a/writer/src/main/java/com/opendatahub/timeseries/bdp/writer/dal/MeasurementAbstractHistory.java b/writer/src/main/java/com/opendatahub/timeseries/bdp/writer/dal/MeasurementAbstractHistory.java index 591ddd36..c7a0812b 100644 --- a/writer/src/main/java/com/opendatahub/timeseries/bdp/writer/dal/MeasurementAbstractHistory.java +++ b/writer/src/main/java/com/opendatahub/timeseries/bdp/writer/dal/MeasurementAbstractHistory.java @@ -197,6 +197,7 @@ public static void pushRecords(EntityManager em, String stationType, DataMapDto< log.warn("Empty data set. Skipping..."); continue; } + dataRecords.sort((l, r) -> Long.compare(l.getTimestamp(), r.getTimestamp())); // Some datacollectors write multiple periods in a single call. // They need to be handled as if they were separate datatypes, each with their @@ -300,23 +301,25 @@ private static class Period { private class TimeSeries { private MeasurementAbstract latest; - private long latestTime; + private long newestTime; private RecordDtoImpl newest; public TimeSeries(EntityManager em, Class clazz) { latest = MeasurementAbstract.findLatestEntry(em, station, type, period, clazz); - latestTime = (latest != null) ? latest.getTimestamp().getTime() : 0; + newestTime = (latest != null) ? latest.getTimestamp().getTime() : 0; newest = null; } private void updateNewest(RecordDtoImpl dto) { if (newest == null || newest.getTimestamp() < dto.getTimestamp()) { newest = dto; + newestTime = newest.getTimestamp(); } } public void addHistory(EntityManager em, Log log, SimpleRecordDto dto, MeasurementAbstractHistory rec) { - if (latestTime < dto.getTimestamp()) { + // In case of duplicates within a single push, which one is written and which one is discarded, is undefined (depends on the record sorting above) + if (newestTime < dto.getTimestamp()) { rec.setProvenance(provenance); em.persist(rec); updateNewest(dto); @@ -332,7 +335,7 @@ public void updateLatest(EntityManager em, Function latestTime) { + } else if (newest.getTimestamp() > latest.getTimestamp().getTime()) { latest.setTimestamp(new Date(newest.getTimestamp())); latest.setValue(measurement.getValue()); latest.setProvenance(provenance);