Skip to content

Commit

Permalink
refactor: refactor get_client_parameters
Browse files Browse the repository at this point in the history
* Now accepts request_or_attempt and credentials which are passed to get_lockout_parameters
* Use lockout parameters that consumed from get_lockout_parameters
  • Loading branch information
hirotasoshu authored and wannacfuture committed May 13, 2023
1 parent 86a028d commit 72add39
Showing 1 changed file with 40 additions and 20 deletions.
60 changes: 40 additions & 20 deletions axes/helpers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections.abc import Iterable
from datetime import timedelta
from hashlib import sha256
from logging import getLogger
Expand Down Expand Up @@ -217,7 +218,6 @@ def get_client_http_accept(request: HttpRequest) -> str:
return request.META.get("HTTP_ACCEPT", "<unknown>")[:1025]


def get_client_parameters(username: str, ip_address: str, user_agent: str) -> list:
def get_lockout_parameters(
request_or_attempt: Union[HttpRequest, AccessBase],
credentials: Optional[dict] = None,
Expand All @@ -238,6 +238,15 @@ def get_lockout_parameters(
raise TypeError(
"settings.AXES_LOCKOUT_PARAMETERS needs to be a callable or iterable"
)


def get_client_parameters(
username: str,
ip_address: str,
user_agent: str,
request_or_attempt: Union[HttpRequest, AccessBase],
credentials: Optional[dict] = None,
) -> List[dict]:
"""
Get query parameters for filtering AccessAttempt queryset.
Expand All @@ -246,26 +255,35 @@ def get_lockout_parameters(
Returns list of dict, every item of list are separate parameters
"""
lockout_parameters = get_lockout_parameters(request_or_attempt, credentials)

if settings.AXES_ONLY_USER_FAILURES:
# 1. Only individual usernames can be tracked with parametrization
filter_query = [{"username": username}]
else:
if settings.AXES_LOCK_OUT_BY_USER_OR_IP:
# One of `username` or `IP address` is used
filter_query = [{"username": username}, {"ip_address": ip_address}]
elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
# 2. A combination of username and IP address can be used as well
filter_query = [{"username": username, "ip_address": ip_address}]
else:
# 3. Default case is to track the IP address only, which is the most secure option
filter_query = [{"ip_address": ip_address}]
parameters_dict = {
"username": username,
"ip_address": ip_address,
"user_agent": user_agent,
}

if settings.AXES_USE_USER_AGENT:
# 4. The HTTP User-Agent can be used to track e.g. one browser
filter_query[0]["user_agent"] = user_agent
filter_kwargs = []

for parameter in lockout_parameters:
try:
if isinstance(parameter, str):
filter_kwarg = {parameter: parameters_dict[parameter]}
else:
filter_kwarg = {
combined_parameter: parameters_dict[combined_parameter]
for combined_parameter in parameter
}
filter_kwargs.append(filter_kwarg)

except KeyError as e:
error_msg = (
f"{e} lockout parameter is not allowed. "
f"Allowed parameters: {', '.join(parameters_dict.keys())}"
)
raise ValueError(error_msg) from e

return filter_query
return filter_kwargs


def make_cache_key_list(filter_kwargs_list: List[dict]) -> List[str]:
Expand Down Expand Up @@ -300,7 +318,9 @@ def get_client_cache_keys(
ip_address = get_client_ip_address(request_or_attempt)
user_agent = get_client_user_agent(request_or_attempt)

filter_kwargs_list = get_client_parameters(username, ip_address, user_agent)
filter_kwargs_list = get_client_parameters(
username, ip_address, user_agent, request_or_attempt, credentials
)

return make_cache_key_list(filter_kwargs_list)

Expand Down Expand Up @@ -343,7 +363,7 @@ def get_client_str(
client_dict["user_agent"] = user_agent
else:
# Other modes initialize the attributes that are used for the actual lockouts
client_list = get_client_parameters(username, ip_address, user_agent)
client_list = get_client_parameters(username, ip_address, user_agent, request)
client_dict = {}
for client in client_list:
client_dict.update(client)
Expand Down

0 comments on commit 72add39

Please sign in to comment.