Add backfill option
Update version
andy-isoc committed Sep 16, 2024
1 parent c8c24cd commit 9dcbede
Showing 8 changed files with 277 additions and 60 deletions.
18 changes: 17 additions & 1 deletion README.md
@@ -28,7 +28,7 @@ pip install django-ixp-tracker
2. Run `python manage.py migrate` to create the models.
3. Add the relevant settings to your config. `IXP_TRACKER_PEERING_DB_URL` will use a default if you don't provide a value, so you probably don't need to set it. But you will need to set `IXP_TRACKER_PEERING_DB_KEY` to authenticate against the API.
4. Add `IXP_TRACKER_GEO_LOOKUP_FACTORY` to config with the path to your factory (see below).
5. Run the management command to import the data: `python manage.py ixp_tracker_import`
5. Run the management command to import the data: `python manage.py ixp_tracker_import` (This will sync the current data; if you want historical data, you will need to backfill first, as described below.)

## ASN country and status data

@@ -38,6 +38,22 @@ If you don't provide this service yourself, it will default to a noop version. T

In order to implement such a component yourself, you should implement the Protocol `ixp_tracker.importers.ASNGeoLookup` and provide a factory function for your class.
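
As an illustration, here is a minimal sketch of such a component. The module, class, and factory names are hypothetical, and only the `get_status` method visible in this commit is shown; the Protocol may require further members, so check `ixp_tracker.importers.ASNGeoLookup` for the full definition:
```python
# my_app/geo.py -- a minimal sketch; module, class and factory names are hypothetical
from datetime import datetime


class StaticASNGeoLookup:
    """Toy lookup; a real implementation would query registration data."""

    def get_status(self, asn: int, as_at: datetime) -> str:
        # The importer compares this value against "assigned" when deciding
        # whether a member's ASN is still registered
        return "assigned"


def geo_lookup_factory():
    return StaticASNGeoLookup()
```
You would then point the setting at it, e.g. `IXP_TRACKER_GEO_LOOKUP_FACTORY = "my_app.geo.geo_lookup_factory"`.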

## Backfilling data

You have the option of backfilling from archived PeeringDB data. This can be done by running the import command with the `--backfill` option for each month you want to backfill:
```shell
python manage.py ixp_tracker_import --backfill <YYYYMM>
```
The backfill currently processes a single month at a time and will look for the earliest file for the relevant month at https://publicdata.caida.org/datasets/peeringdb/

IMPORTANT NOTE: due to the way the code tries to figure out when a member left an IXP, you should run the backfill strictly in date order and *before* syncing the current data.
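
For example, to backfill the first three months of 2024 (example dates) and then sync:
```shell
# backfills must run one month per invocation, oldest first
python manage.py ixp_tracker_import --backfill 202401
python manage.py ixp_tracker_import --backfill 202402
python manage.py ixp_tracker_import --backfill 202403
# only sync the current data once the backfills are done
python manage.py ixp_tracker_import
```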

## Running programmatically

If you'd like to run the import from code, rather than from the management command, you can call `importers.import_data()` directly.

It's not recommended to call any other functions yourself.
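
A rough sketch of programmatic use, based on the `import_data()` signature introduced in this commit (the geo lookup comes from the hypothetical example above):
```python
from datetime import datetime, timezone

from ixp_tracker import importers
from my_app.geo import geo_lookup_factory  # hypothetical module from the example above

geo_lookup = geo_lookup_factory()

# Sync the current data
importers.import_data(geo_lookup)

# Backfill a single month (the date is normalised to the first of the month)
importers.import_data(geo_lookup, processing_date=datetime(2024, 1, 1, tzinfo=timezone.utc))
```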

## Development

To contribute to this library, first checkout the code. Then create a new virtual environment:
2 changes: 2 additions & 0 deletions ixp_tracker/conf.py
@@ -21,3 +21,5 @@
    IXP_TRACKER_PEERING_DB_KEY = None
except (TypeError, ValueError):
    raise ImproperlyConfigured("IXP_TRACKER_PEERING_DB_KEY must be a string value")

DATA_ARCHIVE_URL = "https://publicdata.caida.org/datasets/peeringdb/{year}/{month:02d}/peeringdb_2_dump_{year}_{month:02d}_{day:02d}.json"
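# For example (illustrative values), DATA_ARCHIVE_URL.format(year=2024, month=1, day=1) yields:
# "https://publicdata.caida.org/datasets/peeringdb/2024/01/peeringdb_2_dump_2024_01_01.json"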
114 changes: 74 additions & 40 deletions ixp_tracker/importers.py
@@ -1,12 +1,14 @@
import ast
import json
from json.decoder import JSONDecodeError
import logging
from datetime import datetime, timedelta, timezone
from typing import Callable, Protocol

import requests
import dateutil.parser
from requests.exceptions import JSONDecodeError

from ixp_tracker.conf import IXP_TRACKER_PEERING_DB_KEY, IXP_TRACKER_PEERING_DB_URL
from ixp_tracker.conf import IXP_TRACKER_PEERING_DB_KEY, IXP_TRACKER_PEERING_DB_URL, DATA_ARCHIVE_URL
from ixp_tracker import models

logger = logging.getLogger("ixp_tracker")
@@ -21,13 +23,46 @@ def get_status(self, asn: int, as_at: datetime) -> str:
        pass


def import_data(geo_lookup: ASNGeoLookup, reset: bool = False, page_limit: int = 200):
    import_ixps()
    logger.debug("Imported IXPs")
    import_asns(geo_lookup, reset, page_limit)
    logger.debug("Imported ASNs")
    import_members(geo_lookup)
    logger.debug("Imported members")
def import_data(
    geo_lookup: ASNGeoLookup,
    reset: bool = False,
    processing_date: datetime = None,
    page_limit: int = 200
):
    if processing_date is None:
        processing_date = datetime.utcnow().replace(tzinfo=timezone.utc)
        import_ixps(processing_date)
        logger.debug("Imported IXPs")
        import_asns(geo_lookup, reset, page_limit)
        logger.debug("Imported ASNs")
        import_members(processing_date, geo_lookup)
        logger.debug("Imported members")
    else:
        processing_date = processing_date.replace(day=1)
        processing_month = processing_date.month
        found = False
        while processing_date.month == processing_month and not found:
            url = DATA_ARCHIVE_URL.format(year=processing_date.year, month=processing_date.month, day=processing_date.day)
            data = requests.get(url)
            if data.status_code == 200:
                found = True
            else:
                processing_date = processing_date + timedelta(days=1)
        if not found:
            logger.warning("Cannot find backfill data", extra={"backfill_date": processing_date})
            return
        backfill_raw = data.text
        try:
            backfill_data = json.loads(backfill_raw)
        except JSONDecodeError:
            # It seems some of the PeeringDB dumps use single quotes, so try to load them with ast in that case
            backfill_data = ast.literal_eval(backfill_raw)
        ixp_data = backfill_data.get("ix", {"data": []}).get("data", [])
        process_ixp_data(processing_date)(ixp_data)
        asn_data = backfill_data.get("net", {"data": []}).get("data", [])
        process_asn_data(geo_lookup)(asn_data)
        member_data = backfill_data.get("netixlan", {"data": []}).get("data", [])
        process_member_data(processing_date, geo_lookup)(member_data)


def get_data(endpoint: str, processor: Callable, limit: int = 0, last_updated: datetime = None) -> bool:
@@ -57,32 +92,32 @@ def get_data(endpoint: str, processor: Callable, limit: int = 0, last_updated: d
    return True


def import_ixps() -> bool:
    return get_data("/ix", process_ixp_data)
def import_ixps(processing_date) -> bool:
    return get_data("/ix", process_ixp_data(processing_date))


def process_ixp_data(all_ixp_data):
    reporting_date = datetime.utcnow().replace(tzinfo=timezone.utc)
    for ixp_data in all_ixp_data:
        try:
            models.IXP.objects.update_or_create(
                peeringdb_id=ixp_data["id"],
                defaults={
                    "name": ixp_data["name"],
                    "long_name": ixp_data["name_long"],
                    "city": ixp_data["city"],
                    "website": ixp_data["website"],
                    "active_status": True,
                    "country": ixp_data["country"],
                    "created": ixp_data["created"],
                    "last_updated": ixp_data["updated"],
                    "last_active": reporting_date,
                }
            )
            logger.debug("Creating new IXP record", extra={"id": ixp_data["id"]})
        except Exception as e:
            logger.warning("Cannot import IXP data", extra={"error": str(e)})
    return True
def process_ixp_data(processing_date: datetime):
    def do_process_ixp_data(all_ixp_data):
        for ixp_data in all_ixp_data:
            try:
                models.IXP.objects.update_or_create(
                    peeringdb_id=ixp_data["id"],
                    defaults={
                        "name": ixp_data["name"],
                        "long_name": ixp_data["name_long"],
                        "city": ixp_data["city"],
                        "website": ixp_data["website"],
                        "active_status": True,
                        "country": ixp_data["country"],
                        "created": ixp_data["created"],
                        "last_updated": ixp_data["updated"],
                        "last_active": processing_date,
                    }
                )
                logger.debug("Creating new IXP record", extra={"id": ixp_data["id"]})
            except Exception as e:
                logger.warning("Cannot import IXP data", extra={"error": str(e)})
    return do_process_ixp_data


def import_asns(geo_lookup: ASNGeoLookup, reset: bool = False, page_limit: int = 200) -> bool:
@@ -118,13 +153,12 @@ def process_asn_paged_data(all_asn_data):
    return process_asn_paged_data


def import_members(geo_lookup: ASNGeoLookup) -> bool:
def import_members(processing_date: datetime, geo_lookup: ASNGeoLookup) -> bool:
    logger.debug("Fetching IXP member data")
    return get_data("/netixlan", process_member_data(geo_lookup))
    return get_data("/netixlan", process_member_data(processing_date, geo_lookup))


def process_member_data(geo_lookup: ASNGeoLookup):
    reporting_date = datetime.utcnow().replace(tzinfo=timezone.utc)
def process_member_data(processing_date: datetime, geo_lookup: ASNGeoLookup):

    def do_process_member_data(all_member_data):
        for member_data in all_member_data:
@@ -147,11 +181,11 @@ def do_process_member_data(all_member_data):
"last_updated": member_data["updated"],
"is_rs_peer": member_data["is_rs_peer"],
"speed": member_data["speed"],
"last_active": reporting_date,
"last_active": processing_date,
}
)
logger.debug("Imported IXP member record", extra=log_data)
start_of_month = reporting_date.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
start_of_month = processing_date.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
inactive = models.IXPMember.objects.filter(date_left=None, last_active__lt=start_of_month).all()
for member in inactive:
start_of_next_of_month = (member.last_active.replace(day=1) + timedelta(days=33)).replace(day=1)
@@ -161,7 +195,7 @@ def do_process_member_data(all_member_data):
logger.debug("Member flagged as left due to inactivity", extra={"member": member.asn.number})
candidates = models.IXPMember.objects.filter(date_left=None, asn__registration_country="ZZ").all()
for candidate in candidates:
if geo_lookup.get_status(candidate.asn.number, reporting_date) != "assigned":
if geo_lookup.get_status(candidate.asn.number, processing_date) != "assigned":
end_of_last_month_active = (candidate.last_active.replace(day=1) - timedelta(days=1))
candidate.date_left = end_of_last_month_active
candidate.save()
18 changes: 12 additions & 6 deletions ixp_tracker/management/commands/ixp_tracker_import.py
@@ -1,7 +1,7 @@
import importlib
import logging
import traceback
from datetime import datetime
from datetime import datetime, timezone

from django.core.management import BaseCommand

@@ -37,16 +37,22 @@ class Command(BaseCommand):
help = "Updates IXP data"

def add_arguments(self, parser):
parser.add_argument("--reset", action="store_true", default=False, help="Do a full reset rather than incremental update where appropriate")
parser.add_argument("--geo-lookup", type=str, help="The name of your geo lookup factory")
parser.add_argument("--reset-asns", action="store_true", default=False, help="Do a full reset of ASNs rather than incremental update")
parser.add_argument("--backfill", type=str, default=None, help="The month you would like to backfill data for")

    def handle(self, *args, **options):
        try:
            logger.debug("Importing IXP data")
            geo_lookup = load_geo_lookup(IXP_TRACKER_GEO_LOOKUP_FACTORY) or DefaultASNGeoLookup()
            page_limit = 200
            reset = options["reset"]
            import_data(geo_lookup, reset, page_limit)
            reset = options["reset_asns"]
            backfill_date = options["backfill"]
            if backfill_date is None:
                import_data(geo_lookup, reset)
            else:
                processing_date = datetime.strptime(backfill_date, "%Y%m").replace(tzinfo=timezone.utc)
                if reset:
                    logger.warning("The --reset-asns option has no effect when running a backfill")
                import_data(geo_lookup, False, processing_date)

logger.info("Import finished")
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "django-ixp-tracker"
version = "0.1"
version = "0.2"
description = "Library to retrieve and manipulate data about IXPs"
readme = "README.md"
requires-python = ">=3.8"