diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGateway.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGateway.java index aa775d56..5d9898e9 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGateway.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGateway.java @@ -19,7 +19,6 @@ import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -32,10 +31,8 @@ import org.apache.qpid.proton.message.Message; import org.eclipse.hono.auth.Device; import org.eclipse.hono.client.ClientErrorException; -import org.eclipse.hono.client.HonoConnection; import org.eclipse.hono.client.MessageConsumer; import org.eclipse.hono.client.ServiceInvocationException; -import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory; import org.eclipse.hono.config.ClientConfigProperties; import org.eclipse.hono.gateway.sdk.mqtt2amqp.downstream.CommandResponseMessage; import org.eclipse.hono.gateway.sdk.mqtt2amqp.downstream.DownstreamMessage; @@ -84,7 +81,7 @@ public abstract class AbstractMqttProtocolGateway extends AbstractVerticle { private final ClientConfigProperties amqpClientConfig; private final MqttProtocolGatewayConfig mqttGatewayConfig; - private final Map clientFactoryPerTenant = new HashMap<>(); + private final MultiTenantConnectionManager tenantConnectionManager; private MqttServer server; @@ -104,11 +101,30 @@ public abstract class AbstractMqttProtocolGateway extends AbstractVerticle { */ public AbstractMqttProtocolGateway(final ClientConfigProperties amqpClientConfig, final MqttProtocolGatewayConfig mqttGatewayConfig) { + + this(amqpClientConfig, mqttGatewayConfig, new MultiTenantConnectionManagerImpl()); + } + + /** + * This constructor is only visible for testing purposes. + * + * @param amqpClientConfig The AMQP client configuration. + * @param mqttGatewayConfig The configuration of the protocol gateway. + * @param tenantConnectionManager The tenant connection manager to be used. + * @throws NullPointerException if any of the parameters is {@code null}. + * @see AbstractMqttProtocolGateway#AbstractMqttProtocolGateway(ClientConfigProperties, MqttProtocolGatewayConfig) + */ + AbstractMqttProtocolGateway(final ClientConfigProperties amqpClientConfig, + final MqttProtocolGatewayConfig mqttGatewayConfig, + final MultiTenantConnectionManager tenantConnectionManager) { + Objects.requireNonNull(amqpClientConfig); Objects.requireNonNull(mqttGatewayConfig); + Objects.requireNonNull(tenantConnectionManager); this.amqpClientConfig = amqpClientConfig; this.mqttGatewayConfig = mqttGatewayConfig; + this.tenantConnectionManager = tenantConnectionManager; } /** @@ -355,7 +371,11 @@ final void handleEndpointConnection(final MqttEndpoint endpoint) { : Future.succeededFuture(authenticateDevice)); authAttempt - .compose(this::connectGatewayToAmqpAdapter) + .compose(authenticatedDevice -> { + final String tenantId = authenticatedDevice.getTenantId(); + return getTenantConfig(tenantId) + .compose(config -> connectGatewayToAmqpAdapter(tenantId, config, endpoint)); + }) .onComplete(result -> { if (result.succeeded()) { registerHandlers(endpoint, authAttempt.result()); @@ -414,12 +434,10 @@ private Future authenticateWithUsernameAndPassword(final MqttEndpoint en } } - private Future connectGatewayToAmqpAdapter(final Device authenticatedDevice) { - - final String tenantId = authenticatedDevice.getTenantId(); + private Future getTenantConfig(final String tenantId) { if (amqpClientConfig.getUsername() != null && amqpClientConfig.getPassword() != null) { - return connectGatewayToAmqpAdapter(tenantId, amqpClientConfig); + return Future.succeededFuture(amqpClientConfig); } else { return provideGatewayCredentials(tenantId) .compose(credentials -> { @@ -427,43 +445,19 @@ private Future connectGatewayToAmqpAdapter(final Device authenticatedDevic tenantConfig.setUsername(credentials.getUsername()); tenantConfig.setPassword(credentials.getPassword()); - return connectGatewayToAmqpAdapter(tenantId, tenantConfig); + return Future.succeededFuture(tenantConfig); }); } } - private Future connectGatewayToAmqpAdapter(final String tenantId, final ClientConfigProperties clientConfig) { - - final AmqpAdapterClientFactory amqpAdapterClientFactory = clientFactoryPerTenant.get(tenantId); - if (amqpAdapterClientFactory != null) { - return amqpAdapterClientFactory.isConnected(clientConfig.getConnectTimeout()); - } else { - - final AmqpAdapterClientFactory factory = createTenantClientFactory(tenantId, clientConfig); - clientFactoryPerTenant.put(tenantId, factory); - - return factory.connect() - .map(con -> { - log.debug("Connected to AMQP adapter"); - return null; - }); - } - } + private Future connectGatewayToAmqpAdapter(final String tenantId, + final ClientConfigProperties clientConfig, + final MqttEndpoint endpoint) { + return tenantConnectionManager.connect(tenantId, vertx, clientConfig) + .onFailure(e -> log.info("Failed to connect to Hono [tenant-id: {}, username: {}]", tenantId, + clientConfig.getUsername())) + .compose(v -> tenantConnectionManager.addEndpoint(tenantId, endpoint)); - /** - * Returns a new {@link AmqpAdapterClientFactory} with a new AMQP connection for the given tenant. - *

- * This method is only visible for testing purposes. - * - * @param tenantId The tenant to be connected. - * @param clientConfig The client properties to use for the connection. - * @return The factory. Note that the underlying AMQP connection will not be established until - * {@link AmqpAdapterClientFactory#connect()} is invoked. - */ - AmqpAdapterClientFactory createTenantClientFactory(final String tenantId, - final ClientConfigProperties clientConfig) { - final HonoConnection connection = HonoConnection.newConnection(vertx, clientConfig); - return AmqpAdapterClientFactory.create(connection, tenantId); } private void registerHandlers(final MqttEndpoint endpoint, final Device authenticatedDevice) { @@ -473,22 +467,29 @@ private void registerHandlers(final MqttEndpoint endpoint, final Device authenti MqttDownstreamContext.fromPublishPacket(message, endpoint, authenticatedDevice))); final CommandSubscriptionsManager cmdSubscriptionsManager = createCommandHandler(vertx); - endpoint.closeHandler(v -> close(endpoint, cmdSubscriptionsManager)); + endpoint.closeHandler(v -> cleanupConnections(endpoint, cmdSubscriptionsManager, authenticatedDevice)); endpoint.publishAcknowledgeHandler(cmdSubscriptionsManager::handlePubAck); endpoint.subscribeHandler(msg -> onSubscribe(endpoint, authenticatedDevice, msg, cmdSubscriptionsManager)); endpoint.unsubscribeHandler(msg -> onUnsubscribe(endpoint, authenticatedDevice, msg, cmdSubscriptionsManager)); } - private void close(final MqttEndpoint endpoint, final CommandSubscriptionsManager cmdSubscriptionsManager) { + private void cleanupConnections(final MqttEndpoint endpoint, + final CommandSubscriptionsManager cmdSubscriptionsManager, + final Device authenticatedDevice) { + + log.info("closing connection to device {}", authenticatedDevice.toString()); + onDeviceConnectionClose(endpoint); cmdSubscriptionsManager.removeAllSubscriptions(); - if (endpoint.isConnected()) { - log.debug("closing connection with client [client ID: {}]", endpoint.clientIdentifier()); - endpoint.close(); - } else { - log.trace("connection to client is already closed"); - } + + final String tenantId = authenticatedDevice.getTenantId(); + tenantConnectionManager.closeEndpoint(tenantId, endpoint) + .onSuccess(amqpLinkClosed -> { + if (amqpLinkClosed) { + log.info("closed AMQP connection for tenant [{}]", tenantId); + } + }); } /** @@ -585,8 +586,7 @@ private void onUploadFailure(final MqttDownstreamContext ctx, final Throwable ca } if (ctx.deviceEndpoint().isConnected()) { - log.info("closing connection to device {}", ctx.authenticatedDevice().toString()); - ctx.deviceEndpoint().close(); + ctx.deviceEndpoint().close(); // cleanupConnections() will be called by close handler } } @@ -594,7 +594,7 @@ private Future sendTelemetry(final String tenantId, final String final Map properties, final byte[] payload, final String contentType, final boolean waitForOutcome) { - return clientFactoryPerTenant.get(tenantId).getOrCreateTelemetrySender() + return tenantConnectionManager.getOrCreateTelemetrySender(tenantId) .compose(sender -> { if (waitForOutcome) { log.trace( @@ -616,7 +616,7 @@ private Future sendEvent(final String tenantId, final String dev log.trace("sending event message [tenantId: {}, deviceId: {}, contentType: {}, properties: {}]", tenantId, deviceId, contentType, properties); - return clientFactoryPerTenant.get(tenantId).getOrCreateEventSender() + return tenantConnectionManager.getOrCreateEventSender(tenantId) .compose(sender -> sender.send(deviceId, payload, contentType, properties)); } @@ -628,7 +628,7 @@ private Future sendCommandResponse(final String tenantId, final "sending command response [tenantId: {}, deviceId: {}, targetAddress: {}, correlationId: {}, status: {}, contentType: {}, properties: {}]", tenantId, deviceId, targetAddress, correlationId, status, contentType, properties); - return clientFactoryPerTenant.get(tenantId).getOrCreateCommandResponseSender() + return tenantConnectionManager.getOrCreateCommandResponseSender(tenantId) .compose(sender -> sender.sendCommandResponse(deviceId, targetAddress, correlationId, status, payload, contentType, properties)); } @@ -733,7 +733,9 @@ private void onUnsubscribe(final MqttEndpoint endpoint, final Device authenticat private Future createCommandConsumer(final MqttEndpoint endpoint, final CommandSubscriptionsManager cmdSubscriptionsManager, final Device authenticatedDevice) { - return clientFactoryPerTenant.get(authenticatedDevice.getTenantId()).createDeviceSpecificCommandConsumer( + + return tenantConnectionManager.createDeviceSpecificCommandConsumer( + authenticatedDevice.getTenantId(), authenticatedDevice.getDeviceId(), cmd -> handleCommand(endpoint, cmd, cmdSubscriptionsManager, authenticatedDevice)); } @@ -887,6 +889,9 @@ public final void stop(final Promise stopPromise) { final Promise stopTracker = Promise.promise(); beforeShutdown(stopTracker); + + tenantConnectionManager.closeAllTenants(); + stopTracker.future().onComplete(v -> { if (server != null) { server.close(stopPromise); diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java new file mode 100644 index 00000000..34952d69 --- /dev/null +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManager.java @@ -0,0 +1,125 @@ +/******************************************************************************* + * Copyright (c) 2020 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.gateway.sdk.mqtt2amqp; + +import java.util.function.Consumer; + +import org.apache.qpid.proton.message.Message; +import org.eclipse.hono.client.MessageConsumer; +import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory; +import org.eclipse.hono.client.device.amqp.CommandResponder; +import org.eclipse.hono.client.device.amqp.EventSender; +import org.eclipse.hono.client.device.amqp.TelemetrySender; +import org.eclipse.hono.config.ClientConfigProperties; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.mqtt.MqttEndpoint; + +/** + * Manages connections for multiple tenants. + *

+ * NB The {@link #connect(String, Vertx, ClientConfigProperties)} method needs to be invoked before calling any + * of the other methods. + */ +public interface MultiTenantConnectionManager { + + /** + * Connects to Hono's AMQP adapter with the given configuration. + * + * @param tenantId The tenant to connect. + * @param vertx The Vert.x instance to use for the connection. + * @param clientConfig The configuration of the connection. + * @return a succeeded future if the connection could be established within the time frame configured with + * {@link ClientConfigProperties#getConnectTimeout()}, a failed future otherwise. + */ + Future connect(String tenantId, Vertx vertx, ClientConfigProperties clientConfig); + + /** + * Adds an MQTT endpoint for the given tenant. + * + * @param tenantId The tenant to which the endpoint belongs. + * @param mqttEndpoint The endpoint to be added. + * @return A future indicating the outcome of the operation. The future will succeed if the endpoint has been added + * successfully. Otherwise the future will fail with a failure message indicating the cause of the failure. + */ + Future addEndpoint(String tenantId, MqttEndpoint mqttEndpoint); + + /** + * Closes the given MQTT endpoint and if there are no other open endpoints for this tenant, it closes the + * corresponding AMQP connection. + * + * @param tenantId The tenant to which the endpoint belongs. + * @param mqttEndpoint The endpoint to be closed. + * @return A future indicating the outcome of the operation. The future will succeed with a boolean that is + * {@code true} if the AMQP connection (and all MQTT endpoints) have been closed. If an error occurs, the + * future will fail with a failure message indicating the cause of the failure. + */ + Future closeEndpoint(String tenantId, MqttEndpoint mqttEndpoint); + + /** + * Closes all connections, MQTT connections as well as AMQP connections for all tenants. + */ + void closeAllTenants(); + + /** + * Gets a client for sending telemetry data to Hono's AMQP protocol adapter. + * + * @param tenantId The tenant to which the sender belongs. + * @return a future with the open sender or a failed future. + * @see AmqpAdapterClientFactory#getOrCreateTelemetrySender() + */ + Future getOrCreateTelemetrySender(String tenantId); + + /** + * Gets a client for sending events to Hono's AMQP protocol adapter. + * + * @param tenantId The tenant to which the sender belongs. + * @return a future with the open sender or a failed future. + * @see AmqpAdapterClientFactory#getOrCreateTelemetrySender() + */ + Future getOrCreateEventSender(String tenantId); + + /** + * Gets a client for sending command responses to Hono's AMQP protocol adapter. + * + * @param tenantId The tenant to which the sender belongs. + * @return a future with the open sender or a failed future. + * @see AmqpAdapterClientFactory#getOrCreateTelemetrySender() + */ + Future getOrCreateCommandResponseSender(String tenantId); + + /** + * Creates a client for consuming commands from Hono's AMQP protocol adapter for a specific device. + * + * @param tenantId The tenant to which the sender belongs. + * @param deviceId The device to consume commands for. + * @param messageHandler The handler to invoke with every command received. + * @return a future with the open sender or a failed future. + * @see AmqpAdapterClientFactory#getOrCreateTelemetrySender() + */ + Future createDeviceSpecificCommandConsumer(String tenantId, String deviceId, + Consumer messageHandler); + + /** + * Creates a client for consuming commands from Hono's AMQP protocol adapter for all devices of this tenant. + * + * @param tenantId The tenant to which the sender belongs. + * @param messageHandler The handler to invoke with every command received. + * @return a future with the open sender or a failed future. + * @see AmqpAdapterClientFactory#getOrCreateTelemetrySender() + */ + Future createCommandConsumer(String tenantId, Consumer messageHandler); + +} diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImpl.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImpl.java new file mode 100644 index 00000000..7bba53f1 --- /dev/null +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImpl.java @@ -0,0 +1,131 @@ +/******************************************************************************* + * Copyright (c) 2020 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.gateway.sdk.mqtt2amqp; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import org.apache.qpid.proton.message.Message; +import org.eclipse.hono.client.MessageConsumer; +import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory; +import org.eclipse.hono.client.device.amqp.CommandResponder; +import org.eclipse.hono.client.device.amqp.EventSender; +import org.eclipse.hono.client.device.amqp.TelemetrySender; +import org.eclipse.hono.config.ClientConfigProperties; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.mqtt.MqttEndpoint; + +/** + * Tracks MQTT connections per tenant and closes the AMQP connection automatically when the last MQTT connection of the + * tenant is closed. + *

+ * Note: {@link #connect(String, Vertx, ClientConfigProperties)} needs to be invoked before using the instance. + */ +public class MultiTenantConnectionManagerImpl implements MultiTenantConnectionManager { + + private final Map connectionsPerTenant = new HashMap<>(); + + @Override + public Future connect(final String tenantId, final Vertx vertx, final ClientConfigProperties clientConfig) { + + connectionsPerTenant.computeIfAbsent(tenantId, k -> { + final TenantConnections tenantConnections = new TenantConnections(k, vertx, clientConfig); + tenantConnections.connect(); + return tenantConnections; + }); + + return getTenantConnections(tenantId) + .compose(tenantConnections -> tenantConnections.isConnected(clientConfig.getConnectTimeout())) + .onFailure(ex -> { + final TenantConnections failedTenant = connectionsPerTenant.remove(tenantId); + if (failedTenant != null) { + failedTenant.closeAllConnections(); + } + }); + } + + @Override + public Future addEndpoint(final String tenantId, final MqttEndpoint mqttEndpoint) { + return getTenantConnections(tenantId) + .compose(tenantConnections -> tenantConnections.addEndpoint(mqttEndpoint)); + } + + @Override + public Future closeEndpoint(final String tenantId, final MqttEndpoint mqttEndpoint) { + + return getTenantConnections(tenantId) + .map(tenantConnections -> tenantConnections.closeEndpoint(mqttEndpoint)) + .onSuccess(amqpLinkClosed -> { + if (amqpLinkClosed) { + connectionsPerTenant.remove(tenantId); + } + }); + + } + + @Override + public void closeAllTenants() { + connectionsPerTenant.forEach((k, connections) -> connections.closeAllConnections()); + connectionsPerTenant.clear(); + } + + @Override + public Future getOrCreateTelemetrySender(final String tenantId) { + return getAmqpAdapterClientFactory(tenantId).compose(AmqpAdapterClientFactory::getOrCreateTelemetrySender); + } + + @Override + public Future getOrCreateEventSender(final String tenantId) { + return getAmqpAdapterClientFactory(tenantId).compose(AmqpAdapterClientFactory::getOrCreateEventSender); + } + + @Override + public Future getOrCreateCommandResponseSender(final String tenantId) { + return getAmqpAdapterClientFactory(tenantId) + .compose(AmqpAdapterClientFactory::getOrCreateCommandResponseSender); + } + + @Override + public Future createDeviceSpecificCommandConsumer(final String tenantId, final String deviceId, + final Consumer messageHandler) { + + return getAmqpAdapterClientFactory(tenantId) + .compose(factory -> factory.createDeviceSpecificCommandConsumer(deviceId, messageHandler)); + + } + + @Override + public Future createCommandConsumer(final String tenantId, + final Consumer messageHandler) { + + return getAmqpAdapterClientFactory(tenantId).compose(factory -> factory.createCommandConsumer(messageHandler)); + + } + + private Future getTenantConnections(final String tenantId) { + final TenantConnections tenantConnections = connectionsPerTenant.get(tenantId); + if (tenantConnections == null) { + return Future.failedFuture("tenant [" + tenantId + "] is not connected"); + } else { + return Future.succeededFuture(tenantConnections); + } + } + + private Future getAmqpAdapterClientFactory(final String tenantId) { + return getTenantConnections(tenantId).compose(TenantConnections::getAmqpAdapterClientFactory); + } +} diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java new file mode 100644 index 00000000..c6697e66 --- /dev/null +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/main/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnections.java @@ -0,0 +1,171 @@ +/******************************************************************************* + * Copyright (c) 2020 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.gateway.sdk.mqtt2amqp; + +import java.util.ArrayList; +import java.util.List; + +import org.eclipse.hono.client.ConnectionLifecycle; +import org.eclipse.hono.client.HonoConnection; +import org.eclipse.hono.client.ServerErrorException; +import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory; +import org.eclipse.hono.config.ClientConfigProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.mqtt.MqttEndpoint; + +/** + * Manages all connections of one tenant, MQTT connections of devices as well as the AMQP connection to Hono's AMQP + * adapter. + * + * By invoking {@link #connect()} an AMQP client for the tenant is connected. Each MQTT endpoint needs to be added to + * keep track of all MQTT connections belonging to the tenant. When the last MQTT endpoint for the tenant is closed, the + * AMQP client - and thus this instance - is closed automatically. + *

+ * Note: do not re-use an instance if it is already closed. + */ +class TenantConnections { + + // visible for testing + final List mqttEndpoints = new ArrayList<>(); + + private final AmqpAdapterClientFactory amqpAdapterClientFactory; + private final Logger log = LoggerFactory.getLogger(getClass()); + private final String tenantId; + + private boolean closed = false; + + /** + * Creates a new instance with a new {@link AmqpAdapterClientFactory} and a new {@link HonoConnection}. + * + * @param tenantId The ID of the tenant whose connections are to be managed + * @param vertx The Vert.x instance to be used by the HonoConnection. + * @param clientConfig The client configuration to be used by the HonoConnection. + */ + TenantConnections(final String tenantId, final Vertx vertx, final ClientConfigProperties clientConfig) { + this(AmqpAdapterClientFactory.create(HonoConnection.newConnection(vertx, clientConfig), tenantId), tenantId); + } + + /** + * Creates a new instance for the given {@link AmqpAdapterClientFactory}. + *

+ * This constructor is for testing purposes only. + * + * @param amqpAdapterClientFactory The AmqpAdapterClientFactory to use for creating AMQP clients. + * @param tenantId The ID of the tenant whose connections are to be managed + */ + TenantConnections(final AmqpAdapterClientFactory amqpAdapterClientFactory, final String tenantId) { + this.amqpAdapterClientFactory = amqpAdapterClientFactory; + this.tenantId = tenantId; + } + + /** + * Opens a connection to Hono's AMQP protocol adapter for the tenant to be managed. + * + * @return A future indicating the outcome of the operation. + */ + public Future connect() { + return getAmqpAdapterClientFactory().compose(ConnectionLifecycle::connect) + .onSuccess(con -> log.debug("Connected to AMQP adapter")); + } + + /** + * Adds an MQTT endpoint for the tenant. + * + * @param mqttEndpoint The endpoint to add. + * @return A future indicating the outcome of the operation. + */ + public Future addEndpoint(final MqttEndpoint mqttEndpoint) { + return failIfClosed().onSuccess(v -> mqttEndpoints.add(mqttEndpoint)); + } + + /** + * Closes the given MQTT endpoint and if there are no other MQTT endpoints present, it closes the AMQP client and + * this instance. + * + * @param mqttEndpoint The endpoint to be closed. + * @return {@code true} if the AMQP connection has been closed. + */ + public boolean closeEndpoint(final MqttEndpoint mqttEndpoint) { + + closeEndpointIfConnected(mqttEndpoint); + + mqttEndpoints.remove(mqttEndpoint); + + if (mqttEndpoints.isEmpty()) { + closeThisInstance(); + } + + return closed; + } + + /** + * Closes all MQTT endpoints and the AMQP connection. + */ + public void closeAllConnections() { + log.info("closing all AMQP connections"); + + mqttEndpoints.forEach(this::closeEndpointIfConnected); + mqttEndpoints.clear(); + closeThisInstance(); + } + + private void closeEndpointIfConnected(final MqttEndpoint mqttEndpoint) { + if (mqttEndpoint.isConnected()) { + log.debug("closing connection with client [client ID: {}]", mqttEndpoint.clientIdentifier()); + mqttEndpoint.close(); + } else { + log.trace("connection to client is already closed"); + } + } + + private void closeThisInstance() { + amqpAdapterClientFactory.disconnect(); + closed = true; + } + + /** + * Checks whether the AMQP connection is currently established. + * + * @param connectTimeout The maximum number of milliseconds to wait for an ongoing connection attempt to finish. + * @return A succeeded future if this connection is established. Otherwise, the future will be failed with a + * {@link ServerErrorException}, or an {@link IllegalStateException} if this instance is already closed. + */ + public Future isConnected(final long connectTimeout) { + return getAmqpAdapterClientFactory().compose(f -> f.isConnected(connectTimeout)); + } + + /** + * Returns the AmqpAdapterClientFactory for the tenant. + * + * @return A future containing the AmqpAdapterClientFactory, or, if this instance is already closed, a failed + * future. + */ + public Future getAmqpAdapterClientFactory() { + return failIfClosed().map(amqpAdapterClientFactory); + } + + private Future failIfClosed() { + if (closed) { + final Exception ex = new IllegalStateException("connections for this tenant are already closed"); + log.warn("This should not happen", ex); + return Future.failedFuture(ex); + } else { + return Future.succeededFuture(); + } + } +} diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGatewayTest.java b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGatewayTest.java index e5dcca78..c0ce6eab 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGatewayTest.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/AbstractMqttProtocolGatewayTest.java @@ -35,12 +35,12 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.transport.Target; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; import org.eclipse.hono.client.HonoConnection; -import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory; import org.eclipse.hono.client.device.amqp.CommandResponder; import org.eclipse.hono.client.device.amqp.EventSender; import org.eclipse.hono.client.device.amqp.TelemetrySender; @@ -79,6 +79,7 @@ import io.vertx.junit5.VertxTestContext; import io.vertx.mqtt.MqttEndpoint; import io.vertx.mqtt.MqttServerOptions; +import io.vertx.proton.ProtonDelivery; import io.vertx.proton.ProtonQoS; import io.vertx.proton.ProtonReceiver; import io.vertx.proton.ProtonSender; @@ -94,7 +95,7 @@ public class AbstractMqttProtocolGatewayTest { private Vertx vertx; private ProtonSender protonSender; private NetServer netServer; - private AmqpAdapterClientFactory amqpAdapterClientFactory; + private MultiTenantConnectionManager tenantConnectionManager; private Consumer commandHandler; /** @@ -102,12 +103,14 @@ public class AbstractMqttProtocolGatewayTest { */ @BeforeEach public void setUp() { - amqpAdapterClientFactory = mock(AmqpAdapterClientFactory.class); netServer = mock(NetServer.class); vertx = mock(Vertx.class); protonSender = mockProtonSender(); - when(amqpAdapterClientFactory.connect()).thenReturn(Future.succeededFuture()); + tenantConnectionManager = mock(MultiTenantConnectionManager.class); + when(tenantConnectionManager.connect(anyString(), any(), any())).thenReturn(Future.succeededFuture()); + when(tenantConnectionManager.addEndpoint(anyString(), any())).thenReturn(Future.succeededFuture()); + when(tenantConnectionManager.closeEndpoint(anyString(), any())).thenReturn(Future.succeededFuture(true)); amqpClientConfig = new ClientConfigProperties(); final HonoConnection connection = mockHonoConnection(vertx, amqpClientConfig, protonSender); @@ -115,21 +118,21 @@ public void setUp() { final Future eventSender = AmqpAdapterClientEventSenderImpl .createWithAnonymousLinkAddress(connection, TestMqttProtocolGateway.TENANT_ID, s -> { }); - when(amqpAdapterClientFactory.getOrCreateEventSender()).thenReturn(eventSender); + when(tenantConnectionManager.getOrCreateEventSender(anyString())).thenReturn(eventSender); final Future telemetrySender = AmqpAdapterClientTelemetrySenderImpl .createWithAnonymousLinkAddress(connection, TestMqttProtocolGateway.TENANT_ID, s -> { }); - when(amqpAdapterClientFactory.getOrCreateTelemetrySender()).thenReturn(telemetrySender); + when(tenantConnectionManager.getOrCreateTelemetrySender(anyString())).thenReturn(telemetrySender); final Future commandResponseSender = AmqpAdapterClientCommandResponseSender .createWithAnonymousLinkAddress(connection, TestMqttProtocolGateway.TENANT_ID, s -> { }); - when(amqpAdapterClientFactory.getOrCreateCommandResponseSender()).thenReturn(commandResponseSender); + when(tenantConnectionManager.getOrCreateCommandResponseSender(anyString())).thenReturn(commandResponseSender); - when(amqpAdapterClientFactory.createDeviceSpecificCommandConsumer(anyString(), any())) + when(tenantConnectionManager.createDeviceSpecificCommandConsumer(anyString(), anyString(), any())) .thenAnswer(invocation -> { - final Consumer msgHandler = invocation.getArgument(1); + final Consumer msgHandler = invocation.getArgument(2); setCommandHandler(msgHandler); return AmqpAdapterClientCommandConsumer.create(connection, TestMqttProtocolGateway.TENANT_ID, TestMqttProtocolGateway.DEVICE_ID, @@ -380,7 +383,7 @@ public void testConnectWithClientCertSucceeds() { // GIVEN a protocol gateway configured with a trust anchor final TestMqttProtocolGateway gateway = new TestMqttProtocolGateway(amqpClientConfig, - new MqttProtocolGatewayConfig(), vertx, amqpAdapterClientFactory) { + new MqttProtocolGatewayConfig(), vertx, tenantConnectionManager) { @Override protected Future> getTrustAnchors(final List certificates) { @@ -405,7 +408,7 @@ public void testAuthenticationWithClientCertFailsIfTrustAnchorDoesNotMatch() { // GIVEN a protocol gateway configured with a trust anchor final TestMqttProtocolGateway gateway = new TestMqttProtocolGateway(amqpClientConfig, - new MqttProtocolGatewayConfig(), vertx, amqpAdapterClientFactory) { + new MqttProtocolGatewayConfig(), vertx, tenantConnectionManager) { @Override protected Future> getTrustAnchors(final List certificates) { @@ -432,7 +435,8 @@ protected Future> getTrustAnchors(final List c public void testConnectFailsWhenGatewayCouldNotConnect() { // GIVEN a protocol gateway where establishing a connection to Hono's AMQP adapter fails - when(amqpAdapterClientFactory.connect()).thenReturn(Future.failedFuture("Connect failed")); + when(tenantConnectionManager.connect(anyString(), any(), any())) + .thenReturn(Future.failedFuture("Connect failed")); final TestMqttProtocolGateway gateway = createGateway(); @@ -455,26 +459,25 @@ public void testConnectWithGatewayCredentialsResolvedDynamicallySucceeds() { // ... and where the gateway credentials are resolved by the implementation final ClientConfigProperties configWithoutCredentials = new ClientConfigProperties(); final AbstractMqttProtocolGateway gateway = new TestMqttProtocolGateway(configWithoutCredentials, - new MqttProtocolGatewayConfig(), vertx, amqpAdapterClientFactory) { + new MqttProtocolGatewayConfig(), vertx, tenantConnectionManager); - @Override - AmqpAdapterClientFactory createTenantClientFactory(final String tenantId, - final ClientConfigProperties clientConfig) { + // WHEN the gateway connects + connectTestDevice(gateway); - // THEN the AMQP connection is authenticated with the provided credentials... - assertThat(clientConfig.getUsername()).isEqualTo(GW_USERNAME); - assertThat(clientConfig.getPassword()).isEqualTo(GW_PASSWORD); + final ArgumentCaptor configPropertiesArgumentCaptor = ArgumentCaptor + .forClass(ClientConfigProperties.class); - // ... and not with the credentials from the configuration - assertThat(clientConfig.getUsername()).isNotEqualTo(configWithoutCredentials.getUsername()); - assertThat(clientConfig.getPassword()).isNotEqualTo(configWithoutCredentials.getPassword()); + verify(tenantConnectionManager).connect(anyString(), any(), configPropertiesArgumentCaptor.capture()); - return super.createTenantClientFactory(tenantId, clientConfig); - } - }; + final ClientConfigProperties clientConfig = configPropertiesArgumentCaptor.getValue(); - // WHEN the gateway connects - connectTestDevice(gateway); + // THEN the AMQP connection is authenticated with the provided credentials... + assertThat(clientConfig.getUsername()).isEqualTo(TestMqttProtocolGateway.GW_USERNAME); + assertThat(clientConfig.getPassword()).isEqualTo(TestMqttProtocolGateway.GW_PASSWORD); + + // ... and not with the credentials from the configuration + assertThat(clientConfig.getUsername()).isNotEqualTo(configWithoutCredentials.getUsername()); + assertThat(clientConfig.getPassword()).isNotEqualTo(configWithoutCredentials.getPassword()); } @@ -494,27 +497,25 @@ public void testConfiguredCredentialsTakePrecedenceOverImplementation() { // GIVEN a protocol gateway where the AMQP config does contains credentials final AbstractMqttProtocolGateway gateway = new TestMqttProtocolGateway(configWithCredentials, - new MqttProtocolGatewayConfig(), vertx, amqpAdapterClientFactory) { + new MqttProtocolGatewayConfig(), vertx, tenantConnectionManager); - @Override - AmqpAdapterClientFactory createTenantClientFactory(final String tenantId, - final ClientConfigProperties clientConfig) { + // WHEN the gateway connects + connectTestDevice(gateway); - // THEN the AMQP connection is authenticated with the configured credentials... - assertThat(clientConfig.getUsername()).isEqualTo(username); - assertThat(clientConfig.getPassword()).isEqualTo(password); + final ArgumentCaptor configPropertiesArgumentCaptor = ArgumentCaptor + .forClass(ClientConfigProperties.class); - // ... and not with the credentials from the implementation - assertThat(clientConfig.getUsername()).isNotEqualTo(GW_USERNAME); - assertThat(clientConfig.getPassword()).isNotEqualTo(GW_PASSWORD); + verify(tenantConnectionManager).connect(anyString(), any(), configPropertiesArgumentCaptor.capture()); - return super.createTenantClientFactory(tenantId, clientConfig); - } - }; + final ClientConfigProperties clientConfig = configPropertiesArgumentCaptor.getValue(); - // WHEN the gateway connects - connectTestDevice(gateway); + // THEN the AMQP connection is authenticated with the configured credentials... + assertThat(clientConfig.getUsername()).isEqualTo(username); + assertThat(clientConfig.getPassword()).isEqualTo(password); + // ... and not with the credentials from the implementation + assertThat(clientConfig.getUsername()).isNotEqualTo(TestMqttProtocolGateway.GW_USERNAME); + assertThat(clientConfig.getPassword()).isNotEqualTo(TestMqttProtocolGateway.GW_PASSWORD); } /** @@ -579,6 +580,43 @@ public void testEventMessage() { } + /** + * Verifies that when a message is being rejected by the remote, the connection to the device is closed. + */ + @Test + public void sendEventClosesEndpointWhenMessageIsRejected() { + + // GIVEN a protocol gateway that sends every MQTT publish message as an event downstream and a connected MQTT + // endpoint + final TestMqttProtocolGateway gateway = createGateway(); + final MqttEndpoint mqttEndpoint = connectTestDevice(gateway); + + // WHEN sending a MQTT message... + ProtocolGatewayTestHelper.sendMessage(mqttEndpoint, Buffer.buffer("payload1"), "topic/1"); + // ... that gets rejected by the remote + rejectAmqpMessage(); + + // THEN the endpoint has been closed + assertThat(mqttEndpoint.isConnected()).isFalse(); + // ... and the callback onDeviceConnectionClose() has been invoked + assertThat(gateway.isConnectionClosed()).isTrue(); + + } + + private void rejectAmqpMessage() { + + @SuppressWarnings("unchecked") + final ArgumentCaptor> handlerArgumentCaptor = ArgumentCaptor.forClass(Handler.class); + + verify(protonSender).send(any(), handlerArgumentCaptor.capture()); + + final ProtonDelivery protonDelivery = mock(ProtonDelivery.class); + when(protonDelivery.getRemoteState()).thenReturn(new Rejected()); + when(protonDelivery.remotelySettled()).thenReturn(true); + + handlerArgumentCaptor.getValue().handle(protonDelivery); + } + /** * Verifies that a telemetry message is being sent to the right address. */ @@ -590,7 +628,7 @@ public void testTelemetryMessage() { // GIVEN a protocol gateway that sends every MQTT publish messages as telemetry messages downstream and a // connected MQTT endpoint final TestMqttProtocolGateway gateway = new TestMqttProtocolGateway(amqpClientConfig, - new MqttProtocolGatewayConfig(), vertx, amqpAdapterClientFactory) { + new MqttProtocolGatewayConfig(), vertx, tenantConnectionManager) { @Override protected Future onPublishedMessage(final MqttDownstreamContext ctx) { @@ -627,7 +665,7 @@ public void testCommandResponse() { // GIVEN a protocol gateway that sends every MQTT publish messages as command response messages downstream and a // connected MQTT endpoint final TestMqttProtocolGateway gateway = new TestMqttProtocolGateway(amqpClientConfig, - new MqttProtocolGatewayConfig(), vertx, amqpAdapterClientFactory) { + new MqttProtocolGatewayConfig(), vertx, tenantConnectionManager) { @Override protected Future onPublishedMessage(final MqttDownstreamContext ctx) { @@ -924,7 +962,7 @@ private TestMqttProtocolGateway createGateway() { } private TestMqttProtocolGateway createGateway(final MqttProtocolGatewayConfig gatewayServerConfig) { - return new TestMqttProtocolGateway(amqpClientConfig, gatewayServerConfig, vertx, amqpAdapterClientFactory); + return new TestMqttProtocolGateway(amqpClientConfig, gatewayServerConfig, vertx, tenantConnectionManager); } } diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImplTest.java b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImplTest.java new file mode 100644 index 00000000..dc0d2d91 --- /dev/null +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/MultiTenantConnectionManagerImplTest.java @@ -0,0 +1,127 @@ +/******************************************************************************* + * Copyright (c) 2020 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.gateway.sdk.mqtt2amqp; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.eclipse.hono.config.ClientConfigProperties; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.vertx.core.Context; +import io.vertx.core.Vertx; +import io.vertx.mqtt.MqttEndpoint; + +/** + * Verifies behavior of {@link MultiTenantConnectionManagerImpl}. + */ +public class MultiTenantConnectionManagerImplTest { + + private static final String TENANT_ID = "test-tenant"; + private MultiTenantConnectionManagerImpl connectionManager; + private MqttEndpoint endpoint; + private Vertx vertx; + + /** + * Sets up common fixture. + */ + @BeforeEach + public void setUp() { + connectionManager = new MultiTenantConnectionManagerImpl(); + endpoint = mock(MqttEndpoint.class); + + vertx = mock(Vertx.class); + when(vertx.getOrCreateContext()).thenReturn(mock(Context.class)); + } + + /** + * Verifies that closing the last endpoint of the tenant, closes the AMQP connection. + */ + @Test + public void amqpConnectionIsClosedWhenClosingLastEndpoint() { + + connectionManager.connect(TENANT_ID, vertx, new ClientConfigProperties()); + connectionManager.addEndpoint(TENANT_ID, endpoint); + + assertThat(connectionManager.closeEndpoint(TENANT_ID, endpoint).result()).isTrue(); + + } + + /** + * Verifies that closing an endpoint while there are others for the same tenant, the AMQP connection is not closed. + */ + @Test + public void amqpConnectionIsOpenWhenClosingEndpointThatIsNotTheLastOne() { + + connectionManager.connect(TENANT_ID, vertx, new ClientConfigProperties()); + connectionManager.addEndpoint(TENANT_ID, endpoint); + connectionManager.addEndpoint(TENANT_ID, mock(MqttEndpoint.class)); + + assertThat(connectionManager.closeEndpoint(TENANT_ID, endpoint).result()).isFalse(); + + } + + /** + * Verifies that all tenants are closed when closeAllTenants() is invoked. + */ + @Test + public void addEndpointFailsIfInstanceIsClosed() { + + connectionManager.connect(TENANT_ID, vertx, new ClientConfigProperties()); + + connectionManager.closeAllTenants(); + + assertThat(connectionManager.addEndpoint(TENANT_ID, endpoint).failed()).isTrue(); + } + + /** + * Verifies that trying to add an endpoint without connecting the tenant first fails. + */ + @Test + public void addEndpointFailsIfNotConnected() { + assertThat(connectionManager.addEndpoint(TENANT_ID, endpoint).failed()).isTrue(); + } + + /** + * Verifies that trying to close an endpoint without connecting the tenant first fails. + */ + @Test + public void closeEndpointFailsIfNotConnected() { + assertThat(connectionManager.closeEndpoint(TENANT_ID, endpoint).failed()).isTrue(); + } + + /** + * Verifies that calling one of the methods that delegate to AmqpAdapterClientFactory fails if the tenant is not + * connected. + */ + @Test + public void futureFailsIfNotConnected() { + + assertThat(connectionManager.getOrCreateTelemetrySender(TENANT_ID).failed()).isTrue(); + + assertThat(connectionManager.getOrCreateEventSender(TENANT_ID).failed()).isTrue(); + + assertThat(connectionManager.getOrCreateCommandResponseSender(TENANT_ID).failed()).isTrue(); + + assertThat(connectionManager.createDeviceSpecificCommandConsumer(TENANT_ID, "device-id", msg -> { + }).failed()).isTrue(); + + assertThat(connectionManager.createCommandConsumer(TENANT_ID, msg -> { + }).failed()).isTrue(); + + } + +} diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnectionsTest.java b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnectionsTest.java new file mode 100644 index 00000000..43aeb36f --- /dev/null +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TenantConnectionsTest.java @@ -0,0 +1,134 @@ +/******************************************************************************* + * Copyright (c) 2020 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.gateway.sdk.mqtt2amqp; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.vertx.core.Future; +import io.vertx.mqtt.MqttEndpoint; + +/** + * Verifies behavior of {@link TenantConnections}. + */ +public class TenantConnectionsTest { + + private TenantConnections tenantConnections; + private MqttEndpoint endpoint; + private AmqpAdapterClientFactory amqpAdapterClientFactory; + + /** + * Sets up common fixture. + */ + @BeforeEach + public void setUp() { + amqpAdapterClientFactory = mock(AmqpAdapterClientFactory.class); + + tenantConnections = new TenantConnections(amqpAdapterClientFactory, "a-tenant-id"); + endpoint = mock(MqttEndpoint.class); + } + + /** + * Verifies that adding an endpoint works. + */ + @Test + public void containsEndpointWhenAdding() { + tenantConnections.addEndpoint(endpoint); + + assertThat(tenantConnections.mqttEndpoints.size()).isEqualTo(1); + assertThat(tenantConnections.mqttEndpoints.contains(endpoint)).isTrue(); + } + + /** + * Verifies that removing an endpoint works. + */ + @Test + public void endpointIsRemovedWhenClosingEndpoint() { + tenantConnections.addEndpoint(endpoint); + + tenantConnections.closeEndpoint(endpoint); + + assertThat(tenantConnections.mqttEndpoints.isEmpty()).isTrue(); + } + + /** + * Verifies that the instance is closed when the last endpoint is closed. + */ + @Test + public void instanceIsClosedWhenClosingLastEndpoint() { + tenantConnections.addEndpoint(endpoint); + + tenantConnections.closeEndpoint(endpoint); + + assertThat(tenantConnections.getAmqpAdapterClientFactory().failed()).isTrue(); + } + + /** + * Verifies that the instance is NOT closed when an endpoint is closed while other endpoints are still open. + */ + @Test + public void instanceIsOpenWhenClosingEndpointThatIsNotTheLastOne() { + tenantConnections.addEndpoint(endpoint); + tenantConnections.addEndpoint(mock(MqttEndpoint.class)); + + tenantConnections.closeEndpoint(endpoint); + + assertThat(tenantConnections.getAmqpAdapterClientFactory().succeeded()).isTrue(); + } + + /** + * Verifies that the instance is closed when closeAllConnections() is invoked. + */ + @Test + public void instanceIsClosedWhenInvokingClose() { + + tenantConnections.getAmqpAdapterClientFactory(); + + tenantConnections.closeAllConnections(); + + assertThat(tenantConnections.getAmqpAdapterClientFactory().failed()).isTrue(); + } + + /** + * Verifies that the isConnected() method delegates the check to the client factory. + */ + @Test + public void isConnectedDelegatesToClientFactory() { + when(amqpAdapterClientFactory.isConnected(anyLong())).thenReturn(Future.succeededFuture()); + + tenantConnections.isConnected(5L); + verify(amqpAdapterClientFactory).isConnected(eq(5L)); + } + + /** + * Verifies that the connect() method delegates the call to the client factory. + */ + @Test + public void connectDelegatesToClientFactory() { + when(amqpAdapterClientFactory.connect()).thenReturn(Future.succeededFuture()); + + tenantConnections.connect(); + verify(amqpAdapterClientFactory).connect(); + + } + +} diff --git a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TestMqttProtocolGateway.java b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TestMqttProtocolGateway.java index 76036cd4..6831a818 100644 --- a/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TestMqttProtocolGateway.java +++ b/protocol-gateway/mqtt-protocol-gateway-template/src/test/java/org/eclipse/hono/gateway/sdk/mqtt2amqp/TestMqttProtocolGateway.java @@ -17,14 +17,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.hono.auth.Device; -import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory; import org.eclipse.hono.config.ClientConfigProperties; -import org.eclipse.hono.gateway.sdk.mqtt2amqp.Command; -import org.eclipse.hono.gateway.sdk.mqtt2amqp.CommandSubscriptionsManager; -import org.eclipse.hono.gateway.sdk.mqtt2amqp.Credentials; -import org.eclipse.hono.gateway.sdk.mqtt2amqp.MqttCommandContext; -import org.eclipse.hono.gateway.sdk.mqtt2amqp.MqttDownstreamContext; -import org.eclipse.hono.gateway.sdk.mqtt2amqp.MqttProtocolGatewayConfig; import org.eclipse.hono.gateway.sdk.mqtt2amqp.downstream.DownstreamMessage; import org.eclipse.hono.gateway.sdk.mqtt2amqp.downstream.EventMessage; @@ -66,15 +59,15 @@ class TestMqttProtocolGateway extends AbstractMqttProtocolGateway { private final AtomicBoolean startupComplete = new AtomicBoolean(); private final AtomicBoolean shutdownStarted = new AtomicBoolean(); private final AtomicBoolean connectionClosed = new AtomicBoolean(); - private final AmqpAdapterClientFactory amqpAdapterClientFactory; private CommandSubscriptionsManager commandSubscriptionsManager; TestMqttProtocolGateway(final ClientConfigProperties clientConfigProperties, - final MqttProtocolGatewayConfig mqttProtocolGatewayConfig, final Vertx vertx, - final AmqpAdapterClientFactory amqpAdapterClientFactory) { - super(clientConfigProperties, mqttProtocolGatewayConfig); - this.amqpAdapterClientFactory = amqpAdapterClientFactory; + final MqttProtocolGatewayConfig mqttProtocolGatewayConfig, + final Vertx vertx, + final MultiTenantConnectionManager tenantConnectionManager) { + + super(clientConfigProperties, mqttProtocolGatewayConfig, tenantConnectionManager); super.vertx = vertx; } @@ -115,12 +108,6 @@ public CommandSubscriptionsManager getCommandSubscriptionsManager() { return commandSubscriptionsManager; } - @Override - AmqpAdapterClientFactory createTenantClientFactory(final String tenantId, - final ClientConfigProperties clientConfig) { - return amqpAdapterClientFactory; - } - @Override protected Future authenticateDevice(final String username, final String password, final String clientId) {