Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit add3e52c0e6ba22048a0d76fd7d0e431890b20d3
Author: Nicolas Chabbey <[email protected]>
Date:   Mon Feb 27 15:40:51 2023 +0100

    Implemented AES decrypt function
    Updated cli.py for shared cmd-line options
    Add new cli test for aes.decrypt()
    Moved test constants out of constants.py

commit 6ffec270da0e8e26f5adf5db0d584861d420acb0
Author: Nicolas Chabbey <[email protected]>
Date:   Mon Feb 27 10:40:57 2023 +0100

    Moved PBKDF2 function in its own module
    Renamed directory /ciphers to /crypto
  • Loading branch information
e3prom committed Feb 27, 2023
1 parent d855dcd commit 518add5
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 80 deletions.
130 changes: 94 additions & 36 deletions kryptoxin/core/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,45 +6,62 @@
import sys
from . import log
from .constants import *
from ..ciphers import aes
from ..crypto import aes


@click.group()
@click.version_option(prog_name=PROGRAM_NAME)
def cli():
# local array of click.options for cryptographic parameters options
_cmd_opts_crypto = [
click.option('-i', '--in', 'input_file', type=click.File("rb"),
default=sys.stdin.buffer, nargs=1,
help="Input file (e,g. Script, .dll file)",),
click.option('-o', '--out', 'output_file', type=click.File("wb"),
help="Output file"),
click.option('-a', '--alg',
default=CIPHER_DEFAULT_ALGORITHM, show_default=True,
type=click.Choice(['AES'], case_sensitive=False),
help="Encryption algorithm"),
click.option('-k', '--key', required=True, help="Encryption key string"),
click.option('-s', '--key_size',
default=CIPHER_DEFAULT_KEYSIZE, show_default=True,
type=click.IntRange(0, 4096), help="Encryption key size"),
click.option('-m', '--mode', 'opmode',
default=CIPHER_DEFAULT_BLOCKMODE, show_default=True,
type=click.Choice(['CBC', 'CFB'], case_sensitive=False),
help="Block Cipher operation mode"),
click.option('--iv', default=CIPHER_DEFAULT_IV,
help="Block Cipher initialization vector (iv)"),
click.option('--salt', default=CIPHER_DEFAULT_SALT,
help="Password hashing algorithm's salt"),
click.option('-h', '--hmac',
default=CIPHER_DEFAULT_HMHASHALG, show_default=True,
type=click.Choice(['SHA1', 'SHA256', 'SHA512'],
case_sensitive=False),
help="PBKDF2 HMAC algorithm"),
click.option('--iter', 'pbkdf2_iter',
default=CIPHER_DEFAULT_PBKDF2_ITER, show_default=True,
type=click.IntRange(0, 1000000),
help="PBKDF2 Iteration Count")
]


def _add_cmd_options(options):
""" This local function returns the built function's options.
"""
def _add_options(func):
for option in reversed(options):
func = option(func)
return func
return _add_options


@ click.group()
@ click.version_option(prog_name=PROGRAM_NAME)
def cli(**kwargs):
pass


