-
-
Notifications
You must be signed in to change notification settings - Fork 525
/
async.py
84 lines (63 loc) · 2.42 KB
/
async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
"""
Concurrent programming with an event loop is a relatively new concept in
Python 3.x. This module aims to highlight how it could be used in the
context of a scheduler which runs a fire-and-forget operation for starting
jobs. In the real world, it takes time for a scheduler to start a job (e.g.
hit an API endpoint, ask the operating system for resources) so we assume
that starting a job has some intrinsic delay.
"""
import asyncio
from dataclasses import dataclass
from datetime import datetime
from uuid import uuid4
# Module-level constants: delays, in seconds, used to simulate how long a
# job takes to start
_DELAY_SMALL = 1e-3  # short wait for jobs that are expected to complete
_DELAY_LARGE = 60 * 60  # one hour, for a job meant to be cancelled early
@dataclass
class JobRecord:
    """Metadata captured for a single job.

    Attributes:
        guid: Identifier of the job.
        queued_at: Timestamp taken when the job was queued.
        started_at: Timestamp taken when the job was started.
    """

    guid: str
    queued_at: datetime
    started_at: datetime
def _is_valid_record(record):
    """Tell whether a job record has a consistent timeline.

    A record is valid only when its start time is strictly later than its
    queue time; equal timestamps are rejected.
    """
    return record.started_at > record.queued_at
def _current_time():
    """Return the current local time as a timezone-naive datetime."""
    # No timezone is attached, so the result carries no UTC offset
    return datetime.now(tz=None)
async def start_job(job_id, delay):
    """Start the job with the given ID once `delay` seconds have elapsed.

    The queue timestamp is taken before sleeping and the start timestamp
    after, and both are returned in a `JobRecord` for `job_id`.
    """
    queued_at = _current_time()
    # Sleeping suspends this coroutine without blocking the event loop
    await asyncio.sleep(delay)
    return JobRecord(
        guid=job_id,
        queued_at=queued_at,
        started_at=_current_time(),
    )
async def schedule_jobs():
    """Schedule jobs concurrently.

    Walks through three ways of driving `start_job`:

    1. awaiting a single coroutine object directly,
    2. wrapping a coroutine in a task and cancelling it before it finishes,
    3. running a batch of coroutines together with `asyncio.gather`.

    Every step is checked with an `assert`, so an `AssertionError` means
    one of the demonstrated properties did not hold.
    """
    # Calling a coroutine function does not run it; it only creates a
    # coroutine object that still has to be awaited
    single_job = start_job(uuid4().hex, _DELAY_SMALL)
    assert asyncio.iscoroutine(single_job)

    # Awaiting the coroutine runs it and yields its job record
    single_record = await single_job
    assert _is_valid_record(single_record)

    # A task wraps a coroutine and schedules it on the running event loop.
    # A task is also a future
    single_task = asyncio.create_task(start_job(uuid4().hex, _DELAY_LARGE))
    assert asyncio.isfuture(single_task)

    # Unlike a bare coroutine object, a task can be cancelled through its
    # handle. Awaiting the cancelled task then raises `CancelledError`
    single_task.cancel()
    task_cancelled = False
    try:
        await single_task
    except asyncio.CancelledError:
        # Use the public `asyncio.CancelledError` name instead of reaching
        # into the `asyncio.exceptions` submodule
        task_cancelled = single_task.cancelled()
    assert task_cancelled

    # Gather coroutines for batch start
    batch_jobs = [start_job(uuid4().hex, _DELAY_SMALL) for _ in range(10)]
    batch_records = await asyncio.gather(*batch_jobs)

    # We get the same amount of records as we have coroutines
    assert len(batch_records) == len(batch_jobs)
    assert all(_is_valid_record(record) for record in batch_records)
def main():
    """Run the job scheduling demo to completion with `asyncio.run`."""
    demo = schedule_jobs()
    asyncio.run(demo)


# Only run the demo when this file is executed as a script
if __name__ == "__main__":
    main()