Skip to content

Commit

Permalink
fix: taskiq 0.11 compat
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Aug 21, 2024
1 parent ec29277 commit 0b35454
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 61 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
run: python -m build

- name: Publish
uses: pypa/gh-action-pypi-publish@v1.8.14
uses: pypa/gh-action-pypi-publish@v1.9.0
with:
password: ${{ secrets.PYPI_TOKEN }}

Expand Down
9 changes: 7 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ classifiers = [
dynamic = ["version"]

dependencies = [
"taskiq>=0.10.0,<1.0.0",
"taskiq>=0.11.0,<0.12.0",
"faststream>=0.3.14,<0.6.0",
]

Expand All @@ -59,6 +59,10 @@ kafka = [
"faststream[kafka]"
]

confluent = [
"faststream[confluent]"
]

redis = [
"faststream[redis]"
]
Expand All @@ -68,6 +72,7 @@ test = [
"taskiq-faststream[nats]",
"taskiq-faststream[rabbit]",
"taskiq-faststream[kafka]",
"taskiq-faststream[confluent]",
"taskiq-faststream[redis]",

"coverage[toml]>=7.2.0,<8.0.0",
Expand All @@ -77,7 +82,7 @@ test = [
dev = [
"taskiq-faststream[test]",

"mypy>=1.8.0,<1.10.0",
"mypy>=1.8.0,<1.12.0",
"ruff==0.4.1",
"pre-commit >=3.6.0,<4.0.0",
]
Expand Down
2 changes: 1 addition & 1 deletion taskiq_faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""FastStream - taskiq integration to schedule FastStream tasks."""

__version__ = "0.1.8"
__version__ = "0.2.0"
24 changes: 9 additions & 15 deletions taskiq_faststream/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
import anyio
from faststream.app import FastStream
from faststream.types import SendableMessage
from taskiq import AsyncBroker, BrokerMessage
from taskiq import AsyncBroker
from taskiq.acks import AckableMessage
from taskiq.decor import AsyncTaskiqDecoratedTask
from typing_extensions import TypeAlias, override

from taskiq_faststream.formatter import PatchedFormatter
from taskiq_faststream.serializer import PatchedSerializer
from taskiq_faststream.formatter import PatchedFormatter, PathcedMessage
from taskiq_faststream.types import ScheduledTask
from taskiq_faststream.utils import resolve_msg

Expand All @@ -34,8 +33,7 @@ class BrokerWrapper(AsyncBroker):

def __init__(self, broker: Any) -> None:
super().__init__()
self.serializer = PatchedSerializer()
self.formatter = PatchedFormatter(self)
self.formatter = PatchedFormatter()
self.broker = broker

async def startup(self) -> None:
Expand All @@ -48,7 +46,7 @@ async def shutdown(self) -> None:
await self.broker.close()
await super().shutdown()

async def kick(self, message: BrokerMessage) -> None:
async def kick(self, message: PathcedMessage) -> None: # type: ignore[override]
"""Call wrapped FastStream broker `publish` method."""
await _broker_publish(self.broker, message)

Expand Down Expand Up @@ -111,7 +109,7 @@ class AppWrapper(BrokerWrapper):

def __init__(self, app: FastStream) -> None:
super(BrokerWrapper, self).__init__()
self.serializer = PatchedSerializer()
self.formatter = PatchedFormatter()
self.app = app

async def startup(self) -> None:
Expand All @@ -124,7 +122,7 @@ async def shutdown(self) -> None:
await self.app._shutdown() # noqa: SLF001
await super(BrokerWrapper, self).shutdown()

async def kick(self, message: BrokerMessage) -> None:
async def kick(self, message: PathcedMessage) -> None: # type: ignore[override]
"""Call wrapped FastStream broker `publish` method."""
assert ( # noqa: S101
self.app.broker
Expand All @@ -134,11 +132,7 @@ async def kick(self, message: BrokerMessage) -> None:

async def _broker_publish(
broker: Any,
message: BrokerMessage,
message: PathcedMessage,
) -> None:
labels = message.labels
labels.pop("schedule", None)
async for msg in resolve_msg(
msg=labels.pop("message", message.message),
):
await broker.publish(msg, **labels)
async for msg in resolve_msg(message.body):
await broker.publish(msg, **message.labels)
41 changes: 19 additions & 22 deletions taskiq_faststream/formatter.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,38 @@
from dataclasses import dataclass
from typing import Any, Dict

from taskiq.abc.broker import AsyncBroker
from taskiq.abc.formatter import TaskiqFormatter
from taskiq.compat import IS_PYDANTIC2, Model, model_dump, model_validate
from taskiq.message import BrokerMessage, TaskiqMessage
from taskiq.message import TaskiqMessage

if IS_PYDANTIC2:

def model_dump(instance: Model) -> Dict[str, Any]:
"""Model dump."""
return instance.model_dump()
@dataclass
class PathcedMessage:
"""DTO to transfer data to `broker.kick`."""

else:

def model_dump(instance: Model) -> Dict[str, Any]:
"""Model dump."""
return instance.dict()
body: Any
labels: Dict[str, Any]


class PatchedFormatter(TaskiqFormatter):
"""Default taskiq formatter."""

def __init__(self, broker: AsyncBroker) -> None:
self.broker = broker

def dumps(self, message: TaskiqMessage) -> BrokerMessage:
def dumps( # type: ignore[override]
self,
message: TaskiqMessage,
) -> PathcedMessage:
"""
Dumps taskiq message to some broker message format.
:param message: message to send.
:return: Dumped message.
"""
return BrokerMessage(
task_id=message.task_id,
task_name=message.task_name,
message=self.broker.serializer.dumpb(model_dump(message)),
labels=message.labels,
labels = message.labels
labels.pop("schedule", None)
labels.pop("schedule_id", None)

return PathcedMessage(
body=labels.pop("message", None),
labels=labels,
)

def loads(self, message: bytes) -> TaskiqMessage:
Expand All @@ -45,4 +42,4 @@ def loads(self, message: bytes) -> TaskiqMessage:
:param message: broker's message.
:return: parsed taskiq message.
"""
return model_validate(TaskiqMessage, self.broker.serializer.loadb(message))
raise NotImplementedError
17 changes: 0 additions & 17 deletions taskiq_faststream/serializer.py

This file was deleted.

1 change: 0 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,5 @@ def mock() -> MagicMock:


@pytest.fixture()
@pytest.mark.anyio
async def event() -> asyncio.Event:
return asyncio.Event()
4 changes: 2 additions & 2 deletions tests/testcase.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from datetime import datetime
import datetime
from typing import Any
from unittest.mock import MagicMock

Expand Down Expand Up @@ -44,7 +44,7 @@ async def handler(msg: str) -> None:
**{self.subj_name: subject},
schedule=[
{
"time": datetime.utcnow(),
"time": datetime.datetime.now(datetime.UTC),
},
],
)
Expand Down

0 comments on commit 0b35454

Please sign in to comment.