1
1
from kubernetes .client .rest import ApiException
2
2
from kubernetes import client
3
3
4
- from typing import Optional
4
+ from typing import Optional , List , Dict
5
5
6
6
7
- def get_crds () -> Optional [dict ]:
7
+ def get_crds () -> Optional [Dict ]:
8
8
crdv1 = client .ApiextensionsV1beta1Api ()
9
9
try :
10
10
crd = crdv1 .list_custom_resource_definition (pretty = "true" )
@@ -14,7 +14,20 @@ def get_crds() -> Optional[dict]:
14
14
return crd .to_dict ()
15
15
16
16
17
- def get_persistent_volumes () -> Optional [dict ]:
17
+ def get_all_mongodb_namespaced (namespace : str ) -> Optional [List ]:
18
+ customv1 = client .CustomObjectsApi ()
19
+ try :
20
+ return list (
21
+ customv1 .list_namespaced_custom_object (
22
+ "mongodb.com" , "v1" , namespace , "mongodb" , pretty = True
23
+ )["items" ]
24
+ )
25
+ except ApiException as e :
26
+ print ("Exception when calling get_namespaced_custom_object %s\n " % e )
27
+ return None
28
+
29
+
30
+ def get_persistent_volumes () -> Optional [Dict ]:
18
31
corev1 = client .CoreV1Api ()
19
32
try :
20
33
pv = corev1 .list_persistent_volume (pretty = "true" )
@@ -24,7 +37,7 @@ def get_persistent_volumes() -> Optional[dict]:
24
37
return pv .to_dict ()
25
38
26
39
27
- def get_stateful_sets_namespaced (namespace : str ) -> Optional [dict ]:
40
+ def get_stateful_sets_namespaced (namespace : str ) -> Optional [Dict ]:
28
41
av1beta1 = client .AppsV1Api ()
29
42
try :
30
43
sst = av1beta1 .list_namespaced_stateful_set (namespace , pretty = "true" )
@@ -34,7 +47,7 @@ def get_stateful_sets_namespaced(namespace: str) -> Optional[dict]:
34
47
return sst .to_dict ()
35
48
36
49
37
- def get_configmap_namespaced (namespace : str , name : str ) -> Optional [dict ]:
50
+ def get_configmap_namespaced (namespace : str , name : str ) -> Optional [Dict ]:
38
51
corev1 = client .CoreV1Api ()
39
52
try :
40
53
config_map = corev1 .read_namespaced_config_map (name , namespace , pretty = "true" )
@@ -44,7 +57,7 @@ def get_configmap_namespaced(namespace: str, name: str) -> Optional[dict]:
44
57
return config_map .to_dict ()
45
58
46
59
47
- def get_pods_namespaced (namespace : str ) -> Optional [list ]:
60
+ def get_pods_namespaced (namespace : str ) -> Optional [List ]:
48
61
corev1 = client .CoreV1Api ()
49
62
try :
50
63
pods = corev1 .list_namespaced_pod (namespace )
@@ -60,6 +73,7 @@ def get_pod_namespaced(namespace: str, pod_name: str) -> Optional[client.V1Pod]:
60
73
pod = corev1 .read_namespaced_pod (name = pod_name , namespace = namespace )
61
74
except ApiException as e :
62
75
print ("Exception when calling read_namespaced_pod: %s\n " % e )
76
+ return None
63
77
return pod
64
78
65
79
0 commit comments