-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtdm.py
131 lines (89 loc) · 3.25 KB
/
tdm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import os
import re
import socket
import flask
app = flask.Flask(__name__)
HOSTS_DIR = os.environ.get('HOSTS_DIR', 'hosts_dir/')
ALLOWED_DOMAIN = os.environ.get('ALLOWED_DOMAIN', 'example.com')
def _validate_ip(data):
"""Make sure this is IPv4 address.
Taken from https://stackoverflow.com/questions/319279/how-to-validate-ip-address-in-python"""
socket.inet_pton(socket.AF_INET, data)
return data
def _sanitize_hostname(data):
"""Just sanitize hostname into unified form"""
# Strip exactly one dot from the right, if present
if data[-1] == ".":
data = data[:-1]
# Make it lowercase
data = data.lower()
return data
def _validate_hostname(data):
"""Make sure this is suitable for hostname.
Taken from https://stackoverflow.com/questions/2532053/validate-a-hostname-string"""
data = _sanitize_hostname(data)
assert len(data) <= 255, f"Too long hostname {data}: {len(data)} chars"
# Ensures that each segment:
# - contains at least one character and a maximum of 63 characters
# - consists only of allowed characters
# - doesn't begin or end with a hyphen
allowed = re.compile("(?!-)[a-z0-9-]{1,63}(?<!-)$")
assert all(allowed.match(x) for x in data.split(".")), "Hostname segment is not valid"
# Ensure hostname is from allowed domain
assert data.endswith('.' + ALLOWED_DOMAIN), "Hostname have to be in allowed domain"
return data
def _load_hosts():
"""Load hosts in hosts directory."""
hosts = {}
for f in os.listdir(HOSTS_DIR):
f_path = os.path.join(HOSTS_DIR, f)
if os.path.isfile(f_path):
with open(f_path, 'r') as fp:
for line in fp:
line = line.strip()
if line == '' or line.startswith('#'):
continue
ip, hostname = line.split()
hostname = _sanitize_hostname(hostname)
ip = _validate_ip(ip)
assert hostname not in hosts, f"Hostname {hostname} duplicated"
hosts[hostname] = ip
return hosts
def _manage_host(hostname, ip):
"""Add host to the directory"""
hostname = _validate_hostname(hostname)
ip = _validate_ip(ip)
hostname_safe = re.sub(r'[^a-z0-9_-]', '_', hostname.lower())
f_path = os.path.join(HOSTS_DIR, hostname_safe)
with open(f_path, 'w') as fp:
fp.write(f"{ip} {hostname}")
return f"Host {hostname} with IP {ip} added"
@app.route('/', methods=['GET'])
def index():
"""Return dict with managed hosts."""
try:
hosts = _load_hosts()
app.logger.debug(f"Listing {len(hosts)} hosts")
return hosts
except Exception as e:
return {
"result": "failed",
"message": str(e),
}, 500
@app.route('/manage/<string:hostname>', methods=['PUT'])
def manage(hostname):
"""Add or modify host."""
try:
ip = flask.request.remote_addr
message = _manage_host(hostname, ip)
return {
"result": "success",
"message": message,
}
except Exception as e:
return {
"result": "failed",
"message": str(e),
}, 500