
Commit 9885649

Added test cases to tests/test_kafka.py (#1043)
* added test cases to tests/test_kafka.py
* resolved pyupgrade lint issue
* fixed the encoding of message keys in kafka tests
1 parent 45ed155 commit 9885649

File tree

4 files changed: +215 −14 lines changed

.gitignore

+3
@@ -34,3 +34,6 @@ tfio_*.tar.gz
 # Prometheus
 .coredns
 .prometheus
+
+# Kafka
+/confluent*

tensorflow_io/kafka/python/ops/kafka_dataset_ops.py

+5 −1

@@ -54,7 +54,11 @@ def __init__(
         topics: A `tf.string` tensor containing one or more subscriptions,
           in the format of [topic:partition:offset:length],
           by default length is -1 for unlimited.
-          eg. ["sampleTopic:0:0:10"]
+          eg. ["sampleTopic:0:0:10"] will fetch the first 10 messages from
+          the 0th partition of sampleTopic.
+          eg. ["sampleTopic:0:0:10","sampleTopic:1:0:10"] will fetch
+          the first 10 messages from the 0th partition followed
+          by the first 10 messages from the 1st partition of sampleTopic.
         servers: A list of bootstrap servers.
         group: The consumer group id.
         eof: If True, the kafka reader will stop on EOF.
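
For illustration, a minimal sketch of how subscription strings in this format might be passed to the dataset. The import alias follows tests/test_kafka.py; the server address and group id are placeholder values, not part of this commit:

import tensorflow_io.kafka as kafka_io

# Each subscription is "topic:partition:offset:length"; a length of -1
# means read until EOF. As in the docstring example, this reads the
# first 10 messages from partitions 0 and 1 of sampleTopic.
dataset = kafka_io.KafkaDataset(
    topics=["sampleTopic:0:0:10", "sampleTopic:1:0:10"],
    servers="localhost:9092",  # placeholder bootstrap server
    group="sample-group",  # placeholder consumer group id
    eof=True,
)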

tests/test_kafka.py

+187 −9

@@ -40,11 +40,12 @@ class KafkaDatasetTest(test.TestCase):
     # To setup the Kafka server:
     # $ bash kafka_test.sh start kafka
     #
-    # To team down the Kafka server:
+    # To tear down the Kafka server:
     # $ bash kafka_test.sh stop kafka
 
     def test_kafka_dataset(self):
-        """Tests for KafkaDataset."""
+        """Tests for KafkaDataset when reading non-keyed messages
+        from a single-partitioned topic"""
         topics = tf.compat.v1.placeholder(dtypes.string, shape=[None])
         num_epochs = tf.compat.v1.placeholder(dtypes.int64, shape=[])
         batch_size = tf.compat.v1.placeholder(dtypes.int64, shape=[])
@@ -60,21 +61,21 @@ def test_kafka_dataset(self):
         get_next = iterator.get_next()
 
         with self.cached_session() as sess:
-            # Basic test: read from topic 0.
+            # Basic test: read a limited number of messages from the topic.
             sess.run(init_op, feed_dict={topics: ["test:0:0:4"], num_epochs: 1})
             for i in range(5):
                 self.assertEqual(("D" + str(i)).encode(), sess.run(get_next))
             with self.assertRaises(errors.OutOfRangeError):
                 sess.run(get_next)
 
-            # Basic test: read from topic 1.
+            # Basic test: read all the messages from the topic from offset 5.
             sess.run(init_op, feed_dict={topics: ["test:0:5:-1"], num_epochs: 1})
             for i in range(5):
                 self.assertEqual(("D" + str(i + 5)).encode(), sess.run(get_next))
             with self.assertRaises(errors.OutOfRangeError):
                 sess.run(get_next)
 
-            # Basic test: read from both topics.
+            # Basic test: read from different subscriptions of the same topic.
             sess.run(
                 init_op,
                 feed_dict={topics: ["test:0:0:4", "test:0:5:-1"], num_epochs: 1},
@@ -87,7 +88,7 @@ def test_kafka_dataset(self):
             with self.assertRaises(errors.OutOfRangeError):
                 sess.run(get_next)
 
-            # Test repeated iteration through both files.
+            # Test repeated iteration through both subscriptions.
             sess.run(
                 init_op,
                 feed_dict={topics: ["test:0:0:4", "test:0:5:-1"], num_epochs: 10},
@@ -101,7 +102,7 @@ def test_kafka_dataset(self):
             with self.assertRaises(errors.OutOfRangeError):
                 sess.run(get_next)
 
-            # Test batched and repeated iteration through both files.
+            # Test batched and repeated iteration through both subscriptions.
             sess.run(
                 init_batch_op,
                 feed_dict={
@@ -276,7 +277,8 @@ def test_write_kafka(self):
                 sess.run(get_next)
 
     def test_kafka_dataset_with_key(self):
-        """Tests for KafkaDataset."""
+        """Tests for KafkaDataset when reading keyed-messages
+        from a single-partitioned topic"""
         topics = tf.compat.v1.placeholder(dtypes.string, shape=[None])
         num_epochs = tf.compat.v1.placeholder(dtypes.int64, shape=[])
         batch_size = tf.compat.v1.placeholder(dtypes.int64, shape=[])
@@ -288,10 +290,11 @@ def test_kafka_dataset_with_key(self):
 
         iterator = data.Iterator.from_structure(batch_dataset.output_types)
         init_op = iterator.make_initializer(repeat_dataset)
+        init_batch_op = iterator.make_initializer(batch_dataset)
         get_next = iterator.get_next()
 
         with self.cached_session() as sess:
-            # Basic test: read from topic 0.
+            # Basic test: read a limited number of keyed messages from the topic.
             sess.run(init_op, feed_dict={topics: ["key-test:0:0:4"], num_epochs: 1})
             for i in range(5):
                 self.assertEqual(
@@ -301,6 +304,181 @@ def test_kafka_dataset_with_key(self):
             with self.assertRaises(errors.OutOfRangeError):
                 sess.run(get_next)
 
+            # Basic test: read all the keyed messages from the topic from offset 5.
+            sess.run(init_op, feed_dict={topics: ["key-test:0:5:-1"], num_epochs: 1})
+            for i in range(5):
+                self.assertEqual(
+                    (("D" + str(i + 5)).encode(), ("K" + str((i + 5) % 2)).encode()),
+                    sess.run(get_next),
+                )
+            with self.assertRaises(errors.OutOfRangeError):
+                sess.run(get_next)
+
+            # Basic test: read from different subscriptions of the same topic.
+            sess.run(
+                init_op,
+                feed_dict={
+                    topics: ["key-test:0:0:4", "key-test:0:5:-1"],
+                    num_epochs: 1,
+                },
+            )
+            for j in range(2):
+                for i in range(5):
+                    self.assertEqual(
+                        (
+                            ("D" + str(i + j * 5)).encode(),
+                            ("K" + str((i + j * 5) % 2)).encode(),
+                        ),
+                        sess.run(get_next),
+                    )
+            with self.assertRaises(errors.OutOfRangeError):
+                sess.run(get_next)
+
+            # Test repeated iteration through both subscriptions.
+            sess.run(
+                init_op,
+                feed_dict={
+                    topics: ["key-test:0:0:4", "key-test:0:5:-1"],
+                    num_epochs: 10,
+                },
+            )
+            for _ in range(10):
+                for j in range(2):
+                    for i in range(5):
+                        self.assertEqual(
+                            (
+                                ("D" + str(i + j * 5)).encode(),
+                                ("K" + str((i + j * 5) % 2)).encode(),
+                            ),
+                            sess.run(get_next),
+                        )
+            with self.assertRaises(errors.OutOfRangeError):
+                sess.run(get_next)
+
+            # Test batched and repeated iteration through both subscriptions.
+            sess.run(
+                init_batch_op,
+                feed_dict={
+                    topics: ["key-test:0:0:4", "key-test:0:5:-1"],
+                    num_epochs: 10,
+                    batch_size: 5,
+                },
+            )
+            for _ in range(10):
+                self.assertAllEqual(
+                    [
+                        [("D" + str(i)).encode() for i in range(5)],
+                        [("K" + str(i % 2)).encode() for i in range(5)],
+                    ],
+                    sess.run(get_next),
+                )
+                self.assertAllEqual(
+                    [
+                        [("D" + str(i + 5)).encode() for i in range(5)],
+                        [("K" + str((i + 5) % 2)).encode() for i in range(5)],
+                    ],
+                    sess.run(get_next),
+                )
+
+    def test_kafka_dataset_with_partitioned_key(self):
+        """Tests for KafkaDataset when reading keyed-messages
+        from a multi-partitioned topic"""
+        topics = tf.compat.v1.placeholder(dtypes.string, shape=[None])
+        num_epochs = tf.compat.v1.placeholder(dtypes.int64, shape=[])
+        batch_size = tf.compat.v1.placeholder(dtypes.int64, shape=[])
+
+        repeat_dataset = kafka_io.KafkaDataset(
+            topics, group="test", eof=True, message_key=True
+        ).repeat(num_epochs)
+        batch_dataset = repeat_dataset.batch(batch_size)
+
+        iterator = data.Iterator.from_structure(batch_dataset.output_types)
+        init_op = iterator.make_initializer(repeat_dataset)
+        init_batch_op = iterator.make_initializer(batch_dataset)
+        get_next = iterator.get_next()
+
+        with self.cached_session() as sess:
+            # Basic test: read first 5 messages from the first partition of the topic.
+            # NOTE: The key-partition mapping occurs based on the order in which the data
+            # is being stored in kafka. Please check kafka_test.sh for the sample data.
+
+            sess.run(
+                init_op,
+                feed_dict={topics: ["key-partition-test:0:0:5"], num_epochs: 1},
+            )
+            for i in range(5):
+                self.assertEqual(
+                    (("D" + str(i * 2)).encode(), (b"K0")), sess.run(get_next),
+                )
+            with self.assertRaises(errors.OutOfRangeError):
+                sess.run(get_next)
+
+            # Basic test: read first 5 messages from the second partition of the topic.
+            sess.run(
+                init_op,
+                feed_dict={topics: ["key-partition-test:1:0:5"], num_epochs: 1},
+            )
+            for i in range(5):
+                self.assertEqual(
+                    (("D" + str(i * 2 + 1)).encode(), (b"K1")), sess.run(get_next),
+                )
+            with self.assertRaises(errors.OutOfRangeError):
+                sess.run(get_next)
+
+            # Basic test: read from different subscriptions to the same topic.
+            sess.run(
+                init_op,
+                feed_dict={
+                    topics: ["key-partition-test:0:0:5", "key-partition-test:1:0:5"],
+                    num_epochs: 1,
+                },
+            )
+            for j in range(2):
+                for i in range(5):
+                    self.assertEqual(
+                        (("D" + str(i * 2 + j)).encode(), ("K" + str(j)).encode()),
+                        sess.run(get_next),
+                    )
+            with self.assertRaises(errors.OutOfRangeError):
+                sess.run(get_next)
+
+            # Test repeated iteration through both subscriptions.
+            sess.run(
+                init_op,
+                feed_dict={
+                    topics: ["key-partition-test:0:0:5", "key-partition-test:1:0:5"],
+                    num_epochs: 10,
+                },
+            )
+            for _ in range(10):
+                for j in range(2):
+                    for i in range(5):
+                        self.assertEqual(
+                            (("D" + str(i * 2 + j)).encode(), ("K" + str(j)).encode()),
+                            sess.run(get_next),
+                        )
+            with self.assertRaises(errors.OutOfRangeError):
+                sess.run(get_next)
+
+            # Test batched and repeated iteration through both subscriptions.
+            sess.run(
+                init_batch_op,
+                feed_dict={
+                    topics: ["key-partition-test:0:0:5", "key-partition-test:1:0:5"],
+                    num_epochs: 10,
+                    batch_size: 5,
+                },
+            )
+            for _ in range(10):
+                for j in range(2):
+                    self.assertAllEqual(
+                        [
+                            [("D" + str(i * 2 + j)).encode() for i in range(5)],
+                            [("K" + str(j)).encode() for i in range(5)],
+                        ],
+                        sess.run(get_next),
+                    )
+
 
 if __name__ == "__main__":
     test.main()
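
An aside on where the expected tuples in test_kafka_dataset_with_partitioned_key come from (this sketch is illustrative, not part of the diff): the sample data alternates keys K0 and K1, so partition j ends up holding every second message. The assertion arithmetic can be reproduced directly:

# Partition 0 holds D0, D2, D4, ... under key K0; partition 1 holds
# D1, D3, D5, ... under key K1. Reading partition j therefore yields
# ("D" + str(i * 2 + j), "K" + str(j)) for i = 0..4, matching the test.
for j in range(2):
    expected = [
        (("D%d" % (i * 2 + j)).encode(), ("K%d" % j).encode()) for i in range(5)
    ]
    print("partition", j, expected)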

tests/test_kafka/kafka_test.sh

+20 −4

@@ -19,25 +19,41 @@ set -o pipefail
 
 VERSION=5.4.1
 
+echo "Downloading the confluent packages"
 curl -sSOL http://packages.confluent.io/archive/5.4/confluent-community-5.4.1-2.12.tar.gz
 tar -xzf confluent-community-5.4.1-2.12.tar.gz
+
 (cd confluent-$VERSION/ && sudo bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties)
-echo Wait 10 secs until zookeeper is up and running
+echo "Waiting for 10 secs until zookeeper is up and running"
 sleep 10
+
 (cd confluent-$VERSION/ && sudo bin/kafka-server-start -daemon etc/kafka/server.properties)
-echo Wait 10 secs until kafka is up and running
+echo "Waiting for 10 secs until kafka is up and running"
 sleep 10
+
 (cd confluent-$VERSION/ && sudo bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties)
 echo -e "D0\nD1\nD2\nD3\nD4\nD5\nD6\nD7\nD8\nD9" > confluent-$VERSION/test
 echo -e "K0:D0\nK1:D1\nK0:D2\nK1:D3\nK0:D4\nK1:D5\nK0:D6\nK1:D7\nK0:D8\nK1:D9" > confluent-$VERSION/key-test
-echo Wait 15 secs until all is up and running
+echo -e "K0:D0\nK1:D1\nK0:D2\nK1:D3\nK0:D4\nK1:D5\nK0:D6\nK1:D7\nK0:D8\nK1:D9" > confluent-$VERSION/key-partition-test
+echo "Waiting for 15 secs until schema registry is ready and other services are up and running"
 sleep 15
+
+echo "Creating and populating 'test' topic with sample non-keyed messages"
 sudo confluent-$VERSION/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
 sudo confluent-$VERSION/bin/kafka-console-producer --topic test --broker-list 127.0.0.1:9092 < confluent-$VERSION/test
+
+echo "Creating and populating 'key-test' topic with sample keyed messages"
 sudo confluent-$VERSION/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic key-test
 sudo confluent-$VERSION/bin/kafka-console-producer --topic key-test --property "parse.key=true" --property "key.separator=:" --broker-list 127.0.0.1:9092 < confluent-$VERSION/key-test
+
+echo "Creating and populating 'key-partition-test' multi-partition topic with sample keyed messages"
+sudo confluent-$VERSION/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic key-partition-test
+sudo confluent-$VERSION/bin/kafka-console-producer --topic key-partition-test --property "parse.key=true" --property "key.separator=:" --broker-list 127.0.0.1:9092 < confluent-$VERSION/key-partition-test
+
+echo "Creating and populating 'avro-test' topic with sample messages."
 sudo confluent-$VERSION/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic avro-test
 echo -e "{\"f1\":\"value1\",\"f2\":1,\"f3\":null}\n{\"f1\":\"value2\",\"f2\":2,\"f3\":{\"string\":\"2\"}}\n{\"f1\":\"value3\",\"f2\":3,\"f3\":null}" > confluent-$VERSION/avro-test
 sudo confluent-$VERSION/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic avro-test --property value.schema="{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"},{\"name\":\"f2\",\"type\":\"long\"},{\"name\":\"f3\",\"type\":[\"null\",\"string\"],\"default\":null}]}" < confluent-$VERSION/avro-test
-echo Everything started
+
+echo "Kafka test setup completed."
 exit 0
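
For reference, a small sketch (not part of the commit) of how the console producer interprets the keyed sample files given parse.key=true and key.separator=":": everything before the first separator becomes the message key. With the two-partition key-partition-test topic, the tests assume K0 and K1 records land on partitions 0 and 1 respectively:

# Split each sample line on the first ":" to get (key, value) pairs,
# mirroring kafka-console-producer's parse.key/key.separator handling.
sample = "K0:D0\nK1:D1\nK0:D2\nK1:D3"
records = [tuple(line.split(":", 1)) for line in sample.split("\n")]
print(records)  # [('K0', 'D0'), ('K1', 'D1'), ('K0', 'D2'), ('K1', 'D3')]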
