Skip to content

Commit

Permalink
Migrate some crypt tests to pytest
Browse files Browse the repository at this point in the history
Signed-off-by: Pedro Algarvio <[email protected]>
  • Loading branch information
s0undt3ch authored and vzhestkov committed Jan 23, 2025
1 parent efb014e commit 6914177
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 658 deletions.
306 changes: 0 additions & 306 deletions tests/pytests/unit/crypt/__init__.py
Original file line number Diff line number Diff line change
@@ -1,196 +1,3 @@
import getpass
import logging
import os

import salt.utils.files
import salt.utils.stringutils
from salt.exceptions import InvalidKeyError

try:
from M2Crypto import BIO, EVP, RSA

HAS_M2 = True
except ImportError:
HAS_M2 = False

if not HAS_M2:
try:
from Cryptodome import Random
from Cryptodome.Cipher import AES, PKCS1_OAEP
from Cryptodome.Cipher import PKCS1_v1_5 as PKCS1_v1_5_CIPHER
from Cryptodome.Hash import SHA
from Cryptodome.PublicKey import RSA
from Cryptodome.Signature import PKCS1_v1_5

HAS_CRYPTO = True
except ImportError:
HAS_CRYPTO = False

if not HAS_M2 and not HAS_CRYPTO:
try:
# let this be imported, if possible
from Crypto import Random # nosec
from Crypto.Cipher import AES, PKCS1_OAEP # nosec
from Crypto.Cipher import PKCS1_v1_5 as PKCS1_v1_5_CIPHER # nosec
from Crypto.Hash import SHA # nosec
from Crypto.PublicKey import RSA # nosec
from Crypto.Signature import PKCS1_v1_5 # nosec

HAS_CRYPTO = True
except ImportError:
HAS_CRYPTO = False

log = logging.getLogger(__name__)


def legacy_gen_keys(keydir, keyname, keysize, user=None, passphrase=None):
"""
Generate a RSA public keypair for use with salt
:param str keydir: The directory to write the keypair to
:param str keyname: The type of salt server for whom this key should be written. (i.e. 'master' or 'minion')
:param int keysize: The number of bits in the key
:param str user: The user on the system who should own this keypair
:param str passphrase: The passphrase which should be used to encrypt the private key
:rtype: str
:return: Path on the filesystem to the RSA private key
"""
base = os.path.join(keydir, keyname)
priv = f"{base}.pem"
pub = f"{base}.pub"

# gen = rsa.generate_private_key(e, keysize)
if HAS_M2:
gen = RSA.gen_key(keysize, 65537, lambda: None)
else:
gen = RSA.generate(bits=keysize, e=65537)

if os.path.isfile(priv):
# Between first checking and the generation another process has made
# a key! Use the winner's key
return priv

# Do not try writing anything, if directory has no permissions.
if not os.access(keydir, os.W_OK):
raise OSError(
'Write access denied to "{}" for user "{}".'.format(
os.path.abspath(keydir), getpass.getuser()
)
)

with salt.utils.files.set_umask(0o277):
if HAS_M2:
# if passphrase is empty or None use no cipher
if not passphrase:
gen.save_pem(priv, cipher=None)
else:
gen.save_pem(
priv,
cipher="des_ede3_cbc",
callback=lambda x: salt.utils.stringutils.to_bytes(passphrase),
)
else:
with salt.utils.files.fopen(priv, "wb+") as f:
f.write(gen.exportKey("PEM", passphrase))
if HAS_M2:
gen.save_pub_key(pub)
else:
with salt.utils.files.fopen(pub, "wb+") as f:
f.write(gen.publickey().exportKey("PEM"))
os.chmod(priv, 0o400)
if user:
try:
import pwd

uid = pwd.getpwnam(user).pw_uid
os.chown(priv, uid, -1)
os.chown(pub, uid, -1)
except (KeyError, ImportError, OSError):
# The specified user was not found, allow the backup systems to
# report the error
pass
return priv


class LegacyPrivateKey:

def __init__(self, path, passphrase=None):
if HAS_M2:
self.key = RSA.load_key(path, lambda x: bytes(passphrase))
else:
with salt.utils.files.fopen(path) as f:
self.key = RSA.importKey(f.read(), passphrase)

