diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTServerCnx.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTServerCnx.java index 9997a41e..ed877104 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTServerCnx.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/channel/MQTTServerCnx.java @@ -13,12 +13,14 @@ */ package io.streamnative.pulsar.handlers.mqtt.broker.channel; +import static io.streamnative.pulsar.handlers.mqtt.common.Constants.AUTH_DATA_ATTRIBUTE_KEY; import io.netty.channel.ChannelHandlerContext; import io.streamnative.pulsar.handlers.mqtt.broker.impl.consumer.MQTTConsumer; import java.util.Optional; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.ServerCnx; @@ -65,4 +67,9 @@ private void safelyRemoveConsumer(Consumer consumer) { }); } } + + @Override + public AuthenticationDataSource getAuthenticationData() { + return ctx.channel().attr(AUTH_DATA_ATTRIBUTE_KEY).get(); + } } diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/Connection.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/Connection.java index 8f8a0417..b631ec1a 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/Connection.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/Connection.java @@ -16,6 +16,7 @@ import static io.streamnative.pulsar.handlers.mqtt.common.Connection.ConnectionState.CONNECT_ACK; import static io.streamnative.pulsar.handlers.mqtt.common.Connection.ConnectionState.DISCONNECTED; import static io.streamnative.pulsar.handlers.mqtt.common.Connection.ConnectionState.ESTABLISHED; +import static io.streamnative.pulsar.handlers.mqtt.common.Constants.AUTH_DATA_ATTRIBUTE_KEY; import static io.streamnative.pulsar.handlers.mqtt.common.utils.MqttMessageUtils.getAuthMethod; import static io.streamnative.pulsar.handlers.mqtt.common.utils.NettyUtils.ATTR_KEY_CONNECTION; import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; @@ -101,6 +102,7 @@ public class Connection { static ChannelException channelInactiveException = new ChannelException("Channel is inactive"); + Connection(ConnectionBuilder builder) { this.clientId = builder.clientId; this.protocolVersion = builder.protocolVersion; @@ -116,6 +118,7 @@ public class Connection { this.processor = builder.processor; this.fromProxy = builder.fromProxy; this.authData = builder.authData; + this.channel.attr(AUTH_DATA_ATTRIBUTE_KEY).set(authData); this.addIdleStateHandler(); this.manager.addConnection(this); this.topicAliasManager = new TopicAliasManager(clientRestrictions.getTopicAliasMaximum()); @@ -162,6 +165,7 @@ public CompletableFuture sendAck(MqttAck mqttAck) { public void updateAuthData(AuthenticationDataSource authData) { this.authData = authData; + this.channel.attr(AUTH_DATA_ATTRIBUTE_KEY).set(authData); } public CompletableFuture sendAckThenClose(MqttAck mqttAck) { diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/Constants.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/Constants.java index c3a62a1a..255e9293 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/Constants.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/Constants.java @@ -13,6 +13,9 @@ */ package io.streamnative.pulsar.handlers.mqtt.common; +import io.netty.util.AttributeKey; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; + /** * Server constants keeper. */ @@ -47,6 +50,9 @@ public final class Constants { public static final String MQTT_SUB_PROTOCOL_CSV_LIST = "mqtt, mqttv3.1, mqttv3.1.1, mqttv5.0"; + public static final AttributeKey AUTH_DATA_ATTRIBUTE_KEY = + AttributeKey.valueOf("authData"); + private Constants() { } }