-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathckad_validate.py
220 lines (189 loc) · 9.14 KB
/
ckad_validate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
import subprocess
import sys
import logging
import requests
import re
import yaml
# Configure the root logger at import time: INFO level and above, each record
# rendered as "<timestamp> - <LEVEL> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def is_kubectl_available():
    """Report whether the ``kubectl`` client can be executed.

    Runs ``kubectl version --client`` with stdout and stderr captured and
    discarded.

    Returns:
        bool: True when the command exits with status 0; False when it exits
        non-zero or cannot be started at all (for example, ``kubectl`` is not
        on PATH).
    """
    try:
        subprocess.run(
            ["kubectl", "version", "--client"],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except (subprocess.CalledProcessError, OSError):
        # A missing or non-executable binary surfaces as OSError
        # (FileNotFoundError / PermissionError), not CalledProcessError.
        return False
    return True
def run_command(command):
    """Execute *command* through the shell and return its trimmed stdout.

    Both stdout and stderr are captured as text. When the command exits with
    a non-zero status, the captured stderr is logged as an error and the
    process terminates with exit code 1.
    """
    try:
        completed = subprocess.run(
            command,
            shell=True,
            check=True,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except subprocess.CalledProcessError as err:
        logging.error(f"Command failed: {err.stderr}")
        sys.exit(1)
    return completed.stdout.strip()
def validate_resource(resource_type, resource_name):
    """Confirm that the named Kubernetes resource can be fetched.

    Logs the check, then runs ``kubectl get <type> <name>`` through
    ``run_command``; the command output is discarded.
    """
    logging.info(f"Validating {resource_type} {resource_name}...")
    lookup = f"kubectl get {resource_type} {resource_name}"
    run_command(lookup)
def check_pod_state(pod_name, expected_state, namespace="default"):
    """Compare a pod's ``status.phase`` against *expected_state*.

    The phase is read with ``kubectl get pod ... -o custom-columns`` and
    stripped of surrounding whitespace before the comparison.

    Returns:
        bool: True when the reported phase equals *expected_state*; False when
        it differs or when the kubectl command exits non-zero.
    """
    logging.info(f"Checking if pod '{pod_name}' in namespace '{namespace}' is in '{expected_state}' state...")
    phase_query = f"kubectl get pod {pod_name} -n {namespace} -o custom-columns=:status.phase --no-headers"
    try:
        completed = subprocess.run(
            phase_query,
            shell=True,
            check=True,
            stdout=subprocess.PIPE,
            universal_newlines=True,
        )
    except subprocess.CalledProcessError as e:
        logging.error(f"Failed to get the state of the pod: {e}")
        return False

    pod_state = completed.stdout.strip()
    if pod_state == expected_state:
        logging.info(f"Pod '{pod_name}' is in the expected '{expected_state}' state.")
        return True

    logging.error(f"Pod '{pod_name}' is in '{pod_state}' state, expected '{expected_state}'.")
    return False
def validate_pod_connectivity(pod_name, target_pod_name, target_port):
    """Validate that one pod can open a TCP connection to another.

    Runs ``nc -zv <target> <port>`` inside *pod_name* via ``kubectl exec`` and
    looks for the word "succeeded" in the combined stdout and stderr.

    Args:
        pod_name: Pod in which the probe is executed.
        target_pod_name: Host name passed to ``nc`` as the destination.
        target_port: Destination port.

    Returns:
        None when the probe reports success.

    Raises:
        SystemExit: with code 1 when the command exits non-zero or its output
            does not contain "succeeded".
    """
    logging.info(f"Checking connectivity from {pod_name} to {target_pod_name} on port {target_port}...")
    command = f"kubectl exec {pod_name} -- nc -zv {target_pod_name} {target_port}"

    result = subprocess.run(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    if result.returncode != 0:
        logging.error(f"Command failed: {result.stderr}")
        sys.exit(1)

    # Common netcat builds (e.g. OpenBSD nc) print the -v status line on
    # stderr, so stdout alone would never contain "succeeded".
    output = f"{result.stdout}\n{result.stderr}".strip()
    if "succeeded" in output:
        logging.info(f"Connectivity from {pod_name} to {target_pod_name} on port {target_port} is successful.")
        return

    logging.error(f"Connectivity test failed from {pod_name} to {target_pod_name} on port {target_port}.")
    logging.error(f"Command output: {output}")
    sys.exit(1)
def validate_url_accessibility(pod_name, url):
    """Verify that *url* answers with HTTP 200 when fetched from inside a pod.

    Runs ``curl`` in *pod_name* via ``kubectl exec`` (through ``run_command``),
    asking it to print only the HTTP status code. Any status other than "200"
    is logged as an error and the process exits with code 1.
    """
    logging.info(f"Checking URL accessibility from {pod_name} to {url}...")
    curl_probe = f"kubectl exec {pod_name} -- curl -s -o /dev/null -w '%{{http_code}}' {url}"
    try:
        http_status = run_command(curl_probe)
    except subprocess.CalledProcessError as e:
        logging.error(f"Failed to execute URL accessibility test: {e}")
        sys.exit(1)

    if http_status == "200":
        logging.info(f"URL {url} is accessible from pod {pod_name}.")
        return

    logging.error(f"URL {url} is not accessible from pod {pod_name}. HTTP status code: {http_status}")
    sys.exit(1)
def validate_external_url_accessibility(url, timeout=10):
    """Validate that *url* answers with HTTP 200 from outside the cluster.

    Args:
        url: Address to fetch with an HTTP GET.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests.get`` can block indefinitely on an unresponsive
            host.

    Returns:
        bool: True when the response status code is 200; False for any other
        status or when the request raises ``requests.RequestException``
        (timeouts included).
    """
    logging.info(f"Checking external URL accessibility for {url}...")
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as e:
        logging.error(f"Failed to execute external URL accessibility test: {e}")
        return False

    if response.status_code == 200:
        logging.info(f"URL {url} is externally accessible.")
        return True

    logging.error(f"URL {url} is not externally accessible. HTTP status code: {response.status_code}")
    return False
def check_user_permission(verb, resource, namespace="default", username=None):
    """Check whether a user may perform *verb* on *resource* in *namespace*.

    Runs ``kubectl auth can-i``; when *username* is given the check is made on
    that user's behalf with ``--as``.

    Args:
        verb: Action to test (passed to kubectl as-is).
        resource: Resource the action applies to.
        namespace: Namespace for the check.
        username: Optional user to impersonate; the current user when omitted.

    Returns:
        bool: True when kubectl answers "yes", False when it answers anything
        else, including a "no" delivered with a non-zero exit status.

    Raises:
        SystemExit: with code 1 when the command fails without producing a
            "no" answer (for example, kubectl is missing or the cluster is
            unreachable).
    """
    user_info = f"user '{username}'" if username else "the current user"
    logging.info(f"Checking if {user_info} has permission to '{verb}' on '{resource}' in '{namespace}' namespace...")
    command = f"kubectl auth can-i {verb} {resource} --namespace {namespace}"
    if username:
        command += f" --as {username}"

    # `kubectl auth can-i` signals a denial through a non-zero exit status, so
    # the command is run without check=True and the answer is read from stdout.
    result = subprocess.run(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    answer = result.stdout.strip()
    # A denial may carry a trailing reason, e.g. "no - <reason>".
    denied = answer == "no" or answer.startswith("no ")

    if result.returncode != 0 and not denied:
        logging.error(f"Command failed: {result.stderr}")
        sys.exit(1)

    if answer == "yes":
        logging.info(f"{user_info} has permission to '{verb}' on '{resource}' in '{namespace}' namespace.")
        return True

    logging.warning(f"{user_info} does not have permission to '{verb}' on '{resource}' in '{namespace}' namespace.")
    return False
def check_controller_enabled(controller_name):
    """Report whether *controller_name* appears among kube-system deployments.

    The check is a plain substring match against the text printed by
    ``kubectl get deployment -n kube-system`` (obtained through
    ``run_command``).

    Returns:
        bool: True when the name occurs in that output, False otherwise.
    """
    logging.info(f"Checking if the controller '{controller_name}' is enabled...")
    try:
        # The right command may differ between cluster setups and controllers.
        deployments_listing = run_command("kubectl get deployment -n kube-system")
    except subprocess.CalledProcessError as e:
        logging.error(f"Failed to check controller status: {e}")
        sys.exit(1)

    if controller_name in deployments_listing:
        logging.info(f"Controller '{controller_name}' is enabled.")
        return True

    logging.warning(f"Controller '{controller_name}' is not enabled or not found.")
    return False
def check_api_version_available(api_version):
    """Report whether *api_version* is listed by ``kubectl api-versions``.

    The output (obtained through ``run_command``) is split into lines and
    *api_version* must equal one of them exactly.

    Returns:
        bool: True when the version is listed, False otherwise.
    """
    logging.info(f"Checking if API version '{api_version}' is available...")
    try:
        listing = run_command("kubectl api-versions")
    except subprocess.CalledProcessError as e:
        logging.error(f"Failed to check API version availability: {e}")
        sys.exit(1)

    known_versions = listing.splitlines()
    if api_version in known_versions:
        logging.info(f"API version '{api_version}' is available.")
        return True

    logging.warning(f"API version '{api_version}' is not available.")
    return False
def get_and_verify_pod_attributes(pod_name, namespace="default", attributes_to_verify=None):
    """Fetch a pod's YAML and compare selected attributes to expected values.

    *attributes_to_verify* maps dotted attribute paths to expected values.
    Each path is split on "." and resolved with ``get_nested_attribute``.
    Verification stops at the first mismatch.

    Returns:
        bool: True when every listed attribute matches (or nothing was listed);
        False on the first mismatch, when kubectl exits non-zero, or when any
        other exception occurs while processing the YAML.
    """
    logging.info(f"Retrieving YAML for pod {pod_name} in namespace {namespace}...")
    fetch_cmd = f"kubectl get po {pod_name} -n {namespace} -o yaml"
    try:
        completed = subprocess.run(
            fetch_cmd,
            shell=True,
            check=True,
            stdout=subprocess.PIPE,
            universal_newlines=True,
        )
        pod_data = yaml.safe_load(completed.stdout)
        # A falsy mapping (None or empty) means there is nothing to verify.
        for attribute, expected_value in (attributes_to_verify or {}).items():
            actual_value = get_nested_attribute(pod_data, attribute.split('.'))
            if actual_value != expected_value:
                logging.error(f"Attribute '{attribute}' has value '{actual_value}', expected '{expected_value}'")
                return False
            logging.info(f"Attribute '{attribute}' correctly has value '{expected_value}'")
        return True
    except subprocess.CalledProcessError as e:
        logging.error(f"Failed to retrieve YAML: {e}")
        return False
    except Exception as e:
        logging.error(f"Error processing YAML: {e}")
        return False
def get_nested_attribute(data, attribute_path):
    """Walk *attribute_path* through nested containers and return the value.

    Each path segment is applied in turn. A dict is indexed by the segment as
    a key; a list is indexed by a segment that is a non-negative decimal
    string (e.g. ``"0"``), which lets paths such as
    ``["spec", "containers", "0", "image"]`` reach into list-valued fields.

    Args:
        data: The structure to traverse (typically parsed YAML).
        attribute_path: Iterable of path segments.

    Returns:
        The value at the end of the path, or None as soon as a segment cannot
        be resolved. An empty path returns *data* unchanged.
    """
    for key in attribute_path:
        if isinstance(data, dict) and key in data:
            data = data[key]
        elif (
            isinstance(data, list)
            and isinstance(key, str)
            and key.isdecimal()
            and int(key) < len(data)
        ):
            data = data[int(key)]
        else:
            return None
    return data
def execute_validation(exercise_number):
    """Run the validator registered for *exercise_number*.

    The validator is looked up by name in this module as
    ``validate_exercise_<exercise_number>`` and called with no arguments.

    Raises:
        SystemExit: with code 1 when no such function exists, or when the
            validator raises any ``Exception``.
    """
    function_name = f"validate_exercise_{exercise_number}"
    # Resolve the function separately from calling it, so an AttributeError
    # raised inside the validator is not mistaken for a missing validator.
    validation_function = getattr(sys.modules[__name__], function_name, None)
    if validation_function is None:
        logging.error(f"No validation function found for exercise number {exercise_number}")
        sys.exit(1)

    try:
        validation_function()
    except Exception as e:
        logging.error(f"An error occurred during validation: {e}")
        sys.exit(1)
def main():
    """Prompt for an exercise number and run the matching validation.

    Exits with code 1 when kubectl is unavailable or when the entered value
    cannot be parsed as an integer.
    """
    kubectl_ready = is_kubectl_available()
    if not kubectl_ready:
        logging.error("kubectl is not available. Please install it and ensure it's in your PATH.")
        sys.exit(1)

    try:
        execute_validation(int(input("Enter the exercise number to validate: ")))
    except ValueError:
        logging.error("Please enter a valid number.")
        sys.exit(1)


if __name__ == "__main__":
    main()