From ec8fdb8dd6750e7fd44aaf11345d1322aa2abd7b Mon Sep 17 00:00:00 2001 From: Thomas Rausch Date: Wed, 27 Apr 2022 12:29:35 +0200 Subject: [PATCH] add workqueue example --- examples/workqueue-localstack/README.md | 58 +++++++++++++++++++++++ examples/workqueue-localstack/api.py | 17 +++++++ examples/workqueue-localstack/producer.py | 36 ++++++++++++++ examples/workqueue-localstack/worker.py | 39 +++++++++++++++ 4 files changed, 150 insertions(+) create mode 100644 examples/workqueue-localstack/README.md create mode 100644 examples/workqueue-localstack/api.py create mode 100644 examples/workqueue-localstack/producer.py create mode 100644 examples/workqueue-localstack/worker.py diff --git a/examples/workqueue-localstack/README.md b/examples/workqueue-localstack/README.md new file mode 100644 index 0000000..d5eb917 --- /dev/null +++ b/examples/workqueue-localstack/README.md @@ -0,0 +1,58 @@ +# Producer/Worker pattern + +This example implements a producer/worker application, where work items produced by `producer.py` are balanced between `worker.py` instances using a +[work queue](https://www.oreilly.com/library/view/designing-distributed-systems/9781491983638/ch10.html). +After completing their work, workers then publish the result into a result topic. + +## Structure + +* `api.py`: shared code with event classes +* `producer.py`: puts items in the work queue and subscribes to the result topic +* `worker.py`: gets items from the work queue and publishes results into the result topic + +## Setup + +The default provider in this example is [LocalStack](https://github.com/localstack/localstack) to emulate AWS. +To start the application, first run + + pip install localstack pymq[aws] + +and then start up LocalStack using + + localstack start -d + +## Start the producer + + python3 producer.py + +You'll start seeing output that work items are being produced in regular intervals: + +``` +>> WorkItem(a=84, b=11) (0 items queued) +>> WorkItem(a=18, b=68) (1 items queued) +>> WorkItem(a=74, b=24) (2 items queued) +... +``` + +## Start a worker + +In new terminal windows, start as many workers as you want + + python3 worker.py + +You should see workers starting to pick work items from the queue + +``` +worker 53a76083 listening on work queue... +worker 53a76083 doing hard calculations... +worker 53a76083 publishing result +... +``` + +In the producer terminal, you should see work results being received: + +``` +<< WorkResult(worker='53a76083', result=2964): 2964 +``` + +Start additional workers to empty the work queue faster. diff --git a/examples/workqueue-localstack/api.py b/examples/workqueue-localstack/api.py new file mode 100644 index 0000000..ab8b553 --- /dev/null +++ b/examples/workqueue-localstack/api.py @@ -0,0 +1,17 @@ +import dataclasses + + +@dataclasses.dataclass +class WorkItem: + a: int + b: int + + +@dataclasses.dataclass +class WorkResult: + worker: str + result: int + + +class ShutdownEvent: + pass diff --git a/examples/workqueue-localstack/producer.py b/examples/workqueue-localstack/producer.py new file mode 100644 index 0000000..52017e0 --- /dev/null +++ b/examples/workqueue-localstack/producer.py @@ -0,0 +1,36 @@ +import random +import time + +from api import WorkItem, WorkResult + +import pymq +from pymq import Queue +from pymq.provider.aws import LocalstackConfig + + +@pymq.subscriber +def print_result(event: WorkResult): + print(f"<< {event}: {event.result}") + + +def main(): + pymq.init(LocalstackConfig()) + + queue: Queue[WorkItem] = pymq.queue("work-items") + try: + while True: + item = WorkItem(random.randint(1, 100), random.randint(1, 100)) + print(f">> {item} ({queue.qsize()} items queued)") + queue.put(item) + + time.sleep(1.2) + + except KeyboardInterrupt: + pass + finally: + queue.free() + pymq.shutdown() + + +if __name__ == '__main__': + main() diff --git a/examples/workqueue-localstack/worker.py b/examples/workqueue-localstack/worker.py new file mode 100644 index 0000000..8df8793 --- /dev/null +++ b/examples/workqueue-localstack/worker.py @@ -0,0 +1,39 @@ +import time +import uuid + +from api import WorkItem, WorkResult + +import pymq +from pymq import Queue +from pymq.provider.aws import LocalstackConfig + +worker = str(uuid.uuid4())[:8] + + +def do_work(item: WorkItem): + print(f"worker {worker} doing hard calculations...") + time.sleep(2) + result = WorkResult(worker, item.a * item.b) + print(f"worker {worker} publishing result") + pymq.publish(result) + + +def main(): + pymq.init(LocalstackConfig()) + + try: + queue: Queue[WorkItem] = pymq.queue("work-items") + print(f"worker {worker} listening on work queue...") + while True: + event = queue.get() + do_work(event) + + except KeyboardInterrupt: + pass + finally: + print(f"worker {worker} exiting") + pymq.shutdown() + + +if __name__ == '__main__': + main()