Skip to content

Commit

Permalink
Support AES256 encryption of sensitive params
Browse files Browse the repository at this point in the history
Implements support for `_ENCRYPTED` versions of env variables and `_encrypted` versions of config params.
Those would be encrypted using a parameter `params_encryption_aes256_key` (which can be retrieved from a secret for example).
The params encryption key itself doesn't support encrypted suffix, or it'd create an infinite recursion.

closes: apache#45194

Signed-off-by: Andrii Korotkov <[email protected]>
  • Loading branch information
andrii-korotkov-verkada committed Dec 24, 2024
1 parent a540eeb commit c2ca77e
Showing 1 changed file with 67 additions and 6 deletions.
73 changes: 67 additions & 6 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@
from configparser import ConfigParser, NoOptionError, NoSectionError
from contextlib import contextmanager
from copy import deepcopy
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from io import StringIO
from json.decoder import JSONDecodeError
from re import Pattern
from typing import IO, TYPE_CHECKING, Any, Union
from typing import IO, TYPE_CHECKING, Any, Optional, Union
from urllib.parse import urlsplit

import re2
Expand Down Expand Up @@ -73,6 +75,8 @@

ENV_VAR_PREFIX = "AIRFLOW__"

PARAMS_ENCRYPTION_AES256_KEY = "params_encryption_aes256_key"


def _parse_sqlite_version(s: str) -> tuple[int, ...]:
match = _SQLITE3_VERSION_PATTERN.match(s)
Expand Down Expand Up @@ -181,6 +185,20 @@ def retrieve_configuration_description(
return base_configuration_description


def decrypt_aes256(ciphertext, key):
"""
Decrypt a ciphertext encrypted with AES-256 using a given key.
:param ciphertext: Encrypted value to decrypt
:param key: Key used for encryption and decryption
:return: Decrypted plaintext
"""
iv = ciphertext[:AES.block_size]
cipher = AES.new(key, AES.MODE_CBC, iv)
plaintext = unpad(cipher.decrypt(ciphertext[AES.block_size:]), AES.block_size)
return plaintext.decode()


class AirflowConfigParser(ConfigParser):
"""
Custom Airflow Configparser supporting defaults and deprecated options.
Expand Down Expand Up @@ -216,6 +234,7 @@ def __init__(
self.is_validated = False
self._suppress_future_warnings = False
self._providers_configuration_loaded = False
self._params_encryption_aes256_key = self.get("core", PARAMS_ENCRYPTION_AES256_KEY, fallback=None)

def _update_logging_deprecated_template_to_one_from_defaults(self):
default = self.get_default_value("logging", "log_filename_template")
Expand Down Expand Up @@ -798,6 +817,17 @@ def _get_env_var_option(self, section: str, key: str):
env_var = self._env_var_name(section, key)
if env_var in os.environ:
return expand_env_var(os.environ[env_var])
# alternatively AIRFLOW__{SECTION}__{KEY}_ENCRYPTED (for an encrypted value)
env_var_encrypted = env_var + "_ENCRYPTED"
if env_var_encrypted in os.environ:
# if this is a valid encrypted key and not the key used for encryption itself
if (
(section, key) in self.sensitive_config_values
and key != PARAMS_ENCRYPTION_AES256_KEY
and self._params_encryption_aes256_key is not None
):
return decrypt_aes256(os.environ[env_var_encrypted], self._params_encryption_aes256_key)

# alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command)
env_var_cmd = env_var + "_CMD"
if env_var_cmd in os.environ:
Expand Down Expand Up @@ -874,7 +904,7 @@ def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str
return value

@overload # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...
def get(self, section: str, key: str, fallback: Optional[str] = ..., **kwargs) -> str: ...

@overload # type: ignore[override]
def get(self, section: str, key: str, **kwargs) -> str | None: ...
Expand Down Expand Up @@ -1050,16 +1080,47 @@ def _get_option_from_config_file(
issue_warning: bool = True,
extra_stacklevel: int = 0,
) -> str | None:
if super().has_option(section, key):
value = None
if (
super().has_option(section, key + "_encrypted")
and key != PARAMS_ENCRYPTION_AES256_KEY
and self._params_encryption_aes256_key is not None
):
value = decrypt_aes256(
super().get(section, key + "_encrypted", **kwargs),
self._params_encryption_aes256_key,
)
elif super().has_option(section, key):
# Use the parent's methods to get the actual config here to be able to
# separate the config from default config.
return expand_env_var(super().get(section, key, **kwargs))
value = super().get(section, key, **kwargs)

if value is not None:
return expand_env_var(value)

if deprecated_section and deprecated_key:
if super().has_option(deprecated_section, deprecated_key):
deprecated_value = None
if (
super().has_option(deprecated_section, deprecated_key)
and deprecated_key != PARAMS_ENCRYPTION_AES256_KEY
and self._params_encryption_aes256_key is not None
):
if issue_warning:
self._warn_deprecate(section, key, deprecated_section, deprecated_key + "_encrypted", extra_stacklevel)
with self.suppress_future_warnings():
deprecated_value = decrypt_aes256(
super().get(deprecated_section, deprecated_key + "_encrypted", **kwargs),
self._params_encryption_aes256_key,
)
elif super().has_option(deprecated_section, deprecated_key):
if issue_warning:
self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
with self.suppress_future_warnings():
return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
deprecated_value = super().get(deprecated_section, deprecated_key, **kwargs)

if deprecated_value is not None:
return expand_env_var(deprecated_value)

return None

def _get_environment_variables(
Expand Down

0 comments on commit c2ca77e

Please sign in to comment.