-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsshadduser.py
320 lines (242 loc) · 8.47 KB
/
sshadduser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import grp
from io import StringIO
import logging
import os
import pwd
import random
import string
import subprocess
import sys
import tempfile
import click
# Log line layout: level, source file name and line number, then the message.
format_ = '[%(levelname)s] %(filename)s:%(lineno)d %(message)s'
logging.basicConfig(format=format_)
# Root logger shared by every function in this module; main() sets its level
# from the --verbosity option.
logger = logging.getLogger()
@click.command()
@click.argument('username', required=True)
@click.argument('groups', required=False, nargs=-1)
@click.version_option()
@click.option('-v', '--verbosity', default='warning',
              help='Verbosity: error, warning, info, or debug')
def main(username, groups, verbosity):
    '''
    sshadduser: grant SSH access in a single step.

    USERNAME is the name of the user you wish to create. You may optionally list
    one or more supplemental GROUPS to add that user to (separated by spaces),
    such as sudo or wheel.

    The SSH public keys are read from STDIN, one per line, terminated by a blank
    line.

    You should run this script as root.
    '''
    # The docstring above is what click shows for --help, so implementation
    # notes are kept in comments.
    #
    # Steps: validate the username and groups, read the password, shell and
    # SSH keys from stdin, run useradd and chpasswd, write authorized_keys,
    # then print a summary. Exits with status 1 on any ClickException raised
    # inside the ``try`` block.

    # Converted outside the ``try`` on purpose: an invalid verbosity raises
    # ClickException straight to click rather than to the handler below.
    logger.setLevel(_str_to_log_level(verbosity))

    try:
        if os.geteuid() != 0:
            raise click.ClickException('You must run this tool as root.')

        # Validate everything before touching the system.
        _check_username(username)
        _check_groups(groups)

        # Stdin is consumed in this fixed order: password, shell, SSH keys.
        password = _get_password()
        shell = _get_shell()
        ssh_keys = _get_ssh_keys()

        _wait_for(_useradd(username, groups, shell))
        _wait_for(_chpasswd(username, password))
        _add_authorized_keys(username, ssh_keys)

        msg = 'Created an account named {} with password {} and {} SSH key{}.'
        msg_args = [
            username,
            password,
            len(ssh_keys),
            '' if len(ssh_keys) == 1 else 's',
        ]
        click.secho(msg.format(*msg_args), fg='green')

        if not groups:
            msg = 'Did not add any supplemental groups.'
            msg_args = []
        else:
            msg = 'Added supplemental group{}: {}.'
            msg_args = ['' if len(groups) == 1 else 's', _commas(groups)]
        click.secho(msg.format(*msg_args), fg='green')
    except click.ClickException as ce:
        click.secho('Error: {}'.format(ce), fg='red', err=True)
        sys.exit(1)
def _add_authorized_keys(username, ssh_keys):
    '''
    Create or append ``ssh_keys`` to the user's ``authorized_keys`` file.

    ``username`` must already exist; its home directory, uid and gid are
    looked up with ``pwd.getpwnam``. ``ssh_keys`` is an iterable of key
    strings, each written on its own line.

    The ``~/.ssh`` directory is created (owner-only, mode 0700) if it is
    missing. Ownership and mode 0600 are applied to ``authorized_keys`` only
    when this call creates the file; an existing file keeps its current
    owner, permissions and contents.
    '''
    user = pwd.getpwnam(username)
    ssh_dir = os.path.join(user.pw_dir, '.ssh')
    authorized_keys_path = os.path.join(ssh_dir, 'authorized_keys')

    try:
        logger.debug('Creating ~/.ssh directory.')
        os.mkdir(ssh_dir)
    except FileExistsError:
        # Directory already exists: leave its owner and mode untouched.
        pass
    else:
        os.chown(ssh_dir, user.pw_uid, user.pw_gid)
        os.chmod(ssh_dir, 0o700)

    # Decide before opening, because opening in append mode creates the file.
    fix_perms = not os.path.exists(authorized_keys_path)

    logger.debug('Appending SSH keys to authorized_keys.')
    # Append mode: keys already present in the file must be preserved
    # (the previous 'w+' mode truncated the file).
    with open(authorized_keys_path, 'a') as authorized_keys:
        for ssh_key in ssh_keys:
            authorized_keys.write(ssh_key)
            authorized_keys.write('\n')

    if fix_perms:
        logger.debug('Setting owner/permissions on authorized_keys.')
        os.chown(authorized_keys_path, user.pw_uid, user.pw_gid)
        os.chmod(authorized_keys_path, 0o600)
