Skip to content

Commit

Permalink
Fix Jose JWE with certificates (#317)
Browse files Browse the repository at this point in the history
  • Loading branch information
OmegaPointZero authored Sep 20, 2022
1 parent efb476f commit 1034191
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,6 @@ def RSAKey(key, algorithm):
key = codecs.encode(key, encoding='utf-8')

if types.is_bytelike(key):
if key.startswith(b'-----BEGIN CERTIFICATE-----'):
safe(self._process_cert)(key).unwrap()
return self

self.prepared_key = RSA.importKey(key)
return self

Expand Down
51 changes: 51 additions & 0 deletions larky/src/test/resources/quick_tests/test_customer_usecase.star
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
load("@stdlib//larky", "larky")
load("@stdlib//unittest","unittest")
load("@stdlib//json","json")
load("@stdlib//types", "types")
load("@stdlib//builtins", builtins="builtins")

load("@vendor//asserts","asserts")

load("@vgs//http/request", "VGSHttpRequest")

def process(input_http, ctx):
# Extract card BIN and add it to the body
body = json.loads(input_http.body.decode("utf-8"))
BIN = body['cardNumber'][:6]
body['BIN'] = BIN

# Add the BIN to the headers and change the X-Custom-Header
headers = input_http.headers
headers['X-BIN-Header'] = BIN
headers['X-Custom-Header'] = "I am a changed header"

# Set the body as builtins.bytes of the updated body and update the headers
input_http.body = builtins.bytes(json.dumps(body))
input_http.headers = headers

return input_http

def test_customer_case():
req_body = b'{"cardNumber": "4111111111111111"}'
headers = {"X-Custom-Header":"Header Value"}
request = VGSHttpRequest("http://example.com", data=req_body, headers=headers, method='POST')
context_variables = {}
larky_output = process(request, context_variables)

body = json.loads(larky_output._data.decode("utf-8"))
headers = larky_output.headers

# Test that the code has executed properly on your request
asserts.assert_that(body['BIN']).is_equal_to("411111")
asserts.assert_that(headers['X-Custom-Header']).is_equal_to("I am a changed header")
asserts.assert_that(headers['X-BIN-Header']).is_equal_to("411111")


def _testsuite():
_suite = unittest.TestSuite()
_suite.addTest(unittest.FunctionTestCase(test_customer_case))
return _suite


_runner = unittest.TextTestRunner()
_runner.run(_testsuite())
74 changes: 73 additions & 1 deletion larky/src/test/resources/vendor_tests/jose/test_jose.star
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ load("@vendor//jose/backends", AESKey="AESKey")
load("@vendor//jose/constants", ALGORITHMS="ALGORITHMS")
load("@vendor//jose/jwe", jwe="jwe")
load("@vendor//jose/jwk", jwk="jwk")
load("@vendor//jose/utils", base64url_encode="base64url_encode")
load("@vendor//jose/utils", base64url_encode="base64url_encode",
base64url_decode="base64url_decode")


def test_encrypt_and_decrypt_jwe_with_defaults():
Expand Down Expand Up @@ -196,6 +197,76 @@ def test_sign_with_ecc():

encoded_string = b".".join([headers, encoded_payload, encoded_signature])

def test_encrypt_and_decrypt_with_certificate():
certificate = """-----BEGIN CERTIFICATE-----
MIIDazCCAlOgAwIBAgIUHX5scwWw/5q3CzXVV2fjNun9/L0wDQYJKoZIhvcNAQEL
BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMjA5MTkxNTU1MzJaFw0yNTA2
MTUxNTU1MzJaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw
HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB
AQUAA4IBDwAwggEKAoIBAQCsRBjGoF0D9XemfmiC+VNGRRcveDKCiQu4VEYa7J+q
fUSevUQfgqTXdp0VezPtfHnU/Y7iZmrHqspvrElyq4jqCyx9nRTVGuq2/Byi9w76
L1A8X5vh198sXk1cmsbQJhuct4B7vaglkPrXnE9z0yIuSP3rpVwDcTMdmrXO685O
jS2BQyM9svQMsk8xgEZ+AKZ9ck3kQGL3O+M7DU5abUqIJ2VVL0MaHI16ovsWnU86
r9DM/k+PCB9V6Q0rz64Ch+C0Xk25RCAJ+vTSHtoosSnKc9VpZ6A9qYZARhDeihqw
kHS3nAQjiFZwWahaPSZ342EYlmLYOTOyWp78QswxCI8BAgMBAAGjUzBRMB0GA1Ud
DgQWBBQccX0IKYxq5B1Z+iw5h7RZMUlg5jAfBgNVHSMEGDAWgBQccX0IKYxq5B1Z
+iw5h7RZMUlg5jAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQAD
ayMFSwHIEKtfgR9e7ZTc/ZleLjSKblNTFHOM5rGOXemsL8ObqH/ndKbdZ6lERxtF
Lj+GkWoHNHRKSGhHYDHl7YNj5fA68k3bNwbrnf22c3kjISpgyjbGHHrhiHzLQpfS
A3fVFXcNdGmjZTQbJNd1dQOvpCR6bIALOshvjZ8v239pWU1SugvciwU7NVb/0U2o
FCGMKgwbq+sP25drieztv6Gr56+fjHXG0lhVtYjI9/Ig9xG33+FZMXsaG4uag0QT
KeQ4gDJgCc/gGBA0OumvV5efjiAVYl4uLmSUP/2YiOUgO0eAnX3Xz8CFeVOCBq4B
y3W7mjvEN6bJaR544JNF
-----END CERTIFICATE-----"""

header = {
"alg":"RSA-OAEP-256",
"enc":"A256GCM"
}
payload = b"Test JWE Payload"
json_headers = bytes(json.dumps(header), "utf-8")
headers = base64url_encode(json_headers)

encrypted = jwe.encrypt(payload, certificate, encryption="A256GCM", algorithm="RSA-OAEP-256")
jwe_header = encrypted.split(b".")[0]
enc_header = json.loads(base64url_decode(jwe_header).decode("utf-8"))

asserts.assert_that(enc_header['alg']).is_equal_to("RSA-OAEP-256")
asserts.assert_that(enc_header['enc']).is_equal_to("A256GCM")

rsa_private_key = """-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCsRBjGoF0D9Xem
fmiC+VNGRRcveDKCiQu4VEYa7J+qfUSevUQfgqTXdp0VezPtfHnU/Y7iZmrHqspv
rElyq4jqCyx9nRTVGuq2/Byi9w76L1A8X5vh198sXk1cmsbQJhuct4B7vaglkPrX
nE9z0yIuSP3rpVwDcTMdmrXO685OjS2BQyM9svQMsk8xgEZ+AKZ9ck3kQGL3O+M7
DU5abUqIJ2VVL0MaHI16ovsWnU86r9DM/k+PCB9V6Q0rz64Ch+C0Xk25RCAJ+vTS
HtoosSnKc9VpZ6A9qYZARhDeihqwkHS3nAQjiFZwWahaPSZ342EYlmLYOTOyWp78
QswxCI8BAgMBAAECggEAB8cfU0CEUpxvpY3JjDhToTWXYWZM6YXkiJMNg0OxxdHY
Gk6zV7TfWncZipHAe3WGTq6QF/rF0XQNpdMikdHa4a5VeOpxuVl4xYBGjrkW7Qbb
2Y37jMvhYLB1T7wRQ+6kioPigjPC9sc//CIrmDAtN+fFxzD1IZan1ytYEBqnevZj
42lvmlARnBpwmjNqtRuDnDkJ4GqTI+RRLqAQHDU+Sk2P7IWl727R5jJZIUqDNsAE
I3VM5qFE5xybObqjBK8tNENEqNn7uTAmik0JzCpbTHvBj+cY23YVE5g1bpk/h7WZ
QuRf09D5ShXQdigQCftUjlmyF2doI+vfNHb9pFB+0QKBgQDO2FzUMNfKsqpiwjzQ
03HA55w3Hzwa150SwGi1xqXnXPsLntvXxOxVZ1Ux8CMP4oSWgEmxa/hh0OzDbdyA
0B3WzuEZ6YJqcUTCFGZLW2tBEXfgERABZOdaVOuEx/TABRbRYgSQvpyqrMYcD2YW
hGss/I29EPvXKOtqMX7N31nhpQKBgQDVNBNLsqYMLlYSnn/bBG3JJPayXzw1kZvH
qNI2ceuuTbSt+KJIM0udpLciNMiWRR2RYUIgWWDutsZF6AIqZpD6+kxqC+p+uxHB
Xc+8Sp7655yHEStYX+a4nzH4E5VBq6MU4xOJ3y87B2VNI24aEc37CWk/VcZEzZuT
7iIzJ11BLQKBgQC0+v6N8oZ9JkKK0qTfmoJHZN98I2o1mj4m8A8uLTdv7h0CF+cH
LZgTSaxzW0dyWKHmBS11faEABQuEGxX55x6UmsK+J2AiviSJI8w1VzHK5vvaI1O7
xIvgr7i6nzH46PsEDR0tgHoXo8BbQOX0Aby8yeVCbh/MLFN+wPvQKgK8uQKBgGlQ
qPtqivVXajMWUkfo/yYt+SKRQpefjpjovrYgPfBC+C47tEX/+KktdT0TX8ZC6+El
btm17NjeNkDP40n4kkM3osl7i2EAnTusUHJNVgzQnhRmGcg0zy6BjNhjLAZdd1hY
9wzSz2zUMWkSSE/eXaZUtsWPZDoWanR/XCtylXEdAoGAZVQNPZ2mfiw6yQsRCaz5
ooM2PY1wa4FW47EYCVDEMrdr4Ri7twSoZj304BaQcFtFpk0OlHPWYzuiLBvUYjXy
z1+6y3A1leW114pIsrFPlDaBrCC6ZaBk89fATsuWexS9AAecqy/cCCsv/FviFvV+
6mSOrqb9AIY0fottKPSxjW4=
-----END PRIVATE KEY-----"""

decrypted = jwe.decrypt(encrypted, rsa_private_key)

asserts.assert_that(decrypted).is_equal_to(payload)

def _testsuite():
_suite = unittest.TestSuite()
Expand Down Expand Up @@ -225,6 +296,7 @@ def _testsuite():
_suite.addTest(unittest.FunctionTestCase(test_pbkdf2_hmac_aes_key_wrapped))
_suite.addTest(unittest.FunctionTestCase(test_sign_with_ecc))
_suite.addTest(unittest.FunctionTestCase(test_sign_with_rsa))
_suite.addTest(unittest.FunctionTestCase(test_encrypt_and_decrypt_with_certificate))

return _suite

Expand Down

0 comments on commit 1034191

Please sign in to comment.