Skip to content

Commit

Permalink
Merge branch 'release/0.5.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
s3rius committed May 21, 2023
2 parents 9001a97 + fe736eb commit 66d685f
Show file tree
Hide file tree
Showing 45 changed files with 3,316 additions and 1,886 deletions.
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ ignore =
F403,
; Found wrong metadata variable
WPS410,
; Found commented out cod
E800,

per-file-ignores =
; all tests
Expand Down
3 changes: 2 additions & 1 deletion docs/.vuepress/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { hopeTheme } from "vuepress-theme-hope";
export default defineUserConfig({
lang: "en-US",
title: "Taskiq",
description: "Distributed task queue with full async support",
description: "Async Distributed Task Manager",
head: [
[
"meta",
Expand All @@ -31,6 +31,7 @@ export default defineUserConfig({
backToTop: false,

plugins: {
readingTime: false,
copyCode: {
showInMobile: true,
},
Expand Down
11 changes: 10 additions & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
title: Home
title: Task manager for asyncio
home: true
heroImage: logo.svg
heroAlt: logo
Expand Down Expand Up @@ -28,6 +28,15 @@ features:
footer: MIT Licensed | Copyright© 2022-2023
---

## What is taskiq in a nutshell

Consider taskiq as an asyncio celery implementation. It uses almost the same patterns, but it's more modern
and flexible.

It's not a drop-in replacement for any other task manager. It has a different ecosystem of libraries and a different set of features.
Also, it doesn't work for synchronous projects. You won't be able to send tasks synchronously.


## Installation

You can install taskiq with pip or your favorite dependency manager:
Expand Down
2 changes: 1 addition & 1 deletion docs/available-components/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
order: 1
dir:
order: 3
order: 4
---

# Available components
Expand Down
2 changes: 1 addition & 1 deletion docs/available-components/brokers.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ In this section we'll list officially supported brokers.

This is a special broker for local development. It uses the same functions to execute tasks,
but all tasks are executed locally in the current thread.
By default it uses `InMemoryResultBackend` but this can be overriden.
By default it uses `InMemoryResultBackend` but this can be overridden.

## ZeroMQBroker

Expand Down
2 changes: 1 addition & 1 deletion docs/examples/extending/result_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async def is_result_ready(
Check if result exists.
This function must check whether result
is available in your resul backend
is available in your result backend
without fetching the result.
:param task_id: id of a task.
Expand Down
2 changes: 1 addition & 1 deletion docs/extending-taskiq/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
dir:
order: 2
order: 3
---

# Extending taskiq
Expand Down
13 changes: 13 additions & 0 deletions docs/framework_integrations/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
order: 1
dir:
order: 2
---

# Framework integrations

Taskiq is meant to be simple and adaptive. That's why we try to add different integrations to make development with taskiq and your favorite framework easy and fun!

Integrations with frameworks add two things:
1. Startup and Shutdown events;
1. Dependencies to use in your handler.
126 changes: 126 additions & 0 deletions docs/framework_integrations/taskiq-with-aiohttp.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
---
order: 2
---

# Taskiq + AioHTTP

AioHTTP is a framework for building robust applications. We created several libraries to make the experience with AioHTTP even better.

# Dependency inecjeciton for AioHTTP

We created a library [aiohttp-deps](https://pypi.org/project/aiohttp-deps/) to add FastAPI-like dependency injection in AioHTTP.

To install it, simply run:

```python
pip install "aiohttp-deps"
```

After the installation, please add startup event to your application to initialize dependencies context.

```python
from aiohttp import web
import aiohttp_deps


app = web.Application()

# This startup event makes all the magic happen.
# It parses current handlers and create dependency graphs for them.
app.on_startup.append(aiohttp_deps.init)

web.run_app(app)
```

You can read more about dependency injection and available dependencies in the project's [README.md](https://github.com/taskiq-python/aiohttp-deps).



## Adding taskiq integration

We highly recommend using aiohttp with aiohttp-deps because it allows us to reuse the same dependencies for your handlers and tasks. First of all, you should install the [taskiq-aiohttp](https://pypi.org/project/taskiq-aiohttp/) library.

```python
pip install "taskiq-aiohttp"
```

After the installation is complete, add an initialization function call to your broker's main file so it becomes something like this:

```python
import taskiq_aiohttp

broker = MyBroker()

# The second argument is a path to web.Application variable.
# Also you can provide here a factory function that takes no
# arguments and returns an application. This function can be async.
taskiq_aiohttp.init(broker, "my_project.main:app")
```

From this point, you'll be able to reuse the same dependencies as with `aiohttp-deps`.
Let's take a look at this function:

```python
from aiohttp import web
from taskiq import TaskiqDepends
from my_project.tkq import broker

@broker.task
async def my_task(app: web.Application = TaskiqDepends()):
...

```

In this example, we depend on the current application. We can use its state in a current task or any other dependency. We can take db_pool from your application's state, which is the same pool, as the one you've created on AiohTTP's startup.
But this application is only a mock of your application. It has correct types and all your variables that you filled on startup, but it doesn't handle any request.
This integration adds two main dependencies:
* web.Application - current application.
* web.Request - mocked request. This request only exists to be able to use the same dependencies.

You can find more detailed examples in the [examples repo](https://github.com/taskiq-python/examples).

## Testing

Writing tests for AioHTTP with taskiq is as easy as writing tests for the aiohttp application. The only difference is that, if you want to use InMemoryBroker, then you need to add context for dependency injection. It's easier to call `populate_context` when creating a `test_client` fixture.

```python
import taskiq_aiohttp

@pytest.fixture
async def test_client(
app: web.Application,
) -> AsyncGenerator[TestClient, None]:
"""
Create a test client.
This function creates a TestServer
and a test client for the application.
Also this fixture populates context
with needed variables.
:param app: current application.
:yield: ready to use client.
"""
loop = asyncio.get_running_loop()
server = TestServer(app)
client = TestClient(server, loop=loop)

await client.start_server()

# This is important part.
# Since InMemoryBroker doesn't
# run as a worker process, we have to populate
# broker's context by hand.
taskiq_aiohttp.populate_context(
broker=broker,
server=server.runner.server,
app=app,
loop=loop,
)

yield client

broker.custom_dependency_context = {}
await client.close()
```
118 changes: 118 additions & 0 deletions docs/framework_integrations/taskiq-with-fastapi.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
---
order: 1
---

# Taskiq + FastAPI

FastAPI is a highly popular async web framework in Python. It has gained its popularity because of two things:
1. It's easy to use;
2. Cool dependency injection.

In taskiq, we try to make our libraries easy to use, and We have a dependency injection too. So we have created the library "[taskiq-fastapi](https://github.com/taskiq-python/taskiq-fastapi)" to make integration with FastAPI as smooth as possible.

Let's see what we got here. In this library, we provide users with only one public function called `init`. It takes a broker and a string path (as in uvicorn) to the fastapi application (or factory function). People should call this function in their main broker file.

```python
from taskiq import ZeroMQBroker
import taskiq_fastapi

broker = ZeroMQBroker()

taskiq_fastapi.init(broker, "my_package.application:app")

```

There are two rules to make everything work as you expect:
1. Add `TaskiqDepends` as a default value for every parameter with `Request` or `HTTPConnection` types in base dependencies.
2. Use only `TaskiqDepends` in tasks.


::: tip Cool and important note!

The `Request` or `HTTPConnection` that you'll get injected in your task is not the same request or connection you have had in your handler when you were sending the task!

:::

Many fastapi dependency functions depend on `fastapi.Request`. We provide a mocked request to such dependencies. But taskiq cannot resolve dependencies until you explicitly specify that this parameter must be injected.

As an example. If you previously had a dependency like this:

```python
from fastapi import Request
from typing import Any

def get_redis_pool(request: Request) -> Any:
return request.app.state.redis_pool

```

To make it resolvable in taskiq, people should add `TaskiqDepends` as a default value for each parameter. Like this:
```python
from fastapi import Request
from taskiq import TaskiqDepends


async def get_redis_pool(request: Request = TaskiqDepends()):
return request.app.state.redis_pool

```


Also you want to call startup of your brokers somewhere.

```python
from fastapi import FastAPI
from your_project.taskiq import broker

app = FastAPI()


@app.on_event("startup")
async def app_startup():
if not broker.is_worker_process:
await broker.startup()


@app.on_event("shutdown")
async def app_shutdown():
if not broker.is_worker_process:
await broker.shutdown()

```

And that's it. Now you can use your taskiq tasks with functions and classes that depend on FastAPI dependenices. You can find bigger examples in the [examples repo](https://github.com/taskiq-python/examples/).


## Testing

Testing is no different from general testing advice from articles about [testing](../guide/testing-taskiq.md). But if you use `InMemoryBroker` in your tests, you need to provide it with a custom dependency context because it doesn't run as a worker process.

Let's imagine that you have a fixture of your application. It returns a new fastapi application to use in tests.
```python

@pytest.fixture
def fastapi_app() -> FastAPI:
return get_app()

```

Right after this fixture, we define another one.

```python
import taskiq_fastapi


@pytest.fixture(autouse=True)
def init_taskiq_deps(fastapi_app: FastAPI):
# This is important part. Here we add dependency context,
# this thing helps in resolving dependencies for tasks
# for inmemory broker.
taskiq_fastapi.populate_dependency_context(broker, fastapi_app)

yield

broker.custom_dependency_context = {}

```

This fixture has autouse flag, which means it would run on every test automatically.
4 changes: 2 additions & 2 deletions docs/guide/architecture-overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ test_project
You can specify all tasks modules to import manually.

```bash
taskiq test_project.broker:broker test_projec.submodule.tasks test_projec.utils.tasks
taskiq worker test_project.broker:broker test_project.submodule.tasks test_project.utils.tasks
```

Or you can let taskiq find all python modules named tasks in current directory recursively.

```bash
taskiq test_project.broker:broker -fsd
taskiq worker test_project.broker:broker -fsd
```

If you have uvloop installed, taskiq will automatically install new policies to event loop.
Expand Down
2 changes: 1 addition & 1 deletion docs/guide/scheduling-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Of course we can implement loop like this:
await asyncio.sleep(timedelta(minutes=5).total_seconds)
```

But if you have many schedules it may be a little painful to implement. So let me introuce you the `TaskiqScheduler`.
But if you have many schedules it may be a little painful to implement. So let me introduce you the `TaskiqScheduler`.
Let's add scheduler to our module.

@[code python](../examples/schedule/intro.py)
Expand Down
Loading

0 comments on commit 66d685f

Please sign in to comment.