diff --git a/.gitignore b/.gitignore
index 4ddba04..2fc8125 100644
--- a/.gitignore
+++ b/.gitignore
@@ -157,4 +157,6 @@ cython_debug/
# Custom exclusions
data/
+src/plugins/
+scratch/
.init/
diff --git a/LICENSE b/LICENSE
index 6d8fb3e..d4fbd3e 100755
--- a/LICENSE
+++ b/LICENSE
@@ -2,20 +2,21 @@ MIT License
Copyright 2024 Peter Nardi
-Permission is hereby granted, free of charge, to any person obtaining a copy of
-this software and associated documentation files (the "Software"), to deal in
-the Software without restriction, including without limitation the rights to
-use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
-of the Software, and to permit persons to whom the Software is furnished to do
-so, subject to the following conditions:
+Permission is hereby granted, free of charge, to any person obtaining a
+copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
+The above copyright notice and this permission notice shall be included
+in all copies or substantial portions of the Software.
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-SOFTWARE.
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT.
+IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/Makefile b/Makefile
index 124ed4b..21df6b8 100644
--- a/Makefile
+++ b/Makefile
@@ -10,6 +10,14 @@ ifeq (,$(wildcard .init/setup))
@if [ ! -d "./data" ]; then \
mkdir -p data/geolite; \
fi
+ @if [ ! -d "./src/plugins/parsers" ]; then \
+ mkdir -p src/plugins/parsers; \
+ touch src/plugins/parsers/__init__.py; \
+ fi
+ @if [ ! -d "./src/plugins/code" ]; then \
+ mkdir -p src/plugins/code; \
+ touch src/plugins/code/__init__.py; \
+ fi
mkdir .init
touch .init/setup
poetry install --only=main
@@ -55,8 +63,8 @@ reset: clean ## remove venv, artifacts, and init directory
# --------------------------------------------
.PHONY: clean
-clean: ## cleanup python build artifacts
- @echo Cleaning python build artifacts
+clean: ## cleanup python runtime artifacts
+ @echo Cleaning python runtime artifacts
@find . -type d -name __pycache__ -exec rm -rf {} \; -prune
# --------------------------------------------
diff --git a/README.md b/README.md
index 5cfbedb..50459d5 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# banip
+# banip
Requirements
### Operating System
@@ -38,7 +47,7 @@ Machine, or [Windows Subsystem for Linux (WSL)][def7] is required.
You'll need a copy of the [MaxMind][def8] GeoLite2 database for
country-level geotagging of IP addresses. If you have a premium or
corporate MaxMind account, you're all set. If not, the free GeoLite2
-account will work just fine ([signup here][def5]). Once you login, on
+account will work just fine ([signup here][def5]). Once you login, using
the menu on the top right select:
```text
@@ -75,7 +84,8 @@ own use.
### Global list of blacklisted IPs
-Download the list as follows:
+banip uses the [ipsum][def9] threat intelligence blacklist. You can
+direct download it using:
```shell
curl -sL https://raw.githubusercontent.com/stamparm/ipsum/master/ipsum.txt > ipsum.txt
@@ -86,7 +96,9 @@ curl -sL https://raw.githubusercontent.com/stamparm/ipsum/master/ipsum.txt > ips
You'll need the [make][def6] utility installed (*it probably
already is*).
-## Setup
+[top](#top)
+
+## Setup
### Unpack GeoLite2 data
@@ -128,7 +140,7 @@ cp /ipsum.txt ./data/ipsum.txt
#### Target countries
```shell
-cp sample-targets.txt ./data/targets.txt
+cp ./samples/targets.txt ./data/targets.txt
```
Modify `./data/targets.txt` to select your desired target countries. The
@@ -137,7 +149,7 @@ comments in the file will guide you.
#### Custom whitelist (optional)
```shell
-cp sample-custom_whitelist.txt ./data/custom_whitelist.txt
+cp ./samples/custom_whitelist.txt ./data/custom_whitelist.txt
```
There may be IP addresses that banip will flag as malicious, but you
@@ -151,7 +163,7 @@ blank one when you run it.
#### Custom blacklist (optional)
```shell
-cp sample-custom_blacklist.txt ./data/custom_blacklist.txt
+cp ./samples/custom_blacklist.txt ./data/custom_blacklist.txt
```
The source database of banned IPs isn't perfect. You may determine that
@@ -194,16 +206,26 @@ data
└── targets.txt (required)
```
-## Running
+[top](#top)
+
+## Running
+
+After copying/tweaking all the required files, start by activating the
+python virtual environment:
+
+```shell
+source .venv/bin/activate
+```
-After copying/tweaking all the required files, start with this command
-to learn how to build your custom blacklist:
+Now run this command to learn how to build your custom blacklist:
```shell
-poetry run banip -h
+banip -h
```
-## Updating
+[top](#top)
+
+## Updating
MaxMind updates the GeoLite2 Country database on Tuesdays and Fridays,
and the list of blacklisted IPs (`ipsum.txt`) is updated daily. Pull
@@ -211,6 +233,53 @@ updated copies of both and put them in `banip/data/geolite` (for the
GeoLite2 data) and `banip/data` (for the `ipsum.txt` file). Run `banip`
again to generate an updated blacklist.
+[top](#top)
+
+## Plugins
+
+banip generates some data that you may want to use for other purposes.
+For example, everytime you build a new blacklist banip also creates and
+saves a textfile of all worldwide subnets, each tagged with a two-letter
+country code. The file is saved in:
+
+```'text
+./banip/data/haproxy_geo_ip.txt
+```
+
+Next time you run banip, open that file and take a look at it. Since you
+may have a very specific usecase for that data, you can write a plugin
+for banip which will make use of the build products for your purposes.
+
+A banip plugin consists of two required files:
+
+1. Code that generates an argument parser for your new command.
+2. Code that implements the functionality of your new command.
+
+All your plugins go into the `./src/plugins` directory in the
+appropriate subdirectory (either `parsers` or `code`). Your
+plugins are not under version control, so they will only reside on your
+machine.
+
+Look at the comments in these two files for instructions on how to
+create your own plugins:
+
+```text
+./samples/plugins/foo.py
+./samples/plugins/foo_args.py
+```
+
+[top](#top)
+
+## Uninstalling banip
+
+If you want out, just do this:
+
+```shell
+rm -rf ~/.banip
+```
+
+[top](#top)
+
[def]: https://aws.amazon.com/what-is/cidr/#:~:text=CIDR%20notation%20represents%20an%20IP,as%20192.168.1.0%2F22.
[def2]: https://python-poetry.org/
[def3]: https://docs.github.com/en/get-started/getting-started-with-git/ignoring-files
@@ -219,3 +288,4 @@ again to generate an updated blacklist.
[def4]: https://dev.maxmind.com/geoip/updating-databases#directly-downloading-databases
[def5]: https://dev.maxmind.com/geoip/geolite2-free-geolocation-data
[def8]: https://www.maxmind.com/en/home
+[def9]: https://github.com/stamparm/ipsum
diff --git a/pyproject.toml b/pyproject.toml
index ed2fa5e..e6cd796 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "banip"
-version = "0.1.0"
+version = "1.0.0"
description = "Create a custom list of band ip for specific countries"
authors = ["Peter Nardi "]
license = "MIT"
diff --git a/sample-custom_blacklist.txt b/samples/custom_blacklist.txt
similarity index 100%
rename from sample-custom_blacklist.txt
rename to samples/custom_blacklist.txt
diff --git a/sample-custom_whitelist.txt b/samples/custom_whitelist.txt
similarity index 100%
rename from sample-custom_whitelist.txt
rename to samples/custom_whitelist.txt
diff --git a/samples/plugins/foo.py b/samples/plugins/foo.py
new file mode 100755
index 0000000..030e720
--- /dev/null
+++ b/samples/plugins/foo.py
@@ -0,0 +1,40 @@
+"""Rules for writing custom commands.
+
+1. Start with this sample command file and modify it.
+2. Make sure to give your file the same name as your command (e.g.
+ foo.py for the "foo" command).
+3. The entry point for your command will be "task_runner(args)", which
+ is defined below. Do not change this function signature. Most of your
+ code will go there, but you may add additional functions/modules as
+ needed. The only rule here is that you must use this pre-defined
+ entry point.
+4. "args" will be a argparse.Namespace variable that contains all the
+ inputs captured on the command line, which are defined by foo_args.py
+5. Your code should return None.
+5. Each newly defined function must have an associated argument parser,
+ defined in {function name}_args.py. See "foo_args.py" in
+ banip/samples/plugins as an example.
+6. Make sure to put this file in banip/src/plugins/code
+"""
+
+import argparse
+
+
+def task_runner(args: argparse.Namespace) -> None:
+ """Do something wonderful.
+
+ This is foo, so by definition it's wonderful!
+
+ Parameters
+ ----------
+ args : argparse.Namespace
+ Arguments passed on the command line.
+ """
+ total = args.first + args.second
+ print(f"The sum of {args.first} and {args.second} is {total}.")
+ print("Cool! Right?")
+ return
+
+
+if __name__ == "__main__":
+ pass
diff --git a/samples/plugins/foo_args.py b/samples/plugins/foo_args.py
new file mode 100644
index 0000000..0e7a456
--- /dev/null
+++ b/samples/plugins/foo_args.py
@@ -0,0 +1,54 @@
+"""Rules for writing custom argument parsers.
+
+1. Start with this sample argument parser and modify it.
+2. Make the filename for this parser the same as the filename of your
+ command, but with the '_args' suffix (e.g. foo_args.py for the
+ command defined by foo.py).
+3. Do not change the function signature of the "load_command_args"
+ function below.
+4. The variable COMMAND_NAME below is required. It must contain the name
+ of your command.
+5. The sp.add_parser() function call is required. You can add additional
+ arguments to the function call, but the "name", "help", and
+ "description" arguments are required. Modify the "msg" variable to
+ tailor your help and description messages.
+6. Add as many parser.add_argument() calls as you need.
+7. You may add additional functions/modules as needed. For example,
+ adding something like "from argparse import FileType" if one of your
+ command line arguments is a file type.
+8. Your code should return None.
+9. Make sure to put this file in banip/src/plugins/parsers
+"""
+
+from argparse import _SubParsersAction
+
+COMMAND_NAME = "foo"
+
+
+def load_command_args(sp: _SubParsersAction) -> None:
+ """Assemble the argument parser."""
+ msg = """This command takes two intergers on the command line, adds
+ them together, then prints the result. Isn't that wonderful!"""
+ parser = sp.add_parser(
+ name=COMMAND_NAME,
+ help=msg,
+ description=msg,
+ )
+
+ # Add an argument to the parser
+ msg = """The first variable to be added."""
+ parser.add_argument(
+ "first",
+ type=int,
+ help=msg,
+ )
+
+ # Add another argument to the parser
+ msg = """This is the second."""
+ parser.add_argument(
+ "second",
+ type=int,
+ help=msg,
+ )
+
+ return
diff --git a/sample-targets.txt b/samples/targets.txt
similarity index 100%
rename from sample-targets.txt
rename to samples/targets.txt
diff --git a/src/banip/__main__.py b/src/banip/__main__.py
index a70e3d7..d02df99 100755
--- a/src/banip/__main__.py
+++ b/src/banip/__main__.py
@@ -3,78 +3,113 @@
"""Entry point for banip."""
import argparse
+import importlib
+from pathlib import Path
+from types import ModuleType
-from banip.build_list import banned_ips
-from banip.contants import RENDERED_BLACKLIST
-from banip.utilities import check_ip
+from banip.constants import ARG_PARSERS_BASE
+from banip.constants import CUSTOM_ARG_PARSERS
+from banip.constants import CUSTOM_CODE
+from banip.utilities import wrap_tight
+
+# ======================================================================
+
+
+def collect_modules(start: Path) -> list[str]:
+ """Collect the names of all modules to import.
+
+ Parameters
+ ----------
+ start : Path
+ This the starting point (directory) for collection.
+
+ Returns
+ -------
+ list[str]
+ A list of module names.
+ """
+ module_names: list[str] = []
+ for p in start.iterdir():
+ if p.is_file() and p.name != "__init__.py":
+ if "plugins" in str(p):
+ prefix = "plugins.parsers"
+ else:
+ prefix = "parsers"
+ module_names.append(f"{prefix}.{p.stem}")
+ return module_names
+
+
+# ======================================================================
+
+
+def load_custom_module(cmd: str) -> ModuleType | None:
+ """Given the name of a command, return the associated module.
+
+ By design, the code associated with a given command must have the
+ same name as the command itself.
+
+ Parameters
+ ----------
+ cmd : str
+ The name of a banip command
+
+ Returns
+ -------
+ ModuleType | None
+ This will be a pointer to the imported python module.
+ """
+ for p in CUSTOM_CODE.iterdir():
+ if p.is_file():
+ if p.stem == cmd:
+ return importlib.import_module(f"plugins.code.{p.stem}")
+ return None
+
+
+# ======================================================================
def main() -> None:
"""Get user input and build the list of banned IP addresses."""
msg = """Generate and query IP blacklists for use with proxy servers
- (like HAProxy). Please review the README file at
- https://github.com/geozeke/ubuntu for detailed instructions on
- setting up banip."""
-
- epi = "Version: 0.1.0"
-
+ (like HAProxy). For help on any command, run: "banip {command} -h".
+ Please review the README file at https://github.com/geozeke/ubuntu
+ for detailed instructions on setting up banip."""
+ epi = "Version: 1.0.0"
parser = argparse.ArgumentParser(
description=msg,
epilog=epi,
)
+ subparsers = parser.add_subparsers(title="commands", dest="cmd")
- subparsers = parser.add_subparsers(title="Commands")
- msg = """Create a list of banned (blacklisted) client IP addresses
- to be used with a proxy server (like HAProxy) to block network
- access from those clients. Run \"banip build -h" for more."""
- subparser_build = subparsers.add_parser(name="build", help=msg)
- msg = """Check to see if a single IP address is found in the
- blacklist. Run \"banip check -h" for more."""
- subparser_check = subparsers.add_parser(name="check", help=msg)
-
- msg = """Output file that will contain the generated list of
- blacklisted IP addresses. If not provided, results will be saved to
- ./data/ip_blacklist.txt"""
- subparser_build.add_argument(
- "-o",
- "--outfile",
- type=argparse.FileType("w"),
- help=msg,
- )
+ # Dynamically load argument subparsers.
- msg = """Each banned IP address in the source database has a factor
- (from 1 to 10) indicating a level of confidence that the IP address
- is a malicious actor (higher is more confident). The default
- threshold used is 3. Anything less than that may result in false
- positives, but you may choose any threshold from 1 to 10. If you
- find you're getting false positives, just re-run banip with a higher
- threshold."""
- subparser_build.add_argument(
- "-t",
- "--threshold",
- type=int,
- help=msg,
- default=3,
- )
-
- msg = """This is the IPv4 or IPv6 address you're interested in.
- After you run banip for the first time, you can use the "check"
- subcommand to see if a single IP address is found in the blacklist.
- Making subsequent runs of banip to generate new blacklists will use
- the updated information for future IP checking."""
- subparser_check.add_argument(
- "ip",
- type=str,
- help=msg,
- )
+ module_names: list[str] = []
+ mod: ModuleType | None = None
+ module_names = collect_modules(ARG_PARSERS_BASE)
+ module_names += collect_modules(CUSTOM_ARG_PARSERS)
+ for mod_name in module_names:
+ mod = importlib.import_module(mod_name)
+ mod.load_command_args(subparsers)
args = parser.parse_args()
- if hasattr(args, "ip"):
- check_ip(args.ip)
- else:
- if not args.outfile:
- args.outfile = open(RENDERED_BLACKLIST, "w")
- banned_ips(args)
+ match (args.cmd):
+ case "build":
+ mod = importlib.import_module("banip.build")
+ case "check":
+ mod = importlib.import_module("banip.check")
+ case _:
+ if args.cmd:
+ if not (mod := load_custom_module(args.cmd)):
+ msg = f"""Code for a given command must have the
+ same name as the command itself. Make sure you have
+ a program file called \"{args.cmd}.py\" in
+ {CUSTOM_CODE}/"""
+ print(wrap_tight(msg))
+ mod = importlib.import_module("banip.null")
+ else:
+ mod = importlib.import_module("banip.null")
+ mod.task_runner(args)
+
return
diff --git a/src/banip/build_list.py b/src/banip/build.py
similarity index 86%
rename from src/banip/build_list.py
rename to src/banip/build.py
index 8f46329..21ec2a4 100755
--- a/src/banip/build_list.py
+++ b/src/banip/build.py
@@ -12,17 +12,17 @@
from tqdm import tqdm # type: ignore
-from banip.contants import BANNED_IPS
-from banip.contants import COUNTRY_NETS
-from banip.contants import CUSTOM_BLACKLIST
-from banip.contants import CUSTOM_WHITELIST
-from banip.contants import GEOLITE_4
-from banip.contants import GEOLITE_6
-from banip.contants import GEOLITE_LOC
-from banip.contants import IPS
-from banip.contants import PAD
-from banip.contants import RENDERED_BLACKLIST
-from banip.contants import TARGETS
+from banip.constants import BANNED_IPS
+from banip.constants import COUNTRY_NETS
+from banip.constants import CUSTOM_BLACKLIST
+from banip.constants import CUSTOM_WHITELIST
+from banip.constants import GEOLITE_4
+from banip.constants import GEOLITE_6
+from banip.constants import GEOLITE_LOC
+from banip.constants import IPS
+from banip.constants import PAD
+from banip.constants import RENDERED_BLACKLIST
+from banip.constants import TARGETS
from banip.utilities import clear
from banip.utilities import extract_ip
from banip.utilities import filter
@@ -31,7 +31,7 @@
from banip.utilities import tag_networks
-def banned_ips(args: Namespace) -> None:
+def task_runner(args: Namespace) -> None:
"""Generate a custom list of banned IP addresses.
Parameters
@@ -40,14 +40,24 @@ def banned_ips(args: Namespace) -> None:
Command line arguments.
"""
# Start by stubbing-out custom files if they're not already in
- # place.
+ # place. In the case of the output file, check for two things: (1)
+ # Was a file specified? If not, then save results to the default
+ # (RENDERED_BLACKLIST). (2) If the file was specified, was it the
+ # same name as the default? If so, there's no need to make a local
+ # copy of it after computations are complete.
clear()
+ make_local_copy = False
if not CUSTOM_BLACKLIST.exists():
f = open(CUSTOM_BLACKLIST, "w")
f.close()
if not CUSTOM_WHITELIST.exists():
f = open(CUSTOM_WHITELIST, "w")
f.close()
+ try:
+ if Path(args.outfile.name) != RENDERED_BLACKLIST:
+ make_local_copy = True
+ except AttributeError:
+ args.outfile = open(RENDERED_BLACKLIST, "w")
# Now make sure everything is in place
@@ -58,6 +68,7 @@ def banned_ips(args: Namespace) -> None:
GEOLITE_4,
GEOLITE_6,
GEOLITE_LOC,
+ RENDERED_BLACKLIST,
TARGETS,
]
for file in files:
@@ -129,10 +140,10 @@ def banned_ips(args: Namespace) -> None:
print()
bag_of_ips = []
- print(f"Building blacklisted IP list for country codes: {countries}")
+ print(f"Pulling blacklisted IP list for country codes: {countries}")
for ip in tqdm(
D["II"],
- desc="IPs",
+ desc="Blacklist IPs",
total=len(D["II"]),
colour="#bf80f2",
unit="IPs",
@@ -233,12 +244,13 @@ def banned_ips(args: Namespace) -> None:
for chunk in D[key]:
args.outfile.write(f"{format(chunk)}\n")
+ args.outfile.close()
+
# Save a copy of the generated IP blacklist to
# ./data/ip_blacklist.txt. This will be used when running banip to
# check an individual IP address.
- args.outfile.close()
- if Path(args.outfile.name) != RENDERED_BLACKLIST:
+ if make_local_copy:
shutil.copy(Path(args.outfile.name), RENDERED_BLACKLIST)
# Calculate final metrics and display results.
@@ -252,3 +264,7 @@ def banned_ips(args: Namespace) -> None:
print(f"Total banned IPs saved: {total_bans:>{PAD},d}")
return
+
+
+if __name__ == "__main__":
+ pass
diff --git a/src/banip/check.py b/src/banip/check.py
new file mode 100644
index 0000000..bf5ac79
--- /dev/null
+++ b/src/banip/check.py
@@ -0,0 +1,48 @@
+"""Taskrunner for check command."""
+
+import argparse
+import ipaddress as ipa
+
+from banip.constants import BANNED_IPS
+from banip.constants import RENDERED_BLACKLIST
+
+
+def task_runner(args: argparse.Namespace) -> None:
+ """Display available data for a particular IP address.
+
+ Parameters
+ ----------
+ args : argparse.Namespace
+ args.ip will be either IPv4 or IPv address of interest.
+ """
+ try:
+ ipa.ip_address(args.ip)
+ except ValueError:
+ print(f"{args.ip} is not a valid IP address.")
+ return
+
+ print()
+ found = False
+ if RENDERED_BLACKLIST.exists():
+ with open(RENDERED_BLACKLIST, "r") as f:
+ for line in f:
+ if args.ip in line:
+ source = RENDERED_BLACKLIST.name
+ print(f"{args.ip} found in {source}")
+ found = True
+ break
+
+ if BANNED_IPS.exists():
+ with open(BANNED_IPS, "r") as f:
+ for line in f:
+ if args.ip in line:
+ source = BANNED_IPS.name
+ hitcount = line.split()[1]
+ print(f"{args.ip} found in {source} with {hitcount} hits.")
+ found = True
+ break
+
+ if not found:
+ print(f"{args.ip} not found.")
+
+ return
diff --git a/src/banip/contants.py b/src/banip/constants.py
similarity index 86%
rename from src/banip/contants.py
rename to src/banip/constants.py
index c8f056b..44b898d 100644
--- a/src/banip/contants.py
+++ b/src/banip/constants.py
@@ -8,17 +8,20 @@
HOME = Path(__file__).parents[2]
+ARG_PARSERS_BASE = HOME / "src/parsers"
BANNED_IPS = HOME / "data/ipsum.txt"
COUNTRY_NETS = HOME / "data/haproxy_geo_ip.txt"
+CUSTOM_ARG_PARSERS = HOME / "src/plugins/parsers"
CUSTOM_BLACKLIST = HOME / "data/custom_blacklist.txt"
+CUSTOM_CODE = HOME / "src/plugins/code"
CUSTOM_WHITELIST = HOME / "data/custom_whitelist.txt"
-RENDERED_BLACKLIST = HOME / "data/ip_blacklist.txt"
GEOLITE_4 = HOME / "data/geolite/GeoLite2-Country-Blocks-IPv4.csv"
GEOLITE_6 = HOME / "data/geolite/GeoLite2-Country-Blocks-IPv6.csv"
GEOLITE_LOC = HOME / "data/geolite/GeoLite2-Country-Locations-en.csv"
-TARGETS = HOME / "data/targets.txt"
IPS = [IPv4Address, IPv6Address]
NETS = [IPv4Network, IPv6Network]
+RENDERED_BLACKLIST = HOME / "data/ip_blacklist.txt"
+TARGETS = HOME / "data/targets.txt"
# Padding for pretty printing.
PAD = 6
diff --git a/src/banip/country_analyzer.py b/src/banip/country_analyzer.py
deleted file mode 100755
index 2be6769..0000000
--- a/src/banip/country_analyzer.py
+++ /dev/null
@@ -1,92 +0,0 @@
-#!/usr/bin/env python3
-
-"""Analyze stats for a single country (two-letter code).
-
-This is a *very* hacky utility that will allow me to pull stats for a
-single country. It uses the build products from banip. It's not
-production code, because it's heavily dependent on how my HAProxy logs
-are formatted and how I pull the data from grafana, so it will be tough
-to advertise it as available for general use. I'll still ship it with
-banip, and maybe I'll incorporate it as a core capability at some point
-in the future.
-"""
-
-import ipaddress as ipa
-import re
-import sys
-from typing import Any
-
-from banip.contants import BANNED_IPS
-from banip.contants import HOME
-from banip.contants import RENDERED_BLACKLIST
-from banip.utilities import clear
-
-try:
- target_country = sys.argv[1].upper()
-except IndexError:
- print("Please provide a two-letter country code.")
- sys.exit(1)
-
-regex = r"(?<=[\s=])(\w+)=([^=\s]+)(?=\s|$)"
-LOGS = HOME / "data/logs.txt"
-clear()
-
-files = [BANNED_IPS, RENDERED_BLACKLIST, LOGS]
-for file in files:
- if not file.exists():
- print(f"Cannot find {file}")
- sys.exit(1)
-
-with open(LOGS, "r") as f:
- logs = f.readlines()
- # The first 5 lines contain header information.
- logs = logs[6:]
-
-ipsum: dict[Any, int] = {}
-with open(BANNED_IPS, "r") as f:
- for line in f:
- if (item := line.strip()) and item[0] != "#":
- parts = item.split()
- ipsum[ipa.ip_address(parts[0])] = int(parts[1])
-
-blacklist: list[Any] = []
-with open(RENDERED_BLACKLIST, "r") as f:
- for line in f:
- if (item := line.strip()) and item[0] != "#":
- # skip subnets
- if "/" not in item:
- blacklist.append(ipa.ip_address(item))
-
-output: list[str] = []
-target_ips: dict[Any, list[str]] = {}
-for line in logs:
- groups = re.findall(regex, line)
- country = groups[-1][1]
- if country == target_country:
- ip = ipa.ip_address(groups[0][1])
- if ip not in blacklist:
- uri = groups[-2][1]
- if ip in target_ips:
- target_ips[ip].append(uri)
- else:
- target_ips[ip] = [uri]
-
-for ip, uris in target_ips.items():
- output.append(f"IP: {format(ip)}\n")
- if ip in ipsum:
- output.append(f"Found in ipsum, with confidence of {ipsum[ip]}\n")
- if len(uris) == 1:
- output.append(f"Total of {len(uris)} request\n")
- else:
- output.append(f"Total of {len(uris)} requests\n")
- counter = 1
- for uri in uris:
- output.append(f" {counter:>3}. {uri}\n")
- counter += 1
- output.append("\n")
-
-msg = "IPs found in the logs, but not in the blacklist"
-output.insert(0, f"Statistics for: {target_country}\n")
-output.insert(1, f"{len(target_ips)} {msg}.\n\n")
-
-print("".join(output).strip())
diff --git a/src/banip/null.py b/src/banip/null.py
new file mode 100644
index 0000000..03e7145
--- /dev/null
+++ b/src/banip/null.py
@@ -0,0 +1,12 @@
+"""Taskrunner for no command.
+
+This will be the default command, which simply allows the program to
+exit.
+"""
+
+import argparse
+
+
+def task_runner(args: argparse.Namespace) -> None:
+ """Do nothing."""
+ return
diff --git a/src/banip/py.typed b/src/banip/py.typed
new file mode 100644
index 0000000..e69de29
diff --git a/src/banip/utilities.py b/src/banip/utilities.py
index b41538d..baaa307 100644
--- a/src/banip/utilities.py
+++ b/src/banip/utilities.py
@@ -3,6 +3,7 @@
import csv
import ipaddress as ipa
import os
+import textwrap
from ipaddress import IPv4Address
from ipaddress import IPv4Network
from ipaddress import IPv6Network
@@ -11,14 +12,37 @@
from tqdm import tqdm # type: ignore
-from banip.contants import BANNED_IPS
-from banip.contants import COUNTRY_NETS
-from banip.contants import GEOLITE_4
-from banip.contants import GEOLITE_6
-from banip.contants import GEOLITE_LOC
-from banip.contants import RENDERED_BLACKLIST
+from banip.constants import COUNTRY_NETS
+from banip.constants import GEOLITE_4
+from banip.constants import GEOLITE_6
+from banip.constants import GEOLITE_LOC
-# ===============================================================
+# ======================================================================
+
+
+def wrap_tight(msg: str, columns=70) -> str:
+ """Clean up a multi-line docstring.
+
+ Take a multi-line docstring and wrap it cleanly as a paragraph to a
+ specified column width.
+
+ Parameters
+ ----------
+ msg : str
+ The docstring to be wrapped.
+ columns : int, optional
+ Column width for wrapping, by default 70.
+
+ Returns
+ -------
+ str
+ A wrapped paragraph.
+ """
+ clean = " ".join([t for token in msg.split("\n") if (t := token.strip())])
+ return textwrap.fill(clean, width=columns)
+
+
+# ======================================================================
def clear() -> None:
@@ -29,7 +53,7 @@ def clear() -> None:
os.system("clear" if os.name == "posix" else "cls")
-# ===============================================================
+# ======================================================================
def extract_ip(from_str: str) -> Any:
@@ -61,7 +85,7 @@ def extract_ip(from_str: str) -> Any:
return to_ip
-# ===============================================================
+# ======================================================================
def filter(fname: str | Path, metric: list[str] | int) -> list[Any]:
@@ -93,7 +117,7 @@ def filter(fname: str | Path, metric: list[str] | int) -> list[Any]:
f.seek(0)
for line in tqdm(
f,
- desc="Lines",
+ desc=" Total lines",
total=lines,
colour="#bf80f2",
unit="lines",
@@ -115,7 +139,7 @@ def filter(fname: str | Path, metric: list[str] | int) -> list[Any]:
return bag
-# ===============================================================
+# ======================================================================
def split46(bag_of_stuff: list[Any]) -> tuple[list[Any], list[Any]]:
@@ -146,7 +170,7 @@ def split46(bag_of_stuff: list[Any]) -> tuple[list[Any], list[Any]]:
return bag4, bag6
-# ===============================================================
+# ======================================================================
def tag_networks() -> None:
@@ -167,7 +191,7 @@ def tag_networks() -> None:
next(reader)
for country in tqdm(
reader,
- desc="Countries",
+ desc=" Countries",
total=lines,
colour="#bf80f2",
unit="countries",
@@ -248,51 +272,7 @@ def tag_networks() -> None:
print("Done\n")
-# ===============================================================
-
-
-def check_ip(ip: str) -> None:
- """Display available data for a particular IP address.
-
- Parameters
- ----------
- ip : str
- IPv4 or IPv address of interest.
- """
- try:
- ipa.ip_address(ip)
- except ValueError:
- print(f"{ip} is not a valid IP address.")
- return
-
- print()
- found = False
- if RENDERED_BLACKLIST.exists():
- with open(RENDERED_BLACKLIST, "r") as f:
- for line in f:
- if ip in line:
- source = RENDERED_BLACKLIST.name
- print(f"{ip} found in {source}")
- found = True
- break
-
- if BANNED_IPS.exists():
- with open(BANNED_IPS, "r") as f:
- for line in f:
- if ip in line:
- source = BANNED_IPS.name
- hitcount = line.split()[1]
- print(f"{ip} found in {source} with {hitcount} hits.")
- found = True
- break
-
- if not found:
- print(f"{ip} not found.")
-
- return
-
-
-# ===============================================================
+# ======================================================================
def ip_in_network(
diff --git a/src/parsers/__init__.py b/src/parsers/__init__.py
new file mode 100644
index 0000000..9c19ca2
--- /dev/null
+++ b/src/parsers/__init__.py
@@ -0,0 +1 @@
+"""Package."""
diff --git a/src/parsers/build_args.py b/src/parsers/build_args.py
new file mode 100644
index 0000000..bd5bcca
--- /dev/null
+++ b/src/parsers/build_args.py
@@ -0,0 +1,49 @@
+"""Argument parser for build command."""
+
+import argparse
+from argparse import _SubParsersAction
+
+COMMAND_NAME = "build"
+
+
+def load_command_args(sp: _SubParsersAction) -> None:
+ """Assemble the argument parser."""
+ msg = """Create a list of banned (blacklisted) client IP addresses
+ to be used with a proxy server (like HAProxy) to block network
+ access from those clients."""
+ parser = sp.add_parser(
+ name=COMMAND_NAME,
+ help=msg,
+ description=msg,
+ )
+
+ msg = """Output file that will contain the generated list of
+ blacklisted IP addresses. If not provided, results will be saved to
+ ./data/ip_blacklist.txt"""
+ parser.add_argument(
+ "-o",
+ "--outfile",
+ type=argparse.FileType("w"),
+ help=msg,
+ )
+
+ msg = """Each banned IP address in the source database has a factor
+ (from 1 to 10) indicating a level of confidence that the IP address
+ is a malicious actor (higher is more confident). The default
+ threshold used is 3. Anything less than that may result in false
+ positives, but you may choose any threshold from 1 to 10. If you
+ find you're getting false positives, just re-run banip with a higher
+ threshold."""
+ parser.add_argument(
+ "-t",
+ "--threshold",
+ type=int,
+ help=msg,
+ default=3,
+ )
+
+ return
+
+
+if __name__ == "__main__":
+ pass
diff --git a/src/parsers/check_args.py b/src/parsers/check_args.py
new file mode 100644
index 0000000..ed42917
--- /dev/null
+++ b/src/parsers/check_args.py
@@ -0,0 +1,29 @@
+"""Argument parser for check command."""
+
+from argparse import _SubParsersAction
+
+COMMAND_NAME = "check"
+
+
+def load_command_args(sp: _SubParsersAction) -> None:
+ """Assemble the argument parser."""
+ msg = """Check to see if a single IP address is found in the
+ blacklist."""
+ parser = sp.add_parser(
+ name=COMMAND_NAME,
+ help=msg,
+ description=msg,
+ )
+
+ msg = """This is the IPv4 or IPv6 address you're interested in.
+ After you run banip for the first time, you can use the "check"
+ command to see if a single IP address is found in the blacklist.
+ Making subsequent runs of banip to generate new blacklists will use
+ the updated information for future IP checking."""
+ parser.add_argument(
+ "ip",
+ type=str,
+ help=msg,
+ )
+
+ return