diff --git a/scripts/privacyidea-enroll-yubikey-piv b/scripts/privacyidea-enroll-yubikey-piv new file mode 100755 index 0000000..beb8386 --- /dev/null +++ b/scripts/privacyidea-enroll-yubikey-piv @@ -0,0 +1,337 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Create a PIV Certificate on a YubiKey and send it to privacyIDEA for validation +and enrolment. + +TODO: + - implement a client-wait/2-step functionality in case the CA can not + automatically sign the request + - proper checking of response signature + - integrate into privacyidea admin tool + - check received certificates +""" +import binascii +import json +import os +import sys +import click +import logging + +import requests +import smartcard +from cryptography.hazmat.primitives import serialization, hashes +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.backends import default_backend +from cryptography import x509 +import ykman.logging_setup +from ykman.cli.__main__ import _run_cmd_for_serial, _run_cmd_for_single +from ykman.cli.piv import (_ensure_authenticated, click_parse_management_key, + _verify_pin) +from ykman.cli.util import UpperCaseChoice, YkmanContextObject +from ykman.descriptor import get_descriptors, list_devices +from ykman.driver_ccid import list_readers, APDUError, SW +from ykman.piv import PivController, ALGO, SLOT, KeypairMismatch +from ykman.util import Cve201715361VulnerableError, YUBIKEY, TRANSPORT + +logger = logging.getLogger(__name__) + +__version__ = '0.0.1' + +CLICK_CONTEXT_SETTINGS = dict( + help_option_names=['-h', '--help'], + max_content_width=999 +) +click_management_key_option = click.option( + '-m', '--management-key', + help='The management key.', + callback=click_parse_management_key, + default='010203040506070801020304050607080102030405060708') +click_pin_option = click.option( + '-P', '--pin', help='PIN code.', + default='123456') + + +def _prompt_for_touch(): + try: + click.echo('Touch your YubiKey...', err=True) + except Exception: + sys.stderr.write('Touch your YubiKey...\n') + + +def _call_pi(url, data, headers=None, pi_key=None): + r = requests.post(url, data=data, headers=headers) + # TODO: check response + rj = r.json() + if pi_key: + sig = rj['signature'] + msg = {k: v for k, v in rj.items() if k != 'signature'} + sigver, signature = sig.split(':') + pi_key.verify(binascii.unhexlify(signature), + json.dumps(msg, sort_keys=True).encode('utf8'), + padding.PSS(mgf=padding.MGF1(hashes.SHA256()), + salt_length=padding.PSS.MAX_LENGTH), + hashes.SHA256()) + return rj + + +@click.group(context_settings=CLICK_CONTEXT_SETTINGS) +@click.version_option(__version__) +@click.option('-l', '--log-level', default=None, + type=UpperCaseChoice(ykman.logging_setup.LOG_LEVEL_NAMES), + help='Enable logging at given verbosity level.') +@click.option('--log-file', default=None, + type=str, metavar='FILE', + help='Write logs to the given FILE instead of standard error; ' + 'ignored unless --log-level is also set.') +@click.pass_context +def cli(ctx, log_level, log_file): + ctx.obj = YkmanContextObject() + + if log_level: + ykman.logging_setup.setup(log_level, log_file=log_file) + + +@cli.command('list-reader') +@click.pass_context +def yk_list_readers(ctx): + """ + List connected SmartCard readers. + """ + for reader in list_readers(): + click.echo(reader.name) + ctx.exit() + + +@cli.command('list-keys') +@click.pass_context +def yk_list_keys(ctx): + """ + List connected YubiKeys. + """ + def _print_device(device, ykserial): + click.echo('{} [{}]{}'.format( + device.device_name, + device.mode, + ' Serial: {}'.format(ykserial) if ykserial else '') + ) + + descriptors = get_descriptors() + handled_serials = set() + + try: + for dev in list_devices(): + if dev.key_type == YUBIKEY.SKY: + # We have nothing to match on, so just drop a SKY descriptor + d = next(x for x in descriptors if x.key_type == YUBIKEY.SKY) + descriptors.remove(d) + _print_device(dev, None) + else: + serial = dev.serial + if serial not in handled_serials: + # Drop a descriptor with a matching serial and mode + handled_serials.add(serial) + matches = [d for d in descriptors if (d.key_type, d.mode) + == (dev.driver.key_type, dev.driver.mode)] + if len(matches) > 0: + d = matches[0] + descriptors.remove(d) + _print_device(dev, serial) + dev.close() + if not descriptors: + break + except smartcard.pcsc.PCSCExceptions.EstablishContextException as e: + logger.error('Failed to list devices', exc_info=e) + ctx.fail('Failed to establish CCID context. Is the pcscd service running?') + + # List descriptors that failed to open. + if len(descriptors) > 0: + logger.debug('Failed to open some devices, listing based on descriptors') + for desc in descriptors: + click.echo('{} [{}]'.format(desc.name, desc.mode)) + + +@cli.command('init-cert') +@click.pass_context +@click.option('-r', '--reader', help='Use an external smart card reader.', + metavar='NAME', default=None) +@click.option('-d', '--device', type=int, metavar='SERIAL') +@click_pin_option +@click_management_key_option +@click.option('-s', '--subject', help='Subject string for the Certificate') +@click.option('-U', '--url', help='URL of privacyIDEA API.', required=True) +# TODO: The user is currently given in the Subject String +@click.option('-u', '--user', help='YubiKey/Certificate user') +@click.option('-a', '--admin', help='Admin user for the privacyIDEA API') +@click.option('-p', '--password', help='Password of the admin user') +@click.option('-t', '--token', help='Admin token for the privacyIDEA API') +@click.option('-c', '--ca', help='CA connector in privacyIDEA') +@click.option('-k', '--keyfile', help='Verify response from privacyIDEA with key', + default=None) +def yk_enroll_cert(ctx, reader, device, pin, management_key, subject, url, + user, admin, password, token, ca, keyfile): + """ + Create Certificate on YubiKey + """ + if reader and device: + ctx.fail('--reader and --device options can\'t be combined.') + + if admin: + if token: + ctx.fail('Choose either "admin"/"password" or "token" parameter!') + if not password: + ctx.fail('Password is required when using an admin user') + + def resolve_device(): + if device is not None: + dev = _run_cmd_for_serial(ctx, 'init-cert', TRANSPORT.CCID, device) + else: + dev = _run_cmd_for_single(ctx, 'init-cert', TRANSPORT.CCID, reader) + ctx.call_on_close(dev.close) + return dev + + ctx.obj.add_resolver('dev', resolve_device) + + try: + ctx.obj['controller'] = PivController(ctx.obj['dev'].driver) + except APDUError as e: + if e.sw == SW.NOT_FOUND: + ctx.fail("The PIV application can't be found on this YubiKey.") + raise + + yk_key = ctx.obj['dev'] + yk_serial = yk_key.serial + yk_pc = ctx.obj['controller'] + + click.echo('Generating Key on YubiKey with serial {0!s}...'.format(yk_serial)) + # TODO: Testing only! + if os.path.exists('{0!s}_pubkey.pem'.format(yk_serial)): + with open('{0!s}_pubkey.pem'.format(yk_serial), 'rb') as f: + pubkey = serialization.load_pem_public_key(f.read(), + backend=default_backend()) + else: + _ensure_authenticated(ctx, yk_pc, pin, management_key, no_prompt=True) + pubkey = yk_pc.generate_key(SLOT.AUTHENTICATION, ALGO.RSA2048) + with open('{0!s}_pubkey.pem'.format(yk_serial), 'wb') as f: + f.write(pubkey.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo)) + + click.echo('Generating CSR for YubiKey with serial {0!s}...'.format(yk_serial)) + # TODO: Testing only! + if os.path.exists('{0!s}_csr.pem'.format(yk_serial)): + with open('{0!s}_csr.pem'.format(yk_serial), 'rb') as f: + csr = x509.load_pem_x509_csr(f.read(), backend=default_backend()) + else: + _verify_pin(ctx, yk_pc, pin) + try: + csr = yk_pc.generate_certificate_signing_request( + SLOT.AUTHENTICATION, pubkey, subject=subject, + touch_callback=_prompt_for_touch) + except APDUError: + logger.debug(sys.exc_info()) + ctx.fail('Certificate Signing Request generation failed.') + + with open('{0!s}_csr.pem'.format(yk_serial), 'wb') as f: + f.write(csr.public_bytes(encoding=serialization.Encoding.PEM)) + + # Generate the attestation certificate + try: + cert = yk_pc.attest(SLOT.AUTHENTICATION) + except APDUError as e: + logger.error('Attestation failed', exc_info=e) + ctx.fail('Attestation failed.') + with open('{0!s}_attest.pem'.format(yk_serial), 'wb') as f: + f.write(cert.public_bytes(encoding=serialization.Encoding.PEM)) + + # TODO: send the attestation certificate with the CSR to privacyIDEA + + pi_key = None + if keyfile: + with open(keyfile, 'rb') as f: + pi_key = serialization.load_pem_public_key(f.read(), backend=default_backend()) + + # send the CSR to privacyIDEA + if admin: + # get authentication token + rj = _call_pi(url=url + '/auth', + data={'username': admin, 'password': password}, + pi_key=pi_key) + if rj['result']['status'] is False: + ctx.fail('Could not get auth-token from privacyIDEA at {0!s}'.format(url)) + else: + token = rj['result']['value']['token'] + + # TODO: get the ca connector name + headers = {'content-type': 'application/json', + 'PI-Authorization': token} + payload = {'type': 'certificate', + 'ca': ca, + 'template': 'user', + 'request': csr.public_bytes(encoding=serialization.Encoding.PEM).decode()} + rj = _call_pi(url=url + '/token/init', data=json.dumps(payload), + headers=headers, pi_key=pi_key) + if rj['result']['status'] is False: + ctx.fail('Could not get CSR signed by privacyIDEA') + cert = rj['detail']['certificate'] + pi_serial = rj['detail']['serial'] + + with open('{0!s}_cert_{1!s}.pem'.format(yk_serial, pi_serial), 'w') as f: + f.write(cert) + + def do_import(retry=True): + try: + yk_pc.import_certificate( + SLOT.AUTHENTICATION, + x509.load_pem_x509_certificate(cert.encode(), + backend=default_backend()), + verify=True, touch_callback=_prompt_for_touch) + + except KeypairMismatch: + ctx.fail('This certificate is not tied to the private key in the ' + '{} slot.'.format(SLOT.AUTHENTICATION.name)) + + except APDUError as e: + if e.sw == SW.SECURITY_CONDITION_NOT_SATISFIED and retry: + _verify_pin(ctx, yk_pc, pin) + do_import(retry=False) + else: + raise + + do_import() + + +def main(): + click.echo(""" + _ _______ _______ + ___ ____(_) _____ _______ __/ _/ _ \/ __/ _ | + / _ \/ __/ / |/ / _ `/ __/ // // // // / _// __ | + / .__/_/ /_/|___/\_,_/\__/\_, /___/____/___/_/ |_| +/_/ /___/ + """) + click.echo(""" ++--- WARNING ------ WARNING ------ WARNING ------ WARNING ------ WARNING ---+ +| | +| This script is still work in progress! | +| Proper Certificate checking can NOT be guaranteed! | +| Currently all (generated) certs will be written out to ease | +| development and debugging. | +| One day this script might also become part of the privacyIDEA admin tool. | +| | ++--- WARNING ------ WARNING ------ WARNING ------ WARNING ------ WARNING ---+ +""") + try: + cli(obj={}) + except ValueError as e: + logger.error('Error', exc_info=e) + click.echo('Error: ' + str(e)) + return 1 + + except Cve201715361VulnerableError as err: + logger.error('Error', exc_info=err) + click.echo('Error: ' + str(err)) + return 2 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/setup.py b/setup.py index 95d78df..67e269a 100644 --- a/setup.py +++ b/setup.py @@ -45,14 +45,16 @@ def get_file_contents(file_path): 'sphinx >= 2.0;python_version>="3.0"'], install_requires=[ "cffi", + "click", + "cryptography", "enum34", + "pyscard", "python-yubico", "pyusb", "qrcode", "requests", - 'six', - "click", - "python-yubico" + "six", + "yubikey-manager" ], cmdclass=cmdclass, command_options={ @@ -69,7 +71,8 @@ def get_file_contents(file_path): 'scripts/privacyidea-authorizedkeys', 'scripts/privacyidea-check-offline-otp', 'scripts/privacyidea-get-offline-otp', - 'scripts/privacyidea-validate'], + 'scripts/privacyidea-validate', + 'scripts/privacyidea-enroll-yubikey-piv'], data_files=[('share/man/man1', ["doc/_build/man/privacyidea.1"])], license='AGPLv3', long_description=get_file_contents('DESCRIPTION') diff --git a/tests/test_clientutils.py b/tests/test_clientutils.py index 986f904..33fc440 100644 --- a/tests/test_clientutils.py +++ b/tests/test_clientutils.py @@ -1,11 +1,13 @@ +# -*- coding: utf-8 -*- + import unittest -import json import privacyideautils.clientutils as clientutils from privacyideautils.clientutils import PrivacyIDEAClientError PWFILE = "/home/cornelius/src/flask/tests/testdata/passwords" + class TestClientUtils(unittest.TestCase): serial1 = "S1"