diff --git a/noxfile.py b/noxfile.py index f176452..3804f34 100644 --- a/noxfile.py +++ b/noxfile.py @@ -39,8 +39,6 @@ def should_skip(python: str, django: str) -> bool: # Django main requires Python 3.10+ return True - - if django == DJ50 and version(python) < version(PY310): # Django 5.0 requires Python 3.10+ return True diff --git a/pyproject.toml b/pyproject.toml index dca3036..12991be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,9 +3,7 @@ build-backend = "hatchling.build" requires = ["hatchling"] [project] -authors = [ - {name = "Josh Thomas", email = "josh@joshthomas.dev"} -] +authors = [{ name = "Josh Thomas", email = "josh@joshthomas.dev" }] classifiers = [ "Development Status :: 3 - Alpha", "Framework :: Django", @@ -21,15 +19,13 @@ classifiers = [ "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", - "Programming Language :: Python :: Implementation :: CPython" -] -dependencies = [ - "django>=4.2" + "Programming Language :: Python :: Implementation :: CPython", ] +dependencies = ["django>=4.2"] description = "A custom Django field for storing and securely accessing a 1Password vault item." dynamic = ["version"] keywords = [] -license = {file = "LICENSE"} +license = { file = "LICENSE" } name = "django-opfield" readme = "README.md" requires-python = ">=3.8" @@ -52,7 +48,7 @@ dev = [ "pytest-django", "pytest-randomly", "pytest-reverse", - "pytest-xdist" + "pytest-xdist", ] docs = [ "cogapp", @@ -61,7 +57,7 @@ docs = [ "sphinx", "sphinx-autobuild", "sphinx-copybutton", - "sphinx-inline-tabs" + "sphinx-inline-tabs", ] lint = ["pre-commit"] @@ -74,20 +70,14 @@ Source = "https://github.com/westerveltco/django-opfield" commit = true commit_message = ":bookmark: bump version {old_version} -> {new_version}" current_version = "0.1.0" -push = false # set to false for CI +push = false # set to false for CI tag = false version_pattern = "MAJOR.MINOR.PATCH[PYTAGNUM]" [tool.bumpver.file_patterns] -".copier/package.yml" = [ - 'current_version: {version}' -] -"src/django_opfield/__init__.py" = [ - '__version__ = "{version}"' -] -"tests/test_version.py" = [ - 'assert __version__ == "{version}"' -] +".copier/package.yml" = ['current_version: {version}'] +"src/django_opfield/__init__.py" = ['__version__ = "{version}"'] +"tests/test_version.py" = ['assert __version__ == "{version}"'] [tool.coverage.paths] source = ["src"] @@ -99,15 +89,12 @@ exclude_lines = [ "if not DEBUG:", "if settings.DEBUG:", "if TYPE_CHECKING:", - 'def __str__\(self\)\s?\-?\>?\s?\w*\:' + 'def __str__\(self\)\s?\-?\>?\s?\w*\:', ] fail_under = 75 [tool.coverage.run] -omit = [ - "src/django_opfield/migrations/*", - "tests/*" -] +omit = ["src/django_opfield/migrations/*", "tests/*"] source = ["django_opfield"] [tool.django-stubs] @@ -118,10 +105,7 @@ strict_settings = false indent = 2 [tool.hatch.build] -exclude = [ - ".*", - "Justfile" -] +exclude = [".*", "Justfile"] [tool.hatch.build.targets.wheel] packages = ["src/django_opfield"] @@ -134,9 +118,7 @@ check_untyped_defs = true exclude = "docs/.*\\.py$" mypy_path = "src/" no_implicit_optional = true -plugins = [ - "mypy_django_plugin.main" -] +plugins = ["mypy_django_plugin.main"] warn_redundant_casts = true warn_unused_configs = true warn_unused_ignores = true @@ -144,10 +126,7 @@ warn_unused_ignores = true [[tool.mypy.overrides]] ignore_errors = true ignore_missing_imports = true -module = [ - "django_opfield.*.migrations.*", - "tests.*" -] +module = ["django_opfield.*.migrations.*", "tests.*"] [tool.mypy_django_plugin] ignore_missing_model_attributes = true @@ -180,7 +159,7 @@ exclude = [ "dist", "migrations", "node_modules", - "venv" + "venv", ] extend-include = ["*.pyi?"] indent-width = 4 @@ -202,13 +181,13 @@ quote-style = "double" dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" # Allow autofix for all enabled rules (when `--fix`) is provided. fixable = ["A", "B", "C", "D", "E", "F", "I"] -ignore = ["E501", "E741"] # temporary +ignore = ["E501", "E741"] # temporary select = [ "B", # flake8-bugbear "E", # Pycodestyle "F", # Pyflakes "I", # isort - "UP" # pyupgrade + "UP", # pyupgrade ] unfixable = [] diff --git a/src/django_opfield/apps.py b/src/django_opfield/apps.py new file mode 100644 index 0000000..f4f50fb --- /dev/null +++ b/src/django_opfield/apps.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +from django.apps import AppConfig + + +class DjangoOPFieldConfig(AppConfig): + name = "django_opfield" + label = "django_opfield" + verbose_name = "Django OPField" diff --git a/src/django_opfield/conf.py b/src/django_opfield/conf.py new file mode 100644 index 0000000..cb891d6 --- /dev/null +++ b/src/django_opfield/conf.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +import os +import sys +from dataclasses import dataclass +from typing import Any + +from django.conf import settings + +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import ( + override, # pyright: ignore[reportUnreachable] # pragma: no cover + ) + +OPFIELD_SETTINGS_NAME = "DJANGO_OPFIELD" + + +@dataclass(frozen=True) +class AppSettings: + @override + def __getattribute__(self, __name: str) -> Any: + user_settings = getattr(settings, OPFIELD_SETTINGS_NAME, {}) + return user_settings.get(__name, super().__getattribute__(__name)) + + @property + def OP_SERVICE_ACCOUNT_TOKEN(self) -> str: + return os.environ.get("OP_SERVICE_ACCOUNT_TOKEN", "") + + +app_settings = AppSettings() diff --git a/src/django_opfield/fields.py b/src/django_opfield/fields.py new file mode 100644 index 0000000..44aa09d --- /dev/null +++ b/src/django_opfield/fields.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +import shutil +import subprocess +import sys +from typing import Any + +from django.db import models + +from django_opfield.conf import app_settings +from django_opfield.validators import OPURIValidator + +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import ( + override, # pyright: ignore[reportUnreachable] # pragma: no cover + ) + + +class OPField(models.CharField): + description = "1Password secret" + + def __init__( + self, vaults: list[str] | None = None, *args: Any, **kwargs: Any + ) -> None: + self.vaults = vaults + kwargs.setdefault("max_length", 255) + super().__init__(*args, **kwargs) + self.validators.append(OPURIValidator(vaults=self.vaults)) + + @classmethod + def with_secret(cls, *args: Any, **kwargs: Any) -> tuple[OPField, property]: + op_uri = cls(*args, **kwargs) + + @property + def secret(self: models.Model) -> str | None: + if not app_settings.OP_SERVICE_ACCOUNT_TOKEN: + msg = "OP_SERVICE_ACCOUNT_TOKEN is not set" + raise ValueError(msg) + if shutil.which("op") is None: + msg = "The 'op' CLI command is not available" + raise OSError(msg) + field_value = getattr(self, op_uri.name) + result = subprocess.run(["op", "read", field_value], capture_output=True) + if result.returncode != 0: + err = result.stderr.decode("utf-8") + msg = f"Could not read secret from 1Password: {err}" + raise ValueError(msg) + secret = result.stdout.decode("utf-8").strip() + return secret + + @secret.setter + def secret(self: models.Model, value: str) -> None: + raise NotImplementedError("OPField does not support setting secret values") + + return op_uri, secret + + @override + def deconstruct(self): + name, path, args, kwargs = super().deconstruct() + if self.vaults is not None: + kwargs["vaults"] = self.vaults + return name, path, args, kwargs diff --git a/src/django_opfield/py.typed b/src/django_opfield/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/src/django_opfield/validators.py b/src/django_opfield/validators.py new file mode 100644 index 0000000..40e32cf --- /dev/null +++ b/src/django_opfield/validators.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +import re +import sys +from typing import Any + +from django.core.exceptions import ValidationError +from django.core.validators import RegexValidator +from django.utils.deconstruct import deconstructible +from django.utils.regex_helper import ( + _lazy_re_compile, # pyright: ignore[reportPrivateUsage] +) + +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import ( + override, # pyright: ignore[reportUnreachable] # pragma: no cover + ) + + +@deconstructible +class OPURIValidator(RegexValidator): + ul = "\u00a1-\uffff" # Unicode letters range (must not be a raw string). + + op_path_re = ( + r"(?P<vault>[^/]+)" # Vault: must be a non-empty string without slashes + r"/(?P<item>[^/]+)" # Item: must be a non-empty string without slashes + r"(?:/(?P<section>[^/]+))?" # Section: optional, non-empty string without slashes + r"/(?P<field>[^/]+)" # Field: must be a non-empty string without slashes + ) + + @property + def op_regex(self) -> re.Pattern[str]: + return _lazy_re_compile( + r"^op://" + self.op_path_re + r"$", + re.IGNORECASE, + ) + + message = "Enter a valid 1Password URI." + schemes = ["op"] + + def __init__(self, vaults: list[str] | None = None, **kwargs: Any) -> None: + super().__init__(**kwargs) + self.vaults = vaults if vaults is not None else [] + + @override + def __call__(self, value: Any) -> None: + if not isinstance(value, str) or len(value) > 2048: + raise ValidationError(self.message, code="invalid", params={"value": value}) + + # Match the URL against the regex to validate structure + match = self.op_regex.match(value) + if not match: + raise ValidationError( + self.message, code="invalid_format", params={"value": value} + ) + + # Check if the vault is in the list of valid vaults + vault = match.group("vault") + if self.vaults and vault not in self.vaults: + raise ValidationError( + f"The vault '{vault}' is not a valid vault.", code="invalid_vault" + ) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..246eea3 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +import logging + +from django.conf import settings + +from .settings import DEFAULT_SETTINGS + +pytest_plugins = [] + + +def pytest_configure(config): + logging.disable(logging.CRITICAL) + + settings.configure(**DEFAULT_SETTINGS, **TEST_SETTINGS) + + +TEST_SETTINGS = { + "INSTALLED_APPS": [ + "tests", + ] +} diff --git a/tests/models.py b/tests/models.py new file mode 100644 index 0000000..db02524 --- /dev/null +++ b/tests/models.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +from django.db import models + +from django_opfield.fields import OPField + + +class TestModel(models.Model): + op_uri, op_secret = OPField.with_secret() diff --git a/tests/test_fields.py b/tests/test_fields.py new file mode 100644 index 0000000..57aa3f3 --- /dev/null +++ b/tests/test_fields.py @@ -0,0 +1,184 @@ +from __future__ import annotations + +import os +from unittest.mock import patch + +import pytest +from django.core.exceptions import ValidationError +from django.db import models + +from django_opfield.fields import OPField +from django_opfield.validators import OPURIValidator + +from .models import TestModel + + +def test_init_with_defaults(): + field = OPField() + + assert field.max_length == 255 + assert field.vaults is None + assert isinstance(field, models.CharField) + + +def test_init_with_max_length(): + field = OPField(max_length=10) + + assert field.max_length == 10 + + +@pytest.mark.parametrize( + "vaults", + [ + None, + ["vault33"], + ["vault33", "vault31"], + ], +) +def test_init_with_vaults(vaults): + field = OPField(vaults=vaults) + + assert field.vaults == vaults + + +def test_init_validator(): + field = OPField() + + validators = [type(validator) for validator in field.validators] + + assert OPURIValidator in validators + + +def test_field_return(): + field, secret = OPField.with_secret() + + assert isinstance(field, OPField) + assert isinstance(secret, property) + + +def test_deconstruct_default(): + field = OPField() + + name, path, args, kwargs = field.deconstruct() + + assert name is None + assert path == "django_opfield.fields.OPField" + assert args == [] + assert kwargs.get("vaults") is None + assert kwargs.get("max_length") == 255 + + +def test_deconstruct_with_max_length(): + field = OPField(max_length=10) + + name, path, args, kwargs = field.deconstruct() + + assert kwargs.get("max_length") == 10 + + +def test_deconstruct_with_vaults(): + field = OPField(vaults=["vault33", "vault31"]) + name, path, args, kwargs = field.deconstruct() + + assert kwargs.get("vaults") == ["vault33", "vault31"] + + +@patch("subprocess.run") +@patch.dict(os.environ, {"OP_SERVICE_ACCOUNT_TOKEN": "token"}, clear=True) +def test_get_secret(mock_run): + mock_run.return_value.returncode = 0 + mock_run.return_value.stdout = b"secret value" + + model = TestModel(op_uri="op://vault/item/field") + + secret = model.op_secret + + mock_run.assert_called_once_with( + ["op", "read", "op://vault/item/field"], capture_output=True + ) + assert secret == "secret value" + + +@patch("subprocess.run") +def test_get_secret_no_token(mock_run): + mock_run.return_value.returncode = 1 + mock_run.return_value.stderr = b"error message" + + model = TestModel(op_uri="op://vault/item/field") + + with pytest.raises(ValueError) as exc_info: + _ = model.op_secret + + assert "OP_SERVICE_ACCOUNT_TOKEN is not set" in str(exc_info.value) + + +@patch("subprocess.run") +@patch.dict(os.environ, {"OP_SERVICE_ACCOUNT_TOKEN": "token"}, clear=True) +def test_get_secret_error(mock_run): + mock_run.return_value.returncode = 1 + mock_run.return_value.stderr = b"error message" + + model = TestModel(op_uri="op://vault/item/field") + + with pytest.raises(ValueError) as exc_info: + _ = model.op_secret + + assert "Could not read secret from 1Password" in str(exc_info.value) + + +@patch("shutil.which", return_value=None) +@patch.dict(os.environ, {"OP_SERVICE_ACCOUNT_TOKEN": "token"}, clear=True) +def test_get_secret_command_not_available(mock_which, db): + model = TestModel(op_uri="op://vault/item/field") + + with pytest.raises(OSError) as excinfo: + _ = model.op_secret + + assert "The 'op' CLI command is not available" in str(excinfo.value) + + +@patch("subprocess.run") +@patch.dict(os.environ, {"OP_SERVICE_ACCOUNT_TOKEN": "token"}, clear=True) +def test_set_secret_failure(mock_run): + model = TestModel(op_uri="op://vault/item/field") + + with pytest.raises(NotImplementedError) as exc_info: + model.op_secret = "new secret" + model.save() + + assert "OPField does not support setting secret values" in str(exc_info.value) + + +@pytest.mark.parametrize( + "valid_uri", + [ + "op://vault/item/field", + "op://vault/item/section/field", + ], +) +def test_model_with_valid_op_uri(valid_uri, db): + model = TestModel(op_uri=valid_uri) + model.full_clean() + model.save() + + assert model.op_uri == valid_uri + + +@pytest.mark.parametrize( + "invalid_uri", + [ + "invalid_uri", + "op://", + "op://vault", + "op://vault/item", + "https://example.com", + ], +) +def test_model_with_invalid_op_uri(invalid_uri, db): + model = TestModel(op_uri=invalid_uri) + + with pytest.raises(ValidationError) as excinfo: + model.full_clean() + model.save() + + assert "op_uri" in str(excinfo.value) diff --git a/tests/test_validators.py b/tests/test_validators.py new file mode 100644 index 0000000..2db011b --- /dev/null +++ b/tests/test_validators.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +import pytest +from django.core.exceptions import ValidationError + +from django_opfield.validators import OPURIValidator + + +@pytest.mark.parametrize( + "valid_uri", + [ + "op://vault/item/field", + "op://vault/item/section/field", + ], +) +def test_valid_uri(valid_uri): + validator = OPURIValidator() + + assert validator(valid_uri) is None + + +@pytest.mark.parametrize( + "invalid_uri", + [ + "op://", + "op://vault", + "op://vault/item", + "https://example.com", + ], +) +def test_invalid_uri(invalid_uri): + validator = OPURIValidator() + + with pytest.raises(ValidationError): + assert validator(invalid_uri) + + +@pytest.mark.parametrize( + "valid_uri", + [ + "op://vault/item/field", + "op://vault/item/section/field", + ], +) +def test_valid_uri_with_vault(valid_uri): + validator = OPURIValidator(vaults=["vault"]) + + assert validator(valid_uri) is None + + +@pytest.mark.parametrize( + "valid_uri", + [ + "op://vault/item/field", + "op://vault/item/section/field", + ], +) +def test_valid_uri_with_mismatched_vault(valid_uri): + validator = OPURIValidator(vaults=["invalid"]) + + with pytest.raises(ValidationError): + assert validator(valid_uri) + + +@pytest.mark.parametrize( + "invalid_input", + [ + 1, + "a" * 2048, + ], +) +def test_invalid_input(invalid_input): + validator = OPURIValidator() + + with pytest.raises(ValidationError): + assert validator(invalid_input)