-
Notifications
You must be signed in to change notification settings - Fork 820
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Connection filter interface for IP Bans #747
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ | |
import io.moquette.broker.security.DenyAllAuthorizatorPolicy; | ||
import io.moquette.broker.security.IAuthenticator; | ||
import io.moquette.broker.security.IAuthorizatorPolicy; | ||
import io.moquette.broker.security.IConnectionFilter; | ||
import io.moquette.broker.security.PermitAllAuthorizatorPolicy; | ||
import io.moquette.broker.security.ResourceAuthenticator; | ||
import io.moquette.broker.unsafequeues.QueueException; | ||
|
@@ -169,11 +170,11 @@ public void startServer(IConfig config) throws IOException { | |
*/ | ||
public void startServer(IConfig config, List<? extends InterceptHandler> handlers) throws IOException { | ||
LOG.debug("Starting moquette integration using IConfig instance and intercept handlers"); | ||
startServer(config, handlers, null, null, null); | ||
startServer(config, handlers, null, null, null, null); | ||
} | ||
|
||
public void startServer(IConfig config, List<? extends InterceptHandler> handlers, ISslContextCreator sslCtxCreator, | ||
IAuthenticator authenticator, IAuthorizatorPolicy authorizatorPolicy) throws IOException { | ||
IConnectionFilter connectionFilter, IAuthenticator authenticator, IAuthorizatorPolicy authorizatorPolicy) throws IOException { | ||
final long start = System.currentTimeMillis(); | ||
if (handlers == null) { | ||
handlers = Collections.emptyList(); | ||
|
@@ -192,6 +193,7 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler | |
LOG.info("Using default SSL context creator"); | ||
sslCtxCreator = new DefaultMoquetteSslContextCreator(config); | ||
} | ||
connectionFilter = initializeConnectionFilter(connectionFilter, config); | ||
authenticator = initializeAuthenticator(authenticator, config); | ||
authorizatorPolicy = initializeAuthorizatorPolicy(authorizatorPolicy, config); | ||
|
||
|
@@ -258,7 +260,7 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler | |
dispatcher = new PostOffice(subscriptions, retainedRepository, sessions, interceptor, authorizator, | ||
loopsGroup); | ||
final BrokerConfiguration brokerConfig = new BrokerConfiguration(config); | ||
MQTTConnectionFactory connectionFactory = new MQTTConnectionFactory(brokerConfig, authenticator, sessions, | ||
MQTTConnectionFactory connectionFactory = new MQTTConnectionFactory(brokerConfig, authenticator, connectionFilter, sessions, | ||
dispatcher); | ||
|
||
final NewNettyMQTTHandler mqttHandler = new NewNettyMQTTHandler(connectionFactory); | ||
|
@@ -275,6 +277,7 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler | |
initialized = true; | ||
} | ||
|
||
|
||
private static IQueueRepository initQueuesRepository(IConfig config, Path dataPath, H2Builder h2Builder) throws IOException { | ||
final IQueueRepository queueRepository; | ||
final String queueType = config.getProperty(BrokerConstants.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME); | ||
|
@@ -472,6 +475,16 @@ private IAuthorizatorPolicy initializeAuthorizatorPolicy(IAuthorizatorPolicy aut | |
return authorizatorPolicy; | ||
} | ||
|
||
private IConnectionFilter initializeConnectionFilter(IConnectionFilter connectionFilter, IConfig props) { | ||
LOG.debug("Configuring MQTT Connection Filter"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Debug log should carry some useful information like, the |
||
String connectionFilterClassName = props.getProperty(BrokerConstants.CONNECTION_FILTER_CLASS_NAME, ""); | ||
|
||
if (connectionFilter == null && !connectionFilterClassName.isEmpty()) { | ||
connectionFilter = loadClass(connectionFilterClassName, IConnectionFilter.class, IConfig.class, props); | ||
} | ||
return connectionFilter; | ||
} | ||
|
||
Comment on lines
+480
to
+487
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code is a copy and paste of the initial part of |
||
private IAuthenticator initializeAuthenticator(IAuthenticator authenticator, IConfig props) { | ||
LOG.debug("Configuring MQTT authenticator"); | ||
String authenticatorClassName = props.getProperty(BrokerConstants.AUTHENTICATOR_CLASS_NAME, ""); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
package io.moquette.broker.security; | ||
|
||
import io.moquette.broker.ClientDescriptor; | ||
|
||
public interface IConnectionFilter { | ||
boolean allowConnection(ClientDescriptor clientDescriptor); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ | |
*/ | ||
package io.moquette.broker; | ||
|
||
import io.moquette.broker.security.IConnectionFilter; | ||
import io.moquette.broker.security.PermitAllAuthorizatorPolicy; | ||
import io.moquette.broker.subscriptions.CTrieSubscriptionDirectory; | ||
import io.moquette.broker.subscriptions.ISubscriptionsDirectory; | ||
|
@@ -82,6 +83,7 @@ private void createMQTTConnection(BrokerConfiguration config) { | |
private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel channel) { | ||
IAuthenticator mockAuthenticator = new MockAuthenticator(singleton(FAKE_CLIENT_ID), | ||
singletonMap(TEST_USER, TEST_PWD)); | ||
IConnectionFilter connectionFilter = new MockConnectionFilter(); | ||
|
||
ISubscriptionsDirectory subscriptions = new CTrieSubscriptionDirectory(); | ||
ISubscriptionsRepository subscriptionsRepository = new MemorySubscriptionsRepository(); | ||
|
@@ -94,7 +96,7 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel | |
sessionRegistry = new SessionRegistry(subscriptions, memorySessionsRepository(), queueRepository, permitAll, scheduler, loopsGroup); | ||
final PostOffice postOffice = new PostOffice(subscriptions, | ||
new MemoryRetainedRepository(), sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, loopsGroup); | ||
return new MQTTConnection(channel, config, mockAuthenticator, sessionRegistry, postOffice); | ||
return new MQTTConnection(channel, config, mockAuthenticator, connectionFilter, sessionRegistry, postOffice); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that in this test we don't use the filter we could inline it, without creating an used variable: return new MQTTConnection(channel, config, mockAuthenticator, new MockConnectionFilter(), sessionRegistry, postOffice); Or better, given that it's a pass-all filter, a more explicit implementation could be used for such cases: public class AcceptAllFilter implements IConnectionFilter {
@Override
public boolean allowConnection(ClientDescriptor clientDescriptor) {
return true;
}
} In all test places where special filtering logic is not needed we can use: return new MQTTConnection(channel, config, mockAuthenticator, new AcceptAllFilter(), sessionRegistry, postOffice); This comment is valid also for other places where the MockConnectionFilter is used with the same intention. |
||
} | ||
|
||
// @NotNull | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package io.moquette.broker; | ||
|
||
import io.moquette.broker.security.IConnectionFilter; | ||
|
||
import java.util.Set; | ||
import java.util.HashSet; | ||
import java.util.stream.Stream; | ||
|
||
public class MockConnectionFilter implements IConnectionFilter { | ||
private Set<String> bannedClientIds = new HashSet<>(); | ||
private Set<String> bannedAddresses = new HashSet<>(); | ||
@Override | ||
public boolean allowConnection(ClientDescriptor clientDescriptor) { | ||
return !bannedClientIds.contains(clientDescriptor.getClientID()) | ||
&& !bannedAddresses.contains(clientDescriptor.getAddress()); | ||
} | ||
|
||
public MockConnectionFilter banClientId(String clientId) { | ||
bannedClientIds.add(clientId); | ||
return this; | ||
} | ||
|
||
public MockConnectionFilter banAddress(String address) { | ||
bannedAddresses.add(address); | ||
return this; | ||
} | ||
|
||
public MockConnectionFilter reset() { | ||
bannedClientIds = new HashSet<>(); | ||
bannedAddresses = new HashSet<>(); | ||
return this; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.