-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample_7.py
123 lines (101 loc) · 4.07 KB
/
example_7.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""This program demonstrates a asynchronous approach
to accomplishing tasks. In this version there is a
second worker created, and the workers can delegate
back to the control loop to cooperate with each other.
In this version the tasks are asynchronous, so the
workers run concurrently.
The tasks for this demo are fetching the contents
of web pages, reading files, and computing factorials.
"""
import asyncio
import aiohttp
import aiofiles
from codetiming import Timer
async def factorial(number: int):
    """Compute ``number!`` while periodically yielding to the event loop.

    The product is accumulated iteratively, so the size of ``number`` is not
    bounded by the interpreter's recursion limit.

    Args:
        number (int): The value to compute the factorial of. Any value of
            1 or less (including negatives) yields 1.

    Returns:
        int: The factorial of ``number``.
    """
    product = 1
    current = number
    # Walk downward from ``number`` to 2; every multiple of ten is a point
    # where control is handed back to the event loop.
    while current > 1:
        if current % 10 == 0:
            print("Context switch to event loop")
            # sleep(0) suspends this coroutine for one loop iteration so
            # other scheduled coroutines get a chance to run.
            await asyncio.sleep(0)
        product *= current
        current -= 1
    return product
async def io_task_get_web_pages(url: str = ""):
    """Fetch a web page over HTTP and return its body text.

    The whole request runs inside a ``Timer`` context configured with the
    "Task elapsed time" text template.

    Args:
        url (str): The url to get via http

    Returns:
        tuple: The requested url paired with the text of the response
    """
    with Timer(text="Task elapsed time: {:.2f} seconds"):
        # One session per call; the response is released when the block exits.
        async with aiohttp.ClientSession() as client, client.get(url) as reply:
            body = await reply.text()
    return url, body
async def io_task_read_file(filename: str = ""):
    """Count the lines of a file using asynchronous file I/O.

    The read runs inside a ``Timer`` context configured with the
    "Task elapsed time" text template.

    Args:
        filename (str): The file to read

    Returns:
        tuple: The filename paired with the number of lines read from it
    """
    total_lines = 0
    with Timer(text="Task elapsed time: {:.2f} seconds"):
        async with aiofiles.open(filename, "r") as handle:
            # Only the number of lines matters, not their content.
            async for _ in handle:
                total_lines += 1
    return filename, total_lines
async def cpu_task(number: int):
    """Run a CPU-bound factorial computation inside a timer.

    Args:
        number (int): The number to calculate a factorial for

    Returns:
        The value produced by ``factorial(number)``
    """
    with Timer(text="CPU Task elapsed time: {:.2f} seconds"):
        # Returning from inside the block still exits the Timer context.
        return await factorial(number)
async def worker(name: str, task_queue: asyncio.Queue):
    """Pull tasks from the queue and run them until the queue is empty.

    Each queue item is a ``(fn, kwargs)`` pair; ``fn(**kwargs)`` is awaited
    and a completion message is printed. The message format is chosen by the
    function's ``__name__``; a function with any other name is still awaited
    and reported with a generic message rather than being dropped.

    Args:
        name (str): The string name of the worker, used in printed messages
        task_queue (asyncio.Queue): The queue the tasks are pulled from

    Raises:
        Exception: Whatever an awaited task raises propagates to the caller.
    """
    print(f"Worker {name} starting to run tasks")
    while True:
        # Take an item without waiting: an empty queue means the work is
        # done, so the worker must stop rather than block for more items.
        try:
            fn, kwargs = task_queue.get_nowait()
        except asyncio.QueueEmpty:
            break
        try:
            task_name = getattr(fn, "__name__", repr(fn))
            if task_name == "io_task_get_web_pages":
                url, text = await fn(**kwargs)
                print(f"Worker {name} completed task: {url=}, text = {text.strip()[:50]}\n")
            elif task_name == "io_task_read_file":
                filename, line_counter = await fn(**kwargs)
                print(f"Worker {name} completed task: {filename=}, {line_counter=}")
            elif task_name == "cpu_task":
                # Bound to a neutral local name so the module-level
                # ``factorial`` function is not shadowed; output is unchanged.
                result = await fn(**kwargs)
                print(f"Worker {name} completed task: factorial={result!r}")
            else:
                # Unknown task kinds are still executed, not discarded.
                result = await fn(**kwargs)
                print(f"Worker {name} completed task: {task_name} returned {result!r}")
        finally:
            # Mark the item as processed so ``task_queue.join()`` can be used.
            task_queue.task_done()
    print(f"Worker {name} finished as there are no more tasks\n")
async def main():
    """Queue up the demo tasks and run them with two concurrent workers.

    The queue is filled with ``(function, kwargs)`` pairs, then workers
    "One" and "Two" drain it while a ``Timer`` measures the total run.
    """
    task_queue = asyncio.Queue()

    # The work items, in the order they are handed to the workers
    pending_tasks = (
        (io_task_get_web_pages, {"url": "https://weather.com/"}),
        (io_task_read_file, {"filename": "textfile1.txt"}),
        (cpu_task, {"number": 40}),
        (io_task_get_web_pages, {"url": "http://yahoo.com"}),
        (io_task_get_web_pages, {"url": "http://linkedin.com"}),
        (io_task_get_web_pages, {"url": "https://www.dropbox.com"}),
        (io_task_get_web_pages, {"url": "http://microsoft.com"}),
        (cpu_task, {"number": 50}),
        (io_task_get_web_pages, {"url": "http://facebook.com"}),
        (io_task_read_file, {"filename": "textfile2.txt"}),
        (io_task_get_web_pages, {"url": "https://www.target.com/"}),
    )
    for entry in pending_tasks:
        task_queue.put_nowait(entry)

    # Run both workers against the shared queue and time the whole batch
    with Timer(text="Total elapsed time: {:.2f}"):
        running_workers = [
            asyncio.create_task(worker(label, task_queue))
            for label in ("One", "Two")
        ]
        await asyncio.gather(*running_workers)
if __name__ == "__main__":
    # Blank lines before and after frame the program's output.
    print()
    asyncio.run(main())
    print()