def encrypt(self, data):
if HAS_M2:
return self.key.private_encrypt(data, salt.utils.rsax931.RSA_X931_PADDING)
else:
return salt.utils.rsax931.RSAX931Signer(self.key.exportKey("PEM")).sign(
data
)

def sign(self, data):
if HAS_M2:
md = EVP.MessageDigest("sha1")
md.update(salt.utils.stringutils.to_bytes(data))
digest = md.final()
return self.key.sign(digest)
else:
signer = PKCS1_v1_5.new(self.key)
return signer.sign(SHA.new(salt.utils.stringutils.to_bytes(data)))


class LegacyPublicKey:
def __init__(self, path, _HAS_M2=HAS_M2):
self._HAS_M2 = _HAS_M2
if self._HAS_M2:
with salt.utils.files.fopen(path, "rb") as f:
data = f.read().replace(b"RSA ", b"")
bio = BIO.MemoryBuffer(data)
try:
self.key = RSA.load_pub_key_bio(bio)
except RSA.RSAError:
raise InvalidKeyError("Encountered bad RSA public key")
else:
with salt.utils.files.fopen(path) as f:
try:
self.key = RSA.importKey(f.read())
except (ValueError, IndexError, TypeError):
raise InvalidKeyError("Encountered bad RSA public key")

def encrypt(self, data):
bdata = salt.utils.stringutils.to_bytes(data)
if self._HAS_M2:
return self.key.public_encrypt(bdata, RSA.pkcs1_oaep_padding)
else:
return PKCS1_OAEP.new(self.key).encrypt(bdata)

def verify(self, data, signature):
if self._HAS_M2:
md = EVP.MessageDigest("sha1")
md.update(salt.utils.stringutils.to_bytes(data))
digest = md.final()
try:
return self.key.verify(digest, signature)
except RSA.RSAError as exc:
log.debug("Signature verification failed: %s", exc.args[0])
return False
else:
verifier = PKCS1_v1_5.new(self.key)
return verifier.verify(
SHA.new(salt.utils.stringutils.to_bytes(data)), signature
)

def decrypt(self, data):
data = salt.utils.stringutils.to_bytes(data)
if HAS_M2:
return self.key.public_decrypt(data, salt.utils.rsax931.RSA_X931_PADDING)
else:
verifier = salt.utils.rsax931.RSAX931Verifier(self.key.exportKey("PEM"))
return verifier.verify(data)


PRIVKEY_DATA = (
"-----BEGIN RSA PRIVATE KEY-----\n"
"MIIEpAIBAAKCAQEA75GR6ZTv5JOv90Vq8tKhKC7YQnhDIo2hM0HVziTEk5R4UQBW\n"
Expand Down Expand Up @@ -249,116 +56,3 @@ def decrypt(self, data):
b"\x98\x8a\x8a&#\xb9(#?\x80\x15\x9eW\xb5\x12\xd1\x95S\xf2<G\xeb\xf1"
b"\x14H\xb2\xc4>\xc3A\xed\x86x~\xcfU\xd5Q\xfe~\x10\xd2\x9b"
)

SIGNATURE = (
b"w\xac\xfe18o\xeb\xfb\x14+\x9e\xd1\xb7\x7fe}\xec\xd6\xe1P\x9e\xab"
b"\xb5\x07\xe0\xc1\xfd\xda#\x04Z\x8d\x7f\x0b\x1f}:~\xb2s\x860u\x02N"
b'\xd4q"\xb7\x86*\x8f\x1f\xd0\x9d\x11\x92\xc5~\xa68\xac>\x12H\xc2%y,'
b"\xe6\xceU\x1e\xa3?\x0c,\xf0u\xbb\xd0[g_\xdd\x8b\xb0\x95:Y\x18\xa5*"
b"\x99\xfd\xf3K\x92\x92 ({\xd1\xff\xd9F\xc8\xd6K\x86e\xf9\xa8\xad\xb0z"
b"\xe3\x9dD\xf5k\x8b_<\xe7\xe7\xec\xf3\"'\xd5\xd2M\xb4\xce\x1a\xe3$"
b"\x9c\x81\xad\xf9\x11\xf6\xf5>)\xc7\xdd\x03&\xf7\x86@ks\xa6\x05\xc2"
b"\xd0\xbd\x1a7\xfc\xde\xe6\xb0\xad!\x12#\xc86Y\xea\xc5\xe3\xe2\xb3"
b"\xc9\xaf\xfa\x0c\xf2?\xbf\x93w\x18\x9e\x0b\xa2a\x10:M\x05\x89\xe2W.Q"
b"\xe8;yGT\xb1\xf2\xc6A\xd2\xc4\xbeN\xb3\xcfS\xaf\x03f\xe2\xb4)\xe7\xf6"
b'\xdbs\xd0Z}8\xa4\xd2\x1fW*\xe6\x1c"\x8b\xd0\x18w\xb9\x7f\x9e\x96\xa3'
b"\xd9v\xf7\x833\x8e\x01"
)

