Skip to content

Commit

Permalink
update amqp connector
Browse files Browse the repository at this point in the history
  • Loading branch information
nguu0123 committed Oct 9, 2024
1 parent 4e93a6b commit 54a069f
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 25 deletions.
1 change: 1 addition & 0 deletions src/qoa4ml/config/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class AMQPConnectorConfig(BaseModel):
exchange_name: str
exchange_type: str
out_routing_key: str
health_check_disable: bool


class MQTTConnectorConfig(BaseModel):
Expand Down
44 changes: 24 additions & 20 deletions src/qoa4ml/connector/amqp_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ def __init__(
self,
config: AMQPConnectorConfig,
log: bool = False,
health_check_disable: bool = False,
):
"""
AmqpConnector handles the connection to an AMQP server for sending messages.
Expand Down Expand Up @@ -62,45 +61,44 @@ def __init__(
self.exchange_type = config.exchange_type
self.out_routing_key = config.out_routing_key
self.log_flag = log
self.health_check_disable = self.config.health_check_disable

# Connect to RabbitMQ host
if "amqps://" in self.config.end_point:
parameters = pika.URLParameters(self.config.end_point)
if health_check_disable:
if self.health_check_disable:
parameters.heartbeat = 0
else:
if health_check_disable:
if self.health_check_disable:
parameters = pika.ConnectionParameters(
host=self.config.end_point, heartbeat=0
)
else:
parameters = pika.ConnectionParameters(host=self.config.end_point)
self.out_connection = pika.BlockingConnection(parameters)
# if "amqps://" in config.end_point:
# self.out_connection = pika.BlockingConnection(
# pika.URLParameters(config.end_point)
# )
# else:
# self.out_connection = pika.BlockingConnection(
# pika.ConnectionParameters(host=config.end_point)
# )

# Create a channel

self.out_channel = self.out_connection.channel()

# Initialize an Exchange
self.out_channel.exchange_declare(
exchange=self.exchange_name, exchange_type=self.exchange_type
)

# def create_connection(self):
# if "amqps://" in self.config.end_point:
# parameters = pika.URLParameters(self.config.end_point)
# parameters.heartbeat = 0
def create_connection(self):
if "amqps://" in self.config.end_point:
parameters = pika.URLParameters(self.config.end_point)
if self.health_check_disable:
parameters.heartbeat = 0
else:
if self.health_check_disable:
parameters = pika.ConnectionParameters(
host=self.config.end_point, heartbeat=0
)
else:
parameters = pika.ConnectionParameters(host=self.config.end_point)
self.out_connection = pika.BlockingConnection(parameters)

# else:
# parameters = pika.ConnectionParameters(host=self.config.end_point, heartbeat=0)
# self.out_connection = pika.BlockingConnection(parameters)
self.out_channel = self.out_connection.channel()

def send_report(
self,
Expand Down Expand Up @@ -152,3 +150,9 @@ def get(self) -> AMQPConnectorConfig:
The AMQP connector configuration.
"""
return self.config

def check_connection(self) -> bool:
return self.out_channel.is_open

def reconnect(self):
self.create_connection()
4 changes: 4 additions & 0 deletions src/qoa4ml/connector/base_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ class BaseConnector(ABC):
@abstractmethod
def send_report(self, body_message: str):
pass

@abstractmethod
def check_connection(self) -> bool:
pass
3 changes: 3 additions & 0 deletions src/qoa4ml/connector/debug_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,6 @@ def send_report(self, body_message: str) -> None:
"""
if not self.silence:
debug(json.loads(body_message))

def check_connection(self) -> bool:
return True
9 changes: 4 additions & 5 deletions src/qoa4ml/qoa_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def __init__(
config_path: Optional[str] = None,
registration_url: Optional[str] = None,
logging_level: int = 2,
health_check_disable: bool = False,
):
"""
Initialize the QoA Client with configuration settings and a report class.
Expand All @@ -85,7 +84,6 @@ def __init__(
- The method will raise an exception if the necessary configuration details are not found.
"""
set_logger_level(logging_level)
self.health_check_disable = health_check_disable
if config_dict is not None:
self.configuration = ClientConfig.model_validate(config_dict)

Expand Down Expand Up @@ -261,9 +259,7 @@ def init_connector(self, configuration: ConnectorConfig) -> BaseConnector:
if configuration.connector_class == ServiceAPIEnum.amqp and isinstance(
configuration.config, AMQPConnectorConfig
):
return AmqpConnector(
configuration.config, health_check_disable=self.health_check_disable
)
return AmqpConnector(configuration.config)
elif configuration.connector_class == ServiceAPIEnum.debug and isinstance(
configuration.config, DebugConnectorConfig
):
Expand Down Expand Up @@ -408,6 +404,9 @@ def asyn_report(self, body_mess: str, connectors: Optional[list] = None) -> None
if self.default_connector:
chosen_connector = self.connector_list[self.default_connector]
if isinstance(chosen_connector, AmqpConnector):
if not chosen_connector.check_connection():
chosen_connector.reconnect()

chosen_connector.send_report(body_mess, corr_id=str(uuid.uuid4()))
else:
chosen_connector.send_report(body_mess)
Expand Down

0 comments on commit 54a069f

Please sign in to comment.