Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(设备-详细信息-运行状态-详情)增加极差值统计规则 #478

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,16 @@ public AggregationBuilder aggregationBuilder(String name, String filed) {
return AggregationBuilders.stats(name).field(filed).missing(0);
}

};
},

DIFFERENCE("极差值") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RANGE 更合适?

@Override
public AggregationBuilder aggregationBuilder(String name, String filed) {
return AggregationBuilders.stats(name).field(filed).missing(0);
}
},

;

@Getter
private final String text;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@
import org.hswebframework.ezorm.core.param.QueryParam;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.ezorm.core.param.TermType;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.community.Interval;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.service.AggregationService;
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
import org.jetlinks.community.timeseries.query.Aggregation;
import org.jetlinks.community.timeseries.query.*;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.math.BigDecimal;
import java.time.Duration;
import java.time.ZoneId;
import java.util.*;
Expand Down Expand Up @@ -205,7 +207,10 @@ public Flux<Map<String, Object>> aggregation(String[] index, AggregationQueryPar
)
)
.flatMap(restClient::searchForPage)
.flatMapMany(this::parseResult)
.flatMapMany(row -> this.parseResult(row, aggregationQueryParam
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个映射看上去可以在上面构造聚合参数时一起构造。

.getAggColumns()
.stream()
.collect(Collectors.toMap(AggregationColumn::getAlias, AggregationColumn::getAggregation))))
.as(flux -> {
if (!group) {
return flux
Expand All @@ -219,16 +224,18 @@ public Flux<Map<String, Object>> aggregation(String[] index, AggregationQueryPar
;
}

protected Flux<Map<String, Object>> parseResult(SearchResponse searchResponse) {
protected Flux<Map<String, Object>> parseResult(SearchResponse searchResponse, Map<String, Aggregation> columnAggregationMap) {
return Mono.justOrEmpty(searchResponse.getAggregations())
.flatMapIterable(Aggregations::asList)
.flatMap(agg -> parseAggregation(agg.getName(), agg), Integer.MAX_VALUE);
.flatMap(agg -> parseAggregation(agg.getName(), agg, columnAggregationMap), Integer.MAX_VALUE);
}


private Flux<Map<String, Object>> parseAggregation(String name,
org.elasticsearch.search.aggregations.Aggregation aggregation) {
org.elasticsearch.search.aggregations.Aggregation aggregation,
Map<String, Aggregation> columnAggregationMap) {
if (aggregation instanceof Terms) {
return parseAggregation(((Terms) aggregation));
return parseAggregation(((Terms) aggregation), columnAggregationMap);
}
if (aggregation instanceof TopHits) {
TopHits topHits = ((TopHits) aggregation);
Expand All @@ -243,7 +250,7 @@ private Flux<Map<String, Object>> parseAggregation(String name,
});
}
if (aggregation instanceof Histogram) {
return parseAggregation(((Histogram) aggregation));
return parseAggregation(((Histogram) aggregation), columnAggregationMap);
}
if (aggregation instanceof ValueCount) {
return Flux.just(Collections.singletonMap(name, ((ValueCount) aggregation).getValue()));
Expand All @@ -257,6 +264,21 @@ private Flux<Map<String, Object>> parseAggregation(String name,
// TODO: 2020/10/29 只处理了标准差差
return Flux.just(Collections.singletonMap(name, stats.getStdDeviation()));
}
if (aggregation instanceof ParsedStats) {
ParsedStats parsedStats = ((ParsedStats) aggregation);
Aggregation agg = columnAggregationMap.get(parsedStats.getName());
if (agg == null) {
return Flux.empty();
}
switch (agg) {
case DIFFERENCE:
BigDecimal max = (!Double.isInfinite(parsedStats.getMax()) ? BigDecimal.valueOf(parsedStats.getMax()) : BigDecimal.ZERO);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

处理null的情况, max或者min为Infinite时,应该返回 null才对?(表示没有数据)。而0在某的场景下可能存在特殊意义。

BigDecimal min = (!Double.isInfinite(parsedStats.getMin()) ? BigDecimal.valueOf(parsedStats.getMin()) : BigDecimal.ZERO);
return Flux.just(Collections.singletonMap(name, max.subtract(min)));
default:
return Flux.empty();
}
}

return Flux.empty();
}
Expand All @@ -265,13 +287,13 @@ private Object getSafeNumber(double number) {
return (Double.isNaN(number) || Double.isInfinite(number)) ? null : number;
}

private Flux<Map<String, Object>> parseAggregation(Histogram aggregation) {
private Flux<Map<String, Object>> parseAggregation(Histogram aggregation, Map<String, Aggregation> columnAggregationMap) {

return Flux
.fromIterable(aggregation.getBuckets())
.flatMap(bucket ->
Flux.fromIterable(bucket.getAggregations().asList())
.flatMap(agg -> this.parseAggregation(agg.getName(), agg), Integer.MAX_VALUE)
.flatMap(agg -> this.parseAggregation(agg.getName(), agg, columnAggregationMap), Integer.MAX_VALUE)
.defaultIfEmpty(Collections.emptyMap())
// .map(Map::entrySet)
// .flatMap(Flux::fromIterable)
Expand All @@ -286,11 +308,11 @@ private Flux<Map<String, Object>> parseAggregation(Histogram aggregation) {
);
}

private Flux<Map<String, Object>> parseAggregation(Terms aggregation) {
private Flux<Map<String, Object>> parseAggregation(Terms aggregation, Map<String, Aggregation> columnAggregationMap) {

return Flux.fromIterable(aggregation.getBuckets())
.flatMap(bucket -> Flux.fromIterable(bucket.getAggregations().asList())
.flatMap(agg -> parseAggregation(agg.getName(), agg)
.flatMap(agg -> parseAggregation(agg.getName(), agg, columnAggregationMap)
.map(map -> {
Map<String, Object> val = new HashMap<>(map);
val.put(aggregation.getName(), bucket.getKeyAsString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ public enum Aggregation implements EnumDict<String> {

//去重计数
DISTINCT_COUNT(flux -> flux.distinct().count(),0),
NONE(numberFlux -> Reactors.ALWAYS_ZERO, 0);
NONE(numberFlux -> Reactors.ALWAYS_ZERO, 0),
DIFFERENCE(numberFlux -> MathFlux.sumDouble(numberFlux, Number::doubleValue), 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RANGE ? 默认值应该为0,且 sumDouble应该不符合需求?


private final Function<Flux<Number>, Mono<? extends Number>> computer;
@Getter
Expand Down