Skip to content

Commit 0e92f91

Browse files
committed
Merge branch 'release/0.9.1'
2 parents 0c526fa + b9246af commit 0e92f91

25 files changed

+713
-192
lines changed

docs/examples/dynamics/broker.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import asyncio
2+
3+
from taskiq_redis import ListQueueBroker
4+
5+
6+
async def main() -> None:
7+
# Here we define a broker.
8+
dyn_broker = ListQueueBroker("redis://localhost")
9+
await dyn_broker.startup()
10+
11+
# Now we register lambda as a task.
12+
dyn_task = dyn_broker.register_task(
13+
lambda x: print("A", x),
14+
task_name="dyn_task",
15+
)
16+
17+
# now we can send it.
18+
await dyn_task.kiq(x=1)
19+
20+
await dyn_broker.shutdown()
21+
22+
23+
if __name__ == "__main__":
24+
asyncio.run(main())

docs/examples/dynamics/receiver.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import asyncio
2+
3+
from taskiq_redis import ListQueueBroker
4+
5+
from taskiq.api import run_receiver_task
6+
7+
8+
async def main() -> None:
9+
# Here we define a broker.
10+
dyn_broker = ListQueueBroker("redis://localhost")
11+
await dyn_broker.startup()
12+
worker_task = asyncio.create_task(run_receiver_task(dyn_broker))
13+
14+
# Now we register lambda as a task.
15+
dyn_task = dyn_broker.register_task(
16+
lambda x: print("A", x),
17+
task_name="dyn_task",
18+
)
19+
20+
# now we can send it.
21+
await dyn_task.kiq(x=1)
22+
23+
await asyncio.sleep(2)
24+
25+
worker_task.cancel()
26+
try:
27+
await worker_task
28+
except asyncio.CancelledError:
29+
print("Worker successfully exited.")
30+
31+
await dyn_broker.shutdown()
32+
33+
34+
if __name__ == "__main__":
35+
asyncio.run(main())

docs/examples/dynamics/scheduler.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import asyncio
2+
import datetime
3+
4+
from taskiq_redis import ListQueueBroker
5+
6+
from taskiq import TaskiqScheduler
7+
from taskiq.api import run_receiver_task, run_scheduler_task
8+
from taskiq.schedule_sources import LabelScheduleSource
9+
10+
11+
async def main() -> None:
12+
# Here we define a broker.
13+
dyn_broker = ListQueueBroker("redis://localhost")
14+
dyn_scheduler = TaskiqScheduler(dyn_broker, [LabelScheduleSource(dyn_broker)])
15+
16+
await dyn_broker.startup()
17+
18+
# Now we register lambda as a task.
19+
dyn_task = dyn_broker.register_task(
20+
lambda x: print("A", x),
21+
task_name="dyn_task",
22+
# We add a schedule when to run task.
23+
schedule=[
24+
{
25+
# Here we also can specify cron instead of time.
26+
"time": datetime.datetime.utcnow() + datetime.timedelta(seconds=2),
27+
"args": [22],
28+
},
29+
],
30+
)
31+
32+
# We create scheduler after the task declaration,
33+
# so we don't have to wait a minute before it gets to the task.
34+
# However, defining a scheduler before the task declaration is also possible.
35+
# but we have to wait till it gets to task execution for the second time.
36+
worker_task = asyncio.create_task(run_receiver_task(dyn_broker))
37+
scheduler_task = asyncio.create_task(run_scheduler_task(dyn_scheduler))
38+
39+
# We still able to send the task.
40+
await dyn_task.kiq(x=1)
41+
42+
await asyncio.sleep(10)
43+
44+
worker_task.cancel()
45+
try:
46+
await worker_task
47+
except asyncio.CancelledError:
48+
print("Worker successfully exited.")
49+
50+
scheduler_task.cancel()
51+
try:
52+
await scheduler_task
53+
except asyncio.CancelledError:
54+
print("Scheduler successfully exited.")
55+
56+
await dyn_broker.shutdown()
57+
58+
59+
if __name__ == "__main__":
60+
asyncio.run(main())

docs/guide/dynamic-brokers.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
---
2+
title: Dynamic Environments
3+
order: 9
4+
---
5+
6+
This article is for all the people who want to dynamically create brokers, register tasks, and run them inside their code. Or maybe implement more complex logic.
7+
8+
The Taskiq allows you to create broker instances in all parts of your application. You
9+
can register tasks dynamically and run them. But when tasks are created dynamically,
10+
the `taskiq worker` command won't be able to find them.
11+
12+
To define tasks and assign them to broker, use `register_task` method.
13+
14+
@[code python](../examples/dynamics/broker.py)
15+
16+
The problem with this code is that if we run the `taskiq worker` command, it won't be able
17+
to execute our tasks. Because lambdas are created within the `main` function and they
18+
are not visible outside of it.
19+
20+
To surpass this issue, we need to create a dynamic worker task within the current loop.
21+
Or, we can create a code that can listen to our brokers and have all information about dynamic
22+
functions.
23+
24+
Here I won't be showing how to create your own CLI command, but I'll show you how to create
25+
a dynamic worker within the current loop.
26+
27+
@[code python](../examples/dynamics/receiver.py)
28+
29+
Here we define a dynamic lambda task with some name, assign it to broker, as we did before.
30+
The only difference is that we start our receiver coroutine, that will listen to the new
31+
messages and execute them. Receiver task will be executed in the current loop, and when main function
32+
exits, the receriver task is canceled. But for illustration purpose, I canceled it manually.
33+
34+
Sometimes you need to run not only receiver, but a scheduler as well. You can do it, by using
35+
another function that also can work within the current loop.
36+
37+
@[code python](../examples/dynamics/scheduler.py)

docs/guide/testing-taskiq.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
order: 9
2+
order: 10
33
---
44

55
# Testing with taskiq

0 commit comments

Comments
 (0)