Skip to content

Commit

Permalink
Merge pull request #329 from slacmshankar/Issue323
Browse files Browse the repository at this point in the history
Retrieval returns zero samples for slowly changing PV #323
  • Loading branch information
slacmshankar authored Feb 3, 2025
2 parents 61c7442 + b1f6999 commit 668b061
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
import org.epics.archiverappliance.EventStream;
import org.epics.archiverappliance.common.BasicContext;
import org.epics.archiverappliance.common.BiDirectionalIterable;
import org.epics.archiverappliance.common.BiDirectionalIterable.IterationDirection;
import org.epics.archiverappliance.common.EmptyEventIterator;
import org.epics.archiverappliance.common.TimeUtils;
import org.epics.archiverappliance.common.YearSecondTimestamp;
import org.epics.archiverappliance.common.BiDirectionalIterable.IterationDirection;
import org.epics.archiverappliance.config.ArchDBRTypes;
import org.epics.archiverappliance.data.DBRTimeEvent;
import org.epics.archiverappliance.etl.ETLBulkStream;
Expand Down Expand Up @@ -123,16 +123,19 @@ public FileBackedPBEventStream(
// We use a search to locate the boundaries of the data and the constrain based on position.
try {
seekToTimes(path, dbrtype, startTime, endTime);
} catch(IOException ex) {
logger.error("Exception seeking to time in file " + path.toAbsolutePath().toString() + ". Defaulting to linear search; this will impact performance.", ex);
} catch (IOException ex) {
logger.error(
"Exception seeking to time in file "
+ path.toAbsolutePath().toString()
+ ". Defaulting to linear search; this will impact performance.",
ex);
this.positionBoundaries = false;
this.startTime = startTime;
this.endTime = endTime;
this.endTime = endTime;
}
}
}