def _check_groups(groups):
    '''
    Verify that every name in ``groups`` refers to an existing group.

    Each name is looked up with ``grp.getgrnam``. The first name that cannot
    be found aborts the check with a ``click.ClickException``.
    '''
    for name in groups:
        logger.debug('Checking if group "{}" exists.'.format(name))
        try:
            grp.getgrnam(name)
        except KeyError:
            raise click.ClickException(
                'Group "{}" does not exist.'.format(name)
            )
        else:
            logger.debug('Group "{}" exists.'.format(name))
def _check_username(username):
    '''
    Ensure that ``username`` does not already exist.

    The name is looked up with ``pwd.getpwnam``. Raises
    ``click.ClickException`` if the lookup succeeds, i.e. the account is
    already present.
    '''
    logger.debug('Checking if username "{}" exists.'.format(username))

    try:
        pwd.getpwnam(username)
    except KeyError:
        # KeyError from getpwnam means "no such user", which is what we want.
        logger.debug('Username "{}" does not exist.'.format(username))
        return

    raise click.ClickException(
        'Username "{}" already exists.'.format(username)
    )
def _chpasswd(username, password):
    '''
    Set password using ``chpasswd``.

    I don't think that ``chpasswd`` is POSIX compliant but I can't find any
    compliant alternative.

    A single ``username:password`` line is written to the tool's stdin, which
    is then closed. Returns the ``subprocess.Popen`` object with its stderr
    pipe still open and unread, so the caller can wait for the process and
    read the error output if it fails.

    Raises ``UnicodeEncodeError`` if the credentials are not pure ASCII.
    '''
    # Encode before spawning so an encoding failure does not leave a
    # chpasswd process waiting on stdin.
    credentials = '{}:{}\n'.format(username, password).encode('ascii')

    process = subprocess.Popen(
        ['chpasswd'],
        stdin=subprocess.PIPE,
        stderr=subprocess.PIPE
    )

    # Popen.communicate() is deliberately avoided: it closes the stderr pipe,
    # so a later process.stderr.read() would raise ValueError instead of
    # yielding the tool's error message.
    try:
        process.stdin.write(credentials)
        process.stdin.close()
    except BrokenPipeError:
        # chpasswd exited before reading its input; the non-zero exit status
        # and stderr will tell the caller why.
        pass

    return process
def _commas(list_):
''' Return an English comma-delimited list. '''
if len(list_) == 1:
return str(list_[0])
elif len(list_) == 2:
return '{} and {}'.format(*list_)
elif len(list_) > 2:
return ', '.join(list_[:-1]) + ', and {}'.format(list_[-1])
else:
raise ValueError('_commas() requires at least one list item.')
def _get_password(length=12):
    '''
    Prompt for a password. If it's blank, then generate a random password.

    One line is read from stdin and stripped of surrounding whitespace. The
    prompt is only shown when stdin is a terminal. A blank line (or end of
    input) produces a random password of ``length`` characters drawn from
    ASCII letters and digits using ``random.SystemRandom``.

    Returns the password as a string.
    '''
    logger.debug('Asking for a password.')

    # isatty must be *called*; the bare bound method is always truthy, which
    # made the prompt appear even when stdin was piped.
    if sys.stdin.isatty():
        click.secho(
            'Enter a password (or leave blank to generate random password):',
            fg='green'
        )

    password = sys.stdin.readline().strip()

    if password == '':
        charset = string.ascii_uppercase + \
                  string.ascii_lowercase + \
                  string.digits
        rng = random.SystemRandom()
        password = ''.join(rng.choice(charset) for _ in range(length))
        logger.debug('Generated a random password.')
    else:
        logger.debug('Got a password.')

    return password
