Skip to content

Commit

Permalink
store request parameters in FeedRequestParams object in old feeds API…
Browse files Browse the repository at this point in the history
…s and remove unused code
  • Loading branch information
regulartim committed Jan 6, 2025
1 parent f1d9266 commit 503a975
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 166 deletions.
11 changes: 0 additions & 11 deletions api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,6 @@ def ordering_validation(ordering: str) -> str:
return ordering


class FeedsSerializer(serializers.Serializer):
feed_type = serializers.CharField(max_length=120)
attack_type = serializers.ChoiceField(choices=["scanner", "payload_request", "all"])
age = serializers.ChoiceField(choices=["persistent", "recent"])
format = serializers.ChoiceField(choices=["csv", "json", "txt"], default="json")

def validate_feed_type(self, feed_type):
logger.debug(f"FeedsSerializer - Validation feed_type: '{feed_type}'")
return feed_type_validation(feed_type, self.context["valid_feed_types"])


class FeedsRequestSerializer(serializers.Serializer):
feed_type = serializers.CharField(max_length=120)
attack_type = serializers.ChoiceField(choices=["scanner", "payload_request", "all"])
Expand Down
137 changes: 34 additions & 103 deletions api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
from datetime import datetime, timedelta

from api.serializers import EnrichmentSerializer, FeedsRequestSerializer, FeedsResponseSerializer, FeedsSerializer
from api.serializers import EnrichmentSerializer, FeedsRequestSerializer, FeedsResponseSerializer
from certego_saas.apps.auth.backend import CookieTokenAuthentication
from certego_saas.ext.helpers import parse_humanized_range
from certego_saas.ext.pagination import CustomPageNumberPagination
Expand Down Expand Up @@ -77,6 +77,25 @@ def __init__(self, query_params: dict):
self.paginate = query_params.get("paginate", "false").lower()
self.format = query_params.get("format_", "json").lower()

def set_legacy_age(self, age: str):
"""Translates legacy age specification into max_age and min_days_seen attributes
and sets ordering accordingly.
Parameters:
age (str): Age of the data to filter (recent or persistent).
"""
match age:
case "recent":
self.max_age = "4"
self.min_days_seen = "1"
if "feed_type" in self.ordering:
self.ordering = "-last_seen"
case "persistent":
self.max_age = "14"
self.min_days_seen: "10"
if "feed_type" in self.ordering:
self.ordering = "-attack_count"


def get_valid_feed_types() -> frozenset[str]:
general_honeypots = GeneralHoneypot.objects.all().filter(active=True)
Expand All @@ -100,8 +119,10 @@ def feeds(request, feed_type, attack_type, age, format_):
"""
logger.info(f"request /api/feeds with params: feed type: {feed_type}, " f"attack_type: {attack_type}, Age: {age}, format: {format_}")

feed_params = FeedRequestParams({"feed_type": feed_type, "attack_type": attack_type, "format_": format_})
feed_params.set_legacy_age(age)
valid_feed_types = get_valid_feed_types()
iocs_queryset = get_queryset(request, feed_type, valid_feed_types, attack_type, age, format_)
iocs_queryset = get_queryset(request, feed_params, valid_feed_types)
return feeds_response(request, iocs_queryset, feed_type, valid_feed_types, format_)


Expand All @@ -116,21 +137,16 @@ def feeds_pagination(request):
Returns:
Response: The paginated HTTP response with IOC data.
"""
params = request.query_params
logger.info(f"request /api/feeds with params: {params}")

logger.info(f"request /api/feeds with params: {request.query_params}")

feed_params = FeedRequestParams(request.query_params)
feed_params.set_legacy_age(request.query_params.get("age"))
valid_feed_types = get_valid_feed_types()
iocs_queryset = get_queryset(request, feed_params, valid_feed_types)
paginator = CustomPageNumberPagination()
iocs_queryset = get_queryset(
request,
params["feed_type"],
valid_feed_types,
params["attack_type"],
params["age"],
"json",
)
iocs = paginator.paginate_queryset(iocs_queryset, request)
resp_data = feeds_response(request, iocs, params["feed_type"], valid_feed_types, "json", dict_only=True)
resp_data = feeds_response(request, iocs, feed_params.feed_type, valid_feed_types, "json", dict_only=True)
return paginator.get_paginated_response(resp_data)


