From 5b42644ff63b1c27ee1437f298c7d5c8a24ad19e Mon Sep 17 00:00:00 2001 From: Steve Hillman Date: Fri, 12 Aug 2022 10:44:47 -0700 Subject: [PATCH 1/4] Use a single JMS Session per connection, rather than one per event Each created Session spawns a thread to handle message transmission. Prior to this change, each change resulted in a new thread being created. If the ChangeLog had a large number of pending changes (10s of thousands), memory starvation occurred. This change serializes message transmission --- .../GrouperMessagingActiveMQSystem.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/grouper-misc/grouper-messaging-activemq/src/main/java/edu/internet2/middleware/grouperMessagingActiveMQ/GrouperMessagingActiveMQSystem.java b/grouper-misc/grouper-messaging-activemq/src/main/java/edu/internet2/middleware/grouperMessagingActiveMQ/GrouperMessagingActiveMQSystem.java index d448bf2c08db..d43ee0692470 100644 --- a/grouper-misc/grouper-messaging-activemq/src/main/java/edu/internet2/middleware/grouperMessagingActiveMQ/GrouperMessagingActiveMQSystem.java +++ b/grouper-misc/grouper-messaging-activemq/src/main/java/edu/internet2/middleware/grouperMessagingActiveMQ/GrouperMessagingActiveMQSystem.java @@ -237,6 +237,29 @@ private enum ActiveMQClientConnectionFactory { INSTANCE; private Map messagingSystemNameConnection = new HashMap(); + + private Map messagingSystemNameSession = new HashMap<>(); + + private Session getActiveMQSendSession(String messagingSystemName) throws JMSException { + if (StringUtils.isBlank(messagingSystemName)) { + throw new IllegalArgumentException("messagingSystemName is required."); + } + + Connection connection = messagingSystemNameConnection.get(messagingSystemName); + + if (connection == null) { + throw new JMSException("Connection does not exist. Create a connection first"); + } + + Session session = messagingSystemNameSession.get(messagingSystemName); + + if (session == null) { + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + messagingSystemNameSession.put(messagingSystemName,session); + } + + return session; + } private Connection getActiveMQConnection(String messagingSystemName) throws JMSException { @@ -299,6 +322,7 @@ private void closeConnection(String messagingSystemName) throws JMSException { throw new RuntimeException("Error occurred while closing ActiveMQ connection for "+messagingSystemName, e); } finally { messagingSystemNameConnection.remove(messagingSystemName); + messagingSystemNameSession.remove(messagingSystemName); } } } From e3929b87d7dab9cef68802ae5f7e06c5f080a01a Mon Sep 17 00:00:00 2001 From: Steve Hillman Date: Fri, 26 Aug 2022 16:23:42 -0700 Subject: [PATCH 2/4] Use a single shared session per ActiveMQ connection by default - The previous behaviour used a separate session in the ActiveMQ client library for each message being sent. Each session spawns two threads. If a Change Log update involved too many changes, memory exhaustion could occur. The old behaviour can be restored by setting grouper.messaging.system.activeMqMessaging.useSharedSession = false in grouper.client.properties --- .../GrouperMessagingActiveMQSystem.java | 16 ++++++++++--- ...grouper.client.activeMq.example.properties | 3 +++ .../conf/grouper.client.base.properties | 7 +++++- .../messaging/GrouperMessagingConfig.java | 23 +++++++++++++++++++ 4 files changed, 45 insertions(+), 4 deletions(-) diff --git a/grouper-misc/grouper-messaging-activemq/src/main/java/edu/internet2/middleware/grouperMessagingActiveMQ/GrouperMessagingActiveMQSystem.java b/grouper-misc/grouper-messaging-activemq/src/main/java/edu/internet2/middleware/grouperMessagingActiveMQ/GrouperMessagingActiveMQSystem.java index d43ee0692470..66d405eba0f1 100644 --- a/grouper-misc/grouper-messaging-activemq/src/main/java/edu/internet2/middleware/grouperMessagingActiveMQ/GrouperMessagingActiveMQSystem.java +++ b/grouper-misc/grouper-messaging-activemq/src/main/java/edu/internet2/middleware/grouperMessagingActiveMQ/GrouperMessagingActiveMQSystem.java @@ -66,7 +66,7 @@ public GrouperMessageSendResult send(GrouperMessageSendParam grouperMessageSendP Connection connection = ActiveMQClientConnectionFactory.INSTANCE.getActiveMQConnection(systemParam.getMessageSystemName()); // Create a non-transactional session with automatic acknowledgement - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session = ActiveMQClientConnectionFactory.INSTANCE.getActiveMQSendSession(systemParam.getMessageSystemName()); Destination destination = null; if (queueParam.getQueueType() == GrouperMessageQueueType.queue) { @@ -240,6 +240,8 @@ private enum ActiveMQClientConnectionFactory { private Map messagingSystemNameSession = new HashMap<>(); + private Map useSharedSessionsMap = new HashMap<>(); + private Session getActiveMQSendSession(String messagingSystemName) throws JMSException { if (StringUtils.isBlank(messagingSystemName)) { throw new IllegalArgumentException("messagingSystemName is required."); @@ -251,10 +253,14 @@ private Session getActiveMQSendSession(String messagingSystemName) throws JMSExc throw new JMSException("Connection does not exist. Create a connection first"); } + if (!(useSharedSessionsMap.get(messagingSystemName))) { + return connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + Session session = messagingSystemNameSession.get(messagingSystemName); if (session == null) { - connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); messagingSystemNameSession.put(messagingSystemName,session); } @@ -279,6 +285,7 @@ private Connection getActiveMQConnection(String messagingSystemName) throws JMSE String uri = grouperMessagingConfig.propertyValueString(GrouperClientConfig.retrieveConfig(), "uri"); String username = grouperMessagingConfig.propertyValueString(GrouperClientConfig.retrieveConfig(), "username"); String password = grouperMessagingConfig.propertyValueString(GrouperClientConfig.retrieveConfig(), "password"); + boolean useSharedSession = grouperMessagingConfig.propertyValueBoolean(GrouperClientConfig.retrieveConfig(), "useSharedSession", true); if (StringUtils.isNotBlank(password)) { password = GrouperClientUtils.decryptFromFileIfFileExists(password, null); @@ -303,7 +310,10 @@ private Connection getActiveMQConnection(String messagingSystemName) throws JMSE connection = factory.createConnection(); connection.start(); messagingSystemNameConnection.put(messagingSystemName, connection); - + useSharedSessionsMap.put(messagingSystemName,useSharedSession); + if (!useSharedSession) { + LOG.info("Warning: Using a session per ActiveMQ message can lead to memory exhaustion"); + } } } return connection; diff --git a/grouper-misc/grouper-messaging-activemq/src/main/resources/grouper.client.activeMq.example.properties b/grouper-misc/grouper-messaging-activemq/src/main/resources/grouper.client.activeMq.example.properties index 9725b89ec929..c1ef5f1647bb 100644 --- a/grouper-misc/grouper-messaging-activemq/src/main/resources/grouper.client.activeMq.example.properties +++ b/grouper-misc/grouper-messaging-activemq/src/main/resources/grouper.client.activeMq.example.properties @@ -23,6 +23,9 @@ grouper.messaging.system.activeMqSystem.username = # password of activemq queue grouper.messaging.system.activeMqSystem.password = + +# Whether to use shared ActiveMQ session per connection (default=true) +grouper.messaging.system.activeMqSystem.useSharedSession = true # number of seconds to sleep while waiting grouper.messaging.system.activeMqSystem.polling.sleep.seconds = 5 diff --git a/grouper-misc/grouperClient/conf/grouper.client.base.properties b/grouper-misc/grouperClient/conf/grouper.client.base.properties index a5b931936662..bfa3e639981f 100644 --- a/grouper-misc/grouperClient/conf/grouper.client.base.properties +++ b/grouper-misc/grouperClient/conf/grouper.client.base.properties @@ -989,7 +989,12 @@ grouper.messaging.system.grouperBuiltinMessaging.class = edu.internet2.middlewar # password of activemq queue # {valueType: "password", required: true} -# grouper.messaging.system.activeMqMessaging.password = +# grouper.messaging.system.activeMqMessaging.password = + +# Whether to use shared session for sending ActiveMQ messages (shared = one per Connection, vs one per message) +# Disabling this may lead to memory exhaustion during large bursts of changes +# {valueType: "boolean", defaultValue: "true"} +# grouper.messaging.system.activeMqMessaging.useSharedSession = # if this activemq connector is enabled # {valueType: "boolean", defaultValue: "true"} diff --git a/grouper-misc/grouperClient/src/java/edu/internet2/middleware/grouperClient/messaging/GrouperMessagingConfig.java b/grouper-misc/grouperClient/src/java/edu/internet2/middleware/grouperClient/messaging/GrouperMessagingConfig.java index 78ffcc6347b8..ad254497ce6b 100644 --- a/grouper-misc/grouperClient/src/java/edu/internet2/middleware/grouperClient/messaging/GrouperMessagingConfig.java +++ b/grouper-misc/grouperClient/src/java/edu/internet2/middleware/grouperClient/messaging/GrouperMessagingConfig.java @@ -137,6 +137,29 @@ public String propertyValueString(GrouperClientConfig grouperClientConfig, Strin return null; } + + /** + * + * @param grouperClientConfig + * @param propertyNameSuffix + * @param defaultValue + * @return the value or the override + */ + public boolean propertyValueBoolean(GrouperClientConfig grouperClientConfig, String propertyNameSuffix, boolean defaultValue) { + + String propertyValueString = this.propertyValueString(grouperClientConfig, propertyNameSuffix); + + if (!StringUtils.isBlank(propertyValueString)) { + try { + return ConfigPropertiesCascadeUtils.booleanValue(propertyValueString); + } catch (Exception e) { + + } + throw new RuntimeException("Invalid boolean value: '" + propertyValueString + "' for property sufffix: " + + propertyNameSuffix + " in messaging system: " + this.name + " in config file: grouper.client.properties file"); + } + return defaultValue; + } } From 5eb6baca9855f6e6e9ded165575185a37efc2ead Mon Sep 17 00:00:00 2001 From: Steve Hillman Date: Tue, 30 Aug 2022 15:38:04 -0700 Subject: [PATCH 3/4] Remove feature-flag code for shared session --- .../GrouperMessagingActiveMQSystem.java | 15 ++---------- ...grouper.client.activeMq.example.properties | 3 --- .../conf/grouper.client.base.properties | 5 ---- .../messaging/GrouperMessagingConfig.java | 23 ------------------- 4 files changed, 2 insertions(+), 44 deletions(-) diff --git a/grouper-misc/grouper-messaging-activemq/src/main/java/edu/internet2/middleware/grouperMessagingActiveMQ/GrouperMessagingActiveMQSystem.java b/grouper-misc/grouper-messaging-activemq/src/main/java/edu/internet2/middleware/grouperMessagingActiveMQ/GrouperMessagingActiveMQSystem.java index 66d405eba0f1..37e0e3d83d96 100644 --- a/grouper-misc/grouper-messaging-activemq/src/main/java/edu/internet2/middleware/grouperMessagingActiveMQ/GrouperMessagingActiveMQSystem.java +++ b/grouper-misc/grouper-messaging-activemq/src/main/java/edu/internet2/middleware/grouperMessagingActiveMQ/GrouperMessagingActiveMQSystem.java @@ -240,8 +240,6 @@ private enum ActiveMQClientConnectionFactory { private Map messagingSystemNameSession = new HashMap<>(); - private Map useSharedSessionsMap = new HashMap<>(); - private Session getActiveMQSendSession(String messagingSystemName) throws JMSException { if (StringUtils.isBlank(messagingSystemName)) { throw new IllegalArgumentException("messagingSystemName is required."); @@ -252,11 +250,7 @@ private Session getActiveMQSendSession(String messagingSystemName) throws JMSExc if (connection == null) { throw new JMSException("Connection does not exist. Create a connection first"); } - - if (!(useSharedSessionsMap.get(messagingSystemName))) { - return connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - + Session session = messagingSystemNameSession.get(messagingSystemName); if (session == null) { @@ -285,8 +279,7 @@ private Connection getActiveMQConnection(String messagingSystemName) throws JMSE String uri = grouperMessagingConfig.propertyValueString(GrouperClientConfig.retrieveConfig(), "uri"); String username = grouperMessagingConfig.propertyValueString(GrouperClientConfig.retrieveConfig(), "username"); String password = grouperMessagingConfig.propertyValueString(GrouperClientConfig.retrieveConfig(), "password"); - boolean useSharedSession = grouperMessagingConfig.propertyValueBoolean(GrouperClientConfig.retrieveConfig(), "useSharedSession", true); - + if (StringUtils.isNotBlank(password)) { password = GrouperClientUtils.decryptFromFileIfFileExists(password, null); } @@ -310,10 +303,6 @@ private Connection getActiveMQConnection(String messagingSystemName) throws JMSE connection = factory.createConnection(); connection.start(); messagingSystemNameConnection.put(messagingSystemName, connection); - useSharedSessionsMap.put(messagingSystemName,useSharedSession); - if (!useSharedSession) { - LOG.info("Warning: Using a session per ActiveMQ message can lead to memory exhaustion"); - } } } return connection; diff --git a/grouper-misc/grouper-messaging-activemq/src/main/resources/grouper.client.activeMq.example.properties b/grouper-misc/grouper-messaging-activemq/src/main/resources/grouper.client.activeMq.example.properties index c1ef5f1647bb..ec95dca3a6ee 100644 --- a/grouper-misc/grouper-messaging-activemq/src/main/resources/grouper.client.activeMq.example.properties +++ b/grouper-misc/grouper-messaging-activemq/src/main/resources/grouper.client.activeMq.example.properties @@ -24,9 +24,6 @@ grouper.messaging.system.activeMqSystem.username = # password of activemq queue grouper.messaging.system.activeMqSystem.password = -# Whether to use shared ActiveMQ session per connection (default=true) -grouper.messaging.system.activeMqSystem.useSharedSession = true - # number of seconds to sleep while waiting grouper.messaging.system.activeMqSystem.polling.sleep.seconds = 5 diff --git a/grouper-misc/grouperClient/conf/grouper.client.base.properties b/grouper-misc/grouperClient/conf/grouper.client.base.properties index bfa3e639981f..78689c246c27 100644 --- a/grouper-misc/grouperClient/conf/grouper.client.base.properties +++ b/grouper-misc/grouperClient/conf/grouper.client.base.properties @@ -991,11 +991,6 @@ grouper.messaging.system.grouperBuiltinMessaging.class = edu.internet2.middlewar # {valueType: "password", required: true} # grouper.messaging.system.activeMqMessaging.password = -# Whether to use shared session for sending ActiveMQ messages (shared = one per Connection, vs one per message) -# Disabling this may lead to memory exhaustion during large bursts of changes -# {valueType: "boolean", defaultValue: "true"} -# grouper.messaging.system.activeMqMessaging.useSharedSession = - # if this activemq connector is enabled # {valueType: "boolean", defaultValue: "true"} # grouper.messaging.system.activeMqMessaging.enabled = diff --git a/grouper-misc/grouperClient/src/java/edu/internet2/middleware/grouperClient/messaging/GrouperMessagingConfig.java b/grouper-misc/grouperClient/src/java/edu/internet2/middleware/grouperClient/messaging/GrouperMessagingConfig.java index ad254497ce6b..85794abfaefd 100644 --- a/grouper-misc/grouperClient/src/java/edu/internet2/middleware/grouperClient/messaging/GrouperMessagingConfig.java +++ b/grouper-misc/grouperClient/src/java/edu/internet2/middleware/grouperClient/messaging/GrouperMessagingConfig.java @@ -138,28 +138,5 @@ public String propertyValueString(GrouperClientConfig grouperClientConfig, Strin return null; } - /** - * - * @param grouperClientConfig - * @param propertyNameSuffix - * @param defaultValue - * @return the value or the override - */ - public boolean propertyValueBoolean(GrouperClientConfig grouperClientConfig, String propertyNameSuffix, boolean defaultValue) { - - String propertyValueString = this.propertyValueString(grouperClientConfig, propertyNameSuffix); - - if (!StringUtils.isBlank(propertyValueString)) { - try { - return ConfigPropertiesCascadeUtils.booleanValue(propertyValueString); - } catch (Exception e) { - - } - throw new RuntimeException("Invalid boolean value: '" + propertyValueString + "' for property sufffix: " - + propertyNameSuffix + " in messaging system: " + this.name + " in config file: grouper.client.properties file"); - } - return defaultValue; - } - } From d58233de41ce159ecaa8a688f977b746cb2f1049 Mon Sep 17 00:00:00 2001 From: Steve Hillman Date: Thu, 1 Dec 2022 10:39:34 -0800 Subject: [PATCH 4/4] Fixes to make Ant happy --- .../webservicesClient/RampartPwHandlerClient.java | 2 +- .../middleware/grouper/ws/GrouperServiceJ2ee.java | 11 ++++------- .../grouper/ws/security/GrouperWssecSample.java | 12 ++++++------ .../grouper/ws/security/RampartHandlerServer.java | 2 +- 4 files changed, 12 insertions(+), 15 deletions(-) diff --git a/grouper-ws/grouper-ws-java-generated-client/src/edu/internet2/middleware/grouper/webservicesClient/RampartPwHandlerClient.java b/grouper-ws/grouper-ws-java-generated-client/src/edu/internet2/middleware/grouper/webservicesClient/RampartPwHandlerClient.java index 8759019044c7..9e1b932feeac 100644 --- a/grouper-ws/grouper-ws-java-generated-client/src/edu/internet2/middleware/grouper/webservicesClient/RampartPwHandlerClient.java +++ b/grouper-ws/grouper-ws-java-generated-client/src/edu/internet2/middleware/grouper/webservicesClient/RampartPwHandlerClient.java @@ -36,7 +36,7 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { for (int i = 0; i < callbacks.length; i++) { WSPasswordCallback pwcb = (WSPasswordCallback) callbacks[i]; - String id = pwcb.getIdentifer(); + String id = pwcb.getIdentifier(); if ("GrouperSystem".equals(id)) { pwcb.setPassword("mypass"); diff --git a/grouper-ws/grouper-ws/src/grouper-ws/edu/internet2/middleware/grouper/ws/GrouperServiceJ2ee.java b/grouper-ws/grouper-ws/src/grouper-ws/edu/internet2/middleware/grouper/ws/GrouperServiceJ2ee.java index fd7137681f84..9e4337c929af 100644 --- a/grouper-ws/grouper-ws/src/grouper-ws/edu/internet2/middleware/grouper/ws/GrouperServiceJ2ee.java +++ b/grouper-ws/grouper-ws/src/grouper-ws/edu/internet2/middleware/grouper/ws/GrouperServiceJ2ee.java @@ -21,10 +21,7 @@ import java.io.IOException; import java.security.Principal; import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Vector; +import java.util.*; import javax.servlet.Filter; import javax.servlet.FilterChain; @@ -192,16 +189,16 @@ public static Subject retrieveSubjectLoggedIn() { LOG.debug("Number of rampart results: " + results.size()); OUTER: for (int i = 0; i < results.size(); i++) { WSHandlerResult rResult = (WSHandlerResult) results.get(i); - Vector wsSecEngineResults = rResult.getResults(); + List wsSecEngineResults = rResult.getResults(); for (int j = 0; j < wsSecEngineResults.size(); j++) { WSSecurityEngineResult wser = (WSSecurityEngineResult) wsSecEngineResults .get(j); - if (wser.getAction() == WSConstants.UT && wser.getPrincipal() != null) { + if ((Integer)wser.get("action") == WSConstants.UT && wser.get("principal") != null) { //Extract the principal WSUsernameTokenPrincipal principal = (WSUsernameTokenPrincipal) wser - .getPrincipal(); + .get("principal"); //Get user userIdLoggedIn = principal.getName(); diff --git a/grouper-ws/grouper-ws/src/grouper-ws/edu/internet2/middleware/grouper/ws/security/GrouperWssecSample.java b/grouper-ws/grouper-ws/src/grouper-ws/edu/internet2/middleware/grouper/ws/security/GrouperWssecSample.java index a72f5c9e5838..d6d31ba837e7 100644 --- a/grouper-ws/grouper-ws/src/grouper-ws/edu/internet2/middleware/grouper/ws/security/GrouperWssecSample.java +++ b/grouper-ws/grouper-ws/src/grouper-ws/edu/internet2/middleware/grouper/ws/security/GrouperWssecSample.java @@ -31,7 +31,7 @@ public class GrouperWssecSample implements GrouperWssecAuthentication { * @see edu.internet2.middleware.grouper.ws.security.GrouperWssecAuthentication#authenticate(org.apache.ws.security.WSPasswordCallback) */ public boolean authenticate(WSPasswordCallback wsPasswordCallback) throws IOException { - System.out.println("identifier: " + wsPasswordCallback.getIdentifer() + ", usage: " + System.out.println("identifier: " + wsPasswordCallback.getIdentifier() + ", usage: " + wsPasswordCallback.getUsage()); if (wsPasswordCallback.getUsage() == WSPasswordCallback.USERNAME_TOKEN) { @@ -39,8 +39,8 @@ public boolean authenticate(WSPasswordCallback wsPasswordCallback) throws IOExce // because the original one can't be un-digested from the message // we can throw either of the two Exception types if authentication fails - if (!"GrouperSystem".equals(wsPasswordCallback.getIdentifer())) { - throw new IOException("unknown user: " + wsPasswordCallback.getIdentifer()); + if (!"GrouperSystem".equals(wsPasswordCallback.getIdentifier())) { + throw new IOException("unknown user: " + wsPasswordCallback.getIdentifier()); } // this will throw an exception if the passwords don't match @@ -51,14 +51,14 @@ public boolean authenticate(WSPasswordCallback wsPasswordCallback) throws IOExce if (wsPasswordCallback.getUsage() == WSPasswordCallback.USERNAME_TOKEN_UNKNOWN) { // for passwords sent in cleartext mode we can compare passwords directly - if (!"GrouperSystem".equals(wsPasswordCallback.getIdentifer())) { - throw new IOException("unknown user: " + wsPasswordCallback.getIdentifer()); + if (!"GrouperSystem".equals(wsPasswordCallback.getIdentifier())) { + throw new IOException("unknown user: " + wsPasswordCallback.getIdentifier()); } // we can throw either of the two Exception types if authentication fails if (!"mypass".equals(wsPasswordCallback.getPassword())) { throw new IOException("password incorrect for user: " - + wsPasswordCallback.getIdentifer()); + + wsPasswordCallback.getIdentifier()); } return true; } diff --git a/grouper-ws/grouper-ws/src/grouper-ws/edu/internet2/middleware/grouper/ws/security/RampartHandlerServer.java b/grouper-ws/grouper-ws/src/grouper-ws/edu/internet2/middleware/grouper/ws/security/RampartHandlerServer.java index 006abfac9373..81f25d60ac25 100644 --- a/grouper-ws/grouper-ws/src/grouper-ws/edu/internet2/middleware/grouper/ws/security/RampartHandlerServer.java +++ b/grouper-ws/grouper-ws/src/grouper-ws/edu/internet2/middleware/grouper/ws/security/RampartHandlerServer.java @@ -69,7 +69,7 @@ public void handle (Callback[] callbacks) throws IOException, UnsupportedCallbac for (int i = 0; i < callbacks.length; i++) { if (callbacks[i] instanceof WSPasswordCallback) { WSPasswordCallback wsPasswordCallback = (WSPasswordCallback) callbacks[i]; - LOG.debug("identifier: "+wsPasswordCallback.getIdentifer()+", usage: "+wsPasswordCallback.getUsage()); + LOG.debug("identifier: "+wsPasswordCallback.getIdentifier()+", usage: "+wsPasswordCallback.getUsage()); // if (pc.getUsage() == WSPasswordCallback.USERNAME_TOKEN) { // // for passwords sent in digest mode we need to provide the password,