Skip to content

Commit b7a2b08

Browse files
refactor for comments
1 parent 8875af7 commit b7a2b08

File tree

2 files changed

+35
-25
lines changed

2 files changed

+35
-25
lines changed

test/commonkafka.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,20 +35,20 @@ def create_kafka_connector(setup, params):
3535
response = requests.post(url=setup["kafka_connect_url"] + "/connectors", data=json.dumps(params),
3636
headers={'Accept': 'application/json', 'Content-Type': 'application/json'})
3737
if response.status_code != 201:
38+
logger.error("Failed to create connector, response code - ", response.status_code)
3839
return False
3940
else:
4041
status = get_kafka_connector_status(setup, params)
4142
while status is not None and status["connector"]["state"] != "RUNNING":
4243
status = get_kafka_connector_status(setup, params)
4344

44-
t_end = time.time() + 10
45-
while time.time() < t_end:
46-
status = get_kafka_connector_status(setup, params)
47-
48-
if len(status["tasks"]) > 0 and status["tasks"][0]["state"] == "RUNNING":
45+
time.sleep(10)
46+
status = get_kafka_connector_status(setup, params)
47+
if status is not None and len(status["tasks"]) > 0 and status["tasks"][0]["state"] == "RUNNING":
4948
logger.info("Created connector successfully - " + json.dumps(params))
5049
return True
5150

51+
logger.error("Failed to create connector, connector and tasks are not in a RUNNING state after 10 seconds")
5252
return False
5353

5454
def update_kafka_connector(setup, params):
@@ -65,6 +65,7 @@ def update_kafka_connector(setup, params):
6565
logger.info("Updated connector successfully - " + json.dumps(params))
6666
return True
6767

68+
logger.error("Failed to update connector, response code - ", response.status_code)
6869
return False
6970

7071
def delete_kafka_connector(setup, params):
@@ -78,31 +79,37 @@ def delete_kafka_connector(setup, params):
7879
logger.info("Deleted connector successfully - " + json.dumps(params))
7980
return True
8081

82+
logger.error("Failed to delete connector, response code - ", response.status_code)
8183
return False
8284

8385
def get_kafka_connector_tasks(setup, params):
8486
'''
8587
Get kafka connect connector tasks using kafka connect REST API
8688
'''
87-
status = -1
88-
while status != 200:
89+
90+
t_end = time.time() + 10
91+
while time.time() < t_end:
8992
response = requests.get(url=setup["kafka_connect_url"] + "/connectors/" + params["name"] + "/tasks",
9093
headers={'Accept': 'application/json', 'Content-Type': 'application/json'})
9194
status = response.status_code
95+
if status == 200:
96+
return len(response.json())
9297

93-
return len(response.json())
98+
return 0
9499

95100
def get_kafka_connector_status(setup, params):
96101
'''
97102
Get kafka connect connector tasks using kafka connect REST API
98103
'''
99-
status = -1
100-
while status != 200:
104+
t_end = time.time() + 10
105+
while time.time() < t_end:
101106
response = requests.get(url=setup["kafka_connect_url"] + "/connectors/" + params["name"] + "/status",
102107
headers={'Accept': 'application/json', 'Content-Type': 'application/json'})
103108
status = response.status_code
109+
if status == 200:
110+
return response.json()
104111

105-
return response.json()
112+
return None
106113

