From c2cf61d6e7247dba03d2b64b98aadc7b61757940 Mon Sep 17 00:00:00 2001 From: Jeremy Frank Date: Thu, 11 Apr 2024 11:17:00 -0600 Subject: [PATCH 1/6] refactor certificate manager more refactoring --- auto_identity/certificate_manager.py | 206 +++++++++++------- auto_identity/utils.py | 17 ++ examples/register_auto_id.py | 5 +- .../test_certificate_manager.py | 4 +- 4 files changed, 144 insertions(+), 88 deletions(-) diff --git a/auto_identity/certificate_manager.py b/auto_identity/certificate_manager.py index 9262bb6..d211431 100644 --- a/auto_identity/certificate_manager.py +++ b/auto_identity/certificate_manager.py @@ -13,7 +13,7 @@ class CertificateManager: def __init__(self, certificate=None, private_key=None): """ - Initializes a certificate issuer. + Initializes a certificate manager. Args: certificate(Certificate): Certificate. @@ -22,6 +22,21 @@ def __init__(self, certificate=None, private_key=None): self.private_key = private_key self.certificate = certificate + def _prepare_signing_params(self): + """ + Prepares the signing parameters based on the key type. + + Returns: + dict: Signing parameters. + """ + private_key = self.private_key + if isinstance(private_key, ed25519.Ed25519PrivateKey): + return {"private_key": private_key, "algorithm": None} + if isinstance(private_key, rsa.RSAPrivateKey): + return {"private_key": private_key, "algorithm": hashes.SHA256()} + + raise ValueError("Unsupported key type for signing.") + @staticmethod def _to_common_name(subject_name): """ @@ -35,88 +50,103 @@ def _to_common_name(subject_name): """ return x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, subject_name)]) - def _prepare_signing_params(self): + @staticmethod + def certificate_to_pem(certificate: x509.Certificate): """ - Prepares the signing parameters based on the key type. + Converts an x509 certificate to PEM format. Returns: - dict: Signing parameters. + bytes: PEM encoded certificate. """ - private_key = self.private_key - if isinstance(private_key, ed25519.Ed25519PrivateKey): - return {"private_key": private_key, "algorithm": None} - if isinstance(private_key, rsa.RSAPrivateKey): - return {"private_key": private_key, "algorithm": hashes.SHA256()} + return certificate.public_bytes(serialization.Encoding.PEM) - raise ValueError("Unsupported key type for signing.") + @staticmethod + def pem_to_certificate(pem_bytes: bytes) -> x509.Certificate: + """ + Converts PEM bytes to an x509 Certificate object. + + Args: + pem_bytes (bytes): The PEM-encoded certificate as bytes. + + Returns: + An x509.Certificate object. + """ + certificate = x509.load_pem_x509_certificate(pem_bytes) + return certificate + + @staticmethod + def get_subject_common_name(certificate: x509.Certificate): + """ + Retrieves the common name from the subject of the certificate. + + Args: + certificate(x509.Certificate): Certificate to retrieve the common name from . + + Returns: + str: Common name of the certificate. + """ + return certificate.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value - def create_csr(self, subject_name): + @staticmethod + def create_csr(subject_name, uri: str = None): """ - Creates a Certificate Signing Request(CSR). + Creates an unsigned Certificate Signing Request(CSR). Args: subject_name(str): Subject name for the CSR(common name). Returns: - CertificateSigningRequest: Created X.509 CertificateSigningRequest. + CertificateSigningRequestBuilder: Created X.509 CertificateSigningRequestBuilder. """ - signing_params = self._prepare_signing_params() + common_name = x509.Name( + [x509.NameAttribute(NameOID.COMMON_NAME, subject_name)]) + csr = x509.CertificateSigningRequestBuilder().subject_name( - x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, subject_name)]) - ).sign(**signing_params) + common_name) + if uri: + csr = csr.add_extension( + x509.SubjectAlternativeName([x509.UniformResourceIdentifier(uri)]), critical=False) return csr - def issue_certificate(self, csr, validity_period_days=365): + def sign_csr(self, csr): """ - Issues a certificate for Certificate Signing Request(CSR). + Signs a Certificate Signing Request(CSR). Args: csr(CertificateSigningRequest): Certificate Signing Request. - issuer_certificate(Certificate): Issuer certificate. - issuer_private_key(PrivateKey): Private key to sign the certificate with . - validity_period_days(int, optional): Number of days the certificate is valid. Defaults to 365. Returns: - Certificate: Created X.509 certificate. + CertificateSigningRequest: Created X.509 CertificateSigningRequest. """ - if self.certificate is None: - raise ValueError("Certificate is not set.") if self.private_key is None: raise ValueError("Private key is not set.") - issuer_certificate = self.certificate + signing_params = self._prepare_signing_params() - # Verify that the public key derived from the private key matches the one in the issuer's certificate - if not do_public_keys_match(issuer_certificate.public_key(), self.private_key.public_key()): - raise ValueError( - "Issuer certificate public key does not match the private key used for signing.") + return csr.sign(**signing_params) - signing_params = self._prepare_signing_params() + def create_and_sign_csr(self, subject_name, uri: str = None): + """ + Creates and signs a Certificate Signing Request(CSR). - certificate = x509.CertificateBuilder().subject_name( - csr.subject, - ).issuer_name( - issuer_certificate.subject, - ).public_key( - csr.public_key(), - ).serial_number( - x509.random_serial_number(), - ).not_valid_before( - datetime.now(timezone.utc), - ).not_valid_after( - datetime.now(timezone.utc) + timedelta(days=validity_period_days), - ).sign(**signing_params) + Args: + subject_name(str): Subject name for the CSR(common name). - return certificate + Returns: + CertificateSigningRequest: Created X.509 CertificateSigningRequest. + """ + csr = self.create_csr(subject_name, uri) + return self.sign_csr(csr) - def self_issue_certificate(self, subject_name: str, validity_period_days=365): + def issue_certificate(self, csr: x509.CertificateSigningRequest, validity_period_days=365): """ - Issues a self-signed certificate for the identity. + Issues a certificate for Certificate Signing Request(CSR). Args: - subject_name(str): Subject name for the certificate(common name). - private_key(PrivateKey): Private key to sign the certificate with . + csr(CertificateSigningRequest): Certificate Signing Request. + issuer_certificate(Certificate): Issuer certificate. + issuer_private_key(PrivateKey): Private key to sign the certificate with . validity_period_days(int, optional): Number of days the certificate is valid. Defaults to 365. Returns: @@ -125,59 +155,67 @@ def self_issue_certificate(self, subject_name: str, validity_period_days=365): if self.private_key is None: raise ValueError("Private key is not set.") - signing_params = self._prepare_signing_params() - common_name = self._to_common_name(subject_name) - - certificate = x509.CertificateBuilder().subject_name( - common_name, + if self.certificate is None: + issuer_name = csr.subject + else: + if not do_public_keys_match(self.certificate.public_key(), self.private_key.public_key()): + raise ValueError( + "Issuer certificate public key does not match the private key used for signing.") + issuer_name = self.certificate.subject + + # Prepare the certificate builder with information from the CSR + certificate_builder = x509.CertificateBuilder().subject_name( + csr.subject ).issuer_name( - common_name, + issuer_name ).public_key( - self.private_key.public_key(), + csr.public_key() ).serial_number( - x509.random_serial_number(), + x509.random_serial_number() ).not_valid_before( - datetime.now(timezone.utc), + datetime.utcnow() ).not_valid_after( - datetime.now(timezone.utc) + timedelta(days=validity_period_days), - ).sign(**signing_params) + # Set the certificate's validity period + datetime.utcnow() + timedelta(days=validity_period_days) + ) - self.certificate = certificate - return certificate + # Copy all extensions from the CSR to the certificate + for extension in csr.extensions: + certificate_builder = certificate_builder.add_extension( + extension.value, extension.critical + ) - @staticmethod - def certificate_to_pem(certificate: x509.Certificate): - """ - Converts an x509 certificate to PEM format. - - Returns: - bytes: PEM encoded certificate. - """ - return certificate.public_bytes(serialization.Encoding.PEM) + return certificate_builder.sign(**self._prepare_signing_params()) - @staticmethod - def pem_to_certificate(pem_bytes: bytes) -> x509.Certificate: + def self_issue_certificate(self, subject_name: str, validity_period_days=365): """ - Converts PEM bytes to an x509 Certificate object. + Issues a self-signed certificate for the identity. Args: - pem_bytes (bytes): The PEM-encoded certificate as bytes. + subject_name(str): Subject name for the certificate(common name). + private_key(PrivateKey): Private key to sign the certificate with . + validity_period_days(int, optional): Number of days the certificate is valid. Defaults to 365. Returns: - An x509.Certificate object. + Certificate: Created X.509 certificate. """ - certificate = x509.load_pem_x509_certificate(pem_bytes) + if self.private_key is None: + raise ValueError("Private key is not set.") + + csr = self.sign_csr(self.create_csr(subject_name)) + certificate = self.issue_certificate(csr, validity_period_days) + + self.certificate = certificate return certificate - @staticmethod - def get_subject_common_name(certificate: x509.Certificate): + def save_certificate(self, file_path: str): """ - Retrieves the common name from the subject of the certificate. + Saves the certificate to a file. Args: - certificate(x509.Certificate): Certificate to retrieve the common name from . - - Returns: - str: Common name of the certificate. + file_path (str): Path to the file where the certificate should be saved. """ - return certificate.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + certificate_data = self.certificate.public_bytes( + serialization.Encoding.PEM) + with open(file_path, "wb") as cert_file: + cert_file.write(certificate_data) diff --git a/auto_identity/utils.py b/auto_identity/utils.py index 74b00aa..fa5be95 100644 --- a/auto_identity/utils.py +++ b/auto_identity/utils.py @@ -1,4 +1,5 @@ from cryptography.x509.oid import ObjectIdentifier +from cryptography.hazmat.primitives import hashes from pyasn1.type import namedtype, univ from pyasn1.codec.der.encoder import encode @@ -35,3 +36,19 @@ def der_encode_signature_algorithm_oid(oid: ObjectIdentifier): der_encoded_oid = encode(algorithm_identifier) return der_encoded_oid + + +def blake2b_256(data: bytes) -> bytes: + """ + Hashes the given data using BLAKE2b-256. + + Args: + data (bytes): The data to be hashed. + + Returns: + bytes: The BLAKE2b-256 hash of the data. + + """ + digest = hashes.Hash(hashes.BLAKE2b(32)) + digest.update(data) + return digest.finalize() diff --git a/examples/register_auto_id.py b/examples/register_auto_id.py index 3fc2787..01efb2f 100644 --- a/examples/register_auto_id.py +++ b/examples/register_auto_id.py @@ -1,6 +1,6 @@ import os from dotenv import load_dotenv -from auto_identity import generate_ed25519_key_pair, self_issue_certificate, Registry, Keypair +from auto_identity import generate_ed25519_key_pair, CertificateManager, Registry, Keypair # Configuration @@ -18,7 +18,8 @@ def main(): registry = Registry(rpc_url=RPC_URL, signer=keypair) keys = generate_ed25519_key_pair() - certificate = self_issue_certificate("test", private_key=keys[0]) + cm = CertificateManager(private_key=keys[0]) + certificate = cm.self_issue_certificate("test") # Attempt to register the certificate receipt = registry.register_auto_id(certificate) diff --git a/tests/auto_identity_tests/test_certificate_manager.py b/tests/auto_identity_tests/test_certificate_manager.py index 3bdff73..c8ba5e4 100644 --- a/tests/auto_identity_tests/test_certificate_manager.py +++ b/tests/auto_identity_tests/test_certificate_manager.py @@ -13,7 +13,7 @@ def test_create_csr(): csr_creator = CertificateManager(private_key=private_key) # Call the create_csr function - csr = csr_creator.create_csr(subject_name) + csr = csr_creator.create_and_sign_csr(subject_name) # Assert that the CSR is not None assert csr is not None @@ -43,7 +43,7 @@ def test_issue_certificate(): csr_creator = CertificateManager(private_key=subject_private_key) # Call the create_csr function to generate a CSR - csr = csr_creator.create_csr(subject_name) + csr = csr_creator.create_and_sign_csr(subject_name) # Issue a certificate using the CSR certificate = issuer.issue_certificate(csr) From a8c7ec953098b8446e0b5a49eea5b5f0e33cacea Mon Sep 17 00:00:00 2001 From: Jeremy Frank Date: Thu, 11 Apr 2024 12:27:43 -0600 Subject: [PATCH 2/6] update register_auto_id example to include issuing and registering leaf auto_id --- examples/register_auto_id.py | 39 +++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/examples/register_auto_id.py b/examples/register_auto_id.py index 01efb2f..24be988 100644 --- a/examples/register_auto_id.py +++ b/examples/register_auto_id.py @@ -9,24 +9,39 @@ KEYPAIR_URI = os.getenv("KEYPAIR_URI") +def register(certificate, registry, issuer_id=None): + # Attempt to register the certificate + receipt = registry.register_auto_id(certificate, issuer_id) + if receipt.is_success: + for event in receipt.triggered_events: + event_data = event['event'].serialize() + print(event_data) + if event_data.get('event_id') == 'NewAutoIdRegistered': + auto_id_identifier = event_data['attributes'] + print( + f"New Auto Id registered with identifier: {auto_id_identifier}") + return auto_id_identifier + else: + print("Registration failed.") + + def main(): # Initialize the signer keypair - keypair = Keypair.create_from_uri( + substrate_keypair = Keypair.create_from_uri( KEYPAIR_URI, ss58_format=42) - # Initialize the Registry instance - registry = Registry(rpc_url=RPC_URL, signer=keypair) + registry = Registry(rpc_url=RPC_URL, signer=substrate_keypair) keys = generate_ed25519_key_pair() - cm = CertificateManager(private_key=keys[0]) - certificate = cm.self_issue_certificate("test") - - # Attempt to register the certificate - receipt = registry.register_auto_id(certificate) - if receipt: - print("Registration successful. Receipt:", receipt) - else: - print("Registration failed.") + self_issued_cm = CertificateManager(private_key=keys[0]) + self_issued_cm.self_issue_certificate("test") + issuer_id = register(self_issued_cm.certificate, registry) + + user_keys = generate_ed25519_key_pair() + user_cm = CertificateManager(private_key=user_keys[0]) + user_csr = user_cm.create_and_sign_csr("user") + user_cert = self_issued_cm.issue_certificate(user_csr) + register_user = register(user_cert, registry, issuer_id) if __name__ == "__main__": From 1196479420e365f25dc17309f18a51112cd185f8 Mon Sep 17 00:00:00 2001 From: Jeremy Frank Date: Thu, 11 Apr 2024 13:17:10 -0600 Subject: [PATCH 3/6] update register_auto_id to return identifier --- auto_identity/registry.py | 59 ++++++++++-------------------------- examples/register_auto_id.py | 12 ++------ 2 files changed, 19 insertions(+), 52 deletions(-) diff --git a/auto_identity/registry.py b/auto_identity/registry.py index 73d5fab..b59935e 100644 --- a/auto_identity/registry.py +++ b/auto_identity/registry.py @@ -1,10 +1,15 @@ -from typing import Optional +from typing import Optional, NamedTuple from cryptography import x509 from cryptography.hazmat.primitives.asymmetric import ed25519 from substrateinterface import ExtrinsicReceipt, Keypair, SubstrateInterface, exceptions from .utils import der_encode_signature_algorithm_oid +class RegistrationResult(NamedTuple): + receipt: Optional[ExtrinsicReceipt] + identifier: Optional[int] + + class Registry(): """ Registry class for managing identities on Autonomys identity domains. @@ -51,7 +56,7 @@ def _compose_call(self, call_function: str, call_params: dict) -> Optional[Extri print("Failed to send: {}".format(e)) return None - def register_auto_id(self, certificate: x509.Certificate, issuer_id=None): + def register_auto_id(self, certificate: x509.Certificate, issuer_id=None) -> RegistrationResult: """ Register a certificate in the registry. @@ -60,7 +65,8 @@ def register_auto_id(self, certificate: x509.Certificate, issuer_id=None): issuer_id: The issuer ID. If None, the ID will be self-issued. Returns: - Optional[ExtrinsicReceipt]: The receipt of the extrinsic if successful, None otherwise. + RegistrationResult: A named tuple containing the receipt of the extrinsic and the identifier if successful, + or None for both fields if unsuccessful. """ base_certificate = { @@ -82,45 +88,12 @@ def register_auto_id(self, certificate: x509.Certificate, issuer_id=None): receipt = self._compose_call( call_function="register_auto_id", call_params=req) - return receipt - - def get_auto_id(self, identifier): - """ - Get an auto identity from the registry. - - Args: - subject_name (str): The subject name of the certificate. - - Returns: - The auto entity with the provided subject name. - """ - - result = self.registry.query('auto-id', 'AutoIds', [identifier]) - - if result is None: - return None - - # TODO map result to AutoEntity type - auto_entity = result - - return auto_entity - - def verify(self, subject_name: str, public_key: ed25519.Ed25519PublicKey): - """ - Verify a certificate from the registry. - - Args: - subject_name (str): The subject name of the certificate. - public_key (ed25519.Ed25519PublicKey): The public key to verify. - - Returns: - bool: True if the certificate is valid, False otherwise. - """ - entity = self.get_auto_id(subject_name) - - if entity is None: - return False - # TODO check public key and subject name match certificate, and that it is within the validity period + if receipt.is_success: + for event in receipt.triggered_events: + event_data = event['event'].serialize() + if event_data.get('event_id') == 'NewAutoIdRegistered': + identifier = event_data['attributes'] + return RegistrationResult(receipt=receipt, identifier=identifier) - return True + return RegistrationResult(receipt=receipt, identifier=None) diff --git a/examples/register_auto_id.py b/examples/register_auto_id.py index 24be988..3d3d21e 100644 --- a/examples/register_auto_id.py +++ b/examples/register_auto_id.py @@ -11,16 +11,10 @@ def register(certificate, registry, issuer_id=None): # Attempt to register the certificate - receipt = registry.register_auto_id(certificate, issuer_id) + receipt, identifer = registry.register_auto_id(certificate, issuer_id) if receipt.is_success: - for event in receipt.triggered_events: - event_data = event['event'].serialize() - print(event_data) - if event_data.get('event_id') == 'NewAutoIdRegistered': - auto_id_identifier = event_data['attributes'] - print( - f"New Auto Id registered with identifier: {auto_id_identifier}") - return auto_id_identifier + print(f"Registration successful. {identifer}") + return identifer else: print("Registration failed.") From 7d1b5cdf00a7b5ca32553f0a59d83bedb01352ac Mon Sep 17 00:00:00 2001 From: Jeremy Frank Date: Thu, 11 Apr 2024 14:10:16 -0600 Subject: [PATCH 4/6] add auto_id identifier logic to certificate issuance --- auto_identity/certificate_manager.py | 83 ++++++++++++++++++++++++---- auto_identity/utils.py | 8 +-- examples/register_auto_id.py | 4 ++ 3 files changed, 81 insertions(+), 14 deletions(-) diff --git a/auto_identity/certificate_manager.py b/auto_identity/certificate_manager.py index d211431..a920e1c 100644 --- a/auto_identity/certificate_manager.py +++ b/auto_identity/certificate_manager.py @@ -4,6 +4,7 @@ from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ed25519, rsa from .key_management import do_public_keys_match +from .utils import blake2b_256 class CertificateManager: @@ -50,6 +51,34 @@ def _to_common_name(subject_name): """ return x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, subject_name)]) + @staticmethod + def pretty_print_certificate(cert): + """ + Prints the details of an X.509 Certificate in a readable format. + + Args: + cert (x509.Certificate): The certificate to print. + """ + print("Certificate:") + print("============") + print("Subject:", cert.subject.rfc4514_string()) + print("Issuer:", cert.issuer.rfc4514_string()) + print("Serial Number:", cert.serial_number) + print("Not Valid Before:", cert.not_valid_before_utc) + print("Not Valid After:", cert.not_valid_after_utc) + + print("\nExtensions:") + for ext in cert.extensions: + print( + f" - {ext.oid._name if ext.oid._name else ext.oid.dotted_string}: {ext.value}") + + # Optionally, print the public key details + print("\nPublic Key:") + print(cert.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo + ).decode('utf-8')) + @staticmethod def certificate_to_pem(certificate: x509.Certificate): """ @@ -75,7 +104,7 @@ def pem_to_certificate(pem_bytes: bytes) -> x509.Certificate: return certificate @staticmethod - def get_subject_common_name(certificate: x509.Certificate): + def get_subject_common_name(subject: x509.Name): """ Retrieves the common name from the subject of the certificate. @@ -85,10 +114,27 @@ def get_subject_common_name(certificate: x509.Certificate): Returns: str: Common name of the certificate. """ - return certificate.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + return subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + + @staticmethod + def get_certificate_auto_id(certificate: x509.Certificate): + """ + Retrieves the autoid from the certificate. + + Args: + certificate(x509.Certificate): Certificate to retrieve the autoid from. + + Returns: + str: Autoid of the certificate. + """ + san = certificate.extensions.get_extension_for_oid( + x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value + for name in san: + if isinstance(name, x509.UniformResourceIdentifier) and name.value.startswith("autoid:auto:"): + return name.value.split(":")[-1] @staticmethod - def create_csr(subject_name, uri: str = None): + def create_csr(subject_name): """ Creates an unsigned Certificate Signing Request(CSR). @@ -103,9 +149,6 @@ def create_csr(subject_name, uri: str = None): csr = x509.CertificateSigningRequestBuilder().subject_name( common_name) - if uri: - csr = csr.add_extension( - x509.SubjectAlternativeName([x509.UniformResourceIdentifier(uri)]), critical=False) return csr @@ -126,7 +169,7 @@ def sign_csr(self, csr): return csr.sign(**signing_params) - def create_and_sign_csr(self, subject_name, uri: str = None): + def create_and_sign_csr(self, subject_name): """ Creates and signs a Certificate Signing Request(CSR). @@ -136,7 +179,7 @@ def create_and_sign_csr(self, subject_name, uri: str = None): Returns: CertificateSigningRequest: Created X.509 CertificateSigningRequest. """ - csr = self.create_csr(subject_name, uri) + csr = self.create_csr(subject_name) return self.sign_csr(csr) def issue_certificate(self, csr: x509.CertificateSigningRequest, validity_period_days=365): @@ -157,11 +200,15 @@ def issue_certificate(self, csr: x509.CertificateSigningRequest, validity_period if self.certificate is None: issuer_name = csr.subject + auto_id = blake2b_256( + self.get_subject_common_name(issuer_name).encode()) else: if not do_public_keys_match(self.certificate.public_key(), self.private_key.public_key()): raise ValueError( "Issuer certificate public key does not match the private key used for signing.") issuer_name = self.certificate.subject + auto_id = blake2b_256((self.get_subject_common_name( + issuer_name)+self.get_subject_common_name(csr.subject)).encode()) # Prepare the certificate builder with information from the CSR certificate_builder = x509.CertificateBuilder().subject_name( @@ -175,17 +222,33 @@ def issue_certificate(self, csr: x509.CertificateSigningRequest, validity_period ).not_valid_before( datetime.utcnow() ).not_valid_after( - # Set the certificate's validity period datetime.utcnow() + timedelta(days=validity_period_days) ) + auto_id_san = x509.UniformResourceIdentifier( + f"autoid:auto:{auto_id.hex()}") + san_extensions = [ext for ext in csr.extensions if isinstance( + ext.value, x509.SubjectAlternativeName)] + if san_extensions: + existing_san = san_extensions[0].value + new_san = existing_san.add(auto_id_san) + certificate_builder = certificate_builder.add_extension( + new_san, critical=False) + else: + certificate_builder = certificate_builder.add_extension( + x509.SubjectAlternativeName([auto_id_san]), + critical=False + ) # Copy all extensions from the CSR to the certificate for extension in csr.extensions: certificate_builder = certificate_builder.add_extension( extension.value, extension.critical ) - return certificate_builder.sign(**self._prepare_signing_params()) + certificate = certificate_builder.sign( + **self._prepare_signing_params()) + + return certificate def self_issue_certificate(self, subject_name: str, validity_period_days=365): """ diff --git a/auto_identity/utils.py b/auto_identity/utils.py index fa5be95..6b5e074 100644 --- a/auto_identity/utils.py +++ b/auto_identity/utils.py @@ -1,5 +1,5 @@ +from hashlib import blake2b from cryptography.x509.oid import ObjectIdentifier -from cryptography.hazmat.primitives import hashes from pyasn1.type import namedtype, univ from pyasn1.codec.der.encoder import encode @@ -49,6 +49,6 @@ def blake2b_256(data: bytes) -> bytes: bytes: The BLAKE2b-256 hash of the data. """ - digest = hashes.Hash(hashes.BLAKE2b(32)) - digest.update(data) - return digest.finalize() + hasher = blake2b(data, digest_size=32) + + return hasher.digest() diff --git a/examples/register_auto_id.py b/examples/register_auto_id.py index 3d3d21e..ae18f83 100644 --- a/examples/register_auto_id.py +++ b/examples/register_auto_id.py @@ -35,8 +35,12 @@ def main(): user_cm = CertificateManager(private_key=user_keys[0]) user_csr = user_cm.create_and_sign_csr("user") user_cert = self_issued_cm.issue_certificate(user_csr) + CertificateManager.pretty_print_certificate(user_cert) register_user = register(user_cert, registry, issuer_id) + print( + f"auto id from cert: {CertificateManager.get_certificate_auto_id(user_cert)}") + if __name__ == "__main__": main() From 9ced1728704c92b5b0cd4e80caf0d55f570311b7 Mon Sep 17 00:00:00 2001 From: Jeremy Frank Date: Thu, 11 Apr 2024 14:18:27 -0600 Subject: [PATCH 5/6] add blake2b test and update cert manager test --- auto_identity/__init__.py | 5 +++-- setup.py | 2 +- tests/auto_identity_tests/test_certificate_manager.py | 2 +- tests/auto_identity_tests/test_utils.py | 9 ++++++++- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/auto_identity/__init__.py b/auto_identity/__init__.py index 9136d98..77dad1a 100644 --- a/auto_identity/__init__.py +++ b/auto_identity/__init__.py @@ -18,9 +18,9 @@ save_key) from .certificate_manager import CertificateManager from .registry import Registry -from .utils import der_encode_signature_algorithm_oid +from .utils import der_encode_signature_algorithm_oid, blake2b_256 -__version__ = '0.1.4' +__version__ = '0.1.5' __all__ = [ "generate_rsa_key_pair", @@ -34,6 +34,7 @@ "key_to_pem", "CertificateManager", "der_encode_signature_algorithm_oid", + "blake2b_256", "Registry", "Keypair", ] diff --git a/setup.py b/setup.py index 529e082..ab6c507 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='auto-sdk', - version='0.1.4', + version='0.1.5', author='Autonomys', author_email='jeremy@subspace.network', url='https://github.com/subspace/auto-kit', diff --git a/tests/auto_identity_tests/test_certificate_manager.py b/tests/auto_identity_tests/test_certificate_manager.py index c8ba5e4..613b516 100644 --- a/tests/auto_identity_tests/test_certificate_manager.py +++ b/tests/auto_identity_tests/test_certificate_manager.py @@ -102,7 +102,7 @@ def test_get_subject_common_name(): certificate = self_issuer.self_issue_certificate("Test") # Retrieve the common name from the certificate - common_name = self_issuer.get_subject_common_name(certificate) + common_name = self_issuer.get_subject_common_name(certificate.subject) # Assert that the common name matches the provided subject name assert common_name == subject_name diff --git a/tests/auto_identity_tests/test_utils.py b/tests/auto_identity_tests/test_utils.py index 756b3af..f3fbc98 100644 --- a/tests/auto_identity_tests/test_utils.py +++ b/tests/auto_identity_tests/test_utils.py @@ -1,7 +1,7 @@ import pathlib from cryptography.hazmat.backends import default_backend from cryptography import x509 -from auto_identity import der_encode_signature_algorithm_oid +from auto_identity import der_encode_signature_algorithm_oid, blake2b_256 def test_der_encode_algorithm2(): @@ -22,3 +22,10 @@ def test_der_encode_algorithm2(): # Compare the DER encoded OID with the result from tests in https://github.com/subspace/subspace/blob/d875a5aac35c1732eec61ce4359782eff58ff6fc/domains/pallets/auto-id/src/tests.rs#L127 from_rust_implementation = "300d06092a864886f70d01010b0500" assert (der_encoded_oid.hex() == from_rust_implementation) + + +def test_blake2b_256(): + data = b"Hello, world!" + hash = blake2b_256(data) + expected_hash = "b5da441cfe72ae042ef4d2b17742907f675de4da57462d4c3609c2e2ed755970" + assert hash.hex() == expected_hash From f8edeb928e1b8c684434393d76175b8500c8f759 Mon Sep 17 00:00:00 2001 From: Jeremy Frank Date: Thu, 11 Apr 2024 14:37:26 -0600 Subject: [PATCH 6/6] fix auto_id creation issue --- auto_identity/__init__.py | 2 +- auto_identity/certificate_manager.py | 4 ++-- setup.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/auto_identity/__init__.py b/auto_identity/__init__.py index 77dad1a..7ff4c69 100644 --- a/auto_identity/__init__.py +++ b/auto_identity/__init__.py @@ -20,7 +20,7 @@ from .registry import Registry from .utils import der_encode_signature_algorithm_oid, blake2b_256 -__version__ = '0.1.5' +__version__ = '0.1.6' __all__ = [ "generate_rsa_key_pair", diff --git a/auto_identity/certificate_manager.py b/auto_identity/certificate_manager.py index a920e1c..9c10cdb 100644 --- a/auto_identity/certificate_manager.py +++ b/auto_identity/certificate_manager.py @@ -207,8 +207,8 @@ def issue_certificate(self, csr: x509.CertificateSigningRequest, validity_period raise ValueError( "Issuer certificate public key does not match the private key used for signing.") issuer_name = self.certificate.subject - auto_id = blake2b_256((self.get_subject_common_name( - issuer_name)+self.get_subject_common_name(csr.subject)).encode()) + auto_id = blake2b_256((self.get_certificate_auto_id( + self.certificate).encode() + self.get_subject_common_name(csr.subject).encode())) # Prepare the certificate builder with information from the CSR certificate_builder = x509.CertificateBuilder().subject_name( diff --git a/setup.py b/setup.py index ab6c507..6428000 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='auto-sdk', - version='0.1.5', + version='0.1.6', author='Autonomys', author_email='jeremy@subspace.network', url='https://github.com/subspace/auto-kit',