Skip to content

Commit

Permalink
add workqueue example
Browse files Browse the repository at this point in the history
  • Loading branch information
thrau committed Apr 27, 2022
1 parent b32f837 commit ec8fdb8
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 0 deletions.
58 changes: 58 additions & 0 deletions examples/workqueue-localstack/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Producer/Worker pattern

This example implements a producer/worker application, where work items produced by `producer.py` are balanced between `worker.py` instances using a
[work queue](https://www.oreilly.com/library/view/designing-distributed-systems/9781491983638/ch10.html).
After completing their work, workers then publish the result into a result topic.

## Structure

* `api.py`: shared code with event classes
* `producer.py`: puts items in the work queue and subscribes to the result topic
* `worker.py`: gets items from the work queue and publishes results into the result topic

## Setup

The default provider in this example is [LocalStack](https://github.com/localstack/localstack) to emulate AWS.
To start the application, first run

pip install localstack pymq[aws]

and then start up LocalStack using

localstack start -d

## Start the producer

python3 producer.py

You'll start seeing output that work items are being produced in regular intervals:

```
>> WorkItem(a=84, b=11) (0 items queued)
>> WorkItem(a=18, b=68) (1 items queued)
>> WorkItem(a=74, b=24) (2 items queued)
...
```

## Start a worker

In new terminal windows, start as many workers as you want

python3 worker.py

You should see workers starting to pick work items from the queue

```
worker 53a76083 listening on work queue...
worker 53a76083 doing hard calculations...
worker 53a76083 publishing result
...
```

In the producer terminal, you should see work results being received:

```
<< WorkResult(worker='53a76083', result=2964): 2964
```

Start additional workers to empty the work queue faster.
17 changes: 17 additions & 0 deletions examples/workqueue-localstack/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import dataclasses


@dataclasses.dataclass
class WorkItem:
a: int
b: int


@dataclasses.dataclass
class WorkResult:
worker: str
result: int


class ShutdownEvent:
pass
36 changes: 36 additions & 0 deletions examples/workqueue-localstack/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import random
import time

from api import WorkItem, WorkResult

import pymq
from pymq import Queue
from pymq.provider.aws import LocalstackConfig


@pymq.subscriber
def print_result(event: WorkResult):
print(f"<< {event}: {event.result}")


def main():
pymq.init(LocalstackConfig())

queue: Queue[WorkItem] = pymq.queue("work-items")
try:
while True:
item = WorkItem(random.randint(1, 100), random.randint(1, 100))
print(f">> {item} ({queue.qsize()} items queued)")
queue.put(item)

time.sleep(1.2)

except KeyboardInterrupt:
pass
finally:
queue.free()
pymq.shutdown()


if __name__ == '__main__':
main()
39 changes: 39 additions & 0 deletions examples/workqueue-localstack/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import time
import uuid

from api import WorkItem, WorkResult

import pymq
from pymq import Queue
from pymq.provider.aws import LocalstackConfig

worker = str(uuid.uuid4())[:8]


def do_work(item: WorkItem):
print(f"worker {worker} doing hard calculations...")
time.sleep(2)
result = WorkResult(worker, item.a * item.b)
print(f"worker {worker} publishing result")
pymq.publish(result)


def main():
pymq.init(LocalstackConfig())

try:
queue: Queue[WorkItem] = pymq.queue("work-items")
print(f"worker {worker} listening on work queue...")
while True:
event = queue.get()
do_work(event)

except KeyboardInterrupt:
pass
finally:
print(f"worker {worker} exiting")
pymq.shutdown()


if __name__ == '__main__':
main()

0 comments on commit ec8fdb8

Please sign in to comment.