1
+ # Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
2
+ # SPDX-License-Identifier: Apache-2.0
3
+
4
+ from aws_encryption_sdk .key_providers .raw import RawMasterKeyProvider
5
+ from aws_encryption_sdk .identifiers import EncryptionKeyType , WrappingAlgorithm
6
+ from aws_encryption_sdk .internal .crypto .wrapping_keys import WrappingKey
7
+ from aws_encryption_sdk .internal .str_ops import to_str
8
+ from aws_cryptographic_material_providers .internaldafny .generated .RawAESKeyring import RawAESKeyring
9
+ from aws_cryptographic_material_providers .internaldafny .generated .AwsKmsKeyring import AwsKmsKeyring
10
+ from aws_cryptographic_material_providers .internaldafny .generated .AwsKmsMrkKeyring import AwsKmsMrkKeyring
11
+ from aws_cryptographic_material_providers .internaldafny .generated .AwsKmsDiscoveryKeyring import AwsKmsDiscoveryKeyring
12
+ from aws_cryptographic_material_providers .internaldafny .generated .AwsKmsMrkDiscoveryKeyring import AwsKmsMrkDiscoveryKeyring
13
+ from aws_encryption_sdk .key_providers .kms import (
14
+ MRKAwareDiscoveryAwsKmsMasterKeyProvider ,
15
+ MRKAwareStrictAwsKmsMasterKeyProvider ,
16
+ )
17
+ import aws_cryptographic_material_providers .smithygenerated .aws_cryptography_materialproviders .dafny_to_smithy
18
+ import aws_encryption_sdk
19
+ class StaticMasterKeyProvider (RawMasterKeyProvider ):
20
+ """Generates 256-bit keys for each unique key ID."""
21
+
22
+ def __init__ (self , ** kwargs ): # pylint: disable=unused-argument
23
+ """Initialize empty map of keys."""
24
+ self ._static_keys = {}
25
+ self .provider_id = ""
26
+
27
+ @property
28
+ def static_keys (self ):
29
+ return self ._static_keys
30
+
31
+ # The key namespace in the Raw keyrings is equivalent to Provider ID (or Provider) field
32
+ # in the Raw Master Key Providers
33
+ @property
34
+ def provider_id (self ):
35
+ return self ._provider_id
36
+
37
+ @property
38
+ def wrapping_key_type (self ):
39
+ return self ._wrapping_key_type
40
+
41
+ @property
42
+ def wrapping_algorithm (self ):
43
+ return self ._wrapping_algorithm
44
+
45
+ @provider_id .setter
46
+ def provider_id (self , value ):
47
+ self ._provider_id = value
48
+
49
+ @static_keys .setter
50
+ def static_keys (self , static_key_dict ):
51
+ self ._static_keys = static_key_dict
52
+
53
+ @wrapping_key_type .setter
54
+ def wrapping_key_type (self , wrapping_key_type ):
55
+ self ._wrapping_key_type = wrapping_key_type
56
+
57
+ @wrapping_key_type .setter
58
+ def wrapping_algorithm (self , wrapping_algorithm ):
59
+ self ._wrapping_algorithm = wrapping_algorithm
60
+
61
+ def _get_raw_key (self , key_id ):
62
+ """Returns a static, symmetric key for the specified key ID.
63
+
64
+ :param str key_id: Key ID
65
+ :returns: Wrapping key that contains the specified static key
66
+ :rtype: :class:`aws_encryption_sdk.internal.crypto.WrappingKey`
67
+ """
68
+ static_key = self ._static_keys [to_str (key_id )] # add_master_key changes it to bytes and we have to use internal to_str
69
+ return WrappingKey (
70
+ wrapping_algorithm = self ._wrapping_algorithm ,
71
+ wrapping_key = static_key ,
72
+ wrapping_key_type = self ._wrapping_key_type ,
73
+ )
74
+
75
+ def create_raw_aes_key_provider (key_name , key_namespace , key ):
76
+ # Create a Raw AES master key provider.
77
+
78
+ # The key name in the Raw keyrings is equivalent to the Key ID field
79
+ # in the Raw Master Key Providers
80
+ key_id = key_name
81
+ static_key_map = {key_id : key }
82
+ key_provider = StaticMasterKeyProvider ()
83
+ key_provider .static_keys = static_key_map
84
+ key_provider .provider_id = key_namespace
85
+ key_provider .wrapping_key_type = EncryptionKeyType .SYMMETRIC
86
+ key_provider .wrapping_algorithm = WrappingAlgorithm .AES_256_GCM_IV12_TAG16_NO_PADDING
87
+ key_provider .add_master_key (key_name )
88
+
89
+ return key_provider
90
+
91
+ def keyring_to_mkp (keyring ):
92
+ if (isinstance (keyring , RawAESKeyring )):
93
+ mkp_key_id = bytes (keyring .wrappingKey )
94
+ # The key name in the Raw keyrings is equivalent to the Key ID field
95
+ # in the Raw Master Key Providers
96
+ mkp_key = b"" .join (
97
+ ord (chr (c )).to_bytes (2 , "big" ) for c in keyring .keyName
98
+ ).decode ("utf-16-be" )
99
+ mkpProvider_id = b"" .join (
100
+ ord (chr (c )).to_bytes (2 , "big" ) for c in keyring .keyNamespace
101
+ ).decode ("utf-16-be" )
102
+ return create_raw_aes_key_provider (mkp_key , mkpProvider_id , mkp_key_id )
103
+
104
+ if (isinstance (keyring , AwsKmsKeyring )):
105
+ aws_kms_arn = string_to_native (keyring .awsKmsKey )
106
+ return aws_encryption_sdk .StrictAwsKmsMasterKeyProvider (key_ids = [
107
+ aws_kms_arn ,
108
+ ])
109
+
110
+ if (isinstance (keyring , AwsKmsMrkKeyring )):
111
+ aws_kms_arn = string_to_native (keyring .awsKmsKey )
112
+ kwargs = dict (key_ids = [aws_kms_arn ])
113
+ return MRKAwareStrictAwsKmsMasterKeyProvider (** kwargs )
114
+
115
+ if (isinstance (keyring , AwsKmsDiscoveryKeyring )):
116
+ if keyring .discoveryFilter .is_Some :
117
+ discovery_filter = aws_cryptographic_material_providers .smithygenerated .aws_cryptography_materialproviders .dafny_to_smithy .aws_cryptography_materialproviders_DiscoveryFilter (
118
+ keyring .discoveryFilter .UnwrapOr (None )
119
+ )
120
+ kwargs = dict (discovery_filter = discovery_filter )
121
+ aws_encryption_sdk .DiscoveryAwsKmsMasterKeyProvider (** kwargs )
122
+ return aws_encryption_sdk .DiscoveryAwsKmsMasterKeyProvider ()
123
+
124
+ if (isinstance (keyring , AwsKmsMrkDiscoveryKeyring )):
125
+ region = string_to_native (keyring .region )
126
+ if keyring .discoveryFilter .is_Some :
127
+ discovery_filter = aws_cryptographic_material_providers .smithygenerated .aws_cryptography_materialproviders .dafny_to_smithy .aws_cryptography_materialproviders_DiscoveryFilter (
128
+ keyring .discoveryFilter .UnwrapOr (None )
129
+ )
130
+ kwargs = dict (discovery_filter = discovery_filter , discovery_region = region )
131
+ aws_encryption_sdk .MRKAwareDiscoveryAwsKmsMasterKeyProvider (** kwargs )
132
+ kwargs = dict (discovery_region = region )
133
+ return MRKAwareDiscoveryAwsKmsMasterKeyProvider (** kwargs )
134
+ raise ValueError ("No keyring matched to convert to MKP. Input keyring type: " + str (type (keyring )))
135
+
136
+
137
+ def string_to_native (arn ):
138
+ return b"" .join (
139
+ ord (c ).to_bytes (2 , "big" ) for c in arn
140
+ ).decode ("utf-16-be" )
0 commit comments