diff --git a/src/main.py b/src/main.py index 427e51e..bbd3176 100644 --- a/src/main.py +++ b/src/main.py @@ -100,10 +100,9 @@ async def fan_out_msg( fan_out_topic, data ): - try: - await producer.send_and_wait(fan_out_topic, bytes(data)) - finally: - await producer.stop() + logging.info(f"sending msg {data}") + await producer.send_and_wait(fan_out_topic, b(data)) + async def main() -> None: @@ -373,50 +372,25 @@ async def main() -> None: #for fan_out_message in fan_out_message_list: async for fan_out_message in fan_out_message_list: - try: - await producer.send_and_wait(fan_out_topic, b(fan_out_message)) - finally: - await producer.stop() - ''' task = asyncio.create_task( + fan_out_msg( producer, fan_out_topic, fan_out_message ) ) - ''' - - ''' - data = fan_out_message - data_json = json.dumps(data) - logging.info(f"data after json dump {data_json}") - event = CloudEvent(attributes, data_json) - headers, body = to_structured(event) - info = { - key: data[key] for key in ["instrument", "groupId", "detector"] - } - task = asyncio.create_task( - knative_request( - in_process_requests_gauge, - client, - knative_serving_url, - headers, - body, - str(info), - ) - ) - ''' - #tasks.add(task) - #task.add_done_callback(tasks.discard) + tasks.add(task) + task.add_done_callback(tasks.discard) except ValueError as e: logging.info("Error ", e) finally: await consumer.stop() + await producer.stop() asyncio.run(main())