Skip to content

Commit

Permalink
feat(logging) Add a logging.Filter implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
znerol authored and open-dynaMIX committed Dec 26, 2021
1 parent bd0b955 commit 2164442
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 0 deletions.
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,30 @@ for line in data:

```


### As a python logging.Filter

```python
import logging

from anonip import AnonipFilter

if __name__ == '__main__':
handler = logging.StreamHandler()
handler.addFilter(AnonipFilter())
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[handler]
)

logging.debug('192.0.2.123 - call from root logger')

logger = logging.getLogger('child')
logger.info('2001:db8:abcd:ef01:2345:6789:abcd:ef01 - call from child logger')
```


### Python 2 or 3?
For compatibility reasons, anonip uses the shebang `#! /usr/bin/env python`.
This will default to python2 on all Linux distributions except for Arch Linux.
Expand Down
45 changes: 45 additions & 0 deletions anonip.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import re
import sys
from io import open
from collections import abc

try:
import ipaddress
Expand Down Expand Up @@ -303,6 +304,50 @@ def truncate_address(self, ip):
return ip.supernet(new_prefix=self._prefixes[ip.version])[0]


class AnonipFilter:
def __init__(self, args=None, extra=None, anonip=None):
"""
An implementation of Python logging.Filter using anonip.
:param args: list of log message args to filter. Defaults to []
:param extra: list of LogRecord attributes to filter. Defaults to []
:param anonip: dict of parameters for Anonip instance
"""
self.args = [] if args is None else args
self.extra = [] if extra is None else extra
self.anonip = Anonip(**(anonip or {}))

def filter(self, record):
"""
See logging.Filter.filter()
"""
if record.name != "anonip":
for key in self.args:
if isinstance(record.args, abc.Mapping):
if key in record.args:
value = record.args[key]
if isinstance(value, str):
record.args[key] = self.anonip.process_line(value)
elif isinstance(record.args, abc.Sequence):
if key < len(record.args):
value = record.args[key]
if isinstance(value, str):
is_tuple = isinstance(record.args, tuple)
if is_tuple:
record.args = list(record.args)
record.args[key] = self.anonip.process_line(value)
if is_tuple:
record.args = tuple(record.args)

for key in self.extra:
if hasattr(record, key):
value = getattr(record, key)
if (isinstance(value, str)):
setattr(record, key, self.anonip.process_line(value))

return True


def _validate_ipmask(mask, bits=32):
"""
Verify if the supplied ip mask is valid.
Expand Down
77 changes: 77 additions & 0 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,3 +407,80 @@ def test_properties_columns():
assert a.columns == [0]
a.columns = [5, 6]
assert a.columns == [4, 5]


def test_logging_filter_defaults(caplog):
logging.disable(logging.NOTSET)
logging.getLogger("anonip").setLevel(logging.CRITICAL)

logger = logging.getLogger("filter_defaults")
logger.addFilter(anonip.AnonipFilter())
logger.setLevel(logging.INFO)

logger.info("192.168.100.200 string")
logger.info("1.2.3.4 string")
logger.info("2001:0db8:85a3:0000:0000:8a2e:0370:7334 string")
logger.info("2a00:1450:400a:803::200e string")

assert caplog.record_tuples == [
("filter_defaults", logging.INFO, "192.168.96.0 string"),
("filter_defaults", logging.INFO, "1.2.0.0 string"),
("filter_defaults", logging.INFO, "2001:db8:85a0:: string"),
("filter_defaults", logging.INFO, "2a00:1450:4000:: string"),
]

logging.disable(logging.CRITICAL)


def test_logging_filter_args(caplog):
logging.disable(logging.NOTSET)
logging.getLogger("anonip").setLevel(logging.CRITICAL)

logger = logging.getLogger("filter_args")
logger.addFilter(anonip.AnonipFilter(args=["ip", "non-existing-attr"], extra=[]))
logger.setLevel(logging.INFO)

logger.info("%(ip)s string", {"ip": "192.168.100.200"})
logger.info("string %(ip)s", {"ip": "1.2.3.4"})
logger.info("%(ip)s string", {"ip": "2001:0db8:85a3:0000:0000:8a2e:0370:7334"})
logger.info("string")

assert caplog.record_tuples == [
("filter_args", logging.INFO, "192.168.96.0 string"),
("filter_args", logging.INFO, "string 1.2.0.0"),
("filter_args", logging.INFO, "2001:db8:85a0:: string"),
("filter_args", logging.INFO, "string"),
]

logging.disable(logging.CRITICAL)


def test_logging_filter_extra(caplog):
logging.disable(logging.NOTSET)
logging.getLogger("anonip").setLevel(logging.CRITICAL)

logger = logging.getLogger("filter_args")
logger.addFilter(
anonip.AnonipFilter(
extra=["ip", "non-existing-key"], anonip={"ipv4mask": 16, "ipv6mask": 64}
)
)
logger.setLevel(logging.INFO)

logger.info("string", extra={"ip": "192.168.100.200"})
logger.info("string", extra={"ip": "1.2.3.4"})
logger.info("string", extra={"ip": "2001:0db8:85a3:0000:0000:8a2e:0370:7334"})
logger.info("string", extra={"ip": "2a00:1450:400a:803::200e"})

expected = [
"192.168.0.0",
"1.2.0.0",
"2001:db8:85a3::",
"2a00:1450:400a:803::",
]

actual = [record.ip for record in caplog.records]

assert actual == expected

logging.disable(logging.CRITICAL)

0 comments on commit 2164442

Please sign in to comment.