def _get_shell():
    '''
    Prompt for a path to shell. If blank, use system default.

    One line is read from stdin per attempt. A blank line (or end of input)
    returns ``None``, meaning "use the system default". Any other value must
    be an existing, executable file.

    When stdin is a terminal an invalid path is reported and the prompt is
    repeated. Otherwise ``click.ClickException`` is raised.
    '''
    # isatty must be *called*; the bare bound method is always truthy, which
    # made the non-interactive error branch below unreachable.
    interactive = sys.stdin.isatty()

    # Loop rather than recurse so repeated bad input cannot grow the stack.
    while True:
        logger.debug('Asking for a shell.')

        if interactive:
            click.secho(
                'Path to shell (or leave blank for system default):',
                fg='green'
            )

        shell = sys.stdin.readline().strip()

        if shell == '':
            logger.debug('Using system default shell.')
            return None

        if os.path.isfile(shell) and os.access(shell, os.X_OK):
            logger.debug('Shell is: {}.'.format(shell))
            return shell

        if not interactive:
            raise click.ClickException('Invalid shell: {}'.format(shell))

        click.secho(
            'Invalid shell: {} (does it exist? is it executable?)'
            .format(shell),
            fg='red'
        )
def _get_ssh_keys():
    '''
    Prompt for SSH keys, one per line, on stdin. A blank line terminates.

    Keys are returned as a list of stripped lines. A line is accepted when it
    starts with a recognised OpenSSH public key type prefix.

    When stdin is a terminal a rejected line is reported and skipped.
    Otherwise ``click.ClickException`` is raised for the first rejected line.
    '''
    # Prefixes of the key types sshd accepts in authorized_keys: ssh-rsa,
    # ssh-dss, ssh-ed25519, ecdsa-sha2-nistp*, and the sk-* security-key
    # variants. Lines that begin with key options are not recognised.
    key_prefixes = ('ssh-', 'ecdsa-sha2-', 'sk-ssh-', 'sk-ecdsa-')

    logger.debug('Asking for SSH keys.')

    # isatty must be *called*; the bare bound method is always truthy.
    interactive = sys.stdin.isatty()

    if interactive:
        click.secho(
            'Enter SSH keys one per line. A blank line terminates.',
            fg='green'
        )

    ssh_keys = list()

    for line in sys.stdin:
        line = line.strip()

        if line == '':
            break

        if not line.startswith(key_prefixes):
            # Format once so both branches show the offending line (the
            # interactive branch used to print the raw template).
            msg = 'That doesn\'t look like an OpenSSH public key! ' \
                  '(It should start with a key type such as "ssh-ed25519" ' \
                  'or "ecdsa-sha2-nistp256") {}'.format(line)
            if interactive:
                click.secho(msg, fg='red')
                continue
            raise click.ClickException(msg)

        ssh_keys.append(line)

    logger.debug('SSH keys: got {} lines.'.format(len(ssh_keys)))
    return ssh_keys
def _str_to_log_level(log_level_str):
''' Given a string like `warning`, return the corresponding log level. '''
try:
return getattr(logging, log_level_str.upper())
except AttributeError:
msg = 'Invalid log level: {}'.format(log_level_str)
raise click.ClickException(msg)
def _wait_for(process):
''' Wait for ``process`` to finish and check the exit code. '''
process.wait()
if process.returncode != 0:
msg = 'failed to run external tool "{}" (exit {}):\n{}'
params = (
process.args[0],
process.returncode,
process.stderr.read().decode('utf8')
)
raise click.ClickException(msg.format(*params))
def _useradd(username, groups, shell=None):
    '''
    Start ``useradd`` for ``username`` and return the running process.

    The account is created with a home directory. ``groups`` (possibly empty)
    are passed as comma-joined supplemental groups, and ``shell`` is passed
    as the login shell unless it is ``None``. The returned
    ``subprocess.Popen`` object has stderr piped; the caller is expected to
    wait for it.
    '''
    group_args = ['--groups', ','.join(groups)] if len(groups) > 0 else []
    shell_args = [] if shell is None else ['--shell', shell]

    command = ['useradd', '--create-home'] + group_args + shell_args
    command.append(username)

    logger.debug('Adding user: {}'.format(command))
    return subprocess.Popen(command, stderr=subprocess.PIPE)
# Run the command-line interface only when executed as a script, not when
# this module is imported.
if __name__ == '__main__':
    main()