From 615c881b6c47211d4866cf7bbc7d93fb9e713add Mon Sep 17 00:00:00 2001 From: YangJiaHui <819582890@qq.com> Date: Tue, 12 Mar 2024 16:17:27 +0800 Subject: [PATCH] =?UTF-8?q?feat(=E8=AE=BE=E5=A4=87-=E8=AF=A6=E7=BB=86?= =?UTF-8?q?=E4=BF=A1=E6=81=AF-=E8=BF=90=E8=A1=8C=E7=8A=B6=E6=80=81-?= =?UTF-8?q?=E8=AF=A6=E6=83=85)=E5=A2=9E=E5=8A=A0=E6=9E=81=E5=B7=AE?= =?UTF-8?q?=E5=80=BC=E7=BB=9F=E8=AE=A1=E8=A7=84=E5=88=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../search/service/reactive/AggType.java | 11 ++++- .../reactive/ReactiveAggregationService.java | 44 ++++++++++++++----- .../timeseries/query/Aggregation.java | 3 +- 3 files changed, 45 insertions(+), 13 deletions(-) diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/AggType.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/AggType.java index 409ba07c0..d22f90b7d 100755 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/AggType.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/AggType.java @@ -82,7 +82,16 @@ public AggregationBuilder aggregationBuilder(String name, String filed) { return AggregationBuilders.stats(name).field(filed).missing(0); } - }; + }, + + DIFFERENCE("极差值") { + @Override + public AggregationBuilder aggregationBuilder(String name, String filed) { + return AggregationBuilders.stats(name).field(filed).missing(0); + } + }, + + ; @Getter private final String text; diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java index 50040de8d..c024c06c7 100755 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java @@ -19,12 +19,13 @@ import org.hswebframework.ezorm.core.param.QueryParam; import org.hswebframework.ezorm.core.param.Term; import org.hswebframework.ezorm.core.param.TermType; -import org.jetlinks.core.metadata.types.DateTimeType; import org.jetlinks.community.Interval; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; import org.jetlinks.community.elastic.search.service.AggregationService; import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter; +import org.jetlinks.community.timeseries.query.Aggregation; import org.jetlinks.community.timeseries.query.*; +import org.jetlinks.core.metadata.types.DateTimeType; import org.jetlinks.reactor.ql.utils.CastUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.CollectionUtils; @@ -32,6 +33,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.math.BigDecimal; import java.time.Duration; import java.time.ZoneId; import java.util.*; @@ -205,7 +207,10 @@ public Flux> aggregation(String[] index, AggregationQueryPar ) ) .flatMap(restClient::searchForPage) - .flatMapMany(this::parseResult) + .flatMapMany(row -> this.parseResult(row, aggregationQueryParam + .getAggColumns() + .stream() + .collect(Collectors.toMap(AggregationColumn::getAlias, AggregationColumn::getAggregation)))) .as(flux -> { if (!group) { return flux @@ -219,16 +224,18 @@ public Flux> aggregation(String[] index, AggregationQueryPar ; } - protected Flux> parseResult(SearchResponse searchResponse) { + protected Flux> parseResult(SearchResponse searchResponse, Map columnAggregationMap) { return Mono.justOrEmpty(searchResponse.getAggregations()) .flatMapIterable(Aggregations::asList) - .flatMap(agg -> parseAggregation(agg.getName(), agg), Integer.MAX_VALUE); + .flatMap(agg -> parseAggregation(agg.getName(), agg, columnAggregationMap), Integer.MAX_VALUE); } + private Flux> parseAggregation(String name, - org.elasticsearch.search.aggregations.Aggregation aggregation) { + org.elasticsearch.search.aggregations.Aggregation aggregation, + Map columnAggregationMap) { if (aggregation instanceof Terms) { - return parseAggregation(((Terms) aggregation)); + return parseAggregation(((Terms) aggregation), columnAggregationMap); } if (aggregation instanceof TopHits) { TopHits topHits = ((TopHits) aggregation); @@ -243,7 +250,7 @@ private Flux> parseAggregation(String name, }); } if (aggregation instanceof Histogram) { - return parseAggregation(((Histogram) aggregation)); + return parseAggregation(((Histogram) aggregation), columnAggregationMap); } if (aggregation instanceof ValueCount) { return Flux.just(Collections.singletonMap(name, ((ValueCount) aggregation).getValue())); @@ -257,6 +264,21 @@ private Flux> parseAggregation(String name, // TODO: 2020/10/29 只处理了标准差差 return Flux.just(Collections.singletonMap(name, stats.getStdDeviation())); } + if (aggregation instanceof ParsedStats) { + ParsedStats parsedStats = ((ParsedStats) aggregation); + Aggregation agg = columnAggregationMap.get(parsedStats.getName()); + if (agg == null) { + return Flux.empty(); + } + switch (agg) { + case DIFFERENCE: + BigDecimal max = (!Double.isInfinite(parsedStats.getMax()) ? BigDecimal.valueOf(parsedStats.getMax()) : BigDecimal.ZERO); + BigDecimal min = (!Double.isInfinite(parsedStats.getMin()) ? BigDecimal.valueOf(parsedStats.getMin()) : BigDecimal.ZERO); + return Flux.just(Collections.singletonMap(name, max.subtract(min))); + default: + return Flux.empty(); + } + } return Flux.empty(); } @@ -265,13 +287,13 @@ private Object getSafeNumber(double number) { return (Double.isNaN(number) || Double.isInfinite(number)) ? null : number; } - private Flux> parseAggregation(Histogram aggregation) { + private Flux> parseAggregation(Histogram aggregation, Map columnAggregationMap) { return Flux .fromIterable(aggregation.getBuckets()) .flatMap(bucket -> Flux.fromIterable(bucket.getAggregations().asList()) - .flatMap(agg -> this.parseAggregation(agg.getName(), agg), Integer.MAX_VALUE) + .flatMap(agg -> this.parseAggregation(agg.getName(), agg, columnAggregationMap), Integer.MAX_VALUE) .defaultIfEmpty(Collections.emptyMap()) // .map(Map::entrySet) // .flatMap(Flux::fromIterable) @@ -286,11 +308,11 @@ private Flux> parseAggregation(Histogram aggregation) { ); } - private Flux> parseAggregation(Terms aggregation) { + private Flux> parseAggregation(Terms aggregation, Map columnAggregationMap) { return Flux.fromIterable(aggregation.getBuckets()) .flatMap(bucket -> Flux.fromIterable(bucket.getAggregations().asList()) - .flatMap(agg -> parseAggregation(agg.getName(), agg) + .flatMap(agg -> parseAggregation(agg.getName(), agg, columnAggregationMap) .map(map -> { Map val = new HashMap<>(map); val.put(aggregation.getName(), bucket.getKeyAsString()); diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/Aggregation.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/Aggregation.java index 1c0ff0797..047e66551 100644 --- a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/Aggregation.java +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/Aggregation.java @@ -25,7 +25,8 @@ public enum Aggregation implements EnumDict { //去重计数 DISTINCT_COUNT(flux -> flux.distinct().count(),0), - NONE(numberFlux -> Reactors.ALWAYS_ZERO, 0); + NONE(numberFlux -> Reactors.ALWAYS_ZERO, 0), + DIFFERENCE(numberFlux -> MathFlux.sumDouble(numberFlux, Number::doubleValue), 0); private final Function, Mono> computer; @Getter