diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 2490cb9..33c63ec 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -18,7 +18,7 @@ jobs: runs-on: ubuntu-22.04 strategy: matrix: - python: ['3.8', '3.9', '3.10', '3.11'] + python: ['3.8', '3.9', '3.10', '3.11', '3.12'] fail-fast: false steps: diff --git a/logstash_async/constants.py b/logstash_async/constants.py index 301da43..86fe1e6 100644 --- a/logstash_async/constants.py +++ b/logstash_async/constants.py @@ -38,12 +38,16 @@ class Constants: 'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename', 'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module', 'msecs', 'msg', 'name', 'pathname', 'process', - 'processName', 'relativeCreated', 'stack_info', 'thread', 'threadName'] + 'processName', 'relativeCreated', 'stack_info', 'thread', 'threadName', 'taskName'] # fields to be set on the top-level of a Logstash event/message, do not modify this # unless you know what you are doing FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST = [ '@timestamp', '@version', 'host', 'level', 'logsource', 'message', 'pid', 'program', 'type', 'tags', '@metadata'] + FORMATTER_LOGSTASH_ECS_MESSAGE_FIELD_LIST = [ + '@timestamp', '@version', '@metadata', 'message', 'labels', 'tags'] + # convert dotted ECS fields into nested objects + FORMATTER_LOGSTASH_ECS_NORMALIZE_MESSAGE = True # enable rate limiting for error messages (e.g. network errors) emitted by the logger # used in LogProcessingWorker, i.e. when transmitting log messages to the Logstash server. # Use a string like '5 per minute' or None to disable (default), for details see diff --git a/logstash_async/formatter.py b/logstash_async/formatter.py index 5c46b82..cc9cf09 100644 --- a/logstash_async/formatter.py +++ b/logstash_async/formatter.py @@ -4,6 +4,7 @@ # of the MIT license. See the LICENSE file for details. from datetime import date, datetime +import importlib.metadata import logging import socket import sys @@ -12,6 +13,7 @@ import uuid from logstash_async.constants import constants +from logstash_async.utils import normalize_ecs_dict import logstash_async @@ -25,6 +27,34 @@ class LogstashFormatter(logging.Formatter): _basic_data_types = (type(None), bool, str, int, float) + field_skip_set = set(constants.FORMATTER_RECORD_FIELD_SKIP_LIST) + top_level_field_set = set(constants.FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST) + + class MessageSchema: + TIMESTAMP = '@timestamp' + VERSION = '@version' + METADATA = '@metadata' + HOST = 'host' + LOG_LEVEL = 'level' + LOG_SOURCE = 'logsource' + LOGGER_NAME = 'logger_name' + LINE = 'line' + MESSAGE = 'message' + MESSAGE_TYPE = 'type' + FUNC_NAME = 'func_name' + TASK_NAME = 'task_name' + THREAD_NAME = 'thread_name' + PROCESS_NAME = 'process_name' + INTERPRETER = 'interpreter' + INTERPRETER_VERSION = 'interpreter_version' + PATH = 'path' + PID = 'pid' + PROGRAM = 'program' + STACK_TRACE = 'stack_trace' + ERROR_TYPE = 'error_type' + TAGS = 'tags' + LOGSTASH_ASYNC_VERSION = 'logstash_async_version' + # ---------------------------------------------------------------------- # pylint: disable=too-many-arguments def __init__( @@ -90,39 +120,25 @@ def _prefetch_program_name(self): # ---------------------------------------------------------------------- def format(self, record): - message = { - '@timestamp': self._format_timestamp(record.created), - '@version': '1', - 'host': self._host, - 'level': record.levelname, - 'logsource': self._logsource, - 'message': record.getMessage(), - 'pid': record.process, - 'program': self._program_name, - 'type': self._message_type, - } - if self._metadata: - message['@metadata'] = self._metadata - if self._tags: - message['tags'] = self._tags + message = self._format_to_dict(record) + return self._serialize(message) + # ---------------------------------------------------------------------- + def _format_to_dict(self, record): + message = self._get_primary_fields(record) # record fields record_fields = self._get_record_fields(record) message.update(record_fields) # prepare dynamic extra fields extra_fields = self._get_extra_fields(record) - # remove all fields to be excluded - self._remove_excluded_fields(message, extra_fields) - # wrap extra fields in configurable namespace - if self._extra_prefix: - message[self._extra_prefix] = extra_fields - else: - message.update(extra_fields) + message.update(extra_fields) + # remove all fields to be excluded + self._remove_excluded_fields(message) # move existing extra record fields into the configured prefix self._move_extra_record_fields_to_prefix(message) - return self._serialize(message) + return message # ---------------------------------------------------------------------- def _format_timestamp(self, time_): @@ -150,25 +166,49 @@ def _value_repr(self, value): else: return repr(value) + # ---------------------------------------------------------------------- + def _get_primary_fields(self, record): + Schema = self.MessageSchema + primary_fields = { + Schema.TIMESTAMP: self._format_timestamp(record.created), + Schema.VERSION: '1', + Schema.HOST: self._host, + Schema.LOG_LEVEL: record.levelname, + Schema.LOG_SOURCE: self._logsource, + Schema.MESSAGE: record.getMessage(), + Schema.PID: record.process, + Schema.PROGRAM: self._program_name, + Schema.MESSAGE_TYPE: self._message_type, + } + if self._metadata: + primary_fields[Schema.METADATA] = self._metadata + if self._tags: + primary_fields[Schema.TAGS] = self._tags + return primary_fields + # ---------------------------------------------------------------------- def _get_extra_fields(self, record): + Schema = self.MessageSchema extra_fields = { - 'func_name': record.funcName, - 'interpreter': self._interpreter, - 'interpreter_version': self._interpreter_version, - 'line': record.lineno, - 'logger_name': record.name, - 'logstash_async_version': logstash_async.__version__, - 'path': record.pathname, - 'process_name': record.processName, - 'thread_name': record.threadName, + Schema.FUNC_NAME: record.funcName, + Schema.INTERPRETER: self._interpreter, + Schema.INTERPRETER_VERSION: self._interpreter_version, + Schema.LINE: record.lineno, + Schema.LOGGER_NAME: record.name, + Schema.LOGSTASH_ASYNC_VERSION: logstash_async.__version__, + Schema.PATH: record.pathname, + Schema.PROCESS_NAME: record.processName, + Schema.THREAD_NAME: record.threadName, } # static extra fields if self._extra: extra_fields.update(self._extra) + if getattr(record, 'taskName', None): + extra_fields[Schema.TASK_NAME] = record.taskName # exceptions if record.exc_info: - extra_fields['stack_trace'] = self._format_exception(record.exc_info) + extra_fields[Schema.ERROR_TYPE] = record.exc_info[0].__name__ + extra_fields[Schema.STACK_TRACE] = self._format_exception(record.exc_info) return extra_fields # ---------------------------------------------------------------------- @@ -182,11 +222,10 @@ def _format_exception(self, exc_info): return stack_trace # ---------------------------------------------------------------------- - def _remove_excluded_fields(self, message, extra_fields): - for fields in (message, extra_fields): - for field_name in list(fields): - if field_name in constants.FORMATTER_RECORD_FIELD_SKIP_LIST: - del fields[field_name] + def _remove_excluded_fields(self, message): + for field_name in list(message): + if field_name in self.field_skip_set: + del message[field_name] # ---------------------------------------------------------------------- def _move_extra_record_fields_to_prefix(self, message): @@ -199,9 +238,10 @@ def _move_extra_record_fields_to_prefix(self, message): if not self._extra_prefix: return # early out if no prefix is configured - field_skip_list = constants.FORMATTER_LOGSTASH_MESSAGE_FIELD_LIST + [self._extra_prefix] + message.setdefault(self._extra_prefix, {}) + field_skip_set = self.top_level_field_set | {self._extra_prefix} for key in list(message): - if key not in field_skip_list: + if key not in field_skip_set: message[self._extra_prefix][key] = message.pop(key) # ---------------------------------------------------------------------- @@ -209,7 +249,62 @@ def _serialize(self, message): return json.dumps(message, ensure_ascii=self._ensure_ascii) +class LogstashEcsFormatter(LogstashFormatter): + ecs_version = '8.11.0' + __schema_dict = { + 'ECS_VERSION': 'ecs.version', + 'MESSAGE_TYPE': 'event.module', + 'HOST': 'host.hostname', + 'LOG_LEVEL': 'log.level', + 'LOGGER_NAME': 'log.logger', + 'LOG_SOURCE': 'log.syslog.hostname', + 'LINE': 'log.origin.file.line', + 'PATH': 'log.origin.file.name', + 'FUNC_NAME': 'log.origin.function', + 'STACK_TRACE': 'error.stack_trace', + 'ERROR_TYPE': 'error.type', + 'PROGRAM': 'process.executable', + 'PROCESS_NAME': 'process.name', + 'PID': 'process.pid', + 'THREAD_NAME': 'process.thread.name', + } + + normalize_ecs_message = constants.FORMATTER_LOGSTASH_ECS_NORMALIZE_MESSAGE + top_level_field_set = {*constants.FORMATTER_LOGSTASH_ECS_MESSAGE_FIELD_LIST, + *__schema_dict.values()} + MessageSchema = type('MessageSchema', (LogstashFormatter.MessageSchema,), __schema_dict) + + def _get_primary_fields(self, record): + message = super()._get_primary_fields(record) + Schema = self.MessageSchema + message[Schema.ECS_VERSION] = self.ecs_version + return message + + def _format_to_dict(self, record): + message = super()._format_to_dict(record) + if self.normalize_ecs_message: + # pylint: disable-next=redefined-variable-type + message = normalize_ecs_dict(message) + return message + + class DjangoLogstashFormatter(LogstashFormatter): + class MessageSchema(LogstashFormatter.MessageSchema): + DJANGO_VERSION = 'django_version' + RESP_STATUS_CODE = 'status_code' + REQ_USER_AGENT = 'req_useragent' + REQ_REMOTE_ADDRESS = 'req_remote_address' + REQ_HOST = 'req_host' + REQ_URI = 'req_uri' + REQ_USER = 'req_user' + REQ_METHOD = 'req_method' + REQ_REFERER = 'req_referer' + REQ_FORWARDED_PROTO = 'req_forwarded_proto' + REQ_FORWARDED_FOR = 'req_forwarded_for' + TMPL_NAME = 'tmpl_name' + TMPL_LINE = 'tmpl_line' + TMPL_MESSAGE = 'tmpl_message' + TMPL_DURING = 'tmpl_during' # ---------------------------------------------------------------------- def __init__(self, *args, **kwargs): @@ -225,9 +320,10 @@ def _fetch_django_version(self): # ---------------------------------------------------------------------- def _get_extra_fields(self, record): extra_fields = super()._get_extra_fields(record) + Schema = self.MessageSchema if hasattr(record, 'status_code'): - extra_fields['status_code'] = record.status_code + extra_fields[Schema.RESP_STATUS_CODE] = record.status_code # Django's runserver command passes socketobject and WSGIRequest instances as "request". # Hence the check for the META attribute. @@ -236,34 +332,34 @@ def _get_extra_fields(self, record): request = record.request request_user = self._get_attribute_with_default(request, 'user', '') - extra_fields['django_version'] = self._django_version - extra_fields['req_useragent'] = request.META.get('HTTP_USER_AGENT', '') - extra_fields['req_remote_address'] = request.META.get('REMOTE_ADDR', '') - extra_fields['req_host'] = self._try_to_get_host_from_remote(request) - extra_fields['req_uri'] = self._try_to_get_full_request_uri(request) - extra_fields['req_user'] = str(request_user) - extra_fields['req_method'] = request.META.get('REQUEST_METHOD', '') - extra_fields['req_referer'] = request.META.get('HTTP_REFERER', '') + extra_fields[Schema.DJANGO_VERSION] = self._django_version + extra_fields[Schema.REQ_USER_AGENT] = request.META.get('HTTP_USER_AGENT', '') + extra_fields[Schema.REQ_REMOTE_ADDRESS] = request.META.get('REMOTE_ADDR', '') + extra_fields[Schema.REQ_HOST] = self._try_to_get_host_from_remote(request) + extra_fields[Schema.REQ_URI] = self._try_to_get_full_request_uri(request) + extra_fields[Schema.REQ_USER] = str(request_user) + extra_fields[Schema.REQ_METHOD] = request.META.get('REQUEST_METHOD', '') + extra_fields[Schema.REQ_REFERER] = request.META.get('HTTP_REFERER', '') forwarded_proto = request.META.get('HTTP_X_FORWARDED_PROTO', None) if forwarded_proto is not None: - extra_fields['req_forwarded_proto'] = forwarded_proto + extra_fields[Schema.REQ_FORWARDED_PROTO] = forwarded_proto forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR', None) if forwarded_for is not None: # make it a list forwarded_for_list = forwarded_for.replace(' ', '').split(',') - extra_fields['req_forwarded_for'] = forwarded_for_list + extra_fields[Schema.REQ_FORWARDED_FOR] = forwarded_for_list # template debug if isinstance(record.exc_info, tuple): exc_value = record.exc_info[1] template_info = getattr(exc_value, 'template_debug', None) if template_info: - extra_fields['tmpl_name'] = template_info['name'] - extra_fields['tmpl_line'] = template_info['line'] - extra_fields['tmpl_message'] = template_info['message'] - extra_fields['tmpl_during'] = template_info['during'] + extra_fields[Schema.TMPL_NAME] = template_info['name'] + extra_fields[Schema.TMPL_LINE] = template_info['line'] + extra_fields[Schema.TMPL_MESSAGE] = template_info['message'] + extra_fields[Schema.TMPL_DURING] = template_info['during'] return extra_fields @@ -299,52 +395,116 @@ def _try_to_get_full_request_uri(self, request): return None +class DjangoLogstashEcsFormatter(DjangoLogstashFormatter, LogstashEcsFormatter): + __schema_dict = { + 'RESP_STATUS_CODE': 'http.response.status_code', + 'REQ_USER_AGENT': 'user_agent.original', + 'REQ_REMOTE_ADDRESS': 'client.ip', + 'REQ_HOST': 'client.domain', + 'REQ_URI': 'url.original', + 'REQ_USER': 'user.name', + 'REQ_METHOD': 'http.request.method', + 'REQ_REFERER': 'http.request.referrer', + } + + top_level_field_set = LogstashEcsFormatter.top_level_field_set | set(__schema_dict.values()) + MessageSchema = type( + 'MessageSchema', + (DjangoLogstashFormatter.MessageSchema, LogstashEcsFormatter.MessageSchema), + __schema_dict, + ) + + def _remove_excluded_fields(self, message): + message.pop('status_code', None) + super()._remove_excluded_fields(message) + + class FlaskLogstashFormatter(LogstashFormatter): + class MessageSchema(LogstashFormatter.MessageSchema): + FLASK_VERSION = 'flask_version' + RESP_STATUS_CODE = 'status_code' + REQ_USER_AGENT = 'req_useragent' + REQ_REMOTE_ADDRESS = 'req_remote_address' + REQ_HOST = 'req_host' + REQ_URI = 'req_uri' + REQ_USER = 'req_user' + REQ_METHOD = 'req_method' + REQ_REFERER = 'req_referer' + REQ_ID = 'request_id' + REQ_FORWARDED_PROTO = 'req_forwarded_proto' + REQ_FORWARDED_FOR = 'req_forwarded_for' # ---------------------------------------------------------------------- def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self._django_version = None + self._flask_version = None self._fetch_flask_version() # ---------------------------------------------------------------------- def _fetch_flask_version(self): - from flask import __version__ # pylint: disable=import-error,import-outside-toplevel - self._flask_version = __version__ + self._flask_version = importlib.metadata.version('flask') # ---------------------------------------------------------------------- def _get_extra_fields(self, record): - from flask import request # pylint: disable=import-error,import-outside-toplevel + # pylint: disable-next=import-error,import-outside-toplevel + from flask import request extra_fields = super()._get_extra_fields(record) + Schema = self.MessageSchema - extra_fields['flask_version'] = self._flask_version + extra_fields[Schema.FLASK_VERSION] = self._flask_version if request: # request might be unbound in other threads - extra_fields['req_useragent'] = str(request.user_agent) if request.user_agent else '' - extra_fields['req_remote_address'] = request.remote_addr - extra_fields['req_host'] = request.host.split(':', 1)[0] - extra_fields['req_uri'] = request.url - extra_fields['req_method'] = request.method - extra_fields['req_referer'] = request.referrer + extra_fields[Schema.REQ_USER_AGENT] = (str(request.user_agent) + if request.user_agent else '') + extra_fields[Schema.REQ_REMOTE_ADDRESS] = request.remote_addr + extra_fields[Schema.REQ_HOST] = request.host.split(':', 1)[0] + extra_fields[Schema.REQ_URI] = request.url + extra_fields[Schema.REQ_METHOD] = request.method + extra_fields[Schema.REQ_REFERER] = request.referrer if 'X-Request-ID' in request.headers: - extra_fields['request_id'] = request.headers.get('X-Request-ID') + extra_fields[Schema.REQ_ID] = request.headers.get('X-Request-ID') if request.remote_user: - extra_fields['req_user'] = request.remote_user + extra_fields[Schema.REQ_USER] = request.remote_user forwarded_proto = request.headers.get('X-Forwarded-Proto', None) if forwarded_proto is not None: - extra_fields['req_forwarded_proto'] = forwarded_proto + extra_fields[Schema.REQ_FORWARDED_PROTO] = forwarded_proto forwarded_for = request.headers.get('X-Forwarded-For', None) if forwarded_for is not None: # make it a list forwarded_for_list = forwarded_for.replace(' ', '').split(',') - extra_fields['req_forwarded_for'] = forwarded_for_list + extra_fields[Schema.REQ_FORWARDED_FOR] = forwarded_for_list # check if we have a status code somewhere if hasattr(record, 'status_code'): - extra_fields['status_code'] = record.status_code + extra_fields[Schema.RESP_STATUS_CODE] = record.status_code if hasattr(record, 'response'): - extra_fields['status_code'] = record.response.status_code + extra_fields[Schema.RESP_STATUS_CODE] = record.response.status_code return extra_fields + + +class FlaskLogstashEcsFormatter(FlaskLogstashFormatter, LogstashEcsFormatter): + __schema_dict = { + 'RESP_STATUS_CODE': 'http.response.status_code', + 'REQ_USER_AGENT': 'user_agent.original', + 'REQ_REMOTE_ADDRESS': 'client.ip', + 'REQ_HOST': 'client.domain', + 'REQ_URI': 'url.original', + 'REQ_USER': 'user.name', + 'REQ_METHOD': 'http.request.method', + 'REQ_REFERER': 'http.request.referrer', + 'REQ_ID': 'http.request.id', + } + + top_level_field_set = LogstashEcsFormatter.top_level_field_set | set(__schema_dict.values()) + MessageSchema = type( + 'MessageSchema', + (FlaskLogstashFormatter.MessageSchema, LogstashEcsFormatter.MessageSchema), + __schema_dict, + ) + + def _remove_excluded_fields(self, message): + message.pop('status_code', None) + super()._remove_excluded_fields(message) diff --git a/logstash_async/utils.py b/logstash_async/utils.py index f2fd676..9274538 100644 --- a/logstash_async/utils.py +++ b/logstash_async/utils.py @@ -5,6 +5,7 @@ from __future__ import print_function +from copy import deepcopy from datetime import datetime from importlib import import_module from itertools import chain, islice @@ -60,3 +61,61 @@ def import_string(dotted_path): except AttributeError as exc: raise ImportError( f'Module "{module_path}" does not define a "{class_name}" attribute/class') from exc + + +# ---------------------------------------------------------------------- +# pylint: disable-next=invalid-name +class normalize_ecs_dict: + """ + Convert dotted ecs fields into nested objects. + """ + + def __new__(cls, ecs_dict): + new_dict = deepcopy(ecs_dict) + cls.normalize_dict(new_dict) + return new_dict + + @classmethod + def normalize_dict(cls, ecs_dict): + for key in list(ecs_dict): + if '.' in key: + cls.merge_dicts(ecs_dict, cls.de_dot_record(key, ecs_dict.pop(key))) + for key, val in ecs_dict.items(): + cls.normalize_value(val) + + @classmethod + def normalize_sequence(cls, ecs_sequence): + for val in ecs_sequence: + cls.normalize_value(val) + + @classmethod + def normalize_value(cls, ecs_value): + if isinstance(ecs_value, dict): + cls.normalize_dict(ecs_value) + if isinstance(ecs_value, (list, tuple, set)): + cls.normalize_sequence(ecs_value) + + @classmethod + def merge_dicts(cls, target, src): + """ + Merge dicts recursively. + Mutates `target`. + Uses references from `src` which may lead to `src` mutation. + """ + for key, src_value in src.items(): + if key in target: + target_value = target[key] + if isinstance(target_value, dict) and isinstance(src_value, dict): + cls.merge_dicts(target_value, src_value) + else: + target[key] = src_value + else: + target[key] = src_value + + @classmethod + def de_dot_record(cls, key, value): + keys = key.split('.') + res = {keys.pop(): value} + for k in reversed(keys): + res = {k: res} + return res diff --git a/setup.py b/setup.py index 0e016ed..def2622 100644 --- a/setup.py +++ b/setup.py @@ -41,6 +41,9 @@ }, keywords='logging logstash asynchronous', install_requires=['limits', 'pylogbeat', 'requests'], + extras_require={ + 'dev': ['django', 'flask'], + }, python_requires='>3.5', include_package_data=True, classifiers=[ diff --git a/tests/formatter_test.py b/tests/formatter_test.py index 0cba802..c1d916e 100644 --- a/tests/formatter_test.py +++ b/tests/formatter_test.py @@ -3,16 +3,48 @@ # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE file for details. +from contextlib import suppress from logging import FileHandler, makeLogRecord +from types import SimpleNamespace +from unittest.mock import patch +import importlib.metadata import os +import socket import sys import unittest -from logstash_async.formatter import LogstashFormatter +from logstash_async.formatter import ( + DjangoLogstashEcsFormatter, + DjangoLogstashFormatter, + FlaskLogstashEcsFormatter, + FlaskLogstashFormatter, + LogstashEcsFormatter, + LogstashFormatter, +) +import logstash_async # pylint: disable=protected-access +_interpreter_version = f'{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}' + + +def create_log_record(**kwargs): + return makeLogRecord({ + 'msg': 'test', + 'created': 1635082335.024747, + 'levelname': 'INFO', + 'process': 1, + 'funcName': 'f', + 'lineno': 2, + 'name': 'foo', + 'pathname': 'a/b/c', + 'processName': 'bar', + 'threadName': 'baz', + 'exc_info': (ValueError, None, None), + **kwargs, + }) + class ExceptionCatchingFileHandler(FileHandler): def __init__(self, *args, **kwargs): @@ -60,6 +92,376 @@ def test_format_timestamp_microsecond_2(self): result = formatter._format_timestamp(test_time_microsecond2) self.assertEqual(result, '2021-10-24T13:32:15.024Z') + @patch.object(LogstashFormatter, '_format_exception', lambda s, e: e) + def test_default_schema(self): + formatter = LogstashFormatter(tags=['t1', 't2']) + result = formatter._format_to_dict(create_log_record()) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'host': socket.gethostname(), + 'level': 'INFO', + 'logsource': socket.gethostname(), + 'message': 'test', + 'pid': 1, + 'program': sys.argv[0], + 'type': 'python-logstash', + 'tags': ['t1', 't2'], + 'extra': { + 'func_name': 'f', + 'interpreter': sys.executable, + 'interpreter_version': _interpreter_version, + 'line': 2, + 'logger_name': 'foo', + 'logstash_async_version': logstash_async.__version__, + 'path': 'a/b/c', + 'process_name': 'bar', + 'thread_name': 'baz', + 'stack_trace': (ValueError, None, None), + 'error_type': 'ValueError', + } + }) + + +@patch.object(LogstashEcsFormatter, '_format_exception', lambda s, e: e) +class LogstashEcsFormatterTest(unittest.TestCase): + def test_default_schema(self): + formatter = LogstashEcsFormatter(tags=['t1', 't2']) + result = formatter._format_to_dict(create_log_record()) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'ecs': {'version': '8.11.0'}, + 'event': {'module': 'python-logstash'}, + 'host': {'hostname': socket.gethostname()}, + 'log': { + 'level': 'INFO', + 'syslog': {'hostname': socket.gethostname()}, + 'origin': { + 'file': {'line': 2, 'name': 'a/b/c'}, + 'function': 'f', + }, + 'logger': 'foo', + }, + 'message': 'test', + 'process': { + 'thread': {'name': 'baz'}, + 'name': 'bar', + 'pid': 1, + 'executable': sys.argv[0], + }, + 'error': {'stack_trace': (ValueError, None, None), 'type': 'ValueError'}, + 'tags': ['t1', 't2'], + 'extra': { + 'interpreter': sys.executable, + 'interpreter_version': _interpreter_version, + 'logstash_async_version': logstash_async.__version__, + } + }) + + def test_dotted_schema(self): + class _LogstashEcsFormatter(LogstashEcsFormatter): + normalize_ecs_message = False + + formatter = _LogstashEcsFormatter(tags=['t1', 't2']) + result = formatter._format_to_dict(create_log_record()) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'ecs.version': '8.11.0', + 'event.module': 'python-logstash', + 'host.hostname': socket.gethostname(), + 'log.level': 'INFO', + 'log.syslog.hostname': socket.gethostname(), + 'log.origin.file.line': 2, + 'log.origin.file.name': 'a/b/c', + 'log.origin.function': 'f', + 'log.logger': 'foo', + 'message': 'test', + 'process.thread.name': 'baz', + 'process.name': 'bar', + 'process.pid': 1, + 'process.executable': sys.argv[0], + 'error.stack_trace': (ValueError, None, None), + 'error.type': 'ValueError', + 'tags': ['t1', 't2'], + 'extra': { + 'interpreter': sys.executable, + 'interpreter_version': _interpreter_version, + 'logstash_async_version': logstash_async.__version__, + } + }) + + +class DjangoTestMixin: + @classmethod + def setUpClass(cls): # pylint: disable=invalid-name + super().setUpClass() + + # pylint: disable=import-outside-toplevel + from django.conf import settings + from django.http import HttpRequest + import django + + # pylint: enable=import-outside-toplevel + + with suppress(RuntimeError): + settings.configure() + cls.HttpRequest = HttpRequest + cls.django_version = django.get_version() + + def _create_request(self): + request = self.HttpRequest() + request.user = 'usr' + request.META.update({ + 'HTTP_USER_AGENT': 'dj-agent', + 'REMOTE_ADDR': 'dj-addr', + 'HTTP_HOST': 'dj-host', + 'HTTP_REFERER': 'dj-ref', + 'REQUEST_METHOD': 'GET', + 'HTTP_X_FORWARDED_PROTO': 'dj-f-proto', + 'HTTP_X_FORWARDED_FOR': 'dj-f1, dj-f2', + }) + return request + + +class DjangoLogstashFormatterTest(DjangoTestMixin, unittest.TestCase): + @patch.object(DjangoLogstashFormatter, '_format_exception', lambda s, e: e) + def test_default_schema(self): + formatter = DjangoLogstashFormatter(tags=['t1', 't2']) + exc_info = (ValueError, SimpleNamespace(template_debug={ + 'name': 'tpl', + 'line': 3, + 'message': 'tmsg', + 'during': 'd', + }), None) + result = formatter._format_to_dict(create_log_record( + status_code=500, + request=self._create_request(), + exc_info=exc_info, + )) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'host': socket.gethostname(), + 'level': 'INFO', + 'logsource': socket.gethostname(), + 'message': 'test', + 'pid': 1, + 'program': sys.argv[0], + 'type': 'python-logstash', + 'tags': ['t1', 't2'], + 'extra': { + 'func_name': 'f', + 'interpreter': sys.executable, + 'interpreter_version': _interpreter_version, + 'line': 2, + 'logger_name': 'foo', + 'logstash_async_version': logstash_async.__version__, + 'path': 'a/b/c', + 'process_name': 'bar', + 'thread_name': 'baz', + 'stack_trace': exc_info, + 'error_type': 'ValueError', + 'status_code': 500, + 'django_version': self.django_version, + 'req_useragent': 'dj-agent', + 'req_remote_address': 'dj-addr', + 'req_host': 'dj-host', + 'req_uri': None, + 'req_user': 'usr', + 'req_method': 'GET', + 'req_referer': 'dj-ref', + 'req_forwarded_proto': 'dj-f-proto', + 'req_forwarded_for': ['dj-f1', 'dj-f2'], + 'tmpl_name': 'tpl', + 'tmpl_line': 3, + 'tmpl_message': 'tmsg', + 'tmpl_during': 'd', + 'request': '', + } + }) + + +class DjangoLogstashEcsFormatterTest(DjangoTestMixin, unittest.TestCase): + @patch.object(DjangoLogstashEcsFormatter, '_format_exception', lambda s, e: e) + def test_default_schema(self): + formatter = DjangoLogstashEcsFormatter(tags=['t1', 't2']) + exc_info = (ValueError, SimpleNamespace(template_debug={ + 'name': 'tpl', + 'line': 3, + 'message': 'tmsg', + 'during': 'd', + }), None) + result = formatter._format_to_dict(create_log_record( + status_code=500, + request=self._create_request(), + exc_info=exc_info, + )) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'ecs': {'version': '8.11.0'}, + 'event': {'module': 'python-logstash'}, + 'host': {'hostname': socket.gethostname()}, + 'client': {'domain': 'dj-host', 'ip': 'dj-addr'}, + 'http': { + 'request': {'method': 'GET', 'referrer': 'dj-ref'}, + 'response': {'status_code': 500}, + }, + 'url': {'original': None}, + 'user': {'name': 'usr'}, + 'user_agent': {'original': 'dj-agent'}, + 'log': { + 'level': 'INFO', + 'syslog': {'hostname': socket.gethostname()}, + 'origin': { + 'file': {'line': 2, 'name': 'a/b/c'}, + 'function': 'f', + }, + 'logger': 'foo', + }, + 'message': 'test', + 'process': { + 'thread': {'name': 'baz'}, + 'name': 'bar', + 'pid': 1, + 'executable': sys.argv[0], + }, + 'error': {'stack_trace': exc_info, 'type': 'ValueError'}, + 'tags': ['t1', 't2'], + 'extra': { + 'interpreter': sys.executable, + 'interpreter_version': _interpreter_version, + 'logstash_async_version': logstash_async.__version__, + 'req_forwarded_proto': 'dj-f-proto', + 'req_forwarded_for': ['dj-f1', 'dj-f2'], + 'tmpl_name': 'tpl', + 'tmpl_line': 3, + 'tmpl_message': 'tmsg', + 'tmpl_during': 'd', + 'request': '', + 'django_version': self.django_version, + } + }) + + +class FlaskTestMixin: + @classmethod + def setUpClass(cls): # pylint: disable=invalid-name + super().setUpClass() + cls.flask_version = importlib.metadata.version('flask') + + def _create_request(self): + return SimpleNamespace( + user_agent='f-agent', + remote_addr='f-addr', + host='f-host:80', + url='f-url', + method='GET', + referrer='f-ref', + remote_user='usr', + headers={ + 'X-Request-ID': 'x-id', + 'X-Forwarded-Proto': 'f-proto', + 'X-Forwarded-For': 'f1, f2', + }, + ) + + +class FlaskLogstashFormatterTest(FlaskTestMixin, unittest.TestCase): + @patch.object(FlaskLogstashFormatter, '_format_exception', lambda s, e: e) + def test_default_schema(self): + with patch('flask.request', self._create_request()): + formatter = FlaskLogstashFormatter(tags=['t1', 't2']) + result = formatter._format_to_dict(create_log_record(status_code=500)) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'host': socket.gethostname(), + 'level': 'INFO', + 'logsource': socket.gethostname(), + 'message': 'test', + 'pid': 1, + 'program': sys.argv[0], + 'type': 'python-logstash', + 'tags': ['t1', 't2'], + 'extra': { + 'func_name': 'f', + 'interpreter': sys.executable, + 'interpreter_version': _interpreter_version, + 'line': 2, + 'logger_name': 'foo', + 'logstash_async_version': logstash_async.__version__, + 'path': 'a/b/c', + 'process_name': 'bar', + 'thread_name': 'baz', + 'error_type': 'ValueError', + 'stack_trace': (ValueError, None, None), + 'status_code': 500, + 'flask_version': self.flask_version, + 'req_useragent': 'f-agent', + 'req_remote_address': 'f-addr', + 'req_host': 'f-host', + 'req_uri': 'f-url', + 'req_user': 'usr', + 'req_method': 'GET', + 'req_referer': 'f-ref', + 'req_forwarded_proto': 'f-proto', + 'req_forwarded_for': ['f1', 'f2'], + 'request_id': 'x-id', + } + }) + + +class FlaskLogstashEcsFormatterTest(FlaskTestMixin, unittest.TestCase): + @patch.object(FlaskLogstashEcsFormatter, '_format_exception', lambda s, e: e) + def test_default_schema(self): + with patch('flask.request', self._create_request()): + formatter = FlaskLogstashEcsFormatter(tags=['t1', 't2']) + result = formatter._format_to_dict(create_log_record(status_code=500)) + self.assertDictEqual(result, { + '@timestamp': '2021-10-24T13:32:15.024Z', + '@version': '1', + 'ecs': {'version': '8.11.0'}, + 'event': {'module': 'python-logstash'}, + 'host': {'hostname': socket.gethostname()}, + 'client': {'domain': 'f-host', 'ip': 'f-addr'}, + 'http': { + 'request': {'id': 'x-id', 'method': 'GET', 'referrer': 'f-ref'}, + 'response': {'status_code': 500}, + }, + 'url': {'original': 'f-url'}, + 'user': {'name': 'usr'}, + 'user_agent': {'original': 'f-agent'}, + 'log': { + 'level': 'INFO', + 'syslog': {'hostname': socket.gethostname()}, + 'origin': { + 'file': {'line': 2, 'name': 'a/b/c'}, + 'function': 'f', + }, + 'logger': 'foo', + }, + 'message': 'test', + 'process': { + 'thread': {'name': 'baz'}, + 'name': 'bar', + 'pid': 1, + 'executable': sys.argv[0], + }, + 'error': {'stack_trace': (ValueError, None, None), 'type': 'ValueError'}, + 'tags': ['t1', 't2'], + 'extra': { + 'interpreter': sys.executable, + 'interpreter_version': _interpreter_version, + 'logstash_async_version': logstash_async.__version__, + 'req_forwarded_proto': 'f-proto', + 'req_forwarded_for': ['f1', 'f2'], + 'flask_version': self.flask_version, + } + }) + if __name__ == '__main__': unittest.main() diff --git a/tests/utils_test.py b/tests/utils_test.py new file mode 100644 index 0000000..1b1c749 --- /dev/null +++ b/tests/utils_test.py @@ -0,0 +1,50 @@ +from copy import deepcopy +import unittest + +from logstash_async.utils import normalize_ecs_dict + + +class NormalizeEcsDictTest(unittest.TestCase): + def test_de_dot(self): + with self.subTest('no dots'): + result = normalize_ecs_dict.de_dot_record('a', {'x': [1]}) + self.assertDictEqual(result, {'a': {'x': [1]}}) + with self.subTest('dots'): + result = normalize_ecs_dict.de_dot_record('a.b.c', {'x': [1]}) + self.assertDictEqual(result, {'a': {'b': {'c': {'x': [1]}}}}) + + def test_normalization(self): + d = { + 'a': 1, + 'b': 11, + 'b.c': { + 'd.e': [2, ({'f.g': 3}, 4), 5], + 'h': None, + }, + 'b.c.x': {'y': 6}, + 'c': {'d': [1], 'e': 2}, + 'c.d': [2], + 'c.f': 3, + } + d_copy = deepcopy(d) + expected = { + 'a': 1, + 'b': { + 'c': { + 'd': { + 'e': [2, ({'f': {'g': 3}}, 4), 5], + }, + 'h': None, + 'x': {'y': 6}, + }, + }, + 'c': {'d': [2], 'e': 2, 'f': 3}, + } + result = normalize_ecs_dict(d) + self.assertDictEqual(result, expected) + + with self.subTest('source dict not mutated'): + self.assertDictEqual(d, d_copy) + # pylint: disable-next=unsubscriptable-object + result['c']['d'].append(22) + self.assertDictEqual(d, d_copy) diff --git a/tox.ini b/tox.ini index 422fc05..1ff2b55 100644 --- a/tox.ini +++ b/tox.ini @@ -6,7 +6,7 @@ [tox] skip_missing_interpreters = true envlist = - docs,py38,py39,py310,py311,pypy3 + docs,py38,py39,py310,py311,py312,pypy3 logstash_async_modules = logstash_async tests @@ -15,6 +15,8 @@ deps = flake8 isort pylint + Django + Flask commands = # linting and code analysis {envbindir}/flake8 {[tox]logstash_async_modules}