-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample_1.py
90 lines (72 loc) · 2.34 KB
/
example_1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"""This program demonstrates a synchronous approach
to accomplishing tasks. The worker can't delegate
any tasks and completes them all one after another.
"""
from time import sleep
from queue import Queue
from codetiming import Timer
def factorial(number: int) -> int:
    """Return the factorial of ``number``, computed iteratively.

    The product is accumulated in a loop instead of through recursive
    calls, so large inputs are not limited by the interpreter's
    recursion depth.

    Args:
        number (int): The number to calculate the factorial of. Any
            value less than or equal to 1 (including negatives) gives 1.

    Returns:
        The product ``number * (number - 1) * ... * 2``, or 1 when
        ``number <= 1``.
    """
    result = 1
    # Count downwards, mirroring the recursive definition, so the same
    # inputs are accepted and the same ``<= 1`` base case applies.
    while number > 1:
        result *= number
        number -= 1
    return result
def io_task(delay: float=0):
    """Simulate an IO bound task by sleeping for a while

    The sleep runs inside a Timer context configured with the
    "IO Task elapsed time" message.

    Args:
        delay (float): How many seconds the task sleeps for

    Returns:
        The same delay value that was passed in
    """
    stopwatch = Timer(text="IO Task elapsed time: {:.2f} seconds")
    with stopwatch:
        sleep(delay)
        return delay
def cpu_task(number: int):
    """Run a CPU bound task: calculating a factorial

    The calculation runs inside a Timer context configured with the
    "CPU Task elapsed time" message.

    Args:
        number (int): The number to calculate the factorial of

    Returns:
        Whatever factorial(number) returns
    """
    stopwatch = Timer(text="CPU Task elapsed time: {:.2f} seconds")
    with stopwatch:
        answer = factorial(number)
    return answer
def worker(name: str, task_queue: Queue):
    """Drain the task queue, running every task in turn

    Each queue entry is unpacked as a (callable, kwargs) pair; the
    callable is invoked with the kwargs and its result is printed.

    Args:
        name (str): The name of this worker, used in the printed messages
        task_queue (Queue): The queue the tasks are pulled from
    """
    print(f"Worker {name} starting to run tasks")
    # keep pulling tasks until the queue reports that it is empty
    while True:
        if task_queue.empty():
            break
        task, params = task_queue.get()
        # this local has to stay named ``result``: the f-string below
        # prints the variable name as part of its output
        result = task(**params)
        print(f"Worker {name} completed task: {result=}\n")
    print(f"Worker {name} finished as there are no more tasks\n")
def main():
    """
    This is the main entry point for the program

    Fills a queue with a mix of IO and CPU tasks, then runs a single
    worker over the queue inside a Timer context for the whole run.
    """
    # Create the queue for tasks
    task_queue = Queue()

    # Put some tasks in the queue; each entry is a (callable, kwargs) pair
    tasks = [
        (io_task, {"delay": 4.0}),
        (cpu_task, {"number": 40}),
        (io_task, {"delay": 3.0}),
        (io_task, {"delay": 2.0}),
        (cpu_task, {"number": 50}),
        (io_task, {"delay": 1.0}),
    ]
    for task in tasks:
        task_queue.put(task)

    # Create a single worker
    workers = [
        (worker, "One", task_queue),
    ]

    # Run the workers one after another. Iterating the list directly,
    # rather than removing entries from it while looping over it, runs
    # every worker exactly once and in list order.
    with Timer(text="Total elapsed time: {:.2f}"):
        for worker_fn, name, queue in workers:
            worker_fn(name, queue)
if __name__ == "__main__":
    # Surround the program's output with an empty line above and below
    print()
    main()
    print()