Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retrieval returns zero samples for slowly changing PV #323 #329

Merged
merged 2 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
import org.epics.archiverappliance.EventStream;
import org.epics.archiverappliance.common.BasicContext;
import org.epics.archiverappliance.common.BiDirectionalIterable;
import org.epics.archiverappliance.common.BiDirectionalIterable.IterationDirection;
import org.epics.archiverappliance.common.EmptyEventIterator;
import org.epics.archiverappliance.common.TimeUtils;
import org.epics.archiverappliance.common.YearSecondTimestamp;
import org.epics.archiverappliance.common.BiDirectionalIterable.IterationDirection;
import org.epics.archiverappliance.config.ArchDBRTypes;
import org.epics.archiverappliance.data.DBRTimeEvent;
import org.epics.archiverappliance.etl.ETLBulkStream;
Expand Down Expand Up @@ -123,16 +123,19 @@ public FileBackedPBEventStream(
// We use a search to locate the boundaries of the data and the constrain based on position.
try {
seekToTimes(path, dbrtype, startTime, endTime);
} catch(IOException ex) {
logger.error("Exception seeking to time in file " + path.toAbsolutePath().toString() + ". Defaulting to linear search; this will impact performance.", ex);
} catch (IOException ex) {
logger.error(
"Exception seeking to time in file "
+ path.toAbsolutePath().toString()
+ ". Defaulting to linear search; this will impact performance.",
ex);
this.positionBoundaries = false;
this.startTime = startTime;
this.endTime = endTime;
this.endTime = endTime;
}
}
}


