-
Notifications
You must be signed in to change notification settings - Fork 90
/
Copy pathmac.py
221 lines (180 loc) · 8.31 KB
/
mac.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
import os
import re
import subprocess
from ipaddress import ip_interface
from .posix import PosixProcessProvider
from .provider import FirewallProvider, RouteProvider, SplitDNSProvider
from .util import get_executable
class PsProvider(PosixProcessProvider):
    """Process-information provider that shells out to ``lsof`` and ``ps``."""

    def __init__(self):
        # Resolve the helper executables once, at construction time.
        self.lsof = get_executable('/usr/sbin/lsof')
        self.ps = get_executable('/bin/ps')

    def pid2exe(self, pid):
        """Return the executable path of process ``pid``, or None if not found.

        Runs ``lsof -p <pid>`` and returns the ninth whitespace-separated
        field of the first row whose fourth field is ``txt`` (presumably
        lsof's FD and NAME columns -- confirm against lsof(8)).

        Raises:
            subprocess.CalledProcessError: if ``lsof`` exits non-zero.
        """
        info = subprocess.check_output([self.lsof, '-p', str(pid)], universal_newlines=True)
        for line in info.splitlines():
            # Split into at most nine fields so that a path containing
            # spaces is returned whole rather than cut at the first space.
            parts = line.split(None, 8)
            # Short rows (blank lines, rows with missing columns) are skipped
            # instead of raising IndexError.
            if len(parts) >= 9 and parts[3] == 'txt':
                return parts[8]
        return None

    def ppid_of(self, pid=None):
        """Return the parent PID of ``pid`` (of this process when ``pid`` is None).

        Returns None when ``ps`` cannot report a parent, i.e. when it exits
        non-zero or prints something that is not an integer.
        """
        if pid is None:
            return os.getppid()
        try:
            return int(subprocess.check_output([self.ps, '-p', str(pid), '-o', 'ppid=']))
        except (ValueError, subprocess.CalledProcessError):
            # check_output raises CalledProcessError on a non-zero exit, so it
            # must be caught alongside ValueError to honour the "None" contract.
            return None
