Skip to content

Commit

Permalink
Merge pull request #70 from koladev32/68-optional-ip-whitelisting-and…
Browse files Browse the repository at this point in the history
…-blacklisting-using-database-storage

68 optional ip whitelisting and blacklisting using database storage
  • Loading branch information
koladev32 authored Oct 27, 2024
2 parents 9c669a4 + 37ca7d2 commit 2bb00f2
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ Changelog
[Unreleased]
------------

- Adding IP whitelisting and blacklisting (#68)

[v2.1.0] - 2024-05-23
------------------

Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ format:

migrations:
@echo "Running migrations..."
${bin}python -m scripts.makemigrations
${bin}python -m scripts.makemigrations ${app}


migrations-check:
@echo "Running migrations checks..."
${bin}python -m scripts.makemigrations --check
${bin}python -m scripts.makemigrations --check ${app}

test:
${bin}pytest && TEST_WITH_ROTATION=1 ${bin}pytest
4 changes: 4 additions & 0 deletions drf_simple_apikey/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class ApiKeyAdmin(admin.ModelAdmin):
"revoked",
"expiry_date",
"created",
"whitelisted_ips",
"blacklisted_ips",
)

list_filter = (
Expand All @@ -34,6 +36,8 @@ def get_readonly_fields(
"name",
"revoked",
"expiry_date",
"whitelisted_ips",
"blacklisted_ips",
)

return fields
Expand Down
11 changes: 11 additions & 0 deletions drf_simple_apikey/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from drf_simple_apikey.crypto import get_crypto
from drf_simple_apikey.models import APIKey
from drf_simple_apikey.parser import APIKeyParser
from drf_simple_apikey.settings import package_settings


class APIKeyAuthentication(BaseBackend):
Expand Down Expand Up @@ -68,6 +69,16 @@ def _authenticate_credentials(self, request, key):
if api_key.revoked:
raise exceptions.AuthenticationFailed("This API Key has been revoked.")

# IP address validation
client_ip = request.META.get(package_settings.IP_ADDRESS_HEADER)
if api_key.blacklisted_ips and client_ip in api_key.blacklisted_ips:
raise exceptions.AuthenticationFailed("Access denied from blacklisted IP.")

if api_key.whitelisted_ips and client_ip not in api_key.whitelisted_ips:
raise exceptions.AuthenticationFailed(
"Access restricted to specific IP addresses."
)

return api_key.entity, key

def authenticate_header(self, request):
Expand Down
1 change: 1 addition & 0 deletions drf_simple_apikey/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
This modules provides the `ApiKeyCrypto` classes that contain
methods needed to generate, encrypt, and decrypt an API Key.
"""

import json
from copy import copy
from datetime import timedelta
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Generated by Django 5.0.3 on 2024-10-27 15:13

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("drf_simple_apikey", "0002_alter_apikey_options"),
]

operations = [
migrations.AddField(
model_name="apikey",
name="blacklisted_ips",
field=models.JSONField(
blank=True,
help_text="List of denied IP addresses for this API key.",
null=True,
),
),
migrations.AddField(
model_name="apikey",
name="whitelisted_ips",
field=models.JSONField(
blank=True,
help_text="List of allowed IP addresses for this API key.",
null=True,
),
),
]
11 changes: 11 additions & 0 deletions drf_simple_apikey/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ class AbstractAPIKey(models.Model):
)
created = models.DateTimeField(auto_now=True)

whitelisted_ips = models.JSONField(
blank=True,
null=True,
help_text="List of allowed IP addresses for this API key.",
)
blacklisted_ips = models.JSONField(
blank=True,
null=True,
help_text="List of denied IP addresses for this API key.",
)

objects = APIKeyManager()

def _has_expired(self) -> bool:
Expand Down
8 changes: 5 additions & 3 deletions drf_simple_apikey/rotation/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ def get_rotation_status():
cache.set(
"rotation_status",
rotation_status,
package_settings.ROTATION_PERIOD.total_seconds()
if rotation_status
else None,
(
package_settings.ROTATION_PERIOD.total_seconds()
if rotation_status
else None
),
) # Cache for the rotation period if true

return rotation_status
1 change: 1 addition & 0 deletions drf_simple_apikey/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"AUTHENTICATION_KEYWORD_HEADER": "Api-Key",
"ROTATION_PERIOD": timedelta(days=7),
"API_KEY_CLASS": "drf_simple_apikey.Apikey",
"IP_ADDRESS_HEADER": "REMOTE_ADDR",
}

REMOVED_SETTINGS = ()
Expand Down
1 change: 1 addition & 0 deletions example/ApiKey/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""

from django.contrib import admin
from django.urls import path, include

Expand Down
2 changes: 2 additions & 0 deletions tests/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def test_admin_readonly_fields(rf: RequestFactory, user) -> None:
"name",
"revoked",
"expiry_date",
"whitelisted_ips",
"blacklisted_ips",
)


Expand Down
110 changes: 110 additions & 0 deletions tests/test_ip_management.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import pytest
from django.contrib.auth.models import User
from rest_framework import exceptions
from rest_framework.test import APIRequestFactory

from drf_simple_apikey.backends import APIKeyAuthentication
from drf_simple_apikey.settings import package_settings
from .fixtures.api_key import active_api_key
from .fixtures.user import user

pytestmark = pytest.mark.django_db


@pytest.fixture
def valid_request_with_whitelisted_ip(user, active_api_key):
"""Creates a valid request from a whitelisted IP address."""
factory = APIRequestFactory()
api_key, key = active_api_key
api_key.whitelisted_ips = ["127.0.0.1"]
api_key.save()

return factory.get(
"/test-request/",
REMOTE_ADDR="127.0.0.1",
HTTP_AUTHORIZATION=f"{package_settings.AUTHENTICATION_KEYWORD_HEADER} {key}",
)


@pytest.fixture
def valid_request_with_blacklisted_ip(user, active_api_key):
"""Creates a request from a blacklisted IP address."""
factory = APIRequestFactory()
api_key, key = active_api_key
api_key.blacklisted_ips = ["127.0.0.1"]
api_key.save()

return factory.get(
"/test-request/",
REMOTE_ADDR="127.0.0.1",
HTTP_AUTHORIZATION=f"{package_settings.AUTHENTICATION_KEYWORD_HEADER} {key}",
)


@pytest.fixture
def request_with_unlisted_ip(user, active_api_key):
"""Creates a request from an IP that is neither whitelisted nor blacklisted."""
factory = APIRequestFactory()
api_key, key = active_api_key
api_key.whitelisted_ips = ["192.168.0.1"] # Different IP than the request IP
api_key.save()

return factory.get(
"/test-request/",
REMOTE_ADDR="10.0.0.1",
HTTP_AUTHORIZATION=f"{package_settings.AUTHENTICATION_KEYWORD_HEADER} {key}",
)


@pytest.fixture
def api_key_authentication():
return APIKeyAuthentication()


@pytest.mark.django_db
class TestApiKeyAuthenticationWithIPManagement:
pytestmark = pytest.mark.django_db

def test_authenticate_valid_request_with_whitelisted_ip(
self, valid_request_with_whitelisted_ip, api_key_authentication
):
"""Tests that a request from a whitelisted IP is authenticated successfully."""
entity, _ = api_key_authentication.authenticate(
valid_request_with_whitelisted_ip
)
assert isinstance(entity, User)

def test_authenticate_denied_for_blacklisted_ip(
self, valid_request_with_blacklisted_ip, api_key_authentication
):
"""Tests that a request from a blacklisted IP is denied."""
with pytest.raises(
exceptions.AuthenticationFailed, match=r"Access denied from blacklisted IP."
):
api_key_authentication.authenticate(valid_request_with_blacklisted_ip)

def test_authenticate_denied_for_unlisted_ip_with_existing_whitelist(
self, request_with_unlisted_ip, api_key_authentication
):
"""Tests that a request from an IP not in the whitelist is denied if a whitelist exists."""
with pytest.raises(
exceptions.AuthenticationFailed,
match=r"Access restricted to specific IP addresses.",
):
api_key_authentication.authenticate(request_with_unlisted_ip)

def test_authenticate_allowed_for_request_with_no_ip_restrictions(
self, user, active_api_key, api_key_authentication
):
"""Tests that a request with no IP restrictions is authenticated successfully."""
factory = APIRequestFactory()
_, key = active_api_key

request = factory.get(
"/test-request/",
REMOTE_ADDR="10.0.0.1",
HTTP_AUTHORIZATION=f"{package_settings.AUTHENTICATION_KEYWORD_HEADER} {key}",
)

entity, _ = api_key_authentication.authenticate(request)
assert isinstance(entity, User)

0 comments on commit 2bb00f2

Please sign in to comment.