Skip to content

Commit 53e4d6a

Browse files
Merge pull request #217 from splunk/crud-tests
Crud tests
2 parents a20f5aa + b7a2b08 commit 53e4d6a

File tree

2 files changed

+263
-17
lines changed

2 files changed

+263
-17
lines changed

test/commonkafka.py

Lines changed: 116 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import requests
1919
import sys
2020
import json
21+
import time
2122

2223
logger = logging.getLogger(__name__)
2324
logger.setLevel(logging.INFO)
def create_kafka_connector(setup, params):
    '''
    Create a kafka connect connector using the kafka connect REST API.

    Returns True when the connector and its first task reach the RUNNING
    state, False otherwise.
    '''
    response = requests.post(url=setup["kafka_connect_url"] + "/connectors", data=json.dumps(params),
                             headers={'Accept': 'application/json', 'Content-Type': 'application/json'})
    if response.status_code != 201:
        # lazy %-formatting so the status code actually lands in the message
        # (a trailing comma-arg with no placeholder is silently dropped)
        logger.error("Failed to create connector, response code - %s", response.status_code)
        return False

    # Poll (bounded, so a connector stuck in a non-RUNNING state cannot
    # spin this loop forever) until the connector reports RUNNING.
    t_end = time.time() + 10
    status = get_kafka_connector_status(setup, params)
    while time.time() < t_end and status is not None and status["connector"]["state"] != "RUNNING":
        time.sleep(1)  # avoid hammering the REST endpoint
        status = get_kafka_connector_status(setup, params)

    # Give the tasks time to spin up before the final check.
    time.sleep(10)
    status = get_kafka_connector_status(setup, params)
    if status is not None and len(status["tasks"]) > 0 and status["tasks"][0]["state"] == "RUNNING":
        logger.info("Created connector successfully - " + json.dumps(params))
        return True

    logger.error("Failed to create connector, connector and tasks are not in a RUNNING state after 10 seconds")
    return False
3653

37-
if response.status_code == 201:
38-
logger.info("Created connector successfully - " + json.dumps(params))
54+
def update_kafka_connector(setup, params):
    '''
    Update a kafka connect connector's config using the kafka connect REST API.

    Returns True when the update request is accepted (HTTP 200), False otherwise.
    '''
    response = requests.put(url=setup["kafka_connect_url"] + "/connectors/" + params["name"] + "/config",
                            data=json.dumps(params["config"]),
                            headers={'Accept': 'application/json', 'Content-Type': 'application/json'})

    if response.status_code == 200:
        # Poll (bounded) until the reconfigured connector reports RUNNING;
        # the original loop had no timeout and could spin forever.
        t_end = time.time() + 10
        status = get_kafka_connector_status(setup, params)
        while time.time() < t_end and status is not None and status["connector"]["state"] != "RUNNING":
            time.sleep(1)  # avoid hammering the REST endpoint
            status = get_kafka_connector_status(setup, params)
        logger.info("Updated connector successfully - " + json.dumps(params))
        return True

    # lazy %-formatting so the status code actually lands in the message
    logger.error("Failed to update connector, response code - %s", response.status_code)
    return False
4270

4371
def delete_kafka_connector(setup, params):
    '''
    Delete a kafka connect connector using the kafka connect REST API.

    Returns True when the connector was deleted (HTTP 204), False otherwise.
    '''
    response = requests.delete(url=setup["kafka_connect_url"] + "/connectors/" + params["name"],
                               headers={'Accept': 'application/json', 'Content-Type': 'application/json'})

    if response.status_code == 204:
        logger.info("Deleted connector successfully - " + json.dumps(params))
        return True

    # lazy %-formatting so the status code actually lands in the message
    # (a trailing comma-arg with no placeholder is silently dropped)
    logger.error("Failed to delete connector, response code - %s", response.status_code)
    return False
84+
85+
def get_kafka_connector_tasks(setup, params):
    '''
    Get the number of tasks of a kafka connect connector using the
    kafka connect REST API.

    Polls for up to 10 seconds; returns 0 if the task list never becomes
    available.
    '''
    t_end = time.time() + 10
    while time.time() < t_end:
        response = requests.get(url=setup["kafka_connect_url"] + "/connectors/" + params["name"] + "/tasks",
                                headers={'Accept': 'application/json', 'Content-Type': 'application/json'})
        if response.status_code == 200:
            return len(response.json())
        time.sleep(1)  # back off between retries instead of busy-looping

    return 0
