Removed pydantic.
chandr-andr committed Apr 11, 2023
1 parent 6b4ce80 commit 5a0cff8
Showing 5 changed files with 13 additions and 15 deletions.
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion pyproject.toml
@@ -27,7 +27,6 @@ packages = [{ include = "taskiq_aio_kafka" }]
python = "^3.7"
taskiq = "^0"
aiokafka = "^0.8.0"
- pydantic = "^1.10.7"

[tool.poetry.group.dev.dependencies]
pytest = "^7.1.2"
15 changes: 10 additions & 5 deletions taskiq_aio_kafka/broker.py
@@ -115,7 +115,8 @@ def __init__( # noqa: WPS211

self._delay_kick_tasks: Set[asyncio.Task[None]] = set()

- self._is_started = False
+ self._is_producer_started = False
+ self._is_consumer_started = False

async def startup(self) -> None:
"""Setup AIOKafkaProducer, AIOKafkaConsumer and kafka topics.
@@ -127,7 +128,10 @@ async def startup(self) -> None:
"""
await super().startup()

- if self._kafka_topic.name not in self._kafka_admin_client.list_topics():
+ is_topic_available: bool = bool(
+     self._kafka_admin_client.describe_topics([self._kafka_topic.name]),
+ )
+ if not is_topic_available:
self._kafka_admin_client.create_topics(
new_topics=[self._kafka_topic],
validate_only=False,
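
The new check above treats a non-empty describe_topics() result as proof that the topic already exists and only calls create_topics() otherwise. A standalone sketch of the same idea, assuming kafka-python's synchronous KafkaAdminClient (its method names match the calls in this diff); the bootstrap address and topic name below are placeholders:

    from kafka.admin import KafkaAdminClient, NewTopic

    # Placeholder connection details for illustration only.
    admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
    topic = NewTopic(name="example_topic", num_partitions=1, replication_factor=1)

    # Mirror the check above: a non-empty result means the topic is already there.
    if not bool(admin.describe_topics([topic.name])):
        admin.create_topics(new_topics=[topic], validate_only=False)
    admin.close()
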
@@ -136,8 +140,9 @@
await self._aiokafka_producer.start()
if self.is_worker_process:
await self._aiokafka_consumer.start()
+ self._is_consumer_started = True

- self._is_started = True
+ self._is_producer_started = True

async def shutdown(self) -> None:
"""Close all connections on shutdown."""
@@ -175,7 +180,7 @@ async def kick(self, message: BrokerMessage) -> None:
:raises ValueError: if startup wasn't called.
:param message: message to send.
"""
- if not self._is_started:
+ if not self._is_producer_started:
raise ValueError("Please run startup before kicking.")

kafka_message: bytes = pickle.dumps(message)
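
With pydantic gone from the dependencies, serialisation here relies on the standard-library pickle module, as the line above shows. A minimal round-trip sketch with a stand-in payload (a plain dict rather than taskiq's BrokerMessage):

    import pickle

    # Stand-in payload; the broker pickles a taskiq BrokerMessage instead.
    message = {"task_id": "abc123", "task_name": "my_task", "args": [1, 2, 3]}

    raw: bytes = pickle.dumps(message)  # what kick() sends to Kafka
    restored = pickle.loads(raw)        # what listen() reconstructs

    assert restored == message
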
@@ -197,7 +202,7 @@ async def listen(
:yields: parsed broker message.
:raises ValueError: if no aiokafka_consumer or startup wasn't called.
"""
- if not self._is_started:
+ if not self._is_consumer_started:
raise ValueError("Please run startup before listening.")

async for raw_kafka_message in self._aiokafka_consumer:
9 changes: 2 additions & 7 deletions tests/conftest.py
@@ -87,20 +87,19 @@ async def test_kafka_consumer(
@pytest.fixture()
async def broker_without_arguments(
kafka_url: str,
- base_topic_name: str,
) -> AsyncGenerator[AioKafkaBroker, None]:
"""Return AioKafkaBroker default realization.
In this fixture we don't pass custom topic, AIOKafkaProducer
and AIOKafkaConsumer.
:param kafka_url: url to kafka.
- :param base_topic_name: name of the topic.
:yields: AioKafkaBroker.
"""
broker = AioKafkaBroker(
bootstrap_servers=kafka_url,
delete_topic_on_shutdown=True,
)
broker.is_worker_process = True

@@ -114,10 +113,8 @@ async def broker_without_arguments(
@pytest.fixture()
async def broker(
kafka_url: str,
- base_topic: NewTopic,
test_kafka_producer: AIOKafkaProducer,
test_kafka_consumer: AIOKafkaConsumer,
- base_topic_name: str,
) -> AsyncGenerator[AioKafkaBroker, None]:
"""Yield new broker instance.
@@ -126,18 +123,16 @@ async def broker_without_arguments(
and shutdown after test.
:param kafka_url: url to kafka.
- :param base_topic: custom topic.
:param test_kafka_producer: custom AIOKafkaProducer.
:param test_kafka_consumer: custom AIOKafkaConsumer.
- :param base_topic_name: name of the topic.
:yields: broker.
"""
broker = AioKafkaBroker(
bootstrap_servers=kafka_url,
- kafka_topic=base_topic,
aiokafka_producer=test_kafka_producer,
aiokafka_consumer=test_kafka_consumer,
delete_topic_on_shutdown=True,
)
broker.is_worker_process = True

1 change: 0 additions & 1 deletion tests/test_broker.py
@@ -53,7 +53,6 @@ async def test_kick_success(broker: AioKafkaBroker) -> None:
received_message: BrokerMessage = pickle.loads(
received_message_bytes,
)

assert message_to_send == received_message


