Skip to content

Commit

Permalink
[#5] Close AMQP connection when all devices are disconnected.
Browse files Browse the repository at this point in the history
The MQTT endpoints are tracked for each tenant. When the last endpoint of a tenant is
closed, the AMQP connection for this tenant is closed as well.

Signed-off-by: Abel Buechner-Mihaljevic <[email protected]>
  • Loading branch information
b-abel authored Sep 11, 2020
1 parent 8a1602d commit 4a31e18
Show file tree
Hide file tree
Showing 8 changed files with 836 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -32,10 +31,8 @@
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory;
import org.eclipse.hono.config.ClientConfigProperties;
import org.eclipse.hono.gateway.sdk.mqtt2amqp.downstream.CommandResponseMessage;
import org.eclipse.hono.gateway.sdk.mqtt2amqp.downstream.DownstreamMessage;
Expand Down Expand Up @@ -84,7 +81,7 @@ public abstract class AbstractMqttProtocolGateway extends AbstractVerticle {

private final ClientConfigProperties amqpClientConfig;
private final MqttProtocolGatewayConfig mqttGatewayConfig;
private final Map<String, AmqpAdapterClientFactory> clientFactoryPerTenant = new HashMap<>();
private final MultiTenantConnectionManager tenantConnectionManager;

private MqttServer server;

Expand All @@ -104,11 +101,30 @@ public abstract class AbstractMqttProtocolGateway extends AbstractVerticle {
*/
public AbstractMqttProtocolGateway(final ClientConfigProperties amqpClientConfig,
final MqttProtocolGatewayConfig mqttGatewayConfig) {

this(amqpClientConfig, mqttGatewayConfig, new MultiTenantConnectionManagerImpl());
}

/**
* This constructor is only visible for testing purposes.
*
* @param amqpClientConfig The AMQP client configuration.
* @param mqttGatewayConfig The configuration of the protocol gateway.
* @param tenantConnectionManager The tenant connection manager to be used.
* @throws NullPointerException if any of the parameters is {@code null}.
* @see AbstractMqttProtocolGateway#AbstractMqttProtocolGateway(ClientConfigProperties, MqttProtocolGatewayConfig)
*/
AbstractMqttProtocolGateway(final ClientConfigProperties amqpClientConfig,
final MqttProtocolGatewayConfig mqttGatewayConfig,
final MultiTenantConnectionManager tenantConnectionManager) {

Objects.requireNonNull(amqpClientConfig);
Objects.requireNonNull(mqttGatewayConfig);
Objects.requireNonNull(tenantConnectionManager);

this.amqpClientConfig = amqpClientConfig;
this.mqttGatewayConfig = mqttGatewayConfig;
this.tenantConnectionManager = tenantConnectionManager;
}

/**
Expand Down Expand Up @@ -355,7 +371,11 @@ final void handleEndpointConnection(final MqttEndpoint endpoint) {
: Future.succeededFuture(authenticateDevice));

authAttempt
.compose(this::connectGatewayToAmqpAdapter)
.compose(authenticatedDevice -> {
final String tenantId = authenticatedDevice.getTenantId();
return getTenantConfig(tenantId)
.compose(config -> connectGatewayToAmqpAdapter(tenantId, config, endpoint));
})
.onComplete(result -> {
if (result.succeeded()) {
registerHandlers(endpoint, authAttempt.result());
Expand Down Expand Up @@ -414,56 +434,30 @@ private Future<Device> authenticateWithUsernameAndPassword(final MqttEndpoint en
}
}

private Future<Void> connectGatewayToAmqpAdapter(final Device authenticatedDevice) {

final String tenantId = authenticatedDevice.getTenantId();
private Future<ClientConfigProperties> getTenantConfig(final String tenantId) {

if (amqpClientConfig.getUsername() != null && amqpClientConfig.getPassword() != null) {
return connectGatewayToAmqpAdapter(tenantId, amqpClientConfig);
return Future.succeededFuture(amqpClientConfig);
} else {
return provideGatewayCredentials(tenantId)
.compose(credentials -> {
final ClientConfigProperties tenantConfig = new ClientConfigProperties(amqpClientConfig);
tenantConfig.setUsername(credentials.getUsername());
tenantConfig.setPassword(credentials.getPassword());

return connectGatewayToAmqpAdapter(tenantId, tenantConfig);
return Future.succeededFuture(tenantConfig);
});
}
}

private Future<Void> connectGatewayToAmqpAdapter(final String tenantId, final ClientConfigProperties clientConfig) {

final AmqpAdapterClientFactory amqpAdapterClientFactory = clientFactoryPerTenant.get(tenantId);
if (amqpAdapterClientFactory != null) {
return amqpAdapterClientFactory.isConnected(clientConfig.getConnectTimeout());
} else {

final AmqpAdapterClientFactory factory = createTenantClientFactory(tenantId, clientConfig);
clientFactoryPerTenant.put(tenantId, factory);

return factory.connect()
.map(con -> {
log.debug("Connected to AMQP adapter");
return null;
});
}
}
private Future<Void> connectGatewayToAmqpAdapter(final String tenantId,
final ClientConfigProperties clientConfig,
final MqttEndpoint endpoint) {
return tenantConnectionManager.connect(tenantId, vertx, clientConfig)
.onFailure(e -> log.info("Failed to connect to Hono [tenant-id: {}, username: {}]", tenantId,
clientConfig.getUsername()))
.compose(v -> tenantConnectionManager.addEndpoint(tenantId, endpoint));

/**
* Returns a new {@link AmqpAdapterClientFactory} with a new AMQP connection for the given tenant.
* <p>
* This method is only visible for testing purposes.
*
* @param tenantId The tenant to be connected.
* @param clientConfig The client properties to use for the connection.
* @return The factory. Note that the underlying AMQP connection will not be established until
* {@link AmqpAdapterClientFactory#connect()} is invoked.
*/
AmqpAdapterClientFactory createTenantClientFactory(final String tenantId,
final ClientConfigProperties clientConfig) {
final HonoConnection connection = HonoConnection.newConnection(vertx, clientConfig);
return AmqpAdapterClientFactory.create(connection, tenantId);
}

private void registerHandlers(final MqttEndpoint endpoint, final Device authenticatedDevice) {
Expand All @@ -473,22 +467,29 @@ private void registerHandlers(final MqttEndpoint endpoint, final Device authenti
MqttDownstreamContext.fromPublishPacket(message, endpoint, authenticatedDevice)));

final CommandSubscriptionsManager cmdSubscriptionsManager = createCommandHandler(vertx);
endpoint.closeHandler(v -> close(endpoint, cmdSubscriptionsManager));
endpoint.closeHandler(v -> cleanupConnections(endpoint, cmdSubscriptionsManager, authenticatedDevice));
endpoint.publishAcknowledgeHandler(cmdSubscriptionsManager::handlePubAck);
endpoint.subscribeHandler(msg -> onSubscribe(endpoint, authenticatedDevice, msg, cmdSubscriptionsManager));
endpoint.unsubscribeHandler(msg -> onUnsubscribe(endpoint, authenticatedDevice, msg, cmdSubscriptionsManager));

}

private void close(final MqttEndpoint endpoint, final CommandSubscriptionsManager cmdSubscriptionsManager) {
private void cleanupConnections(final MqttEndpoint endpoint,
final CommandSubscriptionsManager cmdSubscriptionsManager,
final Device authenticatedDevice) {

log.info("closing connection to device {}", authenticatedDevice.toString());

onDeviceConnectionClose(endpoint);
cmdSubscriptionsManager.removeAllSubscriptions();
if (endpoint.isConnected()) {
log.debug("closing connection with client [client ID: {}]", endpoint.clientIdentifier());
endpoint.close();
} else {
log.trace("connection to client is already closed");
}

final String tenantId = authenticatedDevice.getTenantId();
tenantConnectionManager.closeEndpoint(tenantId, endpoint)
.onSuccess(amqpLinkClosed -> {
if (amqpLinkClosed) {
log.info("closed AMQP connection for tenant [{}]", tenantId);
}
});
}

/**
Expand Down Expand Up @@ -585,16 +586,15 @@ private void onUploadFailure(final MqttDownstreamContext ctx, final Throwable ca
}

if (ctx.deviceEndpoint().isConnected()) {
log.info("closing connection to device {}", ctx.authenticatedDevice().toString());
ctx.deviceEndpoint().close();
ctx.deviceEndpoint().close(); // cleanupConnections() will be called by close handler
}
}

private Future<ProtonDelivery> sendTelemetry(final String tenantId, final String deviceId,
final Map<String, ?> properties, final byte[] payload, final String contentType,
final boolean waitForOutcome) {

return clientFactoryPerTenant.get(tenantId).getOrCreateTelemetrySender()
return tenantConnectionManager.getOrCreateTelemetrySender(tenantId)
.compose(sender -> {
if (waitForOutcome) {
log.trace(
Expand All @@ -616,7 +616,7 @@ private Future<ProtonDelivery> sendEvent(final String tenantId, final String dev
log.trace("sending event message [tenantId: {}, deviceId: {}, contentType: {}, properties: {}]",
tenantId, deviceId, contentType, properties);

return clientFactoryPerTenant.get(tenantId).getOrCreateEventSender()
return tenantConnectionManager.getOrCreateEventSender(tenantId)
.compose(sender -> sender.send(deviceId, payload, contentType, properties));
}

Expand All @@ -628,7 +628,7 @@ private Future<ProtonDelivery> sendCommandResponse(final String tenantId, final
"sending command response [tenantId: {}, deviceId: {}, targetAddress: {}, correlationId: {}, status: {}, contentType: {}, properties: {}]",
tenantId, deviceId, targetAddress, correlationId, status, contentType, properties);

return clientFactoryPerTenant.get(tenantId).getOrCreateCommandResponseSender()
return tenantConnectionManager.getOrCreateCommandResponseSender(tenantId)
.compose(sender -> sender.sendCommandResponse(deviceId, targetAddress, correlationId, status, payload,
contentType, properties));
}
Expand Down Expand Up @@ -733,7 +733,9 @@ private void onUnsubscribe(final MqttEndpoint endpoint, final Device authenticat

private Future<MessageConsumer> createCommandConsumer(final MqttEndpoint endpoint,
final CommandSubscriptionsManager cmdSubscriptionsManager, final Device authenticatedDevice) {
return clientFactoryPerTenant.get(authenticatedDevice.getTenantId()).createDeviceSpecificCommandConsumer(

return tenantConnectionManager.createDeviceSpecificCommandConsumer(
authenticatedDevice.getTenantId(),
authenticatedDevice.getDeviceId(),
cmd -> handleCommand(endpoint, cmd, cmdSubscriptionsManager, authenticatedDevice));
}
Expand Down Expand Up @@ -887,6 +889,9 @@ public final void stop(final Promise<Void> stopPromise) {

final Promise<Void> stopTracker = Promise.promise();
beforeShutdown(stopTracker);

tenantConnectionManager.closeAllTenants();

stopTracker.future().onComplete(v -> {
if (server != null) {
server.close(stopPromise);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*******************************************************************************
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/

package org.eclipse.hono.gateway.sdk.mqtt2amqp;

import java.util.function.Consumer;

import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.device.amqp.AmqpAdapterClientFactory;
import org.eclipse.hono.client.device.amqp.CommandResponder;
import org.eclipse.hono.client.device.amqp.EventSender;
import org.eclipse.hono.client.device.amqp.TelemetrySender;
import org.eclipse.hono.config.ClientConfigProperties;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.mqtt.MqttEndpoint;

/**
* Manages connections for multiple tenants.
* <p>
* <b>NB</b> The {@link #connect(String, Vertx, ClientConfigProperties)} method needs to be invoked before calling any
* of the other methods.
*/
public interface MultiTenantConnectionManager {

/**
* Connects to Hono's AMQP adapter with the given configuration.
*
* @param tenantId The tenant to connect.
* @param vertx The Vert.x instance to use for the connection.
* @param clientConfig The configuration of the connection.
* @return a succeeded future if the connection could be established within the time frame configured with
* {@link ClientConfigProperties#getConnectTimeout()}, a failed future otherwise.
*/
Future<Void> connect(String tenantId, Vertx vertx, ClientConfigProperties clientConfig);

/**
* Adds an MQTT endpoint for the given tenant.
*
* @param tenantId The tenant to which the endpoint belongs.
* @param mqttEndpoint The endpoint to be added.
* @return A future indicating the outcome of the operation. The future will succeed if the endpoint has been added
* successfully. Otherwise the future will fail with a failure message indicating the cause of the failure.
*/
Future<Void> addEndpoint(String tenantId, MqttEndpoint mqttEndpoint);

/**
* Closes the given MQTT endpoint and if there are no other open endpoints for this tenant, it closes the
* corresponding AMQP connection.
*
* @param tenantId The tenant to which the endpoint belongs.
* @param mqttEndpoint The endpoint to be closed.
* @return A future indicating the outcome of the operation. The future will succeed with a boolean that is
* {@code true} if the AMQP connection (and all MQTT endpoints) have been closed. If an error occurs, the
* future will fail with a failure message indicating the cause of the failure.
*/
Future<Boolean> closeEndpoint(String tenantId, MqttEndpoint mqttEndpoint);

/**
* Closes all connections, MQTT connections as well as AMQP connections for all tenants.
*/
void closeAllTenants();

/**
* Gets a client for sending telemetry data to Hono's AMQP protocol adapter.
*
* @param tenantId The tenant to which the sender belongs.
* @return a future with the open sender or a failed future.
* @see AmqpAdapterClientFactory#getOrCreateTelemetrySender()
*/
Future<TelemetrySender> getOrCreateTelemetrySender(String tenantId);

/**
* Gets a client for sending events to Hono's AMQP protocol adapter.
*
* @param tenantId The tenant to which the sender belongs.
* @return a future with the open sender or a failed future.
* @see AmqpAdapterClientFactory#getOrCreateTelemetrySender()
*/
Future<EventSender> getOrCreateEventSender(String tenantId);

/**
* Gets a client for sending command responses to Hono's AMQP protocol adapter.
*
* @param tenantId The tenant to which the sender belongs.
* @return a future with the open sender or a failed future.
* @see AmqpAdapterClientFactory#getOrCreateTelemetrySender()
*/
Future<CommandResponder> getOrCreateCommandResponseSender(String tenantId);

/**
* Creates a client for consuming commands from Hono's AMQP protocol adapter for a specific device.
*
* @param tenantId The tenant to which the sender belongs.
* @param deviceId The device to consume commands for.
* @param messageHandler The handler to invoke with every command received.
* @return a future with the open sender or a failed future.
* @see AmqpAdapterClientFactory#getOrCreateTelemetrySender()
*/
Future<MessageConsumer> createDeviceSpecificCommandConsumer(String tenantId, String deviceId,
Consumer<Message> messageHandler);

/**
* Creates a client for consuming commands from Hono's AMQP protocol adapter for all devices of this tenant.
*
* @param tenantId The tenant to which the sender belongs.
* @param messageHandler The handler to invoke with every command received.
* @return a future with the open sender or a failed future.
* @see AmqpAdapterClientFactory#getOrCreateTelemetrySender()
*/
Future<MessageConsumer> createCommandConsumer(String tenantId, Consumer<Message> messageHandler);

}
Loading

0 comments on commit 4a31e18

Please sign in to comment.