99+
100+
def get_kafka_connector_status(setup, params):
    '''
    Get the status of a kafka connect connector using the kafka connect
    REST API.  (Docstring previously said "tasks" — copy-paste error.)

    Polls for up to 10 seconds; returns the decoded JSON status document,
    or None if the status never becomes available.
    '''
    t_end = time.time() + 10
    while time.time() < t_end:
        response = requests.get(url=setup["kafka_connect_url"] + "/connectors/" + params["name"] + "/status",
                                headers={'Accept': 'application/json', 'Content-Type': 'application/json'})
        if response.status_code == 200:
            return response.json()
        time.sleep(1)  # back off between retries instead of busy-looping

    return None
113+
114+
def pause_kafka_connector(setup, params):
    '''
    Pause a kafka connect connector using the kafka connect REST API.

    Returns True when the pause request is accepted (HTTP 202), False otherwise.
    '''
    response = requests.put(url=setup["kafka_connect_url"] + "/connectors/" + params["name"] + "/pause",
                            headers={'Accept': 'application/json', 'Content-Type': 'application/json'})

    if response.status_code == 202:
        # Poll (bounded) until the connector reports PAUSED; the original
        # loop had no timeout and could spin forever.
        t_end = time.time() + 10
        status = get_kafka_connector_status(setup, params)
        while time.time() < t_end and status is not None and status["connector"]["state"] != "PAUSED":
            time.sleep(1)  # avoid hammering the REST endpoint
            status = get_kafka_connector_status(setup, params)
        logger.info("Paused connector successfully")
        return True

    # lazy %-formatting so the status code actually lands in the message
    logger.error("Failed to pause connector, response code - %s", response.status_code)
    return False
130+
131+
def resume_kafka_connector(setup, params):
    '''
    Resume a kafka connect connector using the kafka connect REST API.

    Returns True when the resume request is accepted (HTTP 202), False otherwise.
    '''
    response = requests.put(url=setup["kafka_connect_url"] + "/connectors/" + params["name"] + "/resume",
                            headers={'Accept': 'application/json', 'Content-Type': 'application/json'})

    if response.status_code == 202:
        # Poll (bounded) until the connector reports RUNNING; the original
        # loop had no timeout and could spin forever.
        t_end = time.time() + 10
        status = get_kafka_connector_status(setup, params)
        while time.time() < t_end and status is not None and status["connector"]["state"] != "RUNNING":
            time.sleep(1)  # avoid hammering the REST endpoint
            status = get_kafka_connector_status(setup, params)
        logger.info("Resumed connector successfully")
        return True

    # lazy %-formatting so the status code actually lands in the message
    logger.error("Failed to resume connector, response code - %s", response.status_code)
    return False
147+
148+
def restart_kafka_connector(setup, params):
    '''
    Restart a kafka connect connector using the kafka connect REST API.

    Returns True when the restart request succeeds (HTTP 200 or 204),
    False otherwise.
    '''
    response = requests.post(url=setup["kafka_connect_url"] + "/connectors/" + params["name"] + "/restart",
                             headers={'Accept': 'application/json', 'Content-Type': 'application/json'})

    # Kafka Connect answers 200 (with body) or 204 (no content) on restart.
    if response.status_code in (200, 204):
        # Poll (bounded) until the connector reports RUNNING; the original
        # loop had no timeout and could spin forever.
        t_end = time.time() + 10
        status = get_kafka_connector_status(setup, params)
        while time.time() < t_end and status is not None and status["connector"]["state"] != "RUNNING":
            time.sleep(1)  # avoid hammering the REST endpoint
            status = get_kafka_connector_status(setup, params)
        logger.info("Restarted connector successfully")
        return True

    # lazy %-formatting so the status code actually lands in the message
    logger.error("Failed to restart connector, response code - %s", response.status_code)
    return False
