Skip to content

Commit

Permalink
refactor to async task
Browse files Browse the repository at this point in the history
  • Loading branch information
dspeck1 committed Sep 30, 2024
1 parent 98939c2 commit 5ce9f2e
Showing 1 changed file with 7 additions and 33 deletions.
40 changes: 7 additions & 33 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,9 @@ async def fan_out_msg(
fan_out_topic,
data
):
try:
await producer.send_and_wait(fan_out_topic, bytes(data))
finally:
await producer.stop()
logging.info(f"sending msg {data}")
await producer.send_and_wait(fan_out_topic, b(data))


async def main() -> None:

Expand Down Expand Up @@ -373,50 +372,25 @@ async def main() -> None:
#for fan_out_message in fan_out_message_list:

async for fan_out_message in fan_out_message_list:
try:
await producer.send_and_wait(fan_out_topic, b(fan_out_message))
finally:
await producer.stop()

'''
task = asyncio.create_task(

fan_out_msg(
producer,
fan_out_topic,
fan_out_message
)
)
'''

'''
data = fan_out_message
data_json = json.dumps(data)
logging.info(f"data after json dump {data_json}")
event = CloudEvent(attributes, data_json)
headers, body = to_structured(event)
info = {
key: data[key] for key in ["instrument", "groupId", "detector"]
}

task = asyncio.create_task(
knative_request(
in_process_requests_gauge,
client,
knative_serving_url,
headers,
body,
str(info),
)
)
'''
#tasks.add(task)
#task.add_done_callback(tasks.discard)
tasks.add(task)
task.add_done_callback(tasks.discard)

except ValueError as e:
logging.info("Error ", e)

finally:
await consumer.stop()
await producer.stop()


asyncio.run(main())

0 comments on commit 5ce9f2e

Please sign in to comment.