-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlogstash_engine.py
138 lines (114 loc) · 3.11 KB
/
logstash_engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
An engine that reads messages from the salt event bus and pushes
them onto a logstash endpoint.
.. versionadded:: 2015.8.0
:configuration:
Example configuration
.. code-block:: yaml
engines:
- logstash:
host: log.my_network.com
port: 5959
proto: tcp
:depends: logstash
"""
import logging
import salt.utils.event
try:
import logstash
except ImportError:
logstash = None
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Name returned by __virtual__() when the logstash import succeeded.
__virtualname__ = "logstash"
def __virtual__():
    """
    Report whether this engine can be loaded.

    Returns the module's virtual name when the ``logstash`` import
    succeeded, otherwise a ``(False, reason)`` tuple.
    """
    if logstash is None:
        return (False, "python-logstash not installed")
    return __virtualname__
def event_bus_context(opts):
    """
    Return the event bus object used to read events.

    When the configured ``id`` ends with ``_master`` the master event
    bus is returned; in every other case the minion event bus is used.
    Both are opened with ``listen=True`` on ``opts["sock_dir"]``.
    """
    running_on_master = opts.get("id").endswith("_master")
    if not running_on_master:
        return salt.utils.event.get_event(
            "minion",
            opts=opts,
            sock_dir=opts["sock_dir"],
            listen=True,
        )
    return salt.utils.event.get_master_event(
        opts, opts["sock_dir"], listen=True
    )
def validate(data):
    """
    Validate the event data and ensure there are no
    conflicts with skip_list

    Keys of ``data`` that appear in the skip list are handled in place:

    - ``password`` and ``auth_token`` are removed outright.
    - every other listed key is renamed by appending ``_``. If that name
      is already taken, further underscores are appended until a free
      name is found, so an existing entry is never overwritten.

    :param data: mapping of event fields; it is modified in place.
    :return: the same ``data`` object, after the changes above.
    """
    skip_list = {
        "args",
        "asctime",
        "created",
        "exc_info",
        "exc_text",
        "filename",
        "funcName",
        "id",
        "levelname",
        "levelno",
        "lineno",
        "module",
        "msecs",
        "message",
        "msg",
        "name",
        "pathname",
        "process",
        "processName",
        "relativeCreated",
        "thread",
        "threadName",
        "extra",
        "auth_token",
        "password",
        "stack_info",
    }
    dropped_keys = ("password", "auth_token")
    # intersection() builds a new set, so mutating ``data`` below is safe.
    for key in skip_list.intersection(data):
        value = data.pop(key)
        if key in dropped_keys:
            # Credentials are discarded rather than renamed.
            continue
        renamed = key + "_"
        # Avoid clobbering a field that already uses the renamed key.
        while renamed in data:
            renamed += "_"
        data[renamed] = value
    return data
def start(
    host,
    port=5959,
    tag="salt/engine/logstash",
    proto="udp",
    logger_name="python-logstash-logger",
):  # pylint: disable=too-many-locals
    """
    Listen to salt events and forward them to logstash

    :param host: logstash host, or a list of hosts from which one is
        picked at random.
    :param port: port passed to the logstash handler.
    :param tag: currently not read by this function; every event is
        logged under its own tag.
    :param proto: transport to use, either ``"tcp"`` or ``"udp"``.
    :param logger_name: name of the logger the logstash handler is
        attached to.
    :raises ValueError: if ``proto`` is neither ``"tcp"`` nor ``"udp"``.
    """
    # Reject an unknown protocol up front with a clear message instead of
    # failing later on an unbound handler class.
    if proto not in ("tcp", "udp"):
        raise ValueError(
            "Unsupported logstash proto {!r}; expected 'tcp' or 'udp'".format(
                proto
            )
        )

    # Use the stock Logger / LogRecord classes for what is created below.
    logging.setLoggerClass(logging.Logger)
    logging.setLogRecordFactory(logging.LogRecord)

    if isinstance(host, list):
        from random import choice  # pylint: disable=import-outside-toplevel

        host = choice(host)

    if proto == "tcp":
        logstash_handler = logstash.TCPLogstashHandler
    else:
        logstash_handler = logstash.UDPLogstashHandler

    logstash_logger = logging.getLogger(logger_name)
    logstash_logger.setLevel(logging.INFO)
    handler = logstash_handler(host, port, version=1)
    logstash_logger.addHandler(handler)

    # NOTE(review): ``__opts__`` is not defined in this file; presumably it
    # is injected by the Salt loader -- confirm.
    with event_bus_context(__opts__) as event_bus:
        log.debug("Logstash engine started")
        for event in event_bus.iter_events(full=True, auto_reconnect=True):
            if event and "data" in event and "tag" in event:
                # Separate name so the ``tag`` parameter is not shadowed.
                event_tag, data = event["tag"], validate(event["data"])
                logstash_logger.info(event_tag, extra=data)