Lines changed: 147 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import pytest
22
import logging
33
import sys
4-
from ..commonkafka import create_kafka_connector, delete_kafka_connector
4+
import time
5+
from ..commonkafka import create_kafka_connector, delete_kafka_connector, update_kafka_connector, get_kafka_connector_tasks, get_kafka_connector_status, pause_kafka_connector, resume_kafka_connector, restart_kafka_connector
56

67
logger = logging.getLogger(__name__)
78
logger.setLevel(logging.INFO)
@@ -11,13 +12,13 @@
1112
logger.addHandler(handler)
1213

1314
@pytest.mark.parametrize("test_input,expected", [
    ("test_valid_CRUD_tasks", True)
])
def test_valid_CRUD_tasks(setup, test_input, expected):
    '''
    Test that a valid kafka connect task can be created, updated, paused,
    resumed, restarted and deleted through the REST API helpers.
    '''
    logger.info("testing test_valid_CRUD_tasks input={0} expected={1} ".format(
        test_input, expected))

    def connector_definition(tasks_max):
        # Build the connector definition dict sent to the API; only
        # tasks.max differs between the create and update steps.
        return {
            "name": "kafka-connect-splunk",
            "config": {
                "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
                "tasks.max": str(tasks_max),
                "topics": setup["kafka_topic"],
                "splunk.indexes": setup["splunk_index"],
                "splunk.hec.uri": setup["splunkd_url"],
                "splunk.hec.token": setup["splunk_token"],
                "splunk.hec.raw": "false",
                "splunk.hec.ack.enabled": "false",
                "splunk.hec.ssl.validate.certs": "false"
            }
        }

    # Validate create task
    definition = connector_definition(3)
    assert create_kafka_connector(setup, definition) == expected

    # Validate update task: use 5 tasks instead of 3
    definition = connector_definition(5)
    assert update_kafka_connector(setup, definition) == expected

    # Validate get tasks
    tasks = get_kafka_connector_tasks(setup, definition)
    assert tasks == int(definition["config"]["tasks.max"])

    # Validate pause task
    assert pause_kafka_connector(setup, definition) == expected

    # Validate resume task
    assert resume_kafka_connector(setup, definition) == expected

    # Validate restart task
    assert restart_kafka_connector(setup, definition) == expected

    # Validate delete task
    assert delete_kafka_connector(setup, definition) == expected
77+
78+
79+
@pytest.mark.parametrize("test_input,expected", [
    ("test_invalid_CRUD_tasks", False)
])
def test_invalid_CRUD_tasks(setup, test_input, expected):
    '''
    Test that invalid kafka connect tasks cannot be created.

    Each case starts from a known-good connector definition and breaks
    exactly one config key, so any create success is attributable to that key.
    '''
    logger.info("testing test_invalid_CRUD_tasks input={0} expected={1} ".format(
        test_input, expected))

    def invalid_definition(overrides):
        # Known-good base config with one (or more) keys overridden to an
        # invalid value.
        config = {
            "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
            "tasks.max": "3",
            "topics": setup["kafka_topic"],
            "splunk.indexes": setup["splunk_index"],
            "splunk.hec.uri": setup["splunkd_url"],
            "splunk.hec.token": setup["splunk_token"],
            "splunk.hec.raw": "false",
            "splunk.hec.ack.enabled": "false",
            "splunk.hec.ssl.validate.certs": "false"
        }
        config.update(overrides)
        return {"name": "kafka-connect-splunk", "config": config}

    invalid_overrides = [
        # tasks.max invalid (not a number)
        {"tasks.max": "dummy-string"},
        # splunk.hec.raw invalid (not a boolean)
        {"splunk.hec.raw": "disable"},
        # topics invalid (empty string)
        {"topics": ""},
        # splunk.hec.json.event.enrichment invalid (non key-value pairs)
        {"splunk.hec.json.event.enrichment": "testing-testing non KV"},
        # splunk.hec.json.event.enrichment invalid (pairs not comma-separated)
        {"splunk.hec.json.event.enrichment": "key1=value1 key2=value2"},
    ]

    for overrides in invalid_overrides:
        assert create_kafka_connector(setup, invalid_definition(overrides)) == expected

0 commit comments

Comments
 (0)