From 5a0cff8b9ef3f3af690b7f2700d7d8ee8c0698fe Mon Sep 17 00:00:00 2001 From: "chandr-andr (Kiselev Aleksandr)" <askiselev00@gmail.com> Date: Tue, 11 Apr 2023 00:52:42 +0400 Subject: [PATCH] Removed pydantic. --- poetry.lock | 2 +- pyproject.toml | 1 - taskiq_aio_kafka/broker.py | 15 ++++++++++----- tests/conftest.py | 9 ++------- tests/test_broker.py | 1 - 5 files changed, 13 insertions(+), 15 deletions(-) diff --git a/poetry.lock b/poetry.lock index 65d7803..134c1c1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1639,4 +1639,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = "^3.7" -content-hash = "94a3173d5c481088d539e983704eaf9899270026322b6c03ba5aa77dad05078d" +content-hash = "3cd77ff011a37b1bd0d6c614fb60fc51ebd84290ca6c2e836486e11ea12b46b4" diff --git a/pyproject.toml b/pyproject.toml index 2678601..6bc6958 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,6 @@ packages = [{ include = "taskiq_aio_kafka" }] python = "^3.7" taskiq = "^0" aiokafka = "^0.8.0" -pydantic = "^1.10.7" [tool.poetry.group.dev.dependencies] pytest = "^7.1.2" diff --git a/taskiq_aio_kafka/broker.py b/taskiq_aio_kafka/broker.py index 8d0b16b..9b17ad7 100644 --- a/taskiq_aio_kafka/broker.py +++ b/taskiq_aio_kafka/broker.py @@ -115,7 +115,8 @@ def __init__( # noqa: WPS211 self._delay_kick_tasks: Set[asyncio.Task[None]] = set() - self._is_started = False + self._is_producer_started = False + self._is_consumer_started = False async def startup(self) -> None: """Setup AIOKafkaProducer, AIOKafkaConsumer and kafka topics. @@ -127,7 +128,10 @@ async def startup(self) -> None: """ await super().startup() - if self._kafka_topic.name not in self._kafka_admin_client.list_topics(): + is_topic_available: bool = bool( + self._kafka_admin_client.describe_topics([self._kafka_topic.name]), + ) + if not is_topic_available: self._kafka_admin_client.create_topics( new_topics=[self._kafka_topic], validate_only=False, @@ -136,8 +140,9 @@ async def startup(self) -> None: await self._aiokafka_producer.start() if self.is_worker_process: await self._aiokafka_consumer.start() + self._is_consumer_started = True - self._is_started = True + self._is_producer_started = True async def shutdown(self) -> None: """Close all connections on shutdown.""" @@ -175,7 +180,7 @@ async def kick(self, message: BrokerMessage) -> None: :raises ValueError: if startup wasn't called. :param message: message to send. """ - if not self._is_started: + if not self._is_producer_started: raise ValueError("Please run startup before kicking.") kafka_message: bytes = pickle.dumps(message) @@ -197,7 +202,7 @@ async def listen( :yields: parsed broker message. :raises ValueError: if no aiokafka_consumer or startup wasn't called. """ - if not self._is_started: + if not self._is_consumer_started: raise ValueError("Please run startup before listening.") async for raw_kafka_message in self._aiokafka_consumer: diff --git a/tests/conftest.py b/tests/conftest.py index 0d92e8e..a054b2f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -87,7 +87,6 @@ async def test_kafka_consumer( @pytest.fixture() async def broker_without_arguments( kafka_url: str, - base_topic_name: str, ) -> AsyncGenerator[AioKafkaBroker, None]: """Return AioKafkaBroker default realization. @@ -95,12 +94,12 @@ async def broker_without_arguments( and AIOKafkaConsumer. :param kafka_url: url to kafka. - :param base_topic_name: name of the topic. :yields: AioKafkaBroker. """ broker = AioKafkaBroker( bootstrap_servers=kafka_url, + delete_topic_on_shutdown=True, ) broker.is_worker_process = True @@ -114,10 +113,8 @@ async def broker_without_arguments( @pytest.fixture() async def broker( kafka_url: str, - base_topic: NewTopic, test_kafka_producer: AIOKafkaProducer, test_kafka_consumer: AIOKafkaConsumer, - base_topic_name: str, ) -> AsyncGenerator[AioKafkaBroker, None]: """Yield new broker instance. @@ -126,18 +123,16 @@ async def broker( and shutdown after test. :param kafka_url: url to kafka. - :param base_topic: custom topic. :param test_kafka_producer: custom AIOKafkaProducer. :param test_kafka_consumer: custom AIOKafkaConsumer. - :param base_topic_name: name of the topic. :yields: broker. """ broker = AioKafkaBroker( bootstrap_servers=kafka_url, - kafka_topic=base_topic, aiokafka_producer=test_kafka_producer, aiokafka_consumer=test_kafka_consumer, + delete_topic_on_shutdown=True, ) broker.is_worker_process = True diff --git a/tests/test_broker.py b/tests/test_broker.py index 13d5c40..c4c47a7 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -53,7 +53,6 @@ async def test_kick_success(broker: AioKafkaBroker) -> None: received_message: BrokerMessage = pickle.loads( received_message_bytes, ) - assert message_to_send == received_message