-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Script to enroll Yubikey PIV Certificates
The script creates a certificate signing request on the Yubikey and sends it to privacyIDEA to get it signed. It also creates an attestation certificate to attest the CSR coming from the Yubikey hardware (currently privacyIDEA does not support this). This script is still work in progress. Working on #42 #43
- Loading branch information
Showing
3 changed files
with
347 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,337 @@ | ||
#!/usr/bin/env python3 | ||
# -*- coding: utf-8 -*- | ||
""" | ||
Create a PIV Certificate on a YubiKey and send it to privacyIDEA for validation | ||
and enrolment. | ||
TODO: | ||
- implement a client-wait/2-step functionality in case the CA can not | ||
automatically sign the request | ||
- proper checking of response signature | ||
- integrate into privacyidea admin tool | ||
- check received certificates | ||
""" | ||
import binascii | ||
import json | ||
import os | ||
import sys | ||
import click | ||
import logging | ||
|
||
import requests | ||
import smartcard | ||
from cryptography.hazmat.primitives import serialization, hashes | ||
from cryptography.hazmat.primitives.asymmetric import padding | ||
from cryptography.hazmat.backends import default_backend | ||
from cryptography import x509 | ||
import ykman.logging_setup | ||
from ykman.cli.__main__ import _run_cmd_for_serial, _run_cmd_for_single | ||
from ykman.cli.piv import (_ensure_authenticated, click_parse_management_key, | ||
_verify_pin) | ||
from ykman.cli.util import UpperCaseChoice, YkmanContextObject | ||
from ykman.descriptor import get_descriptors, list_devices | ||
from ykman.driver_ccid import list_readers, APDUError, SW | ||
from ykman.piv import PivController, ALGO, SLOT, KeypairMismatch | ||
from ykman.util import Cve201715361VulnerableError, YUBIKEY, TRANSPORT | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
__version__ = '0.0.1' | ||
|
||
CLICK_CONTEXT_SETTINGS = dict( | ||
help_option_names=['-h', '--help'], | ||
max_content_width=999 | ||
) | ||
click_management_key_option = click.option( | ||
'-m', '--management-key', | ||
help='The management key.', | ||
callback=click_parse_management_key, | ||
default='010203040506070801020304050607080102030405060708') | ||
click_pin_option = click.option( | ||
'-P', '--pin', help='PIN code.', | ||
default='123456') | ||
|
||
|
||
def _prompt_for_touch(): | ||
try: | ||
click.echo('Touch your YubiKey...', err=True) | ||
except Exception: | ||
sys.stderr.write('Touch your YubiKey...\n') | ||
|
||
|
||
def _call_pi(url, data, headers=None, pi_key=None): | ||
r = requests.post(url, data=data, headers=headers) | ||
# TODO: check response | ||
rj = r.json() | ||
if pi_key: | ||
sig = rj['signature'] | ||
msg = {k: v for k, v in rj.items() if k != 'signature'} | ||
sigver, signature = sig.split(':') | ||
pi_key.verify(binascii.unhexlify(signature), | ||
json.dumps(msg, sort_keys=True).encode('utf8'), | ||
padding.PSS(mgf=padding.MGF1(hashes.SHA256()), | ||
salt_length=padding.PSS.MAX_LENGTH), | ||
hashes.SHA256()) | ||
return rj | ||
|
||
|
||
@click.group(context_settings=CLICK_CONTEXT_SETTINGS) | ||
@click.version_option(__version__) | ||
@click.option('-l', '--log-level', default=None, | ||
type=UpperCaseChoice(ykman.logging_setup.LOG_LEVEL_NAMES), | ||
help='Enable logging at given verbosity level.') | ||
@click.option('--log-file', default=None, | ||
type=str, metavar='FILE', | ||
help='Write logs to the given FILE instead of standard error; ' | ||
'ignored unless --log-level is also set.') | ||
@click.pass_context | ||
def cli(ctx, log_level, log_file): | ||
ctx.obj = YkmanContextObject() | ||
|
||
if log_level: | ||
ykman.logging_setup.setup(log_level, log_file=log_file) | ||
|
||
|
||
@cli.command('list-reader') | ||
@click.pass_context | ||
def yk_list_readers(ctx): | ||
""" | ||
List connected SmartCard readers. | ||
""" | ||
for reader in list_readers(): | ||
click.echo(reader.name) | ||
ctx.exit() | ||
|
||
|
||
@cli.command('list-keys') | ||
@click.pass_context | ||
def yk_list_keys(ctx): | ||
""" | ||
List connected YubiKeys. | ||
""" | ||
def _print_device(device, ykserial): | ||
click.echo('{} [{}]{}'.format( | ||
device.device_name, | ||
device.mode, | ||
' Serial: {}'.format(ykserial) if ykserial else '') | ||
) | ||
|
||
descriptors = get_descriptors() | ||
handled_serials = set() | ||
|
||
try: | ||
for dev in list_devices(): | ||
if dev.key_type == YUBIKEY.SKY: | ||
# We have nothing to match on, so just drop a SKY descriptor | ||
d = next(x for x in descriptors if x.key_type == YUBIKEY.SKY) | ||
descriptors.remove(d) | ||
_print_device(dev, None) | ||
else: | ||
serial = dev.serial | ||
if serial not in handled_serials: | ||
# Drop a descriptor with a matching serial and mode | ||
handled_serials.add(serial) | ||
matches = [d for d in descriptors if (d.key_type, d.mode) | ||
== (dev.driver.key_type, dev.driver.mode)] | ||
if len(matches) > 0: | ||
d = matches[0] | ||
descriptors.remove(d) | ||
_print_device(dev, serial) | ||
dev.close() | ||
if not descriptors: | ||
break | ||
except smartcard.pcsc.PCSCExceptions.EstablishContextException as e: | ||
logger.error('Failed to list devices', exc_info=e) | ||
ctx.fail('Failed to establish CCID context. Is the pcscd service running?') | ||
|
||
# List descriptors that failed to open. | ||
if len(descriptors) > 0: | ||
logger.debug('Failed to open some devices, listing based on descriptors') | ||
for desc in descriptors: | ||
click.echo('{} [{}]'.format(desc.name, desc.mode)) | ||
|
||
|
||
@cli.command('init-cert') | ||
@click.pass_context | ||
@click.option('-r', '--reader', help='Use an external smart card reader.', | ||
metavar='NAME', default=None) | ||
@click.option('-d', '--device', type=int, metavar='SERIAL') | ||
@click_pin_option | ||
@click_management_key_option | ||
@click.option('-s', '--subject', help='Subject string for the Certificate') | ||
@click.option('-U', '--url', help='URL of privacyIDEA API.', required=True) | ||
# TODO: The user is currently given in the Subject String | ||
@click.option('-u', '--user', help='YubiKey/Certificate user') | ||
@click.option('-a', '--admin', help='Admin user for the privacyIDEA API') | ||
@click.option('-p', '--password', help='Password of the admin user') | ||
@click.option('-t', '--token', help='Admin token for the privacyIDEA API') | ||
@click.option('-c', '--ca', help='CA connector in privacyIDEA') | ||
@click.option('-k', '--keyfile', help='Verify response from privacyIDEA with key', | ||
default=None) | ||
def yk_enroll_cert(ctx, reader, device, pin, management_key, subject, url, | ||
user, admin, password, token, ca, keyfile): | ||
""" | ||
Create Certificate on YubiKey | ||
""" | ||
if reader and device: | ||
ctx.fail('--reader and --device options can\'t be combined.') | ||
|
||
if admin: | ||
if token: | ||
ctx.fail('Choose either "admin"/"password" or "token" parameter!') | ||
if not password: | ||
ctx.fail('Password is required when using an admin user') | ||
|
||
def resolve_device(): | ||
if device is not None: | ||
dev = _run_cmd_for_serial(ctx, 'init-cert', TRANSPORT.CCID, device) | ||
else: | ||
dev = _run_cmd_for_single(ctx, 'init-cert', TRANSPORT.CCID, reader) | ||
ctx.call_on_close(dev.close) | ||
return dev | ||
|
||
ctx.obj.add_resolver('dev', resolve_device) | ||
|
||
try: | ||
ctx.obj['controller'] = PivController(ctx.obj['dev'].driver) | ||
except APDUError as e: | ||
if e.sw == SW.NOT_FOUND: | ||
ctx.fail("The PIV application can't be found on this YubiKey.") | ||
raise | ||
|
||
yk_key = ctx.obj['dev'] | ||
yk_serial = yk_key.serial | ||
yk_pc = ctx.obj['controller'] | ||
|
||
click.echo('Generating Key on YubiKey with serial {0!s}...'.format(yk_serial)) | ||
# TODO: Testing only! | ||
if os.path.exists('{0!s}_pubkey.pem'.format(yk_serial)): | ||
with open('{0!s}_pubkey.pem'.format(yk_serial), 'rb') as f: | ||
pubkey = serialization.load_pem_public_key(f.read(), | ||
backend=default_backend()) | ||
else: | ||
_ensure_authenticated(ctx, yk_pc, pin, management_key, no_prompt=True) | ||
pubkey = yk_pc.generate_key(SLOT.AUTHENTICATION, ALGO.RSA2048) | ||
with open('{0!s}_pubkey.pem'.format(yk_serial), 'wb') as f: | ||
f.write(pubkey.public_bytes( | ||
encoding=serialization.Encoding.PEM, | ||
format=serialization.PublicFormat.SubjectPublicKeyInfo)) | ||
|
||
click.echo('Generating CSR for YubiKey with serial {0!s}...'.format(yk_serial)) | ||
# TODO: Testing only! | ||
if os.path.exists('{0!s}_csr.pem'.format(yk_serial)): | ||
with open('{0!s}_csr.pem'.format(yk_serial), 'rb') as f: | ||
csr = x509.load_pem_x509_csr(f.read(), backend=default_backend()) | ||
else: | ||
_verify_pin(ctx, yk_pc, pin) | ||
try: | ||
csr = yk_pc.generate_certificate_signing_request( | ||
SLOT.AUTHENTICATION, pubkey, subject=subject, | ||
touch_callback=_prompt_for_touch) | ||
except APDUError: | ||
logger.debug(sys.exc_info()) | ||
ctx.fail('Certificate Signing Request generation failed.') | ||
|
||
with open('{0!s}_csr.pem'.format(yk_serial), 'wb') as f: | ||
f.write(csr.public_bytes(encoding=serialization.Encoding.PEM)) | ||
|
||
# Generate the attestation certificate | ||
try: | ||
cert = yk_pc.attest(SLOT.AUTHENTICATION) | ||
except APDUError as e: | ||
logger.error('Attestation failed', exc_info=e) | ||
ctx.fail('Attestation failed.') | ||
with open('{0!s}_attest.pem'.format(yk_serial), 'wb') as f: | ||
f.write(cert.public_bytes(encoding=serialization.Encoding.PEM)) | ||
|
||
# TODO: send the attestation certificate with the CSR to privacyIDEA | ||
|
||
pi_key = None | ||
if keyfile: | ||
with open(keyfile, 'rb') as f: | ||
pi_key = serialization.load_pem_public_key(f.read(), backend=default_backend()) | ||
|
||
# send the CSR to privacyIDEA | ||
if admin: | ||
# get authentication token | ||
rj = _call_pi(url=url + '/auth', | ||
data={'username': admin, 'password': password}, | ||
pi_key=pi_key) | ||
if rj['result']['status'] is False: | ||
ctx.fail('Could not get auth-token from privacyIDEA at {0!s}'.format(url)) | ||
else: | ||
token = rj['result']['value']['token'] | ||
|
||
# TODO: get the ca connector name | ||
headers = {'content-type': 'application/json', | ||
'PI-Authorization': token} | ||
payload = {'type': 'certificate', | ||
'ca': ca, | ||
'template': 'user', | ||
'request': csr.public_bytes(encoding=serialization.Encoding.PEM).decode()} | ||
rj = _call_pi(url=url + '/token/init', data=json.dumps(payload), | ||
headers=headers, pi_key=pi_key) | ||
if rj['result']['status'] is False: | ||
ctx.fail('Could not get CSR signed by privacyIDEA') | ||
cert = rj['detail']['certificate'] | ||
pi_serial = rj['detail']['serial'] | ||
|
||
with open('{0!s}_cert_{1!s}.pem'.format(yk_serial, pi_serial), 'w') as f: | ||
f.write(cert) | ||
|
||
def do_import(retry=True): | ||
try: | ||
yk_pc.import_certificate( | ||
SLOT.AUTHENTICATION, | ||
x509.load_pem_x509_certificate(cert.encode(), | ||
backend=default_backend()), | ||
verify=True, touch_callback=_prompt_for_touch) | ||
|
||
except KeypairMismatch: | ||
ctx.fail('This certificate is not tied to the private key in the ' | ||
'{} slot.'.format(SLOT.AUTHENTICATION.name)) | ||
|
||
except APDUError as e: | ||
if e.sw == SW.SECURITY_CONDITION_NOT_SATISFIED and retry: | ||
_verify_pin(ctx, yk_pc, pin) | ||
do_import(retry=False) | ||
else: | ||
raise | ||
|
||
do_import() | ||
|
||
|
||
def main(): | ||
click.echo(""" | ||
_ _______ _______ | ||
___ ____(_) _____ _______ __/ _/ _ \/ __/ _ | | ||
/ _ \/ __/ / |/ / _ `/ __/ // // // // / _// __ | | ||
/ .__/_/ /_/|___/\_,_/\__/\_, /___/____/___/_/ |_| | ||
/_/ /___/ | ||
""") | ||
click.echo(""" | ||
+--- WARNING ------ WARNING ------ WARNING ------ WARNING ------ WARNING ---+ | ||
| | | ||
| This script is still work in progress! | | ||
| Proper Certificate checking can NOT be guaranteed! | | ||
| Currently all (generated) certs will be written out to ease | | ||
| development and debugging. | | ||
| One day this script might also become part of the privacyIDEA admin tool. | | ||
| | | ||
+--- WARNING ------ WARNING ------ WARNING ------ WARNING ------ WARNING ---+ | ||
""") | ||
try: | ||
cli(obj={}) | ||
except ValueError as e: | ||
logger.error('Error', exc_info=e) | ||
click.echo('Error: ' + str(e)) | ||
return 1 | ||
|
||
except Cve201715361VulnerableError as err: | ||
logger.error('Error', exc_info=err) | ||
click.echo('Error: ' + str(err)) | ||
return 2 | ||
|
||
|
||
if __name__ == '__main__': | ||
sys.exit(main()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters