Skip to content

Commit

Permalink
kafka_user: use infratest to verify message write/read
Browse files Browse the repository at this point in the history
  • Loading branch information
saiello committed Jul 26, 2022
1 parent 4c8e161 commit 8af8e80
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 35 deletions.
10 changes: 9 additions & 1 deletion molecule/scram-kafka-270/converge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
gather_facts: false
tasks:

- kafka_topic:
name: scram-test-topic
state: present
partitions: 1
replica_factor: 1
bootstrap_servers: "{{ bootstrap_servers['plaintext'] }}"


- kafka_users:
users:

Expand All @@ -27,4 +35,4 @@
iterations: 5000
state: 'present'

bootstrap_servers: "kafka1-270:9092,kafka2-270:9092"
bootstrap_servers: "{{ bootstrap_servers['plaintext'] }}"
26 changes: 12 additions & 14 deletions molecule/scram-kafka-270/molecule.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ platforms:
# HOSTNAME_COMMAND: "hostname -i | cut -d' ' -f1"
HOSTNAME_COMMAND: "hostname"
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://_{HOSTNAME_COMMAND}:9092,SASL_PLAINTEXT://_{HOSTNAME_COMMAND}:9094,SASL_SSL://_{HOSTNAME_COMMAND}:9095,SSL://_{HOSTNAME_COMMAND}:9096
KAFKA_LISTENERS: PLAINTEXT://:9092,SASL_PLAINTEXT://:9094,SASL_SSL://:9095,SSL://:9096
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://_{HOSTNAME_COMMAND}:9092,SASL_PLAINTEXT://_{HOSTNAME_COMMAND}:9094,SASL_SSL://_{HOSTNAME_COMMAND}:9095,SSL://_{HOSTNAME_COMMAND}:9096,OUTSIDE_SASL_PLAINTEXT://localhost:19094
KAFKA_LISTENERS: PLAINTEXT://:9092,SASL_PLAINTEXT://:9094,SASL_SSL://:9095,SSL://:9096,OUTSIDE_SASL_PLAINTEXT://:19094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OUTSIDE_SASL_PLAINTEXT:SASL_PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_ZOOKEEPER_CONNECT: zookeeper-270:2181
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
Expand All @@ -66,6 +67,7 @@ platforms:
KAFKA_SSL_CLIENT_AUTH: required
published_ports:
- "9092"
- "19094:19094"
networks:
- name: molecule
links:
Expand All @@ -84,20 +86,12 @@ platforms:
image: wurstmeister/kafka:2.13-2.7.0
command: "start-kafka.sh"
env:

# security.protocol=SASL_PLAINTEXT
# sasl.mechanism=SCRAM-SHA-512
# sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
# username="user1" \
# password="changeit";

# /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9094 --topic my-topic --producer.config client.properties

# HOSTNAME_COMMAND: "hostname -i | cut -d' ' -f1"
HOSTNAME_COMMAND: "hostname"
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://_{HOSTNAME_COMMAND}:9092,SASL_PLAINTEXT://_{HOSTNAME_COMMAND}:9094,SASL_SSL://_{HOSTNAME_COMMAND}:9095,SSL://_{HOSTNAME_COMMAND}:9096
KAFKA_LISTENERS: PLAINTEXT://:9092,SASL_PLAINTEXT://:9094,SASL_SSL://:9095,SSL://:9096
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://_{HOSTNAME_COMMAND}:9092,SASL_PLAINTEXT://_{HOSTNAME_COMMAND}:9094,SASL_SSL://_{HOSTNAME_COMMAND}:9095,SSL://_{HOSTNAME_COMMAND}:9096,OUTSIDE_SASL_PLAINTEXT://localhost:29094
KAFKA_LISTENERS: PLAINTEXT://:9092,SASL_PLAINTEXT://:9094,SASL_SSL://:9095,SSL://:9096,OUTSIDE_SASL_PLAINTEXT://:29094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OUTSIDE_SASL_PLAINTEXT:SASL_PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_ZOOKEEPER_CONNECT: zookeeper-270:2181
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
Expand All @@ -114,6 +108,7 @@ platforms:
KAFKA_SSL_CLIENT_AUTH: required
published_ports:
- "9092"
- "29094:29094"
networks:
- name: molecule
links:
Expand All @@ -137,6 +132,9 @@ provisioner:
group_vars:
executors:
ansible_python_interpreter: /usr/local/bin/python
bootstrap_servers:
plaintext: 'kafka1-270:9092,kafka2-270:9092'
outside_sasl_plaintext: 'localhost:19094,localhost:29094'
# In order to avoid code duplication and to reduce
# the time needed to do every tests, `create`, `prepare`
# and `converge` sequences are only used.
Expand All @@ -154,4 +152,4 @@ scenario:
- idempotence
- verify
verifier:
name: ansible
name: testinfra
17 changes: 17 additions & 0 deletions molecule/scram-kafka-270/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import pytest

