Skip to content

Commit

Permalink
feat(sensor): reduce log count from messaging agent
Browse files Browse the repository at this point in the history
  • Loading branch information
dostuffthatmatters committed Feb 6, 2023
1 parent 641426a commit 88ce0d9
Showing 1 changed file with 20 additions and 11 deletions.
31 changes: 20 additions & 11 deletions sensor/src/procedures/messaging_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def init(config: custom_types.Config) -> None:
if MessagingAgent.archiving_loop_process is None:
new_process = multiprocessing.Process(
target=MessagingAgent.archiving_loop,
args=(config,),
daemon=True,
)
new_process.start()
Expand All @@ -45,7 +46,10 @@ def init(config: custom_types.Config) -> None:
if config.active_components.mqtt_communication:
new_process = multiprocessing.Process(
target=MessagingAgent.communication_loop,
args=(MessagingAgent.config_request_queue,),
args=(
config,
MessagingAgent.config_request_queue,
),
daemon=True,
)
new_process.start()
Expand All @@ -66,7 +70,7 @@ def deinit() -> None:
MessagingAgent.communication_loop_process = None

@staticmethod
def archiving_loop() -> None:
def archiving_loop(config: custom_types.Config) -> None:
"""archive all message in the active queue that have the
status `delivered` or `sending-skipped`; this function is
blocking and should be called in a thread or subprocess"""
Expand Down Expand Up @@ -102,7 +106,10 @@ def graceful_teardown(*args: Any) -> None:
# DETERMINE MESSAGES TO BE ARCHIVED
records_to_be_archived = active_mqtt_queue.get_rows_by_status("done")

logger.info(f"found {len(records_to_be_archived)} record(s) to be archived")
if config.verbose_logging:
logger.info(
f"found {len(records_to_be_archived)} record(s) to be archived"
)

# SPLIT RECORDS BY DATE
messages_to_be_archived = {}
Expand Down Expand Up @@ -133,6 +140,7 @@ def graceful_teardown(*args: Any) -> None:

@staticmethod
def communication_loop(
config: custom_types.Config,
config_request_queue: queue.Queue[custom_types.MQTTConfigurationRequest],
) -> None:
"""takes messages from the queue file and processes them;
Expand Down Expand Up @@ -256,7 +264,7 @@ def _publish_record(record: custom_types.SQLMQTTRecord) -> None:
MAX_SEND_COUNT - (len(sent_records) - delivered_record_count), 0
)
if OPEN_SENDING_SLOTS == 0:
logger.debug(
logger.warning(
f"sending queue is full ({MAX_SEND_COUNT} "
+ "items not processed by broker yet)"
)
Expand All @@ -278,13 +286,14 @@ def _publish_record(record: custom_types.SQLMQTTRecord) -> None:
if any(
[sent_record_count, resent_record_count, delivered_record_count]
):
logger.info(
f"{sent_record_count}/{resent_record_count}/{delivered_record_count} "
+ "messages have been sent/resent/delivered"
)
logger.debug(
f"sending queue has {OPEN_SENDING_SLOTS - len(records_to_be_sent)} more slot(s)"
)
if config.verbose_logging:
logger.info(
f"{sent_record_count}/{resent_record_count}/{delivered_record_count} "
+ "messages have been sent/resent/delivered"
)
logger.debug(
f"sending queue has {OPEN_SENDING_SLOTS - len(records_to_be_sent)} more slot(s)"
)

time.sleep(3)

Expand Down

0 comments on commit 88ce0d9

Please sign in to comment.