TEST_KEY = (
"-----BEGIN RSA PUBLIC KEY-----\n"
"MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzLtFhsvfbFDFaUgulSEX\n"
"Gl12XriL1DT78Ef2/u8HHaSMmPie37BLWas/zaHwI6066bIyYQJ/nUCahTaoHM7L\n"
"GlWc0wOU6zyfpihCRQHil05Y6F+olFBoZuYbFPvtp7/hJx/D7I/0n2o/c7M5i3Y2\n"
"3sBxAYNooIQHXHUmPQW6C9iu95ylZDW8JQzYy/EI4vCC8yQMdTK8jK1FQV0Sbwny\n"
"qcMxSyAWDoFbnhh2P2TnO8HOWuUOaXR8ZHOJzVcDl+a6ew+medW090x3K5O1f80D\n"
"+WjgnG6b2HG7VQpOCfM2GALD/FrxicPilvZ38X1aLhJuwjmVE4LAAv8DVNJXohaO\n"
"WQIDAQAB\n"
"-----END RSA PUBLIC KEY-----\n"
)

PRIV_KEY = """
-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAoAsMPt+4kuIG6vKyw9r3+OuZrVBee/2vDdVetW+Js5dTlgrJ
aghWWn3doGmKlEjqh7E4UTa+t2Jd6w8RSLnyHNJ/HpVhMG0M07MF6FMfILtDrrt8
ZX7eDVt8sx5gCEpYI+XG8Y07Ga9i3Hiczt+fu6HYwu96HggmG2pqkOrn3iGfqBvV
YVFJzSZYe7e4c1PeEs0xYcrA4k+apyGsMtpef8vRUrNicRLc7dAcvfhtgt2DXEZ2
d72t/CR4ygtUvPXzisaTPW0G7OWAheCloqvTIIPQIjR8htFxGTz02STVXfnhnJ0Z
k8KhqKF2v1SQvIYxsZU7jaDgl5i3zpeh58cYOwIDAQABAoIBABZUJEO7Y91+UnfC
H6XKrZEZkcnH7j6/UIaOD9YhdyVKxhsnax1zh1S9vceNIgv5NltzIsfV6vrb6v2K
Dx/F7Z0O0zR5o+MlO8ZncjoNKskex10gBEWG00Uqz/WPlddiQ/TSMJTv3uCBAzp+
S2Zjdb4wYPUlgzSgb2ygxrhsRahMcSMG9PoX6klxMXFKMD1JxiY8QfAHahPzQXy9
F7COZ0fCVo6BE+MqNuQ8tZeIxu8mOULQCCkLFwXmkz1FpfK/kNRmhIyhxwvCS+z4
JuErW3uXfE64RLERiLp1bSxlDdpvRO2R41HAoNELTsKXJOEt4JANRHm/CeyA5wsh
NpscufUCgYEAxhgPfcMDy2v3nL6KtkgYjdcOyRvsAF50QRbEa8ldO+87IoMDD/Oe
osFERJ5hhyyEO78QnaLVegnykiw5DWEF02RKMhD/4XU+1UYVhY0wJjKQIBadsufB
2dnaKjvwzUhPh5BrBqNHl/FXwNCRDiYqXa79eWCPC9OFbZcUWWq70s8CgYEAztOI
61zRfmXJ7f70GgYbHg+GA7IrsAcsGRITsFR82Ho0lqdFFCxz7oK8QfL6bwMCGKyk
nzk+twh6hhj5UNp18KN8wktlo02zTgzgemHwaLa2cd6xKgmAyuPiTgcgnzt5LVNG
FOjIWkLwSlpkDTl7ZzY2QSy7t+mq5d750fpIrtUCgYBWXZUbcpPL88WgDB7z/Bjg
dlvW6JqLSqMK4b8/cyp4AARbNp12LfQC55o5BIhm48y/M70tzRmfvIiKnEc/gwaE
NJx4mZrGFFURrR2i/Xx5mt/lbZbRsmN89JM+iKWjCpzJ8PgIi9Wh9DIbOZOUhKVB
9RJEAgo70LvCnPTdS0CaVwKBgDJW3BllAvw/rBFIH4OB/vGnF5gosmdqp3oGo1Ik
jipmPAx6895AH4tquIVYrUl9svHsezjhxvjnkGK5C115foEuWXw0u60uiTiy+6Pt
2IS0C93VNMulenpnUrppE7CN2iWFAiaura0CY9fE/lsVpYpucHAWgi32Kok+ZxGL
WEttAoGAN9Ehsz4LeQxEj3x8wVeEMHF6OsznpwYsI2oVh6VxpS4AjgKYqeLVcnNi
TlZFsuQcqgod8OgzA91tdB+Rp86NygmWD5WzeKXpCOg9uA+y/YL+0sgZZHsuvbK6
PllUgXdYxqClk/hdBFB7v9AQoaj7K9Ga22v32msftYDQRJ94xOI=
-----END RSA PRIVATE KEY-----
"""


