Connecting to and reading from AMQ using JPype sometimes hangs #1219
Unanswered
SIVAPRAS329
asked this question in
Q&A
Replies: 0 comments
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
We are using JPype (Python) to connect to an AMQ broker URL and read from a queue. The program works fine; however, it frequently hangs in the connection state. Below is the program we are using.
from flask import Flask, request, jsonify
import jpype,time
from jpype.types import JString
import re
import os
import xml.etree.ElementTree as ET
from datetime import datetime,timedelta
import argparse
#import sys
# Create the argument parser
parser = argparse.ArgumentParser()
# Parameters to the script (all positional, all required)
parser.add_argument('client_id', type=str, help='Client id to connect to the AMQ')
parser.add_argument('queue_name', type=str, help='Name of the queue')
parser.add_argument('incoming_path', type=str, help='Path of the file in landing zone')
parser.add_argument('class_path', type=str, help='Jar file and class path to connect to AMQ')
parser.add_argument('keystore', type=str, help='Path to keystore')
parser.add_argument('keystore_pwd', type=str, help='Keystore password')
parser.add_argument('truststore', type=str, help='Path to truststore')
parser.add_argument('truststore_pwd', type=str, help='Truststorestore password')
parser.add_argument('broker_url', type=str, help='url to connect to AMQ broker')
parser.add_argument('end_time', type=int, help='How long the code to listen to AMQ to extract the message in minutes')
parser.add_argument('sleep_time', type=int, help='time interval between each waiting period in seconds')
# Parse the arguments
args = parser.parse_args()
# Echo the effective parameters, but mask the two store passwords so that
# credentials never end up in stdout or in captured job logs.
_printable_args = {
    key: ("***" if key.endswith("_pwd") else value)
    for key, value in vars(args).items()
}
print("Parameters from arg_parse:", argparse.Namespace(**_printable_args))
# Location of the java.util.logging configuration file.
logging_properties_path = "/mtproject/logging.properties"
# Export the class path given on the command line to the process environment.
os.environ['CLASSPATH'] = args.class_path
def parse_message(xml_string, incoming_path=None):
    """Write one XML message to a timestamped file in the landing directory.

    The root element's ``InterfaceName`` attribute selects the file name:
    the literal ``"Interface"`` is removed from it and, if the remaining
    name contains ``"Sec"``, the file is called
    ``SECURITY_LENDING_<YYYYmmdd_HHMMSS>.xml``; otherwise it is called
    ``<NAME IN UPPER CASE><YYYYmmddHHMMSS>.xml``.  The file receives the
    message text followed by a newline and is made world-readable (0o644).

    Args:
        xml_string: The XML document; it is converted with ``str()`` first,
            so any object whose string form is the XML text is accepted.
        incoming_path: Target directory.  Defaults to the ``incoming_path``
            command-line argument (module-level ``args``).

    Raises:
        xml.etree.ElementTree.ParseError: If the text is not well-formed XML.
        ValueError: If the root element has no ``InterfaceName`` attribute.
        OSError: If the file cannot be written or its mode cannot be set.

    Note:
        Timestamps have one-second resolution and the file is opened in
        ``'w'`` mode, so two messages that map to the same name within the
        same second overwrite each other.
    """
    if incoming_path is None:
        incoming_path = args.incoming_path

    # Coerce to a plain Python str before parsing and writing.
    xml_string = str(xml_string)
    root = ET.fromstring(xml_string)

    interface_name = root.get('InterfaceName')
    if interface_name is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise ValueError("Message root element has no 'InterfaceName' attribute")

    file_name = interface_name.replace("Interface", "")
    # NOTE(review): the previous loop appended an empty string before every
    # interior capital letter and then upper-cased each character, which is
    # exactly file_name.upper().  Possibly a separator character was intended
    # there -- confirm with the file-naming convention before changing it.
    result = file_name.upper()

    # The check must look at the original-case name: `result` is entirely
    # upper case, so searching it for 'Sec' could never match.
    if 'Sec' in file_name:
        final_file_name = "SECURITY_LENDING_" + datetime.now().strftime('%Y%m%d_%H%M%S') + ".xml"
    else:
        final_file_name = f"{result}{datetime.now().strftime('%Y%m%d%H%M%S')}.xml"
    final_file_path = os.path.join(incoming_path, final_file_name)

    # Explicit encoding so the output does not depend on the process locale.
    with open(final_file_path, 'w', encoding='utf-8') as file:
        file.write(xml_string)
        file.write('\n')
    os.chmod(final_file_path, 0o644)
