Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a single ActiveMQ session per connection, to prevent OOM errors #205

Open
wants to merge 8 commits into
base: GROUPER_2_6_BRANCH
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public GrouperMessageSendResult send(GrouperMessageSendParam grouperMessageSendP
Connection connection = ActiveMQClientConnectionFactory.INSTANCE.getActiveMQConnection(systemParam.getMessageSystemName());

// Create a non-transactional session with automatic acknowledgement
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = ActiveMQClientConnectionFactory.INSTANCE.getActiveMQSendSession(systemParam.getMessageSystemName());

Destination destination = null;
if (queueParam.getQueueType() == GrouperMessageQueueType.queue) {
Expand Down Expand Up @@ -236,6 +236,29 @@ private enum ActiveMQClientConnectionFactory {
INSTANCE;

private Map<String, Connection> messagingSystemNameConnection = new HashMap<String, Connection>();

private Map<String,Session> messagingSystemNameSession = new HashMap<>();

private Session getActiveMQSendSession(String messagingSystemName) throws JMSException {
if (StringUtils.isBlank(messagingSystemName)) {
throw new IllegalArgumentException("messagingSystemName is required.");
}

Connection connection = messagingSystemNameConnection.get(messagingSystemName);

if (connection == null) {
throw new JMSException("Connection does not exist. Create a connection first");
}

Session session = messagingSystemNameSession.get(messagingSystemName);

if (session == null) {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
messagingSystemNameSession.put(messagingSystemName,session);
}

return session;
}

private Connection getActiveMQConnection(String messagingSystemName) throws JMSException {

Expand All @@ -255,7 +278,7 @@ private Connection getActiveMQConnection(String messagingSystemName) throws JMSE
String uri = grouperMessagingConfig.propertyValueString(GrouperClientConfig.retrieveConfig(), "uri");
String username = grouperMessagingConfig.propertyValueString(GrouperClientConfig.retrieveConfig(), "username");
String password = grouperMessagingConfig.propertyValueString(GrouperClientConfig.retrieveConfig(), "password");

if (StringUtils.isNotBlank(password)) {
password = GrouperClientUtils.decryptFromFileIfFileExists(password, null);
}
Expand All @@ -279,7 +302,6 @@ private Connection getActiveMQConnection(String messagingSystemName) throws JMSE
connection = factory.createConnection();
connection.start();
messagingSystemNameConnection.put(messagingSystemName, connection);

}
}
return connection;
Expand All @@ -298,6 +320,7 @@ private void closeConnection(String messagingSystemName) throws JMSException {
throw new RuntimeException("Error occurred while closing ActiveMQ connection for "+messagingSystemName, e);
} finally {
messagingSystemNameConnection.remove(messagingSystemName);
messagingSystemNameSession.remove(messagingSystemName);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ grouper.messaging.system.activeMqSystem.username =

# password of activemq queue
grouper.messaging.system.activeMqSystem.password =

# number of seconds to sleep while waiting
grouper.messaging.system.activeMqSystem.polling.sleep.seconds = 5

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ grouper.messaging.system.grouperBuiltinMessaging.class = edu.internet2.middlewar

# password of activemq queue
# {valueType: "password", required: true}
# grouper.messaging.system.activeMqMessaging.password =
# grouper.messaging.system.activeMqMessaging.password =

# if this activemq connector is enabled
# {valueType: "boolean", defaultValue: "true"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,6 @@ public String propertyValueString(GrouperClientConfig grouperClientConfig, Strin

return null;
}

}