Skip to content

Commit

Permalink
[#3520] specific metrics for unknown messages
Browse files Browse the repository at this point in the history
Signed-off-by: Bob Claerhout <[email protected]>
  • Loading branch information
BobClaerhout committed Jul 25, 2023
1 parent 4a8911a commit d8b7bda
Show file tree
Hide file tree
Showing 43 changed files with 46,029 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.eclipse.hono.client.telemetry.TelemetrySender;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.service.AbstractServiceBase;
import org.eclipse.hono.service.AdapterDisabledException;
import org.eclipse.hono.service.auth.ValidityBasedTrustOptions;
import org.eclipse.hono.service.metric.MetricsTags.ConnectionAttemptOutcome;
import org.eclipse.hono.service.util.ServiceBaseUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.adapter.AbstractProtocolAdapterBase;
import org.eclipse.hono.adapter.AdapterConnectionsExceededException;
import org.eclipse.hono.adapter.AdapterDisabledException;
import org.eclipse.hono.service.AdapterDisabledException;
import org.eclipse.hono.adapter.AuthorizationException;
import org.eclipse.hono.adapter.auth.device.CredentialsApiAuthProvider;
import org.eclipse.hono.adapter.auth.device.DeviceCredentials;
Expand Down Expand Up @@ -69,6 +69,7 @@
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;
import org.eclipse.hono.service.auth.DeviceUser;
import org.eclipse.hono.service.http.HttpUtils;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.service.metric.MetricsTags.ConnectionAttemptOutcome;
import org.eclipse.hono.service.metric.MetricsTags.Direction;
import org.eclipse.hono.service.metric.MetricsTags.EndpointType;
Expand Down Expand Up @@ -1325,7 +1326,8 @@ private Future<Void> doUploadMessage(
ProcessingOutcome.from(t),
context.isRemotelySettled() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE,
context.getPayloadSize(),
context.getTimer());
context.getTimer(),
MetricsTags.Reason.from(t));
return Future.failedFuture(t);

}).map(ok -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,8 @@ public void testUploadTelemetryMessageFailsForDisabledAdapter(final VertxTestCon
eq(ProcessingOutcome.UNPROCESSABLE),
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
any());
any(),
eq(MetricsTags.Reason.TENANT_DISABLED_FOR_ADAPTER));
});
ctx.completeNow();
}));
Expand Down Expand Up @@ -1000,7 +1001,8 @@ public void testMessageLimitExceededForATelemetryMessage(final VertxTestContext
eq(ProcessingOutcome.UNPROCESSABLE),
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
any());
any(),
eq(MetricsTags.Reason.MESSAGE_LIMIT_EXCEEDED));
});
}

Expand All @@ -1027,7 +1029,8 @@ public void testMessageLimitExceededForAnEventMessage(final VertxTestContext ctx
eq(ProcessingOutcome.UNPROCESSABLE),
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
any());
any(),
eq(MetricsTags.Reason.MESSAGE_LIMIT_EXCEEDED));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.eclipse.hono.adapter.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.AdapterDisabledException;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.ProtocolAdapterCommandConsumer;
import org.eclipse.hono.client.registry.TenantDisabledOrNotRegisteredException;
import org.eclipse.hono.service.auth.DeviceUser;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.service.metric.MetricsTags.Direction;
Expand Down Expand Up @@ -413,7 +415,8 @@ protected final Future<Void> doUploadMessage(
qos,
payload.length(),
getTtdStatus(context),
context.getTimer());
context.getTimer(),
MetricsTags.Reason.from(t));
TracingHelper.logError(currentSpan, t);
commandConsumerClosedTracker.onComplete(res -> currentSpan.finish());
return Future.failedFuture(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ public void testUploadEventFailsForRejectedOutcome(final VertxTestContext ctx) {
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.Reason.UNKNOWN));
});
ctx.completeNow();
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.eclipse.californium.core.coap.OptionSet;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.eclipse.hono.service.AdapterDisabledException;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.CommandContext;
Expand Down Expand Up @@ -92,7 +93,7 @@ public void testUploadTelemetryFailsForDisabledTenant(final VertxTestContext ctx
final var resource = givenAResource(adapter);
// which is disabled for tenant "my-tenant"
when(adapter.isAdapterEnabled(any(TenantObject.class)))
.thenReturn(Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_FORBIDDEN)));
.thenReturn(Future.failedFuture(new AdapterDisabledException("my-tenant")));