@cli.command()
@click.option('-i', '--in', 'input_file', type=click.File("rb"),
default=sys.stdin.buffer, nargs=1,
help="Input file (e,g. Script, .dll file)")
@click.option('-o', '--out', 'output_file', type=click.File("wb"),
help="Output file")
@click.option('-a', '--alg',
default=CIPHER_DEFAULT_ALGORITHM, show_default=True,
type=click.Choice(['AES'], case_sensitive=False),
help="Encryption algorithm")
@click.option('-k', '--key', required=True, help="Encryption key string")
@click.option('-s', '--key_size',
default=CIPHER_DEFAULT_KEYSIZE, show_default=True,
type=click.IntRange(0, 4096), help="Encryption key size")
@click.option('-m', '--mode', 'opmode',
default=CIPHER_DEFAULT_BLOCKMODE, show_default=True,
type=click.Choice(['CBC', 'CFB'], case_sensitive=False),
help="Block Cipher operation mode")
@click.option('--iv', default=CIPHER_DEFAULT_IV,
help="Block Cipher initialization vector (iv)")
@click.option('--salt', default=CIPHER_DEFAULT_SALT,
help="Password hashing algorithm's salt")
@click.option('-h', '--hmac',
default=CIPHER_DEFAULT_HMHASHALG, show_default=True,
type=click.Choice(['SHA1', 'SHA256', 'SHA512'],
case_sensitive=False),
help="PBKDF2 HMAC algorithm")
@click.option('--iter', 'pbkdf2_iter',
default=CIPHER_DEFAULT_PBKDF2_ITER, show_default=True,
type=click.IntRange(0, 1000000), help="PBKDF2 Iteration Count")
@ cli.command()
@ _add_cmd_options(_cmd_opts_crypto)
def encrypt(
alg, key, key_size, opmode, iv, salt, hmac,
input_file, output_file, pbkdf2_iter
Expand Down Expand Up @@ -72,7 +89,7 @@ def encrypt(
raise SystemExit

# Call encryption function.
encoded_ciphertext = aes.encrypt_aes(
encoded_ciphertext = aes.encrypt(
plaintext, key, key_size, salt, opmode,
iv, halg=hmac, pbkdf2_iter=pbkdf2_iter)

Expand All @@ -84,5 +101,46 @@ def encrypt(
print(f"{str(encoded_ciphertext, 'UTF-8')}")


# Add commands to the application cli.
@ cli.command()
@ _add_cmd_options(_cmd_opts_crypto)
def decrypt(
alg, key, key_size, opmode, iv, salt, hmac,
input_file, output_file, pbkdf2_iter
):
""" This command perform decryption on the supplied input.
It reads on stdin or the file supplied by the '--in' option.
See Options below for more information.
"""
# Detect the arguments types and re-encode if necessary.
# If given as 'str' type perform re-encode using 'UTF-8'.
if type(key) is str:
key = bytes(key, 'UTF-8')
if type(iv) is str:
iv = bytes(iv, 'UTF-8')
if type(salt) is str:
salt = bytes(salt, 'UTF-8')

# Read provided file and catch errors if any.
try:
ciphertext = input_file.read()
except UnicodeDecodeError:
log.error(
f"Cannot read input file: {input_file.name}. \
Make sure it's UTF-8 encoded.")
raise SystemExit

# Call decryption function.
plaintext = aes.decrypt(ciphertext, key, key_size, salt, opmode,
iv, halg=hmac, pbkdf2_iter=pbkdf2_iter)

# If output file given, write content (and create file if necessary).
if output_file:
output_file.write(plaintext)
output_file.flush()
else:
print(f"{str(plaintext, 'UTF-8')}")


# Add commands to the application CLI.
cli.add_command(encrypt)
cli.add_command(decrypt)
2 changes: 0 additions & 2 deletions kryptoxin/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,3 @@
CIPHER_BLOCK_OPERMODE_CFB = "cfb"
CIPHER_BLOCK_OPERMODE_OFB = "ofb"
CIPHER_BLOCK_OPERMODE_EAX = "eax"
# Test Constants
TEST_TXTFILE_PATH = 'tests/samples/input_file.txt'
File renamed without changes.
115 changes: 83 additions & 32 deletions kryptoxin/ciphers/aes.py → kryptoxin/crypto/aes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,32 @@
"""
from ..core import log
from ..core.constants import *
from . import base64
import hashlib
from . import base64, pbkdf2
import Crypto.Cipher.AES


def encrypt_aes(
def _cipher_opmode(mode):
""" This local function returns the appropriate block-cipher
operation mode.
"""
# Cipher block operation modes mapping
# use if-then until python 3.10 becomes more readily available.
if mode.casefold() == CIPHER_BLOCK_OPERMODE_CBC:
op_mode = Crypto.Cipher.AES.MODE_CBC
elif mode.casefold() == CIPHER_BLOCK_OPERMODE_CFB:
op_mode = Crypto.Cipher.AES.MODE_CFB
elif mode.casefold() == CIPHER_BLOCK_OPERMODE_OFB:
op_mode = Crypto.Cipher.AES.MODE_OFB
elif mode.casefold() == CIPHER_BLOCK_OPERMODE_EAX:
op_mode = Crypto.Cipher.AES.MODE_EAX
else:
log.error(f"Unknown cipher block operation mode '{mode}'.")
raise SystemExit

return op_mode


def encrypt(
plaintext, key, key_size, salt, mode,
iv, halg=CIPHER_DEFAULT_HMHASHALG,
pbkdf2_iter=CIPHER_DEFAULT_PBKDF2_ITER,
Expand All @@ -30,42 +50,17 @@ def encrypt_aes(
pbkdf2_iter: Number of iterations for the key-derivation function
iv_prepend: Prepend Initialization Vector (IV) to the plain-text
"""
# Cipher block operation modes mapping
# use if-then until python 3.10 becomes more readily available.
if mode.casefold() == CIPHER_BLOCK_OPERMODE_CBC:
op_mode = Crypto.Cipher.AES.MODE_CBC
elif mode.casefold() == CIPHER_BLOCK_OPERMODE_CFB:
op_mode = Crypto.Cipher.AES.MODE_CFB
elif mode.casefold() == CIPHER_BLOCK_OPERMODE_OFB:
op_mode = Crypto.Cipher.AES.MODE_OFB
elif mode.casefold() == CIPHER_BLOCK_OPERMODE_EAX:
op_mode = Crypto.Cipher.AES.MODE_EAX
else:
log.error(f"Unknown cipher block operation mode '{mode}'.")
raise SystemExit
# Determine the block-cipher operation mode.
op_mode = _cipher_opmode(mode)

# PKCS#7 Padding
# pad the plaintext to the nearest multiple of block's length.
padding = CIPHER_BLOCK_BLKSZ_AES - \
(len(plaintext) % CIPHER_BLOCK_BLKSZ_AES)
padded_plaintext = plaintext + bytes([padding] * padding)

# Derived key sizes
# must be 16, 24 or 32 bytes for AES128,192,256 respectively.
dklen = CIPHER_PBKDF2_AES256_KS
if key_size == 128:
dklen = CIPHER_PBKDF2_AES128_KS
elif key_size == 192:
dklen = CIPHER_PBKDF2_AES192_KS
elif key_size == 256:
dklen = CIPHER_PBKDF2_AES256_KS
else:
log.error(f"Invalid AES key size '{key_size}")
raise SystemExit

# Derive the encryption key from the password.
derived_key = hashlib.pbkdf2_hmac(halg, memoryview(
key), memoryview(salt), pbkdf2_iter, dklen)
# call the key-derivation function
derived_key = pbkdf2.derive_key(key, key_size, halg, pbkdf2_iter, salt)

# Create new AES cipher
cipher = Crypto.Cipher.AES.new(derived_key, op_mode, iv)
Expand All @@ -80,3 +75,59 @@ def encrypt_aes(

# Encode and return the ciphertext in base64.
return base64.encode_base64(ciphertext)


def decrypt(
ciphertext, key, key_size, salt, mode,
iv, halg=CIPHER_DEFAULT_HMHASHALG,
pbkdf2_iter=CIPHER_DEFAULT_PBKDF2_ITER,
iv_prepend=CIPHER_DEFAULT_IV_PREPEND
):
""" This function perform AES block cipher decryption.
Arguments:
ciphertext: cipher-text data
key: Cipher's key (bytes[])
key_size: Algorithm key size
salt: Salt used to enhance hash security (bytes[])
mode: Block cipher operation mode
iv: Block cipher initialization vector (bytes[])
halg: PBKDF2 HMAC algorithm
pbkdf2_iter: Number of iterations for the key-derivation function
iv_prepend: Prepend Initialization Vector (IV) to the plain-text
"""
# Decode the ciphertext using base64
ciphertext = base64.decode_base64(ciphertext)

# Determine the block-cipher operation mode.
op_mode = _cipher_opmode(mode)

# Call the key-derivation function
derived_key = pbkdf2.derive_key(key, key_size, halg, pbkdf2_iter, salt)

# Create new AES cipher
cipher = Crypto.Cipher.AES.new(derived_key, op_mode, iv)

# Perform decryption
plaintext = cipher.decrypt(ciphertext)

# If iv-prepending is True truncate the inizialization vector
# from the returned plaintext.
if iv_prepend:
plaintext = plaintext[CIPHER_BLOCK_BLKSZ_AES:]

# Removes the PKCS#7 padding bytes
# Read the padding length from the plaintext and
# truncate the plaintext accordingly.
pad_len = plaintext[-1]
plaintext = plaintext[:-pad_len]

# Check plaintext bytes object size.
if not len(plaintext):
log.error(
"Error while performing AES decryption. "
"Verify input parameters such as the key.")
raise SystemExit

# return the plaintext
return plaintext
File renamed without changes.
39 changes: 39 additions & 0 deletions kryptoxin/crypto/pbkdf2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""
kryptoxin PBKDF2 module.
This is the key-derivation function PBKDF2 module of the kryptoxin project.
"""
from ..core import log
from ..core.constants import *
import hashlib


def derive_key(key, key_size, halg, iter, salt):
""" This function perform the key derivation function
using PBKDF2.
Arguments:
key: Cipher's key (bytes[])
key_size: Algorithm key size
halg: PBKDF2 HMAC algorithm
iter: Number of iterations for the key-derivation function
salt: Salt used to enhance hash security (bytes[])
"""
# Derived key sizes per block-cipher algorithms.
# must be 16, 24 or 32 bytes for AES128,192,256 respectively.
dklen = CIPHER_PBKDF2_AES256_KS
if key_size == 128:
dklen = CIPHER_PBKDF2_AES128_KS
elif key_size == 192:
dklen = CIPHER_PBKDF2_AES192_KS
elif key_size == 256:
dklen = CIPHER_PBKDF2_AES256_KS
else:
log.error(f"Invalid AES key size '{key_size}")
raise SystemExit

# Derive the encryption key from the password.
derived_key = hashlib.pbkdf2_hmac(halg, memoryview(
key), memoryview(salt), iter, dklen)

# return the derived_key
return derived_key
Loading

0 comments on commit 518add5

Please sign in to comment.