diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 24a817cb6..18617f2d5 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -3,6 +3,11 @@ Changelog 1.14.0 (master) --------------- +* Added support for Ed25519, Ed448, X25519 and X448 keys (see `RFC 8037 `_). + These are also known as Bernstein curves. +* Added support for signing with Ed25519, Ed448, X25519 and X448 keys + (see `RFC 8032 `_). See JWA. +* Minimum requirement of ``cryptography`` is now 2.6+. * Dropped support for Python 3.6. * Added a new valid PGP key for signing our PyPI packages with the fingerprint diff --git a/src/josepy/json_util.py b/src/josepy/json_util.py index 1ae998599..92c848bf4 100644 --- a/src/josepy/json_util.py +++ b/src/josepy/json_util.py @@ -485,7 +485,7 @@ def register(cls, type_cls: Type[GenericTypedJSONObjectWithFields], def get_type_cls(cls, jobj: Mapping[str, Any]) -> Type["TypedJSONObjectWithFields"]: """Get the registered class for ``jobj``.""" if cls in cls.TYPES.values(): - if cls.type_field_name not in jobj: + if cls.type_field_name not in jobj: # noqa raise errors.DeserializationError( "Missing type field ({0})".format(cls.type_field_name)) # cls is already registered type_cls, force to use it diff --git a/src/josepy/jwk.py b/src/josepy/jwk.py index afc9bd504..309e457be 100644 --- a/src/josepy/jwk.py +++ b/src/josepy/jwk.py @@ -1,5 +1,6 @@ """JSON Web Key.""" import abc +import collections import json import logging import math @@ -18,7 +19,16 @@ import cryptography.exceptions from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, serialization -from cryptography.hazmat.primitives.asymmetric import ec, rsa +# TODO import with try/except as some curves may not be available +# They do this in latchset/jwcrypto +from cryptography.hazmat.primitives.asymmetric import ( + ec, + ed448, + ed25519, + rsa, + x448, + x25519, +) import josepy.util from josepy import errors, json_util, util @@ -387,3 +397,112 @@ def public_key(self) -> 'JWKEC': else: key = self.key.public_numbers().public_key(default_backend()) return type(self)(key=key) + + +@JWK.register +class JWKOKP(JWK): + """ + Performs signing and verification operations using either + Ed25519, Ed448, X25519 or X448. See RFC 8037 and RFC 8032 for details about + the algorithms, and signing, respectively. + + :ivar: :key :class:`~cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey` + or :class:`~cryptography.hazmat.primitives.asymmetric.ed448.Ed448PublicKey` + or :class:`~cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey` + or :class:`~cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey` + or :class:`~cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey` + or :class:`~cryptography.hazmat.primitives.asymmetric.x448.X448PublicKey` + or :class:`~cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey` + or :class:`~cryptography.hazmat.primitives.asymmetric.x25519.X25519PublicKey` + wrapped in :class:`~josepy.util.ComparableOKPKey` + + This class requires ``cryptography>=2.6`` to be installed. + """ + typ = "OKP" + __slots__ = ("key",) + cryptography_key_types = ( + ed25519.Ed25519PrivateKey, ed25519.Ed25519PrivateKey, + ed448.Ed448PublicKey, ed448.Ed448PrivateKey, + x25519.X25519PrivateKey, x25519.X25519PublicKey, + x448.X448PrivateKey, x448.X448PublicKey, + ) + required = ("crv", JWK.type_field_name, "x") + okp_curve = collections.namedtuple("okp_curve", "pubkey privkey") + crv_to_pub_priv = { + "Ed25519": okp_curve(pubkey=ed25519.Ed25519PublicKey, privkey=ed25519.Ed25519PrivateKey), + "Ed448": okp_curve(pubkey=ed448.Ed448PublicKey, privkey=ed448.Ed448PrivateKey), + "X25519": okp_curve(pubkey=x25519.X25519PublicKey, privkey=x25519.X25519PrivateKey), + "X448": okp_curve(pubkey=x448.X448PublicKey, privkey=x448.X448PrivateKey), + } + + def __init__(self, *args: Any, **kwargs: Any) -> None: + if 'key' in kwargs and not isinstance(kwargs['key'], util.ComparableOKPKey): + kwargs['key'] = util.ComparableOKPKey(kwargs['key']) + super().__init__(*args, **kwargs) + + def public_key(self) -> "JWKOKP": + return self.key.__class__.public_key() + + def _key_to_crv(self) -> str: + if isinstance(self.key._wrapped, (ed25519.Ed25519PublicKey, ed25519.Ed25519PrivateKey)): + return "Ed25519" + elif isinstance(self.key._wrapped, (ed448.Ed448PublicKey, ed448.Ed448PrivateKey)): + return "Ed448" + elif isinstance(self.key._wrapped, (x25519.X25519PublicKey, x25519.X25519PrivateKey)): + return "X25519" + elif isinstance(self.key._wrapped, (x448.X448PublicKey, x448.X448PrivateKey)): + return "X448" + return NotImplemented + + def fields_to_partial_json(self) -> Dict[str, Any]: + params = { + "crv": self._key_to_crv(), + "kty": "OKP", + } + if hasattr(self.key._wrapped, "private_bytes"): + params['d'] = json_util.encode_b64jose(self.key.private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption(), + )) + params['x'] = json_util.encode_b64jose(self.key.public_key().public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + )) + else: + params['x'] = json_util.encode_b64jose(self.key.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + )) + return params + + @classmethod + def fields_from_json(cls, jobj: Mapping[str, Any]) -> "JWKOKP": + curve = jobj["crv"] + if curve not in cls.crv_to_pub_priv: + raise errors.DeserializationError(f"Invalid curve: {curve}") + + if "x" not in jobj: + raise errors.DeserializationError('OKP should have "x" parameter') + x = json_util.decode_b64jose(jobj["x"]) + + try: + if "d" not in jobj: # public key + pub_class: Type[Union[ + ed25519.Ed25519PublicKey, + ed448.Ed448PublicKey, + x25519.X25519PublicKey, + x448.X448PublicKey, + ]] = cls.crv_to_pub_priv[curve].pubkey + return cls(key=pub_class.from_public_bytes(x)) + else: # private key + d = json_util.decode_b64jose(jobj["d"]) + priv_key_class: Type[Union[ + ed25519.Ed25519PrivateKey, + ed448.Ed448PrivateKey, + x25519.X25519PrivateKey, + x448.X448PrivateKey, + ]] = cls.crv_to_pub_priv[curve].privkey + return cls(key=priv_key_class.from_private_bytes(d)) + except ValueError as err: + raise errors.DeserializationError("Invalid key parameter") from err diff --git a/src/josepy/util.py b/src/josepy/util.py index bf41b9ef3..68dcc69a2 100644 --- a/src/josepy/util.py +++ b/src/josepy/util.py @@ -6,7 +6,8 @@ from types import ModuleType from typing import Any, Callable, Iterator, List, Tuple, TypeVar, Union, cast -from cryptography.hazmat.primitives.asymmetric import ec, rsa +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ec, ed448, ed25519, rsa from OpenSSL import crypto @@ -69,17 +70,22 @@ class ComparableKey: """ __hash__: Callable[[], int] = NotImplemented + def __getattr__(self, name: str) -> Any: + return getattr(self._wrapped, name) + def __init__(self, wrapped: Union[ rsa.RSAPrivateKeyWithSerialization, rsa.RSAPublicKeyWithSerialization, ec.EllipticCurvePrivateKeyWithSerialization, - ec.EllipticCurvePublicKeyWithSerialization]): + ec.EllipticCurvePublicKeyWithSerialization, + ed25519.Ed25519PrivateKey, + ed25519.Ed25519PublicKey, + ed448.Ed448PrivateKey, + ed448.Ed448PublicKey, + ]): self._wrapped = wrapped - def __getattr__(self, name: str) -> Any: - return getattr(self._wrapped, name) - def __eq__(self, other: Any) -> bool: if (not isinstance(other, self.__class__) or self._wrapped.__class__ is not other._wrapped.__class__): @@ -88,6 +94,26 @@ def __eq__(self, other: Any) -> bool: return self.private_numbers() == other.private_numbers() elif hasattr(self._wrapped, 'public_numbers'): return self.public_numbers() == other.public_numbers() + elif (isinstance(self._wrapped, (ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey)) and + isinstance(other._wrapped, (ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey))): + return self._wrapped.private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption(), + ) == other._wrapped.private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption(), + ) + elif (isinstance(self._wrapped, (ed25519.Ed25519PublicKey, ed448.Ed448PublicKey)) and + isinstance(other._wrapped, (ed25519.Ed25519PublicKey, ed448.Ed448PublicKey))): + return self._wrapped.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) == other._wrapped.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) else: return NotImplemented @@ -96,8 +122,12 @@ def __repr__(self) -> str: def public_key(self) -> 'ComparableKey': """Get wrapped public key.""" - if isinstance(self._wrapped, (rsa.RSAPublicKeyWithSerialization, - ec.EllipticCurvePublicKeyWithSerialization)): + if isinstance(self._wrapped, ( + rsa.RSAPublicKeyWithSerialization, + ec.EllipticCurvePublicKeyWithSerialization, + ed25519.Ed25519PublicKey, + ed448.Ed448PublicKey, + )): return self return self.__class__(self._wrapped.public_key()) @@ -129,7 +159,7 @@ def __hash__(self) -> int: class ComparableECKey(ComparableKey): - """Wrapper for ``cryptography`` RSA keys. + """Wrapper for ``cryptography`` EC keys. Wraps around: - :class:`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey` - :class:`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey` @@ -152,6 +182,36 @@ def __hash__(self) -> int: GenericImmutableMap = TypeVar('GenericImmutableMap', bound='ImmutableMap') +class ComparableOKPKey(ComparableKey): + """Wrapper for ``cryptography`` OKP keys. + + Wraps around any of these available with the compilation + - :class:`~cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey` + - :class:`~cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey` + - :class:`~cryptography.hazmat.primitives.asymmetric.ed448.Ed448PublicKey` + - :class:`~cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey` + + These are not yet supported + - :class:`~cryptography.hazmat.primitives.asymmetric.x25519.X25519PublicKey` + - :class:`~cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey` + - :class:`~cryptography.hazmat.primitives.asymmetric.x448.X448PublicKey` + - :class:`~cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey` + """ + + def __hash__(self) -> int: + if isinstance(self._wrapped, (ed25519.Ed25519PublicKey, ed448.Ed448PublicKey)): + return hash(self._wrapped.public_bytes( + format=serialization.PublicFormat.Raw, + encoding=serialization.Encoding.Raw, + )[:32]) + elif isinstance(self._wrapped, (ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey)): + return hash(self._wrapped.public_key().public_bytes( + format=serialization.PublicFormat.Raw, + encoding=serialization.Encoding.Raw, + )[:32]) + return 0 + + class ImmutableMap(Mapping, Hashable): """Immutable key to value mapping with attribute access.""" diff --git a/tests/jwk_test.py b/tests/jwk_test.py index 1b0d6bad4..dd7b54150 100644 --- a/tests/jwk_test.py +++ b/tests/jwk_test.py @@ -12,6 +12,11 @@ EC_P256_KEY = test_util.load_ec_private_key('ec_p256_key.pem') EC_P384_KEY = test_util.load_ec_private_key('ec_p384_key.pem') EC_P521_KEY = test_util.load_ec_private_key('ec_p521_key.pem') +Ed25519_KEY = test_util.load_okp_private_key('ed25519_key.pem') +Ed448_KEY = test_util.load_okp_private_key('ed448_key.pem') +# Not implemented on my machine locally, and just cause barf from OpenSSL +# X25519_KEY = test_util.load_okp_private_key('x25519_key.pem') +# X448_KEY = test_util.load_okp_private_key('x448_key.pem') class JWKTest(unittest.TestCase): @@ -327,5 +332,146 @@ def test_encode_y_leading_zero_p256(self): JWK.from_json(data) +class JWKOKPTestBase(unittest.TestCase): + pass + + +class JWKOKPTest(JWKOKPTestBase): + """Tests for josepy.jwk.JWKOKP.""" + + thumbprint = ( + b'kPrK_qmxVWaYVA9wwBF6Iuo3vVzz7TxHCTwXBygrS4k' + ) + + def setUp(self): + from josepy.jwk import JWKOKP + self.ed25519_key = JWKOKP(key=Ed25519_KEY.public_key()) + self.ed448_key = JWKOKP(key=Ed448_KEY.public_key()) + # self.x25519_key = JWKOKP(key=X25519_KEY.public_key()) + # self.x448_key = JWKOKP(key=X448_KEY.public_key()) + # self.private = self.x448_key + # self.jwk = self.private + # Test vectors taken from RFC 8037, A.2 + self.jwked25519json = { + 'kty': 'OKP', + 'crv': 'Ed25519', + 'x': '11qYAYKxCrfVS_7TyWQHOg7hcvPapiMlrwIaaPcHURo', + } + self.jwked448json = { + 'kty': 'OKP', + 'crv': 'Ed448', + 'x': ( + "9b08f7cc31b7e3e67d22d5aea121074a273bd2b83de09c63faa73d2c" + "22c5d9bbc836647241d953d40c5b12da88120d53177f80e532c41fa0" + ) + } + # Test vectors taken from + # https://datatracker.ietf.org/doc/html/rfc7748#section-6.1 + self.jwkx25519json = { + 'kty': 'OKP', + 'crv': 'X25519', + 'x': '8520f0098930a754748b7ddcb43ef75a0dbf3a0d26381af4eba4a98eaa9b4e6a', + } + # not 56 bytes long + self.jwkx448json = { + "kty": "OKP", + "crv": "X448", + "x": "jjQtV-fA7J_tK8dPzYq7jRPNjF8r5p6LW2R25S2Gw5U", + } + self.jwk = self.jwked25519json + self.private = JWKOKP(key=Ed25519_KEY) + + def test_encode_ed448(self): + from josepy.jwk import JWKOKP + data = b"""-----BEGIN PRIVATE KEY----- +MEcCAQAwBQYDK2VxBDsEOfqsAFWdop10FFPW7Ha2tx2AZh0Ii+jfL2wFXU/dY/fe +iU7/vrGmQ+ux26NkgzfploOHZjEmltLJ9w== +-----END PRIVATE KEY-----""" + key = JWKOKP.load(data) + partial = key.to_partial_json() + self.assertEqual(partial['crv'], 'Ed448') + + def test_encode_ed25519(self): + from josepy.jwk import JWKOKP + data = b"""-----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VwBCIEIPIAha9VqyHHpY1GtEW8JXWqLU5mrPRhXPwJqCtL3bWZ +-----END PRIVATE KEY-----""" + key = JWKOKP.load(data) + data = key.to_partial_json() + self.assertEqual(data['x'], "9ujoz88QZL05w2lhaqUbBaBpwmM12Y7Y8Ybfwjibk-I") + self.assertEqual(data['d'], "8gCFr1WrIceljUa0RbwldaotTmas9GFc_AmoK0vdtZk") + + def test_from_json(self): + from josepy.jwk import JWK + key = JWK.from_json(self.jwked25519json) + with self.subTest(key=[ + self.jwked448json, + self.jwked25519json, + ]): + self.assertIsInstance(key.key, util.ComparableOKPKey) + + def test_load(self): + from josepy.jwk import JWKOKP + self.assertEqual( + self.private, + JWKOKP.load(test_util.load_vector('ed25519_key.pem')), + ) + + def test_fields_to_json(self): + from josepy.jwk import JWK + data = b"""-----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VwBCIEIPIAha9VqyHHpY1GtEW8JXWqLU5mrPRhXPwJqCtL3bWZ +-----END PRIVATE KEY-----""" + key = JWK.load(data) + data = key.fields_to_partial_json() + self.assertEqual(data["crv"], "Ed25519") + self.assertEqual(data["d"], "8gCFr1WrIceljUa0RbwldaotTmas9GFc_AmoK0vdtZk") + + @unittest.skip + def test_init_auto_comparable(self): + self.assertIsInstance(self.x448_key.key, util.ComparableOKPKey) + + def test_unknown_crv_name(self): + from josepy.jwk import JWK + self.assertRaises( + errors.DeserializationError, JWK.from_json, + { + 'kty': 'OKP', + 'crv': 'Ed1000', + 'x': 'jjQtV-fA7J_tK8dPzYq7jRPNjF8r5p6LW2R25S2Gw5U', + } + ) + + def test_no_x_value(self): + from josepy.jwk import JWK + with self.assertRaises(errors.DeserializationError) as warn: + JWK.from_json( + { + "kty": "OKP", + "crv": "Ed448", + } + ) + self.assertEqual( + warn.exception.__str__(), + 'Deserialization error: OKP should have "x" parameter' + ) + + def test_from_json_hashable(self): + from josepy.jwk import JWK + h = hash(JWK.from_json(self.jwked25519json)) + self.assertIsInstance(h, int) + + def test_deserialize_public_key(self): + # should target jwk.py:474-484, but those lines are still marked as missing + # in the coverage report + from josepy.jwk import JWKOKP + JWKOKP.fields_from_json(self.jwked25519json) + + @unittest.skip + def test_x448(self): + from josepy.jwk import JWKOKP + _ = JWKOKP.fields_from_json(self.jwkx448json) + + if __name__ == '__main__': unittest.main() # pragma: no cover diff --git a/tests/test_util.py b/tests/test_util.py index 8195a5ae6..fad92ebe7 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -9,7 +9,7 @@ import josepy.util from josepy import ComparableRSAKey, ComparableX509 -from josepy.util import ComparableECKey +from josepy.util import ComparableECKey, ComparableOKPKey def vector_path(*names: str) -> str: @@ -74,6 +74,15 @@ def load_ec_private_key(*names: str) -> josepy.util.ComparableECKey: load_vector(*names), password=None, backend=default_backend())) +def load_okp_private_key(*names): + """Load OKP private key.""" + loader = _guess_loader( + names[-1], serialization.load_pem_private_key, + serialization.load_der_private_key, + ) + return ComparableOKPKey(loader(load_vector(*names), password=None, backend=default_backend())) + + def load_pyopenssl_private_key(*names: str) -> crypto.PKey: """Load pyOpenSSL private key.""" loader = _guess_loader( diff --git a/tests/testdata/README b/tests/testdata/README index deb1eb6f0..7f6c0a7aa 100644 --- a/tests/testdata/README +++ b/tests/testdata/README @@ -10,6 +10,13 @@ The following command has been used to generate test keys: openssl ecparam -name secp384r1 -genkey -out ec_p384_key.pem openssl ecparam -name secp521r1 -genkey -out ec_p521_key.pem +The following commands generate the Bernstein keys, Ed25519, Ed448, X448 and X25519 keys. + + for version in 25519 448; do + openssl genpkey -algorithm ed${version} -out ed${version}_key.pem + openssl genpkey -algorithm x${version} -out x${version}_key.pem + done + and for the CSR: openssl req -key rsa2048_key.pem -new -subj '/CN=example.com' -outform DER > csr.der diff --git a/tests/testdata/ed25519_key.pem b/tests/testdata/ed25519_key.pem new file mode 100644 index 000000000..a09f9bdd0 --- /dev/null +++ b/tests/testdata/ed25519_key.pem @@ -0,0 +1,3 @@ +-----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VwBCIEIPIAha9VqyHHpY1GtEW8JXWqLU5mrPRhXPwJqCtL3bWZ +-----END PRIVATE KEY----- diff --git a/tests/testdata/ed448_key.pem b/tests/testdata/ed448_key.pem new file mode 100644 index 000000000..6e5e7e7e7 --- /dev/null +++ b/tests/testdata/ed448_key.pem @@ -0,0 +1,4 @@ +-----BEGIN PRIVATE KEY----- +MEcCAQAwBQYDK2VxBDsEOfqsAFWdop10FFPW7Ha2tx2AZh0Ii+jfL2wFXU/dY/fe +iU7/vrGmQ+ux26NkgzfploOHZjEmltLJ9w== +-----END PRIVATE KEY----- diff --git a/tests/testdata/x25519_key.pem b/tests/testdata/x25519_key.pem new file mode 100644 index 000000000..cfdd3a573 --- /dev/null +++ b/tests/testdata/x25519_key.pem @@ -0,0 +1,3 @@ +----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VuBCIEIHCtaWroERB0RhzMDCOeinLOOuEhe19g+c6End8SEelh +-----END PRIVATE KEY------ diff --git a/tests/testdata/x448_key.pem b/tests/testdata/x448_key.pem new file mode 100644 index 000000000..54ec6a2b9 --- /dev/null +++ b/tests/testdata/x448_key.pem @@ -0,0 +1,4 @@ +-----BEGIN PRIVATE KEY----- +MEYCAQAwBQYDK2VvBDoEOCwvHLPxqFBYBtdODtQYBGo2fUfJpmwvcnJ6Vfrhhw0n +NrMORIJt/2cv50jMYyjPzpErbolrHTWT +-----END PRIVATE KEY------- diff --git a/tests/util_test.py b/tests/util_test.py index e804957c7..e9bf6aa48 100644 --- a/tests/util_test.py +++ b/tests/util_test.py @@ -133,6 +133,31 @@ def test_public_key(self): self.assertIsInstance(self.p256_key.public_key(), ComparableECKey) +class ComparableOKPKeyTests(unittest.TestCase): + def setUp(self): + # test_utl.load_ec_private_key return ComparableECKey + self.ed25519_key = test_util.load_okp_private_key('ed25519_key.pem') + self.ed25519_key_same = test_util.load_okp_private_key('ed25519_key.pem') + self.ed448_key = test_util.load_okp_private_key('ed448_key.pem') + # self.x25519_key = test_util.load_okp_private_key('x25519_key.pem') + # self.x448_key = test_util.load_okp_private_key('x448_key.pem') + + def test_repr(self): + self.assertIs(repr(self.ed25519_key).startswith( + '