diff --git a/.github/workflows/quality.yml b/.github/workflows/quality.yml index 12cfbd3..8724152 100644 --- a/.github/workflows/quality.yml +++ b/.github/workflows/quality.yml @@ -26,4 +26,7 @@ jobs: run: uv run ruff format --diff . - name: ruff check - run: uv run ruff check --diff . \ No newline at end of file + run: uv run ruff check --diff . + + - name: mypy + run: uv run mypy . \ No newline at end of file diff --git a/pkcs11/__init__.py b/pkcs11/__init__.py index 77ef7c0..c67e5bb 100644 --- a/pkcs11/__init__.py +++ b/pkcs11/__init__.py @@ -2,17 +2,38 @@ :mod:`pkcs11` defines a high-level, "Pythonic" interface to PKCS#11. """ -from .constants import * # noqa: F403 -from .exceptions import * # noqa: F403 -from .mechanisms import * # noqa: F403 -from .types import * # noqa: F403 -from .util import dh, dsa, ec, rsa, x509 # noqa: F401 +import typing + +from pkcs11.constants import ( + Attribute, + CertificateType, + MechanismFlag, + ObjectClass, + SlotFlag, + TokenFlag, + UserType, +) +from pkcs11.exceptions import * # noqa: F403 +from pkcs11.mechanisms import KDF, MGF, KeyType, Mechanism +from pkcs11.types import ( + Certificate, + DomainParameters, + Library, + MechanismInfo, + PrivateKey, + PublicKey, + SecretKey, + Session, + Slot, + Token, +) +from pkcs11.util import dh, dsa, ec, rsa, x509 _so = None _lib = None -def lib(so): +def lib(so: str) -> Library: """ Wrap the main library call coming from Cython with a preemptive dynamic loading. @@ -22,15 +43,44 @@ def lib(so): if _lib: if _so != so: - raise AlreadyInitialized( # noqa: F405 - "Already initialized with %s" % so - ) + raise AlreadyInitialized("Already initialized with %s" % so) # noqa: F405 else: return _lib - from . import _pkcs11 + from . import _pkcs11 # type: ignore[attr-defined] - _lib = _pkcs11.lib(so) + _lib = typing.cast(Library, _pkcs11.lib(so)) _so = so return _lib + + +__all__ = [ + "KDF", + "MGF", + "Attribute", + "Certificate", + "CertificateType", + "DomainParameters", + "KeyType", + "Library", + "Mechanism", + "MechanismFlag", + "MechanismInfo", + "ObjectClass", + "PrivateKey", + "PublicKey", + "SecretKey", + "Session", + "Slot", + "SlotFlag", + "Token", + "TokenFlag", + "UserType", + "dh", + "dsa", + "ec", + "lib", + "rsa", + "x509", +] diff --git a/pkcs11/_errors.pyx b/pkcs11/_errors.pyx index ce49998..e6cf908 100644 --- a/pkcs11/_errors.pyx +++ b/pkcs11/_errors.pyx @@ -2,7 +2,7 @@ Map from CKR return codes to Python exceptions. """ -from .exceptions import * +from pkcs11.exceptions import * cdef ERROR_MAP = { CKR_ATTRIBUTE_TYPE_INVALID: AttributeTypeInvalid, diff --git a/pkcs11/_pkcs11.pyx b/pkcs11/_pkcs11.pyx index 343293f..a05abc4 100644 --- a/pkcs11/_pkcs11.pyx +++ b/pkcs11/_pkcs11.pyx @@ -8,9 +8,6 @@ for Sphinx/Jedi/etc, as this module is not importable without having the library loaded. """ -from __future__ import (absolute_import, unicode_literals, - print_function, division) - from cython.view cimport array from cpython.mem cimport PyMem_Malloc, PyMem_Free @@ -19,16 +16,25 @@ IF UNAME_SYSNAME == "Windows": ELSE: from posix cimport dlfcn -from ._pkcs11_defn cimport * +from pkcs11._pkcs11_defn cimport * include '_errors.pyx' include '_utils.pyx' -from . import types -from .defaults import * -from .exceptions import * -from .constants import * -from .mechanisms import * -from .types import ( +from pkcs11 import types +from pkcs11.defaults import ( + DEFAULT_DERIVE_MECHANISMS, + DEFAULT_ENCRYPT_MECHANISMS, + DEFAULT_GENERATE_MECHANISMS, + DEFAULT_KEY_CAPABILITIES, + DEFAULT_MECHANISM_PARAMS, + DEFAULT_PARAM_GENERATE_MECHANISMS, + DEFAULT_SIGN_MECHANISMS, + DEFAULT_WRAP_MECHANISMS, +) +from pkcs11.exceptions import ArgumentsBad +from pkcs11.constants import DEFAULT, Attribute, MechanismFlag, ObjectClass, UserType, TokenFlag +from pkcs11.mechanisms import KeyType, Mechanism +from pkcs11.types import ( _CK_UTF8CHAR_to_str, _CK_VERSION_to_tuple, _CK_MECHANISM_TYPE_to_enum, diff --git a/pkcs11/_utils.pyx b/pkcs11/_utils.pyx index 57c8f72..edb8710 100644 --- a/pkcs11/_utils.pyx +++ b/pkcs11/_utils.pyx @@ -2,8 +2,7 @@ Type wrangling utility functions. """ -from .constants import * -from .mechanisms import * +from pkcs11.defaults import ATTRIBUTE_TYPES cdef CK_BYTE_buffer(length): diff --git a/pkcs11/constants.py b/pkcs11/constants.py index 6ead8a8..9737b99 100644 --- a/pkcs11/constants.py +++ b/pkcs11/constants.py @@ -5,11 +5,7 @@ use these classes. """ -try: - from enum import IntEnum, IntFlag, unique -except ImportError: - from aenum import IntEnum, IntFlag, unique - +from enum import IntEnum, IntFlag, unique DEFAULT = object() """Sentinel value used in templates. @@ -58,7 +54,7 @@ class ObjectClass(IntEnum): _VENDOR_DEFINED = 0x80000000 - def __repr__(self): + def __repr__(self) -> str: return "" % self.name @@ -344,7 +340,7 @@ class Attribute(IntEnum): _VENDOR_DEFINED = 0x80000000 - def __repr__(self): + def __repr__(self) -> str: return "" % self.name diff --git a/pkcs11/defaults.py b/pkcs11/defaults.py index 66f821f..c22dc95 100644 --- a/pkcs11/defaults.py +++ b/pkcs11/defaults.py @@ -7,14 +7,15 @@ from datetime import datetime from struct import Struct +from typing import Any, Callable -from .constants import ( +from pkcs11.constants import ( Attribute, CertificateType, MechanismFlag, ObjectClass, ) -from .mechanisms import MGF, KeyType, Mechanism +from pkcs11.mechanisms import MGF, KeyType, Mechanism DEFAULT_GENERATE_MECHANISMS = { KeyType.AES: Mechanism.AES_KEY_GEN, @@ -35,7 +36,7 @@ _SIGNING = MechanismFlag.SIGN | MechanismFlag.VERIFY _WRAPPING = MechanismFlag.WRAP | MechanismFlag.UNWRAP -DEFAULT_KEY_CAPABILITIES = { +DEFAULT_KEY_CAPABILITIES: dict[KeyType, MechanismFlag] = { KeyType.AES: _ENCRYPTION | _SIGNING | _WRAPPING, KeyType.DES2: _ENCRYPTION | _SIGNING | _WRAPPING, KeyType.DES3: _ENCRYPTION | _SIGNING | _WRAPPING, @@ -43,9 +44,10 @@ KeyType.DSA: _SIGNING, KeyType.EC: _SIGNING | MechanismFlag.DERIVE, KeyType.RSA: _ENCRYPTION | _SIGNING | _WRAPPING, - KeyType.GENERIC_SECRET: 0, + KeyType.GENERIC_SECRET: 0, # type: ignore[dict-item] KeyType.EC_EDWARDS: _SIGNING, } + """ Default capabilities for generating keys. """ @@ -125,11 +127,11 @@ _biginteger = _bytes -def _enum(type_): +def _enum(type_: type[Any]) -> tuple[Callable[[Any], bytes], Callable[[Any], Any]]: """Factory to pack/unpack intos into IntEnums.""" pack, unpack = _ulong - return (lambda v: pack(int(v)), lambda v: type_(unpack(v))) + return (lambda v: pack(int(v)), lambda v: type_(unpack(v))) # type: ignore[no-untyped-call] ATTRIBUTE_TYPES = { diff --git a/pkcs11/mechanisms.py b/pkcs11/mechanisms.py index 1ab2fb7..ad0f9c4 100644 --- a/pkcs11/mechanisms.py +++ b/pkcs11/mechanisms.py @@ -101,7 +101,7 @@ class KeyType(IntEnum): _VENDOR_DEFINED = 0x80000000 - def __repr__(self): + def __repr__(self) -> str: return "" % self.name @@ -709,7 +709,7 @@ class Mechanism(IntEnum): _VENDOR_DEFINED = 0x80000000 - def __repr__(self): + def __repr__(self) -> str: return "" % self.name @@ -729,7 +729,7 @@ class KDF(IntEnum): SHA512 = 0x00000008 CPDIVERSIFY = 0x00000009 - def __repr__(self): + def __repr__(self) -> str: return "" % self.name @@ -744,5 +744,5 @@ class MGF(IntEnum): SHA512 = 0x00000004 SHA224 = 0x00000005 - def __repr__(self): + def __repr__(self) -> str: return "" % self.name diff --git a/pkcs11/py.typed b/pkcs11/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/pkcs11/types.py b/pkcs11/types.py index 529ecf3..5033daa 100644 --- a/pkcs11/types.py +++ b/pkcs11/types.py @@ -4,23 +4,16 @@ This module provides stubs that are overrideen in pkcs11._pkcs11. """ +import types +import typing from binascii import hexlify +from collections.abc import Iterable, Iterator +from functools import cached_property from threading import RLock +from typing import Any, Literal, Optional, Protocol, TypedDict, TypeVar, Union, overload -try: - from functools import cached_property -except ImportError: - from cached_property import cached_property - -from .constants import ( - Attribute, - MechanismFlag, - ObjectClass, - SlotFlag, - TokenFlag, - UserType, -) -from .exceptions import ( +from pkcs11.constants import Attribute, MechanismFlag, ObjectClass, SlotFlag, TokenFlag, UserType +from pkcs11.exceptions import ( ArgumentsBad, AttributeTypeInvalid, MultipleObjectsReturned, @@ -28,23 +21,32 @@ SignatureInvalid, SignatureLenRange, ) -from .mechanisms import KeyType, Mechanism +from pkcs11.mechanisms import KeyType, Mechanism PROTECTED_AUTH = object() """Indicate the pin should be supplied via an external mechanism (e.g. pin pad)""" +# Using typing.Self with Python 3.11 +Self = TypeVar("Self", bound="Object") +AttributeDict = dict[Attribute, Union[bytes, bool, str, list[int], ObjectClass, KeyType]] + + +class VersionDict(TypedDict): + major: str + minor: str + -def _CK_UTF8CHAR_to_str(data): +def _CK_UTF8CHAR_to_str(data: bytes) -> str: """Convert CK_UTF8CHAR to string.""" return data.rstrip(b"\0").decode("utf-8").rstrip() -def _CK_VERSION_to_tuple(data): +def _CK_VERSION_to_tuple(data: VersionDict) -> tuple[str, str]: """Convert CK_VERSION to tuple.""" return (data["major"], data["minor"]) -def _CK_MECHANISM_TYPE_to_enum(mechanism): +def _CK_MECHANISM_TYPE_to_enum(mechanism: int) -> Union[Mechanism, int]: """Convert CK_MECHANISM_TYPE to enum or be okay.""" try: return Mechanism(mechanism) @@ -52,6 +54,42 @@ def _CK_MECHANISM_TYPE_to_enum(mechanism): return mechanism +class Library(Protocol): + so: str + manufacturer_id: str + library_description: str + cryptoki_version: str + library_version: str + + def __init__(self, so: str) -> None: ... + + def __str__(self) -> str: ... + + def __repr__(self) -> str: ... + + def get_slots(self, token_present: bool = False) -> list["Slot"]: ... + + def get_tokens( + self, + token_label: Optional[str] = None, + token_serial: Optional[bytes] = None, + token_flags: Optional[TokenFlag] = None, + slot_flags: Optional[SlotFlag] = None, + mechanisms: Optional[set[Mechanism]] = None, + ) -> Iterator["Token"]: ... + + def get_token( + self, + token_label: Optional[str] = None, + token_serial: Optional[bytes] = None, + token_flags: Optional[TokenFlag] = None, + slot_flags: Optional[SlotFlag] = None, + mechanisms: Optional[set[Mechanism]] = None, + ) -> "Token": ... + + def reinitialize(self) -> None: ... + + class MechanismInfo: """ Information about a mechanism. @@ -59,7 +97,14 @@ class MechanismInfo: See :meth:`pkcs11.Slot.get_mechanism_info`. """ - def __init__(self, slot, mechanism, ulMinKeySize=None, ulMaxKeySize=None, flags=None, **kwargs): + def __init__( + self, + slot: "Slot", + mechanism: Mechanism, + ulMinKeySize: Optional[int] = None, + ulMaxKeySize: Optional[int] = None, + flags: Optional[int] = None, + ): self.slot = slot """:class:`pkcs11.Slot` this information is for.""" self.mechanism = mechanism @@ -68,10 +113,10 @@ def __init__(self, slot, mechanism, ulMinKeySize=None, ulMaxKeySize=None, flags= """Minimum key length in bits (:class:`int`).""" self.max_key_length = ulMaxKeySize """Maximum key length in bits (:class:`int`).""" - self.flags = MechanismFlag(flags) + self.flags = MechanismFlag(flags) # type: ignore[arg-type] """Mechanism capabilities (:class:`pkcs11.constants.MechanismFlag`).""" - def __str__(self): + def __str__(self) -> str: return "\n".join( ( "Supported key lengths: [%s, %s]" % (self.min_key_length, self.max_key_length), @@ -79,7 +124,7 @@ def __str__(self): ) ) - def __repr__(self): + def __repr__(self) -> str: return "<{klass} (mechanism={mechanism}, flags={flags})>".format( klass=type(self).__name__, mechanism=str(self.mechanism), flags=str(self.flags) ) @@ -96,15 +141,14 @@ class Slot: def __init__( self, - lib, - slot_id, - slotDescription=None, - manufacturerID=None, - hardwareVersion=None, - firmwareVersion=None, - flags=None, - **kwargs, - ): + lib: Library, + slot_id: int, + slotDescription: bytes, + manufacturerID: bytes, + hardwareVersion: VersionDict, + firmwareVersion: VersionDict, + flags: int, + ) -> None: self._lib = lib # Hold a reference to the lib to prevent gc self.slot_id = slot_id @@ -120,7 +164,7 @@ def __init__( self.flags = SlotFlag(flags) """Capabilities of this slot (:class:`SlotFlag`).""" - def get_token(self): + def get_token(self) -> "Token": """ Returns the token loaded into this slot. @@ -128,7 +172,7 @@ def get_token(self): """ raise NotImplementedError() - def get_mechanisms(self): + def get_mechanisms(self) -> set[Mechanism]: """ Returns the mechanisms supported by this device. @@ -136,7 +180,7 @@ def get_mechanisms(self): """ raise NotImplementedError() - def get_mechanism_info(self, mechanism): + def get_mechanism_info(self, mechanism: Mechanism) -> MechanismInfo: """ Returns information about the mechanism. @@ -145,10 +189,10 @@ def get_mechanism_info(self, mechanism): """ raise NotImplementedError() - def __eq__(self, other): - return self.slot_id == other.slot_id + def __eq__(self, other: Any) -> bool: + return self.slot_id == other.slot_id # type: ignore[no-any-return] - def __str__(self): + def __str__(self) -> str: return "\n".join( ( "Slot Description: %s" % self.slot_description, @@ -159,7 +203,7 @@ def __str__(self): ) ) - def __repr__(self): + def __repr__(self) -> str: return "<{klass} (slotID={slot_id} flags={flags})>".format( klass=type(self).__name__, slot_id=self.slot_id, flags=str(self.flags) ) @@ -175,16 +219,15 @@ class Token: def __init__( self, - slot, - label=None, - serialNumber=None, - model=None, - manufacturerID=None, - hardwareVersion=None, - firmwareVersion=None, - flags=None, - **kwargs, - ): + slot: Slot, + label: bytes, + serialNumber: bytes, + model: bytes, + manufacturerID: bytes, + hardwareVersion: VersionDict, + firmwareVersion: VersionDict, + flags: int, + ) -> None: self.slot = slot """The :class:`Slot` this token is installed in.""" self.label = _CK_UTF8CHAR_to_str(label) @@ -202,10 +245,16 @@ def __init__( self.flags = TokenFlag(flags) """Capabilities of this token (:class:`pkcs11.flags.TokenFlag`).""" - def __eq__(self, other): - return self.slot == other.slot + def __eq__(self, other: Any) -> bool: + return self.slot == other.slot # type: ignore[no-any-return] - def open(self, rw=False, user_pin=None, so_pin=None, user_type=None): + def open( + self, + rw: bool = False, + user_pin: Optional[str] = None, + so_pin: Optional[str] = None, + user_type: Optional[UserType] = None, + ) -> "Session": """ Open a session on the token and optionally log in as a user or security officer (pass one of `user_pin` or `so_pin`). Pass PROTECTED_AUTH to @@ -231,11 +280,11 @@ def open(self, rw=False, user_pin=None, so_pin=None, user_type=None): """ raise NotImplementedError() - def __str__(self): + def __str__(self) -> str: return self.label - def __repr__(self): - return "<{klass} (label='{label}' serial={serial} flags={flags})>".format( + def __repr__(self) -> str: + return "<{klass} (label='{label!r}' serial={serial!r} flags={flags})>".format( klass=type(self).__name__, label=self.label, serial=self.serial, flags=str(self.flags) ) @@ -251,7 +300,9 @@ class Session: context manager or closed with :meth:`close`. """ - def __init__(self, token, handle, rw=False, user_type=UserType.NOBODY): + def __init__( + self, token: Token, handle: int, rw: bool = False, user_type: UserType = UserType.NOBODY + ) -> None: self.token = token """:class:`Token` this session is on.""" @@ -266,23 +317,71 @@ def __init__(self, token, handle, rw=False, user_type=UserType.NOBODY): self.user_type = user_type """User type for this session (:class:`pkcs11.constants.UserType`).""" - def __eq__(self, other): - return self.token == other.token and self._handle == other._handle + def __eq__(self, other: Any) -> bool: + return ( + isinstance(self, type(other)) + and self.token == other.token + and self._handle == other._handle + ) - def __hash__(self): + def __hash__(self) -> int: return hash(self._handle) - def __enter__(self): + def __enter__(self) -> "Session": return self - def __exit__(self, type_, value, traceback): + def __exit__( + self, type_: type[Exception], value: Exception, traceback: types.TracebackType + ) -> None: self.close() - def close(self): + def close(self) -> None: """Close the session.""" raise NotImplementedError() - def get_key(self, object_class=None, key_type=None, label=None, id=None): + @typing.overload + def get_key( + self, + object_class: Literal[ObjectClass.PRIVATE_KEY], + key_type: Optional[KeyType] = None, + label: Optional[str] = None, + id: Optional[bytes] = None, + ) -> "PrivateKey": ... + + @typing.overload + def get_key( + self, + object_class: Literal[ObjectClass.PUBLIC_KEY], + key_type: Optional[KeyType] = None, + label: Optional[str] = None, + id: Optional[bytes] = None, + ) -> "PublicKey": ... + + @typing.overload + def get_key( + self, + object_class: Literal[ObjectClass.SECRET_KEY], + key_type: Optional[KeyType] = None, + label: Optional[str] = None, + id: Optional[bytes] = None, + ) -> "SecretKey": ... + + @typing.overload + def get_key( + self, + object_class: Optional[ObjectClass] = None, + key_type: Optional[KeyType] = None, + label: Optional[str] = None, + id: Optional[bytes] = None, + ) -> "Key": ... + + def get_key( + self, + object_class: Optional[ObjectClass] = None, + key_type: Optional[KeyType] = None, + label: Optional[str] = None, + id: Optional[bytes] = None, + ) -> "Key": """ Search for a key with any of `key_type`, `label` and/or `id`. @@ -303,7 +402,7 @@ def get_key(self, object_class=None, key_type=None, label=None, id=None): if object_class is None and key_type is None and label is None and id is None: raise ArgumentsBad("Must specify at least one search parameter.") - attrs = {} + attrs: AttributeDict = {} if object_class is not None: attrs[Attribute.CLASS] = object_class @@ -317,7 +416,7 @@ def get_key(self, object_class=None, key_type=None, label=None, id=None): if id is not None: attrs[Attribute.ID] = id - iterator = self.get_objects(attrs) + iterator: Iterator[Key] = typing.cast(Iterator[Key], self.get_objects(attrs)) try: try: @@ -333,9 +432,9 @@ def get_key(self, object_class=None, key_type=None, label=None, id=None): finally: # Force finalizing SearchIter rather than waiting for garbage # collection, so that we release the operation lock. - iterator._finalize() + iterator._finalize() # type: ignore[attr-defined] - def get_objects(self, attrs=None): + def get_objects(self, attrs: Optional[AttributeDict] = None) -> Iterator["Object"]: """ Search for objects matching `attrs`. Returns a generator. @@ -356,7 +455,7 @@ def get_objects(self, attrs=None): """ raise NotImplementedError() - def create_object(self, attrs): + def create_object(self, attrs: AttributeDict) -> "Object": """ Create a new object on the :class:`Token`. This is a low-level interface to create any type of object and can be used for importing @@ -383,7 +482,13 @@ def create_object(self, attrs): """ raise NotImplementedError() - def create_domain_parameters(self, key_type, attrs, local=False, store=False): + def create_domain_parameters( + self, + key_type: KeyType, + attrs: AttributeDict, + local: bool = False, + store: bool = False, + ) -> "DomainParameters": """ Create a domain parameters object from known parameters. @@ -415,13 +520,13 @@ def create_domain_parameters(self, key_type, attrs, local=False, store=False): def generate_domain_parameters( self, - key_type, - param_length, - store=False, - mechanism=None, - mechanism_param=None, - template=None, - ): + key_type: KeyType, + param_length: int, + store: bool = False, + mechanism: Optional[Mechanism] = None, + mechanism_param: Optional[tuple[Any, ...]] = None, + template: Optional[AttributeDict] = None, + ) -> "DomainParameters": """ Generate domain parameters. @@ -449,16 +554,16 @@ def generate_domain_parameters( def generate_key( self, - key_type, - key_length=None, - id=None, - label=None, - store=False, - capabilities=None, - mechanism=None, - mechanism_param=None, - template=None, - ): + key_type: KeyType, + key_length: Optional[int] = None, + id: Optional[bytes] = None, + label: Optional[str] = None, + store: bool = False, + capabilities: Optional[MechanismFlag] = None, + mechanism: Optional[Mechanism] = None, + mechanism_param: Optional[tuple[Any, ...]] = None, + template: Optional[AttributeDict] = None, + ) -> "SecretKey": """ Generate a single key (e.g. AES, DES). @@ -492,7 +597,14 @@ def generate_key( """ raise NotImplementedError() - def generate_keypair(self, key_type, key_length=None, **kwargs): + def _generate_keypair( + self, key_type: KeyType, key_length: Optional[int] = None, **kwargs: Any + ) -> tuple["PublicKey", "PrivateKey"]: + raise NotImplementedError() + + def generate_keypair( + self, key_type: KeyType, key_length: Optional[int] = None, **kwargs: Any + ) -> tuple["PublicKey", "PrivateKey"]: """ Generate a asymmetric keypair (e.g. RSA). @@ -521,7 +633,7 @@ def generate_keypair(self, key_type, key_length=None, **kwargs): else: return self._generate_keypair(key_type, key_length=key_length, **kwargs) - def seed_random(self, seed): + def seed_random(self, seed: bytes) -> None: """ Mix additional seed material into the RNG (if supported). @@ -529,7 +641,7 @@ def seed_random(self, seed): """ raise NotImplementedError() - def generate_random(self, nbits): + def generate_random(self, nbits: int) -> bytes: """ Generate `length` bits of random or pseudo-random data (if supported). @@ -538,7 +650,23 @@ def generate_random(self, nbits): """ raise NotImplementedError() - def digest(self, data, **kwargs): + def _digest(self, data: bytes, **kwargs: Any) -> bytes: + raise NotImplementedError() + + def _digest_generator( + self, + data: Union[tuple[Union[bytes, "Key"], ...], Iterator[Union[bytes, "Key"]]], + **kwargs: Any, + ) -> bytes: + raise NotImplementedError() + + def digest( + self, + data: Union[ + str, bytes, "Key", tuple[Union[bytes, "Key"], ...], Iterator[Union[bytes, "Key"]] + ], + **kwargs: Any, + ) -> bytes: """ Digest `data` using `mechanism`. @@ -580,21 +708,27 @@ class Object: :exc:`pkcs11.exceptions.AttributeTypeInvalid`. """ - object_class = None + object_class: ObjectClass """:class:`pkcs11.constants.ObjectClass` of this Object.""" - def __init__(self, session, handle): + def __init__(self, session: Session, handle: Optional[int]) -> None: self.session = session """:class:`Session` this object is valid for.""" self._handle = handle - def __eq__(self, other): - return self.session == other.session and self._handle == other._handle + def __eq__(self, other: Any) -> bool: + return self.session == other.session and self._handle == other._handle # type: ignore[no-any-return] - def __hash__(self): + def __hash__(self) -> int: return hash((self.session, self._handle)) - def copy(self, attrs): + if typing.TYPE_CHECKING: + + def __getitem__(self, item: Attribute) -> Any: ... + + def __setitem__(self, key: Attribute, value: Any) -> None: ... + + def copy(self: Self, attrs: AttributeDict) -> Self: """ Make a copy of the object with new attributes `attrs`. @@ -615,7 +749,7 @@ def copy(self, attrs): """ raise NotImplementedError() - def destroy(self): + def destroy(self) -> None: """ Destroy the object. @@ -629,191 +763,30 @@ def destroy(self): raise NotImplementedError() -class DomainParameters(Object): - """ - PKCS#11 Domain Parameters. - - Used to store domain parameters as part of the key generation step, e.g. - in DSA and Diffie-Hellman. - """ - - def __init__(self, session, handle, params=None): - super().__init__(session, handle) - self.params = params - - def __getitem__(self, key): - if self._handle is None: - try: - return self.params[key] - except KeyError as ex: - raise AttributeTypeInvalid from ex - else: - return super().__getitem__(key) - - def __setitem__(self, key, value): - if self._handle is None: - self.params[key] = value - else: - super().__setitem__(key, value) - - @cached_property - def key_type(self): - """ - Key type (:class:`pkcs11.mechanisms.KeyType`) these parameters - can be used to generate. - """ - return self[Attribute.KEY_TYPE] - - def generate_keypair( - self, - id=None, - label=None, - store=False, - capabilities=None, - mechanism=None, - mechanism_param=None, - public_template=None, - private_template=None, - ): - """ - Generate a key pair from these domain parameters (e.g. for - Diffie-Hellman. - - See :meth:`Session.generate_key` for more information. - - :param bytes id: Key identifier. - :param str label: Key label. - :param store: Store key on token (requires R/W session). - :param MechanismFlag capabilities: Key capabilities (or default). - :param Mechanism mechanism: Generation mechanism (or default). - :param bytes mechanism_param: Optional vector to the mechanism. - :param dict(Attribute,*) template: Additional attributes. - - :rtype: (PublicKey, PrivateKey) - """ - raise NotImplementedError() - - -class Key(Object): - """Base class for all key :class:`Object` types.""" - - @cached_property - def id(self): - """Key id (:class:`bytes`).""" - return self[Attribute.ID] - - @cached_property - def label(self): - """Key label (:class:`str`).""" - return self[Attribute.LABEL] - - @cached_property - def key_type(self): - """Key type (:class:`pkcs11.mechanisms.KeyType`).""" - return self[Attribute.KEY_TYPE] - - @cached_property - def _key_description(self): - """A description of the key.""" - try: - return "%s-bit %s" % (self.key_length, self.key_type.name) - except AttributeTypeInvalid: - return self.key_type.name - - def __repr__(self): - return "<%s label='%s' id='%s' %s>" % ( - type(self).__name__, - self.label, - hexlify(self.id).decode("ascii"), - self._key_description, - ) - - -class SecretKey(Key): - """ - A PKCS#11 :attr:`pkcs11.constants.ObjectClass.SECRET_KEY` object - (symmetric encryption key). - """ - - object_class = ObjectClass.SECRET_KEY - - @cached_property - def key_length(self): - """Key length in bits.""" - return self[Attribute.VALUE_LEN] * 8 - - -class PublicKey(Key): - """ - A PKCS#11 :attr:`pkcs11.constants.ObjectClass.PUBLIC_KEY` object - (asymmetric public key). - - RSA private keys can be imported and exported from PKCS#1 DER-encoding - using :func:`pkcs11.util.rsa.decode_rsa_public_key` and - :func:`pkcs11.util.rsa.encode_rsa_public_key` respectively. - """ - - object_class = ObjectClass.PUBLIC_KEY - - @cached_property - def key_length(self): - """Key length in bits.""" - return self[Attribute.MODULUS_BITS] - - -class PrivateKey(Key): - """ - A PKCS#11 :attr:`pkcs11.constants.ObjectClass.PRIVATE_KEY` object - (asymmetric private key). - - RSA private keys can be imported from PKCS#1 DER-encoding using - :func:`pkcs11.util.rsa.decode_rsa_private_key`. - - .. warning:: - - Private keys imported directly, rather than unwrapped from a trusted - private key should be considered insecure. - """ - - object_class = ObjectClass.PRIVATE_KEY - - @cached_property - def key_length(self): - """Key length in bits.""" - return len(self[Attribute.MODULUS]) * 8 - - -class Certificate(Object): +class EncryptMixin(Object): """ - A PKCS#11 :attr:`pkcs11.constants.ObjectClass.CERTIFICATE` object. - - PKCS#11 is limited in its handling of certificates, and does not - provide features like parsing of X.509 etc. These should be handled in - an external library. PKCS#11 will not set attributes on the certificate - based on the `VALUE`. - - :func:`pkcs11.util.x509.decode_x509_certificate` will extract attributes - from a certificate to create the object. + This :class:`Object` supports the encrypt capability. """ - object_class = ObjectClass.CERTIFICATE + def _encrypt(self, data: bytes, buffer_size: int = 8192, **kwargs: Any) -> bytes: + raise NotImplementedError - @cached_property - def certificate_type(self): - """ - The type of certificate. + def _encrypt_generator( + self, data: Iterable[bytes], buffer_size: int = 8192, **kwargs: Any + ) -> Iterator[bytes]: + raise NotImplementedError - :rtype: CertificateType - """ - return self[Attribute.CERTIFICATE_TYPE] + @overload + def encrypt(self, data: Union[str, bytes], buffer_size: int = 8192, **kwargs: Any) -> bytes: ... + @overload + def encrypt( + self, data: Union[Iterable[bytes]], buffer_size: int = 8192, **kwargs: Any + ) -> Iterator[bytes]: ... -class EncryptMixin(Object): - """ - This :class:`Object` supports the encrypt capability. - """ - - def encrypt(self, data, buffer_size=8192, **kwargs): + def encrypt( + self, data: Union[str, bytes, Iterable[bytes]], buffer_size: int = 8192, **kwargs: Any + ) -> Union[bytes, Iterator[bytes]]: """ Encrypt some `data`. @@ -893,12 +866,152 @@ def encrypt_file(file_in, file_out, buffer_size=8192): return self._encrypt_generator(data, buffer_size=buffer_size, **kwargs) +class VerifyMixin(Object): + """ + This :class:`Object` supports the verify capability. + """ + + def _verify(self, data: bytes, signature: bytes, **kwargs: Any) -> bool: + raise NotImplementedError + + def _verify_generator(self, data: Iterable[bytes], signature: bytes, **kwargs: Any) -> bool: + raise NotImplementedError + + def verify( + self, data: Union[str, bytes, Iterable[bytes]], signature: bytes, **kwargs: Any + ) -> bool: + """ + Verify some `data`. + + See :meth:`EncryptMixin.encrypt` for more information. + + Returns True if `signature` is valid for `data`. + + For DSA and ECDSA keys, PKCS #11 expects the two parameters (r & s) + as two concatenated `biginteger` of the same length. To convert these + from other formats, such as the format used by OpenSSL, use + :func:`pkcs11.util.dsa.decode_dsa_signature` or + :func:`pkcs11.util.ec.decode_ecdsa_signature`. + + :param data: data to sign + :type data: str, bytes or iter(bytes) + :param bytes signature: signature + :param Mechanism mechanism: optional signing mechanism + :param bytes mechanism_param: optional mechanism parameter + + :rtype: bool + """ + + # If data is a string, encode it now as UTF-8. + if isinstance(data, str): + data = data.encode("utf-8") + + try: + if isinstance(data, bytes): + self._verify(data, signature, **kwargs) + else: + self._verify_generator(data, signature, **kwargs) + + return True + + except (SignatureInvalid, SignatureLenRange): + return False + + +class DomainParameters(Object): + """ + PKCS#11 Domain Parameters. + + Used to store domain parameters as part of the key generation step, e.g. + in DSA and Diffie-Hellman. + """ + + def __init__( + self, session: Session, handle: Optional[int], params: Optional[AttributeDict] = None + ): + super().__init__(session, handle) + self.params = params + + def __getitem__(self, key: Attribute) -> Any: + if self._handle is None: + try: + return self.params[key] # type: ignore[index] # seems to be handled somehow? + except KeyError as ex: + raise AttributeTypeInvalid from ex + else: + return super().__getitem__(key) + + def __setitem__(self, key: Attribute, value: Any) -> None: + if self._handle is None: + self.params[key] = value # type: ignore[index] # seems to be handled somehow? + else: + super().__setitem__(key, value) + + @cached_property + def key_type(self) -> KeyType: + """ + Key type (:class:`pkcs11.mechanisms.KeyType`) these parameters + can be used to generate. + """ + return self[Attribute.KEY_TYPE] # type: ignore[no-any-return] + + def generate_keypair( + self, + id: Optional[bytes] = None, + label: Optional[str] = None, + store: bool = False, + capabilities: Optional[MechanismFlag] = None, + mechanism: Optional[Mechanism] = None, + mechanism_param: Optional[tuple[Any, ...]] = None, + public_template: Optional[AttributeDict] = None, + private_template: Optional[AttributeDict] = None, + ) -> tuple["PublicKey", "PrivateKey"]: + """ + Generate a key pair from these domain parameters (e.g. for + Diffie-Hellman. + + See :meth:`Session.generate_key` for more information. + + :param bytes id: Key identifier. + :param str label: Key label. + :param store: Store key on token (requires R/W session). + :param MechanismFlag capabilities: Key capabilities (or default). + :param Mechanism mechanism: Generation mechanism (or default). + :param bytes mechanism_param: Optional vector to the mechanism. + :param dict(Attribute,*) template: Additional attributes. + + :rtype: (PublicKey, PrivateKey) + """ + raise NotImplementedError() + + class DecryptMixin(Object): """ This :class:`Object` supports the decrypt capability. """ - def decrypt(self, data, buffer_size=8192, **kwargs): + def _decrypt(self, data: bytes, buffer_size: int = 8192, **kwargs: Any) -> bytes: + raise NotImplementedError + + def _decrypt_generator( + self, data: Union[Iterable[int], Iterable[bytes]], buffer_size: int = 8192, **kwargs: Any + ) -> Iterator[bytes]: + raise NotImplementedError + + @typing.overload + def decrypt(self, data: bytes, buffer_size: int = 8192, **kwargs: Any) -> bytes: ... + + @typing.overload + def decrypt( + self, data: Union[Iterable[int], Iterable[bytes]], buffer_size: int = 8192, **kwargs: Any + ) -> Iterator[bytes]: ... + + def decrypt( + self, + data: Union[bytes, Iterable[bytes], Iterable[int]], + buffer_size: int = 8192, + **kwargs: Any, + ) -> Union[bytes, Iterator[bytes]]: """ Decrypt some `data`. @@ -930,7 +1043,14 @@ class SignMixin(Object): This :class:`Object` supports the sign capability. """ - def sign(self, data, **kwargs): + def _sign(self, data: bytes, **kwargs: Any) -> bytes: + raise NotImplementedError + + # NOTE: Unlike other mixins, sign_generator() also returns bytes and not an iterator + def _sign_generator(self, data: Iterable[bytes], **kwargs: Any) -> bytes: + raise NotImplementedError + + def sign(self, data: Union[str, bytes, Iterable[bytes]], **kwargs: Any) -> bytes: """ Sign some `data`. @@ -962,56 +1082,17 @@ def sign(self, data, **kwargs): return self._sign_generator(data, **kwargs) -class VerifyMixin(Object): - """ - This :class:`Object` supports the verify capability. - """ - - def verify(self, data, signature, **kwargs): - """ - Verify some `data`. - - See :meth:`EncryptMixin.encrypt` for more information. - - Returns True if `signature` is valid for `data`. - - For DSA and ECDSA keys, PKCS #11 expects the two parameters (r & s) - as two concatenated `biginteger` of the same length. To convert these - from other formats, such as the format used by OpenSSL, use - :func:`pkcs11.util.dsa.decode_dsa_signature` or - :func:`pkcs11.util.ec.decode_ecdsa_signature`. - - :param data: data to sign - :type data: str, bytes or iter(bytes) - :param bytes signature: signature - :param Mechanism mechanism: optional signing mechanism - :param bytes mechanism_param: optional mechanism parameter - - :rtype: bool - """ - - # If data is a string, encode it now as UTF-8. - if isinstance(data, str): - data = data.encode("utf-8") - - try: - if isinstance(data, bytes): - self._verify(data, signature, **kwargs) - else: - self._verify_generator(data, signature, **kwargs) - - return True - - except (SignatureInvalid, SignatureLenRange): - return False - - class WrapMixin(Object): """ This :class:`Object` supports the wrap capability. """ - def wrap_key(self, key, mechanism=None, mechanism_param=None): + def wrap_key( + self, + key: "Key", + mechanism: Optional[Mechanism] = None, + mechanism_param: Optional[tuple[Any, ...]] = None, + ) -> bytes: """ Use this key to wrap (i.e. encrypt) `key` for export. Returns an encrypted version of `key`. @@ -1034,17 +1115,17 @@ class UnwrapMixin(Object): def unwrap_key( self, - object_class, - key_type, - key_data, - id=None, - label=None, - mechanism=None, - mechanism_param=None, - store=False, - capabilities=None, - template=None, - ): + object_class: ObjectClass, + key_type: KeyType, + key_data: bytes, + id: Optional[bytes] = None, + label: Optional[bytes] = None, + mechanism: Optional["Key"] = None, + mechanism_param: Optional[tuple[Any, ...]] = None, + store: bool = False, + capabilities: Optional[MechanismFlag] = None, + template: Optional[AttributeDict] = None, + ) -> "Key": """ Use this key to unwrap (i.e. decrypt) and import `key_data`. @@ -1073,16 +1154,16 @@ class DeriveMixin(Object): def derive_key( self, - key_type, - key_length, - id=None, - label=None, - store=False, - capabilities=None, - mechanism=None, - mechanism_param=None, - template=None, - ): + key_type: KeyType, + key_length: int, + id: Optional[bytes] = None, + label: Optional[bytes] = None, + store: bool = False, + capabilities: Optional[MechanismFlag] = None, + mechanism: Optional[Mechanism] = None, + mechanism_param: Optional[Union[bytes, tuple[Any, ...]]] = None, + template: Optional[AttributeDict] = None, + ) -> "SecretKey": """ Derive a new key from this key. Used to create session keys from a PKCS key exchange. @@ -1167,3 +1248,123 @@ def derive_key( :rtype: SecretKey """ raise NotImplementedError() + + +class Key(Object): + """Base class for all key :class:`Object` types.""" + + @cached_property + def id(self) -> bytes: + """Key id (:class:`bytes`).""" + return self[Attribute.ID] # type: ignore[no-any-return] + + @cached_property + def label(self) -> str: + """Key label (:class:`str`).""" + return self[Attribute.LABEL] # type: ignore[no-any-return] + + @property + def key_length(self) -> int: + raise NotImplementedError() + + @cached_property + def key_type(self) -> KeyType: + """Key type (:class:`pkcs11.mechanisms.KeyType`).""" + return self[Attribute.KEY_TYPE] # type: ignore[no-any-return] + + @cached_property + def _key_description(self) -> str: + """A description of the key.""" + try: + return "%s-bit %s" % (self.key_length, self.key_type.name) + except AttributeTypeInvalid: + return self.key_type.name + + def __repr__(self) -> str: + return "<%s label='%s' id='%s' %s>" % ( + type(self).__name__, + self.label, + hexlify(self.id).decode("ascii"), + self._key_description, + ) + + +class SecretKey( + DecryptMixin, DeriveMixin, EncryptMixin, SignMixin, UnwrapMixin, VerifyMixin, WrapMixin, Key +): + """ + A PKCS#11 :attr:`pkcs11.constants.ObjectClass.SECRET_KEY` object + (symmetric encryption key). + """ + + object_class = ObjectClass.SECRET_KEY + + @cached_property + def key_length(self) -> int: + """Key length in bits.""" + return self[Attribute.VALUE_LEN] * 8 # type: ignore[no-any-return] + + +class PublicKey(EncryptMixin, VerifyMixin, WrapMixin, Key): + """ + A PKCS#11 :attr:`pkcs11.constants.ObjectClass.PUBLIC_KEY` object + (asymmetric public key). + + RSA private keys can be imported and exported from PKCS#1 DER-encoding + using :func:`pkcs11.util.rsa.decode_rsa_public_key` and + :func:`pkcs11.util.rsa.encode_rsa_public_key` respectively. + """ + + object_class = ObjectClass.PUBLIC_KEY + + @cached_property + def key_length(self) -> int: + """Key length in bits.""" + return self[Attribute.MODULUS_BITS] # type: ignore[no-any-return] + + +class PrivateKey(DecryptMixin, DeriveMixin, SignMixin, UnwrapMixin, Key): + """ + A PKCS#11 :attr:`pkcs11.constants.ObjectClass.PRIVATE_KEY` object + (asymmetric private key). + + RSA private keys can be imported from PKCS#1 DER-encoding using + :func:`pkcs11.util.rsa.decode_rsa_private_key`. + + .. warning:: + + Private keys imported directly, rather than unwrapped from a trusted + private key should be considered insecure. + """ + + object_class = ObjectClass.PRIVATE_KEY + + @cached_property + def key_length(self) -> int: + """Key length in bits.""" + return len(self[Attribute.MODULUS]) * 8 + + +class Certificate(Object): + """ + A PKCS#11 :attr:`pkcs11.constants.ObjectClass.CERTIFICATE` object. + + PKCS#11 is limited in its handling of certificates, and does not + provide features like parsing of X.509 etc. These should be handled in + an external library. PKCS#11 will not set attributes on the certificate + based on the `VALUE`. + + :func:`pkcs11.util.x509.decode_x509_certificate` will extract attributes + from a certificate to create the object. + """ + + object_class = ObjectClass.CERTIFICATE + + @cached_property + def certificate_type(self) -> int: # TODO: Unclear if this is a KeyType or some othe renum + """ + The type of certificate. + + :rtype: CertificateType + """ + return self[Attribute.CERTIFICATE_TYPE] # type: ignore[no-any-return] diff --git a/pkcs11/util/__init__.py b/pkcs11/util/__init__.py index 8acc6bd..69b1c4c 100644 --- a/pkcs11/util/__init__.py +++ b/pkcs11/util/__init__.py @@ -1,4 +1,4 @@ -def biginteger(value): +def biginteger(value: int) -> bytes: """ Returns a PKCS#11 biginteger bytestream from a Python integer or similar type (e.g. :class:`asn1crypto.core.Integer`). diff --git a/pkcs11/util/dh.py b/pkcs11/util/dh.py index 2936cd3..378768b 100644 --- a/pkcs11/util/dh.py +++ b/pkcs11/util/dh.py @@ -5,11 +5,13 @@ from asn1crypto.algos import DHParameters from asn1crypto.core import Integer -from ..constants import Attribute -from . import biginteger +from pkcs11 import DomainParameters, PublicKey +from pkcs11.constants import Attribute +from pkcs11.types import AttributeDict +from pkcs11.util import biginteger -def decode_dh_domain_parameters(der): +def decode_dh_domain_parameters(der: bytes) -> AttributeDict: """ Decode DER-encoded Diffie-Hellman domain parameters. @@ -25,7 +27,7 @@ def decode_dh_domain_parameters(der): } -def encode_dh_domain_parameters(obj): +def encode_dh_domain_parameters(obj: DomainParameters) -> bytes: """ Encode DH domain parameters into DER-encoded format. @@ -42,23 +44,25 @@ def encode_dh_domain_parameters(obj): } ) - return asn1.dump() + encoded = asn1.dump() + assert isinstance(encoded, bytes) + return encoded -def encode_dh_public_key(key): +def encode_dh_public_key(key: PublicKey) -> bytes: """ Encode DH public key into RFC 3279 DER-encoded format. :param PublicKey key: public key :rtype: bytes """ - asn1 = Integer(int.from_bytes(key[Attribute.VALUE], byteorder="big")) - - return asn1.dump() + encoded = asn1.dump() + assert isinstance(encoded, bytes) + return encoded -def decode_dh_public_key(der): +def decode_dh_public_key(der: bytes) -> bytes: """ Decode a DH public key from RFC 3279 DER-encoded format. diff --git a/pkcs11/util/dsa.py b/pkcs11/util/dsa.py index 6867b0e..b3f3b46 100644 --- a/pkcs11/util/dsa.py +++ b/pkcs11/util/dsa.py @@ -2,15 +2,18 @@ Key handling utilities for DSA keys, domain parameters and signatures.. """ +from typing import Any + from asn1crypto.algos import DSASignature from asn1crypto.core import Integer from asn1crypto.keys import DSAParams -from ..constants import Attribute -from . import biginteger +from pkcs11.constants import Attribute +from pkcs11.types import DomainParameters, Key +from pkcs11.util import biginteger -def decode_dsa_domain_parameters(der): +def decode_dsa_domain_parameters(der: bytes) -> dict[Attribute, Any]: """ Decode RFC 3279 DER-encoded Dss-Params. @@ -27,7 +30,7 @@ def decode_dsa_domain_parameters(der): } -def encode_dsa_domain_parameters(obj): +def encode_dsa_domain_parameters(obj: DomainParameters) -> bytes: """ Encode RFC 3279 DER-encoded Dss-Params. @@ -42,10 +45,12 @@ def encode_dsa_domain_parameters(obj): } ) - return asn1.dump() + dumped = asn1.dump() + assert isinstance(dumped, bytes) + return dumped -def encode_dsa_public_key(key): +def encode_dsa_public_key(key: Key) -> bytes: """ Encode DSA public key into RFC 3279 DER-encoded format. @@ -54,11 +59,12 @@ def encode_dsa_public_key(key): """ asn1 = Integer(int.from_bytes(key[Attribute.VALUE], byteorder="big")) - - return asn1.dump() + dumped_data = asn1.dump() + assert isinstance(dumped_data, bytes) + return dumped_data -def decode_dsa_public_key(der): +def decode_dsa_public_key(der: bytes) -> bytes: """ Decode a DSA public key from RFC 3279 DER-encoded format. @@ -73,7 +79,7 @@ def decode_dsa_public_key(der): return biginteger(asn1) -def encode_dsa_signature(signature): +def encode_dsa_signature(signature: bytes) -> bytes: """ Encode a signature (generated by :meth:`pkcs11.SignMixin.sign`) into DER-encoded ASN.1 (Dss_Sig_Value) format. @@ -83,11 +89,12 @@ def encode_dsa_signature(signature): """ asn1 = DSASignature.from_p1363(signature) + dumped = asn1.dump() + assert isinstance(dumped, bytes) + return dumped - return asn1.dump() - -def decode_dsa_signature(der): +def decode_dsa_signature(der: bytes) -> bytes: """ Decode a DER-encoded ASN.1 (Dss_Sig_Value) signature (as generated by OpenSSL/X.509) into PKCS #11 format. @@ -97,5 +104,6 @@ def decode_dsa_signature(der): """ asn1 = DSASignature.load(der) - - return asn1.to_p1363() + decoded = asn1.to_p1363() + assert isinstance(decoded, bytes) + return decoded diff --git a/pkcs11/util/ec.py b/pkcs11/util/ec.py index 905ba02..a1c6abe 100644 --- a/pkcs11/util/ec.py +++ b/pkcs11/util/ec.py @@ -3,20 +3,18 @@ signatures. """ +from typing import Any + from asn1crypto.algos import DSASignature from asn1crypto.core import OctetString -from asn1crypto.keys import ( - ECDomainParameters, - ECPrivateKey, - NamedCurve, - PublicKeyInfo, -) +from asn1crypto.keys import ECDomainParameters, ECPrivateKey, NamedCurve, PublicKeyInfo -from ..constants import Attribute, ObjectClass -from ..mechanisms import KeyType +from pkcs11.constants import Attribute, ObjectClass +from pkcs11.mechanisms import KeyType +from pkcs11.types import PublicKey -def encode_named_curve_parameters(oid): +def encode_named_curve_parameters(oid: str) -> bytes: """ Return DER-encoded ANSI X.62 EC parameters for a named curve. @@ -27,13 +25,15 @@ def encode_named_curve_parameters(oid): :param str oid: OID or named curve :rtype: bytes """ - return ECDomainParameters( + encoded_parameters = ECDomainParameters( name="named", value=NamedCurve.unmap(oid), ).dump() + assert isinstance(encoded_parameters, bytes) + return encoded_parameters -def decode_ec_public_key(der, encode_ec_point=True): +def decode_ec_public_key(der: bytes, encode_ec_point: bool = True) -> dict[Attribute, Any]: """ Decode a DER-encoded EC public key as stored by OpenSSL into a dictionary of attributes able to be passed to :meth:`pkcs11.Session.create_object`. @@ -67,7 +67,7 @@ def decode_ec_public_key(der, encode_ec_point=True): } -def decode_ec_private_key(der): +def decode_ec_private_key(der: bytes) -> dict[Attribute, Any]: """ Decode a DER-encoded EC private key as stored by OpenSSL into a dictionary of attributes able to be passed to :meth:`pkcs11.Session.create_object`. @@ -86,7 +86,7 @@ def decode_ec_private_key(der): } -def encode_ec_public_key(key): +def encode_ec_public_key(key: PublicKey) -> bytes: """ Encode a DER-encoded EC public key as stored by OpenSSL. @@ -95,20 +95,23 @@ def encode_ec_public_key(key): """ ecparams = ECDomainParameters.load(key[Attribute.EC_PARAMS]) - ecpoint = bytes(OctetString.load(key[Attribute.EC_POINT])) + ecpoint = OctetString.load(key[Attribute.EC_POINT]) - return PublicKeyInfo( + public_key_info = PublicKeyInfo( { "algorithm": { "algorithm": "ec", "parameters": ecparams, }, - "public_key": ecpoint, + "public_key": bytes(ecpoint), } - ).dump() + ) + public_key_info_dumped = public_key_info.dump() + assert isinstance(public_key_info_dumped, bytes) + return public_key_info_dumped -def encode_ecdsa_signature(signature): +def encode_ecdsa_signature(signature: bytes) -> bytes: """ Encode a signature (generated by :meth:`pkcs11.SignMixin.sign`) into DER-encoded ASN.1 (ECDSA_Sig_Value) format. @@ -117,10 +120,16 @@ def encode_ecdsa_signature(signature): :rtype: bytes """ - return DSASignature.from_p1363(signature).dump() + loaded_signature = DSASignature.from_p1363(signature) + assert isinstance(loaded_signature, DSASignature) + + encoded = loaded_signature.dump() + assert isinstance(encoded, bytes) + + return encoded -def decode_ecdsa_signature(der): +def decode_ecdsa_signature(der: bytes) -> bytes: """ Decode a DER-encoded ASN.1 (ECDSA_Sig_Value) signature (as generated by OpenSSL/X.509) into PKCS #11 format. @@ -129,5 +138,6 @@ def decode_ecdsa_signature(der): :rtype bytes: """ - asn1 = DSASignature.load(der) - return asn1.to_p1363() + data = DSASignature.load(der).to_p1363() + assert isinstance(data, bytes) + return data diff --git a/pkcs11/util/rsa.py b/pkcs11/util/rsa.py index b59ae3f..61f1dd5 100644 --- a/pkcs11/util/rsa.py +++ b/pkcs11/util/rsa.py @@ -2,15 +2,20 @@ Key handling utilities for RSA keys (PKCS#1). """ +from typing import Optional + from asn1crypto.keys import RSAPrivateKey, RSAPublicKey -from ..constants import Attribute, MechanismFlag, ObjectClass -from ..defaults import DEFAULT_KEY_CAPABILITIES -from ..mechanisms import KeyType -from . import biginteger +from pkcs11.constants import Attribute, MechanismFlag, ObjectClass +from pkcs11.defaults import DEFAULT_KEY_CAPABILITIES +from pkcs11.mechanisms import KeyType +from pkcs11.types import AttributeDict, PublicKey +from pkcs11.util import biginteger -def decode_rsa_private_key(der, capabilities=None): +def decode_rsa_private_key( + der: bytes, capabilities: Optional[MechanismFlag] = None +) -> AttributeDict: """ Decode a RFC2437 (PKCS#1) DER-encoded RSA private key into a dictionary of attributes able to be passed to :meth:`pkcs11.Session.create_object`. @@ -41,7 +46,9 @@ def decode_rsa_private_key(der, capabilities=None): } -def decode_rsa_public_key(der, capabilities=None): +def decode_rsa_public_key( + der: bytes, capabilities: Optional[MechanismFlag] = None +) -> AttributeDict: """ Decode a RFC2437 (PKCS#1) DER-encoded RSA public key into a dictionary of attributes able to be passed to :meth:`pkcs11.Session.create_object`. @@ -54,7 +61,7 @@ def decode_rsa_public_key(der, capabilities=None): if capabilities is None: capabilities = DEFAULT_KEY_CAPABILITIES[KeyType.RSA] - key = RSAPublicKey.load(der) + key: RSAPublicKey = RSAPublicKey.load(der) return { Attribute.CLASS: ObjectClass.PUBLIC_KEY, Attribute.KEY_TYPE: KeyType.RSA, @@ -66,16 +73,19 @@ def decode_rsa_public_key(der, capabilities=None): } -def encode_rsa_public_key(key): +def encode_rsa_public_key(key: PublicKey) -> bytes: """ Encode an RSA public key into PKCS#1 DER-encoded format. :param PublicKey key: RSA public key :rtype: bytes """ - return RSAPublicKey( + public_key = RSAPublicKey( { "modulus": int.from_bytes(key[Attribute.MODULUS], byteorder="big"), "public_exponent": int.from_bytes(key[Attribute.PUBLIC_EXPONENT], byteorder="big"), } - ).dump() + ) + dumped = public_key.dump() + assert isinstance(dumped, bytes) + return dumped diff --git a/pkcs11/util/x509.py b/pkcs11/util/x509.py index dd72a7a..27efddd 100644 --- a/pkcs11/util/x509.py +++ b/pkcs11/util/x509.py @@ -2,13 +2,16 @@ Certificate handling utilities for X.509 (SSL) certificates. """ +from typing import Any + from asn1crypto.x509 import Certificate -from ..constants import Attribute, CertificateType, ObjectClass -from ..mechanisms import KeyType +from pkcs11.constants import Attribute, CertificateType, ObjectClass +from pkcs11.mechanisms import KeyType +from pkcs11.types import AttributeDict -def decode_x509_public_key(der): +def decode_x509_public_key(der: bytes) -> AttributeDict: """ Decode a DER-encoded X.509 certificate's public key into a set of attributes able to be passed to :meth:`pkcs11.Session.create_object`. @@ -22,7 +25,7 @@ def decode_x509_public_key(der): :param bytes der: DER-encoded certificate :rtype: dict(Attribute,*) """ - x509 = Certificate.load(der) + x509: Certificate = Certificate.load(der) key_info = x509.public_key key = bytes(key_info["public_key"]) @@ -32,17 +35,17 @@ def decode_x509_public_key(der): "ec": KeyType.EC, }[key_info.algorithm] - attrs = { + attrs: dict[Attribute, Any] = { Attribute.CLASS: ObjectClass.PUBLIC_KEY, Attribute.KEY_TYPE: key_type, } if key_type is KeyType.RSA: - from .rsa import decode_rsa_public_key + from pkcs11.util.rsa import decode_rsa_public_key attrs.update(decode_rsa_public_key(key)) elif key_type is KeyType.DSA: - from .dsa import decode_dsa_domain_parameters, decode_dsa_public_key + from pkcs11.util.dsa import decode_dsa_domain_parameters, decode_dsa_public_key params = key_info["algorithm"]["parameters"].dump() @@ -67,7 +70,7 @@ def decode_x509_public_key(der): return attrs -def decode_x509_certificate(der, extended_set=False): +def decode_x509_certificate(der: bytes, extended_set: bool = False) -> AttributeDict: """ Decode a DER-encoded X.509 certificate into a dictionary of attributes able to be passed to :meth:`pkcs11.Session.create_object`. diff --git a/pyproject.toml b/pyproject.toml index 241b146..d75d3a2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,5 @@ [build-system] -requires = ["setuptools>=74.1", "wheel", "cython"] +requires = ["setuptools>=74.1", "wheel", "cython>=3"] build-backend = "setuptools.build_meta" [project] @@ -37,6 +37,22 @@ Documentation = "http://python-pkcs11.readthedocs.io/en/latest/" Issues = "https://github.com/pyauth/python-pkcs11/issues" Repository = "https://github.com/pyauth/python-pkcs11" +[tool.mypy] +strict = true +show_error_codes = true +exclude = [ + "dist/", # mypy complains about an extracted wheel + "build/", + "docs/", +] + +[[tool.mypy.overrides]] +module = [ + "asn1crypto.*", + "oscrypto.*", +] +ignore_missing_imports = true + [tool.pytest.ini_options] markers = [ "requires: marks tests require support for a certain PKCS11 mechanism.", @@ -73,6 +89,7 @@ include = ["pkcs11*"] [dependency-groups] dev = [ "cryptography>=44.0.0", + "mypy>=1.13.0", "oscrypto>=1.3.0", "pytest>=8.3.4", "ruff>=0.8.3", diff --git a/tests/conftest.py b/tests/conftest.py index 4b70370..ea401a1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,7 +4,7 @@ import string import subprocess from pathlib import Path -from typing import Iterator +from typing import Any, Iterator from unittest import mock from warnings import warn @@ -12,6 +12,7 @@ from _pytest.fixtures import SubRequest import pkcs11 +from pkcs11 import Library ALLOWED_RANDOM_CHARS = string.ascii_letters + string.digits LIB_PATH = os.environ.get("PKCS11_MODULE", "/usr/lib/softhsm/libsofthsm2.so") @@ -29,7 +30,7 @@ warn("Path to OpenSSL not found. Please adjust `PATH' or define `OPENSSL_PATH'", stacklevel=2) -def pytest_collection_modifyitems(items) -> None: +def pytest_collection_modifyitems(items: list[Any]) -> None: for item in items: markers = [marker.name for marker in item.iter_markers()] if "xfail_nfast" in markers and IS_NFAST: @@ -50,12 +51,12 @@ def pytest_collection_modifyitems(items) -> None: ) -def get_random_string(length): +def get_random_string(length: int) -> str: return "".join(secrets.choice(ALLOWED_RANDOM_CHARS) for i in range(length)) @pytest.fixture(scope="session") -def lib(): +def lib() -> Library: return pkcs11.lib(LIB_PATH) @@ -101,7 +102,7 @@ def pin() -> str: @pytest.fixture -def softhsm_token(request: "SubRequest", lib, so_pin: str, pin: str) -> pkcs11.Token: +def softhsm_token(request: "SubRequest", lib: Library, so_pin: str, pin: str) -> pkcs11.Token: """Get a unique token for the current test.""" request.getfixturevalue("softhsm_setup") token = get_random_string(8) diff --git a/tests/test_aes.py b/tests/test_aes.py index f9ca94c..88c39b9 100644 --- a/tests/test_aes.py +++ b/tests/test_aes.py @@ -26,14 +26,14 @@ def test_encrypt(key: pkcs11.SecretKey) -> None: # We should be aligned to the block size assert len(crypttext) == 16 # Ensure we didn't just get 16 nulls - assert all(c == "\0" for c in crypttext) is False + assert all(c == 0 for c in crypttext) is False text = key.decrypt(crypttext, mechanism_param=iv) assert data == text @pytest.mark.requires(Mechanism.AES_CBC_PAD) -def test_encrypt_stream(key: pkcs11.SecretKey): +def test_encrypt_stream(key: pkcs11.SecretKey) -> None: data = ( b"I" * 16, b"N" * 16, @@ -53,14 +53,14 @@ def test_encrypt_stream(key: pkcs11.SecretKey): # We should be aligned to the block size assert len(crypttext) % 16 == 0 # Ensure we didn't just get 16 nulls - assert all(c == "\0" for c in crypttext) is False + assert all(c == 0 for c in crypttext) is False text = b"".join(key.decrypt(cryptblocks, mechanism_param=iv)) assert b"".join(data) == text @pytest.mark.requires(Mechanism.AES_CBC_PAD) -def test_encrypt_whacky_sizes(key: pkcs11.SecretKey): +def test_encrypt_whacky_sizes(key: pkcs11.SecretKey) -> None: data = [(char * ord(char)).encode("utf-8") for char in "HELLO WORLD"] iv = b"0" * 16 @@ -71,7 +71,7 @@ def test_encrypt_whacky_sizes(key: pkcs11.SecretKey): @pytest.mark.requires(Mechanism.AES_CBC_PAD) -def test_encrypt_big_string(session: pkcs11.Session, key: pkcs11.SecretKey): +def test_encrypt_big_string(session: pkcs11.Session, key: pkcs11.SecretKey) -> None: data = b"HELLO WORLD" * 1024 iv = session.generate_random(128) @@ -82,7 +82,7 @@ def test_encrypt_big_string(session: pkcs11.Session, key: pkcs11.SecretKey): @pytest.mark.requires(Mechanism.AES_MAC) -def test_sign(key: pkcs11.SecretKey): +def test_sign(key: pkcs11.SecretKey) -> None: data = b"HELLO WORLD" signature = key.sign(data) @@ -92,7 +92,7 @@ def test_sign(key: pkcs11.SecretKey): @pytest.mark.requires(Mechanism.AES_MAC) -def test_sign_stream(key: pkcs11.SecretKey): +def test_sign_stream(key: pkcs11.SecretKey) -> None: data = ( b"I" * 16, b"N" * 16, @@ -108,7 +108,7 @@ def test_sign_stream(key: pkcs11.SecretKey): @pytest.mark.requires(Mechanism.AES_KEY_WRAP) @pytest.mark.xfail_opencryptoki # can't set key attributes -def test_wrap(session: pkcs11.Session, key: pkcs11.SecretKey): +def test_wrap(session: pkcs11.Session, key: pkcs11.SecretKey) -> None: key = session.generate_key( pkcs11.KeyType.AES, 128, @@ -151,7 +151,7 @@ def test_derive_using_ecb_encrypt( test_key_length: int, iv_length: int, is_none: bool, -): +) -> None: """Function to test AES Key Derivation using the ECB_ENCRYPT Mechanism. Refer to Section 2.15 of http://docs.oasis-open.org/pkcs11/pkcs11-curr/v2.40/errata01/os/pkcs11-curr-v2.40-errata01-os-complete.html#_Toc441850521 @@ -278,7 +278,7 @@ def test_derive_using_cbc_encrypt( iv_length: int, data_length: int, is_none: bool, -): +) -> None: """Function to test AES Key Derivation using the CBC_ENCRYPT Mechanism. Refer to Section 2.15 of http://docs.oasis-open.org/pkcs11/pkcs11-curr/v2.40/errata01/os/pkcs11-curr-v2.40-errata01-os-complete.html#_Toc441850521 @@ -339,7 +339,7 @@ def test_encrypt_with_key_derived_using_cbc_encrypt( test_key_length: int, iv_length: int, data_length: int, -): +) -> None: """Function to test Data Encryption/Decryption using a Derived AES Key. Function to test Data Encryption/Decryption using an AES Key diff --git a/tests/test_des.py b/tests/test_des.py index 114c9ae..af50ce7 100644 --- a/tests/test_des.py +++ b/tests/test_des.py @@ -9,20 +9,20 @@ @pytest.mark.requires(Mechanism.DES2_KEY_GEN) -def test_generate_des2_key(session: pkcs11.Session): +def test_generate_des2_key(session: pkcs11.Session) -> None: key = session.generate_key(KeyType.DES2) assert isinstance(key, pkcs11.SecretKey) @pytest.mark.requires(Mechanism.DES3_KEY_GEN) -def test_generate_des3_key(session: pkcs11.Session): +def test_generate_des3_key(session: pkcs11.Session) -> None: key = session.generate_key(KeyType.DES3) assert isinstance(key, pkcs11.SecretKey) @pytest.mark.requires(Mechanism.DES2_KEY_GEN) @pytest.mark.requires(Mechanism.DES3_CBC_PAD) -def test_encrypt_des2(session: pkcs11.Session): +def test_encrypt_des2(session: pkcs11.Session) -> None: key = session.generate_key(KeyType.DES2) iv = session.generate_random(64) @@ -34,7 +34,7 @@ def test_encrypt_des2(session: pkcs11.Session): @pytest.mark.requires(Mechanism.DES3_KEY_GEN) @pytest.mark.requires(Mechanism.DES3_CBC_PAD) -def test_encrypt_des3(session: pkcs11.Session): +def test_encrypt_des3(session: pkcs11.Session) -> None: key = session.generate_key(KeyType.DES3) iv = session.generate_random(64) diff --git a/tests/test_dsa.py b/tests/test_dsa.py index 61f3a72..bee66a2 100644 --- a/tests/test_dsa.py +++ b/tests/test_dsa.py @@ -32,7 +32,7 @@ def test_generate_params(session: pkcs11.Session) -> None: @pytest.mark.requires(Mechanism.DSA_KEY_PAIR_GEN) @pytest.mark.requires(Mechanism.DSA_SHA1) -def test_generate_keypair_and_sign(session: pkcs11.Session): +def test_generate_keypair_and_sign(session: pkcs11.Session) -> None: dhparams = session.create_domain_parameters( KeyType.DSA, decode_dsa_domain_parameters(DHPARAMS), local=True ) @@ -50,6 +50,6 @@ def test_generate_keypair_and_sign(session: pkcs11.Session): @pytest.mark.xfail_nfast @pytest.mark.requires(Mechanism.DSA_PARAMETER_GEN) @pytest.mark.requires(Mechanism.DSA_KEY_PAIR_GEN) -def test_generate_keypair_directly(session: pkcs11.Session): +def test_generate_keypair_directly(session: pkcs11.Session) -> None: public, private = session.generate_keypair(KeyType.DSA, 1024) assert len(public[Attribute.VALUE]) == 1024 // 8 diff --git a/tests/test_ecc.py b/tests/test_ecc.py index 852c4a6..60647f5 100644 --- a/tests/test_ecc.py +++ b/tests/test_ecc.py @@ -3,11 +3,12 @@ """ import base64 +import typing import pytest import pkcs11 -from pkcs11 import KDF, Attribute, KeyType, Mechanism +from pkcs11 import KDF, Attribute, KeyType, Mechanism, PrivateKey, PublicKey from pkcs11.util.ec import ( decode_ec_private_key, decode_ec_public_key, @@ -130,7 +131,7 @@ def test_import_key_pair(session: pkcs11.Session) -> None: MQ2PssJ5huE/vhFWYSR0z3iDp1UXB114r5EXvmDEAWx/32cqnwnuNbyJd/W8IapY vN/QAI/1qMV2bopaSmlwabxm8dt/NFCIa3nNYxYyLTjoP16fXTnnI0GSu2dMFatV """) - priv = session.create_object(decode_ec_private_key(priv)) + priv_obj = typing.cast(PrivateKey, session.create_object(decode_ec_private_key(priv))) pub = base64.b64decode(""" MIICXDCCAc8GByqGSM49AgEwggHCAgEBME0GByqGSM49AQECQgH///////////// @@ -147,10 +148,10 @@ def test_import_key_pair(session: pkcs11.Session) -> None: eIOnVRcHXXivkRe+YMQBbH/fZyqfCe41vIl39bwhqli839AAj/WoxXZuilpKaXBp vGbx2380UIhrec1jFjItOOg/Xp9dOecjQZK7Z0wVq1U= """) - pub = session.create_object(decode_ec_public_key(pub)) + pub_obj = typing.cast(PublicKey, session.create_object(decode_ec_public_key(pub))) - signature = priv.sign(b"Example", mechanism=Mechanism.ECDSA) - assert pub.verify(b"Example", signature, mechanism=Mechanism.ECDSA) + signature = priv_obj.sign(b"Example", mechanism=Mechanism.ECDSA) + assert pub_obj.verify(b"Example", signature, mechanism=Mechanism.ECDSA) @pytest.mark.requires(Mechanism.EC_EDWARDS_KEY_PAIR_GEN) diff --git a/tests/test_iterators.py b/tests/test_iterators.py index 1b3cd8c..6f59333 100644 --- a/tests/test_iterators.py +++ b/tests/test_iterators.py @@ -5,6 +5,7 @@ import pytest import pkcs11 +from pkcs11 import ObjectClass @pytest.mark.requires(pkcs11.Mechanism.AES_KEY_GEN) @@ -12,7 +13,7 @@ def test_partial_decrypt(session: pkcs11.Session) -> None: session.generate_key(pkcs11.KeyType.AES, 128, label="LOOK ME UP") - key = session.get_key(label="LOOK ME UP") + key = session.get_key(object_class=ObjectClass.SECRET_KEY, label="LOOK ME UP") data = (b"1234", b"1234") iv = session.generate_random(128) @@ -34,11 +35,8 @@ def test_partial_decrypt(session: pkcs11.Session) -> None: def test_close_iterators(session: pkcs11.Session) -> None: session.generate_key(pkcs11.KeyType.AES, 128, label="LOOK ME UP") - key = session.get_key(label="LOOK ME UP") - data = ( - b"1234", - b"1234", - ) + key = session.get_key(object_class=ObjectClass.SECRET_KEY, label="LOOK ME UP") + data = (b"1234", b"1234") iv = session.generate_random(128) encrypted_data = list(key.encrypt(data, mechanism_param=iv)) diff --git a/tests/test_public_key_external.py b/tests/test_public_key_external.py index c575a68..85d1fae 100644 --- a/tests/test_public_key_external.py +++ b/tests/test_public_key_external.py @@ -1,4 +1,7 @@ +import typing + import pytest +from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey import pkcs11 from pkcs11 import KDF, Attribute, KeyType, Mechanism, ObjectClass @@ -19,11 +22,11 @@ def test_rsa(session: pkcs11.Session) -> None: pub = session.get_key(key_type=KeyType.RSA, object_class=ObjectClass.PUBLIC_KEY) - pub = encode_rsa_public_key(pub) + pub_bytes = encode_rsa_public_key(pub) from oscrypto.asymmetric import load_public_key, rsa_pkcs1v15_encrypt - pub = load_public_key(pub) + pub = load_public_key(pub_bytes) crypttext = rsa_pkcs1v15_encrypt(pub, b"Data to encrypt") priv = session.get_key(key_type=KeyType.RSA, object_class=ObjectClass.PRIVATE_KEY) @@ -72,7 +75,7 @@ def test_ecdh(session: pkcs11.Session) -> None: # Retrieve our keypair, with our public key encoded for interchange alice_priv = session.get_key(key_type=KeyType.EC, object_class=ObjectClass.PRIVATE_KEY) alice_pub = session.get_key(key_type=KeyType.EC, object_class=ObjectClass.PUBLIC_KEY) - alice_pub = encode_ec_public_key(alice_pub) + alice_pub_bytes = encode_ec_public_key(alice_pub) from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import ec @@ -92,10 +95,10 @@ def test_ecdh(session: pkcs11.Session) -> None: # Bob converts Alice's key to internal format and generates their # shared key - bob_shared_key = bob_priv.exchange( - ec.ECDH(), - load_der_public_key(alice_pub, default_backend()), + alice_loaded_public_key = typing.cast( + EllipticCurvePublicKey, load_der_public_key(alice_pub_bytes) ) + bob_shared_key = bob_priv.exchange(ec.ECDH(), alice_loaded_public_key) key = alice_priv.derive_key( KeyType.GENERIC_SECRET, @@ -124,10 +127,7 @@ def test_terrible_hybrid_file_encryption_app(session: pkcs11.Session) -> None: import io from oscrypto.asymmetric import load_public_key, rsa_pkcs1v15_encrypt - from oscrypto.symmetric import ( - aes_cbc_pkcs7_decrypt, - aes_cbc_pkcs7_encrypt, - ) + from oscrypto.symmetric import aes_cbc_pkcs7_decrypt, aes_cbc_pkcs7_encrypt # A key we generated earlier session.generate_keypair(KeyType.RSA, 1024) diff --git a/tests/test_rsa.py b/tests/test_rsa.py index 05579cd..4fd8cad 100644 --- a/tests/test_rsa.py +++ b/tests/test_rsa.py @@ -129,6 +129,12 @@ def test_encrypt_too_much_data(keypair: tuple[pkcs11.PublicKey, pkcs11.PrivateKe public_key, private_key = keypair data = b"1234" * 128 + assert public_key.key_type == KeyType.RSA + assert private_key.key_type == KeyType.RSA + assert public_key.label == "" + assert private_key.label == "" + assert private_key.id == b"" + # You can't encrypt lots of data with RSA # This should ideally throw DataLen but you can't trust it with pytest.raises(pkcs11.PKCS11Error): diff --git a/tests/test_sessions.py b/tests/test_sessions.py index f704735..21f125c 100644 --- a/tests/test_sessions.py +++ b/tests/test_sessions.py @@ -31,9 +31,9 @@ def test_open_session_and_login_so(token: pkcs11.Token, so_pin: str) -> None: def test_generate_key(token: pkcs11.Token, pin: str) -> None: with token.open(user_pin=pin) as session: key = session.generate_key(pkcs11.KeyType.AES, 128) - assert isinstance(key, pkcs11.Object) + assert isinstance(key, pkcs11.types.Object) assert isinstance(key, pkcs11.SecretKey) - assert isinstance(key, pkcs11.EncryptMixin) + assert isinstance(key, pkcs11.types.EncryptMixin) assert key.object_class is pkcs11.ObjectClass.SECRET_KEY @@ -51,11 +51,14 @@ def test_generate_key(token: pkcs11.Token, pin: str) -> None: # Create another key with no capabilities key = session.generate_key( - pkcs11.KeyType.AES, 128, label="MY KEY", id=b"\1\2\3\4", capabilities=0 + pkcs11.KeyType.AES, + 128, + label="MY KEY", + id=b"\1\2\3\4", + capabilities=0, # type: ignore[arg-type] # seems to be what we're testing? ) - assert isinstance(key, pkcs11.Object) + assert isinstance(key, pkcs11.types.Object) assert isinstance(key, pkcs11.SecretKey) - assert not isinstance(key, pkcs11.EncryptMixin) assert key.label == "MY KEY" @@ -162,4 +165,4 @@ def test_generate_random(token: pkcs11.Token, pin: str) -> None: random = session.generate_random(16 * 8) assert len(random) == 16 # Ensure we didn't get 16 bytes of zeros - assert all(c != "\x00" for c in random) + assert all(c != int.from_bytes(b"\x00") for c in random) diff --git a/tests/test_slots_and_tokens.py b/tests/test_slots_and_tokens.py index aba49e0..21856e5 100644 --- a/tests/test_slots_and_tokens.py +++ b/tests/test_slots_and_tokens.py @@ -59,7 +59,7 @@ def test_get_mechanism_info_ec() -> None: def test_get_tokens(softhsm_token: pkcs11.Token) -> None: lib = pkcs11.lib(LIB_PATH) - tokens = list(lib.get_tokens(token_flags=pkcs11.TokenFlag.RNG)) + tokens = lib.get_tokens(token_flags=pkcs11.TokenFlag.RNG) assert len(list(tokens)) == 2 tokens = lib.get_tokens(token_label=softhsm_token.label) diff --git a/tests/test_threading.py b/tests/test_threading.py index 88a1d34..2fb232d 100644 --- a/tests/test_threading.py +++ b/tests/test_threading.py @@ -6,10 +6,12 @@ """ import threading +import typing import pytest import pkcs11 +from pkcs11 import SecretKey from .conftest import IS_NFAST @@ -26,11 +28,11 @@ def test_concurrency(session: pkcs11.Session) -> None: test_passed = [True] - def thread_work(): + def thread_work() -> None: try: data = b"1234" * 1024 * 1024 # Multichunk files iv = session.generate_random(128) - key = session.get_key(label="LOOK ME UP") + key = typing.cast(SecretKey, session.get_key(label="LOOK ME UP")) assert key.encrypt(data, mechanism_param=iv) is not None except pkcs11.PKCS11Error: test_passed[0] = False diff --git a/tests/test_x509.py b/tests/test_x509.py index cbf3188..d4fab2c 100644 --- a/tests/test_x509.py +++ b/tests/test_x509.py @@ -49,6 +49,7 @@ def test_import_ca_certificate_easy(session: pkcs11.Session) -> None: cert = session.create_object(decode_x509_certificate(CERT)) assert isinstance(cert, pkcs11.Certificate) + assert cert.certificate_type == KeyType.RSA @pytest.mark.skipif(IS_NFAST or IS_OPENCRYPTOKI, reason="Unknown reason.") @@ -163,6 +164,7 @@ def test_verify_certificate_ecdsa(session: pkcs11.Session) -> None: @pytest.mark.requires(Mechanism.RSA_PKCS_KEY_PAIR_GEN) @pytest.mark.requires(Mechanism.SHA1_RSA_PKCS) def test_self_sign_certificate(tmpdir: Path, session: pkcs11.Session) -> None: + assert OPENSSL is not None # Warning: proof of concept code only! pub, priv = session.generate_keypair(KeyType.RSA, 1024) @@ -235,8 +237,8 @@ def test_self_sign_certificate(tmpdir: Path, session: pkcs11.Session) -> None: stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, ) as proc: - proc.stdin.write(pem_cert) - proc.stdin.close() + proc.stdin.write(pem_cert) # type: ignore[union-attr] + proc.stdin.close() # type: ignore[union-attr] assert proc.wait() == 0 @@ -245,6 +247,7 @@ def test_self_sign_certificate(tmpdir: Path, session: pkcs11.Session) -> None: @pytest.mark.requires(Mechanism.SHA1_RSA_PKCS) def test_sign_csr(session: pkcs11.Session) -> None: # Warning: proof of concept code only! + assert OPENSSL is not None pub, priv = session.generate_keypair(KeyType.RSA, 1024) info = CertificationRequestInfo( @@ -285,7 +288,7 @@ def test_sign_csr(session: pkcs11.Session) -> None: stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, ) as proc: - proc.stdin.write(csr.dump()) - proc.stdin.close() + proc.stdin.write(csr.dump()) # type: ignore[union-attr] + proc.stdin.close() # type: ignore[union-attr] assert proc.wait() == 0 diff --git a/uv.lock b/uv.lock index 2da0ef3..a170827 100644 --- a/uv.lock +++ b/uv.lock @@ -373,6 +373,54 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b3/73/085399401383ce949f727afec55ec3abd76648d04b9f22e1c0e99cb4bec3/MarkupSafe-3.0.2-cp39-cp39-win_amd64.whl", hash = "sha256:6e296a513ca3d94054c2c881cc913116e90fd030ad1c656b3869762b754f5f8a", size = 15506 }, ] +[[package]] +name = "mypy" +version = "1.13.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "mypy-extensions" }, + { name = "tomli", marker = "python_full_version < '3.11'" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/e8/21/7e9e523537991d145ab8a0a2fd98548d67646dc2aaaf6091c31ad883e7c1/mypy-1.13.0.tar.gz", hash = "sha256:0291a61b6fbf3e6673e3405cfcc0e7650bebc7939659fdca2702958038bd835e", size = 3152532 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5e/8c/206de95a27722b5b5a8c85ba3100467bd86299d92a4f71c6b9aa448bfa2f/mypy-1.13.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:6607e0f1dd1fb7f0aca14d936d13fd19eba5e17e1cd2a14f808fa5f8f6d8f60a", size = 11020731 }, + { url = "https://files.pythonhosted.org/packages/ab/bb/b31695a29eea76b1569fd28b4ab141a1adc9842edde080d1e8e1776862c7/mypy-1.13.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8a21be69bd26fa81b1f80a61ee7ab05b076c674d9b18fb56239d72e21d9f4c80", size = 10184276 }, + { url = "https://files.pythonhosted.org/packages/a5/2d/4a23849729bb27934a0e079c9c1aad912167d875c7b070382a408d459651/mypy-1.13.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:7b2353a44d2179846a096e25691d54d59904559f4232519d420d64da6828a3a7", size = 12587706 }, + { url = "https://files.pythonhosted.org/packages/5c/c3/d318e38ada50255e22e23353a469c791379825240e71b0ad03e76ca07ae6/mypy-1.13.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:0730d1c6a2739d4511dc4253f8274cdd140c55c32dfb0a4cf8b7a43f40abfa6f", size = 13105586 }, + { url = "https://files.pythonhosted.org/packages/4a/25/3918bc64952370c3dbdbd8c82c363804678127815febd2925b7273d9482c/mypy-1.13.0-cp310-cp310-win_amd64.whl", hash = "sha256:c5fc54dbb712ff5e5a0fca797e6e0aa25726c7e72c6a5850cfd2adbc1eb0a372", size = 9632318 }, + { url = "https://files.pythonhosted.org/packages/d0/19/de0822609e5b93d02579075248c7aa6ceaddcea92f00bf4ea8e4c22e3598/mypy-1.13.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:581665e6f3a8a9078f28d5502f4c334c0c8d802ef55ea0e7276a6e409bc0d82d", size = 10939027 }, + { url = "https://files.pythonhosted.org/packages/c8/71/6950fcc6ca84179137e4cbf7cf41e6b68b4a339a1f5d3e954f8c34e02d66/mypy-1.13.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:3ddb5b9bf82e05cc9a627e84707b528e5c7caaa1c55c69e175abb15a761cec2d", size = 10108699 }, + { url = "https://files.pythonhosted.org/packages/26/50/29d3e7dd166e74dc13d46050b23f7d6d7533acf48f5217663a3719db024e/mypy-1.13.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:20c7ee0bc0d5a9595c46f38beb04201f2620065a93755704e141fcac9f59db2b", size = 12506263 }, + { url = "https://files.pythonhosted.org/packages/3f/1d/676e76f07f7d5ddcd4227af3938a9c9640f293b7d8a44dd4ff41d4db25c1/mypy-1.13.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:3790ded76f0b34bc9c8ba4def8f919dd6a46db0f5a6610fb994fe8efdd447f73", size = 12984688 }, + { url = "https://files.pythonhosted.org/packages/9c/03/5a85a30ae5407b1d28fab51bd3e2103e52ad0918d1e68f02a7778669a307/mypy-1.13.0-cp311-cp311-win_amd64.whl", hash = "sha256:51f869f4b6b538229c1d1bcc1dd7d119817206e2bc54e8e374b3dfa202defcca", size = 9626811 }, + { url = "https://files.pythonhosted.org/packages/fb/31/c526a7bd2e5c710ae47717c7a5f53f616db6d9097caf48ad650581e81748/mypy-1.13.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:5c7051a3461ae84dfb5dd15eff5094640c61c5f22257c8b766794e6dd85e72d5", size = 11077900 }, + { url = "https://files.pythonhosted.org/packages/83/67/b7419c6b503679d10bd26fc67529bc6a1f7a5f220bbb9f292dc10d33352f/mypy-1.13.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:39bb21c69a5d6342f4ce526e4584bc5c197fd20a60d14a8624d8743fffb9472e", size = 10074818 }, + { url = "https://files.pythonhosted.org/packages/ba/07/37d67048786ae84e6612575e173d713c9a05d0ae495dde1e68d972207d98/mypy-1.13.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:164f28cb9d6367439031f4c81e84d3ccaa1e19232d9d05d37cb0bd880d3f93c2", size = 12589275 }, + { url = "https://files.pythonhosted.org/packages/1f/17/b1018c6bb3e9f1ce3956722b3bf91bff86c1cefccca71cec05eae49d6d41/mypy-1.13.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:a4c1bfcdbce96ff5d96fc9b08e3831acb30dc44ab02671eca5953eadad07d6d0", size = 13037783 }, + { url = "https://files.pythonhosted.org/packages/cb/32/cd540755579e54a88099aee0287086d996f5a24281a673f78a0e14dba150/mypy-1.13.0-cp312-cp312-win_amd64.whl", hash = "sha256:a0affb3a79a256b4183ba09811e3577c5163ed06685e4d4b46429a271ba174d2", size = 9726197 }, + { url = "https://files.pythonhosted.org/packages/11/bb/ab4cfdc562cad80418f077d8be9b4491ee4fb257440da951b85cbb0a639e/mypy-1.13.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:a7b44178c9760ce1a43f544e595d35ed61ac2c3de306599fa59b38a6048e1aa7", size = 11069721 }, + { url = "https://files.pythonhosted.org/packages/59/3b/a393b1607cb749ea2c621def5ba8c58308ff05e30d9dbdc7c15028bca111/mypy-1.13.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:5d5092efb8516d08440e36626f0153b5006d4088c1d663d88bf79625af3d1d62", size = 10063996 }, + { url = "https://files.pythonhosted.org/packages/d1/1f/6b76be289a5a521bb1caedc1f08e76ff17ab59061007f201a8a18cc514d1/mypy-1.13.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:de2904956dac40ced10931ac967ae63c5089bd498542194b436eb097a9f77bc8", size = 12584043 }, + { url = "https://files.pythonhosted.org/packages/a6/83/5a85c9a5976c6f96e3a5a7591aa28b4a6ca3a07e9e5ba0cec090c8b596d6/mypy-1.13.0-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:7bfd8836970d33c2105562650656b6846149374dc8ed77d98424b40b09340ba7", size = 13036996 }, + { url = "https://files.pythonhosted.org/packages/b4/59/c39a6f752f1f893fccbcf1bdd2aca67c79c842402b5283563d006a67cf76/mypy-1.13.0-cp313-cp313-win_amd64.whl", hash = "sha256:9f73dba9ec77acb86457a8fc04b5239822df0c14a082564737833d2963677dbc", size = 9737709 }, + { url = "https://files.pythonhosted.org/packages/5f/d4/b33ddd40dad230efb317898a2d1c267c04edba73bc5086bf77edeb410fb2/mypy-1.13.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:0246bcb1b5de7f08f2826451abd947bf656945209b140d16ed317f65a17dc7dc", size = 11013906 }, + { url = "https://files.pythonhosted.org/packages/f4/e6/f414bca465b44d01cd5f4a82761e15044bedd1bf8025c5af3cc64518fac5/mypy-1.13.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:7f5b7deae912cf8b77e990b9280f170381fdfbddf61b4ef80927edd813163732", size = 10180657 }, + { url = "https://files.pythonhosted.org/packages/38/e9/fc3865e417722f98d58409770be01afb961e2c1f99930659ff4ae7ca8b7e/mypy-1.13.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:7029881ec6ffb8bc233a4fa364736789582c738217b133f1b55967115288a2bc", size = 12586394 }, + { url = "https://files.pythonhosted.org/packages/2e/35/f4d8b6d2cb0b3dad63e96caf159419dda023f45a358c6c9ac582ccaee354/mypy-1.13.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:3e38b980e5681f28f033f3be86b099a247b13c491f14bb8b1e1e134d23bb599d", size = 13103591 }, + { url = "https://files.pythonhosted.org/packages/22/1d/80594aef135f921dd52e142fa0acd19df197690bd0cde42cea7b88cf5aa2/mypy-1.13.0-cp39-cp39-win_amd64.whl", hash = "sha256:a6789be98a2017c912ae6ccb77ea553bbaf13d27605d2ca20a76dfbced631b24", size = 9634690 }, + { url = "https://files.pythonhosted.org/packages/3b/86/72ce7f57431d87a7ff17d442f521146a6585019eb8f4f31b7c02801f78ad/mypy-1.13.0-py3-none-any.whl", hash = "sha256:9c250883f9fd81d212e0952c92dbfcc96fc237f4b7c92f56ac81fd48460b3e5a", size = 2647043 }, +] + +[[package]] +name = "mypy-extensions" +version = "1.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/98/a4/1ab47638b92648243faf97a5aeb6ea83059cc3624972ab6b8d2316078d3f/mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782", size = 4433 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2a/e2/5d3f6ada4297caebe1a2add3b126fe800c96f56dbe5d1988a2cbe0b267aa/mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d", size = 4695 }, +] + [[package]] name = "oscrypto" version = "1.3.0" @@ -449,6 +497,7 @@ dependencies = [ [package.dev-dependencies] dev = [ { name = "cryptography" }, + { name = "mypy" }, { name = "oscrypto" }, { name = "pytest" }, { name = "ruff" }, @@ -464,6 +513,7 @@ requires-dist = [{ name = "asn1crypto", specifier = ">=1.4.0" }] [package.metadata.requires-dev] dev = [ { name = "cryptography", specifier = ">=44.0.0" }, + { name = "mypy", specifier = ">=1.13.0" }, { name = "oscrypto", specifier = ">=1.3.0" }, { name = "pytest", specifier = ">=8.3.4" }, { name = "ruff", specifier = ">=0.8.3" },