1
- from base64 import b64encode , b64decode
1
+ from base64 import b64decode , b64encode
2
+ from collections import namedtuple
2
3
from enum import Enum
3
4
4
- from lxml import etree
5
- from lxml .etree import Element , SubElement
6
-
5
+ from cryptography .hazmat .backends import default_backend
7
6
from cryptography .hazmat .primitives .asymmetric import dsa , ec , rsa , utils
8
7
from cryptography .hazmat .primitives .asymmetric .padding import PKCS1v15
9
- from cryptography .hazmat .primitives .hashes import Hash , SHA1 , SHA224 , SHA256 , SHA384 , SHA512
8
+ from cryptography .hazmat .primitives .hashes import SHA1 , SHA224 , SHA256 , SHA384 , SHA512 , Hash
10
9
from cryptography .hazmat .primitives .serialization import load_der_public_key
11
- from cryptography .hazmat .backends import default_backend
10
+ from lxml import etree
11
+ from lxml .etree import Element , SubElement
12
12
13
- from .exceptions import InvalidSignature , InvalidDigest , InvalidInput , InvalidCertificate # noqa
14
- from .util import (bytes_to_long , long_to_bytes , strip_pem_header , add_pem_header , ensure_bytes , ensure_str , Namespace ,
15
- XMLProcessor , iterate_pem , verify_x509_cert_chain , bits_to_bytes_unit )
16
- from collections import namedtuple
13
+ from .exceptions import InvalidCertificate , InvalidDigest , InvalidInput , InvalidSignature # noqa
14
+ from .util import (
15
+ Namespace ,
16
+ XMLProcessor ,
17
+ _remove_sig ,
18
+ add_pem_header ,
19
+ bits_to_bytes_unit ,
20
+ bytes_to_long ,
21
+ ensure_bytes ,
22
+ ensure_str ,
23
+ iterate_pem ,
24
+ long_to_bytes ,
25
+ strip_pem_header ,
26
+ verify_x509_cert_chain ,
27
+ )
17
28
18
29
methods = Enum ("Methods" , "enveloped enveloping detached" )
19
30
27
38
xenc11 = "http://www.w3.org/2009/xmlenc11#"
28
39
)
29
40
41
+
30
42
def ds_tag (tag ):
31
43
return "{" + namespaces .ds + "}" + tag
32
44
45
+
33
46
def dsig11_tag (tag ):
34
47
return "{" + namespaces .dsig11 + "}" + tag
35
48
49
+
36
50
def ec_tag (tag ):
37
51
return "{" + namespaces .ec + "}" + tag
38
52
39
- def _remove_sig (signature , idempotent = False ):
40
- """
41
- Remove the signature node from its parent, keeping any tail element.
42
- This is needed for eneveloped signatures.
43
-
44
- :param signature: Signature to remove from payload
45
- :type signature: XML ElementTree Element
46
- :param idempotent:
47
- If True, don't raise an error if signature is already detached from parent.
48
- :type idempotent: boolean
49
- """
50
- try :
51
- signaturep = next (signature .iterancestors ())
52
- except StopIteration :
53
- if idempotent :
54
- return
55
- raise ValueError ("Can't remove the root signature node" )
56
- if signature .tail is not None :
57
- try :
58
- signatures = next (signature .itersiblings (preceding = True ))
59
- except StopIteration :
60
- if signaturep .text is not None :
61
- signaturep .text = signaturep .text + signature .tail
62
- else :
63
- signaturep .text = signature .tail
64
- else :
65
- if signatures .tail is not None :
66
- signatures .tail = signatures .tail + signature .tail
67
- else :
68
- signatures .tail = signature .tail
69
- signaturep .remove (signature )
70
53
71
54
class VerifyResult (namedtuple ("VerifyResult" , "signed_data signed_xml signature_xml" )):
72
55
"""
@@ -84,6 +67,7 @@ class VerifyResult(namedtuple("VerifyResult", "signed_data signed_xml signature_
84
67
verified_data = signxml.XMLVerifier().verify(input_data).signed_xml
85
68
"""
86
69
70
+
87
71
class XMLSignatureProcessor (XMLProcessor ):
88
72
schema_file = "xmldsig1-schema.xsd"
89
73
@@ -245,6 +229,7 @@ def _resolve_reference(self, doc_root, reference, uri_resolver=None):
245
229
raise InvalidInput ("Unable to resolve reference URI: {}" .format (uri ))
246
230
return result
247
231
232
+
248
233
class XMLSigner (XMLSignatureProcessor ):
249
234
"""
250
235
Create a new XML Signature Signer object, which can be used to hold configuration information and sign multiple
@@ -427,7 +412,7 @@ def sign(self, data, key=None, passphrase=None, cert=None, reference_uri=None, k
427
412
if isinstance (cert , (str , bytes )):
428
413
x509_certificate .text = strip_pem_header (cert )
429
414
else :
430
- from OpenSSL .crypto import dump_certificate , FILETYPE_PEM
415
+ from OpenSSL .crypto import FILETYPE_PEM , dump_certificate
431
416
x509_certificate .text = strip_pem_header (dump_certificate (FILETYPE_PEM , cert ))
432
417
else :
433
418
sig_root .append (key_info )
@@ -568,13 +553,14 @@ def _serialize_key_value(self, key, key_info_element):
568
553
e .text = ensure_str (b64encode (long_to_bytes (getattr (key_params , field ))))
569
554
elif self .sign_alg .startswith ("ecdsa-" ):
570
555
ec_key_value = SubElement (key_value , dsig11_tag ("ECKeyValue" ), nsmap = dict (dsig11 = namespaces .dsig11 ))
571
- named_curve = SubElement (ec_key_value , dsig11_tag ("NamedCurve" ),
556
+ named_curve = SubElement (ec_key_value , dsig11_tag ("NamedCurve" ), # noqa:F841
572
557
URI = self .known_ecdsa_curve_oids [key .curve .name ])
573
558
public_key = SubElement (ec_key_value , dsig11_tag ("PublicKey" ))
574
559
x = key .public_key ().public_numbers ().x
575
560
y = key .public_key ().public_numbers ().y
576
561
public_key .text = ensure_str (b64encode (long_to_bytes (4 ) + long_to_bytes (x ) + long_to_bytes (y )))
577
562
563
+
578
564
class XMLVerifier (XMLSignatureProcessor ):
579
565
"""
580
566
Create a new XML Signature Verifier object, which can be used to hold configuration information and verify multiple
@@ -596,8 +582,8 @@ def _verify_signature_with_pubkey(self, signed_info_c14n, raw_signature, key_val
596
582
named_curve = self ._find (ec_key_value , "NamedCurve" , namespace = "dsig11" )
597
583
public_key = self ._find (ec_key_value , "PublicKey" , namespace = "dsig11" )
598
584
key_data = b64decode (public_key .text )[1 :]
599
- x = bytes_to_long (key_data [:len (key_data )// 2 ])
600
- y = bytes_to_long (key_data [len (key_data )// 2 :])
585
+ x = bytes_to_long (key_data [:len (key_data ) // 2 ])
586
+ y = bytes_to_long (key_data [len (key_data ) // 2 :])
601
587
curve_class = self .known_ecdsa_curves [named_curve .get ("URI" )]
602
588
key = ec .EllipticCurvePublicNumbers (x = x , y = y , curve = curve_class ()).public_key (backend = default_backend ())
603
589
elif not isinstance (key , ec .EllipticCurvePublicKey ):
@@ -818,7 +804,9 @@ def verify(self, data, require_x509=True, x509_cert=None, cert_subject_name=None
818
804
inclusive_ns_prefixes = inclusive_ns_prefixes )
819
805
820
806
if x509_data is not None or self .require_x509 :
821
- from OpenSSL .crypto import load_certificate , X509 , FILETYPE_PEM , verify , Error as OpenSSLCryptoError
807
+ from OpenSSL .crypto import FILETYPE_PEM , X509
808
+ from OpenSSL .crypto import Error as OpenSSLCryptoError
809
+ from OpenSSL .crypto import load_certificate , verify
822
810
823
811
if self .x509_cert is None :
824
812
if x509_data is None :
@@ -939,8 +927,8 @@ def check_key_value_matches_cert_public_key(self, key_value, public_key, signatu
939
927
named_curve = self ._find (ec_key_value , "NamedCurve" , namespace = "dsig11" )
940
928
public_key = self ._find (ec_key_value , "PublicKey" , namespace = "dsig11" )
941
929
key_data = b64decode (public_key .text )[1 :]
942
- x = bytes_to_long (key_data [:len (key_data )// 2 ])
943
- y = bytes_to_long (key_data [len (key_data )// 2 :])
930
+ x = bytes_to_long (key_data [:len (key_data ) // 2 ])
931
+ y = bytes_to_long (key_data [len (key_data ) // 2 :])
944
932
curve_class = self .known_ecdsa_curves [named_curve .get ("URI" )]
945
933
946
934
pubk_curve = public_key .to_cryptography_key ().public_numbers ().curve
0 commit comments