import dpkt
import socket
from snortunsock import snort_listener
import paho.mqtt.client as mqtt
import json
import os
# The broker address is mandatory: a missing MQTT variable raises KeyError at import.
MQTT = os.environ['MQTT']

# One module-level client shared by main(). loop_start() runs paho's network loop
# in a background thread, so the publish() calls later on do not service the
# socket themselves.
# NOTE(review): the connection is made without tls_set()/username_pw_set(), so
# alerts travel to the broker in clear text and unauthenticated — confirm this
# matches the deployment (e.g. broker on localhost only).
snort_mqtt = mqtt.Client()
snort_mqtt.connect(MQTT)
snort_mqtt.loop_start()
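def mac_addr(address):
    r"""Convert a MAC address to a readable/printable string.

    Args:
        address (bytes): the raw address octets as dpkt exposes them
            (``eth.src`` / ``eth.dst``). A ``bytearray``/``memoryview`` or a
            ``str`` whose code points are all below 256
            (e.g. '\x01\x02\x03\x04\x05\x06') is accepted as well.

    Returns:
        str: colon-separated lowercase hex octets, e.g. '01:02:03:04:05:06';
        the empty string for empty input.

    Raises:
        UnicodeEncodeError: if a ``str`` contains a code point above 0xFF.
    """
    if isinstance(address, str):
        # The documented example passes the octets as a str; map each code
        # point back to its byte value instead of failing inside chr().
        address = address.encode('latin-1')
    # bytes() normalises bytearray/memoryview input; iterating bytes yields ints,
    # so the old ord(chr(x)) round-trip is not needed.
    return ':'.join('%02x' % octet for octet in bytes(address))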
def ip_to_str(address):
    """Render a packed IPv4 address as dotted-quad text.

    Args:
        address (bytes): 4-byte network-order address (``ip.src`` / ``ip.dst``).

    Returns:
        str: Printable/readable IPv4 address, e.g. '192.0.2.1'.

    Raises:
        ValueError or OSError: propagated from socket.inet_ntop when the input
        is not a valid 4-byte packed address.
    """
    family = socket.AF_INET
    return socket.inet_ntop(family, address)
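def ip6_to_str(address):
    """Render a packed IPv6 address as compressed colon-hex text.

    Args:
        address (bytes): 16-byte network-order address (``ip.src`` / ``ip.dst``).

    Returns:
        str: Printable/readable IPv6 address, e.g. '2001:db8::1'.

    Raises:
        ValueError or OSError: propagated from socket.inet_ntop when the input
        is not a valid 16-byte packed address.
    """
    return socket.inet_ntop(socket.AF_INET6, address)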
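def _alert_text(msg):
    """Return the alert text of a snortunsock record as a printable string.

    The listener hands the message over as a sequence of byte chunks; they are
    joined with '.', decoded as UTF-8, and NUL bytes and single quotes are
    stripped before the text is embedded in the MQTT payload. The text is
    whatever the matching Snort rule's msg field contains, so consumers should
    still treat it as free-form text rather than a trusted identifier.
    """
    orig_msg = b'.'.join(msg.alertmsg)
    return str(orig_msg, 'utf-8').replace("\u0000", "").replace("'", "")


def _describe_ip(eth):
    """Summarise the IP payload of a parsed Ethernet frame.

    Args:
        eth (dpkt.ethernet.Ethernet): the decoded frame.

    Returns:
        tuple: (ip_type, src_ip, dest_ip, packet_info). Only IPv4 and IPv6
        payloads that dpkt actually parsed are described; a non-IP ethertype,
        or an IP header dpkt left as raw bytes, yields "Unsupported" with
        "N/A" addresses so the alert itself is still published.
    """
    ip = eth.data
    if eth.type == dpkt.ethernet.ETH_TYPE_IP6 and isinstance(ip, dpkt.ip6.IP6):
        src_ip = ip6_to_str(ip.src)
        dest_ip = ip6_to_str(ip.dst)
        packet_info = {"len": ip.plen, "hop_limit": ip.hlim}
        print('IP: %s -> %s (len=%d hop_limit=%d)\n' % (src_ip, dest_ip, ip.plen, ip.hlim))
        return "IPv6", src_ip, dest_ip, packet_info
    if eth.type == dpkt.ethernet.ETH_TYPE_IP and isinstance(ip, dpkt.ip.IP):
        # Flags and fragment offset share the 16-bit "off" field; mask them apart.
        packet_info = {
            "len": ip.len,
            "ttl": ip.ttl,
            "DF": bool(ip.off & dpkt.ip.IP_DF),
            "MF": bool(ip.off & dpkt.ip.IP_MF),
            "offset": ip.off & dpkt.ip.IP_OFFMASK,
        }
        return "IPv4", ip_to_str(ip.src), ip_to_str(ip.dst), packet_info
    return "Unsupported", "N/A", "N/A", {"not_supported_packet": "IP Packet unsupported"}


def main():
    """Forward Snort alerts to MQTT as JSON, one message per alert.

    Reads alert records from the Snort unix socket at /var/log/snort/snort_alert,
    decodes the Ethernet/IP headers of the triggering packet with dpkt and
    publishes the summary on the "snort/test" topic through the module-level
    client. A packet dpkt cannot unpack is published with placeholder fields
    instead of ending the listener loop, so one malformed record does not stop
    alert forwarding.
    """
    for msg in snort_listener.start_recv("/var/log/snort/snort_alert"):
        # Build a fresh dict per alert so no field can leak from the previous one.
        snort_message = {"alert_msg": _alert_text(msg)}
        print('alertmsg: %s' % snort_message["alert_msg"])
        try:
            # Unpack the Ethernet frame (mac src/dst, ethertype)
            eth = dpkt.ethernet.Ethernet(msg.pkt)
        except dpkt.UnpackError as err:
            print('Ethernet Frame: could not be decoded (%s)' % err)
            snort_message["src_mac"] = "N/A"
            snort_message["dest_mac"] = "N/A"
            ip_type, src_ip, dest_ip, packet_info = (
                "Unsupported", "N/A", "N/A", {"not_supported_packet": "IP Packet unsupported"})
        else:
            src_mac = mac_addr(eth.src)
            dest_mac = mac_addr(eth.dst)
            snort_message["src_mac"] = src_mac
            snort_message["dest_mac"] = dest_mac
            print('Ethernet Frame: ', src_mac, dest_mac, eth.type)
            ip_type, src_ip, dest_ip, packet_info = _describe_ip(eth)
        snort_message["ip_type"] = ip_type
        snort_message["src_ip"] = src_ip
        snort_message["dest_ip"] = dest_ip
        snort_message["packet_info"] = packet_info
        snort_mqtt.publish("snort/test", json.dumps(snort_message))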
# Run the forwarder loop only when executed as a script, not on import.
if __name__ == "__main__":
    main()