From 1ef9d295d2530cd35dc62adbcb9bf9c03492f16e Mon Sep 17 00:00:00 2001 From: Peter Nardi Date: Fri, 15 Mar 2024 07:42:44 -0400 Subject: [PATCH] Plugins (#5) * Initial plugin implementation. * Code tuning. * Doc linting and directory management. * README linting. * Renamed argument_parsers to parsers * Documentation linting. --- .gitignore | 2 + LICENSE | 31 ++-- Makefile | 12 +- README.md | 96 +++++++++-- pyproject.toml | 2 +- .../custom_blacklist.txt | 0 .../custom_whitelist.txt | 0 samples/plugins/foo.py | 40 +++++ samples/plugins/foo_args.py | 54 +++++++ sample-targets.txt => samples/targets.txt | 0 src/banip/__main__.py | 151 +++++++++++------- src/banip/{build_list.py => build.py} | 50 ++++-- src/banip/check.py | 48 ++++++ src/banip/{contants.py => constants.py} | 7 +- src/banip/country_analyzer.py | 92 ----------- src/banip/null.py | 12 ++ src/banip/py.typed | 0 src/banip/utilities.py | 96 +++++------ src/parsers/__init__.py | 1 + src/parsers/build_args.py | 49 ++++++ src/parsers/check_args.py | 29 ++++ 21 files changed, 514 insertions(+), 258 deletions(-) rename sample-custom_blacklist.txt => samples/custom_blacklist.txt (100%) rename sample-custom_whitelist.txt => samples/custom_whitelist.txt (100%) create mode 100755 samples/plugins/foo.py create mode 100644 samples/plugins/foo_args.py rename sample-targets.txt => samples/targets.txt (100%) rename src/banip/{build_list.py => build.py} (86%) create mode 100644 src/banip/check.py rename src/banip/{contants.py => constants.py} (86%) delete mode 100755 src/banip/country_analyzer.py create mode 100644 src/banip/null.py create mode 100644 src/banip/py.typed create mode 100644 src/parsers/__init__.py create mode 100644 src/parsers/build_args.py create mode 100644 src/parsers/check_args.py diff --git a/.gitignore b/.gitignore index 4ddba04..2fc8125 100644 --- a/.gitignore +++ b/.gitignore @@ -157,4 +157,6 @@ cython_debug/ # Custom exclusions data/ +src/plugins/ +scratch/ .init/ diff --git a/LICENSE b/LICENSE index 6d8fb3e..d4fbd3e 100755 --- a/LICENSE +++ b/LICENSE @@ -2,20 +2,21 @@ MIT License Copyright 2024 Peter Nardi -Permission is hereby granted, free of charge, to any person obtaining a copy of -this software and associated documentation files (the "Software"), to deal in -the Software without restriction, including without limitation the rights to -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies -of the Software, and to permit persons to whom the Software is furnished to do -so, subject to the following conditions: +Permission is hereby granted, free of charge, to any person obtaining a +copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. +The above copyright notice and this permission notice shall be included +in all copies or substantial portions of the Software. -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/Makefile b/Makefile index 124ed4b..21df6b8 100644 --- a/Makefile +++ b/Makefile @@ -10,6 +10,14 @@ ifeq (,$(wildcard .init/setup)) @if [ ! -d "./data" ]; then \ mkdir -p data/geolite; \ fi + @if [ ! -d "./src/plugins/parsers" ]; then \ + mkdir -p src/plugins/parsers; \ + touch src/plugins/parsers/__init__.py; \ + fi + @if [ ! -d "./src/plugins/code" ]; then \ + mkdir -p src/plugins/code; \ + touch src/plugins/code/__init__.py; \ + fi mkdir .init touch .init/setup poetry install --only=main @@ -55,8 +63,8 @@ reset: clean ## remove venv, artifacts, and init directory # -------------------------------------------- .PHONY: clean -clean: ## cleanup python build artifacts - @echo Cleaning python build artifacts +clean: ## cleanup python runtime artifacts + @echo Cleaning python runtime artifacts @find . -type d -name __pycache__ -exec rm -rf {} \; -prune # -------------------------------------------- diff --git a/README.md b/README.md index 5cfbedb..50459d5 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# banip +# banip Requirements ### Operating System @@ -38,7 +47,7 @@ Machine, or [Windows Subsystem for Linux (WSL)][def7] is required. You'll need a copy of the [MaxMind][def8] GeoLite2 database for country-level geotagging of IP addresses. If you have a premium or corporate MaxMind account, you're all set. If not, the free GeoLite2 -account will work just fine ([signup here][def5]). Once you login, on +account will work just fine ([signup here][def5]). Once you login, using the menu on the top right select: ```text @@ -75,7 +84,8 @@ own use. ### Global list of blacklisted IPs -Download the list as follows: +banip uses the [ipsum][def9] threat intelligence blacklist. You can +direct download it using: ```shell curl -sL https://raw.githubusercontent.com/stamparm/ipsum/master/ipsum.txt > ipsum.txt @@ -86,7 +96,9 @@ curl -sL https://raw.githubusercontent.com/stamparm/ipsum/master/ipsum.txt > ips You'll need the [make][def6] utility installed (*it probably already is*). -## Setup +[top](#top) + +## Setup ### Unpack GeoLite2 data @@ -128,7 +140,7 @@ cp /ipsum.txt ./data/ipsum.txt #### Target countries ```shell -cp sample-targets.txt ./data/targets.txt +cp ./samples/targets.txt ./data/targets.txt ``` Modify `./data/targets.txt` to select your desired target countries. The @@ -137,7 +149,7 @@ comments in the file will guide you. #### Custom whitelist (optional) ```shell -cp sample-custom_whitelist.txt ./data/custom_whitelist.txt +cp ./samples/custom_whitelist.txt ./data/custom_whitelist.txt ``` There may be IP addresses that banip will flag as malicious, but you @@ -151,7 +163,7 @@ blank one when you run it. #### Custom blacklist (optional) ```shell -cp sample-custom_blacklist.txt ./data/custom_blacklist.txt +cp ./samples/custom_blacklist.txt ./data/custom_blacklist.txt ``` The source database of banned IPs isn't perfect. You may determine that @@ -194,16 +206,26 @@ data └── targets.txt (required) ``` -## Running +[top](#top) + +## Running + +After copying/tweaking all the required files, start by activating the +python virtual environment: + +```shell +source .venv/bin/activate +``` -After copying/tweaking all the required files, start with this command -to learn how to build your custom blacklist: +Now run this command to learn how to build your custom blacklist: ```shell -poetry run banip -h +banip -h ``` -## Updating +[top](#top) + +## Updating MaxMind updates the GeoLite2 Country database on Tuesdays and Fridays, and the list of blacklisted IPs (`ipsum.txt`) is updated daily. Pull @@ -211,6 +233,53 @@ updated copies of both and put them in `banip/data/geolite` (for the GeoLite2 data) and `banip/data` (for the `ipsum.txt` file). Run `banip` again to generate an updated blacklist. +[top](#top) + +## Plugins + +banip generates some data that you may want to use for other purposes. +For example, everytime you build a new blacklist banip also creates and +saves a textfile of all worldwide subnets, each tagged with a two-letter +country code. The file is saved in: + +```'text +./banip/data/haproxy_geo_ip.txt +``` + +Next time you run banip, open that file and take a look at it. Since you +may have a very specific usecase for that data, you can write a plugin +for banip which will make use of the build products for your purposes. + +A banip plugin consists of two required files: + +1. Code that generates an argument parser for your new command. +2. Code that implements the functionality of your new command. + +All your plugins go into the `./src/plugins` directory in the +appropriate subdirectory (either `parsers` or `code`). Your +plugins are not under version control, so they will only reside on your +machine. + +Look at the comments in these two files for instructions on how to +create your own plugins: + +```text +./samples/plugins/foo.py +./samples/plugins/foo_args.py +``` + +[top](#top) + +## Uninstalling banip + +If you want out, just do this: + +```shell +rm -rf ~/.banip +``` + +[top](#top) + [def]: https://aws.amazon.com/what-is/cidr/#:~:text=CIDR%20notation%20represents%20an%20IP,as%20192.168.1.0%2F22. [def2]: https://python-poetry.org/ [def3]: https://docs.github.com/en/get-started/getting-started-with-git/ignoring-files @@ -219,3 +288,4 @@ again to generate an updated blacklist. [def4]: https://dev.maxmind.com/geoip/updating-databases#directly-downloading-databases [def5]: https://dev.maxmind.com/geoip/geolite2-free-geolocation-data [def8]: https://www.maxmind.com/en/home +[def9]: https://github.com/stamparm/ipsum diff --git a/pyproject.toml b/pyproject.toml index ed2fa5e..e6cd796 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "banip" -version = "0.1.0" +version = "1.0.0" description = "Create a custom list of band ip for specific countries" authors = ["Peter Nardi "] license = "MIT" diff --git a/sample-custom_blacklist.txt b/samples/custom_blacklist.txt similarity index 100% rename from sample-custom_blacklist.txt rename to samples/custom_blacklist.txt diff --git a/sample-custom_whitelist.txt b/samples/custom_whitelist.txt similarity index 100% rename from sample-custom_whitelist.txt rename to samples/custom_whitelist.txt diff --git a/samples/plugins/foo.py b/samples/plugins/foo.py new file mode 100755 index 0000000..030e720 --- /dev/null +++ b/samples/plugins/foo.py @@ -0,0 +1,40 @@ +"""Rules for writing custom commands. + +1. Start with this sample command file and modify it. +2. Make sure to give your file the same name as your command (e.g. + foo.py for the "foo" command). +3. The entry point for your command will be "task_runner(args)", which + is defined below. Do not change this function signature. Most of your + code will go there, but you may add additional functions/modules as + needed. The only rule here is that you must use this pre-defined + entry point. +4. "args" will be a argparse.Namespace variable that contains all the + inputs captured on the command line, which are defined by foo_args.py +5. Your code should return None. +5. Each newly defined function must have an associated argument parser, + defined in {function name}_args.py. See "foo_args.py" in + banip/samples/plugins as an example. +6. Make sure to put this file in banip/src/plugins/code +""" + +import argparse + + +def task_runner(args: argparse.Namespace) -> None: + """Do something wonderful. + + This is foo, so by definition it's wonderful! + + Parameters + ---------- + args : argparse.Namespace + Arguments passed on the command line. + """ + total = args.first + args.second + print(f"The sum of {args.first} and {args.second} is {total}.") + print("Cool! Right?") + return + + +if __name__ == "__main__": + pass diff --git a/samples/plugins/foo_args.py b/samples/plugins/foo_args.py new file mode 100644 index 0000000..0e7a456 --- /dev/null +++ b/samples/plugins/foo_args.py @@ -0,0 +1,54 @@ +"""Rules for writing custom argument parsers. + +1. Start with this sample argument parser and modify it. +2. Make the filename for this parser the same as the filename of your + command, but with the '_args' suffix (e.g. foo_args.py for the + command defined by foo.py). +3. Do not change the function signature of the "load_command_args" + function below. +4. The variable COMMAND_NAME below is required. It must contain the name + of your command. +5. The sp.add_parser() function call is required. You can add additional + arguments to the function call, but the "name", "help", and + "description" arguments are required. Modify the "msg" variable to + tailor your help and description messages. +6. Add as many parser.add_argument() calls as you need. +7. You may add additional functions/modules as needed. For example, + adding something like "from argparse import FileType" if one of your + command line arguments is a file type. +8. Your code should return None. +9. Make sure to put this file in banip/src/plugins/parsers +""" + +from argparse import _SubParsersAction + +COMMAND_NAME = "foo" + + +def load_command_args(sp: _SubParsersAction) -> None: + """Assemble the argument parser.""" + msg = """This command takes two intergers on the command line, adds + them together, then prints the result. Isn't that wonderful!""" + parser = sp.add_parser( + name=COMMAND_NAME, + help=msg, + description=msg, + ) + + # Add an argument to the parser + msg = """The first variable to be added.""" + parser.add_argument( + "first", + type=int, + help=msg, + ) + + # Add another argument to the parser + msg = """This is the second.""" + parser.add_argument( + "second", + type=int, + help=msg, + ) + + return diff --git a/sample-targets.txt b/samples/targets.txt similarity index 100% rename from sample-targets.txt rename to samples/targets.txt diff --git a/src/banip/__main__.py b/src/banip/__main__.py index a70e3d7..d02df99 100755 --- a/src/banip/__main__.py +++ b/src/banip/__main__.py @@ -3,78 +3,113 @@ """Entry point for banip.""" import argparse +import importlib +from pathlib import Path +from types import ModuleType -from banip.build_list import banned_ips -from banip.contants import RENDERED_BLACKLIST -from banip.utilities import check_ip +from banip.constants import ARG_PARSERS_BASE +from banip.constants import CUSTOM_ARG_PARSERS +from banip.constants import CUSTOM_CODE +from banip.utilities import wrap_tight + +# ====================================================================== + + +def collect_modules(start: Path) -> list[str]: + """Collect the names of all modules to import. + + Parameters + ---------- + start : Path + This the starting point (directory) for collection. + + Returns + ------- + list[str] + A list of module names. + """ + module_names: list[str] = [] + for p in start.iterdir(): + if p.is_file() and p.name != "__init__.py": + if "plugins" in str(p): + prefix = "plugins.parsers" + else: + prefix = "parsers" + module_names.append(f"{prefix}.{p.stem}") + return module_names + + +# ====================================================================== + + +def load_custom_module(cmd: str) -> ModuleType | None: + """Given the name of a command, return the associated module. + + By design, the code associated with a given command must have the + same name as the command itself. + + Parameters + ---------- + cmd : str + The name of a banip command + + Returns + ------- + ModuleType | None + This will be a pointer to the imported python module. + """ + for p in CUSTOM_CODE.iterdir(): + if p.is_file(): + if p.stem == cmd: + return importlib.import_module(f"plugins.code.{p.stem}") + return None + + +# ====================================================================== def main() -> None: """Get user input and build the list of banned IP addresses.""" msg = """Generate and query IP blacklists for use with proxy servers - (like HAProxy). Please review the README file at - https://github.com/geozeke/ubuntu for detailed instructions on - setting up banip.""" - - epi = "Version: 0.1.0" - + (like HAProxy). For help on any command, run: "banip {command} -h". + Please review the README file at https://github.com/geozeke/ubuntu + for detailed instructions on setting up banip.""" + epi = "Version: 1.0.0" parser = argparse.ArgumentParser( description=msg, epilog=epi, ) + subparsers = parser.add_subparsers(title="commands", dest="cmd") - subparsers = parser.add_subparsers(title="Commands") - msg = """Create a list of banned (blacklisted) client IP addresses - to be used with a proxy server (like HAProxy) to block network - access from those clients. Run \"banip build -h" for more.""" - subparser_build = subparsers.add_parser(name="build", help=msg) - msg = """Check to see if a single IP address is found in the - blacklist. Run \"banip check -h" for more.""" - subparser_check = subparsers.add_parser(name="check", help=msg) - - msg = """Output file that will contain the generated list of - blacklisted IP addresses. If not provided, results will be saved to - ./data/ip_blacklist.txt""" - subparser_build.add_argument( - "-o", - "--outfile", - type=argparse.FileType("w"), - help=msg, - ) + # Dynamically load argument subparsers. - msg = """Each banned IP address in the source database has a factor - (from 1 to 10) indicating a level of confidence that the IP address - is a malicious actor (higher is more confident). The default - threshold used is 3. Anything less than that may result in false - positives, but you may choose any threshold from 1 to 10. If you - find you're getting false positives, just re-run banip with a higher - threshold.""" - subparser_build.add_argument( - "-t", - "--threshold", - type=int, - help=msg, - default=3, - ) - - msg = """This is the IPv4 or IPv6 address you're interested in. - After you run banip for the first time, you can use the "check" - subcommand to see if a single IP address is found in the blacklist. - Making subsequent runs of banip to generate new blacklists will use - the updated information for future IP checking.""" - subparser_check.add_argument( - "ip", - type=str, - help=msg, - ) + module_names: list[str] = [] + mod: ModuleType | None = None + module_names = collect_modules(ARG_PARSERS_BASE) + module_names += collect_modules(CUSTOM_ARG_PARSERS) + for mod_name in module_names: + mod = importlib.import_module(mod_name) + mod.load_command_args(subparsers) args = parser.parse_args() - if hasattr(args, "ip"): - check_ip(args.ip) - else: - if not args.outfile: - args.outfile = open(RENDERED_BLACKLIST, "w") - banned_ips(args) + match (args.cmd): + case "build": + mod = importlib.import_module("banip.build") + case "check": + mod = importlib.import_module("banip.check") + case _: + if args.cmd: + if not (mod := load_custom_module(args.cmd)): + msg = f"""Code for a given command must have the + same name as the command itself. Make sure you have + a program file called \"{args.cmd}.py\" in + {CUSTOM_CODE}/""" + print(wrap_tight(msg)) + mod = importlib.import_module("banip.null") + else: + mod = importlib.import_module("banip.null") + mod.task_runner(args) + return diff --git a/src/banip/build_list.py b/src/banip/build.py similarity index 86% rename from src/banip/build_list.py rename to src/banip/build.py index 8f46329..21ec2a4 100755 --- a/src/banip/build_list.py +++ b/src/banip/build.py @@ -12,17 +12,17 @@ from tqdm import tqdm # type: ignore -from banip.contants import BANNED_IPS -from banip.contants import COUNTRY_NETS -from banip.contants import CUSTOM_BLACKLIST -from banip.contants import CUSTOM_WHITELIST -from banip.contants import GEOLITE_4 -from banip.contants import GEOLITE_6 -from banip.contants import GEOLITE_LOC -from banip.contants import IPS -from banip.contants import PAD -from banip.contants import RENDERED_BLACKLIST -from banip.contants import TARGETS +from banip.constants import BANNED_IPS +from banip.constants import COUNTRY_NETS +from banip.constants import CUSTOM_BLACKLIST +from banip.constants import CUSTOM_WHITELIST +from banip.constants import GEOLITE_4 +from banip.constants import GEOLITE_6 +from banip.constants import GEOLITE_LOC +from banip.constants import IPS +from banip.constants import PAD +from banip.constants import RENDERED_BLACKLIST +from banip.constants import TARGETS from banip.utilities import clear from banip.utilities import extract_ip from banip.utilities import filter @@ -31,7 +31,7 @@ from banip.utilities import tag_networks -def banned_ips(args: Namespace) -> None: +def task_runner(args: Namespace) -> None: """Generate a custom list of banned IP addresses. Parameters @@ -40,14 +40,24 @@ def banned_ips(args: Namespace) -> None: Command line arguments. """ # Start by stubbing-out custom files if they're not already in - # place. + # place. In the case of the output file, check for two things: (1) + # Was a file specified? If not, then save results to the default + # (RENDERED_BLACKLIST). (2) If the file was specified, was it the + # same name as the default? If so, there's no need to make a local + # copy of it after computations are complete. clear() + make_local_copy = False if not CUSTOM_BLACKLIST.exists(): f = open(CUSTOM_BLACKLIST, "w") f.close() if not CUSTOM_WHITELIST.exists(): f = open(CUSTOM_WHITELIST, "w") f.close() + try: + if Path(args.outfile.name) != RENDERED_BLACKLIST: + make_local_copy = True + except AttributeError: + args.outfile = open(RENDERED_BLACKLIST, "w") # Now make sure everything is in place @@ -58,6 +68,7 @@ def banned_ips(args: Namespace) -> None: GEOLITE_4, GEOLITE_6, GEOLITE_LOC, + RENDERED_BLACKLIST, TARGETS, ] for file in files: @@ -129,10 +140,10 @@ def banned_ips(args: Namespace) -> None: print() bag_of_ips = [] - print(f"Building blacklisted IP list for country codes: {countries}") + print(f"Pulling blacklisted IP list for country codes: {countries}") for ip in tqdm( D["II"], - desc="IPs", + desc="Blacklist IPs", total=len(D["II"]), colour="#bf80f2", unit="IPs", @@ -233,12 +244,13 @@ def banned_ips(args: Namespace) -> None: for chunk in D[key]: args.outfile.write(f"{format(chunk)}\n") + args.outfile.close() + # Save a copy of the generated IP blacklist to # ./data/ip_blacklist.txt. This will be used when running banip to # check an individual IP address. - args.outfile.close() - if Path(args.outfile.name) != RENDERED_BLACKLIST: + if make_local_copy: shutil.copy(Path(args.outfile.name), RENDERED_BLACKLIST) # Calculate final metrics and display results. @@ -252,3 +264,7 @@ def banned_ips(args: Namespace) -> None: print(f"Total banned IPs saved: {total_bans:>{PAD},d}") return + + +if __name__ == "__main__": + pass diff --git a/src/banip/check.py b/src/banip/check.py new file mode 100644 index 0000000..bf5ac79 --- /dev/null +++ b/src/banip/check.py @@ -0,0 +1,48 @@ +"""Taskrunner for check command.""" + +import argparse +import ipaddress as ipa + +from banip.constants import BANNED_IPS +from banip.constants import RENDERED_BLACKLIST + + +def task_runner(args: argparse.Namespace) -> None: + """Display available data for a particular IP address. + + Parameters + ---------- + args : argparse.Namespace + args.ip will be either IPv4 or IPv address of interest. + """ + try: + ipa.ip_address(args.ip) + except ValueError: + print(f"{args.ip} is not a valid IP address.") + return + + print() + found = False + if RENDERED_BLACKLIST.exists(): + with open(RENDERED_BLACKLIST, "r") as f: + for line in f: + if args.ip in line: + source = RENDERED_BLACKLIST.name + print(f"{args.ip} found in {source}") + found = True + break + + if BANNED_IPS.exists(): + with open(BANNED_IPS, "r") as f: + for line in f: + if args.ip in line: + source = BANNED_IPS.name + hitcount = line.split()[1] + print(f"{args.ip} found in {source} with {hitcount} hits.") + found = True + break + + if not found: + print(f"{args.ip} not found.") + + return diff --git a/src/banip/contants.py b/src/banip/constants.py similarity index 86% rename from src/banip/contants.py rename to src/banip/constants.py index c8f056b..44b898d 100644 --- a/src/banip/contants.py +++ b/src/banip/constants.py @@ -8,17 +8,20 @@ HOME = Path(__file__).parents[2] +ARG_PARSERS_BASE = HOME / "src/parsers" BANNED_IPS = HOME / "data/ipsum.txt" COUNTRY_NETS = HOME / "data/haproxy_geo_ip.txt" +CUSTOM_ARG_PARSERS = HOME / "src/plugins/parsers" CUSTOM_BLACKLIST = HOME / "data/custom_blacklist.txt" +CUSTOM_CODE = HOME / "src/plugins/code" CUSTOM_WHITELIST = HOME / "data/custom_whitelist.txt" -RENDERED_BLACKLIST = HOME / "data/ip_blacklist.txt" GEOLITE_4 = HOME / "data/geolite/GeoLite2-Country-Blocks-IPv4.csv" GEOLITE_6 = HOME / "data/geolite/GeoLite2-Country-Blocks-IPv6.csv" GEOLITE_LOC = HOME / "data/geolite/GeoLite2-Country-Locations-en.csv" -TARGETS = HOME / "data/targets.txt" IPS = [IPv4Address, IPv6Address] NETS = [IPv4Network, IPv6Network] +RENDERED_BLACKLIST = HOME / "data/ip_blacklist.txt" +TARGETS = HOME / "data/targets.txt" # Padding for pretty printing. PAD = 6 diff --git a/src/banip/country_analyzer.py b/src/banip/country_analyzer.py deleted file mode 100755 index 2be6769..0000000 --- a/src/banip/country_analyzer.py +++ /dev/null @@ -1,92 +0,0 @@ -#!/usr/bin/env python3 - -"""Analyze stats for a single country (two-letter code). - -This is a *very* hacky utility that will allow me to pull stats for a -single country. It uses the build products from banip. It's not -production code, because it's heavily dependent on how my HAProxy logs -are formatted and how I pull the data from grafana, so it will be tough -to advertise it as available for general use. I'll still ship it with -banip, and maybe I'll incorporate it as a core capability at some point -in the future. -""" - -import ipaddress as ipa -import re -import sys -from typing import Any - -from banip.contants import BANNED_IPS -from banip.contants import HOME -from banip.contants import RENDERED_BLACKLIST -from banip.utilities import clear - -try: - target_country = sys.argv[1].upper() -except IndexError: - print("Please provide a two-letter country code.") - sys.exit(1) - -regex = r"(?<=[\s=])(\w+)=([^=\s]+)(?=\s|$)" -LOGS = HOME / "data/logs.txt" -clear() - -files = [BANNED_IPS, RENDERED_BLACKLIST, LOGS] -for file in files: - if not file.exists(): - print(f"Cannot find {file}") - sys.exit(1) - -with open(LOGS, "r") as f: - logs = f.readlines() - # The first 5 lines contain header information. - logs = logs[6:] - -ipsum: dict[Any, int] = {} -with open(BANNED_IPS, "r") as f: - for line in f: - if (item := line.strip()) and item[0] != "#": - parts = item.split() - ipsum[ipa.ip_address(parts[0])] = int(parts[1]) - -blacklist: list[Any] = [] -with open(RENDERED_BLACKLIST, "r") as f: - for line in f: - if (item := line.strip()) and item[0] != "#": - # skip subnets - if "/" not in item: - blacklist.append(ipa.ip_address(item)) - -output: list[str] = [] -target_ips: dict[Any, list[str]] = {} -for line in logs: - groups = re.findall(regex, line) - country = groups[-1][1] - if country == target_country: - ip = ipa.ip_address(groups[0][1]) - if ip not in blacklist: - uri = groups[-2][1] - if ip in target_ips: - target_ips[ip].append(uri) - else: - target_ips[ip] = [uri] - -for ip, uris in target_ips.items(): - output.append(f"IP: {format(ip)}\n") - if ip in ipsum: - output.append(f"Found in ipsum, with confidence of {ipsum[ip]}\n") - if len(uris) == 1: - output.append(f"Total of {len(uris)} request\n") - else: - output.append(f"Total of {len(uris)} requests\n") - counter = 1 - for uri in uris: - output.append(f" {counter:>3}. {uri}\n") - counter += 1 - output.append("\n") - -msg = "IPs found in the logs, but not in the blacklist" -output.insert(0, f"Statistics for: {target_country}\n") -output.insert(1, f"{len(target_ips)} {msg}.\n\n") - -print("".join(output).strip()) diff --git a/src/banip/null.py b/src/banip/null.py new file mode 100644 index 0000000..03e7145 --- /dev/null +++ b/src/banip/null.py @@ -0,0 +1,12 @@ +"""Taskrunner for no command. + +This will be the default command, which simply allows the program to +exit. +""" + +import argparse + + +def task_runner(args: argparse.Namespace) -> None: + """Do nothing.""" + return diff --git a/src/banip/py.typed b/src/banip/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/src/banip/utilities.py b/src/banip/utilities.py index b41538d..baaa307 100644 --- a/src/banip/utilities.py +++ b/src/banip/utilities.py @@ -3,6 +3,7 @@ import csv import ipaddress as ipa import os +import textwrap from ipaddress import IPv4Address from ipaddress import IPv4Network from ipaddress import IPv6Network @@ -11,14 +12,37 @@ from tqdm import tqdm # type: ignore -from banip.contants import BANNED_IPS -from banip.contants import COUNTRY_NETS -from banip.contants import GEOLITE_4 -from banip.contants import GEOLITE_6 -from banip.contants import GEOLITE_LOC -from banip.contants import RENDERED_BLACKLIST +from banip.constants import COUNTRY_NETS +from banip.constants import GEOLITE_4 +from banip.constants import GEOLITE_6 +from banip.constants import GEOLITE_LOC -# =============================================================== +# ====================================================================== + + +def wrap_tight(msg: str, columns=70) -> str: + """Clean up a multi-line docstring. + + Take a multi-line docstring and wrap it cleanly as a paragraph to a + specified column width. + + Parameters + ---------- + msg : str + The docstring to be wrapped. + columns : int, optional + Column width for wrapping, by default 70. + + Returns + ------- + str + A wrapped paragraph. + """ + clean = " ".join([t for token in msg.split("\n") if (t := token.strip())]) + return textwrap.fill(clean, width=columns) + + +# ====================================================================== def clear() -> None: @@ -29,7 +53,7 @@ def clear() -> None: os.system("clear" if os.name == "posix" else "cls") -# =============================================================== +# ====================================================================== def extract_ip(from_str: str) -> Any: @@ -61,7 +85,7 @@ def extract_ip(from_str: str) -> Any: return to_ip -# =============================================================== +# ====================================================================== def filter(fname: str | Path, metric: list[str] | int) -> list[Any]: @@ -93,7 +117,7 @@ def filter(fname: str | Path, metric: list[str] | int) -> list[Any]: f.seek(0) for line in tqdm( f, - desc="Lines", + desc=" Total lines", total=lines, colour="#bf80f2", unit="lines", @@ -115,7 +139,7 @@ def filter(fname: str | Path, metric: list[str] | int) -> list[Any]: return bag -# =============================================================== +# ====================================================================== def split46(bag_of_stuff: list[Any]) -> tuple[list[Any], list[Any]]: @@ -146,7 +170,7 @@ def split46(bag_of_stuff: list[Any]) -> tuple[list[Any], list[Any]]: return bag4, bag6 -# =============================================================== +# ====================================================================== def tag_networks() -> None: @@ -167,7 +191,7 @@ def tag_networks() -> None: next(reader) for country in tqdm( reader, - desc="Countries", + desc=" Countries", total=lines, colour="#bf80f2", unit="countries", @@ -248,51 +272,7 @@ def tag_networks() -> None: print("Done\n") -# =============================================================== - - -def check_ip(ip: str) -> None: - """Display available data for a particular IP address. - - Parameters - ---------- - ip : str - IPv4 or IPv address of interest. - """ - try: - ipa.ip_address(ip) - except ValueError: - print(f"{ip} is not a valid IP address.") - return - - print() - found = False - if RENDERED_BLACKLIST.exists(): - with open(RENDERED_BLACKLIST, "r") as f: - for line in f: - if ip in line: - source = RENDERED_BLACKLIST.name - print(f"{ip} found in {source}") - found = True - break - - if BANNED_IPS.exists(): - with open(BANNED_IPS, "r") as f: - for line in f: - if ip in line: - source = BANNED_IPS.name - hitcount = line.split()[1] - print(f"{ip} found in {source} with {hitcount} hits.") - found = True - break - - if not found: - print(f"{ip} not found.") - - return - - -# =============================================================== +# ====================================================================== def ip_in_network( diff --git a/src/parsers/__init__.py b/src/parsers/__init__.py new file mode 100644 index 0000000..9c19ca2 --- /dev/null +++ b/src/parsers/__init__.py @@ -0,0 +1 @@ +"""Package.""" diff --git a/src/parsers/build_args.py b/src/parsers/build_args.py new file mode 100644 index 0000000..bd5bcca --- /dev/null +++ b/src/parsers/build_args.py @@ -0,0 +1,49 @@ +"""Argument parser for build command.""" + +import argparse +from argparse import _SubParsersAction + +COMMAND_NAME = "build" + + +def load_command_args(sp: _SubParsersAction) -> None: + """Assemble the argument parser.""" + msg = """Create a list of banned (blacklisted) client IP addresses + to be used with a proxy server (like HAProxy) to block network + access from those clients.""" + parser = sp.add_parser( + name=COMMAND_NAME, + help=msg, + description=msg, + ) + + msg = """Output file that will contain the generated list of + blacklisted IP addresses. If not provided, results will be saved to + ./data/ip_blacklist.txt""" + parser.add_argument( + "-o", + "--outfile", + type=argparse.FileType("w"), + help=msg, + ) + + msg = """Each banned IP address in the source database has a factor + (from 1 to 10) indicating a level of confidence that the IP address + is a malicious actor (higher is more confident). The default + threshold used is 3. Anything less than that may result in false + positives, but you may choose any threshold from 1 to 10. If you + find you're getting false positives, just re-run banip with a higher + threshold.""" + parser.add_argument( + "-t", + "--threshold", + type=int, + help=msg, + default=3, + ) + + return + + +if __name__ == "__main__": + pass diff --git a/src/parsers/check_args.py b/src/parsers/check_args.py new file mode 100644 index 0000000..ed42917 --- /dev/null +++ b/src/parsers/check_args.py @@ -0,0 +1,29 @@ +"""Argument parser for check command.""" + +from argparse import _SubParsersAction + +COMMAND_NAME = "check" + + +def load_command_args(sp: _SubParsersAction) -> None: + """Assemble the argument parser.""" + msg = """Check to see if a single IP address is found in the + blacklist.""" + parser = sp.add_parser( + name=COMMAND_NAME, + help=msg, + description=msg, + ) + + msg = """This is the IPv4 or IPv6 address you're interested in. + After you run banip for the first time, you can use the "check" + command to see if a single IP address is found in the blacklist. + Making subsequent runs of banip to generate new blacklists will use + the updated information for future IP checking.""" + parser.add_argument( + "ip", + type=str, + help=msg, + ) + + return