107114
def pause_kafka_connector(setup, params):
108115
'''
@@ -118,6 +125,7 @@ def pause_kafka_connector(setup, params):
118125
logger.info("Paused connector successfully")
119126
return True
120127

128+
logger.error("Failed to pause connector, response code - ", response.status_code)
121129
return False
122130

123131
def resume_kafka_connector(setup, params):
@@ -134,6 +142,7 @@ def resume_kafka_connector(setup, params):
134142
logger.info("Resumed connector successfully")
135143
return True
136144

145+
logger.error("Failed to resume connector, response code - ", response.status_code)
137146
return False
138147

139148
def restart_kafka_connector(setup, params):
@@ -150,4 +159,5 @@ def restart_kafka_connector(setup, params):
150159
logger.info("Restarted connector successfully")
151160
return True
152161

162+
logger.error("Failed to restart connector, response code - ", response.status_code)
153163
return False

test/kafka_connect_sink_crud_tests/test_crud.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ def test_valid_CRUD_tasks(setup, test_input, expected):
2727
"config": {
2828
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
2929
"tasks.max": "3",
30-
"topics": "test-datagen",
31-
"splunk.indexes": setup["kafka_topic"],
30+
"topics": setup["kafka_topic"],
31+
"splunk.indexes": setup["splunk_index"],
3232
"splunk.hec.uri": setup["splunkd_url"],
3333
"splunk.hec.token": setup["splunk_token"],
3434
"splunk.hec.raw": "false",
@@ -46,8 +46,8 @@ def test_valid_CRUD_tasks(setup, test_input, expected):
4646
"config": {
4747
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
4848
"tasks.max": "5",
49-
"topics": "test-datagen",
50-
"splunk.indexes": setup["kafka_topic"],
49+
"topics": setup["kafka_topic"],
50+
"splunk.indexes": setup["splunk_index"],
5151
"splunk.hec.uri": setup["splunkd_url"],
5252
"splunk.hec.token": setup["splunk_token"],
5353
"splunk.hec.raw": "false",
@@ -77,7 +77,7 @@ def test_valid_CRUD_tasks(setup, test_input, expected):
7777

7878

7979
@pytest.mark.parametrize("test_input,expected", [
80-
("create_and_update_valid_task", False)
80+
("test_invalid_CRUD_tasks", False)
8181
])
8282
def test_invalid_CRUD_tasks(setup, test_input, expected):
8383
'''
@@ -92,8 +92,8 @@ def test_invalid_CRUD_tasks(setup, test_input, expected):
9292
"config": {
9393
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
9494
"tasks.max": "dummy-string",
95-
"topics": "test-datagen",
96-
"splunk.indexes": setup["kafka_topic"],
95+
"topics": setup["kafka_topic"],
96+
"splunk.indexes": setup["splunk_index"],
9797
"splunk.hec.uri": setup["splunkd_url"],
9898
"splunk.hec.token": setup["splunk_token"],
9999
"splunk.hec.raw": "false",
@@ -110,8 +110,8 @@ def test_invalid_CRUD_tasks(setup, test_input, expected):
110110
"config": {
111111
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
112112
"tasks.max": "3",
113-
"topics": "test-datagen",
114-
"splunk.indexes": setup["kafka_topic"],
113+
"topics": setup["kafka_topic"],
114+
"splunk.indexes": setup["splunk_index"],
115115
"splunk.hec.uri": setup["splunkd_url"],
116116
"splunk.hec.token": setup["splunk_token"],
117117
"splunk.hec.raw": "disable",
@@ -129,7 +129,7 @@ def test_invalid_CRUD_tasks(setup, test_input, expected):
129129
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
130130
"tasks.max": "3",
131131
"topics": "",
132-
"splunk.indexes": setup["kafka_topic"],
132+
"splunk.indexes": setup["splunk_index"],
133133
"splunk.hec.uri": setup["splunkd_url"],
134134
"splunk.hec.token": setup["splunk_token"],
135135
"splunk.hec.raw": "false",
@@ -146,8 +146,8 @@ def test_invalid_CRUD_tasks(setup, test_input, expected):
146146
"config": {
147147
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
148148
"tasks.max": "3",
149-
"topics": "test-datagen",
150-
"splunk.indexes": setup["kafka_topic"],
149+
"topics": setup["kafka_topic"],
150+
"splunk.indexes": setup["splunk_index"],
151151
"splunk.hec.uri": setup["splunkd_url"],
152152
"splunk.hec.token": setup["splunk_token"],
153153
"splunk.hec.raw": "false",
@@ -165,8 +165,8 @@ def test_invalid_CRUD_tasks(setup, test_input, expected):
165165
"config": {
166166
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
167167
"tasks.max": "3",
168-
"topics": "test-datagen",
169-
"splunk.indexes": setup["kafka_topic"],
168+
"topics": setup["kafka_topic"],
169+
"splunk.indexes": setup["splunk_index"],
170170
"splunk.hec.uri": setup["splunkd_url"],
171171
"splunk.hec.token": setup["splunk_token"],
172172
"splunk.hec.raw": "false",

0 commit comments

Comments
 (0)