From ef4fa29b57a087fe595e383ca9919ffcc50299e5 Mon Sep 17 00:00:00 2001 From: Aleksandar Stanchev Date: Mon, 9 Sep 2024 15:18:44 +0300 Subject: [PATCH] ffix review findings Signed-off-by: Aleksandar Stanchev --- .../pages/ditto/installation-operating.md | 37 +++--- .../query/AggregateThingsMetrics.java | 91 ++++++++++++-- .../query/AggregateThingsMetricsResponse.java | 111 +++++++++++++++--- ...ava => CustomAggregationMetricConfig.java} | 2 +- ...DefaultCustomAggregationMetricConfig.java} | 10 +- .../config/DefaultOperatorMetricsConfig.java | 34 +++--- .../common/config/OperatorMetricsConfig.java | 10 +- .../MongoThingsAggregationPersistence.java | 9 +- ...CreateBsonAggregationPredicateVisitor.java | 10 +- .../CreateBsonAggregationVisitor.java | 4 +- .../criteria/visitors/CreateBsonVisitor.java | 2 +- .../GroupByPlaceholderResolver.java | 6 +- .../InlinePlaceholderResolver.java | 6 +- .../actors/AggregationThingsMetricsActor.java | 46 ++++---- .../OperatorSearchMetricsProviderActor.java | 42 +++---- .../actors/ThingsAggregationConstants.java | 2 +- .../src/main/resources/search-dev.conf | 2 +- ...ultCustomAggregationMetricConfigTest.java} | 18 +-- .../AggregationThingsMetricsActorTest.java | 5 +- 19 files changed, 310 insertions(+), 137 deletions(-) rename thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/{CustomSearchMetricConfig.java => CustomAggregationMetricConfig.java} (99%) rename thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/{DefaultCustomSearchMetricConfig.java => DefaultCustomAggregationMetricConfig.java} (96%) rename thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/common/config/{DefaultCustomSearchMetricConfigTest.java => DefaultCustomAggregationMetricConfigTest.java} (79%) diff --git a/documentation/src/main/resources/pages/ditto/installation-operating.md b/documentation/src/main/resources/pages/ditto/installation-operating.md index fdb351b84e..6c2f47eebc 100644 --- a/documentation/src/main/resources/pages/ditto/installation-operating.md +++ b/documentation/src/main/resources/pages/ditto/installation-operating.md @@ -581,14 +581,14 @@ In order to add custom metrics via System properties, the following example show Ditto will perform a [count things operation](basic-search.html#search-count-queries) each `5m` (5 minutes), providing a gauge named `all_produced_and_not_installed_devices` with the count of the query, adding the tag `company="acme-corp"`. -In Prometheus format this would look like: +In Prometheus format, this would look like: ``` all_produced_and_not_installed_devices{company="acme-corp"} 42.0 ``` -### Operator defined custom search based metrics -Starting with Ditto 3.6.0, the "custom metrics" functionality is extended to support search-based metrics. -This is configured via the [search](architecture-services-things-search.html) service configuration and builds on the -[search things](basic-search.html#search-queries) functionality. + +### Operator defined custom aggregation based metrics +Starting with Ditto 3.6.0, the "custom metrics" functionality is extended to support custom aggregation metrics. +This is configured via the [search](architecture-services-things-search.html) service configuration. > :warning: **Abstain of defining grouping by fields that have a high cardinality, as this will lead to a high number of metrics and may overload the Prometheus server!** @@ -596,8 +596,8 @@ may overload the Prometheus server!** Now you can augment the statistic about "Things" managed in Ditto fulfilling a certain condition with tags with either predefined values, values retrieved from the things or values which are defined based on the matching filter. -This is fulfill by using hardcoded values or placeholders in the tags configuration. -The supported placeholder types are inline and group-by placeholders. +This is fulfilled by using hardcoded values or placeholders in the tags configuration. +The supported placeholder types are `inline` and `group-by` placeholders. [Function expressions](basic-placeholders.html#function-expressions) are also supported to manipulate the values of the placeholders before they are used in the tags. @@ -612,34 +612,33 @@ ditto { custom-metrics { ... } - custom-search-metrics { + custom-aggregate-metrics { online_status { enabled = true scrape-interval = 1m # override scrape interval, run every 20 minute namespaces = [ "org.eclipse.ditto" ] - group-by:{ + group-by { "location" = "attributes/Info/location" "isGateway" = "attributes/Info/gateway" } - tags: { + tags { "online" = "{{ inline:online_placeholder }}" "health" = "{{ inline:health }}" "hardcoded-tag" = "hardcoded_value" "location" = "{{ group-by:location | fn:default('missing location') }}" - "altitude" = "{{ group-by:isGateway }}" } - filters = { - online_filter = { - filter = "gt(features/ConnectionStatus/properties/status/readyUntil/,time:now)" - inline-placeholder-values = { + filters { + online_filter { + filter = "gt(features/ConnectionStatus/properties/status/readyUntil,time:now)" + inline-placeholder-values { "online_placeholder" = true "health" = "good" } } - offline_filter = { - filter = "lt(features/ConnectionStatus/properties/status/readyUntil/,time:now)" + offline_filter { + filter = "lt(features/ConnectionStatus/properties/status/readyUntil,time:now)" inline-placeholder-values = { "online_placeholder" = false "health" = "bad" @@ -671,12 +670,12 @@ To add custom metrics via System properties, the following example shows how the ``` -Ditto will perform a [search things operation](basic-search.html#search-queries) every `20m` (20 minutes), providing +Ditto will perform an [aggregation operation](https://www.mongodb.com/docs/manual/aggregation/) over the search db collection every `20m` (20 minutes), providing a gauge named `online_devices` with the value of devices that match the filter. The tags `online` and `location` will be added. Their values will be resolved from the placeholders `{{online_placeholder}}` and `{{attributes/Info/location}}` respectively. -In Prometheus format this would look like: +In Prometheus format, this would look like: ``` online_status{location="Berlin",online="false"} 6.0 online_status{location="Immenstaad",online="true"} 8.0 diff --git a/thingsearch/model/src/main/java/org/eclipse/ditto/thingsearch/model/signals/commands/query/AggregateThingsMetrics.java b/thingsearch/model/src/main/java/org/eclipse/ditto/thingsearch/model/signals/commands/query/AggregateThingsMetrics.java index c66d96a383..c638c210ed 100644 --- a/thingsearch/model/src/main/java/org/eclipse/ditto/thingsearch/model/signals/commands/query/AggregateThingsMetrics.java +++ b/thingsearch/model/src/main/java/org/eclipse/ditto/thingsearch/model/signals/commands/query/AggregateThingsMetrics.java @@ -14,14 +14,41 @@ package org.eclipse.ditto.thingsearch.model.signals.commands.query; +import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; import org.eclipse.ditto.base.model.headers.DittoHeaders; -import org.eclipse.ditto.base.model.headers.WithDittoHeaders; +import org.eclipse.ditto.base.model.json.JsonSchemaVersion; +import org.eclipse.ditto.base.model.signals.commands.AbstractCommand; +import org.eclipse.ditto.json.JsonArray; +import org.eclipse.ditto.json.JsonFactory; +import org.eclipse.ditto.json.JsonField; +import org.eclipse.ditto.json.JsonObjectBuilder; +import org.eclipse.ditto.json.JsonPointer; + +/** + * A command to aggregate metrics for things. + */ +public final class AggregateThingsMetrics extends AbstractCommand { + + private static final String NAME = "things-metrics"; + /** + * Aggregation resource type. + */ + static final String RESOURCE_TYPE = "aggregation"; + /** + * Type prefix of aggregation command. + */ + private static final String TYPE_PREFIX = RESOURCE_TYPE + "." + TYPE_QUALIFIER + ":"; + /** + * The name of this command. + */ + private static final String TYPE = TYPE_PREFIX + NAME; -public class AggregateThingsMetrics implements WithDittoHeaders { private final String metricName; private final Map groupingBy; @@ -31,13 +58,24 @@ public class AggregateThingsMetrics implements WithDittoHeaders { private AggregateThingsMetrics(final String metricName, final Map groupingBy, final Map namedFilters, final Set namespaces, final DittoHeaders dittoHeaders) { + super(TYPE, dittoHeaders, null); this.metricName = metricName; - this.groupingBy = groupingBy; - this.namedFilters = namedFilters; - this.namespaces = namespaces; + this.groupingBy = Collections.unmodifiableMap(groupingBy); + this.namedFilters = Collections.unmodifiableMap(namedFilters); + this.namespaces = Collections.unmodifiableSet(namespaces); this.dittoHeaders = dittoHeaders; } + /** + * Creates a new {@link AggregateThingsMetrics} instance. + * + * @param metricName the name of the metric to aggregate. + * @param groupingBy the fields we want our metric aggregation to be grouped by. + * @param namedFilters the named filters to use for the aggregation. + * @param namespaces the namespaces the metric should be executed for. + * @param dittoHeaders the headers to use for the command. + * @return a new {@link AggregateThingsMetrics} instance. + */ public static AggregateThingsMetrics of(final String metricName, final Map groupingBy, final Map namedFilters, final Set namespaces, final DittoHeaders dittoHeaders) { return new AggregateThingsMetrics(metricName, groupingBy, namedFilters, namespaces, dittoHeaders); @@ -56,14 +94,53 @@ public Map getNamedFilters() { } @Override - public DittoHeaders getDittoHeaders() { - return dittoHeaders; + protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final JsonSchemaVersion schemaVersion, + final Predicate thePredicate) { + + final Predicate predicate = schemaVersion.and(thePredicate); + jsonObjectBuilder.set("metric-name", metricName, predicate); + final JsonObjectBuilder groupingBy = JsonFactory.newObjectBuilder(); + this.groupingBy.forEach(groupingBy::set); + jsonObjectBuilder.set("grouping-by", groupingBy.build(), predicate); + final JsonObjectBuilder jsonFields = JsonFactory.newObjectBuilder(); + namedFilters.forEach(jsonFields::set); + jsonObjectBuilder.set("named-filters",jsonFields.build(), predicate); + final JsonArray array = + JsonFactory.newArrayBuilder(namespaces.stream().map(JsonFactory::newValue).collect( + Collectors.toSet())).build(); + jsonObjectBuilder.set("namespaces", array, predicate); + } public Set getNamespaces() { return namespaces; } + @Override + public String getTypePrefix() { + return TYPE_PREFIX; + } + + @Override + public Category getCategory() { + return Category.STREAM; + } + + @Override + public AggregateThingsMetrics setDittoHeaders(final DittoHeaders dittoHeaders) { + return of(getMetricName(), getGroupingBy(), getNamedFilters(), getNamespaces(), dittoHeaders); + } + + @Override + public JsonPointer getResourcePath() { + return JsonPointer.empty(); + } + + @Override + public String getResourceType() { + return RESOURCE_TYPE; + } + @Override public boolean equals(final Object o) { if (this == o) return true; diff --git a/thingsearch/model/src/main/java/org/eclipse/ditto/thingsearch/model/signals/commands/query/AggregateThingsMetricsResponse.java b/thingsearch/model/src/main/java/org/eclipse/ditto/thingsearch/model/signals/commands/query/AggregateThingsMetricsResponse.java index 32b4e321e0..d9e536a738 100644 --- a/thingsearch/model/src/main/java/org/eclipse/ditto/thingsearch/model/signals/commands/query/AggregateThingsMetricsResponse.java +++ b/thingsearch/model/src/main/java/org/eclipse/ditto/thingsearch/model/signals/commands/query/AggregateThingsMetricsResponse.java @@ -18,16 +18,34 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import org.eclipse.ditto.base.model.common.HttpStatus; import org.eclipse.ditto.base.model.headers.DittoHeaders; -import org.eclipse.ditto.base.model.headers.WithDittoHeaders; +import org.eclipse.ditto.base.model.json.JsonSchemaVersion; +import org.eclipse.ditto.base.model.signals.commands.AbstractCommandResponse; +import org.eclipse.ditto.json.JsonFactory; +import org.eclipse.ditto.json.JsonField; import org.eclipse.ditto.json.JsonMissingFieldException; import org.eclipse.ditto.json.JsonObject; +import org.eclipse.ditto.json.JsonObjectBuilder; import org.eclipse.ditto.json.JsonPointer; -public class AggregateThingsMetricsResponse implements WithDittoHeaders { +/** + * A response to an {@link AggregateThingsMetrics} command. + * Contains the original aggregation as returned by the database as well as fields initialized by that aggregation + * (grouped by and result) + * The result contains the returned values for each filter defined in a metric. + * The groupedBy contains the values that the result was grouped by. + */ +public final class AggregateThingsMetricsResponse extends AbstractCommandResponse { + + private static final String NAME = "things-metrics"; + private static final String AGGREGATION = "aggregation"; + private static final String TYPE_PREFIX = AGGREGATION + "." + TYPE_QUALIFIER + ":"; + private static final String RESOURCE_TYPE = AGGREGATION + "." + TYPE_QUALIFIER; private final Map groupedBy; @@ -36,10 +54,12 @@ public class AggregateThingsMetricsResponse implements WithDittoHeaders { private final DittoHeaders dittoHeaders; private final JsonObject aggregation; private final String metricName; + private AggregateThingsMetricsResponse(final JsonObject aggregation, final DittoHeaders dittoHeaders, final String metricName, final Set filterNames) { + super(TYPE_PREFIX + NAME, HttpStatus.OK, dittoHeaders); this.aggregation = aggregation; - this.dittoHeaders = dittoHeaders; + this.dittoHeaders = DittoHeaders.of(dittoHeaders); this.metricName = metricName; groupedBy = aggregation.getValue("_id") .map(json -> { @@ -53,44 +73,96 @@ private AggregateThingsMetricsResponse(final JsonObject aggregation, final Ditto } ) .orElse(new HashMap<>()); - result = filterNames.stream().filter(aggregation::contains).collect(Collectors.toMap(key -> - key, key -> aggregation.getValue(JsonPointer.of(key)) - .orElseThrow(getJsonMissingFieldExceptionSupplier(key)) - .asLong())); + result = extractFiltersResults(aggregation, filterNames); // value should always be a number as it will be used for the gauge value in the metrics } + /** + * Creates a new {@link AggregateThingsMetricsResponse} instance. + * @param aggregation the aggregation result. + * @param aggregateThingsMetrics the command that was executed. + * @return the AggregateThingsMetricsResponse instance. + */ public static AggregateThingsMetricsResponse of(final JsonObject aggregation, final AggregateThingsMetrics aggregateThingsMetrics) { return of(aggregation, aggregateThingsMetrics.getDittoHeaders(), aggregateThingsMetrics.getMetricName(), aggregateThingsMetrics.getNamedFilters().keySet()); } + /** + * Creates a new {@link AggregateThingsMetricsResponse} instance. + * @param aggregation the aggregation result. + * @param dittoHeaders the headers to use for the response. + * @param metricName the name of the metric. + * @param filterNames the names of the filters. + * @return the AggregateThingsMetricsResponse instance. + */ public static AggregateThingsMetricsResponse of(final JsonObject aggregation, final DittoHeaders dittoHeaders, final String metricName, final Set filterNames) { return new AggregateThingsMetricsResponse(aggregation, dittoHeaders, metricName, filterNames); } @Override - public DittoHeaders getDittoHeaders() { - return dittoHeaders; + public AggregateThingsMetricsResponse setDittoHeaders(final DittoHeaders dittoHeaders) { + return AggregateThingsMetricsResponse.of(aggregation, dittoHeaders, metricName, result.keySet()); + } + + @Override + public JsonObject toJson() { + return super.toJson(); + } + + @Override + public JsonPointer getResourcePath() { + return JsonPointer.empty(); } + @Override + public String getResourceType() { + return RESOURCE_TYPE; + } + + + @Override + protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final JsonSchemaVersion schemaVersion, + final Predicate thePredicate) { + + final Predicate predicate = schemaVersion.and(thePredicate); + jsonObjectBuilder.set("metric-name", metricName, predicate); + final JsonObjectBuilder groupedBy = JsonFactory.newObjectBuilder(); + this.groupedBy.forEach(groupedBy::set); + jsonObjectBuilder.set("grouped-by", groupedBy.build(), predicate); + final JsonObjectBuilder jsonFields = JsonFactory.newObjectBuilder(); + getResult().forEach(jsonFields::set); + jsonObjectBuilder.set("result",jsonFields.build(), predicate); + jsonObjectBuilder.set("aggregation", aggregation, predicate); + + } + + /** + * Returns the grouping by values by witch the result was grouped. + * @return the groupedBy of the response. + */ public Map getGroupedBy() { return groupedBy; } + /** + * Returns the values for each filter defined in the metric + * + * @return the result of the aggregation. + */ public Map getResult() { return result; } + /** + * Returns the metric name. + * @return the metric name. + */ public String getMetricName() { return metricName; } - private Supplier getJsonMissingFieldExceptionSupplier(String field) { - return () -> JsonMissingFieldException.newBuilder().fieldName(field).build(); - } - @Override public boolean equals(final Object o) { if (this == o) return true; @@ -114,8 +186,19 @@ public String toString() { "groupedBy=" + groupedBy + ", result=" + result + ", dittoHeaders=" + dittoHeaders + - ", aggregation=" + aggregation + + ", " + AGGREGATION + "=" + aggregation + ", metricName='" + metricName + '\'' + '}'; } + + private Map extractFiltersResults(final JsonObject aggregation, final Set filterNames) { + return filterNames.stream().filter(aggregation::contains).collect(Collectors.toMap(key -> + key, key -> aggregation.getValue(JsonPointer.of(key)) + .orElseThrow(getJsonMissingFieldExceptionSupplier(key)) + .asLong())); + } + + private Supplier getJsonMissingFieldExceptionSupplier(final String field) { + return () -> JsonMissingFieldException.newBuilder().fieldName(field).build(); + } } diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/CustomSearchMetricConfig.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/CustomAggregationMetricConfig.java similarity index 99% rename from thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/CustomSearchMetricConfig.java rename to thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/CustomAggregationMetricConfig.java index 0e6f2c6df9..77966f8c23 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/CustomSearchMetricConfig.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/CustomAggregationMetricConfig.java @@ -22,7 +22,7 @@ /** * Provides the configuration settings for a single custom search metric. */ -public interface CustomSearchMetricConfig { +public interface CustomAggregationMetricConfig { /** diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultCustomSearchMetricConfig.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultCustomAggregationMetricConfig.java similarity index 96% rename from thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultCustomSearchMetricConfig.java rename to thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultCustomAggregationMetricConfig.java index 5033fea6f6..eb09aa61ab 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultCustomSearchMetricConfig.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultCustomAggregationMetricConfig.java @@ -35,7 +35,7 @@ import com.typesafe.config.ConfigFactory; @Immutable -public final class DefaultCustomSearchMetricConfig implements CustomSearchMetricConfig { +public final class DefaultCustomAggregationMetricConfig implements CustomAggregationMetricConfig { private final String metricName; private final boolean enabled; @@ -45,7 +45,7 @@ public final class DefaultCustomSearchMetricConfig implements CustomSearchMetric private final Map tags; private final List filterConfigs; - private DefaultCustomSearchMetricConfig(final String key, final ConfigWithFallback configWithFallback) { + private DefaultCustomAggregationMetricConfig(final String key, final ConfigWithFallback configWithFallback) { this.metricName = key; enabled = configWithFallback.getBoolean(CustomSearchMetricConfigValue.ENABLED.getConfigPath()); scrapeInterval = configWithFallback.getDuration(CustomSearchMetricConfigValue.SCRAPE_INTERVAL.getConfigPath()); @@ -72,8 +72,8 @@ private DefaultCustomSearchMetricConfig(final String key, final ConfigWithFallba validateConfig(); } - public static DefaultCustomSearchMetricConfig of(final String key, final Config config) { - return new DefaultCustomSearchMetricConfig(key, + public static DefaultCustomAggregationMetricConfig of(final String key, final Config config) { + return new DefaultCustomAggregationMetricConfig(key, ConfigWithFallback.newInstance(config, CustomSearchMetricConfigValue.values())); } @@ -185,7 +185,7 @@ private boolean isPlaceHolder(final String value) { public boolean equals(final Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - final DefaultCustomSearchMetricConfig that = (DefaultCustomSearchMetricConfig) o; + final DefaultCustomAggregationMetricConfig that = (DefaultCustomAggregationMetricConfig) o; return enabled == that.enabled && Objects.equals(metricName, that.metricName) && Objects.equals(scrapeInterval, that.scrapeInterval) && Objects.equals(namespaces, that.namespaces) && Objects.equals(groupBy, that.groupBy) && diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultOperatorMetricsConfig.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultOperatorMetricsConfig.java index a5389dc635..1f0c9a5f5c 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultOperatorMetricsConfig.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultOperatorMetricsConfig.java @@ -50,15 +50,15 @@ public final class DefaultOperatorMetricsConfig implements OperatorMetricsConfig private final boolean enabled; private final Duration scrapeInterval; private final Map customMetricConfigurations; - private final Map customSearchMetricConfigs; + private final Map customAggregationMetricConfigs; private DefaultOperatorMetricsConfig(final ConfigWithFallback updaterScopedConfig) { enabled = updaterScopedConfig.getBoolean(OperatorMetricsConfigValue.ENABLED.getConfigPath()); scrapeInterval = updaterScopedConfig.getNonNegativeDurationOrThrow(OperatorMetricsConfigValue.SCRAPE_INTERVAL); customMetricConfigurations = loadCustomMetricConfigurations(updaterScopedConfig, OperatorMetricsConfigValue.CUSTOM_METRICS); - customSearchMetricConfigs = loadCustomSearchMetricConfigurations(updaterScopedConfig, - OperatorMetricsConfigValue.CUSTOM_SEARCH_METRICS); + customAggregationMetricConfigs = loadCustomAggregatedMetricConfigurations(updaterScopedConfig, + OperatorMetricsConfigValue.CUSTOM_AGGREGATION_METRIC); } /** @@ -81,12 +81,12 @@ private static Map loadCustomMetricConfigurations(fi return customMetricsConfig.entrySet().stream().collect(CustomMetricConfigCollector.toMap()); } - private Map loadCustomSearchMetricConfigurations( + private Map loadCustomAggregatedMetricConfigurations( final ConfigWithFallback config, final KnownConfigValue configValue) { - final ConfigObject customSearchMetricsConfig = config.getObject(configValue.getConfigPath()); + final ConfigObject customAggregatedMetricsConfig = config.getObject(configValue.getConfigPath()); - return customSearchMetricsConfig.entrySet().stream().collect(CustomSearchMetricConfigCollector.toMap()); + return customAggregatedMetricsConfig.entrySet().stream().collect(CustomAggregatedMetricConfigCollector.toMap()); } @Override @@ -132,8 +132,8 @@ public Map getCustomMetricConfigurations() { } @Override - public Map getCustomSearchMetricConfigs() { - return customSearchMetricConfigs; + public Map getCustomAggregationMetricConfigs() { + return customAggregationMetricConfigs; } private static class CustomMetricConfigCollector @@ -172,32 +172,32 @@ public Set characteristics() { } } - private static class CustomSearchMetricConfigCollector implements - Collector, Map, Map> { + private static class CustomAggregatedMetricConfigCollector implements + Collector, Map, Map> { - private static DefaultOperatorMetricsConfig.CustomSearchMetricConfigCollector toMap() { - return new DefaultOperatorMetricsConfig.CustomSearchMetricConfigCollector(); + private static CustomAggregatedMetricConfigCollector toMap() { + return new CustomAggregatedMetricConfigCollector(); } @Override - public Supplier> supplier() { + public Supplier> supplier() { return LinkedHashMap::new; } @Override - public BiConsumer, Map.Entry> accumulator() { + public BiConsumer, Map.Entry> accumulator() { return (map, entry) -> map.put(entry.getKey(), - DefaultCustomSearchMetricConfig.of(entry.getKey(), ConfigFactory.empty().withFallback(entry.getValue()))); + DefaultCustomAggregationMetricConfig.of(entry.getKey(), ConfigFactory.empty().withFallback(entry.getValue()))); } @Override - public BinaryOperator> combiner() { + public BinaryOperator> combiner() { return (left, right) -> Stream.concat(left.entrySet().stream(), right.entrySet().stream()) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } @Override - public Function, Map> finisher() { + public Function, Map> finisher() { return map -> Collections.unmodifiableMap(new LinkedHashMap<>(map)); } diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/OperatorMetricsConfig.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/OperatorMetricsConfig.java index 99386ee00c..49b1d6f726 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/OperatorMetricsConfig.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/common/config/OperatorMetricsConfig.java @@ -48,11 +48,11 @@ public interface OperatorMetricsConfig { Map getCustomMetricConfigurations(); /** - * Returns all registered custom search metrics with the key being the metric name to use. + * Returns all registered custom aggregation metrics with the key being the metric name to use. * - * @return the registered custom search metrics. + * @return the registered custom aggregation metrics. */ - Map getCustomSearchMetricConfigs(); + Map getCustomAggregationMetricConfigs(); /** * An enumeration of the known config path expressions and their associated default values for @@ -76,9 +76,9 @@ enum OperatorMetricsConfigValue implements KnownConfigValue { CUSTOM_METRICS("custom-metrics", Collections.emptyMap()), /** - * All registered custom search metrics with the key being the metric name to use. + * All registered custom aggregation metrics with the key being the metric name to use. */ - CUSTOM_SEARCH_METRICS("custom-search-metrics", Collections.emptyMap()); + CUSTOM_AGGREGATION_METRIC("custom-aggregation-metrics", Collections.emptyMap()); private final String path; private final Object defaultValue; diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/MongoThingsAggregationPersistence.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/MongoThingsAggregationPersistence.java index f31bd13faf..e8880422c7 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/MongoThingsAggregationPersistence.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/MongoThingsAggregationPersistence.java @@ -46,7 +46,10 @@ import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; -public class MongoThingsAggregationPersistence implements ThingsAggregationPersistence { +/** + * Persistence implementation for aggregating things. + */ +public final class MongoThingsAggregationPersistence implements ThingsAggregationPersistence { private final MongoCollection collection; @@ -59,6 +62,8 @@ public class MongoThingsAggregationPersistence implements ThingsAggregationPersi * Initializes the things search persistence with a passed in {@code persistence}. * * @param mongoClient the mongoDB persistence wrapper. + * @param mongoHintsByNamespace the mongo hints by namespace. + * @param simpleFieldMappings the simple field mappings. * @param persistenceConfig the search persistence configuration. * @param log the logger. */ @@ -110,7 +115,7 @@ public Source aggregateThings(final AggregateThingsMetrics ag .collect(Collectors.toList()); final Bson group = group(new Document(groupingBy), accumulatorFields); aggregatePipeline.add(group); - log.info("aggregatePipeline: {}", // TODO debug + log.debug("aggregatePipeline: {}", aggregatePipeline.stream().map(bson -> bson.toBsonDocument().toJson()).collect( Collectors.toList())); // Execute the aggregation pipeline diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/criteria/visitors/CreateBsonAggregationPredicateVisitor.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/criteria/visitors/CreateBsonAggregationPredicateVisitor.java index 55198aae36..44705378cc 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/criteria/visitors/CreateBsonAggregationPredicateVisitor.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/criteria/visitors/CreateBsonAggregationPredicateVisitor.java @@ -35,7 +35,7 @@ import org.eclipse.ditto.rql.query.criteria.visitors.PredicateVisitor; /** - * Creates Bson of a predicate. + * Creates Aggregation Bson of a predicate. */ public class CreateBsonAggregationPredicateVisitor implements PredicateVisitor> { @@ -64,12 +64,12 @@ public static CreateBsonAggregationPredicateVisitor getInstance() { } /** - * Creates a new instance of {@code CreateBsonPredicateVisitor} with additional custom placeholder resolvers. + * Creates a new instance of {@code CreateBsonAggregationPredicateVisitor} with additional custom placeholder resolvers. * * @param additionalPlaceholderResolvers the additional {@code PlaceholderResolver} to use for resolving * placeholders in RQL predicates. * @return the created instance. - * @since 2.3.0 + * @since 3.6.0 */ public static CreateBsonAggregationPredicateVisitor createInstance( final PlaceholderResolver... additionalPlaceholderResolvers) { @@ -77,12 +77,12 @@ public static CreateBsonAggregationPredicateVisitor createInstance( } /** - * Creates a new instance of {@code CreateBsonPredicateVisitor} with additional custom placeholder resolvers. + * Creates a new instance of {@code CreateBsonAggregationPredicateVisitor} with additional custom placeholder resolvers. * * @param additionalPlaceholderResolvers the additional {@code PlaceholderResolver} to use for resolving * placeholders in RQL predicates. * @return the created instance. - * @since 2.3.0 + * @since 3.6.0 */ public static CreateBsonAggregationPredicateVisitor createInstance( final Collection> additionalPlaceholderResolvers) { diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/criteria/visitors/CreateBsonAggregationVisitor.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/criteria/visitors/CreateBsonAggregationVisitor.java index 040903d49b..6348060414 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/criteria/visitors/CreateBsonAggregationVisitor.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/criteria/visitors/CreateBsonAggregationVisitor.java @@ -26,7 +26,7 @@ import org.eclipse.ditto.thingsearch.service.persistence.read.expression.visitors.GetFilterBsonVisitor; /** - * Creates the Bson object used for querying. + * Creates the Aggregation Bson object used for the aggregation. */ public class CreateBsonAggregationVisitor extends CreateBsonVisitor { @@ -54,7 +54,7 @@ public Bson visitField(final FilterFieldExpression fieldExpression, final Predic PlaceholderFactory.newPlaceholderResolver(TIME_PLACEHOLDER, new Object()) ) ); - return GetFilterBsonVisitor.apply(fieldExpression, predicateCreator, null); + return GetFilterBsonVisitor.apply(fieldExpression, predicateCreator, authorizationSubjectIds); } diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/criteria/visitors/CreateBsonVisitor.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/criteria/visitors/CreateBsonVisitor.java index 7933197866..e5ecdede62 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/criteria/visitors/CreateBsonVisitor.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/persistence/read/criteria/visitors/CreateBsonVisitor.java @@ -42,7 +42,7 @@ public class CreateBsonVisitor implements CriteriaVisitor { private static final TimePlaceholder TIME_PLACEHOLDER = TimePlaceholder.getInstance(); @Nullable - private final List authorizationSubjectIds; + protected final List authorizationSubjectIds; CreateBsonVisitor(@Nullable final List authorizationSubjectIds) { this.authorizationSubjectIds = authorizationSubjectIds; diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/placeholders/GroupByPlaceholderResolver.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/placeholders/GroupByPlaceholderResolver.java index a631a365b7..ac8c61be37 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/placeholders/GroupByPlaceholderResolver.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/placeholders/GroupByPlaceholderResolver.java @@ -21,7 +21,11 @@ import org.eclipse.ditto.placeholders.PlaceholderResolver; -public class GroupByPlaceholderResolver implements PlaceholderResolver> { +/** + * Placeholder resolver for group-by. + * Resolves the group-by placeholders from the given source. + */ +public final class GroupByPlaceholderResolver implements PlaceholderResolver> { public static final String PREFIX = "group-by"; private final List supportedNames; diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/placeholders/InlinePlaceholderResolver.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/placeholders/InlinePlaceholderResolver.java index 4376a1aaa6..d5643133c6 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/placeholders/InlinePlaceholderResolver.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/placeholders/InlinePlaceholderResolver.java @@ -20,7 +20,11 @@ import org.eclipse.ditto.placeholders.PlaceholderResolver; -public class InlinePlaceholderResolver implements PlaceholderResolver> { +/** + * Placeholder resolver for inline. + * Resolves the inline placeholders from the given source. + */ +public final class InlinePlaceholderResolver implements PlaceholderResolver> { public static final String PREFIX = "inline"; private final Map source; diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/AggregationThingsMetricsActor.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/AggregationThingsMetricsActor.java index 9c3249c3b2..1bc182693f 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/AggregationThingsMetricsActor.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/AggregationThingsMetricsActor.java @@ -24,6 +24,7 @@ import org.apache.pekko.japi.pf.ReceiveBuilder; import org.apache.pekko.pattern.Patterns; import org.apache.pekko.stream.Graph; +import org.apache.pekko.stream.Materializer; import org.apache.pekko.stream.SourceShape; import org.apache.pekko.stream.SystemMaterializer; import org.apache.pekko.stream.javadsl.Flow; @@ -48,25 +49,26 @@ /** * Actor handling custom metrics aggregations {@link org.eclipse.ditto.thingsearch.model.signals.commands.query.AggregateThingsMetrics}. */ -public class AggregationThingsMetricsActor - extends AbstractActor { +public final class AggregationThingsMetricsActor extends AbstractActor { /** * The name of this actor in the system. */ - static final String ACTOR_NAME = ThingsAggregationConstants.AGGREGATE_ACTOR_NAME; + static final String ACTOR_NAME = ThingsAggregationConstants.AGGREGATION_ACTOR_NAME; private static final String TRACING_THINGS_AGGREGATION = "aggregate_things_metrics"; private final ThreadSafeDittoLoggingAdapter log; private final ThingsAggregationPersistence thingsAggregationPersistence; + private final Materializer materializer; private AggregationThingsMetricsActor(final ThingsAggregationPersistence aggregationPersistence) { log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this); - this.thingsAggregationPersistence = aggregationPersistence; + thingsAggregationPersistence = aggregationPersistence; + materializer = SystemMaterializer.get(getContext().getSystem()).materializer(); } public static Props props(final ThingsAggregationPersistence aggregationPersistence) { - return Props.create(AggregationThingsMetricsActor.class,aggregationPersistence); + return Props.create(AggregationThingsMetricsActor.class, aggregationPersistence); } @Override @@ -79,11 +81,12 @@ public Receive createReceive() { .build(); } - private void aggregate(AggregateThingsMetrics aggregateThingsMetrics) { + private void aggregate(final AggregateThingsMetrics aggregateThingsMetrics) { log.debug("Received aggregate command for {}", aggregateThingsMetrics); final StartedTimer aggregationTimer = startNewTimer(aggregateThingsMetrics); final Source source = - DittoJsonException.wrapJsonRuntimeException(aggregateThingsMetrics, aggregateThingsMetrics.getDittoHeaders(), + DittoJsonException.wrapJsonRuntimeException(aggregateThingsMetrics, + aggregateThingsMetrics.getDittoHeaders(), (command, headers) -> thingsAggregationPersistence.aggregateThings(command)); final Source aggregationResult = processAggregationPersistenceResult(source, aggregateThingsMetrics.getDittoHeaders()) @@ -93,23 +96,22 @@ private void aggregate(AggregateThingsMetrics aggregateThingsMetrics) { final Source replySourceWithErrorHandling = aggregationResult.via(stopTimerAndHandleError(aggregationTimer, aggregateThingsMetrics)); - replySourceWithErrorHandling.runWith(Sink.foreach(elem -> { - Patterns.pipe(CompletableFuture.completedFuture(elem), getContext().dispatcher()).to(sender); - - }), SystemMaterializer.get(getContext().getSystem()).materializer()); + replySourceWithErrorHandling.runWith(Sink.foreach( + elem -> Patterns.pipe(CompletableFuture.completedFuture(elem), getContext().dispatcher()).to(sender)), + materializer); } -private Source processAggregationPersistenceResult(final Source source, - final DittoHeaders dittoHeaders) { + private Source processAggregationPersistenceResult(final Source source, + final DittoHeaders dittoHeaders) { - final Flow logAndFinishPersistenceSegmentFlow = - Flow.fromFunction(result -> { - log.withCorrelationId(dittoHeaders) - .debug("aggregation element: {}", result); - return result; - }); -return source.via(logAndFinishPersistenceSegmentFlow); -} + final Flow logAndFinishPersistenceSegmentFlow = + Flow.fromFunction(result -> { + log.withCorrelationId(dittoHeaders) + .debug("aggregation element: {}", result); + return result; + }); + return source.via(logAndFinishPersistenceSegmentFlow); + } private static StartedTimer startNewTimer(final WithDittoHeaders withDittoHeaders) { final StartedTimer startedTimer = DittoMetrics.timer(TRACING_THINGS_AGGREGATION) @@ -126,6 +128,7 @@ private static void stopTimer(final StartedTimer timer) { // it is okay if the timer was stopped. } } + private Flow stopTimerAndHandleError(final StartedTimer searchTimer, final WithDittoHeaders command) { return Flow.fromFunction( @@ -141,6 +144,7 @@ private Flow stopTimerAndHandleError(final StartedTimer .build() ); } + private DittoRuntimeException asDittoRuntimeException(final Throwable error, final WithDittoHeaders trigger) { return DittoRuntimeException.asDittoRuntimeException(error, t -> { log.error(error, "AggregateThingsMetricsActor failed to execute <{}>", trigger); diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/OperatorSearchMetricsProviderActor.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/OperatorSearchMetricsProviderActor.java index 10ff90fb32..552e9ca67a 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/OperatorSearchMetricsProviderActor.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/OperatorSearchMetricsProviderActor.java @@ -48,7 +48,7 @@ import org.eclipse.ditto.placeholders.PlaceholderResolver; import org.eclipse.ditto.thingsearch.model.signals.commands.query.AggregateThingsMetrics; import org.eclipse.ditto.thingsearch.model.signals.commands.query.AggregateThingsMetricsResponse; -import org.eclipse.ditto.thingsearch.service.common.config.CustomSearchMetricConfig; +import org.eclipse.ditto.thingsearch.service.common.config.CustomAggregationMetricConfig; import org.eclipse.ditto.thingsearch.service.common.config.OperatorMetricsConfig; import org.eclipse.ditto.thingsearch.service.common.config.SearchConfig; import org.eclipse.ditto.thingsearch.service.persistence.read.MongoThingsAggregationPersistence; @@ -74,7 +74,7 @@ public final class OperatorSearchMetricsProviderActor extends AbstractActorWithT private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this); private final ActorRef thingsAggregatorActorSingletonProxy; - private final Map customSearchMetricConfigMap; + private final Map customSearchMetricConfigMap; private final Map metricsGauges; private final Gauge customSearchMetricsGauge; private final Map>> inlinePlaceholderResolvers; @@ -82,7 +82,7 @@ public final class OperatorSearchMetricsProviderActor extends AbstractActorWithT @SuppressWarnings("unused") private OperatorSearchMetricsProviderActor(final SearchConfig searchConfig) { this.thingsAggregatorActorSingletonProxy = initializeAggregationThingsMetricsActor(searchConfig); - this.customSearchMetricConfigMap = searchConfig.getOperatorMetricsConfig().getCustomSearchMetricConfigs(); + this.customSearchMetricConfigMap = searchConfig.getOperatorMetricsConfig().getCustomAggregationMetricConfigs(); this.metricsGauges = new HashMap<>(); this.inlinePlaceholderResolvers = new HashMap<>(); this.customSearchMetricsGauge = KamonGauge.newGauge("custom-search-metrics-count-of-instruments"); @@ -135,30 +135,30 @@ private ActorRef initializeAggregationThingsMetricsActor(final SearchConfig sear private void handleGatheringMetrics(final GatherMetricsCommand gatherMetricsCommand) { final String metricName = gatherMetricsCommand.metricName(); - final CustomSearchMetricConfig config = gatherMetricsCommand.config(); + final CustomAggregationMetricConfig config = gatherMetricsCommand.config(); final DittoHeaders dittoHeaders = DittoHeaders.newBuilder() .correlationId("gather-search-metrics_" + metricName + "_" + UUID.randomUUID()) .build(); final Map namedFilters = config.getFilterConfigs().stream() - .collect(Collectors.toMap(CustomSearchMetricConfig.FilterConfig::getFilterName, - CustomSearchMetricConfig.FilterConfig::getFilter)); - AggregateThingsMetrics + .collect(Collectors.toMap(CustomAggregationMetricConfig.FilterConfig::getFilterName, + CustomAggregationMetricConfig.FilterConfig::getFilter)); + final AggregateThingsMetrics aggregateThingsMetrics = AggregateThingsMetrics.of(metricName, config.getGroupBy(), namedFilters, Set.of(config.getNamespaces().toArray(new String[0])), dittoHeaders); thingsAggregatorActorSingletonProxy.tell(aggregateThingsMetrics, getSelf()); } - private void handleAggregateThingsResponse(AggregateThingsMetricsResponse response) { - log.withCorrelationId(response).info("Received aggregate things response: {} thread: {}", //TODO debug + private void handleAggregateThingsResponse(final AggregateThingsMetricsResponse response) { + log.withCorrelationId(response).debug("Received aggregate things response: {} thread: {}", response, Thread.currentThread().getName()); final String metricName = response.getMetricName(); // record by filter name and tags response.getResult().forEach((filterName, value) -> { resolveTags(filterName, customSearchMetricConfigMap.get(metricName), response); - final CustomSearchMetricConfig customSearchMetricConfig = customSearchMetricConfigMap.get(metricName); - final TagSet tagSet = resolveTags(filterName, customSearchMetricConfig, response) + final CustomAggregationMetricConfig customAggregationMetricConfig = customSearchMetricConfigMap.get(metricName); + final TagSet tagSet = resolveTags(filterName, customAggregationMetricConfig, response) .putTag(Tag.of("filter", filterName)); recordMetric(metricName, tagSet, value); customSearchMetricsGauge.tag(Tag.of(METRIC_NAME, metricName)).set(Long.valueOf(metricsGauges.size()));; @@ -178,17 +178,17 @@ private void recordMetric(final String metricName, final TagSet tagSet, final Lo }); } - private TagSet resolveTags(final String filterName, final CustomSearchMetricConfig customSearchMetricConfig, + private TagSet resolveTags(final String filterName, final CustomAggregationMetricConfig customAggregationMetricConfig, final AggregateThingsMetricsResponse response) { - return TagSet.ofTagCollection(customSearchMetricConfig.getTags().entrySet().stream().map(tagEntry-> { + return TagSet.ofTagCollection(customAggregationMetricConfig.getTags().entrySet().stream().map(tagEntry-> { if (!isPlaceHolder(tagEntry.getValue())) { return Tag.of(tagEntry.getKey(), tagEntry.getValue()); } else { final ExpressionResolver expressionResolver = PlaceholderFactory.newExpressionResolver(List.of( - new GroupByPlaceholderResolver(customSearchMetricConfig.getGroupBy().keySet(), response.getGroupedBy()) - , inlinePlaceholderResolvers.get(new FilterIdentifier(customSearchMetricConfig.getMetricName(), filterName)))); + new GroupByPlaceholderResolver(customAggregationMetricConfig.getGroupBy().keySet(), response.getGroupedBy()) + , inlinePlaceholderResolvers.get(new FilterIdentifier(customAggregationMetricConfig.getMetricName(), filterName)))); return expressionResolver.resolve(tagEntry.getValue()) .findFirst() .map(resolvedValue -> Tag.of(tagEntry.getKey(), resolvedValue)) @@ -228,15 +228,15 @@ private void handleCleanupUnusedMetrics(final CleanupUnusedMetricsCommand cleanu private Duration getMaxConfiguredScrapeInterval(final OperatorMetricsConfig operatorMetricsConfig) { return Stream.concat(Stream.of(operatorMetricsConfig.getScrapeInterval()), - operatorMetricsConfig.getCustomSearchMetricConfigs().values().stream() - .map(CustomSearchMetricConfig::getScrapeInterval) + operatorMetricsConfig.getCustomAggregationMetricConfigs().values().stream() + .map(CustomAggregationMetricConfig::getScrapeInterval) .filter(Optional::isPresent) .map(Optional::get)) .max(Comparator.naturalOrder()) .orElse(operatorMetricsConfig.getScrapeInterval()); } - private void initializeCustomMetricTimer(final String metricName, final CustomSearchMetricConfig config, final Duration scrapeInterval ) { + private void initializeCustomMetricTimer(final String metricName, final CustomAggregationMetricConfig config, final Duration scrapeInterval ) { if (!config.isEnabled()) { log.info("Custom search metric Gauge for metric <{}> is DISABLED. Skipping init.", metricName); return; @@ -263,7 +263,7 @@ private boolean isPlaceHolder(final String value) { return value.startsWith("{{") && value.endsWith("}}"); } - private static class TimestampedGauge { + private final static class TimestampedGauge { private final Gauge gauge; private final Long timestamp; @@ -301,14 +301,14 @@ public int hashCode() { @Override public String toString() { - return "GageWithTimestamp{" + + return "TimestampedGauge{" + "gauge=" + gauge + ", timestamp=" + timestamp + '}'; } } - private record GatherMetricsCommand(String metricName, CustomSearchMetricConfig config) {} + private record GatherMetricsCommand(String metricName, CustomAggregationMetricConfig config) {} private record CleanupUnusedMetricsCommand(OperatorMetricsConfig config) {} diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/ThingsAggregationConstants.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/ThingsAggregationConstants.java index 66853ed044..68bfba9eb0 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/ThingsAggregationConstants.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/ThingsAggregationConstants.java @@ -35,7 +35,7 @@ public final class ThingsAggregationConstants { /** * Name of the Aggregate actor */ - public static final String AGGREGATE_ACTOR_NAME = "aggregateThingsMetrics"; + public static final String AGGREGATION_ACTOR_NAME = "aggregationThingsMetrics"; /* * Inhibit instantiation of this utility class. diff --git a/thingsearch/service/src/main/resources/search-dev.conf b/thingsearch/service/src/main/resources/search-dev.conf index 9d927ae1a6..352a165f0d 100755 --- a/thingsearch/service/src/main/resources/search-dev.conf +++ b/thingsearch/service/src/main/resources/search-dev.conf @@ -25,7 +25,7 @@ ditto { } } } - custom-search-metrics { + custom-aggregation-metrics { online_status { enabled = false scrape-interval = 1m # override scrape interval, run every 20 minute diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultCustomSearchMetricConfigTest.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultCustomAggregationMetricConfigTest.java similarity index 79% rename from thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultCustomSearchMetricConfigTest.java rename to thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultCustomAggregationMetricConfigTest.java index 6c69a5d132..dc39d9ab61 100644 --- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultCustomSearchMetricConfigTest.java +++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/common/config/DefaultCustomAggregationMetricConfigTest.java @@ -28,7 +28,7 @@ import nl.jqno.equalsverifier.EqualsVerifier; -public class DefaultCustomSearchMetricConfigTest { +public class DefaultCustomAggregationMetricConfigTest { private static Config config; private static Config customSearchMetricTestConfig; @@ -44,35 +44,35 @@ public static void initTestFixture() { @Test public void testHashCodeAndEquals() { - EqualsVerifier.forClass(DefaultCustomSearchMetricConfig.class) + EqualsVerifier.forClass(DefaultCustomAggregationMetricConfig.class) .usingGetClass() .verify(); } @Test public void gettersReturnConfiguredValues() { - final DefaultCustomSearchMetricConfig underTest = - DefaultCustomSearchMetricConfig.of("online_status", + final DefaultCustomAggregationMetricConfig underTest = + DefaultCustomAggregationMetricConfig.of("online_status", customSearchMetricTestConfig.getConfig("online_status")); softly.assertThat(underTest.isEnabled()) - .as(CustomSearchMetricConfig.CustomSearchMetricConfigValue.ENABLED.getConfigPath()) + .as(CustomAggregationMetricConfig.CustomSearchMetricConfigValue.ENABLED.getConfigPath()) .isEqualTo(true); softly.assertThat(underTest.getScrapeInterval()) - .as(CustomSearchMetricConfig.CustomSearchMetricConfigValue.SCRAPE_INTERVAL.getConfigPath()) + .as(CustomAggregationMetricConfig.CustomSearchMetricConfigValue.SCRAPE_INTERVAL.getConfigPath()) .isEqualTo(Optional.ofNullable(customSearchMetricTestConfig.getDuration( "online_status.scrape-interval"))); softly.assertThat(underTest.getNamespaces()) - .as(CustomSearchMetricConfig.CustomSearchMetricConfigValue.NAMESPACES.getConfigPath()) + .as(CustomAggregationMetricConfig.CustomSearchMetricConfigValue.NAMESPACES.getConfigPath()) .containsExactlyInAnyOrder("org.eclipse.ditto.test.1", "org.eclipse.ditto.test.2"); softly.assertThat(underTest.getTags()) - .as(CustomSearchMetricConfig.CustomSearchMetricConfigValue.TAGS.getConfigPath()) + .as(CustomAggregationMetricConfig.CustomSearchMetricConfigValue.TAGS.getConfigPath()) .containsExactlyInAnyOrderEntriesOf( customSearchMetricTestConfig.getObject("online_status.tags") .unwrapped().entrySet().stream().collect( Collectors.toMap(Map.Entry::getKey, o -> o.getValue().toString()))); softly.assertThat(underTest.getFilterConfigs()) - .as(CustomSearchMetricConfig.CustomSearchMetricConfigValue.FILTERS.getConfigPath()) + .as(CustomAggregationMetricConfig.CustomSearchMetricConfigValue.FILTERS.getConfigPath()) .hasSize(2); softly.assertThat(underTest.getFilterConfigs().get(0).getFilterName()) .as("filter name") diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/starter/actors/AggregationThingsMetricsActorTest.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/starter/actors/AggregationThingsMetricsActorTest.java index ecc475a64d..ae62c28509 100644 --- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/starter/actors/AggregationThingsMetricsActorTest.java +++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/starter/actors/AggregationThingsMetricsActorTest.java @@ -31,16 +31,13 @@ import org.apache.pekko.testkit.javadsl.TestKit; import org.bson.Document; import org.eclipse.ditto.base.model.headers.DittoHeaders; -import org.eclipse.ditto.internal.utils.tracing.DittoTracing; import org.eclipse.ditto.internal.utils.tracing.DittoTracingInitResource; -import org.eclipse.ditto.internal.utils.tracing.config.DefaultTracingConfig; import org.eclipse.ditto.json.JsonFactory; import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.thingsearch.model.signals.commands.query.AggregateThingsMetrics; import org.eclipse.ditto.thingsearch.model.signals.commands.query.AggregateThingsMetricsResponse; import org.eclipse.ditto.thingsearch.service.persistence.read.ThingsAggregationPersistence; import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -101,7 +98,7 @@ public void testHandleAggregateThingsMetrics() { .set("online", 6) .set("offline", 0) .build(); - AggregateThingsMetricsResponse + final AggregateThingsMetricsResponse expectedResponse = AggregateThingsMetricsResponse.of(mongoAggregationResult, metrics); expectMsg(expectedResponse);