diff --git a/airflow/configuration.py b/airflow/configuration.py index f5b2f8f7d5328..b8cab4f5700b5 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -35,10 +35,12 @@ from configparser import ConfigParser, NoOptionError, NoSectionError from contextlib import contextmanager from copy import deepcopy +from Crypto.Cipher import AES +from Crypto.Util.Padding import unpad from io import StringIO from json.decoder import JSONDecodeError from re import Pattern -from typing import IO, TYPE_CHECKING, Any, Union +from typing import IO, TYPE_CHECKING, Any, Optional, Union from urllib.parse import urlsplit import re2 @@ -73,6 +75,8 @@ ENV_VAR_PREFIX = "AIRFLOW__" +PARAMS_ENCRYPTION_AES256_KEY = "params_encryption_aes256_key" + def _parse_sqlite_version(s: str) -> tuple[int, ...]: match = _SQLITE3_VERSION_PATTERN.match(s) @@ -181,6 +185,20 @@ def retrieve_configuration_description( return base_configuration_description +def decrypt_aes256(ciphertext, key): + """ + Decrypt a ciphertext encrypted with AES-256 using a given key. + + :param ciphertext: Encrypted value to decrypt + :param key: Key used for encryption and decryption + :return: Decrypted plaintext + """ + iv = ciphertext[:AES.block_size] + cipher = AES.new(key, AES.MODE_CBC, iv) + plaintext = unpad(cipher.decrypt(ciphertext[AES.block_size:]), AES.block_size) + return plaintext.decode() + + class AirflowConfigParser(ConfigParser): """ Custom Airflow Configparser supporting defaults and deprecated options. @@ -216,6 +234,7 @@ def __init__( self.is_validated = False self._suppress_future_warnings = False self._providers_configuration_loaded = False + self._params_encryption_aes256_key = self.get("core", PARAMS_ENCRYPTION_AES256_KEY, fallback=None) def _update_logging_deprecated_template_to_one_from_defaults(self): default = self.get_default_value("logging", "log_filename_template") @@ -798,6 +817,17 @@ def _get_env_var_option(self, section: str, key: str): env_var = self._env_var_name(section, key) if env_var in os.environ: return expand_env_var(os.environ[env_var]) + # alternatively AIRFLOW__{SECTION}__{KEY}_ENCRYPTED (for an encrypted value) + env_var_encrypted = env_var + "_ENCRYPTED" + if env_var_encrypted in os.environ: + # if this is a valid encrypted key and not the key used for encryption itself + if ( + (section, key) in self.sensitive_config_values + and key != PARAMS_ENCRYPTION_AES256_KEY + and self._params_encryption_aes256_key is not None + ): + return decrypt_aes256(os.environ[env_var_encrypted], self._params_encryption_aes256_key) + # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command) env_var_cmd = env_var + "_CMD" if env_var_cmd in os.environ: @@ -874,7 +904,7 @@ def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str return value @overload # type: ignore[override] - def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ... + def get(self, section: str, key: str, fallback: Optional[str] = ..., **kwargs) -> str: ... @overload # type: ignore[override] def get(self, section: str, key: str, **kwargs) -> str | None: ... @@ -1050,16 +1080,47 @@ def _get_option_from_config_file( issue_warning: bool = True, extra_stacklevel: int = 0, ) -> str | None: - if super().has_option(section, key): + value = None + if ( + super().has_option(section, key + "_encrypted") + and key != PARAMS_ENCRYPTION_AES256_KEY + and self._params_encryption_aes256_key is not None + ): + value = decrypt_aes256( + super().get(section, key + "_encrypted", **kwargs), + self._params_encryption_aes256_key, + ) + elif super().has_option(section, key): # Use the parent's methods to get the actual config here to be able to # separate the config from default config. - return expand_env_var(super().get(section, key, **kwargs)) + value = super().get(section, key, **kwargs) + + if value is not None: + return expand_env_var(value) + if deprecated_section and deprecated_key: - if super().has_option(deprecated_section, deprecated_key): + deprecated_value = None + if ( + super().has_option(deprecated_section, deprecated_key) + and deprecated_key != PARAMS_ENCRYPTION_AES256_KEY + and self._params_encryption_aes256_key is not None + ): + if issue_warning: + self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) + with self.suppress_future_warnings(): + deprecated_value = decrypt_aes256( + super().get(deprecated_section, deprecated_key + "_encrypted", **kwargs), + self._params_encryption_aes256_key, + ) + elif super().has_option(deprecated_section, deprecated_key): if issue_warning: self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel) with self.suppress_future_warnings(): - return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs)) + deprecated_value = super().get(deprecated_section, deprecated_key, **kwargs) + + if deprecated_value is not None: + return expand_env_var(deprecated_value) + return None def _get_environment_variables(