Skip to content

Commit

Permalink
ffix review findings
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksandar Stanchev <[email protected]>
  • Loading branch information
alstanchev committed Sep 9, 2024
1 parent 81f185b commit ef4fa29
Show file tree
Hide file tree
Showing 19 changed files with 310 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -581,23 +581,23 @@ In order to add custom metrics via System properties, the following example show
Ditto will perform a [count things operation](basic-search.html#search-count-queries) each `5m` (5 minutes), providing
a gauge named `all_produced_and_not_installed_devices` with the count of the query, adding the tag `company="acme-corp"`.

In Prometheus format this would look like:
In Prometheus format, this would look like:
```
all_produced_and_not_installed_devices{company="acme-corp"} 42.0
```
### Operator defined custom search based metrics
Starting with Ditto 3.6.0, the "custom metrics" functionality is extended to support search-based metrics.
This is configured via the [search](architecture-services-things-search.html) service configuration and builds on the
[search things](basic-search.html#search-queries) functionality.

### Operator defined custom aggregation based metrics
Starting with Ditto 3.6.0, the "custom metrics" functionality is extended to support custom aggregation metrics.
This is configured via the [search](architecture-services-things-search.html) service configuration.

> :warning: **Abstain of defining grouping by fields that have a high cardinality, as this will lead to a high number of metrics and
may overload the Prometheus server!**

Now you can augment the statistic about "Things" managed in Ditto
fulfilling a certain condition with tags with either predefined values,
values retrieved from the things or values which are defined based on the matching filter.
This is fulfill by using hardcoded values or placeholders in the tags configuration.
The supported placeholder types are inline and group-by placeholders.
This is fulfilled by using hardcoded values or placeholders in the tags configuration.
The supported placeholder types are `inline` and `group-by` placeholders.
[Function expressions](basic-placeholders.html#function-expressions) are also supported
to manipulate the values of the placeholders before they are used in the tags.

Expand All @@ -612,34 +612,33 @@ ditto {
custom-metrics {
...
}
custom-search-metrics {
custom-aggregate-metrics {
online_status {
enabled = true
scrape-interval = 1m # override scrape interval, run every 20 minute
namespaces = [
"org.eclipse.ditto"
]
group-by:{
group-by {
"location" = "attributes/Info/location"
"isGateway" = "attributes/Info/gateway"
}
tags: {
tags {
"online" = "{{ inline:online_placeholder }}"
"health" = "{{ inline:health }}"
"hardcoded-tag" = "hardcoded_value"
"location" = "{{ group-by:location | fn:default('missing location') }}"
"altitude" = "{{ group-by:isGateway }}"
}
filters = {
online_filter = {
filter = "gt(features/ConnectionStatus/properties/status/readyUntil/,time:now)"
inline-placeholder-values = {
filters {
online_filter {
filter = "gt(features/ConnectionStatus/properties/status/readyUntil,time:now)"
inline-placeholder-values {
"online_placeholder" = true
"health" = "good"
}
}
offline_filter = {
filter = "lt(features/ConnectionStatus/properties/status/readyUntil/,time:now)"
offline_filter {
filter = "lt(features/ConnectionStatus/properties/status/readyUntil,time:now)"
inline-placeholder-values = {
"online_placeholder" = false
"health" = "bad"
Expand Down Expand Up @@ -671,12 +670,12 @@ To add custom metrics via System properties, the following example shows how the
```

Ditto will perform a [search things operation](basic-search.html#search-queries) every `20m` (20 minutes), providing
Ditto will perform an [aggregation operation](https://www.mongodb.com/docs/manual/aggregation/) over the search db collection every `20m` (20 minutes), providing
a gauge named `online_devices` with the value of devices that match the filter.
The tags `online` and `location` will be added.
Their values will be resolved from the placeholders `{{online_placeholder}}` and `{{attributes/Info/location}}` respectively.

In Prometheus format this would look like:
In Prometheus format, this would look like:
```
online_status{location="Berlin",online="false"} 6.0
online_status{location="Immenstaad",online="true"} 8.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,41 @@

package org.eclipse.ditto.thingsearch.model.signals.commands.query;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.commands.AbstractCommand;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonPointer;

/**
* A command to aggregate metrics for things.
*/
public final class AggregateThingsMetrics extends AbstractCommand<AggregateThingsMetrics> {

private static final String NAME = "things-metrics";
/**
* Aggregation resource type.
*/
static final String RESOURCE_TYPE = "aggregation";
/**
* Type prefix of aggregation command.
*/
private static final String TYPE_PREFIX = RESOURCE_TYPE + "." + TYPE_QUALIFIER + ":";
/**
* The name of this command.
*/
private static final String TYPE = TYPE_PREFIX + NAME;

public class AggregateThingsMetrics implements WithDittoHeaders {

private final String metricName;
private final Map<String, String> groupingBy;
Expand All @@ -31,13 +58,24 @@ public class AggregateThingsMetrics implements WithDittoHeaders {

private AggregateThingsMetrics(final String metricName, final Map<String, String> groupingBy, final Map<String, String> namedFilters, final Set<String> namespaces,
final DittoHeaders dittoHeaders) {
super(TYPE, dittoHeaders, null);
this.metricName = metricName;
this.groupingBy = groupingBy;
this.namedFilters = namedFilters;
this.namespaces = namespaces;
this.groupingBy = Collections.unmodifiableMap(groupingBy);
this.namedFilters = Collections.unmodifiableMap(namedFilters);
this.namespaces = Collections.unmodifiableSet(namespaces);
this.dittoHeaders = dittoHeaders;
}

/**
* Creates a new {@link AggregateThingsMetrics} instance.
*
* @param metricName the name of the metric to aggregate.
* @param groupingBy the fields we want our metric aggregation to be grouped by.
* @param namedFilters the named filters to use for the aggregation.
* @param namespaces the namespaces the metric should be executed for.
* @param dittoHeaders the headers to use for the command.
* @return a new {@link AggregateThingsMetrics} instance.
*/
public static AggregateThingsMetrics of(final String metricName, final Map<String, String> groupingBy, final Map<String, String> namedFilters, final Set<String> namespaces,
final DittoHeaders dittoHeaders) {
return new AggregateThingsMetrics(metricName, groupingBy, namedFilters, namespaces, dittoHeaders);
Expand All @@ -56,14 +94,53 @@ public Map<String, String> getNamedFilters() {
}

@Override
public DittoHeaders getDittoHeaders() {
return dittoHeaders;
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final JsonSchemaVersion schemaVersion,
final Predicate<JsonField> thePredicate) {

final Predicate<JsonField> predicate = schemaVersion.and(thePredicate);
jsonObjectBuilder.set("metric-name", metricName, predicate);
final JsonObjectBuilder groupingBy = JsonFactory.newObjectBuilder();
this.groupingBy.forEach(groupingBy::set);
jsonObjectBuilder.set("grouping-by", groupingBy.build(), predicate);
final JsonObjectBuilder jsonFields = JsonFactory.newObjectBuilder();
namedFilters.forEach(jsonFields::set);
jsonObjectBuilder.set("named-filters",jsonFields.build(), predicate);
final JsonArray array =
JsonFactory.newArrayBuilder(namespaces.stream().map(JsonFactory::newValue).collect(
Collectors.toSet())).build();
jsonObjectBuilder.set("namespaces", array, predicate);

}

public Set<String> getNamespaces() {
return namespaces;
}

@Override
public String getTypePrefix() {
return TYPE_PREFIX;
}

@Override
public Category getCategory() {
return Category.STREAM;
}

@Override
public AggregateThingsMetrics setDittoHeaders(final DittoHeaders dittoHeaders) {
return of(getMetricName(), getGroupingBy(), getNamedFilters(), getNamespaces(), dittoHeaders);
}

@Override
public JsonPointer getResourcePath() {
return JsonPointer.empty();
}

@Override
public String getResourceType() {
return RESOURCE_TYPE;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,34 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.commands.AbstractCommandResponse;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonMissingFieldException;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonPointer;

public class AggregateThingsMetricsResponse implements WithDittoHeaders {
/**
* A response to an {@link AggregateThingsMetrics} command.
* Contains the original aggregation as returned by the database as well as fields initialized by that aggregation
* (grouped by and result)
* The result contains the returned values for each filter defined in a metric.
* The groupedBy contains the values that the result was grouped by.
*/
public final class AggregateThingsMetricsResponse extends AbstractCommandResponse<AggregateThingsMetricsResponse> {

private static final String NAME = "things-metrics";
private static final String AGGREGATION = "aggregation";
private static final String TYPE_PREFIX = AGGREGATION + "." + TYPE_QUALIFIER + ":";
private static final String RESOURCE_TYPE = AGGREGATION + "." + TYPE_QUALIFIER;

private final Map<String, String> groupedBy;

Expand All @@ -36,10 +54,12 @@ public class AggregateThingsMetricsResponse implements WithDittoHeaders {
private final DittoHeaders dittoHeaders;
private final JsonObject aggregation;
private final String metricName;

private AggregateThingsMetricsResponse(final JsonObject aggregation, final DittoHeaders dittoHeaders,
final String metricName, final Set<String> filterNames) {
super(TYPE_PREFIX + NAME, HttpStatus.OK, dittoHeaders);
this.aggregation = aggregation;
this.dittoHeaders = dittoHeaders;
this.dittoHeaders = DittoHeaders.of(dittoHeaders);
this.metricName = metricName;
groupedBy = aggregation.getValue("_id")
.map(json -> {
Expand All @@ -53,44 +73,96 @@ private AggregateThingsMetricsResponse(final JsonObject aggregation, final Ditto
}
)
.orElse(new HashMap<>());
result = filterNames.stream().filter(aggregation::contains).collect(Collectors.toMap(key ->
key, key -> aggregation.getValue(JsonPointer.of(key))
.orElseThrow(getJsonMissingFieldExceptionSupplier(key))
.asLong()));
result = extractFiltersResults(aggregation, filterNames);
// value should always be a number as it will be used for the gauge value in the metrics
}

/**
* Creates a new {@link AggregateThingsMetricsResponse} instance.
* @param aggregation the aggregation result.
* @param aggregateThingsMetrics the command that was executed.
* @return the AggregateThingsMetricsResponse instance.
*/
public static AggregateThingsMetricsResponse of(final JsonObject aggregation,
final AggregateThingsMetrics aggregateThingsMetrics) {
return of(aggregation, aggregateThingsMetrics.getDittoHeaders(), aggregateThingsMetrics.getMetricName(), aggregateThingsMetrics.getNamedFilters().keySet());
}

/**
* Creates a new {@link AggregateThingsMetricsResponse} instance.
* @param aggregation the aggregation result.
* @param dittoHeaders the headers to use for the response.
* @param metricName the name of the metric.
* @param filterNames the names of the filters.
* @return the AggregateThingsMetricsResponse instance.
*/
public static AggregateThingsMetricsResponse of(final JsonObject aggregation, final DittoHeaders dittoHeaders,
final String metricName, final Set<String> filterNames) {
return new AggregateThingsMetricsResponse(aggregation, dittoHeaders, metricName, filterNames);
}

@Override
public DittoHeaders getDittoHeaders() {
return dittoHeaders;
public AggregateThingsMetricsResponse setDittoHeaders(final DittoHeaders dittoHeaders) {
return AggregateThingsMetricsResponse.of(aggregation, dittoHeaders, metricName, result.keySet());
}

@Override
public JsonObject toJson() {
return super.toJson();
}

@Override
public JsonPointer getResourcePath() {
return JsonPointer.empty();
}

@Override
public String getResourceType() {
return RESOURCE_TYPE;
}


@Override
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final JsonSchemaVersion schemaVersion,
final Predicate<JsonField> thePredicate) {

final Predicate<JsonField> predicate = schemaVersion.and(thePredicate);
jsonObjectBuilder.set("metric-name", metricName, predicate);
final JsonObjectBuilder groupedBy = JsonFactory.newObjectBuilder();
this.groupedBy.forEach(groupedBy::set);
jsonObjectBuilder.set("grouped-by", groupedBy.build(), predicate);
final JsonObjectBuilder jsonFields = JsonFactory.newObjectBuilder();
getResult().forEach(jsonFields::set);
jsonObjectBuilder.set("result",jsonFields.build(), predicate);
jsonObjectBuilder.set("aggregation", aggregation, predicate);

}

/**
* Returns the grouping by values by witch the result was grouped.
* @return the groupedBy of the response.
*/
public Map<String, String> getGroupedBy() {
return groupedBy;
}

/**
* Returns the values for each filter defined in the metric
*
* @return the result of the aggregation.
*/
public Map<String, Long> getResult() {
return result;
}

/**
* Returns the metric name.
* @return the metric name.
*/
public String getMetricName() {
return metricName;
}

private Supplier<RuntimeException> getJsonMissingFieldExceptionSupplier(String field) {
return () -> JsonMissingFieldException.newBuilder().fieldName(field).build();
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
Expand All @@ -114,8 +186,19 @@ public String toString() {
"groupedBy=" + groupedBy +
", result=" + result +
", dittoHeaders=" + dittoHeaders +
", aggregation=" + aggregation +
", " + AGGREGATION + "=" + aggregation +
", metricName='" + metricName + '\'' +
'}';
}

private Map<String, Long> extractFiltersResults(final JsonObject aggregation, final Set<String> filterNames) {
return filterNames.stream().filter(aggregation::contains).collect(Collectors.toMap(key ->
key, key -> aggregation.getValue(JsonPointer.of(key))
.orElseThrow(getJsonMissingFieldExceptionSupplier(key))
.asLong()));
}

private Supplier<RuntimeException> getJsonMissingFieldExceptionSupplier(final String field) {
return () -> JsonMissingFieldException.newBuilder().fieldName(field).build();
}
}
Loading

0 comments on commit ef4fa29

Please sign in to comment.