diff --git a/accio-base/src/main/java/io/accio/base/AccioMDL.java b/accio-base/src/main/java/io/accio/base/AccioMDL.java index 12ce822bb..e1fb345a5 100644 --- a/accio-base/src/main/java/io/accio/base/AccioMDL.java +++ b/accio-base/src/main/java/io/accio/base/AccioMDL.java @@ -17,6 +17,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.accio.base.dto.Column; +import io.accio.base.dto.CumulativeMetric; +import io.accio.base.dto.DateSpine; import io.accio.base.dto.EnumDefinition; import io.accio.base.dto.Manifest; import io.accio.base.dto.Metric; @@ -143,6 +145,21 @@ public Optional getMetric(CatalogSchemaTableName name) return Optional.empty(); } + public Optional getCumulativeMetric(String name) + { + return manifest.getCumulativeMetrics().stream() + .filter(metric -> metric.getName().equals(name)) + .findAny(); + } + + public Optional getCumulativeMetric(CatalogSchemaTableName name) + { + if (catalog.equals(name.getCatalogName()) && schema.equals(name.getSchemaTableName().getSchemaName())) { + return getCumulativeMetric(name.getSchemaTableName().getTableName()); + } + return Optional.empty(); + } + public Optional getView(String name) { return manifest.getViews().stream() @@ -172,4 +189,9 @@ private static Optional getColumn(Model model, String name) .filter(column -> column.getName().equals(name)) .findAny(); } + + public DateSpine getDateSpine() + { + return manifest.getDateSpine(); + } } diff --git a/accio-base/src/main/java/io/accio/base/dto/CumulativeMetric.java b/accio-base/src/main/java/io/accio/base/dto/CumulativeMetric.java new file mode 100644 index 000000000..675b4c7de --- /dev/null +++ b/accio-base/src/main/java/io/accio/base/dto/CumulativeMetric.java @@ -0,0 +1,144 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.accio.base.dto; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.airlift.units.Duration; + +import java.util.Objects; + +public class CumulativeMetric + implements PreAggregationInfo +{ + public static CumulativeMetric cumulativeMetric( + String name, + String baseModel, + Measure measure, + Window window) + { + return new CumulativeMetric(name, baseModel, measure, window, false, null, null); + } + + private final String name; + private final String baseModel; + private final Measure measure; + private final Window window; + private final boolean preAggregated; + private final Duration refreshTime; + private final String description; + + @JsonCreator + public CumulativeMetric( + @JsonProperty("name") String name, + @JsonProperty("baseModel") String baseModel, + @JsonProperty("measure") Measure measure, + @JsonProperty("window") Window window, + @JsonProperty("preAggregated") boolean preAggregated, + @JsonProperty("refreshTime") Duration refreshTime, + @JsonProperty("description") String description) + { + this.name = name; + this.baseModel = baseModel; + this.measure = measure; + this.window = window; + this.preAggregated = preAggregated; + this.refreshTime = refreshTime; + this.description = description; + } + + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getBaseModel() + { + return baseModel; + } + + @JsonProperty + public Measure getMeasure() + { + return measure; + } + + @JsonProperty + public Window getWindow() + { + return window; + } + + @JsonProperty + public boolean isPreAggregated() + { + return preAggregated; + } + + @JsonProperty + public Duration getRefreshTime() + { + return refreshTime; + } + + @JsonProperty + public String getDescription() + { + return description; + } + + @Override + public int hashCode() + { + return Objects.hash(name, baseModel, measure, window, preAggregated, refreshTime, description); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + CumulativeMetric that = (CumulativeMetric) o; + return preAggregated == that.preAggregated && + Objects.equals(name, that.name) && + Objects.equals(baseModel, that.baseModel) && + Objects.equals(measure, that.measure) && + Objects.equals(window, that.window) && + Objects.equals(refreshTime, that.refreshTime) && + Objects.equals(description, that.description); + } + + @Override + public String toString() + { + return "CumulativeMetric{" + + "name='" + name + '\'' + + ", baseModel='" + baseModel + '\'' + + ", measure=" + measure + + ", window=" + window + + ", preAggregated=" + preAggregated + + ", refreshTime=" + refreshTime + + ", description='" + description + '\'' + + '}'; + } +} diff --git a/accio-base/src/main/java/io/accio/base/dto/DateSpine.java b/accio-base/src/main/java/io/accio/base/dto/DateSpine.java new file mode 100644 index 000000000..b00796b75 --- /dev/null +++ b/accio-base/src/main/java/io/accio/base/dto/DateSpine.java @@ -0,0 +1,89 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.accio.base.dto; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class DateSpine +{ + public static final DateSpine DEFAULT = new DateSpine(TimeUnit.DAY, "1970-01-01", "2077-12-31"); + + private final TimeUnit unit; + private final String start; + private final String end; + + @JsonCreator + public DateSpine( + @JsonProperty("unit") TimeUnit unit, + @JsonProperty("start") String start, + @JsonProperty("end") String end) + { + this.unit = unit; + this.start = start; + this.end = end; + } + + @JsonProperty + public TimeUnit getUnit() + { + return unit; + } + + @JsonProperty + public String getStart() + { + return start; + } + + @JsonProperty + public String getEnd() + { + return end; + } + + @Override + public String toString() + { + return "DateSpine{" + + "unit=" + unit + + ", start='" + start + '\'' + + ", end='" + end + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DateSpine dateSpine = (DateSpine) o; + return unit == dateSpine.unit && + Objects.equals(start, dateSpine.start) && + Objects.equals(end, dateSpine.end); + } + + @Override + public int hashCode() + { + return Objects.hash(unit, start, end); + } +} diff --git a/accio-base/src/main/java/io/accio/base/dto/Manifest.java b/accio-base/src/main/java/io/accio/base/dto/Manifest.java index ca99cf8f5..ab40a95dd 100644 --- a/accio-base/src/main/java/io/accio/base/dto/Manifest.java +++ b/accio-base/src/main/java/io/accio/base/dto/Manifest.java @@ -20,16 +20,19 @@ import java.util.List; import java.util.Objects; +import static io.accio.base.dto.DateSpine.DEFAULT; import static java.util.Objects.requireNonNull; public class Manifest { private final String catalog; private final String schema; + private final DateSpine dateSpine; private final List models; private final List relationships; private final List enumDefinitions; private final List metrics; + private final List cumulativeMetrics; private final List views; @@ -46,7 +49,9 @@ public Manifest( @JsonProperty("relationships") List relationships, @JsonProperty("enumDefinitions") List enumDefinitions, @JsonProperty("metrics") List metrics, - @JsonProperty("views") List views) + @JsonProperty("cumulativeMetrics") List cumulativeMetrics, + @JsonProperty("views") List views, + @JsonProperty("dateSpine") DateSpine dateSpine) { this.catalog = requireNonNull(catalog, "catalog is null"); this.schema = requireNonNull(schema, "schema is null"); @@ -54,7 +59,9 @@ public Manifest( this.relationships = relationships == null ? List.of() : relationships; this.enumDefinitions = enumDefinitions == null ? List.of() : enumDefinitions; this.metrics = metrics == null ? List.of() : metrics; + this.cumulativeMetrics = cumulativeMetrics == null ? List.of() : cumulativeMetrics; this.views = views == null ? List.of() : views; + this.dateSpine = dateSpine == null ? DEFAULT : dateSpine; } @JsonProperty @@ -69,6 +76,12 @@ public String getSchema() return schema; } + @JsonProperty + public DateSpine getDateSpine() + { + return dateSpine; + } + @JsonProperty public List getModels() { @@ -93,6 +106,12 @@ public List getMetrics() return metrics; } + @JsonProperty + public List getCumulativeMetrics() + { + return cumulativeMetrics; + } + @JsonProperty public List getViews() { @@ -117,7 +136,9 @@ public boolean equals(Object o) Objects.equals(relationships, manifest.relationships) && Objects.equals(enumDefinitions, manifest.enumDefinitions) && Objects.equals(metrics, manifest.metrics) && - Objects.equals(views, manifest.views); + Objects.equals(cumulativeMetrics, manifest.cumulativeMetrics) && + Objects.equals(views, manifest.views) && + Objects.equals(dateSpine, manifest.dateSpine); } @Override @@ -130,7 +151,9 @@ public int hashCode() relationships, enumDefinitions, metrics, - views); + cumulativeMetrics, + views, + dateSpine); } @Override @@ -143,7 +166,9 @@ public String toString() ", relationships=" + relationships + ", enumDefinitions=" + enumDefinitions + ", metrics=" + metrics + + ", cumulativeMetrics=" + cumulativeMetrics + ", views=" + views + + ", dateSpine=" + dateSpine + '}'; } @@ -155,9 +180,12 @@ public static class Builder private List relationships; private List enumDefinitions; private List metrics; + private List cumulativeMetrics; private List views; + private DateSpine dateSpine; + private Builder() {} public Builder setCatalog(String catalog) @@ -196,15 +224,27 @@ public Builder setMetrics(List metrics) return this; } + public Builder setCumulativeMetrics(List cumulativeMetrics) + { + this.cumulativeMetrics = cumulativeMetrics; + return this; + } + public Builder setViews(List views) { this.views = views; return this; } + public Builder setDateSpine(DateSpine dateSpine) + { + this.dateSpine = dateSpine; + return this; + } + public Manifest build() { - return new Manifest(catalog, schema, models, relationships, enumDefinitions, metrics, views); + return new Manifest(catalog, schema, models, relationships, enumDefinitions, metrics, cumulativeMetrics, views, dateSpine); } } } diff --git a/accio-base/src/main/java/io/accio/base/dto/Measure.java b/accio-base/src/main/java/io/accio/base/dto/Measure.java new file mode 100644 index 000000000..85dd494bb --- /dev/null +++ b/accio-base/src/main/java/io/accio/base/dto/Measure.java @@ -0,0 +1,92 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.accio.base.dto; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class Measure +{ + public static Measure measure(String name, String type, String operator, String refColumn) + { + return new Measure(name, type, operator, refColumn); + } + + private final String name; + private final String type; + private final String operator; + private final String refColumn; + + @JsonCreator + public Measure( + @JsonProperty("name") String name, + @JsonProperty("type") String type, + @JsonProperty("operator") String operator, + @JsonProperty("refColumn") String refColumn) + { + this.name = name; + this.type = type; + this.operator = operator; + this.refColumn = refColumn; + } + + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getType() + { + return type; + } + + @JsonProperty + public String getOperator() + { + return operator; + } + + @JsonProperty + public String getRefColumn() + { + return refColumn; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Measure measure = (Measure) o; + return Objects.equals(name, measure.name) && + Objects.equals(type, measure.type) && + Objects.equals(operator, measure.operator) && + Objects.equals(refColumn, measure.refColumn); + } + + @Override + public int hashCode() + { + return Objects.hash(name, type, operator, refColumn); + } +} diff --git a/accio-base/src/main/java/io/accio/base/dto/TimeGrain.java b/accio-base/src/main/java/io/accio/base/dto/TimeGrain.java index 525176772..9bf7e34b1 100644 --- a/accio-base/src/main/java/io/accio/base/dto/TimeGrain.java +++ b/accio-base/src/main/java/io/accio/base/dto/TimeGrain.java @@ -20,25 +20,10 @@ import java.util.List; import java.util.Objects; -import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; public class TimeGrain { - public enum TimeUnit - { - YEAR, - QUARTER, - MONTH, - WEEK, - DAY; - - public static TimeUnit timeUnit(String name) - { - return valueOf(name.toUpperCase(ENGLISH)); - } - } - private final String name; private final String refColumn; diff --git a/accio-base/src/main/java/io/accio/base/dto/TimeUnit.java b/accio-base/src/main/java/io/accio/base/dto/TimeUnit.java new file mode 100644 index 000000000..221f8c2d0 --- /dev/null +++ b/accio-base/src/main/java/io/accio/base/dto/TimeUnit.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.accio.base.dto; + +import static java.util.Locale.ENGLISH; + +public enum TimeUnit +{ + YEAR("INTERVAL '1 YEAR'"), + QUARTER("INTERVAL '3 MONTH'"), + MONTH("INTERVAL '1 MONTH'"), + WEEK("INTERVAL '7 DAY'"), + DAY("INTERVAL '1 DAY'"), + HOUR("INTERVAL '1 HOUR'"), + MINUTE("INTERVAL '1 MINUTE'"), + SECOND("INTERVAL '1 SECOND'"); + + private final String intervalExpression; + + TimeUnit(String intervalExpression) + { + this.intervalExpression = intervalExpression; + } + + public String getIntervalExpression() + { + return intervalExpression; + } + + public static TimeUnit timeUnit(String name) + { + return valueOf(name.toUpperCase(ENGLISH)); + } +} diff --git a/accio-base/src/main/java/io/accio/base/dto/Window.java b/accio-base/src/main/java/io/accio/base/dto/Window.java new file mode 100644 index 000000000..81fdfd734 --- /dev/null +++ b/accio-base/src/main/java/io/accio/base/dto/Window.java @@ -0,0 +1,118 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.accio.base.dto; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class Window +{ + private final String name; + + private final String refColumn; + + private final TimeUnit timeUnit; + + private final String start; + private final String end; + + public static Window window(String name, String refColumn, TimeUnit timeUnit, String start, String end) + { + return new Window(name, refColumn, timeUnit, start, end); + } + + @JsonCreator + public Window( + @JsonProperty("name") String name, + @JsonProperty("refColumn") String refColumn, + @JsonProperty("timeUnit") TimeUnit timeUnit, + @JsonProperty("start") String start, + @JsonProperty("end") String end) + { + this.name = name; + this.refColumn = refColumn; + this.timeUnit = timeUnit; + this.start = start; + this.end = end; + } + + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getRefColumn() + { + return refColumn; + } + + @JsonProperty + public TimeUnit getTimeUnit() + { + return timeUnit; + } + + @JsonProperty + public String getStart() + { + return start; + } + + @JsonProperty + public String getEnd() + { + return end; + } + + @Override + public String toString() + { + return "Window{" + + "name='" + name + '\'' + + ", refColumn='" + refColumn + '\'' + + ", timeUnit=" + timeUnit + + ", start='" + start + '\'' + + ", end='" + end + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Window window = (Window) o; + return Objects.equals(name, window.name) && + Objects.equals(refColumn, window.refColumn) && + timeUnit == window.timeUnit && + Objects.equals(start, window.start) && + Objects.equals(end, window.end); + } + + @Override + public int hashCode() + { + return Objects.hash(name, refColumn, timeUnit, start, end); + } +} diff --git a/accio-base/src/test/java/io/accio/base/dto/TestManifestSerDe.java b/accio-base/src/test/java/io/accio/base/dto/TestManifestSerDe.java index 9f2d509a3..b0173e232 100644 --- a/accio-base/src/test/java/io/accio/base/dto/TestManifestSerDe.java +++ b/accio-base/src/test/java/io/accio/base/dto/TestManifestSerDe.java @@ -19,17 +19,21 @@ import java.util.List; +import static io.accio.base.AccioTypes.INTEGER; import static io.accio.base.dto.Column.column; +import static io.accio.base.dto.CumulativeMetric.cumulativeMetric; import static io.accio.base.dto.EnumDefinition.enumDefinition; import static io.accio.base.dto.EnumValue.enumValue; +import static io.accio.base.dto.Measure.measure; import static io.accio.base.dto.Metric.metric; import static io.accio.base.dto.Model.model; import static io.accio.base.dto.Relationship.SortKey.sortKey; import static io.accio.base.dto.Relationship.relationship; -import static io.accio.base.dto.TimeGrain.TimeUnit.DAY; -import static io.accio.base.dto.TimeGrain.TimeUnit.MONTH; import static io.accio.base.dto.TimeGrain.timeGrain; +import static io.accio.base.dto.TimeUnit.DAY; +import static io.accio.base.dto.TimeUnit.MONTH; import static io.accio.base.dto.View.view; +import static io.accio.base.dto.Window.window; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; public class TestManifestSerDe @@ -105,6 +109,14 @@ private static Manifest createManifest() List.of(timeGrain("orderdate", "orderdate", List.of(DAY, MONTH))), true, "the revenue of an order"))) .setViews(List.of(view("useMetric", "select * from Revenue", "the view for the revenue metric"))) + .setCumulativeMetrics(List.of( + cumulativeMetric("DailyRevenue", + "Orders", measure("totalprice", INTEGER, "sum", "totalprice"), + window("orderdate", "orderdate", TimeUnit.DAY, "1994-01-01", "1994-12-31")), + cumulativeMetric("WeeklyRevenue", + "Orders", measure("totalprice", INTEGER, "sum", "totalprice"), + window("orderdate", "orderdate", TimeUnit.WEEK, "1994-01-01", "1994-12-31")))) + .setDateSpine(new DateSpine(TimeUnit.DAY, "1970-01-01", "2077-12-31")) .build(); } } diff --git a/accio-main/src/test/java/io/accio/main/pgcatalog/builder/TestCreateBigQueryTempTable.java b/accio-main/src/test/java/io/accio/main/pgcatalog/builder/TestCreateBigQueryTempTable.java index 855833dc3..8979b43f0 100644 --- a/accio-main/src/test/java/io/accio/main/pgcatalog/builder/TestCreateBigQueryTempTable.java +++ b/accio-main/src/test/java/io/accio/main/pgcatalog/builder/TestCreateBigQueryTempTable.java @@ -30,9 +30,9 @@ import static io.accio.base.dto.Model.model; import static io.accio.base.dto.Relationship.SortKey.sortKey; import static io.accio.base.dto.Relationship.relationship; -import static io.accio.base.dto.TimeGrain.TimeUnit.DAY; -import static io.accio.base.dto.TimeGrain.TimeUnit.MONTH; import static io.accio.base.dto.TimeGrain.timeGrain; +import static io.accio.base.dto.TimeUnit.DAY; +import static io.accio.base.dto.TimeUnit.MONTH; import static io.accio.base.dto.View.view; import static io.accio.main.pgcatalog.PgCatalogUtils.ACCIO_TEMP_NAME; import static io.accio.main.pgcatalog.PgCatalogUtils.PG_CATALOG_NAME; diff --git a/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/MetricViewSqlRewrite.java b/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/MetricViewSqlRewrite.java index 8fc7845e1..c17c33b25 100644 --- a/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/MetricViewSqlRewrite.java +++ b/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/MetricViewSqlRewrite.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap; import io.accio.base.AccioMDL; import io.accio.base.SessionContext; +import io.accio.base.dto.CumulativeMetric; import io.accio.base.dto.Metric; import io.accio.base.dto.View; import io.accio.sqlrewrite.analyzer.Analysis; @@ -35,6 +36,8 @@ import java.util.Optional; import java.util.stream.Stream; +import static io.accio.sqlrewrite.Utils.createDateSpineQuery; +import static io.accio.sqlrewrite.Utils.parseCumulativeMetricSql; import static io.accio.sqlrewrite.Utils.parseView; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toUnmodifiableList; @@ -73,12 +76,15 @@ public Statement apply(Statement root, SessionContext sessionContext, Analysis a allAnalysis.stream().flatMap(a -> a.getMetricRollups().values().stream()) .collect(toUnmodifiableMap(rollup -> rollup.getMetric().getName(), Utils::parseMetricRollupSql)); + Map cumulativeMetrics = allAnalysis.stream().flatMap(a -> a.getCumulativeMetrics().stream()) + .collect(toUnmodifiableMap(CumulativeMetric::getName, query -> parseCumulativeMetricSql(query, accioMDL))); + // The generation of views has a sequential order, with later views being able to reference earlier views. Map viewQueries = new LinkedHashMap<>(); allAnalysis.stream().flatMap(a -> a.getViews().stream()) .forEach(view -> viewQueries.put(view.getName(), parseView(view.getStatement()))); - return (Statement) new WithRewriter(metricQueries, metricRollupQueries, ImmutableMap.copyOf(viewQueries)).process(root); + return (Statement) new WithRewriter(metricQueries, metricRollupQueries, cumulativeMetrics, ImmutableMap.copyOf(viewQueries), accioMDL).process(root); } private static class WithRewriter @@ -86,17 +92,23 @@ private static class WithRewriter { private final Map metricQueries; private final Map metricRollupQueries; + private final Map cumulativeMetricQueries; private final Map viewQueries; + private final AccioMDL accioMDL; public WithRewriter( Map metricQueries, Map metricRollupQueries, - Map viewQueries) + Map cumulativeMetricQueries, + Map viewQueries, + AccioMDL accioMDL) { this.metricQueries = requireNonNull(metricQueries, "metricQueries is null"); this.metricRollupQueries = requireNonNull(metricRollupQueries, "metricRollupQueries is null"); this.viewQueries = requireNonNull(viewQueries, "viewQueries is null"); + this.cumulativeMetricQueries = requireNonNull(cumulativeMetricQueries, "cumulativeMetricQueries is null"); + this.accioMDL = requireNonNull(accioMDL, "accioMDL is null"); } @Override @@ -116,9 +128,21 @@ protected Node visitQuery(Query node, Void context) .map(e -> new WithQuery(new Identifier(e.getKey()), e.getValue(), Optional.empty())) .collect(toUnmodifiableList()); + List cumulativeMetricWithQueries = cumulativeMetricQueries.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) // sort here to avoid test failed due to wrong with-query order + .map(e -> new WithQuery(new Identifier(e.getKey()), e.getValue(), Optional.empty())) + .collect(toUnmodifiableList()); + + WithQuery dateSpineWithQuery = new WithQuery(new Identifier("date_spine"), createDateSpineQuery(accioMDL.getDateSpine()), Optional.empty()); + + List addDateSpineIfNeed = cumulativeMetricWithQueries.isEmpty() ? + List.of() : + ImmutableList.builder().add(dateSpineWithQuery).addAll(cumulativeMetricWithQueries).build(); + List withQueries = ImmutableList.builder() .addAll(metricWithQueries) .addAll(metricRollupWithQueries) + .addAll(addDateSpineIfNeed) .addAll(viewWithQueries) .build(); diff --git a/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/Utils.java b/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/Utils.java index 221c8dc6a..e914ebbd4 100644 --- a/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/Utils.java +++ b/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/Utils.java @@ -20,6 +20,8 @@ import io.accio.base.CatalogSchemaTableName; import io.accio.base.SessionContext; import io.accio.base.dto.Column; +import io.accio.base.dto.CumulativeMetric; +import io.accio.base.dto.DateSpine; import io.accio.base.dto.Metric; import io.accio.sqlrewrite.analyzer.Field; import io.accio.sqlrewrite.analyzer.MetricRollupInfo; @@ -40,6 +42,7 @@ import java.security.SecureRandom; import java.util.List; +import java.util.NoSuchElementException; import java.util.Optional; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -124,6 +127,81 @@ private static String getMetricSql(Metric metric) return format("SELECT %s FROM %s GROUP BY %s", selectItems, metric.getBaseModel(), groupByItems); } + public static Query parseCumulativeMetricSql(CumulativeMetric cumulativeMetric, AccioMDL accioMDL) + { + String sql = getCumulativeMetricSql(cumulativeMetric, accioMDL); + Statement statement = SQL_PARSER.createStatement(sql, new ParsingOptions(AS_DECIMAL)); + if (statement instanceof Query) { + return (Query) statement; + } + throw new IllegalArgumentException(format("metric %s is not a query, sql %s", cumulativeMetric.getName(), sql)); + } + + public static String getCumulativeMetricSql(CumulativeMetric cumulativeMetric, AccioMDL accioMDL) + { + requireNonNull(cumulativeMetric, "cumulativeMetric is null"); + + String windowType = accioMDL.getModel(cumulativeMetric.getBaseModel()) + .map(model -> model.getColumns().stream() + .filter(column -> column.getName().equals(cumulativeMetric.getWindow().getRefColumn())) + .map(Column::getType).findAny().orElseThrow(() -> new NoSuchElementException(format("Column %s not found in model %s", cumulativeMetric.getWindow().getRefColumn(), cumulativeMetric.getBaseModel())))) + .orElseThrow(() -> new NoSuchElementException(format("Model %s not found", cumulativeMetric.getBaseModel()))); + + String pattern = + "select \n" + + " metric_time as %s,\n" + + " %s(distinct measure_field) as %s\n" + + "from \n" + + " (\n" + + " select \n" + + " date_trunc('%s', d.metric_time) as metric_time,\n" + + " measure_field\n" + + " from \n" + + " (%s) d \n" + + " left join (\n" + + " select \n" + + " measure_field,\n" + + " metric_time\n" + + " from (%s) sub1\n" + + " where \n" + + " metric_time >= cast('%s' as %s) \n" + + " and metric_time <= cast('%s' as %s)\n" + + " ) sub2 on (\n" + + " sub2.metric_time <= d.metric_time \n" + + " and sub2.metric_time > %s\n" + + " )\n" + + " where \n" + + " d.metric_time >= cast('%s' as %s) \n" + + " and d.metric_time <= cast('%s' as %s) \n" + + " ) sub3 \n" + + "group by 1\n" + + "order by 1\n"; + + String castingDateSpine = format("select cast(metric_time as %s) as metric_time from date_spine", windowType); + String windowRange = format("d.metric_time - %s", cumulativeMetric.getWindow().getTimeUnit().getIntervalExpression()); + String selectFromModel = format("select %s as measure_field, %s as metric_time from %s", + cumulativeMetric.getMeasure().getRefColumn(), + cumulativeMetric.getWindow().getRefColumn(), + cumulativeMetric.getBaseModel()); + + return format(pattern, + cumulativeMetric.getWindow().getName(), + cumulativeMetric.getMeasure().getOperator(), + cumulativeMetric.getMeasure().getName(), + cumulativeMetric.getWindow().getTimeUnit().name(), + castingDateSpine, + selectFromModel, + cumulativeMetric.getWindow().getStart(), + windowType, + cumulativeMetric.getWindow().getEnd(), + windowType, + windowRange, + cumulativeMetric.getWindow().getStart(), + windowType, + cumulativeMetric.getWindow().getEnd(), + windowType); + } + private static String getMetricRollupSql(MetricRollupInfo metricRollupInfo) { requireNonNull(metricRollupInfo, "metricRollupInfo is null"); @@ -234,4 +312,15 @@ private static Field toField(AccioMDL accioMDL, String modelName, Column column, .type(column.getType()) .build(); } + + public static Query createDateSpineQuery(DateSpine dateSpine) + { + // TODO: `GENERATE_TIMESTAMP_ARRAY` is a bigquery function. We may need to consider the SQL dialect when Accio planning. + String sql = format("SELECT * FROM UNNEST(GENERATE_TIMESTAMP_ARRAY(TIMESTAMP '%s', TIMESTAMP '%s', %s)) t(metric_time)", dateSpine.getStart(), dateSpine.getEnd(), dateSpine.getUnit().getIntervalExpression()); + Statement statement = SQL_PARSER.createStatement(sql, new ParsingOptions(AS_DECIMAL)); + if (statement instanceof Query) { + return (Query) statement; + } + throw new IllegalArgumentException(format("Failed to parse date spine query: %s", sql)); + } } diff --git a/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/analyzer/Analysis.java b/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/analyzer/Analysis.java index 5fca867cd..0e614c4a4 100644 --- a/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/analyzer/Analysis.java +++ b/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/analyzer/Analysis.java @@ -15,6 +15,7 @@ package io.accio.sqlrewrite.analyzer; import io.accio.base.CatalogSchemaTableName; +import io.accio.base.dto.CumulativeMetric; import io.accio.base.dto.Metric; import io.accio.base.dto.Model; import io.accio.base.dto.Relationship; @@ -40,6 +41,8 @@ public class Analysis private final Set models = new HashSet<>(); private final Set metrics = new HashSet<>(); private final Map, MetricRollupInfo> metricRollups = new HashMap<>(); + + private final Set cumulativeMetrics = new HashSet<>(); private final Set views = new HashSet<>(); Analysis(Statement statement) @@ -107,6 +110,16 @@ public Map, MetricRollupInfo> getMetricRollups() return metricRollups; } + void addCumulativeMetrics(Set cumulativeMetrics) + { + this.cumulativeMetrics.addAll(cumulativeMetrics); + } + + public Set getCumulativeMetrics() + { + return cumulativeMetrics; + } + public Set getViews() { return views; diff --git a/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/analyzer/MetricRollupInfo.java b/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/analyzer/MetricRollupInfo.java index 0d2d893dd..f4b61d164 100644 --- a/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/analyzer/MetricRollupInfo.java +++ b/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/analyzer/MetricRollupInfo.java @@ -16,7 +16,7 @@ import io.accio.base.dto.Metric; import io.accio.base.dto.TimeGrain; -import io.accio.base.dto.TimeGrain.TimeUnit; +import io.accio.base.dto.TimeUnit; import static java.util.Objects.requireNonNull; diff --git a/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/analyzer/StatementAnalyzer.java b/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/analyzer/StatementAnalyzer.java index 64d6dd5f3..565e007ed 100644 --- a/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/analyzer/StatementAnalyzer.java +++ b/accio-sqlrewrite/src/main/java/io/accio/sqlrewrite/analyzer/StatementAnalyzer.java @@ -17,6 +17,7 @@ import io.accio.base.AccioMDL; import io.accio.base.CatalogSchemaTableName; import io.accio.base.SessionContext; +import io.accio.base.dto.CumulativeMetric; import io.accio.base.dto.Metric; import io.accio.base.dto.Model; import io.accio.base.dto.Relationship; @@ -49,7 +50,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.accio.base.Utils.checkArgument; -import static io.accio.base.dto.TimeGrain.TimeUnit.timeUnit; +import static io.accio.base.dto.TimeUnit.timeUnit; import static io.accio.sqlrewrite.Utils.toCatalogSchemaTableName; import static io.trino.sql.QueryUtil.getQualifiedName; import static java.lang.String.format; @@ -112,6 +113,20 @@ public static Analysis analyze(Statement statement, SessionContext sessionContex analysis.addMetrics(metrics); + Set cumulativeMetrics = analysis.getTables().stream() + .map(accioMDL::getCumulativeMetric) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(toUnmodifiableSet()); + + analysis.addModels( + cumulativeMetrics.stream() + .map(CumulativeMetric::getBaseModel) + .distinct() + .map(model -> accioMDL.getModel(model).orElseThrow(() -> new IllegalArgumentException(format("cumulative metric model %s not exists", model)))) + .collect(toUnmodifiableSet())); + analysis.addCumulativeMetrics(cumulativeMetrics); + Set views = analysis.getTables().stream() .map(accioMDL::getView) .filter(Optional::isPresent) diff --git a/accio-sqlrewrite/src/test/java/io/accio/TestPreAggregationRewrite.java b/accio-sqlrewrite/src/test/java/io/accio/TestPreAggregationRewrite.java index 7223e3173..2dc8093d1 100644 --- a/accio-sqlrewrite/src/test/java/io/accio/TestPreAggregationRewrite.java +++ b/accio-sqlrewrite/src/test/java/io/accio/TestPreAggregationRewrite.java @@ -40,8 +40,8 @@ import static io.accio.base.dto.Column.column; import static io.accio.base.dto.Metric.metric; import static io.accio.base.dto.Model.model; -import static io.accio.base.dto.TimeGrain.TimeUnit.YEAR; import static io.accio.base.dto.TimeGrain.timeGrain; +import static io.accio.base.dto.TimeUnit.YEAR; import static io.accio.testing.AbstractTestFramework.withDefaultCatalogSchema; import static io.trino.sql.SqlFormatter.Dialect.DUCKDB; import static io.trino.sql.SqlFormatter.formatSql; diff --git a/accio-sqlrewrite/src/test/java/io/accio/sqlrewrite/TestCumulativeMetric.java b/accio-sqlrewrite/src/test/java/io/accio/sqlrewrite/TestCumulativeMetric.java new file mode 100644 index 000000000..08fa1578f --- /dev/null +++ b/accio-sqlrewrite/src/test/java/io/accio/sqlrewrite/TestCumulativeMetric.java @@ -0,0 +1,104 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.accio.sqlrewrite; + +import io.accio.base.AccioMDL; +import io.accio.base.dto.DateSpine; +import io.accio.base.dto.TimeUnit; +import io.accio.testing.AbstractTestFramework; +import org.testng.annotations.Test; + +import java.util.List; + +import static io.accio.base.AccioTypes.DATE; +import static io.accio.base.AccioTypes.INTEGER; +import static io.accio.base.AccioTypes.VARCHAR; +import static io.accio.base.dto.Column.column; +import static io.accio.base.dto.CumulativeMetric.cumulativeMetric; +import static io.accio.base.dto.Measure.measure; +import static io.accio.base.dto.Model.model; +import static io.accio.base.dto.Window.window; +import static io.accio.sqlrewrite.AccioSqlRewrite.ACCIO_SQL_REWRITE; +import static io.accio.sqlrewrite.MetricViewSqlRewrite.METRIC_VIEW_SQL_REWRITE; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestCumulativeMetric + extends AbstractTestFramework +{ + private static AccioMDL accioMDL; + + public TestCumulativeMetric() + { + accioMDL = AccioMDL.fromManifest(withDefaultCatalogSchema() + .setModels(List.of( + model("Orders", + "select * from main.orders", + List.of( + column("orderkey", INTEGER, null, true), + column("custkey", INTEGER, null, true), + column("orderstatus", VARCHAR, null, true), + column("totalprice", INTEGER, null, true), + column("orderdate", DATE, null, true), + column("orderpriority", VARCHAR, null, true), + column("clerk", VARCHAR, null, true), + column("shippriority", INTEGER, null, true), + column("comment", VARCHAR, null, true))))) + .setCumulativeMetrics(List.of( + cumulativeMetric("DailyRevenue", + "Orders", measure("totalprice", INTEGER, "sum", "totalprice"), + window("orderdate", "orderdate", TimeUnit.DAY, "1994-01-01", "1994-12-31")), + cumulativeMetric("WeeklyRevenue", + "Orders", measure("totalprice", INTEGER, "sum", "totalprice"), + window("orderdate", "orderdate", TimeUnit.WEEK, "1994-01-01", "1994-12-31")), + cumulativeMetric("MonthlyRevenue", + "Orders", measure("totalprice", INTEGER, "sum", "totalprice"), + window("orderdate", "orderdate", TimeUnit.MONTH, "1994-01-01", "1994-12-31")), + cumulativeMetric("QuarterlyRevenue", + "Orders", measure("totalprice", INTEGER, "sum", "totalprice"), + window("orderdate", "orderdate", TimeUnit.QUARTER, "1994-01-01", "1995-12-31")), + cumulativeMetric("YearlyRevenue", + "Orders", measure("totalprice", INTEGER, "sum", "totalprice"), + window("orderdate", "orderdate", TimeUnit.YEAR, "1994-01-01", "1998-12-31")))) + .setDateSpine(new DateSpine(TimeUnit.DAY, "1970-01-01", "2077-12-31")) + .build()); + } + + @Override + protected void prepareData() + { + String orders = getClass().getClassLoader().getResource("tiny-orders.parquet").getPath(); + exec("create table orders as select * from '" + orders + "'"); + } + + @Test + public void testCumulativeMetric() + { + assertThat(query(rewrite("select * from DailyRevenue")).size()).isEqualTo(365); + assertThat(query(rewrite("select * from WeeklyRevenue")).size()).isEqualTo(53); + assertThat(query(rewrite("select * from MonthlyRevenue")).size()).isEqualTo(12); + assertThat(query(rewrite("select * from QuarterlyRevenue")).size()).isEqualTo(8); + assertThat(query(rewrite("select * from YearlyRevenue")).size()).isEqualTo(5); + } + + private String rewrite(String sql) + { + return rewrite(sql, accioMDL); + } + + private String rewrite(String sql, AccioMDL accioMDL) + { + return AccioPlanner.rewrite(sql, DEFAULT_SESSION_CONTEXT, accioMDL, List.of(METRIC_VIEW_SQL_REWRITE, ACCIO_SQL_REWRITE)); + } +} diff --git a/accio-sqlrewrite/src/test/java/io/accio/sqlrewrite/TestMetricViewSqlRewrite.java b/accio-sqlrewrite/src/test/java/io/accio/sqlrewrite/TestMetricViewSqlRewrite.java index 0b5b7ecb6..7bac2223b 100644 --- a/accio-sqlrewrite/src/test/java/io/accio/sqlrewrite/TestMetricViewSqlRewrite.java +++ b/accio-sqlrewrite/src/test/java/io/accio/sqlrewrite/TestMetricViewSqlRewrite.java @@ -33,8 +33,8 @@ import static io.accio.base.dto.Column.column; import static io.accio.base.dto.Metric.metric; import static io.accio.base.dto.Model.model; -import static io.accio.base.dto.TimeGrain.TimeUnit.YEAR; import static io.accio.base.dto.TimeGrain.timeGrain; +import static io.accio.base.dto.TimeUnit.YEAR; import static io.accio.base.dto.View.view; import static io.accio.sqlrewrite.AccioSqlRewrite.ACCIO_SQL_REWRITE; import static io.accio.sqlrewrite.MetricViewSqlRewrite.METRIC_VIEW_SQL_REWRITE; diff --git a/accio-sqlrewrite/src/test/resources/tiny-orders.parquet b/accio-sqlrewrite/src/test/resources/tiny-orders.parquet new file mode 100644 index 000000000..c0e93fa94 Binary files /dev/null and b/accio-sqlrewrite/src/test/resources/tiny-orders.parquet differ diff --git a/accio-sqlrewrite/src/test/resources/tpch_mdl.json b/accio-sqlrewrite/src/test/resources/tpch_mdl.json index 06bf657ef..d07e90820 100644 --- a/accio-sqlrewrite/src/test/resources/tpch_mdl.json +++ b/accio-sqlrewrite/src/test/resources/tpch_mdl.json @@ -291,6 +291,28 @@ ] } ], + "cumulativeMetrics": [ + { + "name": "WeeklyRevenue", + "baseModel": "Orders", + "measure": { + "name": "totalprice", + "type": "int4", + "operator": "sum", + "refColumn": "totalprice" + }, + "window": { + "name": "orderdate", + "refColumn": "orderdate", + "timeUnit": "WEEK", + "start": "1993-01-01", + "end": "1993-12-31" + }, + "preAggregated": false, + "refreshTime": "30m", + "description": "" + } + ], "enumDefinitions": [ { "name": "Status", diff --git a/accio-tests/src/test/java/io/accio/testing/bigquery/TestAccioWithBigquery.java b/accio-tests/src/test/java/io/accio/testing/bigquery/TestAccioWithBigquery.java index 71d8fde20..90cc80e4b 100644 --- a/accio-tests/src/test/java/io/accio/testing/bigquery/TestAccioWithBigquery.java +++ b/accio-tests/src/test/java/io/accio/testing/bigquery/TestAccioWithBigquery.java @@ -120,6 +120,24 @@ void testQueryMetricRollup() } } + @Test + public void testQueryCumulativeMetric() + throws Exception + { + try (Connection connection = createConnection()) { + PreparedStatement stmt = connection.prepareStatement("select * from WeeklyRevenue"); + ResultSet resultSet = stmt.executeQuery(); + resultSet.next(); + assertThatNoException().isThrownBy(() -> resultSet.getInt("totalprice")); + int count = 1; + + while (resultSet.next()) { + count++; + } + assertThat(count).isEqualTo(53); + } + } + @Test public void testEnum() throws Exception diff --git a/accio-tests/src/test/resources/tpch_mdl.json b/accio-tests/src/test/resources/tpch_mdl.json index 8d669bdc4..2a0c749ca 100644 --- a/accio-tests/src/test/resources/tpch_mdl.json +++ b/accio-tests/src/test/resources/tpch_mdl.json @@ -260,6 +260,28 @@ ] } ], + "cumulativeMetrics": [ + { + "name": "WeeklyRevenue", + "baseModel": "Orders", + "measure": { + "name": "totalprice", + "type": "int4", + "operator": "sum", + "refColumn": "totalprice" + }, + "window": { + "name": "orderdate", + "refColumn": "orderdate", + "timeUnit": "WEEK", + "start": "1993-01-01", + "end": "1993-12-31" + }, + "preAggregated": false, + "refreshTime": "30m", + "description": "" + } + ], "enumDefinitions": [ { "name": "Status", diff --git a/pom.xml b/pom.xml index 56eed2371..365118b27 100644 --- a/pom.xml +++ b/pom.xml @@ -357,7 +357,7 @@ org.duckdb duckdb_jdbc - 0.8.1 + 0.9.1 diff --git a/trino-parser/src/main/java/io/trino/sql/ExpressionFormatter.java b/trino-parser/src/main/java/io/trino/sql/ExpressionFormatter.java index 7501b5403..2bc66e544 100644 --- a/trino-parser/src/main/java/io/trino/sql/ExpressionFormatter.java +++ b/trino-parser/src/main/java/io/trino/sql/ExpressionFormatter.java @@ -368,7 +368,7 @@ protected String visitIntervalLiteral(IntervalLiteral node, Void context) { String sign = (node.getSign() == IntervalLiteral.Sign.NEGATIVE) ? "-" : ""; StringBuilder builder = new StringBuilder(); - if (dialect.equals(BIGQUERY)) { + if (dialect.equals(BIGQUERY) || dialect.equals(DUCKDB)) { builder.append("INTERVAL ") .append("'").append(sign).append(node.getValue()).append("' ") .append(node.getStartField()); @@ -448,6 +448,11 @@ protected String visitFunctionCall(FunctionCall node, Void context) return processSliceInBigQuery(node); } + // TODO: `GENERATE_TIMESTAMP_ARRAY` is a bigquery function. We may need to consider the SQL dialect when Accio planning. + if ("GENERATE_TIMESTAMP_ARRAY".equalsIgnoreCase(node.getName().toString()) && dialect.equals(DUCKDB)) { + return processGenerateTimestampArrayInDuckDB(node); + } + StringBuilder builder = new StringBuilder(); if (node.getProcessingMode().isPresent()) { @@ -956,6 +961,16 @@ private String processSliceInBigQuery(FunctionCall node) start.getValue() - 1, start.getValue() - 1 + length.getValue()); } + + private String processGenerateTimestampArrayInDuckDB(FunctionCall node) + { + List arguments = node.getArguments(); + Expression start = arguments.get(0); + Expression end = arguments.get(1); + return format("GENERATE_SERIES(%s, %s, INTERVAL 1 DAY)", + start, + end); + } } static String formatStringLiteral(String s, Dialect dialect)