-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathicinga2api.py
120 lines (106 loc) · 3.57 KB
/
icinga2api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python
# create hosts in icinga2 api
import requests
import json
import sys
import time
# disable insecure ssl warnings
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# obviously vagrant
API_USER = 'root'
API_PASS = 'icinga'
API_HOST = '192.168.33.5'
# obviously obvious
base_url = 'https://' + API_HOST + ':5665/v1'
r_header = {'Accept': 'application/json'}
# sleep interval
sleep_interval = 5
class Icinga2Api(object):
# get stuff from API
def rGet(self, uri):
try:
r = requests.get(
base_url + uri,
auth = (API_USER, API_PASS),
headers = r_header,
verify = False)
except Exception as e:
print e
sys.exit(1)
return r
# put stuff to API
def rPut(self, uri, payload = None):
try:
r = requests.put(
base_url + uri,
auth=(API_USER, API_PASS),
headers=r_header,
data=json.dumps(payload),
verify=False)
except Exception as e:
print e
sys.exit(1)
return r
# delete stuff from API
def rDelete(self, uri):
try:
r = requests.delete(
base_url + uri,
auth=(API_USER, API_PASS),
headers=r_header,
verify=False)
except Exception as e:
print e
sys.exit(1)
return r
def createHost(self, host):
data = { "attrs": { "address": "127.0.0.1", "check_command": "dummy" } }
r = self.rPut('/objects/hosts/' + host, data)
if r.status_code != 200:
print r.text
sys.exit(1)
return r
def createService(self, host, service):
data = { "templates":[ "generic-service" ],"attrs": { "display_name": service , "check_command" : "dummy", "host_name": host } }
r = self.rPut('/objects/services/' + host + '!' + service, data)
if r.status_code != 200:
print r.text
sys.exit(1)
return r
def rDeleteHostCascade(self, host):
r = self.rDelete('/objects/hosts/' + host + '?cascade=1')
if r.status_code != 200:
print r.text
sys.exit(1)
return r
def apiCheck(self):
# check if API is working
api_test = self.rGet('/status')
if api_test.status_code != 200:
print "ooops, the API got sour"
print api_test.text
sys.exit(1)
else:
print "API alive"
def genRandomHostsServices(self, hn = 'host', hc = 10, sn = 'service', sc = 10):
"""
this generates hosts & services:
* hn -> static part of hostname to generate
* hc -> number of hosts to generate
* sn -> static part of servicename to generate
* sc -> number of services to generate
"""
lc = 0
for i in range(1, hc+1): # generate random hosts
lc += 1
ht = hn + str(i).zfill(4) # fill i with zeroes
self.createHost(ht)
for k in range(1, sc+1): # generate random hosts
lc += 1
st = sn + str(k).zfill(4) # fill k with zeroes
self.createService(ht, st)
sys.stdout.write("\r\t%i\tof %i host(s) created" % (i, hc))
sys.stdout.flush()
sys.stdout.write("\n")
print 'created all in all %i hosts with %i services each' % (hc, sc)