PUB_KEY = """
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoAsMPt+4kuIG6vKyw9r3
+OuZrVBee/2vDdVetW+Js5dTlgrJaghWWn3doGmKlEjqh7E4UTa+t2Jd6w8RSLny
HNJ/HpVhMG0M07MF6FMfILtDrrt8ZX7eDVt8sx5gCEpYI+XG8Y07Ga9i3Hiczt+f
u6HYwu96HggmG2pqkOrn3iGfqBvVYVFJzSZYe7e4c1PeEs0xYcrA4k+apyGsMtpe
f8vRUrNicRLc7dAcvfhtgt2DXEZ2d72t/CR4ygtUvPXzisaTPW0G7OWAheCloqvT
IIPQIjR8htFxGTz02STVXfnhnJ0Zk8KhqKF2v1SQvIYxsZU7jaDgl5i3zpeh58cY
OwIDAQAB
-----END PUBLIC KEY-----
"""

PRIV_KEY2 = """
-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAp+8cTxguO6Vg+YO92VfHgNld3Zy8aM3JbZvpJcjTnis+YFJ7
Zlkcc647yPRRwY9nYBNywahnt5kIeuT1rTvTsMBZWvmUoEVUj1Xg8XXQkBvb9Ozy
Gqy/G/p8KDDpzMP/U+XCnUeHiXTZrgnqgBIc2cKeCVvWFqDi0GRFGzyaXLaX3PPm
M7DJ0MIPL1qgmcDq6+7Ze0gJ9SrDYFAeLmbuT1OqDfufXWQl/82JXeiwU2cOpqWq
7n5fvPOWim7l1tzQ+dSiMRRm0xa6uNexCJww3oJSwvMbAmgzvOhqqhlqv+K7u0u7
FrFFojESsL36Gq4GBrISnvu2tk7u4GGNTYYQbQIDAQABAoIBAADrqWDQnd5DVZEA
lR+WINiWuHJAy/KaIC7K4kAMBgbxrz2ZbiY9Ok/zBk5fcnxIZDVtXd1sZicmPlro
GuWodIxdPZAnWpZ3UtOXUayZK/vCP1YsH1agmEqXuKsCu6Fc+K8VzReOHxLUkmXn
FYM+tixGahXcjEOi/aNNTWitEB6OemRM1UeLJFzRcfyXiqzHpHCIZwBpTUAsmzcG
QiVDkMTKubwo/m+PVXburX2CGibUydctgbrYIc7EJvyx/cpRiPZXo1PhHQWdu4Y1
SOaC66WLsP/wqvtHo58JQ6EN/gjSsbAgGGVkZ1xMo66nR+pLpR27coS7o03xCks6
DY/0mukCgYEAuLIGgBnqoh7YsOBLd/Bc1UTfDMxJhNseo+hZemtkSXz2Jn51322F
Zw/FVN4ArXgluH+XsOhvG/MFFpojwZSrb0Qq5b1MRdo9qycq8lGqNtlN1WHqosDQ
zW29kpL0tlRrSDpww3wRESsN9rH5XIrJ1b3ZXuO7asR+KBVQMy/+NcUCgYEA6MSC
c+fywltKPgmPl5j0DPoDe5SXE/6JQy7w/vVGrGfWGf/zEJmhzS2R+CcfTTEqaT0T
Yw8+XbFgKAqsxwtE9MUXLTVLI3sSUyE4g7blCYscOqhZ8ItCUKDXWkSpt++rG0Um
1+cEJP/0oCazG6MWqvBC4NpQ1nzh46QpjWqMwokCgYAKDLXJ1p8rvx3vUeUJW6zR
dfPlEGCXuAyMwqHLxXgpf4EtSwhC5gSyPOtx2LqUtcrnpRmt6JfTH4ARYMW9TMef
QEhNQ+WYj213mKP/l235mg1gJPnNbUxvQR9lkFV8bk+AGJ32JRQQqRUTbU+yN2MQ
HEptnVqfTp3GtJIultfwOQKBgG+RyYmu8wBP650izg33BXu21raEeYne5oIqXN+I
R5DZ0JjzwtkBGroTDrVoYyuH1nFNEh7YLqeQHqvyufBKKYo9cid8NQDTu+vWr5UK
tGvHnwdKrJmM1oN5JOAiq0r7+QMAOWchVy449VNSWWV03aeftB685iR5BXkstbIQ
EVopAoGAfcGBTAhmceK/4Q83H/FXBWy0PAa1kZGg/q8+Z0KY76AqyxOVl0/CU/rB
3tO3sKhaMTHPME/MiQjQQGoaK1JgPY6JHYvly2KomrJ8QTugqNGyMzdVJkXAK2AM
GAwC8ivAkHf8CHrHa1W7l8t2IqBjW1aRt7mOW92nfG88Hck0Mbo=
-----END RSA PRIVATE KEY-----
"""


