Skip to content

Commit

Permalink
Compactor (#25)
Browse files Browse the repository at this point in the history
* Fixed input validation.

* Code linting.

* Code linting.

* Built out compact function.

* Documentation and tuning
1. Documented compact function
2. Set default for compaction to 0 (no compaction)

* Compact function tuning.

* Code tuning.

* Refactoring.

* Code linting.

* Dependency updates.

* Code linting.

* Refactor and linting.

* Linting.

* Linting.

* Documentation linting.

* Code linting.

* Documentation linting.
  • Loading branch information
geozeke authored Dec 21, 2024
1 parent b16e4ba commit dc72dd4
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 125 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,11 @@ cp <wherever you put them>/* ./data/geolite/
cp <wherever you put it>/ipsum.txt ./data/ipsum.txt
```

#### Target Countries
#### Targets

The global list of blacklisted IPs is massive. When you build a custom
blacklist with *banip*, it's carefully tailored to just the countries
you specify using a list of targets.

```shell
cp ./samples/targets.txt ./data/targets.txt
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[project]
name = "banip"
version = "1.1.2"
description = "Create a custom list of band ip for specific countries"
description = "Create a list of banned IPs for specific countries"
license = {file = "LICENSE"}
readme = {file = "README.md", content-type = "text/markdown"}
requires-python = ">=3.12,<3.13"
Expand Down
8 changes: 4 additions & 4 deletions samples/targets.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Use two-letter country codes, one per line. Either uppercase or
# lowercase is fine. Blank lines, spaces, and lines starting with '#'
# are ignored. NOTE: These are the ISO-3166 ALPHA2 codes, not the
# two-letter Top Level Domain names. For example, the two-letter TLD for
# United Kingdom is "uk", but the ISO-3166 code for United Kingdom is
# "gb". You can find a list of all the codes here:
# https://www.geonames.org/countries/
# two-letter Top Level Domain names (which may be different). For
# example, the two-letter TLD for the United Kingdom is "uk", but the
# ISO-3166 code for the United Kingdom is "gb". You can find a list of
# all the codes here: https://www.geonames.org/countries/

# Norway
no
Expand Down
64 changes: 33 additions & 31 deletions src/banip/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

"""Build a custom list of banned IPs."""

import ipaddress as ipa
import shutil
import sys
from argparse import Namespace
from datetime import datetime as dt
from pathlib import Path
from typing import cast

from rich import box
from rich.console import Console
Expand All @@ -25,10 +25,11 @@
from banip.constants import RENDERED_BLACKLIST
from banip.constants import TARGETS
from banip.constants import AddressType
from banip.constants import NetworkType
from banip.utilities import compact
from banip.utilities import extract_ip
from banip.utilities import ip_in_network
from banip.utilities import load_ipsum
from banip.utilities import split_hybrid
from banip.utilities import tag_networks


Expand Down Expand Up @@ -84,23 +85,14 @@ def task_runner(args: Namespace) -> None:
# ------------------------------------------------------------------

# Load the custom blacklist and split it into separate lists of
# networks and addresses. Remove any duplicates using sets.
# addresses and networks. Remove any duplicates using sets.
console = Console()
msg = "Pruning custom blacklist"
with console.status(msg):
with open(CUSTOM_BLACKLIST, "r") as f:
custom: list[AddressType | NetworkType] = [
ip for line in f if (ip := extract_ip(line.strip()))
]
custom_nets = sorted(
list({token for token in custom if isinstance(token, NetworkType)}),
key=lambda x: int(x.network_address),
)
custom = {item for line in f if (item := extract_ip(line.strip()))}
custom_ips, custom_nets = split_hybrid(list(custom))
custom_nets_size = len(custom_nets)
custom_ips = sorted(
list({token for token in custom if isinstance(token, AddressType)}),
key=lambda x: int(x),
)
# Remove any custom IPs that are covered by existing custom
# subnets
custom_ips = [
Expand All @@ -124,9 +116,8 @@ def task_runner(args: Namespace) -> None:
for line in f
if (token := line.strip()) and token[0] != "#"
]
target_geolite = sorted(
[net for net in geolite_D if geolite_D[net] in countries],
key=lambda x: int(x.network_address),
_, target_geolite = split_hybrid(
[net for net in geolite_D if geolite_D[net] in countries]
)
target_geolite_size = len(target_geolite)
print(f"{msg:.<{PAD}}done")
Expand All @@ -147,17 +138,12 @@ def task_runner(args: Namespace) -> None:
# are not in the custom whitelist.
msg = "Pruning ipsum.txt"
with console.status(msg):
whitelist: list[AddressType] = []
with open(CUSTOM_WHITELIST, "r") as f:
for line in f:
try:
ip = ipa.ip_address(line.strip())
whitelist.append(ip)
except ValueError:
continue

whitelist = [
cast(AddressType, ip) for line in f if (ip := extract_ip(line.strip()))
]
ipsum_D = load_ipsum()
ipsum_L: list[AddressType] = [
ipsum_L = [
ip
for ip in ipsum_D
if (
Expand All @@ -174,8 +160,19 @@ def task_runner(args: Namespace) -> None:
and ipsum_D[ip] >= args.threshold
)
]
ipsum_L = sorted(ipsum_L, key=lambda x: int(x))
ipsum_size = len(ipsum_L)
print(f"{msg:.<{PAD}}done")

# ------------------------------------------------------------------

# Compact ipsum. A compact factor of 0 indicates no compaction.
msg = f"Compacting ipsum ({args.compact})"
with console.status(msg):
ipsum_ips, ipsum_nets = compact(
ip_list=ipsum_L, whitelist=whitelist, min_num=args.compact
)
ipsum_ips_size = len(ipsum_ips)
ipsum_nets_size = len(ipsum_nets)
ipsum_size = ipsum_ips_size + ipsum_nets_size
print(f"{msg:.<{PAD}}done")

# ------------------------------------------------------------------
Expand All @@ -189,7 +186,10 @@ def task_runner(args: Namespace) -> None:
custom_ips = [
ip
for ip in custom_ips
if ip not in ipsum_L
if ip not in ipsum_ips
and not ip_in_network(
ip=ip, networks=ipsum_nets, first=0, last=ipsum_nets_size - 1
)
and ip_in_network(
ip=ip, networks=target_geolite, first=0, last=target_geolite_size - 1
)
Expand All @@ -215,8 +215,10 @@ def task_runner(args: Namespace) -> None:
msg = "Rendering blacklist"
with console.status(msg):
with open(RENDERED_BLACKLIST, "w") as f:
for ip in ipsum_L:
for ip in ipsum_ips:
f.write(f"{ip}\n")
for net in ipsum_nets:
f.write(f"{net}\n")
now = dt.now().strftime("%Y-%m-%d %H:%M:%S")
f.write("\n# ------------custom entries -------------\n")
f.write(f"# Added on: {now}\n")
Expand All @@ -239,7 +241,7 @@ def task_runner(args: Namespace) -> None:
table.add_column(header="Value", justify="right")

table.add_row("Target Countries", f"{",".join(countries)}")
table.add_row("Blacklist IPs from ipsum.txt", f"{(ipsum_size):,d}")
table.add_row("Blacklist entries from ipsum.txt", f"{(ipsum_size):,d}")
table.add_row("Custom blacklist IPs", f"{(custom_ips_size):,d}")
table.add_row("Custom blacklist subnets", f"{(custom_nets_size):,d}")
table.add_row("Total entries saved", f"{(total_size):,d}")
Expand Down
9 changes: 5 additions & 4 deletions src/banip/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from banip.utilities import ip_in_network
from banip.utilities import load_ipsum
from banip.utilities import load_rendered_blacklist
from banip.utilities import split_hybrid


def task_runner(args: argparse.Namespace) -> None:
Expand All @@ -34,7 +35,7 @@ def task_runner(args: argparse.Namespace) -> None:
if not COUNTRY_NETS_DICT.exists():
msg = """
Some required files are missing. Make sure to build the
databases before checking for a particular ip address. Run
databases before checking for a particular IP address. Run
\'banip build -h\' for more information.
"""
print(textwrap.fill(text=" ".join(msg.split())))
Expand All @@ -53,7 +54,7 @@ def task_runner(args: argparse.Namespace) -> None:
# Load rendered blacklist
msg = "Loading rendered blacklist"
with console.status(msg):
rendered_nets, rendered_ips = load_rendered_blacklist()
rendered_ips, rendered_nets = load_rendered_blacklist()
print(f"{msg:.<{PAD}}done")

# Start building the table
Expand All @@ -62,13 +63,13 @@ def task_runner(args: argparse.Namespace) -> None:
table.add_column(header="Result", justify="right")

# Load the HAProxy countries dictionary, arrange sorted keys, and
# locate the two-letter country code for target ip.
# locate the two-letter country code for the target IP.
msg = "Finding country of origin"
attribute = "Country Code"
with console.status(msg):
with open(COUNTRY_NETS_DICT, "rb") as f:
nets_D = pickle.load(f)
nets_L = sorted(nets_D.keys(), key=lambda x: int(x.network_address))
_, nets_L = split_hybrid(nets_D.keys())
if located_net := ip_in_network(
ip=target, networks=nets_L, first=0, last=len(nets_L) - 1
):
Expand Down
Loading

0 comments on commit dc72dd4

Please sign in to comment.