def pytest_runtest_setup(item):
"""Run tests only when under molecule with testinfra installed."""
try:
import testinfra
except ImportError:
pytest.skip("Test requires testinfra", allow_module_level=True)

if "MOLECULE_INVENTORY_FILE" in os.environ:
pytest.testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
os.environ["MOLECULE_INVENTORY_FILE"]
).get_hosts("executors")
else:
pytest.skip(
"Test should run only from inside molecule.", allow_module_level=True
)
125 changes: 125 additions & 0 deletions molecule/scram-kafka-270/tests/test_users.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import pytest
import time
import uuid

from kafka import KafkaConsumer, KafkaProducer, TopicPartition

testinfra_hosts = ["ansible://executors"]


# ---------------
# Common (TO BE SHARED)
# ---------------
def get_bootstrap_servers(host, listener_name='PLAINTEXT'):
return host.ansible.get_variables()['bootstrap_servers'][listener_name.lower()]


def execute_kafka_info(host, resource, check=False):

module_args = {
'resource': resource,
}

module_args.update({
'bootstrap_servers': get_bootstrap_servers(host)
})

return host.ansible('kafka_info', "{{ module_args }}",
check=check,
extra_vars={ 'module_args': module_args })


def produce_messages(topic, messages, **config):
correlation_id = bytes(str(uuid.uuid4()), 'utf-8')

producer = KafkaProducer(**config)

record_metas = []
record_headers = [('correlation_id', correlation_id)]

for message in messages:
fut = producer.send(topic, bytes(message, 'utf-8'), headers=record_headers)
record_metas.append(fut.get())

producer.flush()
producer.close()

return correlation_id, record_metas


def consume_messages(topic, topic_partitions=1, max_messages=1, max_wait_timeout=120,
poll_timeout_ms=100, poll_max_records=10,
from_beginning=False, record_filter=None, **config):

consumer = KafkaConsumer(**config)


# manually assign topic partitions
consumer.assign([TopicPartition(topic, partition)
for partition in range(0, topic_partitions)])

if from_beginning:
consumer.seek_to_beginning()


messages = []
start_ts = time.time() # expressed in seconds
while True:
if len(messages) >= max_messages or (time.time() - start_ts) > max_wait_timeout:
break

batch = consumer.poll(timeout_ms=poll_timeout_ms, max_records=poll_max_records)

for subscription, records in batch.items():
for record in filter(record_filter, records):
messages.append(record.value.decode('utf-8'))

return messages



# ---------------
# Actual user test
# ---------------
def test_user_info_should_return_created_users(host):

kafka_user_info = execute_kafka_info(host, 'user')


assert kafka_user_info['ansible_module_results'] is not None
assert kafka_user_info['ansible_module_results']['users'] is not None

users = kafka_user_info['ansible_module_results']['users']

assert users['alice'] is not None
assert users['alice'][0]['iterations'] == 4096
assert users['alice'][0]['mechanism'] == "SCRAM-SHA-512"

assert users['bob'] is not None
assert users['bob'][0]['iterations'] == 5000
assert users['bob'][0]['mechanism'] == "SCRAM-SHA-256"


def test_client_should_be_able_to_write_and_read_messages(host):
topic = 'scram-test-topic'

messages = [
'Test Message #1',
'Test Message #2',
'Test Message #3'
]

config = {
'bootstrap_servers': get_bootstrap_servers(host, listener_name='OUTSIDE_SASL_PLAINTEXT'),
'security_protocol': 'SASL_PLAINTEXT',
'sasl_mechanism': 'SCRAM-SHA-512',
'sasl_plain_username': 'alice',
'sasl_plain_password': 'changeit'
}

produce_messages(topic, messages, **config)
actual_messages = consume_messages(topic, from_beginning=True, max_messages=3, **config)

print(actual_messages)

assert set(actual_messages) == set(messages)
20 changes: 0 additions & 20 deletions molecule/scram-kafka-270/verify.yml

This file was deleted.

0 comments on commit 8af8e80

Please sign in to comment.