Skip to content

Commit a1869c4

Browse files
asdaraujojeffwidman
authored andcommitted
Introduce new fixtures to prepare for migration to pytest.
This commits adds new pytest fixtures in prepation for the migration of unittest.TestCases to pytest test cases. The handling of temporary dir creation was also changed so that we can use the pytest tmpdir fixture after the migration.
1 parent 0f5d35f commit a1869c4

9 files changed

+460
-157
lines changed

pylint.rc

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
[TYPECHECK]
22
ignored-classes=SyncManager,_socketobject
3+
generated-members=py.*
34

45
[MESSAGES CONTROL]
56
disable=E1129

test/conftest.py

+96-17
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,117 @@
11
from __future__ import absolute_import
22

3-
import os
3+
import inspect
44

55
import pytest
6+
from decorator import decorate
67

78
from test.fixtures import KafkaFixture, ZookeeperFixture
8-
9+
from test.testutil import kafka_version, random_string
910

1011
@pytest.fixture(scope="module")
1112
def version():
12-
if 'KAFKA_VERSION' not in os.environ:
13-
return ()
14-
return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
15-
13+
"""Return the Kafka version set in the OS environment"""
14+
return kafka_version()
1615

1716
@pytest.fixture(scope="module")
18-
def zookeeper(version, request):
19-
assert version
20-
zk = ZookeeperFixture.instance()
21-
yield zk
22-
zk.close()
17+
def zookeeper():
18+
"""Return a Zookeeper fixture"""
19+
zk_instance = ZookeeperFixture.instance()
20+
yield zk_instance
21+
zk_instance.close()
2322

23+
@pytest.fixture(scope="module")
24+
def kafka_broker(kafka_broker_factory):
25+
"""Return a Kafka broker fixture"""
26+
return kafka_broker_factory()[0]
2427

2528
@pytest.fixture(scope="module")
26-
def kafka_broker(version, zookeeper, request):
27-
assert version
28-
k = KafkaFixture.instance(0, zookeeper.host, zookeeper.port,
29-
partitions=4)
30-
yield k
31-
k.close()
29+
def kafka_broker_factory(version, zookeeper):
30+
"""Return a Kafka broker fixture factory"""
31+
assert version, 'KAFKA_VERSION must be specified to run integration tests'
32+
33+
_brokers = []
34+
def factory(**broker_params):
35+
params = {} if broker_params is None else broker_params.copy()
36+
params.setdefault('partitions', 4)
37+
num_brokers = params.pop('num_brokers', 1)
38+
brokers = tuple(KafkaFixture.instance(x, zookeeper, **params)
39+
for x in range(num_brokers))
40+
_brokers.extend(brokers)
41+
return brokers
3242

43+
yield factory
44+
45+
for broker in _brokers:
46+
broker.close()
47+
48+
@pytest.fixture
49+
def simple_client(kafka_broker, request, topic):
50+
"""Return a SimpleClient fixture"""
51+
client = kafka_broker.get_simple_client(client_id='%s_client' % (request.node.name,))
52+
client.ensure_topic_exists(topic)
53+
yield client
54+
client.close()
55+
56+
@pytest.fixture
57+
def kafka_client(kafka_broker, request):
58+
"""Return a KafkaClient fixture"""
59+
(client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
60+
yield client
61+
client.close()
62+
63+
@pytest.fixture
64+
def kafka_consumer(kafka_consumer_factory):
65+
"""Return a KafkaConsumer fixture"""
66+
return kafka_consumer_factory()
67+
68+
@pytest.fixture
69+
def kafka_consumer_factory(kafka_broker, topic, request):
70+
"""Return a KafkaConsumer factory fixture"""
71+
_consumer = [None]
72+
73+
def factory(**kafka_consumer_params):
74+
params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
75+
params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
76+
_consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
77+
return _consumer[0]
78+
79+
yield factory
80+
81+
if _consumer[0]:
82+
_consumer[0].close()
83+
84+
@pytest.fixture
85+
def kafka_producer(kafka_producer_factory):
86+
"""Return a KafkaProducer fixture"""
87+
yield kafka_producer_factory()
88+
89+
@pytest.fixture
90+
def kafka_producer_factory(kafka_broker, request):
91+
"""Return a KafkaProduce factory fixture"""
92+
_producer = [None]
93+
94+
def factory(**kafka_producer_params):
95+
params = {} if kafka_producer_params is None else kafka_producer_params.copy()
96+
params.setdefault('client_id', 'producer_%s' % (request.node.name,))
97+
_producer[0] = next(kafka_broker.get_producers(cnt=1, **params))
98+
return _producer[0]
99+
100+
yield factory
101+
102+
if _producer[0]:
103+
_producer[0].close()
104+
105+
@pytest.fixture
106+
def topic(kafka_broker, request):
107+
"""Return a topic fixture"""
108+
topic_name = '%s_%s' % (request.node.name, random_string(10))
109+
kafka_broker.create_topics([topic_name])
110+
return topic_name
33111

34112
@pytest.fixture
35113
def conn(mocker):
114+
"""Return a connection mocker fixture"""
36115
from kafka.conn import ConnectionStates
37116
from kafka.future import Future
38117
from kafka.protocol.metadata import MetadataResponse

0 commit comments

Comments
 (0)