class BSDRouteProvider(RouteProvider):
    """Route/link provider that drives the BSD ``route`` and ``ifconfig`` tools."""

    def __init__(self):
        self.route = get_executable('/sbin/route')
        self.ifconfig = get_executable('/sbin/ifconfig')

    def _route(self, *args):
        """Run ``route -n <args...>`` and return its stdout as text."""
        return subprocess.check_output([self.route, '-n'] + list(map(str, args)), universal_newlines=True)

    def _ifconfig(self, *args):
        """Run ``ifconfig <args...>`` and return its stdout as text."""
        return subprocess.check_output([self.ifconfig] + list(map(str, args)), universal_newlines=True)

    def _family_option(self, destination):
        """Return ``-inet6`` for a version-6 destination, ``-inet`` otherwise."""
        return '-inet6' if destination.version == 6 else '-inet'

    def add_route(self, destination, *, via=None, dev=None, src=None, mtu=None):
        """Add a route to ``destination`` through gateway ``via`` or interface ``dev``.

        ``via`` takes precedence when both are given. ``src`` is accepted for
        interface compatibility but is not used here.

        Raises:
            subprocess.CalledProcessError: if ``route`` exits non-zero.
        """
        args = ['add', self._family_option(destination)]
        if mtu is not None:
            args.extend(('-mtu', str(mtu)))
        if via is not None:
            args.extend((destination, via))
        elif dev is not None:
            args.extend(('-interface', destination, dev))
        self._route(*args)

    # Replacing a route is implemented as a plain add.
    replace_route = add_route

    def remove_route(self, destination):
        """Delete the route to ``destination``."""
        self._route('delete', self._family_option(destination), destination)

    def get_route(self, destination):
        """Look up the route to ``destination``.

        Parses ``route get`` output: ``key: value`` lines are collected until
        the first line without a colon, which is treated as a row of column
        names whose values are on the following line.

        Returns:
            A dict with ``via``, ``dev`` and ``mtu`` (values as printed by
            ``route``, or None when absent) if a gateway or interface was
            reported; otherwise None.
        """
        # Format of BSD route get output: https://unix.stackexchange.com/questions/53446
        info = self._route('get', self._family_option(destination), destination)
        lines = iter(info.splitlines())
        info_d = {}
        for line in lines:
            if ':' not in line:
                keys = line.split()
                # Default to an empty row so a truncated table cannot leak
                # StopIteration out of this method.
                vals = next(lines, '').split()
                info_d.update(zip(keys, vals))
                break
            key, val = line.split(':', 1)
            info_d[key.strip()] = val.strip()
        if 'gateway' in info_d or 'interface' in info_d:
            return {
                'via': info_d.get('gateway', None),
                'dev': info_d.get('interface', None),
                'mtu': info_d.get('mtu', None),
            }
        return None

    def flush_cache(self):
        """No-op: nothing is flushed by this provider."""
        pass

    # Matches e.g. "flags=8051<UP,POINTOPOINT,RUNNING,MULTICAST> mtu 1380".
    # The flags word is accepted as any run of hex digits (the previous
    # single "\d" could not match a multi-digit value), and the pattern is not
    # anchored to the end of the output, so it still matches when ifconfig
    # prints further lines or extra fields between the flags and the mtu.
    _LINK_INFO_RE = re.compile(r'flags=[0-9a-fA-F]+<(.*?)>.*?\bmtu\s+(\d+)')

    def get_link_info(self, device):
        """Return ``{'state': 'UP'|'DOWN', 'mtu': int}`` for ``device``.

        Returns None when the ``ifconfig`` output contains no flags/mtu line.
        """
        info = self._ifconfig(device)
        match = self._LINK_INFO_RE.search(info)
        if match:
            flags = match.group(1).split(',')
            mtu = int(match.group(2))
            return {
                'state': 'UP' if 'UP' in flags else 'DOWN',
                'mtu': mtu,
            }
        return None

    def set_link_info(self, device, state, mtu=None):
        """Set the state and/or MTU of ``device`` with a single ``ifconfig`` call.

        ``state`` is passed to ``ifconfig`` verbatim when not None.
        """
        args = [device]
        if state is not None:
            args.append(state)
        if mtu is not None:
            args.extend(('mtu', str(mtu)))
        self._ifconfig(*args)

    def add_address(self, device, address):
        """Assign ``address`` (anything ``ip_interface`` accepts) to ``device``."""
        address = ip_interface(address)
        if address.version == 6:
            self._ifconfig(device, 'inet6', address)
        else:
            # Repetition of the IP address is the correct syntax for a point-to-point interface
            # with BSD ifconfig. See example in default vpnc-script:
            # https://gitlab.com/openconnect/vpnc-scripts/blob/921e8760/vpnc-script#L193
            self._ifconfig(device, 'inet', address.ip, address.ip, 'netmask', '255.255.255.255')
class MacSplitDNSProvider(SplitDNSProvider):
    """Split-DNS provider that writes one file per domain under ``/etc/resolver``."""

    # Directory holding the per-domain resolver files.
    _RESOLVER_DIR = '/etc/resolver'

    def configure_domain_vpn_dns(self, domains, nameservers):
        """Write a resolver file for each domain listing ``nameservers``.

        Each file is named after its domain and contains one
        ``nameserver <address>`` line per entry; existing files are
        overwritten.
        """
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(self._RESOLVER_DIR, exist_ok=True)
        for domain in domains:
            resolver_file_name = f"{self._RESOLVER_DIR}/{domain}"
            with open(resolver_file_name, "w") as resolver_file:
                for nameserver in nameservers:
                    resolver_file.write(f"nameserver {nameserver}\n")

    def deconfigure_domain_vpn_dns(self, domains, nameservers):
        """Remove the resolver file of each domain, then the directory if empty.

        ``nameservers`` is unused. Missing files or a missing resolver
        directory are not errors.
        """
        for domain in domains:
            resolver_file_name = f"{self._RESOLVER_DIR}/{domain}"
            if os.path.exists(resolver_file_name):
                os.remove(resolver_file_name)
        try:
            # rmdir removes only this directory; removedirs would additionally
            # attempt to prune its parent directories.
            if not os.listdir(self._RESOLVER_DIR):
                os.rmdir(self._RESOLVER_DIR)
        except FileNotFoundError:
            # The directory was never created (or is already gone).
            pass
