diff --git a/poetry.lock b/poetry.lock
index 65d7803..134c1c1 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1639,4 +1639,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.7"
-content-hash = "94a3173d5c481088d539e983704eaf9899270026322b6c03ba5aa77dad05078d"
+content-hash = "3cd77ff011a37b1bd0d6c614fb60fc51ebd84290ca6c2e836486e11ea12b46b4"
diff --git a/pyproject.toml b/pyproject.toml
index 2678601..6bc6958 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -27,7 +27,6 @@ packages = [{ include = "taskiq_aio_kafka" }]
 python = "^3.7"
 taskiq = "^0"
 aiokafka = "^0.8.0"
-pydantic = "^1.10.7"
 
 [tool.poetry.group.dev.dependencies]
 pytest = "^7.1.2"
diff --git a/taskiq_aio_kafka/broker.py b/taskiq_aio_kafka/broker.py
index 8d0b16b..9b17ad7 100644
--- a/taskiq_aio_kafka/broker.py
+++ b/taskiq_aio_kafka/broker.py
@@ -115,7 +115,8 @@ def __init__(  # noqa: WPS211
 
         self._delay_kick_tasks: Set[asyncio.Task[None]] = set()
 
-        self._is_started = False
+        self._is_producer_started = False
+        self._is_consumer_started = False
 
     async def startup(self) -> None:
         """Setup AIOKafkaProducer, AIOKafkaConsumer and kafka topics.
@@ -127,7 +128,10 @@ async def startup(self) -> None:
         """
         await super().startup()
 
-        if self._kafka_topic.name not in self._kafka_admin_client.list_topics():
+        is_topic_available: bool = bool(
+            self._kafka_admin_client.describe_topics([self._kafka_topic.name]),
+        )
+        if not is_topic_available:
             self._kafka_admin_client.create_topics(
                 new_topics=[self._kafka_topic],
                 validate_only=False,
@@ -136,8 +140,9 @@ async def startup(self) -> None:
         await self._aiokafka_producer.start()
         if self.is_worker_process:
             await self._aiokafka_consumer.start()
+            self._is_consumer_started = True
 
-        self._is_started = True
+        self._is_producer_started = True
 
     async def shutdown(self) -> None:
         """Close all connections on shutdown."""
@@ -175,7 +180,7 @@ async def kick(self, message: BrokerMessage) -> None:
         :raises ValueError: if startup wasn't called.
         :param message: message to send.
         """
-        if not self._is_started:
+        if not self._is_producer_started:
             raise ValueError("Please run startup before kicking.")
 
         kafka_message: bytes = pickle.dumps(message)
@@ -197,7 +202,7 @@ async def listen(
         :yields: parsed broker message.
         :raises ValueError: if no aiokafka_consumer or startup wasn't called.
         """
-        if not self._is_started:
+        if not self._is_consumer_started:
             raise ValueError("Please run startup before listening.")
 
         async for raw_kafka_message in self._aiokafka_consumer:
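The startup() change above swaps the list_topics() membership test for a describe_topics() probe on the configured topic, and splits the single _is_started flag so that kick() only needs the producer while listen() needs the consumer that only worker processes start. A minimal standalone sketch of the same existence check, assuming the admin client is kafka-python's KafkaAdminClient (the bootstrap URL and topic name are placeholders):

```python
# Standalone sketch of the topic-existence probe from startup(); assumes
# kafka-python's KafkaAdminClient and NewTopic, with placeholder names.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
topic = NewTopic(name="taskiq_topic", num_partitions=1, replication_factor=1)

# describe_topics() returns metadata for the requested topics; an empty
# result is treated as "topic not available", mirroring the diff.
if not bool(admin.describe_topics([topic.name])):
    admin.create_topics(new_topics=[topic], validate_only=False)
```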
diff --git a/tests/conftest.py b/tests/conftest.py
index 0d92e8e..a054b2f 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -87,7 +87,6 @@ async def test_kafka_consumer(
 @pytest.fixture()
 async def broker_without_arguments(
     kafka_url: str,
-    base_topic_name: str,
 ) -> AsyncGenerator[AioKafkaBroker, None]:
     """Return AioKafkaBroker default realization.
 
@@ -95,12 +94,12 @@ async def broker_without_arguments(
     and AIOKafkaConsumer.
 
     :param kafka_url: url to kafka.
-    :param base_topic_name: name of the topic.
 
     :yields: AioKafkaBroker.
     """
     broker = AioKafkaBroker(
         bootstrap_servers=kafka_url,
+        delete_topic_on_shutdown=True,
     )
     broker.is_worker_process = True
 
@@ -114,10 +113,8 @@ async def broker_without_arguments(
 @pytest.fixture()
 async def broker(
     kafka_url: str,
-    base_topic: NewTopic,
     test_kafka_producer: AIOKafkaProducer,
     test_kafka_consumer: AIOKafkaConsumer,
-    base_topic_name: str,
 ) -> AsyncGenerator[AioKafkaBroker, None]:
     """Yield new broker instance.
 
@@ -126,18 +123,16 @@ async def broker(
     and shutdown after test.
 
     :param kafka_url: url to kafka.
-    :param base_topic: custom topic.
     :param test_kafka_producer: custom AIOKafkaProducer.
     :param test_kafka_consumer: custom AIOKafkaConsumer.
-    :param base_topic_name: name of the topic.
 
     :yields: broker.
     """
     broker = AioKafkaBroker(
         bootstrap_servers=kafka_url,
-        kafka_topic=base_topic,
         aiokafka_producer=test_kafka_producer,
         aiokafka_consumer=test_kafka_consumer,
+        delete_topic_on_shutdown=True,
     )
     broker.is_worker_process = True
 
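With the fixture changes above, the broker is constructed without an explicit topic, producer, or consumer, and is asked to clean up after itself via delete_topic_on_shutdown=True. A hedged usage sketch of the same construction outside the test suite (the bootstrap URL is a placeholder):

```python
# Sketch of constructing the broker the way the reworked fixtures do, letting
# it create and later delete its own default topic; the URL is a placeholder.
import asyncio

from taskiq_aio_kafka.broker import AioKafkaBroker


async def main() -> None:
    broker = AioKafkaBroker(
        bootstrap_servers="localhost:9092",
        delete_topic_on_shutdown=True,  # drop the auto-created topic on shutdown
    )
    broker.is_worker_process = True  # worker processes also start the consumer

    await broker.startup()
    try:
        ...  # kick tasks / listen for messages here
    finally:
        await broker.shutdown()


asyncio.run(main())
```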
diff --git a/tests/test_broker.py b/tests/test_broker.py
index 13d5c40..c4c47a7 100644
--- a/tests/test_broker.py
+++ b/tests/test_broker.py
@@ -53,7 +53,6 @@ async def test_kick_success(broker: AioKafkaBroker) -> None:
     received_message: BrokerMessage = pickle.loads(
         received_message_bytes,
     )
-
     assert message_to_send == received_message
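For context, the tail of test_kick_success shown above unpickles the raw Kafka record and compares it with the kicked message, since kick() serializes the whole BrokerMessage with pickle. A hedged sketch of that round-trip; the BrokerMessage field names are assumptions, not taken from the diff:

```python
# Sketch of the kick/consume round-trip the test verifies; the broker pickles
# the BrokerMessage, so the consumer's raw value can be unpickled and compared.
# BrokerMessage field names below are assumptions.
import pickle

from taskiq import BrokerMessage


async def roundtrip(broker, consumer) -> None:
    message_to_send = BrokerMessage(
        task_id="test-id",
        task_name="test-task",
        message=b"payload",
        labels={},
    )
    await broker.kick(message_to_send)

    raw_kafka_message = await consumer.getone()  # aiokafka ConsumerRecord
    received_message = pickle.loads(raw_kafka_message.value)
    assert message_to_send == received_message
```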