Expand Down Expand Up @@ -168,14 +184,7 @@ def feeds_v2(request):
logger.info(f"request /api/feeds/v2 with params: {request.query_params}")
feed_params = FeedRequestParams(request.query_params)
valid_feed_types = get_valid_feed_types()

serializer = FeedsRequestSerializer(
data=vars(feed_params),
context={"valid_feed_types": valid_feed_types},
)
serializer.is_valid(raise_exception=True)

iocs_queryset = get_queryset_v2(request, feed_params, valid_feed_types)
iocs_queryset = get_queryset(request, feed_params, valid_feed_types)
verbose = feed_params.verbose == "true"
paginate = feed_params.paginate == "true"
if paginate:
Expand All @@ -186,92 +195,14 @@ def feeds_v2(request):
return feeds_response(request, iocs_queryset, feed_params.feed_type, valid_feed_types, feed_params.format, verbose=verbose)


def get_queryset(request, feed_type, valid_feed_types, attack_type, age, format_):
"""
Build a queryset to filter IOC data based on the request parameters.
Args:
request: The incoming request object.
feed_type (str): Type of feed (e.g., log4j, cowrie, etc.).
valid_feed_types (frozenset): The set of all valid feed types.
attack_type (str): Type of attack (e.g., all, specific attack types).
age (str): Age of the data to filter (e.g., recent, persistent).
format_ (str): Desired format of the response (e.g., json, csv, txt).
Returns:
QuerySet: The filtered queryset of IOC data.
"""
source = str(request.user)
logger.info(f"request from {source}. Feed type: {feed_type}, attack_type: {attack_type}, " f"Age: {age}, format: {format_}")

serializer = FeedsSerializer(
data={
"feed_type": feed_type,
"attack_type": attack_type,
"age": age,
"format": format_,
},
context={"valid_feed_types": valid_feed_types},
)
serializer.is_valid(raise_exception=True)

ordering = request.query_params.get("ordering")
# if ordering == "value" replace it with "name" (the corresponding field in the iocs model)
if ordering == "value":
ordering = "name"
elif ordering == "-value":
ordering = "-name"

query_dict = {}

if feed_type != "all":
if feed_type == "log4j" or feed_type == "cowrie":
query_dict[feed_type] = True
else:
# accept feed_type if it is in the general honeypots list
query_dict["general_honeypot__name__iexact"] = feed_type

if attack_type != "all":
query_dict[attack_type] = True

if age == "recent":
# everything in the last 3 days
three_days_ago = datetime.utcnow() - timedelta(days=3)
query_dict["last_seen__gte"] = three_days_ago
# if ordering == "feed_type" or None replace it with the default value "-last_seen"
# ordering by "feed_type" is done in feed_response function
if ordering is None or ordering == "feed_type" or ordering == "-feed_type":
ordering = "-last_seen"
iocs = IOC.objects.exclude(general_honeypot__active=False).filter(**query_dict).order_by(ordering).prefetch_related("general_honeypot")[:5000]
elif age == "persistent":
# scanners detected in the last 14 days
fourteen_days_ago = datetime.utcnow() - timedelta(days=14)
query_dict["last_seen__gte"] = fourteen_days_ago
# ... and at least detected 10 different days
number_of_days_seen = 10
query_dict["number_of_days_seen__gte"] = number_of_days_seen
# if ordering == "feed_type" or None replace it with the default value "-attack_count"
# ordering by "feed_type" is done in feed_response function
if ordering is None or ordering == "feed_type" or ordering == "-feed_type":
ordering = "-attack_count"
iocs = IOC.objects.exclude(general_honeypot__active=False).filter(**query_dict).order_by(ordering).prefetch_related("general_honeypot")[:5000]

# save request source for statistics
source_ip = str(request.META["REMOTE_ADDR"])
request_source = Statistics(source=source_ip)
request_source.save()

logger.info(f"Number of iocs returned: {len(iocs)}")
return iocs