/**
* Used for unlimited iteration.
* We specify a time to start the iteration at and a direction.
Expand All @@ -143,7 +146,11 @@ public FileBackedPBEventStream(
* @throws IOException  
*/
public FileBackedPBEventStream(
String pvname, Path path, ArchDBRTypes dbrtype, Instant startAtTime, BiDirectionalIterable.IterationDirection direction)
String pvname,
Path path,
ArchDBRTypes dbrtype,
Instant startAtTime,
BiDirectionalIterable.IterationDirection direction)
throws IOException {
this.pvName = pvname;
this.path = path;
Expand All @@ -152,19 +159,18 @@ public FileBackedPBEventStream(
this.startTime = startAtTime;
this.endTime = startAtTime;
this.readPayLoadInfo();
if(direction == IterationDirection.FORWARDS) {
if (direction == IterationDirection.FORWARDS) {
this.startFilePos = this.seekToStartTime(path, dbrtype, startAtTime);
this.endFilePos = Files.size(path);
} else {
this.startFilePos = this.fileInfo.positionOfFirstSample;
this.endFilePos = this.seekToEndTime(path, dbrtype, startAtTime);
if(this.endFilePos <=0) {
if (this.endFilePos <= 0) {
this.endFilePos = Files.size(path);
}
}
}


@Override
public Iterator<Event> iterator() {
try {
Expand All @@ -181,21 +187,25 @@ public Iterator<Event> iterator() {
readPayLoadInfo();
}

if(this.direction != null) {
if(this.direction == BiDirectionalIterable.IterationDirection.BACKWARDS) {
// If I am going backwards and the first event in this file is after the startAtTime, we don't have any data in this file for the iteration
if(fileInfo.firstEvent.getEventTimeStamp().isAfter(this.endTime)) {
if (this.direction != null) {
if (this.direction == BiDirectionalIterable.IterationDirection.BACKWARDS) {
// If I am going backwards and the first event in this file is after the startAtTime, we don't have
// any data in this file for the iteration
if (fileInfo.firstEvent.getEventTimeStamp().isAfter(this.endTime)) {
logger.info("Returning an empty iterator as the time in file is after endtime");
return new EmptyEventIterator();
}
theIterator = new PBEventStreamPositionBasedReverseIterator(path, startFilePos, endFilePos, desc.getYear(), type);
theIterator = new PBEventStreamPositionBasedReverseIterator(
path, startFilePos, endFilePos, desc.getYear(), type);
} else {
// If I am going forwards and the last event in the file is before the startAtTime, we don't have any data in this file for the iteration
if(fileInfo.lastEvent.getEventTimeStamp().isBefore(this.startTime)) {
// If I am going forwards and the last event in the file is before the startAtTime, we don't have
// any data in this file for the iteration
if (fileInfo.lastEvent.getEventTimeStamp().isBefore(this.startTime)) {
logger.info("Returning an empty iterator as the time in file is before starttime");
return new EmptyEventIterator();
}
theIterator = new FileBackedPBEventStreamPositionBasedIterator(path, startFilePos, endFilePos, desc.getYear(), type);
theIterator = new FileBackedPBEventStreamPositionBasedIterator(
path, startFilePos, endFilePos, desc.getYear(), type);
}
return theIterator;
}
Expand Down Expand Up @@ -293,8 +303,11 @@ private void seekToTimes(Path path, ArchDBRTypes dbrtype, Instant queryStartTime
YearSecondTimestamp firstSampleEpoch = (fileInfo.getFirstEvent()).getYearSecondTimestamp();
YearSecondTimestamp lastSampleEpoch = (fileInfo.getLastEvent()).getYearSecondTimestamp();

if (queryEndEpoch.compareTo(firstSampleEpoch) < 0 || queryStartEpoch.compareTo(lastSampleEpoch) > 0) {
logger.debug("Case 1 - this file should not be included in request");
if (queryEndEpoch.compareTo(firstSampleEpoch) < 0) {
logger.debug(
"Case 1 - this file should not be included in request {} {}",
(queryEndEpoch.compareTo(firstSampleEpoch) < 0),
(queryStartEpoch.compareTo(lastSampleEpoch) > 0));
this.positionBoundaries = false;
this.startTime = queryStartTime;
this.endTime = queryEndTime;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package edu.stanford.slac.archiverappliance.PlainPB;

import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.epics.archiverappliance.Event;
import org.epics.archiverappliance.EventStream;
import org.epics.archiverappliance.common.BasicContext;
import org.epics.archiverappliance.common.POJOEvent;
import org.epics.archiverappliance.common.PartitionGranularity;
import org.epics.archiverappliance.common.TimeUtils;
import org.epics.archiverappliance.config.ArchDBRTypes;
import org.epics.archiverappliance.config.ConfigService;
import org.epics.archiverappliance.config.ConfigServiceForTests;
import org.epics.archiverappliance.config.StoragePluginURLParser;
import org.epics.archiverappliance.config.exception.ConfigException;
import org.epics.archiverappliance.data.ScalarValue;
import org.epics.archiverappliance.engine.membuf.ArrayListEventStream;
import org.epics.archiverappliance.retrieval.RemotableEventStreamDesc;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.concurrent.Callable;

import static edu.stanford.slac.archiverappliance.PlainPB.PlainPBStoragePlugin.pbFileExtension;
import static edu.stanford.slac.archiverappliance.PlainPB.PlainPBStoragePlugin.pbFileSuffix;

/**
* Test data retrieval around the partition boundaries.
* We use daily partitions; create data for two days ( 1Hz ) but leave large gaps in between.
* We then ask for minutely data and make sure we always get data when the start time is >= the timestamp of the very first sample.
* @author mshankar
*
*/
public class DataAroundPartitionEndTest {
private static final Logger logger = LogManager.getLogger(DataAroundPartitionEndTest.class.getName());
private static final File testFolder =
new File(ConfigServiceForTests.getDefaultPBTestFolder() + File.separator + "DataAroundPartitionEndTest");
private static final String pvName = "DataAroundPartitionEndTest";
private static final ConfigService configService;

static {
try {
configService = new ConfigServiceForTests(-1);
} catch (ConfigException e) {
throw new RuntimeException(e);
}
}

ArchDBRTypes dbrType = ArchDBRTypes.DBR_SCALAR_DOUBLE;

private static final short dataYear = (short) (TimeUtils.getCurrentYear() - 1);
private static Instant generatedEndDate = Instant.parse(dataYear + "-06-01T00:00:00.00Z");
private static Instant generatedStartDate = generatedEndDate.minus(2, ChronoUnit.DAYS);

private static final Path pbFilePath = Paths.get(
testFolder.getAbsolutePath(),
pvName.replace(":", "/").replace("--", "") + ":" + dataYear + pbFileExtension);

private static void generateData() throws Exception {
logger.info("Generating data info to " + pbFilePath);

PlainPBStoragePlugin storagePlugin = (PlainPBStoragePlugin) StoragePluginURLParser.parseStoragePlugin(
pbFileSuffix + "://localhost?name=DataAroundPartitionEndTest&rootFolder=" + testFolder.getAbsolutePath()
+ "&partitionGranularity=PARTITION_DAY",
DataAroundPartitionEndTest.configService);
ArrayListEventStream strm = new ArrayListEventStream(
PartitionGranularity.PARTITION_DAY.getApproxSecondsPerChunk() * 3,
new RemotableEventStreamDesc(ArchDBRTypes.DBR_SCALAR_DOUBLE, pvName, dataYear));

Instant ts = generatedStartDate;
int sampleCount = 1;
while (ts.isBefore(generatedEndDate)) {
if (sampleCount % 3600 == 0) {
sampleCount = 1;
ts = ts.plus(1, ChronoUnit.DAYS);
} else {
ts = ts.plus(1, ChronoUnit.SECONDS);
}

strm.add(new POJOEvent(
ArchDBRTypes.DBR_SCALAR_DOUBLE, ts, new ScalarValue<>((double) ts.getEpochSecond()), 0, 0));
sampleCount++;
logger.debug("Inserting data at {}", TimeUtils.convertToHumanReadableString(ts));
}

try (BasicContext context = new BasicContext()) {
assert storagePlugin != null;
storagePlugin.appendData(context, pvName, strm);
}
}

@Test
public void checkLowerLevelRetrieval() throws Exception {
Path pbFilePath = Paths.get(
testFolder.getAbsolutePath(),
pvName.replace(":", "/").replace("--", "") + ":" + dataYear + "_05_30.pb");

try (BasicContext context = new BasicContext()) {
EventStream strm = FileStreamCreator.getTimeStream(
pvName,
pbFilePath,
ArchDBRTypes.DBR_SCALAR_DOUBLE,
Instant.parse(dataYear + "-05-30T23:59:00.00Z"),
Instant.parse(dataYear + "-06-01T00:00:00.00Z"),
false);
int totalEvents = 0;
for (Event ev : strm) {
logger.debug(
"Got lower level data at {}", TimeUtils.convertToHumanReadableString(ev.getEventTimeStamp()));
totalEvents++;
}
Assertions.assertTrue(totalEvents > 0, "Expected at least one event, got 0 events");
}
}

@Test
public void checkRetrieval() throws Exception {
PlainPBStoragePlugin storagePlugin = (PlainPBStoragePlugin) StoragePluginURLParser.parseStoragePlugin(
pbFileSuffix + "://localhost?name=DataAroundPartitionEndTest&rootFolder=" + testFolder.getAbsolutePath()
+ "&partitionGranularity=PARTITION_DAY",
DataAroundPartitionEndTest.configService);
try (BasicContext context = new BasicContext()) {
Instant rstart = generatedStartDate;
Instant rend = rstart.plus(1, ChronoUnit.MINUTES);
while (rend.isBefore(generatedEndDate.plus(14, ChronoUnit.DAYS))) {
int totalEvents = 0;
List<Callable<EventStream>> cstrms = storagePlugin.getDataForPV(context, pvName, rstart, rend);
for (Callable<EventStream> cstrm : cstrms) {
EventStream st = cstrm.call();
for (Event ev : st) {
logger.debug("Data at {}", TimeUtils.convertToHumanReadableString(ev.getEventTimeStamp()));
totalEvents++;
}
}
if (rstart.isAfter(generatedStartDate)) {
Assertions.assertTrue(
totalEvents > 0,
"Did not receive any events for start "
+ TimeUtils.convertToHumanReadableString(rstart)
+ " and "
+ TimeUtils.convertToHumanReadableString(rend));
logger.info("Got " + totalEvents + " total events for "
+ TimeUtils.convertToHumanReadableString(rstart)
+ " and "
+ TimeUtils.convertToHumanReadableString(rend));
}

rstart = rend;
rend = rstart.plus(1, ChronoUnit.MINUTES);
}
}
}

@BeforeAll
public static void setUp() throws Exception {
try {
FileUtils.deleteDirectory(testFolder);
generateData();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@AfterAll
public static void tearDown() throws Exception {
FileUtils.deleteDirectory(testFolder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ public static Stream<Arguments> provideCorrectIterator() {
TimeUtils.plusDays(LKTS, 10),
TimeUtils.plusDays(LKTS, 1),
TimeUtils.plusDays(LKTS, 10),
EmptyEventIterator.class,
mainIteratorClass));
}

Expand Down

0 comments on commit 668b061

Please sign in to comment.