From 7efcdbf877ea2c0534669e79b66b8fb6ebfb7bb4 Mon Sep 17 00:00:00 2001 From: dspeck1 Date: Tue, 1 Oct 2024 13:33:25 -0500 Subject: [PATCH] convert producer message to json --- src/main.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main.py b/src/main.py index 4169d20..84a0d6a 100644 --- a/src/main.py +++ b/src/main.py @@ -101,7 +101,7 @@ async def fan_out_msg( data ): logging.info(f"sending msg {data}") - await producer.send_and_wait(fan_out_topic, data) + await producer.send_and_wait(fan_out_topic, json.dumps(data)) async def main() -> None: @@ -227,6 +227,9 @@ async def main() -> None: logging.info("Message does not have an instrument. Assuming " "it's not an observation.") continue + + ''' + # Temporary disable so we can see older messages for testing. # efdStamp is visit publication, in seconds since 1970-01-01 UTC if next_visit_message_initial["message"]["private_efdStamp"]: @@ -240,7 +243,7 @@ async def main() -> None: continue else: logging.warning("Message does not have private_efdStamp, can't determine age.") - + ''' next_visit_message_updated = NextVisitModel( salIndex=next_visit_message_initial["message"]["salIndex"], scriptSalIndex=next_visit_message_initial["message"][ @@ -369,8 +372,6 @@ async def main() -> None: await producer.start() logging.info ("started kafka producer") - #for fan_out_message in fan_out_message_list: - for fan_out_message in fan_out_message_list: task = asyncio.create_task(