// WHEN a device that belongs to "my-tenant" publishes a telemetry message
final Buffer payload = Buffer.buffer("some payload");
Expand All @@ -118,7 +119,8 @@ public void testUploadTelemetryFailsForDisabledTenant(final VertxTestContext ctx
eq(MetricsTags.QoS.AT_MOST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.Reason.TENANT_DISABLED_FOR_ADAPTER));
});
ctx.completeNow();
}));
Expand Down Expand Up @@ -371,7 +373,8 @@ public void testMessageLimitExceededForATelemetryMessage(final VertxTestContext
eq(MetricsTags.QoS.AT_MOST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.Reason.MESSAGE_LIMIT_EXCEEDED));
});
ctx.completeNow();
}));
Expand Down Expand Up @@ -560,7 +563,8 @@ public void testUploadTelemetryReleasesCommandForFailedDownstreamSender(final Ve
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
eq(TtdStatus.COMMAND),
any());
any(),
eq(MetricsTags.Reason.UNKNOWN));
// and the command delivery is released
verify(commandContext).release(any(Throwable.class));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.QoS;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.Strings;
import org.eclipse.hono.util.TenantObject;
Expand Down Expand Up @@ -85,7 +86,7 @@ public abstract class AbstractVertxBasedHttpProtocolAdapter<T extends HttpProtoc

private static final String KEY_MATCH_ALL_ROUTE_APPLIED = "matchAllRouteApplied";

private HttpAdapterMetrics metrics = HttpAdapterMetrics.NOOP;
protected HttpAdapterMetrics metrics = HttpAdapterMetrics.NOOP;
private HttpServer server;
private HttpServer insecureServer;

Expand Down Expand Up @@ -203,7 +204,15 @@ public final void doStart(final Promise<Void> startPromise) {
.onComplete(startPromise);
}

private Sample getMicrometerSample(final RoutingContext ctx) {
/**
* Gets the timer used to track the processing of a telemetry message.
*
* @param ctx The routing context to extract the sample from.
* @return The sample or {@code null} if the context does not
* contain a sample.
* @throws NullPointerException if ctx is {@code null}.
*/
protected Sample getMicrometerSample(final RoutingContext ctx) {
return ctx.get(KEY_MICROMETER_SAMPLE);
}

Expand Down Expand Up @@ -778,7 +787,8 @@ private void doUploadMessage(
qos,
payloadSize,
ctx.getTtdStatus(),
getMicrometerSample(ctx.getRoutingContext()));
getMicrometerSample(ctx.getRoutingContext()),
MetricsTags.Reason.from(t));
TracingHelper.logError(currentSpan, t);
currentSpan.finish();
return Future.failedFuture(t);
Expand Down Expand Up @@ -1295,9 +1305,16 @@ public final void uploadCommandResponseMessage(
});
}

