-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(设备-详细信息-运行状态-详情)增加极差值统计规则 #478
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,19 +19,21 @@ | |
import org.hswebframework.ezorm.core.param.QueryParam; | ||
import org.hswebframework.ezorm.core.param.Term; | ||
import org.hswebframework.ezorm.core.param.TermType; | ||
import org.jetlinks.core.metadata.types.DateTimeType; | ||
import org.jetlinks.community.Interval; | ||
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; | ||
import org.jetlinks.community.elastic.search.service.AggregationService; | ||
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter; | ||
import org.jetlinks.community.timeseries.query.Aggregation; | ||
import org.jetlinks.community.timeseries.query.*; | ||
import org.jetlinks.core.metadata.types.DateTimeType; | ||
import org.jetlinks.reactor.ql.utils.CastUtils; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.util.CollectionUtils; | ||
import org.springframework.util.StringUtils; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Mono; | ||
|
||
import java.math.BigDecimal; | ||
import java.time.Duration; | ||
import java.time.ZoneId; | ||
import java.util.*; | ||
|
@@ -205,7 +207,10 @@ public Flux<Map<String, Object>> aggregation(String[] index, AggregationQueryPar | |
) | ||
) | ||
.flatMap(restClient::searchForPage) | ||
.flatMapMany(this::parseResult) | ||
.flatMapMany(row -> this.parseResult(row, aggregationQueryParam | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这个映射看上去可以在上面构造聚合参数时一起构造。 |
||
.getAggColumns() | ||
.stream() | ||
.collect(Collectors.toMap(AggregationColumn::getAlias, AggregationColumn::getAggregation)))) | ||
.as(flux -> { | ||
if (!group) { | ||
return flux | ||
|
@@ -219,16 +224,18 @@ public Flux<Map<String, Object>> aggregation(String[] index, AggregationQueryPar | |
; | ||
} | ||
|
||
protected Flux<Map<String, Object>> parseResult(SearchResponse searchResponse) { | ||
protected Flux<Map<String, Object>> parseResult(SearchResponse searchResponse, Map<String, Aggregation> columnAggregationMap) { | ||
return Mono.justOrEmpty(searchResponse.getAggregations()) | ||
.flatMapIterable(Aggregations::asList) | ||
.flatMap(agg -> parseAggregation(agg.getName(), agg), Integer.MAX_VALUE); | ||
.flatMap(agg -> parseAggregation(agg.getName(), agg, columnAggregationMap), Integer.MAX_VALUE); | ||
} | ||
|
||
|
||
private Flux<Map<String, Object>> parseAggregation(String name, | ||
org.elasticsearch.search.aggregations.Aggregation aggregation) { | ||
org.elasticsearch.search.aggregations.Aggregation aggregation, | ||
Map<String, Aggregation> columnAggregationMap) { | ||
if (aggregation instanceof Terms) { | ||
return parseAggregation(((Terms) aggregation)); | ||
return parseAggregation(((Terms) aggregation), columnAggregationMap); | ||
} | ||
if (aggregation instanceof TopHits) { | ||
TopHits topHits = ((TopHits) aggregation); | ||
|
@@ -243,7 +250,7 @@ private Flux<Map<String, Object>> parseAggregation(String name, | |
}); | ||
} | ||
if (aggregation instanceof Histogram) { | ||
return parseAggregation(((Histogram) aggregation)); | ||
return parseAggregation(((Histogram) aggregation), columnAggregationMap); | ||
} | ||
if (aggregation instanceof ValueCount) { | ||
return Flux.just(Collections.singletonMap(name, ((ValueCount) aggregation).getValue())); | ||
|
@@ -257,6 +264,21 @@ private Flux<Map<String, Object>> parseAggregation(String name, | |
// TODO: 2020/10/29 只处理了标准差差 | ||
return Flux.just(Collections.singletonMap(name, stats.getStdDeviation())); | ||
} | ||
if (aggregation instanceof ParsedStats) { | ||
ParsedStats parsedStats = ((ParsedStats) aggregation); | ||
Aggregation agg = columnAggregationMap.get(parsedStats.getName()); | ||
if (agg == null) { | ||
return Flux.empty(); | ||
} | ||
switch (agg) { | ||
case DIFFERENCE: | ||
BigDecimal max = (!Double.isInfinite(parsedStats.getMax()) ? BigDecimal.valueOf(parsedStats.getMax()) : BigDecimal.ZERO); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 处理null的情况, max或者min为Infinite时,应该返回 null才对?(表示没有数据)。而0在某的场景下可能存在特殊意义。 |
||
BigDecimal min = (!Double.isInfinite(parsedStats.getMin()) ? BigDecimal.valueOf(parsedStats.getMin()) : BigDecimal.ZERO); | ||
return Flux.just(Collections.singletonMap(name, max.subtract(min))); | ||
default: | ||
return Flux.empty(); | ||
} | ||
} | ||
|
||
return Flux.empty(); | ||
} | ||
|
@@ -265,13 +287,13 @@ private Object getSafeNumber(double number) { | |
return (Double.isNaN(number) || Double.isInfinite(number)) ? null : number; | ||
} | ||
|
||
private Flux<Map<String, Object>> parseAggregation(Histogram aggregation) { | ||
private Flux<Map<String, Object>> parseAggregation(Histogram aggregation, Map<String, Aggregation> columnAggregationMap) { | ||
|
||
return Flux | ||
.fromIterable(aggregation.getBuckets()) | ||
.flatMap(bucket -> | ||
Flux.fromIterable(bucket.getAggregations().asList()) | ||
.flatMap(agg -> this.parseAggregation(agg.getName(), agg), Integer.MAX_VALUE) | ||
.flatMap(agg -> this.parseAggregation(agg.getName(), agg, columnAggregationMap), Integer.MAX_VALUE) | ||
.defaultIfEmpty(Collections.emptyMap()) | ||
// .map(Map::entrySet) | ||
// .flatMap(Flux::fromIterable) | ||
|
@@ -286,11 +308,11 @@ private Flux<Map<String, Object>> parseAggregation(Histogram aggregation) { | |
); | ||
} | ||
|
||
private Flux<Map<String, Object>> parseAggregation(Terms aggregation) { | ||
private Flux<Map<String, Object>> parseAggregation(Terms aggregation, Map<String, Aggregation> columnAggregationMap) { | ||
|
||
return Flux.fromIterable(aggregation.getBuckets()) | ||
.flatMap(bucket -> Flux.fromIterable(bucket.getAggregations().asList()) | ||
.flatMap(agg -> parseAggregation(agg.getName(), agg) | ||
.flatMap(agg -> parseAggregation(agg.getName(), agg, columnAggregationMap) | ||
.map(map -> { | ||
Map<String, Object> val = new HashMap<>(map); | ||
val.put(aggregation.getName(), bucket.getKeyAsString()); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,7 +25,8 @@ public enum Aggregation implements EnumDict<String> { | |
|
||
//去重计数 | ||
DISTINCT_COUNT(flux -> flux.distinct().count(),0), | ||
NONE(numberFlux -> Reactors.ALWAYS_ZERO, 0); | ||
NONE(numberFlux -> Reactors.ALWAYS_ZERO, 0), | ||
DIFFERENCE(numberFlux -> MathFlux.sumDouble(numberFlux, Number::doubleValue), 0); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. RANGE ? 默认值应该为0,且 sumDouble应该不符合需求? |
||
|
||
private final Function<Flux<Number>, Mono<? extends Number>> computer; | ||
@Getter | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RANGE 更合适?