def consume_message(queue_name):
    """Poll an ActiveMQ queue for a fixed time window and store each message.

    Starts the JVM if it is not running, sets the SSL key/trust store system
    properties from the command-line arguments, connects to ``args.broker_url``
    and then polls the queue with ``receiveNoWait()`` until ``args.end_time``
    minutes have elapsed, sleeping ``args.sleep_time`` seconds after every
    poll.  Each received message's text is handed to ``parse_message``.

    The connection is closed and the JVM is shut down before returning,
    whether or not an error occurred.

    Args:
        queue_name: Name of the queue to consume from.

    Returns:
        ``None`` after a normal run or after a Java exception (which is
        printed), or the string ``"An error occurred: <message>"`` when any
        other exception was raised.
    """
    # Bound up front so the finally block can test it even when an error
    # happens before the connection is created.
    connection = None
    try:
        print("Start the JVM")
        if not jpype.isJVMStarted():
            jpype.startJVM(jpype.getDefaultJVMPath())
        print("Set SSL properties")
        ssl_options = {
            "java.naming.security.protocol": "ssl",
            "javax.net.ssl.keyStore": args.keystore,
            "javax.net.ssl.keyStorePassword": args.keystore_pwd,
            "javax.net.ssl.trustStore": args.truststore,
            "javax.net.ssl.trustStorePassword": args.truststore_pwd
        }
        print("Create a connection factory")
        connectionFactory = jpype.JClass("org.apache.activemq.ActiveMQConnectionFactory")()
        connectionFactory.setBrokerURL(args.broker_url)
        print("Set SSL options")
        for key, value in ssl_options.items():
            jpype.java.lang.System.setProperty(key, value)
        print("Create a connection")
        connection = connectionFactory.createConnection()
        print("client_id")
        print(args.client_id)
        # A timestamp-based id is used instead of args.client_id.
        connection.setDefaultClientID('queue' + str(datetime.now()))
        print("start")
        # NOTE(review): presumably this call does not return until the
        # transport has connected; if the script hangs here, check the
        # timeout/reconnect options of broker_url -- confirm against the
        # ActiveMQ transport documentation.
        connection.start()
        print("Create session")
        session = connection.createSession(False, jpype.JInt(1))
        print("Lookup queue")
        queue = session.createQueue(queue_name)
        print("Create consumer")
        consumer = session.createConsumer(queue)
        print("Hello connecting to AMQ and started receiving messages.......................")
        print("Start receiving messages and wait until end time")
        end_time = datetime.now() + timedelta(minutes=args.end_time)
        while datetime.now() <= end_time:
            message = consumer.receiveNoWait()
            if message is not None:
                parse_message(message.getText())
            # Sleep after every poll, including successful ones: output file
            # names only have one-second resolution, so the pause also keeps
            # consecutive messages from landing on the same file name.
            time.sleep(args.sleep_time)
        print("Done waiting......")
    except jpype.JException as e:
        print(f"JPype Exception: {e}")
    except Exception as e:
        print(f"An Error Occurred: {e}")
        return "An error occurred: " + str(e)
    finally:
        # Close the connection and shut down the JVM.  A failure while
        # closing is reported but must not prevent the JVM shutdown or
        # replace the result of the try block.
        if connection is not None:
            try:
                connection.close()
            except jpype.JException as e:
                print(f"JPype Exception: {e}")
        if jpype.isJVMStarted():
            jpype.shutdownJVM()
if __name__ == "__main__":
    # Start the JVM up front with the ActiveMQ jar on the class path and the
    # java.util.logging configuration file.
    if not jpype.isJVMStarted():
        jpype.startJVM(
            jpype.getDefaultJVMPath(),
            "-Djava.class.path=" + "/mtproject/activemq-all-5.10.2.jar",
            "-Djava.util.logging.config.file=" + logging_properties_path,
        )
    print("start amq client...")
    queue_name = args.queue_name
    consume_message(queue_name)
    print("end amq client...")
    # consume_message() shuts the JVM down in its own cleanup, so only shut
    # down here if it is somehow still running.
    if jpype.isJVMStarted():
        jpype.shutdownJVM()
Can anyone assist us?
Beta Was this translation helpful? Give feedback.
All reactions