private static MetricsTags.QoS getQoSLevel(
final EndpointType endpoint,
final org.eclipse.hono.util.QoS requestedQos) {
/**
* Get the QoS based on the endpoint and the requested QoS.
*
* @param endpoint The endpoint the message was sent to.
* @param requestedQos The QoS requested by the sender.
* @return The resulting QoS.
*/
protected static MetricsTags.QoS getQoSLevel(
final EndpointType endpoint,
final QoS requestedQos) {

if (endpoint == EndpointType.EVENT) {
return MetricsTags.QoS.AT_LEAST_ONCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,8 @@ public void testUploadTelemetryWithTtdClosesCommandConsumerIfSendingFails() {
eq(MetricsTags.QoS.AT_MOST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.Reason.UNKNOWN));
// and the command consumer is closed
verify(commandConsumer).close(eq(false), any());
}
Expand Down Expand Up @@ -786,7 +787,8 @@ public void testMessageLimitExceededForATelemetryMessage() {
eq(MetricsTags.QoS.AT_MOST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.Reason.MESSAGE_LIMIT_EXCEEDED));
}

/**
Expand Down Expand Up @@ -825,7 +827,8 @@ public void testMessageLimitExceededForAnEventMessage() {
eq(MetricsTags.QoS.AT_LEAST_ONCE),
eq(payload.length()),
eq(TtdStatus.NONE),
any());
any(),
eq(MetricsTags.Reason.MESSAGE_LIMIT_EXCEEDED));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.util.StatusCodeMapper;
import org.eclipse.hono.service.auth.DeviceUser;
import org.eclipse.hono.service.http.HttpServerSpanHelper;
import org.eclipse.hono.service.http.HttpUtils;
import org.eclipse.hono.service.metric.MetricsTags;
Expand Down Expand Up @@ -236,6 +237,11 @@ void handleProviderRoute(final HttpContext ctx, final LoraProvider provider) {

final var gatewayDevice = ctx.getAuthenticatedDevice();
TracingHelper.setDeviceTags(currentSpan, gatewayDevice.getTenantId(), gatewayDevice.getDeviceId());

final Future<TenantObject> tenantTracker = getTenantConfiguration(gatewayDevice.getTenantId(), currentSpan.context());
final MetricsTags.EndpointType endpoint = MetricsTags.EndpointType.fromString(ctx.getRequestedResource().getEndpoint());
final MetricsTags.QoS qos = getQoSLevel(endpoint, ctx.getRequestedQos());

try {
final LoraMessage loraMessage = provider.getMessage(ctx.getRoutingContext());
final LoraMessageType type = loraMessage.getType();
Expand Down Expand Up @@ -263,18 +269,41 @@ void handleProviderRoute(final HttpContext ctx, final LoraProvider provider) {
registerCommandConsumerIfNeeded(provider, gatewayDevice, currentSpan.context());
break;
default:

LOG.debug("discarding message of unsupported type [tenant: {}, device-id: {}, type: {}]",
gatewayDevice.getTenantId(), deviceId, type);
gatewayDevice.getTenantId(), deviceId, type);
currentSpan.log("discarding message of unsupported type");
currentSpan.finish();
// discard the message but return 202 to not cause errors on the LoRa provider side
handle202(ctx.getRoutingContext());

final MetricsTags.Reason reason = type == LoraMessageType.UNKNOWN ? MetricsTags.Reason.UNKNOWN_TYPE : MetricsTags.Reason.UNSUPPORTED_TYPE;
metrics.reportTelemetry(
endpoint,
gatewayDevice.getTenantId(),
tenantTracker.result(),
MetricsTags.ProcessingOutcome.UNPROCESSABLE,
qos,
ctx.getRoutingContext().body().buffer().length(),
ctx.getTtdStatus(),
getMicrometerSample(ctx.getRoutingContext()),
reason);
}
} catch (final LoraProviderMalformedPayloadException e) {
LOG.debug("error processing request from provider [name: {}]", provider.getProviderName(), e);
TracingHelper.logError(currentSpan, "error processing request", e);
currentSpan.finish();
handle400(ctx.getRoutingContext(), ERROR_MSG_INVALID_PAYLOAD);
metrics.reportTelemetry(
endpoint,
gatewayDevice.getTenantId(),
tenantTracker.result(),
MetricsTags.ProcessingOutcome.UNPROCESSABLE,
qos,
ctx.getRoutingContext().body().buffer().length(),
ctx.getTtdStatus(),
getMicrometerSample(ctx.getRoutingContext()),
MetricsTags.Reason.BAD_SYNTAX);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/


package org.eclipse.hono.adapter.lora;

import io.vertx.core.buffer.Buffer;


/**
* A Lora message that contains unknown data sent from an end-device to a Network Server.
*
*/
public class UnknownLoraMessage implements LoraMessage {

/**
* {@inheritDoc}
*/
@Override
public final byte[] getDevEUI() {
return new byte[0];
}

/**
* {@inheritDoc}
*/
@Override
public final String getDevEUIAsString() {
return "";
}

/**
* {@inheritDoc}
*/
@Override
public final LoraMessageType getType() {
return LoraMessageType.UNKNOWN;
}

/**
* {@inheritDoc}
*/
@Override
public final Buffer getPayload() {
return Buffer.buffer();
}
}
Loading

0 comments on commit d8b7bda

Please sign in to comment.