diff --git a/docs/index.rst b/docs/index.rst index d945d377..e4b8880e 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -10,6 +10,8 @@ Welcome to yubikey-manager's documentation! :maxdepth: 2 :caption: Contents: + rst/packages + Indices and tables ================== diff --git a/docs/rst/packages.rst b/docs/rst/packages.rst new file mode 100644 index 00000000..e922028c --- /dev/null +++ b/docs/rst/packages.rst @@ -0,0 +1,12 @@ +yubikey-manager +=============== + +.. toctree:: + :maxdepth: 4 + + yubikit + +.. toctree:: + :maxdepth: 4 + + ykman \ No newline at end of file diff --git a/docs/rst/ykman.rst b/docs/rst/ykman.rst new file mode 100644 index 00000000..f1701ce2 --- /dev/null +++ b/docs/rst/ykman.rst @@ -0,0 +1,77 @@ +ykman package +============= + +Submodules +---------- + +ykman.base module +----------------- + +.. automodule:: ykman.base + :members: + :undoc-members: + :show-inheritance: + +ykman.device module +------------------- + +.. automodule:: ykman.device + :members: + :undoc-members: + :show-inheritance: + +ykman.fido module +----------------- + +.. automodule:: ykman.fido + :members: + :undoc-members: + :show-inheritance: + +ykman.hsmauth module +-------------------- + +.. automodule:: ykman.hsmauth + :members: + :undoc-members: + :show-inheritance: + +ykman.oath module +----------------- + +.. automodule:: ykman.oath + :members: + :undoc-members: + :show-inheritance: + +ykman.openpgp module +-------------------- + +.. automodule:: ykman.openpgp + :members: + :undoc-members: + :show-inheritance: + +ykman.piv module +----------------- + +.. automodule:: ykman.piv + :members: + :undoc-members: + :show-inheritance: + +ykman.scripting module +---------------------- + +.. automodule:: ykman.scripting + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: ykman + :members: + :undoc-members: + :show-inheritance: \ No newline at end of file diff --git a/docs/rst/yubikit.rst b/docs/rst/yubikit.rst index 9167c3fa..d107db9c 100644 --- a/docs/rst/yubikit.rst +++ b/docs/rst/yubikit.rst @@ -5,7 +5,7 @@ Subpackages ----------- .. toctree:: - :maxdepth: 4 + :maxdepth: 1 yubikit.core diff --git a/ykman/device.py b/ykman/device.py index 8eda690f..9935d0cd 100644 --- a/ykman/device.py +++ b/ykman/device.py @@ -85,16 +85,19 @@ def inner(): EstablishContextException, ) def list_ccid_devices(): + """List CCID devices.""" return _list_ccid_devices() @_warn_once("No CTAP HID backend available. FIDO protocols will not function.") def list_ctap_devices(): + """List CTAP devices.""" return _list_ctap_devices() @_warn_once("No OTP HID backend available. OTP protocols will not function.") def list_otp_devices(): + """List OTP devices.""" return _list_otp_devices() @@ -108,8 +111,8 @@ def list_otp_devices(): def scan_devices() -> Tuple[Mapping[PID, int], int]: """Scan USB for attached YubiKeys, without opening any connections. - Returns a dict mapping PID to device count, and a state object which can be used to - detect changes in attached devices. + :return: A dict mapping PID to device count, and a state object which can be used to + detect changes in attached devices. """ fingerprints = set() merged: Dict[PID, int] = {} @@ -270,9 +273,10 @@ def open_connection(self, connection_type): def list_all_devices( connection_types: Iterable[Type[Connection]] = _CONNECTION_LIST_MAPPING.keys(), ) -> List[Tuple[YkmanDevice, DeviceInfo]]: - """Connects to all attached YubiKeys and reads device info from them. + """Connect to all attached YubiKeys and read device info from them. - Returns a list of (device, info) tuples for each connected device. + :param connection_types: An iterable of YubiKey connection types. + :return: A list of (device, info) tuples for each connected device. """ groups: Dict[PID, _PidGroup] = {} diff --git a/ykman/fido.py b/ykman/fido.py index 31df4959..17ae6f86 100644 --- a/ykman/fido.py +++ b/ykman/fido.py @@ -44,7 +44,10 @@ def is_in_fips_mode(fido_connection: FidoConnection) -> bool: - """Check if a YubiKey FIPS is in FIPS approved mode.""" + """Check if a YubiKey FIPS is in FIPS approved mode. + + :param fido_connection: A FIDO connection. + """ try: ctap = Ctap1(fido_connection) ctap.send_apdu(ins=INS_FIPS_VERIFY_FIPS_MODE) @@ -62,6 +65,10 @@ def fips_change_pin( """Change the PIN on a YubiKey FIPS. If no PIN is set, pass None or an empty string as old_pin. + + :param fido_connection: A FIDO connection. + :param old_pin: The old PIN. + :param new_pin: The new PIN. """ ctap = Ctap1(fido_connection) @@ -75,7 +82,11 @@ def fips_change_pin( def fips_verify_pin(fido_connection: FidoConnection, pin: str): - """Unlock the YubiKey FIPS U2F module for credential creation.""" + """Unlock the YubiKey FIPS U2F module for credential creation. + + :param fido_connection: A FIDO connection. + :param pin: The FIDO PIN. + """ ctap = Ctap1(fido_connection) ctap.send_apdu(ins=INS_FIPS_VERIFY_PIN, data=pin.encode()) @@ -86,6 +97,8 @@ def fips_reset(fido_connection: FidoConnection): Note: This action is only permitted immediately after YubiKey FIPS power-up. It also requires the user to touch the flashing button on the YubiKey, and will halt until that happens, or the command times out. + + :param fido_connection: A FIDO connection. """ ctap = Ctap1(fido_connection) while True: diff --git a/ykman/hsmauth.py b/ykman/hsmauth.py index 69a49f2b..3a8390e7 100644 --- a/ykman/hsmauth.py +++ b/ykman/hsmauth.py @@ -31,6 +31,7 @@ def get_hsmauth_info(session: HsmAuthSession): + """Get information about the YubiHSM Auth application.""" retries = session.get_management_key_retries() info = { "YubiHSM Auth version": session.version, diff --git a/ykman/oath.py b/ykman/oath.py index e357fe2a..503e9bf0 100644 --- a/ykman/oath.py +++ b/ykman/oath.py @@ -34,14 +34,17 @@ def is_hidden(credential): + """Check if OATH credential is hidden.""" return credential.issuer == "_hidden" def is_steam(credential): + """Check if OATH credential is steam.""" return credential.oath_type == OATH_TYPE.TOTP and credential.issuer == "Steam" def calculate_steam(app, credential, timestamp=None): + """Calculate steam codes.""" timestamp = int(timestamp or time()) resp = app.calculate(credential.id, struct.pack(">q", timestamp // 30)) offset = resp[-1] & 0x0F @@ -54,4 +57,5 @@ def calculate_steam(app, credential, timestamp=None): def is_in_fips_mode(app): + """Check if OATH application is in FIPS mode.""" return app.locked diff --git a/ykman/openpgp.py b/ykman/openpgp.py index 68005dfb..9f414ed0 100644 --- a/ykman/openpgp.py +++ b/ykman/openpgp.py @@ -29,7 +29,10 @@ def get_openpgp_info(session: OpenPgpSession): - """Get human readable information about the OpenPGP configuration.""" + """Get human readable information about the OpenPGP configuration. + + :param session: The OpenPGP session. + """ data = session.get_application_related_data() discretionary = data.discretionary retries = discretionary.pw_status diff --git a/ykman/otp.py b/ykman/otp.py index c6e2154f..caf6a4de 100644 --- a/ykman/otp.py +++ b/ykman/otp.py @@ -144,7 +144,10 @@ def _prepare_upload_key( def is_in_fips_mode(session: YubiOtpSession) -> bool: - """Check if the OTP application of a FIPS YubiKey is in FIPS approved mode.""" + """Check if the OTP application of a FIPS YubiKey is in FIPS approved mode. + + :param session: The YubiOTP session. + """ return session.backend.send_and_receive(0x14, b"", 1) == b"\1" # type: ignore @@ -156,14 +159,22 @@ def generate_static_pw( keyboard_layout: KEYBOARD_LAYOUT = KEYBOARD_LAYOUT.MODHEX, blocklist: Iterable[str] = DEFAULT_PW_CHAR_BLOCKLIST, ) -> str: - """Generate a random password.""" + """Generate a random password. + + :param length: The length of the password. + :param keyboard_layout: The keyboard layout. + :param blocklist: The list of characters to block. + """ chars = [k for k in keyboard_layout.value.keys() if k not in blocklist] sr = random.SystemRandom() return "".join([sr.choice(chars) for _ in range(length)]) def parse_oath_key(val: str) -> bytes: - """Parse a secret key encoded as either Hex or Base32.""" + """Parse a secret key encoded as either Hex or Base32. + + :param val: The secret key. + """ try: return bytes.fromhex(val) except ValueError: @@ -171,14 +182,22 @@ def parse_oath_key(val: str) -> bytes: def format_oath_code(response: bytes, digits: int = 6) -> str: - """Formats an OATH code from a hash response.""" + """Format an OATH code from a hash response. + + :param response: The response. + :param digits: The number of digits in the OATH code. + """ offs = response[-1] & 0xF code = struct.unpack_from(">I", response[offs:])[0] & 0x7FFFFFFF return ("%%0%dd" % digits) % (code % 10**digits) def time_challenge(timestamp: int, period: int = 30) -> bytes: - """Formats a HMAC-SHA1 challenge based on an OATH timestamp and period.""" + """Format a HMAC-SHA1 challenge based on an OATH timestamp and period. + + :param timestamp: The timestamp. + :param period: The period. + """ return struct.pack(">q", int(timestamp // period)) @@ -190,7 +209,14 @@ def format_csv( access_code: Optional[bytes] = None, timestamp: Optional[datetime] = None, ) -> str: - """Produces a CSV line in the "Yubico" format.""" + """Produce a CSV line in the "Yubico" format. + + :param serial: The serial number. + :param public_id: The public ID. + :param private_id: The private ID. + :param key: The secret key. + :param access_code: The access code. + """ ts = timestamp or datetime.now() return ",".join( [ diff --git a/ykman/piv.py b/ykman/piv.py index e553c139..dae75011 100644 --- a/ykman/piv.py +++ b/ykman/piv.py @@ -118,9 +118,11 @@ def _parse(value: str) -> List[List[str]]: def parse_rfc4514_string(value: str) -> x509.Name: - """Parses an RFC 4514 string into a x509.Name. + """Parse an RFC 4514 string into a x509.Name. See: https://tools.ietf.org/html/rfc4514.html + + :param value: An RFC 4514 string. """ name = _parse(value) attributes: List[x509.RelativeDistinguishedName] = [] @@ -159,13 +161,19 @@ def derive_management_key(pin: str, salt: bytes) -> bytes: NOTE: This method of derivation is deprecated! Protect the management key using PivmanProtectedData instead. + + :param pin: The PIN. + :param salt: The salt. """ kdf = PBKDF2HMAC(hashes.SHA1(), 24, salt, 10000, default_backend()) # nosec return kdf.derive(pin.encode("utf-8")) def generate_random_management_key(algorithm: MANAGEMENT_KEY_TYPE) -> bytes: - """Generates a new random management key.""" + """Generate a new random management key. + + :param algorithm: The algorithm for the management key. + """ return os.urandom(algorithm.key_len) @@ -237,7 +245,10 @@ def get_bytes(self) -> bytes: def get_pivman_data(session: PivSession) -> PivmanData: - """Reads out the Pivman data from a YubiKey.""" + """Read out the Pivman data from a YubiKey. + + :param session: The PIV session. + """ logger.debug("Reading pivman data") try: return PivmanData(session.get_object(OBJECT_ID_PIVMAN_DATA)) @@ -250,9 +261,11 @@ def get_pivman_data(session: PivSession) -> PivmanData: def get_pivman_protected_data(session: PivSession) -> PivmanProtectedData: - """Reads out the Pivman protected data from a YubiKey. + """Read out the Pivman protected data from a YubiKey. This function requires PIN verification prior to being called. + + :param session: The PIV session. """ logger.debug("Reading protected pivman data") try: @@ -272,7 +285,14 @@ def pivman_set_mgm_key( touch: bool = False, store_on_device: bool = False, ) -> None: - """Set a new management key, while keeping PivmanData in sync.""" + """Set a new management key, while keeping PivmanData in sync. + + :param session: The PIV session. + :param new_key: The new management key. + :param algorithm: The algorithm for the management key. + :param touch: If set, touch is required. + :param store_on_device: If set, the management key is stored on device. + """ pivman = get_pivman_data(session) pivman_prot = None @@ -320,7 +340,12 @@ def pivman_set_mgm_key( def pivman_change_pin(session: PivSession, old_pin: str, new_pin: str) -> None: - """Change the PIN, while keeping PivmanData in sync.""" + """Change the PIN, while keeping PivmanData in sync. + + :param session: The PIV session. + :param old_pin: The old PIN. + :param new_pin: The new PIN. + """ session.change_pin(old_pin, new_pin) pivman = get_pivman_data(session) @@ -339,9 +364,11 @@ def pivman_change_pin(session: PivSession, old_pin: str, new_pin: str) -> None: def list_certificates(session: PivSession) -> Mapping[SLOT, Optional[x509.Certificate]]: - """Reads out and parses stored certificates. + """Read out and parse stored certificates. Only certificates which are successfully parsed are returned. + + :param session: The PIV session. """ certs = OrderedDict() for slot in set(SLOT) - {SLOT.ATTESTATION}: @@ -364,6 +391,10 @@ def check_key( This will create a signature using the private key, so the PIN must be verified prior to calling this function if the PIN policy requires it. + + :param session: The PIV session. + :param slot: The slot. + :param public_key: The public key. """ try: test_data = b"test" @@ -404,7 +435,7 @@ def check_key( def generate_chuid() -> bytes: - """Generates a CHUID (Cardholder Unique Identifier).""" + """Generate a CHUID (Cardholder Unique Identifier).""" # Non-Federal Issuer FASC-N # [9999-9999-999999-0-1-0000000000300001] FASC_N = ( @@ -424,7 +455,7 @@ def generate_chuid() -> bytes: def generate_ccc() -> bytes: - """Generates a CCC (Card Capability Container).""" + """Generate a CCC (Card Capability Container).""" return ( Tlv(0xF0, b"\xa0\x00\x00\x01\x16\xff\x02" + os.urandom(14)) + Tlv(0xF1, b"\x21") @@ -443,7 +474,10 @@ def generate_ccc() -> bytes: def get_piv_info(session: PivSession): - """Get human readable information about the PIV configuration.""" + """Get human readable information about the PIV configuration. + + :param session: The PIV session. + """ pivman = get_pivman_data(session) info: Dict[str, Any] = { "PIV version": session.version, @@ -570,7 +604,14 @@ def sign_certificate_builder( builder: x509.CertificateBuilder, hash_algorithm: Type[_AllowedHashTypes] = hashes.SHA256, ) -> x509.Certificate: - """Sign a Certificate.""" + """Sign a Certificate. + + :param session: The PIV session. + :param slot: The slot. + :param key_type: The key type. + :param builder: The x509 certificate builder object. + :param hash_algorithm: The hash algorithm. + """ logger.debug("Signing a certificate") dummy_key = _dummy_key(key_type) cert = builder.sign(dummy_key, hash_algorithm(), default_backend()) @@ -599,7 +640,15 @@ def sign_csr_builder( builder: x509.CertificateSigningRequestBuilder, hash_algorithm: Type[_AllowedHashTypes] = hashes.SHA256, ) -> x509.CertificateSigningRequest: - """Sign a CSR.""" + """Sign a CSR. + + :param session: The PIV session. + :param slot: The slot. + :param public_key: The public key. + :param builder: The x509 certificate signing request builder + object. + :param hash_algorithm: The hash algorithm. + """ logger.debug("Signing a CSR") key_type = KEY_TYPE.from_public_key(public_key) dummy_key = _dummy_key(key_type) @@ -641,7 +690,16 @@ def generate_self_signed_certificate( valid_to: datetime, hash_algorithm: Type[_AllowedHashTypes] = hashes.SHA256, ) -> x509.Certificate: - """Generate a self-signed certificate using a private key in a slot.""" + """Generate a self-signed certificate using a private key in a slot. + + :param session: The PIV session. + :param slot: The slot. + :param public_key: The public key. + :param subject_str: The subject RFC 4514 string. + :param valid_from: The date from when the certificate is valid. + :param valid_to: The date when the certificate expires. + :param hash_algorithm: The hash algorithm. + """ logger.debug("Generating a self-signed certificate") key_type = KEY_TYPE.from_public_key(public_key) @@ -666,7 +724,14 @@ def generate_csr( subject_str: str, hash_algorithm: Type[_AllowedHashTypes] = hashes.SHA256, ) -> x509.CertificateSigningRequest: - """Generate a CSR using a private key in a slot.""" + """Generate a CSR using a private key in a slot. + + :param session: The PIV session. + :param slot: The slot. + :param public_key: The public key. + :param subject_str: The subject RFC 4514 string. + :param hash_algorithm: The hash algorithm. + """ logger.debug("Generating a CSR") builder = x509.CertificateSigningRequestBuilder().subject_name( parse_rfc4514_string(subject_str) diff --git a/ykman/scripting.py b/ykman/scripting.py index 2195818f..0a6fa703 100644 --- a/ykman/scripting.py +++ b/ykman/scripting.py @@ -90,12 +90,15 @@ def name(self) -> str: return self._name def otp(self) -> OtpConnection: + """Establish a OTP connection.""" return self.open_connection(OtpConnection) def smart_card(self) -> SmartCardConnection: + """Establish a Smart Card connection.""" return self.open_connection(SmartCardConnection) def fido(self) -> FidoConnection: + """Establish a FIDO connection.""" return self.open_connection(FidoConnection) @@ -103,6 +106,11 @@ def fido(self) -> FidoConnection: def single(*, prompt=True) -> ScriptingDevice: + """Connect to a YubiKey. + + :param prompt: When set, you will be prompted to + insert a YubiKey. + """ pids, state = scan_devices() n_devs = sum(pids.values()) if prompt and n_devs == 0: @@ -120,6 +128,15 @@ def single(*, prompt=True) -> ScriptingDevice: def multi( *, ignore_duplicates: bool = True, allow_initial: bool = False, prompt: bool = True ) -> Generator[ScriptingDevice, None, None]: + """Connect to multiple YubiKeys. + + + :param ignore_duplicates: When set, duplicates are ignored. + :param allow_initial: When set, YubiKeys can be connected + at the start of the function call. + :param prompt: When set, you will be prompted to + insert a YubiKey. + """ state = None handled_serials: Set[Optional[int]] = set() pids, _ = scan_devices() @@ -162,6 +179,12 @@ def _get_reader(reader) -> YkmanDevice: def single_nfc(reader="", *, prompt=True) -> ScriptingDevice: + """Connect to a YubiKey over NFC. + + :param reader: The name of the NFC reader. + :param prompt: When set, you will prompted to place + a YubiKey on NFC reader. + """ device = _get_reader(reader) while True: try: @@ -178,6 +201,15 @@ def single_nfc(reader="", *, prompt=True) -> ScriptingDevice: def multi_nfc( reader="", *, ignore_duplicates=True, allow_initial=False, prompt=True ) -> Generator[ScriptingDevice, None, None]: + """Connect to multiple YubiKeys over NFC. + + :param reader: The name of the NFC reader. + :param ignore_duplicates: When set, duplicates are ignored. + :param allow_initial: When set, YubiKeys can be connected + at the start of the function call. + :param prompt: When set, you will be prompted to place + YubiKeys on the NFC reader. + """ device = _get_reader(reader) prompted = False diff --git a/ykman/util.py b/ykman/util.py index 63ba8e09..ac73bdcd 100644 --- a/ykman/util.py +++ b/ykman/util.py @@ -59,8 +59,11 @@ def _parse_pkcs12(data, password): def parse_private_key(data, password): - """ - Identifies, decrypts and returns a cryptography private key object. + """Identify, decrypt and return a cryptography private key object. + + :param data: The private key in bytes. + :param password: The password to decrypt the private key + (if it is encrypted). """ # PEM if is_pem(data): @@ -96,8 +99,10 @@ def parse_private_key(data, password): def parse_certificates(data, password): - """ - Identifies, decrypts and returns list of cryptography x509 certificates. + """Identify, decrypt and return a list of cryptography x509 certificates. + + :param data: The certificate(s) in bytes. + :param password: The password to decrypt the certificate(s). """ logger.debug("Attempting to parse certificate using PEM, PKCS12 and DER") @@ -133,10 +138,12 @@ def parse_certificates(data, password): def get_leaf_certificates(certs): - """ - Extracts the leaf certificates from a list of certificates. Leaf - certificates are ones whose subject does not appear as issuer among the - others. + """Extract the leaf certificates from a list of certificates. + + Leaf certificates are ones whose subject does not appear as + issuer among theothers. + + :param certs: The list of cryptography x509 certificate objects. """ issuers = [ cert.issuer.get_attributes_for_oid(x509.NameOID.COMMON_NAME) for cert in certs diff --git a/yubikit/__init__.py b/yubikit/__init__.py index 5ca333ac..7447b001 100644 --- a/yubikit/__init__.py +++ b/yubikit/__init__.py @@ -24,3 +24,8 @@ # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. + +""" +Contains the modules corresponding to the different applications supported +by a YubiKey. +""" diff --git a/yubikit/core/otp.py b/yubikit/core/otp.py index d4beafaf..c0058015 100644 --- a/yubikit/core/otp.py +++ b/yubikit/core/otp.py @@ -123,6 +123,8 @@ def _format_frame(slot, payload): class OtpProtocol: + """An implementation of the OTP protocol.""" + def __init__(self, otp_connection: OtpConnection): self.connection = otp_connection report = self._receive() @@ -149,12 +151,12 @@ def send_and_receive( If the command results in a configuration update, the programming sequence number is verified and the updated status bytes are returned. - @param slot the slot to send to - @param data the data payload to send - @param state optional CommandState for listening for user presence requirement + :param slot: The slot to send to. + :param data: The data payload to send. + :param state: Optional CommandState for listening for user presence requirement and for cancelling a command. - @return response data (including CRC) in the case of data, or an updated status - struct + :return: Response data (including CRC) in the case of data, or an updated status + struct. """ payload = (data or b"").ljust(SLOT_DATA_SIZE, b"\0") if len(payload) > SLOT_DATA_SIZE: @@ -180,10 +182,10 @@ def _receive(self): return report def read_status(self) -> bytes: - """Receive status bytes from YubiKey + """Receive status bytes from YubiKey. - @return status bytes (first 3 bytes are the firmware version) - @throws IOException in case of communication error + :return: Status bytes (first 3 bytes are the firmware version). + :raises IOException: in case of communication error. """ return self._receive()[1:-1] diff --git a/yubikit/core/smartcard.py b/yubikit/core/smartcard.py index da3e8410..38ef774b 100644 --- a/yubikit/core/smartcard.py +++ b/yubikit/core/smartcard.py @@ -135,6 +135,8 @@ def _encode_extended_apdu(cla, ins, p1, p2, data, le=0): class SmartCardProtocol: + """An implementation of the Smart Card protocol.""" + def __init__( self, smartcard_connection: SmartCardConnection, @@ -156,6 +158,10 @@ def enable_touch_workaround(self, version: Version) -> None: logger.debug(f"Touch workaround enabled={self._touch_workaround}") def select(self, aid: bytes) -> bytes: + """Perform a SELECT instruction. + + :param aid: The YubiKey application AID value. + """ try: return self.send_apdu(0, INS_SELECT, P1_SELECT, P2_SELECT, aid) except ApduError as e: @@ -171,6 +177,16 @@ def select(self, aid: bytes) -> bytes: def send_apdu( self, cla: int, ins: int, p1: int, p2: int, data: bytes = b"", le: int = 0 ) -> bytes: + """Send APDU message. + + :param cla: The instruction class. + :param ins: The instruction code. + :param p1: The instruction parameter. + :param p2: The instruction parameter. + :param data: The command data in bytes. + :param le: The maximum number of bytes in the data + field of the response. + """ if ( self._touch_workaround and self._last_long_resp > 0 diff --git a/yubikit/hsmauth.py b/yubikit/hsmauth.py index 9b567093..c9bcee25 100644 --- a/yubikit/hsmauth.py +++ b/yubikit/hsmauth.py @@ -94,6 +94,8 @@ @unique class ALGORITHM(IntEnum): + """Algorithms for YubiHSM Auth credentials.""" + AES128_YUBICO_AUTHENTICATION = 38 EC_P256_YUBICO_AUTHENTICATION = 39 @@ -169,6 +171,8 @@ def _retries_from_sw(sw): @total_ordering @dataclass(order=False, frozen=True) class Credential: + """A YubiHSM Auth credential object.""" + label: str algorithm: ALGORITHM counter: int @@ -187,6 +191,8 @@ def __hash__(self) -> int: class SessionKeys(NamedTuple): + """YubiHSM Session Keys.""" + key_senc: bytes key_smac: bytes key_srmac: bytes @@ -205,15 +211,19 @@ def parse(cls, response: bytes) -> "SessionKeys": class HsmAuthSession: + """A session with the YubiHSM Auth application.""" + def __init__(self, connection: SmartCardConnection) -> None: self.protocol = SmartCardProtocol(connection) self._version = _parse_select(self.protocol.select(AID.HSMAUTH)) @property def version(self) -> Version: + """The YubiHSM Auth application version.""" return self._version def reset(self) -> None: + """Perform a factory reset on the YubiHSM Auth application.""" self.protocol.send_apdu(0, INS_RESET, 0xDE, 0xAD) logger.info("YubiHSM Auth application data reset performed") @@ -294,7 +304,16 @@ def put_credential_symmetric( credential_password: Union[bytes, str], touch_required: bool = False, ) -> Credential: - """Import a symmetric YubiHSM Auth credential""" + """Import a symmetric YubiHSM Auth credential. + + :param management_key: The management key. + :param label: The label of the credential. + :param key_enc: The static K-ENC. + :param key_mac: The static K-MAC. + :param credential_password: The password used to protect + access to the credential. + :param touch_required: The touch requirement policy. + """ aes128_key_len = ALGORITHM.AES128_YUBICO_AUTHENTICATION.key_len if len(key_enc) != aes128_key_len or len(key_mac) != aes128_key_len: @@ -315,11 +334,19 @@ def put_credential_derived( self, management_key: bytes, label: str, - credential_password: Union[bytes, str], derivation_password: str, + credential_password: Union[bytes, str], touch_required: bool = False, ) -> Credential: - """Import a symmetric YubiHSM Auth credential derived from password""" + """Import a symmetric YubiHSM Auth credential derived from password. + + :param management_key: The management key. + :param label: The label of the credential. + :param derivation_password: The password used to derive the keys from. + :param credential_password: The password used to protect + access to the credential. + :param touch_required: The touch requirement policy. + """ key_enc, key_mac = _password_to_key(derivation_password) @@ -335,7 +362,16 @@ def put_credential_asymmetric( credential_password: Union[bytes, str], touch_required: bool = False, ) -> Credential: - """Import an asymmetric YubiHSM Auth credential""" + """Import an asymmetric YubiHSM Auth credential. + + :param management_key: The management key. + :param label: The label of the credential. + :param private_key: Private key corresponding to the public + authentication key object on the YubiHSM. + :param credential_password: The password used to protect + access to the credential. + :param touch_required: The touch requirement policy. + """ require_version(self.version, (5, 6, 0)) if not isinstance(private_key.curve, ec.SECP256R1): @@ -360,11 +396,18 @@ def generate_credential_asymmetric( credential_password: Union[bytes, str], touch_required: bool = False, ) -> Credential: - """Generate an asymmetric YubiHSM Auth credential + """Generate an asymmetric YubiHSM Auth credential. Generates a private key on the YubiKey, whose corresponding - public key can be retrieved using `get_public_key` + public key can be retrieved using `get_public_key`. + + :param management_key: The management key. + :param label: The label of the credential. + :param credential_password: The password used to protect + access to the credential. + :param touch_required: The touch requirement policy. """ + require_version(self.version, (5, 6, 0)) return self._put_credential( management_key, @@ -380,6 +423,8 @@ def get_public_key(self, label: str) -> ec.EllipticCurvePublicKey: This will return the long-term public key "PK-OCE" for an asymmetric credential. + + :param label: The label of the credential. """ require_version(self.version, (5, 6, 0)) data = Tlv(TAG_LABEL, _parse_label(label)) @@ -388,7 +433,11 @@ def get_public_key(self, label: str) -> ec.EllipticCurvePublicKey: return ec.EllipticCurvePublicKey.from_encoded_point(ec.SECP256R1(), res) def delete_credential(self, management_key: bytes, label: str) -> None: - """Delete a YubiHSM Auth credential""" + """Delete a YubiHSM Auth credential. + + :param management_key: The management key. + :param label: The label of the credential. + """ if len(management_key) != MANAGEMENT_KEY_LEN: raise ValueError( @@ -416,7 +465,11 @@ def put_management_key( management_key: bytes, new_management_key: bytes, ) -> None: - """Change YubiHSM Auth management key""" + """Change YubiHSM Auth management key + + :param management_key: The current management key. + :param new_management_key: The new management key. + """ if ( len(management_key) != MANAGEMENT_KEY_LEN @@ -456,7 +509,6 @@ def _calculate_session_keys( card_crypto: Optional[bytes] = None, public_key: Optional[bytes] = None, ) -> bytes: - """Calculate session keys from YubiHSM Auth credential""" data = Tlv(TAG_LABEL, _parse_label(label)) + Tlv(TAG_CONTEXT, context) @@ -491,7 +543,14 @@ def calculate_session_keys_symmetric( credential_password: Union[bytes, str], card_crypto: Optional[bytes] = None, ) -> SessionKeys: - """Calculate session keys from symmetric YubiHSM Auth credential""" + """Calculate session keys from a symmetric YubiHSM Auth credential. + + :param label: The label of the credential. + :param context: The context (host challenge + hsm challenge). + :param credential_password: The password used to protect + access to the credential. + :param card_crypto: The card cryptogram. + """ return SessionKeys.parse( self._calculate_session_keys( @@ -510,7 +569,15 @@ def calculate_session_keys_asymmetric( credential_password: Union[bytes, str], card_crypto: bytes, ) -> SessionKeys: - """Calculate session keys from asymmetric YubiHSM Auth credential""" + """Calculate session keys from an asymmetric YubiHSM Auth credential. + + :param label: The label of the credential. + :param context: The context (EPK.OCE + EPK.SD). + :param public_key: The YubiHSM device's public key. + :param credential_password: The password used to protect + access to the credential. + :param card_crypto: The card cryptogram. + """ require_version(self.version, (5, 6, 0)) if not isinstance(public_key.curve, ec.SECP256R1): @@ -539,6 +606,8 @@ def get_challenge(self, label: str) -> bytes: For symmetric credentials this is Host Challenge, a random 8 byte value. For asymmetric credentials this is EPK-OCE. + + :param label: The label of the credential. """ require_version(self.version, (5, 6, 0)) data = Tlv(TAG_LABEL, _parse_label(label)) diff --git a/yubikit/management.py b/yubikit/management.py index 034be32e..5ef9f040 100644 --- a/yubikit/management.py +++ b/yubikit/management.py @@ -453,6 +453,7 @@ def version(self) -> Version: return self.backend.version def read_device_info(self) -> DeviceInfo: + """Get detailed information about the YubiKey.""" require_version(self.version, (4, 1, 0)) return DeviceInfo.parse(self.backend.read_config(), self.version) @@ -463,6 +464,13 @@ def write_device_config( cur_lock_code: Optional[bytes] = None, new_lock_code: Optional[bytes] = None, ) -> None: + """Write configuration settings for YubiKey. + + :pararm config: The device configuration. + :param reboot: If True the YubiKey will reboot. + :param cur_lock_code: Current lock code. + :param new_lock_code: New lock code. + """ require_version(self.version, (5, 0, 0)) if cur_lock_code is not None and len(cur_lock_code) != 16: raise ValueError("Lock code must be 16 bytes") @@ -485,6 +493,14 @@ def set_mode( chalresp_timeout: int = 0, auto_eject_timeout: Optional[int] = None, ) -> None: + """Write connection modes (USB interfaces) for YubiKey. + + :param mode: The connection modes (USB interfaces). + :param chalresp_timeout: The timeout when waiting for touch + for challenge response. + :param auto_eject_timeout: When set, the smartcard will + automatically eject after the given time. + """ logger.debug( f"Set mode: {mode}, chalresp_timeout: {chalresp_timeout}, " f"auto_eject_timeout: {auto_eject_timeout}" diff --git a/yubikit/oath.py b/yubikit/oath.py index cd03825d..28de0af3 100644 --- a/yubikit/oath.py +++ b/yubikit/oath.py @@ -80,6 +80,10 @@ class OATH_TYPE(IntEnum): def parse_b32_key(key: str): + """Parse Base32 encoded key. + + :param key: The Base32 encoded key. + """ key = key.upper().replace(" ", "") key += "=" * (-len(key) % 8) # Support unpadded return b32decode(key) @@ -96,6 +100,8 @@ def _parse_select(response): @dataclass class CredentialData: + """An object holding OATH credential data.""" + name: str oath_type: OATH_TYPE hash_algorithm: HASH_ALGORITHM @@ -107,6 +113,10 @@ class CredentialData: @classmethod def parse_uri(cls, uri: str) -> "CredentialData": + """Parse OATH credential data from URI. + + :param uri: The URI to parse from. + """ parsed = urlparse(uri.strip()) if parsed.scheme != "otpauth": raise ValueError("Invalid URI scheme") @@ -138,6 +148,8 @@ def get_id(self) -> bytes: @dataclass class Code: + """An OATH code object.""" + value: str valid_from: int valid_to: int @@ -146,6 +158,8 @@ class Code: @total_ordering @dataclass(order=False, frozen=True) class Credential: + """An OATH credential object.""" + device_id: str id: bytes issuer: Optional[str] @@ -246,6 +260,8 @@ def _format_code(credential, timestamp, truncated): class OathSession: + """A session with the OATH application.""" + def __init__(self, connection: SmartCardConnection): self.protocol = SmartCardProtocol(connection, INS_SEND_REMAINING) self._version, self._salt, self._challenge = _parse_select( @@ -262,21 +278,26 @@ def __init__(self, connection: SmartCardConnection): @property def version(self) -> Version: + """The OATH application version.""" return self._version @property def device_id(self) -> str: + """The device ID.""" return self._device_id @property def has_key(self) -> bool: + """If True, the YubiKey has an access key.""" return self._has_key @property def locked(self) -> bool: + """If True, the OATH application is password protected.""" return self._challenge is not None def reset(self) -> None: + """Perform a factory reset on the OATH application.""" self.protocol.send_apdu(0, INS_RESET, 0xDE, 0xAD) _, self._salt, self._challenge = _parse_select(self.protocol.select(AID.OATH)) logger.info("OATH application data reset performed") @@ -284,9 +305,17 @@ def reset(self) -> None: self._device_id = _get_device_id(self._salt) def derive_key(self, password: str) -> bytes: + """Derive a key from password. + + :param password: The derivation password. + """ return _derive_key(self._salt, password) def validate(self, key: bytes) -> None: + """Validate authentication with access key. + + :param key: The access key. + """ logger.debug("Unlocking session") response = _hmac_sha1(key, self._challenge) challenge = os.urandom(8) @@ -301,6 +330,10 @@ def validate(self, key: bytes) -> None: self._neo_unlock_workaround = False def set_key(self, key: bytes) -> None: + """Set access key for authentication. + + :param key: The access key. + """ challenge = os.urandom(8) response = _hmac_sha1(key, challenge) self.protocol.send_apdu( @@ -322,6 +355,10 @@ def set_key(self, key: bytes) -> None: self.validate(key) def unset_key(self) -> None: + """Remove access code. + + WARNING: This removes authentication. + """ self.protocol.send_apdu(0, INS_SET_CODE, 0, 0, Tlv(TAG_KEY)) logger.info("Access code removed") self._has_key = False @@ -329,6 +366,11 @@ def unset_key(self) -> None: def put_credential( self, credential_data: CredentialData, touch_required: bool = False ) -> Credential: + """Add a OATH credential. + + :param credential_data: The credential data. + :param touch_required: The touch policy. + """ d = credential_data cred_id = d.get_id() secret = _hmac_shorten_key(d.secret, d.hash_algorithm) @@ -365,6 +407,12 @@ def put_credential( def rename_credential( self, credential_id: bytes, name: str, issuer: Optional[str] = None ) -> bytes: + """Rename a OATH credential. + + :param credential_id: The id of the credential. + :param name: The new name of the credential. + :param issuer: The credential issuer. + """ require_version(self.version, (5, 3, 1)) _, _, period = _parse_cred_id(credential_id, OATH_TYPE.TOTP) new_id = _format_cred_id(issuer, name, OATH_TYPE.TOTP, period) @@ -375,6 +423,7 @@ def rename_credential( return new_id def list_credentials(self) -> List[Credential]: + """List OATH credentials.""" creds = [] for tlv in Tlv.parse_list(self.protocol.send_apdu(0, INS_LIST, 0, 0)): data = Tlv.unpack(TAG_NAME_LIST, tlv) @@ -389,6 +438,11 @@ def list_credentials(self) -> List[Credential]: return creds def calculate(self, credential_id: bytes, challenge: bytes) -> bytes: + """Perform a calculate for an OATH credential. + + :param credential_id: The id of the credential. + :param challenge: The challenge. + """ resp = Tlv.unpack( TAG_RESPONSE, self.protocol.send_apdu( @@ -402,12 +456,20 @@ def calculate(self, credential_id: bytes, challenge: bytes) -> bytes: return resp[1:] def delete_credential(self, credential_id: bytes) -> None: + """Delete an OATH credential. + + :param credential_id: The id of the credential. + """ self.protocol.send_apdu(0, INS_DELETE, 0, 0, Tlv(TAG_NAME, credential_id)) logger.info("Credential deleted") def calculate_all( self, timestamp: Optional[int] = None ) -> Mapping[Credential, Optional[Code]]: + """Calculate codes for all OATH credentials on the YubiKey. + + :param timestamp: A timestamp. + """ timestamp = int(timestamp or time()) challenge = _get_challenge(timestamp, DEFAULT_PERIOD) logger.debug(f"Calculating all codes for time={timestamp}") @@ -445,6 +507,11 @@ def calculate_all( def calculate_code( self, credential: Credential, timestamp: Optional[int] = None ) -> Code: + """Calculate code for an OATH credential. + + :param credential: The credential object. + :param timestamp: The timestamp. + """ if credential.device_id != self.device_id: raise ValueError("Credential does not belong to this YubiKey") diff --git a/yubikit/openpgp.py b/yubikit/openpgp.py index 833747c7..3e3cac7d 100644 --- a/yubikit/openpgp.py +++ b/yubikit/openpgp.py @@ -1,3 +1,30 @@ +# Copyright (c) 2023 Yubico AB +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + from .core import ( Tlv, Version, @@ -332,6 +359,8 @@ def parse(cls, encoded: bytes) -> "PwStatus": @unique class CRT(bytes, Enum): + """Control Reference Template values.""" + SIG = Tlv(0xB6) DEC = Tlv(0xB8) AUT = Tlv(0xA4) @@ -396,6 +425,8 @@ class KEY_STATUS(IntEnum): # mypy doesn't handle abstract dataclasses well @dataclass # type: ignore[misc] class AlgorithmAttributes(abc.ABC): + """OpenPGP key algorithm attributes.""" + _supported_ids: ClassVar[Sequence[int]] algorithm_id: int @@ -617,6 +648,8 @@ def get_algorithm_attributes(self, key_ref: KEY_REF) -> AlgorithmAttributes: @dataclass class ApplicationRelatedData: + """OpenPGP related data.""" + aid: OpenPgpAid historical: bytes extended_length_info: Optional[ExtendedLengthInfo] @@ -664,7 +697,7 @@ class Kdf(abc.ABC): @abc.abstractmethod def process(self, pin: str, pw: PW) -> bytes: - """Runs the KDF on the input PIN.""" + """Run the KDF on the input PIN.""" @classmethod @abc.abstractmethod @@ -955,6 +988,8 @@ def _pad_message(attributes, message, hash_algorithm): class OpenPgpSession: + """A session with the OpenPGP appliation.""" + def __init__(self, connection: SmartCardConnection): self.protocol = SmartCardProtocol(connection) try: @@ -1002,7 +1037,10 @@ def extended_capabilities(self) -> ExtendedCapabilities: return self._app_data.discretionary.extended_capabilities def get_challenge(self, length: int) -> bytes: - """Get random data from the YubiKey.""" + """Get random data from the YubiKey. + + :param length: Length of the returned data. + """ e = self.extended_capabilities if EXTENDED_CAPABILITY_FLAGS.GET_CHALLENGE not in e.flags: raise NotSupportedError("GET_CHALLENGE is not supported") @@ -1013,12 +1051,19 @@ def get_challenge(self, length: int) -> bytes: return self.protocol.send_apdu(0, INS.GET_CHALLENGE, 0, 0, le=length) def get_data(self, do: DO) -> bytes: - """Get a Data Object from the YubiKey.""" + """Get a Data Object from the YubiKey. + + :param do: The Data Object to get. + """ logger.debug(f"Reading Data Object {do.name} ({do:X})") return self.protocol.send_apdu(0, INS.GET_DATA, do >> 8, do & 0xFF) def put_data(self, do: DO, data: Union[bytes, SupportsBytes]) -> None: - """Write a Data Object to the YubiKey.""" + """Write a Data Object to the YubiKey. + + :param do: The Data Object to write to. + :param data: The data to write. + """ self.protocol.send_apdu(0, INS.PUT_DATA, do >> 8, do & 0xFF, bytes(data)) logger.info(f"Wrote Data Object {do.name} ({do:X})") @@ -1036,14 +1081,19 @@ def get_application_related_data(self) -> ApplicationRelatedData: return ApplicationRelatedData.parse(self.get_data(DO.APPLICATION_RELATED_DATA)) def set_signature_pin_policy(self, pin_policy: PIN_POLICY) -> None: - """Requires Admin PIN verification.""" + """Set signature PIN policy. + + Requires Admin PIN verification. + + :param pin_policy: The PIN policy. + """ logger.debug(f"Setting Signature PIN policy to {pin_policy}") data = struct.pack(">B", pin_policy) self.put_data(DO.PW_STATUS_BYTES, data) logger.info("Signature PIN policy set") def reset(self) -> None: - """Performs a factory reset on the OpenPGP application. + """Perform a factory reset on the OpenPGP application. WARNING: This will delete all stored keys, certificates and other data. """ @@ -1075,6 +1125,10 @@ def set_pin_attempts( WARNING: On YubiKey NEO this will reset the PINs to their default values. Requires Admin PIN verification. + + :param user_attempts: The User PIN attempts. + :param reset_attempts: The Reset Code attempts. + :param admin_attempts: The Admin PIN attempts. """ if self.version[0] == 1: # YubiKey NEO @@ -1108,6 +1162,8 @@ def set_kdf(self, kdf: Kdf) -> None: If a Reset Code is present, it will be invalidated. This command requires Admin PIN verification. + + :param kdf: The key derivation function. """ e = self._app_data.discretionary.extended_capabilities if EXTENDED_CAPABILITY_FLAGS.KDF not in e.flags: @@ -1132,7 +1188,11 @@ def verify_pin(self, pin, extended: bool = False): This will unlock functionality that requires User PIN verification. Note that with `extended=False` (default) only sign operations are allowed. - Inversely, with `extended=False` sign operations are NOT allowed. + Inversely, with `extended=True` sign operations are NOT allowed. + + :param pin: The User PIN. + :param extended: If `False` only sign operations are allowed, + otherwise sign operations are NOT allowed. """ logger.debug(f"Verifying User PIN in mode {'82' if extended else '81'}") self._verify(PW.USER, pin, 1 if extended else 0) @@ -1141,11 +1201,17 @@ def verify_admin(self, admin_pin): """Verify the Admin PIN. This will unlock functionality that requires Admin PIN verification. + + :param admin_pin: The Admin PIN. """ logger.debug("Verifying Admin PIN") self._verify(PW.ADMIN, admin_pin) def unverify_pin(self, pw: PW) -> None: + """Reset verification for PIN. + + :param pw: The User, Admin or Reset PIN + """ require_version(self.version, (5, 6, 0)) logger.debug(f"Resetting verification for {pw.name} PIN") self.protocol.send_apdu(0, INS.VERIFY, 0xFF, pw) @@ -1170,11 +1236,19 @@ def _change(self, pw: PW, pin: str, new_pin: str) -> None: logger.info(f"New {pw.name} PIN set") def change_pin(self, pin: str, new_pin: str) -> None: - """Change the User PIN.""" + """Change the User PIN. + + :param pin: The current User PIN. + :param new_pin: The new User PIN. + """ self._change(PW.USER, pin, new_pin) def change_admin(self, admin_pin: str, new_admin_pin: str) -> None: - """Change the Admin PIN.""" + """Change the Admin PIN. + + :param admin_pin: The current Admin PIN. + :param new_admin_pin: The new Admin PIN. + """ self._change(PW.ADMIN, admin_pin, new_admin_pin) def set_reset_code(self, reset_code: str) -> None: @@ -1184,6 +1258,8 @@ def set_reset_code(self, reset_code: str) -> None: blocked, using the reset_pin method. This command requires Admin PIN verification. + + :param reset_code: The Reset Code for User PIN. """ logger.debug("Setting a new PIN Reset Code") data = self.get_kdf().process(PW.RESET, reset_code) @@ -1191,9 +1267,12 @@ def set_reset_code(self, reset_code: str) -> None: logger.info("New Reset Code has been set") def reset_pin(self, new_pin: str, reset_code: Optional[str] = None) -> None: - """Resets the User PIN to a new value. + """Reset the User PIN to a new value. This command requires Admin PIN verification, or the Reset Code. + + :param new_pin: The new user PIN. + :param reset_code: The Reset Code. """ logger.debug("Resetting User PIN") p1 = 2 @@ -1216,7 +1295,10 @@ def reset_pin(self, new_pin: str, reset_code: Optional[str] = None) -> None: logger.info("New User PIN has been set") def get_algorithm_attributes(self, key_ref: KEY_REF) -> AlgorithmAttributes: - """Get the algorithm attributes for one of the key slots.""" + """Get the algorithm attributes for one of the key slots. + + :param key_ref: The key slot. + """ logger.debug(f"Getting Algorithm Attributes for {key_ref.name}") data = self.get_application_related_data() return data.discretionary.get_algorithm_attributes(key_ref) @@ -1283,12 +1365,15 @@ def get_algorithm_information( def set_algorithm_attributes( self, key_ref: KEY_REF, attributes: AlgorithmAttributes ) -> None: - """Sets the algorithm attributes for a key slot. + """Set the algorithm attributes for a key slot. WARNING: This will delete any key already stored in the slot if the attributes are changed! This command requires Admin PIN verification. + + :param key_ref: The key slot. + :param attributes: The algorithm attributes to set. """ logger.debug("Setting Algorithm Attributes for {key_ref.name}") supported = self.get_algorithm_information() @@ -1301,7 +1386,10 @@ def set_algorithm_attributes( logger.info("Algorithm Attributes have been changed") def get_uif(self, key_ref: KEY_REF) -> UIF: - """Get the User Interaction Flag (touch requirement) for a key.""" + """Get the User Interaction Flag (touch requirement) for a key. + + :param key_ref: The key slot. + """ try: return UIF.parse(self.get_data(key_ref.uif_do)) except ApduError as e: @@ -1314,6 +1402,9 @@ def set_uif(self, key_ref: KEY_REF, uif: UIF) -> None: """Set the User Interaction Flag (touch requirement) for a key. Requires Admin PIN verification. + + :param key_ref: The key slot. + :param uif: The User Interaction Flag. """ require_version(self.version, (4, 2, 0)) if key_ref == KEY_REF.ATT: @@ -1350,6 +1441,9 @@ def set_generation_time(self, key_ref: KEY_REF, timestamp: int) -> None: """Set the generation timestamp for a key. Requires Admin PIN verification. + + :param key_ref: The key slot. + :param timestamp: The timestamp. """ logger.debug(f"Setting key generation timestamp for {key_ref.name}") self.put_data(key_ref.generation_time_do, struct.pack(">I", timestamp)) @@ -1364,13 +1458,19 @@ def set_fingerprint(self, key_ref: KEY_REF, fingerprint: bytes) -> None: """Set the fingerprint for a key. Requires Admin PIN verification. + + :param key_ref: The key slot. + :param fingerprint: The fingerprint. """ logger.debug(f"Setting key fingerprint for {key_ref.name}") self.put_data(key_ref.fingerprint_do, fingerprint) logger.info("Key fingerprint set for {key_ref.name}") def get_public_key(self, key_ref: KEY_REF) -> PublicKey: - """Get the public key from a slot.""" + """Get the public key from a slot. + + :param key_ref: The key slot. + """ logger.debug(f"Getting public key for {key_ref.name}") resp = self.protocol.send_apdu(0, INS.GENERATE_ASYM, 0x81, 0x00, key_ref.crt) data = Tlv.parse_dict(Tlv.unpack(TAG_PUBLIC_KEY, resp)) @@ -1386,6 +1486,9 @@ def generate_rsa_key( """Generate an RSA key in the given slot. Requires Admin PIN verification. + + :param key_ref: The key slot. + :param key_size: The size of the RSA key. """ if (4, 2, 0) <= self.version < (4, 3, 5): raise NotSupportedError("RSA key generation not supported on this YubiKey") @@ -1409,6 +1512,9 @@ def generate_ec_key(self, key_ref: KEY_REF, curve_oid: CurveOid) -> EcPublicKey: """Generate an EC key in the given slot. Requires Admin PIN verification. + + :param key_ref: The key slot. + :param curve_oid: The curve OID. """ require_version(self.version, (5, 2, 0)) @@ -1426,9 +1532,12 @@ def generate_ec_key(self, key_ref: KEY_REF, curve_oid: CurveOid) -> EcPublicKey: return _parse_ec_key(curve_oid, data) def put_key(self, key_ref: KEY_REF, private_key: PrivateKey) -> None: - """Import a private key into the give slot. + """Import a private key into the given slot. Requires Admin PIN verification. + + :param key_ref: The key slot. + :param private_key: The private key to import. """ logger.debug(f"Importing a private key for {key_ref.name}") @@ -1450,9 +1559,11 @@ def put_key(self, key_ref: KEY_REF, private_key: PrivateKey) -> None: logger.info(f"Private key imported for {key_ref.name}") def delete_key(self, key_ref: KEY_REF) -> None: - """Deletes the contents of a key slot. + """Delete the contents of a key slot. Requires Admin PIN verification. + + :param key_ref: The key slot. """ if self.version < (4, 0, 0): # Import over the key @@ -1489,7 +1600,10 @@ def _select_certificate(self, key_ref: KEY_REF) -> None: raise def get_certificate(self, key_ref: KEY_REF) -> x509.Certificate: - """Get a certificate from a slot.""" + """Get a certificate from a slot. + + :param key_ref: The slot. + """ logger.debug(f"Getting certificate for key {key_ref.name}") if key_ref == KEY_REF.ATT: require_version(self.version, (5, 2, 0)) @@ -1502,9 +1616,12 @@ def get_certificate(self, key_ref: KEY_REF) -> x509.Certificate: return x509.load_der_x509_certificate(data, default_backend()) def put_certificate(self, key_ref: KEY_REF, certificate: x509.Certificate) -> None: - """Imports a certificate into a slot. + """Import a certificate into a slot. Requires Admin PIN verification. + + :param key_ref: The slot. + :param certificate: The X.509 certificate to import. """ cert_data = certificate.public_bytes(Encoding.DER) logger.debug(f"Importing certificate for key {key_ref.name}") @@ -1517,9 +1634,11 @@ def put_certificate(self, key_ref: KEY_REF, certificate: x509.Certificate) -> No logger.info(f"Certificate imported for key {key_ref.name}") def delete_certificate(self, key_ref: KEY_REF) -> None: - """Deletes a certificate in a slot. + """Delete a certificate in a slot. Requires Admin PIN verification. + + :param key_ref: The slot. """ logger.debug(f"Deleting certificate for key {key_ref.name}") if key_ref == KEY_REF.ATT: @@ -1531,12 +1650,14 @@ def delete_certificate(self, key_ref: KEY_REF) -> None: logger.info(f"Certificate deleted for key {key_ref.name}") def attest_key(self, key_ref: KEY_REF) -> x509.Certificate: - """Creates an attestation certificate for a key. + """Create an attestation certificate for a key. The certificte is written to the certificate slot for the key, and its content is returned. Requires User PIN verification. + + :param key_ref: The key slot. """ require_version(self.version, (5, 2, 0)) logger.debug(f"Attesting key {key_ref.name}") @@ -1545,9 +1666,12 @@ def attest_key(self, key_ref: KEY_REF) -> x509.Certificate: return self.get_certificate(key_ref) def sign(self, message: bytes, hash_algorithm: hashes.HashAlgorithm) -> bytes: - """Signs a message using the SIG key. + """Sign a message using the SIG key. Requires User PIN verification. + + :param message: The message to sign. + :param hash_algorithm: The pre-signature hash algorithm. """ attributes = self.get_algorithm_attributes(KEY_REF.SIG) padded = _pad_message(attributes, message, hash_algorithm) @@ -1563,13 +1687,15 @@ def sign(self, message: bytes, hash_algorithm: hashes.HashAlgorithm) -> bytes: return response def decrypt(self, value: Union[bytes, EcPublicKey]) -> bytes: - """Decrypts a value using the DEC key. + """Decrypt a value using the DEC key. For RSA the `value` should be an encrypted block. For ECDH the `value` should be a peer public-key to perform the key exchange with, and the result will be the derived shared secret. Requires (extended) User PIN verification. + + :param value: The value to decrypt. """ attributes = self.get_algorithm_attributes(KEY_REF.DEC) logger.debug(f"Decrypting a value with {attributes}") @@ -1593,9 +1719,12 @@ def decrypt(self, value: Union[bytes, EcPublicKey]) -> bytes: def authenticate( self, message: bytes, hash_algorithm: hashes.HashAlgorithm ) -> bytes: - """Authenticates a message using the AUT key. + """Authenticate a message using the AUT key. Requires User PIN verification. + + :param message: The message to authenticate. + :param hash_algorithm: The pre-authentication hash algorithm. """ attributes = self.get_algorithm_attributes(KEY_REF.AUT) padded = _pad_message(attributes, message, hash_algorithm) diff --git a/yubikit/piv.py b/yubikit/piv.py index b5073c11..e197a800 100755 --- a/yubikit/piv.py +++ b/yubikit/piv.py @@ -432,6 +432,8 @@ def _parse_device_public_key(key_type, encoded): class PivSession: + """A session with the PIV application.""" + def __init__(self, connection: SmartCardConnection): self.protocol = SmartCardProtocol(connection) self.protocol.select(AID.PIV) @@ -483,6 +485,11 @@ def reset(self) -> None: def authenticate( self, key_type: MANAGEMENT_KEY_TYPE, management_key: bytes ) -> None: + """Authenticate to PIV with management key. + + :param key_type: The management key type. + :param management_key: The management key in raw bytes. + """ key_type = MANAGEMENT_KEY_TYPE(key_type) logger.debug(f"Authenticating with key type: {key_type}") response = self.protocol.send_apdu( @@ -523,6 +530,12 @@ def set_management_key( management_key: bytes, require_touch: bool = False, ) -> None: + """Set a new management key. + + :param key_type: The management key type. + :param management_key: The management key in raw bytes. + :param require_touch: The touch policy. + """ key_type = MANAGEMENT_KEY_TYPE(key_type) logger.debug(f"Setting management key of type: {key_type}") @@ -541,6 +554,10 @@ def set_management_key( logger.info("Management key set") def verify_pin(self, pin: str) -> None: + """Verify the PIN. + + :param pin: The PIN. + """ logger.debug("Verifying PIN") try: self.protocol.send_apdu(0, INS_VERIFY, 0, PIN_P2, _pin_bytes(pin)) @@ -553,6 +570,7 @@ def verify_pin(self, pin: str) -> None: raise InvalidPinError(retries) def get_pin_attempts(self) -> int: + """Get remaining PIN attempts.""" logger.debug("Getting PIN attempts") try: return self.get_pin_metadata().attempts_remaining @@ -571,21 +589,45 @@ def get_pin_attempts(self) -> int: return retries def change_pin(self, old_pin: str, new_pin: str) -> None: + """Change the PIN. + + :param old_pin: The current PIN. + :param new_pin: The new PIN. + """ logger.debug("Changing PIN") self._change_reference(INS_CHANGE_REFERENCE, PIN_P2, old_pin, new_pin) logger.info("New PIN set") def change_puk(self, old_puk: str, new_puk: str) -> None: + """Change the PUK. + + :param old_puk: The current PUK. + :param new_puk: The new PUK. + """ logger.debug("Changing PUK") self._change_reference(INS_CHANGE_REFERENCE, PUK_P2, old_puk, new_puk) logger.info("New PUK set") def unblock_pin(self, puk: str, new_pin: str) -> None: + """Reset PIN with PUK. + + :param puk: The PUK. + :param new_pin: The new PIN. + """ logger.debug("Using PUK to set new PIN") self._change_reference(INS_RESET_RETRY, PIN_P2, puk, new_pin) logger.info("New PIN set") def set_pin_attempts(self, pin_attempts: int, puk_attempts: int) -> None: + """Set PIN retries for PIN and PUK. + + Both PIN and PUK will be reset to default values when this is executed. + + Requires authentication with management key and PIN verification. + + :param pin_attempts: The PIN attempts. + :param puk_attempts: The PUK attempts. + """ logger.debug(f"Setting PIN/PUK attempts ({pin_attempts}, {puk_attempts})") self.protocol.send_apdu(0, INS_SET_PIN_RETRIES, pin_attempts, puk_attempts) self._max_pin_retries = pin_attempts @@ -593,14 +635,17 @@ def set_pin_attempts(self, pin_attempts: int, puk_attempts: int) -> None: logger.info("PIN/PUK attempts set") def get_pin_metadata(self) -> PinMetadata: + """Get PIN metadata.""" logger.debug("Getting PIN metadata") return self._get_pin_puk_metadata(PIN_P2) def get_puk_metadata(self) -> PinMetadata: + """Get PUK metadata.""" logger.debug("Getting PUK metadata") return self._get_pin_puk_metadata(PUK_P2) def get_management_key_metadata(self) -> ManagementKeyMetadata: + """Get management key metadata.""" logger.debug("Getting management key metadata") require_version(self.version, (5, 3, 0)) data = Tlv.parse_dict( @@ -614,6 +659,10 @@ def get_management_key_metadata(self) -> ManagementKeyMetadata: ) def get_slot_metadata(self, slot: SLOT) -> SlotMetadata: + """Get slot metadata. + + :param slot: The slot to get metadata from. + """ slot = SLOT(slot) logger.debug(f"Getting metadata for slot {slot}") require_version(self.version, (5, 3, 0)) @@ -635,6 +684,16 @@ def sign( hash_algorithm: hashes.HashAlgorithm, padding: Optional[AsymmetricPadding] = None, ) -> bytes: + """Sign message with key. + + Requires PIN verification. + + :param slot: The slot of the key to use. + :param key_type: The type of the key to sign with. + :param message: The message to sign. + :param hash_algorithm: The pre-signature hash algorithm to use. + :param padding: The pre-signature padding. + """ slot = SLOT(slot) key_type = KEY_TYPE(key_type) logger.debug( @@ -647,6 +706,14 @@ def sign( def decrypt( self, slot: SLOT, cipher_text: bytes, padding: AsymmetricPadding ) -> bytes: + """Decrypt cipher text. + + Requires PIN verification. + + :param slot: The slot. + :param cipher_text: The cipher text to decrypt. + :param padding: The padding of the plain text. + """ slot = SLOT(slot) if len(cipher_text) == 1024 // 8: key_type = KEY_TYPE.RSA1024 @@ -664,6 +731,13 @@ def decrypt( def calculate_secret( self, slot: SLOT, peer_public_key: ec.EllipticCurvePublicKey ) -> bytes: + """Calculate shared secret using ECDH. + + Requires PIN verification. + + :param slot: The slot. + :param peer_public_key: The peer's public key. + """ slot = SLOT(slot) key_type = KEY_TYPE.from_public_key(peer_public_key) if key_type.algorithm != ALGORITHM.EC: @@ -677,6 +751,12 @@ def calculate_secret( return self._use_private_key(slot, key_type, data, True) def get_object(self, object_id: int) -> bytes: + """Get object by ID. + + Requires PIN verification. + + :param object_id: The object identifier. + """ logger.debug(f"Reading data from object slot {hex(object_id)}") if object_id == OBJECT_ID.DISCOVERY: expected: int = OBJECT_ID.DISCOVERY @@ -698,6 +778,13 @@ def get_object(self, object_id: int) -> bytes: raise BadResponseError("Malformed object data", e) def put_object(self, object_id: int, data: Optional[bytes] = None) -> None: + """Write data to PIV object. + + Requires authentication with management key. + + :param object_id: The object identifier. + :param data: The object data. + """ self.protocol.send_apdu( 0, INS_PUT_DATA, @@ -708,6 +795,10 @@ def put_object(self, object_id: int, data: Optional[bytes] = None) -> None: logger.info(f"Data written to object slot {hex(object_id)}") def get_certificate(self, slot: SLOT) -> x509.Certificate: + """Get certificate from slot. + + :param slot: The slot to get the certificate from. + """ slot = SLOT(slot) logger.debug(f"Reading certificate in slot {slot}") try: @@ -732,6 +823,14 @@ def get_certificate(self, slot: SLOT) -> x509.Certificate: def put_certificate( self, slot: SLOT, certificate: x509.Certificate, compress: bool = False ) -> None: + """Import certificate to slot. + + Requires authentication with management key. + + :param slot: The slot to import the certificate to. + :param certificate: The certificate to import. + :param compress: If the certificate should be compressed or not. + """ slot = SLOT(slot) logger.debug(f"Storing certificate in slot {slot}") cert_data = certificate.public_bytes(Encoding.DER) @@ -751,6 +850,12 @@ def put_certificate( logger.info(f"Certificate written to slot {slot}, compression={compress}") def delete_certificate(self, slot: SLOT) -> None: + """Delete certificate. + + Requires authentication with management key. + + :param slot: The slot to delete the certificate from. + """ slot = SLOT(slot) logger.debug(f"Deleting certificate in slot {slot}") self.put_object(OBJECT_ID.from_slot(slot)) @@ -765,6 +870,15 @@ def put_key( pin_policy: PIN_POLICY = PIN_POLICY.DEFAULT, touch_policy: TOUCH_POLICY = TOUCH_POLICY.DEFAULT, ) -> None: + """Import a private key to slot. + + Requires authentication with management key. + + :param slot: The slot to import the key to. + :param private_key: The private key to import. + :param pin_policy: The PIN policy. + :param touch_policy: The touch policy. + """ slot = SLOT(slot) key_type = KEY_TYPE.from_public_key(private_key.public_key()) check_key_support(self.version, key_type, pin_policy, touch_policy, False) @@ -804,6 +918,15 @@ def generate_key( pin_policy: PIN_POLICY = PIN_POLICY.DEFAULT, touch_policy: TOUCH_POLICY = TOUCH_POLICY.DEFAULT, ) -> Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey]: + """Generate private key in slot. + + Requires authentication with management key. + + :param slot: The slot to generate the private key in. + :param key_type: The key type. + :param pin_policy: The PIN policy. + :param touch_policy: The touch policy. + """ slot = SLOT(slot) key_type = KEY_TYPE(key_type) check_key_support(self.version, key_type, pin_policy, touch_policy, True) @@ -823,6 +946,11 @@ def generate_key( return _parse_device_public_key(key_type, Tlv.unpack(0x7F49, response)) def attest_key(self, slot: SLOT) -> x509.Certificate: + """Attest key in slot. + + :param slot: The slot where the key has been generated. + :return: A X.509 certificate. + """ require_version(self.version, (4, 3, 0)) slot = SLOT(slot) response = self.protocol.send_apdu(0, INS_ATTEST, slot, 0) diff --git a/yubikit/support.py b/yubikit/support.py index 33e291a9..b854b1fc 100644 --- a/yubikit/support.py +++ b/yubikit/support.py @@ -256,6 +256,9 @@ def read_info(conn: Connection, pid: Optional[PID] = None) -> DeviceInfo: required, for example to "fix" known bad values. The *pid* parameter must be provided whenever the YubiKey is connected via USB. + + :param conn: A connection to a YubiKey. + :param pid: The USB Product ID. """ logger.debug(f"Attempting to read device info, using {type(conn).__name__}") @@ -366,7 +369,11 @@ def _is_preview(version): def get_name(info: DeviceInfo, key_type: Optional[YUBIKEY]) -> str: - """Determine the product name of a YubiKey""" + """Determine the product name of a YubiKey + + :param info: The device info. + :param key_type: The YubiKey hardware platform. + """ usb_supported = info.supported_capabilities[TRANSPORT.USB] # Guess the key type (over NFC) diff --git a/yubikit/yubiotp.py b/yubikit/yubiotp.py index 4b9e0633..2d191819 100644 --- a/yubikit/yubiotp.py +++ b/yubikit/yubiotp.py @@ -705,6 +705,8 @@ def send_and_receive(self, slot, data, expected_len, event=None, on_keepalive=No class YubiOtpSession: + """A session with the YubiOTP application.""" + def __init__(self, connection: Union[OtpConnection, SmartCardConnection]): if isinstance(connection, OtpConnection): otp_protocol = OtpProtocol(connection) @@ -750,11 +752,13 @@ def version(self) -> Version: return self._version def get_serial(self) -> int: + """Get serial number.""" return bytes2int( self.backend.send_and_receive(CONFIG_SLOT.DEVICE_SERIAL, b"", 4) ) def get_config_state(self) -> ConfigState: + """Get configuration state of the YubiOTP application.""" return ConfigState(self.version, struct.unpack(" None: + """Write configuration to slot. + + :param slot: The slot to configure. + :param configuration: The slot configuration. + :param acc_code: The new access code. + :param cur_acc_code: The current access code. + """ if not configuration.is_supported_by(self.version): raise NotSupportedError( "This configuration is not supported on this YubiKey version" @@ -794,6 +805,13 @@ def update_configuration( acc_code: Optional[bytes] = None, cur_acc_code: Optional[bytes] = None, ) -> None: + """Update configuration in slot. + + :param slot: The slot to update the configuration in. + :param configuration: The slot configuration. + :param acc_code: The new access code. + :param cur_acc_code: The current access code. + """ if not configuration.is_supported_by(self.version): raise NotSupportedError( "This configuration is not supported on this YubiKey version" @@ -812,10 +830,16 @@ def update_configuration( ) def swap_slots(self) -> None: + """Swap the two slot configurations.""" logger.debug("Swapping touch slots") self._write_config(CONFIG_SLOT.SWAP, b"", None) def delete_slot(self, slot: SLOT, cur_acc_code: Optional[bytes] = None) -> None: + """Delete configuration stored in slot. + + :param slot: The slot to delete the configuration in. + :param cur_acc_code: The current access code. + """ slot = SLOT(slot) logger.debug(f"Deleting slot {slot}") self._write_config( @@ -827,6 +851,11 @@ def delete_slot(self, slot: SLOT, cur_acc_code: Optional[bytes] = None) -> None: def set_scan_map( self, scan_map: bytes, cur_acc_code: Optional[bytes] = None ) -> None: + """Update scan-codes on YubiKey. + + This updates the scan-codes (or keyboard presses) that the YubiKey + will use when typing out OTPs. + """ logger.debug("Writing scan map") self._write_config(CONFIG_SLOT.SCAN_MAP, scan_map, cur_acc_code) @@ -837,6 +866,13 @@ def set_ndef_configuration( cur_acc_code: Optional[bytes] = None, ndef_type: NDEF_TYPE = NDEF_TYPE.URI, ) -> None: + """Configure a slot to be used over NDEF (NFC). + + :param slot: The slot to configure. + :param uri: URI or static text. + :param cur_acc_code: The current access code. + :param ndef_type: The NDEF type (text or URI). + """ slot = SLOT(slot) logger.debug(f"Writing NDEF configuration for slot {slot} of type {ndef_type}") self._write_config( @@ -852,6 +888,12 @@ def calculate_hmac_sha1( event: Optional[Event] = None, on_keepalive: Optional[Callable[[int], None]] = None, ) -> bytes: + """Perform a challenge-response operation using HMAC-SHA1. + + :param slot: The slot to perform the operation against. + :param challenge: The challenge. + :param event: An event. + """ require_version(self.version, (2, 2, 0)) slot = SLOT(slot) logger.debug(f"Calculating response for slot {slot}")