Merge pull request #1837 from YasasRangika/yasas-master
Improve aggregation initialisation
AnuGayan authored Mar 12, 2024
2 parents 52b82ac + 0cdf25a commit 3ebda2c
Showing 2 changed files with 53 additions and 16 deletions.
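
Summary of the change, as read from the diff below: getOnDemandQuery(...) now also takes an isPersistedAggregation flag and an OrderByAttribute.Order sort order, the utility method isAggregationDataComplete(...) (apparently in IncrementalTimeConverterUtil) is renamed to isAggregationDataCompleteAgainstTime(...), and a new table-backed isAggregationDataComplete(...) lookup lets initialiseExecutors() skip recreating a duration whose child table already covers the current parent bucket. The snippet below is only an illustration of the two selector shapes the revised helper builds, mirroring calls visible in the diff; it is not a drop-in extract, and AGG_START_TIMESTAMP_COL plus the Siddhi imports come from the changed class itself.

    // Illustrative only — mirrors calls visible in the diff; not part of the commit.
    // Non-persisted path: keep all columns, newest AGG_START_TIMESTAMP first, single row.
    Selector newestRow = Selector.selector()
            .orderBy(Expression.variable(AGG_START_TIMESTAMP_COL), OrderByAttribute.Order.DESC)
            .limit(Expression.value(1));
    // Persisted path: project only the timestamp column; the new completeness check asks for ASC order.
    Selector oldestTimestampOnly = Selector.selector()
            .select(new Variable(AGG_START_TIMESTAMP_COL))
            .orderBy(Expression.variable(AGG_START_TIMESTAMP_COL), OrderByAttribute.Order.ASC)
            .limit(Expression.value(1));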
@@ -38,6 +38,7 @@
 import io.siddhi.query.api.execution.query.selection.OrderByAttribute;
 import io.siddhi.query.api.execution.query.selection.Selector;
 import io.siddhi.query.api.expression.Expression;
+import io.siddhi.query.api.expression.Variable;
 import io.siddhi.query.api.expression.condition.Compare;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -118,7 +119,8 @@ public synchronized void initialiseExecutors() {
 
         // Get max(AGG_TIMESTAMP) from table corresponding to max duration
         Table tableForMaxDuration = aggregationTables.get(incrementalDurations.get(incrementalDurations.size() - 1));
-        OnDemandQuery onDemandQuery = getOnDemandQuery(tableForMaxDuration, true, endOFLatestEventTimestamp);
+        OnDemandQuery onDemandQuery = getOnDemandQuery(tableForMaxDuration, true, endOFLatestEventTimestamp, false,
+                OrderByAttribute.Order.DESC);
         onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
         OnDemandQueryRuntime onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null,
                 siddhiAppContext, tableMap, windowMap, aggregationMap);
@@ -133,17 +135,19 @@ public synchronized void initialiseExecutors() {
 
         if (isPersistedAggregation) {
             for (int i = incrementalDurations.size() - 1; i > 0; i--) {
-                if (lastData != null && !IncrementalTimeConverterUtil.
-                        isAggregationDataComplete(lastData, incrementalDurations.get(i), timeZone)) {
+                if (lastData != null && !(IncrementalTimeConverterUtil.isAggregationDataCompleteAgainstTime(lastData,
+                        incrementalDurations.get(i), timeZone) && isAggregationDataComplete(incrementalDurations.get(i),
+                        incrementalDurations.get(i - 1), endOFLatestEventTimestamp, lastData))) {
                     recreateState(lastData, incrementalDurations.get(i),
                             aggregationTables.get(incrementalDurations.get(i - 1)), i == 1);
-                } else if (lastData == null) {
+                } else if (lastData == null && !isAggregationDataComplete(incrementalDurations.get(i),
+                        incrementalDurations.get(i - 1), endOFLatestEventTimestamp, null)) {
                     recreateState(null, incrementalDurations.get(i),
                             aggregationTables.get(incrementalDurations.get(i - 1)), i == 1);
                 }
                 if (i > 1) {
                     onDemandQuery = getOnDemandQuery(aggregationTables.get(incrementalDurations.get(i - 1)), true,
-                            endOFLatestEventTimestamp);
+                            endOFLatestEventTimestamp, true, OrderByAttribute.Order.DESC);
                     onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
                     onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null,
                             siddhiAppContext, tableMap, windowMap, aggregationMap);
@@ -163,7 +167,8 @@ public synchronized void initialiseExecutors() {
             // This lookup is filtered by endOFLatestEventTimestamp
             Table recreateFromTable = aggregationTables.get(incrementalDurations.get(i - 1));
 
-            onDemandQuery = getOnDemandQuery(recreateFromTable, false, endOFLatestEventTimestamp);
+            onDemandQuery = getOnDemandQuery(recreateFromTable, false, endOFLatestEventTimestamp, false,
+                    OrderByAttribute.Order.DESC);
             onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
             onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null, siddhiAppContext,
                     tableMap, windowMap,
@@ -223,15 +228,23 @@ private boolean isStatePresentForAggregationDuration(TimePeriod.Duration recreat
 
     private void recreateState(Long lastData, TimePeriod.Duration recreateForDuration,
                                Table recreateFromTable, boolean isBeforeRoot) {
+        OnDemandQuery onDemandQuery;
         if (lastData != null) {
            endOFLatestEventTimestamp = IncrementalTimeConverterUtil
                    .getNextEmitTime(lastData, recreateForDuration, timeZone);
         }
-        OnDemandQuery onDemandQuery = getOnDemandQuery(recreateFromTable, false, endOFLatestEventTimestamp);
+        if (recreateFromTable.getTableDefinition().getId().contains(TimePeriod.Duration.MONTHS.name())
+                || recreateFromTable.getTableDefinition().getId().contains(TimePeriod.Duration.YEARS.name())
+                || recreateFromTable.getTableDefinition().getId().contains(TimePeriod.Duration.DAYS.name())) {
+            onDemandQuery = getOnDemandQuery(recreateFromTable, false, endOFLatestEventTimestamp, true,
+                    OrderByAttribute.Order.DESC);
+        } else {
+            onDemandQuery = getOnDemandQuery(recreateFromTable, false, endOFLatestEventTimestamp, false,
+                    OrderByAttribute.Order.DESC);
+        }
         onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
         OnDemandQueryRuntime onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null, siddhiAppContext,
-                tableMap, windowMap,
-                aggregationMap);
+                tableMap, windowMap, aggregationMap);
         Event[] events = onDemandQueryRuntime.execute();
         if (events != null) {
             long referenceToNextLatestEvent = (Long) events[events.length - 1].getData(0);
@@ -257,12 +270,16 @@ private void recreateState(Long lastData, TimePeriod.Duration recreateForDuratio
         }
     }
 
-    private OnDemandQuery getOnDemandQuery(Table table, boolean isLargestGranularity, Long endOFLatestEventTimestamp) {
-        Selector selector = Selector.selector();
+    private OnDemandQuery getOnDemandQuery(Table table, boolean isLargestGranularity, Long endOFLatestEventTimestamp,
+                                           boolean isPersistedAggregation, OrderByAttribute.Order sortOrder) {
+        Selector selector;
+        if (isPersistedAggregation) {
+            selector = Selector.selector().select(new Variable(AGG_START_TIMESTAMP_COL));
+        } else {
+            selector = Selector.selector();
+        }
         if (isLargestGranularity) {
-            selector = selector
-                    .orderBy(
-                            Expression.variable(AGG_START_TIMESTAMP_COL), OrderByAttribute.Order.DESC)
+            selector = selector.orderBy(Expression.variable(AGG_START_TIMESTAMP_COL), sortOrder)
                     .limit(Expression.value(1));
         } else {
             selector = selector.orderBy(Expression.variable(AGG_START_TIMESTAMP_COL));
@@ -298,7 +315,26 @@ private OnDemandQuery getOnDemandQuery(Table table, boolean isLargestGranularity
                             Expression.value(endOFLatestEventTimestamp))));
             }
         }
 
         return OnDemandQuery.query().from(inputStore).select(selector);
     }
+
+    private boolean isAggregationDataComplete(TimePeriod.Duration parentDuration, TimePeriod.Duration childDuration,
+                                              Long endOFLatestEventTimestamp, Long lastData) {
+        OnDemandQuery onDemandQuery = getOnDemandQuery(aggregationTables.get(childDuration), true,
+                endOFLatestEventTimestamp, true, OrderByAttribute.Order.ASC);
+        onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
+        OnDemandQueryRuntime onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null, siddhiAppContext,
+                tableMap, windowMap, aggregationMap);
+        Event[] events = onDemandQueryRuntime.execute();
+        if (lastData == null && events == null) {
+            return true;
+        } else if (lastData == null
+                && (Long) events[events.length - 1].getData(0) >= IncrementalTimeConverterUtil.getStartTimeOfAggregates(
+                System.currentTimeMillis(), parentDuration, timeZone)) {
+            return true;
+        }
+
+        return events == null || events.length == 0;
+    }
 }
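
Note: the new private isAggregationDataComplete(parentDuration, childDuration, endOFLatestEventTimestamp, lastData) above decides whether a duration still needs to be recreated from its child table. Below is a minimal, self-contained sketch of that decision in plain Java (no Siddhi types): the ascending array stands in for the single-row, ASC-ordered on-demand query result, and startOfCurrentParentBucket stands in for IncrementalTimeConverterUtil.getStartTimeOfAggregates(System.currentTimeMillis(), parentDuration, timeZone). All names here are illustrative and not part of the commit.

    // Minimal sketch of the completeness decision added in this commit (illustrative names only).
    final class AggregationCompletenessSketch {

        static boolean isComplete(Long lastData, long[] childTimestampsAsc, long startOfCurrentParentBucket) {
            if (lastData == null && childTimestampsAsc.length == 0) {
                // No data in the parent table and none in the child table: nothing left to recreate.
                return true;
            }
            if (lastData == null && childTimestampsAsc.length > 0
                    && childTimestampsAsc[childTimestampsAsc.length - 1] >= startOfCurrentParentBucket) {
                // Child data already reaches the start of the current parent bucket.
                return true;
            }
            // Otherwise the data counts as complete only when the child table is empty.
            return childTimestampsAsc.length == 0;
        }
    }

Because the lookup in the commit uses ascending order with LIMIT 1, events[events.length - 1] there is simply the oldest AGG_START_TIMESTAMP row returned for the child table.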
@@ -230,7 +230,8 @@ public static int getMillisecondsPerDuration(TimePeriod.Duration duration) {
         }
     }
 
-    public static boolean isAggregationDataComplete(long timestamp, TimePeriod.Duration duration, String timeZone) {
+    public static boolean isAggregationDataCompleteAgainstTime(long timestamp, TimePeriod.Duration duration,
+                                                               String timeZone) {
         ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(timestamp),
                 ZoneId.of(timeZone));
         ZonedDateTime zonedCurrentDateTime = ZonedDateTime.ofInstant(Instant.now(), ZoneId.of(timeZone));
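
The second file only renames isAggregationDataComplete(...) to isAggregationDataCompleteAgainstTime(...) and rewraps its signature; the rest of the method body stays collapsed in this view. Purely as a hypothetical illustration of the kind of bucket comparison the two ZonedDateTime values make possible (this is not the method's actual body), the HOURS case could look like:

    // Hypothetical sketch only — the real body is not shown in this diff. Requires java.time.temporal.ChronoUnit.
    ZonedDateTime eventTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.of(timeZone));
    ZonedDateTime now = ZonedDateTime.ofInstant(Instant.now(), ZoneId.of(timeZone));
    // If the event still falls in the current (open) hour bucket, its aggregation cannot yet be complete.
    boolean inCurrentHourBucket = eventTime.truncatedTo(ChronoUnit.HOURS).equals(now.truncatedTo(ChronoUnit.HOURS));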
