Skip to content

Commit

Permalink
process message in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
maaikelimper committed Sep 30, 2024
1 parent 97bdaf7 commit 1443aba
Showing 1 changed file with 64 additions and 31 deletions.
95 changes: 64 additions & 31 deletions wis2box-mqtt-metrics-collector/mqtt_metrics_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,17 @@
import json
import time

from threading import Lock
from threading import Thread

from prometheus_client import start_http_server, Counter, Gauge

# de-register default-collectors
from prometheus_client import REGISTRY, PROCESS_COLLECTOR, PLATFORM_COLLECTOR

message_buffer = []
buffer_lock = Lock()

REGISTRY.unregister(PROCESS_COLLECTOR)
REGISTRY.unregister(PLATFORM_COLLECTOR)

Expand Down Expand Up @@ -160,39 +166,61 @@ def sub_mqtt_metrics(client, userdata, msg):
if str(msg.topic).startswith('$SYS'):
return

m = json.loads(msg.payload.decode('utf-8'))
if str(msg.topic).startswith('wis2box/stations'):
update_stations_gauge(m['station_list'])
elif str(msg.topic).startswith('wis2box/notifications'):
wsi = 'none'
if 'wigos_station_identifier' in m['properties']:
wsi = m['properties']['wigos_station_identifier']
# if label wsi is not in notify_wsi_total, set to 0 and sleep 5s
if wsi not in notify_wsi_total._metrics:
logger.info(f"new station: {wsi}, sleep 5s before incrementing")
notify_wsi_total.labels(wsi).inc(0)
with buffer_lock:
message_buffer.append((msg.topic, msg))
# Process buffered messages if buffer size reaches a threshold
if len(message_buffer) >= 10: # Adjust this threshold as needed
process_buffered_messages()


def process_buffered_messages():
global message_buffer

with buffer_lock:
messages_to_process = message_buffer
message_buffer = []

for topic, msg in messages_to_process:
m = json.loads(msg.payload.decode('utf-8'))
if str(topic).startswith('wis2box/stations'):
update_stations_gauge(m['station_list'])
elif str(topic).startswith('wis2box/notifications'):
wsi = 'none'
if 'wigos_station_identifier' in m['properties']:
wsi = m['properties']['wigos_station_identifier']
# if label wsi is not in notify_wsi_total, set to 0 and sleep 5s
if wsi not in notify_wsi_total._metrics:
logger.info(f"new station={wsi}, sleep 5s before incrementing")
notify_wsi_total.labels(wsi).inc(0)
failure_wsi_total.labels(wsi).inc(0)
station_wsi.labels(wsi).set(1)
time.sleep(5)
notify_wsi_total.labels(wsi).inc(1)
failure_wsi_total.labels(wsi).inc(0)
station_wsi.labels(wsi).set(1)
time.sleep(5)
notify_wsi_total.labels(wsi).inc(1)
failure_wsi_total.labels(wsi).inc(0)
station_wsi.labels(wsi).set(1)
notify_total.inc(1)
elif str(msg.topic).startswith('wis2box/failure'):
descr = m['description'] if 'description' in m else 'none'
wsi = 'none'
if 'wigos_station_identifier' in m:
wsi = m['wigos_station_identifier']
failure_descr_wsi_total.labels(descr, wsi).inc(1)
notify_wsi_total.labels(wsi).inc(0)
failure_wsi_total.labels(wsi).inc(1)
station_wsi.labels(wsi).set(1)
failure_total.inc(1)
elif str(msg.topic).startswith('wis2box/storage'):
if str(m["Key"]).startswith('wis2box-incoming'):
storage_incoming_total.inc(1)
if str(m["Key"]).startswith('wis2box-public'):
storage_public_total.inc(1)
notify_total.inc(1)
elif str(topic).startswith('wis2box/failure'):
descr = m['description'] if 'description' in m else 'none'
wsi = 'none'
if 'wigos_station_identifier' in m:
wsi = m['wigos_station_identifier']
failure_descr_wsi_total.labels(descr, wsi).inc(1)
notify_wsi_total.labels(wsi).inc(0)
failure_wsi_total.labels(wsi).inc(1)
station_wsi.labels(wsi).set(1)
failure_total.inc(1)
elif str(topic).startswith('wis2box/storage'):
if str(m["Key"]).startswith('wis2box-incoming'):
storage_incoming_total.inc(1)
if str(m["Key"]).startswith('wis2box-public'):
storage_public_total.inc(1)


# Call this function periodically, e.g., in a separate thread
def periodic_buffer_processing():
while True:
process_buffered_messages()
time.sleep(1) # Adjust sleep time as needed


def gather_mqtt_metrics():
Expand Down Expand Up @@ -226,7 +254,12 @@ def gather_mqtt_metrics():

def main():
start_http_server(8001)

init_stations_gauge()

# Start periodic buffer processing
Thread(target=periodic_buffer_processing, daemon=True).start()

gather_mqtt_metrics()


Expand Down

0 comments on commit 1443aba

Please sign in to comment.