PUB_KEY2 = """
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAp+8cTxguO6Vg+YO92VfH
gNld3Zy8aM3JbZvpJcjTnis+YFJ7Zlkcc647yPRRwY9nYBNywahnt5kIeuT1rTvT
sMBZWvmUoEVUj1Xg8XXQkBvb9OzyGqy/G/p8KDDpzMP/U+XCnUeHiXTZrgnqgBIc
2cKeCVvWFqDi0GRFGzyaXLaX3PPmM7DJ0MIPL1qgmcDq6+7Ze0gJ9SrDYFAeLmbu
T1OqDfufXWQl/82JXeiwU2cOpqWq7n5fvPOWim7l1tzQ+dSiMRRm0xa6uNexCJww
3oJSwvMbAmgzvOhqqhlqv+K7u0u7FrFFojESsL36Gq4GBrISnvu2tk7u4GGNTYYQ
bQIDAQAB
-----END PUBLIC KEY-----
"""
19 changes: 19 additions & 0 deletions tests/pytests/unit/crypt/test_crypt_cryptodome.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,22 @@ def test_gen_keys_with_passphrase():
salt.crypt.gen_keys(key_path, "keyname", 2048)
assert open_priv_wb in m_open.calls
assert open_pub_wb in m_open.calls


def test_sign_message():
key = RSA.importKey(PRIVKEY_DATA)
with patch("salt.crypt.get_rsa_key", return_value=key):
assert SIG == salt.crypt.sign_message("/keydir/keyname.pem", MSG)


def test_sign_message_with_passphrase():
key = RSA.importKey(PRIVKEY_DATA)
with patch("salt.crypt.get_rsa_key", return_value=key):
assert SIG == salt.crypt.sign_message(
"/keydir/keyname.pem", MSG, passphrase="password"
)


def test_verify_signature():
with patch("salt.utils.files.fopen", mock_open(read_data=PUBKEY_DATA)):
assert salt.crypt.verify_signature("/keydir/keyname.pub", MSG, SIG)
Loading

0 comments on commit 6914177

Please sign in to comment.