Skip to content

Commit

Permalink
ENG: Add support for creating separated MISP Events
Browse files Browse the repository at this point in the history
With `event_separator` parameter, user can decide to create
more than one MISP event in the output bot and group incomming
messages based on given field.

In additon, the message library was fixed not to modify the
parameter directly.
  • Loading branch information
kamil-certat committed Jul 4, 2024
1 parent f2b8b53 commit b4f2e68
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 73 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@
#### Outputs
- `intelmq.bots.outputs.misp.output_feed`:
- Handle failures if saved current event wasn't saved or is incorrect (PR by Kamil Mankowski).
- Allow saving messages in bulks instead of refreshing the feed immediately (PR#2505 by Kamil Mankowski).
- Add `attribute_mapping` parameter to allow selecting a subset of event attributes as well as additional attribute parameters (PR by Kamil Mankowski).
- Allow saving messages in bulks instead of refreshing the feed immediately (PR#2509 by Kamil Mankowski).
- Add `attribute_mapping` parameter to allow selecting a subset of event attributes as well as additional attribute parameters (PR#2509 by Kamil Mankowski).
- Add `event_separator` parameter to allow keeping IntelMQ events in separated MISP Events based on a given field (PR#2509 by Kamil Mankowski).
- `intelmq.bots.outputs.smtp_batch.output`: Documentation on multiple recipients added (PR#2501 by Edvard Rejthar).

### Documentation
Expand Down
10 changes: 8 additions & 2 deletions docs/user/bots.md
Original file line number Diff line number Diff line change
Expand Up @@ -4605,9 +4605,9 @@ a new MISP event based on `interval_event` triggers saving regardless of the cac
**`attribute_mapping`**

(optional, dict) If set, allows selecting which IntelMQ event fields are mapped to MISP attributes
as well as attribute parameters (like e.g. a comment). The expected format is a *dictonary of dictionaries*:
as well as attribute parameters (like e.g. a comment). The expected format is a *dictionary of dictionaries*:
first-level key represents an IntelMQ field that will be directly translated to a MISP attribute; nested
dictionary represents addditional parameters PyMISP can take when creating an attribute. They can use
dictionary represents additional parameters PyMISP can take when creating an attribute. They can use
names of other IntelMQ fields (then the value of such field will be used), or static values. If not needed,
leave empty dict.

Expand All @@ -4627,6 +4627,12 @@ and set their values as in the IntelMQ event. In addition, the `feed.name` would
as given in the `event_description.text` from IntelMQ event, and `destination.ip` would be set
as not usable for IDS.

**`event_separator`

(optional, string): If set to a field name from IntelMQ event, the bot will group incoming messages
in separated MISP events, based on the value of this field. The `interval_event` parameter acts
for all grouping events together.

**Usage in MISP**

Configure the destination directory of this feed as feed in MISP, either as local location, or served via a web server.
Expand Down
146 changes: 93 additions & 53 deletions intelmq/bots/outputs/misp/output_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@
from pathlib import Path
from uuid import uuid4

import pymisp

from intelmq.lib.bot import OutputBot
from intelmq.lib.exceptions import MissingDependencyError
from ....lib.message import Message, MessageFactory
from ....lib.message import MessageFactory
from intelmq.lib.mixins import CacheMixin
from intelmq.lib.utils import parse_relative

try:
from pymisp import MISPEvent, MISPOrganisation, NewAttributeError
from pymisp import MISPEvent, MISPObject, MISPOrganisation, NewAttributeError
from pymisp.tools import feed_meta_generator
except ImportError:
# catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501
MISPEvent = None
import_fail_reason = "import"

DEFAULT_KEY = "default"


class MISPFeedOutputBot(OutputBot, CacheMixin):
"""Generate an output in the MISP Feed format"""
Expand All @@ -38,6 +38,7 @@ class MISPFeedOutputBot(OutputBot, CacheMixin):
)
_is_multithreadable: bool = False
attribute_mapping: dict = None
event_separator: str = None

@staticmethod
def check_output_dir(dirname):
Expand All @@ -50,7 +51,8 @@ def init(self):
if MISPEvent is None:
raise MissingDependencyError("pymisp", version=">=2.4.117.3")

self.current_event = None
self.current_events = {}
self.current_files = {}

self.misp_org = MISPOrganisation()
self.misp_org.name = self.misp_org_name
Expand All @@ -66,58 +68,57 @@ def init(self):
minutes=parse_relative(self.interval_event)
)

self.min_time_current = datetime.datetime.max
self.max_time_current = datetime.datetime.min

if (self.output_dir / ".current").exists():
try:
with (self.output_dir / ".current").open() as f:
self.current_file = Path(f.read())

if self.current_file.exists():
self.current_event = MISPEvent()
self.current_event.load_file(self.current_file)

last_min_time, last_max_time = re.findall(
"IntelMQ event (.*) - (.*)", self.current_event.info
)[0]
last_min_time = datetime.datetime.strptime(
last_min_time, "%Y-%m-%dT%H:%M:%S.%f"
)
last_max_time = datetime.datetime.strptime(
last_max_time, "%Y-%m-%dT%H:%M:%S.%f"
)
if last_max_time < datetime.datetime.now():
self.min_time_current = datetime.datetime.now()
self.max_time_current = self.min_time_current + self.timedelta
self.current_event = None
else:
self.min_time_current = last_min_time
self.max_time_current = last_max_time
except:
current = f.read()

if not self.event_separator:
self.current_files[DEFAULT_KEY] = Path(current)
else:
self.current_files = {
k: Path(v) for k, v in json.loads(current).items()
}

for key, path in self.current_files.items():
self._load_event(path, key)
except Exception:
self.logger.exception(
"Loading current event %s failed. Skipping it.", self.current_event
"Loading current events %s failed. Skipping it.", self.current_files
)
self.current_event = None
else:
self.current_events = {}

if not self.current_files or self.max_time_current < datetime.datetime.now():
self.min_time_current = datetime.datetime.now()
self.max_time_current = self.min_time_current + self.timedelta
self.current_events = {}

def _load_event(self, file_path: Path, key: str):
if file_path.exists():
self.current_events[key] = MISPEvent()
self.current_events[key].load_file(file_path)

last_min_time, last_max_time = re.findall(
"IntelMQ event (.*) - (.*)", self.current_events[key].info
)[0]
last_min_time = datetime.datetime.strptime(
last_min_time, "%Y-%m-%dT%H:%M:%S.%f"
)
last_max_time = datetime.datetime.strptime(
last_max_time, "%Y-%m-%dT%H:%M:%S.%f"
)

self.min_time_current = min(last_min_time, self.min_time_current)
self.max_time_current = max(last_max_time, self.max_time_current)

def process(self):
if not self.current_event or datetime.datetime.now() > self.max_time_current:
if datetime.datetime.now() > self.max_time_current:
self.min_time_current = datetime.datetime.now()
self.max_time_current = self.min_time_current + self.timedelta
self.current_event = MISPEvent()
self.current_event.info = "IntelMQ event {begin} - {end}" "".format(
begin=self.min_time_current.isoformat(),
end=self.max_time_current.isoformat(),
)
self.current_event.set_date(datetime.date.today())
self.current_event.Orgc = self.misp_org
self.current_event.uuid = str(uuid4())
self.current_file = self.output_dir / f"{self.current_event.uuid}.json"
with (self.output_dir / ".current").open("w") as f:
f.write(str(self.current_file))

# On startup or when timeout occurs, clean the queue to ensure we do not
# keep events forever because there was not enough generated

self._generate_feed()

event = self.receive_message().to_dict(jsondict_as_string=True)
Expand All @@ -128,19 +129,57 @@ def process(self):

if cache_size is None:
self._generate_feed(event)
elif not self.current_events:
# Always create the first event so we can keep track of the interval.
# It also ensures cleaning the queue after startup in case of awaiting
# messages from the previous run
self._generate_feed()
elif cache_size >= self.bulk_save_count:
self._generate_feed()

self.acknowledge_message()

def _generate_new_event(self, key):
self.current_events[key] = MISPEvent()
self.current_events[key].info = "IntelMQ event {begin} - {end}" "".format(
begin=self.min_time_current.isoformat(),
end=self.max_time_current.isoformat(),
)
self.current_events[key].set_date(datetime.date.today())
self.current_events[key].Orgc = self.misp_org
self.current_events[key].uuid = str(uuid4())
self.current_files[key] = (
self.output_dir / f"{self.current_events[key].uuid}.json"
)
with (self.output_dir / ".current").open("w") as f:
if not self.event_separator:
f.write(str(self.current_files[key]))
else:
json.dump({k: str(v) for k, v in self.current_files.items()}, f)
return self.current_events[key]

def _add_message_to_feed(self, message: dict):
obj = self.current_event.add_object(name="intelmq_event")
if not self.event_separator:
key = DEFAULT_KEY
else:
# For proper handling of nested fields
message_obj = MessageFactory.from_dict(
message, harmonization=self.harmonization, default_type="Event"
)
key = message_obj.get(self.event_separator) or DEFAULT_KEY

if key in self.current_events:
event = self.current_events[key]
else:
event = self._generate_new_event(key)

obj = event.add_object(name="intelmq_event")
if not self.attribute_mapping:
self._default_mapping(obj, message)
else:
self._custom_mapping(obj, message)

def _default_mapping(self, obj: pymisp.MISPObject, message: dict):
def _default_mapping(self, obj: MISPObject, message: dict):
for object_relation, value in message.items():
try:
obj.add_attribute(object_relation, value=value)
Expand All @@ -162,8 +201,8 @@ def _extract_misp_attribute_kwargs(self, message: dict, definition: dict) -> dic
for parameter, value in definition.items():
# Check if the value is a harmonization key or a static value
if isinstance(value, str) and (
value in self.harmonization["event"]
or value.split(".", 1)[0] in self.harmonization["event"]
value in self.harmonization["event"] or
value.split(".", 1)[0] in self.harmonization["event"]
):
result[parameter] = message.get(value)
else:
Expand All @@ -188,9 +227,10 @@ def _generate_feed(self, message: dict = None):
self._add_message_to_feed(message)
message = self.cache_pop()

feed_output = self.current_event.to_feed(with_meta=False)
with self.current_file.open("w") as f:
json.dump(feed_output, f)
for key, event in self.current_events.items():
feed_output = event.to_feed(with_meta=False)
with self.current_files[key].open("w") as f:
json.dump(feed_output, f)

feed_meta_generator(self.output_dir)

Expand Down
13 changes: 7 additions & 6 deletions intelmq/lib/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,18 @@ def from_dict(message: dict, harmonization=None,
MessageFactory.unserialize
MessageFactory.serialize
"""
if default_type and "__type" not in message:
message["__type"] = default_type
# don't modify the parameter
message_copy = message.copy()

if default_type and "__type" not in message_copy:
message_copy["__type"] = default_type
try:
class_reference = getattr(intelmq.lib.message, message["__type"])
class_reference = getattr(intelmq.lib.message, message_copy["__type"])
except AttributeError:
raise exceptions.InvalidArgument('__type',
got=message["__type"],
got=message_copy["__type"],
expected=VALID_MESSSAGE_TYPES,
docs=HARMONIZATION_CONF_FILE)
# don't modify the parameter
message_copy = message.copy()
del message_copy["__type"]
return class_reference(message_copy, auto=True, harmonization=harmonization)

Expand Down
Loading

0 comments on commit b4f2e68

Please sign in to comment.