From 144a346cb0530c78f6a069a5f2547f2a00f059ad Mon Sep 17 00:00:00 2001 From: Murali Shankar Date: Tue, 28 Jan 2025 18:25:37 -0800 Subject: [PATCH 1/2] Retrieval returns zero samples for slowly changing PV #323 --- .../PlainPB/FileBackedPBEventStream.java | 51 +++-- .../PlainPB/DataAroundPartitionEndTest.java | 178 ++++++++++++++++++ 2 files changed, 210 insertions(+), 19 deletions(-) create mode 100644 src/test/edu/stanford/slac/archiverappliance/PlainPB/DataAroundPartitionEndTest.java diff --git a/src/main/edu/stanford/slac/archiverappliance/PlainPB/FileBackedPBEventStream.java b/src/main/edu/stanford/slac/archiverappliance/PlainPB/FileBackedPBEventStream.java index f014c80f..dbbdb42b 100644 --- a/src/main/edu/stanford/slac/archiverappliance/PlainPB/FileBackedPBEventStream.java +++ b/src/main/edu/stanford/slac/archiverappliance/PlainPB/FileBackedPBEventStream.java @@ -17,10 +17,10 @@ import org.epics.archiverappliance.EventStream; import org.epics.archiverappliance.common.BasicContext; import org.epics.archiverappliance.common.BiDirectionalIterable; +import org.epics.archiverappliance.common.BiDirectionalIterable.IterationDirection; import org.epics.archiverappliance.common.EmptyEventIterator; import org.epics.archiverappliance.common.TimeUtils; import org.epics.archiverappliance.common.YearSecondTimestamp; -import org.epics.archiverappliance.common.BiDirectionalIterable.IterationDirection; import org.epics.archiverappliance.config.ArchDBRTypes; import org.epics.archiverappliance.data.DBRTimeEvent; import org.epics.archiverappliance.etl.ETLBulkStream; @@ -123,16 +123,19 @@ public FileBackedPBEventStream( // We use a search to locate the boundaries of the data and the constrain based on position. try { seekToTimes(path, dbrtype, startTime, endTime); - } catch(IOException ex) { - logger.error("Exception seeking to time in file " + path.toAbsolutePath().toString() + ". Defaulting to linear search; this will impact performance.", ex); + } catch (IOException ex) { + logger.error( + "Exception seeking to time in file " + + path.toAbsolutePath().toString() + + ". Defaulting to linear search; this will impact performance.", + ex); this.positionBoundaries = false; this.startTime = startTime; - this.endTime = endTime; + this.endTime = endTime; } } } - /** * Used for unlimited iteration. * We specify a time to start the iteration at and a direction. @@ -143,7 +146,11 @@ public FileBackedPBEventStream( * @throws IOException   */ public FileBackedPBEventStream( - String pvname, Path path, ArchDBRTypes dbrtype, Instant startAtTime, BiDirectionalIterable.IterationDirection direction) + String pvname, + Path path, + ArchDBRTypes dbrtype, + Instant startAtTime, + BiDirectionalIterable.IterationDirection direction) throws IOException { this.pvName = pvname; this.path = path; @@ -152,19 +159,18 @@ public FileBackedPBEventStream( this.startTime = startAtTime; this.endTime = startAtTime; this.readPayLoadInfo(); - if(direction == IterationDirection.FORWARDS) { + if (direction == IterationDirection.FORWARDS) { this.startFilePos = this.seekToStartTime(path, dbrtype, startAtTime); this.endFilePos = Files.size(path); } else { this.startFilePos = this.fileInfo.positionOfFirstSample; this.endFilePos = this.seekToEndTime(path, dbrtype, startAtTime); - if(this.endFilePos <=0) { + if (this.endFilePos <= 0) { this.endFilePos = Files.size(path); } } } - @Override public Iterator iterator() { try { @@ -181,21 +187,25 @@ public Iterator iterator() { readPayLoadInfo(); } - if(this.direction != null) { - if(this.direction == BiDirectionalIterable.IterationDirection.BACKWARDS) { - // If I am going backwards and the first event in this file is after the startAtTime, we don't have any data in this file for the iteration - if(fileInfo.firstEvent.getEventTimeStamp().isAfter(this.endTime)) { + if (this.direction != null) { + if (this.direction == BiDirectionalIterable.IterationDirection.BACKWARDS) { + // If I am going backwards and the first event in this file is after the startAtTime, we don't have + // any data in this file for the iteration + if (fileInfo.firstEvent.getEventTimeStamp().isAfter(this.endTime)) { logger.info("Returning an empty iterator as the time in file is after endtime"); return new EmptyEventIterator(); } - theIterator = new PBEventStreamPositionBasedReverseIterator(path, startFilePos, endFilePos, desc.getYear(), type); + theIterator = new PBEventStreamPositionBasedReverseIterator( + path, startFilePos, endFilePos, desc.getYear(), type); } else { - // If I am going forwards and the last event in the file is before the startAtTime, we don't have any data in this file for the iteration - if(fileInfo.lastEvent.getEventTimeStamp().isBefore(this.startTime)) { + // If I am going forwards and the last event in the file is before the startAtTime, we don't have + // any data in this file for the iteration + if (fileInfo.lastEvent.getEventTimeStamp().isBefore(this.startTime)) { logger.info("Returning an empty iterator as the time in file is before starttime"); return new EmptyEventIterator(); } - theIterator = new FileBackedPBEventStreamPositionBasedIterator(path, startFilePos, endFilePos, desc.getYear(), type); + theIterator = new FileBackedPBEventStreamPositionBasedIterator( + path, startFilePos, endFilePos, desc.getYear(), type); } return theIterator; } @@ -293,8 +303,11 @@ private void seekToTimes(Path path, ArchDBRTypes dbrtype, Instant queryStartTime YearSecondTimestamp firstSampleEpoch = (fileInfo.getFirstEvent()).getYearSecondTimestamp(); YearSecondTimestamp lastSampleEpoch = (fileInfo.getLastEvent()).getYearSecondTimestamp(); - if (queryEndEpoch.compareTo(firstSampleEpoch) < 0 || queryStartEpoch.compareTo(lastSampleEpoch) > 0) { - logger.debug("Case 1 - this file should not be included in request"); + if (queryEndEpoch.compareTo(firstSampleEpoch) < 0) { + logger.debug( + "Case 1 - this file should not be included in request {} {}", + (queryEndEpoch.compareTo(firstSampleEpoch) < 0), + (queryStartEpoch.compareTo(lastSampleEpoch) > 0)); this.positionBoundaries = false; this.startTime = queryStartTime; this.endTime = queryEndTime; diff --git a/src/test/edu/stanford/slac/archiverappliance/PlainPB/DataAroundPartitionEndTest.java b/src/test/edu/stanford/slac/archiverappliance/PlainPB/DataAroundPartitionEndTest.java new file mode 100644 index 00000000..9b1ac18a --- /dev/null +++ b/src/test/edu/stanford/slac/archiverappliance/PlainPB/DataAroundPartitionEndTest.java @@ -0,0 +1,178 @@ +package edu.stanford.slac.archiverappliance.PlainPB; + +import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.epics.archiverappliance.Event; +import org.epics.archiverappliance.EventStream; +import org.epics.archiverappliance.common.BasicContext; +import org.epics.archiverappliance.common.POJOEvent; +import org.epics.archiverappliance.common.PartitionGranularity; +import org.epics.archiverappliance.common.TimeUtils; +import org.epics.archiverappliance.config.ArchDBRTypes; +import org.epics.archiverappliance.config.ConfigService; +import org.epics.archiverappliance.config.ConfigServiceForTests; +import org.epics.archiverappliance.config.StoragePluginURLParser; +import org.epics.archiverappliance.config.exception.ConfigException; +import org.epics.archiverappliance.data.ScalarValue; +import org.epics.archiverappliance.engine.membuf.ArrayListEventStream; +import org.epics.archiverappliance.retrieval.RemotableEventStreamDesc; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.concurrent.Callable; + +import static edu.stanford.slac.archiverappliance.PlainPB.PlainPBStoragePlugin.pbFileExtension; +import static edu.stanford.slac.archiverappliance.PlainPB.PlainPBStoragePlugin.pbFileSuffix; + +/** + * Test data retrieval around the partition boundaries. + * We use daily partitions; create data for two days ( 1Hz ) but leave large gaps in between. + * We then ask for minutely data and make sure we always get data when the start time is >= the timestamp of the very first sample. + * @author mshankar + * + */ +public class DataAroundPartitionEndTest { + private static final Logger logger = LogManager.getLogger(DataAroundPartitionEndTest.class.getName()); + private static final File testFolder = + new File(ConfigServiceForTests.getDefaultPBTestFolder() + File.separator + "DataAroundPartitionEndTest"); + private static final String pvName = "DataAroundPartitionEndTest"; + private static final ConfigService configService; + + static { + try { + configService = new ConfigServiceForTests(-1); + } catch (ConfigException e) { + throw new RuntimeException(e); + } + } + + ArchDBRTypes dbrType = ArchDBRTypes.DBR_SCALAR_DOUBLE; + + private static final short dataYear = (short) (TimeUtils.getCurrentYear() - 1); + private static Instant generatedEndDate = Instant.parse(dataYear + "-06-01T00:00:00.00Z"); + private static Instant generatedStartDate = generatedEndDate.minus(2, ChronoUnit.DAYS); + + private static final Path pbFilePath = Paths.get( + testFolder.getAbsolutePath(), + pvName.replace(":", "/").replace("--", "") + ":" + dataYear + pbFileExtension); + + private static void generateData() throws Exception { + logger.info("Generating data info to " + pbFilePath); + + PlainPBStoragePlugin storagePlugin = (PlainPBStoragePlugin) StoragePluginURLParser.parseStoragePlugin( + pbFileSuffix + "://localhost?name=DataAroundPartitionEndTest&rootFolder=" + testFolder.getAbsolutePath() + + "&partitionGranularity=PARTITION_DAY", + DataAroundPartitionEndTest.configService); + ArrayListEventStream strm = new ArrayListEventStream( + PartitionGranularity.PARTITION_DAY.getApproxSecondsPerChunk() * 3, + new RemotableEventStreamDesc(ArchDBRTypes.DBR_SCALAR_DOUBLE, pvName, dataYear)); + + Instant ts = generatedStartDate; + int sampleCount = 1; + while (ts.isBefore(generatedEndDate)) { + if (sampleCount % 3600 == 0) { + sampleCount = 1; + ts = ts.plus(1, ChronoUnit.DAYS); + } else { + ts = ts.plus(1, ChronoUnit.SECONDS); + } + + strm.add(new POJOEvent( + ArchDBRTypes.DBR_SCALAR_DOUBLE, ts, new ScalarValue<>((double) ts.getEpochSecond()), 0, 0)); + sampleCount++; + logger.debug("Inserting data at {}", TimeUtils.convertToHumanReadableString(ts)); + } + + try (BasicContext context = new BasicContext()) { + assert storagePlugin != null; + storagePlugin.appendData(context, pvName, strm); + } + } + + @Test + public void checkLowerLevelRetrieval() throws Exception { + Path pbFilePath = Paths.get( + testFolder.getAbsolutePath(), + pvName.replace(":", "/").replace("--", "") + ":" + dataYear + "_05_30.pb"); + + try (BasicContext context = new BasicContext()) { + EventStream strm = FileStreamCreator.getTimeStream( + pvName, + pbFilePath, + ArchDBRTypes.DBR_SCALAR_DOUBLE, + Instant.parse(dataYear + "-05-30T23:59:00.00Z"), + Instant.parse(dataYear + "-06-01T00:00:00.00Z"), + false); + int totalEvents = 0; + for (Event ev : strm) { + logger.debug( + "Got lower level data at {}", TimeUtils.convertToHumanReadableString(ev.getEventTimeStamp())); + totalEvents++; + } + Assertions.assertTrue(totalEvents > 0, "Expected at least one event, got 0 events"); + } + } + + @Test + public void checkRetrieval() throws Exception { + PlainPBStoragePlugin storagePlugin = (PlainPBStoragePlugin) StoragePluginURLParser.parseStoragePlugin( + pbFileSuffix + "://localhost?name=DataAroundPartitionEndTest&rootFolder=" + testFolder.getAbsolutePath() + + "&partitionGranularity=PARTITION_DAY", + DataAroundPartitionEndTest.configService); + try (BasicContext context = new BasicContext()) { + Instant rstart = generatedStartDate; + Instant rend = rstart.plus(1, ChronoUnit.MINUTES); + while (rend.isBefore(generatedEndDate.plus(14, ChronoUnit.DAYS))) { + int totalEvents = 0; + List> cstrms = storagePlugin.getDataForPV(context, pvName, rstart, rend); + for (Callable cstrm : cstrms) { + EventStream st = cstrm.call(); + for (Event ev : st) { + logger.debug("Data at {}", TimeUtils.convertToHumanReadableString(ev.getEventTimeStamp())); + totalEvents++; + } + } + if (rstart.isAfter(generatedStartDate)) { + Assertions.assertTrue( + totalEvents > 0, + "Did not receive any events for start " + + TimeUtils.convertToHumanReadableString(rstart) + + " and " + + TimeUtils.convertToHumanReadableString(rend)); + logger.info("Got " + totalEvents + " total events for " + + TimeUtils.convertToHumanReadableString(rstart) + + " and " + + TimeUtils.convertToHumanReadableString(rend)); + } + + rstart = rend; + rend = rstart.plus(1, ChronoUnit.MINUTES); + } + } + } + + @BeforeAll + public static void setUp() throws Exception { + try { + FileUtils.deleteDirectory(testFolder); + generateData(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @AfterAll + public static void tearDown() throws Exception { + FileUtils.deleteDirectory(testFolder); + } +} From b1f6999231f884dadd0ca0c9458536e96fbcdce5 Mon Sep 17 00:00:00 2001 From: Murali Shankar Date: Fri, 31 Jan 2025 16:11:14 -0800 Subject: [PATCH 2/2] For queries whose start time is after the last known timestamp in the data, we should expect a Iterator with at least one sample. So not the EmptyIterator --- .../slac/archiverappliance/PlainPB/FileBackedIteratorTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/edu/stanford/slac/archiverappliance/PlainPB/FileBackedIteratorTest.java b/src/test/edu/stanford/slac/archiverappliance/PlainPB/FileBackedIteratorTest.java index 672c32b6..bcfedc09 100644 --- a/src/test/edu/stanford/slac/archiverappliance/PlainPB/FileBackedIteratorTest.java +++ b/src/test/edu/stanford/slac/archiverappliance/PlainPB/FileBackedIteratorTest.java @@ -154,7 +154,6 @@ public static Stream provideCorrectIterator() { TimeUtils.plusDays(LKTS, 10), TimeUtils.plusDays(LKTS, 1), TimeUtils.plusDays(LKTS, 10), - EmptyEventIterator.class, mainIteratorClass)); }