diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index ab40d21..3bfff59 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -1,6 +1,5 @@ -*Issue #, if available:* - -*Description of changes:* +_Issue #, if available:_ +_Description of changes:_ By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 91dde32..05f931d 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -4,43 +4,51 @@ name: Python package on: - push: - branches: [ master ] pull_request: - branches: [ master ] + types: [opened, synchronize, ready_for_review] jobs: build: - runs-on: ubuntu-latest strategy: fail-fast: false matrix: - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] - + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] steps: - - uses: actions/checkout@v4 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python-version }} - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install -r requirements.txt -r dev-requirements.txt - pip install -e . - - name: Lint with flake8 - run: | - # stop the build if there are Python syntax errors or undefined names - flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics - # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide - flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - - name: Lint with PyLint - run: pylint --rcfile=.pylintrc src/aws_secretsmanager_caching - - name: Check formatting with Ruff - uses: astral-sh/ruff-action@v3 - - name: Test with pytest - run: | - pytest test/unit/ - - name: Upload coverage to Codecov - uses: codecov/codecov-action@v5 + - name: Checkout + uses: actions/checkout@v4 + + - name: Check formatting with Ruff + uses: astral-sh/ruff-action@v3 + + - name: Prettier + uses: rutajdash/prettier-cli-action@v1.0.2 + with: + file_pattern: "**/*.{md,yml}" + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt -r dev-requirements.txt + pip install -e . + + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide + flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + + - name: Lint with PyLint + run: pylint --rcfile=.pylintrc src/aws_secretsmanager_caching + + - name: Test with pytest + run: pytest test/unit/ + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v5 diff --git a/.pylintrc b/.pylintrc index 6285b78..61f3409 100644 --- a/.pylintrc +++ b/.pylintrc @@ -11,7 +11,6 @@ disable = I0011, # locally-disabled # (visual studio) and html output-format=colorized - [FORMAT] # Maximum number of characters on a single line. max-line-length=120 diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md index 3b64466..ec98f2b 100644 --- a/CODE_OF_CONDUCT.md +++ b/CODE_OF_CONDUCT.md @@ -1,4 +1,5 @@ ## Code of Conduct -This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct). -For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact + +This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct). +For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact opensource-codeofconduct@amazon.com with any additional questions or comments. diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 874c4f3..a975c68 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,29 +1,28 @@ # Contributing Guidelines -Thank you for your interest in contributing to our project. Whether it's a bug report, new feature, correction, or additional +Thank you for your interest in contributing to our project. Whether it's a bug report, new feature, correction, or additional documentation, we greatly value feedback and contributions from our community. -Please read through this document before submitting any issues or pull requests to ensure we have all the necessary +Please read through this document before submitting any issues or pull requests to ensure we have all the necessary information to effectively respond to your bug report or contribution. - ## Reporting Bugs/Feature Requests We welcome you to use the GitHub issue tracker to report bugs or suggest features. -When filing an issue, please check [existing open](https://github.com/aws/aws-secretsmanager-caching-python/issues), or [recently closed](https://github.com/aws/aws-secretsmanager-caching-python/issues?utf8=%E2%9C%93&q=is%3Aissue%20is%3Aclosed%20), issues to make sure somebody else hasn't already +When filing an issue, please check [existing open](https://github.com/aws/aws-secretsmanager-caching-python/issues), or [recently closed](https://github.com/aws/aws-secretsmanager-caching-python/issues?utf8=%E2%9C%93&q=is%3Aissue%20is%3Aclosed%20), issues to make sure somebody else hasn't already reported the issue. Please try to include as much information as you can. Details like these are incredibly useful: -* A reproducible test case or series of steps -* The version of our code being used -* Any modifications you've made relevant to the bug -* Anything unusual about your environment or deployment - +- A reproducible test case or series of steps +- The version of our code being used +- Any modifications you've made relevant to the bug +- Anything unusual about your environment or deployment ## Contributing via Pull Requests + Contributions via pull requests are much appreciated. Before sending us a pull request, please ensure that: -1. You are working against the latest source on the *master* branch. +1. You are working against the latest source on the _master_ branch. 2. You check existing open, and recently merged, pull requests to make sure someone else hasn't addressed the problem already. 3. You open an issue to discuss any significant work - we would hate for your time to be wasted. @@ -31,28 +30,34 @@ To send us a pull request, please: 1. Fork the repository. 2. Modify the source; please focus on the specific change you are contributing. If you also reformat all the code, it will be hard for us to focus on your change. -3. Ensure local tests pass. -4. Commit to your fork using clear commit messages. -5. Send us a pull request, answering any default questions in the pull request interface. -6. Pay attention to any automated CI failures reported in the pull request, and stay involved in the conversation. +3. Use Ruff to ensure consistent formatting for `.py` files, and Prettier for `.md` and `.yml` files (see [Formatting instructions](#formatting-instructions)). +4. Ensure local tests pass. +5. Commit to your fork using clear commit messages. +6. Send us a pull request, answering any default questions in the pull request interface. +7. Pay attention to any automated CI failures reported in the pull request, and stay involved in the conversation. -GitHub provides additional document on [forking a repository](https://help.github.com/articles/fork-a-repo/) and +GitHub provides additional document on [forking a repository](https://help.github.com/articles/fork-a-repo/) and [creating a pull request](https://help.github.com/articles/creating-a-pull-request/). +## Formatting instructions + +- Install [Ruff](https://docs.astral.sh/ruff/installation/) and [Prettier](https://prettier.io/docs/install). + - If using VS Code, install the VS Code extensions for the above. +- Run `ruff format` and `prettier --write **/*.{md,yml}`, or configure VS Code to format on save using Ruff and Prettier. ## Finding contributions to work on -Looking at the existing issues is a great way to find something to contribute on. As our projects, by default, use the default GitHub issue labels (enhancement/bug/duplicate/help wanted/invalid/question/wontfix), looking at any ['help wanted'](https://github.com/aws/aws-secretsmanager-caching-python/labels/help%20wanted) issues is a great place to start. +Looking at the existing issues is a great way to find something to contribute on. As our projects, by default, use the default GitHub issue labels (enhancement/bug/duplicate/help wanted/invalid/question/wontfix), looking at any ['help wanted'](https://github.com/aws/aws-secretsmanager-caching-python/labels/help%20wanted) issues is a great place to start. ## Code of Conduct -This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct). -For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact -opensource-codeofconduct@amazon.com with any additional questions or comments. +This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct). +For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact +opensource-codeofconduct@amazon.com with any additional questions or comments. ## Security issue notifications -If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). Please do **not** create a public github issue. +If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). Please do **not** create a public github issue. ## Licensing diff --git a/README.md b/README.md index 92b2984..b7428a5 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ The AWS Secrets Manager Python caching client enables in-process caching of secr To use this client you must have: -- Python 3.8 or newer. Use of Python versions 3.7 or older are not supported. +- Python 3.9 or newer. Use of Python versions 3.8 or older are not supported. - An Amazon Web Services (AWS) account to access secrets stored in AWS Secrets Manager. - **To create an AWS account**, go to [Sign In or Create an AWS Account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) and then choose **I am a new user.** Follow the instructions to create an AWS account. diff --git a/doc/conf.py b/doc/conf.py index 14a6a3f..42bc8c9 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -3,49 +3,49 @@ import os import shutil -project = u'AWS Secrets Manager Python Caching Client' +project = "AWS Secrets Manager Python Caching Client" # If you use autosummary, this ensures that any stale autogenerated files are # cleaned up first. -if os.path.exists('_autosummary'): +if os.path.exists("_autosummary"): print("cleaning up stale autogenerated files...") - shutil.rmtree('_autosummary') + shutil.rmtree("_autosummary") # Add any Sphinx extension module names here, as strings. They can be extensions # coming with Sphinx (named 'sphinx.ext.*') or your custom ones. extensions = [ - 'sphinx.ext.autodoc', - 'sphinx.ext.autosummary', - 'sphinx.ext.coverage', - 'sphinx.ext.doctest', - 'sphinx.ext.napoleon', - 'sphinx.ext.todo', + "sphinx.ext.autodoc", + "sphinx.ext.autosummary", + "sphinx.ext.coverage", + "sphinx.ext.doctest", + "sphinx.ext.napoleon", + "sphinx.ext.todo", ] # Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] +templates_path = ["_templates"] -source_suffix = '.rst' # The suffix of source filenames. -master_doc = 'index' # The master toctree document. +source_suffix = ".rst" # The suffix of source filenames. +master_doc = "index" # The master toctree document. -copyright = u'%s, Amazon.com' % datetime.now().year +copyright = "%s, Amazon.com" % datetime.now().year # The full version, including alpha/beta/rc tags. -release = version('aws_secretsmanager_caching') +release = version("aws_secretsmanager_caching") # List of directories, relative to source directory, that shouldn't be searched # for source files. -exclude_trees = ['_build', '_templates'] +exclude_trees = ["_build", "_templates"] -pygments_style = 'sphinx' +pygments_style = "sphinx" autoclass_content = "both" -autodoc_default_flags = ['show-inheritance', 'members', 'undoc-members'] -autodoc_member_order = 'bysource' +autodoc_default_flags = ["show-inheritance", "members", "undoc-members"] +autodoc_member_order = "bysource" -html_theme = 'haiku' -html_static_path = ['_static'] -htmlhelp_basename = '%sdoc' % project +html_theme = "haiku" +html_static_path = ["_static"] +htmlhelp_basename = "%sdoc" % project # autosummary autosummary_generate = True diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 0000000..f11cf63 --- /dev/null +++ b/ruff.toml @@ -0,0 +1 @@ +line-length = 120 diff --git a/setup.py b/setup.py index 1066b31..369c04d 100644 --- a/setup.py +++ b/setup.py @@ -14,17 +14,16 @@ packages=find_packages(where="src", exclude=("test",)), package_dir={"": "src"}, classifiers=[ - 'Development Status :: 5 - Production/Stable', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: Apache Software License', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7' + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", ], - keywords='secretsmanager secrets manager development cache caching client', + keywords="secretsmanager secrets manager development cache caching client", use_scm_version=True, - python_requires='>=3.8', - install_requires=['botocore'], - setup_requires=['pytest-runner', 'setuptools-scm'], - tests_require=['pytest', 'pytest-cov', 'pytest-sugar', 'codecov'] - + python_requires=">=3.8", + install_requires=["botocore"], + setup_requires=["pytest-runner", "setuptools-scm"], + tests_require=["pytest", "pytest-cov", "pytest-sugar", "codecov"], ) diff --git a/src/aws_secretsmanager_caching/__init__.py b/src/aws_secretsmanager_caching/__init__.py index 67d114f..e7f55fe 100644 --- a/src/aws_secretsmanager_caching/__init__.py +++ b/src/aws_secretsmanager_caching/__init__.py @@ -11,8 +11,17 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. """High level AWS Secrets Manager caching client.""" + from aws_secretsmanager_caching.config import SecretCacheConfig -from aws_secretsmanager_caching.decorators import InjectKeywordedSecretString, InjectSecretString +from aws_secretsmanager_caching.decorators import ( + InjectKeywordedSecretString, + InjectSecretString, +) from aws_secretsmanager_caching.secret_cache import SecretCache -__all__ = ["SecretCache", "SecretCacheConfig", "InjectSecretString", "InjectKeywordedSecretString"] +__all__ = [ + "SecretCache", + "SecretCacheConfig", + "InjectSecretString", + "InjectKeywordedSecretString", +] diff --git a/src/aws_secretsmanager_caching/cache/__init__.py b/src/aws_secretsmanager_caching/cache/__init__.py index 8fce9dd..cb68af3 100644 --- a/src/aws_secretsmanager_caching/cache/__init__.py +++ b/src/aws_secretsmanager_caching/cache/__init__.py @@ -15,7 +15,12 @@ No guarantee is provided on the modules and APIs within this namespace staying consistent. Directly reference at your own risk. """ -from aws_secretsmanager_caching.cache.items import SecretCacheItem, SecretCacheObject, SecretCacheVersion + +from aws_secretsmanager_caching.cache.items import ( + SecretCacheItem, + SecretCacheObject, + SecretCacheVersion, +) from aws_secretsmanager_caching.cache.lru import LRUCache __all__ = ["SecretCacheObject", "SecretCacheItem", "SecretCacheVersion", "LRUCache"] diff --git a/src/aws_secretsmanager_caching/cache/items.py b/src/aws_secretsmanager_caching/cache/items.py index fb4aec9..d478332 100644 --- a/src/aws_secretsmanager_caching/cache/items.py +++ b/src/aws_secretsmanager_caching/cache/items.py @@ -25,6 +25,7 @@ class SecretCacheObject: # pylint: disable=too-many-instance-attributes """Secret cache object that handles the common refresh logic.""" + # Jitter max for refresh now FORCE_REFRESH_JITTER_SLEEP = 5000 __metaclass__ = ABCMeta @@ -100,7 +101,7 @@ def __refresh(self): except Exception as e: # pylint: disable=broad-except self._exception = e delay = self._config.exception_retry_delay_base * ( - self._config.exception_retry_growth_factor ** self._exception_count + self._config.exception_retry_growth_factor**self._exception_count ) self._exception_count += 1 delay = min(delay, self._config.exception_retry_delay_max) @@ -132,7 +133,10 @@ def refresh_secret_now(self): self._refresh_needed = True # Generate a random number to have a sleep jitter to not get stuck in a retry loop - sleep = randint(int(self.FORCE_REFRESH_JITTER_SLEEP / 2), self.FORCE_REFRESH_JITTER_SLEEP + 1) + sleep = randint( + int(self.FORCE_REFRESH_JITTER_SLEEP / 2), + self.FORCE_REFRESH_JITTER_SLEEP + 1, + ) if self._exception is not None: current_time_millis = int(datetime.now(timezone.utc).timestamp() * 1000) @@ -240,8 +244,10 @@ def _get_version(self, version_stage): version = self._versions.get(version_id) if version: return version.get_secret_value() - self._versions.put_if_absent(version_id, SecretCacheVersion(self._config, self._client, self._secret_id, - version_id)) + self._versions.put_if_absent( + version_id, + SecretCacheVersion(self._config, self._client, self._secret_id, version_id), + ) return self._versions.get(version_id).get_secret_value() diff --git a/src/aws_secretsmanager_caching/config.py b/src/aws_secretsmanager_caching/config.py index 026703a..3564f1f 100644 --- a/src/aws_secretsmanager_caching/config.py +++ b/src/aws_secretsmanager_caching/config.py @@ -16,7 +16,6 @@ class SecretCacheConfig: - """Advanced configuration for SecretCache clients. :type max_cache_size: int @@ -54,7 +53,7 @@ class SecretCacheConfig: "exception_retry_delay_max": 3600, "default_version_stage": "AWSCURRENT", "secret_refresh_interval": 3600, - "secret_cache_hook": None + "secret_cache_hook": None, } def __init__(self, **kwargs): diff --git a/src/aws_secretsmanager_caching/decorators.py b/src/aws_secretsmanager_caching/decorators.py index f9ed873..9196487 100644 --- a/src/aws_secretsmanager_caching/decorators.py +++ b/src/aws_secretsmanager_caching/decorators.py @@ -98,9 +98,7 @@ def _wrapped_func(*args, **kwargs): Internal function to execute wrapped function """ try: - secret = json.loads( - self.cache.get_secret_string(secret_id=self.secret_id) - ) + secret = json.loads(self.cache.get_secret_string(secret_id=self.secret_id)) except json.decoder.JSONDecodeError: raise RuntimeError("Cached secret is not valid JSON") from None @@ -109,9 +107,7 @@ def _wrapped_func(*args, **kwargs): try: resolved_kwargs[orig_kwarg] = secret[secret_key] except KeyError: - raise RuntimeError( - f"Cached secret does not contain key {secret_key}" - ) from None + raise RuntimeError(f"Cached secret does not contain key {secret_key}") from None return func(*args, **resolved_kwargs, **kwargs) diff --git a/src/aws_secretsmanager_caching/secret_cache.py b/src/aws_secretsmanager_caching/secret_cache.py index 99c11c1..308ae72 100644 --- a/src/aws_secretsmanager_caching/secret_cache.py +++ b/src/aws_secretsmanager_caching/secret_cache.py @@ -11,6 +11,7 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. """High level AWS Secrets Manager caching client.""" + from copy import deepcopy from importlib.metadata import version, PackageNotFoundError @@ -25,9 +26,9 @@ class SecretCache: """Secret Cache client for AWS Secrets Manager secrets""" try: - __version__ = version('aws_secretsmanager_caching') + __version__ = version("aws_secretsmanager_caching") except PackageNotFoundError: - __version__ = '0.0.0' + __version__ = "0.0.0" def __init__(self, config=SecretCacheConfig(), client=None): """Construct a secret cache using the given configuration and @@ -43,9 +44,11 @@ def __init__(self, config=SecretCacheConfig(), client=None): self._client = client self._config = deepcopy(config) self._cache = LRUCache(max_size=self._config.max_cache_size) - boto_config = botocore.config.Config(**{ - "user_agent_extra": f"AwsSecretCache/{SecretCache.__version__}", - }) + boto_config = botocore.config.Config( + **{ + "user_agent_extra": f"AwsSecretCache/{SecretCache.__version__}", + } + ) if self._client is None: self._client = botocore.session.get_session().create_client("secretsmanager", config=boto_config) @@ -62,7 +65,8 @@ def _get_cached_secret(self, secret_id): if secret is not None: return secret self._cache.put_if_absent( - secret_id, SecretCacheItem(config=self._config, client=self._client, secret_id=secret_id) + secret_id, + SecretCacheItem(config=self._config, client=self._client, secret_id=secret_id), ) return self._cache.get(secret_id) diff --git a/test/integ/test_aws_secretsmanager_caching.py b/test/integ/test_aws_secretsmanager_caching.py index 9aed27e..90de8c3 100644 --- a/test/integ/test_aws_secretsmanager_caching.py +++ b/test/integ/test_aws_secretsmanager_caching.py @@ -23,42 +23,45 @@ from aws_secretsmanager_caching.secret_cache import SecretCache from botocore.exceptions import ClientError, HTTPClientError, NoCredentialsError -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) class TestAwsSecretsManagerCachingInteg: - fixture_prefix = 'python_caching_integ_test_' + fixture_prefix = "python_caching_integ_test_" uuid_suffix = uuid4().hex - @pytest.fixture(scope='module') + @pytest.fixture(scope="module") def client(self): - yield botocore.session.get_session().create_client('secretsmanager') + yield botocore.session.get_session().create_client("secretsmanager") - @pytest.fixture(scope='module', autouse=True) + @pytest.fixture(scope="module", autouse=True) def pre_test_cleanup(self, client): - logger.info('Starting cleanup operation of previous test secrets...') + logger.info("Starting cleanup operation of previous test secrets...") old_secrets = [] two_days_ago = datetime.now() - timedelta(days=2) - paginator = client.get_paginator('list_secrets') - paginator_config = {'PageSize': 10, 'StartingToken': None} + paginator = client.get_paginator("list_secrets") + paginator_config = {"PageSize": 10, "StartingToken": None} iterator = paginator.paginate(PaginationConfig=paginator_config) try: for page in iterator: - logger.info('Fetching results from ListSecretValue...') - for secret in page['SecretList']: - if secret['Name'].startswith(TestAwsSecretsManagerCachingInteg.fixture_prefix) and \ - (secret['LastChangedDate'] > two_days_ago) and (secret['LastAccessedDate'] > two_days_ago): + logger.info("Fetching results from ListSecretValue...") + for secret in page["SecretList"]: + if ( + secret["Name"].startswith(TestAwsSecretsManagerCachingInteg.fixture_prefix) + and (secret["LastChangedDate"] > two_days_ago) + and (secret["LastAccessedDate"] > two_days_ago) + ): old_secrets.append(secret) try: - paginator_config['StartingToken'] = page['NextToken'] + paginator_config["StartingToken"] = page["NextToken"] except KeyError: - logger.info('reached end of list') + logger.info("reached end of list") break time.sleep(0.5) except ClientError as e: - logger.error("Got ClientError {0} while calling ListSecrets".format(e.response['Error']['Code'])) + logger.error("Got ClientError {0} while calling ListSecrets".format(e.response["Error"]["Code"])) except HTTPClientError: logger.error("Got HTTPClientError while calling ListSecrets") except NoCredentialsError: @@ -69,105 +72,168 @@ def pre_test_cleanup(self, client): logger.info("No previously configured test secrets found") for secret in old_secrets: - logger.info("Scheduling deletion of secret {}".format(secret['Name'])) + logger.info("Scheduling deletion of secret {}".format(secret["Name"])) try: - client.delete_secret(SecretId=secret['Name']) + client.delete_secret(SecretId=secret["Name"]) except ClientError as e: - logger.error("Got ClientError {0} while calling " - "DeleteSecret for secret {1}".format(e.response['Error']['Code'], secret['Name'])) + logger.error( + "Got ClientError {0} while calling DeleteSecret for secret {1}".format( + e.response["Error"]["Code"], secret["Name"] + ) + ) except HTTPClientError: - logger.error("Got HTTPClientError while calling DeleteSecret for secret {0}".format(secret['Name'])) + logger.error("Got HTTPClientError while calling DeleteSecret for secret {0}".format(secret["Name"])) time.sleep(0.5) yield None @pytest.fixture def secret_string(self, request, client): - name = "{0}{1}{2}".format(TestAwsSecretsManagerCachingInteg.fixture_prefix, request.function.__name__, - TestAwsSecretsManagerCachingInteg.uuid_suffix) + name = "{0}{1}{2}".format( + TestAwsSecretsManagerCachingInteg.fixture_prefix, + request.function.__name__, + TestAwsSecretsManagerCachingInteg.uuid_suffix, + ) - secret = client.create_secret(Name=name, SecretString='test') + secret = client.create_secret(Name=name, SecretString="test") yield secret - client.delete_secret(SecretId=secret['ARN'], ForceDeleteWithoutRecovery=True) + client.delete_secret(SecretId=secret["ARN"], ForceDeleteWithoutRecovery=True) def test_get_secret_string(self, client, secret_string): cache = SecretCache(client=client) - secret = client.get_secret_value(SecretId=secret_string['ARN'])['SecretString'] + secret = client.get_secret_value(SecretId=secret_string["ARN"])["SecretString"] for _ in range(10): - assert cache.get_secret_string("{0}{1}{2}".format(TestAwsSecretsManagerCachingInteg.fixture_prefix, - inspect.stack()[0][3], - TestAwsSecretsManagerCachingInteg.uuid_suffix)) == secret + assert ( + cache.get_secret_string( + "{0}{1}{2}".format( + TestAwsSecretsManagerCachingInteg.fixture_prefix, + inspect.stack()[0][3], + TestAwsSecretsManagerCachingInteg.uuid_suffix, + ) + ) + == secret + ) def test_get_secret_string_refresh(self, client, secret_string): - cache = SecretCache(config=SecretCacheConfig(secret_refresh_interval=1), - client=client) - secret = client.get_secret_value(SecretId=secret_string['ARN'])['SecretString'] + cache = SecretCache(config=SecretCacheConfig(secret_refresh_interval=1), client=client) + secret = client.get_secret_value(SecretId=secret_string["ARN"])["SecretString"] for _ in range(10): - assert cache.get_secret_string("{0}{1}{2}".format(TestAwsSecretsManagerCachingInteg.fixture_prefix, - inspect.stack()[0][3], - TestAwsSecretsManagerCachingInteg.uuid_suffix)) == secret - - client.put_secret_value(SecretId=secret_string['ARN'], - SecretString='test2', VersionStages=['AWSCURRENT']) + assert ( + cache.get_secret_string( + "{0}{1}{2}".format( + TestAwsSecretsManagerCachingInteg.fixture_prefix, + inspect.stack()[0][3], + TestAwsSecretsManagerCachingInteg.uuid_suffix, + ) + ) + == secret + ) + + client.put_secret_value( + SecretId=secret_string["ARN"], + SecretString="test2", + VersionStages=["AWSCURRENT"], + ) time.sleep(2) - secret = client.get_secret_value(SecretId=secret_string['ARN'])['SecretString'] + secret = client.get_secret_value(SecretId=secret_string["ARN"])["SecretString"] for _ in range(10): - assert cache.get_secret_string("{0}{1}{2}".format(TestAwsSecretsManagerCachingInteg.fixture_prefix, - inspect.stack()[0][3], - TestAwsSecretsManagerCachingInteg.uuid_suffix)) == secret + assert ( + cache.get_secret_string( + "{0}{1}{2}".format( + TestAwsSecretsManagerCachingInteg.fixture_prefix, + inspect.stack()[0][3], + TestAwsSecretsManagerCachingInteg.uuid_suffix, + ) + ) + == secret + ) def test_get_secret_binary_empty(self, client, secret_string): cache = SecretCache(client=client) - assert cache.get_secret_binary("{0}{1}{2}".format(TestAwsSecretsManagerCachingInteg.fixture_prefix, - inspect.stack()[0][3], - TestAwsSecretsManagerCachingInteg.uuid_suffix)) is None + assert ( + cache.get_secret_binary( + "{0}{1}{2}".format( + TestAwsSecretsManagerCachingInteg.fixture_prefix, + inspect.stack()[0][3], + TestAwsSecretsManagerCachingInteg.uuid_suffix, + ) + ) + is None + ) @pytest.fixture def secret_string_stage(self, request, client): - name = "{0}{1}{2}".format(TestAwsSecretsManagerCachingInteg.fixture_prefix, request.function.__name__, - TestAwsSecretsManagerCachingInteg.uuid_suffix) + name = "{0}{1}{2}".format( + TestAwsSecretsManagerCachingInteg.fixture_prefix, + request.function.__name__, + TestAwsSecretsManagerCachingInteg.uuid_suffix, + ) - secret = client.create_secret(Name=name, SecretString='test') - client.put_secret_value(SecretId=secret['ARN'], SecretString='test2', - VersionStages=['AWSCURRENT']) + secret = client.create_secret(Name=name, SecretString="test") + client.put_secret_value(SecretId=secret["ARN"], SecretString="test2", VersionStages=["AWSCURRENT"]) - yield client.describe_secret(SecretId=secret['ARN']) - client.delete_secret(SecretId=secret['ARN'], ForceDeleteWithoutRecovery=True) + yield client.describe_secret(SecretId=secret["ARN"]) + client.delete_secret(SecretId=secret["ARN"], ForceDeleteWithoutRecovery=True) def test_get_secret_string_stage(self, client, secret_string_stage): cache = SecretCache(client=client) - secret = client.get_secret_value(SecretId=secret_string_stage['ARN'], - VersionStage='AWSPREVIOUS')['SecretString'] + secret = client.get_secret_value(SecretId=secret_string_stage["ARN"], VersionStage="AWSPREVIOUS")[ + "SecretString" + ] for _ in range(10): - assert cache.get_secret_string("{0}{1}{2}".format(TestAwsSecretsManagerCachingInteg.fixture_prefix, - inspect.stack()[0][3], - TestAwsSecretsManagerCachingInteg.uuid_suffix), - 'AWSPREVIOUS') == secret + assert ( + cache.get_secret_string( + "{0}{1}{2}".format( + TestAwsSecretsManagerCachingInteg.fixture_prefix, + inspect.stack()[0][3], + TestAwsSecretsManagerCachingInteg.uuid_suffix, + ), + "AWSPREVIOUS", + ) + == secret + ) @pytest.fixture def secret_binary(self, request, client): - name = "{0}{1}{2}".format(TestAwsSecretsManagerCachingInteg.fixture_prefix, request.function.__name__, - TestAwsSecretsManagerCachingInteg.uuid_suffix) + name = "{0}{1}{2}".format( + TestAwsSecretsManagerCachingInteg.fixture_prefix, + request.function.__name__, + TestAwsSecretsManagerCachingInteg.uuid_suffix, + ) - secret = client.create_secret(Name=name, SecretBinary=b'01010101') + secret = client.create_secret(Name=name, SecretBinary=b"01010101") yield secret - client.delete_secret(SecretId=secret['ARN'], ForceDeleteWithoutRecovery=True) + client.delete_secret(SecretId=secret["ARN"], ForceDeleteWithoutRecovery=True) def test_get_secret_binary(self, client, secret_binary): cache = SecretCache(client=client) - secret = client.get_secret_value(SecretId=secret_binary['ARN'])['SecretBinary'] + secret = client.get_secret_value(SecretId=secret_binary["ARN"])["SecretBinary"] for _ in range(10): - assert cache.get_secret_binary("{0}{1}{2}".format(TestAwsSecretsManagerCachingInteg.fixture_prefix, - inspect.stack()[0][3], - TestAwsSecretsManagerCachingInteg.uuid_suffix)) == secret + assert ( + cache.get_secret_binary( + "{0}{1}{2}".format( + TestAwsSecretsManagerCachingInteg.fixture_prefix, + inspect.stack()[0][3], + TestAwsSecretsManagerCachingInteg.uuid_suffix, + ) + ) + == secret + ) def test_get_secret_string_empty(self, client, secret_binary): cache = SecretCache(client=client) - assert cache.get_secret_string("{0}{1}{2}".format(TestAwsSecretsManagerCachingInteg.fixture_prefix, - inspect.stack()[0][3], - TestAwsSecretsManagerCachingInteg.uuid_suffix)) is None + assert ( + cache.get_secret_string( + "{0}{1}{2}".format( + TestAwsSecretsManagerCachingInteg.fixture_prefix, + inspect.stack()[0][3], + TestAwsSecretsManagerCachingInteg.uuid_suffix, + ) + ) + is None + ) diff --git a/test/unit/test_aws_secretsmanager_caching.py b/test/unit/test_aws_secretsmanager_caching.py index f00c560..b1fe036 100644 --- a/test/unit/test_aws_secretsmanager_caching.py +++ b/test/unit/test_aws_secretsmanager_caching.py @@ -13,6 +13,7 @@ """ Unit test suite for high-level functions in aws_secretsmanager_caching """ + import unittest import botocore @@ -27,21 +28,19 @@ class TestAwsSecretsManagerCaching(unittest.TestCase): - def setUp(self): pass def get_client(self, response={}, versions=None, version_response=None): - client = botocore.session.get_session().create_client( - 'secretsmanager', region_name='us-west-2') + client = botocore.session.get_session().create_client("secretsmanager", region_name="us-west-2") stubber = Stubber(client) - expected_params = {'SecretId': 'test'} + expected_params = {"SecretId": "test"} if versions: - response['VersionIdsToStages'] = versions - stubber.add_response('describe_secret', response, expected_params) + response["VersionIdsToStages"] = versions + stubber.add_response("describe_secret", response, expected_params) if version_response is not None: - stubber.add_response('get_secret_value', version_response) + stubber.add_response("get_secret_value", version_response) stubber.activate() return client @@ -54,9 +53,11 @@ def test_default_session(self): user_agent_extra = f"AwsSecretCache/{cache.__version__}" user_agent = cache._client.meta.config.user_agent - self.assertTrue(user_agent.find(user_agent_extra) > 0, - f"User agent: {user_agent} ; \ - does not include: {user_agent_extra}") + self.assertTrue( + user_agent.find(user_agent_extra) > 0, + f"User agent: {user_agent} ; \ + does not include: {user_agent_extra}", + ) except NoRegionError: pass @@ -65,142 +66,109 @@ def test_client_stub(self): def test_get_secret_string_none(self): cache = SecretCache(client=self.get_client()) - self.assertIsNone(cache.get_secret_string('test')) + self.assertIsNone(cache.get_secret_string("test")) def test_get_secret_string_missing(self): response = {} - versions = { - '01234567890123456789012345678901': ['AWSCURRENT'] - } - version_response = {'Name': 'test'} - cache = SecretCache(client=self.get_client(response, - versions, - version_response)) - self.assertIsNone(cache.get_secret_string('test')) + versions = {"01234567890123456789012345678901": ["AWSCURRENT"]} + version_response = {"Name": "test"} + cache = SecretCache(client=self.get_client(response, versions, version_response)) + self.assertIsNone(cache.get_secret_string("test")) def test_get_secret_string_no_current(self): response = {} - versions = { - '01234567890123456789012345678901': ['NOTCURRENT'] - } - version_response = {'Name': 'test'} - cache = SecretCache(client=self.get_client(response, - versions, - version_response)) - self.assertIsNone(cache.get_secret_string('test')) + versions = {"01234567890123456789012345678901": ["NOTCURRENT"]} + version_response = {"Name": "test"} + cache = SecretCache(client=self.get_client(response, versions, version_response)) + self.assertIsNone(cache.get_secret_string("test")) def test_get_secret_string_no_versions(self): - response = {'Name': 'test'} + response = {"Name": "test"} cache = SecretCache(client=self.get_client(response)) - self.assertIsNone(cache.get_secret_string('test')) + self.assertIsNone(cache.get_secret_string("test")) def test_get_secret_string_empty(self): response = {} - versions = { - '01234567890123456789012345678901': ['AWSCURRENT'] - } + versions = {"01234567890123456789012345678901": ["AWSCURRENT"]} version_response = {} - cache = SecretCache(client=self.get_client(response, - versions, - version_response)) - self.assertIsNone(cache.get_secret_string('test')) + cache = SecretCache(client=self.get_client(response, versions, version_response)) + self.assertIsNone(cache.get_secret_string("test")) def test_get_secret_string(self): - secret = 'mysecret' + secret = "mysecret" response = {} - versions = { - '01234567890123456789012345678901': ['AWSCURRENT'] - } - version_response = {'SecretString': secret} - cache = SecretCache(client=self.get_client(response, - versions, - version_response)) + versions = {"01234567890123456789012345678901": ["AWSCURRENT"]} + version_response = {"SecretString": secret} + cache = SecretCache(client=self.get_client(response, versions, version_response)) for _ in range(10): - self.assertEqual(secret, cache.get_secret_string('test')) + self.assertEqual(secret, cache.get_secret_string("test")) def test_get_secret_string_refresh(self): - secret = 'mysecret' + secret = "mysecret" response = {} - versions = { - '01234567890123456789012345678901': ['AWSCURRENT'] - } - version_response = {'SecretString': secret} + versions = {"01234567890123456789012345678901": ["AWSCURRENT"]} + version_response = {"SecretString": secret} cache = SecretCache( config=SecretCacheConfig(secret_refresh_interval=1), - client=self.get_client(response, - versions, - version_response)) + client=self.get_client(response, versions, version_response), + ) for _ in range(10): - self.assertEqual(secret, cache.get_secret_string('test')) + self.assertEqual(secret, cache.get_secret_string("test")) def test_get_secret_string_stage(self): - secret = 'mysecret' + secret = "mysecret" response = {} - versions = { - '01234567890123456789012345678901': ['AWSCURRENT'] - } - version_response = {'SecretString': secret} - cache = SecretCache(client=self.get_client(response, - versions, - version_response)) + versions = {"01234567890123456789012345678901": ["AWSCURRENT"]} + version_response = {"SecretString": secret} + cache = SecretCache(client=self.get_client(response, versions, version_response)) for _ in range(10): - self.assertEqual(secret, cache.get_secret_string('test', - 'AWSCURRENT')) + self.assertEqual(secret, cache.get_secret_string("test", "AWSCURRENT")) def test_get_secret_string_multiple(self): cache = SecretCache(client=self.get_client()) for _ in range(100): - self.assertIsNone(cache.get_secret_string('test')) + self.assertIsNone(cache.get_secret_string("test")) def test_get_secret_binary(self): - secret = b'01010101' + secret = b"01010101" response = {} - versions = { - '01234567890123456789012345678901': ['AWSCURRENT'] - } - version_response = {'SecretBinary': secret} - cache = SecretCache(client=self.get_client(response, - versions, - version_response)) + versions = {"01234567890123456789012345678901": ["AWSCURRENT"]} + version_response = {"SecretBinary": secret} + cache = SecretCache(client=self.get_client(response, versions, version_response)) for _ in range(10): - self.assertEqual(secret, cache.get_secret_binary('test')) + self.assertEqual(secret, cache.get_secret_binary("test")) def test_get_secret_binary_no_versions(self): cache = SecretCache(client=self.get_client()) - self.assertIsNone(cache.get_secret_binary('test')) + self.assertIsNone(cache.get_secret_binary("test")) def test_refresh_secret_now(self): - secret = 'mysecret' + secret = "mysecret" response = {} - versions = { - '01234567890123456789012345678901': ['AWSCURRENT'] - } - version_response = {'SecretString': secret} - cache = SecretCache(client=self.get_client(response, - versions, - version_response)) - secret = cache._get_cached_secret('test') + versions = {"01234567890123456789012345678901": ["AWSCURRENT"]} + version_response = {"SecretString": secret} + cache = SecretCache(client=self.get_client(response, versions, version_response)) + secret = cache._get_cached_secret("test") self.assertIsNotNone(secret) old_refresh_time = secret._next_refresh_time - secret = cache._get_cached_secret('test') + secret = cache._get_cached_secret("test") self.assertTrue(old_refresh_time == secret._next_refresh_time) - cache.refresh_secret_now('test') + cache.refresh_secret_now("test") - secret = cache._get_cached_secret('test') + secret = cache._get_cached_secret("test") new_refresh_time = secret._next_refresh_time self.assertTrue(new_refresh_time > old_refresh_time) def test_get_secret_string_exception(self): - client = botocore.session.get_session().create_client( - 'secretsmanager', region_name='us-west-2') + client = botocore.session.get_session().create_client("secretsmanager", region_name="us-west-2") stubber = Stubber(client) cache = SecretCache(client=client) for _ in range(3): - stubber.add_client_error('describe_secret') + stubber.add_client_error("describe_secret") stubber.activate() - self.assertRaises(ClientError, cache.get_secret_binary, 'test') + self.assertRaises(ClientError, cache.get_secret_binary, "test") diff --git a/test/unit/test_cache_hook.py b/test/unit/test_cache_hook.py index d46eae5..d0948c8 100644 --- a/test/unit/test_cache_hook.py +++ b/test/unit/test_cache_hook.py @@ -13,6 +13,7 @@ """ Unit test suite for items module """ + import unittest import botocore @@ -28,11 +29,11 @@ class DummySecretCacheHook(SecretCacheHook): dict = {} def put(self, obj): - if 'SecretString' in obj: - obj['SecretString'] = obj['SecretString'] + "+hook_put" + if "SecretString" in obj: + obj["SecretString"] = obj["SecretString"] + "+hook_put" - if 'SecretBinary' in obj: - obj['SecretBinary'] = obj['SecretBinary'] + b'11111111' + if "SecretBinary" in obj: + obj["SecretBinary"] = obj["SecretBinary"] + b"11111111" key = len(self.dict) self.dict[key] = obj @@ -41,31 +42,29 @@ def put(self, obj): def get(self, cached_obj): obj = self.dict[cached_obj] - if 'SecretString' in obj: - obj['SecretString'] = obj['SecretString'] + "+hook_get" + if "SecretString" in obj: + obj["SecretString"] = obj["SecretString"] + "+hook_get" - if 'SecretBinary' in obj: - obj['SecretBinary'] = obj['SecretBinary'] + b'00000000' + if "SecretBinary" in obj: + obj["SecretBinary"] = obj["SecretBinary"] + b"00000000" return obj class TestSecretCacheHook(unittest.TestCase): - def setUp(self): pass def get_client(self, response={}, versions=None, version_response=None): - client = botocore.session.get_session().create_client( - 'secretsmanager', region_name='us-west-2') + client = botocore.session.get_session().create_client("secretsmanager", region_name="us-west-2") stubber = Stubber(client) - expected_params = {'SecretId': 'test'} + expected_params = {"SecretId": "test"} if versions: - response['VersionIdsToStages'] = versions - stubber.add_response('describe_secret', response, expected_params) + response["VersionIdsToStages"] = versions + stubber.add_response("describe_secret", response, expected_params) if version_response is not None: - stubber.add_response('get_secret_value', version_response) + stubber.add_response("get_secret_value", version_response) stubber.activate() return client @@ -73,40 +72,32 @@ def tearDown(self): pass def test_calls_hook_string(self): - secret = 'mysecret' + secret = "mysecret" hooked_secret = secret + "+hook_put+hook_get" response = {} - versions = { - '01234567890123456789012345678901': ['AWSCURRENT'] - } - version_response = {'SecretString': secret} + versions = {"01234567890123456789012345678901": ["AWSCURRENT"]} + version_response = {"SecretString": secret} hook = DummySecretCacheHook() config = SecretCacheConfig(secret_cache_hook=hook) - cache = SecretCache(config=config, client=self.get_client(response, - versions, - version_response)) + cache = SecretCache(config=config, client=self.get_client(response, versions, version_response)) for _ in range(10): - fetched_secret = cache.get_secret_string('test') + fetched_secret = cache.get_secret_string("test") self.assertTrue(fetched_secret.startswith(hooked_secret)) def test_calls_hook_binary(self): - secret = b'01010101' - hooked_secret = secret + b'1111111100000000' + secret = b"01010101" + hooked_secret = secret + b"1111111100000000" response = {} - versions = { - '01234567890123456789012345678901': ['AWSCURRENT'] - } - version_response = {'SecretBinary': secret} + versions = {"01234567890123456789012345678901": ["AWSCURRENT"]} + version_response = {"SecretBinary": secret} hook = DummySecretCacheHook() config = SecretCacheConfig(secret_cache_hook=hook) - cache = SecretCache(config=config, client=self.get_client(response, - versions, - version_response)) + cache = SecretCache(config=config, client=self.get_client(response, versions, version_response)) for _ in range(10): - self.assertEqual(hooked_secret, cache.get_secret_binary('test')[0:24]) + self.assertEqual(hooked_secret, cache.get_secret_binary("test")[0:24]) diff --git a/test/unit/test_config.py b/test/unit/test_config.py index 118a5cc..7bf03c9 100644 --- a/test/unit/test_config.py +++ b/test/unit/test_config.py @@ -13,13 +13,13 @@ """ Unit test suite for items module """ + import unittest from aws_secretsmanager_caching.config import SecretCacheConfig class TestSecretCacheConfig(unittest.TestCase): - def setUp(self): pass @@ -27,10 +27,10 @@ def tearDown(self): pass def test_simple_config(self): - self.assertRaises(TypeError, SecretCacheConfig, no='one') + self.assertRaises(TypeError, SecretCacheConfig, no="one") def test_config_default_version_stage(self): - stage = 'nothing' + stage = "nothing" config = SecretCacheConfig(default_version_stage=stage) self.assertEqual(config.default_version_stage, stage) diff --git a/test/unit/test_decorators.py b/test/unit/test_decorators.py index f869e70..a7284b1 100644 --- a/test/unit/test_decorators.py +++ b/test/unit/test_decorators.py @@ -13,200 +13,192 @@ """ Unit test suite for decorators module """ + import json import unittest import botocore -from aws_secretsmanager_caching.decorators import InjectKeywordedSecretString, InjectSecretString +from aws_secretsmanager_caching.decorators import ( + InjectKeywordedSecretString, + InjectSecretString, +) from aws_secretsmanager_caching.secret_cache import SecretCache from botocore.stub import Stubber class TestAwsSecretsManagerCachingInjectKeywordedSecretStringDecorator(unittest.TestCase): - def get_client(self, response={}, versions=None, version_response=None): - client = botocore.session.get_session().create_client('secretsmanager', region_name='us-west-2') + client = botocore.session.get_session().create_client("secretsmanager", region_name="us-west-2") stubber = Stubber(client) - expected_params = {'SecretId': 'test'} + expected_params = {"SecretId": "test"} if versions: - response['VersionIdsToStages'] = versions - stubber.add_response('describe_secret', response, expected_params) + response["VersionIdsToStages"] = versions + stubber.add_response("describe_secret", response, expected_params) if version_response is not None: - stubber.add_response('get_secret_value', version_response) + stubber.add_response("get_secret_value", version_response) stubber.activate() return client def test_valid_json(self): - secret = { - 'username': 'secret_username', - 'password': 'secret_password' - } + secret = {"username": "secret_username", "password": "secret_password"} secret_string = json.dumps(secret) response = {} - versions = { - '01234567890123456789012345678901': ['AWSCURRENT'] - } - version_response = {'SecretString': secret_string} + versions = {"01234567890123456789012345678901": ["AWSCURRENT"]} + version_response = {"SecretString": secret_string} cache = SecretCache(client=self.get_client(response, versions, version_response)) - @InjectKeywordedSecretString(secret_id='test', cache=cache, func_username='username', func_password='password') - def function_to_be_decorated(func_username, func_password, keyworded_argument='foo'): - self.assertEqual(secret['username'], func_username) - self.assertEqual(secret['password'], func_password) - self.assertEqual(keyworded_argument, 'foo') - return 'OK' + @InjectKeywordedSecretString( + secret_id="test", + cache=cache, + func_username="username", + func_password="password", + ) + def function_to_be_decorated(func_username, func_password, keyworded_argument="foo"): + self.assertEqual(secret["username"], func_username) + self.assertEqual(secret["password"], func_password) + self.assertEqual(keyworded_argument, "foo") + return "OK" - self.assertEqual(function_to_be_decorated(), 'OK') + self.assertEqual(function_to_be_decorated(), "OK") def test_valid_json_with_mixed_args(self): - secret = { - 'username': 'secret_username', - 'password': 'secret_password' - } + secret = {"username": "secret_username", "password": "secret_password"} secret_string = json.dumps(secret) response = {} - versions = { - '01234567890123456789012345678901': ['AWSCURRENT'] - } - version_response = {'SecretString': secret_string} + versions = {"01234567890123456789012345678901": ["AWSCURRENT"]} + version_response = {"SecretString": secret_string} cache = SecretCache(client=self.get_client(response, versions, version_response)) - @InjectKeywordedSecretString(secret_id='test', cache=cache, arg2='username', arg3='password') - def function_to_be_decorated(arg1, arg2, arg3, arg4='bar'): - self.assertEqual(arg1, 'foo') - self.assertEqual(secret['username'], arg2) - self.assertEqual(secret['password'], arg3) - self.assertEqual(arg4, 'bar') + @InjectKeywordedSecretString(secret_id="test", cache=cache, arg2="username", arg3="password") + def function_to_be_decorated(arg1, arg2, arg3, arg4="bar"): + self.assertEqual(arg1, "foo") + self.assertEqual(secret["username"], arg2) + self.assertEqual(secret["password"], arg3) + self.assertEqual(arg4, "bar") - function_to_be_decorated('foo') + function_to_be_decorated("foo") def test_valid_json_with_no_secret_kwarg(self): - secret = { - 'username': 'secret_username', - 'password': 'secret_password' - } + secret = {"username": "secret_username", "password": "secret_password"} secret_string = json.dumps(secret) response = {} - versions = { - '01234567890123456789012345678901': ['AWSCURRENT'] - } - version_response = {'SecretString': secret_string} + versions = {"01234567890123456789012345678901": ["AWSCURRENT"]} + version_response = {"SecretString": secret_string} cache = SecretCache(client=self.get_client(response, versions, version_response)) - @InjectKeywordedSecretString('test', cache=cache, func_username='username', func_password='password') - def function_to_be_decorated(func_username, func_password, keyworded_argument='foo'): - self.assertEqual(secret['username'], func_username) - self.assertEqual(secret['password'], func_password) - self.assertEqual(keyworded_argument, 'foo') + @InjectKeywordedSecretString("test", cache=cache, func_username="username", func_password="password") + def function_to_be_decorated(func_username, func_password, keyworded_argument="foo"): + self.assertEqual(secret["username"], func_username) + self.assertEqual(secret["password"], func_password) + self.assertEqual(keyworded_argument, "foo") function_to_be_decorated() def test_invalid_json(self): - secret = 'not json' + secret = "not json" response = {} - versions = { - '01234567890123456789012345678901': ['AWSCURRENT'] - } - version_response = {'SecretString': secret} + versions = {"01234567890123456789012345678901": ["AWSCURRENT"]} + version_response = {"SecretString": secret} cache = SecretCache(client=self.get_client(response, versions, version_response)) with self.assertRaises((RuntimeError, json.decoder.JSONDecodeError)): - @InjectKeywordedSecretString(secret_id='test', cache=cache, func_username='username', - func_passsword='password') - def function_to_be_decorated(func_username, func_password, keyworded_argument='foo'): + + @InjectKeywordedSecretString( + secret_id="test", + cache=cache, + func_username="username", + func_passsword="password", + ) + def function_to_be_decorated(func_username, func_password, keyworded_argument="foo"): return function_to_be_decorated() def test_missing_key(self): - secret = {'username': 'secret_username'} + secret = {"username": "secret_username"} secret_string = json.dumps(secret) response = {} - versions = { - '01234567890123456789012345678901': ['AWSCURRENT'] - } - version_response = {'SecretString': secret_string} + versions = {"01234567890123456789012345678901": ["AWSCURRENT"]} + version_response = {"SecretString": secret_string} cache = SecretCache(client=self.get_client(response, versions, version_response)) with self.assertRaises((RuntimeError, ValueError)): - @InjectKeywordedSecretString(secret_id='test', cache=cache, func_username='username', - func_passsword='password') - def function_to_be_decorated(func_username, func_password, keyworded_argument='foo'): + + @InjectKeywordedSecretString( + secret_id="test", + cache=cache, + func_username="username", + func_passsword="password", + ) + def function_to_be_decorated(func_username, func_password, keyworded_argument="foo"): return function_to_be_decorated() class TestAwsSecretsManagerCachingInjectSecretStringDecorator(unittest.TestCase): - def get_client(self, response={}, versions=None, version_response=None): - client = botocore.session.get_session().create_client('secretsmanager', region_name='us-west-2') + client = botocore.session.get_session().create_client("secretsmanager", region_name="us-west-2") stubber = Stubber(client) - expected_params = {'SecretId': 'test'} + expected_params = {"SecretId": "test"} if versions: - response['VersionIdsToStages'] = versions - stubber.add_response('describe_secret', response, expected_params) + response["VersionIdsToStages"] = versions + stubber.add_response("describe_secret", response, expected_params) if version_response is not None: - stubber.add_response('get_secret_value', version_response) + stubber.add_response("get_secret_value", version_response) stubber.activate() return client def test_string(self): - secret = 'not json' + secret = "not json" response = {} - versions = { - '01234567890123456789012345678901': ['AWSCURRENT'] - } - version_response = {'SecretString': secret} + versions = {"01234567890123456789012345678901": ["AWSCURRENT"]} + version_response = {"SecretString": secret} cache = SecretCache(client=self.get_client(response, versions, version_response)) - @InjectSecretString('test', cache) + @InjectSecretString("test", cache) def function_to_be_decorated(arg1, arg2, arg3): self.assertEqual(arg1, secret) - self.assertEqual(arg2, 'foo') - self.assertEqual(arg3, 'bar') - return 'OK' + self.assertEqual(arg2, "foo") + self.assertEqual(arg3, "bar") + return "OK" - self.assertEqual(function_to_be_decorated('foo', 'bar'), 'OK') + self.assertEqual(function_to_be_decorated("foo", "bar"), "OK") def test_string_with_additional_kwargs(self): - secret = 'not json' + secret = "not json" response = {} - versions = { - '01234567890123456789012345678901': ['AWSCURRENT'] - } - version_response = {'SecretString': secret} + versions = {"01234567890123456789012345678901": ["AWSCURRENT"]} + version_response = {"SecretString": secret} cache = SecretCache(client=self.get_client(response, versions, version_response)) - @InjectSecretString('test', cache) + @InjectSecretString("test", cache) def function_to_be_decorated(arg1, arg2, arg3): self.assertEqual(arg1, secret) - self.assertEqual(arg2, 'foo') - self.assertEqual(arg3, 'bar') + self.assertEqual(arg2, "foo") + self.assertEqual(arg3, "bar") - function_to_be_decorated(arg2='foo', arg3='bar') + function_to_be_decorated(arg2="foo", arg3="bar") def test_string_with_class_method(self): - secret = 'not json' + secret = "not json" response = {} - versions = { - '01234567890123456789012345678901': ['AWSCURRENT'] - } - version_response = {'SecretString': secret} + versions = {"01234567890123456789012345678901": ["AWSCURRENT"]} + version_response = {"SecretString": secret} cache = SecretCache(client=self.get_client(response, versions, version_response)) class TestClass(unittest.TestCase): - @InjectSecretString('test', cache) + @InjectSecretString("test", cache) def class_method(self, arg1, arg2, arg3): self.assertEqual(arg1, secret) - self.assertEqual(arg2, 'foo') - self.assertEqual(arg3, 'bar') + self.assertEqual(arg2, "foo") + self.assertEqual(arg3, "bar") t = TestClass() t.class_method(arg2="foo", arg3="bar") diff --git a/test/unit/test_items.py b/test/unit/test_items.py index da93341..e777924 100644 --- a/test/unit/test_items.py +++ b/test/unit/test_items.py @@ -13,6 +13,7 @@ """ Unit test suite for items module """ + import unittest from datetime import timezone, datetime, timedelta from unittest.mock import Mock @@ -22,7 +23,6 @@ class TestSecretCacheObject(unittest.TestCase): - def setUp(self): pass @@ -30,7 +30,6 @@ def tearDown(self): pass class TestObject(SecretCacheObject): - def __init__(self, config, client, secret_id): super(TestSecretCacheObject.TestObject, self).__init__(config, client, secret_id) @@ -71,7 +70,9 @@ def test_refresh_now(self): # New refresh time will use the ttl and will be less than the old refresh time that was artificially set a month ahead # The new refresh time will be between now + ttl and now + (ttl / 2) if the secret was immediately refreshed - self.assertTrue(new_refresh_time < old_refresh_time and new_refresh_time < datetime.now(timezone.utc) + timedelta(ttl)) + self.assertTrue( + new_refresh_time < old_refresh_time and new_refresh_time < datetime.now(timezone.utc) + timedelta(ttl) + ) def test_datetime_fix_is_refresh_needed(self): secret_cached_object = TestSecretCacheObject.TestObject(SecretCacheConfig(), None, None) @@ -88,7 +89,8 @@ def test_datetime_fix_refresh(self): secret_cached_object = SecretCacheObject( SecretCacheConfig(exception_retry_delay_base=1, exception_retry_growth_factor=2), - None, None + None, + None, ) secret_cached_object._set_result = Mock(side_effect=Exception("exception used for test")) secret_cached_object._refresh_needed = True @@ -99,16 +101,14 @@ def test_datetime_fix_refresh(self): t_after = datetime.now(tz=timezone.utc) t_before_delay = t_before + timedelta( - milliseconds=secret_cached_object._config.exception_retry_delay_base * ( - secret_cached_object._config.exception_retry_growth_factor ** exp_factor - ) + milliseconds=secret_cached_object._config.exception_retry_delay_base + * (secret_cached_object._config.exception_retry_growth_factor**exp_factor) ) self.assertLessEqual(t_before_delay, secret_cached_object._next_retry_time) t_after_delay = t_after + timedelta( - milliseconds=secret_cached_object._config.exception_retry_delay_base * ( - secret_cached_object._config.exception_retry_growth_factor ** exp_factor - ) + milliseconds=secret_cached_object._config.exception_retry_delay_base + * (secret_cached_object._config.exception_retry_growth_factor**exp_factor) ) self.assertGreaterEqual(t_after_delay, secret_cached_object._next_retry_time) diff --git a/test/unit/test_lru.py b/test/unit/test_lru.py index 03c8547..5df3686 100644 --- a/test/unit/test_lru.py +++ b/test/unit/test_lru.py @@ -13,6 +13,7 @@ """ Unit test suite for high-level functions in aws_secretsmanager_caching """ + import unittest import pytest @@ -22,7 +23,6 @@ class TestLRUCache(unittest.TestCase): - def test_lru_cache_max(self): cache = LRUCache(max_size=10) for n in range(100): diff --git a/tox.ini b/tox.ini index 29142a9..960b45f 100644 --- a/tox.ini +++ b/tox.ini @@ -3,7 +3,6 @@ # test suite on all supported python versions. To use it, "pip install tox" # and then run "tox" from this directory. - [tox] envlist = py38, py39, py310, py311, py312, flake8, pylint skip_missing_interpreters = true @@ -44,4 +43,3 @@ changedir = {toxinidir} description = invoke sphinx-build to build the HTML docs commands = sphinx-build -d "{toxworkdir}/docs_doctree" doc "{toxworkdir}/docs_out" --color -bhtml {posargs} python -c 'import pathlib; print("documentation available under file://\{0\}".format(pathlib.Path(r"{toxworkdir}") / "docs_out" / "index.html"))' -