Skip to content

Commit

Permalink
Port to cryptography.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomprince committed Sep 9, 2020
1 parent f405ffa commit c16b85c
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 20 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
six>=1.10.0
pycryptodomex>=3.7.0
cryptography>=3.1
requests>=2.9.1
vdf>=3.3
gevent>=1.3.0
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

install_requires = [
'six>=1.10',
'pycryptodomex>=3.7.0',
'cryptography>=3.1',
'requests>=2.9.1',
'vdf>=3.3',
'cachetools>=3.0.0',
Expand Down
79 changes: 61 additions & 18 deletions steam/core/crypto.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
"""
All function in this module take and return :class:`bytes`
"""
import hashlib
import sys
from base64 import b64decode
from os import urandom as random_bytes
from struct import pack
from base64 import b64decode

from Cryptodome.Hash import SHA1, HMAC
from Cryptodome.PublicKey.RSA import import_key as rsa_import_key, construct as rsa_construct
from Cryptodome.Cipher import PKCS1_OAEP, PKCS1_v1_5
from Cryptodome.Cipher import AES as AES
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.padding import PKCS7
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.hmac import HMAC
from cryptography.hazmat.primitives.serialization import load_der_public_key


class UniverseKey(object):
"""Public keys for Universes"""

Public = rsa_import_key(b64decode("""
Public = load_der_public_key(b64decode("""
MIGdMA0GCSqGSIb3DQEBAQUAA4GLADCBhwKBgQDf7BrWLBBmLBc1OhSwfFkRf53T
2Ct64+AVzRkeRuh7h3SiGEYxqQMUeYKO6UWiSRKpI2hzic9pobFhRr3Bvr/WARvY
gdTckPv+T1JzZsuVcNfFjrocejN1oWI0Rrtgt4Bo+hOneoo3S57G9F1fOpn5nsQ6
Expand All @@ -39,8 +42,14 @@ def generate_session_key(hmac_secret=b''):
:rtype: :class:`tuple`
"""
session_key = random_bytes(32)
encrypted_session_key = PKCS1_OAEP.new(UniverseKey.Public, SHA1)\
.encrypt(session_key + hmac_secret)
encrypted_session_key = UniverseKey.Public.encrypt(
session_key + hmac_secret,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA256(),
label=None
)
)

return (session_key, encrypted_session_key)

Expand All @@ -49,7 +58,13 @@ def symmetric_encrypt(message, key):
return symmetric_encrypt_with_iv(message, key, iv)

def symmetric_encrypt_ecb(message, key):
return AES.new(key, AES.MODE_ECB).encrypt(pad(message))
padder = PKCS7(algorithms.AES.block_size).padder()
plaintext = padder.update(message)
plaintext += padder.finalize()
encryptor = Cipher(algorithms.AES(key), modes.ECB()).encryptor()
cyphertext = encryptor.update(plaintext)
cyphertext += encryptor.finalize()
return cyphertext

def symmetric_encrypt_HMAC(message, key, hmac_secret):
prefix = random_bytes(3)
Expand All @@ -58,19 +73,33 @@ def symmetric_encrypt_HMAC(message, key, hmac_secret):
return symmetric_encrypt_with_iv(message, key, iv)

def symmetric_encrypt_iv(iv, key):
return AES.new(key, AES.MODE_ECB).encrypt(iv)
encryptor = Cipher(algorithms.AES(key), modes.ECB()).encryptor()
cyphertext = encryptor.update(iv)
cyphertext += encryptor.finalize()
return cyphertext

def symmetric_encrypt_with_iv(message, key, iv):
encrypted_iv = symmetric_encrypt_iv(iv, key)
cyphertext = AES.new(key, AES.MODE_CBC, iv).encrypt(pad(message))
padder = PKCS7(algorithms.AES.block_size).padder()
plaintext = padder.update(message)
plaintext += padder.finalize()
encryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).encryptor()
cyphertext = encryptor.update(plaintext)
cyphertext += encryptor.finalize()
return encrypted_iv + cyphertext

def symmetric_decrypt(cyphertext, key):
iv = symmetric_decrypt_iv(cyphertext, key)
return symmetric_decrypt_with_iv(cyphertext, key, iv)

def symmetric_decrypt_ecb(cyphertext, key):
return unpad(AES.new(key, AES.MODE_ECB).decrypt(cyphertext))
decryptor = Cipher(algorithms.AES(key), modes.ECB()).decryptor()
plaintext = decryptor.update(cyphertext)
plaintext += decryptor.finalize()
unpadder = PKCS7(algorithms.AES.block_size).unpadder()
message = unpadder.update(plaintext)
message += unpadder.finalize()
return message

def symmetric_decrypt_HMAC(cyphertext, key, hmac_secret):
""":raises: :class:`RuntimeError` when HMAC verification fails"""
Expand All @@ -85,19 +114,33 @@ def symmetric_decrypt_HMAC(cyphertext, key, hmac_secret):
return message

def symmetric_decrypt_iv(cyphertext, key):
return AES.new(key, AES.MODE_ECB).decrypt(cyphertext[:BS])
decryptor = Cipher(algorithms.AES(key), modes.ECB()).decryptor()
iv = decryptor.update(cyphertext[:BS])
iv += decryptor.finalize()
return iv

def symmetric_decrypt_with_iv(cyphertext, key, iv):
return unpad(AES.new(key, AES.MODE_CBC, iv).decrypt(cyphertext[BS:]))
decryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).decryptor()
plaintext = decryptor.update(cyphertext[BS:])
plaintext += decryptor.finalize()
unpadder = PKCS7(algorithms.AES.block_size).unpadder()
message = unpadder.update(plaintext)
message += unpadder.finalize()
return message

def hmac_sha1(secret, data):
return HMAC.new(secret, data, SHA1).digest()
h = HMAC(secret, hashes.SHA1())
h.update(data)
return h.finalize()

def sha1_hash(data):
return SHA1.new(data).digest()
return hashlib.sha1(data).digest()

def rsa_publickey(mod, exp):
return rsa_construct((mod, exp))
return rsa.RSAPublicNumbers(e=exp, n=mod).public_key()

def pkcs1v15_encrypt(key, message):
return PKCS1_v1_5.new(key).encrypt(message)
key.encrypt(
message,
padding.PKCS1v15,
)

0 comments on commit c16b85c

Please sign in to comment.