Skip to content

Commit

Permalink
Add ECS formatter support (#91)
Browse files Browse the repository at this point in the history
* Add ECS support

* Init _flask_version

* Split formatting logic

* Move _LogstashMessageSchema

* Use field sets instead of lists

* Normalize ECS fields

* Minor refactoring

* Add FORMATTER_LOGSTASH_ECS_MESSAGE_FIELD_LIST

* Lint

* Add tox deps

* Fix Flask version warning

* Support LogRecord.taskName. Fix tests for python <3.12.
  • Loading branch information
andriilahuta authored Feb 4, 2024
1 parent 933f3fc commit baf2118
Show file tree
Hide file tree
Showing 8 changed files with 756 additions and 76 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
runs-on: ubuntu-22.04
strategy:
matrix:
python: ['3.8', '3.9', '3.10', '3.11']
python: ['3.8', '3.9', '3.10', '3.11', '3.12']
fail-fast: false

steps:
Expand Down
6 changes: 5 additions & 1 deletion logstash_async/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,16 @@ class Constants:
'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename',
'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module',
'msecs', 'msg', 'name', 'pathname', 'process',
'processName', 'relativeCreated', 'stack_info', 'thread', 'threadName']
'processName', 'relativeCreated', 'stack_info', 'thread', 'threadName', 'taskName']
# fields to be set on the top-level of a Logstash event/message, do not modify this
# unless you know what you are doing
FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST = [
'@timestamp', '@version', 'host', 'level', 'logsource', 'message',
'pid', 'program', 'type', 'tags', '@metadata']
FORMATTER_LOGSTASH_ECS_MESSAGE_FIELD_LIST = [
'@timestamp', '@version', '@metadata', 'message', 'labels', 'tags']
# convert dotted ECS fields into nested objects
FORMATTER_LOGSTASH_ECS_NORMALIZE_MESSAGE = True
# enable rate limiting for error messages (e.g. network errors) emitted by the logger
# used in LogProcessingWorker, i.e. when transmitting log messages to the Logstash server.
# Use a string like '5 per minute' or None to disable (default), for details see
Expand Down
304 changes: 232 additions & 72 deletions logstash_async/formatter.py

Large diffs are not rendered by default.

59 changes: 59 additions & 0 deletions logstash_async/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from __future__ import print_function

from copy import deepcopy
from datetime import datetime
from importlib import import_module
from itertools import chain, islice
Expand Down Expand Up @@ -60,3 +61,61 @@ def import_string(dotted_path):
except AttributeError as exc:
raise ImportError(
f'Module "{module_path}" does not define a "{class_name}" attribute/class') from exc


# ----------------------------------------------------------------------
# pylint: disable-next=invalid-name
class normalize_ecs_dict:
    """
    Convert dotted ECS fields into nested objects.

    Calling ``normalize_ecs_dict(data)`` does not create an instance of this
    class: ``__new__`` returns a normalized deep copy of ``data``, so the class
    is used like a plain function and the classmethods below are its helpers.

    Example: ``{'a.b': 1, 'a': {'c': 2}}`` becomes ``{'a': {'b': 1, 'c': 2}}``.
    """

    def __new__(cls, ecs_dict):
        """
        Return a normalized deep copy of `ecs_dict`; `ecs_dict` itself is not modified.
        """
        new_dict = deepcopy(ecs_dict)
        cls.normalize_dict(new_dict)
        return new_dict

    @classmethod
    def normalize_dict(cls, ecs_dict):
        """
        Expand the dotted keys of `ecs_dict` and normalize its values.
        Mutates `ecs_dict`.
        """
        # iterate over a snapshot of the keys because the dict is modified inside the loop
        for key in list(ecs_dict):
            # only string keys can be dotted; any other key type (e.g. int) is kept
            # untouched instead of raising a TypeError on the membership test
            if isinstance(key, str) and '.' in key:
                cls.merge_dicts(ecs_dict, cls.de_dot_record(key, ecs_dict.pop(key)))
        # second pass: descend into the values, including those created by the merge above
        for value in ecs_dict.values():
            cls.normalize_value(value)

    @classmethod
    def normalize_sequence(cls, ecs_sequence):
        """
        Normalize every item of `ecs_sequence`.
        The items are mutated, the sequence object itself is left as it is.
        """
        for item in ecs_sequence:
            cls.normalize_value(item)

    @classmethod
    def normalize_value(cls, ecs_value):
        """
        Normalize `ecs_value` if it is a dict, list, tuple or set.
        Values of any other type are ignored.
        """
        if isinstance(ecs_value, dict):
            cls.normalize_dict(ecs_value)
        elif isinstance(ecs_value, (list, tuple, set)):
            cls.normalize_sequence(ecs_value)

    @classmethod
    def merge_dicts(cls, target, src):
        """
        Merge dicts recursively.
        Mutates `target`.
        Uses references from `src` which may lead to `src` mutation.

        If a key holds a dict on both sides, the two dicts are merged,
        otherwise the value from `src` replaces the one in `target`.
        """
        for key, src_value in src.items():
            # a missing key yields None here and so falls through to the plain assignment
            target_value = target.get(key)
            if isinstance(target_value, dict) and isinstance(src_value, dict):
                cls.merge_dicts(target_value, src_value)
            else:
                target[key] = src_value

    @classmethod
    def de_dot_record(cls, key, value):
        """
        Build the nested dict described by the dotted `key`,
        e.g. ``('a.b.c', 1)`` results in ``{'a': {'b': {'c': 1}}}``.
        """
        *parents, leaf = key.split('.')
        res = {leaf: value}
        # wrap from the innermost key outwards
        for parent in reversed(parents):
            res = {parent: res}
        return res
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
},
keywords='logging logstash asynchronous',
install_requires=['limits', 'pylogbeat', 'requests'],
extras_require={
'dev': ['django', 'flask'],
},
python_requires='>3.5',
include_package_data=True,
classifiers=[
Expand Down
Loading

0 comments on commit baf2118

Please sign in to comment.