/**
* Used for unlimited iteration.
* We specify a time to start the iteration at and a direction.
Expand All @@ -143,7 +146,11 @@ public FileBackedPBEventStream(
* @throws IOException  
*/
public FileBackedPBEventStream(
String pvname, Path path, ArchDBRTypes dbrtype, Instant startAtTime, BiDirectionalIterable.IterationDirection direction)
String pvname,
Path path,
ArchDBRTypes dbrtype,
Instant startAtTime,
BiDirectionalIterable.IterationDirection direction)
throws IOException {
this.pvName = pvname;
this.path = path;
Expand All @@ -152,19 +159,18 @@ public FileBackedPBEventStream(
this.startTime = startAtTime;
this.endTime = startAtTime;
this.readPayLoadInfo();
if(direction == IterationDirection.FORWARDS) {
if (direction == IterationDirection.FORWARDS) {
this.startFilePos = this.seekToStartTime(path, dbrtype, startAtTime);
this.endFilePos = Files.size(path);
} else {
this.startFilePos = this.fileInfo.positionOfFirstSample;
this.endFilePos = this.seekToEndTime(path, dbrtype, startAtTime);
if(this.endFilePos <=0) {
if (this.endFilePos <= 0) {
this.endFilePos = Files.size(path);
}
}
}


@Override
public Iterator<Event> iterator() {
try {
Expand All @@ -181,21 +187,25 @@ public Iterator<Event> iterator() {
readPayLoadInfo();
}

if(this.direction != null) {
if(this.direction == BiDirectionalIterable.IterationDirection.BACKWARDS) {
// If I am going backwards and the first event in this file is after the startAtTime, we don't have any data in this file for the iteration
if(fileInfo.firstEvent.getEventTimeStamp().isAfter(this.endTime)) {
if (this.direction != null) {
if (this.direction == BiDirectionalIterable.IterationDirection.BACKWARDS) {
// If I am going backwards and the first event in this file is after the startAtTime, we don't have
// any data in this file for the iteration
if (fileInfo.firstEvent.getEventTimeStamp().isAfter(this.endTime)) {
logger.info("Returning an empty iterator as the time in file is after endtime");
return new EmptyEventIterator();
}
theIterator = new PBEventStreamPositionBasedReverseIterator(path, startFilePos, endFilePos, desc.getYear(), type);
theIterator = new PBEventStreamPositionBasedReverseIterator(
path, startFilePos, endFilePos, desc.getYear(), type);
} else {
// If I am going forwards and the last event in the file is before the startAtTime, we don't have any data in this file for the iteration
if(fileInfo.lastEvent.getEventTimeStamp().isBefore(this.startTime)) {
// If I am going forwards and the last event in the file is before the startAtTime, we don't have
// any data in this file for the iteration
if (fileInfo.lastEvent.getEventTimeStamp().isBefore(this.startTime)) {
logger.info("Returning an empty iterator as the time in file is before starttime");
return new EmptyEventIterator();
}
theIterator = new FileBackedPBEventStreamPositionBasedIterator(path, startFilePos, endFilePos, desc.getYear(), type);
theIterator = new FileBackedPBEventStreamPositionBasedIterator(
path, startFilePos, endFilePos, desc.getYear(), type);
}
return theIterator;
}
Expand Down Expand Up @@ -293,8 +303,11 @@ private void seekToTimes(Path path, ArchDBRTypes dbrtype, Instant queryStartTime
YearSecondTimestamp firstSampleEpoch = (fileInfo.getFirstEvent()).getYearSecondTimestamp();
YearSecondTimestamp lastSampleEpoch = (fileInfo.getLastEvent()).getYearSecondTimestamp();

if (queryEndEpoch.compareTo(firstSampleEpoch) < 0 || queryStartEpoch.compareTo(lastSampleEpoch) > 0) {
logger.debug("Case 1 - this file should not be included in request");
if (queryEndEpoch.compareTo(firstSampleEpoch) < 0) {
logger.debug(
"Case 1 - this file should not be included in request {} {}",
(queryEndEpoch.compareTo(firstSampleEpoch) < 0),
(queryStartEpoch.compareTo(lastSampleEpoch) > 0));
this.positionBoundaries = false;
this.startTime = queryStartTime;
this.endTime = queryEndTime;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package edu.stanford.slac.archiverappliance.PlainPB;

import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.epics.archiverappliance.Event;
import org.epics.archiverappliance.EventStream;
import org.epics.archiverappliance.common.BasicContext;
import org.epics.archiverappliance.common.POJOEvent;
import org.epics.archiverappliance.common.PartitionGranularity;
import org.epics.archiverappliance.common.TimeUtils;
import org.epics.archiverappliance.config.ArchDBRTypes;
import org.epics.archiverappliance.config.ConfigService;
import org.epics.archiverappliance.config.ConfigServiceForTests;
import org.epics.archiverappliance.config.StoragePluginURLParser;
import org.epics.archiverappliance.config.exception.ConfigException;
import org.epics.archiverappliance.data.ScalarValue;
import org.epics.archiverappliance.engine.membuf.ArrayListEventStream;
import org.epics.archiverappliance.retrieval.RemotableEventStreamDesc;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.concurrent.Callable;

import static edu.stanford.slac.archiverappliance.PlainPB.PlainPBStoragePlugin.pbFileExtension;
import static edu.stanford.slac.archiverappliance.PlainPB.PlainPBStoragePlugin.pbFileSuffix;

/**
* Test data retrieval around the partition boundaries.
* We use daily partitions; create data for two days ( 1Hz ) but leave large gaps in between.
* We then ask for minutely data and make sure we always get data when the start time is >= the timestamp of the very first sample.
* @author mshankar
*
*/
public class DataAroundPartitionEndTest {
private static final Logger logger = LogManager.getLogger(DataAroundPartitionEndTest.class.getName());
private static final File testFolder =
new File(ConfigServiceForTests.getDefaultPBTestFolder() + File.separator + "DataAroundPartitionEndTest");
private static final String pvName = "DataAroundPartitionEndTest";
private static final ConfigService configService;

static {
try {
configService = new ConfigServiceForTests(-1);
} catch (ConfigException e) {
throw new RuntimeException(e);
}
}

ArchDBRTypes dbrType = ArchDBRTypes.DBR_SCALAR_DOUBLE;

private static final short dataYear = (short) (TimeUtils.getCurrentYear() - 1);
private static Instant generatedEndDate = Instant.parse(dataYear + "-06-01T00:00:00.00Z");
private static Instant generatedStartDate = generatedEndDate.minus(2, ChronoUnit.DAYS);

private static final Path pbFilePath = Paths.get(
testFolder.getAbsolutePath(),
pvName.replace(":", "/").replace("--", "") + ":" + dataYear + pbFileExtension);

private static void generateData() throws Exception {
logger.info("Generating data info to " + pbFilePath);

PlainPBStoragePlugin storagePlugin = (PlainPBStoragePlugin) StoragePluginURLParser.parseStoragePlugin(
pbFileSuffix + "://localhost?name=DataAroundPartitionEndTest&rootFolder=" + testFolder.getAbsolutePath()
+ "&partitionGranularity=PARTITION_DAY",
DataAroundPartitionEndTest.configService);
ArrayListEventStream strm = new ArrayListEventStream(
PartitionGranularity.PARTITION_DAY.getApproxSecondsPerChunk() * 3,
new RemotableEventStreamDesc(ArchDBRTypes.DBR_SCALAR_DOUBLE, pvName, dataYear));

Instant ts = generatedStartDate;
int sampleCount = 1;
while (ts.isBefore(generatedEndDate)) {
if (sampleCount % 3600 == 0) {
sampleCount = 1;
ts = ts.plus(1, ChronoUnit.DAYS);
} else {
ts = ts.plus(1, ChronoUnit.SECONDS);
}

strm.add(new POJOEvent(
ArchDBRTypes.DBR_SCALAR_DOUBLE, ts, new ScalarValue<>((double) ts.getEpochSecond()), 0, 0));
sampleCount++;
logger.debug("Inserting data at {}", TimeUtils.convertToHumanReadableString(ts));
}

try (BasicContext context = new BasicContext()) {
assert storagePlugin != null;
storagePlugin.appendData(context, pvName, strm);
}
}

@Test
public void checkLowerLevelRetrieval() throws Exception {
Path pbFilePath = Paths.get(
testFolder.getAbsolutePath(),
pvName.replace(":", "/").replace("--", "") + ":" + dataYear + "_05_30.pb");

try (BasicContext context = new BasicContext()) {
EventStream strm = FileStreamCreator.getTimeStream(
pvName,
pbFilePath,
ArchDBRTypes.DBR_SCALAR_DOUBLE,
Instant.parse(dataYear + "-05-30T23:59:00.00Z"),
Instant.parse(dataYear + "-06-01T00:00:00.00Z"),
false);
int totalEvents = 0;
for (Event ev : strm) {
logger.debug(
"Got lower level data at {}", TimeUtils.convertToHumanReadableString(ev.getEventTimeStamp()));
totalEvents++;
}
Assertions.assertTrue(totalEvents > 0, "Expected at least one event, got 0 events");
}
}

@Test
public void checkRetrieval() throws Exception {
PlainPBStoragePlugin storagePlugin = (PlainPBStoragePlugin) StoragePluginURLParser.parseStoragePlugin(
pbFileSuffix + "://localhost?name=DataAroundPartitionEndTest&rootFolder=" + testFolder.getAbsolutePath()
+ "&partitionGranularity=PARTITION_DAY",
DataAroundPartitionEndTest.configService);
try (BasicContext context = new BasicContext()) {
Instant rstart = generatedStartDate;
Instant rend = rstart.plus(1, ChronoUnit.MINUTES);
while (rend.isBefore(generatedEndDate.plus(14, ChronoUnit.DAYS))) {
int totalEvents = 0;
List<Callable<EventStream>> cstrms = storagePlugin.getDataForPV(context, pvName, rstart, rend);
for (Callable<EventStream> cstrm : cstrms) {
EventStream st = cstrm.call();
for (Event ev : st) {
logger.debug("Data at {}", TimeUtils.convertToHumanReadableString(ev.getEventTimeStamp()));
totalEvents++;
}
}
if (rstart.isAfter(generatedStartDate)) {
Assertions.assertTrue(
totalEvents > 0,
"Did not receive any events for start "
+ TimeUtils.convertToHumanReadableString(rstart)
+ " and "
+ TimeUtils.convertToHumanReadableString(rend));
logger.info("Got " + totalEvents + " total events for "
+ TimeUtils.convertToHumanReadableString(rstart)
+ " and "
+ TimeUtils.convertToHumanReadableString(rend));
}

rstart = rend;
rend = rstart.plus(1, ChronoUnit.MINUTES);
}
}
}

@BeforeAll
public static void setUp() throws Exception {
try {
FileUtils.deleteDirectory(testFolder);
generateData();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@AfterAll
public static void tearDown() throws Exception {
FileUtils.deleteDirectory(testFolder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ public static Stream<Arguments> provideCorrectIterator() {
TimeUtils.plusDays(LKTS, 10),
TimeUtils.plusDays(LKTS, 1),
TimeUtils.plusDays(LKTS, 10),
EmptyEventIterator.class,
mainIteratorClass));
}

Expand Down
Loading