class PfFirewallProvider(FirewallProvider):
    """Firewall provider that manages a per-device pf anchor through ``pfctl``."""

    def __init__(self):
        self.pfctl = get_executable('/sbin/pfctl')

    _PF_TOKEN_RE = re.compile(r'Token : (\d+)')
    _PF_ANCHOR = 'vpn_slice'
    _PF_CONF_FILE = '/etc/pf.conf'

    def _run(self, cmd, stdin_text=None):
        """Run ``cmd`` with captured text output and return ``(stdout, stderr)``.

        Raises:
            subprocess.CalledProcessError: on a non-zero exit status; the
                exception carries the command that was actually run together
                with its captured stdout and stderr.
        """
        done = subprocess.run(cmd, input=stdin_text, universal_newlines=True,
                              stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
        return done.stdout, done.stderr

    def _reload_conf(self):
        """Reload the rules from the pf configuration file."""
        self._run([self.pfctl, '-f', self._PF_CONF_FILE])

    def configure_firewall(self, device):
        """Enable pf and install pass-out / block-in rules for ``device``.

        Appends an anchor line tagged with the device name and the pf enable
        token to the pf configuration file, reloads it, and loads the rules
        into that anchor.

        Raises:
            subprocess.CalledProcessError: if any ``pfctl`` call fails.
        """
        # Enabled Packet Filter - increments a reference counter for processes that need packet filter enabled
        _, stderr = self._run([self.pfctl, '-E'])

        # store token returned to later be able to decrement the reference counter correctly
        enable_token = None
        for line in stderr.splitlines():
            match = self._PF_TOKEN_RE.search(line)
            if match:
                enable_token = match.group(1)

        if not enable_token:
            print("WARNING: failed to get pf enable reference token, packet filter might not shutdown correctly")

        anchor = f'{self._PF_ANCHOR}/{device}'
        # add anchor to generate rules with
        with open(self._PF_CONF_FILE, 'a') as file:
            file.write(f'anchor "{anchor}" # vpn-slice-{device} AUTOCREATED {enable_token}\n')

        # reload config file
        self._reload_conf()

        rules = (f'pass out on {device} all keep state\n'
                 f'block drop in on {device} all\n')
        # Feed the rules on stdin; a failure here is reported with this
        # command rather than with the earlier "pfctl -E" invocation.
        self._run([self.pfctl, '-a', anchor, '-f', '-'], stdin_text=rules)

    def deconfigure_firewall(self, device):
        """Flush the anchor of ``device`` and undo what configure_firewall added.

        Removes the auto-created anchor lines for the device from the pf
        configuration file, releases every enable token recorded on those
        lines, and reloads the configuration.

        Raises:
            subprocess.CalledProcessError: if any ``pfctl`` call fails.
        """
        # disable anchor
        anchor = f'{self._PF_ANCHOR}/{device}'
        subprocess.check_call([self.pfctl, '-a', anchor, '-F', 'all'])

        with open(self._PF_CONF_FILE, 'r') as file:
            lines = file.readlines()

        enable_tokens = []
        kept_lines = []
        # "None" is what configure_firewall writes when no token was captured;
        # such lines must be removed too, or they would pile up in the file.
        # The device name is escaped so it is matched literally.
        rule_re = re.compile(r'vpn-slice-{} AUTOCREATED (\d+|None)'.format(re.escape(device)))
        for line in lines:
            match = rule_re.search(line)
            if match is None:
                kept_lines.append(line)
            elif match.group(1) != 'None':
                enable_tokens.append(match.group(1))

        with open(self._PF_CONF_FILE, 'w') as file:
            file.writelines(kept_lines)

        # decrement pf enable reference counter
        for token in enable_tokens:
            self._run([self.pfctl, '-X', token])

        if not enable_tokens:
            print("WARNING: failed to get pf enable reference token, packet filter might not have shutdown correctly")

        self._reload_conf()