def get_queryset_v2(request, feed_params, valid_feed_types):
def get_queryset(request, feed_params, valid_feed_types):
"""
Build a queryset to filter IOC data based on the request parameters.
Args:
request: The incoming request object.
feed_params: A FeedRequestParams instance.
valid_feed_types (frozenset): The set of all valid feed types.
Returns:
QuerySet: The filtered queryset of IOC data.
Expand Down Expand Up @@ -348,7 +279,7 @@ def feeds_response(request, iocs, feed_type, valid_feed_types, format_, dict_onl
text_lines.append(ioc.name)
text = "\n".join(text_lines)
return HttpResponse(text, content_type="text/plain")
elif format_ == "csv":
if format_ == "csv":
rows = []
rows.append([license_text])
for ioc in iocs:
Expand All @@ -361,7 +292,7 @@ def feeds_response(request, iocs, feed_type, valid_feed_types, format_, dict_onl
headers={"Content-Disposition": 'attachment; filename="feeds.csv"'},
status=200,
)
elif format_ == "json":
if format_ == "json":
# json
json_list = []
ioc_feed_type = ""
Expand Down
58 changes: 6 additions & 52 deletions tests/test_serializers.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,13 @@
import random
from itertools import product

from api.serializers import FeedsRequestSerializer, FeedsResponseSerializer, FeedsSerializer
from api.serializers import FeedsRequestSerializer, FeedsResponseSerializer
from django.test import TestCase
from greedybear.consts import PAYLOAD_REQUEST, SCANNER
from greedybear.models import IOC, GeneralHoneypot
from rest_framework.serializers import ValidationError


class FeedsSerializersTestCase(TestCase):
@classmethod
def setUpClass(self):
GeneralHoneypot.objects.create(
name="adbhoney",
active=True,
)

@classmethod
def tearDownClass(self):
# db clean
GeneralHoneypot.objects.all().delete()

def test_valid_fields(self):
feed_type_choices = ["all", "log4j", "cowrie", "adbhoney"]
attack_type_choices = ["all", "scanner", "payload_request"]
age_choices = ["recent", "persistent"]
format_choices = ["txt", "json", "csv"]
# genereted all the possible valid input data using cartesian product
valid_data_choices = product(feed_type_choices, attack_type_choices, age_choices, format_choices)

for element in valid_data_choices:
data_ = {"feed_type": element[0], "attack_type": element[1], "age": element[2], "format": element[3]}
serializer = FeedsSerializer(
data=data_,
context={"valid_feed_types": frozenset(feed_type_choices)},
)
valid = serializer.is_valid(raise_exception=True)
self.assertEqual(valid, True)

def test_invalid_fields(self):
valid_feed_types = frozenset(["all", "log4j", "cowrie", "adbhoney"])
data_ = {"feed_type": "invalid_feed_type", "attack_type": "invalid_attack_type", "age": "invalid_age", "format": "invalid_format"}
serializer = FeedsSerializer(
data=data_,
context={"valid_feed_types": valid_feed_types},
)
try:
serializer.is_valid(raise_exception=True)
except ValidationError:
self.assertIn("feed_type", serializer.errors)
self.assertIn("attack_type", serializer.errors)
self.assertIn("age", serializer.errors)
self.assertIn("format", serializer.errors)


class FeedsRequestSerializersTestCase(TestCase):
@classmethod
def setUpClass(self):
Expand Down Expand Up @@ -98,14 +52,14 @@ def test_invalid_fields(self):
data_ = {
"feed_type": "invalid_feed_type",
"attack_type": "invalid_attack_type",
"max_age": 0,
"min_days_seen": 0,
"max_age": "0",
"min_days_seen": "0",
"include_reputation": None,
"exclude_reputation": None,
"feed_size": 0,
"feed_size": "0",
"ordering": "invalid_ordering",
"verbose": None,
"paginate": None,
"verbose": "invalid_value",
"paginate": "invalid_value",
"format": "invalid_format",
}
serializer = FeedsRequestSerializer(
Expand Down

0 comments on commit 503a975

Please sign in to comment.